diff --git a/README.md b/README.md index 8bed08a..e799e8e 100644 --- a/README.md +++ b/README.md @@ -253,6 +253,15 @@ Allow Microphone access when prompted. ,) ``` +You can run the same flow against the local WebGPU microservice (see +`webgpu-local-pipeline/`) by passing `:pipeline/provider :webgpu` and, if +necessary, a custom `:webgpu/base-url`: + +```clojure +(make-local-flow {:pipeline/provider :webgpu + :webgpu/base-url "http://localhost:4173"}) +``` + Which roughly translates to: ![Flow Diagram](./resources/flow.png) diff --git a/bin/chat b/bin/chat index 9c9dcce..e56fcf5 100755 --- a/bin/chat +++ b/bin/chat @@ -27,7 +27,7 @@ fi if [ ! -f "resources/secrets.edn" ]; then print_color $RED "❌ Error: resources/secrets.edn not found" print_color $YELLOW "Please create resources/secrets.edn with your OpenAI API key:" - echo '{:openai {:new-api-sk "your-openai-api-key-here"}}' + echo '{:openai {:api-key "your-openai-api-key-here"}}' exit 1 fi @@ -45,6 +45,10 @@ while [[ $# -gt 0 ]]; do ARGS="$ARGS --scenario" shift ;; + --webgpu) + ARGS="$ARGS --webgpu" + shift + ;; --help|-h) HELP=true shift @@ -95,6 +99,10 @@ if [[ "$ARGS" == *"--scenario"* ]]; then print_color $YELLOW "🎭 Scenario mode enabled" fi +if [[ "$ARGS" == *"--webgpu"* ]]; then + print_color $YELLOW "🧠 Using local WebGPU service" +fi + print_color $GREEN "πŸ”§ Loading Clojure dependencies..." # Run the chat demo diff --git a/examples/src/simulflow_examples/local.clj b/examples/src/simulflow_examples/local.clj index e239674..1698e09 100644 --- a/examples/src/simulflow_examples/local.clj +++ b/examples/src/simulflow_examples/local.clj @@ -15,6 +15,7 @@ [simulflow.transport.out :as transport-out] [simulflow.utils.core :as u] [simulflow.vad.silero :as silero] + [simulflow-examples.webgpu-processors :as webgpu] [taoensso.telemere :as t])) (t/set-min-level! :debug) @@ -28,117 +29,145 @@ these are the defaults. 
See each process for details " ([] (make-local-flow {})) - ([{:keys [llm-context extra-procs extra-conns debug? - language chunk-duration-ms] - :or {llm-context {:messages - [{:role "system" - :content "You are a voice agent operating via phone. Be + ([opts] + (let [{:keys [llm-context extra-procs extra-conns debug? + language chunk-duration-ms] + :or {llm-context {:messages + [{:role "system" + :content "You are a voice agent operating via phone. Be concise in your answers. The input you receive comes from a speech-to-text (transcription) system that isn't always efficient and may send unclear text. Ask for clarification when you're unsure what the person said."}] - :tools - [{:type :function - :function - {:name "get_weather" - :handler (fn [{:keys [town]}] (str "The weather in " town " is 17 degrees celsius")) - :description "Get the current weather of a location" - :parameters {:type :object - :required [:town] - :properties {:town {:type :string - :description "Town for which to retrieve the current weather"}} - :additionalProperties false} - :strict true}}]} - - language :en - - debug? false - chunk-duration-ms 20 - extra-procs {} - extra-conns []}}] - - (flow/create-flow - {:procs - (u/deep-merge - {;; Capture audio from microphone and send raw-audio-input frames further in the pipeline - :transport-in {:proc transport-in/microphone-transport-in - :args {:vad/analyser :vad.analyser/silero}} - ;; raw-audio-input -> transcription frames - :transcriptor {:proc deepgram/deepgram-processor - :args {:transcription/api-key (secret [:deepgram :api-key]) - :transcription/interim-results? true - :transcription/punctuate? false - :transcription/vad-events? false - :transcription/smart-format? 
true - :transcription/model :nova-2 - :transcription/utterance-end-ms 1000 - :transcription/language language}} - - ;; user transcription & llm message frames -> llm-context frames - ;; responsible for keeping the full conversation history - :context-aggregator {:proc context/context-aggregator - :args {:llm/context llm-context - :aggregator/debug? debug?}} - - ;; Takes llm-context frames and produces new llm-text-chunk & llm-tool-call-chunk frames - :llm {:proc openai/openai-llm-process - :args {:openai/api-key (secret [:openai :new-api-sk]) - :llm/model :gpt-4.1-mini}} - - ;; llm-text-chunk & llm-tool-call-chunk -> llm-context-messages-append frames - :assistant-context-assembler {:proc context/assistant-context-assembler - :args {:debug? debug?}} - - ;; llm-text-chunk -> sentence speak frames (faster for text to speech) - :llm-sentence-assembler {:proc context/llm-sentence-assembler} - - ;; speak-frames -> audio-output-raw frames - :tts {:proc xi/elevenlabs-tts-process - :args {:elevenlabs/api-key (secret [:elevenlabs :api-key]) - :elevenlabs/model-id "eleven_flash_v2_5" - :elevenlabs/voice-id (secret [:elevenlabs :voice-id]) - :voice/stability 0.5 - :voice/similarity-boost 0.8 - :voice/use-speaker-boost? 
true - :pipeline/language language}} - - ;; audio-output-raw -> smaller audio-output-raw frames (used for sending audio in realtime) - :audio-splitter {:proc transport/audio-splitter - :args {:audio.out/duration-ms chunk-duration-ms}} - - ;; speakers out - :transport-out {:proc transport-out/realtime-speakers-out-processor - :args {:audio.out/sending-interval chunk-duration-ms - :audio.out/duration-ms chunk-duration-ms}} - - :activity-monitor {:proc activity-monitor/process - :args {::activity-monitor/timeout-ms 5000}}} - extra-procs) - :conns (concat - [[[:transport-in :out] [:transcriptor :in]] - - [[:transcriptor :out] [:context-aggregator :in]] - [[:transport-in :sys-out] [:context-aggregator :sys-in]] - [[:context-aggregator :out] [:llm :in]] - - ;; Aggregate full context - [[:llm :out] [:assistant-context-assembler :in]] - [[:assistant-context-assembler :out] [:context-aggregator :in]] - - ;; Assemble sentence by sentence for fast speech - [[:llm :out] [:llm-sentence-assembler :in]] - [[:llm-sentence-assembler :sys-out] [:tts :sys-in]] - - [[:tts :out] [:audio-splitter :in]] - [[:audio-splitter :out] [:transport-out :in]] - - ;; Activity detection - [[:transport-out :sys-out] [:activity-monitor :sys-in]] - [[:transport-in :sys-out] [:activity-monitor :sys-in]] - [[:transcriptor :sys-out] [:activity-monitor :sys-in]] - [[:activity-monitor :out] [:context-aggregator :in]] - [[:activity-monitor :out] [:tts :in]]] - extra-conns)}))) + :tools + [{:type :function + :function + {:name "get_weather" + :handler (fn [{:keys [town]}] (str "The weather in " town " is 17 degrees celsius")) + :description "Get the current weather of a location" + :parameters {:type :object + :required [:town] + :properties {:town {:type :string + :description "Town for which to retrieve the current weather"}} + :additionalProperties false} + :strict true}}]} + + language :en + + debug? 
false + chunk-duration-ms 20 + extra-procs {} + extra-conns []}} opts + provider (keyword (or (:pipeline/provider opts) :cloud)) + base-url (:webgpu/base-url opts) + stt (:webgpu/stt opts) + llm (:webgpu/llm opts) + tts (:webgpu/tts opts) + webgpu-base (or base-url (System/getenv "WEBGPU_SERVICE_URL") "http://localhost:4173") + webgpu-common {:service/base-url webgpu-base} + stt-args (merge webgpu-common {:audio/sample-rate 16000} stt) + llm-args (merge webgpu-common llm) + tts-args (merge webgpu-common tts) + transcriptor (if (= provider :webgpu) + {:proc webgpu/speech-stt-process + :args stt-args} + {:proc deepgram/deepgram-processor + :args {:transcription/api-key (secret [:deepgram :api-key]) + :transcription/interim-results? true + :transcription/punctuate? false + :transcription/vad-events? false + :transcription/smart-format? true + :transcription/model :nova-2 + :transcription/utterance-end-ms 1000 + :transcription/language language}}) + llm-proc (if (= provider :webgpu) + {:proc webgpu/text-llm-process + :args llm-args} + {:proc openai/openai-llm-process + :args {:openai/api-key (secret [:openai :api-key]) + :llm/model :gpt-4.1-mini}}) + tts-proc (if (= provider :webgpu) + {:proc webgpu/speech-tts-process + :args (merge {:service/path "/tts"} tts-args)} + {:proc xi/elevenlabs-tts-process + :args {:elevenlabs/api-key (secret [:elevenlabs :api-key]) + :elevenlabs/model-id "eleven_flash_v2_5" + :elevenlabs/voice-id (secret [:elevenlabs :voice-id]) + :voice/stability 0.5 + :voice/similarity-boost 0.8 + :voice/use-speaker-boost? true + :pipeline/language language}})] + + (when (= provider :webgpu) + (t/log! 
{:level :info :id :simulflow.local} + (str "Using WebGPU service " webgpu-base " for STT/LLM/TTS"))) + + (flow/create-flow + {:procs + (u/deep-merge + {;; Capture audio from microphone and send raw-audio-input frames further in the pipeline + :transport-in {:proc transport-in/microphone-transport-in + :args {:vad/analyser :vad.analyser/silero}} + ;; raw-audio-input -> transcription frames + :transcriptor transcriptor + + ;; user transcription & llm message frames -> llm-context frames + ;; responsible for keeping the full conversation history + :context-aggregator {:proc context/context-aggregator + :args {:llm/context llm-context + :aggregator/debug? debug?}} + + ;; Takes llm-context frames and produces new llm-text-chunk & llm-tool-call-chunk frames + :llm llm-proc + + ;; llm-text-chunk & llm-tool-call-chunk -> llm-context-messages-append frames + :assistant-context-assembler {:proc context/assistant-context-assembler + :args {:debug? debug?}} + + ;; llm-text-chunk -> sentence speak frames (faster for text to speech) + :llm-sentence-assembler {:proc context/llm-sentence-assembler} + + ;; speak-frames -> audio-output-raw frames + :tts tts-proc + + ;; audio-output-raw -> smaller audio-output-raw frames (used for sending audio in realtime) + :audio-splitter {:proc transport/audio-splitter + :args {:audio.out/duration-ms chunk-duration-ms}} + + ;; speakers out + :transport-out {:proc transport-out/realtime-speakers-out-processor + :args {:audio.out/sending-interval chunk-duration-ms + :audio.out/duration-ms chunk-duration-ms}} + + :activity-monitor {:proc activity-monitor/process + :args {::activity-monitor/timeout-ms 5000}}} + extra-procs) + :conns (concat + [[[:transport-in :out] [:transcriptor :in]] + + [[:transcriptor :out] [:context-aggregator :in]] + [[:transport-in :sys-out] [:context-aggregator :sys-in]] + [[:context-aggregator :out] [:llm :in]] + + ;; Aggregate full context + [[:llm :out] [:assistant-context-assembler :in]] + [[:assistant-context-assembler 
:out] [:context-aggregator :in]] + + ;; Assemble sentence by sentence for fast speech + [[:llm :out] [:llm-sentence-assembler :in]] + [[:llm-sentence-assembler :sys-out] [:tts :sys-in]] + + [[:tts :out] [:audio-splitter :in]] + [[:audio-splitter :out] [:transport-out :in]] + + ;; Activity detection + [[:transport-out :sys-out] [:activity-monitor :sys-in]] + [[:transport-in :sys-out] [:activity-monitor :sys-in]] + [[:transcriptor :sys-out] [:activity-monitor :sys-in]] + [[:activity-monitor :out] [:context-aggregator :in]] + [[:activity-monitor :out] [:tts :in]]] + extra-conns)}))) + ) (comment diff --git a/examples/src/simulflow_examples/text_chat.clj b/examples/src/simulflow_examples/text_chat.clj index 3834626..29dacab 100644 --- a/examples/src/simulflow_examples/text_chat.clj +++ b/examples/src/simulflow_examples/text_chat.clj @@ -10,6 +10,8 @@ [simulflow.transport.text-in :as text-in] [simulflow.transport.text-out :as text-out] [simulflow.utils.core :as u] + [simulflow-examples.webgpu-processors :as webgpu] + [simulflow-examples.scenario-example :as scenario-example] [taoensso.telemere :as t])) ;; Set log level to reduce noise during chat @@ -28,76 +30,91 @@ For a convenient CLI version, use: bin/chat " ([] (text-chat-flow-config {})) - ([{:keys [llm/context debug? extra-procs extra-conns] - :or {context {:messages - [{:role "system" - :content "You are a helpful AI assistant. Be concise and conversational. 
- You are communicating through text chat."}] - :tools - [{:type :function - :function - {:name "get_weather" - :handler (fn [{:keys [town]}] - (str "The weather in " town " is 17 degrees celsius")) - :description "Get the current weather of a location" - :parameters {:type :object - :required [:town] - :properties {:town {:type :string - :description "Town for which to retrieve the current weather"}} - :additionalProperties false} - :strict true}} - {:type :function - :function - {:name "quit_chat" - :handler (fn [_] - (println "\nGoodbye!") - (System/exit 0)) - :description "Quit the chat session" - :parameters {:type :object - :properties {} - :additionalProperties false} - :strict true}}]} - debug? false}}] - - {:procs - (u/deep-merge - {;; Read from stdin and emit user-speech-start/transcription/user-speech-stop sequence - :text-input {:proc text-in/text-input-process - :args {}} - ;; Handle conversation context and user speech aggregation - :context-aggregator {:proc context/context-aggregator - :args {:llm/context context - :aggregator/debug? debug?}} - ;; Generate LLM responses with streaming - :llm {:proc openai/openai-llm-process - :args {:openai/api-key (secret [:openai :api-key]) - :llm/model :gpt-4.1-mini}} - ;; Handle assistant message assembly for context - :assistant-context-assembler {:proc context/assistant-context-assembler - :args {:debug? debug?}} - ;; Stream LLM output and manage prompts - :text-output {:proc text-out/text-output-process - :args {:text-out/response-prefix "Assistant: " - :text-out/response-suffix "" - :text-out/show-thinking debug? 
- :text-out/user-prompt "You: " - :text-out/manage-prompts true}}} - extra-procs) - :conns (concat - [;; Main conversation flow - [[:text-input :out] [:context-aggregator :in]] - [[:context-aggregator :out] [:llm :in]] - ;; Stream LLM responses to output for clean formatting - [[:llm :out] [:text-output :in]] - ;; Assemble assistant context for conversation history - [[:llm :out] [:assistant-context-assembler :in]] - [[:assistant-context-assembler :out] [:context-aggregator :in]] - ;; System frame routing for input blocking/unblocking - ;; LLM emits llm-full-response-start/end frames to :out, which are system frames - [[:llm :out] [:text-input :sys-in]] - ;; System frames for lifecycle management - [[:text-input :sys-out] [:context-aggregator :sys-in]]] - extra-conns)})) + ([config] + (let [default-context {:messages + [{:role "system" + :content "You are a helpful AI assistant. Be concise and conversational. + You are communicating through text chat."}] + :tools + [{:type :function + :function + {:name "get_weather" + :handler (fn [{:keys [town]}] + (str "The weather in " town " is 17 degrees celsius")) + :description "Get the current weather of a location" + :parameters {:type :object + :required [:town] + :properties {:town {:type :string + :description "Town for which to retrieve the current weather"}} + :additionalProperties false} + :strict true}} + {:type :function + :function + {:name "quit_chat" + :handler (fn [_] + (println "\nGoodbye!") + (System/exit 0)) + :description "Quit the chat session" + :parameters {:type :object + :properties {} + :additionalProperties false} + :strict true}}]} + llm-context (or (:llm/context config) default-context) + debug? (boolean (:debug? 
config)) + provider (keyword (or (:llm/provider config) :openai)) + extra-procs (or (:extra-procs config) {}) + extra-conns (or (:extra-conns config) []) + webgpu-args (-> {} + (cond-> (:webgpu/base-url config) + (assoc :service/base-url (:webgpu/base-url config))) + (cond-> (:webgpu/path config) + (assoc :service/path (:webgpu/path config))) + (cond-> (:webgpu/options config) + (assoc :service/options (:webgpu/options config))))] + + {:procs + (u/deep-merge + {;; Text input transport + :text-input {:proc text-in/text-input-process + :args {:text-in/manage-prompts true + :text-in/prompt "You: "}} + ;; Context aggregator - handles core conversation management + :context-aggregator {:proc context/context-aggregator + :args {:llm/context llm-context + :aggregator/debug? debug?}} + ;; LLM processor - streams responses and handles tool calls + :llm (if (= provider :webgpu) + {:proc webgpu/text-llm-process + :args webgpu-args} + {:proc openai/openai-llm-process + :args {:openai/api-key (secret [:openai :api-key]) + :llm/model :gpt-4.1-mini}}) + ;; Handle assistant message assembly for context + :assistant-context-assembler {:proc context/assistant-context-assembler + :args {:debug? debug?}} + ;; Stream LLM output and manage prompts + :text-output {:proc text-out/text-output-process + :args {:text-out/response-prefix "Assistant: " + :text-out/response-suffix "" + :text-out/show-thinking debug? 
+ :text-out/user-prompt "You: " + :text-out/manage-prompts true}}} + extra-procs) + :conns (concat + [;; Main conversation flow + [[:text-input :out] [:context-aggregator :in]] + [[:context-aggregator :out] [:llm :in]] + ;; Stream LLM responses to output for clean formatting + [[:llm :out] [:text-output :in]] + ;; Assemble assistant context for conversation history + [[:llm :out] [:assistant-context-assembler :in]] + [[:assistant-context-assembler :out] [:context-aggregator :in]] + ;; System frame routing for input blocking/unblocking + ;; LLM emits llm-full-response-start/end frames to :out, which are system frames + [[:llm :out] [:text-input :sys-in]] + ;; System frames for lifecycle management + [[:text-input :sys-out] [:context-aggregator :sys-in]]] + extra-conns)}))) (defn scenario-example "A scenario is a predefined, highly structured conversation. LLM performance @@ -148,6 +165,11 @@ (println "⏸️ Input is blocked while assistant responds") (println "πŸšͺ Ask to 'quit chat' to exit") (println "πŸ”— CLI version available at: bin/chat") + (when (= :webgpu (:llm/provider config)) + (println (str "🧠 LLM provider: WebGPU service " + (or (:webgpu/base-url config) + (System/getenv "WEBGPU_SERVICE_URL") + "http://localhost:4173")))) (println (apply str (repeat 50 "="))) ;; Create flow based on scenario choice (let [{:keys [flow scenario]} (if scenario? @@ -172,9 +194,14 @@ "Main entry point for text chat demo" [& args] (let [debug? (some #(= % "--debug") args) + scenario? (some #(= % "--scenario") args) + webgpu? (some #(= % "--webgpu") args) config (cond-> {} debug? (assoc :debug? true) - (some #(= % "--scenario") args) (assoc :scenario? true))] + scenario? (assoc :scenario? true) + webgpu? (assoc :llm/provider :webgpu) + (and webgpu? (System/getenv "WEBGPU_SERVICE_URL")) + (assoc :webgpu/base-url (System/getenv "WEBGPU_SERVICE_URL")))] (when debug? (t/set-min-level! :debug)) (start-text-chat! 
config)))
diff --git a/examples/src/simulflow_examples/webgpu_processors.clj b/examples/src/simulflow_examples/webgpu_processors.clj
new file mode 100644
index 0000000..e703b40
--- /dev/null
+++ b/examples/src/simulflow_examples/webgpu_processors.clj
@@ -0,0 +1,422 @@
+(ns simulflow-examples.webgpu-processors
+  "Processors that proxy Simulflow frames to the local WebGPU microservice."
+  (:require
+   [clojure.core.async.flow :as flow]
+   [clojure.string :as str]
+   [hato.client :as http]
+   [simulflow.frame :as frame]
+   [taoensso.telemere :as t])
+  (:import
+   (java.io ByteArrayOutputStream)
+   (java.nio ByteBuffer ByteOrder)
+   (java.util Base64)))
+
+;; Service root used when a processor is given no :service/base-url.
+;; Resolved once, when this namespace is loaded.
+(def ^:private default-base-url
+  (or (System/getenv "WEBGPU_SERVICE_URL") "http://localhost:4173"))
+
+(defn- maybe->str
+  "Coerce message content to a string. Strings pass through, nil becomes the
+  empty string, sequential values are coerced element by element and joined
+  with single spaces, and anything else goes through `str`."
+  [x]
+  (cond
+    (string? x) x
+    (nil? x) ""
+    (sequential? x) (->> x (map maybe->str) (str/join " "))
+    :else (str x)))
+
+(defn- normalize-messages
+  "Reduce every message to a map with a string :role and a string :content.
+  The role is read from :role, then from :role under :frame/data, and falls
+  back to the string assistant."
+  [messages]
+  (mapv (fn [m]
+          {:role (-> (:role m)
+                     (or (:role (:frame/data m)))
+                     (or "assistant")
+                     name)
+           :content (maybe->str (:content m))})
+        messages))
+
+(defn- http-json-post
+  "POST `body` as JSON to `base` + `path` and return the hato response map.
+
+  hato builds a JSON request body from :form-params when :content-type is
+  :json, and its switch for not throwing on error statuses is spelled
+  :throw-exceptions? (with the question mark). With that switch off, an
+  error status is returned as an ordinary response, so callers can read
+  :status and :body from it."
+  [base path body]
+  (http/post (str base path)
+             {:form-params body
+              :content-type :json
+              :accept :json
+              :as :json
+              :coerce :always
+              :throw-exceptions? false}))
+
+;; ---------------------------------------------------------------------------
+;; LLM Processor (/chat)
+;; ---------------------------------------------------------------------------
+
+(defn webgpu-llm-describe
+  "Ports and parameters of the WebGPU LLM process."
+  []
+  {:ins {:in "LLM context frames"
+         :sys-in "System frames"}
+   :outs {:out "Streaming response frames"
+          :sys-out "System response frames"}
+   :params {:service/base-url "Base URL for the WebGPU service"
+            :service/path "Relative path for chat endpoint"
+            :service/options "Optional request map forwarded to the service"}})
+
+(defn webgpu-llm-init!
+  "Initial LLM process state: the defaults below overlaid with `params`."
+  [params]
+  (merge {:service/base-url default-base-url
+          :service/path "/chat"
+          :service/options {}}
+         params))
+
+(defn- service-output-text
+  "Extract the generated text from a chat response body. Tries :output and
+  :text, then the string keys output and text, and returns the empty string
+  when none of them is present."
+  [body]
+  (or (:output body)
+      (:text body)
+      (get body "output")
+      (get body "text")
+      ""))
+
+(defn webgpu-llm-transform
+  "Transform step of the LLM process; returns a [state outputs] pair.
+
+  Interrupt start/stop frames only toggle :pipeline/interrupted? in state.
+  An llm-context frame is posted to the service with a blocking HTTP call
+  and answered with a response-start frame, a single text chunk holding the
+  whole reply, and a response-end frame. When the request fails or the reply
+  text is blank, a bracketed error marker (with the HTTP status when known)
+  is sent as the chunk instead. Any other frame is ignored."
+  [{:service/keys [base-url path options] :as state} in msg]
+  (cond
+    (frame/control-interrupt-start? msg)
+    [(assoc state :pipeline/interrupted? true) {}]
+
+    (frame/control-interrupt-stop? msg)
+    [(assoc state :pipeline/interrupted? false) {}]
+
+    (frame/llm-context? msg)
+    (let [context (:frame/data msg)
+          payload {:messages (normalize-messages (:messages context))
+                   :options options}
+          response (try
+                     (http-json-post base-url path payload)
+                     (catch Exception e
+                       (t/log! {:level :error :id :webgpu-llm :error e}
+                               "Failed to reach WebGPU LLM service")
+                       nil))
+          status (:status response)
+          body (:body response)
+          text (service-output-text body)
+          text (if (str/blank? text)
+                 (str "[WebGPU LLM error"
+                      (when status (str " (status " status ")"))
+                      "]")
+                 text)]
+      [(assoc state :last-response text)
+       (frame/send (frame/llm-full-response-start true)
+                   (frame/llm-text-chunk text)
+                   (frame/llm-full-response-end true))])
+
+    :else
+    [state {}]))
+
+(defn webgpu-llm-fn
+  "Step function handed to `flow/process`: the 0-arity describes the process,
+  the 1-arity initialises state, the 2-arity returns state unchanged on
+  transitions, and the 3-arity transforms a message."
+  ([] (webgpu-llm-describe))
+  ([params] (webgpu-llm-init! params))
+  ([state transition] state)
+  ([state in msg] (webgpu-llm-transform state in msg)))
+
+(def text-llm-process
+  "Flow process that proxies LLM requests to the local WebGPU service"
+  (flow/process webgpu-llm-fn))
+
+;; ---------------------------------------------------------------------------
+;; STT Processor (/stt)
+;; ---------------------------------------------------------------------------
+
+(defn webgpu-stt-describe
+  "Ports and parameters of the WebGPU STT process."
+  []
+  {:ins {:in "Audio input frames"
+         :sys-in "System frames"}
+   :outs {:out "Transcription frames"
+          :sys-out "System frames"}
+   :params {:service/base-url "Base URL for the WebGPU service"
+            :service/path "Relative path for stt endpoint"
+            :audio/sample-rate "Sample rate to report to the service"
+            :audio/dtype "Audio dtype encoding (int16 or float32)"}})
+
+(defn webgpu-stt-init!
+  "Initial STT process state: the defaults below, including an empty audio
+  buffer, overlaid with `params`."
+  [params]
+  (merge {:service/base-url default-base-url
+          :service/path "/stt"
+          :audio/sample-rate 16000
+          :audio/dtype "int16"
+          ::buffer (ByteArrayOutputStream.)
+          :speech-active? false}
+         params))
+
+(defn- reset-buffer!
+  "Clear the current audio buffer, if any, and return `state` holding a new
+  empty buffer."
+  [state]
+  (when-let [buf (::buffer state)]
+    (.reset ^ByteArrayOutputStream buf))
+  (assoc state ::buffer (ByteArrayOutputStream.)))
+
+(defn- append-buffer!
+  "Append `bytes` to the audio buffer held in `state`, creating the buffer if
+  it is missing, and return the updated state. A nil `bytes` leaves `state`
+  as it is."
+  [state ^bytes bytes]
+  (if (nil? bytes)
+    state
+    (let [^ByteArrayOutputStream buf (or (::buffer state)
+                                         (ByteArrayOutputStream.))]
+      (.write buf bytes 0 (alength bytes))
+      (assoc state ::buffer buf))))
+
+(defn- encode-audio-bytes
+  "Return the buffered audio as a Base64 string."
+  [^ByteArrayOutputStream buf]
+  (let [bytes (.toByteArray buf)]
+    (.encodeToString (Base64/getEncoder) bytes)))
+
+(defn- stt-response->text
+  "Extract the transcript from an STT response body. Uses :text (or the
+  string key text) when present; otherwise joins the non-blank text of the
+  entries under :chunks (or the string key chunks) with single spaces.
+  Returns nil when the body has none of these."
+  [body]
+  (let [join-chunks (fn [chunks]
+                      (when-let [chunks (seq chunks)]
+                        (->> chunks
+                             (map #(or (:text %) (get % "text")))
+                             (remove str/blank?)
+                             (str/join " "))))]
+    (or (:text body)
+        (get body "text")
+        (join-chunks (:chunks body))
+        (join-chunks (get body "chunks")))))
+
+(defn webgpu-stt-transform
+  "Transform step of the STT process; returns a [state outputs] pair.
+
+  Audio frames are buffered only between a user-speech-start and a
+  user-speech-stop frame. On speech stop the buffered bytes are sent,
+  Base64 encoded, with a blocking HTTP call, and a transcription frame is
+  emitted when the reply carries non-blank text; otherwise a warning is
+  logged. The buffer is reset either way. Interrupt frames only toggle
+  :pipeline/interrupted? in state."
+  [{:service/keys [base-url path]
+    :as state} _ msg]
+  (let [speech-active? (:speech-active? state)
+        sample-rate (:audio/sample-rate state)
+        audio-dtype (:audio/dtype state)]
+    (cond
+      (frame/control-interrupt-start? msg)
+      [(assoc state :pipeline/interrupted? true) {}]
+
+      (frame/control-interrupt-stop? msg)
+      [(assoc state :pipeline/interrupted? false) {}]
+
+      (frame/user-speech-start? msg)
+      [(-> state
+           reset-buffer!
+           (assoc :speech-active? true))
+       {}]
+
+      (frame/audio-input-raw? msg)
+      (if speech-active?
+        ;; assumes :frame/data is the raw audio byte array - TODO confirm
+        [(append-buffer! state (:frame/data msg)) {}]
+        [state {}])
+
+      (frame/user-speech-stop? msg)
+      (let [buf (::buffer state)
+            size (when buf (.size ^ByteArrayOutputStream buf))]
+        (if (pos? (or size 0))
+          (let [payload {:audio {:data (encode-audio-bytes buf)
+                                 :encoding "base64"
+                                 :dtype audio-dtype}
+                         :options {:sampleRate sample-rate}}
+                response (try
+                           (http-json-post base-url path payload)
+                           (catch Exception e
+                             (t/log! {:level :error :id :webgpu-stt :error e}
+                                     "Failed to reach WebGPU STT service")
+                             nil))
+                status (:status response)
+                body (:body response)
+                text (some-> body stt-response->text str/trim)]
+            [(-> state
+                 reset-buffer!
+                 (assoc :speech-active? false))
+             (if (and text (not (str/blank? text)))
+               (frame/send (frame/transcription text))
+               (do
+                 (t/log! {:level :warn :id :webgpu-stt :data {:status status}}
+                         "WebGPU STT returned empty transcription")
+                 {}))])
+          ;; Nothing was buffered: skip the request entirely.
+          [(-> state reset-buffer! (assoc :speech-active? false)) {}]))
+
+      :else
+      [state {}])))
+
+(defn webgpu-stt-fn
+  "Step function handed to `flow/process`: the 0-arity describes the process,
+  the 1-arity initialises state, the 2-arity returns state unchanged on
+  transitions, and the 3-arity transforms a message."
+  ([] (webgpu-stt-describe))
+  ([params] (webgpu-stt-init! params))
+  ([state transition] state)
+  ([state in msg] (webgpu-stt-transform state in msg)))
+
+(def speech-stt-process
+  "Flow process that proxies microphone audio to the local WebGPU STT service"
+  (flow/process webgpu-stt-fn))
+
+;; ---------------------------------------------------------------------------
+;; TTS Processor (/tts)
+;; ---------------------------------------------------------------------------
+
+(defn webgpu-tts-describe
+  "Ports and parameters of the WebGPU TTS process. :sys-in is declared, as
+  it is for the LLM and STT processes, so a flow that connects into
+  [:tts :sys-in] has a declared port; the transform handles a frame the same
+  way whichever port delivered it."
+  []
+  {:ins {:in "Speak frames and system frames"
+         :sys-in "System frames"}
+   :outs {:out "Audio output frames"
+          :sys-out "System frames"}
+   :params {:service/base-url "Base URL for the WebGPU service"
+            :service/path "Relative path for tts endpoint"}})
+
+(defn webgpu-tts-init!
+  "Initial TTS process state: the defaults below overlaid with `params`."
+  [params]
+  (merge {:service/base-url default-base-url
+          :service/path "/tts"}
+         params))
+
+(defn- base64->bytes
+  "Decode a Base64 string to a byte array; nil in, nil out."
+  [s]
+  (when s
+    (.decode (Base64/getDecoder) ^String s)))
+
+(defn- floats->pcm16-bytes
+  "Convert binary float32 little-endian samples into PCM16 little-endian bytes."
+  [^bytes f32-bytes]
+  (when f32-bytes
+    (let [float-buffer (doto (ByteBuffer/wrap f32-bytes)
+                         (.order ByteOrder/LITTLE_ENDIAN))
+          ;; quot drops any trailing bytes that do not form a whole sample
+          sample-count (quot (.remaining float-buffer) Float/BYTES)
+          out-buffer (doto (ByteBuffer/allocate (* sample-count Short/BYTES))
+                       (.order ByteOrder/LITTLE_ENDIAN))]
+      (dotimes [_ sample-count]
+        (let [sample (double (.getFloat float-buffer))
+              ;; clamp to [-1.0, 1.0] so the scaled value fits in a short
+              clamped (min 1.0 (max -1.0 sample))
+              short-val (short (Math/round (* clamped 32767.0)))]
+          (.putShort out-buffer short-val)))
+      (.flip out-buffer)
+      (let [out-bytes (byte-array (.remaining out-buffer))]
+        (.get out-buffer out-bytes)
+        out-bytes))))
+
+(defn- ->audio-bytes
+  "Decode the payload emitted by the WebGPU TTS service.
+
+  Reads the Base64 audio from :data and the sample format from :format,
+  also accepting the string keys data and format. f32 and f32le (the
+  default when no format is given) are converted to PCM16; pcm16, s16le and
+  any unrecognised format are returned as decoded."
+  [{:keys [format data] :as body}]
+  (let [raw (base64->bytes (or data (get body "data")))]
+    (case (or format (get body "format") "f32le")
+      ("f32" "f32le") (floats->pcm16-bytes raw)
+      ("pcm16" "s16le") raw
+      raw)))
+
+(defn webgpu-tts-transform
+  "Transform step of the TTS process; returns a [state outputs] pair.
+
+  Interrupt start/stop frames toggle :pipeline/interrupted? in state. A speak
+  frame received while not interrupted is posted to the service with a
+  blocking HTTP call and, when the reply decodes to a non-empty byte array,
+  answered with one audio-output-raw frame on :out. The sample rate comes
+  from the reply and defaults to 16000. Empty payloads are logged as errors.
+  Any other frame is ignored."
+  [{:service/keys [base-url path] :as state} _ msg]
+  (cond
+    (frame/control-interrupt-start? msg)
+    [(assoc state :pipeline/interrupted? true) {}]
+
+    (frame/control-interrupt-stop? msg)
+    [(assoc state :pipeline/interrupted? false) {}]
+
+    (and (frame/speak-frame? msg) (not (:pipeline/interrupted? state)))
+    (let [text (:frame/data msg)
+          response (try
+                     (http-json-post base-url path {:text text})
+                     (catch Exception e
+                       (t/log! {:level :error :id :webgpu-tts :error e}
+                               "Failed to reach WebGPU TTS service")
+                       nil))
+          status (:status response)
+          body (:body response)
+          sample-rate (or (:samplingRate body)
+                          (:sampleRate body)
+                          (:sampling-rate body)
+                          (get body "samplingRate")
+                          (get body "sampleRate")
+                          16000)
+          audio-bytes (->audio-bytes body)]
+      (if (and audio-bytes (pos? (alength ^bytes audio-bytes)))
+        [state {:out [(frame/audio-output-raw {:audio audio-bytes
+                                               :sample-rate sample-rate})]}]
+        (do
+          (t/log! {:level :error
+                   :id :webgpu-tts
+                   :data {:status status}}
+                  "WebGPU TTS returned empty payload")
+          [state {}])))
+
+    :else
+    [state {}]))
+
+(defn webgpu-tts-fn
+  "Step function handed to `flow/process`: the 0-arity describes the process,
+  the 1-arity initialises state, the 2-arity returns state unchanged on
+  transitions, and the 3-arity transforms a message."
+  ([] (webgpu-tts-describe))
+  ([params] (webgpu-tts-init! params))
+  ([state transition] state)
+  ([state in msg] (webgpu-tts-transform state in msg)))
+
+(def speech-tts-process
+  "Flow process that proxies TTS requests to the local WebGPU service"
+  (flow/process webgpu-tts-fn))