Kafka Study Guide
Covers: Topics & Partitions, Producer Partitioning, Consumer Groups & Assignment, Replication & Durability, Offset Management, Compacted Topics & CDC, Exactly-Once Semantics. Language: Java.
1. Topics vs Partitions
| Aspect | Topic | Partition |
|---|---|---|
| Nature | Logical abstraction (the "what") | Physical log segment on disk (the "how") |
| Ordering | No global order | Strict ordering guaranteed within partition |
| Replication | Configured at topic level | Each partition has N replicas |
| Parallelism | Defined by partition count | One consumer per partition per group |
| Scalability unit | N/A | Each partition has one broker leader |
Key insight: Partition count can only be increased, never decreased, so plan it up front. Over-partitioning causes:
- Leader election overhead
- ~5MB of memory buffers per partition per broker
- Slower rebalancing
Rule of thumb: partitions = max expected consumers × 2–3, backed by throughput math.
Hard Limits to Know
| Limit | Value | Notes |
|---|---|---|
| Throughput | Millions of msgs/sec per broker | LinkedIn processes 7 trillion messages/day across their cluster |
| Max message size | 1MB (default) | Configurable up to 10MB+, but large messages hurt throughput significantly |
| Partitions per broker | ~4K practical limit | More partitions = more memory + slower failover |
| Consumer lag tolerance | Hours or days behind | Catches up without loss within retention window |
| Retention | Configurable by time or size | e.g. 7 days or 100GB per partition — kept regardless of consumption |
| End-to-end latency | 5–15ms at acks=1 | 10–30ms at acks=all due to replication wait |
2. Who Partitions Messages? The Partitioner
The Producer decides which partition to send to — not Kafka, not the broker. The broker just accepts and stores.
Producer → [Partitioner] → decides partition number → sends to correct broker
Scenario 1: Key Provided
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders", // topic
"user-123", // key
"{amount:500}" // value
);
producer.send(record);
// Kafka computes: partition = murmur2(serializedKey) % numberOfPartitions
// "user-123" ALWAYS goes to the same partition → ordering guaranteed
// (caveat: adding partitions later changes the mapping for existing keys)
Guarantee: Same key → always same partition → ordered processing for that key.
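That guarantee is pure arithmetic. A runnable sketch of the keyed path (this uses a simple stand-in hash, not Kafka's murmur2, so the partition numbers differ from a real cluster; only the determinism property carries over):

```java
import java.nio.charset.StandardCharsets;

public class KeyedPartitionSketch {
    // Stand-in for the default partitioner's keyed path. Real Kafka hashes the
    // SERIALIZED key bytes with murmur2 and masks the result positive; any
    // deterministic hash demonstrates the same key → same partition property.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : keyBytes) {
            hash = 31 * hash + b;
        }
        return Math.floorMod(hash, numPartitions); // floorMod keeps the result non-negative
    }

    public static void main(String[] args) {
        // Same key + same partition count → same partition, every send
        System.out.println(partitionFor("user-123", 6) == partitionFor("user-123", 6)); // true
    }
}
```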
Scenario 2: No Key (null)
ProducerRecord<String, String> record = new ProducerRecord<>(
"orders",
null, // no key
"{amount:500}"
);
// Kafka 2.4+: Sticky Partitioner (default)
// Fills one partition until linger.ms expires or batch.size reached → then picks new partition
// Before 2.4: pure round-robin (P0 → P1 → P2 → ...)
Why Sticky beats Round Robin: Round-robin creates tiny batches (bad throughput). Sticky fills full batches first → better compression and throughput.
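If strict per-record spreading is still wanted on 2.4+ (even spread mattering more than batch efficiency), the built-in RoundRobinPartitioner can be opted into explicitly. A config sketch, reusing the `props` object from the producer examples:

```java
// Opt back into per-record round-robin (trades batch efficiency for even spread)
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
          "org.apache.kafka.clients.producer.RoundRobinPartitioner");
```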
Scenario 3: Custom Partitioner
public class OrderPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        String orderType = extractOrderType(value.toString()); // app-specific JSON parsing
        // VIP orders → dedicated partition 0
        if ("VIP".equals(orderType)) {
            return 0;
        }
        // Everyone else → spread across remaining partitions. Utils.toPositive
        // (org.apache.kafka.common.utils.Utils) masks the hash so the modulo
        // can't go negative. Assumes non-VIP records always carry a key.
        return Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - 1) + 1;
    }

    // The Partitioner interface also requires these two — empty is fine here:
    @Override public void configure(Map<String, ?> configs) {}
    @Override public void close() {}
}
// Register it:
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderPartitioner.class.getName());
Partitioner Decision Tree
Message arrives at producer
│
▼
Has a key?
├── YES → hash(key) % numPartitions → same partition every time
│
└── NO → Sticky Partitioner
├── linger.ms not expired AND batch not full? → stay on same partition
└── linger.ms expired OR batch full? → pick new partition
Key Design Cheat Sheet
| Goal | Key Strategy |
|---|---|
| Strict ordering per entity | Use entity ID (userId, orderId) |
| Max throughput, no ordering needed | null key (round-robin/sticky) |
| Co-locate related events | Same key for related event types |
| Avoid hot partitions | Composite key with time bucket or shard suffix |
| Transactional grouping | Same key = same partition = same consumer |
The Hot Partition Trap
// BAD — "US" = 80% of traffic → one partition overwhelmed
String key = order.getCountry();
// GOOD — composite key spreads load
String key = order.getCountry() + "-" + (order.getUserId() % 10);
// "US-0", "US-1" ... "US-9" → spread across 10 effective partitions
// Tradeoff: lose strict global ordering for US (keep ordering within a shard)
3. linger.ms and Batch Sending
Think of it like a bus departure schedule — how long the bus waits before leaving, hoping more passengers arrive.
props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // leave immediately (default) — low latency
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // wait 5ms — better throughput
props.put(ProducerConfig.LINGER_MS_CONFIG, 20); // wait 20ms — even bigger batches, higher latency
Two Triggers (whichever comes first)
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // Trigger 1: timer expires
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // Trigger 2: 16KB batch full
Timeline with Sticky Partitioner + linger.ms=5
t=0ms: msg1 arrives → goes to P2 → start 5ms timer
t=1ms: msg2 arrives → also P2 (sticky — same partition, within 5ms window)
t=3ms: msg3 arrives → also P2 (sticky)
t=5ms: TIMER EXPIRES → send batch [msg1, msg2, msg3] to P2 together
t=5ms: pick new partition P4 → start fresh
Real World Tuning
| System | linger.ms | Reason |
|---|---|---|
| Stripe payments | 0 | Every ms counts |
| Walmart analytics | 20 | Throughput > latency |
| IoT sensors | 50 | Millions of tiny msgs, batch aggressively |
| Audit logs | 10 | Balanced |
4. VIP Partitions — When and Why
How VIP Partitioning Works
// VIP consumer: manually assigned to partition 0 — dedicated, no rebalancing
KafkaConsumer<String, String> vipConsumer = new KafkaConsumer<>(props);
vipConsumer.assign(List.of(new TopicPartition("orders", 0)));
// Polls with smallest interval, highest priority processing
// Regular consumers: get P1-P5 via normal group assignment
KafkaConsumer<String, String> regularConsumer = new KafkaConsumer<>(props);
regularConsumer.subscribe(List.of("orders")); // coordinator assigns P1-P5
When VIP Partition Makes Sense
- Topic has genuinely mixed priority traffic
- SLA difference is significant (VIP < 100ms, regular < 5s)
- You can reliably classify messages as VIP at produce time
When It's Overengineering
- All messages have the same SLA
- VIP traffic is < 1% of volume (dedicated consumer sits idle 99% of the time)
- Can't reliably classify VIP at produce time
The Better Pattern: Separate Topics
// Separate topics beat VIP partitions for true priority differences
producer.send(new ProducerRecord<>("orders-vip", key, value));
producer.send(new ProducerRecord<>("orders-standard", key, value));
producer.send(new ProducerRecord<>("orders-bulk", key, value));
Why separate topics win:
- Independent scaling — scale VIP consumers without touching standard
- Independent retention — VIP 30 days, bulk 3 days
- Independent replication — VIP RF=3, bulk RF=2 (cost saving)
- Cleaner monitoring — lag alerts per topic, not per partition
- Simpler consumer code — no "is this VIP?" check
Decision Framework
Same message type, mixed priority?
└── Small SLA difference → same topic, custom partitioner
└── Large SLA difference → separate topics
Multiple message types?
└── Always separate topics
Same SLA for everything?
└── No VIP needed — key-based partitioning for ordering only
5. Consumer Groups & Partition Assignment
The Core Rule
One partition can only be consumed by ONE consumer within a group — but one consumer can hold multiple partitions.
// Consumer subscribes to a TOPIC, not a partition — Kafka assigns partitions
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors"); // group is key
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders")); // Kafka decides which partitions you get
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Partition: %d | Offset: %d | Key: %s%n",
record.partition(), record.offset(), record.key());
}
}
How Assignment Works Internally
Step 1: Consumer joins group → sends JoinGroup to Group Coordinator broker
Step 2: One consumer elected "Group Leader" (first to join)
Step 3: Group Leader runs PartitionAssignor algorithm
Step 4: Leader sends assignments back to Coordinator
Step 5: Coordinator distributes via SyncGroup response to each consumer
Three Assignment Strategies
RangeAssignor (default — can be uneven):
Topic A: P0,P1,P2 | Topic B: P0,P1,P2 | 2 Consumers
Consumer 1 → A-P0, A-P1, B-P0, B-P1 (4 partitions)
Consumer 2 → A-P2, B-P2 (2 partitions) ← UNEVEN
RoundRobinAssignor:
Consumer 1 → A-P0, A-P2, B-P1 (3 partitions)
Consumer 2 → A-P1, B-P0, B-P2 (3 partitions) ← balanced
CooperativeStickyAssignor (use in production):
Consumer3 dies.
Eager (old): STOP ALL → reassign everything → resume ← stop-the-world!
Cooperative: ONLY P4,P5 revoked → redistributed → rest keep running ✓
Rebalance Listener — Walmart Inventory Example
consumer.subscribe(List.of("inventory-updates"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
consumer.commitSync(); // commit before giving up partitions
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
partitions.forEach(tp ->
System.out.println("Now responsible for region: " +
PARTITION_TO_REGION.get(tp.partition())));
}
});
What Triggers a Rebalance
- Consumer joins (scale up)
- Consumer leaves gracefully (scale down)
- Consumer crashes (missed heartbeats)
- Topic partition count changes
Key Configs to Tune
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // ping every 3s
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000"); // dead after 45s
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5 min to process batch
// WARNING: if processing takes > max.poll.interval.ms → consumer kicked → rebalance triggered
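When processing genuinely runs long, a common first lever is to shrink the batch rather than raise the timeout. A config sketch for the consumer `props` above:

```java
// Fewer records per poll() → each batch finishes well inside max.poll.interval.ms
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); // default 500
```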
Independent Consumer Groups Per Service
Each service gets its own group.id — meaning every service receives all events independently. They don't compete. Each maintains its own offset position in the log.
Topic: payment-events
│
├──► group.id="reconciliation-service" → owns its own offsets
├──► group.id="webhook-delivery-service" → owns its own offsets
└──► group.id="analytics-service" → owns its own offsets
Reconciliation being slow does NOT affect webhook delivery.
A slow analytics consumer does NOT cause any other service to lag.
// Reconciliation service
props.put(ConsumerConfig.GROUP_ID_CONFIG, "reconciliation-service");
// Webhook delivery service — completely independent
props.put(ConsumerConfig.GROUP_ID_CONFIG, "webhook-delivery-service");
// Analytics — can lag hours behind, nobody else is affected
props.put(ConsumerConfig.GROUP_ID_CONFIG, "analytics-service");
Real examples:
- Stripe uses consumer groups per service — webhook delivery, reconciliation, and analytics each consume the same payment events independently at their own pace.
- Uber publishes every GPS location update to Kafka. Multiple consumer groups (routing, surge pricing, analytics) read the same events independently without interfering with each other.
Consumer Scaling — Staged Model
Start with minimum consumers, scale up as lag grows, cap at partition count for maximum parallelism, optionally exceed it for hot standby.
Topic: payment-events — 5 partitions
Stage 1: Normal traffic — 1 consumer handles all 5 partitions
P0 ──┐
P1 ──┤
P2 ──┼──► Consumer A
P3 ──┤
P4 ──┘
Stage 2: Lag detected → scale up to 3 consumers
P0 ──► Consumer A
P1 ──► Consumer A
P2 ──► Consumer B
P3 ──► Consumer B
P4 ──► Consumer C
Stage 3: Peak traffic → 5 consumers → maximum parallelism
P0 ──► Consumer A
P1 ──► Consumer B
P2 ──► Consumer C
P3 ──► Consumer D
P4 ──► Consumer E
Stage 4: Hot standby → 7 consumers (2 extra as backup)
P0 ──► Consumer A
P1 ──► Consumer B
P2 ──► Consumer C
P3 ──► Consumer D
P4 ──► Consumer E
Consumer F ← idle, ready
Consumer G ← idle, ready
Consumer A crashes → once its missed heartbeats exceed session.timeout.ms, Kafka reassigns P0 to Consumer F
No waiting for a new pod to spin up ✓
Who triggers each stage in production:
Stage 1→2→3 KEDA watches consumer lag metric → spins up pods automatically
Stage 4 Pre-provisioned hot standby, always running
Stage 3→2→1 Scale down after lag clears (cost saving)
The key rules:
- Partitions = the ceiling on parallelism (you set this, never changes automatically)
- Consumers = the workers (you scale these, Kafka assigns partitions to them)
- More consumers than partitions = idle consumers, but useful as hot standby
- Kafka only auto-handles who gets which partition as consumers come and go — everything else is your infrastructure
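The lag signal behind the staged model is plain arithmetic: log end offset minus committed offset, summed across partitions. A standalone sketch (in a real deployment these offsets come from AdminClient#listConsumerGroupOffsets and AdminClient#listOffsets or a metrics exporter; the maps here are illustrative):

```java
import java.util.Map;

public class ConsumerLag {
    // lag per partition = log end offset - committed offset (floored at 0);
    // a partition with no committed offset counts as fully behind
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        return endOffsets.entrySet().stream()
                .mapToLong(e -> Math.max(0L, e.getValue() - committed.getOrDefault(e.getKey(), 0L)))
                .sum();
    }

    public static void main(String[] args) {
        // P0: end 1000, committed 900 → lag 100; P1: end 500, committed 500 → lag 0
        System.out.println(totalLag(Map.of(0, 1000L, 1, 500L), Map.of(0, 900L, 1, 500L))); // 100
    }
}
```

When this number grows past a threshold, an autoscaler (e.g. KEDA's Kafka scaler) adds consumer pods, up to the partition count.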
6. Replication & Durability (acks, ISR)
Replication Architecture
Topic: payments | Partition 0 | Replication Factor: 3
Broker 1 (Leader) ← Producers write HERE only
├──► Broker 2 (Follower / ISR) fetches from leader
└──► Broker 3 (Follower / ISR) fetches from leader
Consumers read from Leader (or followers with rack-aware config)
ISR — In-Sync Replicas
ISR = the set of replicas caught up with the leader within replica.lag.time.max.ms (default 30s). The leader is always a member of its own ISR.
ISR = {B1, B2, B3} ← healthy
Broker3 gets slow (GC pause, network):
ISR = {B1, B2} ← B3 kicked from ISR
Broker3 catches up:
ISR = {B1, B2, B3} ← rejoins
The acks Setting
// acks=0: Fire and forget
props.put(ProducerConfig.ACKS_CONFIG, "0");
// No wait for acknowledgment | Use: metrics, logs | Throughput: MAX | Durability: NONE | Latency: ~1ms
// acks=1: Leader only
props.put(ProducerConfig.ACKS_CONFIG, "1");
// Wait for leader to write | Risk: leader acks → dies → MESSAGE LOST
// Use: moderate importance | Throughput: HIGH | Durability: PARTIAL | Latency: 5–15ms
// acks=all: All ISR must acknowledge
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Wait for ALL ISR replicas | Use: payments, orders, financial
// Throughput: LOWER | Durability: STRONG | Latency: 10–30ms (replication overhead)
The Critical Combo: acks=all + min.insync.replicas
// Producer:
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Topic/Broker config: min.insync.replicas=2
// Replication Factor=3, min.insync.replicas=2
// ISR={B1,B2,B3} → writes succeed ✓
// ISR={B1,B2} → writes succeed ✓ (meets minimum of 2)
// ISR={B1} → writes FAIL with NotEnoughReplicasException ✗
// Better to reject than silently lose data!
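Note that min.insync.replicas is a topic/broker setting, so the producer config above is only half the combo. A sketch of the topic-side half via the AdminClient (topic name, partition/replica counts, and `adminProps` are illustrative):

```java
// Topic-side half of the durability combo: RF=3, min.insync.replicas=2
NewTopic payments = new NewTopic("payments", 6, (short) 3);
payments.configs(Map.of("min.insync.replicas", "2"));
try (AdminClient admin = AdminClient.create(adminProps)) {
    admin.createTopics(List.of(payments)).all().get(); // blocks until created
}
```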
Stripe Payment Producer — Full Example
public class StripePaymentProducer {
    private final KafkaProducer<String, String> producer;

    public StripePaymentProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // illustrative address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); // ≤5 keeps idempotent ordering
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        this.producer = new KafkaProducer<>(props);
    }

    public CompletableFuture<RecordMetadata> sendPayment(String paymentId, String json) {
        ProducerRecord<String, String> record = new ProducerRecord<>(
                "payments", paymentId, json // paymentId as key → ordering per payment
        );
        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
        producer.send(record, (metadata, ex) -> {
            if (ex != null) future.completeExceptionally(ex);
            else future.complete(metadata);
        });
        return future;
    }
}
Leader Failure and Election
Broker1 (Leader) crashes
Controller (KRaft) detects the broker's missed heartbeats within the broker session timeout (broker.session.timeout.ms, default 9s)
Only ISR members eligible → Broker2 elected as new leader
Producer gets LeaderNotAvailableException → retries → connects to Broker2
unclean.leader.election.enable=true ← DANGEROUS, never in finance
→ Out-of-sync replica can become leader → data loss possible
→ Only acceptable for log aggregation where some loss is OK
7. Offset Management
What Is an Offset?
Partition 0:
Offset 0: {"orderId":"A1","amount":100}
Offset 1: {"orderId":"A2","amount":200}
Offset 2: {"orderId":"A3","amount":300} ← consumer processed this
Offset 3: {"orderId":"A4","amount":400} ← next to read
Committed offset = 2 → "I have successfully processed up to offset 2"
Stored in internal topic: __consumer_offsets (50 partitions by default)
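Which of those 50 partitions holds a given group's commits is determined by hashing the group.id; that partition's leader broker then acts as the group's coordinator. A sketch of the mapping (Kafka's real code uses a positive-masked hashCode mod offsets.topic.num.partitions, so treat the exact number as illustrative):

```java
public class OffsetsTopicMapping {
    // group.id → partition of __consumer_offsets: hash mod partition count.
    // Math.floorMod gives the same "always in range" property as Kafka's
    // positive-masked hash.
    static int offsetsPartitionFor(String groupId, int numOffsetsPartitions) {
        return Math.floorMod(groupId.hashCode(), numOffsetsPartitions);
    }

    public static void main(String[] args) {
        // The broker leading this partition is the group's coordinator
        System.out.println(offsetsPartitionFor("order-processors", 50));
    }
}
```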
Auto Commit — The Danger
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
// PROBLEM TIMELINE:
// t=0s: Poll returns [offset 0,1,2,3,4]
// t=3s: Still processing offset 2
// t=5s: AUTO COMMIT fires → commits position 5 (next offset after the poll) ← HAVEN'T PROCESSED 3,4 YET!
// t=6s: Consumer crashes
// t=7s: Restarts → resumes from committed offset 5 → offsets 3,4 NEVER PROCESSED
Manual Commit — Production Standard
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
// Commit per record (safe, slower):
Map<TopicPartition, OffsetAndMetadata> offset = Map.of(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1) // +1 = next offset to read
);
consumer.commitSync(offset);
}
// OR commit entire batch (faster):
consumer.commitSync();
}
commitSync vs commitAsync
// commitSync: blocks, retries on failure — use for critical checkpoints
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// partition was revoked (rebalance) — handle gracefully
}
// commitAsync: non-blocking, does NOT retry (to avoid out-of-order commits)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) log.error("Async commit failed: {}", offsets, exception);
// Don't retry — a later commitAsync may have already succeeded
});
// BEST PRACTICE — hybrid (Stripe/Walmart scale):
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
processRecords(records);
consumer.commitAsync(); // fast path: non-blocking during normal operation
}
} finally {
consumer.commitSync(); // on shutdown: blocking, ensures last offsets saved
consumer.close();
}
Offset Reset and Seeking
// First start or lost offsets:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // replay from beginning
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // only new messages
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); // throw exception
// Seek to specific offset (after bad deploy):
consumer.seek(new TopicPartition("payments", 2), 10500L);
// Seek by timestamp — replay last 1 hour (Walmart incident recovery):
Map<TopicPartition, Long> timestampMap = consumer.assignment().stream()
.collect(Collectors.toMap(tp -> tp,
tp -> System.currentTimeMillis() - Duration.ofHours(1).toMillis()));
consumer.offsetsForTimes(timestampMap).forEach((tp, offsetAndTs) -> {
if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});
8. Compacted Topics & CDC
Log Compaction vs Regular Retention
Regular topic (time-based retention):
[key=A val=1] [key=B val=2] [key=A val=3] [key=C val=4] [key=A val=5]
After 7 days → everything deleted
Compacted topic:
[key=A val=1] [key=B val=2] [key=A val=3] [key=C val=4] [key=A val=5]
After compaction:
[key=B val=2] [key=C val=4] [key=A val=5] ← only LATEST value per key
→ Like a changelog: current state of every entity, forever
Creating a Compacted Topic
NewTopic inventoryTopic = new NewTopic("inventory-state", 12, (short) 3);
inventoryTopic.configs(Map.of(
"cleanup.policy", "compact",
"min.cleanable.dirty.ratio", "0.1", // compact when 10% of log is dirty
"segment.ms", "3600000", // roll new segment every hour
"delete.retention.ms", "86400000" // tombstones kept 24h before deletion
));
try (AdminClient admin = AdminClient.create(props)) {
    admin.createTopics(List.of(inventoryTopic)).all().get(); // wait for creation
}
Tombstone Records — Deleting from Compacted Topic
ProducerRecord<String, String> tombstone = new ProducerRecord<>(
"inventory-state",
"SKU-12345", // key
null // null value = tombstone = delete this key after delete.retention.ms
);
producer.send(tombstone);
CDC with Debezium — Walmart Inventory
MySQL inventory DB → Debezium (reads binlog) → Kafka compacted topic
│
┌─────────────────┼─────────────────┐
▼ ▼ ▼
Elasticsearch Redis Warehouse
(search index) (cache inval) (restock logic)
// Debezium CDC event:
{
"before": {"sku": "W123", "quantity": 50},
"after": {"sku": "W123", "quantity": 45},
"op": "u", // u=update, c=create, d=delete, r=read(snapshot)
"ts_ms": 1714000000000
}
// Consumer:
public void processCDCEvent(ConsumerRecord<String, String> record) throws JsonProcessingException {
JsonNode event = objectMapper.readTree(record.value());
switch (event.get("op").asText()) {
case "u":
int delta = event.get("after").get("quantity").asInt()
- event.get("before").get("quantity").asInt();
if (delta < 0 && event.get("after").get("quantity").asInt() < LOW_STOCK) {
triggerRestockAlert(event.get("after").get("sku").asText());
}
break;
case "d":
removeFromSearchIndex(record.key());
invalidateCache(record.key());
break;
case "c":
indexNewProduct(event.get("after"));
break;
}
}
Bootstrap State from Compacted Topic
// On startup: replay entire compacted topic → rebuild in-memory state (no DB call needed)
Map<String, Integer> inventoryCache = new HashMap<>();
// Use assign() (not subscribe) for a bootstrap read — no consumer group needed,
// and subscribe()'s assignment is empty until the group join completes
List<TopicPartition> parts = consumer.partitionsFor("inventory-state").stream()
        .map(pi -> new TopicPartition(pi.topic(), pi.partition()))
        .toList();
consumer.assign(parts);
consumer.seekToBeginning(parts);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(parts);
while (!isCaughtUp(consumer, endOffsets)) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.value() == null) {
inventoryCache.remove(record.key()); // tombstone = delete
} else {
inventoryCache.put(record.key(), Integer.parseInt(record.value()));
}
}
}
// inventoryCache = full current inventory state, loaded from Kafka alone
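The isCaughtUp helper used in the loop above is left undefined; the check is just "has the consumer's position reached the end offset captured at startup, on every partition?". A standalone sketch of that comparison (a real implementation would feed it consumer.position(tp) per assigned partition; partitions are keyed by String here so the sketch runs without kafka-clients):

```java
import java.util.Map;

public class BootstrapCheck {
    // Caught up when, for every partition, the next offset to read (position)
    // has reached the end offset snapshotted before the replay began
    static boolean isCaughtUp(Map<String, Long> positions, Map<String, Long> endOffsets) {
        return endOffsets.entrySet().stream()
                .allMatch(e -> positions.getOrDefault(e.getKey(), 0L) >= e.getValue());
    }

    public static void main(String[] args) {
        Map<String, Long> end = Map.of("inventory-state-0", 100L, "inventory-state-1", 40L);
        System.out.println(isCaughtUp(Map.of("inventory-state-0", 100L, "inventory-state-1", 40L), end)); // true
        System.out.println(isCaughtUp(Map.of("inventory-state-0", 99L, "inventory-state-1", 40L), end));  // false
    }
}
```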
9. Exactly-Once Semantics (EOS)
The Three Delivery Guarantees
At-most-once: may LOSE messages, never duplicates
→ commit offset BEFORE processing
At-least-once: never lost, may DUPLICATE ← most common
→ commit offset AFTER processing
Exactly-once: never lost, never duplicated ← hardest, most important
Layer 1: Idempotent Producer
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Kafka assigns producer a PID + sequence number per partition
// Broker deduplicates: same PID + sequence arrives twice → second ignored
// Scope: single producer session, single partition
// Protects against: network retries creating duplicates
Layer 2: Transactions — Atomic Multi-Partition Writes
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "payment-processor-1"); // stable across restarts
producer.initTransactions();
try {
producer.beginTransaction();
// ALL or NOTHING — atomic across multiple partitions/topics
producer.send(new ProducerRecord<>("account-debits", accountId, debitJson));
producer.send(new ProducerRecord<>("account-credits", accountId, creditJson));
producer.send(new ProducerRecord<>("payment-events", paymentId, eventJson));
producer.commitTransaction(); // ALL visible atomically
} catch (ProducerFencedException e) {
producer.close(); // zombie — another instance with same ID took over
} catch (KafkaException e) {
producer.abortTransaction(); // NONE visible → safe to retry
}
Layer 3: Transactional Read-Process-Write (Full EOS)
// consume → process → produce → commit offsets — ALL atomic
// Consumer must use isolation.level=read_committed to only see committed transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // offsets go through the transaction instead
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
String processed = enrichPayment(record.value());
producer.send(new ProducerRecord<>("enriched-payments", record.key(), processed));
}
// Commit consumer offsets INSIDE the transaction — this is what makes it atomic
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
long lastOffset = records.records(tp).get(records.records(tp).size() - 1).offset();
offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
// Records not committed → will be re-polled → safe retry
}
}
When to Use What
| System | Guarantee | Reason |
|---|---|---|
| Stripe payments | Exactly-once | Double charge = disaster |
| Walmart inventory | At-least-once | Duplicate stock update is idempotent (SET not ADD) |
| Click analytics | At-most-once | Losing 0.01% of clicks is fine, throughput > accuracy |
| Audit logs | At-least-once | Duplicates filtered by log ID downstream |
| Order fulfillment | Exactly-once | Ship twice = costly |
| Metrics/monitoring | At-most-once | Approximate is fine, latency matters more |
10. Brokers, Partitions & Memory
What is a Broker?
A broker is simply a Kafka server — a machine (or container) running the Kafka process that stores messages on disk and serves producers and consumers.
Your Kafka Cluster
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Broker 1 │ │ Broker 2 │ │ Broker 3 │
│ (server) │ │ (server) │ │ (server) │
└─────────────┘ └─────────────┘ └─────────────┘
Producers write to brokers.
Brokers store messages on disk.
Consumers read from brokers.
A cluster typically has 3, 6, or more brokers. Stripe might have dozens. LinkedIn has thousands.
What Does Each Broker Hold?
Each broker holds partitions — some as leader, some as follower replicas.
Topic: payment-events 6 partitions Replication Factor: 3
Broker 1: Leader of P0, P2 Follower of P1, P3, P4, P5
Broker 2: Leader of P1, P3 Follower of P0, P2, P4, P5
Broker 3: Leader of P4, P5 Follower of P0, P1, P2, P3
Every partition — whether leader or follower — consumes resources on that broker.
The 5MB Memory Buffer — What It Actually Is
For every partition it holds, a broker allocates memory buffers to handle reads and writes efficiently.
Per partition, per broker:
Producer buffer ~1MB holds incoming messages before writing to disk
Consumer buffer ~1MB holds messages being sent out to consumers
Replication buffer ~1MB holds messages being replicated to followers
Index + metadata ~2MB tracks offsets, file positions, partition state
Total: ~5MB per partition per broker
The 4K Partition Limit — Not the Same as 5MB
These are two separate facts that are related but not the same:
5MB = the cost of ONE partition on ONE broker (memory buffers)
4K = the maximum number of partitions recommended PER broker
5MB is one ingredient.
4K is the total limit when ALL costs combined become dangerous.
Each partition costs more than just memory:
Each partition on a broker costs:
1. ~5MB memory buffers ← the 5MB rule
2. File handles on disk ← OS has limits on open files
3. Network connections ← followers replicate from leader
4. Metadata tracked by ← Kafka controller tracks every
the Kafka controller partition state in the cluster
All four add up → at ~4K partitions per broker the combined
cost becomes dangerous, especially during failure and recovery.
Leader Election Overhead — Why Too Many Partitions Hurts
Every partition has one leader broker. When a broker crashes, all its leader partitions need a new leader elected.
Broker 1 crashes — it was leader of 500 partitions
Kafka Controller must now run election for each partition:
election for P0 → pick new leader from ISR (~10ms)
election for P1 → pick new leader from ISR (~10ms)
... 500 times
500 elections × 10ms = 5 seconds of disruption
Now imagine over-partitioned:
4000 elections × 10ms = 40 seconds of disruption
← producers getting errors
← consumers stopped receiving
← your system looks down
RAM Calculation — How Much Do You Actually Need?
The partition-buffer math:
4000 partitions × 5MB = 20GB ← partition buffers alone ✓
But a broker needs more than just partition buffers:
Partition buffers 20GB from the 5MB rule above
JVM heap (Kafka) 6GB recommended minimum for broker process
OS page cache 10GB+ Kafka relies on this for fast reads (see below)
OS itself + other 2GB
Realistic minimum: ~38GB
Production standard: 64GB–128GB per broker
Page Cache — The Part Most People Miss
Kafka does not read from disk directly when serving consumers. It reads from the OS page cache — messages the OS has cached in RAM from recent disk writes.
Producer writes message → saved to disk → OS also caches it in RAM (page cache)
Consumer reads message → Kafka serves it from page cache, NOT from disk
← this is why Kafka is so fast
More page cache RAM = more messages served from RAM = lower latency
Less page cache RAM = more disk reads = slower consumers
Full Picture Together
More partitions =
1. More memory consumed (5MB × partitions × brokers)
2. Slower failover (election per partition on broker crash)
3. More file handles on disk (each partition = multiple segment files)
4. Slower consumer rebalancing (more partitions to reassign)
~4K per broker is the practical ceiling where these costs become dangerous.
RAM sizing summary:
20GB = floor (partition buffers only)
38GB = realistic minimum
64GB–128GB = what you actually provision in production
11. Partition Count Math
Target throughput: 500 MB/s ingress
Single partition: ~10 MB/s (typical broker write speed)
Partitions needed: 500 / 10 = 50 partitions minimum
Consumer parallelism: 20 consumer instances max planned → need at least 20 partitions
50 satisfies both constraints
Final decision: 60 partitions (next multiple of common consumer counts: 2,3,4,5,6)
→ can run 60,30,20,15,12,10,6,5,4,3,2,1 consumers evenly
Replication factor: 3 (data survives 2 broker losses; with min.insync.replicas=2, writes stay available through 1 broker loss)
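The sizing steps above can be wrapped into a small helper (the 10 MB/s per-partition figure is this guide's working assumption, not a universal constant):

```java
public class PartitionMath {
    // partitions = max(throughput requirement, planned consumer parallelism);
    // round the result up afterwards for even divisibility by consumer counts
    static int partitionsFor(double targetMBps, double perPartitionMBps, int maxConsumers) {
        int forThroughput = (int) Math.ceil(targetMBps / perPartitionMBps);
        return Math.max(forThroughput, maxConsumers);
    }

    public static void main(String[] args) {
        // 500 MB/s at 10 MB/s per partition, 20 planned consumers
        System.out.println(partitionsFor(500, 10, 20)); // 50 — then round up to 60 for divisibility
    }
}
```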
12. Key Answers Cheat Sheet
"Who decides which partition a message goes to?"
The Producer, via a Partitioner class. Default: murmur2 hash of key mod partition count for keyed messages; sticky partitioning for null-key messages. Fully overridable with a custom Partitioner.
"How does a consumer know which partition to consume?"
It doesn't choose — it subscribes to a topic and the Group Coordinator assigns partitions via a PartitionAssignor. Use CooperativeStickyAssignor in production to avoid stop-the-world rebalances.
"What is ISR and why does it matter?"
ISR is the set of replicas caught up with the leader within replica.lag.time.max.ms. Combined with acks=all and min.insync.replicas=2, it guarantees writes are only acknowledged when durably replicated. If ISR falls below min.insync.replicas, writes fail fast rather than silently losing data.
"Explain exactly-once semantics."
Three layers: idempotent producer (deduplicates retries per partition), transactions (atomic multi-partition writes), and transactional read-process-write with sendOffsetsToTransaction (atomic consume + produce + offset commit). Consumer needs isolation.level=read_committed to only see committed data.
"How do you handle a hot partition?"
Diagnose with consumer lag metrics per partition. Fix options: composite key with shard suffix to spread load, custom partitioner for business-driven routing, or separate topics for fundamentally different traffic classes. Always backed by throughput math — partitions = target MB/s / 10 MB/s per partition.
"The tradeoff triangle"
Every Kafka config decision trades off throughput, durability, and latency. Stripe sacrifices throughput for durability (acks=all, linger.ms=0). Walmart analytics sacrifices durability for throughput (acks=1, linger.ms=20). Justify your choice for the system being designed.
13. System Design — Kafka Usage, Partition Keys & Hot Key Solutions
Stripe — Payment Processing
Where Kafka is used:
Payment submitted → Kafka → reconciliation, webhook delivery, fraud detection, analytics
Topics & Keys:
| Topic | Partition Key | Why |
|---|---|---|
| payment-events | paymentId | All events for one payment stay ordered |
| account-events | accountId | Debit + credit for same account stay ordered |
| fraud-signals | userId | All fraud signals per user in sequence |
| webhook-delivery | merchantId | All webhooks per merchant in order |
Hot Key Problem:
Key = countryCode → "US" = 80% of all payments → one partition overwhelmed
Fix: composite key with shard
key = countryCode + "-" + Math.floorMod(paymentId.hashCode(), 10)
→ "US-0", "US-1" ... "US-9"
→ US traffic spread across 10 effective slots
Tradeoff: lose strict global ordering across all US payments
keep ordering within each shard (acceptable for Stripe)
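That shard computation can be sketched in Java (Math.floorMod rather than plain %, so a negative hashCode() can't produce a negative shard number; the shard count of 10 matches the example above):

```java
public class ShardedKey {
    static final int NUM_SHARDS = 10;

    // "US" + paymentId → "US-0" .. "US-9": spreads a hot country across
    // 10 effective slots while keeping per-payment ordering intact.
    static String shardedKey(String countryCode, String paymentId) {
        int shard = Math.floorMod(paymentId.hashCode(), NUM_SHARDS);
        return countryCode + "-" + shard;
    }

    public static void main(String[] args) {
        // The same payment always maps to the same shard.
        System.out.println(shardedKey("US", "pay_123"));
    }
}
```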
acks setting: acks=all — double charge = disaster, durability over throughput.
Uber — Real-Time Location Tracking
Where Kafka is used:
Driver GPS update (every 4 seconds) → Kafka → routing, surge pricing, ETA, analytics
Topics & Keys:
| Topic | Partition Key | Why |
|---|---|---|
| driver-location | driverId | All location updates for one driver in order |
| ride-events | rideId | All events for one ride stay ordered |
| surge-signals | cityZoneId | Group signals by geographic zone |
| driver-status | driverId | Status changes per driver in sequence |
Hot Key Problem:
Key = cityId → "NYC" has 50,000 active drivers
→ All NYC updates hammer one partition
Fix 1: composite key with geohash
key = geohash(lat, lng, precision=6)
→ splits city into ~1km grid cells
→ "dr5ru", "dr5rv", "dr5rw" ... each gets own partition
Fix 2: separate topic per city tier
topic: driver-location-tier1 (NYC, LA, Chicago — high volume)
topic: driver-location-tier2 (mid-size cities)
topic: driver-location-tier3 (small cities)
→ independent scaling per tier
acks setting: acks=1 — losing one GPS ping is acceptable, latency matters more.
Walmart — Inventory Management
Where Kafka is used:
POS sale → Kafka → inventory update, restock alerts, analytics, CDC to search index
Topics & Keys:
| Topic | Partition Key | Why |
|---|---|---|
| inventory-updates | skuId | All updates for one SKU ordered — no race conditions |
| inventory-state | skuId | Compacted topic — current state per SKU |
| restock-alerts | warehouseId | Group alerts by warehouse for local processing |
| sales-events | storeId | All sales per store in order |
| price-changes | skuId | Price updates per SKU stay ordered |
Hot Key Problem:
Key = categoryId → "Electronics" category has 10,000 SKUs
→ All electronics inventory updates go to one partition
Fix: use skuId as key, not categoryId
→ each SKU gets naturally distributed across partitions
→ ordering guaranteed per SKU (what actually matters)
→ no artificial grouping that creates hot spots
acks setting: acks=1 — duplicate inventory update is idempotent (SET quantity, not ADD).
Special: inventory-state is a compacted topic — keeps only latest quantity per SKU.
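A sketch of the topic-level settings that make inventory-state compacted. The config keys are standard Kafka topic configs; the specific values are illustrative, and in practice this map would be passed to AdminClient when creating the topic:

```java
import java.util.Map;

public class CompactedTopicConfig {
    // Topic-level configs for a compacted "latest state per key" topic.
    static Map<String, String> inventoryStateConfigs() {
        return Map.of(
            "cleanup.policy", "compact",         // keep only the latest value per key (SKU)
            "min.cleanable.dirty.ratio", "0.5",  // how eagerly the log cleaner runs
            "delete.retention.ms", "86400000"    // how long tombstones (null values) survive
        );
    }
}
```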
LinkedIn — Activity Feed & Notifications
Where Kafka is used:
User action (post, like, connect) → Kafka → feed ranking, notifications, search index, analytics
Topics & Keys:
| Topic | Partition Key | Why |
|---|---|---|
| user-activity | userId | All actions per user in order for feed ranking |
| notifications | recipientId | All notifs for one user delivered in order |
| connection-events | userId | Connection graph updates per user |
| feed-updates | userId | Feed items per user in sequence |
Hot Key Problem:
Key = userId → celebrity/influencer with 5M followers
→ Every action by that user fans out to millions of notifications
→ Single userId key = one partition overwhelmed
Fix: fan-out at consumer side, not producer side
Producer: key = sourceUserId (stays on one partition — correct)
Consumer: reads event → fans out to follower notification queues
topic: notification-fanout key = recipientId + "-" + bucket(0-9)
→ spread notification delivery across shards per recipient
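The bucket(0-9) above is left unspecified in the notes; one plausible sketch derives it from a per-notification id (a hypothetical field) rather than the recipient, so a single heavy recipient's deliveries spread across 10 partitions. Math.floorMod keeps the bucket non-negative:

```java
public class FanoutKey {
    static final int BUCKETS = 10;

    // e.g. "recipient-42-7": deliveries for one recipient land on up to
    // 10 shards, trading strict per-recipient ordering for spread.
    // notificationId is a hypothetical per-message identifier.
    static String fanoutKey(String recipientId, String notificationId) {
        return recipientId + "-" + Math.floorMod(notificationId.hashCode(), BUCKETS);
    }

    public static void main(String[] args) {
        System.out.println(fanoutKey("recipient-42", "notif-001"));
    }
}
```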
acks setting: acks=1 — losing a feed update is acceptable, throughput is critical at LinkedIn scale (7 trillion messages/day).
Netflix — Video Streaming Events
Where Kafka is used:
Play/pause/seek/error events → Kafka → recommendations, billing, A/B testing, error tracking
Topics & Keys:
| Topic | Partition Key | Why |
|---|---|---|
| playback-events | sessionId | All events for one viewing session in order |
| user-events | userId | User behaviour in sequence for recommendations |
| error-events | deviceType | Group errors by device for monitoring |
| billing-events | accountId | Billing events per account ordered |
Hot Key Problem:
Key = contentId → "Stranger Things S4 premiere"
→ 10M concurrent viewers all generate events with same contentId
→ one partition overwhelmed
Fix: key = sessionId (not contentId)
→ each viewing session is independent
→ naturally distributed across all partitions
→ you still know which content via the message value, not the key
Rule: The key controls distribution and ordering. If you don't need ordering across sessions for the same content, don't use contentId as the key.
Twitter/X — Tweet & Timeline Events
Where Kafka is used:
Tweet posted → Kafka → timeline fanout, search index, trends, moderation, analytics
Topics & Keys:
| Topic | Partition Key | Why |
|---|---|---|
| tweet-events | tweetId | All events for one tweet (edit, delete) ordered |
| timeline-fanout | followerId | Fanout delivery per follower |
| trend-signals | hashtag | Group signals per hashtag for trend calculation |
| moderation-queue | null | No ordering needed, max throughput |
Hot Key Problem:
Key = hashtag → #WorldCup trending
→ millions of tweets per minute with same hashtag
→ one partition overwhelmed
Fix 1: composite key with time bucket
key = hashtag + "-" + (epochSecond / 10)
→ "#WorldCup-171400000", "#WorldCup-171400001" ...
→ each 10-second window gets its own partition slot
→ trend aggregation merges buckets downstream
Fix 2: null key for ingestion, aggregate downstream
→ tweets distributed evenly across partitions
→ Kafka Streams or Flink aggregates by hashtag with windowed counts
→ trend topic emits results per hashtag
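Fix 1's bucket arithmetic, written as a pure function so it can be checked against a fixed timestamp (10-second buckets, matching the example above):

```java
public class TrendBucketKey {
    static final long BUCKET_SECONDS = 10;

    // "#WorldCup" at epochSecond 1714000005 → "#WorldCup-171400000":
    // every event in the same 10-second window shares a key, so a trending
    // hashtag rotates to a new partition slot each window.
    static String bucketedKey(String hashtag, long epochSecond) {
        return hashtag + "-" + (epochSecond / BUCKET_SECONDS);
    }

    public static void main(String[] args) {
        System.out.println(bucketedKey("#WorldCup", 1714000005L)); // #WorldCup-171400000
    }
}
```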
Ride-Share / Food Delivery (Generic — DoorDash, Lyft)
Where Kafka is used:
Order placed → Kafka → driver matching, restaurant notification, ETA, payment, analytics
Topics & Keys:
| Topic | Partition Key | Why |
|---|---|---|
| order-events | orderId | All order lifecycle events in sequence |
| driver-assignments | driverId | All assignments per driver ordered |
| restaurant-events | restaurantId | All events per restaurant in order |
| delivery-tracking | orderId | All tracking updates per order in sequence |
Hot Key Problem:
Key = restaurantId → popular restaurant gets 500 orders/minute during lunch
→ one partition overwhelmed
Fix: composite key
key = restaurantId + "-" + Math.floorMod(orderId.hashCode(), 5)
→ "rest-123-0", "rest-123-1" ... "rest-123-4"
→ spreads load while keeping most restaurant orders grouped
→ 5 shards = 5x throughput for hot restaurants
Hot Key — Universal Fix Patterns
No matter the system, hot keys are solved with the same small set of patterns:
Pattern 1: Composite Key with Shard Suffix
// When: key is naturally skewed (country, category, popular entity)
String key = naturalKey + "-" + Math.floorMod(entityId.hashCode(), NUM_SHARDS); // floorMod: hashCode() can be negative
// Tradeoff: lose strict global ordering for that key — acceptable if per-entity ordering is enough
Pattern 2: Geohash Key (location data)
// When: key is a location (city, region)
String key = GeoHash.encode(lat, lng, precision); // precision=6 → ~1km cells; GeoHash here stands for a third-party geohash library, not part of Kafka
// Tradeoff: nearby cells may have different load — tune precision to balance
Pattern 3: Time-Bucketed Key (trending/aggregation)
// When: key is a trending entity (hashtag, event)
String key = naturalKey + "-" + (Instant.now().getEpochSecond() / BUCKET_SIZE_SECONDS);
// Tradeoff: downstream must merge buckets to get full aggregation
Pattern 4: Null Key + Downstream Aggregation
// When: ordering at ingest doesn't matter, aggregate later
producer.send(new ProducerRecord<>("events", null, value)); // max throughput
// Kafka Streams or Flink aggregates by real key downstream
// Tradeoff: no ordering guarantee at ingest — only use when truly not needed
Pattern 5: Separate Topics by Tier
// When: a small number of entities generate disproportionate volume
producer.send(new ProducerRecord<>("events-tier1", key, value)); // top 100 entities
producer.send(new ProducerRecord<>("events-tier2", key, value)); // everyone else
// Tradeoff: need to classify at produce time, manage two consumer pipelines
Quick Reference — All Systems
| System | Key Topic | Partition Key | Hot Key Risk | Fix |
|---|---|---|---|---|
| Stripe | payment-events | paymentId | countryCode | Composite key with shard |
| Uber | driver-location | driverId | cityId | Geohash key |
| Walmart | inventory-updates | skuId | categoryId | Use skuId, not categoryId |
| LinkedIn | user-activity | userId | Celebrity userId | Fan-out at consumer |
| Netflix | playback-events | sessionId | contentId | Use sessionId, not contentId |
| Twitter/X | tweet-events | tweetId | Trending hashtag | Time-bucketed key |
| DoorDash | order-events | orderId | Popular restaurant | Composite key with shard |