diff --git a/assets/svelte/consumers/types.ts b/assets/svelte/consumers/types.ts
index af1a2fef2..3b19f419a 100644
--- a/assets/svelte/consumers/types.ts
+++ b/assets/svelte/consumers/types.ts
@@ -176,6 +176,7 @@ export type KafkaConsumer = BaseConsumer & {
tls: boolean;
topic: string;
sasl_mechanism: null | "plain" | "scram_sha_256" | "scram_sha_512";
+ compression: "none" | "gzip";
};
};
diff --git a/assets/svelte/sinks/kafka/KafkaSinkForm.svelte b/assets/svelte/sinks/kafka/KafkaSinkForm.svelte
index f176074eb..4d6ba1f71 100644
--- a/assets/svelte/sinks/kafka/KafkaSinkForm.svelte
+++ b/assets/svelte/sinks/kafka/KafkaSinkForm.svelte
@@ -33,6 +33,12 @@
{ value: "aws_msk_iam", label: "AWS MSK IAM" },
];
+ // Compression options
+ let compressionOptions = [
+ { value: "none", label: "None" },
+ { value: "gzip", label: "Gzip" },
+ ];
+
// Add internal state for AWS credentials
let internalAwsKeyId = form.sink.aws_access_key_id || "";
let internalAwsSecretKey = form.sink.aws_secret_access_key || "";
@@ -252,6 +258,27 @@
{errors.sink.tls}
{/if}
+
+      <div>
+        <label for="compression">Compression</label>
+        <select
+          id="compression"
+          bind:value={form.sink.compression}
+        >
+          {#each compressionOptions as option}
+            <option value={option.value}>{option.label}</option>
+          {/each}
+        </select>
+        <p>
+          Compress messages with Gzip before they are sent to Kafka. This can
+          significantly reduce network bandwidth and storage requirements.
+        </p>
+        {#if errors.sink?.compression}
+          <p>
+            {errors.sink.compression}
+          </p>
+        {/if}
+      </div>
diff --git a/docs/reference/sinks/kafka.mdx b/docs/reference/sinks/kafka.mdx
index c0ce5829c..3b93cb268 100644
--- a/docs/reference/sinks/kafka.mdx
+++ b/docs/reference/sinks/kafka.mdx
@@ -36,6 +36,14 @@ The Kafka sink sends messages to a Kafka topic.
Enable TLS encryption for connections to Kafka. Currently, Sequin only supports TLS with verify none. If you need a different TLS configuration, please [let us know](https://github.com/sequinstream/sequin/issues/592).
+- **Compression** (optional)
+
+ Choose a compression algorithm to reduce the size of messages sent to Kafka. Options include:
+ - **None** (default): No compression
+ - **Gzip**: Good compression ratio with wide compatibility; uses Erlang's built-in `:zlib` module
+
+ Compression is applied to batches of messages before they are sent to Kafka, which can significantly reduce network bandwidth and storage requirements. When using compression, be mindful of the `max_batch_size` setting to ensure compressed batches don't exceed Kafka's `max.message.bytes` limit.
+
## Advanced configuration
- **Max Ack Pending**
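
One note on the doc change above: Gzip here is Erlang's built-in `:zlib`, so the size win is easy to sanity-check outside of Kafka entirely. A minimal sketch with a made-up payload shape (the repetitive JSON typical of CDC rows; the byte counts will vary):

```elixir
# Build a batch of repetitive JSON rows, the kind of payload this sink
# carries, and gzip it with the same :zlib module the sink uses.
batch =
  1..500
  |> Enum.map(fn i -> ~s({"id":#{i},"event":"row.updated","table":"orders"}) end)
  |> Enum.join("\n")

compressed = :zlib.gzip(batch)

IO.puts("raw: #{byte_size(batch)} bytes, gzipped: #{byte_size(compressed)} bytes")
```

As the doc notes, it is the compressed batch size that must stay under `max.message.bytes`, which is why compression is discussed alongside `max_batch_size`.
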
diff --git a/lib/sequin/consumers/kafka_sink.ex b/lib/sequin/consumers/kafka_sink.ex
index 273218286..4df056612 100644
--- a/lib/sequin/consumers/kafka_sink.ex
+++ b/lib/sequin/consumers/kafka_sink.ex
@@ -25,6 +25,7 @@ defmodule Sequin.Consumers.KafkaSink do
field :aws_secret_access_key, EncryptedField
field :connection_id, :string
field :routing_mode, Ecto.Enum, values: [:dynamic, :static]
+ field :compression, Ecto.Enum, values: [:none, :gzip], default: :none
end
def changeset(struct, params) do
@@ -39,7 +40,8 @@ defmodule Sequin.Consumers.KafkaSink do
:aws_region,
:aws_access_key_id,
:aws_secret_access_key,
- :routing_mode
+ :routing_mode,
+ :compression
])
|> validate_required([:hosts, :tls])
|> validate_routing()
@@ -175,6 +177,7 @@ defmodule Sequin.Consumers.KafkaSink do
[]
|> maybe_add_sasl(sink)
|> maybe_add_ssl(sink)
+ |> maybe_add_compression(sink)
|> Keyword.put(:query_api_versions, true)
|> Keyword.put(:auto_start_producers, true)
end
@@ -200,4 +203,12 @@ defmodule Sequin.Consumers.KafkaSink do
end
defp maybe_add_ssl(config, _), do: config
+
+ # Add compression to the brod config unless it is unset or :none
+ defp maybe_add_compression(config, %{compression: :none}), do: config
+ defp maybe_add_compression(config, %{compression: nil}), do: config
+
+ defp maybe_add_compression(config, %{compression: compression}) do
+ Keyword.put(config, :compression, compression)
+ end
end
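
For a concrete picture of what the new `maybe_add_compression/2` clause contributes, here is roughly what `to_brod_config/1` returns for a gzip-enabled sink; this mirrors the assertions in the tests below (hosts and topic are placeholder values, and key order in the keyword list may differ):

```elixir
alias Sequin.Consumers.KafkaSink

# Placeholder connection details; only tls and compression matter here.
sink = %KafkaSink{
  hosts: "localhost:9092",
  topic: "events",
  tls: true,
  compression: :gzip
}

KafkaSink.to_brod_config(sink)
# => a keyword list along the lines of:
#    [auto_start_producers: true, query_api_versions: true,
#     compression: :gzip, ssl: true]
```
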
diff --git a/test/sequin/kafka_sink_test.exs b/test/sequin/kafka_sink_test.exs
index 72a2de17c..dc09afb2d 100644
--- a/test/sequin/kafka_sink_test.exs
+++ b/test/sequin/kafka_sink_test.exs
@@ -215,6 +215,86 @@ defmodule Sequin.Consumers.KafkaSinkTest do
refute :topic in changeset.changes
end
+
+ test "accepts valid compression values" do
+ for compression <- [:none, :gzip] do
+ changeset =
+ KafkaSink.changeset(%KafkaSink{}, %{
+ hosts: "localhost:9092",
+ topic: "test-topic",
+ tls: false,
+ routing_mode: :static,
+ compression: compression
+ })
+
+ assert changeset.valid?, "Expected compression #{compression} to be valid"
+ end
+ end
+
+ test "defaults compression to :none" do
+ changeset =
+ KafkaSink.changeset(%KafkaSink{}, %{
+ hosts: "localhost:9092",
+ topic: "test-topic",
+ tls: false,
+ routing_mode: :static
+ })
+
+ assert changeset.valid?
+ sink = Ecto.Changeset.apply_changes(changeset)
+ assert sink.compression == :none
+ end
+ end
+
+ describe "to_brod_config/1" do
+ test "includes compression when set to gzip" do
+ sink = %KafkaSink{
+ hosts: "localhost:9092",
+ topic: "test-topic",
+ tls: false,
+ compression: :gzip
+ }
+
+ config = KafkaSink.to_brod_config(sink)
+ assert Keyword.get(config, :compression) == :gzip
+ end
+
+ test "does not include compression when set to :none" do
+ sink = %KafkaSink{
+ hosts: "localhost:9092",
+ topic: "test-topic",
+ tls: false,
+ compression: :none
+ }
+
+ config = KafkaSink.to_brod_config(sink)
+ refute Keyword.has_key?(config, :compression)
+ end
+
+ test "does not include compression when nil" do
+ sink = %KafkaSink{
+ hosts: "localhost:9092",
+ topic: "test-topic",
+ tls: false,
+ compression: nil
+ }
+
+ config = KafkaSink.to_brod_config(sink)
+ refute Keyword.has_key?(config, :compression)
+ end
+
+ test "includes both compression and ssl when both are set" do
+ sink = %KafkaSink{
+ hosts: "localhost:9092",
+ topic: "test-topic",
+ tls: true,
+ compression: :gzip
+ }
+
+ config = KafkaSink.to_brod_config(sink)
+ assert Keyword.get(config, :compression) == :gzip
+ assert Keyword.get(config, :ssl) == true
+ end
end
# Helper function to extract error messages