Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,6 @@ demo-app/src/production/play/
video-buddy-server.log
video-buddy-console.log
video-buddy-session.json

# GSD workflow files
.planning/
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ import io.getstream.video.android.core.model.UpdateUserPermissionsData
import io.getstream.video.android.core.model.VideoTrack
import io.getstream.video.android.core.model.toIceServer
import io.getstream.video.android.core.notifications.internal.telecom.TelecomCallController
import io.getstream.video.android.core.socket.common.scope.ClientScope
import io.getstream.video.android.core.socket.common.scope.UserScope
import io.getstream.video.android.core.utils.AtomicUnitCall
import io.getstream.video.android.core.utils.RampValueUpAndDownHelper
import io.getstream.video.android.core.utils.StreamSingleFlightProcessorImpl
Expand Down Expand Up @@ -153,17 +151,18 @@ public class Call(

internal var reconnectAttepmts = 0
internal val clientImpl = client as StreamVideoClient
internal val scopeProvider: ScopeProvider = ScopeProviderImpl(clientImpl.scope)
internal var scopeProvider: ScopeProvider = ScopeProviderImpl(clientImpl.scope)

// Atomic controls
private var atomicLeave = AtomicUnitCall()

private val logger by taggedLogger("Call:$type:$id")
private val supervisorJob = SupervisorJob()
private var supervisorJob = SupervisorJob()
private var callStatsReportingJob: Job? = null
private var cleanupJob: Job? = null
private var powerManager: PowerManager? = null

internal val scope = CoroutineScope(clientImpl.scope.coroutineContext + supervisorJob)
internal var scope = CoroutineScope(clientImpl.scope.coroutineContext + supervisorJob)

/** The call state contains all state such as the participant list, reactions etc */
val state = CallState(client, this, user, scope)
Expand Down Expand Up @@ -225,15 +224,22 @@ public class Call(
*/
private var sfuSocketReconnectionTime: Long? = null

/**
* Lock for synchronizing join/leave lifecycle operations.
* Protects access to isDestroyed, cleanupJob, and scope re-initialization.
*/
private val lifecycleLock = Any()

/**
* Call has been left and the object is cleaned up and destroyed.
* Must be accessed under [lifecycleLock].
*/
private var isDestroyed = false

/** Session handles all real time communication for video and audio */
internal var session: RtcSession? = null
var sessionId = UUID.randomUUID().toString()
internal val unifiedSessionId = UUID.randomUUID().toString()
internal var unifiedSessionId = UUID.randomUUID().toString()

internal var connectStartTime = 0L
internal var reconnectStartTime = 0L
Expand Down Expand Up @@ -342,11 +348,10 @@ public class Call(
testInstanceProvider.mediaManagerCreator!!.invoke()
} else {
MediaManagerImpl(
clientImpl.context,
this,
scope,
eglBase.eglBaseContext,
clientImpl.callServiceConfigRegistry.get(type).audioUsage,
context = clientImpl.context,
call = this,
eglBaseContext = eglBase.eglBaseContext,
audioUsage = clientImpl.callServiceConfigRegistry.get(type).audioUsage,
) { clientImpl.callServiceConfigRegistry.get(type).audioUsage }
}
}
Expand Down Expand Up @@ -531,10 +536,13 @@ public class Call(

var result: Result<RtcSession>

atomicLeave = AtomicUnitCall()
while (retryCount < 3) {
result = _join(create, createOptions, ring, notify)
if (result is Success) {
// Reset atomicLeave AFTER successful join (after cleanup is complete)
// This prevents race conditions where leave() is called during the join process
atomicLeave = AtomicUnitCall()

// we initialise the camera, mic and other according to local + backend settings
// only when the call is joined to make sure we don't switch and override
// the settings during a call.
Expand Down Expand Up @@ -603,6 +611,12 @@ public class Call(
ring: Boolean = false,
notify: Boolean = false,
): Result<RtcSession> {
// Wait for any pending cleanup to complete before rejoining
// Read cleanupJob under lock to prevent race conditions
val pendingCleanup = synchronized(lifecycleLock) { cleanupJob }
pendingCleanup?.join()
// Note: cleanupJob is cleared by resetScopes() at the end of cleanup

reconnectAttepmts = 0
sfuEvents?.cancel()
sfuListener?.cancel()
Expand Down Expand Up @@ -946,11 +960,21 @@ public class Call(
sfuEvents?.cancel()
state._connection.value = RealtimeConnection.Disconnected
logger.v { "[leave] #ringing; disconnectionReason: $disconnectionReason, call_id = $id" }
if (isDestroyed) {
logger.w { "[leave] #ringing; Call already destroyed, ignoring" }
return@atomicLeave

// Synchronize access to isDestroyed and cleanupJob to prevent race conditions
synchronized(lifecycleLock) {
if (isDestroyed) {
logger.w { "[leave] #ringing; Call already destroyed, ignoring" }
return@atomicLeave
}
isDestroyed = true

// Guard against overwriting cleanupJob if cleanup is already in progress
if (cleanupJob?.isActive == true) {
logger.w { "[leave] Cleanup already in progress, skipping duplicate cleanup" }
return@atomicLeave
}
}
isDestroyed = true

sfuSocketReconnectionTime = null

Expand All @@ -974,16 +998,19 @@ public class Call(

(client as StreamVideoClient).onCallCleanUp(this)

clientImpl.scope.launch {
safeCall {
session?.sfuTracer?.trace(
"leave-call",
"[reason=$reason, error=${disconnectionReason?.message}]",
)
val stats = collectStats()
session?.sendCallStats(stats)
synchronized(lifecycleLock) {
cleanupJob = clientImpl.scope.launch {
safeCall {
session?.sfuTracer?.trace(
"leave-call",
"[reason=$reason, error=${disconnectionReason?.message}]",
)
val stats = collectStats()
session?.sendCallStats(stats)
}
cleanup()
resetScopes()
}
cleanup()
}
}

Expand Down Expand Up @@ -1504,21 +1531,50 @@ public class Call(
fun cleanup() {
// monitor.stop()
session?.cleanup()
shutDownJobsGracefully()
callStatsReportingJob?.cancel()
mediaManager.cleanup() // TODO Rahul, Verify Later: need to check which call has owned the media at the moment(probably use active call)
session = null
// Cleanup the call's scope provider
scopeProvider.cleanup()
}

// This will allow the Rest APIs to be executed which are in queue before leave
private fun shutDownJobsGracefully() {
UserScope(ClientScope()).launch {
supervisorJob.children.forEach { it.join() }
supervisorJob.cancel()
/**
* Resets state to allow the Call to be reusable after leave().
* Generates new session IDs, resets the scopeProvider, clears participants, and resets device statuses.
*
* IMPORTANT: We do NOT recreate [scope] or [supervisorJob] because [CallState] and its
* StateFlows depend on the original scope. The scope lives for the entire lifetime of
* the Call object.
*/
private fun resetScopes() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use mutex to re-init scopes and update isDestroyed to prevent race-condition

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should

logger.d { "[resetScopes] Resetting state to make Call reusable" }

// Reset the destroyed flag and clear cleanupJob under lock to prevent race conditions
synchronized(lifecycleLock) {
isDestroyed = false
cleanupJob = null
}
scope.cancel()

// Generate new session IDs for fresh connection
sessionId = UUID.randomUUID().toString()
unifiedSessionId = UUID.randomUUID().toString()
logger.d { "[resetScopes] New sessionId: $sessionId, unifiedSessionId: $unifiedSessionId" }

// Clear participants to remove stale video/audio tracks from previous session
state.clearParticipants()

// Reset device statuses to NotSelected so they get re-initialized on next join
mediaManager.reset()

// Reset the scope provider to allow reuse
scopeProvider.reset()

// NOTE: We intentionally do NOT recreate supervisorJob or scope here.
// CallState's StateFlows (duration, participants, etc.) use stateIn(scope, ...)
// which captures the scope at initialization. If we recreated scope, those
// StateFlows would become dead and never emit again.

logger.d { "[resetScopes] State reset successfully" }
}

suspend fun ring(): Result<GetCallResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class ScreenShareManager(

private val logger by taggedLogger("Media:ScreenShareManager")

private val _status = MutableStateFlow<DeviceStatus>(DeviceStatus.NotSelected)
internal val _status = MutableStateFlow<DeviceStatus>(DeviceStatus.NotSelected)
val status: StateFlow<DeviceStatus> = _status

public val isEnabled: StateFlow<Boolean> = _status.mapState { it is DeviceStatus.Enabled }
Expand Down Expand Up @@ -550,7 +550,7 @@ class MicrophoneManager(
internal var priorStatus: DeviceStatus? = null

// Exposed state
private val _status = MutableStateFlow<DeviceStatus>(DeviceStatus.NotSelected)
internal val _status = MutableStateFlow<DeviceStatus>(DeviceStatus.NotSelected)

/** The status of the audio */
val status: StateFlow<DeviceStatus> = _status
Expand Down Expand Up @@ -719,6 +719,13 @@ class MicrophoneManager(
setupCompleted = false
}

/**
* Resets the microphone status to NotSelected to allow re-initialization on next join.
*/
internal fun reset() {
_status.value = DeviceStatus.NotSelected
}

fun canHandleDeviceSwitch() = audioUsageProvider.invoke() != AudioAttributes.USAGE_MEDIA

// Internal logic
Expand All @@ -738,40 +745,47 @@ class MicrophoneManager(
audioManager?.allowedCapturePolicy = AudioAttributes.ALLOW_CAPTURE_BY_ALL
}

if (canHandleDeviceSwitch() && !::audioHandler.isInitialized) {
audioHandler = AudioSwitchHandler(
context = mediaManager.context,
preferredDeviceList = listOf(
AudioDevice.BluetoothHeadset::class.java,
AudioDevice.WiredHeadset::class.java,
) + if (preferSpeaker) {
listOf(
AudioDevice.Speakerphone::class.java,
AudioDevice.Earpiece::class.java,
)
} else {
listOf(
AudioDevice.Earpiece::class.java,
AudioDevice.Speakerphone::class.java,
)
},
audioDeviceChangeListener = { devices, selected ->
logger.i { "[audioSwitch] audio devices. selected $selected, available devices are $devices" }

_devices.value = devices.map { it.fromAudio() }
_selectedDevice.value = selected?.fromAudio()

setupCompleted = true

capturedOnAudioDevicesUpdate?.invoke()
capturedOnAudioDevicesUpdate = null
},
)
if (canHandleDeviceSwitch()) {
if (!::audioHandler.isInitialized) {
// First time initialization
audioHandler = AudioSwitchHandler(
context = mediaManager.context,
preferredDeviceList = listOf(
AudioDevice.BluetoothHeadset::class.java,
AudioDevice.WiredHeadset::class.java,
) + if (preferSpeaker) {
listOf(
AudioDevice.Speakerphone::class.java,
AudioDevice.Earpiece::class.java,
)
} else {
listOf(
AudioDevice.Earpiece::class.java,
AudioDevice.Speakerphone::class.java,
)
},
audioDeviceChangeListener = { devices, selected ->
logger.i { "[audioSwitch] audio devices. selected $selected, available devices are $devices" }

_devices.value = devices.map { it.fromAudio() }
_selectedDevice.value = selected?.fromAudio()

setupCompleted = true

capturedOnAudioDevicesUpdate?.invoke()
capturedOnAudioDevicesUpdate = null
},
)

logger.d { "[setup] Calling start on instance $audioHandler" }
audioHandler.start()
logger.d { "[setup] Calling start on instance $audioHandler" }
audioHandler.start()
} else {
// audioHandler exists but was stopped (cleanup was called), restart it
logger.d { "[setup] Restarting audioHandler after cleanup" }
audioHandler.start()
}
} else {
logger.d { "[MediaManager#setup] Usage is MEDIA or audioHandle is already initialized" }
logger.d { "[MediaManager#setup] Usage is MEDIA" }
capturedOnAudioDevicesUpdate?.invoke()
}
}
Expand Down Expand Up @@ -829,7 +843,7 @@ class CameraManager(
private val logger by taggedLogger("Media:CameraManager")

/** The status of the camera. enabled or disabled */
private val _status = MutableStateFlow<DeviceStatus>(DeviceStatus.NotSelected)
internal val _status = MutableStateFlow<DeviceStatus>(DeviceStatus.NotSelected)
public val status: StateFlow<DeviceStatus> = _status

/** Represents whether the camera is enabled */
Expand Down Expand Up @@ -1174,6 +1188,13 @@ class CameraManager(
setupCompleted = false
}

/**
* Resets the camera status to NotSelected to allow re-initialization on next join.
*/
internal fun reset() {
_status.value = DeviceStatus.NotSelected
}

private fun createCameraDeviceWrapper(
id: String,
cameraManager: CameraManager?,
Expand Down Expand Up @@ -1246,15 +1267,21 @@ class CameraManager(
* @see AudioSwitch
* @see BluetoothHeadsetManager
*/
@Suppress("UNUSED_PARAMETER")
class MediaManagerImpl(
val context: Context,
val call: Call,
val scope: CoroutineScope,
// Deprecated: This parameter is no longer used. Scope is now obtained dynamically from call.scope
scope: CoroutineScope? = null,
val eglBaseContext: EglBase.Context,
@Deprecated("Use audioUsageProvider instead", replaceWith = ReplaceWith("audioUsageProvider"))
val audioUsage: Int = defaultAudioUsage,
val audioUsageProvider: (() -> Int) = { audioUsage },
) {
// Use call.scope dynamically to support scope recreation after leave()
val scope: CoroutineScope
get() = call.scope

internal val camera =
CameraManager(this, eglBaseContext, DefaultCameraCharacteristicsValidator())
internal val microphone = MicrophoneManager(this, audioUsage, audioUsageProvider)
Expand Down Expand Up @@ -1377,6 +1404,17 @@ class MediaManagerImpl(
camera.cleanup()
microphone.cleanup()
}

/**
* Resets device statuses to NotSelected to allow re-initialization on next join.
* Should be called after cleanup when preparing for rejoin.
*/
internal fun reset() {
camera._status.value = DeviceStatus.NotSelected
microphone._status.value = DeviceStatus.NotSelected
speaker._status.value = DeviceStatus.NotSelected
screenShare._status.value = DeviceStatus.NotSelected
}
}

fun MediaStreamTrack.trySetEnabled(enabled: Boolean) = safeCall { setEnabled(enabled) }
Loading
Loading