Skip to content

Latest commit

 

History

History
772 lines (598 loc) · 27 KB

File metadata and controls

772 lines (598 loc) · 27 KB

6. Kafka Internals

In plain English: Understanding Kafka's internals is like knowing how your car engine works - you don't need it to drive, but when something goes wrong or you want to tune performance, this knowledge becomes invaluable.

In technical terms: Kafka internals cover the controller mechanism, replication protocols, request processing pipelines, and storage architecture that enable Kafka's distributed, fault-tolerant operation.

Why it matters: Deep knowledge of internals helps you troubleshoot production issues faster, optimize performance with precision, and understand why Kafka behaves the way it does under different conditions.


Table of Contents

  1. Introduction
  2. Cluster Membership
  3. The Controller
  4. Replication
  5. Request Processing
  6. Physical Storage
  7. Summary

1. Introduction

In plain English: Think of Kafka's internals like the inner workings of a post office - you can send and receive mail without knowing how sorting machines work, but understanding the mechanisms helps you optimize delivery times and troubleshoot when letters go missing.

While you don't need to understand Kafka's internals to run it in production or write applications, this knowledge provides crucial context when troubleshooting or optimizing performance. This chapter focuses on topics especially relevant to Kafka practitioners:

  • The Kafka controller - The brain coordinating cluster operations
  • Replication mechanics - How data stays safe across failures
  • Request handling - How producers and consumers communicate with brokers
  • Storage architecture - How Kafka stores and indexes data on disk

💡 Insight

Understanding internals transforms random configuration changes into precise tuning. When you know the mechanisms behind the knobs, you can make informed decisions rather than trial-and-error adjustments.


2. Cluster Membership

In plain English: Cluster membership is like a company's employee roster - everyone needs to know who's currently working and who has left, so work can be properly distributed.

Kafka uses Apache ZooKeeper to maintain the list of active brokers. Here's how it works:

Broker Registration Process:

1. Broker starts → Creates ephemeral node in ZooKeeper with unique ID
2. Other components watch /brokers/ids path
3. They get notified when brokers join or leave
4. Cluster adjusts automatically

Key behaviors:

  • Each broker has a unique identifier (configured or auto-generated)
  • Attempting to start a broker with duplicate ID causes an error
  • When a broker loses ZooKeeper connectivity, its ephemeral node disappears
  • Other components automatically detect the broker's departure

💡 Insight

The ephemeral node pattern is elegant: if a broker crashes or loses network connectivity, ZooKeeper automatically removes its registration. No manual cleanup needed - the distributed system heals itself.

Important detail: When a broker goes offline, its node disappears but its broker ID persists in other data structures (like replica lists). This means if you start a new broker with the same ID, it immediately takes over the old broker's partitions and topics.


3. The Controller

In plain English: The controller is like a manager in an organization - while everyone has their regular job, one person coordinates big decisions like who leads which project and what happens when someone leaves.

3.1. Controller Election

The controller is one Kafka broker with special responsibilities beyond normal broker functions. Its primary job: electing partition leaders.

Election process:

1. First broker to start creates /controller node in ZooKeeper
2. Other brokers try to create the same node
3. They receive "node already exists" error
4. They realize a controller already exists
5. They create a watch on /controller to monitor changes

Failover mechanism:

When controller stops or loses ZooKeeper connectivity:
1. Its ephemeral /controller node disappears
2. Other brokers are notified via their watches
3. They race to create the new /controller node
4. First to succeed becomes the new controller
5. Others receive "node already exists" and watch the new controller

💡 Insight

The controller epoch number acts as zombie fencing - like version numbers that let brokers ignore outdated commands from old controllers that don't know they've been replaced.

3.2. Controller Responsibilities

When controller starts:

  1. Reads latest cluster state from ZooKeeper (async for speed)
  2. Builds replica state map in memory
  3. Begins managing metadata and leader elections

When a broker leaves:

Controller Action Flow:
1. Detects broker departure (ZooKeeper watch or ControlledShutdownRequest)
2. Identifies partitions that need new leaders
3. Determines new leaders (next replica in ISR list)
4. Persists new state to ZooKeeper (async, pipelined for speed)
5. Sends LeaderAndISR requests to affected brokers
   → New leader: "Start serving client requests"
   → Followers: "Start replicating from new leader"
6. Sends UpdateMetadata to all brokers
   → Updates their metadata caches

When a broker joins:

  • Similar process, but all replicas start as followers
  • They must catch up before becoming leader-eligible

