From d84da5bdc004060064088fec141fd7db7dc2a568 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 2 Mar 2026 10:58:20 +0100 Subject: [PATCH 1/2] Adding a test --- .../HTTP1ClientChannelHandlerTests.swift | 115 ++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift index 401c14ff0..f0f278687 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift @@ -947,6 +947,121 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { ) } + func testDemandResponseBodyStreamAfterEarlyResponseDoesNotCrash() async throws { + // This test reproduces a crash where `demandMoreResponseBodyParts()` was called on the + // state machine after it had already transitioned to `.idle`. The scenario is: + // + // 1. A streaming POST request is in progress + // 2. The full response (head + end) arrives before the request body is finished + // 3. The response end is forwarded with `finalAction: .none` (body still uploading) + // 4. The request body stream finishes -> `requestStreamFinished` -> state machine + // transitions to `.idle` and returns `.sendRequestEnd(.informConnectionIsIdle)` + // 5. The `.sendRequestEnd` handler writes `.end` to the channel and registers a + // callback on the write promise + // 6. Before the write callback fires (write hasn't completed yet), + // `demandResponseBodyStream` is called (from a delegate on another event loop) + // 7. Without the fix, `self.request` was still set (only nilled in the write + // callback), so the guard passed and `demandMoreResponseBodyParts()` hit + // `fatalError("Invalid state: idle")` + // + // The fix nils out `self.request` synchronously in `.sendRequestEnd` (before the + // write callback), so the guard in `demandResponseBodyStream0` fails and returns early. + + final class DelayEndHandler: ChannelOutboundHandler { + typealias OutboundIn = HTTPClientRequestPart + typealias OutboundOut = HTTPClientRequestPart + + private(set) var endPromise: EventLoopPromise? + + func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { + if case .end = self.unwrapOutboundIn(data) { + self.endPromise = promise + context.write(data, promise: nil) + } else { + context.write(data, promise: promise) + } + } + } + + let eventLoop = EmbeddedEventLoop() + let delayEndHandler = DelayEndHandler() + let handler = HTTP1ClientChannelHandler( + eventLoop: eventLoop, + backgroundLogger: Logger(label: "no-op", factory: SwiftLogNoOpLogHandler.init), + connectionIdLoggerMetadata: "test connection" + ) + var connectionIsIdle = false + handler.onConnectionIdle = { connectionIsIdle = true } + let channel = EmbeddedChannel(handlers: [delayEndHandler, handler], loop: eventLoop) + XCTAssertNoThrow(try channel.connect(to: .init(ipAddress: "127.0.0.1", port: 80)).wait()) + + let request = MockHTTPExecutableRequest( + head: .init(version: .http1_1, method: .POST, uri: "http://localhost/"), + framingMetadata: RequestFramingMetadata(connectionClose: false, body: .stream), + raiseErrorIfUnimplementedMethodIsCalled: false + ) + + let executor = handler.requestExecutor + + // When the body stream is resumed, write one part but do NOT finish the stream yet. + request.resumeRequestBodyStreamCallback = { + executor.writeRequestBodyPart(.byteBuffer(.init(string: "Hello")), request: request, promise: nil) + } + + // Start the request + channel.write(request, promise: nil) + + // Verify request head was sent + XCTAssertEqual(try channel.readOutbound(as: HTTPClientRequestPart.self), .head(request.requestHead)) + // Verify body part was sent + XCTAssertEqual( + try channel.readOutbound(as: HTTPClientRequestPart.self), + .body(.byteBuffer(.init(string: "Hello"))) + ) + + // Now send the full response while the request body stream is still open. + // This causes forwardResponseEnd with finalAction: .none (body not done yet). + XCTAssertNoThrow(try channel.writeInbound(HTTPClientResponsePart.head(.init(version: .http1_1, status: .ok)))) + // Issue a read to advance the response stream state so it accepts the end properly. + channel.read() + XCTAssertNoThrow(try channel.writeInbound(HTTPClientResponsePart.end(nil))) + + // Finish the request body stream. This transitions the state machine to `.idle` + // and writes `.end` to the channel. The DelayEndHandler intercepts the `.end` + // write and holds the promise, preventing the write callback from firing. + executor.finishRequestBodyStream(trailers: nil, request: request, promise: nil) + + // Verify the .end was written through to the channel + XCTAssertEqual(try channel.readOutbound(as: HTTPClientRequestPart.self), .end(nil)) + + // At this point: + // - The state machine has transitioned to `.idle` + // - The write promise has NOT been fulfilled (held by DelayEndHandler) + // - In old code: self.request is still set (only nilled in the write callback) + // - In fixed code: self.request is already nil (nilled synchronously) + + // Now call demandResponseBodyStream, simulating a delegate on a different event + // loop calling it after receiving the response end but before the write completes. + // Without the fix, self.request is still set, the guard passes, and + // state.demandMoreResponseBodyParts() crashes with "Invalid state: idle". + // With the fix, self.request was already nilled, the guard fails, and this is a no-op. + executor.demandResponseBodyStream(request) + + // Complete the delayed write to clean up properly. + delayEndHandler.endPromise?.succeed(()) + eventLoop.run() + + XCTAssertTrue(connectionIsIdle) + + XCTAssertEqual( + request.events.map(\.kind), + [ + .willExecuteRequest, .requestHeadSent, .resumeRequestBodyStream, + .receiveResponseHead, .receiveResponseEnd, .requestBodySent, + ] + ) + } + func testDefaultMaxBufferSize() { if MemoryLayout.size == 8 { XCTAssertEqual(ResponseAccumulator.maxByteBufferSize, Int(UInt32.max)) From 407849edd3acfa5a97763052fb16f9aa61cc7ae1 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 2 Mar 2026 10:59:48 +0100 Subject: [PATCH 2/2] Fix crash --- .../HTTP1/HTTP1ClientChannelHandler.swift | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index 9ca82f9a9..e59f0ab7e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -243,24 +243,32 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise) case .sendRequestEnd(let trailers, let writePromise, let finalAction): - - let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self) // We need to defer succeeding the old request to avoid ordering issues + let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self) + // It is fine to bang the request here, as we have just verified with the state machine + // that the request is still ongoing. + // TODO: In the future, we should likely move the request into the state machine to + // prevent diverging state. + let oldRequest = self.request! + + switch finalAction { + case .none: + // we must not nil out the request here, as we are still uploading the request + // and therefore still need the reference to it. + break + case .informConnectionIsIdle: + self.request = nil + case .close: + self.request = nil + } writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in - guard let oldRequest = self.request else { - // in the meantime an error might have happened, which is why this request is - // not reference anymore. - return - } - oldRequest.requestBodyStreamSent() switch result { case .success: // If our final action is not `none`, that means we've already received // the complete response. As a result, once we've uploaded all the body parts // we need to tell the pool that the connection is idle or, if we were asked to // close when we're done, send the close. Either way, we then succeed the request - switch finalAction { case .none: // we must not nil out the request here, as we are still uploading the request @@ -268,13 +276,12 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { break case .informConnectionIsIdle: - self.request = nil self.onConnectionIdle() case .close: - self.request = nil context.close(promise: nil) } + oldRequest.requestBodyStreamSent() case .failure(let error): context.close(promise: nil)