Skip to content
30 changes: 30 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ let strictConcurrencySettings: [SwiftSetting] = {
// -warnings-as-errors here is a workaround so that IDE-based development can
// get tripped up on -require-explicit-sendable.
initialSettings.append(.unsafeFlags(["-Xfrontend", "-require-explicit-sendable", "-warnings-as-errors"]))
initialSettings.append(.enableExperimentalFeature("LifetimeDependence"))
initialSettings.append(.enableExperimentalFeature("Lifetimes"))
initialSettings.append(.enableUpcomingFeature("LifetimeDependence"))
}

return initialSettings
Expand All @@ -34,6 +37,15 @@ let package = Package(
products: [
.library(name: "AsyncHTTPClient", targets: ["AsyncHTTPClient"])
],
traits: [
.trait(
name: "ExperimentalHTTPAPIsSupport",
description: """
Enables conformance to the HTTPAPIs HTTPClient protocol. This is potentially source breaking.
"""
),
.default(enabledTraits: ["ExperimentalHTTPAPIsSupport"]), // remove before MERGE!
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"),
.package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.30.0"),
Expand All @@ -46,6 +58,10 @@ let package = Package(
.package(url: "https://github.com/apple/swift-distributed-tracing.git", from: "1.3.0"),
.package(url: "https://github.com/apple/swift-configuration.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-service-context.git", from: "1.1.0"),
.package(
url: "https://github.com/apple/swift-http-api-proposal.git",
revision: "79028bea099d390935790d5d8884a61eabf448a5",
),
],
targets: [
.target(
Expand Down Expand Up @@ -76,6 +92,9 @@ let package = Package(
.product(name: "Logging", package: "swift-log"),
.product(name: "Tracing", package: "swift-distributed-tracing"),
.product(name: "ServiceContextModule", package: "swift-service-context"),

// HTTP APIs
.product(name: "HTTPAPIs", package: "swift-http-api-proposal"),
],
swiftSettings: strictConcurrencySettings
),
Expand Down Expand Up @@ -110,6 +129,17 @@ let package = Package(
],
swiftSettings: strictConcurrencySettings
),
.testTarget(
name: "ConformanceSuite",
dependencies: [
"AsyncHTTPClient",
.product(
name: "HTTPClientConformance",
package: "swift-http-api-proposal",
condition: .when(traits: ["ExperimentalHTTPAPIsSupport"])
),
]
),
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import ServiceContextModule

import struct Foundation.URL

#if canImport(HTTPAPIs)
import HTTPAPIs
#endif

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientRequest {
struct Prepared: Sendable {
Expand All @@ -34,6 +38,10 @@ extension HTTPClientRequest {
makeCompleteBody: @Sendable (ByteBufferAllocator) -> ByteBuffer
)
case byteBuffer(ByteBuffer)

#if canImport(HTTPAPIs)
case httpClientRequestBody(RequestBodyLength, AsyncStream<Transaction>.Continuation)
#endif
}

var url: URL
Expand Down Expand Up @@ -101,6 +109,10 @@ extension HTTPClientRequest.Prepared.Body {
)
case .byteBuffer(let byteBuffer):
self = .byteBuffer(byteBuffer)
#if canImport(HTTPAPIs)
case .httpClientRequestBody(let lenght, let requestBody):
self = .httpClientRequestBody(lenght, requestBody)
#endif
}
}
}
Expand All @@ -115,6 +127,10 @@ extension RequestBodyLength {
self = .known(Int64(buffer.readableBytes))
case .sequence(let length, _, _), .asyncSequence(let length, _):
self = length
#if canImport(HTTPAPIs)
case .httpClientRequestBody(let length, _):
self = length
#endif
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import NIOCore
import NIOHTTP1
import NIOSSL

#if canImport(HTTPAPIs)
import HTTPAPIs
#endif

@usableFromInline
let bagOfBytesToByteBufferConversionChunkSize = 1024 * 1024 * 4

Expand Down Expand Up @@ -92,6 +96,13 @@ extension HTTPClientRequest {
makeCompleteBody: @Sendable (ByteBufferAllocator) -> ByteBuffer
)
case byteBuffer(ByteBuffer)

#if canImport(HTTPAPIs)
case httpClientRequestBody(
length: RequestBodyLength,
startUpload: AsyncStream<Transaction>.Continuation
)
#endif
}