3.3. KRaft: Kafka's New Raft-Based Controller

In plain English: KRaft is like replacing a company's external HR department with an internal team that uses the same communication system as everyone else - simpler, faster, and more integrated.

Why replace ZooKeeper?

The ZooKeeper-based controller had several limitations:

  1. Metadata inconsistency - Asynchronous updates between ZooKeeper, controller, and brokers created edge cases
  2. Slow restarts - Controller had to reload all metadata from ZooKeeper
  3. Complex architecture - Metadata ownership split between controller, brokers, and ZooKeeper
  4. Operational overhead - Teams needed expertise in two distributed systems

KRaft's architecture:

Old (ZooKeeper-based):
External System (ZooKeeper) ← Controller → Brokers
                ↓
        Complex, Slow, Error-prone

New (KRaft):
Controller Quorum (using Raft) → Brokers
                ↓
        Integrated, Fast, Consistent

Key improvements:

  1. Metadata as event log - All cluster metadata stored as stream of events
  2. Raft-based consensus - Controller nodes elect leader without external system
  3. Broker fetch model - Brokers pull updates instead of controller pushing
  4. Persistent metadata - Brokers store metadata on disk for fast startup
  5. Fenced state - Prevents out-of-date brokers from serving stale data

💡 Insight

KRaft applies Kafka's own log-based architecture to metadata management. The lesson: when your core competency is event streaming, use it everywhere - even for managing yourself.


4. Replication

In plain English: Replication is like having backup copies of important documents in different filing cabinets - if one cabinet burns down, you haven't lost anything critical.

Replication is at the heart of Kafka's reliability. Every partition can have multiple replicas stored on different brokers.

4.1. Leader and Follower Replicas

Two types of replicas:

Leader replica:

  • One per partition
  • Handles all produce requests (writes)
  • Usually handles consume requests (reads)
  • Guarantees consistency

Follower replicas:

  • All other replicas for the partition
  • Replicate messages from leader
  • Stay synchronized with leader
  • Become new leader if current leader fails

Visual representation:

Partition 0
├── Leader (Broker 1)    ← All writes go here
├── Follower (Broker 2)  ← Copies from leader
└── Follower (Broker 3)  ← Copies from leader

If Broker 1 fails:
Partition 0
├── Leader (Broker 2)    ← Promoted follower
└── Follower (Broker 3)  ← Continues copying

💡 Insight

Reading from followers reduces network costs when consumers are geographically distant from the leader. The tradeoff: slightly higher latency due to replication delay.

4.2. In-Sync Replicas

In plain English: An in-sync replica is like a team member who's caught up on all the meeting notes - they're ready to step in as lead at any moment.

A replica is considered in-sync if:

  1. Has active ZooKeeper session (heartbeat within 6 seconds, configurable)
  2. Fetched messages from leader in last 10 seconds (configurable)
  3. Fetched the most recent messages (caught up within 10 seconds)

The fetch mechanism:

Follower → Leader: "Send me messages starting from offset 1000"
Leader → Follower: [Messages 1000-1050]
Follower → Leader: "Send me messages starting from offset 1051"

Leader tracks each follower's progress
↓
Knows exactly how far behind each replica is

Falling out of sync:

Replica becomes out-of-sync if:
- Hasn't fetched in 10+ seconds
- Fetched but couldn't catch up in 10+ seconds
- Lost ZooKeeper connection

Getting back in sync:
1. Reconnect to ZooKeeper
2. Catch up to latest leader message
3. Usually quick after temporary issues

💡 Insight

Out-of-sync replicas don't slow down producers and consumers - the cluster stops waiting for them. But this reduces effective replication factor, increasing risk of data loss if more brokers fail.

Important configurations:

  • replica.lag.time.max.ms - How long a replica can lag before considered out-of-sync (default: 30 seconds)
  • Lower effective replication = higher risk but no performance impact
  • Higher lag time = more tolerance but longer potential data loss window

4.3. Preferred Leaders

In plain English: A preferred leader is like someone's home desk in a hot-desking office - they can work anywhere, but they're most efficient at their assigned spot.

How it works:

  • Each partition has a preferred leader - the replica that was leader when topic was created
  • Preferred leaders are initially balanced across brokers
  • Automatic rebalancing (auto.leader.rebalance.enable=true) moves leadership back to preferred leaders
  • This maintains balanced load across the cluster

Finding preferred leaders:

Replica list for partition: [Broker 3, Broker 1, Broker 5]
                                     ↑
                            This is preferred leader
                       (always first in the list)

