In plain English: AdminClient is like a remote control for Kafka - it lets your applications create topics, check configurations, manage consumer groups, and perform other administrative tasks without using command-line tools.
In technical terms: Apache Kafka's AdminClient provides a programmatic API for administrative operations that were previously only available via CLI tools or direct ZooKeeper manipulation.
Why it matters: Applications can dynamically adapt to changing requirements - creating topics on demand, validating configurations, managing consumer groups, and recovering from failures - all without manual intervention or external tooling.
- 1. Introduction
- 2. AdminClient Overview
  - 2.1. Design Principles
  - 2.2. Creating and Configuring
- 3. Topic Management
  - 3.1. Listing Topics
  - 3.2. Describing Topics
  - 3.3. Creating Topics
  - 3.4. Deleting Topics
- 4. Configuration Management
- 5. Consumer Group Management
- 6. Advanced Operations
  - 6.1. Adding Partitions
  - 6.2. Deleting Records
  - 6.3. Leader Election
  - 6.4. Reassigning Replicas
- 7. Working with Futures
- 8. Testing with MockAdminClient
- 9. Summary
Before AdminClient existed, administrative tasks required either command-line tools or direct ZooKeeper manipulation. This created challenges for application developers.
The old world (before AdminClient):
Need to create a topic?
├─ Option 1: Tell users to run kafka-topics.sh
│ └─> Manual step, error-prone
├─ Option 2: Hope auto.create.topics.enable = true
│ └─> Unreliable, not always enabled
└─ Option 3: Use internal APIs
└─> No compatibility guarantees, breaks between versions
The new world (with AdminClient):
Need to create a topic?
└─> Check if exists, create if not, validate configuration
All programmatically, all in your application code
Common use cases:
- IoT applications: Create topic per device type on-demand
- Multi-tenant systems: Create topic per tenant automatically
- Application startup: Validate required topics exist with correct configuration
- SRE tooling: Build custom automation and recovery scripts
- Monitoring: Check consumer lag, detect slow consumers
In plain English: AdminClient sends requests and returns immediately with a Future. The actual work happens asynchronously, and Kafka's internal state takes time to propagate.
Timeline:
0ms: createTopics() returns immediately → Future returned
↓
10ms: Request reaches controller
↓
50ms: Controller updates its state → Future completes
↓
100ms: Some brokers know about new topic
↓
500ms: All brokers know about new topic
Eventual consistency implications:
admin.createTopics(Collections.singletonList(new NewTopic("new-topic", 1, (short) 1)))
.all().get(); // Wait for Future to complete
// At this moment:
// - Controller knows about topic ✓
// - Some brokers know about topic (maybe)
// - All brokers know about topic (eventually)
admin.listTopics() // Might not include "new-topic" yet!
💡 Insight
This is similar to DNS propagation - when you create a new domain, it takes time for all DNS servers worldwide to learn about it. Similarly, when you create a topic, it takes time for all Kafka brokers to learn about it.
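A practical response to this eventual consistency is to poll until the change becomes visible before depending on it. The helper below is a pure-JDK sketch (the `WaitFor` class and its names are hypothetical, not part of the Kafka API):

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class WaitFor {
    // Poll a condition until it holds or the timeout expires.
    // Returns true if the condition became true in time.
    static boolean waitUntil(BooleanSupplier condition,
                             Duration timeout,
                             Duration pollInterval) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollInterval.toMillis());
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }
}
```

After `createTopics()` completes, you could pass a condition such as "listTopics() contains the new name" to a helper like this before producing to the topic.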
Every method accepts an Options object:
// All methods follow this pattern:
admin.listTopics(options);
admin.createTopics(topics, options);
admin.describeCluster(options);
Common options:
ListTopicsOptions options = new ListTopicsOptions()
.timeoutMs(30000) // Wait up to 30 seconds
.listInternal(true); // Include internal topics
DescribeClusterOptions options = new DescribeClusterOptions()
.timeoutMs(60000)
.includeAuthorizedOperations(true); // Show what client can do
Everything in one class:
KafkaAdminClient admin = ...;
admin.listTopics(); // Topic operations
admin.describeConsumerGroups(); // Consumer group operations
admin.describeConfigs(); // Configuration operations
admin.describeCluster(); // Cluster operations
admin.electLeaders(); // Advanced operations
// No admin.topics().list()
// No admin.consumerGroups().describe()
// Everything is directly on AdminClient
Benefit: Easy discovery - one class, one JavaDoc to search
Basic creation:
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);
// ... use admin ...
admin.close(Duration.ofSeconds(30)); // Wait up to 30s for ongoing operations
Important configurations:
Scenario 1: DNS Alias for Bootstrap
Problem:
├─ Brokers: broker1.example.com, broker2.example.com, broker3.example.com
├─ Create alias: kafka.example.com → All brokers
├─ Use SASL authentication
└─> SASL expects broker name but gets alias → Authentication fails!
Solution:
props.put("client.dns.lookup", "resolve_canonical_bootstrap_servers_only");
Result:
├─ Client resolves kafka.example.com → [broker1, broker2, broker3]
└─> Connects using actual broker names → Authentication succeeds ✓
Scenario 2: Load Balancer with Multiple IPs
Problem:
├─ broker1.example.com → [IP1, IP2, IP3] (all load balancers)
├─ Client tries only first IP
└─> If IP1 down, connection fails even though broker accessible!
Solution:
props.put("client.dns.lookup", "use_all_dns_ips");
Result:
├─ Client gets all IPs
├─> Tries IP1, if fails tries IP2, if fails tries IP3
└─> Connection succeeds as long as any load balancer works ✓
props.put("request.timeout.ms", "120000"); // 2 minutes (default)
Why long timeout:
Some operations are slow:
├─ Consumer group management: 30-60 seconds possible
├─ Creating many topics: Several seconds
└─> Better to wait than fail prematurely
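Putting the options from this section together, a minimal configuration sketch might look like the following (the bootstrap address is a placeholder; `client.dns.lookup` takes one of the two values discussed above, shown here with the canonical-name option):

```java
import java.util.Properties;

// Illustrative AdminClient configuration; broker address is a placeholder.
Properties props = new Properties();
props.put("bootstrap.servers", "kafka.example.com:9092");
// Resolve the DNS alias to canonical broker names so SASL authentication works.
// Use "use_all_dns_ips" instead when a hostname resolves to multiple IPs.
props.put("client.dns.lookup", "resolve_canonical_bootstrap_servers_only");
// Allow slow administrative operations to complete before timing out
props.put("request.timeout.ms", "120000");
// AdminClient admin = AdminClient.create(props);
```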
Simple list:
ListTopicsResult topics = admin.listTopics();
topics.names().get().forEach(System.out::println);
How it works:
listTopics() returns immediately → ListTopicsResult
↓
topics.names() returns Future<Set<String>>
↓
.get() blocks until Future completes
↓
Set<String> returned → Iterate and print
Check if topic exists with correct configuration:
DescribeTopicsResult demoTopic = admin.describeTopics(
Collections.singletonList("demo-topic"));
try {
TopicDescription desc = demoTopic.values().get("demo-topic").get();
System.out.println("Topic: " + desc.name());
System.out.println("Partitions: " + desc.partitions().size());
if (desc.partitions().size() != EXPECTED_PARTITIONS) {
System.out.println("Wrong number of partitions!");
System.exit(-1);
}
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
System.out.println("Topic doesn't exist");
} else {
throw e;
}
}
What TopicDescription contains:
TopicDescription:
├─ name: "demo-topic"
├─ internal: false
├─ partitions: List<TopicPartitionInfo>
│ └─ For each partition:
│ ├─ partition: 0
│ ├─ leader: Broker 1
│ ├─ replicas: [Broker 1, Broker 2, Broker 3]
│ └─ isr: [Broker 1, Broker 2, Broker 3]
└─ authorizedOperations: [READ, WRITE, DELETE, ...]
💡 Insight
AdminClient result objects throw ExecutionException when Kafka returns an error. This is because they're wrapped Future objects. Always examine e.getCause() to get the actual Kafka error.
Create with all defaults:
NewTopic newTopic = new NewTopic("simple-topic",
    Optional.empty(), Optional.empty());
// Optional.empty() means use broker defaults for partitions and replication factor
admin.createTopics(Collections.singletonList(newTopic)).all().get();
Create with specific configuration:
NewTopic newTopic = new NewTopic(
"demo-topic", // name
NUM_PARTITIONS, // partitions
REP_FACTOR); // replication factor
CreateTopicsResult result = admin.createTopics(
Collections.singletonList(newTopic));
// Validate result
if (result.numPartitions("demo-topic").get() != NUM_PARTITIONS) {
System.out.println("Topic created with wrong number of partitions!");
System.exit(-1);
}
Complete example: Check and create if needed:
try {
// Try to describe the topic
TopicDescription desc = admin.describeTopics(
Collections.singletonList(TOPIC_NAME))
.values().get(TOPIC_NAME).get();
// Topic exists, validate configuration
if (desc.partitions().size() != NUM_PARTITIONS) {
System.out.println("Topic has wrong number of partitions. Exiting.");
System.exit(-1);
}
System.out.println("Topic exists with correct configuration");
} catch (ExecutionException e) {
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
e.printStackTrace();
throw e;
}
// Topic doesn't exist, create it
System.out.println("Topic doesn't exist. Creating...");
NewTopic newTopic = new NewTopic(TOPIC_NAME, NUM_PARTITIONS, REP_FACTOR);
CreateTopicsResult newTopicResult = admin.createTopics(
Collections.singletonList(newTopic));
// Validate creation
if (newTopicResult.numPartitions(TOPIC_NAME).get() != NUM_PARTITIONS) {
System.out.println("Topic created with wrong number of partitions.");
System.exit(-1);
}
System.out.println("Topic created successfully");
}
Delete a topic:
admin.deleteTopics(Collections.singletonList("demo-topic")).all().get();
// Verify deletion (may take time due to async nature)
try {
admin.describeTopics(Collections.singletonList("demo-topic"))
.values().get("demo-topic").get();
System.out.println("Topic still exists");
} catch (ExecutionException e) {
System.out.println("Topic deleted successfully");
}
Critical warning:
Deletion is FINAL:
├─ No recycle bin
├─ No "Are you sure?"
└─> Data is gone forever!
Best practices:
├─ Double-check topic name
├─ Require manual confirmation in tooling
└─> Consider backup/snapshot before deletion
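One way to encode the double-check practice is a guard that tooling must pass before it ever calls deleteTopics(). The helper below is hypothetical (the `DeletionGuard` class and its parameters are illustrative, not a Kafka API):

```java
// Hypothetical guard: only allow deletion when the operator re-typed the
// exact topic name and the topic matches an allowlisted prefix, since
// deleteTopics() is irreversible.
class DeletionGuard {
    static boolean safeToDelete(String topic, String confirmedName, String allowedPrefix) {
        return topic != null
                && topic.equals(confirmedName)      // user re-typed the exact name
                && topic.startsWith(allowedPrefix); // e.g. only "tmp." topics
    }
}
```

Usage might look like: `if (DeletionGuard.safeToDelete(name, typedName, "tmp.")) { admin.deleteTopics(...); }`.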
Get topic configuration:
ConfigResource configResource = new ConfigResource(
ConfigResource.Type.TOPIC,
"demo-topic");
DescribeConfigsResult configsResult = admin.describeConfigs(
Collections.singleton(configResource));
Config configs = configsResult.all().get().get(configResource);
// Print non-default configurations
configs.entries().stream()
.filter(entry -> !entry.isDefault())
.forEach(System.out::println);
Configuration types:
ConfigResource.Type:
├─ TOPIC: Topic configurations
├─ BROKER: Broker configurations
└─ BROKER_LOGGER: Broker logging levels
What ConfigEntry contains:
ConfigEntry:
├─ name: "cleanup.policy"
├─ value: "compact"
├─ isDefault: false
├─ isSensitive: false
├─ isReadOnly: false
└─ source: USER (or DEFAULT, DYNAMIC_BROKER_CONFIG, etc.)
Example: Ensure topic is compacted:
ConfigResource configResource = new ConfigResource(
ConfigResource.Type.TOPIC,
"demo-topic");
// Check current configuration
DescribeConfigsResult result = admin.describeConfigs(
Collections.singleton(configResource));
Config configs = result.all().get().get(configResource);
// Check if topic is compacted
ConfigEntry compaction = new ConfigEntry(
TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT);
if (!configs.entries().contains(compaction)) {
// Topic not compacted, fix it
System.out.println("Topic not compacted. Updating configuration...");
Collection<AlterConfigOp> configOp = new ArrayList<>();
configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> alterConf = new HashMap<>();
alterConf.put(configResource, configOp);
admin.incrementalAlterConfigs(alterConf).all().get();
System.out.println("Topic is now compacted");
} else {
System.out.println("Topic is already compacted");
}
Operation types:
AlterConfigOp.OpType:
├─ SET: Set configuration value
│ └─> cleanup.policy = compact
├─ DELETE: Remove configuration (revert to default)
│ └─> cleanup.policy = [default]
├─ APPEND: Add to list-valued configuration
│ └─> cleanup.policy = [existing, compact]
└─ SUBTRACT: Remove from list-valued configuration
└─> cleanup.policy = [existing - delete]
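The list semantics of APPEND and SUBTRACT can be modeled in plain Java. The sketch below illustrates the behavior on a list-valued config such as cleanup.policy; the `ListConfigOps` class is hypothetical, not the AdminClient API:

```java
import java.util.ArrayList;
import java.util.List;

// Model of AlterConfigOp semantics on a list-valued configuration value.
class ListConfigOps {
    // APPEND: add an element if it is not already present
    static List<String> applyAppend(List<String> value, String item) {
        List<String> out = new ArrayList<>(value);
        if (!out.contains(item)) {
            out.add(item);
        }
        return out;
    }

    // SUBTRACT: remove an element from the list
    static List<String> applySubtract(List<String> value, String item) {
        List<String> out = new ArrayList<>(value);
        out.remove(item);
        return out;
    }
}
```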
Real-world scenario:
Incident: Broker configuration file accidentally replaced
Problem: Broker won't start with broken config
Solution:
├─ Connect to another broker
├─> admin.describeConfigs() to dump configuration
└─> Reconstruct correct configuration file
💡 Insight
Being able to describe and modify configurations programmatically is incredibly powerful during emergencies. Build tools ahead of time - don't wait until 3 AM when production is down!
List consumer groups:
admin.listConsumerGroups().valid().get().forEach(System.out::println);
Understanding result methods:
listConsumerGroups() returns ListConsumerGroupsResult
.valid(): Return only successful results (ignore errors)
.errors(): Return only errors
.all(): Fail on the first error; otherwise return all successful results
Why errors occur:
Possible errors:
├─ Authorization: Client lacks permission
├─> Coordinator unavailable: Broker down
└─> Network issues: Timeout
Describe a consumer group:
ConsumerGroupDescription groupDesc = admin
.describeConsumerGroups(Collections.singletonList("my-group"))
.describedGroups().get("my-group").get();
System.out.println("Group: " + groupDesc.groupId());
System.out.println("State: " + groupDesc.state());
System.out.println("Coordinator: " + groupDesc.coordinator());
System.out.println("Partition assignment strategy: " +
groupDesc.partitionAssignor());
System.out.println("Members:");
for (MemberDescription member : groupDesc.members()) {
System.out.println(" Member ID: " + member.consumerId());
System.out.println(" Client ID: " + member.clientId());
System.out.println(" Host: " + member.host());
System.out.println(" Assigned partitions: " + member.assignment());
}What ConsumerGroupDescription contains:
ConsumerGroupDescription:
├─ groupId: "my-consumer-group"
├─ state: STABLE (or PREPARING_REBALANCE, COMPLETING_REBALANCE, DEAD, EMPTY)
├─ coordinator: Broker(id=2, host=kafka2.example.com:9092)
├─ partitionAssignor: "range"
├─ members: List<MemberDescription>
│ └─ For each member:
│ ├─ consumerId: "consumer-1-a1b2c3d4"
│ ├─ clientId: "consumer-1"
│ ├─ host: "/192.168.1.10"
│ └─ assignment: {topic1-0, topic1-1, topic2-0}
└─ authorizedOperations: [READ, DELETE, ...]
Checking consumer lag (complete example):
// Step 1: Get committed offsets for the consumer group
Map<TopicPartition, OffsetAndMetadata> offsets = admin
.listConsumerGroupOffsets("my-consumer-group")
.partitionsToOffsetAndMetadata().get();
// Step 2: Prepare request for latest offsets in those partitions
Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
for (TopicPartition tp : offsets.keySet()) {
requestLatestOffsets.put(tp, OffsetSpec.latest());
}
// Step 3: Get latest offsets
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
admin.listOffsets(requestLatestOffsets).all().get();
// Step 4: Calculate and print lag for each partition
for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet()) {
String topic = e.getKey().topic();
int partition = e.getKey().partition();
long committedOffset = e.getValue().offset();
long latestOffset = latestOffsets.get(e.getKey()).offset();
long lag = latestOffset - committedOffset;
System.out.printf("Topic: %s, Partition: %d, " +
"Committed: %d, Latest: %d, Lag: %d%n",
topic, partition, committedOffset, latestOffset, lag);
}Visual representation:
Partition timeline:
[0][1][2][3][4][5][6][7][8][9]
↑ ↑
Committed: 3 Latest: 9
Lag = 9 - 3 = 6 messages behind
OffsetSpec options:
OffsetSpec.earliest() // First offset in partition
OffsetSpec.latest() // Last offset in partition
OffsetSpec.forTimestamp(timestamp) // Offset at/after timestamp
Use case: Reset consumer to beginning of topic
Important warnings:
Before modifying offsets:
1. Stop the consumer group
├─> If group active, changes will be overwritten
└─> Will get UnknownMemberIdException
2. Consider impact on state
├─> Stream processing apps maintain state
├─> Reprocessing data may need state reset
└─> Example: Count of shoes sold must be reset too
3. Check auto.offset.reset
├─> Determines behavior when no offset found
└─> Better to explicitly set than rely on config
Resetting a consumer group to the beginning (complete example):
// Step 1: Get earliest offsets for all partitions
Map<TopicPartition, OffsetSpec> requestEarliestOffsets = new HashMap<>();
for (TopicPartition tp : partitions) {
requestEarliestOffsets.put(tp, OffsetSpec.earliest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets =
admin.listOffsets(requestEarliestOffsets).all().get();
// Step 2: Convert to OffsetAndMetadata format
Map<TopicPartition, OffsetAndMetadata> resetOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e :
earliestOffsets.entrySet()) {
resetOffsets.put(e.getKey(),
new OffsetAndMetadata(e.getValue().offset()));
}
// Step 3: Update the consumer group offsets
try {
admin.alterConsumerGroupOffsets("my-consumer-group", resetOffsets)
.all().get();
System.out.println("Consumer group reset to beginning");
} catch (ExecutionException e) {
System.out.println("Failed to update offsets: " + e.getMessage());
if (e.getCause() instanceof UnknownMemberIdException) {
System.out.println("Consumer group is still active. Stop it first!");
}
}
Timeline of reset:
Before reset:
Partition: [0][1][2][3][4][5][6][7][8][9]
↑
Committed: 6
After reset:
Partition: [0][1][2][3][4][5][6][7][8][9]
↑
Committed: 0
Consumer will reprocess all messages
When needed:
- Throughput exceeds partition capacity
- Need more parallelism
- Scaling out consumers
Risks:
Before adding partitions (4 partitions):
key="user123" → hash % 4 = partition 2 ✓
key="user456" → hash % 4 = partition 1 ✓
After adding partitions (6 partitions):
key="user123" → hash % 6 = partition 5 ✗ (changed!)
key="user456" → hash % 6 = partition 0 ✗ (changed!)
Result:
├─ Old messages for user123: partition 2
├─ New messages for user123: partition 5
└─> Order broken! State scattered!
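The remapping risk can be demonstrated with plain Java. Kafka's default partitioner actually uses murmur2 over the serialized key bytes, but any hash-modulo scheme has the same property: changing the partition count changes the key-to-partition mapping. The `KeyPartitioning` class below is an illustration, not Kafka's implementation:

```java
class KeyPartitioning {
    // Illustration only: String.hashCode() stands in for murmur2 on the
    // serialized key; floorMod keeps the result non-negative.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Comparing `partitionFor(key, 4)` with `partitionFor(key, 6)` across a set of keys shows that many keys land on a different partition after the count changes.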
Adding partitions:
Map<String, NewPartitions> newPartitions = new HashMap<>();
newPartitions.put("demo-topic",
NewPartitions.increaseTo(NUM_PARTITIONS + 2)); // Total count!
admin.createPartitions(newPartitions).all().get();
Critical detail: Specify TOTAL partition count, not number to add!
Use case: Comply with data retention regulations
How it works:
Before deletion:
Partition: [0][1][2][3][4][5][6][7][8][9]
↑ ↑
Earliest: 0 Latest: 9
Delete up to offset 5:
admin.deleteRecords({partition: RecordsToDelete.beforeOffset(5)})
After deletion:
Partition: [X][X][X][X][X][5][6][7][8][9]
↑ ↑
Earliest: 5 Latest: 9
Deleted records:
├─ Marked as deleted
├─> Consumers cannot access
└─> Physical cleanup happens asynchronously
Example:
// Step 1: Get offsets for records older than specific time
Long oneMonthAgo = Instant.now().atZone(ZoneId.systemDefault())
.minusMonths(1).toInstant().toEpochMilli(); // Kafka expects milliseconds
Map<TopicPartition, OffsetSpec> partitionTimestamps = new HashMap<>();
for (TopicPartition tp : partitions) {
partitionTimestamps.put(tp, OffsetSpec.forTimestamp(oneMonthAgo));
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> olderOffsets =
admin.listOffsets(partitionTimestamps).all().get();
// Step 2: Convert to delete records format
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> e :
olderOffsets.entrySet()) {
recordsToDelete.put(e.getKey(),
RecordsToDelete.beforeOffset(e.getValue().offset()));
}
// Step 3: Delete records
admin.deleteRecords(recordsToDelete).all().get();
System.out.println("Deleted records older than one month");
Two types:
Purpose: Rebalance leaders across brokers
Scenario: Broker 1 crashed, leadership moved to others
Broker 1 comes back online
Current state:
├─ Partition 0: Leader = Broker 2 (preferred: Broker 1)
├─ Partition 1: Leader = Broker 3 (preferred: Broker 1)
└─> Broker 1 has no leadership (imbalanced!)
After preferred leader election:
├─ Partition 0: Leader = Broker 1 ✓
├─ Partition 1: Leader = Broker 1 ✓
└─> Broker 1 back to normal leadership
Triggering manually:
Set<TopicPartition> electableTopics = new HashSet<>();
electableTopics.add(new TopicPartition("demo-topic", 0));
try {
admin.electLeaders(ElectionType.PREFERRED, electableTopics).all().get();
System.out.println("Preferred leaders elected");
} catch (ExecutionException e) {
if (e.getCause() instanceof ElectionNotNeededException) {
System.out.println("Already using preferred leaders");
}
}
Purpose: Restore availability when all in-sync replicas are gone
Scenario:
Partition replicas: [Broker 1 (leader), Broker 2, Broker 3]
├─ Broker 1 crashes (leader lost)
├─> Broker 2, 3 not in-sync (missing data)
└─> No eligible leader! Partition UNAVAILABLE
Options:
1. Wait for Broker 1 to come back (may be hours/days)
2. Trigger unclean leader election
Unclean election:
├─> Elect Broker 2 as leader (even though not in-sync)
├─> Partition available again ✓
└─> Data written to Broker 1 is LOST ✗
Critical decision:
Availability vs Durability trade-off
├─ Trigger unclean election: Availability wins (data loss acceptable)
└─ Wait for proper leader: Durability wins (downtime acceptable)
Use cases:
- Rebalance load across brokers
- Isolate noisy neighbors
- Decommission broker
- Add replicas for durability
Example: Adding new broker to cluster
// Scenario: Had 1 broker (ID=0), added new broker (ID=1)
// Want to spread replicas across both
Map<TopicPartition, Optional<NewPartitionReassignment>> reassignment =
new HashMap<>();
// Partition 0: Add replica to new broker, keep leader on old broker
reassignment.put(new TopicPartition("demo-topic", 0),
Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1))));
// Order matters: first = preferred leader
// Partition 1: Move entirely to new broker
reassignment.put(new TopicPartition("demo-topic", 1),
Optional.of(new NewPartitionReassignment(Arrays.asList(1))));
// Partition 2: Add replica on new broker, make it preferred leader
reassignment.put(new TopicPartition("demo-topic", 2),
Optional.of(new NewPartitionReassignment(Arrays.asList(1, 0))));
// New broker first = preferred leader
// Partition 3: Cancel any ongoing reassignment
reassignment.put(new TopicPartition("demo-topic", 3),
Optional.empty());
admin.alterPartitionReassignments(reassignment).all().get();
// Check progress
Map<TopicPartition, PartitionReassignment> ongoing =
admin.listPartitionReassignments().reassignments().get();
System.out.println("Currently reassigning: " + ongoing);
What happens:
Reassignment process:
1. New replicas start catching up (fetch from leader)
2. Once caught up, new replicas added to ISR
3. Old replicas (if any) removed
4. Leader changes (if needed) after next preferred election
Timeline:
├─ Start: [Old replica locations]
├─> Copying data: [Old replicas + New replicas]
├─> Caught up: [New replicas in ISR]
└─> Complete: [New replica locations]
Note: Large partitions take time to copy!
Use quotas to throttle if needed.
The blocking pattern (simple but slow):
DescribeTopicsResult result = admin.describeTopics(topics);
TopicDescription desc = result.values().get("my-topic").get(); // BLOCKS
The async pattern (efficient for servers):
// Example: HTTP server that describes topics
vertx.createHttpServer().requestHandler(request -> {
String topic = request.getParam("topic");
int timeout = Integer.parseInt(request.getParam("timeout"));
DescribeTopicsResult result = admin.describeTopics(
Collections.singletonList(topic),
new DescribeTopicsOptions().timeoutMs(timeout));
// Attach callback instead of blocking
result.values().get(topic).whenComplete(
(topicDescription, throwable) -> {
if (throwable != null) {
request.response().end(
"Error: " + throwable.getMessage());
} else {
request.response().end(
topicDescription.toString());
}
});
}).listen(8080);
Why this matters:
Blocking approach:
Request 1: Describe topic (timeout=60s) → BLOCKS thread for 60s
Request 2: Describe topic (timeout=5s) → Waits behind Request 1
└─> Takes 60s+ even with 5s timeout!
Async approach:
Request 1: Describe topic (timeout=60s) → Returns Future
Request 2: Describe topic (timeout=5s) → Returns Future
└─> Both execute concurrently
└─> Request 2 completes in ~5s ✓
💡 Insight
Use blocking pattern for batch scripts and CLI tools. Use async pattern for servers and applications handling concurrent requests. The API supports both styles equally well.
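The two styles can be seen with the JDK's own CompletableFuture; KafkaFuture offers the same pair of options (get() to block, whenComplete() to attach a callback). A small pure-JDK sketch, with a placeholder string standing in for a topic description:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// The "result" here is a placeholder for a topic description.
CompletableFuture<String> future =
        CompletableFuture.supplyAsync(() -> "demo-topic: 3 partitions");

// Blocking style: simple, but ties up the calling thread until completion
String blocking = future.join();

// Async style: register a callback; the calling thread stays free.
// (The future is already complete here, so the callback runs immediately.)
AtomicReference<String> async = new AtomicReference<>();
future.whenComplete((desc, err) -> {
    if (err == null) {
        async.set(desc); // e.g. write the HTTP response here
    }
});
```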
MockAdminClient: Test without real Kafka
Example class to test:
public class TopicCreator {
private final AdminClient admin;
public TopicCreator(AdminClient admin) {
this.admin = admin;
}
public void maybeCreateTopic(String topicName)
throws ExecutionException, InterruptedException {
if (topicName.toLowerCase().startsWith("test")) {
Collection<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(topicName, 1, (short) 1));
admin.createTopics(topics);
// Also make it compacted
ConfigResource configResource = new ConfigResource(
ConfigResource.Type.TOPIC, topicName);
ConfigEntry compaction = new ConfigEntry(
TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT);
Collection<AlterConfigOp> configOp = new ArrayList<>();
configOp.add(new AlterConfigOp(compaction, AlterConfigOp.OpType.SET));
Map<ConfigResource, Collection<AlterConfigOp>> alterConf =
new HashMap<>();
alterConf.put(configResource, configOp);
admin.incrementalAlterConfigs(alterConf).all().get();
}
}
}
Setting up the mock:
@Before
public void setUp() {
Node broker = new Node(0, "localhost", 9092);
this.admin = spy(new MockAdminClient(
Collections.singletonList(broker), broker));
// MockAdminClient doesn't implement incrementalAlterConfigs
// Need to mock it to avoid UnsupportedOperationException
AlterConfigsResult emptyResult = mock(AlterConfigsResult.class);
doReturn(KafkaFuture.completedFuture(null))
.when(emptyResult).all();
doReturn(emptyResult)
.when(admin).incrementalAlterConfigs(any());
}
Writing tests:
@Test
public void testCreateTestTopic()
throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(admin);
tc.maybeCreateTopic("test.is.a.test.topic");
// Verify createTopics was called once
verify(admin, times(1)).createTopics(any());
}
@Test
public void testNotTestTopic()
throws ExecutionException, InterruptedException {
TopicCreator tc = new TopicCreator(admin);
tc.maybeCreateTopic("not.a.test");
// Verify createTopics was never called
verify(admin, never()).createTopics(any());
}
MockAdminClient capabilities:
Mocked (works):
├─ createTopics() → Subsequent listTopics() calls include them
├─ listTopics() → Returns topics "created" with createTopics()
└─> Enough for many test scenarios
Not mocked (may throw):
└─> incrementalAlterConfigs() → Can inject your own implementation
Maven dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.0</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
What we learned:
- AdminClient Basics:
  - Asynchronous API (returns Futures immediately)
  - Eventually consistent (changes take time to propagate)
  - Flat hierarchy (everything in one class)
- Topic Management:
  - List, describe, create, delete topics
  - Validate configurations programmatically
  - Check and create topics on application startup
- Configuration Management:
  - Read and modify broker/topic configurations
  - Filter non-default configurations
  - Use during emergencies to dump/restore configs
- Consumer Group Management:
  - List and describe consumer groups
  - Calculate consumer lag
  - Modify offsets for reprocessing (reset to beginning/specific time)
- Advanced Operations:
  - Add partitions (breaks key-to-partition mapping!)
  - Delete records (compliance with retention laws)
  - Trigger leader elections (preferred or unclean)
  - Reassign replicas (rebalance load, decommission brokers)
- Working with Futures:
  - Blocking: Simple, suitable for scripts/CLI
  - Async: Efficient, suitable for servers
- Testing:
  - MockAdminClient for unit tests
  - Some methods mocked, others need manual injection
Key takeaway: AdminClient is the Swiss Army knife for Kafka operations. It's essential for application developers who need dynamic topic management and for SREs who need to build tooling and automation. Learn it before you need it in an emergency!
Previous: Chapter 4: Kafka Consumers | Next: Chapter 6: Kafka Internals →