Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions assets/svelte/consumers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ export type KafkaConsumer = BaseConsumer & {
tls: boolean;
topic: string;
sasl_mechanism: null | "plain" | "scram_sha_256" | "scram_sha_512";
compression: "none" | "gzip";
};
};

Expand Down
27 changes: 27 additions & 0 deletions assets/svelte/sinks/kafka/KafkaSinkForm.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
{ value: "aws_msk_iam", label: "AWS MSK IAM" },
];

// Compression codecs the Kafka sink supports: `value` is sent to the
// server, `label` is what the dropdown displays.
const compressionOptions = [
  { value: "none", label: "None" },
  { value: "gzip", label: "Gzip" },
];

// Add internal state for AWS credentials
let internalAwsKeyId = form.sink.aws_access_key_id || "";
let internalAwsSecretKey = form.sink.aws_secret_access_key || "";
Expand Down Expand Up @@ -252,6 +258,27 @@
<p class="text-destructive text-sm">{errors.sink.tls}</p>
{/if}
</div>

<!-- Compression Dropdown -->
<div class="space-y-2">
<Label for="compression">Compression</Label>
<select
id="compression"
bind:value={form.sink.compression}
class="block w-full border border-gray-300 rounded-md p-2"
>
{#each compressionOptions as option}
<option value={option.value}>{option.label}</option>
{/each}
</select>
<p class="text-xs">
Compress messages before sending to Kafka using Gzip. This can
significantly reduce network bandwidth and storage requirements.
</p>
{#if errors.sink?.compression}
<p class="text-destructive text-sm">{errors.sink.compression}</p>
{/if}
</div>
</CardContent>
</Card>

Expand Down
8 changes: 8 additions & 0 deletions docs/reference/sinks/kafka.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ The Kafka sink sends messages to a Kafka topic.

Enable TLS encryption for connections to Kafka. Currently, Sequin only supports TLS with verify none. If you need a different TLS configuration, please [let us know](https://github.com/sequinstream/sequin/issues/592).

- **Compression** (optional)

Choose a compression algorithm to reduce the size of messages sent to Kafka. Options include:
- **None** (default): No compression
- **Gzip**: Good compression ratio with wide compatibility; implemented using Erlang's built-in `:zlib` module

Compression is applied to batches of messages before sending to Kafka, which can significantly reduce network bandwidth and storage requirements. When using compression, be mindful of the `max_batch_size` setting to ensure compressed batches don't exceed Kafka's `max.message.bytes` limit.

## Advanced configuration

- **Max Ack Pending**
Expand Down
13 changes: 12 additions & 1 deletion lib/sequin/consumers/kafka_sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Sequin.Consumers.KafkaSink do
field :aws_secret_access_key, EncryptedField
field :connection_id, :string
field :routing_mode, Ecto.Enum, values: [:dynamic, :static]
field :compression, Ecto.Enum, values: [:none, :gzip], default: :none
end

def changeset(struct, params) do
Expand All @@ -39,7 +40,8 @@ defmodule Sequin.Consumers.KafkaSink do
:aws_region,
:aws_access_key_id,
:aws_secret_access_key,
:routing_mode
:routing_mode,
:compression
])
|> validate_required([:hosts, :tls])
|> validate_routing()
Expand Down Expand Up @@ -175,6 +177,7 @@ defmodule Sequin.Consumers.KafkaSink do
[]
|> maybe_add_sasl(sink)
|> maybe_add_ssl(sink)
|> maybe_add_compression(sink)
|> Keyword.put(:query_api_versions, true)
|> Keyword.put(:auto_start_producers, true)
end
Expand All @@ -200,4 +203,12 @@ defmodule Sequin.Consumers.KafkaSink do
end

defp maybe_add_ssl(config, _), do: config

# Pass the configured compression codec through to brod. When compression
# is disabled (nil or :none) the key is omitted entirely so brod falls
# back to its own default of no compression.
defp maybe_add_compression(config, %{compression: compression}) when compression in [nil, :none], do: config

defp maybe_add_compression(config, %{compression: compression}), do: Keyword.put(config, :compression, compression)
end
80 changes: 80 additions & 0 deletions test/sequin/kafka_sink_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,86 @@ defmodule Sequin.Consumers.KafkaSinkTest do

refute :topic in changeset.changes
end

test "accepts valid compression values" do
  # Both enum values declared on the schema must pass validation.
  Enum.each([:none, :gzip], fn codec ->
    params = %{
      hosts: "localhost:9092",
      topic: "test-topic",
      tls: false,
      routing_mode: :static,
      compression: codec
    }

    changeset = KafkaSink.changeset(%KafkaSink{}, params)

    assert changeset.valid?, "Expected compression #{codec} to be valid"
  end)
end

test "defaults compression to :none" do
  # No :compression key in params — the schema default should apply.
  params = %{
    hosts: "localhost:9092",
    topic: "test-topic",
    tls: false,
    routing_mode: :static
  }

  changeset = KafkaSink.changeset(%KafkaSink{}, params)
  assert changeset.valid?

  assert %KafkaSink{compression: :none} = Ecto.Changeset.apply_changes(changeset)
end
end

describe "to_brod_config/1" do
  test "includes compression when set to gzip" do
    brod_config =
      KafkaSink.to_brod_config(%KafkaSink{
        hosts: "localhost:9092",
        topic: "test-topic",
        tls: false,
        compression: :gzip
      })

    assert brod_config[:compression] == :gzip
  end

  test "does not include compression when set to :none" do
    brod_config =
      KafkaSink.to_brod_config(%KafkaSink{
        hosts: "localhost:9092",
        topic: "test-topic",
        tls: false,
        compression: :none
      })

    # Key must be absent entirely, not merely nil.
    assert Keyword.fetch(brod_config, :compression) == :error
  end

  test "does not include compression when nil" do
    brod_config =
      KafkaSink.to_brod_config(%KafkaSink{
        hosts: "localhost:9092",
        topic: "test-topic",
        tls: false,
        compression: nil
      })

    # Key must be absent entirely, not merely nil.
    assert Keyword.fetch(brod_config, :compression) == :error
  end

  test "includes both compression and ssl when both are set" do
    brod_config =
      KafkaSink.to_brod_config(%KafkaSink{
        hosts: "localhost:9092",
        topic: "test-topic",
        tls: true,
        compression: :gzip
      })

    assert brod_config[:compression] == :gzip
    assert brod_config[:ssl] == true
  end
end

# Helper function to extract error messages
Expand Down
Loading