💡 Insight

When manually reassigning replicas, the order matters. The first replica you specify becomes the preferred leader, so distribute them carefully to avoid overloading specific brokers.


5. Request Processing

In plain English: Request processing is like a restaurant's kitchen operation - orders come in, get queued, prepared by chefs, and served back to customers, all following a specific flow.

5.1. How Requests Flow

All Kafka broker operations involve processing requests from clients, replicas, and the controller. Kafka uses a binary protocol over TCP.

Request header structure:

  • Request type (API key)
  • Request version (for compatibility)
  • Correlation ID (for troubleshooting)
  • Client ID (identifies the application)

Processing pipeline:

Client Connection
        ↓
Acceptor Thread (per port)
        ↓
Processor/Network Thread (configurable number)
        ↓
Request Queue
        ↓
I/O/Request Handler Threads
        ↓
Response Queue
        ↓
Network Thread
        ↓
Client Response

Visual flow:

┌─────────────┐
│   Clients   │
└──────┬──────┘
       │
   ┌───▼────────────────┐
   │  Acceptor Thread   │
   └───┬────────────────┘
       │
   ┌───▼────────────────┐
   │ Network Threads    │
   └───┬────────────────┘
       │
   ┌───▼────────────────┐
   │  Request Queue     │
   └───┬────────────────┘
       │
   ┌───▼────────────────┐
   │ I/O Handler Threads│
   └───┬────────────────┘
       │
   ┌───▼────────────────┐
   │  Response Queue    │
   │  (+ Purgatory)     │
   └───┬────────────────┘
       │
   ┌───▼────────────────┐
   │ Network Threads    │
   └───┬────────────────┘
       │
   ┌───▼────────────────┐
   │   Clients          │
   └────────────────────┘

💡 Insight

Requests are processed in order from each client connection. This ordering guarantee is fundamental to Kafka's message queue behavior and delivery guarantees.

Common request types:

  1. Produce requests - From producers with messages to write
  2. Fetch requests - From consumers and followers reading messages
  3. Admin requests - Metadata operations like creating topics
  4. Metadata requests - Discovering partition leaders and cluster topology

5.2. Produce Requests

Client routing:

Producer must send to correct broker:
1. Send metadata request to any broker
2. Receive partition leader information
3. Send produce request to leader broker
4. Wrong broker returns "Not a Leader" error

Produce request processing:

Step 1: Validation
├── Check user write privileges
├── Validate acks value (0, 1, or "all")
└── If acks=all, check sufficient in-sync replicas

Step 2: Write to Disk
├── Write to filesystem cache (not physical disk)
└── Rely on replication for durability

Step 3: Acknowledgment
├── If acks=0 or 1: Respond immediately
└── If acks=all: Wait for follower replication
    ├── Store in purgatory buffer
    ├── Wait for all in-sync replicas
    └── Send response when replicated

💡 Insight

Kafka doesn't wait for disk persistence before acknowledging - it relies on replication. This is why replication factor matters more than disk reliability for data durability.

5.3. Fetch Requests

In plain English: Fetch requests are like placing an order at a restaurant - you specify what you want, set limits on portion size, and can even say "I'll wait if you need time to prepare more."

Request structure:

Fetch Request:
├── Topics and partitions to read from
├── Starting offset for each partition
├── Maximum bytes per partition (memory limit)
└── Minimum bytes to return (optional)
└── Maximum wait time (optional)

Processing flow:

1. Validate request arrives at partition leader
2. Check offset exists and is accessible
3. Read messages up to client's limit
4. Send using zero-copy (file → network, no buffers)

Zero-copy optimization:

Traditional:
File → Kernel Buffer → Application Buffer → Network Buffer → Network

Kafka (zero-copy):
File → Network Channel
       (Direct transfer, no intermediate buffers)

Minimum bytes configuration:

Without min bytes:
Client: "Any data?" (every few ms)
Broker: "Here's 1 message"
         ↓
    High CPU/network overhead

With min bytes (e.g., 10KB):
Client: "Tell me when you have 10KB or timeout"
Broker: <waits, accumulates data>
Broker: "Here's 10KB"
         ↓
    Much more efficient

💡 Insight

The min bytes and timeout parameters create a powerful batching mechanism. Consumers can choose between low latency (small min bytes) and high efficiency (larger min bytes).

High-water mark restriction:

Messages available to consumers:
├── Partition has offsets 0-1000
├── Only 0-800 replicated to all in-sync replicas
└── Consumers can only read 0-800

