diff --git a/Package.swift b/Package.swift index 9a7ee856e..7ba3334de 100644 --- a/Package.swift +++ b/Package.swift @@ -24,6 +24,9 @@ let strictConcurrencySettings: [SwiftSetting] = { // -warnings-as-errors here is a workaround so that IDE-based development can // get tripped up on -require-explicit-sendable. initialSettings.append(.unsafeFlags(["-Xfrontend", "-require-explicit-sendable", "-warnings-as-errors"])) + initialSettings.append(.enableExperimentalFeature("LifetimeDependence")) + initialSettings.append(.enableExperimentalFeature("Lifetimes")) + initialSettings.append(.enableUpcomingFeature("LifetimeDependence")) } return initialSettings @@ -34,6 +37,15 @@ let package = Package( products: [ .library(name: "AsyncHTTPClient", targets: ["AsyncHTTPClient"]) ], + traits: [ + .trait( + name: "ExperimentalHTTPAPIsSupport", + description: """ + Enables conformance to the HTTPAPIs HTTPClient protocol. This is potentially source breaking. + """ + ), + .default(enabledTraits: ["ExperimentalHTTPAPIsSupport"]), // remove before MERGE! + ], dependencies: [ .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.30.0"), @@ -46,6 +58,10 @@ let package = Package( .package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.3.0"), .package(url: "https://github.com/apple/swift-configuration.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-service-context.git", from: "1.1.0"), + .package( + url: "https://github.com/apple/swift-http-api-proposal.git", + revision: "79028bea099d390935790d5d8884a61eabf448a5", + ), ], targets: [ .target( @@ -76,6 +92,9 @@ let package = Package( .product(name: "Logging", package: "swift-log"), .product(name: "Tracing", package: "swift-distributed-tracing"), .product(name: "ServiceContextModule", package: "swift-service-context"), + + // HTTP APIs + .product(name: "HTTPAPIs", package: "swift-http-api-proposal"), ], swiftSettings: strictConcurrencySettings ), @@ -110,6 +129,17 @@ let package = Package( ], swiftSettings: strictConcurrencySettings ), + .testTarget( + name: "ConformanceSuite", + dependencies: [ + "AsyncHTTPClient", + .product( + name: "HTTPClientConformance", + package: "swift-http-api-proposal", + condition: .when(traits: ["ExperimentalHTTPAPIsSupport"]) + ), + ] + ), ] ) diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift index 03cd8e464..edc0de619 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest+Prepared.swift @@ -20,6 +20,10 @@ import ServiceContextModule import struct Foundation.URL +#if canImport(HTTPAPIs) +import HTTPAPIs +#endif + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension HTTPClientRequest { struct Prepared: Sendable { @@ -34,6 +38,10 @@ extension HTTPClientRequest { makeCompleteBody: @Sendable (ByteBufferAllocator) -> ByteBuffer ) case byteBuffer(ByteBuffer) + + #if canImport(HTTPAPIs) + case httpClientRequestBody(RequestBodyLength, AsyncStream.Continuation) + #endif } var url: URL @@ -101,6 +109,10 @@ extension HTTPClientRequest.Prepared.Body { ) case .byteBuffer(let byteBuffer): self = .byteBuffer(byteBuffer) + #if canImport(HTTPAPIs) + case .httpClientRequestBody(let lenght, let requestBody): + self = .httpClientRequestBody(lenght, requestBody) + #endif } } } @@ -115,6 +127,10 @@ extension RequestBodyLength { self = .known(Int64(buffer.readableBytes)) case .sequence(let length, _, _), .asyncSequence(let length, _): self = length + #if canImport(HTTPAPIs) + case .httpClientRequestBody(let length, _): + self = length + #endif } } } diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift index dca7de0ef..af88030b5 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift @@ -17,6 +17,10 @@ import NIOCore import NIOHTTP1 import NIOSSL +#if canImport(HTTPAPIs) +import HTTPAPIs +#endif + @usableFromInline let bagOfBytesToByteBufferConversionChunkSize = 1024 * 1024 * 4 @@ -92,6 +96,13 @@ extension HTTPClientRequest { makeCompleteBody: @Sendable (ByteBufferAllocator) -> ByteBuffer ) case byteBuffer(ByteBuffer) + + #if canImport(HTTPAPIs) + case httpClientRequestBody( + length: RequestBodyLength, + startUpload: AsyncStream.Continuation + ) + #endif } @usableFromInline @@ -345,6 +356,9 @@ extension Optional where Wrapped == HTTPClientRequest.Body { case .byteBuffer: return true case .sequence(_, let canBeConsumedMultipleTimes, _): return canBeConsumedMultipleTimes case .asyncSequence: return false + #if canImport(HTTPAPIs) + case .httpClientRequestBody: return false // TODO: I think this should be TRUE + #endif } } } @@ -385,6 +399,10 @@ extension HTTPClientRequest.Body: AsyncSequence { return .init(storage: .byteBuffer(makeCompleteBody(AsyncIterator.allocator))) case .byteBuffer(let byteBuffer): return .init(storage: .byteBuffer(byteBuffer)) + #if canImport(HTTPAPIs) + case .httpClientRequestBody: + fatalError("Unimplemented") + #endif } } } diff --git a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientResponse.swift b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientResponse.swift index 36c1cb36f..1803c9cff 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientResponse.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/HTTPClientResponse.swift @@ -78,6 +78,7 @@ public struct HTTPClientResponse: Sendable { version: HTTPVersion, status: HTTPResponseStatus, headers: HTTPHeaders, + transaction: Transaction, body: TransactionBody, history: [HTTPClientRequestResponse] ) { @@ -88,6 +89,7 @@ public struct HTTPClientResponse: Sendable { body: .init( .transaction( body, + transaction, expectedContentLength: HTTPClientResponse.expectedContentLength( requestMethod: requestMethod, headers: headers, @@ -149,7 +151,7 @@ extension HTTPClientResponse { /// - Returns: the number of bytes collected over time @inlinable public func collect(upTo maxBytes: Int) async throws -> ByteBuffer { switch self.storage { - case .transaction(_, let expectedContentLength): + case .transaction(_, _, let expectedContentLength): if let contentLength = expectedContentLength { if contentLength > maxBytes { throw NIOTooManyBytesError(maxBytes: maxBytes) @@ -199,7 +201,7 @@ typealias TransactionBody = NIOThrowingAsyncSequenceProducer< @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension HTTPClientResponse.Body { @usableFromInline enum Storage: Sendable { - case transaction(TransactionBody, expectedContentLength: Int?) + case transaction(TransactionBody, Transaction, expectedContentLength: Int?) case anyAsyncSequence(AnyAsyncSequence) } } @@ -210,7 +212,7 @@ extension HTTPClientResponse.Body.Storage: AsyncSequence { @inlinable func makeAsyncIterator() -> AsyncIterator { switch self { - case .transaction(let transaction, _): + case .transaction(let transaction, _, _): return .transaction(transaction.makeAsyncIterator()) case .anyAsyncSequence(let anyAsyncSequence): return .anyAsyncSequence(anyAsyncSequence.makeAsyncIterator()) diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift index 4128998b9..01f01d43c 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift @@ -31,7 +31,7 @@ extension Transaction { case queued(CheckedContinuation, HTTPRequestScheduler) case deadlineExceededWhileQueued(CheckedContinuation) case executing(ExecutionContext, RequestStreamState, ResponseStreamState) - case finished(error: Error?) + case finished(Result) } fileprivate enum RequestStreamState: Sendable { @@ -47,7 +47,7 @@ extension Transaction { case waitingForResponseHead // streaming response body. Valid transitions to: finished. case streamingBody(TransactionBody.Source) - case finished + case finished(HTTPHeaders?) } private var state: State @@ -105,11 +105,11 @@ extension Transaction { mutating func fail(_ error: Error) -> FailAction { switch self.state { case .initialized(let continuation): - self.state = .finished(error: error) + self.state = .finished(.failure(error)) return .failResponseHead(continuation, error, nil, nil, bodyStreamContinuation: nil) case .queued(let continuation, let scheduler): - self.state = .finished(error: error) + self.state = .finished(.failure(error)) return .failResponseHead(continuation, error, scheduler, nil, bodyStreamContinuation: nil) case .deadlineExceededWhileQueued(let continuation): let realError: Error = { @@ -123,12 +123,12 @@ extension Transaction { } }() - self.state = .finished(error: realError) + self.state = .finished(.failure(realError)) return .failResponseHead(continuation, realError, nil, nil, bodyStreamContinuation: nil) case .executing(let context, let requestStreamState, .waitingForResponseHead): switch requestStreamState { case .paused(continuation: .some(let continuation)): - self.state = .finished(error: error) + self.state = .finished(.failure(error)) return .failResponseHead( context.continuation, error, @@ -138,7 +138,7 @@ extension Transaction { ) case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none): - self.state = .finished(error: error) + self.state = .finished(.failure(error)) return .failResponseHead( context.continuation, error, @@ -149,7 +149,7 @@ extension Transaction { } case .executing(let context, let requestStreamState, .streamingBody(let source)): - self.state = .finished(error: error) + self.state = .finished(.failure(error)) switch requestStreamState { case .paused(let bodyStreamContinuation): return .failResponseStream( @@ -164,7 +164,7 @@ extension Transaction { case .executing(let context, let requestStreamState, .finished): // an error occured after full response received, but before the full request was sent - self.state = .finished(error: error) + self.state = .finished(.failure(error)) switch requestStreamState { case .paused(let bodyStreamContinuation): if let bodyStreamContinuation { @@ -205,14 +205,14 @@ extension Transaction { return .none case .deadlineExceededWhileQueued(let continuation): let error = HTTPClientError.deadlineExceeded - self.state = .finished(error: error) + self.state = .finished(.failure(error)) return .cancelAndFail(executor, continuation, with: error) - case .finished(error: .some): + case .finished(.failure): return .cancel(executor) case .executing, - .finished(error: .none): + .finished(.success): preconditionFailure("Invalid state: \(self.state)") } } @@ -402,8 +402,8 @@ extension Transaction { assertionFailure("Invalid state: \(self.state)") return .failure(HTTPClientError.internalStateFailure()) - case .executing(_, .endForwarded, .finished): - self.state = .finished(error: nil) + case .executing(_, .endForwarded, .finished(let trailers)): + self.state = .finished(.success(trailers)) return .none case .executing(let context, .endForwarded, let responseState): @@ -446,12 +446,12 @@ extension Transaction { self.state = .executing(context, requestState, .streamingBody(body.source)) return .succeedResponseHead(body.sequence, context.continuation) - case .finished(error: .some): + case .finished(.failure): // If the request failed before, we don't need to do anything in response to // receiving the response head. return .none - case .finished(error: .none): + case .finished(.success): preconditionFailure("How can the request be finished without error, before receiving response head?") } } @@ -511,7 +511,10 @@ extension Transaction { case none } - mutating func receiveResponseEnd(_ newChunks: CircularBuffer?) -> ReceiveResponseEndAction { + mutating func receiveResponseEnd( + _ newChunks: CircularBuffer?, + trailers: HTTPHeaders? + ) -> ReceiveResponseEndAction { switch self.state { case .initialized, .queued, @@ -524,9 +527,9 @@ extension Transaction { case .executing(let context, let requestState, .streamingBody(let source)): switch requestState { case .finished: - self.state = .finished(error: nil) + self.state = .finished(.success(trailers)) case .paused, .producing, .requestHeadSent, .endForwarded: - self.state = .executing(context, requestState, .finished) + self.state = .executing(context, requestState, .finished(trailers)) } return .finishResponseStream(source, finalBody: newChunks) @@ -540,6 +543,18 @@ extension Transaction { } } + var trailers: HTTPHeaders? { + switch self.state { + case .deadlineExceededWhileQueued, .initialized, .queued, + .executing(_, _, .waitingForResponseHead), + .executing(_, _, .streamingBody), + .finished(.failure): + return nil + case .executing(_, _, .finished(let trailers)), .finished(.success(let trailers)): + return trailers + } + } + mutating func httpResponseStreamTerminated() -> FailAction { switch self.state { case .executing(_, _, .finished), .finished: @@ -565,7 +580,7 @@ extension Transaction { let error = HTTPClientError.deadlineExceeded switch self.state { case .initialized(let continuation): - self.state = .finished(error: error) + self.state = .finished(.failure(error)) return .cancel( requestContinuation: continuation, scheduler: nil, @@ -583,7 +598,7 @@ extension Transaction { case .executing(let context, let requestStreamState, .waitingForResponseHead): switch requestStreamState { case .paused(continuation: .some(let continuation)): - self.state = .finished(error: error) + self.state = .finished(.failure(error)) return .cancel( requestContinuation: context.continuation, scheduler: nil, @@ -591,7 +606,7 @@ extension Transaction { bodyStreamContinuation: continuation ) case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none): - self.state = .finished(error: error) + self.state = .finished(.failure(error)) return .cancel( requestContinuation: context.continuation, scheduler: nil, diff --git a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift index 30c7c877f..e12133707 100644 --- a/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift +++ b/Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift @@ -17,6 +17,7 @@ import NIOConcurrencyHelpers import NIOCore import NIOHTTP1 import NIOSSL +import Synchronization import Tracing @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) @@ -73,7 +74,7 @@ final class Transaction: return } - self.requestBodyStreamFinished() + self.requestBodyStreamFinished(trailers: nil) } private func continueRequestBodyStream( @@ -94,7 +95,7 @@ final class Transaction: } } - self.requestBodyStreamFinished() + self.requestBodyStreamFinished(trailers: nil) } catch { // The only chance of reaching this catch block, is an error thrown in the `next` // call above. @@ -105,7 +106,7 @@ final class Transaction: struct BreakTheWriteLoopError: Swift.Error {} - private func writeRequestBodyPart(_ part: ByteBuffer) async throws { + func writeRequestBodyPart(_ part: ByteBuffer) async throws { let action = self.state.withLockedValue { state in state.writeNextRequestPart() } @@ -146,7 +147,7 @@ final class Transaction: } } - private func requestBodyStreamFinished() { + func requestBodyStreamFinished(trailers: HTTPHeaders?) { let finishAction = self.state.withLockedValue { state in state.finishRequestBodyStream() } @@ -157,7 +158,7 @@ final class Transaction: break case .forwardStreamFinished(let executor): - executor.finishRequestBodyStream(trailers: nil, request: self, promise: nil) + executor.finishRequestBodyStream(trailers: trailers, request: self, promise: nil) } return } @@ -228,12 +229,17 @@ extension Transaction: HTTPExecutableRequest { case .byteBuffer(let byteBuffer): self.writeOnceAndOneTimeOnly(byteBuffer: byteBuffer) - case .none: - break - case .sequence(_, _, let create): let byteBuffer = create(allocator) self.writeOnceAndOneTimeOnly(byteBuffer: byteBuffer) + + #if canImport(HTTPAPIs) + case .httpClientRequestBody(_, let continuation): + continuation.yield(self) + #endif + + case .none: + break } case .resumeStream(let continuation): @@ -277,6 +283,7 @@ extension Transaction: HTTPExecutableRequest { version: head.version, status: head.status, headers: head.headers, + transaction: self, body: body, history: [] ) @@ -303,7 +310,7 @@ extension Transaction: HTTPExecutableRequest { func receiveResponseEnd(_ buffer: CircularBuffer?, trailers: HTTPHeaders?) { let receiveResponseEndAction = self.state.withLockedValue { state in - state.receiveResponseEnd(buffer) + state.receiveResponseEnd(buffer, trailers: trailers) } switch receiveResponseEndAction { case .finishResponseStream(let source, let finalResponse): @@ -317,6 +324,12 @@ extension Transaction: HTTPExecutableRequest { } } + var trailers: HTTPHeaders? { + self.state.withLockedValue { + $0.trailers + } + } + func httpResponseStreamTerminated() { let action = self.state.withLockedValue { state in state.httpResponseStreamTerminated() diff --git a/Sources/AsyncHTTPClient/HTTP APIs/AHC+HTTP.swift b/Sources/AsyncHTTPClient/HTTP APIs/AHC+HTTP.swift new file mode 100644 index 000000000..b1c101b9c --- /dev/null +++ b/Sources/AsyncHTTPClient/HTTP APIs/AHC+HTTP.swift @@ -0,0 +1,240 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2025 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.2) && canImport(HTTPAPIs) +import HTTPAPIs +import HTTPTypes +import NIOHTTP1 +import Foundation +import NIOCore +import Synchronization +import BasicContainers + +@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, *) +extension AsyncHTTPClient.HTTPClient: HTTPAPIs.HTTPClient { + public typealias RequestWriter = RequestBodyWriter + public typealias ResponseConcludingReader = ResponseReader + + public struct RequestOptions: HTTPClientCapability.RequestOptions { + public init() {} + } + + public struct RequestBodyWriter: AsyncWriter, ~Copyable { + public typealias WriteElement = UInt8 + public typealias WriteFailure = any Error + + let transaction: Transaction + var byteBuffer: ByteBuffer + var rigidArray: RigidArray + + init(transaction: Transaction) { + self.transaction = transaction + self.byteBuffer = ByteBuffer() + self.byteBuffer.reserveCapacity(2 ^ 16) + self.rigidArray = RigidArray(capacity: 2 ^ 16) // ~ 65k bytes + } + + public mutating func write( + _ body: nonisolated(nonsending) (inout OutputSpan) async throws(Failure) -> Result + ) async throws(AsyncStreaming.EitherError) -> Result where Failure: Error { + let result: Result + do { + // TODO: rigidArray needs a clear all + self.rigidArray.removeAll() + self.rigidArray.reserveCapacity(1024) + result = try await self.rigidArray.append(count: 1024) { (span) async throws(Failure) -> Result in + try await body(&span) + } + } catch { + throw .second(error) + } + + do { + self.byteBuffer.clear() + + // we need to use an uninitilized helper rigidarray here to make the compiler happy + // with regards overlapping memory access. + var localArray = RigidArray(capacity: 0) + swap(&localArray, &self.rigidArray) + localArray.span.withUnsafeBufferPointer { bufferPtr in + self.byteBuffer.withUnsafeMutableWritableBytes { byteBufferPtr in + byteBufferPtr.copyBytes(from: bufferPtr) + } + self.byteBuffer.moveWriterIndex(forwardBy: bufferPtr.count) + } + + swap(&localArray, &self.rigidArray) + try await self.transaction.writeRequestBodyPart(self.byteBuffer) + } catch { + throw .first(error) + } + + return result + } + } + + public struct ResponseReader: ConcludingAsyncReader { + public typealias Underlying = ResponseBodyReader + + let underlying: HTTPClientResponse.Body + + public typealias FinalElement = HTTPFields? + + init(underlying: HTTPClientResponse.Body) { + self.underlying = underlying + } + + public consuming func consumeAndConclude( + body: + nonisolated(nonsending) (consuming sending HTTPClient.ResponseBodyReader) async throws(Failure) -> + Return + ) async throws(Failure) -> (Return, HTTPFields?) where Failure: Error { + let iterator = self.underlying.makeAsyncIterator() + let reader = ResponseBodyReader(underlying: iterator) + let returnValue = try await body(reader) + + let trailers: HTTPFields? + switch underlying.storage { + case .transaction(_, let transaction, _): + if let t = transaction.trailers { + let sequence = t.lazy.compactMap({ + if let name = HTTPField.Name($0.name) { + HTTPField(name: name, value: $0.value) + } else { + nil + } + }) + trailers = HTTPFields(sequence) + } else { + trailers = nil + } + + case .anyAsyncSequence: + trailers = nil + } + return (returnValue, trailers) + } + + } + + public struct ResponseBodyReader: AsyncReader, ~Copyable { + public typealias ReadElement = UInt8 + public typealias ReadFailure = any Error + + var underlying: HTTPClientResponse.Body.AsyncIterator + + public mutating func read( + maximumCount: Int?, + body: nonisolated(nonsending) (consuming Span) async throws(Failure) -> Return + ) async throws(AsyncStreaming.EitherError) -> Return where Failure: Error { + + do { + let buffer = try await self.underlying.next(isolation: #isolation) + if let buffer { + var array = RigidArray() + array.reserveCapacity(buffer.readableBytes) + buffer.withUnsafeReadableBytes { rawBufferPtr in + let usbptr = rawBufferPtr.assumingMemoryBound(to: UInt8.self) + array.append(copying: usbptr) + } + return try await body(array.span) + } else { + let array = InlineArray<0, UInt8> { _ in } + return try await body(array.span) + } + } catch let error as Failure { + throw .second(error) + } catch { + throw .first(error) + } + } + } + + public func perform( + request: HTTPRequest, + body: consuming HTTPClientRequestBody?, + options: HTTPClient.RequestOptions, + responseHandler: nonisolated(nonsending) (HTTPResponse, consuming ResponseReader) async throws -> Return + ) async throws -> Return { + guard let url = request.url else { + fatalError() + } + + var result: Result? + await withTaskGroup(of: Void.self) { taskGroup in + + var ahcRequest = HTTPClientRequest(url: url.absoluteString) + ahcRequest.method = .init(rawValue: request.method.rawValue) + if !request.headerFields.isEmpty { + let sequence = request.headerFields.lazy.map({ ($0.name.rawName, $0.value) }) + ahcRequest.headers.add(contentsOf: sequence) + } + + if let body { + let length = body.knownLength.map { RequestBodyLength.known($0) } ?? .unknown + let (asyncStream, startUploadContinuation) = AsyncStream.makeStream(of: Transaction.self) + + taskGroup.addTask { + // TODO: We might want to allow multiple body restarts here. + + for await transaction in asyncStream { + do { + let writer = RequestWriter(transaction: transaction) + let maybeTrailers = try await body.produce(into: writer) + let trailers: HTTPHeaders? = + if let trailers = maybeTrailers { + HTTPHeaders(.init(trailers.lazy.map({ ($0.name.rawName, $0.value) }))) + } else { + nil + } + transaction.requestBodyStreamFinished(trailers: trailers) + break // the loop + } catch let error { + // if we fail because the user throws in upload, we have to cancel the + // upload and fail the request I guess. + transaction.fail(error) + } + } + } + + ahcRequest.body = .init(.httpClientRequestBody(length: length, startUpload: startUploadContinuation)) + } + + do { + let ahcResponse = try await self.execute(ahcRequest, timeout: .seconds(30)) + + var responseFields = HTTPFields() + for (name, value) in ahcResponse.headers { + if let name = HTTPField.Name(name) { + // Add a new header field + responseFields.append(.init(name: name, value: value)) + } + } + + let response = HTTPResponse( + status: .init(code: Int(ahcResponse.status.code)), + headerFields: responseFields + ) + + result = .success(try await responseHandler(response, .init(underlying: ahcResponse.body))) + } catch { + result = .failure(error) + } + } + + return try result!.get() + } +} + +#endif diff --git a/Tests/AsyncHTTPClientTests/HTTP APIs/AHC+HTTP_Tests.swift b/Tests/AsyncHTTPClientTests/HTTP APIs/AHC+HTTP_Tests.swift new file mode 100644 index 000000000..4299502f4 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTP APIs/AHC+HTTP_Tests.swift @@ -0,0 +1,108 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2026 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if compiler(>=6.2) && $ExperimentalHTTPAPIsSupport +import NIOCore +import HTTPTypes +import HTTPAPIs +import Testing +import AsyncHTTPClient + +@Suite +struct AbstractHTTPClientTest { + + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test func testGet() async throws { + + let bin = HTTPBin(.http1_1(ssl: false)) + defer { try! bin.shutdown() } + + let request = HTTPRequest(method: .post, scheme: "http", authority: "127.0.0.1:\(bin.port)", path: "/trailers") + + try await HTTPClient.shared.perform( + request: request, + body: nil, + options: .init(), + ) { response, responseReader in + print("status: \(response.status)") + for header in response.headerFields { + print("\(header.name): \(header.value)") + } + + let trailers = try await responseReader.collect(upTo: 1024) { span in + span.withUnsafeBufferPointer { buffer in + print(String(decoding: buffer, as: Unicode.UTF8.self)) + } + } + + print(trailers) + } + + } + + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test func testEcho() async throws { + + let bin = HTTPBin(.http1_1(ssl: false)) { _ in HTTPEchoHandler() } + defer { try! bin.shutdown() } + + let request = HTTPRequest(method: .post, scheme: "http", authority: "127.0.0.1:\(bin.port)", path: "/") + + try await HTTPClient.shared.perform( + request: request, + body: .restartable { (writer: consuming AsyncHTTPClient.HTTPClient.RequestWriter) in + var mwriter = writer + + for i in 1...10 { + try await mwriter.write { outputSpan in + if i == 1 { + outputSpan.append("\(i) car\n".utf8) + } else { + outputSpan.append("\(i) cars\n".utf8) + } + } + try await Task.sleep(for: .milliseconds(400)) + } + + return [HTTPField.Name("status")!: "Look Mum, I am done counting!"] + }, + options: .init(), + ) { response, responseReader in + let trailers = try await responseReader.consumeAndConclude { bodyReader in + var bodyReader = bodyReader + var `continue` = true + while `continue` { + try await bodyReader.read(maximumCount: 1024) { span in + if span.count == 0 { `continue` = false } + + // Span does not conform to Collection + span.withUnsafeBufferPointer { buffer in + print(String(decoding: buffer, as: Unicode.UTF8.self)) + } + } + } + } + } + } +} + +extension OutputSpan { + mutating func append(_ sequence: some Sequence) { + for element in sequence { + self.append(element) + } + } +} + +#endif diff --git a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift index 54467aab7..54d9e0808 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift @@ -782,6 +782,11 @@ extension Optional where Wrapped == HTTPClientRequest.Prepared.Body { ) } return accumulatedBuffer + + #if canImport(HTTPAPIs) + case .httpClientRequestBody: + fatalError("TODO: Unimplemented") + #endif } } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 2dcce3e12..1e9aacf29 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -786,16 +786,19 @@ internal struct HTTPResponseBuilder { var body: ByteBuffer? var requestBodyByteCount: Int let responseBodyIsRequestBodyByteCount: Bool + let trailers: HTTPHeaders? init( _ version: HTTPVersion = HTTPVersion(major: 1, minor: 1), status: HTTPResponseStatus, headers: HTTPHeaders = HTTPHeaders(), - responseBodyIsRequestBodyByteCount: Bool = false + responseBodyIsRequestBodyByteCount: Bool = false, + trailers: HTTPHeaders? = nil ) { self.head = HTTPResponseHead(version: version, status: status, headers: headers) self.requestBodyByteCount = 0 self.responseBodyIsRequestBodyByteCount = responseBodyIsRequestBodyByteCount + self.trailers = trailers } mutating func add(_ part: ByteBuffer) { @@ -963,6 +966,9 @@ internal final class HTTPBinHandler: ChannelInboundHandler { } self.resps.append(HTTPResponseBuilder(status: .ok)) return + case "/trailers": + self.resps.append(HTTPResponseBuilder(status: .ok, trailers: ["hello": "world"])) + return case "/stats": var body = context.channel.allocator.buffer(capacity: 1) body.writeString("Just some stats mate.") @@ -1143,7 +1149,8 @@ internal final class HTTPBinHandler: ChannelInboundHandler { return } - context.writeAndFlush(self.wrapOutboundOut(.end(nil))).assumeIsolated().whenComplete { result in + context.writeAndFlush(self.wrapOutboundOut(.end(response.trailers))).assumeIsolated().whenComplete { + result in self.isServingRequest = false switch result { case .success: @@ -1474,8 +1481,8 @@ class HTTPEchoHandler: ChannelInboundHandler { ) case .body(let bytes): context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes))), promise: nil) - case .end: - context.writeAndFlush(self.wrapOutboundOut(.end(nil))).assumeIsolated().whenSuccess { + case .end(let trailers): + context.writeAndFlush(self.wrapOutboundOut(.end(trailers))).assumeIsolated().whenSuccess { context.close(promise: nil) } } diff --git a/Tests/ConformanceSuite/Suite.swift b/Tests/ConformanceSuite/Suite.swift new file mode 100644 index 000000000..1254a924c --- /dev/null +++ b/Tests/ConformanceSuite/Suite.swift @@ -0,0 +1,50 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2026 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if $ExperimentalHTTPAPIsSupport +import AsyncHTTPClient +import HTTPAPIs +import HTTPClient +import HTTPClientConformance +internal import NIOPosix +import Testing + +@Suite struct AsyncHTTPClientTests { + @available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *) + @Test func conformance() async throws { + var config = HTTPClient.Configuration() + config.connectionPool.concurrentHTTP1ConnectionsPerHostSoftLimit = 1 + config.httpVersion = .automatic + let httpClient = HTTPClient(eventLoopGroup: .singletonMultiThreadedEventLoopGroup, configuration: config) + defer { Task { try await httpClient.shutdown() } } + + try await runAllConformanceTests { + httpClient + } + } +} + +@available(macOS 26.2, *) +extension AsyncHTTPClient.HTTPClient.RequestOptions: HTTPClientCapability.RedirectionHandler { + @available(macOS 26.2, *) + public var redirectionHandler: (any HTTPClientRedirectionHandler)? { + get { + nil + } + set { + + } + } +} +#endif