diff --git a/assets/svelte/consumers/types.ts b/assets/svelte/consumers/types.ts
index af1a2fef2..3b19f419a 100644
--- a/assets/svelte/consumers/types.ts
+++ b/assets/svelte/consumers/types.ts
@@ -176,6 +176,7 @@ export type KafkaConsumer = BaseConsumer & {
     tls: boolean;
     topic: string;
     sasl_mechanism: null | "plain" | "scram_sha_256" | "scram_sha_512";
+    compression: "none" | "gzip";
  };
};

diff --git a/assets/svelte/sinks/kafka/KafkaSinkForm.svelte b/assets/svelte/sinks/kafka/KafkaSinkForm.svelte
index f176074eb..4d6ba1f71 100644
--- a/assets/svelte/sinks/kafka/KafkaSinkForm.svelte
+++ b/assets/svelte/sinks/kafka/KafkaSinkForm.svelte
@@ -33,6 +33,12 @@
     { value: "aws_msk_iam", label: "AWS MSK IAM" },
   ];

+  // Compression options
+  let compressionOptions = [
+    { value: "none", label: "None" },
+    { value: "gzip", label: "Gzip" },
+  ];
+
   // Add internal state for AWS credentials
   let internalAwsKeyId = form.sink.aws_access_key_id || "";
   let internalAwsSecretKey = form.sink.aws_secret_access_key || "";
@@ -252,6 +258,27 @@

        {errors.sink.tls}
      </p>
    {/if}
+
+    <div>
+      <label for="compression">Compression</label>
+      <select
+        id="compression"
+        bind:value={form.sink.compression}
+      >
+        {#each compressionOptions as option}
+          <option value={option.value}>{option.label}</option>
+        {/each}
+      </select>
+      <p>
+        Compress messages before sending to Kafka using Gzip. This can
+        significantly reduce network bandwidth and storage requirements.
+      </p>
+      {#if errors.sink?.compression}
+        <p>
+          {errors.sink.compression}
+        </p>
+      {/if}
+    </div>
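Worth noting while reviewing the form: on the backend (below), `compression` is declared as an `Ecto.Enum`, so a value outside `none`/`gzip` fails at cast time rather than surfacing as a produce-time error. A minimal iex-style sketch, assuming only the changeset in this diff (`:snappy` is deliberately not a supported value):

```elixir
alias Sequin.Consumers.KafkaSink

# :snappy is not in values: [:none, :gzip], so Ecto.Enum fails the cast.
changeset =
  KafkaSink.changeset(%KafkaSink{}, %{
    hosts: "localhost:9092",
    topic: "test-topic",
    tls: false,
    routing_mode: :static,
    compression: :snappy
  })

changeset.valid?
# => false, with an "is invalid" error on :compression
```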
diff --git a/docs/reference/sinks/kafka.mdx b/docs/reference/sinks/kafka.mdx
index c0ce5829c..3b93cb268 100644
--- a/docs/reference/sinks/kafka.mdx
+++ b/docs/reference/sinks/kafka.mdx
@@ -36,6 +36,14 @@ The Kafka sink sends messages to a Kafka topic.

   Enable TLS encryption for connections to Kafka. Currently, Sequin only supports TLS with verify none. If you need a different TLS configuration, please [let us know](https://github.com/sequinstream/sequin/issues/592).

+- **Compression** (optional)
+
+  Choose a compression algorithm to reduce the size of messages sent to Kafka. Options include:
+  - **None** (default): no compression
+  - **Gzip**: good compression ratio with wide compatibility; uses Erlang's built-in `:zlib` module
+
+  Compression is applied to batches of messages before they are sent to Kafka, which can significantly reduce network bandwidth and storage requirements. When using compression, be mindful of the `max_batch_size` setting to ensure compressed batches don't exceed Kafka's `max.message.bytes` limit.
+
 ## Advanced configuration

 - **Max Ack Pending**
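To put a rough number on the bandwidth claim in the docs above, a self-contained sketch using only OTP's built-in `:zlib` (the payload is synthetic; real ratios depend on message content):

```elixir
# Repetitive JSON, typical of CDC change events, compresses very well.
payload = String.duplicate(~s({"table":"orders","action":"insert"}), 1_000)

compressed = :zlib.gzip(payload)

IO.puts("raw: #{byte_size(payload)} B, gzipped: #{byte_size(compressed)} B")
# Expect well over a 10x reduction on a payload this repetitive.
```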
diff --git a/lib/sequin/consumers/kafka_sink.ex b/lib/sequin/consumers/kafka_sink.ex
index 273218286..4df056612 100644
--- a/lib/sequin/consumers/kafka_sink.ex
+++ b/lib/sequin/consumers/kafka_sink.ex
@@ -25,6 +25,7 @@ defmodule Sequin.Consumers.KafkaSink do
     field :aws_secret_access_key, EncryptedField
     field :connection_id, :string
     field :routing_mode, Ecto.Enum, values: [:dynamic, :static]
+    field :compression, Ecto.Enum, values: [:none, :gzip], default: :none
   end

   def changeset(struct, params) do
@@ -39,7 +40,8 @@ defmodule Sequin.Consumers.KafkaSink do
       :aws_region,
       :aws_access_key_id,
       :aws_secret_access_key,
-      :routing_mode
+      :routing_mode,
+      :compression
     ])
     |> validate_required([:hosts, :tls])
     |> validate_routing()
@@ -175,6 +177,7 @@ defmodule Sequin.Consumers.KafkaSink do
     []
     |> maybe_add_sasl(sink)
     |> maybe_add_ssl(sink)
+    |> maybe_add_compression(sink)
     |> Keyword.put(:query_api_versions, true)
     |> Keyword.put(:auto_start_producers, true)
   end
@@ -200,4 +203,12 @@ defmodule Sequin.Consumers.KafkaSink do
   end

   defp maybe_add_ssl(config, _), do: config
+
+  # Add compression to the brod config unless it is :none or nil
+  defp maybe_add_compression(config, %{compression: :none}), do: config
+  defp maybe_add_compression(config, %{compression: nil}), do: config
+
+  defp maybe_add_compression(config, %{compression: compression}) do
+    Keyword.put(config, :compression, compression)
+  end
 end

diff --git a/test/sequin/kafka_sink_test.exs b/test/sequin/kafka_sink_test.exs
index 72a2de17c..dc09afb2d 100644
--- a/test/sequin/kafka_sink_test.exs
+++ b/test/sequin/kafka_sink_test.exs
@@ -215,6 +215,86 @@ defmodule Sequin.Consumers.KafkaSinkTest do

       refute :topic in changeset.changes
     end
+
+    test "accepts valid compression values" do
+      for compression <- [:none, :gzip] do
+        changeset =
+          KafkaSink.changeset(%KafkaSink{}, %{
+            hosts: "localhost:9092",
+            topic: "test-topic",
+            tls: false,
+            routing_mode: :static,
+            compression: compression
+          })
+
+        assert changeset.valid?, "Expected compression #{compression} to be valid"
+      end
+    end
+
+    test "defaults compression to :none" do
+      changeset =
+        KafkaSink.changeset(%KafkaSink{}, %{
+          hosts: "localhost:9092",
+          topic: "test-topic",
+          tls: false,
+          routing_mode: :static
+        })
+
+      assert changeset.valid?
+      sink = Ecto.Changeset.apply_changes(changeset)
+      assert sink.compression == :none
+    end
+  end
+
+  describe "to_brod_config/1" do
+    test "includes compression when set to gzip" do
+      sink = %KafkaSink{
+        hosts: "localhost:9092",
+        topic: "test-topic",
+        tls: false,
+        compression: :gzip
+      }
+
+      config = KafkaSink.to_brod_config(sink)
+      assert Keyword.get(config, :compression) == :gzip
+    end
+
+    test "does not include compression when set to :none" do
+      sink = %KafkaSink{
+        hosts: "localhost:9092",
+        topic: "test-topic",
+        tls: false,
+        compression: :none
+      }
+
+      config = KafkaSink.to_brod_config(sink)
+      refute Keyword.has_key?(config, :compression)
+    end
+
+    test "does not include compression when nil" do
+      sink = %KafkaSink{
+        hosts: "localhost:9092",
+        topic: "test-topic",
+        tls: false,
+        compression: nil
+      }
+
+      config = KafkaSink.to_brod_config(sink)
+      refute Keyword.has_key?(config, :compression)
+    end
+
+    test "includes both compression and ssl when both are set" do
+      sink = %KafkaSink{
+        hosts: "localhost:9092",
+        topic: "test-topic",
+        tls: true,
+        compression: :gzip
+      }
+
+      config = KafkaSink.to_brod_config(sink)
+      assert Keyword.get(config, :compression) == :gzip
+      assert Keyword.get(config, :ssl) == true
+    end
   end

   # Helper function to extract error messages
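For reference, a minimal sketch of how the generated config could be exercised end to end. The client id and endpoint list are hypothetical, and it assumes a local broker with a `test-topic` topic; Sequin's pipeline wires this up elsewhere:

```elixir
alias Sequin.Consumers.KafkaSink

sink = %KafkaSink{hosts: "localhost:9092", topic: "test-topic", tls: false, compression: :gzip}
config = KafkaSink.to_brod_config(sink)

# :brod.start_client/3 takes a [{host, port}] endpoint list, a client id,
# and the client config built above (query_api_versions, compression, ...).
:ok = :brod.start_client([{~c"localhost", 9092}], :sequin_sketch_client, config)

# With auto_start_producers: true, the first produce call starts a producer.
:ok = :brod.produce_sync(:sequin_sketch_client, "test-topic", 0, "key", "value")
```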