diff --git a/.gitignore b/.gitignore index b21be5dca2..9fde93456d 100644 --- a/.gitignore +++ b/.gitignore @@ -124,3 +124,6 @@ demo-app/src/production/play/ video-buddy-server.log video-buddy-console.log video-buddy-session.json + +# GSD workflow files +.planning/ diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt index 43b2b9a0b2..f31af78372 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt @@ -86,8 +86,6 @@ import io.getstream.video.android.core.model.UpdateUserPermissionsData import io.getstream.video.android.core.model.VideoTrack import io.getstream.video.android.core.model.toIceServer import io.getstream.video.android.core.notifications.internal.telecom.TelecomCallController -import io.getstream.video.android.core.socket.common.scope.ClientScope -import io.getstream.video.android.core.socket.common.scope.UserScope import io.getstream.video.android.core.utils.AtomicUnitCall import io.getstream.video.android.core.utils.RampValueUpAndDownHelper import io.getstream.video.android.core.utils.StreamSingleFlightProcessorImpl @@ -153,17 +151,18 @@ public class Call( internal var reconnectAttepmts = 0 internal val clientImpl = client as StreamVideoClient - internal val scopeProvider: ScopeProvider = ScopeProviderImpl(clientImpl.scope) + internal var scopeProvider: ScopeProvider = ScopeProviderImpl(clientImpl.scope) // Atomic controls private var atomicLeave = AtomicUnitCall() private val logger by taggedLogger("Call:$type:$id") - private val supervisorJob = SupervisorJob() + private var supervisorJob = SupervisorJob() private var callStatsReportingJob: Job? = null + private var cleanupJob: Job? = null private var powerManager: PowerManager? = null - internal val scope = CoroutineScope(clientImpl.scope.coroutineContext + supervisorJob) + internal var scope = CoroutineScope(clientImpl.scope.coroutineContext + supervisorJob) /** The call state contains all state such as the participant list, reactions etc */ val state = CallState(client, this, user, scope) @@ -225,15 +224,22 @@ public class Call( */ private var sfuSocketReconnectionTime: Long? = null + /** + * Lock for synchronizing join/leave lifecycle operations. + * Protects access to isDestroyed, cleanupJob, and scope re-initialization. + */ + private val lifecycleLock = Any() + /** * Call has been left and the object is cleaned up and destroyed. + * Must be accessed under [lifecycleLock]. */ private var isDestroyed = false /** Session handles all real time communication for video and audio */ internal var session: RtcSession? = null var sessionId = UUID.randomUUID().toString() - internal val unifiedSessionId = UUID.randomUUID().toString() + internal var unifiedSessionId = UUID.randomUUID().toString() internal var connectStartTime = 0L internal var reconnectStartTime = 0L @@ -342,11 +348,10 @@ public class Call( testInstanceProvider.mediaManagerCreator!!.invoke() } else { MediaManagerImpl( - clientImpl.context, - this, - scope, - eglBase.eglBaseContext, - clientImpl.callServiceConfigRegistry.get(type).audioUsage, + context = clientImpl.context, + call = this, + eglBaseContext = eglBase.eglBaseContext, + audioUsage = clientImpl.callServiceConfigRegistry.get(type).audioUsage, ) { clientImpl.callServiceConfigRegistry.get(type).audioUsage } } } @@ -531,10 +536,13 @@ public class Call( var result: Result - atomicLeave = AtomicUnitCall() while (retryCount < 3) { result = _join(create, createOptions, ring, notify) if (result is Success) { + // Reset atomicLeave AFTER successful join (after cleanup is complete) + // This prevents race conditions where leave() is called during the join process + atomicLeave = AtomicUnitCall() + // we initialise the camera, mic and other according to local + backend settings // only when the call is joined to make sure we don't switch and override // the settings during a call. @@ -603,6 +611,12 @@ public class Call( ring: Boolean = false, notify: Boolean = false, ): Result { + // Wait for any pending cleanup to complete before rejoining + // Read cleanupJob under lock to prevent race conditions + val pendingCleanup = synchronized(lifecycleLock) { cleanupJob } + pendingCleanup?.join() + // Note: cleanupJob is cleared by resetScopes() at the end of cleanup + reconnectAttepmts = 0 sfuEvents?.cancel() sfuListener?.cancel() @@ -946,11 +960,21 @@ public class Call( sfuEvents?.cancel() state._connection.value = RealtimeConnection.Disconnected logger.v { "[leave] #ringing; disconnectionReason: $disconnectionReason, call_id = $id" } - if (isDestroyed) { - logger.w { "[leave] #ringing; Call already destroyed, ignoring" } - return@atomicLeave + + // Synchronize access to isDestroyed and cleanupJob to prevent race conditions + synchronized(lifecycleLock) { + if (isDestroyed) { + logger.w { "[leave] #ringing; Call already destroyed, ignoring" } + return@atomicLeave + } + isDestroyed = true + + // Guard against overwriting cleanupJob if cleanup is already in progress + if (cleanupJob?.isActive == true) { + logger.w { "[leave] Cleanup already in progress, skipping duplicate cleanup" } + return@atomicLeave + } } - isDestroyed = true sfuSocketReconnectionTime = null @@ -974,16 +998,19 @@ public class Call( (client as StreamVideoClient).onCallCleanUp(this) - clientImpl.scope.launch { - safeCall { - session?.sfuTracer?.trace( - "leave-call", - "[reason=$reason, error=${disconnectionReason?.message}]", - ) - val stats = collectStats() - session?.sendCallStats(stats) + synchronized(lifecycleLock) { + cleanupJob = clientImpl.scope.launch { + safeCall { + session?.sfuTracer?.trace( + "leave-call", + "[reason=$reason, error=${disconnectionReason?.message}]", + ) + val stats = collectStats() + session?.sendCallStats(stats) + } + cleanup() + resetScopes() } - cleanup() } } @@ -1504,7 +1531,6 @@ public class Call( fun cleanup() { // monitor.stop() session?.cleanup() - shutDownJobsGracefully() callStatsReportingJob?.cancel() mediaManager.cleanup() // TODO Rahul, Verify Later: need to check which call has owned the media at the moment(probably use active call) session = null @@ -1512,13 +1538,43 @@ public class Call( scopeProvider.cleanup() } - // This will allow the Rest APIs to be executed which are in queue before leave - private fun shutDownJobsGracefully() { - UserScope(ClientScope()).launch { - supervisorJob.children.forEach { it.join() } - supervisorJob.cancel() + /** + * Resets state to allow the Call to be reusable after leave(). + * Generates new session IDs, resets the scopeProvider, clears participants, and resets device statuses. + * + * IMPORTANT: We do NOT recreate [scope] or [supervisorJob] because [CallState] and its + * StateFlows depend on the original scope. The scope lives for the entire lifetime of + * the Call object. + */ + private fun resetScopes() { + logger.d { "[resetScopes] Resetting state to make Call reusable" } + + // Reset the destroyed flag and clear cleanupJob under lock to prevent race conditions + synchronized(lifecycleLock) { + isDestroyed = false + cleanupJob = null } - scope.cancel() + + // Generate new session IDs for fresh connection + sessionId = UUID.randomUUID().toString() + unifiedSessionId = UUID.randomUUID().toString() + logger.d { "[resetScopes] New sessionId: $sessionId, unifiedSessionId: $unifiedSessionId" } + + // Clear participants to remove stale video/audio tracks from previous session + state.clearParticipants() + + // Reset device statuses to NotSelected so they get re-initialized on next join + mediaManager.reset() + + // Reset the scope provider to allow reuse + scopeProvider.reset() + + // NOTE: We intentionally do NOT recreate supervisorJob or scope here. + // CallState's StateFlows (duration, participants, etc.) use stateIn(scope, ...) + // which captures the scope at initialization. If we recreated scope, those + // StateFlows would become dead and never emit again. + + logger.d { "[resetScopes] State reset successfully" } } suspend fun ring(): Result { diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManager.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManager.kt index 373f5424f6..1fadd98bfd 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManager.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManager.kt @@ -279,7 +279,7 @@ class ScreenShareManager( private val logger by taggedLogger("Media:ScreenShareManager") - private val _status = MutableStateFlow(DeviceStatus.NotSelected) + internal val _status = MutableStateFlow(DeviceStatus.NotSelected) val status: StateFlow = _status public val isEnabled: StateFlow = _status.mapState { it is DeviceStatus.Enabled } @@ -550,7 +550,7 @@ class MicrophoneManager( internal var priorStatus: DeviceStatus? = null // Exposed state - private val _status = MutableStateFlow(DeviceStatus.NotSelected) + internal val _status = MutableStateFlow(DeviceStatus.NotSelected) /** The status of the audio */ val status: StateFlow = _status @@ -719,6 +719,13 @@ class MicrophoneManager( setupCompleted = false } + /** + * Resets the microphone status to NotSelected to allow re-initialization on next join. + */ + internal fun reset() { + _status.value = DeviceStatus.NotSelected + } + fun canHandleDeviceSwitch() = audioUsageProvider.invoke() != AudioAttributes.USAGE_MEDIA // Internal logic @@ -738,40 +745,47 @@ class MicrophoneManager( audioManager?.allowedCapturePolicy = AudioAttributes.ALLOW_CAPTURE_BY_ALL } - if (canHandleDeviceSwitch() && !::audioHandler.isInitialized) { - audioHandler = AudioSwitchHandler( - context = mediaManager.context, - preferredDeviceList = listOf( - AudioDevice.BluetoothHeadset::class.java, - AudioDevice.WiredHeadset::class.java, - ) + if (preferSpeaker) { - listOf( - AudioDevice.Speakerphone::class.java, - AudioDevice.Earpiece::class.java, - ) - } else { - listOf( - AudioDevice.Earpiece::class.java, - AudioDevice.Speakerphone::class.java, - ) - }, - audioDeviceChangeListener = { devices, selected -> - logger.i { "[audioSwitch] audio devices. selected $selected, available devices are $devices" } - - _devices.value = devices.map { it.fromAudio() } - _selectedDevice.value = selected?.fromAudio() - - setupCompleted = true - - capturedOnAudioDevicesUpdate?.invoke() - capturedOnAudioDevicesUpdate = null - }, - ) + if (canHandleDeviceSwitch()) { + if (!::audioHandler.isInitialized) { + // First time initialization + audioHandler = AudioSwitchHandler( + context = mediaManager.context, + preferredDeviceList = listOf( + AudioDevice.BluetoothHeadset::class.java, + AudioDevice.WiredHeadset::class.java, + ) + if (preferSpeaker) { + listOf( + AudioDevice.Speakerphone::class.java, + AudioDevice.Earpiece::class.java, + ) + } else { + listOf( + AudioDevice.Earpiece::class.java, + AudioDevice.Speakerphone::class.java, + ) + }, + audioDeviceChangeListener = { devices, selected -> + logger.i { "[audioSwitch] audio devices. selected $selected, available devices are $devices" } + + _devices.value = devices.map { it.fromAudio() } + _selectedDevice.value = selected?.fromAudio() + + setupCompleted = true + + capturedOnAudioDevicesUpdate?.invoke() + capturedOnAudioDevicesUpdate = null + }, + ) - logger.d { "[setup] Calling start on instance $audioHandler" } - audioHandler.start() + logger.d { "[setup] Calling start on instance $audioHandler" } + audioHandler.start() + } else { + // audioHandler exists but was stopped (cleanup was called), restart it + logger.d { "[setup] Restarting audioHandler after cleanup" } + audioHandler.start() + } } else { - logger.d { "[MediaManager#setup] Usage is MEDIA or audioHandle is already initialized" } + logger.d { "[MediaManager#setup] Usage is MEDIA" } capturedOnAudioDevicesUpdate?.invoke() } } @@ -829,7 +843,7 @@ class CameraManager( private val logger by taggedLogger("Media:CameraManager") /** The status of the camera. enabled or disabled */ - private val _status = MutableStateFlow(DeviceStatus.NotSelected) + internal val _status = MutableStateFlow(DeviceStatus.NotSelected) public val status: StateFlow = _status /** Represents whether the camera is enabled */ @@ -1174,6 +1188,13 @@ class CameraManager( setupCompleted = false } + /** + * Resets the camera status to NotSelected to allow re-initialization on next join. + */ + internal fun reset() { + _status.value = DeviceStatus.NotSelected + } + private fun createCameraDeviceWrapper( id: String, cameraManager: CameraManager?, @@ -1246,15 +1267,21 @@ class CameraManager( * @see AudioSwitch * @see BluetoothHeadsetManager */ +@Suppress("UNUSED_PARAMETER") class MediaManagerImpl( val context: Context, val call: Call, - val scope: CoroutineScope, + // Deprecated: This parameter is no longer used. Scope is now obtained dynamically from call.scope + scope: CoroutineScope? = null, val eglBaseContext: EglBase.Context, @Deprecated("Use audioUsageProvider instead", replaceWith = ReplaceWith("audioUsageProvider")) val audioUsage: Int = defaultAudioUsage, val audioUsageProvider: (() -> Int) = { audioUsage }, ) { + // Use call.scope dynamically to support scope recreation after leave() + val scope: CoroutineScope + get() = call.scope + internal val camera = CameraManager(this, eglBaseContext, DefaultCameraCharacteristicsValidator()) internal val microphone = MicrophoneManager(this, audioUsage, audioUsageProvider) @@ -1377,6 +1404,17 @@ class MediaManagerImpl( camera.cleanup() microphone.cleanup() } + + /** + * Resets device statuses to NotSelected to allow re-initialization on next join. + * Should be called after cleanup when preparing for rejoin. + */ + internal fun reset() { + camera._status.value = DeviceStatus.NotSelected + microphone._status.value = DeviceStatus.NotSelected + speaker._status.value = DeviceStatus.NotSelected + screenShare._status.value = DeviceStatus.NotSelected + } } fun MediaStreamTrack.trySetEnabled(enabled: Boolean) = safeCall { setEnabled(enabled) } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoBuilder.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoBuilder.kt index bc85d2ac77..f0940758b1 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoBuilder.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoBuilder.kt @@ -90,7 +90,7 @@ import java.net.ConnectException * @property audioProcessing The audio processor used for custom modifications to audio data within WebRTC. * @property callServiceConfigRegistry The audio processor used for custom modifications to audio data within WebRTC. * @property leaveAfterDisconnectSeconds The number of seconds to wait before leaving the call after the connection is disconnected. - * @property callUpdatesAfterLeave Whether to update the call state after leaving the call. + * @property callUpdatesAfterLeave [Deprecated] This parameter is no longer needed. Call updates are now always enabled after leave(). * @property connectOnInit Determines whether the socket should automatically connect as soon as a user is set. * If `false`, the connection is established only when explicitly requested or when core SDK features * (such as audio or video calls) are used. @@ -147,6 +147,10 @@ public class StreamVideoBuilder @JvmOverloads constructor( private val appName: String? = null, private val audioProcessing: ManagedAudioProcessingFactory? = null, private val leaveAfterDisconnectSeconds: Long = 30, + @Deprecated( + message = "This parameter is no longer needed. Call updates are now always enabled after leave() to support call reusability.", + level = DeprecationLevel.WARNING, + ) private val callUpdatesAfterLeave: Boolean = false, private val enableStatsReporting: Boolean = true, @InternalStreamVideoApi diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoClient.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoClient.kt index eba6d357c4..655544c75c 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoClient.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoClient.kt @@ -18,7 +18,6 @@ package io.getstream.video.android.core import android.content.Context import android.media.AudioAttributes -import androidx.collection.LruCache import androidx.lifecycle.Lifecycle import io.getstream.android.push.PushDevice import io.getstream.android.video.generated.models.AcceptCallResponse @@ -176,6 +175,10 @@ internal class StreamVideoClient internal constructor( internal val audioProcessing: ManagedAudioProcessingFactory? = null, internal val leaveAfterDisconnectSeconds: Long = 30, internal val appVersion: String? = null, + @Deprecated( + message = "This parameter is no longer needed. Call updates are now always enabled after leave() to support call reusability.", + level = DeprecationLevel.WARNING, + ) internal val enableCallUpdatesAfterLeave: Boolean = false, internal val enableStatsCollection: Boolean = true, internal val enableStereoForSubscriber: Boolean = true, @@ -202,24 +205,18 @@ internal class StreamVideoClient internal constructor( private val logger by taggedLogger("Call:StreamVideo") private var subscriptions = mutableSetOf() private var calls = mutableMapOf() - private val destroyedCalls = LruCache(maxSize = 100) internal val callSoundAndVibrationPlayer = CallSoundAndVibrationPlayer(context) val socketImpl = coordinatorConnectionModule.socketConnection fun onCallCleanUp(call: Call) { - if (enableCallUpdatesAfterLeave) { - logger.d { "[cleanup] Call updates are required, preserve the instance: ${call.cid}" } - destroyedCalls.put(call.hashCode(), call) - } - logger.d { "[cleanup] Removing call from cache: ${call.cid}" } - calls.remove(call.cid) + logger.d { "[cleanup] Call cleaned up but kept in cache for reuse: ${call.cid}" } + // Call remains in the 'calls' map to allow rejoin and continue receiving updates } override fun cleanup() { // remove all cached calls calls.clear() - destroyedCalls.evictAll() // stop all running coroutines scope.cancel() // call cleanup on the active call @@ -560,7 +557,7 @@ internal class StreamVideoClient internal constructor( // call level subscriptions if (selectedCid.isNotEmpty()) { calls[selectedCid]?.fireEvent(event) - notifyDestroyedCalls(event) + // No need to notify destroyed calls - calls remain in map after leave() } if (selectedCid.isNotEmpty()) { @@ -584,37 +581,7 @@ internal class StreamVideoClient internal constructor( it.session?.handleEvent(event) it.handleEvent(event) } - deliverIntentToDestroyedCalls(event) - } - } - - private fun shouldProcessDestroyedCall(event: VideoEvent, callCid: String): Boolean { - return when (event) { - is WSCallEvent -> event.getCallCID() == callCid - else -> true - } - } - - private fun deliverIntentToDestroyedCalls(event: VideoEvent) { - safeCall { - destroyedCalls.snapshot().forEach { (_, call) -> - call.let { - if (shouldProcessDestroyedCall(event, call.cid)) { - it.state.handleEvent(event) - it.handleEvent(event) - } - } - } - } - } - - private fun notifyDestroyedCalls(event: VideoEvent) { - safeCall { - destroyedCalls.snapshot().forEach { (_, call) -> - if (shouldProcessDestroyedCall(event, call.cid)) { - call.fireEvent(event) - } - } + // No need to deliver to destroyed calls - calls remain in map after leave() } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/audio/AudioHandler.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/audio/AudioHandler.kt index 7c4604d0ac..e24f946c8d 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/audio/AudioHandler.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/audio/AudioHandler.kt @@ -76,11 +76,15 @@ public class AudioSwitchHandler( } override fun stop() { - logger.d { "[stop] no args" } - mainThreadHandler.removeCallbacksAndMessages(null) - mainThreadHandler.post { - audioSwitch?.stop() - audioSwitch = null + synchronized(this) { + logger.d { "[stop] no args" } + mainThreadHandler.removeCallbacksAndMessages(null) + mainThreadHandler.post { + audioSwitch?.stop() + audioSwitch = null + } + // Reset flag to allow restart after stop + isAudioSwitchInitScheduled = false } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProvider.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProvider.kt index 28d7b31944..0bc011aecb 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProvider.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProvider.kt @@ -40,4 +40,10 @@ internal interface ScopeProvider { * Cleans up resources when the provider is no longer needed. */ fun cleanup() + + /** + * Resets the provider to allow reuse after cleanup. + * This clears the cleanup flag and allows executors to be recreated. + */ + fun reset() } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProviderImpl.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProviderImpl.kt index 50a80c90ee..8f39f9829a 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProviderImpl.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProviderImpl.kt @@ -97,4 +97,10 @@ internal class ScopeProviderImpl( executor?.shutdown() executor = null } + + override fun reset() { + logger.d { "Resetting ScopeProvider to allow reuse" } + isCleanedUp = false + // executor is already null after cleanup, will be recreated on next use + } }