Why? Safety guarantee - unreplicated messages
might disappear if leader fails

Fetch sessions:

  • Consumers cache partition metadata across requests
  • Incremental fetch requests only send changes
  • Reduces overhead for consumers with many partitions
  • Broker can evict sessions if memory constrained

5.4. Other Requests

The Kafka protocol includes 61+ request types (and growing):

Consumer coordination:

  • 15 request types for group formation and coordination
  • Offset commit and fetch requests
  • Group membership management

Metadata management:

  • CreateTopic, DeleteTopic
  • AlterConfig, DescribeConfig
  • Replica reassignment

Internal broker communication:

  • LeaderAndIsr (controller to brokers)
  • UpdateMetadata (partition leadership changes)
  • ControlledShutdown (graceful broker shutdown)

💡 Insight

Protocol evolution uses versioning: old clients send version N requests, new brokers respond with version N responses. Always upgrade brokers before clients - new brokers understand old protocols, but not vice versa.


6. Physical Storage

In plain English: Physical storage is like a library's filing system - how books are organized on shelves, indexed for quick lookup, and periodically weeded to make room for new acquisitions.

6.1. Partition Allocation

Basic unit: A partition replica is the smallest storage unit. Partitions cannot split across brokers or disks.

Configuration:

  • log.dirs - List of directories for partition storage
  • Each directory typically represents a mount point (single disk or RAID)

Allocation goals when creating a topic:

Example: 6 brokers, topic with 10 partitions, replication factor 3
         = 30 partition replicas to allocate

Goals:
1. Spread replicas evenly (5 per broker)
2. Each partition's replicas on different brokers
3. If rack-aware, replicas on different racks

Allocation algorithm:

Step 1: Choose random starting broker (e.g., Broker 4)
Step 2: Assign leaders round-robin
├── Partition 0 leader → Broker 4
├── Partition 1 leader → Broker 5
├── Partition 2 leader → Broker 0
└── (wrap around at broker 6)

Step 3: Assign followers at increasing offsets
├── Partition 0: Leader=4, Followers=5,0
├── Partition 1: Leader=5, Followers=0,1
└── Partition 2: Leader=0, Followers=1,2

Rack-aware allocation:

Brokers: [0,1] on Rack A, [2,3] on Rack B
Ordering: [0,2,1,3] (alternates racks)

Result:
├── Partition 0: Broker 0 (Rack A), Broker 2 (Rack B)
└── Entire rack failure → Still have replicas

💡 Insight

Partition allocation doesn't consider available space or current load. If brokers have different disk sizes or some partitions are huge, manual intervention may be needed.

Disk allocation within broker:

  • Count partitions on each directory
  • Add new partition to directory with fewest partitions
  • New disk gets all new partitions until balanced

6.2. File Management

Retention mechanism:

Retention options:
1. Time-based: Keep 7 days of data
2. Size-based: Keep 1GB per partition
3. Log compaction: Keep latest value per key

Segment strategy:

Partition = Multiple segments
├── Active segment (currently writing)
├── Closed segment (sealed, eligible for deletion)
└── Closed segment (sealed, eligible for deletion)

Default limits:
├── 1 GB per segment, OR
└── 1 week of data

Retention example:

If retention = 1 day, segment = 5 days of data:
├── Segment won't close until 5 days pass
└── Actually keeps 5 days (not 1)

Active segment never deleted!

💡 Insight

Brokers keep open file handles for all segments (even inactive ones). High partition counts require OS tuning to allow many open files.

6.3. File Format

In plain English: The file format is like the structure of entries in a ledger - each entry has a standard format, with some overhead for the structure and most space for the actual data.

Key principle: Wire format = Disk format

  • Enables zero-copy optimization
  • Allows producer compression to work end-to-end
  • Changing format requires updating both protocol and storage

Message structure (v2 format):

Batch Header (96 bits overhead):
├── Magic number (format version)
├── Offset range (first and last in batch)
├── Timestamps (first and max in batch)
├── Size in bytes
├── Leader epoch
├── Checksum
├── Attributes (compression, timestamp type, transactional)
├── Producer ID, epoch, sequence (for exactly-once)
└── Record set

Record (minimal overhead per message):
├── Size
├── Attributes (currently unused)
├── Offset delta (from first in batch)
├── Timestamp delta (from first in batch)
└── User payload (key, value, headers)

💡 Insight

Batching efficiency: system information is mostly at batch level, not per record. Larger batches dramatically reduce overhead as a percentage of total size.

