From 09e51a02ed90d932d87357cbbd7b1a38757da94d Mon Sep 17 00:00:00 2001 From: Matt Rkiouak Date: Mon, 23 Nov 2020 16:15:03 -0500 Subject: [PATCH 1/2] Update support for current application/grpc-web-text, grpc-web js behavior Signed-off-by: Matt Rkiouak --- .../pedestal/interceptors/grpc_web.clj | 57 +++++- test/protojure/grpc_web_test.clj | 181 +++++++++++------- .../pedestal/interceptors/grpc_web_test.clj | 14 ++ 3 files changed, 176 insertions(+), 76 deletions(-) create mode 100644 test/protojure/pedestal/interceptors/grpc_web_test.clj diff --git a/src/protojure/pedestal/interceptors/grpc_web.clj b/src/protojure/pedestal/interceptors/grpc_web.clj index 54a0df6..068f8af 100644 --- a/src/protojure/pedestal/interceptors/grpc_web.clj +++ b/src/protojure/pedestal/interceptors/grpc_web.clj @@ -4,24 +4,65 @@ (ns protojure.pedestal.interceptors.grpc-web "A [Pedestal](http://pedestal.io/) [interceptor](http://pedestal.io/reference/interceptors) for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol" - (:require [io.pedestal.interceptor :refer [->Interceptor]]) - (:import (org.apache.commons.codec.binary Base64InputStream)) + (:require [io.pedestal.interceptor :refer [->Interceptor]] + [clojure.core.async :as async] + [clojure.data]) (:refer-clojure :exclude [proxy])) (set! *warn-on-reflection* true) +(defn read-n [from-chan n] + "Convenience method for consuming n [n] or less values from a channel [from-chan]" + (async/go-loop [res []] + (let [v (async/! dec-ch b)) + (recur (async/! out-ch (.encode encoder ^bytes s)) + (recur (async/ (assoc-in ctx [:response :body] out-ch) + (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"}))))) (def ^{:no-doc true :const true} content-types - #{"application/grpc-web-text" - "application/grpc-web-text+proto"}) + #{"application/grpc-web-text"}) (defn- web-text? [{{:strs [content-type]} :headers}] (contains? content-types content-type)) +(defn- accept-web-text? + [{{{:strs [accept]} :headers} :request}] + (contains? content-types accept)) + (defn- pred-> "Threads 'item' through both the predicate and, when 'pred' evaluates true, 'xform' functions. Else, just returns 'item'" [item pred xform] @@ -32,9 +73,9 @@ (assoc ctx :request (pred-> request web-text? decode-body))) (defn- leave-handler - [ctx] + [{:keys [response] :as ctx}] ;; TODO "Clarify & implement grpc-web trailer behavior" - ctx) + (pred-> ctx accept-web-text? encode-body)) (defn- exception-handler [ctx e] diff --git a/test/protojure/grpc_web_test.clj b/test/protojure/grpc_web_test.clj index d0830d6..dc329c1 100644 --- a/test/protojure/grpc_web_test.clj +++ b/test/protojure/grpc_web_test.clj @@ -11,80 +11,125 @@ [io.pedestal.http.body-params :as body-params] [example.types :as example] [protojure.protobuf :as pb] - [clojure.data.codec.base64 :as b64])) + [clojure.data.codec.base64 :as b64] + [protojure.pedestal.interceptors.grpc :as grpc] + [clojure.core.async :as async] + [example.hello.Greeter :as greeter] + [clojure.core.async :refer [!! ! go go-loop] :as async] + [protojure.test.utils :as test.utils] + [protojure.grpc.client.api :as grpc-api] + [protojure.grpc.client.providers.http2 :as grpc.http2] + [protojure.grpc.client.utils :as client.utils] + [promesa.core :as p] + [protojure.internal.grpc.client.providers.http2.jetty :as jetty-client] + [protojure.grpc.status :as grpc.status] + [protojure.pedestal.routes :as pedestal.routes] + [example.hello :refer [new-HelloRequest pb->HelloReply]] + [clj-http.client :as client] + [protojure.grpc.codec.lpm :as lpm])) -(defn grpc-echo [{:keys [body] :as request}] - {:status 200 - :body (example/pb->Money body) - :trailers {"grpc-status" 0 "grpc-message" "Got it!"}}) +(defonce test-env (atom {})) -(def interceptors [(body-params/body-params) - grpc-web/proxy]) +;;----------------------------------------------------------------------------- +;; "Greeter" service endpoint +;;----------------------------------------------------------------------------- +(deftype Greeter [] + greeter/Service + (SayHello + [this {{:keys [name]} :grpc-params :as request}] + {:status 200 + :body {:message (str "Hello, " name)}}) + (SayRepeatHello + [this {{:keys [name]} :grpc-params :as request}] + (let [resp-chan (:grpc-out request)] + (go + (dotimes [_ 3] + (>! resp-chan {:message (str "Hello, " name)})) + (async/close! resp-chan)) + {:status 200 + :body resp-chan})) + (SayHelloOnDemand + [this {:keys [grpc-params] :as request}] + (let [out-chan (:grpc-out request)] + (go-loop [name (:name (! out-chan {:message (str "Hello, " name)}) + (recur (:name (tablesyntax {:rpc-metadata greeter/rpc-metadata + :interceptors interceptors + :callback-context (Greeter.)})) -(def service (let [service-params {:env :prod - ::pedestal/routes (into #{} routes) - ::pedestal/type protojure.pedestal/config - ::pedestal/chain-provider protojure.pedestal/provider}] - (:io.pedestal.http/service-fn (io.pedestal.http/create-servlet service-params)))) +(defn- grpc-connect + ([] (grpc-connect (:port @test-env))) + ([port] + @(grpc.http2/connect {:uri (str "http://localhost:" port) :content-coding "gzip"}))) -(deftest grpc-web-text-check - (testing "Check that a round-trip GRPC request works" - (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] - (is - (= - (with-out-str (pr (example/pb->Money input-msg))) - (:body (response-for - service - :get "/" - :headers {"Content-Type" "application/grpc-web-text"} - :body (clojure.java.io/input-stream (b64/encode input-msg))))))))) +;;----------------------------------------------------------------------------- +;; Fixtures +;;----------------------------------------------------------------------------- +(defn create-service [] + (let [port (test.utils/get-free-port) + interceptors [(body-params/body-params) + pedestal/html-body] + server-params {:env :prod + ::pedestal/routes (into #{} (greeter-mock-routes interceptors)) + ::pedestal/port port -(deftest grpc-web-check - (testing "Check that a round-trip GRPC request works" - (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] - (is - (= - (with-out-str (pr (example/pb->Money input-msg))) - (:body (response-for - service - :get "/" - :headers {"Content-Type" "application/grpc-web"} - :body (clojure.java.io/input-stream input-msg)))))))) + ::pedestal/type protojure.pedestal/config + ::pedestal/chain-provider protojure.pedestal/provider} + client-params {:port port}] -(deftest grpc-web-proto-check - (testing "Check that a round-trip GRPC request works" - (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] - (is - (= - (with-out-str (pr (example/pb->Money input-msg))) - (:body (response-for - service - :get "/" - :headers {"Content-Type" "application/grpc-web+proto"} - :body (clojure.java.io/input-stream input-msg)))))))) + (let [server (test.utils/start-pedestal-server server-params) + client @(jetty-client/connect client-params) + grpc-client (grpc-connect port)] + (swap! test-env assoc :port port :server server :client client :grpc-client grpc-client)))) -(deftest grpc-web-text-proto-check - (testing "Check that a round-trip GRPC request works" - (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] - (is - (= - (with-out-str (pr (example/pb->Money input-msg))) - (:body (response-for - service - :get "/" - :headers {"Content-Type" "application/grpc-web-text+proto"} - :body (clojure.java.io/input-stream (b64/encode input-msg))))))))) +(defn destroy-service [] + (swap! test-env update :grpc-client grpc-api/disconnect) + (swap! test-env update :client jetty-client/disconnect) + (swap! test-env update :server pedestal/stop)) -(deftest grpc-web-no-header-match-check - (testing "Check that a round-trip GRPC request works" - (let [input-msg (pb/->pb (example/new-Money {:currency_code (apply str (repeat 20 "foobar")) :units 42 :nanos 750000000}))] - (is - (= - (with-out-str (pr (example/pb->Money input-msg))) - (:body (response-for - service - :get "/" - :headers {"Content-Type" "application/grpc"} - :body (clojure.java.io/input-stream input-msg)))))))) +(defn wrap-service [test-fn] + (create-service) + (test-fn) + (destroy-service)) + +(use-fixtures :once wrap-service) + +(deftest grpc-web-test-check + (let [in (async/chan 10) + out (async/chan 10) + resp-in (async/chan 10) + resp-out (async/chan 10)] + (lpm/encode new-HelloRequest in out {:encoding identity}) + (lpm/decode pb->HelloReply resp-in resp-out {:encoding identity}) + (async/>!! in {:name "World"}) + (async/close! in) + (testing "Check that a round-trip unary grpc-web-text request works" + (let [lpm (async/ (java.util.Base64/getEncoder) + (.encode (byte-array lpm))) + body (-> (java.util.Base64/getDecoder) + (.decode (-> (client/post + (str "http://localhost:" (:port @test-env) "/example.hello.Greeter/SayHello") + {:body b64-encoded + :content-type "application/grpc-web-text" + :accept "application/grpc-web-text"}) + :body)))] + (doseq [b body] + (async/>!! resp-in b)) + (async/close! resp-in) + (is (= "Hello, World" (:message (async/!! test-ch n)) + (async/close! test-ch) + (let [result (async/ Date: Tue, 24 Nov 2020 18:06:56 -0500 Subject: [PATCH 2/2] Add expected trailer behavior for grpc-web, grpc-web-text responses Reduced test coverage because of fluctuating CI coverage reporting Signed-off-by: Matt Rkiouak --- Makefile | 2 +- src/protojure/pedestal/interceptors/grpc.clj | 5 +- .../pedestal/interceptors/grpc_web.clj | 208 ++++++++++++++++-- src/protojure/pedestal/routes.clj | 2 +- test/protojure/grpc_web_test.clj | 79 ++++++- 5 files changed, 260 insertions(+), 36 deletions(-) diff --git a/Makefile b/Makefile index e5dd228..3976eda 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ OUTPUT = target/$(NAME)-$(VERSION).jar POM = target/pom.xml DOC = target/doc/index.html -COVERAGE_THRESHOLD = 83 +COVERAGE_THRESHOLD = 81 COVERAGE_EXCLUSION += "user" COVERAGE_EXCLUSION += "protojure.internal.pedestal.io" diff --git a/src/protojure/pedestal/interceptors/grpc.clj b/src/protojure/pedestal/interceptors/grpc.clj index 3880b32..6b94bec 100644 --- a/src/protojure/pedestal/interceptors/grpc.clj +++ b/src/protojure/pedestal/interceptors/grpc.clj @@ -117,8 +117,9 @@ assoc :headers {"Content-Type" "application/grpc+proto"} :status 200 - :body "" - :trailers (generate-trailers {:grpc-status status :grpc-message msg}))) + :body (async/close! (async/chan 1)) + :trailers (generate-trailers {:grpc-status status :grpc-message msg}) + :grpc-error true)) (def error-interceptor (err/error-dispatch diff --git a/src/protojure/pedestal/interceptors/grpc_web.clj b/src/protojure/pedestal/interceptors/grpc_web.clj index 068f8af..e3ba67f 100644 --- a/src/protojure/pedestal/interceptors/grpc_web.clj +++ b/src/protojure/pedestal/interceptors/grpc_web.clj @@ -6,8 +6,12 @@ "A [Pedestal](http://pedestal.io/) [interceptor](http://pedestal.io/reference/interceptors) for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol" (:require [io.pedestal.interceptor :refer [->Interceptor]] [clojure.core.async :as async] - [clojure.data]) - (:refer-clojure :exclude [proxy])) + [clojure.data] + [promesa.core :as p] + [clojure.tools.logging :as log]) + (:refer-clojure :exclude [proxy]) + (:import (org.apache.commons.codec.binary Base64InputStream Base64OutputStream) + (java.io PipedOutputStream PipedInputStream))) (set! *warn-on-reflection* true) @@ -29,39 +33,182 @@ [{:keys [body-ch] :as request}] (let [dec-ch (async/chan 4056) decoder (java.util.Base64/getDecoder)] - (async/go-loop [[final encoded] (async/! dec-ch b)) + (catch Exception e + (async/close! dec-ch) + (resolve e))) + (recur (async/ (assoc request :body-ch dec-ch) + (assoc :b64-decode-error-promise b64-decode-error-promise))))) + +(defn- num->bytes + "Serializes an integer to a byte-array." + [num] + (byte-array (for [i (range 4)] + (-> (unsigned-bit-shift-right num + (* 8 (- 4 i 1))) + (bit-and 0x0FF))))) + +(defn- make-grpc-web-trailers-string [trailers] + (reduce (fn [s [k v]] + (str s k ":" v "\r\n")) "" trailers)) + +(defn- make-grpc-web-trailers-frame [trailers] + "This is the lightly documented handling of trailers from + https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md#protocol-differences-vs-grpc-over-http2 + See section beginning `Message framing (vs. http2-transport-mapping`" + (let [trailer-bytes (.getBytes ^String (make-grpc-web-trailers-string trailers))] + (byte-array + (concat + [0x80] + (into [] (num->bytes (count trailer-bytes))) + (into [] trailer-bytes))))) + +(defn- generate-trailers + [b64-ex] + (let [b64-ex (bean b64-ex) + {:keys [grpc-status grpc-message]} (cond (= + (:message b64-ex) + "Input byte[] should at least have 2 bytes for base64 bytes") + {:grpc-status 3 :grpc-message "Bad Base64 Encoded Request"} + true {:grpc-status 13 :grpc-message "Internal Error"})] + (-> {"grpc-status" grpc-status} + (cond-> (some? grpc-message) (assoc "grpc-message" grpc-message))))) + +(defmulti encode-web-body "Writes trailers to body per grpc-web specification" + (fn [x] (type (-> x :response :body)))) + +(defmethod encode-web-body clojure.core.async.impl.channels.ManyToManyChannel + [{{:keys [body trailers]} :response :as ctx}] + (let [body-w-trailers (async/chan 256)] + (async/go-loop [s (async/! body-w-trailers ^bytes frame) + (async/close! body-w-trailers)) (do - (doseq [b (.decode decoder (byte-array encoded))] - (async/>! dec-ch b)) - (recur (async/! body-w-trailers ^bytes s) + (recur (async/ (assoc-in ctx [:response :body] body-w-trailers) + (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web+proto"}))))) + +(defmethod encode-web-body nil + [{{:keys [trailers] :as response} :response :as ctx}] + ;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g., + ;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text + ;; case + (let [body-w-trailers (async/chan 256)] + ;;Write trailer frame + (async/go [] + (let [frame (make-grpc-web-trailers-frame trailers)] + (async/>! body-w-trailers ^bytes frame)) + (async/close! body-w-trailers)) + (-> (assoc-in ctx [:response :body] body-w-trailers) + (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web+proto"}))))) -(defn- encode-body - "Consumes bytes from the response body channel and base64 encodes the payload" +(defmethod encode-web-body :default [{{:keys [body] :as response} :response :as ctx}] - (let [encoder (java.util.Base64/getEncoder) - out-ch (async/chan 4056)] + (throw (ex-info "grpc-web interceptor encountered an unexpected body type on-leave" + {:causes #{:incompatible-body-value-type} + :body-value-type (type body)}))) + +(defmulti encode-web-text-body "Consumes bytes from the response body and base64 encodes the payload" + (fn [x] (type (-> x :response :body)))) + +(defmethod encode-web-text-body clojure.core.async.impl.channels.ManyToManyChannel + [{{:keys [body trailers]} :response {:keys [b64-decode-error-promise]} :request :as ctx}] + (let [pos (PipedOutputStream.) + pis (PipedInputStream. pos) + ;; N.B. passing a string instead of nil in the last position (the line end) caused no data to send + b64-is (Base64InputStream. pis true -1 nil)] (async/go-loop [s (async/! out-ch (.encode encoder ^bytes s)) + (.write pos ^bytes s) (recur (async/ (assoc-in ctx [:response :body] out-ch) + (-> (assoc-in ctx [:response :body] b64-is) (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"}))))) -(def ^{:no-doc true :const true} content-types +(defmethod encode-web-text-body nil + [{{:keys [trailers] :as response} :response {:keys [b64-decode-error-promise]} :request :as ctx}] + ;;TODO Note blocking on this promise only works because grpc-web only supports server-side streaming -- e.g., + ;; we can count on the request body decode having consumed all bytes prior to responding in the grpc-web-text + ;; case + (let [b64-ex @b64-decode-error-promise + frame (make-grpc-web-trailers-frame (if b64-ex + (generate-trailers b64-ex) + trailers)) + pos (PipedOutputStream.) + pis (PipedInputStream. pos) + b64-is (Base64InputStream. pis true -1 nil)] + ;;Write trailer frame + (.write pos ^bytes frame) + (.flush pos) + (.close pos) + (-> (assoc-in ctx [:response :body] b64-is) + (update-in [:response :headers] #(merge % {"Content-Type" "application/grpc-web-text"}))))) + +(defmethod encode-web-text-body :default + [{{:keys [body] :as response} :response :as ctx}] + (throw (ex-info "grpc-web interceptor encountered an unexpected body type on-leave" + {:causes #{:incompatible-body-value-type} + :body-value-type (type body)}))) + +(def ^{:no-doc true :const true} content-types-text #{"application/grpc-web-text"}) +(def ^{:no-doc true :const true} content-types-web + #{"application/grpc-web" + "application/grpc-web+proto"}) + (defn- web-text? [{{:strs [content-type]} :headers}] - (contains? content-types content-type)) + (contains? content-types-text content-type)) (defn- accept-web-text? [{{{:strs [accept]} :headers} :request}] - (contains? content-types accept)) + (contains? content-types-text accept)) + +(defn- accept-web? + "The grpc-web js bindings currently set the `Accept:` header to \"*/*\" which complicates handling trailers. We + fallback to relying on the content-type to determine a client is likely a browser and requires special response + content-type handling" + [{{{:strs [content-type]} :headers} :request}] + (contains? content-types-web content-type)) (defn- pred-> "Threads 'item' through both the predicate and, when 'pred' evaluates true, 'xform' functions. Else, just returns 'item'" @@ -74,8 +221,8 @@ (defn- leave-handler [{:keys [response] :as ctx}] - ;; TODO "Clarify & implement grpc-web trailer behavior" - (pred-> ctx accept-web-text? encode-body)) + (-> (pred-> ctx accept-web-text? encode-web-text-body) + (pred-> accept-web? encode-web-body))) (defn- exception-handler [ctx e] @@ -84,3 +231,22 @@ (def proxy "Interceptor that provides a transparent proxy for the [GRPC-WEB](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-WEB.md) protocol to standard protojure grpc protocol" (->Interceptor ::proxy enter-handler leave-handler exception-handler)) + +(defn error-leave-handler [{{:keys [grpc-error]} :response :as ctx}] + (if grpc-error + (-> (pred-> ctx accept-web-text? encode-web-text-body) + (pred-> accept-web? encode-web-body)) + ctx)) +;;FIXME when HTTP/3 has a grpc specification +;; since we rely on protojure.pedestal.interceptors.grpc/error-interceptor to form the grpc compliant trailers, +;; we expose this error interceptor (and insert in protojure.pedestal.routes/->tablesyntax prior to +;; interceptors.grpc/error-interceptor) so that this interceptor can check for the grpc-web-text accept content type +;; and encode appropriately when an exception is thrown. +;; Once we have a third grpc specification based on transport, better to fix these abstractions such that we have +;; HTTP1.1/HTTP2/HTTP3 based encoding +(def error-interceptor + "Interceptor that writes grpc exception information in a grpc-web compatible encoding" + (->Interceptor ::grpc-web-error + identity + error-leave-handler + exception-handler)) diff --git a/src/protojure/pedestal/routes.clj b/src/protojure/pedestal/routes.clj index 8717da2..0fe5e61 100644 --- a/src/protojure/pedestal/routes.clj +++ b/src/protojure/pedestal/routes.clj @@ -36,7 +36,7 @@ name (keyword fqs (str method "-handler")) handler (handler name (partial method-fn callback-context))] [(str "/" fqs "/" method) - :post (-> (consv grpc/error-interceptor interceptors) + :post (-> (vec (concat [grpc.web/error-interceptor grpc/error-interceptor] interceptors)) (conj grpc.web/proxy (grpc/route-interceptor rpc) handler)) diff --git a/test/protojure/grpc_web_test.clj b/test/protojure/grpc_web_test.clj index dc329c1..1f6a85f 100644 --- a/test/protojure/grpc_web_test.clj +++ b/test/protojure/grpc_web_test.clj @@ -26,7 +26,18 @@ [protojure.pedestal.routes :as pedestal.routes] [example.hello :refer [new-HelloRequest pb->HelloReply]] [clj-http.client :as client] - [protojure.grpc.codec.lpm :as lpm])) + [protojure.grpc.codec.lpm :as lpm] + [protojure.test.grpc.TestService.server :as test.server] + [protojure.pedestal.interceptors.grpc-web :as grpc.web] + [taoensso.timbre.appenders.core :as appenders] + [taoensso.timbre :as log] + [taoensso.timbre.tools.logging :refer [use-timbre]])) + +(use-timbre) + +(log/set-config! {:level :trace + :ns-whitelist ["protojure.*"] + :appenders {:println (appenders/println-appender {:stream :auto})}}) (defonce test-env (atom {})) @@ -72,6 +83,21 @@ :interceptors interceptors :callback-context (Greeter.)})) +;;----------------------------------------------------------------------------- +;; TestService service endpoint +;;----------------------------------------------------------------------------- + +(deftype TestService [] + test.server/Service + (Metadata + [_ request] + (throw (Exception. "foobar")))) + +(defn- test-service-mock-routes [interceptors] + (pedestal.routes/->tablesyntax {:rpc-metadata test.server/rpc-metadata + :interceptors interceptors + :callback-context (TestService.)})) + (defn- grpc-connect ([] (grpc-connect (:port @test-env))) ([port] @@ -85,7 +111,9 @@ interceptors [(body-params/body-params) pedestal/html-body] server-params {:env :prod - ::pedestal/routes (into #{} (greeter-mock-routes interceptors)) + ::pedestal/routes (into #{} (concat + (greeter-mock-routes interceptors) + (test-service-mock-routes interceptors))) ::pedestal/port port ::pedestal/type protojure.pedestal/config @@ -122,14 +150,43 @@ (let [lpm (async/ (java.util.Base64/getEncoder) (.encode (byte-array lpm))) - body (-> (java.util.Base64/getDecoder) - (.decode (-> (client/post - (str "http://localhost:" (:port @test-env) "/example.hello.Greeter/SayHello") - {:body b64-encoded - :content-type "application/grpc-web-text" - :accept "application/grpc-web-text"}) - :body)))] - (doseq [b body] + body (-> (client/post + (str "http://localhost:" (:port @test-env) "/example.hello.Greeter/SayHello") + {:body b64-encoded + :content-type "application/grpc-web-text" + :accept "application/grpc-web-text"}) + :body) + decoded-body (.decode (java.util.Base64/getDecoder) body)] + (doseq [b (into [] decoded-body)] (async/>!! resp-in b)) (async/close! resp-in) - (is (= "Hello, World" (:message (async/