From a03bf01db1c2b9edbe2a9de3fdde9dfd5b1f7066 Mon Sep 17 00:00:00 2001 From: Justin Date: Fri, 23 Jan 2026 11:44:31 -0800 Subject: [PATCH 1/4] Set up payload compression options when using a Kafka sink --- docs/reference/sinks/kafka.mdx | 11 +++ lib/sequin/consumers/kafka_sink.ex | 13 +++- test/sequin/kafka_sink_test.exs | 116 +++++++++++++++++++++++++++++ 3 files changed, 139 insertions(+), 1 deletion(-) diff --git a/docs/reference/sinks/kafka.mdx b/docs/reference/sinks/kafka.mdx index c0ce5829c..c8f076a72 100644 --- a/docs/reference/sinks/kafka.mdx +++ b/docs/reference/sinks/kafka.mdx @@ -36,6 +36,17 @@ The Kafka sink sends messages to a Kafka topic. Enable TLS encryption for connections to Kafka. Currently, Sequin only supports TLS with verify none. If you need a different TLS configuration, please [let us know](https://github.com/sequinstream/sequin/issues/592). +- **Compression** (optional) + + Choose a compression algorithm to reduce the size of messages sent to Kafka. Options include: + - **None** (default): No compression + - **Gzip**: Good compression ratio with wide compatibility + - **Snappy**: Fast compression with moderate compression ratio + - **LZ4**: Very fast compression with moderate compression ratio + - **Zstd**: Best compression ratio (requires Kafka 2.1.0+) + + Compression is applied to batches of messages before sending to Kafka, which can significantly reduce network bandwidth and storage requirements. When using compression, be mindful of the `max_batch_size` setting to ensure compressed batches don't exceed Kafka's `max.message.bytes` limit. + ## Advanced configuration - **Max Ack Pending** diff --git a/lib/sequin/consumers/kafka_sink.ex b/lib/sequin/consumers/kafka_sink.ex index 273218286..b071235c6 100644 --- a/lib/sequin/consumers/kafka_sink.ex +++ b/lib/sequin/consumers/kafka_sink.ex @@ -25,6 +25,7 @@ defmodule Sequin.Consumers.KafkaSink do field :aws_secret_access_key, EncryptedField field :connection_id, :string field :routing_mode, Ecto.Enum, values: [:dynamic, :static] + field :compression, Ecto.Enum, values: [:none, :gzip, :snappy, :lz4, :zstd], default: :none end def changeset(struct, params) do @@ -39,7 +40,8 @@ defmodule Sequin.Consumers.KafkaSink do :aws_region, :aws_access_key_id, :aws_secret_access_key, - :routing_mode + :routing_mode, + :compression ]) |> validate_required([:hosts, :tls]) |> validate_routing() @@ -175,6 +177,7 @@ defmodule Sequin.Consumers.KafkaSink do [] |> maybe_add_sasl(sink) |> maybe_add_ssl(sink) + |> maybe_add_compression(sink) |> Keyword.put(:query_api_versions, true) |> Keyword.put(:auto_start_producers, true) end @@ -200,4 +203,12 @@ defmodule Sequin.Consumers.KafkaSink do end defp maybe_add_ssl(config, _), do: config + + # Add compression configuration if not :none + defp maybe_add_compression(config, %{compression: :none}), do: config + defp maybe_add_compression(config, %{compression: nil}), do: config + + defp maybe_add_compression(config, %{compression: compression}) do + Keyword.put(config, :compression, compression) + end end diff --git a/test/sequin/kafka_sink_test.exs b/test/sequin/kafka_sink_test.exs index 72a2de17c..99d40e6ff 100644 --- a/test/sequin/kafka_sink_test.exs +++ b/test/sequin/kafka_sink_test.exs @@ -215,6 +215,122 @@ defmodule Sequin.Consumers.KafkaSinkTest do refute :topic in changeset.changes end + + test "accepts valid compression values" do + for compression <- [:none, :gzip, :snappy, :lz4, :zstd] do + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + routing_mode: :static, + compression: compression + }) + + assert changeset.valid?, "Expected compression #{compression} to be valid" + end + end + + test "defaults compression to :none" do + changeset = + KafkaSink.changeset(%KafkaSink{}, %{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + routing_mode: :static + }) + + assert changeset.valid? + sink = Ecto.Changeset.apply_changes(changeset) + assert sink.compression == :none + end + end + + describe "to_brod_config/1" do + test "includes compression when set to gzip" do + sink = %KafkaSink{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + compression: :gzip + } + + config = KafkaSink.to_brod_config(sink) + assert Keyword.get(config, :compression) == :gzip + end + + test "includes compression when set to snappy" do + sink = %KafkaSink{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + compression: :snappy + } + + config = KafkaSink.to_brod_config(sink) + assert Keyword.get(config, :compression) == :snappy + end + + test "includes compression when set to lz4" do + sink = %KafkaSink{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + compression: :lz4 + } + + config = KafkaSink.to_brod_config(sink) + assert Keyword.get(config, :compression) == :lz4 + end + + test "includes compression when set to zstd" do + sink = %KafkaSink{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + compression: :zstd + } + + config = KafkaSink.to_brod_config(sink) + assert Keyword.get(config, :compression) == :zstd + end + + test "does not include compression when set to :none" do + sink = %KafkaSink{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + compression: :none + } + + config = KafkaSink.to_brod_config(sink) + refute Keyword.has_key?(config, :compression) + end + + test "does not include compression when nil" do + sink = %KafkaSink{ + hosts: "localhost:9092", + topic: "test-topic", + tls: false, + compression: nil + } + + config = KafkaSink.to_brod_config(sink) + refute Keyword.has_key?(config, :compression) + end + + test "includes both compression and ssl when both are set" do + sink = %KafkaSink{ + hosts: "localhost:9092", + topic: "test-topic", + tls: true, + compression: :gzip + } + + config = KafkaSink.to_brod_config(sink) + assert Keyword.get(config, :compression) == :gzip + assert Keyword.get(config, :ssl) == true + end end # Helper function to extract error messages From baaf162a472f914c1ff937fb3f9fa68de2e2496b Mon Sep 17 00:00:00 2001 From: Justin Date: Fri, 23 Jan 2026 11:44:52 -0800 Subject: [PATCH 2/4] Update the UI to select payload compression options --- assets/svelte/consumers/types.ts | 1 + .../svelte/sinks/kafka/KafkaSinkForm.svelte | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/assets/svelte/consumers/types.ts b/assets/svelte/consumers/types.ts index af1a2fef2..ab128c33c 100644 --- a/assets/svelte/consumers/types.ts +++ b/assets/svelte/consumers/types.ts @@ -176,6 +176,7 @@ export type KafkaConsumer = BaseConsumer & { tls: boolean; topic: string; sasl_mechanism: null | "plain" | "scram_sha_256" | "scram_sha_512"; + compression: "none" | "gzip" | "snappy" | "lz4" | "zstd"; }; }; diff --git a/assets/svelte/sinks/kafka/KafkaSinkForm.svelte b/assets/svelte/sinks/kafka/KafkaSinkForm.svelte index f176074eb..4cb3d0d8f 100644 --- a/assets/svelte/sinks/kafka/KafkaSinkForm.svelte +++ b/assets/svelte/sinks/kafka/KafkaSinkForm.svelte @@ -33,6 +33,15 @@ { value: "aws_msk_iam", label: "AWS MSK IAM" }, ]; + // Compression options + let compressionOptions = [ + { value: "none", label: "None" }, + { value: "gzip", label: "Gzip" }, + { value: "snappy", label: "Snappy" }, + { value: "lz4", label: "LZ4" }, + { value: "zstd", label: "Zstd" }, + ]; + // Add internal state for AWS credentials let internalAwsKeyId = form.sink.aws_access_key_id || ""; let internalAwsSecretKey = form.sink.aws_secret_access_key || ""; @@ -252,6 +261,28 @@

{errors.sink.tls}

{/if} + + +
+ + +

+ Compress messages before sending to Kafka. Gzip offers good compression + with wide compatibility. Snappy and LZ4 are faster but offer less + compression. Zstd provides the best compression ratio. +

+ {#if errors.sink?.compression} +

{errors.sink.compression}

+ {/if} +
From d6a92ae70939b1fe5f7b2f4138994d38d3da4a29 Mon Sep 17 00:00:00 2001 From: Justin Date: Fri, 23 Jan 2026 11:59:18 -0800 Subject: [PATCH 3/4] Remove other compression formats, keeping `gzip` --- assets/svelte/consumers/types.ts | 2 +- .../svelte/sinks/kafka/KafkaSinkForm.svelte | 8 +--- docs/reference/sinks/kafka.mdx | 5 +-- lib/sequin/consumers/kafka_sink.ex | 2 +- test/sequin/kafka_sink_test.exs | 37 +------------------ 5 files changed, 6 insertions(+), 48 deletions(-) diff --git a/assets/svelte/consumers/types.ts b/assets/svelte/consumers/types.ts index ab128c33c..3b19f419a 100644 --- a/assets/svelte/consumers/types.ts +++ b/assets/svelte/consumers/types.ts @@ -176,7 +176,7 @@ export type KafkaConsumer = BaseConsumer & { tls: boolean; topic: string; sasl_mechanism: null | "plain" | "scram_sha_256" | "scram_sha_512"; - compression: "none" | "gzip" | "snappy" | "lz4" | "zstd"; + compression: "none" | "gzip"; }; }; diff --git a/assets/svelte/sinks/kafka/KafkaSinkForm.svelte b/assets/svelte/sinks/kafka/KafkaSinkForm.svelte index 4cb3d0d8f..4d6ba1f71 100644 --- a/assets/svelte/sinks/kafka/KafkaSinkForm.svelte +++ b/assets/svelte/sinks/kafka/KafkaSinkForm.svelte @@ -37,9 +37,6 @@ let compressionOptions = [ { value: "none", label: "None" }, { value: "gzip", label: "Gzip" }, - { value: "snappy", label: "Snappy" }, - { value: "lz4", label: "LZ4" }, - { value: "zstd", label: "Zstd" }, ]; // Add internal state for AWS credentials @@ -275,9 +272,8 @@ {/each}

- Compress messages before sending to Kafka. Gzip offers good compression - with wide compatibility. Snappy and LZ4 are faster but offer less - compression. Zstd provides the best compression ratio. + Compress messages before sending to Kafka using Gzip. This can + significantly reduce network bandwidth and storage requirements.

{#if errors.sink?.compression}

{errors.sink.compression}

diff --git a/docs/reference/sinks/kafka.mdx b/docs/reference/sinks/kafka.mdx index c8f076a72..3b93cb268 100644 --- a/docs/reference/sinks/kafka.mdx +++ b/docs/reference/sinks/kafka.mdx @@ -40,10 +40,7 @@ The Kafka sink sends messages to a Kafka topic. Choose a compression algorithm to reduce the size of messages sent to Kafka. Options include: - **None** (default): No compression - - **Gzip**: Good compression ratio with wide compatibility - - **Snappy**: Fast compression with moderate compression ratio - - **LZ4**: Very fast compression with moderate compression ratio - - **Zstd**: Best compression ratio (requires Kafka 2.1.0+) + - **Gzip**: Good compression ratio with wide compatibility, uses Erlang's built-in `:zlib` module Compression is applied to batches of messages before sending to Kafka, which can significantly reduce network bandwidth and storage requirements. When using compression, be mindful of the `max_batch_size` setting to ensure compressed batches don't exceed Kafka's `max.message.bytes` limit. diff --git a/lib/sequin/consumers/kafka_sink.ex b/lib/sequin/consumers/kafka_sink.ex index b071235c6..4df056612 100644 --- a/lib/sequin/consumers/kafka_sink.ex +++ b/lib/sequin/consumers/kafka_sink.ex @@ -25,7 +25,7 @@ defmodule Sequin.Consumers.KafkaSink do field :aws_secret_access_key, EncryptedField field :connection_id, :string field :routing_mode, Ecto.Enum, values: [:dynamic, :static] - field :compression, Ecto.Enum, values: [:none, :gzip, :snappy, :lz4, :zstd], default: :none + field :compression, Ecto.Enum, values: [:none, :gzip], default: :none end def changeset(struct, params) do diff --git a/test/sequin/kafka_sink_test.exs b/test/sequin/kafka_sink_test.exs index 99d40e6ff..664d85d80 100644 --- a/test/sequin/kafka_sink_test.exs +++ b/test/sequin/kafka_sink_test.exs @@ -217,7 +217,7 @@ defmodule Sequin.Consumers.KafkaSinkTest do end test "accepts valid compression values" do - for compression <- [:none, :gzip, :snappy, :lz4, :zstd] do + for compression <- [:none, :gzip] do changeset = KafkaSink.changeset(%KafkaSink{}, %{ hosts: "localhost:9092", @@ -259,41 +259,6 @@ defmodule Sequin.Consumers.KafkaSinkTest do assert Keyword.get(config, :compression) == :gzip end - test "includes compression when set to snappy" do - sink = %KafkaSink{ - hosts: "localhost:9092", - topic: "test-topic", - tls: false, - compression: :snappy - } - - config = KafkaSink.to_brod_config(sink) - assert Keyword.get(config, :compression) == :snappy - end - - test "includes compression when set to lz4" do - sink = %KafkaSink{ - hosts: "localhost:9092", - topic: "test-topic", - tls: false, - compression: :lz4 - } - - config = KafkaSink.to_brod_config(sink) - assert Keyword.get(config, :compression) == :lz4 - end - - test "includes compression when set to zstd" do - sink = %KafkaSink{ - hosts: "localhost:9092", - topic: "test-topic", - tls: false, - compression: :zstd - } - - config = KafkaSink.to_brod_config(sink) - assert Keyword.get(config, :compression) == :zstd - end test "does not include compression when set to :none" do sink = %KafkaSink{ From 328702d3f28f84fb08b6b9cebe6ff278cbaa4a68 Mon Sep 17 00:00:00 2001 From: Justin Date: Fri, 23 Jan 2026 12:23:01 -0800 Subject: [PATCH 4/4] Fix formatting --- test/sequin/kafka_sink_test.exs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/sequin/kafka_sink_test.exs b/test/sequin/kafka_sink_test.exs index 664d85d80..dc09afb2d 100644 --- a/test/sequin/kafka_sink_test.exs +++ b/test/sequin/kafka_sink_test.exs @@ -259,7 +259,6 @@ defmodule Sequin.Consumers.KafkaSinkTest do assert Keyword.get(config, :compression) == :gzip end - test "does not include compression when set to :none" do sink = %KafkaSink{ hosts: "localhost:9092",