diff --git a/stream-video-android-core/api/stream-video-android-core.api b/stream-video-android-core/api/stream-video-android-core.api index a8a2225420..7a70e7545a 100644 --- a/stream-video-android-core/api/stream-video-android-core.api +++ b/stream-video-android-core/api/stream-video-android-core.api @@ -8131,8 +8131,8 @@ public final class io/getstream/video/android/core/ParticipantState { public final fun component5 ()Lstream/video/sfu/models/ParticipantSource; public final fun component6 ()Ljava/lang/String; public final fun consumeReaction (Lio/getstream/video/android/core/model/Reaction;)V - public final fun copy (Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)Lio/getstream/video/android/core/ParticipantState; - public static synthetic fun copy$default (Lio/getstream/video/android/core/ParticipantState;Ljava/lang/String;Lkotlinx/coroutines/CoroutineScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILjava/lang/Object;)Lio/getstream/video/android/core/ParticipantState; + public final fun copy (Ljava/lang/String;Lio/getstream/video/android/core/coroutines/scopes/RestartableProducerScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;)Lio/getstream/video/android/core/ParticipantState; + public static synthetic fun copy$default (Lio/getstream/video/android/core/ParticipantState;Ljava/lang/String;Lio/getstream/video/android/core/coroutines/scopes/RestartableProducerScope;Lio/getstream/video/android/core/CallActions;Ljava/lang/String;Lstream/video/sfu/models/ParticipantSource;Ljava/lang/String;ILjava/lang/Object;)Lio/getstream/video/android/core/ParticipantState; public fun equals (Ljava/lang/Object;)Z public final fun getAudio ()Lkotlinx/coroutines/flow/StateFlow; public final fun getAudioEnabled ()Lkotlinx/coroutines/flow/StateFlow; diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt index 43b2b9a0b2..a0b4e4c70e 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/Call.kt @@ -23,7 +23,6 @@ import android.os.PowerManager import androidx.annotation.VisibleForTesting import androidx.compose.runtime.Stable import io.getstream.android.video.generated.models.AcceptCallResponse -import io.getstream.android.video.generated.models.AudioSettingsResponse import io.getstream.android.video.generated.models.BlockUserResponse import io.getstream.android.video.generated.models.CallSettingsRequest import io.getstream.android.video.generated.models.CallSettingsResponse @@ -49,18 +48,21 @@ import io.getstream.android.video.generated.models.StopTranscriptionResponse import io.getstream.android.video.generated.models.UnpinResponse import io.getstream.android.video.generated.models.UpdateCallMembersRequest import io.getstream.android.video.generated.models.UpdateCallMembersResponse -import io.getstream.android.video.generated.models.UpdateCallRequest import io.getstream.android.video.generated.models.UpdateCallResponse import io.getstream.android.video.generated.models.UpdateUserPermissionsResponse import io.getstream.android.video.generated.models.VideoEvent -import io.getstream.android.video.generated.models.VideoSettingsResponse import io.getstream.log.taggedLogger -import io.getstream.result.Error import io.getstream.result.Result -import io.getstream.result.Result.Failure -import io.getstream.result.Result.Success import io.getstream.result.flatMap -import io.getstream.video.android.core.audio.StreamAudioDevice +import io.getstream.video.android.core.call.CallApiDelegate +import io.getstream.video.android.core.call.CallCleanupManager +import io.getstream.video.android.core.call.CallEventManager +import io.getstream.video.android.core.call.CallJoinCoordinator +import io.getstream.video.android.core.call.CallMediaManager +import io.getstream.video.android.core.call.CallReInitializer +import io.getstream.video.android.core.call.CallRenderer +import io.getstream.video.android.core.call.CallSessionManager +import io.getstream.video.android.core.call.CallStatsReporter import io.getstream.video.android.core.call.RtcSession import io.getstream.video.android.core.call.audio.InputAudioFilter import io.getstream.video.android.core.call.connection.StreamPeerConnectionFactory @@ -69,61 +71,39 @@ import io.getstream.video.android.core.call.scope.ScopeProvider import io.getstream.video.android.core.call.scope.ScopeProviderImpl import io.getstream.video.android.core.call.utils.SoundInputProcessor import io.getstream.video.android.core.call.video.VideoFilter -import io.getstream.video.android.core.call.video.YuvFrame import io.getstream.video.android.core.closedcaptions.ClosedCaptionsSettings -import io.getstream.video.android.core.events.GoAwayEvent -import io.getstream.video.android.core.events.JoinCallResponseEvent +import io.getstream.video.android.core.coroutines.scopes.RestartableProducerScope import io.getstream.video.android.core.events.VideoEventListener import io.getstream.video.android.core.internal.InternalStreamVideoApi -import io.getstream.video.android.core.internal.network.NetworkStateProvider -import io.getstream.video.android.core.model.AudioTrack -import io.getstream.video.android.core.model.MuteUsersData import io.getstream.video.android.core.model.PreferredVideoResolution import io.getstream.video.android.core.model.QueriedMembers import io.getstream.video.android.core.model.RejectReason import io.getstream.video.android.core.model.SortField import io.getstream.video.android.core.model.UpdateUserPermissionsData import io.getstream.video.android.core.model.VideoTrack -import io.getstream.video.android.core.model.toIceServer -import io.getstream.video.android.core.notifications.internal.telecom.TelecomCallController -import io.getstream.video.android.core.socket.common.scope.ClientScope -import io.getstream.video.android.core.socket.common.scope.UserScope import io.getstream.video.android.core.utils.AtomicUnitCall import io.getstream.video.android.core.utils.RampValueUpAndDownHelper import io.getstream.video.android.core.utils.StreamSingleFlightProcessorImpl -import io.getstream.video.android.core.utils.safeCall import io.getstream.video.android.core.utils.safeCallWithDefault -import io.getstream.video.android.core.utils.toQueriedMembers import io.getstream.video.android.model.User import io.getstream.webrtc.android.ui.VideoTextureViewRenderer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job -import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancel -import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.isActive import kotlinx.coroutines.launch -import kotlinx.coroutines.suspendCancellableCoroutine import org.threeten.bp.OffsetDateTime import org.webrtc.EglBase -import org.webrtc.PeerConnection -import org.webrtc.RendererCommon -import org.webrtc.VideoSink import org.webrtc.audio.JavaAudioDeviceModule.AudioSamples -import stream.video.sfu.event.ReconnectDetails import stream.video.sfu.models.ClientCapability import stream.video.sfu.models.TrackType import stream.video.sfu.models.VideoDimension -import stream.video.sfu.models.WebsocketReconnectStrategy import java.util.Collections import java.util.UUID import java.util.concurrent.ConcurrentHashMap -import kotlin.coroutines.resume +import java.util.concurrent.atomic.AtomicBoolean /** * How long do we keep trying to make a full-reconnect (once the SFU signalling WS went down) @@ -151,24 +131,30 @@ public class Call( internal var location: String? = null private var subscriptions = Collections.synchronizedSet(mutableSetOf()) - internal var reconnectAttepmts = 0 internal val clientImpl = client as StreamVideoClient internal val scopeProvider: ScopeProvider = ScopeProviderImpl(clientImpl.scope) // Atomic controls - private var atomicLeave = AtomicUnitCall() + internal var atomicLeave = AtomicUnitCall() private val logger by taggedLogger("Call:$type:$id") - private val supervisorJob = SupervisorJob() + private var callStatsReportingJob: Job? = null private var powerManager: PowerManager? = null - internal val scope = CoroutineScope(clientImpl.scope.coroutineContext + supervisorJob) + internal val restartableProducerScope = RestartableProducerScope() + + private val callReInitializer = CallReInitializer(clientImpl.scope) { + reInitialise() + } + + internal val scope: CoroutineScope + get() = callReInitializer.currentScope /** The call state contains all state such as the participant list, reactions etc */ - val state = CallState(client, this, user, scope) + val state = CallState(client, this, user, restartableProducerScope, { sessionManager }) - private val network by lazy { clientImpl.coordinatorConnectionModule.networkStateProvider } +// private val network by lazy { clientImpl.coordinatorConnectionModule.networkStateProvider } /** Camera gives you access to the local camera */ val camera by lazy(LazyThreadSafetyMode.PUBLICATION) { mediaManager.camera } @@ -176,6 +162,17 @@ public class Call( val speaker by lazy(LazyThreadSafetyMode.PUBLICATION) { mediaManager.speaker } val screenShare by lazy(LazyThreadSafetyMode.PUBLICATION) { mediaManager.screenShare } + private val callMediaManager = CallMediaManager( + this, + { mediaManager }, + { camera }, + { microphone }, + { speaker }, + { screenShare }, + { _peerConnectionFactory }, + { _peerConnectionFactory = null }, + ) + /** The cid is type:id */ val cid = "$type:$id" @@ -189,8 +186,6 @@ public class Call( */ var audioFilter: InputAudioFilter? = null - // val monitor = CallHealthMonitor(this, scope, onIceRecoveryFailed) - private val soundInputProcessor = SoundInputProcessor(thresholdCrossedCallback = { if (!microphone.isEnabled.value) { state.markSpeakingAsMuted() @@ -228,15 +223,15 @@ public class Call( /** * Call has been left and the object is cleaned up and destroyed. */ - private var isDestroyed = false + internal var isDestroyed = AtomicBoolean(false) /** Session handles all real time communication for video and audio */ - internal var session: RtcSession? = null - var sessionId = UUID.randomUUID().toString() internal val unifiedSessionId = UUID.randomUUID().toString() - internal var connectStartTime = 0L - internal var reconnectStartTime = 0L + internal val connectStartTime: Long + get() = sessionManager.connectStartTime + internal val reconnectStartTime: Long + get() = sessionManager.reconnectStartTime /** * EGL base context shared between peerConnectionFactory and mediaManager @@ -267,59 +262,83 @@ public class Call( _peerConnectionFactory = value } - /** - * Checks if the audioBitrateProfile has changed since the factory was created, - * and recreates the factory if needed. This should only be called before joining. - * - * If the factory hasn't been created yet, it will be created with the current profile - * when first accessed, so no recreation is needed. - */ - internal fun ensureFactoryMatchesAudioProfile() { - val factory = _peerConnectionFactory - - // If factory hasn't been created yet, it will be created with current profile automatically - if (factory == null) { - return - } + val events = MutableSharedFlow(extraBufferCapacity = 150) + internal val streamSingleFlightProcessorImpl = + StreamSingleFlightProcessorImpl(restartableProducerScope) + private val callRenderer = CallRenderer() + internal val sessionManager = CallSessionManager( + call = this, + clientImpl = clientImpl, + testInstanceProvider = testInstanceProvider, + streamSingleFlightProcessorImpl, + ) - // Check if current profile differs from the profile used to create the factory - val factoryProfile = factory.audioBitrateProfile - val currentProfile = mediaManager.microphone.audioBitrateProfile.value + internal val session: RtcSession? + get() = sessionManager.session.get() - if (factoryProfile != null && currentProfile != factoryProfile) { - logger.i { - "Audio bitrate profile changed from $factoryProfile to $currentProfile. " + - "Recreating factory before joining." - } - recreateFactoryAndAudioTracks() - } - } + var sessionId: String + get() = sessionManager.sessionId.get() - /** - * Recreates peerConnectionFactory, audioSource, audioTrack, videoSource and videoTrack - * with the current audioBitrateProfile. This should only be called before the call is joined. - */ - internal fun recreateFactoryAndAudioTracks() { - val wasMicrophoneEnabled = microphone.status.value is DeviceStatus.Enabled - val wasCameraEnabled = camera.status.value is DeviceStatus.Enabled + @Deprecated( + message = "Setter kept for binary compatibility. Do not use.", // TODO Rahul ask in Pr Review, whether to mark it deprecated or not + level = DeprecationLevel.ERROR, + ) + set(value) = sessionManager.sessionId.set(value) + + internal val reconnectAttempts: Int + get() = sessionManager.reconnectAttempts + + private val apiDelegate = CallApiDelegate( + clientImpl = clientImpl, + type = type, + id = id, + call = this, + screenShareProvider = { screenShare }, + setScreenTrackCallBack = { sessionManager.session.get()?.setScreenShareTrack() }, + ) + internal val callEventManager = + CallEventManager(events, sessionManager, restartableProducerScope, { subscriptions }) + + internal val callJoinCoordinator = CallJoinCoordinator( + call = this, + client = clientImpl, + callReInitializer = callReInitializer, + streamSingleFlightProcessorImpl = streamSingleFlightProcessorImpl, + onJoinFail = { + sessionManager.session.set(null) + }, + createJoinSession = { create, createOptions, ring, notify -> + sessionManager._join(create, createOptions, ring, notify) + }, + onRejoin = { reason -> sessionManager.rejoin(reason) }, + ) - // Dispose all tracks and sources first - mediaManager.disposeTracksAndSources() + private fun reInitialise() { + logger.d { "[reInitialise]" } + sessionManager.reset() + state._connection.value = RealtimeConnection.Disconnected + atomicLeave = AtomicUnitCall() + scopeProvider.reset() - // Recreate the factory (which will use the new audioBitrateProfile) - recreatePeerConnectionFactory() + // Clear stale video tracks from participants Claude + state.clearSessionState() - // Re-enable tracks if they were enabled - if (wasMicrophoneEnabled) { - // audioTrack will be recreated on next access, then we enable it - microphone.enable(fromUser = false) - } - if (wasCameraEnabled) { - // videoTrack will be recreated on next access, then we enable it - camera.enable(fromUser = false) + with(restartableProducerScope) { + detach() + attach(scope) } } + /** + * Checks if the audioBitrateProfile has changed since the factory was created, + * and recreates the factory if needed. This should only be called before joining. + * + * If the factory hasn't been created yet, it will be created with the current profile + * when first accessed, so no recreation is needed. + */ + internal fun ensureFactoryMatchesAudioProfile() = + callMediaManager.ensureFactoryMatchesAudioProfile() + /** * Recreates peerConnectionFactory with the current audioBitrateProfile. * This should only be called before the call is joined. @@ -344,64 +363,22 @@ public class Call( MediaManagerImpl( clientImpl.context, this, - scope, + restartableProducerScope, eglBase.eglBaseContext, clientImpl.callServiceConfigRegistry.get(type).audioUsage, ) { clientImpl.callServiceConfigRegistry.get(type).audioUsage } } } - - private val listener = object : NetworkStateProvider.NetworkStateListener { - override suspend fun onConnected() { - leaveTimeoutAfterDisconnect?.cancel() - - val elapsedTimeMils = System.currentTimeMillis() - lastDisconnect - logger.d { - "[NetworkStateListener#onConnected] #network; no args, elapsedTimeMils:$elapsedTimeMils, lastDisconnect:$lastDisconnect, reconnectDeadlineMils:$reconnectDeadlineMils" - } - if (lastDisconnect > 0 && elapsedTimeMils < reconnectDeadlineMils) { - logger.d { - "[NetworkStateListener#onConnected] #network; Reconnecting (fast). Time since last disconnect is ${elapsedTimeMils / 1000} seconds. Deadline is ${reconnectDeadlineMils / 1000} seconds" - } - fastReconnect("NetworkStateListener#onConnected") - } else { - logger.d { - "[NetworkStateListener#onConnected] #network; Reconnecting (full). Time since last disconnect is ${elapsedTimeMils / 1000} seconds. Deadline is ${reconnectDeadlineMils / 1000} seconds" - } - rejoin("NetworkStateListener#onConnected") - } - } - - override suspend fun onDisconnected() { - state._connection.value = RealtimeConnection.Reconnecting - logger.d { - "[NetworkStateListener#onDisconnected] #network; old lastDisconnect:$lastDisconnect, clientImpl.leaveAfterDisconnectSeconds:${clientImpl.leaveAfterDisconnectSeconds}" - } - lastDisconnect = System.currentTimeMillis() - logger.d { - "[NetworkStateListener#onDisconnected] #network; new lastDisconnect:$lastDisconnect" - } - leaveTimeoutAfterDisconnect = scope.launch { - delay(clientImpl.leaveAfterDisconnectSeconds * 1000) - logger.d { - "[NetworkStateListener#onDisconnected] #network; Leaving after being disconnected for ${clientImpl.leaveAfterDisconnectSeconds}" - } - leave() - } - logger.d { "[NetworkStateListener#onDisconnected] #network; at $lastDisconnect" } - } - } - - private var leaveTimeoutAfterDisconnect: Job? = null - private var lastDisconnect = 0L - private var reconnectDeadlineMils: Int = 10_000 - - private var monitorPublisherPCStateJob: Job? = null - private var monitorSubscriberPCStateJob: Job? = null - private var sfuListener: Job? = null - private var sfuEvents: Job? = null - private val streamSingleFlightProcessorImpl = StreamSingleFlightProcessorImpl(scope) - + private val callStatsReporter: CallStatsReporter = CallStatsReporter(this) + private val callCleanupManager = CallCleanupManager( + call = this, + sessionManager = sessionManager, + callApiDelegate = apiDelegate, + client = clientImpl, + mediaManagerProvider = { mediaManager }, + callReInitializer = callReInitializer, + callStatsReporter = callStatsReporter, + ) init { scope.launch { soundInputProcessor.currentAudioLevel.collect { @@ -411,6 +388,7 @@ public class Call( powerManager = safeCallWithDefault(null) { clientImpl.context.getSystemService(POWER_SERVICE) as? PowerManager } + restartableProducerScope.attach(scope) } /** Basic crud operations */ @@ -434,47 +412,17 @@ public class Call( notify: Boolean = false, video: Boolean? = null, ): Result { - val response = if (members != null) { - clientImpl.getOrCreateCallFullMembers( - type = type, - id = id, - members = members, - custom = custom, - settingsOverride = settings, - startsAt = startsAt, - team = team, - ring = ring, - notify = notify, - video = video, - ) - } else { - clientImpl.getOrCreateCall( - type = type, - id = id, - memberIds = memberIds, - custom = custom, - settingsOverride = settings, - startsAt = startsAt, - team = team, - ring = ring, - notify = notify, - video = video, - ) - } - - response.onSuccess { - /** - * Because [CallState.updateFromResponse] reads the value of [ClientState.ringingCall] - */ - if (ring) { - client.state._ringingCall.value = this - } - state.updateFromResponse(it) - if (ring) { - client.state.addRingingCall(this, RingingState.Outgoing()) - } - } - return response + return apiDelegate.create( + memberIds, + members, + custom, + settings, + startsAt, + team, + ring, + notify, + video, + ) } /** Update a call */ @@ -483,16 +431,7 @@ public class Call( settingsOverride: CallSettingsRequest? = null, startsAt: OffsetDateTime? = null, ): Result { - val request = UpdateCallRequest( - custom = custom, - settingsOverride = settingsOverride, - startsAt = startsAt, - ) - val response = clientImpl.updateCall(type, id, request) - response.onSuccess { - state.updateFromResponse(it) - } - return response + return apiDelegate.update(custom, settingsOverride, startsAt) } suspend fun join( @@ -500,72 +439,7 @@ public class Call( createOptions: CreateCallOptions? = null, ring: Boolean = false, notify: Boolean = false, - ): Result { - logger.d { - "[join] #ringing; #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions" - } - val permissionPass = - clientImpl.permissionCheck.checkAndroidPermissionsGroup(clientImpl.context, this) - // Check android permissions and log a warning to make sure developers requested adequate permissions prior to using the call. - if (!permissionPass.first) { - logger.w { - "\n[Call.join()] called without having the required permissions.\n" + - "This will work only if you have [runForegroundServiceForCalls = false] in the StreamVideoBuilder.\n" + - "The reason is that [Call.join()] will by default start an ongoing call foreground service,\n" + - "To start this service and send the appropriate audio/video tracks the permissions are required,\n" + - "otherwise the service will fail to start, resulting in a crash.\n" + - "You can re-define your permissions and their expected state by overriding the [permissionCheck] in [StreamVideoBuilder]\n" - } - } - // if we are a guest user, make sure we wait for the token before running the join flow - clientImpl.guestUserJob?.await() - - // Ensure factory is created with the current audioBitrateProfile before joining - ensureFactoryMatchesAudioProfile() - - // the join flow should retry up to 3 times - // if the error is not permanent - // and fail immediately on permanent errors - state._connection.value = RealtimeConnection.InProgress - var retryCount = 0 - - var result: Result - - atomicLeave = AtomicUnitCall() - while (retryCount < 3) { - result = _join(create, createOptions, ring, notify) - if (result is Success) { - // we initialise the camera, mic and other according to local + backend settings - // only when the call is joined to make sure we don't switch and override - // the settings during a call. - val settings = state.settings.value - if (settings != null) { - updateMediaManagerFromSettings(settings) - } else { - logger.w { - "[join] Call settings were null - this should never happen after a call" + - "is joined. MediaManager will not be initialised with server settings." - } - } - return result - } - if (result is Failure) { - session = null - logger.e { "Join failed with error $result" } - if (isPermanentError(result.value)) { - state._connection.value = RealtimeConnection.Failed(result.value) - return result - } else { - retryCount += 1 - } - } - delay(retryCount - 1 * 1000L) - } - session = null - val errorMessage = "Join failed after 3 retries" - state._connection.value = RealtimeConnection.Failed(errorMessage) - return Failure(value = Error.GenericError(errorMessage)) - } + ): Result = callJoinCoordinator.join(create, createOptions, ring, notify) suspend fun joinAndRing( members: List, @@ -588,150 +462,8 @@ public class Call( } } - internal fun isPermanentError(error: Any): Boolean { - if (error is Error.ThrowableError) { - if (error.message.contains("Unable to resolve host")) { - return false - } - } - return true - } - - internal suspend fun _join( - create: Boolean = false, - createOptions: CreateCallOptions? = null, - ring: Boolean = false, - notify: Boolean = false, - ): Result { - reconnectAttepmts = 0 - sfuEvents?.cancel() - sfuListener?.cancel() - - if (session != null) { - return Failure(Error.GenericError("Call $cid has already been joined")) - } - logger.d { - "[joinInternal] #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions" - } - - connectStartTime = System.currentTimeMillis() - - // step 1. call the join endpoint to get a list of SFUs - val locationResult = clientImpl.getCachedLocation() - if (locationResult !is Success) { - return locationResult as Failure - } - location = locationResult.value - - val options = createOptions - ?: if (create) { - CreateCallOptions() - } else { - null - } - val result = joinRequest(options, locationResult.value, ring = ring, notify = notify) - - if (result !is Success) { - return result as Failure - } - val sfuToken = result.value.credentials.token - val sfuUrl = result.value.credentials.server.url - val sfuWsUrl = result.value.credentials.server.wsEndpoint - val iceServers = result.value.credentials.iceServers.map { it.toIceServer() } - try { - session = if (testInstanceProvider.rtcSessionCreator != null) { - testInstanceProvider.rtcSessionCreator!!.invoke() - } else { - RtcSession( - sessionId = this.sessionId, - apiKey = clientImpl.apiKey, - lifecycle = clientImpl.coordinatorConnectionModule.lifecycle, - client = client, - call = this, - sfuUrl = sfuUrl, - sfuWsUrl = sfuWsUrl, - sfuToken = sfuToken, - remoteIceServers = iceServers, - powerManager = powerManager, - ) - } - - session?.let { - state._connection.value = RealtimeConnection.Joined(it) - } - - session?.connect() - } catch (e: Exception) { - return Failure(Error.GenericError(e.message ?: "RtcSession error occurred.")) - } - client.state.setActiveCall(this) - monitorSession(result.value) - return Success(value = session!!) - } - - private fun Call.monitorSession(result: JoinCallResponse) { - sfuEvents?.cancel() - sfuListener?.cancel() - startCallStatsReporting(result.statsOptions.reportingIntervalMs.toLong()) - // listen to Signal WS - sfuEvents = scope.launch { - session?.let { - it.socket.events().collect { event -> - if (event is JoinCallResponseEvent) { - reconnectDeadlineMils = event.fastReconnectDeadlineSeconds * 1000 - logger.d { "[join] #deadline for reconnect is ${reconnectDeadlineMils / 1000} seconds" } - } - } - } - } - monitorPublisherPCStateJob?.cancel() - monitorPublisherPCStateJob = scope.launch { - session?.publisher?.iceState?.collect { - when (it) { - PeerConnection.IceConnectionState.FAILED, PeerConnection.IceConnectionState.DISCONNECTED -> { - session?.publisher?.connection?.restartIce() - } - - else -> { - logger.d { "[monitorPubConnectionState] Ice connection state is $it" } - } - } - } - } - - monitorSubscriberPCStateJob?.cancel() - monitorSubscriberPCStateJob = scope.launch { - session?.subscriber?.iceState?.collect { - when (it) { - PeerConnection.IceConnectionState.FAILED, PeerConnection.IceConnectionState.DISCONNECTED -> { - session?.requestSubscriberIceRestart() - } - - else -> { - logger.d { "[monitorSubConnectionState] Ice connection state is $it" } - } - } - } - } - network.subscribe(listener) - } - - private fun startCallStatsReporting(reportingIntervalMs: Long = 10_000) { - callStatsReportingJob?.cancel() - callStatsReportingJob = scope.launch { - // Wait a bit before we start capturing stats - delay(reportingIntervalMs) - - while (isActive) { - delay(reportingIntervalMs) - session?.sendCallStats( - report = collectStats(), - ) - } - } - } - internal suspend fun collectStats(): CallStatsReport { + val session = sessionManager.session.get() val publisherStats = session?.getPublisherStats() val subscriberStats = session?.getSubscriberStats() state.stats.updateFromRTCStats(publisherStats, isPublisher = true) @@ -758,233 +490,21 @@ public class Call( /** * Fast reconnect to the same SFU with the same participant session. */ - suspend fun fastReconnect(reason: String = "unknown") = schedule("fast") { - logger.d { "[fastReconnect] Reconnecting, reconnectAttepmts:$reconnectAttepmts" } - session?.prepareReconnect() - this@Call.state._connection.value = RealtimeConnection.Reconnecting - if (session != null) { - reconnectStartTime = System.currentTimeMillis() - - val session = session!! - val (prevSessionId, subscriptionsInfo, publishingInfo) = session.currentSfuInfo() - val reconnectDetails = ReconnectDetails( - previous_session_id = prevSessionId, - strategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_FAST, - announced_tracks = publishingInfo, - subscriptions = subscriptionsInfo, - reconnect_attempt = reconnectAttepmts, - reason = reason, - ) - session.fastReconnect(reconnectDetails) - val oldSessionStats = collectStats() - session.sendCallStats(oldSessionStats) - } else { - logger.d { "[fastReconnect] [RealtimeConnection.Disconnected], call_id:$id" } - this@Call.state._connection.value = RealtimeConnection.Disconnected - } - } + suspend fun fastReconnect(reason: String = "unknown") = sessionManager.fastReconnect(reason) /** * Rejoin a call. Creates a new session and joins as a new participant. */ - suspend fun rejoin(reason: String = "unknown") = schedule("rejoin") { - logger.d { "[rejoin] Rejoining" } - reconnectAttepmts++ - state._connection.value = RealtimeConnection.Reconnecting - location?.let { - reconnectStartTime = System.currentTimeMillis() - - val joinResponse = joinRequest(location = it) - if (joinResponse is Success) { - // switch to the new SFU - val cred = joinResponse.value.credentials - val oldSession = this.session!! - val oldSessionStats = collectStats() - val currentOptions = this.session?.publisher?.currentOptions() - logger.i { "Rejoin SFU ${oldSession?.sfuUrl} to ${cred.server.url}" } - - this.sessionId = UUID.randomUUID().toString() - val (prevSessionId, subscriptionsInfo, publishingInfo) = oldSession.currentSfuInfo() - val reconnectDetails = ReconnectDetails( - previous_session_id = prevSessionId, - strategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN, - announced_tracks = publishingInfo, - subscriptions = subscriptionsInfo, - reconnect_attempt = reconnectAttepmts, - reason = reason, - ) - this.state.removeParticipant(prevSessionId) - oldSession.prepareRejoin() - try { - this.session = RtcSession( - clientImpl, - reconnectAttepmts, - powerManager, - this, - sessionId, - clientImpl.apiKey, - clientImpl.coordinatorConnectionModule.lifecycle, - cred.server.url, - cred.server.wsEndpoint, - cred.token, - cred.iceServers.map { ice -> - ice.toIceServer() - }, - ) - this.session?.connect(reconnectDetails, currentOptions) - this.session?.sfuTracer?.trace("rejoin", reason) - oldSession.sendCallStats(oldSessionStats) - oldSession.leaveWithReason("Rejoin :: $reason") - oldSession.cleanup() - monitorSession(joinResponse.value) - } catch (ex: Exception) { - logger.e(ex) { - "[rejoin] Failed to join response with ex: ${ex.message}" - } - state._connection.value = RealtimeConnection.Failed(ex) - } - } else { - logger.e { - "[rejoin] Failed to get a join response ${joinResponse.errorOrNull()}" - } - state._connection.value = RealtimeConnection.Reconnecting - } - } - } + suspend fun rejoin(reason: String = "unknown") = sessionManager.rejoin(reason) /** * Migrate to another SFU. */ - suspend fun migrate() = schedule("migrate") { - logger.d { "[migrate] Migrating" } - state._connection.value = RealtimeConnection.Migrating - location?.let { - reconnectStartTime = System.currentTimeMillis() - - val joinResponse = joinRequest(location = it) - if (joinResponse is Success) { - // switch to the new SFU - val cred = joinResponse.value.credentials - val session = this.session!! - val currentOptions = this.session?.publisher?.currentOptions() - val oldSfuUrl = session.sfuUrl - logger.i { "Rejoin SFU $oldSfuUrl to ${cred.server.url}" } - - this.sessionId = UUID.randomUUID().toString() - val (prevSessionId, subscriptionsInfo, publishingInfo) = session.currentSfuInfo() - val reconnectDetails = ReconnectDetails( - previous_session_id = prevSessionId, - strategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_MIGRATE, - announced_tracks = publishingInfo, - subscriptions = subscriptionsInfo, - from_sfu_id = oldSfuUrl, - reconnect_attempt = reconnectAttepmts, - ) - session.prepareRejoin() - try { - val newSession = RtcSession( - clientImpl, - reconnectAttepmts, - powerManager, - this, - sessionId, - clientImpl.apiKey, - clientImpl.coordinatorConnectionModule.lifecycle, - cred.server.url, - cred.server.wsEndpoint, - cred.token, - cred.iceServers.map { ice -> - ice.toIceServer() - }, - ) - val oldSession = this.session - this.session = newSession - this.session?.connect(reconnectDetails, currentOptions) - monitorSession(joinResponse.value) - oldSession?.leaveWithReason("migrating") - oldSession?.cleanup() - } catch (ex: Exception) { - logger.e(ex) { - "[switchSfu] Failed to join during " + - "migration - Error ${ex.message}" - } - state._connection.value = RealtimeConnection.Failed(ex) - } - } else { - logger.e { - "[switchSfu] Failed to get a join response during " + - "migration - falling back to reconnect. Error ${joinResponse.errorOrNull()}" - } - state._connection.value = RealtimeConnection.Reconnecting - } - } - } - - private var reconnectJob: Job? = null - - private suspend fun schedule(key: String, block: suspend () -> Unit) { - logger.d { "[schedule] #reconnect; no args" } - - streamSingleFlightProcessorImpl.run(key, block) - } + suspend fun migrate() = sessionManager.migrate() /** Leave the call, but don't end it for other users */ fun leave(reason: String = "user") { - logger.d { "[leave] #ringing; no args, call_cid:$cid" } - internalLeave(null, reason) - } - - private fun internalLeave(disconnectionReason: Throwable?, reason: String) = atomicLeave { - monitorSubscriberPCStateJob?.cancel() - monitorPublisherPCStateJob?.cancel() - monitorPublisherPCStateJob = null - monitorSubscriberPCStateJob = null - session?.leaveWithReason("[reason=$reason, error=${disconnectionReason?.message}]") - leaveTimeoutAfterDisconnect?.cancel() - network.unsubscribe(listener) - sfuListener?.cancel() - sfuEvents?.cancel() - state._connection.value = RealtimeConnection.Disconnected - logger.v { "[leave] #ringing; disconnectionReason: $disconnectionReason, call_id = $id" } - if (isDestroyed) { - logger.w { "[leave] #ringing; Call already destroyed, ignoring" } - return@atomicLeave - } - isDestroyed = true - - sfuSocketReconnectionTime = null - - /** - * TODO Rahul, need to check which call has owned the media at the moment(probably use active call) - */ - stopScreenSharing() - camera.disable() - microphone.disable() - - if (id == client.state.activeCall.value?.id) { - client.state.removeActiveCall(this) // Will also stop CallService - } - - if (id == client.state.ringingCall.value?.id) { - client.state.removeRingingCall(this) - } - - TelecomCallController(client.context) - .leaveCall(this) - - (client as StreamVideoClient).onCallCleanUp(this) - - clientImpl.scope.launch { - safeCall { - session?.sfuTracer?.trace( - "leave-call", - "[reason=$reason, error=${disconnectionReason?.message}]", - ) - val stats = collectStats() - session?.sendCallStats(stats) - } - cleanup() - } + callCleanupManager.leave(reason) } /** ends the call for yourself as well as other users */ @@ -1018,31 +538,13 @@ public class Call( limit: Int = 25, prev: String? = null, next: String? = null, - ): Result { - return clientImpl.queryMembersInternal( - type = type, - id = id, - filter = filter, - sort = sort, - prev = prev, - next = next, - limit = limit, - ).onSuccess { state.updateFromResponse(it) }.map { it.toQueriedMembers() } - } + ): Result = apiDelegate.queryMembers(filter, sort, limit, prev, next) suspend fun muteAllUsers( audio: Boolean = true, video: Boolean = false, screenShare: Boolean = false, - ): Result { - val request = MuteUsersData( - muteAllUsers = true, - audio = audio, - video = video, - screenShare = screenShare, - ) - return clientImpl.muteUsers(type, id, request) - } + ): Result = apiDelegate.muteAllUsers(audio, video, screenShare) fun setVisibility( sessionId: String, @@ -1053,6 +555,7 @@ public class Call( logger.i { "[setVisibility] #track; #sfu; viewportId: $viewportId, sessionId: $sessionId, trackType: $trackType, visible: $visible" } + val session = sessionManager.session.get() session?.updateTrackDimensions( sessionId, trackType, @@ -1073,6 +576,7 @@ public class Call( logger.i { "[setVisibility] #track; #sfu; viewportId: $viewportId, sessionId: $sessionId, trackType: $trackType, visible: $visible" } + val session = sessionManager.session.get() session?.updateTrackDimensions( sessionId, trackType, @@ -1083,14 +587,7 @@ public class Call( } fun handleEvent(event: VideoEvent) { - logger.v { "[call handleEvent] #sfu; event.type: ${event.getEventType()}" } - - when (event) { - is GoAwayEvent -> - scope.launch { - migrate() - } - } + callEventManager.handleEvent(event) } // TODO: review this @@ -1107,61 +604,15 @@ public class Call( trackType: TrackType, onRendered: (VideoTextureViewRenderer) -> Unit = {}, viewportId: String = sessionId, - ) { - logger.d { "[initRenderer] #sfu; #track; sessionId: $sessionId" } - - // Note this comes from the shared eglBase - videoRenderer.init( - eglBase.eglBaseContext, - object : RendererCommon.RendererEvents { - override fun onFirstFrameRendered() { - val width = videoRenderer.measuredWidth - val height = videoRenderer.measuredHeight - logger.i { - "[initRenderer.onFirstFrameRendered] #sfu; #track; " + - "trackType: $trackType, dimension: ($width - $height), " + - "sessionId: $sessionId" - } - if (trackType != TrackType.TRACK_TYPE_SCREEN_SHARE) { - session?.updateTrackDimensions( - sessionId, - trackType, - true, - VideoDimension(width, height), - viewportId, - ) - } - onRendered(videoRenderer) - } - - override fun onFrameResolutionChanged( - videoWidth: Int, - videoHeight: Int, - rotation: Int, - ) { - val width = videoRenderer.measuredWidth - val height = videoRenderer.measuredHeight - logger.v { - "[initRenderer.onFrameResolutionChanged] #sfu; #track; " + - "trackType: $trackType, " + - "viewport size: ($width - $height), " + - "video size: ($videoWidth - $videoHeight), " + - "sessionId: $sessionId" - } - - if (trackType != TrackType.TRACK_TYPE_SCREEN_SHARE) { - session?.updateTrackDimensions( - sessionId, - trackType, - true, - VideoDimension(width, height), - viewportId, - ) - } - } - }, - ) - } + ) = callRenderer.initRenderer( + videoRenderer, + sessionId, + trackType, + eglBase, + sessionManager.session.get(), + onRendered, + viewportId, + ) /** * Enables the provided client capabilities. @@ -1185,24 +636,9 @@ public class Call( startHls: Boolean = false, startRecording: Boolean = false, startTranscription: Boolean = false, - ): Result { - val result = clientImpl.goLive( - type = type, - id = id, - startHls = startHls, - startRecording = startRecording, - startTranscription = startTranscription, - ) - result.onSuccess { state.updateFromResponse(it) } - - return result - } + ): Result = apiDelegate.goLive(startHls, startRecording, startTranscription) - suspend fun stopLive(): Result { - val result = clientImpl.stopLive(type, id) - result.onSuccess { state.updateFromResponse(it) } - return result - } + suspend fun stopLive(): Result = apiDelegate.stopLive() suspend fun sendCustomEvent(data: Map): Result { return clientImpl.sendCustomEvent(this.type, this.id, data) @@ -1233,29 +669,13 @@ public class Call( fun startScreenSharing( mediaProjectionPermissionResultData: Intent, includeAudio: Boolean = false, - ) { - if (state.ownCapabilities.value.contains(OwnCapability.Screenshare)) { - session?.setScreenShareTrack() - screenShare.enable(mediaProjectionPermissionResultData, includeAudio = includeAudio) - } else { - logger.w { "Can't start screen sharing - user doesn't have wnCapability.Screenshare permission" } - } - } + ): Unit = apiDelegate.startScreenSharing(mediaProjectionPermissionResultData, includeAudio) - fun stopScreenSharing() { - screenShare.disable(fromUser = true) - } + fun stopScreenSharing(): Unit = apiDelegate.stopScreenSharing() - suspend fun startHLS(): Result { - return clientImpl.startBroadcasting(type, id) - .onSuccess { - state.updateFromResponse(it) - } - } + suspend fun startHLS(): Result = apiDelegate.startHLS() - suspend fun stopHLS(): Result { - return clientImpl.stopBroadcasting(type, id) - } + suspend fun stopHLS(): Result = apiDelegate.stopHLS() public fun subscribeFor( vararg eventTypes: Class, @@ -1329,8 +749,6 @@ public class Call( return clientImpl.updateMembers(type, id, request) } - val events = MutableSharedFlow(extraBufferCapacity = 150) - fun fireEvent(event: VideoEvent) = synchronized(subscriptions) { subscriptions.forEach { sub -> if (!sub.isDisposed) { @@ -1353,68 +771,8 @@ public class Call( } } - private fun monitorHeadset() { - microphone.devices.onEach { availableDevices -> - logger.d { - "[monitorHeadset] new available devices, prev selected: ${microphone.nonHeadsetFallbackDevice}" - } - - val bluetoothHeadset = - availableDevices.find { it is StreamAudioDevice.BluetoothHeadset } - val wiredHeadset = availableDevices.find { it is StreamAudioDevice.WiredHeadset } - - if (bluetoothHeadset != null) { - logger.d { "[monitorHeadset] BT headset selected" } - microphone.select(bluetoothHeadset) - } else if (wiredHeadset != null) { - logger.d { "[monitorHeadset] wired headset found" } - microphone.select(wiredHeadset) - } else { - logger.d { "[monitorHeadset] no headset found" } - - microphone.nonHeadsetFallbackDevice?.let { deviceBeforeHeadset -> - logger.d { "[monitorHeadset] before device selected" } - microphone.select(deviceBeforeHeadset) - } - } - }.launchIn(scope) - } - - private fun updateMediaManagerFromSettings(callSettings: CallSettingsResponse) { - // Speaker - if (speaker.status.value is DeviceStatus.NotSelected) { - val enableSpeaker = - if (callSettings.video.cameraDefaultOn || camera.status.value is DeviceStatus.Enabled) { - // if camera is enabled then enable speaker. Eventually this should - // be a new audio.defaultDevice setting returned from backend - true - } else { - callSettings.audio.defaultDevice == AudioSettingsResponse.DefaultDevice.Speaker || - callSettings.audio.speakerDefaultOn - } - - speaker.setEnabled(enabled = enableSpeaker) - } - - monitorHeadset() - - // Camera - if (camera.status.value is DeviceStatus.NotSelected) { - val defaultDirection = - if (callSettings.video.cameraFacing == VideoSettingsResponse.CameraFacing.Front) { - CameraDirection.Front - } else { - CameraDirection.Back - } - camera.setDirection(defaultDirection) - camera.setEnabled(callSettings.video.cameraDefaultOn) - } - - // Mic - if (microphone.status.value == DeviceStatus.NotSelected) { - val enabled = callSettings.audio.micDefaultOn - microphone.setEnabled(enabled) - } + internal fun updateMediaManagerFromSettings(callSettings: CallSettingsResponse) { + callMediaManager.updateMediaManagerFromSettings(callSettings) } /** @@ -1447,32 +805,14 @@ public class Call( audio: Boolean = true, video: Boolean = false, screenShare: Boolean = false, - ): Result { - val request = MuteUsersData( - users = listOf(userId), - muteAllUsers = false, - audio = audio, - video = video, - screenShare = screenShare, - ) - return clientImpl.muteUsers(type, id, request) - } + ): Result = apiDelegate.muteUser(userId, audio, video, screenShare) suspend fun muteUsers( userIds: List, audio: Boolean = true, video: Boolean = false, screenShare: Boolean = false, - ): Result { - val request = MuteUsersData( - users = userIds, - muteAllUsers = false, - audio = audio, - video = video, - screenShare = screenShare, - ) - return clientImpl.muteUsers(type, id, request) - } + ): Result = apiDelegate.muteUsers(userIds, audio, video, screenShare) @VisibleForTesting internal suspend fun joinRequest( @@ -1481,44 +821,29 @@ public class Call( migratingFrom: String? = null, ring: Boolean = false, notify: Boolean = false, - ): Result { - val result = clientImpl.joinCall( - type, id, - create = create != null, - members = create?.memberRequestsFromIds(), - custom = create?.custom, - settingsOverride = create?.settings, - startsAt = create?.startsAt, - team = create?.team, - ring = ring, - notify = notify, - location = location, - migratingFrom = migratingFrom, - ) - result.onSuccess { - state.updateFromResponse(it) - } - return result - } + ): Result = apiDelegate.joinRequest( + create, + location, + migratingFrom, + ring, + notify, + ) fun cleanup() { + val session = sessionManager.session.get() // monitor.stop() session?.cleanup() shutDownJobsGracefully() callStatsReportingJob?.cancel() mediaManager.cleanup() // TODO Rahul, Verify Later: need to check which call has owned the media at the moment(probably use active call) - session = null + sessionManager.session.set(null) // Cleanup the call's scope provider scopeProvider.cleanup() } // This will allow the Rest APIs to be executed which are in queue before leave private fun shutDownJobsGracefully() { - UserScope(ClientScope()).launch { - supervisorJob.children.forEach { it.join() } - supervisorJob.cancel() - } - scope.cancel() + callCleanupManager.shutDownJobsGracefully() } suspend fun ring(): Result { @@ -1536,13 +861,7 @@ public class Call( return clientImpl.notify(type, id) } - suspend fun accept(): Result { - logger.d { "[accept] #ringing; no args, call_id:$id" } - state.acceptedOnThisDevice = true - - clientImpl.state.transitionToAcceptCall(this) - return clientImpl.accept(type, id) - } + suspend fun accept(): Result = apiDelegate.accept() /** * Should outlive both the call scope and the service scope and needs to be executed in the client-level scope. @@ -1571,43 +890,9 @@ public class Call( rating: Int, reason: String? = null, custom: Map? = null, - ) { - scope.launch { - clientImpl.collectFeedback( - callType = type, - id = id, - sessionId = sessionId, - rating = rating, - reason = reason, - custom = custom, - ) - } - } - - suspend fun takeScreenshot(track: VideoTrack): Bitmap? { - return suspendCancellableCoroutine { continuation -> - var screenshotSink: VideoSink? = null - screenshotSink = VideoSink { - // make sure we stop after first frame is delivered - if (!continuation.isActive) { - return@VideoSink - } - it.retain() - val bitmap = YuvFrame.bitmapFromVideoFrame(it) - it.release() - - // This has to be launched asynchronously - removing the sink on the - // same thread as the videoframe is delivered will lead to a deadlock - // (needs investigation why) - scope.launch { - track.video.removeSink(screenshotSink) - } - continuation.resume(bitmap) - } + ): Unit = apiDelegate.collectUserFeedback(rating, reason, custom) - track.video.addSink(screenshotSink) - } - } + suspend fun takeScreenshot(track: VideoTrack): Bitmap? = apiDelegate.takeScreenshot(track) fun isPinnedParticipant(sessionId: String): Boolean = state.pinnedParticipants.value.containsKey( @@ -1673,6 +958,7 @@ public class Call( resolution: PreferredVideoResolution?, sessionIds: List? = null, ) { + val session = sessionManager.session.get() session?.let { session -> session.trackOverridesHandler.updateOverrides( sessionIds = sessionIds, @@ -1688,7 +974,10 @@ public class Call( * @param sessionIds The participant session IDs to enable/disable the video feed for. If `null`, the setting will be applied to all participants. */ fun setIncomingVideoEnabled(enabled: Boolean?, sessionIds: List? = null) { - session?.trackOverridesHandler?.updateOverrides(sessionIds, visible = enabled) + sessionManager.session.get()?.trackOverridesHandler?.updateOverrides( + sessionIds, + visible = enabled, + ) } /** @@ -1702,18 +991,8 @@ public class Call( * @param sessionIds Optional list of participant session IDs for which to toggle incoming audio. * If `null`, the audio setting is applied to all participants currently in the session. */ - fun setIncomingAudioEnabled(enabled: Boolean, sessionIds: List? = null) { - val participantTrackMap = session?.subscriber?.tracks ?: return - - val targetTracks = when { - sessionIds != null -> sessionIds.mapNotNull { participantTrackMap[it] } - else -> participantTrackMap.values.toList() - } - - targetTracks - .mapNotNull { it[TrackType.TRACK_TYPE_AUDIO] as? AudioTrack } - .forEach { it.enableAudio(enabled) } - } + fun setIncomingAudioEnabled(enabled: Boolean, sessionIds: List? = null) = + callMediaManager.setIncomingAudioEnabled(sessionManager.session.get(), enabled, sessionIds) @InternalStreamVideoApi public val debug = Debug(this) @@ -1722,11 +1001,11 @@ public class Call( public class Debug(val call: Call) { public fun pause() { - call.session?.subscriber?.disable() + call.sessionManager.session.get()?.subscriber?.disable() } public fun resume() { - call.session?.subscriber?.enable() + call.sessionManager.session.get()?.subscriber?.enable() } public fun rejoin() { @@ -1736,11 +1015,11 @@ public class Call( } public fun restartSubscriberIce() { - call.session?.subscriber?.connection?.restartIce() + call.sessionManager.session.get()?.subscriber?.connection?.restartIce() } public fun restartPublisherIce() { - call.session?.publisher?.connection?.restartIce() + call.sessionManager.session.get()?.publisher?.connection?.restartIce() } fun migrate() { diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt index d62863b755..3ae01d48bb 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallState.kt @@ -83,9 +83,11 @@ import io.getstream.android.video.generated.models.UpdatedCallPermissionsEvent import io.getstream.android.video.generated.models.VideoEvent import io.getstream.log.taggedLogger import io.getstream.result.Result -import io.getstream.video.android.core.call.RtcSession +import io.getstream.video.android.core.call.CallSessionManager import io.getstream.video.android.core.closedcaptions.ClosedCaptionManager import io.getstream.video.android.core.closedcaptions.ClosedCaptionsSettings +import io.getstream.video.android.core.coroutines.flows.RestartableStateFlow +import io.getstream.video.android.core.coroutines.scopes.RestartableProducerScope import io.getstream.video.android.core.events.AudioLevelChangedEvent import io.getstream.video.android.core.events.CallEndedSfuEvent import io.getstream.video.android.core.events.ChangePublishQualityEvent @@ -145,7 +147,6 @@ import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.stateIn import kotlinx.coroutines.flow.transform import kotlinx.coroutines.isActive import kotlinx.coroutines.launch @@ -165,44 +166,6 @@ import kotlin.time.Duration import kotlin.time.DurationUnit import kotlin.time.toDuration -@Stable -public sealed interface RealtimeConnection { - /** - * We start out in the PreJoin state. This is before call.join is called - */ - public data object PreJoin : RealtimeConnection - - /** - * Join is in progress - */ - public data object InProgress : RealtimeConnection - - /** - * We set the state to Joined as soon as the call state is available - */ - public data class Joined(val session: RtcSession) : - RealtimeConnection // joined, participant state is available, you can render the call. Video isn't ready yet - - /** - * True when the peer connections are ready - */ - public data object Connected : - RealtimeConnection // connected to RTC, able to receive and send video - - /** - * Reconnecting is true whenever Rtc isn't available and trying to recover - * If the subscriber peer connection breaks we'll reconnect - * If the publisher peer connection breaks we'll reconnect - * Also if the network provider from the OS says that internet is down we'll set it to reconnecting - */ - public data object Reconnecting : - RealtimeConnection // reconnecting to recover from temporary issues - - public data object Migrating : RealtimeConnection - public data class Failed(val error: Any) : RealtimeConnection // permanent failure - public data object Disconnected : RealtimeConnection // normal disconnect by the app -} - /** * The CallState class keeps all state for a call * It's available on every call object @@ -216,17 +179,33 @@ public sealed interface RealtimeConnection { * */ @Stable -public class CallState( +public class CallState internal constructor( private val client: StreamVideo, private val call: Call, private val user: User, - @InternalStreamVideoApi - val scope: CoroutineScope, + private val restartableProducerScope: RestartableProducerScope, + private val sessionManager: () -> CallSessionManager, // Todo Rahul, can be made non-lambda ) { + @Deprecated( + "Do not use this constructor. CallState must not be constructed with CoroutineScope. " + + "It breaks call reusability and will be removed. Do not use this directly. Kept for binary compatibility.", + level = DeprecationLevel.ERROR, + ) + public constructor( + client: StreamVideo, + call: Call, + user: User, + scope: CoroutineScope, + ) : this(client, call, user, call.restartableProducerScope, { call.sessionManager }) + private val logger by taggedLogger("CallState") private var participantsVisibilityMonitor: Job? = null + @InternalStreamVideoApi + val scope: CoroutineScope + get() = restartableProducerScope + // Create a CallActions implementation that delegates to the Call object @InternalStreamVideoApi val callActions = object : CallActions { @@ -243,15 +222,15 @@ public class CallState( } override suspend fun pinParticipant(userId: String, sessionId: String) { - call.state.pin(userId, sessionId) + pin(userId, sessionId) } override suspend fun unpinParticipant(sessionId: String) { - call.state.unpin(sessionId) + unpin(sessionId) } override fun isLocalParticipant(sessionId: String): Boolean { - return sessionId == call.sessionId + return sessionId == sessionManager().sessionId.get() } } @@ -285,7 +264,9 @@ public class CallState( /** Your own participant state */ public val me: StateFlow = _participants.mapState { map -> - map[call.sessionId] ?: participants.value.find { it.isLocal } + map[sessionManager.invoke().sessionId.get()] ?: participants.value.find { + it.isLocal + } // TODO Rahul, made this needs to go inside init {} } /** Your own participant state */ @@ -298,7 +279,9 @@ public class CallState( /** participants other than yourself */ public val remoteParticipants: StateFlow> = - _participants.mapState { it.filterKeys { key -> key != call.sessionId }.values.toList() } + _participants.mapState { + it.filterKeys { key -> key != sessionManager.invoke().sessionId.get() }.values.toList() + } // TODO Rahul, made this needs to go inside init {} /** the dominant speaker */ private val _dominantSpeaker: MutableStateFlow = MutableStateFlow(null) @@ -309,22 +292,14 @@ public class CallState( internal val _serverPins: MutableStateFlow> = MutableStateFlow(emptyMap()) - internal val _pinnedParticipants: StateFlow> = - combine(_localPins, _serverPins) { local, server -> - val combined = mutableMapOf() - combined.putAll(local) - combined.putAll(server) - combined.toMap().asIterable().associate { - Pair(it.key, it.value.at) - } - }.stateIn(scope, SharingStarted.Eagerly, emptyMap()) + internal val _pinnedParticipants: StateFlow> /** * Pinned participants, combined value both from server and local pins. */ - val pinnedParticipants: StateFlow> = _pinnedParticipants + val pinnedParticipants: StateFlow> - val stats = CallStats(call, scope) + val stats = CallStats(call, restartableProducerScope) private val participantsUpdate = TaskSchedulerWithDebounce() private val participantsUpdateConfig = ScheduleConfig( @@ -412,15 +387,9 @@ public class CallState( """, ), ) - val livestream: StateFlow = - livestreamFlow.debounce(1000).stateIn(scope, SharingStarted.WhileSubscribed(10_000L), null) - - private var _sortedParticipantsState = SortedParticipantsState( - scope, - call, - _participants, - _pinnedParticipants, - ) + val livestream: StateFlow + + private var _sortedParticipantsState: SortedParticipantsState /** * Sorted participants based on @@ -433,7 +402,7 @@ public class CallState( * * Debounced 100ms to avoid rapid changes */ - val sortedParticipants = _sortedParticipantsState.asFlow().debounce(100) + val sortedParticipants: Flow> /** * Update participant sorting order @@ -492,24 +461,13 @@ public class CallState( } /** how long the call has been running, rounded to seconds, null if the call didn't start yet */ - public val duration: StateFlow = - _durationInMs.transform { emit(((it ?: 0L) / 1000L).toDuration(DurationUnit.SECONDS)) } - .stateIn(scope, SharingStarted.WhileSubscribed(10000L), null) + public val duration: StateFlow /** how many milliseconds the call has been running, null if the call didn't start yet */ - public val durationInMs: StateFlow = - _durationInMs.stateIn(scope, SharingStarted.WhileSubscribed(10000L), null) + public val durationInMs: StateFlow /** how many milliseconds the call has been running in the simple date format. */ - public val durationInDateFormat: StateFlow = durationInMs.mapState { durationInMs -> - if (durationInMs == null) { - null - } else { - val date = Date(durationInMs) - val dateFormat = SimpleDateFormat("HH:MM:SS", Locale.US) - dateFormat.format(date) - } - } + public val durationInDateFormat: StateFlow /** Check if you have permissions to do things like share your audio, video, screen etc */ public fun hasPermission(permission: String): StateFlow { @@ -548,20 +506,7 @@ public class CallState( * * @see [liveDuration] */ - public val liveDurationInMs = flow { - while (currentCoroutineContext().isActive) { - delay(1000) - - val liveStartedAt = _session.value?.liveStartedAt - val liveEndedAt = _session.value?.liveEndedAt ?: OffsetDateTime.now() - - liveStartedAt?.let { - val duration = liveEndedAt.toInstant().toEpochMilli() - liveStartedAt.toInstant() - .toEpochMilli() - emit(duration) - } - } - }.distinctUntilChanged().stateIn(scope, SharingStarted.WhileSubscribed(10000L), null) + public val liveDurationInMs: StateFlow /** * How long the call has been live for, represented as [Duration], or null if the call hasn't been live yet. @@ -569,9 +514,7 @@ public class CallState( * * @see [liveDurationInMs] */ - public val liveDuration = liveDurationInMs.mapState { durationInMs -> - durationInMs?.takeIf { it >= 1000 }?.let { (it / 1000).toDuration(DurationUnit.SECONDS) } - } + public val liveDuration: StateFlow private val _egress: MutableStateFlow = MutableStateFlow(null) val egress: StateFlow = _egress @@ -714,6 +657,83 @@ public class CallState( internal var incomingNotificationData = IncomingNotificationData(emptyMap()) + init { + /** + * If we assign [_pinnedParticipants] at declaration line, then [restartableProducerScope] + * will be null. As val's are assigned before constructor code has run. + * So we cannot use constructor args in class val + */ + _pinnedParticipants = RestartableStateFlow( + combine(_localPins, _serverPins) { local, server -> + val combined = mutableMapOf() + combined.putAll(local) + combined.putAll(server) + combined.toMap().asIterable().associate { + Pair(it.key, it.value.at) + } + }, + restartableProducerScope, + emptyMap(), + ) + + pinnedParticipants = _pinnedParticipants + + _sortedParticipantsState = SortedParticipantsState( + scope, + call, + _participants, + _pinnedParticipants, + ) + + sortedParticipants = _sortedParticipantsState.asFlow().debounce(100) + + liveDurationInMs = RestartableStateFlow( + flow { + while (currentCoroutineContext().isActive) { + delay(1000) + + val liveStartedAt = _session.value?.liveStartedAt + val liveEndedAt = _session.value?.liveEndedAt ?: OffsetDateTime.now() + + liveStartedAt?.let { + val duration = liveEndedAt.toInstant().toEpochMilli() - liveStartedAt.toInstant() + .toEpochMilli() + emit(duration) + } + } + }.distinctUntilChanged(), + restartableProducerScope, + null, + ) + + liveDuration = liveDurationInMs.mapState { durationInMs -> + durationInMs?.takeIf { it >= 1000 }?.let { (it / 1000).toDuration(DurationUnit.SECONDS) } + } + + livestream = RestartableStateFlow( + livestreamFlow.debounce(1000), + restartableProducerScope, + null, + SharingStarted.WhileSubscribed(10_000L), + ) + duration = RestartableStateFlow( + _durationInMs.transform { + emit(((it ?: 0L) / 1000L).toDuration(DurationUnit.SECONDS)) + }, + restartableProducerScope, null, SharingStarted.WhileSubscribed(10000L), + ) + durationInMs = RestartableStateFlow(_durationInMs, restartableProducerScope, null, SharingStarted.WhileSubscribed(10000L)) + durationInDateFormat = durationInMs.mapState { durationInMs -> + if (durationInMs == null) { + null + } else { + val date = Date(durationInMs) + val dateFormat = SimpleDateFormat("HH:MM:SS", Locale.US) + dateFormat.format(date) + } + } + } + fun handleEvent(event: VideoEvent) { logger.d { "[handleEvent] ${event::class.java.name.split(".").last()}" } @@ -823,7 +843,7 @@ public class CallState( } is CallEndedEvent -> { - call.state.cancelTimeout() + cancelTimeout() updateFromResponse(event.call) _endedAt.value = OffsetDateTime.now(Clock.systemUTC()) _endedByUser.value = event.user?.toUser() @@ -973,7 +993,7 @@ public class CallState( } is ChangePublishQualityEvent -> { - call.session?.handleEvent(event) + sessionManager().session.get()?.handleEvent(event) } is ErrorEvent -> { @@ -1184,9 +1204,9 @@ public class CallState( scope.launch { val callServiceConfig = StreamVideo.instanceOrNull()?.state?.callConfigRegistry?.get(call.type) ?: CallServiceConfig() if (callServiceConfig.moderationConfig.videoModerationConfig.enable) { - call.state.moderationManager.applyVideoModeration() + moderationManager.applyVideoModeration() delay(callServiceConfig.moderationConfig.videoModerationConfig.blurDuration) - call.state.moderationManager.clearVideoModeration() + moderationManager.clearVideoModeration() } } } @@ -1419,7 +1439,7 @@ public class CallState( } else { ParticipantState( sessionId = sessionId, - scope = scope, + restartableProducerScope = restartableProducerScope, callActions = callActions, initialUserId = userId, source = source, @@ -1460,6 +1480,7 @@ public class CallState( fun clearParticipants() { internalParticipants.clear() + pendingParticipantsJoined.clear() _participants.value = HashMap(internalParticipants) } @@ -1682,6 +1703,42 @@ public class CallState( } } + internal fun clearSessionState() { + clearParticipants() + _activeSpeakers.value = emptyList() + _dominantSpeaker.value = null + _screenSharingSession.value = null + pendingParticipantsJoined.clear() + + _localPins.value = emptyMap() + _serverPins.value = emptyMap() + + _session.value = null + _participantCounts.value = null + + _ringingState.value = RingingState.Idle + _acceptedBy.value = emptySet() + _rejectedBy.value = emptySet() + _rejectActionBundle.value = null + _startedAt.value = null + _reactions.value = emptyList() + _errors.value = emptyList() + _permissionRequests.value = emptyList() + _speakingWhileMuted.value = false + _participantVideoEnabledOverrides.value = emptyMap() + acceptedOnThisDevice = false + + _recording.value = false + _transcribing.value = false + _broadcasting.value = false + _egress.value = null + + _notificationIdFlow.value = null + atomicNotification.set(null) + + cancelTimeout() + } + fun updateRejectedBy(userId: Set) { _rejectedBy.value = userId } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallStats.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallStats.kt index 9e6ab56d45..3f9a3bb8d1 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallStats.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/CallStats.kt @@ -19,12 +19,12 @@ package io.getstream.video.android.core import android.os.Build import io.getstream.log.taggedLogger import io.getstream.video.android.core.call.stats.model.RtcStatsReport +import io.getstream.video.android.core.coroutines.flows.RestartableStateFlow +import io.getstream.video.android.core.coroutines.scopes.RestartableProducerScope import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.stateIn import org.webrtc.CameraEnumerationAndroid import org.webrtc.RTCStats import stream.video.sfu.models.TrackType @@ -81,18 +81,37 @@ public data class LocalStats( val deviceModel: String, ) -public class CallStats(val call: Call, val callScope: CoroutineScope) { +// TODO Rahul, need to pass RestartableScope +public class CallStats internal constructor( + val call: Call, + private val restartableProducerScope: RestartableProducerScope, +) { + + @Deprecated( + "Kept for binary compatibility.", + level = DeprecationLevel.ERROR, + ) + public constructor( + call: Call, + callScope: CoroutineScope, + ) : this(call, RestartableProducerScope()) private val logger by taggedLogger("CallStats") private val supervisorJob = SupervisorJob() - private val scope = CoroutineScope(callScope.coroutineContext + supervisorJob) + + private val scope = CoroutineScope(restartableProducerScope.coroutineContext + supervisorJob) // TODO: cleanup the scope val publisher = PeerConnectionStats(scope) val subscriber = PeerConnectionStats(scope) val _local = MutableStateFlow(null) - val local: StateFlow = - _local.stateIn(scope, SharingStarted.WhileSubscribed(), null) + val local: StateFlow = RestartableStateFlow(_local, restartableProducerScope, null) + + @Deprecated( + "Kept for binary compatibility.", + level = DeprecationLevel.ERROR, + ) + fun getCallScope(): CoroutineScope = restartableProducerScope fun updateFromRTCStats(stats: RtcStatsReport?, isPublisher: Boolean = true) { if (stats == null) return diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManager.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManager.kt index 373f5424f6..77b46c77a7 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManager.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManager.kt @@ -75,6 +75,8 @@ import stream.video.sfu.models.AudioBitrateProfile import stream.video.sfu.models.VideoDimension import java.nio.ByteBuffer import java.util.UUID +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.resumeWithException sealed class DeviceStatus { @@ -279,7 +281,7 @@ class ScreenShareManager( private val logger by taggedLogger("Media:ScreenShareManager") - private val _status = MutableStateFlow(DeviceStatus.NotSelected) + internal val _status = MutableStateFlow(DeviceStatus.NotSelected) val status: StateFlow = _status public val isEnabled: StateFlow = _status.mapState { it is DeviceStatus.Enabled } @@ -544,13 +546,14 @@ class MicrophoneManager( // Internal data private val logger by taggedLogger("Media:MicrophoneManager") - private lateinit var audioHandler: AudioHandler - private var setupCompleted: Boolean = false + private var audioHandler: AudioHandler? = null + private var setupCompleted: AtomicBoolean = AtomicBoolean(false) + private var mediaManagerSetupState = AtomicReference(MediaManagerSetupState.NONE) internal var audioManager: AudioManager? = null internal var priorStatus: DeviceStatus? = null // Exposed state - private val _status = MutableStateFlow(DeviceStatus.NotSelected) + internal val _status = MutableStateFlow(DeviceStatus.NotSelected) /** The status of the audio */ val status: StateFlow = _status @@ -716,7 +719,16 @@ class MicrophoneManager( fun cleanup() { ifAudioHandlerInitialized { it.stop() } - setupCompleted = false + setupCompleted.set(false) + audioHandler = null + } + + /** + * Resets the microphone status to NotSelected to allow re-initialization on next join. + */ + internal fun reset() { + _status.value = DeviceStatus.NotSelected + mediaManagerSetupState.set(MediaManagerSetupState.NONE) } fun canHandleDeviceSwitch() = audioUsageProvider.invoke() != AudioAttributes.USAGE_MEDIA @@ -724,12 +736,26 @@ class MicrophoneManager( // Internal logic internal fun setup(preferSpeaker: Boolean = false, onAudioDevicesUpdate: (() -> Unit)? = null) { synchronized(this) { + logger.d { + "[setup] setupCompleted = ${setupCompleted.get()}, mediaManagerSetupState = ${mediaManagerSetupState.get()}" + } + val localMediaManagerSetupState = mediaManagerSetupState.get() + when (localMediaManagerSetupState) { + MediaManagerSetupState.FINISHED -> { + onAudioDevicesUpdate?.invoke() + return@synchronized + } + MediaManagerSetupState.STARTED -> return@synchronized // TODO Rahul, ideally the method call should be queued. Test this before merge + else -> {} + } + + mediaManagerSetupState.set(MediaManagerSetupState.STARTED) + var capturedOnAudioDevicesUpdate = onAudioDevicesUpdate - if (setupCompleted) { + if (setupCompleted.get()) { capturedOnAudioDevicesUpdate?.invoke() capturedOnAudioDevicesUpdate = null - return } @@ -738,41 +764,50 @@ class MicrophoneManager( audioManager?.allowedCapturePolicy = AudioAttributes.ALLOW_CAPTURE_BY_ALL } - if (canHandleDeviceSwitch() && !::audioHandler.isInitialized) { - audioHandler = AudioSwitchHandler( - context = mediaManager.context, - preferredDeviceList = listOf( - AudioDevice.BluetoothHeadset::class.java, - AudioDevice.WiredHeadset::class.java, - ) + if (preferSpeaker) { - listOf( - AudioDevice.Speakerphone::class.java, - AudioDevice.Earpiece::class.java, - ) - } else { - listOf( - AudioDevice.Earpiece::class.java, - AudioDevice.Speakerphone::class.java, - ) - }, - audioDeviceChangeListener = { devices, selected -> - logger.i { "[audioSwitch] audio devices. selected $selected, available devices are $devices" } - - _devices.value = devices.map { it.fromAudio() } - _selectedDevice.value = selected?.fromAudio() - - setupCompleted = true - - capturedOnAudioDevicesUpdate?.invoke() - capturedOnAudioDevicesUpdate = null - }, - ) + if (canHandleDeviceSwitch()) { + if (audioHandler == null) { + // First time initialization + audioHandler = AudioSwitchHandler( + context = mediaManager.context, + preferredDeviceList = listOf( + AudioDevice.BluetoothHeadset::class.java, + AudioDevice.WiredHeadset::class.java, + ) + if (preferSpeaker) { + listOf( + AudioDevice.Speakerphone::class.java, + AudioDevice.Earpiece::class.java, + ) + } else { + listOf( + AudioDevice.Earpiece::class.java, + AudioDevice.Speakerphone::class.java, + ) + }, + audioDeviceChangeListener = { devices, selected -> + logger.i { "[audioSwitch] audio devices. selected $selected, available devices are $devices" } + + _devices.value = devices.map { it.fromAudio() } + _selectedDevice.value = selected?.fromAudio() + + setupCompleted.set(true) + mediaManagerSetupState.set(MediaManagerSetupState.FINISHED) + capturedOnAudioDevicesUpdate?.invoke() + capturedOnAudioDevicesUpdate = null + }, + ) - logger.d { "[setup] Calling start on instance $audioHandler" } - audioHandler.start() + logger.d { "[setup] Calling start on instance $audioHandler" } + audioHandler?.start() + } else { + // audioHandler exists but was stopped (cleanup was called), restart it + logger.d { "[setup] Restarting audioHandler after cleanup" } + audioHandler?.start() + mediaManagerSetupState.set(MediaManagerSetupState.FINISHED) + } } else { - logger.d { "[MediaManager#setup] Usage is MEDIA or audioHandle is already initialized" } + logger.d { "[MediaManager#setup] Usage is MEDIA" } capturedOnAudioDevicesUpdate?.invoke() + mediaManagerSetupState.set(MediaManagerSetupState.FINISHED) } } } @@ -783,7 +818,7 @@ class MicrophoneManager( ) private fun ifAudioHandlerInitialized(then: (audioHandler: AudioSwitchHandler) -> Unit) { - if (this::audioHandler.isInitialized) { + if (audioHandler != null) { then(this.audioHandler as AudioSwitchHandler) } else { logger.e { "Audio handler not initialized. Ensure calling setup(), before using the handler." } @@ -829,7 +864,7 @@ class CameraManager( private val logger by taggedLogger("Media:CameraManager") /** The status of the camera. enabled or disabled */ - private val _status = MutableStateFlow(DeviceStatus.NotSelected) + internal val _status = MutableStateFlow(DeviceStatus.NotSelected) public val status: StateFlow = _status /** Represents whether the camera is enabled */ @@ -1174,6 +1209,13 @@ class CameraManager( setupCompleted = false } + /** + * Resets the camera status to NotSelected to allow re-initialization on next join. + */ + internal fun reset() { + _status.value = DeviceStatus.NotSelected + } + private fun createCameraDeviceWrapper( id: String, cameraManager: CameraManager?, @@ -1255,6 +1297,7 @@ class MediaManagerImpl( val audioUsage: Int = defaultAudioUsage, val audioUsageProvider: (() -> Int) = { audioUsage }, ) { + private val logger by taggedLogger("MediaManagerImpl") internal val camera = CameraManager(this, eglBaseContext, DefaultCameraCharacteristicsValidator()) internal val microphone = MicrophoneManager(this, audioUsage, audioUsageProvider) @@ -1376,6 +1419,20 @@ class MediaManagerImpl( // Cleanup camera and microphone infrastructure camera.cleanup() microphone.cleanup() + reset() + } + + /** + * Resets device statuses to NotSelected to allow re-initialization on next join. + * Should be called after cleanup when preparing for rejoin. + */ + internal fun reset() { + logger.d { "[reset]" } + camera.reset() + microphone.reset() + + speaker._status.value = DeviceStatus.NotSelected + screenShare._status.value = DeviceStatus.NotSelected } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManagerSetupState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManagerSetupState.kt new file mode 100644 index 0000000000..38df13b8be --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/MediaManagerSetupState.kt @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core + +internal enum class MediaManagerSetupState { + NONE, STARTED, FINISHED +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ParticipantState.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ParticipantState.kt index 7ae7d26a53..3c06fcfd67 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ParticipantState.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/ParticipantState.kt @@ -20,6 +20,8 @@ import androidx.compose.runtime.Stable import io.getstream.android.video.generated.models.MuteUsersResponse import io.getstream.log.taggedLogger import io.getstream.result.Result +import io.getstream.video.android.core.coroutines.flows.RestartableStateFlow +import io.getstream.video.android.core.coroutines.scopes.RestartableProducerScope import io.getstream.video.android.core.internal.InternalStreamVideoApi import io.getstream.video.android.core.model.AudioTrack import io.getstream.video.android.core.model.MediaTrack @@ -31,10 +33,8 @@ import io.getstream.video.android.core.utils.combineStates import io.getstream.video.android.core.utils.mapState import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.combine -import kotlinx.coroutines.flow.stateIn import org.threeten.bp.Instant import org.threeten.bp.OffsetDateTime import org.threeten.bp.ZoneOffset @@ -46,24 +46,43 @@ import stream.video.sfu.models.TrackType * Represents the state of a participant in a call. * * * A list of participants is shared when you join a call the SFU send you the participant joined event. - * + * @param sessionId The SFU returns a session id for each participant. This session id is unique + * @param scope The coroutine scope for this participant + * @param callActions The call actions interface for performing operations on this participant + * @param initialUserId The current version of the user, this is the start for participant.user stateflow + * @param source A prefix to identify tracks, internal + * @param trackLookupPrefix */ -@Stable -public data class ParticipantState( - /** The SFU returns a session id for each participant. This session id is unique */ +@Stable // TODO Rahul, need to fix its breaking change before merge +public data class ParticipantState internal constructor( var sessionId: String = "", - /** The coroutine scope for this participant */ - private val scope: CoroutineScope, - /** The call actions interface for performing operations on this participant */ + private val restartableProducerScope: RestartableProducerScope, private val callActions: CallActions, - /** The current version of the user, this is the start for participant.user stateflow */ private val initialUserId: String, val source: ParticipantSource = ParticipantSource.PARTICIPANT_SOURCE_WEBRTC_UNSPECIFIED, - /** A prefix to identify tracks, internal */ @InternalStreamVideoApi var trackLookupPrefix: String = "", ) { + @Deprecated( + "Kept for binary compatibility.", + level = DeprecationLevel.ERROR, + ) + public constructor( + sessionId: String = "", + scope: CoroutineScope, + callActions: CallActions, + initialUserId: String, + source: ParticipantSource = ParticipantSource.PARTICIPANT_SOURCE_WEBRTC_UNSPECIFIED, + trackLookupPrefix: String = "", + ) : this( + sessionId, + RestartableProducerScope(), + callActions, + initialUserId, + source, + trackLookupPrefix, + ) private val logger by taggedLogger("ParticipantState") val isLocal by lazy { @@ -156,18 +175,22 @@ public data class ParticipantState( internal val _reactions = MutableStateFlow>(emptyList()) val reactions: StateFlow> = _reactions - val video: StateFlow = combine( - _videoTrack, - _videoEnabled, - _videoPaused, - ) { track, enabled, paused -> - Video( - sessionId = sessionId, - track = track, - enabled = enabled, - paused = paused, - ) - }.stateIn(scope, SharingStarted.Lazily, null) + val video: StateFlow = RestartableStateFlow( + combine( + _videoTrack, + _videoEnabled, + _videoPaused, + ) { track, enabled, paused -> + Video( + sessionId = sessionId, + track = track, + enabled = enabled, + paused = paused, + ) + }, + restartableProducerScope, + null, + ) val audio: StateFlow = combineStates(_audioTrack, _audioEnabled) { track, enabled -> Audio( diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RealtimeConnection.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RealtimeConnection.kt new file mode 100644 index 0000000000..6050ed8bb1 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/RealtimeConnection.kt @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core + +import androidx.compose.runtime.Stable +import io.getstream.video.android.core.call.RtcSession + +@Stable +public sealed interface RealtimeConnection { + /** + * We start out in the PreJoin state. This is before call.join is called + */ + public data object PreJoin : RealtimeConnection + + /** + * Join is in progress + */ + public data object InProgress : RealtimeConnection + + /** + * We set the state to Joined as soon as the call state is available + */ + public data class Joined(val session: RtcSession) : + RealtimeConnection // joined, participant state is available, you can render the call. Video isn't ready yet + + /** + * True when the peer connections are ready + */ + public data object Connected : + RealtimeConnection // connected to RTC, able to receive and send video + + /** + * Reconnecting is true whenever Rtc isn't available and trying to recover + * If the subscriber peer connection breaks we'll reconnect + * If the publisher peer connection breaks we'll reconnect + * Also if the network provider from the OS says that internet is down we'll set it to reconnecting + */ + public data object Reconnecting : + RealtimeConnection // reconnecting to recover from temporary issues + + public data object Migrating : RealtimeConnection + public data class Failed(val error: Any) : RealtimeConnection // permanent failure + public data object Disconnected : RealtimeConnection // normal disconnect by the app +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoClient.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoClient.kt index eba6d357c4..04263d376c 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoClient.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/StreamVideoClient.kt @@ -213,7 +213,7 @@ internal class StreamVideoClient internal constructor( destroyedCalls.put(call.hashCode(), call) } logger.d { "[cleanup] Removing call from cache: ${call.cid}" } - calls.remove(call.cid) +// calls.remove(call.cid) TODO Rahul, uncomment before merge } override fun cleanup() { diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallApiDelegate.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallApiDelegate.kt new file mode 100644 index 0000000000..00ae8c83c3 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallApiDelegate.kt @@ -0,0 +1,335 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import android.content.Intent +import android.graphics.Bitmap +import androidx.annotation.VisibleForTesting +import io.getstream.android.video.generated.models.AcceptCallResponse +import io.getstream.android.video.generated.models.CallSettingsRequest +import io.getstream.android.video.generated.models.GetCallResponse +import io.getstream.android.video.generated.models.GetOrCreateCallResponse +import io.getstream.android.video.generated.models.GoLiveResponse +import io.getstream.android.video.generated.models.JoinCallResponse +import io.getstream.android.video.generated.models.MemberRequest +import io.getstream.android.video.generated.models.MuteUsersResponse +import io.getstream.android.video.generated.models.OwnCapability +import io.getstream.android.video.generated.models.StopLiveResponse +import io.getstream.android.video.generated.models.UpdateCallRequest +import io.getstream.android.video.generated.models.UpdateCallResponse +import io.getstream.log.taggedLogger +import io.getstream.result.Result +import io.getstream.video.android.core.Call +import io.getstream.video.android.core.CreateCallOptions +import io.getstream.video.android.core.RingingState +import io.getstream.video.android.core.ScreenShareManager +import io.getstream.video.android.core.StreamVideoClient +import io.getstream.video.android.core.call.video.YuvFrame +import io.getstream.video.android.core.model.MuteUsersData +import io.getstream.video.android.core.model.QueriedMembers +import io.getstream.video.android.core.model.SortField +import io.getstream.video.android.core.model.VideoTrack +import io.getstream.video.android.core.utils.toQueriedMembers +import kotlinx.coroutines.launch +import kotlinx.coroutines.suspendCancellableCoroutine +import org.threeten.bp.OffsetDateTime +import org.webrtc.VideoSink +import kotlin.coroutines.resume + +internal class CallApiDelegate( + private val clientImpl: StreamVideoClient, + private val type: String, + private val id: String, + private val call: Call, + private val screenShareProvider: () -> ScreenShareManager, + private val setScreenTrackCallBack: () -> Unit, +) { + private val logger by taggedLogger("CallApiDelegate call:$type:$id") + + suspend fun get(): Result { + return clientImpl.getCall(type, id) + } + + suspend fun create( + memberIds: List? = null, + members: List? = null, + custom: Map? = null, + settings: CallSettingsRequest? = null, + startsAt: OffsetDateTime? = null, + team: String? = null, + ring: Boolean = false, + notify: Boolean = false, + video: Boolean? = null, + ): Result { + val response = if (members != null) { + clientImpl.getOrCreateCallFullMembers( + type = type, + id = id, + members = members, + custom = custom, + settingsOverride = settings, + startsAt = startsAt, + team = team, + ring = ring, + notify = notify, + video = video, + ) + } else { + clientImpl.getOrCreateCall( + type = type, + id = id, + memberIds = memberIds, + custom = custom, + settingsOverride = settings, + startsAt = startsAt, + team = team, + ring = ring, + notify = notify, + video = video, + ) + } + + response.onSuccess { + call.state.updateFromResponse(it) + if (ring) { + clientImpl.state.addRingingCall(call, RingingState.Outgoing()) + } + } + return response + } + + suspend fun update( + custom: Map? = null, + settingsOverride: CallSettingsRequest? = null, + startsAt: OffsetDateTime? = null, + ): Result { + val request = UpdateCallRequest( + custom = custom, + settingsOverride = settingsOverride, + startsAt = startsAt, + ) + val response = clientImpl.updateCall(type, id, request) + response.onSuccess { + call.state.updateFromResponse(it) + } + return response + } + + suspend fun goLive( + startHls: Boolean = false, + startRecording: Boolean = false, + startTranscription: Boolean = false, + ): Result { + val result = clientImpl.goLive( + type = type, + id = id, + startHls = startHls, + startRecording = startRecording, + startTranscription = startTranscription, + ) + result.onSuccess { call.state.updateFromResponse(it) } + + return result + } + + suspend fun stopLive(): Result { + val result = clientImpl.stopLive(type, id) + result.onSuccess { call.state.updateFromResponse(it) } + return result + } + + /** + * User needs to have [OwnCapability.Screenshare] capability in order to start screen + * sharing. + * + * @param mediaProjectionPermissionResultData - intent data returned from the + * activity result after asking for screen sharing permission by launching + * MediaProjectionManager.createScreenCaptureIntent(). + * See https://developer.android.com/guide/topics/large-screens/media-projection#recommended_approach + */ + fun startScreenSharing( + mediaProjectionPermissionResultData: Intent, + includeAudio: Boolean = false, + ) { + if (call.state.ownCapabilities.value.contains(OwnCapability.Screenshare)) { + setScreenTrackCallBack.invoke() + screenShareProvider.invoke().enable( + mediaProjectionPermissionResultData, + includeAudio = includeAudio, + ) + } else { + logger.w { "Can't start screen sharing - user doesn't have wnCapability.Screenshare permission" } + } + } + + fun stopScreenSharing() { + screenShareProvider.invoke().disable(fromUser = true) + } + + suspend fun startHLS(): Result { + return clientImpl.startBroadcasting(type, id) + .onSuccess { + call.state.updateFromResponse(it) + } + } + + suspend fun stopHLS(): Result { + return clientImpl.stopBroadcasting(type, id) + } + + suspend fun accept(): Result { + logger.d { "[accept] #ringing; no args, call_id:$id" } + call.state.acceptedOnThisDevice = true + + clientImpl.state.removeRingingCall(call) + clientImpl.state.maybeStopForegroundService(call = call) + return clientImpl.accept(type, id) + } + + fun collectUserFeedback( + rating: Int, + reason: String? = null, + custom: Map? = null, + ) { + call.scope.launch { + clientImpl.collectFeedback( + callType = type, + id = id, + sessionId = call.sessionId, + rating = rating, + reason = reason, + custom = custom, + ) + } + } + + suspend fun takeScreenshot(track: VideoTrack): Bitmap? { + return suspendCancellableCoroutine { continuation -> + var screenshotSink: VideoSink? = null + screenshotSink = VideoSink { + // make sure we stop after first frame is delivered + if (!continuation.isActive) { + return@VideoSink + } + it.retain() + val bitmap = YuvFrame.bitmapFromVideoFrame(it) + it.release() + + // This has to be launched asynchronously - removing the sink on the + // same thread as the videoframe is delivered will lead to a deadlock + // (needs investigation why) + call.scope.launch { + track.video.removeSink(screenshotSink) + } + continuation.resume(bitmap) + } + + track.video.addSink(screenshotSink) + } + } + + suspend fun muteUser( + userId: String, + audio: Boolean = true, + video: Boolean = false, + screenShare: Boolean = false, + ): Result { + val request = MuteUsersData( + users = listOf(userId), + muteAllUsers = false, + audio = audio, + video = video, + screenShare = screenShare, + ) + return clientImpl.muteUsers(type, id, request) + } + + suspend fun muteUsers( + userIds: List, + audio: Boolean = true, + video: Boolean = false, + screenShare: Boolean = false, + ): Result { + val request = MuteUsersData( + users = userIds, + muteAllUsers = false, + audio = audio, + video = video, + screenShare = screenShare, + ) + return clientImpl.muteUsers(type, id, request) + } + + suspend fun muteAllUsers( + audio: Boolean = true, + video: Boolean = false, + screenShare: Boolean = false, + ): Result { + val request = MuteUsersData( + muteAllUsers = true, + audio = audio, + video = video, + screenShare = screenShare, + ) + return clientImpl.muteUsers(type, id, request) + } + + suspend fun queryMembers( + filter: Map, + sort: List = mutableListOf(SortField.Desc("created_at")), + limit: Int = 25, + prev: String? = null, + next: String? = null, + ): Result { + return clientImpl.queryMembersInternal( + type = type, + id = id, + filter = filter, + sort = sort, + prev = prev, + next = next, + limit = limit, + ).onSuccess { call.state.updateFromResponse(it) }.map { it.toQueriedMembers() } + } + + @VisibleForTesting + internal suspend fun joinRequest( + create: CreateCallOptions? = null, + location: String, + migratingFrom: String? = null, + ring: Boolean = false, + notify: Boolean = false, + ): Result { + val result = clientImpl.joinCall( + type, id, + create = create != null, + members = create?.memberRequestsFromIds(), + custom = create?.custom, + settingsOverride = create?.settings, + startsAt = create?.startsAt, + team = create?.team, + ring = ring, + notify = notify, + location = location, + migratingFrom = migratingFrom, + ) + result.onSuccess { + call.state.updateFromResponse(it) + } + return result + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallCleanupManager.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallCleanupManager.kt new file mode 100644 index 0000000000..99b729143e --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallCleanupManager.kt @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.log.taggedLogger +import io.getstream.video.android.core.Call +import io.getstream.video.android.core.MediaManagerImpl +import io.getstream.video.android.core.RealtimeConnection +import io.getstream.video.android.core.StreamVideoClient +import io.getstream.video.android.core.notifications.internal.telecom.TelecomCallController +import io.getstream.video.android.core.socket.common.scope.ClientScope +import io.getstream.video.android.core.socket.common.scope.UserScope +import io.getstream.video.android.core.utils.safeCall +import kotlinx.coroutines.cancel +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.withLock +import kotlin.sequences.forEach + +/** + * Manages call cleanup + */ +internal class CallCleanupManager( + private val call: Call, + private val sessionManager: CallSessionManager, + private val callApiDelegate: CallApiDelegate, + private val client: StreamVideoClient, + private val callReInitializer: CallReInitializer, + private val mediaManagerProvider: () -> MediaManagerImpl, // ← Lambda provider + private val callStatsReporter: CallStatsReporter, +) { + private val logger by taggedLogger("CallLifecycleManager") + private val mediaManager: MediaManagerImpl by lazy { + mediaManagerProvider() + } + + fun leave(reason: String = "user") { + logger.d { "[leave] #ringing; no args, call_cid:${call.cid}" } + + callReInitializer.currentScope.launch { + val shouldProceed = !isCleanupInProgress() + if (shouldProceed) { + internalLeave(null, reason) + } + } + } + + private suspend fun isCleanupInProgress(): Boolean { + return callReInitializer.cleanupMutex.withLock { + val currentJob = callReInitializer.cleanupJob + if (currentJob?.isActive == true) { + logger.w { + "[isCleanupInProgress] Cleanup already in progress (job: $currentJob), " + + "ignoring duplicate leave call" + } + true + } else { + logger.v { "[isCleanupInProgress] No active cleanup, proceeding with leave" } + false + } + } + } + + private fun internalLeave(disconnectionReason: Throwable?, reason: String) = call.atomicLeave { + sessionManager.cleanupMonitor() + + val currentSession = sessionManager.session.get() + // Leave session + currentSession?.leaveWithReason( + "[reason=$reason, error=${disconnectionReason?.message}]", + ) + + // Update connection state + call.state._connection.value = RealtimeConnection.Disconnected + + logger.v { "[leave] #ringing; disconnectionReason: $disconnectionReason, call_id = ${call.id}" } + if (call.isDestroyed.get()) { + logger.w { "[leave] #ringing; Call already destroyed, ignoring" } + return@atomicLeave + } + call.isDestroyed.set(true) + + /** + * TODO Rahul, need to check which call has owned the media at the moment(probably use active call) + */ + callApiDelegate.stopScreenSharing() + with(mediaManager) { + camera.disable() + microphone.disable() + } + + if (call.id == client.state.activeCall.value?.id) { + client.state.removeActiveCall(call) // Will also stop CallService + } + + if (call.id == client.state.ringingCall.value?.id) { + client.state.removeRingingCall(call) + } + + TelecomCallController(client.context) + .leaveCall(call) + + client.onCallCleanUp(call) + + val newCleanupJob = client.scope.launch { + safeCall { + currentSession?.let { session -> + with(session) { + sfuTracer.trace( + "leave-call", + "[reason=$reason, error=${disconnectionReason?.message}]", + ) + session.sendCallStats(callStatsReporter.collectStats(session)) + } + } + } + cleanup() + } + with(callReInitializer) { + cleanupJobReference(newCleanupJob) + cleanupLockVars(newCleanupJob) + } + } + + internal fun cleanup() { + logger.d { "[cleanup] Starting cleanup" } + + sessionManager.cleanup() + shutDownJobsGracefully() + callStatsReporter.cancelJobs() + + // Access mediaManager through lazy provider + cleanupMedia() + call.scopeProvider.cleanup() + logger.d { "[cleanup] Cleanup complete" } + } + + fun cleanupMedia() { + mediaManager.cleanup() + } + + internal fun shutDownJobsGracefully() { + UserScope(ClientScope()).launch { + callReInitializer.currentSupervisorJob.children.forEach { it.join() } + callReInitializer.currentSupervisorJob.cancel() + } + callReInitializer.currentScope.cancel() + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallConnectivityMonitor.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallConnectivityMonitor.kt new file mode 100644 index 0000000000..bd6666e70c --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallConnectivityMonitor.kt @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.log.taggedLogger +import io.getstream.video.android.core.coroutines.scopes.RestartableProducerScope +import io.getstream.video.android.core.internal.network.NetworkStateProvider +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +internal class CallConnectivityMonitor( + val callScope: RestartableProducerScope, + val state: CallConnectivityMonitorState, + val leaveAfterDisconnectSeconds: Long, + onFastReconnect: suspend () -> Unit, + onRejoin: suspend () -> Unit, + onDisconnected: suspend () -> Unit, + onLeaveTimeout: suspend () -> Unit, +) { + private val logger by taggedLogger("CallConnectivityMonitor") + private var leaveTimeoutAfterDisconnect: Job? = null + + internal val listener = object : NetworkStateProvider.NetworkStateListener { + override suspend fun onConnected() { + leaveTimeoutAfterDisconnect?.cancel() + val elapsedTimeMils = System.currentTimeMillis() - state.lastDisconnect + logger.d { + "[NetworkStateListener#onConnected] #network; no args, elapsedTimeMils:$elapsedTimeMils, lastDisconnect:${state.lastDisconnect}, reconnectDeadlineMils:${state.reconnectDeadlineMils}" + } + if (state.lastDisconnect > 0 && elapsedTimeMils < state.reconnectDeadlineMils) { + logger.d { + "[NetworkStateListener#onConnected] #network; Reconnecting (fast). Time since last disconnect is ${elapsedTimeMils / 1000} seconds. Deadline is ${state.reconnectDeadlineMils / 1000} seconds" + } + onFastReconnect() + } else { + logger.d { + "[NetworkStateListener#onConnected] #network; Reconnecting (full). Time since last disconnect is ${elapsedTimeMils / 1000} seconds. Deadline is ${state.reconnectDeadlineMils / 1000} seconds" + } + onRejoin() + } + } + + override suspend fun onDisconnected() { + onDisconnected() + logger.d { + "[NetworkStateListener#onDisconnected] #network; old lastDisconnect:${state.lastDisconnect}, clientImpl.leaveAfterDisconnectSeconds:$leaveAfterDisconnectSeconds" + } + state.lastDisconnect = System.currentTimeMillis() + logger.d { + "[NetworkStateListener#onDisconnected] #network; new lastDisconnect:${state.lastDisconnect}" + } + leaveTimeoutAfterDisconnect = callScope.launch { + delay(leaveAfterDisconnectSeconds * 1000) + logger.d { + "[NetworkStateListener#onDisconnected] #network; Leaving after being disconnected for $leaveAfterDisconnectSeconds" + } + onLeaveTimeout() + } + logger.d { "[NetworkStateListener#onDisconnected] #network; at ${state.lastDisconnect}" } + } + } + + fun reset() { + leaveTimeoutAfterDisconnect?.cancel() + } +} + +internal data class CallConnectivityMonitorState(var lastDisconnect: Long = 0L, var reconnectDeadlineMils: Int = 10_000) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallEventManager.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallEventManager.kt new file mode 100644 index 0000000000..e2fb449f41 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallEventManager.kt @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.android.video.generated.models.VideoEvent +import io.getstream.log.taggedLogger +import io.getstream.video.android.core.EventSubscription +import io.getstream.video.android.core.coroutines.scopes.RestartableProducerScope +import io.getstream.video.android.core.events.GoAwayEvent +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.launch +import kotlin.collections.forEach +import kotlin.let + +internal class CallEventManager( + private val events: MutableSharedFlow, + private val sessionManager: CallSessionManager, + private val callScope: RestartableProducerScope, + private val subscriptionsProvider: () -> Set, +) { + + private val logger by taggedLogger("CallEventManager") + + fun fireEvent(event: VideoEvent) = synchronized(subscriptionsProvider.invoke()) { + subscriptionsProvider.invoke().forEach { sub -> + if (!sub.isDisposed) { + // subs without filters should always fire + if (sub.filter == null) { + sub.listener.onEvent(event) + } + + // if there is a filter, check it and fire if it matches + sub.filter?.let { + if (it.invoke(event)) { + sub.listener.onEvent(event) + } + } + } + } + + if (!events.tryEmit(event)) { + logger.e { "Failed to emit event to observers: [event: $event]" } + } + } + + fun handleEvent(event: VideoEvent) { + logger.v { "[call handleEvent] #sfu; event.type: ${event.getEventType()}" } + + when (event) { + is GoAwayEvent -> + callScope.launch { + sessionManager.migrate() + } + } + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallIceConnectionMonitor.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallIceConnectionMonitor.kt new file mode 100644 index 0000000000..5ff973b479 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallIceConnectionMonitor.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.log.taggedLogger +import io.getstream.video.android.core.coroutines.scopes.RestartableProducerScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch +import org.webrtc.PeerConnection + +internal class CallIceConnectionMonitor( + private val scope: RestartableProducerScope, + private val sessionProvider: () -> RtcSession?, +) { + + private val logger by taggedLogger("CallIceConnectionMonitor") + + private var publisherJob: Job? = null + private var subscriberJob: Job? = null + + fun start() { + stop() + + publisherJob = scope.launch { + sessionProvider()?.publisher?.iceState?.collect { state -> + when (state) { + PeerConnection.IceConnectionState.FAILED, + PeerConnection.IceConnectionState.DISCONNECTED, + -> { + sessionProvider()?.publisher?.connection?.restartIce() + } + + else -> { + logger.d { "[publisher] ICE state = $state" } + } + } + } + } + + subscriberJob = scope.launch { + sessionProvider()?.subscriber?.iceState?.collect { state -> + when (state) { + PeerConnection.IceConnectionState.FAILED, + PeerConnection.IceConnectionState.DISCONNECTED, + -> { + sessionProvider()?.requestSubscriberIceRestart() + } + + else -> { + logger.d { "[subscriber] ICE state = $state" } + } + } + } + } + } + + fun stop() { + publisherJob?.cancel() + subscriberJob?.cancel() + publisherJob = null + subscriberJob = null + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallJoinContract.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallJoinContract.kt new file mode 100644 index 0000000000..7bb7bf97b3 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallJoinContract.kt @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.result.Result +import io.getstream.video.android.core.CreateCallOptions + +internal interface CallJoinContract { + + suspend fun join( + create: Boolean, + createOptions: CreateCallOptions?, + ring: Boolean, + notify: Boolean, + ): Result + + suspend fun rejoin(reason: String) +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallJoinCoordinator.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallJoinCoordinator.kt new file mode 100644 index 0000000000..fb89b0a27f --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallJoinCoordinator.kt @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import android.annotation.SuppressLint +import io.getstream.log.taggedLogger +import io.getstream.result.Error +import io.getstream.result.Result +import io.getstream.result.Result.Failure +import io.getstream.result.Result.Success +import io.getstream.video.android.core.Call +import io.getstream.video.android.core.CreateCallOptions +import io.getstream.video.android.core.RealtimeConnection +import io.getstream.video.android.core.StreamVideoClient +import io.getstream.video.android.core.utils.AtomicUnitCall +import io.getstream.video.android.core.utils.StreamSingleFlightProcessorImpl +import kotlinx.coroutines.delay +import kotlin.collections.plusAssign + +private const val PERMISSION_ERROR = "\n[Call.join()] called without having the required permissions.\n" + + "This will work only if you have [runForegroundServiceForCalls = false] in the StreamVideoBuilder.\n" + + "The reason is that [Call.join()] will by default start an ongoing call foreground service,\n" + + "To start this service and send the appropriate audio/video tracks the permissions are required,\n" + + "otherwise the service will fail to start, resulting in a crash.\n" + + "You can re-define your permissions and their expected state by overriding the [permissionCheck] in [StreamVideoBuilder]\n" + +internal class CallJoinCoordinator( + private val call: Call, + private val client: StreamVideoClient, + private val callReInitializer: CallReInitializer, + private val streamSingleFlightProcessorImpl: StreamSingleFlightProcessorImpl, + private val onJoinFail: () -> Unit, + private val createJoinSession: suspend ( + create: Boolean, + createOptions: CreateCallOptions?, + ring: Boolean, + notify: Boolean, + ) -> Result, + private val onRejoin: suspend (reason: String) -> Unit, +) : CallJoinContract { + + private val logger by taggedLogger("CallJoinCoordinator") + + override suspend fun join( + create: Boolean, + createOptions: CreateCallOptions?, + ring: Boolean, + notify: Boolean, + ): Result { + logger.d { + "[join] #ringing; #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions" + } + + with(callReInitializer) { + waitFromCleanup() + reinitialiseCoroutinesIfNeeded() + } + + // CRITICAL: Reset isDestroyed for new session + call.isDestroyed.set(false) + + val permissionPass = + client.permissionCheck.checkAndroidPermissionsGroup(client.context, call) + // Check android permissions and log a warning to make sure developers requested adequate permissions prior to using the call. + if (!permissionPass.first) { + logger.w { PERMISSION_ERROR } + } + // if we are a guest user, make sure we wait for the token before running the join flow + client.guestUserJob?.await() + + // Ensure factory is created with the current audioBitrateProfile before joining + call.ensureFactoryMatchesAudioProfile() + + // the join flow should retry up to 3 times + // if the error is not permanent + // and fail immediately on permanent errors + call.state._connection.value = RealtimeConnection.InProgress + var retryCount = 0 + + var result: Result + + call.atomicLeave = AtomicUnitCall() + while (retryCount < 3) { + result = createJoinSession(create, createOptions, ring, notify) + if (result is Success) { + // we initialise the camera, mic and other according to local + backend settings + // only when the call is joined to make sure we don't switch and override + // the settings during a call. + val settings = call.state.settings.value + if (settings != null) { + call.updateMediaManagerFromSettings(settings) + } else { + logger.w { + "[join] Call settings were null - this should never happen after a call" + + "is joined. MediaManager will not be initialised with server settings." + } + } + return result + } + if (result is Failure) { + onJoinFail() + logger.e { "Join failed with error $result" } + if (isPermanentError(result.value)) { + call.state._connection.value = RealtimeConnection.Failed(result.value) + return result + } else { + retryCount += 1 + } + } + delay(retryCount - 1 * 1000L) + } + return onJoinFailAfterAllRetries() + } + + private fun onJoinFailAfterAllRetries(): Result { + onJoinFail() + val errorMessage = "Join failed after 3 retries" + call.state._connection.value = RealtimeConnection.Failed(errorMessage) + return Failure(value = Error.GenericError(errorMessage)) + } + + @SuppressLint("VisibleForTests") + override suspend fun rejoin(reason: String) = schedule("rejoin") { + logger.d { "[rejoin] Rejoining" } + onRejoin(reason) + } + + private suspend fun schedule(key: String, block: suspend () -> Unit) { + logger.d { "[schedule] #reconnect; no args" } + streamSingleFlightProcessorImpl.run(key, block) + } + internal fun isPermanentError(error: Any): Boolean { + if (error is Error.ThrowableError) { + if (error.message.contains("Unable to resolve host")) { + return false + } + } + return true + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallMediaManager.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallMediaManager.kt new file mode 100644 index 0000000000..e6709c9a11 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallMediaManager.kt @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.android.video.generated.models.AudioSettingsResponse +import io.getstream.android.video.generated.models.CallSettingsResponse +import io.getstream.android.video.generated.models.VideoSettingsResponse +import io.getstream.log.taggedLogger +import io.getstream.video.android.core.Call +import io.getstream.video.android.core.CameraDirection +import io.getstream.video.android.core.CameraManager +import io.getstream.video.android.core.DeviceStatus +import io.getstream.video.android.core.MediaManagerImpl +import io.getstream.video.android.core.MicrophoneManager +import io.getstream.video.android.core.ScreenShareManager +import io.getstream.video.android.core.SpeakerManager +import io.getstream.video.android.core.audio.StreamAudioDevice +import io.getstream.video.android.core.call.connection.StreamPeerConnectionFactory +import io.getstream.video.android.core.model.AudioTrack +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import stream.video.sfu.models.TrackType +import kotlin.collections.find +import kotlin.collections.forEach +import kotlin.collections.mapNotNull +import kotlin.collections.toList +import kotlin.getValue +import kotlin.let + +internal class CallMediaManager( + private val call: Call, + private val mediaManagerProvider: () -> MediaManagerImpl, + private val cameraProvider: () -> CameraManager, + private val microphoneProvider: () -> MicrophoneManager, + private val speakerProvider: () -> SpeakerManager, + private val screenShareProvider: () -> ScreenShareManager, + private val peerConnectionFactoryProvider: () -> StreamPeerConnectionFactory?, + private val resetPeerConnectionFactory: () -> Unit, +) { + + private val logger by taggedLogger("CallMediaManager") + + /** + * Enables or disables the reception of incoming audio tracks for all or specified participants. + * + * This method allows selective control over whether the local client receives audio from remote participants. + * It's particularly useful in scenarios such as livestreams or group calls where the user may want to mute + * specific participants' audio without affecting the overall session. + * + * @param enabled `true` to enable (subscribe to) incoming audio, `false` to disable (unsubscribe from) it. + * @param sessionIds Optional list of participant session IDs for which to toggle incoming audio. + * If `null`, the audio setting is applied to all participants currently in the session. + */ + internal fun setIncomingAudioEnabled( + session: RtcSession?, + enabled: Boolean, + sessionIds: List? = null, + ) { + val participantTrackMap = session?.subscriber?.tracks ?: return + + val targetTracks = when { + sessionIds != null -> sessionIds.mapNotNull { participantTrackMap[it] } + else -> participantTrackMap.values.toList() + } + + targetTracks + .mapNotNull { it[TrackType.TRACK_TYPE_AUDIO] as? AudioTrack } + .forEach { it.enableAudio(enabled) } + } + + internal fun updateMediaManagerFromSettings(callSettings: CallSettingsResponse) { + // Speaker + if (call.speaker.status.value is DeviceStatus.NotSelected) { + val enableSpeaker = + if (callSettings.video.cameraDefaultOn || call.camera.status.value is DeviceStatus.Enabled) { + // if camera is enabled then enable speaker. Eventually this should + // be a new audio.defaultDevice setting returned from backend + true + } else { + callSettings.audio.defaultDevice == AudioSettingsResponse.DefaultDevice.Speaker || + callSettings.audio.speakerDefaultOn + } + + call.speaker.setEnabled(enabled = enableSpeaker) + } + + monitorHeadset() + + // Camera + if (call.camera.status.value is DeviceStatus.NotSelected) { + val defaultDirection = + if (callSettings.video.cameraFacing == VideoSettingsResponse.CameraFacing.Front) { + CameraDirection.Front + } else { + CameraDirection.Back + } + call.camera.setDirection(defaultDirection) + call.camera.setEnabled(callSettings.video.cameraDefaultOn) + } + + // Mic + if (call.microphone.status.value == DeviceStatus.NotSelected) { + val enabled = callSettings.audio.micDefaultOn + call.microphone.setEnabled(enabled) + } + } + + private fun monitorHeadset() { + call.microphone.devices.onEach { availableDevices -> + logger.d { + "[monitorHeadset] new available devices, prev selected: ${call.microphone.nonHeadsetFallbackDevice}" + } + + val bluetoothHeadset = + availableDevices.find { it is StreamAudioDevice.BluetoothHeadset } + val wiredHeadset = availableDevices.find { it is StreamAudioDevice.WiredHeadset } + + if (bluetoothHeadset != null) { + logger.d { "[monitorHeadset] BT headset selected" } + call.microphone.select(bluetoothHeadset) + } else if (wiredHeadset != null) { + logger.d { "[monitorHeadset] wired headset found" } + call.microphone.select(wiredHeadset) + } else { + logger.d { "[monitorHeadset] no headset found" } + + call.microphone.nonHeadsetFallbackDevice?.let { deviceBeforeHeadset -> + logger.d { "[monitorHeadset] before device selected" } + call.microphone.select(deviceBeforeHeadset) + } + } + }.launchIn(call.scope) + } + + /** + * Checks if the audioBitrateProfile has changed since the factory was created, + * and recreates the factory if needed. This should only be called before joining. + * + * If the factory hasn't been created yet, it will be created with the current profile + * when first accessed, so no recreation is needed. + */ + internal fun ensureFactoryMatchesAudioProfile() { + val factory = peerConnectionFactoryProvider.invoke() + + // If factory hasn't been created yet, it will be created with current profile automatically + if (factory == null) { + return + } + + // Check if current profile differs from the profile used to create the factory + val factoryProfile = factory.audioBitrateProfile + val currentProfile = mediaManagerProvider.invoke().microphone.audioBitrateProfile.value + + if (factoryProfile != null && currentProfile != factoryProfile) { + logger.i { + "Audio bitrate profile changed from $factoryProfile to $currentProfile. " + + "Recreating factory before joining." + } + recreateFactoryAndAudioTracks() + } + } + + /** + * Recreates peerConnectionFactory, audioSource, audioTrack, videoSource and videoTrack + * with the current audioBitrateProfile. This should only be called before the call is joined. + */ + internal fun recreateFactoryAndAudioTracks() { + val wasMicrophoneEnabled = microphoneProvider.invoke().status.value is DeviceStatus.Enabled + val wasCameraEnabled = cameraProvider.invoke().status.value is DeviceStatus.Enabled + + // Dispose all tracks and sources first + mediaManagerProvider.invoke().disposeTracksAndSources() + + // Recreate the factory (which will use the new audioBitrateProfile) + recreatePeerConnectionFactory() + + // Re-enable tracks if they were enabled + if (wasMicrophoneEnabled) { + // audioTrack will be recreated on next access, then we enable it + microphoneProvider.invoke().enable(fromUser = false) + } + if (wasCameraEnabled) { + // videoTrack will be recreated on next access, then we enable it + cameraProvider.invoke().enable(fromUser = false) + } + } + + /** + * Recreates peerConnectionFactory with the current audioBitrateProfile. + * This should only be called before the call is joined. + */ + internal fun recreatePeerConnectionFactory() { + peerConnectionFactoryProvider.invoke()?.dispose() + resetPeerConnectionFactory() + // Next access to peerConnectionFactory will recreate it with current profile + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallNetworkSubscriptionController.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallNetworkSubscriptionController.kt new file mode 100644 index 0000000000..ce8e35ac54 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallNetworkSubscriptionController.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.video.android.core.internal.network.NetworkStateProvider + +internal class CallNetworkSubscriptionController( + private val network: NetworkStateProvider, + private val listener: NetworkStateProvider.NetworkStateListener, +) { + + fun start() { + network.subscribe(listener) + } + + fun stop() { + network.unsubscribe(listener) + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallReInitializer.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallReInitializer.kt new file mode 100644 index 0000000000..832da2f062 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallReInitializer.kt @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.log.taggedLogger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withTimeout +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.let +import kotlin.takeIf + +internal class CallReInitializer( + val clientScope: CoroutineScope, + val onReInitialise: () -> Unit, +) { + + private val logger by taggedLogger("CallConcurrencyManager") + internal val cleanupMutex = Mutex() + internal var cleanupJob: Job? = null + + internal var needsReinitialization = AtomicBoolean(false) // TODO Rahul should be atomic + + @Volatile + internal var currentSupervisorJob: Job = SupervisorJob() + + @Volatile + internal var currentScope: CoroutineScope = + CoroutineScope(clientScope.coroutineContext + currentSupervisorJob) + + internal suspend fun waitFromCleanup() { + val job = cleanupMutex.withLock { + cleanupJob?.takeIf { it.isActive } + } + job?.let { + logger.d { "[join] Waiting for cleanup job: $it" } + try { + withTimeout(5000) { it.join() } + logger.d { "[join] Cleanup complete" } + } catch (e: TimeoutCancellationException) { + logger.w { "[join] Cleanup timeout, proceeding anyway" } + } + + cleanupMutex.withLock { + if (cleanupJob == it) { + cleanupJob = null + } + } + } + } + + internal suspend fun reinitialiseCoroutinesIfNeeded() { + val needsReinit = cleanupMutex.withLock { + if (needsReinitialization.get()) { + needsReinitialization.set(false) + true + } else { + false + } + } + + if (needsReinit) { + reinitializeCoroutines() + } + } + + internal fun reinitializeCoroutines() { + synchronized(this) { + currentSupervisorJob = SupervisorJob() + currentScope = CoroutineScope( + clientScope.coroutineContext + currentSupervisorJob, + ) + onReInitialise() + } + } + + internal fun cleanupJobReference(newCleanupJob: Job) { + newCleanupJob.invokeOnCompletion { + currentScope.launch { + cleanupMutex.withLock { + if (newCleanupJob == cleanupJob) { + cleanupJob = null + logger.v { "[cleanupJobReference] Cleared job reference" } + } + } + } + } + } + + internal fun cleanupLockVars(newCleanupJob: Job) { + currentScope.launch { + cleanupMutex.withLock { + needsReinitialization.set(true) + cleanupJob = newCleanupJob // TODO Rahul, I cannot understand why it is assigned + logger.d { "[cleanupLockVars] Cleanup job assigned" } + } + } + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallRenderer.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallRenderer.kt new file mode 100644 index 0000000000..ae35266296 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallRenderer.kt @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.log.taggedLogger +import io.getstream.webrtc.android.ui.VideoTextureViewRenderer +import org.webrtc.EglBase +import org.webrtc.RendererCommon +import stream.video.sfu.models.TrackType +import stream.video.sfu.models.VideoDimension +import kotlin.getValue + +internal class CallRenderer { + + private val logger by taggedLogger("CallRenderer") + + internal fun initRenderer( + videoRenderer: VideoTextureViewRenderer, + sessionId: String, + trackType: TrackType, + eglBase: EglBase, + session: RtcSession?, + onRendered: (VideoTextureViewRenderer) -> Unit = {}, + viewportId: String = sessionId, + ) { + logger.d { "[initRenderer] #sfu; #track; sessionId: $sessionId" } + + // Note this comes from the shared eglBase + videoRenderer.init( + eglBase.eglBaseContext, + object : RendererCommon.RendererEvents { + override fun onFirstFrameRendered() { + val width = videoRenderer.measuredWidth + val height = videoRenderer.measuredHeight + logger.i { + "[initRenderer.onFirstFrameRendered] #sfu; #track; " + + "trackType: $trackType, dimension: ($width - $height), " + + "sessionId: $sessionId" + } + if (trackType != TrackType.TRACK_TYPE_SCREEN_SHARE) { + session?.updateTrackDimensions( + sessionId, + trackType, + true, + VideoDimension(width, height), + viewportId, + ) + } + onRendered(videoRenderer) + } + + override fun onFrameResolutionChanged( + videoWidth: Int, + videoHeight: Int, + rotation: Int, + ) { + val width = videoRenderer.measuredWidth + val height = videoRenderer.measuredHeight + logger.v { + "[initRenderer.onFrameResolutionChanged] #sfu; #track; " + + "trackType: $trackType, " + + "viewport size: ($width - $height), " + + "video size: ($videoWidth - $videoHeight), " + + "sessionId: $sessionId" + } + + if (trackType != TrackType.TRACK_TYPE_SCREEN_SHARE) { + session?.updateTrackDimensions( + sessionId, + trackType, + true, + VideoDimension(width, height), + viewportId, + ) + } + } + }, + ) + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallSessionManager.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallSessionManager.kt new file mode 100644 index 0000000000..4b481fcf69 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallSessionManager.kt @@ -0,0 +1,390 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import android.annotation.SuppressLint +import android.content.Context.POWER_SERVICE +import android.os.PowerManager +import androidx.lifecycle.AtomicReference +import io.getstream.android.video.generated.models.JoinCallResponse +import io.getstream.log.taggedLogger +import io.getstream.result.Error +import io.getstream.result.Result +import io.getstream.result.Result.Failure +import io.getstream.result.Result.Success +import io.getstream.video.android.core.Call +import io.getstream.video.android.core.CreateCallOptions +import io.getstream.video.android.core.RealtimeConnection +import io.getstream.video.android.core.StreamVideoClient +import io.getstream.video.android.core.model.toIceServer +import io.getstream.video.android.core.utils.StreamSingleFlightProcessorImpl +import io.getstream.video.android.core.utils.safeCallWithDefault +import stream.video.sfu.event.ReconnectDetails +import stream.video.sfu.models.WebsocketReconnectStrategy +import java.util.UUID +import kotlin.collections.map +import kotlin.let +import kotlin.toString + +internal class CallSessionManager( + private val call: Call, + private val clientImpl: StreamVideoClient, + private val testInstanceProvider: Call.Companion.TestInstanceProvider, + private val streamSingleFlightProcessorImpl: StreamSingleFlightProcessorImpl, +) { + private val logger by taggedLogger("CallSessionManager") + private var powerManager: PowerManager? = null + + /** Session handles all real time communication for video and audio */ + internal var session: AtomicReference = AtomicReference(null) + internal var sessionId: AtomicReference = AtomicReference(UUID.randomUUID().toString()) + + // TODO Rahul, these variables could be atomicInt or AtomicLong, not sure yet + internal var reconnectAttempts = 0 + internal var reconnectStartTime = 0L + internal var connectStartTime = 0L + + private val callConnectivityMonitorState = CallConnectivityMonitorState() + internal val network by lazy { clientImpl.coordinatorConnectionModule.networkStateProvider } + private val callStatsReporter = CallStatsReporter(call) + private val callConnectivityMonitor = CallConnectivityMonitor( + call.restartableProducerScope, + callConnectivityMonitorState, + clientImpl.leaveAfterDisconnectSeconds, + { + fastReconnect("NetworkStateListener#onConnected") + }, + { + rejoin("NetworkStateListener#onConnected") + }, + { + call.state._connection.value = RealtimeConnection.Reconnecting + }, + { call.leave() }, + ) + + val sfuEventMonitor = + CallSfuEventMonitor( + call.restartableProducerScope, + { session.get() }, + callConnectivityMonitorState, + ) + val iceConnectionMonitor = + CallIceConnectionMonitor(call.restartableProducerScope, { session.get() }) + val networkSubscriptionController = + CallNetworkSubscriptionController(network, callConnectivityMonitor.listener) + + init { + powerManager = safeCallWithDefault(null) { + clientImpl.context.getSystemService(POWER_SERVICE) as? PowerManager + } + } + + @SuppressLint("VisibleForTests") + internal suspend fun _join( + create: Boolean = false, + createOptions: CreateCallOptions? = null, + ring: Boolean = false, + notify: Boolean = false, + ): Result { + reconnectAttempts = 0 + sfuEventMonitor.stop() + + if (session.get() != null) { + return Failure(Error.GenericError("Call $call.cid has already been joined")) + } + logger.d { + "[joinInternal] #track; create: $create, ring: $ring, notify: $notify, createOptions: $createOptions" + } + + connectStartTime = System.currentTimeMillis() + + // step 1. call the join endpoint to get a list of SFUs + val locationResult = clientImpl.getCachedLocation() + if (locationResult !is Success) { + return locationResult as Failure + } + call.location = locationResult.value + + val result = call.joinRequest( + getOptions(create, createOptions), + locationResult.value, + ring = ring, + notify = notify, + ) + + if (result !is Success) { + return result as Failure + } + + try { + val localSession = createJoinRtcSessionInner(result.value) + session.set(localSession) + call.state._connection.value = RealtimeConnection.Joined(localSession) + localSession.connect() + } catch (e: Exception) { + return Failure(Error.GenericError(e.message ?: "RtcSession error occurred.")) + } + + clientImpl.state.setActiveCall(call) + monitorSession(result.value) + return Success(value = session.get()!!) + } + + internal fun getOptions( + create: Boolean = false, + createOptions: CreateCallOptions? = null, + ): CreateCallOptions? { + return createOptions ?: if (create) { + CreateCallOptions() + } else { + null + } + } + + suspend fun fastReconnect(reason: String) = schedule("fast") { + logger.d { + "[fastReconnect] Reconnecting, reconnectAttempts:$reconnectAttempts" + } + session.get()?.prepareReconnect() + call.state._connection.value = RealtimeConnection.Reconnecting + if (session.get() != null) { + reconnectStartTime = System.currentTimeMillis() + + val session = session.get()!! + val (prevSessionId, subscriptionsInfo, publishingInfo) = session.currentSfuInfo() + val reconnectDetails = ReconnectDetails( + previous_session_id = prevSessionId, + strategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_FAST, + announced_tracks = publishingInfo, + subscriptions = subscriptionsInfo, + reconnect_attempt = reconnectAttempts, + reason = reason, + ) + session.fastReconnect(reconnectDetails) + val oldSessionStats = callStatsReporter.collectStats(session) + session.sendCallStats(oldSessionStats) + } else { + logger.d { "[fastReconnect] [RealtimeConnection.Disconnected], call_id:${call.id}" } + call.state._connection.value = RealtimeConnection.Disconnected + } + } + + @SuppressLint("VisibleForTests") + internal suspend fun rejoin(reason: String) = schedule("rejoin") { + logger.d { "[rejoin] Rejoining" } + reconnectAttempts++ + call.state._connection.value = RealtimeConnection.Reconnecting + call.location?.let { + reconnectStartTime = System.currentTimeMillis() + + val joinResponse = call.joinRequest(location = it) + if (joinResponse is Success) { + replaceSession(joinResponse.value, reason) + } else { + logger.e { + "[rejoin] Failed to get a join response ${joinResponse.errorOrNull()}" + } + call.state._connection.value = RealtimeConnection.Reconnecting + } + } + } + internal fun monitorSession(result: JoinCallResponse) { + callStatsReporter.startCallStatsReporting( + session.get(), + result.statsOptions.reportingIntervalMs.toLong(), + ) + sfuEventMonitor.start() + iceConnectionMonitor.start() + networkSubscriptionController.start() + } + + suspend fun migrate() = schedule("migrate") { + logger.d { "[migrate] Migrating" } + call.state._connection.value = RealtimeConnection.Migrating + call.location?.let { + reconnectStartTime = System.currentTimeMillis() + + val joinResponse = call.joinRequest(location = it) + if (joinResponse is Success) { + // switch to the new SFU + val cred = joinResponse.value.credentials + val session = this.session.get()!! + val currentOptions = this.session.get()?.publisher?.currentOptions() + val oldSfuUrl = session.sfuUrl + logger.i { "Rejoin SFU $oldSfuUrl to ${cred.server.url}" } + + this.sessionId.set(UUID.randomUUID().toString()) + val (prevSessionId, subscriptionsInfo, publishingInfo) = session.currentSfuInfo() + val reconnectDetails = ReconnectDetails( + previous_session_id = prevSessionId, + strategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_MIGRATE, + announced_tracks = publishingInfo, + subscriptions = subscriptionsInfo, + from_sfu_id = oldSfuUrl, + reconnect_attempt = reconnectAttempts, + ) + session.prepareRejoin() + try { + val newSession = RtcSession( + clientImpl, + reconnectAttempts, + powerManager, + call, + sessionId.get(), + clientImpl.apiKey, + clientImpl.coordinatorConnectionModule.lifecycle, + cred.server.url, + cred.server.wsEndpoint, + cred.token, + cred.iceServers.map { ice -> + ice.toIceServer() + }, + ) + val oldSession = this.session.get() + this.session.set(newSession) + newSession.connect(reconnectDetails, currentOptions) + monitorSession(joinResponse.value) + oldSession?.leaveWithReason("migrating") + oldSession?.cleanup() + } catch (ex: Exception) { + logger.e(ex) { + "[switchSfu] Failed to join during " + + "migration - Error ${ex.message}" + } + call.state._connection.value = RealtimeConnection.Failed(ex) + } + } else { + logger.e { + "[switchSfu] Failed to get a join response during " + + "migration - falling back to reconnect. Error ${joinResponse.errorOrNull()}" + } + call.state._connection.value = RealtimeConnection.Reconnecting + } + } + } + + fun createJoinRtcSessionInner(result: JoinCallResponse): RtcSession { + return if (testInstanceProvider.rtcSessionCreator != null) { + testInstanceProvider.rtcSessionCreator!!.invoke() + } else { + RtcSession( + sessionId = this.sessionId.get(), + apiKey = clientImpl.apiKey, + lifecycle = clientImpl.coordinatorConnectionModule.lifecycle, + client = clientImpl, + call = call, + sfuUrl = result.credentials.server.url, + sfuWsUrl = result.credentials.server.wsEndpoint, + sfuToken = result.credentials.token, + remoteIceServers = result.credentials.iceServers.map { it.toIceServer() }, + powerManager = powerManager, + ) + } + } + + fun createRejoinSession(joinResponse: JoinCallResponse): RtcSession { + val cred = joinResponse.credentials + return RtcSession( + clientImpl, + reconnectAttempts, + powerManager, + call, + sessionId.get(), + clientImpl.apiKey, + clientImpl.coordinatorConnectionModule.lifecycle, + cred.server.url, + cred.server.wsEndpoint, + cred.token, + cred.iceServers.map { ice -> + ice.toIceServer() + }, + ) + } + + fun createReconnectDetails(session: RtcSession, reason: String): ReconnectDetails { + val (prevSessionId, subscriptionsInfo, publishingInfo) = session.currentSfuInfo() + return ReconnectDetails( + previous_session_id = prevSessionId, + strategy = WebsocketReconnectStrategy.WEBSOCKET_RECONNECT_STRATEGY_REJOIN, + announced_tracks = publishingInfo, + subscriptions = subscriptionsInfo, + reconnect_attempt = reconnectAttempts, + reason = reason, + ) + } + + suspend fun replaceSession(joinResponse: JoinCallResponse, reason: String) { + // switch to the new SFU + val cred = joinResponse.credentials + val oldSession = this.session.get()!! + val oldSessionStats = callStatsReporter.collectStats(session.get()) + val currentOptions = this.session.get()?.publisher?.currentOptions() + logger.i { "Rejoin SFU ${oldSession?.sfuUrl} to ${cred.server.url}" } + + this.sessionId.set(UUID.randomUUID().toString()) + val (prevSessionId, _, _) = oldSession.currentSfuInfo() + val reconnectDetails = createReconnectDetails(oldSession, reason) + call.state.removeParticipant(prevSessionId) + oldSession.prepareRejoin() + try { + val localSession = createRejoinSession(joinResponse) + this.session.set(localSession) + localSession.connect(reconnectDetails, currentOptions) + localSession.sfuTracer.trace("rejoin", reason) + oldSession.sendCallStats(oldSessionStats) + oldSession.leaveWithReason("Rejoin :: $reason") + oldSession.cleanup() + monitorSession(joinResponse) + } catch (ex: Exception) { + logger.e(ex) { + "[rejoin] Failed to join response with ex: ${ex.message}" + } + call.state._connection.value = RealtimeConnection.Failed(ex) + } + } + + private suspend fun schedule(key: String, block: suspend () -> Unit) { + logger.d { "[schedule] #reconnect; no args, key: $key" } + val result = streamSingleFlightProcessorImpl.run(key, block) + result.onSuccess { logger.d { "[schedule] success #reconnect; no args, key: $key" } } + .onFailure { + logger.d { "[schedule] fail with${result.exceptionOrNull()} #reconnect; no args, key: $key" } + } + } + + fun cleanup() { + session.get()?.cleanup() + session.set(null) + } + + fun cleanupMonitor() { + iceConnectionMonitor.stop() + sfuEventMonitor.stop() + } + + fun reset() { + this.session.set(null) + this.sessionId.set(UUID.randomUUID().toString()) + reconnectAttempts = 0 + reconnectStartTime = 0L + connectStartTime = 0L + streamSingleFlightProcessorImpl.stop() + streamSingleFlightProcessorImpl.reset() + callConnectivityMonitor.reset() + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallSfuEventMonitor.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallSfuEventMonitor.kt new file mode 100644 index 0000000000..79fdde5a7e --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallSfuEventMonitor.kt @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.log.taggedLogger +import io.getstream.video.android.core.coroutines.scopes.RestartableProducerScope +import io.getstream.video.android.core.events.JoinCallResponseEvent +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch + +internal class CallSfuEventMonitor( + private val scope: RestartableProducerScope, + private val sessionProvider: () -> RtcSession?, + private val connectivityState: CallConnectivityMonitorState, +) { + + private val logger by taggedLogger("CallSfuEventMonitor") + + private var sfuEventsJob: Job? = null + + fun start() { + stop() + + sfuEventsJob = scope.launch { + sessionProvider()?.socket?.events()?.collect { event -> + if (event is JoinCallResponseEvent) { + connectivityState.reconnectDeadlineMils = + event.fastReconnectDeadlineSeconds * 1000 + + logger.d { + "[SFU] reconnect deadline = " + + "${connectivityState.reconnectDeadlineMils / 1000}s" + } + } + } + } + } + + fun stop() { + sfuEventsJob?.cancel() + sfuEventsJob = null + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallStatsReporter.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallStatsReporter.kt new file mode 100644 index 0000000000..130d7c47c8 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/CallStatsReporter.kt @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.call + +import io.getstream.video.android.core.Call +import io.getstream.video.android.core.CallStatsReport +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlin.collections.plus +import kotlin.collections.takeLast + +internal class CallStatsReporter(private val call: Call) { + + private var callStatsReportingJob: Job? = null + + internal suspend fun collectStats(session: RtcSession?): CallStatsReport { + val publisherStats = session?.getPublisherStats() + val subscriberStats = session?.getSubscriberStats() + call.state.stats.updateFromRTCStats(publisherStats, isPublisher = true) + call.state.stats.updateFromRTCStats(subscriberStats, isPublisher = false) + call.state.stats.updateLocalStats() + val local = call.state.stats._local.value + + val report = CallStatsReport( + publisher = publisherStats, + subscriber = subscriberStats, + local = local, + stateStats = call.state.stats, + ) + + call.statsReport.value = report + call.statLatencyHistory.value + report.stateStats.publisher.latency.value + if (call.statLatencyHistory.value.size > 20) { + call.statLatencyHistory.value = call.statLatencyHistory.value.takeLast(20) + } + return report + } + + internal fun startCallStatsReporting(session: RtcSession?, reportingIntervalMs: Long = 10_000) { + cancelJobs() + callStatsReportingJob = call.scope.launch { + // Wait a bit before we start capturing stats + delay(reportingIntervalMs) + + while (isActive) { + delay(reportingIntervalMs) + session?.sendCallStats( + report = collectStats(session), + ) + } + } + } + + internal fun cancelJobs() { + callStatsReportingJob?.cancel() + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProvider.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProvider.kt index 28d7b31944..0bc011aecb 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProvider.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProvider.kt @@ -40,4 +40,10 @@ internal interface ScopeProvider { * Cleans up resources when the provider is no longer needed. */ fun cleanup() + + /** + * Resets the provider to allow reuse after cleanup. + * This clears the cleanup flag and allows executors to be recreated. + */ + fun reset() } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProviderImpl.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProviderImpl.kt index 50a80c90ee..8f39f9829a 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProviderImpl.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/scope/ScopeProviderImpl.kt @@ -97,4 +97,10 @@ internal class ScopeProviderImpl( executor?.shutdown() executor = null } + + override fun reset() { + logger.d { "Resetting ScopeProvider to allow reuse" } + isCleanedUp = false + // executor is already null after cleanup, will be recreated on next use + } } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/coroutines/flows/RestartableStateFlow.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/coroutines/flows/RestartableStateFlow.kt new file mode 100644 index 0000000000..daf11786e1 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/coroutines/flows/RestartableStateFlow.kt @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.coroutines.flows + +import io.getstream.video.android.core.coroutines.scopes.RestartableProducerScope +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.shareIn +import kotlinx.coroutines.launch + +/** + * A [StateFlow] implementation whose upstream collection can be safely restarted + * when the underlying call coroutine scope is cancelled and recreated. + * + * ## Why this exists + * The standard `stateIn(scope, ...)` operator permanently binds upstream collection + * to a single [CoroutineScope]. When that scope is cancelled (for example after + * `call.leave()`), the StateFlow stops updating forever and cannot be restarted. + * + * In this SDK, call- and participant-level state must survive leave/join cycles, + * while the call coroutine scope is intentionally cancelled and recreated. + * + * ## How this replaces `stateIn` + * Instead of tying upstream collection to a fixed scope, [RestartableStateFlow] + * separates: + * - **State storage** (a stable [StateFlow] exposed to clients) + * - **Producer lifecycle** (a coroutine collecting the upstream Flow) + * + * The producer coroutine is started and restarted via [RestartableProducerScope] + * whenever a new call scope is attached, while the StateFlow instance itself + * remains stable. + * + * ## Example + * + * ### ❌ Using `stateIn` (unsafe with reusable calls) + * ```kotlin + * val duration: StateFlow = + * durationInMs + * .map { it?.toDuration(DurationUnit.SECONDS) } + * .stateIn(call.scope, SharingStarted.WhileSubscribed(), null) + * ``` + * When `call.scope` is cancelled, `duration` stops updating permanently. + * + * ### ✅ Using [RestartableStateFlow] (safe) + * ```kotlin + * val duration: StateFlow = + * RestartableStateFlow( + * initialValue = null, + * upstream = durationInMs.map { it?.toDuration(DurationUnit.SECONDS) }, + * scope = restartableProducerScope + * ) + * ``` + * When the call scope is recreated, upstream collection is restarted automatically + * and `duration` continues emitting values without requiring resubscription. + * + * ## Key guarantees + * - The exposed [StateFlow] instance never changes + * - Existing collectors do not need to resubscribe + * - Upstream collection is safely restarted across call lifecycle changes + * + * This class is intended for internal use where state must outlive coroutine scopes. + */ +@OptIn(ExperimentalForInheritanceCoroutinesApi::class) +internal class RestartableStateFlow( + upstream: Flow, + scope: RestartableProducerScope, + initialValue: T, + started: SharingStarted = SharingStarted.WhileSubscribed(), +) : StateFlow { + + private val state = MutableStateFlow(initialValue) + + init { + scope.onAttach { realScope -> + realScope.launch { + upstream + .shareIn( + scope = this, + started = started, + replay = 1, + ) + .collect { state.value = it } + } + } + } + + override val value: T get() = state.value + override val replayCache: List get() = state.replayCache + override suspend fun collect(collector: FlowCollector): Nothing { + state.collect(collector) + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/coroutines/scopes/RestartableProducerScope.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/coroutines/scopes/RestartableProducerScope.kt new file mode 100644 index 0000000000..2abb75fb18 --- /dev/null +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/coroutines/scopes/RestartableProducerScope.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2014-2026 Stream.io Inc. All rights reserved. + * + * Licensed under the Stream License; + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/GetStream/stream-video-android/blob/main/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.getstream.video.android.core.coroutines.scopes + +import io.getstream.video.android.core.internal.InternalStreamVideoApi +import kotlinx.coroutines.CoroutineScope +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** + * A lifecycle-aware producer scope used to run long-lived or state-producing coroutines + * that must survive call leave/join cycles. + * + * Unlike a regular [CoroutineScope], this scope does not represent a fixed lifecycle. + * Instead, it forwards coroutine execution to the currently active call scope and + * allows producers to be explicitly restarted when the call scope is recreated. + * + * This is required to safely reuse call- and participant-level state after `call.leave()`, + * where the original coroutine scope is cancelled and replaced. + * + * This parameter is intended for internal use only. + */ +@InternalStreamVideoApi +internal class RestartableProducerScope : CoroutineScope { + + @Volatile + private var currentScope: CoroutineScope? = null + + private val onAttachCallbacks = mutableListOf<(CoroutineScope) -> Unit>() + + fun attach(scope: CoroutineScope) { + currentScope = scope + onAttachCallbacks.forEach { it(scope) } + } + + fun detach() { + currentScope = null + } + + override val coroutineContext: CoroutineContext + get() = currentScope?.coroutineContext ?: EmptyCoroutineContext + + fun onAttach(block: (CoroutineScope) -> Unit) { + onAttachCallbacks += block + currentScope?.let { block(it) } // start immediately if already attached + } +} diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StateFlow.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StateFlow.kt index 90f64a18ef..c6ae0ac02d 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StateFlow.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StateFlow.kt @@ -24,7 +24,7 @@ import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.stateIn +import kotlinx.coroutines.flow.stateIn // TODO Rahul Need to migrate to RestartableStateFlow /** * Does not produce the same value in a raw, so respect "distinct until changed emissions" diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StreamSingleFlightProcessorImpl.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StreamSingleFlightProcessorImpl.kt index 6918e50390..e0c91781fd 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StreamSingleFlightProcessorImpl.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/utils/StreamSingleFlightProcessorImpl.kt @@ -107,4 +107,8 @@ internal class StreamSingleFlightProcessorImpl( clear(cancelRunning = true).getOrThrow() } } + + fun reset() { + closed.set(false) + } } diff --git a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/ReconnectAttemptsCountTest.kt b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/ReconnectAttemptsCountTest.kt index db537078ea..816751e5c3 100644 --- a/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/ReconnectAttemptsCountTest.kt +++ b/stream-video-android-core/src/test/kotlin/io/getstream/video/android/core/reconnect/ReconnectAttemptsCountTest.kt @@ -37,7 +37,7 @@ class ReconnectAttemptsCountTest : IntegrationTestBase() { // Rejoin call.rejoin() - assertEquals(1, call.reconnectAttepmts) + assertEquals(1, call.reconnectAttempts) } @Test @@ -49,7 +49,7 @@ class ReconnectAttemptsCountTest : IntegrationTestBase() { // Rejoin call.fastReconnect() - assertEquals(0, call.reconnectAttepmts) + assertEquals(0, call.reconnectAttempts) } @Test @@ -62,6 +62,6 @@ class ReconnectAttemptsCountTest : IntegrationTestBase() { // Rejoin call.rejoin() call.rejoin() - assertEquals(2, call.reconnectAttepmts) + assertEquals(2, call.reconnectAttempts) } } diff --git a/stream-video-android-previewdata/src/main/kotlin/io/getstream/video/android/mock/StreamPreviewDataUtils.kt b/stream-video-android-previewdata/src/main/kotlin/io/getstream/video/android/mock/StreamPreviewDataUtils.kt index 61f8956e29..4b4e662a81 100644 --- a/stream-video-android-previewdata/src/main/kotlin/io/getstream/video/android/mock/StreamPreviewDataUtils.kt +++ b/stream-video-android-previewdata/src/main/kotlin/io/getstream/video/android/mock/StreamPreviewDataUtils.kt @@ -48,6 +48,7 @@ public object StreamPreviewDataUtils { } /** Mock a [Call] that contains a mock user. */ +@Suppress("DEPRECATION_ERROR") public val previewCall: Call = Call( client = StreamPreviewDataUtils.streamVideo, type = "default", @@ -115,6 +116,7 @@ public val previewUsers: List ) /** Mock a new list of [ParticipantState]. */ +@Suppress("DEPRECATION_ERROR") public val previewParticipantsList: List inline get() { val participants = arrayListOf()