diff --git a/.changeset/neat-rabbits-draw.md b/.changeset/neat-rabbits-draw.md new file mode 100644 index 000000000..4283da9aa --- /dev/null +++ b/.changeset/neat-rabbits-draw.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +fix: resume joinContinuation when LEAVE received during reconnect handshake to avoid reconnection loop hanging issue diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt index 65c0cc500..913481df8 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt @@ -30,6 +30,7 @@ import io.livekit.android.util.Either import io.livekit.android.util.LKLog import io.livekit.android.util.toHttpUrl import io.livekit.android.util.toWebsocketUrl +import io.livekit.android.util.withDeadline import io.livekit.android.webrtc.toProtoSessionDescription import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.CoroutineDispatcher @@ -63,6 +64,7 @@ import javax.inject.Inject import javax.inject.Named import javax.inject.Singleton import kotlin.coroutines.resumeWithException +import kotlin.time.Duration.Companion.milliseconds /** * SignalClient to LiveKit WS servers @@ -186,17 +188,19 @@ constructor( .addHeader("Authorization", "Bearer $token") .build() - return suspendCancellableCoroutine { cont -> - // Wait for join response through WebSocketListener - joinContinuation = cont - cont.invokeOnCancellation { - // If the coroutine is cancelled, websocket needs to be cancelled. - // onFailure will handle cleanup. - LKLog.v { "connect cancelled, abort websocket" } - joinContinuation = null - currentWs?.cancel() + return withDeadline(SIGNAL_CONNECT_TIMEOUT.milliseconds) { + suspendCancellableCoroutine { cont -> + // Wait for join response through WebSocketListener + joinContinuation = cont + cont.invokeOnCancellation { + // If the coroutine is cancelled, websocket needs to be cancelled. + // onFailure will handle cleanup. + LKLog.v { "connect cancelled, abort websocket" } + joinContinuation = null + currentWs?.cancel() + } + currentWs = websocketFactory.newWebSocket(request, this@SignalClient) } - currentWs = websocketFactory.newWebSocket(request, this@SignalClient) } } @@ -691,6 +695,11 @@ constructor( } else if (response.hasLeave()) { // Some reconnects may immediately send leave back without a join response first. handleSignalResponseImpl(ws, response) + val cont = joinContinuation + joinContinuation = null + cont?.resumeWithException( + RoomException.ConnectException("Received leave during reconnect: ${response.leave.reason}"), + ) } else if (isReconnecting) { // When reconnecting, any message received means signal reconnected. // Newer servers will send a reconnect response first @@ -977,6 +986,7 @@ constructor( // iceServer("stun:stun3.l.google.com:19302"), // iceServer("stun:stun4.l.google.com:19302"), ) + private const val SIGNAL_CONNECT_TIMEOUT = 10000 const val CLOSE_REASON_NORMAL_CLOSURE = 1000 const val CLOSE_REASON_PING_TIMEOUT = 3000 const val CLOSE_REASON_WEBSOCKET_FAILURE = 3500 diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt index 7c3acca09..783342807 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RTCEngineMockE2ETest.kt @@ -407,7 +407,10 @@ class RTCEngineMockE2ETest : MockE2ETest() { connect() wsFactory.listener.onFailure(wsFactory.ws, Exception(), null) - testScheduler.advanceUntilIdle() + testScheduler.advanceTimeBy(1000) + wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) + simulateMessageFromServer(TestData.RECONNECT) + val sid = wsFactory.request.url.queryParameter(SignalClient.CONNECT_QUERY_PARTICIPANT_SID) assertEquals(TestData.JOIN.join.participant.sid, sid) } diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RoomMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RoomMockE2ETest.kt index 860cea187..0909ea41c 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RoomMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RoomMockE2ETest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2025 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -345,13 +345,17 @@ class RoomMockE2ETest : MockE2ETest() { callback.onAvailable(network) } - coroutineRule.dispatcher.scheduler.advanceUntilIdle() + coroutineRule.dispatcher.scheduler.advanceTimeBy(1000) + wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) + simulateMessageFromServer(TestData.RECONNECT) + val events = eventCollector.stopCollecting() assertEquals( listOf( ConnectionState.CONNECTED, ConnectionState.RESUMING, + ConnectionState.CONNECTED, ), events, ) diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RoomReconnectionMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RoomReconnectionMockE2ETest.kt index e1efb6b91..1b1742bd3 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RoomReconnectionMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RoomReconnectionMockE2ETest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2025 LiveKit, Inc. + * Copyright 2023-2026 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import io.livekit.android.test.mock.MockDataChannel import io.livekit.android.test.mock.MockMediaStream import io.livekit.android.test.mock.MockRtpReceiver import io.livekit.android.test.mock.MockVideoStreamTrack +import io.livekit.android.test.mock.SignalRequestHandler import io.livekit.android.test.mock.TestData import io.livekit.android.test.mock.createMediaStreamId import io.livekit.android.test.mock.room.track.createMockLocalAudioTrack @@ -197,8 +198,28 @@ class RoomReconnectionMockE2ETest : MockE2ETest() { fun softReconnectResendsPackets() = runTest { room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT) + val publisherOfferHandler: SignalRequestHandler = { request -> + if (request.hasOffer()) { + val answer = with(LivekitRtc.SignalResponse.newBuilder()) { + answer = with(LivekitRtc.SessionDescription.newBuilder()) { + sdp = "remote_answer" + type = "answer" + id = request.offer.id + build() + } + build() + } + wsFactory.receiveMessage(answer) + true + } else { + false + } + } + wsFactory.registerSignalRequestHandler(publisherOfferHandler) connect() + val lastMessageSeq = TestData.RECONNECT.reconnect.lastMessageSeq + for (i in 1..5) { assertTrue(room.localParticipant.publishData(ByteArray(i), reliability = DataPublishReliability.RELIABLE).isSuccess) } @@ -212,7 +233,8 @@ class RoomReconnectionMockE2ETest : MockE2ETest() { val pubPeerConnection = getPublisherPeerConnection() val pubDataChannel = pubPeerConnection.dataChannels[RTCEngine.RELIABLE_DATA_CHANNEL_LABEL] as MockDataChannel - assertEquals(5, pubDataChannel.sentBuffers.size) + val expectedResentCount = (1..5).count { it > lastMessageSeq } + assertEquals(5 + expectedResentCount, pubDataChannel.sentBuffers.size) } @Test diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt index 262def2fa..a10dfe3fa 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/SignalClientTest.kt @@ -33,6 +33,7 @@ import kotlinx.coroutines.async import kotlinx.coroutines.supervisorScope import kotlinx.coroutines.yield import kotlinx.serialization.json.Json +import livekit.LivekitModels import livekit.LivekitRtc import livekit.org.webrtc.SessionDescription import okhttp3.OkHttpClient @@ -429,6 +430,77 @@ class SignalClientTest : BaseTest() { assertTrue(originalWs.isClosed) } + /** + * Reconnect fails cleanly when server sends LEAVE as the initial response and closes WS cleanly. + */ + @Test + fun reconnectDoesNotHangOnLeaveStateMismatchWithCleanClose() = runTest { + val joinJob = async { client.join(EXAMPLE_URL, "") } + connectWebsocketAndJoin() + joinJob.await() + + wsFactory.ws.cancel() + + supervisorScope { + val reconnectJob = async { + client.reconnect(EXAMPLE_URL, "", "participant_sid") + } + yield() + + client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) + val leaveResponse = with(LivekitRtc.SignalResponse.newBuilder()) { + leave = with(LivekitRtc.LeaveRequest.newBuilder()) { + reason = LivekitModels.DisconnectReason.STATE_MISMATCH + action = LivekitRtc.LeaveRequest.Action.RECONNECT + build() + } + build() + } + wsFactory.receiveMessage(leaveResponse) + + wsFactory.ws.close(1000, "normal") + + val result = runCatching { reconnectJob.await() } + assertTrue("reconnect should have failed", result.isFailure) + } + } + + /** + * Reconnect fails cleanly when server sends LEAVE as the initial response and drops the connection. + */ + @Test + fun reconnectDoesNotHangOnLeaveStateMismatchWithFailure() = runTest { + val joinJob = async { client.join(EXAMPLE_URL, "") } + connectWebsocketAndJoin() + joinJob.await() + + wsFactory.ws.cancel() + + supervisorScope { + val reconnectJob = async { + client.reconnect(EXAMPLE_URL, "", "participant_sid") + } + yield() + + client.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) + val leaveResponse = with(LivekitRtc.SignalResponse.newBuilder()) { + leave = with(LivekitRtc.LeaveRequest.newBuilder()) { + reason = LivekitModels.DisconnectReason.STATE_MISMATCH + action = LivekitRtc.LeaveRequest.Action.RECONNECT + build() + } + build() + } + wsFactory.receiveMessage(leaveResponse) + + // 5. Server drops connection (unclean close) + wsFactory.ws.cancel() + + val result = runCatching { reconnectJob.await() } + assertTrue("reconnect should have failed", result.isFailure) + } + } + // mock data companion object }