Skip to content

Add Kafka support.#65

Open
ggallen wants to merge 1 commit intorelease-engineering:mainfrom
ggallen:add-kafka-support-origin
Open

Add Kafka support.#65
ggallen wants to merge 1 commit intorelease-engineering:mainfrom
ggallen:add-kafka-support-origin

Conversation

@ggallen
Copy link

@ggallen ggallen commented Feb 1, 2026

Assisted-by: Cursor AI

Merge Request: Kafka Messaging Support

Summary

This MR adds support for Kafka messaging as an alternative transport to the existing UMB (Unified Message Bus) implementation. The implementation allows pubtools-sign to send signing requests and receive responses via Kafka brokers using SASL_SSL authentication.

Configuration

Transport Selection

The configuration file determines which messaging transport is used. Only one transport can be active at a time:

  • If msg_signer (or msg_batch_signer) is present → UMB transport is used
  • If kafka_signer (or kafka_batch_signer) is present → Kafka transport is used
  • A configuration file cannot contain both UMB and Kafka signer sections

Kafka Configuration

Use --config-file with a YAML file containing kafka_signer:

kafka_signer:
  bootstrap_servers:
    - "broker1.kafka.example.com:9096"
    - "broker2.kafka.example.com:9096"
  username: "kafka-user"
  password: "kafka-password"
  topic_send_to: "signing-requests"
  topic_listen_to: "signing-responses"
  group_id: "pubtools-sign-consumer"
  environment: "prod"
  service: "pubtools-sign"
  timeout: 60
  retries: 3
  message_id_key: "request_id"
  # Optional: exponential backoff settings
  fallback_base: 1.0
  fallback_factor: 2.0

Changes Overview

New Files

Models

  • src/pubtools/sign/models/kafka.py
    • KafkaMessage dataclass: Mirrors MsgMessage for Kafka message structure
    • KafkaError dataclass: Mirrors MsgError for Kafka error handling

Clients

  • src/pubtools/sign/clients/kafka_send_client.py

    • KafkaSendClient class: Kafka producer with retry logic and exponential backoff
    • Uses confluent-kafka library
    • Supports SASL_SSL authentication with SCRAM-SHA-512
    • Includes delivery confirmation callbacks
  • src/pubtools/sign/clients/kafka_recv_client.py

    • _KafkaRecvClient class: Internal receiver implementing request-response pattern
    • KafkaRecvClient class: Public wrapper for the receiver
    • KafkaRecvThread class: Threaded wrapper for asynchronous message receiving
    • Implements timeout handling and message ID matching

Modified Files

Configuration (src/pubtools/sign/conf/conf.py)

  • Added KafkaSignerSchema with fields:
    • bootstrap_servers (required)
    • username, password (SASL authentication)
    • topic_send_to, topic_listen_to (required)
    • group_id, retries, timeout
    • fallback_base, fallback_factor (exponential backoff settings)
  • Added KafkaBatchSignerSchema extending KafkaSignerSchema with chunk_size
  • Updated ConfigSchema with optional kafka_signer and kafka_batch_signer fields

Signer (src/pubtools/sign/signers/msgsigner.py)

  • Added Kafka configuration fields to MsgSigner and MsgBatchSigner:
    • kafka_enabled, kafka_bootstrap_servers
    • kafka_username, kafka_password
    • kafka_topic_send_to, kafka_topic_listen_to
    • kafka_group_id, kafka_retries
    • kafka_fallback_base, kafka_fallback_factor
  • Added _load_kafka_config() method for loading Kafka configuration
  • Added _load_kafka_config_batch() method for batch signer Kafka configuration
  • Added _construct_kafka_messages() method for converting UMB messages to Kafka format
  • Added _kafka_send_and_receive() method for Kafka request-response flow
  • Modified _send_and_receive() to support either UMB or Kafka transport (mutually exclusive)

Dependencies (pyproject.toml)

  • Added confluent-kafka>=2.0.0 dependency

Tests Added

Unit Tests

