Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,38 +243,45 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)

case .sendRequestEnd(let trailers, let writePromise, let finalAction):

let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// It is fine to bang the request here, as we have just verified with the state machine
// that the request is still ongoing.
// TODO: In the future, we should likely move the request into the state machine to
// prevent diverging state.
let oldRequest = self.request!

switch finalAction {
case .none:
// we must not nil out the request here, as we are still uploading the request
// and therefore still need the reference to it.
break
case .informConnectionIsIdle:
self.request = nil
case .close:
self.request = nil
}

writePromise.futureResult.hop(to: context.eventLoop).assumeIsolated().whenComplete { result in
guard let oldRequest = self.request else {
// in the meantime an error might have happened, which is why this request is
// not reference anymore.
return
}
oldRequest.requestBodyStreamSent()
switch result {
case .success:
// If our final action is not `none`, that means we've already received
// the complete response. As a result, once we've uploaded all the body parts
// we need to tell the pool that the connection is idle or, if we were asked to
// close when we're done, send the close. Either way, we then succeed the request

switch finalAction {
case .none:
// we must not nil out the request here, as we are still uploading the request
// and therefore still need the reference to it.
break

case .informConnectionIsIdle:
self.request = nil
self.onConnectionIdle()

case .close:
self.request = nil
context.close(promise: nil)
}
oldRequest.requestBodyStreamSent()

case .failure(let error):
context.close(promise: nil)
Expand Down
115 changes: 115 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,121 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
)
}

func testDemandResponseBodyStreamAfterEarlyResponseDoesNotCrash() async throws {
// This test reproduces a crash where `demandMoreResponseBodyParts()` was called on the
// state machine after it had already transitioned to `.idle`. The scenario is:
//
// 1. A streaming POST request is in progress
// 2. The full response (head + end) arrives before the request body is finished
// 3. The response end is forwarded with `finalAction: .none` (body still uploading)
// 4. The request body stream finishes -> `requestStreamFinished` -> state machine
// transitions to `.idle` and returns `.sendRequestEnd(.informConnectionIsIdle)`
// 5. The `.sendRequestEnd` handler writes `.end` to the channel and registers a
// callback on the write promise
// 6. Before the write callback fires (write hasn't completed yet),
// `demandResponseBodyStream` is called (from a delegate on another event loop)
// 7. Without the fix, `self.request` was still set (only nilled in the write
// callback), so the guard passed and `demandMoreResponseBodyParts()` hit
// `fatalError("Invalid state: idle")`
//
// The fix nils out `self.request` synchronously in `.sendRequestEnd` (before the
// write callback), so the guard in `demandResponseBodyStream0` fails and returns early.

final class DelayEndHandler: ChannelOutboundHandler {
typealias OutboundIn = HTTPClientRequestPart
typealias OutboundOut = HTTPClientRequestPart

private(set) var endPromise: EventLoopPromise<Void>?

func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
if case .end = self.unwrapOutboundIn(data) {
self.endPromise = promise
context.write(data, promise: nil)
} else {
context.write(data, promise: promise)
}
}
}

let eventLoop = EmbeddedEventLoop()
let delayEndHandler = DelayEndHandler()
let handler = HTTP1ClientChannelHandler(
eventLoop: eventLoop,
backgroundLogger: Logger(label: "no-op", factory: SwiftLogNoOpLogHandler.init),
connectionIdLoggerMetadata: "test connection"
)
var connectionIsIdle = false
handler.onConnectionIdle = { connectionIsIdle = true }
let channel = EmbeddedChannel(handlers: [delayEndHandler, handler], loop: eventLoop)
XCTAssertNoThrow(try channel.connect(to: .init(ipAddress: "127.0.0.1", port: 80)).wait())

let request = MockHTTPExecutableRequest(
head: .init(version: .http1_1, method: .POST, uri: "http://localhost/"),
framingMetadata: RequestFramingMetadata(connectionClose: false, body: .stream),
raiseErrorIfUnimplementedMethodIsCalled: false
)

let executor = handler.requestExecutor

// When the body stream is resumed, write one part but do NOT finish the stream yet.
request.resumeRequestBodyStreamCallback = {
executor.writeRequestBodyPart(.byteBuffer(.init(string: "Hello")), request: request, promise: nil)
}

// Start the request
channel.write(request, promise: nil)

// Verify request head was sent
XCTAssertEqual(try channel.readOutbound(as: HTTPClientRequestPart.self), .head(request.requestHead))
// Verify body part was sent
XCTAssertEqual(
try channel.readOutbound(as: HTTPClientRequestPart.self),
.body(.byteBuffer(.init(string: "Hello")))
)

// Now send the full response while the request body stream is still open.
// This causes forwardResponseEnd with finalAction: .none (body not done yet).
XCTAssertNoThrow(try channel.writeInbound(HTTPClientResponsePart.head(.init(version: .http1_1, status: .ok))))
// Issue a read to advance the response stream state so it accepts the end properly.
channel.read()
XCTAssertNoThrow(try channel.writeInbound(HTTPClientResponsePart.end(nil)))

// Finish the request body stream. This transitions the state machine to `.idle`
// and writes `.end` to the channel. The DelayEndHandler intercepts the `.end`
// write and holds the promise, preventing the write callback from firing.
executor.finishRequestBodyStream(trailers: nil, request: request, promise: nil)

// Verify the .end was written through to the channel
XCTAssertEqual(try channel.readOutbound(as: HTTPClientRequestPart.self), .end(nil))

// At this point:
// - The state machine has transitioned to `.idle`
// - The write promise has NOT been fulfilled (held by DelayEndHandler)
// - In old code: self.request is still set (only nilled in the write callback)
// - In fixed code: self.request is already nil (nilled synchronously)

// Now call demandResponseBodyStream, simulating a delegate on a different event
// loop calling it after receiving the response end but before the write completes.
// Without the fix, self.request is still set, the guard passes, and
// state.demandMoreResponseBodyParts() crashes with "Invalid state: idle".
// With the fix, self.request was already nilled, the guard fails, and this is a no-op.
executor.demandResponseBodyStream(request)

// Complete the delayed write to clean up properly.
delayEndHandler.endPromise?.succeed(())
eventLoop.run()

XCTAssertTrue(connectionIsIdle)

XCTAssertEqual(
request.events.map(\.kind),
[
.willExecuteRequest, .requestHeadSent, .resumeRequestBodyStream,
.receiveResponseHead, .receiveResponseEnd, .requestBodySent,
]
)
}

func testDefaultMaxBufferSize() {
if MemoryLayout<Int>.size == 8 {
XCTAssertEqual(ResponseAccumulator.maxByteBufferSize, Int(UInt32.max))
Expand Down