Compression benefits:

Larger batches = Better compression
├── More similar data to compress together
├── Compression algorithms work better on more data
└── Network and disk savings multiply

Viewing log segments:

bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
    --deep-iteration \
    --files /path/to/segment

# Shows message contents, including compressed messages

6.4. Indexes

In plain English: Indexes are like the table of contents in a book - instead of reading every page to find a topic, you jump directly to the right section.

Two index types:

  1. Offset index - Maps offsets to file positions
  2. Timestamp index - Maps timestamps to offsets (for time-based search)

How offset index works:

Consumer asks for offset 100:
1. Check index: offset 100 → segment file X, position Y
2. Seek to position Y in file
3. Start reading from there

Index properties:

  • Broken into segments (like log files)
  • Deleted when corresponding log segment is purged
  • No checksums (will regenerate if corrupted)
  • Safe to delete (will regenerate on restart)

💡 Insight

Index corruption recovery can be slow but is safe. Kafka rebuilds indexes from message logs, which are the source of truth. Lengthy recovery times with millions of partitions are the tradeoff for safety.

6.5. Compaction

In plain English: Compaction is like cleaning out a filing cabinet - instead of keeping every revision of a document, you only keep the latest version, saving space while preserving current information.

Use cases:

  1. Storing current state (shipping addresses, app state)
  2. Change data capture (keep latest database row)
  3. GDPR compliance (delete old data after retention period)

Retention policies:

  • delete - Remove events older than retention time
  • compact - Keep only latest value per key
  • compact,delete - Compact + remove old records

Compaction structure:

Partition:
├── Clean portion (already compacted)
│   └── One value per key (most recent)
└── Dirty portion (since last compaction)
    └── Multiple values per key possible

Compaction algorithm:

Step 1: Build offset map (dirty section)
├── Hash(key) → Latest offset for this key
├── 16-byte hash + 8-byte offset = 24 bytes/entry
└── 1GB segment ≈ 24MB map (very efficient!)

Step 2: Clean segments
├── Read from oldest clean segment
├── For each message:
│   ├── If key NOT in map → Copy to replacement
│   └── If key IS in map → Skip (newer version exists)
└── Swap replacement for original

Result: One message per key (the latest)

Memory efficiency:

1 GB segment, 1 KB messages:
├── ~1 million messages
├── Offset map needs ~24 MB
└── Can compact with modest memory

💡 Insight

Compaction's algorithm is beautifully efficient. The offset map uses fixed space per key regardless of message size, making it practical even for large datasets.

Tombstones (deletion):

To delete key completely:
1. Produce message with key and NULL value
2. Compaction keeps tombstone temporarily
3. Consumers see deletion marker
4. After min time, tombstone removed
5. Key no longer exists in partition

Timing configuration:
├── delete.retention.ms (how long to keep tombstone)
└── Give consumers time to see the deletion

Compaction triggers:

  • Active segment never compacted
  • Default: compact when 50% of topic is dirty
  • min.compaction.lag.ms - Minimum message age before eligible
  • max.compaction.lag.ms - Maximum delay before must compact (GDPR compliance)

7. Summary

What we learned:

  1. Cluster Membership - ZooKeeper's ephemeral nodes track active brokers, enabling automatic detection of failures and self-healing behavior

  2. The Controller - One broker coordinates cluster operations, managing leader elections and metadata. KRaft replaces ZooKeeper with a Raft-based controller for simpler, faster operation

  3. Replication - Leaders handle writes, followers replicate. In-sync replicas are ready to become leaders. Understanding ISR is crucial for reliability tuning

  4. Request Processing - A sophisticated pipeline routes requests through acceptor, network, and I/O threads. Zero-copy and batching enable high throughput

  5. Physical Storage - Partitions split into segments, indexed for fast access. Compaction enables Kafka as a long-term state store

Key takeaway: Kafka's internals reveal elegant distributed systems patterns - ephemeral nodes for membership, leader election via consensus, zero-copy for performance, and log-based everything. Each mechanism solves a specific problem while contributing to the whole system's reliability and performance.

Pro tip: When facing production issues, trace through these internals:

  • Broker down? Check controller logs for leader election
  • Slow consumers? Examine fetch session cache and zero-copy path
  • Data loss? Verify in-sync replicas and replication lag
  • Disk full? Review segment rolling and compaction settings

Previous: Chapter 5: Managing Apache Kafka Programmatically | Next: Chapter 7: Reliable Data Delivery →