tests/test_kafka_send_client.py

  • TestKafkaSendClient.test_send_zero_messages: Verifies empty message list handling
  • TestKafkaSendClient.test_send_single_message: Tests successful single message delivery
  • TestKafkaSendClient.test_send_multiple_messages: Tests batch message sending
  • TestKafkaSendClient.test_send_with_delivery_error: Tests error handling on delivery failure
  • TestKafkaSendClient.test_send_with_retry: Tests retry logic with exponential backoff
  • TestKafkaSendClient.test_close: Tests client cleanup

tests/test_kafka_recv_client.py

  • TestKafkaRecvClient.test_recv_zero_messages: Tests empty expected message list
  • TestKafkaRecvClient.test_recv_single_message: Tests receiving a single expected message
  • TestKafkaRecvClient.test_recv_multiple_messages: Tests receiving multiple messages
  • TestKafkaRecvClient.test_recv_timeout: Tests timeout handling
  • TestKafkaRecvClient.test_recv_ignore_unexpected_message: Tests filtering unexpected messages
  • TestKafkaRecvClient.test_recv_close: Tests consumer cleanup
  • TestKafkaRecvClient.test_recv_consumer_error: Tests handling of consumer poll errors
  • TestKafkaRecvThread.test_recv_thread: Tests threaded receiver operation
  • TestKafkaRecvThread.test_recv_thread_stop: Tests stopping the receiver thread

tests/test_msg_signer_kafka.py

  • TestMsgSignerKafkaConfig.test_kafka_config_loading: Tests Kafka configuration loading
  • TestMsgSignerKafkaConfig.test_kafka_config_standalone_format: Tests standalone Kafka config format
  • TestMsgSignerKafkaConfig.test_kafka_config_umb_only: Tests UMB-only config (Kafka disabled)
  • TestMsgSignerKafkaConfig.test_kafka_config_defaults: Tests default values when Kafka not configured
  • TestMsgSignerKafkaConfig.test_kafka_doc_arguments: Tests Kafka options in doc_arguments
  • TestMsgSignerKafkaMessaging.test_kafka_disabled_no_send: Tests Kafka not called when disabled
  • TestMsgSignerKafkaMessaging.test_kafka_enabled_attributes: Tests Kafka attributes when enabled

Test Fixtures Added (tests/conftest.py)

  • f_kafka_bootstrap_servers: Kafka broker URLs fixture
  • f_kafka_topic_send: Send topic fixture
  • f_kafka_topic_recv: Receive topic fixture
  • f_kafka_credentials: Username/password credentials fixture
  • f_kafka_group_id: Consumer group ID fixture
  • f_config_msg_signer_with_kafka: Kafka-only configuration fixture
  • f_config_kafka_standalone: Standalone Kafka config fixture

Additional Changes to src/pubtools/sign/clients/msg_send_client.py (UMB)

The following changes were made to the UMB send client, not related to Kafka support:

Improved Logging

  • Added INFO-level log in on_connection_opened(): Logs when the sender connection is opened
  • Changed message sending log to INFO level with full details (address, body, headers)
  • Added INFO-level log in on_accepted(): Logs when messages are accepted by broker

Error Handling Improvements

  • Added on_rejected() handler:

    • Logs detailed error when broker rejects a message
    • Extracts rejection condition from delivery remote state
    • Appends MsgError with rejection details to errors list
    • Closes connection immediately on rejection
  • Added on_released() handler:

    • Logs warning when broker releases a message (not delivered)
    • Appends MsgError to errors list
    • Closes connection to prevent hanging

Receiver Thread Cleanup

  • Modified _umb_send_and_receive() in msgsigner.py to stop the receiver thread when sender errors occur, preventing the script from hanging indefinitely

Signed-off-by: Greg Allen <gallen@redhat.com>
Assisted-by: Cursor AI
@ggallen
Copy link
Author

ggallen commented Feb 1, 2026

I also tested these changes using a fake RADAS server for both UMB and Kafka, using real UMB and Kafka test topics. The server just receives the signing request and then sends the response. Everything works fine.

I can provide these test scripts if desired.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant