From 6697a102011a84fdd7dfbc62112ec74c4c07a8a0 Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 24 Jun 2021 14:20:16 +0100 Subject: [PATCH 1/4] work --- examples/async/async_post.ml | 2 +- lib/body.ml | 95 +++++++++++++++++------------- lib/client_connection.ml | 6 +- lib/httpaf.mli | 2 +- lib/reqd.ml | 8 +-- lib/server_connection.ml | 3 +- lib_test/test_server_connection.ml | 75 ++++++++++++++++++++--- 7 files changed, 129 insertions(+), 62 deletions(-) diff --git a/examples/async/async_post.ml b/examples/async/async_post.ml index b0fd25b9..8c8994bd 100644 --- a/examples/async/async_post.ml +++ b/examples/async/async_post.ml @@ -28,7 +28,7 @@ let main port host () = don't_wait_for ( Reader.read_one_chunk_at_a_time stdin ~handle_chunk:(fun bs ~pos:off ~len -> Body.Writer.write_bigstring request_body bs ~off ~len; - Body.Writer.flush request_body (fun () -> ()); + Body.Writer.flush request_body Fn.ignore; return (`Consumed(len, `Need_unknown))) >>| function | `Eof_with_unconsumed_data s -> Body.Writer.write_string request_body s; diff --git a/lib/body.ml b/lib/body.ml index d5f04244..cbab45c3 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -102,18 +102,20 @@ module Reader = struct end module Writer = struct + module Writer = Serialize.Writer + type encoding = | Identity | Chunked of { mutable written_final_chunk : bool } type t = - { faraday : Faraday.t - ; encoding : encoding - ; when_ready_to_write : unit -> unit - ; buffered_bytes : int ref + { faraday : Faraday.t + ; writer : Writer.t + ; encoding : encoding + ; buffered_bytes : int ref } - let of_faraday faraday ~encoding ~when_ready_to_write = + let of_faraday faraday writer ~encoding = let encoding = match encoding with | `Fixed _ | `Close_delimited -> Identity @@ -121,29 +123,35 @@ module Writer = struct in { faraday ; encoding - ; when_ready_to_write + ; writer ; buffered_bytes = ref 0 } - let create buffer ~encoding ~when_ready_to_write = - of_faraday (Faraday.of_bigstring buffer) ~encoding ~when_ready_to_write + let create buffer writer ~encoding = + of_faraday (Faraday.of_bigstring buffer) writer ~encoding let write_char t c = - Faraday.write_char t.faraday c + if not (Faraday.is_closed t.faraday) then + Faraday.write_char t.faraday c let write_string t ?off ?len s = - Faraday.write_string ?off ?len t.faraday s + if not (Faraday.is_closed t.faraday) then + Faraday.write_string ?off ?len t.faraday s let write_bigstring t ?off ?len b = - Faraday.write_bigstring ?off ?len t.faraday b + if not (Faraday.is_closed t.faraday) then + Faraday.write_bigstring ?off ?len t.faraday b let schedule_bigstring t ?off ?len (b:Bigstringaf.t) = - Faraday.schedule_bigstring ?off ?len t.faraday b + if not (Faraday.is_closed t.faraday) then + Faraday.schedule_bigstring ?off ?len t.faraday b - let ready_to_write t = t.when_ready_to_write () + let ready_to_write t = Writer.wakeup t.writer let flush t kontinue = - Faraday.flush t.faraday kontinue; + if Writer.is_closed t.writer then kontinue `Closed; + Faraday.flush t.faraday (fun () -> + kontinue (if Writer.is_closed t.writer then `Closed else `Written)); ready_to_write t let is_closed t = @@ -166,33 +174,38 @@ module Writer = struct in faraday_has_output || additional_encoding_output - let transfer_to_writer t writer = + let transfer_to_writer t = let faraday = t.faraday in - begin match Faraday.operation faraday with - | `Yield -> () - | `Close -> - (match t.encoding with - | Identity -> () - | Chunked ({ written_final_chunk } as chunked) -> - if not written_final_chunk then begin - chunked.written_final_chunk <- true; - Serialize.Writer.schedule_chunk writer []; - end); - Serialize.Writer.unyield writer; - | `Writev iovecs -> - let buffered = t.buffered_bytes in - begin match IOVec.shiftv iovecs !buffered with - | [] -> () - | iovecs -> - let lengthv = IOVec.lengthv iovecs in - buffered := !buffered + lengthv; - begin match t.encoding with - | Identity -> Serialize.Writer.schedule_fixed writer iovecs - | Chunked _ -> Serialize.Writer.schedule_chunk writer iovecs - end; - Serialize.Writer.flush writer (fun () -> - Faraday.shift faraday lengthv; - buffered := !buffered - lengthv) - end + if Writer.is_closed t.writer then begin + Faraday.close faraday; + (* Cause all pending flushes to be called *) + ignore (Faraday.drain faraday : int) + end else begin + match Faraday.operation faraday with + | `Yield -> () + | `Close -> + (match t.encoding with + | Identity -> () + | Chunked ({ written_final_chunk } as chunked) -> + if not written_final_chunk then begin + chunked.written_final_chunk <- true; + Serialize.Writer.schedule_chunk t.writer []; + end); + Serialize.Writer.unyield t.writer; + | `Writev iovecs -> + let buffered = t.buffered_bytes in + begin match IOVec.shiftv iovecs !buffered with + | [] -> () + | iovecs -> + let lengthv = IOVec.lengthv iovecs in + buffered := !buffered + lengthv; + begin match t.encoding with + | Identity -> Serialize.Writer.schedule_fixed t.writer iovecs + | Chunked _ -> Serialize.Writer.schedule_chunk t.writer iovecs + end; + Serialize.Writer.flush t.writer (fun () -> + Faraday.shift faraday lengthv; + buffered := !buffered - lengthv) + end end end diff --git a/lib/client_connection.ml b/lib/client_connection.ml index 443aad5e..5d04b7c9 100644 --- a/lib/client_connection.ml +++ b/lib/client_connection.ml @@ -71,8 +71,8 @@ module Oneshot = struct | `Error `Bad_request -> failwith "Httpaf.Client_connection.request: invalid body length" in - Body.Writer.create (Bigstringaf.create config.request_body_buffer_size) - ~encoding ~when_ready_to_write:(fun () -> Writer.wakeup writer) + Body.Writer.create (Bigstringaf.create config.request_body_buffer_size) writer + ~encoding in let t = { request @@ -89,7 +89,7 @@ module Oneshot = struct let flush_request_body t = if Body.Writer.has_pending_output t.request_body - then Body.Writer.transfer_to_writer t.request_body t.writer + then Body.Writer.transfer_to_writer t.request_body ;; let set_error_and_handle_without_shutdown t error = diff --git a/lib/httpaf.mli b/lib/httpaf.mli index efb05ade..14e74171 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -497,7 +497,7 @@ module Body : sig modified until a subsequent call to {!flush} has successfully completed. *) - val flush : t -> (unit -> unit) -> unit + val flush : t -> ([ `Written | `Closed ] -> unit) -> unit (** [flush t f] makes all bytes in [t] available for writing to the awaiting output channel. Once those bytes have reached that output channel, [f] will be called. diff --git a/lib/reqd.ml b/lib/reqd.ml index 5569bf1a..ea6eeed6 100644 --- a/lib/reqd.ml +++ b/lib/reqd.ml @@ -162,10 +162,7 @@ let unsafe_respond_with_streaming ~flush_headers_immediately t response = | `Error (`Bad_gateway | `Internal_server_error) -> failwith "httpaf.Reqd.respond_with_streaming: invalid response body length" in - let response_body = - Body.Writer.create t.response_body_buffer ~encoding ~when_ready_to_write:(fun () -> - Writer.wakeup t.writer) - in + let response_body = Body.Writer.create t.response_body_buffer t.writer ~encoding in Writer.write_response t.writer response; if t.persistent then t.persistent <- Response.persistent_connection response; @@ -256,6 +253,5 @@ let flush_request_body t = let flush_response_body t = match t.response_state with - | Streaming (_, response_body) -> - Body.Writer.transfer_to_writer response_body t.writer + | Streaming (_, response_body) -> Body.Writer.transfer_to_writer response_body | _ -> () diff --git a/lib/server_connection.ml b/lib/server_connection.ml index 9a261037..126af5d3 100644 --- a/lib/server_connection.ml +++ b/lib/server_connection.ml @@ -197,8 +197,7 @@ let set_error_and_handle ?request t error = | `Error (`Bad_gateway | `Internal_server_error) -> failwith "httpaf.Server_connection.error_handler: invalid response body length" in - Body.Writer.of_faraday (Writer.faraday writer) ~encoding - ~when_ready_to_write:(fun () -> Writer.wakeup writer)); + Body.Writer.of_faraday (Writer.faraday writer) writer ~encoding); end let report_exn t exn = diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index b134a937..187ee892 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -296,7 +296,7 @@ let echo_handler response reqd = let response_body = Reqd.respond_with_streaming reqd response in let rec on_read buffer ~off ~len = Body.Writer.write_string response_body (Bigstringaf.substring ~off ~len buffer); - Body.Writer.flush response_body (fun () -> + Body.Writer.flush response_body (fun _ -> Body.Reader.schedule_read request_body ~on_eof ~on_read) and on_eof () = print_endline "echo handler eof"; @@ -316,7 +316,9 @@ let streaming_handler ?(flush=false) response writes reqd = | w :: ws -> Body.Writer.write_string body w; writes := ws; - Body.Writer.flush body write + Body.Writer.flush body (function + | `Closed -> () + | `Written -> write ()) in write (); ;; @@ -683,9 +685,11 @@ let test_chunked_encoding () = let response = Response.create `OK ~headers:Headers.encoding_chunked in let resp_body = Reqd.respond_with_streaming reqd response in Body.Writer.write_string resp_body "First chunk"; - Body.Writer.flush resp_body (fun () -> - Body.Writer.write_string resp_body "Second chunk"; - Body.Writer.close resp_body); + Body.Writer.flush resp_body (function + | `Closed -> () + | `Written -> + Body.Writer.write_string resp_body "Second chunk"; + Body.Writer.close resp_body); in let t = create ~error_handler request_handler in writer_yielded t; @@ -712,9 +716,11 @@ let test_chunked_encoding_for_error () = `Bad_request error; let body = start_response Headers.encoding_chunked in Body.Writer.write_string body "Bad"; - Body.Writer.flush body (fun () -> - Body.Writer.write_string body " request"; - Body.Writer.close body); + Body.Writer.flush body (function + | `Closed -> assert false + | `Written -> + Body.Writer.write_string body " request"; + Body.Writer.close body); in let t = create ~error_handler (fun _ -> assert false) in let c = feed_string t " X\r\n\r\n" in @@ -749,6 +755,57 @@ let test_blocked_write_on_chunked_encoding () = write_string t ~msg:"second write" second_write ;; +let test_body_writing_when_socket_closes () = + let response = Response.create `OK ~headers:Headers.encoding_chunked in + let body_ref = ref None in + let request_handler reqd = + let body = Reqd.respond_with_streaming reqd response in + body_ref := Some body + in + let t = create request_handler in + writer_yielded t; + read_request t (Request.create `GET "/"); + + let (flush_result_testable : [ `Closed | `Written ] Alcotest.testable) = (module struct + type t = [ `Closed | `Written ] + let pp = Fmt.using (function `Closed -> "Closed" | `Written -> "Written") Fmt.string + let equal t t' = + match t, t' with + | `Closed, `Closed | `Written, `Written -> true + | _ -> false end) + in + + let body = Option.get !body_ref in + let check_flush ~expect service_writer = + let flush_result = ref None in + Body.Writer.flush body (fun r -> flush_result := Some r); + service_writer (); + Alcotest.(check' (option flush_result_testable)) + ~msg:"flush_result is as expected" + ~expected:(Some expect) + ~actual:!flush_result; + in + + Body.Writer.write_string body "First chunk"; + check_flush (fun () -> + write_response t + ~msg:"First chunk written" + ~body:"b\r\nFirst chunk\r\n" + response) + ~expect:`Written; + + Body.Writer.write_string body "Second chunk"; + check_flush (fun () -> write_eof t) ~expect:`Closed; + + (* Writing after the writer is closed does not raise, but flushes get immediately + resolved with `Closed. *) + Body.Writer.write_string body "Chunk after closed"; + check_flush (fun () -> ()) ~expect:`Closed; + + Body.Writer.close body; + check_flush (fun () -> ()) ~expect:`Closed; +;; + let test_unexpected_eof () = let t = create default_request_handler in read_request t (Request.create `GET "/"); @@ -1051,6 +1108,7 @@ let test_schedule_read_with_data_available () = write_response t response; ;; + let tests = [ "initial reader state" , `Quick, test_initial_reader_state ; "shutdown reader closed", `Quick, test_reader_is_closed_after_eof @@ -1071,6 +1129,7 @@ let tests = ; "chunked encoding", `Quick, test_chunked_encoding ; "chunked encoding for error", `Quick, test_chunked_encoding_for_error ; "blocked write on chunked encoding", `Quick, test_blocked_write_on_chunked_encoding + ; "body writing when socket closes", `Quick, test_body_writing_when_socket_closes ; "writer unexpected eof", `Quick, test_unexpected_eof ; "input shrunk", `Quick, test_input_shrunk ; "failed request parse", `Quick, test_failed_request_parse From 34da7e998c0b9965328ba23ef0bc3c0747db8ad6 Mon Sep 17 00:00:00 2001 From: David House Date: Thu, 24 Jun 2021 14:25:00 +0100 Subject: [PATCH 2/4] finesse --- lib_test/test_server_connection.ml | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/lib_test/test_server_connection.ml b/lib_test/test_server_connection.ml index 187ee892..d6663251 100644 --- a/lib_test/test_server_connection.ml +++ b/lib_test/test_server_connection.ml @@ -686,7 +686,7 @@ let test_chunked_encoding () = let resp_body = Reqd.respond_with_streaming reqd response in Body.Writer.write_string resp_body "First chunk"; Body.Writer.flush resp_body (function - | `Closed -> () + | `Closed -> assert false | `Written -> Body.Writer.write_string resp_body "Second chunk"; Body.Writer.close resp_body); @@ -766,13 +766,9 @@ let test_body_writing_when_socket_closes () = writer_yielded t; read_request t (Request.create `GET "/"); - let (flush_result_testable : [ `Closed | `Written ] Alcotest.testable) = (module struct - type t = [ `Closed | `Written ] - let pp = Fmt.using (function `Closed -> "Closed" | `Written -> "Written") Fmt.string - let equal t t' = - match t, t' with - | `Closed, `Closed | `Written, `Written -> true - | _ -> false end) + let flush_result_testable = + Alcotest.of_pp + (Fmt.using (function `Closed -> "Closed" | `Written -> "Written") Fmt.string) in let body = Option.get !body_ref in From b0f9d8a30931c01305cd34c8bc78a1d0c9864152 Mon Sep 17 00:00:00 2001 From: David House Date: Mon, 28 Jun 2021 11:36:51 +0100 Subject: [PATCH 3/4] work --- lib/body.ml | 38 ++++++++++++++++++++++++++------------ lib/httpaf.mli | 12 +++++++----- lib/serialize.ml | 28 +++++++++++++++++----------- 3 files changed, 50 insertions(+), 28 deletions(-) diff --git a/lib/body.ml b/lib/body.ml index cbab45c3..3dd38a37 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -149,14 +149,27 @@ module Writer = struct let ready_to_write t = Writer.wakeup t.writer let flush t kontinue = - if Writer.is_closed t.writer then kontinue `Closed; - Faraday.flush t.faraday (fun () -> - kontinue (if Writer.is_closed t.writer then `Closed else `Written)); - ready_to_write t + if Writer.is_closed t.writer then + kontinue `Closed + else begin + Faraday.flush_with_reason t.faraday (fun reason -> + let result = + match reason with + | `Nothing_pending | `Shift -> `Written + | `Drain -> `Closed + in + kontinue result); + ready_to_write t + end let is_closed t = Faraday.is_closed t.faraday + let close_and_drain t = + Faraday.close t.faraday; + (* Resolve all pending flushes *) + ignore (Faraday.drain t.faraday : int) + let close t = Faraday.close t.faraday; ready_to_write t; @@ -176,11 +189,9 @@ module Writer = struct let transfer_to_writer t = let faraday = t.faraday in - if Writer.is_closed t.writer then begin - Faraday.close faraday; - (* Cause all pending flushes to be called *) - ignore (Faraday.drain faraday : int) - end else begin + if Writer.is_closed t.writer then + close_and_drain t + else begin match Faraday.operation faraday with | `Yield -> () | `Close -> @@ -203,9 +214,12 @@ module Writer = struct | Identity -> Serialize.Writer.schedule_fixed t.writer iovecs | Chunked _ -> Serialize.Writer.schedule_chunk t.writer iovecs end; - Serialize.Writer.flush t.writer (fun () -> - Faraday.shift faraday lengthv; - buffered := !buffered - lengthv) + Serialize.Writer.flush t.writer (fun result -> + match result with + | `Closed -> close_and_drain t + | `Written -> + Faraday.shift faraday lengthv; + buffered := !buffered - lengthv) end end end diff --git a/lib/httpaf.mli b/lib/httpaf.mli index 14e74171..42cef598 100644 --- a/lib/httpaf.mli +++ b/lib/httpaf.mli @@ -498,9 +498,10 @@ module Body : sig completed. *) val flush : t -> ([ `Written | `Closed ] -> unit) -> unit - (** [flush t f] makes all bytes in [t] available for writing to the awaiting - output channel. Once those bytes have reached that output channel, [f] - will be called. + (** [flush t f] makes all bytes in [t] available for writing to the awaiting output + channel. Once those bytes have reached that output channel, [f `Written] will be + called. If instead, the output channel is closed before all of those bytes are + successfully written, [f `Closed] will be called. The type of the output channel is runtime-dependent, as are guarantees about whether those packets have been queued for delivery or have @@ -512,8 +513,9 @@ module Body : sig to the output channel. *) val is_closed : t -> bool - (** [is_closed t] is [true] if {!close} has been called on [t] and [false] - otherwise. A closed [t] may still have pending output. *) + (** [is_closed t] is [true] if {!close} has been called on [t], or if the attached + output channel is closed (e.g. because [report_write_result `Closed] has been + called). A closed [t] may still have pending output. *) end end diff --git a/lib/serialize.ml b/lib/serialize.ml index 61c22131..3009e2c1 100644 --- a/lib/serialize.ml +++ b/lib/serialize.ml @@ -89,18 +89,18 @@ let schedule_bigstring_chunk t chunk = module Writer = struct type t = { buffer : Bigstringaf.t - (* The buffer that the encoder uses for buffered writes. Managed by the - * control module for the encoder. *) + (* The buffer that the encoder uses for buffered writes. Managed by the + * control module for the encoder. *) ; encoder : Faraday.t - (* The encoder that handles encoding for writes. Uses the [buffer] - * referenced above internally. *) + (* The encoder that handles encoding for writes. Uses the [buffer] + * referenced above internally. *) ; mutable drained_bytes : int - (* The number of bytes that were not written due to the output stream - * being closed before all buffered output could be written. Useful for - * detecting error cases. *) + (* The number of bytes that were not written due to the output stream + * being closed before all buffered output could be written. Useful for + * detecting error cases. *) ; mutable wakeup : Optional_thunk.t - (* The callback from the runtime to be invoked when output is ready to be - * flushed. *) + (* The callback from the runtime to be invoked when output is ready to be + * flushed. *) } let create ?(buffer_size=0x800) () = @@ -158,13 +158,19 @@ module Writer = struct ;; let flush t f = - flush t.encoder f + flush_with_reason t.encoder (fun reason -> + let result = + match reason with + | `Nothing_pending | `Shift -> `Written + | `Drain -> `Closed + in + f result) let unyield t = (* This would be better implemented by a function that just takes the encoder out of a yielded state if it's in that state. Requires a change to the faraday library. *) - flush t (fun () -> ()) + flush t (fun _result -> ()) let yield t = Faraday.yield t.encoder From e27e536be3b4b00905c87d5cfe36e5eeece1a612 Mon Sep 17 00:00:00 2001 From: Doug Patti Date: Wed, 12 Jan 2022 18:28:52 -0500 Subject: [PATCH 4/4] fix build against current faraday --- lib/body.ml | 4 ++-- lib/serialize.ml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/body.ml b/lib/body.ml index 3dd38a37..bd04c645 100644 --- a/lib/body.ml +++ b/lib/body.ml @@ -155,8 +155,8 @@ module Writer = struct Faraday.flush_with_reason t.faraday (fun reason -> let result = match reason with - | `Nothing_pending | `Shift -> `Written - | `Drain -> `Closed + | Nothing_pending | Shift -> `Written + | Drain -> `Closed in kontinue result); ready_to_write t diff --git a/lib/serialize.ml b/lib/serialize.ml index 3009e2c1..4ac9ee1f 100644 --- a/lib/serialize.ml +++ b/lib/serialize.ml @@ -161,8 +161,8 @@ module Writer = struct flush_with_reason t.encoder (fun reason -> let result = match reason with - | `Nothing_pending | `Shift -> `Written - | `Drain -> `Closed + | Nothing_pending | Shift -> `Written + | Drain -> `Closed in f result)