Open
Conversation
Signed-off-by: Greg Allen <gallen@redhat.com> Assisted-by: Cursor AI
Author
|
I also tested these changes using a fake RADAS server for both UMB and Kafka, using real UMB and Kafka test topics. The server just receives the signing request and then sends the response. Everything works fine. I can provide these test scripts if desired. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Assisted-by: Cursor AI
Merge Request: Kafka Messaging Support
Summary
This MR adds support for Kafka messaging as an alternative transport to the existing UMB (Unified Message Bus) implementation. The implementation allows
pubtools-signto send signing requests and receive responses via Kafka brokers using SASL_SSL authentication.Configuration
Transport Selection
The configuration file determines which messaging transport is used. Only one transport can be active at a time:
msg_signer(ormsg_batch_signer) is present → UMB transport is usedkafka_signer(orkafka_batch_signer) is present → Kafka transport is usedKafka Configuration
Use
--config-filewith a YAML file containingkafka_signer:Changes Overview
New Files
Models
src/pubtools/sign/models/kafka.pyKafkaMessagedataclass: MirrorsMsgMessagefor Kafka message structureKafkaErrordataclass: MirrorsMsgErrorfor Kafka error handlingClients
src/pubtools/sign/clients/kafka_send_client.pyKafkaSendClientclass: Kafka producer with retry logic and exponential backoffconfluent-kafkalibrarysrc/pubtools/sign/clients/kafka_recv_client.py_KafkaRecvClientclass: Internal receiver implementing request-response patternKafkaRecvClientclass: Public wrapper for the receiverKafkaRecvThreadclass: Threaded wrapper for asynchronous message receivingModified Files
Configuration (
src/pubtools/sign/conf/conf.py)KafkaSignerSchemawith fields:bootstrap_servers(required)username,password(SASL authentication)topic_send_to,topic_listen_to(required)group_id,retries,timeoutfallback_base,fallback_factor(exponential backoff settings)KafkaBatchSignerSchemaextendingKafkaSignerSchemawithchunk_sizeConfigSchemawith optionalkafka_signerandkafka_batch_signerfieldsSigner (
src/pubtools/sign/signers/msgsigner.py)MsgSignerandMsgBatchSigner:kafka_enabled,kafka_bootstrap_serverskafka_username,kafka_passwordkafka_topic_send_to,kafka_topic_listen_tokafka_group_id,kafka_retrieskafka_fallback_base,kafka_fallback_factor_load_kafka_config()method for loading Kafka configuration_load_kafka_config_batch()method for batch signer Kafka configuration_construct_kafka_messages()method for converting UMB messages to Kafka format_kafka_send_and_receive()method for Kafka request-response flow_send_and_receive()to support either UMB or Kafka transport (mutually exclusive)Dependencies (
pyproject.toml)confluent-kafka>=2.0.0dependencyTests Added
Unit Tests
tests/test_kafka_send_client.pyTestKafkaSendClient.test_send_zero_messages: Verifies empty message list handlingTestKafkaSendClient.test_send_single_message: Tests successful single message deliveryTestKafkaSendClient.test_send_multiple_messages: Tests batch message sendingTestKafkaSendClient.test_send_with_delivery_error: Tests error handling on delivery failureTestKafkaSendClient.test_send_with_retry: Tests retry logic with exponential backoffTestKafkaSendClient.test_close: Tests client cleanuptests/test_kafka_recv_client.pyTestKafkaRecvClient.test_recv_zero_messages: Tests empty expected message listTestKafkaRecvClient.test_recv_single_message: Tests receiving a single expected messageTestKafkaRecvClient.test_recv_multiple_messages: Tests receiving multiple messagesTestKafkaRecvClient.test_recv_timeout: Tests timeout handlingTestKafkaRecvClient.test_recv_ignore_unexpected_message: Tests filtering unexpected messagesTestKafkaRecvClient.test_recv_close: Tests consumer cleanupTestKafkaRecvClient.test_recv_consumer_error: Tests handling of consumer poll errorsTestKafkaRecvThread.test_recv_thread: Tests threaded receiver operationTestKafkaRecvThread.test_recv_thread_stop: Tests stopping the receiver threadtests/test_msg_signer_kafka.pyTestMsgSignerKafkaConfig.test_kafka_config_loading: Tests Kafka configuration loadingTestMsgSignerKafkaConfig.test_kafka_config_standalone_format: Tests standalone Kafka config formatTestMsgSignerKafkaConfig.test_kafka_config_umb_only: Tests UMB-only config (Kafka disabled)TestMsgSignerKafkaConfig.test_kafka_config_defaults: Tests default values when Kafka not configuredTestMsgSignerKafkaConfig.test_kafka_doc_arguments: Tests Kafka options in doc_argumentsTestMsgSignerKafkaMessaging.test_kafka_disabled_no_send: Tests Kafka not called when disabledTestMsgSignerKafkaMessaging.test_kafka_enabled_attributes: Tests Kafka attributes when enabledTest Fixtures Added (
tests/conftest.py)f_kafka_bootstrap_servers: Kafka broker URLs fixturef_kafka_topic_send: Send topic fixturef_kafka_topic_recv: Receive topic fixturef_kafka_credentials: Username/password credentials fixturef_kafka_group_id: Consumer group ID fixturef_config_msg_signer_with_kafka: Kafka-only configuration fixturef_config_kafka_standalone: Standalone Kafka config fixtureAdditional Changes to
src/pubtools/sign/clients/msg_send_client.py(UMB)The following changes were made to the UMB send client, not related to Kafka support:
Improved Logging
on_connection_opened(): Logs when the sender connection is openedon_accepted(): Logs when messages are accepted by brokerError Handling Improvements
Added
on_rejected()handler:MsgErrorwith rejection details to errors listAdded
on_released()handler:MsgErrorto errors listReceiver Thread Cleanup
_umb_send_and_receive()inmsgsigner.pyto stop the receiver thread when sender errors occur, preventing the script from hanging indefinitely