A Kafka Connect source connector for replicating data from one Kafka cluster to another. This connector consumes messages from a source Kafka topic and produces them to a target Kafka cluster, preserving message keys, values, headers, partitions, and timestamps.
apache-kafka-replicator/
├── src/
│   ├── main/
│   │   ├── java/com/octavalo/kafka/connector/
│   │   │   ├── CustomSourceConnector.java           # Main connector class
│   │   │   ├── CustomSourceTask.java                # Task implementation with consumer logic
│   │   │   └── CustomSourceConnectorConfig.java     # Configuration with JAAS/SSL support
│   │   └── resources/
│   │       └── custom-source-connector.properties
│   └── test/
│       └── java/com/octavalo/kafka/connector/
│           ├── CustomSourceConnectorTest.java       # Connector unit tests
│           ├── CustomSourceTaskTest.java            # Task unit tests
│           └── CustomSourceConnectorConfigTest.java # Configuration unit tests
├── pom.xml                                          # Maven build configuration
├── README.md                                        # This file
└── .gitignore                                       # Git ignore rules
- CustomSourceConnector: Main connector class that manages replication tasks
- CustomSourceTask: Task that consumes from source Kafka cluster and produces to target cluster
- CustomSourceConnectorConfig: Configuration management with source cluster connection settings
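For orientation, CustomSourceConnector most likely follows the standard Kafka Connect fan-out pattern: each of up to tasks.max tasks receives a copy of the connector configuration. This is a minimal, dependency-free sketch of that pattern; the class and method below are illustrative stand-ins, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the usual Connect source-connector fan-out: each task gets a
// full copy of the connector's configuration map.
public class TaskConfigSketch {
    static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            // Copy so each task can be tracked (and mutated) independently.
            configs.add(new HashMap<>(connectorProps));
        }
        return configs;
    }
}
```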
- Cross-cluster replication: Replicate data from one Kafka cluster to another
- Preserve message integrity: Maintains original keys, values, and headers
- Partition preservation: Option to maintain source partition assignment
- Timestamp preservation: Option to keep original message timestamps
- Offset management: Automatic offset tracking and resumption
- Security support: SASL/SSL authentication for source cluster
- Configurable performance: Tunable batch sizes and poll intervals
- Java 11 or higher
- Apache Maven 3.6+
- Apache Kafka 3.6.0 or compatible version
- Kafka Connect runtime environment
- Clone the repository:
cd /Users/octavalo/Documents/projects/apache-kafka-replicator
- Build the project with Maven:
mvn clean package
- The compiled JAR will be available at:
target/kafka-connector-1.0.0-jar-with-dependencies.jar
Edit src/main/resources/custom-source-connector.properties:
name=kafka-replicator-connector
connector.class=com.octavalo.kafka.connector.CustomSourceConnector
tasks.max=1
# Source Kafka cluster configuration
source.bootstrap.servers=source-kafka-broker:9092
source.topic=source-topic-name
source.group.id=kafka-replicator-connector
source.security.protocol=PLAINTEXT
# Target topic configuration
target.topic=target-topic-name
# Performance tuning
poll.timeout.ms=1000
max.poll.records=500
# Data preservation options
preserve.partitions=true
preserve.timestamps=true
Configuration Parameters:
Required:
source.bootstrap.servers: Bootstrap servers for the source Kafka cluster
source.topic: Source topic to replicate from
Optional:
target.topic: Target topic name (defaults to source topic name if not specified)
source.group.id: Consumer group ID for source cluster (default: kafka-replicator-connector)
poll.timeout.ms: Timeout for polling source cluster (default: 1000)
max.poll.records: Maximum records per poll (default: 500)
preserve.partitions: Preserve source partition assignment (default: true)
preserve.timestamps: Preserve original message timestamps (default: true)
Security Configuration:
source.security.protocol: Security protocol - PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL (default: PLAINTEXT)
source.sasl.mechanism: SASL mechanism - PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI
source.sasl.jaas.config: Direct JAAS configuration string (most flexible)
source.sasl.username: Username for SASL authentication (auto-builds JAAS config)
source.sasl.password: Password for SASL authentication (auto-builds JAAS config)
source.ssl.truststore.location: Path to SSL truststore file
source.ssl.truststore.password: Password for SSL truststore
source.ssl.keystore.location: Path to SSL keystore file
source.ssl.keystore.password: Password for SSL keystore
source.ssl.key.password: Password for the key in the keystore
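The "auto-builds JAAS config" behavior can be illustrated with a small sketch. This is a hypothetical reconstruction of the string a config class would assemble from source.sasl.username and source.sasl.password, not the connector's actual code; the mechanism-to-login-module mapping shown is the standard Kafka one (ScramLoginModule for SCRAM mechanisms, PlainLoginModule for PLAIN).

```java
// Hypothetical sketch: expanding source.sasl.username / source.sasl.password
// into a JAAS configuration string, as CustomSourceConnectorConfig is
// described as doing. Not the connector's actual implementation.
public class JaasConfigSketch {
    static String buildJaasConfig(String mechanism, String username, String password) {
        // SCRAM mechanisms use ScramLoginModule; PLAIN uses PlainLoginModule.
        String module = mechanism.startsWith("SCRAM")
                ? "org.apache.kafka.common.security.scram.ScramLoginModule"
                : "org.apache.kafka.common.security.plain.PlainLoginModule";
        return String.format("%s required username=\"%s\" password=\"%s\";",
                module, username, password);
    }
}
```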
- Copy the JAR to Kafka Connect's plugin directory:
mkdir -p /path/to/kafka/plugins/kafka-connector
cp target/kafka-connector-1.0.0-jar-with-dependencies.jar /path/to/kafka/plugins/kafka-connector/
- Update Kafka Connect worker configuration (connect-standalone.properties):
plugin.path=/path/to/kafka/plugins
- Start the connector in standalone mode:
connect-standalone.sh config/connect-standalone.properties \
src/main/resources/custom-source-connector.properties
- Copy the JAR to Kafka Connect's plugin directory on all worker nodes.
- Start Kafka Connect in distributed mode:
connect-distributed.sh config/connect-distributed.properties
- Deploy the connector via REST API:
Basic Configuration:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "kafka-replicator-connector",
"config": {
"connector.class": "com.octavalo.kafka.connector.CustomSourceConnector",
"tasks.max": "1",
"source.bootstrap.servers": "source-kafka:9092",
"source.topic": "my-source-topic",
"target.topic": "my-target-topic",
"source.group.id": "kafka-replicator",
"poll.timeout.ms": "1000",
"max.poll.records": "500",
"preserve.partitions": "true",
"preserve.timestamps": "true"
}
}'
With SASL PLAIN Authentication (Direct JAAS):
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "kafka-replicator-connector",
"config": {
"connector.class": "com.octavalo.kafka.connector.CustomSourceConnector",
"tasks.max": "1",
"source.bootstrap.servers": "source-kafka:9092",
"source.topic": "my-source-topic",
"target.topic": "my-target-topic",
"source.security.protocol": "SASL_SSL",
"source.sasl.mechanism": "PLAIN",
"source.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";"
}
}'
With SASL Authentication (Simplified Username/Password):
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "kafka-replicator-connector",
"config": {
"connector.class": "com.octavalo.kafka.connector.CustomSourceConnector",
"tasks.max": "1",
"source.bootstrap.servers": "source-kafka:9092",
"source.topic": "my-source-topic",
"target.topic": "my-target-topic",
"source.security.protocol": "SASL_SSL",
"source.sasl.mechanism": "PLAIN",
"source.sasl.username": "myuser",
"source.sasl.password": "mypassword"
}
}'
With SCRAM-SHA-256 Authentication:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "kafka-replicator-connector",
"config": {
"connector.class": "com.octavalo.kafka.connector.CustomSourceConnector",
"tasks.max": "1",
"source.bootstrap.servers": "source-kafka:9092",
"source.topic": "my-source-topic",
"target.topic": "my-target-topic",
"source.security.protocol": "SASL_SSL",
"source.sasl.mechanism": "SCRAM-SHA-256",
"source.sasl.username": "myuser",
"source.sasl.password": "mypassword",
"source.ssl.truststore.location": "/path/to/truststore.jks",
"source.ssl.truststore.password": "truststore-pass"
}
}'
curl http://localhost:8083/connectors/kafka-replicator-connector/status
curl http://localhost:8083/connectors
curl -X DELETE http://localhost:8083/connectors/kafka-replicator-connector
curl -X PUT http://localhost:8083/connectors/kafka-replicator-connector/pause
curl -X PUT http://localhost:8083/connectors/kafka-replicator-connector/resume
- Consumer Setup: The connector creates a Kafka consumer that connects to the source cluster
- Offset Management: Uses Kafka Connect's offset storage to track progress and enable resumption
- Message Polling: Continuously polls messages from the source topic
- Data Preservation: Maintains message keys, values, headers, partitions (optional), and timestamps (optional)
- Production: Produces messages to the target cluster via Kafka Connect framework
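The preservation step above can be sketched without any Kafka dependencies: the task either carries over the consumed record's partition and timestamp or leaves them unset (null) so the producer and broker assign their own. The types below are illustrative stand-ins, not the connector's actual Kafka Connect classes.

```java
// Dependency-free sketch of the preserve.partitions / preserve.timestamps
// behavior. Consumed/Produced are stand-in types for illustration only.
public class PreservationSketch {
    static class Consumed {
        final String key, value;
        final int partition;
        final long timestamp;
        Consumed(String key, String value, int partition, long timestamp) {
            this.key = key; this.value = value;
            this.partition = partition; this.timestamp = timestamp;
        }
    }

    static class Produced {
        final String key, value;
        final Integer partition; // null => let the producer pick the partition
        final Long timestamp;    // null => let the broker assign a timestamp
        Produced(String key, String value, Integer partition, Long timestamp) {
            this.key = key; this.value = value;
            this.partition = partition; this.timestamp = timestamp;
        }
    }

    static Produced toProduced(Consumed in, boolean preservePartitions, boolean preserveTimestamps) {
        return new Produced(
                in.key,
                in.value,
                preservePartitions ? in.partition : null,
                preserveTimestamps ? in.timestamp : null);
    }
}
```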
Simplest authentication method, suitable for development and testing.
Option 1: Direct JAAS Configuration
source.security.protocol=SASL_PLAINTEXT
source.sasl.mechanism=PLAIN
source.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
Option 2: Simplified (Auto-builds JAAS)
source.security.protocol=SASL_PLAINTEXT
source.sasl.mechanism=PLAIN
source.sasl.username=user
source.sasl.password=password
More secure than PLAIN, recommended for production.
source.security.protocol=SASL_SSL
source.sasl.mechanism=SCRAM-SHA-256
source.sasl.username=user
source.sasl.password=password
source.ssl.truststore.location=/path/to/truststore.jks
source.ssl.truststore.password=truststore-password
Certificate-based authentication.
source.security.protocol=SSL
source.ssl.truststore.location=/path/to/truststore.jks
source.ssl.truststore.password=truststore-password
source.ssl.keystore.location=/path/to/keystore.jks
source.ssl.keystore.password=keystore-password
source.ssl.key.password=key-password
For enterprise environments with Kerberos.
source.security.protocol=SASL_PLAINTEXT
source.sasl.mechanism=GSSAPI
source.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="/path/to/kafka.keytab" principal="kafka/hostname@REALM";
- Use SSL/TLS in Production: Always use SASL_SSL or SSL protocols for production environments
- Secure Credentials: Store passwords in external secret management systems (e.g., HashiCorp Vault, AWS Secrets Manager)
- Use SCRAM over PLAIN: SCRAM-SHA-256 or SCRAM-SHA-512 are more secure than PLAIN
- Rotate Credentials: Regularly rotate passwords and certificates
- Limit Permissions: Use Kafka ACLs to restrict connector permissions to only necessary topics
- Monitor Authentication: Enable audit logging for authentication attempts
- Disaster Recovery: Replicate critical topics to a backup cluster
- Data Migration: Move data from one Kafka cluster to another
- Multi-Region Replication: Sync data across geographically distributed clusters
- Development/Testing: Copy production data to non-production environments
- Cloud Migration: Migrate from on-premise to cloud Kafka clusters
To adapt this connector for specific requirements:
- Add Message Filtering:
  - Modify CustomSourceTask.poll() to filter messages based on criteria
  - Add configuration for filter rules
- Add Message Transformation:
  - Implement transformation logic in the task
  - Consider using Kafka Connect SMTs (Single Message Transforms) instead
- Multi-Topic Replication:
  - Extend configuration to support multiple source topics
  - Update task to handle topic routing
- Add Dependencies:
  - Update pom.xml with any additional libraries needed
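As a starting point for the filtering customization, the polled batch can be passed through a configurable predicate before it is handed to the Connect framework. This is a dependency-free sketch under that assumption; Message is a stand-in type, not the connector's actual SourceRecord.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Sketch of predicate-based batch filtering, as suggested for a customized
// CustomSourceTask.poll(). Message is an illustrative stand-in type.
public class FilterSketch {
    static class Message {
        final String key, value;
        Message(String key, String value) { this.key = key; this.value = value; }
    }

    static List<Message> filterBatch(List<Message> batch, Predicate<Message> rule) {
        // Keep only the records that satisfy the configured rule.
        return batch.stream().filter(rule).collect(Collectors.toList());
    }
}
```

The predicate itself could be built from a new configuration property (e.g. a key prefix or header match) parsed in the config class.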
Run unit tests:
mvn test
The connector uses SLF4J for logging. Configure logging levels in your Kafka Connect worker configuration:
log4j.logger.com.octavalo.kafka.connector=DEBUG
Available log levels:
- ERROR: Critical errors and exceptions
- WARN: Warning messages (invalid configurations, processing errors)
- INFO: General operational information (startup, shutdown, replication status)
- DEBUG: Detailed debugging information (configuration details, offset management)
- TRACE: Very detailed trace information (individual record processing)
Run mvn clean install to download all dependencies.
- Check the Kafka Connect logs for detailed error messages
- Verify all required configuration parameters are provided
- Ensure the JAR is in the correct plugin directory
- Verify the connector status via REST API
- Check connector and task logs
- Ensure topics exist and are accessible
This project is open source. See LICENSE file for details.
Contributions are welcome! Please submit pull requests or open issues for bugs and feature requests.