@usableFromInline
Expand Down Expand Up @@ -345,6 +356,9 @@ extension Optional where Wrapped == HTTPClientRequest.Body {
case .byteBuffer: return true
case .sequence(_, let canBeConsumedMultipleTimes, _): return canBeConsumedMultipleTimes
case .asyncSequence: return false
#if canImport(HTTPAPIs)
case .httpClientRequestBody: return false // TODO: I think this should be TRUE
#endif
}
}
}
Expand Down Expand Up @@ -385,6 +399,10 @@ extension HTTPClientRequest.Body: AsyncSequence {
return .init(storage: .byteBuffer(makeCompleteBody(AsyncIterator.allocator)))
case .byteBuffer(let byteBuffer):
return .init(storage: .byteBuffer(byteBuffer))
#if canImport(HTTPAPIs)
case .httpClientRequestBody:
fatalError("Unimplemented")
#endif
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClientResponse.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public struct HTTPClientResponse: Sendable {
version: HTTPVersion,
status: HTTPResponseStatus,
headers: HTTPHeaders,
transaction: Transaction,
body: TransactionBody,
history: [HTTPClientRequestResponse]
) {
Expand All @@ -88,6 +89,7 @@ public struct HTTPClientResponse: Sendable {
body: .init(
.transaction(
body,
transaction,
expectedContentLength: HTTPClientResponse.expectedContentLength(
requestMethod: requestMethod,
headers: headers,
Expand Down Expand Up @@ -149,7 +151,7 @@ extension HTTPClientResponse {
/// - Returns: the number of bytes collected over time
@inlinable public func collect(upTo maxBytes: Int) async throws -> ByteBuffer {
switch self.storage {
case .transaction(_, let expectedContentLength):
case .transaction(_, _, let expectedContentLength):
if let contentLength = expectedContentLength {
if contentLength > maxBytes {
throw NIOTooManyBytesError(maxBytes: maxBytes)
Expand Down Expand Up @@ -199,7 +201,7 @@ typealias TransactionBody = NIOThrowingAsyncSequenceProducer<
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension HTTPClientResponse.Body {
@usableFromInline enum Storage: Sendable {
case transaction(TransactionBody, expectedContentLength: Int?)
case transaction(TransactionBody, Transaction, expectedContentLength: Int?)
case anyAsyncSequence(AnyAsyncSequence<ByteBuffer>)
}
}
Expand All @@ -210,7 +212,7 @@ extension HTTPClientResponse.Body.Storage: AsyncSequence {

@inlinable func makeAsyncIterator() -> AsyncIterator {
switch self {
case .transaction(let transaction, _):
case .transaction(let transaction, _, _):
return .transaction(transaction.makeAsyncIterator())
case .anyAsyncSequence(let anyAsyncSequence):
return .anyAsyncSequence(anyAsyncSequence.makeAsyncIterator())
Expand Down
59 changes: 37 additions & 22 deletions Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ extension Transaction {
case queued(CheckedContinuation<HTTPClientResponse, Error>, HTTPRequestScheduler)
case deadlineExceededWhileQueued(CheckedContinuation<HTTPClientResponse, Error>)
case executing(ExecutionContext, RequestStreamState, ResponseStreamState)
case finished(error: Error?)
case finished(Result<HTTPHeaders?, any Error>)
}

fileprivate enum RequestStreamState: Sendable {
Expand All @@ -47,7 +47,7 @@ extension Transaction {
case waitingForResponseHead
// streaming response body. Valid transitions to: finished.
case streamingBody(TransactionBody.Source)
case finished
case finished(HTTPHeaders?)
}

private var state: State
Expand Down Expand Up @@ -105,11 +105,11 @@ extension Transaction {
mutating func fail(_ error: Error) -> FailAction {
switch self.state {
case .initialized(let continuation):
self.state = .finished(error: error)
self.state = .finished(.failure(error))
return .failResponseHead(continuation, error, nil, nil, bodyStreamContinuation: nil)

case .queued(let continuation, let scheduler):
self.state = .finished(error: error)
self.state = .finished(.failure(error))
return .failResponseHead(continuation, error, scheduler, nil, bodyStreamContinuation: nil)
case .deadlineExceededWhileQueued(let continuation):
let realError: Error = {
Expand All @@ -123,12 +123,12 @@ extension Transaction {
}
}()

self.state = .finished(error: realError)
self.state = .finished(.failure(realError))
return .failResponseHead(continuation, realError, nil, nil, bodyStreamContinuation: nil)
case .executing(let context, let requestStreamState, .waitingForResponseHead):
switch requestStreamState {
case .paused(continuation: .some(let continuation)):
self.state = .finished(error: error)
self.state = .finished(.failure(error))
return .failResponseHead(
context.continuation,
error,
Expand All @@ -138,7 +138,7 @@ extension Transaction {
)

case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none):
self.state = .finished(error: error)
self.state = .finished(.failure(error))
return .failResponseHead(
context.continuation,
error,
Expand All @@ -149,7 +149,7 @@ extension Transaction {
}

case .executing(let context, let requestStreamState, .streamingBody(let source)):
self.state = .finished(error: error)
self.state = .finished(.failure(error))
switch requestStreamState {
case .paused(let bodyStreamContinuation):
return .failResponseStream(
Expand All @@ -164,7 +164,7 @@ extension Transaction {

case .executing(let context, let requestStreamState, .finished):
// an error occured after full response received, but before the full request was sent
self.state = .finished(error: error)
self.state = .finished(.failure(error))
switch requestStreamState {
case .paused(let bodyStreamContinuation):
if let bodyStreamContinuation {
Expand Down Expand Up @@ -205,14 +205,14 @@ extension Transaction {
return .none
case .deadlineExceededWhileQueued(let continuation):
let error = HTTPClientError.deadlineExceeded
self.state = .finished(error: error)
self.state = .finished(.failure(error))
return .cancelAndFail(executor, continuation, with: error)

case .finished(error: .some):
case .finished(.failure):
return .cancel(executor)

case .executing,
.finished(error: .none):
.finished(.success):
preconditionFailure("Invalid state: \(self.state)")
}
}
Expand Down Expand Up @@ -402,8 +402,8 @@ extension Transaction {
assertionFailure("Invalid state: \(self.state)")
return .failure(HTTPClientError.internalStateFailure())

case .executing(_, .endForwarded, .finished):
self.state = .finished(error: nil)
case .executing(_, .endForwarded, .finished(let trailers)):
self.state = .finished(.success(trailers))
return .none

case .executing(let context, .endForwarded, let responseState):
Expand Down Expand Up @@ -446,12 +446,12 @@ extension Transaction {
self.state = .executing(context, requestState, .streamingBody(body.source))
return .succeedResponseHead(body.sequence, context.continuation)

case .finished(error: .some):
case .finished(.failure):
// If the request failed before, we don't need to do anything in response to
// receiving the response head.
return .none

case .finished(error: .none):
case .finished(.success):
preconditionFailure("How can the request be finished without error, before receiving response head?")
}
}
Expand Down Expand Up @@ -511,7 +511,10 @@ extension Transaction {
case none
}

mutating func receiveResponseEnd(_ newChunks: CircularBuffer<ByteBuffer>?) -> ReceiveResponseEndAction {
mutating func receiveResponseEnd(
_ newChunks: CircularBuffer<ByteBuffer>?,
trailers: HTTPHeaders?
) -> ReceiveResponseEndAction {
switch self.state {
case .initialized,
.queued,
Expand All @@ -524,9 +527,9 @@ extension Transaction {
case .executing(let context, let requestState, .streamingBody(let source)):
switch requestState {
case .finished:
self.state = .finished(error: nil)
self.state = .finished(.success(trailers))
case .paused, .producing, .requestHeadSent, .endForwarded:
self.state = .executing(context, requestState, .finished)
self.state = .executing(context, requestState, .finished(trailers))
}
return .finishResponseStream(source, finalBody: newChunks)

Expand All @@ -540,6 +543,18 @@ extension Transaction {
}
}

var trailers: HTTPHeaders? {
switch self.state {
case .deadlineExceededWhileQueued, .initialized, .queued,
.executing(_, _, .waitingForResponseHead),
.executing(_, _, .streamingBody),
.finished(.failure):
return nil
case .executing(_, _, .finished(let trailers)), .finished(.success(let trailers)):
return trailers
}
}

mutating func httpResponseStreamTerminated() -> FailAction {
switch self.state {
case .executing(_, _, .finished), .finished:
Expand All @@ -565,7 +580,7 @@ extension Transaction {
let error = HTTPClientError.deadlineExceeded
switch self.state {
case .initialized(let continuation):
self.state = .finished(error: error)
self.state = .finished(.failure(error))
return .cancel(
requestContinuation: continuation,
scheduler: nil,
Expand All @@ -583,15 +598,15 @@ extension Transaction {
case .executing(let context, let requestStreamState, .waitingForResponseHead):
switch requestStreamState {
case .paused(continuation: .some(let continuation)):
self.state = .finished(error: error)
self.state = .finished(.failure(error))
return .cancel(
requestContinuation: context.continuation,
scheduler: nil,
executor: context.executor,
bodyStreamContinuation: continuation
)
case .requestHeadSent, .endForwarded, .finished, .producing, .paused(continuation: .none):
self.state = .finished(error: error)
self.state = .finished(.failure(error))
return .cancel(
requestContinuation: context.continuation,
scheduler: nil,
Expand Down
Loading
Loading