Summary
What this post covers: A production-grade walkthrough of writing Kafka consumers in Python with confluent-kafka-python — consumer groups, the rebalance protocol, offset management, delivery semantics, Schema Registry deserialization, dead letter queues, and lag monitoring — intended for engineers who already understand producers and now must ensure correctness on the consumer side.
Key insights:
- Producers transmit bytes while consumers ensure correctness — virtually every notable Kafka production bug originates on the consumer side because consumers carry state (the read position) that producers do not.
- Partition count is the absolute ceiling on consumer parallelism: extra consumers beyond the partition count remain idle, which makes the producer-side num.partitions decision a downstream consumer constraint.
- The cooperative rebalance protocol (incremental cooperative assignor) is strictly preferable to the legacy eager protocol for production workloads, as it avoids the stop-the-world partition revocation that disrupts long-running handlers.
- Silent lag is the leading cause of Kafka data loss in practice; a consumer group operating at 8,000 messages per second beneath a 12,000 messages-per-second producer can accumulate hundreds of millions of unread messages within a day and lose them to retention before the issue is detected.
- A healthy consumer combines four error-handling strategies — skip, retry with backoff, DLQ, and circuit break — and a correctly constructed DLQ preserves the original raw bytes alongside origin headers rather than a re-serialized representation of the poison pill.
Main topics: why consumers are the hard part of Kafka, consumer groups and partition assignment, the rebalance protocol (eager vs. cooperative), offset management and delivery semantics, the polling loop internals, a full production-ready Python consumer, error handling and dead letter queues, consumer lag monitoring, and scaling and stateful processing.
This post examines why Kafka consumer correctness, rather than producer throughput, determines the reliability of a streaming pipeline in production. A producer that lands 100,000 messages per second offers no value if the downstream consumer falls behind and never recovers. One representative incident involved a team that celebrated a new producer throughput record at 2 a.m., only to receive a page at 6 a.m. because the downstream consumer group had accumulated forty million unprocessed messages overnight, the retention window was about to evict the oldest records, and no lag alerts had been configured. The producer was operating correctly. The consumer was failing. By morning, the data had been discarded.
This is the practical reality of Kafka in production: producers are largely stateless and forgiving, while consumers are where the genuine distributed-systems problems reside. A consumer must track what has been read, coordinate with peers, survive rebalances without losing work, handle deserialization failures, decide what “done” means, and do all of this while keeping pace with a stream that does not slow down. When this is implemented incorrectly, the result is either dropped messages, endless reprocessing, or a pipeline so far behind that a nominally real-time system effectively becomes a batch job.
This post serves as the consumer-side companion to the Kafka producer guide for multivariate time-series ingestion, which covered Avro schemas, partitioning strategy, and producer configuration for collecting server metrics. A reader who has already worked through that material will have a topic full of Avro-encoded records sitting on a broker, awaiting consumption. The present post addresses that consumption process. The discussion covers consumer groups, the rebalance protocol, offset commits, the three delivery guarantees, the internal behaviour of the polling loop, Schema Registry deserialization, dead letter queues, lag monitoring, and a complete working Python implementation using confluent-kafka-python.
Why Consumers Are the Hard Part of Kafka
When writing a Kafka producer, the client library and the broker perform most of the difficult work. The producer's partitioner picks a partition for each record; the broker appends it to the partition log, replicates it, and acknowledges it with the assigned offset. If a send fails transiently, the client library retries it, and with idempotence enabled those retries cannot introduce duplicates. When the process restarts, it does not need to remember anything beyond its own configuration. Producers behave almost like pure functions: data in, acknowledgment out.
Consumers are not pure functions. A consumer must continually answer a question the producer never faces: at what offset did processing last leave off? That state resides in the __consumer_offsets internal topic, but the consumer must decide when to write to it, what to write, and how to resolve a disagreement between its local view of progress and the broker’s. The consumer must also share work with its peers, and those peers may join, leave, crash, or lag at any moment. When that happens, the group rebalances, partitions are withdrawn from running code, and whatever in-memory state the handler accumulated must be either committed, flushed, or safely discarded.
Adding deserialization compounds the difficulty. The producer writes Avro bytes with a Schema Registry ID prefix. The consumer must decode those bytes, match the schema, and handle the case in which the producer used a new schema version that the consumer has never encountered. Error handling adds another layer of decisions. When a record cannot be processed, the consumer must determine whether to retry indefinitely and block the partition, skip and discard the record, or route it elsewhere for human review.
The factor that ultimately undermines more Kafka deployments than any other is lag. A consumer group can appear to be working — no errors, no crashes, normal CPU utilisation — while processing 8,000 messages per second beneath producers writing 12,000 per second. The group falls behind by 4,000 messages per second. If this remains undetected for a day, the backlog reaches 345 million messages, and recovery requires either adding consumers or accepting that retention will delete unread data. Silent lag is the primary cause of Kafka data loss in practice, and it is exclusively a consumer-side problem.
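The arithmetic behind that figure is simple enough to check. The following is a minimal sketch assuming constant produce and consume rates; the helper names are illustrative, not part of any Kafka tooling:

```python
SECONDS_PER_DAY = 86_400


def backlog_after(produce_rate: float, consume_rate: float, seconds: float) -> float:
    """Unread messages accumulated after `seconds`, assuming constant rates."""
    deficit = max(produce_rate - consume_rate, 0.0)
    return deficit * seconds


def hours_to_drain(backlog: float, produce_rate: float, consume_rate: float) -> float:
    """Hours needed to clear `backlog` once consumers outpace producers."""
    surplus = consume_rate - produce_rate
    if surplus <= 0:
        return float("inf")  # the group is still falling behind; it never drains
    return backlog / surplus / 3600


backlog = backlog_after(12_000, 8_000, SECONDS_PER_DAY)
print(f"{backlog:,.0f}")  # 345,600,000
```

Under the same constant-rate assumption, suppose the group is scaled up to 16,000 messages per second. It would still need a full day to work through that backlog (`hours_to_drain(backlog, 12_000, 16_000)` returns 24.0). This is why catching lag early matters far more than recovering from it.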
The remainder of this post addresses each of these concerns in turn, supported by working code.
How Consumer Groups and Partition Assignment Work
The consumer group is the unit of parallelism in Kafka. When a consumer starts, it is assigned a group.id. Every consumer with the same group ID forms a single logical subscriber, and Kafka guarantees that each partition of the subscribed topics is delivered to exactly one member of that group at a time. Two consumers in the same group will never see the same partition. Two consumers in different groups will both receive every message independently, which is how a single topic fans out to multiple downstream systems.
Inside a group, one broker is designated as the group coordinator. The coordinator tracks group membership, handles joins and leaves, runs the rebalance protocol, and persists committed offsets. When a consumer calls subscribe() and starts polling, it sends a JoinGroup request to the coordinator, which either admits it into an existing group or initialises a new one. One consumer in the group is elected as the group leader, and it is the leader (not the coordinator) that computes the partition assignment. The leader runs the configured partition.assignment.strategy locally and sends the result back to the coordinator, which then distributes it to all members.
This design has one consequence that surprises newcomers and contributes to many production outages: a group cannot have more working consumers than partitions. If a topic has six partitions and eight consumers join the same group, two will remain idle, consuming nothing. They are not malfunctioning — they joined the group, received zero partitions, and will wait to take over if another member fails. This is why partition count is the absolute ceiling on consumer parallelism, and why the producer-side decision about num.partitions has significant downstream consequences.
The assignment strategy — partition.assignment.strategy — controls how the group leader divides partitions among members. Kafka provides four built-in strategies, and the differences between them are significant when a group contains dozens of consumers or when rebalances occur frequently.
| Strategy | Behavior | Rebalance Cost | When to Use |
|---|---|---|---|
| Range | Per-topic contiguous ranges. Default for historical compatibility. | Stop-the-world | Legacy workloads, or when you specifically want co-partitioning across topics for joins. |
| RoundRobin | Distributes evenly across all subscribed partitions. | Stop-the-world | Stateless processing where balance matters more than locality. |
| Sticky | Balanced, but preserves as much of the prior assignment as possible. | Stop-the-world (reduced churn) | Warm caches, expensive state rebuild, or large groups. |
| CooperativeSticky | Sticky plus incremental/cooperative rebalancing. | Non-stop; only moved partitions pause | Recommended default for new deployments. Safer scaling and rolling restarts. |
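The sketch below reimplements simplified range and round-robin assignment in plain Python to make the first two strategies concrete. It is an illustration, not Kafka's assignor code. The real assignors order members by member ID and handle details omitted here, and the function names are hypothetical.

```python
from typing import Dict, List, Tuple

Assignment = Dict[str, List[Tuple[str, int]]]


def range_assign(consumers: List[str], topics: Dict[str, int]) -> Assignment:
    """Range: for each topic separately, hand out contiguous partition ranges."""
    members = sorted(consumers)
    out: Assignment = {m: [] for m in members}
    for topic, count in sorted(topics.items()):
        per_member, extra = divmod(count, len(members))
        start = 0
        for i, member in enumerate(members):
            n = per_member + (1 if i < extra else 0)  # first `extra` members get one more
            out[member].extend((topic, p) for p in range(start, start + n))
            start += n
    return out


def round_robin_assign(consumers: List[str], topics: Dict[str, int]) -> Assignment:
    """Round-robin: deal all topic-partitions across members like cards."""
    members = sorted(consumers)
    out: Assignment = {m: [] for m in members}
    all_tps = sorted((t, p) for t, count in topics.items() for p in range(count))
    for i, tp in enumerate(all_tps):
        out[members[i % len(members)]].append(tp)
    return out


topics = {"a": 2, "b": 2}
print(range_assign(["c1", "c2", "c3"], topics))
# {'c1': [('a', 0), ('b', 0)], 'c2': [('a', 1), ('b', 1)], 'c3': []}
print(round_robin_assign(["c1", "c2", "c3"], topics))
# {'c1': [('a', 0), ('b', 1)], 'c2': [('a', 1)], 'c3': [('b', 0)]}
```

The example uses three consumers and two topics with two partitions each. Range gives c1 and c2 the same partition number of both topics, which is what co-partitioned joins rely on, and leaves c3 idle. Round-robin spreads the four partitions across all three members. The same functions also show the partition ceiling: eight consumers on a six-partition topic leave two members with nothing to do.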
The Rebalance Protocol: Eager vs Cooperative
A rebalance is the process by which a consumer group redistributes partitions among its members. Rebalances occur for several reasons: a consumer joins the group, a consumer leaves cleanly, a consumer fails (its session times out), the partition count of the subscribed topic changes, or an operator triggers one manually. From a correctness standpoint, rebalances are the single most hazardous event in a consumer’s lifecycle. From a latency standpoint, they often represent the worst-case latency outlier.
Originally, Kafka employed eager rebalancing, also called the stop-the-world model. When a rebalance is triggered, every member of the group revokes all of its partitions, sends a JoinGroup request, waits for the leader to compute the new assignment, and then receives its new partition set. That window can extend from hundreds of milliseconds to tens of seconds in unhealthy clusters, and during it no member processes anything. In a group of 200 consumers, if one member is slow to respond to JoinGroup, the other 199 remain idle. Furthermore, once the rebalance completes, many consumers receive some of the same partitions back, so for those partitions the revoke-and-reassign cycle was pure overhead.
Cooperative rebalancing, introduced in KIP-429 and stable since Kafka 2.4, addresses this problem. Instead of revoking all partitions at once, the protocol proceeds in two phases. In the first phase, every member reports its current ownership. The leader computes the new assignment and identifies only the partitions that actually need to move from consumer X to consumer Y. Only those partitions are revoked. Consumers that are not losing any partitions continue processing throughout. A second phase then assigns the moved partitions to their new owners. The end-to-end rebalance time may be longer, but the observable pause on any individual partition is reduced substantially.
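The bookkeeping difference between the two protocols fits in a few lines. This sketch models only "which partitions stop processing" for a given old and new assignment. It is not the coordinator's actual protocol, and the function names are hypothetical.

```python
from typing import Dict, Set


def revoked_eager(old: Dict[str, Set[int]]) -> Dict[str, Set[int]]:
    """Eager protocol: every member gives up every partition it owns."""
    return {member: set(owned) for member, owned in old.items() if owned}


def revoked_cooperative(old: Dict[str, Set[int]],
                        new: Dict[str, Set[int]]) -> Dict[str, Set[int]]:
    """Cooperative protocol: a member gives up only partitions it will not keep."""
    revoked = {}
    for member, owned in old.items():
        lost = owned - new.get(member, set())
        if lost:
            revoked[member] = lost
    return revoked


old = {"c1": {0, 1, 2}, "c2": {3, 4, 5}}
new = {"c1": {0, 1}, "c2": {3, 4}, "c3": {2, 5}}  # c3 joins the group
print(revoked_eager(old))             # {'c1': {0, 1, 2}, 'c2': {3, 4, 5}}
print(revoked_cooperative(old, new))  # {'c1': {2}, 'c2': {5}}
```

When a third consumer joins, the eager protocol pauses all six partitions. The cooperative protocol pauses only the two that actually move to c3.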
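To enable cooperative rebalancing, set partition.assignment.strategy to cooperative-sticky. In the Java client, a running eager group can be migrated with two rolling restarts. The first restart lists both the existing strategy and cooperative-sticky, so the group keeps agreeing on a strategy every member supports; the second removes the eager strategy. librdkafka, and therefore confluent-kafka-python, does not support mixing eager and cooperative assignors in one configuration, so Python consumers cannot take that online path. Either way, the objective is for all members to adopt the cooperative strategy.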
A common failure mode is a consumer that is repeatedly evicted from the group for exceeding max.poll.interval.ms because its processing loop has stalled. Each eviction-and-rejoin cycle triggers another group rebalance. The symptoms are periodic latency spikes and a continuous stream of “Group is rebalancing” log lines. The remedy is almost never to increase the timeout; it is to fix the slow handler or reduce the number of records processed per poll.
There is a second, more subtle consequence of rebalances: any in-memory state becomes invalid the moment a partition is revoked. If the consumer has been accumulating per-partition buffers, counts, or deduplication caches, these must be flushed or committed before the partition departs. The on_revoke callback is where this occurs, and handling it correctly is one of the most common sources of data-loss bugs in Kafka consumers.
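Here is a minimal sketch of per-partition state that is drained on revoke and dropped on loss. It assumes a hypothetical `sink` callable that durably writes one partition's buffered records, such as a batched database upsert. The class is illustrative, not part of confluent-kafka-python.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple

TopicPartitionKey = Tuple[str, int]


class PartitionBuffers:
    """Per-partition record buffers that must be handled when partitions move."""

    def __init__(self, sink: Callable[[TopicPartitionKey, List[Any]], None]):
        self._sink = sink  # hypothetical durable writer for one partition's records
        self._buffers: Dict[TopicPartitionKey, List[Any]] = defaultdict(list)

    def add(self, topic: str, partition: int, record: Any) -> None:
        self._buffers[(topic, partition)].append(record)

    def flush(self, partitions: Iterable[TopicPartitionKey]) -> None:
        """Clean revoke: write buffered records out so their offsets can be committed."""
        for tp in partitions:
            records = self._buffers.pop(tp, None)
            if records:
                self._sink(tp, records)

    def discard(self, partitions: Iterable[TopicPartitionKey]) -> None:
        """Lost partitions: drop buffers; the new owner replays from the last commit."""
        for tp in partitions:
            self._buffers.pop(tp, None)
```

Wire it into the rebalance callbacks as follows. In `on_revoke`, call `flush([(p.topic, p.partition) for p in partitions])` before the synchronous commit, so committed offsets never run ahead of data that actually reached the sink. In `on_lost`, call `discard()` instead, because another consumer may already own those partitions.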
Offset Management and Delivery Semantics
Every message in a Kafka partition has a monotonically increasing offset: 0, 1, 2, 3, and so on. A consumer reads from a starting offset, processes the records, and periodically commits its progress for each partition. By convention, the committed value is the offset of the next record to read: the last processed offset plus one. That commit is stored in the internal __consumer_offsets topic, keyed by (group, topic, partition). When a consumer restarts, or a rebalance moves a partition to a new owner, the new owner reads that committed offset and resumes from there.
The key decision is when to commit. Kafka exposes two modes:
- Auto-commit (enable.auto.commit=true): the client library commits offsets in the background every auto.commit.interval.ms (default 5 seconds). It commits whatever was returned by the most recent poll(), regardless of whether the handler actually finished processing those records. The mode is simple but hazardous. If the process crashes after an offset was committed but before the handler completed, those records are lost. If it crashes before the next commit, up to five seconds of records are reprocessed. With librdkafka-based clients there is a safe variant: combine auto-commit with enable.auto.offset.store=false and explicit store_offsets() calls. The background commit then covers only offsets the application has stored.
- Manual commit (enable.auto.commit=false): the application calls commit() explicitly, either synchronously or asynchronously, deciding for itself when processing is complete. This is the clearest choice for production when correctness matters.
From that single decision arises the entire delivery-semantics discussion, which is fundamentally a question of how commits are ordered relative to side effects.
At-most-once means the offset is committed before processing the record. If the code crashes between the commit and the side effect, the record is lost permanently. The broker assumes the record was handled, and the next poll will skip past it. The trade-off is zero duplicates at the cost of silent record loss. This mode is rarely chosen deliberately, and when it is, the typical use case is high-volume metrics where a few dropped samples are tolerable and duplicates would corrupt a downstream counter.
At-least-once means processing occurs first, followed by the commit. If a crash occurs between processing and committing, the record is redelivered on restart and processed again. This is the default for nearly every pipeline. The cost is that the handler must be idempotent, or a downstream sink must absorb duplicates through an upsert into a keyed table, a deduplication window, or a content hash. For the server-metrics pipeline described in the companion producer post, an InfluxDB sink is naturally idempotent because writes with the same timestamp, tags, and field overwrite earlier values.
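The difference between the two orderings is easiest to see in a toy simulation. A list stands in for the external sink and an integer for the committed offset; nothing here talks to Kafka, and the function name is hypothetical.

```python
from typing import List


def run_with_crash(records: List[str], mode: str, crash_at: int) -> List[str]:
    """Process `records`, crash while handling offset `crash_at`, restart from
    the committed offset, and finish. Returns every side effect in the sink."""
    sink: List[str] = []  # stands in for rows written to an external system
    committed = 0         # stands in for the committed offset (next to read)

    for offset, record in enumerate(records):
        if mode == "at-most-once":
            committed = offset + 1  # 1. commit ...
            if offset == crash_at:
                break               # crash before the side effect
            sink.append(record)     # 2. ... then process
        elif mode == "at-least-once":
            sink.append(record)     # 1. process ...
            if offset == crash_at:
                break               # crash before the commit
            committed = offset + 1  # 2. ... then commit
        else:
            raise ValueError(f"unknown mode: {mode}")

    # Restart: a new consumer resumes from the committed offset.
    sink.extend(records[committed:])
    return sink


records = ["r0", "r1", "r2", "r3"]
print(run_with_crash(records, "at-most-once", crash_at=1))   # ['r0', 'r2', 'r3']
print(run_with_crash(records, "at-least-once", crash_at=1))  # ['r0', 'r1', 'r1', 'r2', 'r3']

# An idempotent sink (a keyed upsert) absorbs the duplicate.
table = {}
for record in run_with_crash(records, "at-least-once", crash_at=1):
    table[record] = record
print(sorted(table))  # ['r0', 'r1', 'r2', 'r3']
```

At-most-once loses r1 outright. At-least-once writes r1 twice, and the keyed upsert turns that duplicate into a no-op.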
Exactly-once semantics are achievable in Kafka, but only under specific conditions. For Kafka-to-Kafka pipelines, the producer-consumer transaction API permits the atomic commit of both output records and input offsets as a single transaction. Any downstream consumer reading with isolation.level=read_committed sees only records from committed transactions. For Kafka-to-external-system pipelines, exactly-once requires either an idempotent sink (so at-least-once is effectively exactly-once) or a two-phase commit protocol between Kafka and the sink, which is rarely implemented by hand; most teams use Kafka Connect with a transactional sink, or Apache Flink with its own checkpoint-and-commit machinery.
Inside the Polling Loop
The central mechanism of any Kafka consumer is the polling loop. Every call to consumer.poll(timeout) performs three functions: it fetches records from the broker, sends heartbeats to the group coordinator, and runs rebalance callbacks if the group state has changed. If poll() is not called frequently enough, the coordinator assumes the consumer has died and evicts it from the group.
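Three timeouts govern this behaviour, and their interaction is the source of most consumer bugs. The table lists them alongside the batch-size and fetch settings that determine how much work each poll() returns: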
| Config | Default | What It Controls |
|---|---|---|
| session.timeout.ms | 45000 (45s) | Max time the coordinator will wait for a heartbeat before declaring the consumer dead and triggering a rebalance. |
| heartbeat.interval.ms | 3000 (3s) | How often the background heartbeat thread pings the coordinator. Must be well below session timeout. |
| max.poll.interval.ms | 300000 (5 min) | Max time between two consecutive poll() calls. If you exceed this, the consumer is kicked from the group even if heartbeats are still flowing. |
| max.poll.records | 500 | Java client only: maximum records returned per poll() call. Combined with max.poll.interval.ms, this caps how long you can spend processing one batch. confluent-kafka-python has no such setting; poll() returns one message, and consume(num_messages=...) sets the batch size. |
| fetch.min.bytes | 1 | Minimum bytes a broker should accumulate before responding. Larger values improve throughput at the cost of latency. |
| fetch.max.wait.ms | 500 | How long a broker will wait to accumulate fetch.min.bytes before responding anyway. |
Since Kafka 0.10.1, heartbeats have been sent from a background thread independent of poll(), which is why max.poll.interval.ms exists as a separate safeguard. Without it, a consumer could remain stuck inside a slow handler for an hour, never polling and never processing anything, yet still sending heartbeats and holding its partitions. The max.poll.interval.ms setting handles exactly this case: if poll() is not called frequently enough, the consumer is removed from the group regardless of how active the heartbeat thread is.
The appropriate mental model is to poll often, process quickly, and commit explicitly. If the handler is slow, there are two remedies. One is to shrink each batch (max.poll.records in the Java client, or num_messages when calling consume() in confluent-kafka-python). The other is to move heavy work off the polling thread onto a worker pool with a bounded queue, so that poll() is still called frequently. Increasing max.poll.interval.ms should never be the first response. It does not fix the slow handler, and it lengthens the time a genuinely stuck consumer can hold its partitions before being evicted.
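A worker pool introduces one subtlety: records finish out of order, but a committed offset asserts that everything before it is done. A common approach is to track completions per partition and commit only up to the first gap. The following is a minimal, Kafka-independent sketch of that bookkeeping; the class name is hypothetical.

```python
from typing import Set


class ContiguousOffsetTracker:
    """Tracks completed offsets for one partition when workers finish out of
    order, and reports the highest offset that is safe to commit."""

    def __init__(self, start_offset: int):
        self._next_commit = start_offset  # next offset to read after a restart
        self._done: Set[int] = set()      # finished offsets beyond the watermark

    def mark_done(self, offset: int) -> None:
        if offset < self._next_commit:
            return  # already covered by the watermark (e.g. a redelivered duplicate)
        self._done.add(offset)
        # Advance the watermark across every contiguous completed offset.
        while self._next_commit in self._done:
            self._done.remove(self._next_commit)
            self._next_commit += 1

    @property
    def committable(self) -> int:
        """Value to commit: all offsets below this one have been processed."""
        return self._next_commit


tracker = ContiguousOffsetTracker(start_offset=100)
tracker.mark_done(101)
tracker.mark_done(102)
print(tracker.committable)  # 100 (offset 100 is still in flight)
tracker.mark_done(100)
print(tracker.committable)  # 103
```

In a consumer built this way, the polling thread would hand records to workers. It would periodically store `TopicPartition(topic, partition, tracker.committable)` with `store_offsets(offsets=[...])`. When a partition's bounded queue fills, it would call `pause()` on that partition and `resume()` it once the queue drains, so that `poll()` keeps running without fetching more work.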
A Full Production-Ready Python Consumer
The following is a complete working consumer using confluent-kafka-python, which wraps the production-proven librdkafka C library and is the appropriate choice for any serious Python workload. It connects to the broker and uses Schema Registry for Avro deserialization (matching the companion producer). It processes messages one at a time, commits offsets manually only after successful processing, routes failures to a DLQ topic, and shuts down gracefully on SIGTERM. It also registers rebalance callbacks so that state can be flushed on revoke.
First, a minimal set of configuration values. These reside in environment variables so the same binary runs in development and production.
```python
# consumer_config.py
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class ConsumerConfig:
    bootstrap_servers: str
    schema_registry_url: str
    group_id: str
    topic: str
    dlq_topic: str
    auto_offset_reset: str = "earliest"

    @classmethod
    def from_env(cls) -> "ConsumerConfig":
        return cls(
            bootstrap_servers=os.environ["KAFKA_BOOTSTRAP_SERVERS"],
            schema_registry_url=os.environ["SCHEMA_REGISTRY_URL"],
            group_id=os.environ.get("KAFKA_GROUP_ID", "metrics-consumer"),
            topic=os.environ.get("KAFKA_TOPIC", "server-metrics"),
            dlq_topic=os.environ.get("KAFKA_DLQ_TOPIC", "server-metrics-dlq"),
            auto_offset_reset=os.environ.get("AUTO_OFFSET_RESET", "earliest"),
        )
```
The main consumer follows. Read it top to bottom: it doubles as a production template that can be cloned for any new consumer.
```python
# metrics_consumer.py
import logging
import signal
import sys
import time
from typing import Any

from confluent_kafka import Consumer, Producer, KafkaError, KafkaException, TopicPartition
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

from consumer_config import ConsumerConfig

log = logging.getLogger("metrics_consumer")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s %(message)s",
)


class PoisonPillError(Exception):
    """Record cannot be processed and should be routed to the DLQ."""


class RetriableError(Exception):
    """Transient failure: do not store the offset; rewind and retry."""


class MetricsConsumer:
    def __init__(self, cfg: ConsumerConfig):
        self.cfg = cfg
        self._running = True
        self._last_commit_ts = 0.0

        self.consumer = Consumer({
            "bootstrap.servers": cfg.bootstrap_servers,
            "group.id": cfg.group_id,
            "auto.offset.reset": cfg.auto_offset_reset,
            # Correctness: manual commit after successful processing.
            "enable.auto.commit": False,
            # Offsets are stored explicitly with store_offsets() after processing;
            # the client requires automatic offset storing to be off for that.
            "enable.auto.offset.store": False,
            # Cooperative rebalancing: safer scaling, less stop-the-world.
            "partition.assignment.strategy": "cooperative-sticky",
            # Session timeouts tuned for a well-behaved handler.
            "session.timeout.ms": 45000,
            "heartbeat.interval.ms": 3000,
            "max.poll.interval.ms": 300000,
            # Throughput / latency tuning.
            "fetch.min.bytes": 1024 * 64,  # 64 KB
            "fetch.max.wait.ms": 250,
            "max.partition.fetch.bytes": 1024 * 1024,  # 1 MB
            # Only see committed transactional records if the producer uses txns.
            "isolation.level": "read_committed",
            # A recognisable, per-process client id for lag tooling and logs.
            "client.id": f"{cfg.group_id}-{int(time.time())}",
        })

        # Schema Registry wiring. The producer in the companion post
        # wrote Avro with a magic byte + schema ID prefix; this decodes it.
        sr_client = SchemaRegistryClient({"url": cfg.schema_registry_url})
        self.deserializer = AvroDeserializer(
            schema_registry_client=sr_client,
            # schema_str=None lets the deserializer fetch by ID from each message.
        )

        # DLQ producer. Stateless from our point of view; just a sink.
        self.dlq_producer = Producer({
            "bootstrap.servers": cfg.bootstrap_servers,
            "enable.idempotence": True,
            "acks": "all",
            "compression.type": "zstd",
            "linger.ms": 20,
        })

        signal.signal(signal.SIGTERM, self._on_signal)
        signal.signal(signal.SIGINT, self._on_signal)

    def _on_signal(self, signum, frame):
        log.info("received signal %s, shutting down", signum)
        self._running = False

    def _on_assign(self, consumer, partitions):
        log.info("assigned partitions: %s",
                 [(p.topic, p.partition) for p in partitions])
        # If you kept local state keyed by partition, restore it here.

    def _on_revoke(self, consumer, partitions):
        log.info("revoked partitions: %s",
                 [(p.topic, p.partition) for p in partitions])
        # Last chance to flush in-memory state before partitions move away.
        # Make pending DLQ writes durable before committing past their records.
        self.dlq_producer.flush(10)
        try:
            consumer.commit(asynchronous=False)
        except KafkaException as e:
            log.warning("final commit on revoke failed: %s", e)

    def _on_lost(self, consumer, partitions):
        # Triggered when the consumer has lost ownership without a clean revoke
        # (e.g. session timeout). Do NOT commit: the offsets are no longer ours.
        log.warning("partitions lost: %s",
                    [(p.topic, p.partition) for p in partitions])

    def run(self) -> None:
        self.consumer.subscribe(
            [self.cfg.topic],
            on_assign=self._on_assign,
            on_revoke=self._on_revoke,
            on_lost=self._on_lost,
        )
        try:
            while self._running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue
                if msg.error():
                    self._handle_kafka_error(msg.error())
                    continue

                try:
                    payload = self._deserialize(msg)
                    self._handle_record(payload, msg)
                    # Store offset; the periodic commit below will use it.
                    # store_offsets + periodic commit keeps throughput high
                    # compared to committing after every single record.
                    self.consumer.store_offsets(message=msg)
                except PoisonPillError as e:
                    log.error("poison pill on %s[%d]@%d: %s",
                              msg.topic(), msg.partition(), msg.offset(), e)
                    self._route_to_dlq(msg, reason=str(e))
                    # Advance past the bad record so we don't block the partition.
                    self.consumer.store_offsets(message=msg)
                except RetriableError as e:
                    log.warning("retriable error, will replay: %s", e)
                    # Do NOT store the offset. poll() has already moved the
                    # in-memory position past this record, so rewind to it;
                    # the next poll() then redelivers the same record.
                    self.consumer.seek(
                        TopicPartition(msg.topic(), msg.partition(), msg.offset()))
                    time.sleep(1.0)

                # Commit roughly every second in batches for throughput.
                self._maybe_commit()
        finally:
            self._shutdown()

    def _deserialize(self, msg) -> dict[str, Any]:
        try:
            ctx = SerializationContext(msg.topic(), MessageField.VALUE)
            value = self.deserializer(msg.value(), ctx)
        except Exception as e:
            raise PoisonPillError(f"deserialization failed: {e}") from e
        if value is None:
            raise PoisonPillError("deserialized to None")
        return value

    def _handle_record(self, payload: dict[str, Any], msg) -> None:
        # ---- YOUR BUSINESS LOGIC LIVES HERE ----
        # Must be idempotent (at-least-once semantics).
        # Example: upsert into InfluxDB / TimescaleDB / Iceberg by (host, timestamp).
        host = payload.get("host")
        ts = payload.get("timestamp")
        cpu = payload.get("cpu_percent")
        if not host or ts is None:
            raise PoisonPillError("missing required fields host/timestamp")
        log.debug("ingest host=%s ts=%s cpu=%s", host, ts, cpu)

    def _maybe_commit(self) -> None:
        now = time.monotonic()
        if now - self._last_commit_ts < 1.0:
            return
        self._last_commit_ts = now
        # Never commit past a poison pill whose DLQ copy is still in flight.
        if self.dlq_producer.flush(5.0) > 0:
            log.warning("DLQ flush incomplete; deferring commit")
            return
        try:
            self.consumer.commit(asynchronous=True)
        except KafkaException as e:
            log.warning("async commit failed: %s", e)

    def _handle_kafka_error(self, err) -> None:
        if err.code() == KafkaError._PARTITION_EOF:
            return  # benign
        log.error("kafka error: %s", err)
        if not err.retriable():
            raise KafkaException(err)

    def _route_to_dlq(self, msg, reason: str) -> None:
        headers = [
            ("original_topic", msg.topic().encode()),
            ("original_partition", str(msg.partition()).encode()),
            ("original_offset", str(msg.offset()).encode()),
            ("error_reason", reason.encode()),
            ("failed_at", str(int(time.time() * 1000)).encode()),
        ]
        self.dlq_producer.produce(
            topic=self.cfg.dlq_topic,
            key=msg.key(),
            value=msg.value(),  # preserve raw bytes for forensic replay
            headers=headers,
        )
        self.dlq_producer.poll(0)

    def _shutdown(self) -> None:
        log.info("flushing DLQ producer")
        self.dlq_producer.flush(10)
        log.info("committing final offsets")
        try:
            self.consumer.commit(asynchronous=False)
        except KafkaException as e:
            log.warning("final commit failed: %s", e)
        self.consumer.close()
        log.info("consumer closed cleanly")


def main() -> int:
    cfg = ConsumerConfig.from_env()
    MetricsConsumer(cfg).run()
    return 0


if __name__ == "__main__":
    sys.exit(main())
```
Several details in this code are load-bearing and merit explicit attention.
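The implementation uses store_offsets plus periodic commit rather than committing after each message. store_offsets updates the client’s in-memory record of the next offset to commit (the message’s offset plus one), and commit then sends that snapshot to the broker. The confluent-kafka-python documentation requires enable.auto.offset.store to be false when using store_offsets. With the default of true, the client stores each offset automatically as soon as the message is handed to the application, which would defeat the purpose. Committing after every single record adds a broker round trip per record and substantial latency at high throughput. Committing approximately every second batches the work while limiting worst-case replay to roughly one second of records.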
The on_revoke callback calls commit(asynchronous=False). This is the final synchronous commit before the partition is withdrawn. If it is omitted, any records processed since the last periodic commit will replay after the rebalance — not a correctness violation under at-least-once semantics, but a significant inefficiency. The on_lost callback deliberately does not commit, because by the time it executes, another consumer may already own those partitions, and a commit would be incorrect.
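Poison pills advance the offset; retriable errors do not. This distinguishes “this record will never succeed, skip it and log” from “this record may succeed on a subsequent attempt, do not move the offset.” Not storing the offset is only half of a retry. poll() has already advanced the consumer’s in-memory position past the record, so the partition must be rewound with seek() to the failed offset for the next poll to redeliver it. Conflating the two cases leads to infinite replay loops, and forgetting the rewind leads to silently skipped records.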
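The example consumer retries after a fixed one-second sleep and, as written, would retry a persistently failing record indefinitely. A bounded, capped exponential backoff is a common refinement. The sketch below assumes the caller keeps a per-record attempt count, and its default values are illustrative.

```python
import random
from typing import Callable, Iterator


def backoff_delays(max_attempts: int, base: float = 0.5, cap: float = 30.0,
                   jitter: bool = True,
                   rng: Callable[[], float] = random.random) -> Iterator[float]:
    """Yield the sleep (in seconds) before each retry: capped exponential
    backoff, optionally with full jitter (uniform between 0 and the delay)."""
    for attempt in range(max_attempts):
        delay = min(cap, base * (2 ** attempt))
        yield rng() * delay if jitter else delay


print(list(backoff_delays(5, base=0.5, cap=4.0, jitter=False)))
# [0.5, 1.0, 2.0, 4.0, 4.0]
```

Once the delays are exhausted, the caller can reclassify the record as a poison pill and route it to the DLQ, so a single failing dependency cannot block the partition forever. The cap should stay well below max.poll.interval.ms, because the sleep happens on the polling thread. Full jitter also spreads out retries from many consumers that hit the same downstream service at once.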
Error Handling and Dead Letter Queues
Every running consumer eventually encounters a message it cannot process. The cause may be a bug in the producer, an Avro schema incompatibility, a field that is technically valid but semantically incorrect, or a downstream service rejecting writes for reasons unrelated to the record. How that record is handled determines whether the pipeline continues or stalls.
There are four broad strategies, and a healthy consumer uses at least three of them at different points:
- Skip. Log the record, advance the offset, and continue. Appropriate when the record is genuinely unprocessable and loss is acceptable, such as corrupted telemetry or malformed log lines.
- Retry with backoff. Do not store or commit the offset. Seek the partition back to the failed record, pause briefly, and let the next poll redeliver it. Appropriate for transient failures such as downstream HTTP timeouts, temporary database connection drops, or rate limits. Cap the retries to avoid blocking the partition indefinitely.
- Route to a DLQ topic. Produce the raw bytes, headers, and failure metadata to a separate dead-letter topic, then advance the offset. A human operator or scheduled job can later inspect the DLQ, fix the underlying bug, and optionally replay the records. This is the appropriate default for almost all poison-pill cases in production.
- Circuit break. If the error rate exceeds a threshold, pause consumption entirely and page an operator. This prevents the DLQ from accumulating millions of messages because a downstream service is unavailable.
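A circuit breaker can be as simple as a failure ratio over a sliding window of recent outcomes. The sketch below is one such design under assumed parameters: a window size, a threshold, and a minimum sample count so that a few early failures do not trip it. The class name is hypothetical.

```python
from collections import deque


class ErrorRateBreaker:
    """Opens when the failure ratio over the last `window` outcomes exceeds
    `threshold`, once at least `min_samples` outcomes have been seen."""

    def __init__(self, window: int = 100, threshold: float = 0.5, min_samples: int = 20):
        self._outcomes = deque(maxlen=window)  # True means the record failed
        self._threshold = threshold
        self._min_samples = min_samples

    def record(self, failed: bool) -> None:
        self._outcomes.append(failed)

    @property
    def is_open(self) -> bool:
        if len(self._outcomes) < self._min_samples:
            return False
        return sum(self._outcomes) / len(self._outcomes) > self._threshold
```

When `is_open` becomes true, the consumer can call `pause()` on its current `assignment()` and alert an operator. It should keep calling `poll()` so it stays in the group, and `resume()` the partitions after a cooldown or manual intervention.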
The DLQ pattern merits additional attention because it is frequently implemented incorrectly. A well-formed DLQ record preserves the original raw bytes of the value, so it can still be deserialized using whatever schema was current at produce time, and includes headers with the original topic, partition, offset, error reason, and timestamp. A poison pill should never be re-serialized into a tidier representation for the DLQ, as doing so destroys the evidence needed for diagnosis. The snippet above handles this correctly by passing msg.value() through unchanged.
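Because the DLQ record keeps the original bytes, a replay tool needs only the headers to know where the record came from. Here is a minimal sketch of decoding the headers written by `_route_to_dlq` in the consumer above, assuming the header names used there. The function name is hypothetical, and `headers` is the list of `(str, bytes)` pairs that `Message.headers()` returns.

```python
from typing import Any, Dict, List, Optional, Tuple


def decode_dlq_headers(headers: Optional[List[Tuple[str, Optional[bytes]]]]) -> Dict[str, Any]:
    """Turn a DLQ record's headers back into typed origin metadata."""
    raw = {key: (value or b"").decode("utf-8") for key, value in (headers or [])}
    return {
        "topic": raw["original_topic"],
        "partition": int(raw["original_partition"]),
        "offset": int(raw["original_offset"]),
        "reason": raw.get("error_reason", ""),
        "failed_at_ms": int(raw["failed_at"]),
    }
```

Once the underlying bug has been fixed, a replay job could re-produce the untouched record value to the topic named in `topic`. The `partition` and `offset` fields let an investigator find the original record.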
DLQ topics should have their own retention, longer than the main topic, because investigators require time to examine failures. They also require their own monitoring. A DLQ that silently grows is almost as harmful as a consumer that silently lags. Alerting should consider DLQ production rate in addition to main-consumer lag.
Consumer Lag Monitoring
Consumer lag is the difference, per partition, between the latest offset produced and the latest offset committed by a consumer group. A lag of zero indicates the consumer is fully caught up. Positive and growing lag indicates the consumer is falling behind. Positive lag stable at a small value indicates steady-state, healthy operation. Large positive lag signals an imminent incident.
The simplest way to inspect lag is from the command line:
```bash
# Show lag for a group
kafka-consumer-groups.sh \
  --bootstrap-server broker:9092 \
  --describe \
  --group metrics-consumer

# Output (truncated):
# GROUP             TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# metrics-consumer  server-metrics  0          1047329         1048210         881
# metrics-consumer  server-metrics  1          1046118         1047002         884
# metrics-consumer  server-metrics  2          1045884         1053991         8107

# Reset a group to the beginning of a topic.
# Offset resets only succeed while the group has no active members.
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --group metrics-consumer --topic server-metrics \
  --reset-offsets --to-earliest --execute

# Reset to a specific timestamp (e.g. to replay the last hour)
kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --group metrics-consumer --topic server-metrics \
  --reset-offsets --to-datetime 2026-04-12T13:00:00.000 --execute
```
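The LAG column above is simply the log-end offset minus the committed offset. Below is a minimal sketch of the same calculation, assuming the offsets have already been fetched. In confluent-kafka-python, `Consumer.get_watermark_offsets()` returns the low and high watermarks, and the high watermark is the log-end offset. `Consumer.committed()` returns committed offsets, and this sketch expects the caller to map a missing commit to `None`. The helper name is hypothetical.

```python
from typing import Dict, Optional


def partition_lag(log_end: Dict[int, int],
                  committed: Dict[int, Optional[int]]) -> Dict[int, int]:
    """Per-partition lag = log-end offset - committed offset.

    Partitions without a committed offset are skipped: their effective start
    position depends on auto.offset.reset, so lag is not well defined yet."""
    return {
        p: end - committed[p]
        for p, end in log_end.items()
        if committed.get(p) is not None
    }


lag = partition_lag(
    {0: 1048210, 1: 1047002, 2: 1053991},
    {0: 1047329, 1: 1046118, 2: 1045884},
)
print(lag)               # {0: 881, 1: 884, 2: 8107}
print(sum(lag.values())) # 9872
```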
In production, lag should be exported as a metric with associated alerting. Two widely used tools are LinkedIn’s Burrow, which includes a sliding-window evaluator that classifies groups as OK, WARN, or ERR based on whether they are stuck or falling behind, and Kafka Lag Exporter, which exposes lag as Prometheus metrics (kafka_consumergroup_group_lag and kafka_consumergroup_group_lag_seconds).
Alerting on raw lag count is generally unreliable, as a burst of produces can spike lag without indicating a genuine problem. Alerting on lag in seconds — the age of the oldest unread record — is far more informative, because it corresponds directly to the SLA the consumer is expected to meet.
| Lag in Seconds | Severity | Action |
|---|---|---|
| < 10s | Healthy | Normal operation. |
| 10s – 60s | Warning | Check for a produce burst or transient downstream slowdown. |
| 1 min – 5 min | Page secondary | Sustained drift. Investigate handler latency and downstream health. |
| > 5 min | Page on-call | Consumer is behind SLA. Start horizontal scaling or investigate rebalance loops. |
| > retention window | Data loss imminent | Records will be deleted before you read them. All-hands incident. |
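The thresholds in the table translate directly into code. This hedged sketch assumes you already have the timestamp (in milliseconds) of the oldest unread record, for example the record at the committed offset, and the topic's retention window in seconds. Both function names are hypothetical, and the cut-offs simply mirror the table rather than any standard.

```python
import time
from typing import Optional


def lag_seconds(oldest_unread_ts_ms: Optional[int],
                now_s: Optional[float] = None) -> float:
    """Age of the oldest unread record in seconds; 0.0 when nothing is unread."""
    if oldest_unread_ts_ms is None:
        return 0.0
    now = time.time() if now_s is None else now_s
    return max(now - oldest_unread_ts_ms / 1000.0, 0.0)


def lag_severity(lag_s: float, retention_s: float) -> str:
    """Map lag in seconds onto the severity levels from the table above."""
    if lag_s > retention_s:
        return "data-loss-imminent"
    if lag_s > 300:          # more than 5 minutes
        return "page-on-call"
    if lag_s >= 60:          # 1 to 5 minutes
        return "page-secondary"
    if lag_s >= 10:          # 10 to 60 seconds
        return "warning"
    return "healthy"
```

A typical call would be `lag_severity(lag_seconds(ts_ms), retention_s=7 * 24 * 3600)`, where seven days matches Kafka's default retention; substitute the topic's actual `retention.ms`.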
Note that the absence of any historical lag alert is itself a warning sign. It typically indicates that thresholds are too generous and real regressions are being missed. Lag alerts should be tested regularly by artificially slowing a consumer in staging and confirming that the pages are delivered.
Scaling, Stateful Processing, and Beyond
Horizontal scaling is the straightforward case for stateless consumers: add consumer instances to the same group, and the next rebalance redistributes partitions among them. With cooperative-sticky assignment, only the partitions that actually move are revoked and briefly paused, so scaling up and down proceeds with minimal disruption. The ceiling is the partition count: within one group, parallelism cannot exceed the total number of partitions across the subscribed topics. Once that ceiling is reached, the only options are to increase the partition count (which requires planning; the producer post explains why partition keys and counts are difficult to change later) or to make each consumer faster.
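The partition ceiling is easy to see with a simplified round-robin assignment. This is a toy model of a single topic, not the actual librdkafka assignor, and the function names are illustrative. With 6 partitions and 8 consumers, two consumers receive nothing.

```python
from typing import Dict, List


def round_robin_assign(partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Toy round-robin assignment of a single topic's partitions."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for p in range(partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


def idle_consumers(assignment: Dict[str, List[int]]) -> List[str]:
    """Members that received no partitions and therefore process nothing."""
    return [c for c, parts in assignment.items() if not parts]


if __name__ == "__main__":
    group = [f"consumer-{i}" for i in range(8)]
    print(idle_consumers(round_robin_assign(6, group)))
    # ['consumer-6', 'consumer-7']
```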
Making each consumer faster usually involves one of three approaches: batching downstream writes, moving heavy work off the polling thread onto a worker pool, or trading latency for throughput through fetch tuning. In confluent-kafka-python, that last lever means librdkafka settings such as fetch.min.bytes and fetch.wait.max.ms, plus the num_messages batch size passed to consume(); max.poll.records is a Java client setting that librdkafka does not support. For a sink such as a time-series pipeline that lands data in InfluxDB or Iceberg, batched writes are almost always the largest single improvement: flushing 500 records per write request rather than one can yield a 50–100x throughput gain without changing any Kafka configuration.
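Batching is usually the cheapest of the three speed-ups. The sketch below shows one possible shape for a batching sink, assuming a hypothetical `write_batch` callable that performs one downstream write (for example, one HTTP request to InfluxDB) per batch. The class name and the 500-record / 1-second defaults are illustrative.

```python
import time
from typing import Callable, List, Optional


class BatchingSink:
    """Buffers records and writes them downstream in batches.

    Flushes when max_records is reached or when max_age_s has elapsed since
    the first buffered record. write_batch is a hypothetical callable.
    """

    def __init__(self, write_batch: Callable[[List[bytes]], None],
                 max_records: int = 500, max_age_s: float = 1.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._write_batch = write_batch
        self._max_records = max_records
        self._max_age_s = max_age_s
        self._clock = clock
        self._buffer: List[bytes] = []
        self._first_ts: Optional[float] = None

    def _expired(self) -> bool:
        return (self._first_ts is not None
                and self._clock() - self._first_ts >= self._max_age_s)

    def add(self, record: bytes) -> bool:
        """Buffer one record; return True if this call triggered a flush."""
        if not self._buffer:
            self._first_ts = self._clock()
        self._buffer.append(record)
        if len(self._buffer) >= self._max_records or self._expired():
            self.flush()
            return True
        return False

    def maybe_flush(self) -> bool:
        """Call on empty polls so a quiet topic still flushes on time."""
        if self._expired():
            self.flush()
            return True
        return False

    def flush(self) -> None:
        if self._buffer:
            self._write_batch(self._buffer)
        self._buffer = []  # new list, so the flushed batch is not mutated
        self._first_ts = None
```

Store and commit offsets only after `flush()` returns; committing earlier means a crash drops whatever was still buffered. Calling `maybe_flush()` whenever `poll()` or `consume()` comes back empty keeps a quiet topic from holding records longer than `max_age_s`.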
Stateless consumers cover perhaps 80% of use cases. The remaining 20% need joins, windowed aggregations, sessionization, or any other operation that depends on state accumulated across records, and for those a plain consumer is the wrong tool. It is technically possible to keep state in RocksDB or Redis and reconcile it on every rebalance, but doing so amounts to reimplementing Kafka Streams, less effectively. Apache Flink is the better fit, especially for complex event processing, as is Kafka Streams when running on the JVM. Both handle partition-local state, checkpointing, and exactly-once semantics, features that should not be implemented by hand.
Another common question is whether consumer code needs to be written at all. If the goal is to land Kafka messages in an external system such as Postgres, S3, Elasticsearch, or Snowflake, first check whether Kafka Connect already provides a sink connector. Kafka Connect runs as a separate cluster of workers, handles rebalancing and exactly-once delivery (with compatible sinks), and can replace dozens of hand-written consumers with a few lines of JSON configuration. Hand-rolled Python pays off only when the business logic needs something Connect cannot express, such as custom enrichment, model invocation, content-based routing, or a downstream dependency that Connect cannot represent.
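To make the "few lines of JSON" concrete, here is a hedged sketch that registers a sink through the Kafka Connect REST API's `POST /connectors` endpoint using only the standard library. It assumes Confluent's JDBC sink connector is installed on the workers and that converter settings (for example, Avro with Schema Registry) come from the worker defaults; the connector name, topic, and URLs are placeholders.

```python
import json
import urllib.request


def jdbc_sink_payload(name: str, topic: str, jdbc_url: str) -> dict:
    """Connector definition for the JDBC sink (assumed to be installed)."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "3",  # like consumers, capped by the partition count
            "topics": topic,
            "connection.url": jdbc_url,
            # Upsert keyed on the Kafka record key: redeliveries overwrite rows.
            "insert.mode": "upsert",
            "pk.mode": "record_key",
            "auto.create": "true",
        },
    }


def create_connector(connect_url: str, payload: dict) -> int:
    """POST the definition to the Connect REST API and return the HTTP status."""
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

Usage would look like `create_connector("http://connect:8083", jdbc_sink_payload("metrics-sink", "server-metrics", "jdbc:postgresql://db:5432/metrics"))`. Depending on the key's schema, the JDBC sink may also need `pk.fields`.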
Frequently Asked Questions
Should I use enable.auto.commit=true or manual commits in production?
Manual commits, almost always. Auto-commit is convenient for prototypes and toy examples, but it decouples “offset committed” from “record actually processed,” so a crash at the wrong moment silently drops records. Set enable.auto.commit=false and enable.auto.offset.store=false, process your batch, call store_offsets for the records you have finished, and commit periodically. The small amount of extra code is what buys you “no silent data loss.”
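A minimal sketch of that commit discipline with confluent-kafka-python follows. The `KAFKA_BOOTSTRAP` environment variable, the `handle` callable, and the batch and commit sizes are assumptions; error handling, the DLQ, and SIGTERM shutdown are deliberately left out.

```python
import os
from typing import Callable


def consumer_config(group_id: str) -> dict:
    """Manual-commit configuration; KAFKA_BOOTSTRAP is a hypothetical env var."""
    return {
        "bootstrap.servers": os.environ.get("KAFKA_BOOTSTRAP", "broker:9092"),
        "group.id": group_id,
        "enable.auto.commit": False,
        # Offsets become eligible for commit only through store_offsets().
        "enable.auto.offset.store": False,
        "partition.assignment.strategy": "cooperative-sticky",
    }


def _commit_stored(consumer) -> None:
    from confluent_kafka import KafkaError, KafkaException

    try:
        consumer.commit(asynchronous=False)  # commits the stored offsets
    except KafkaException as exc:
        # _NO_OFFSET only means nothing new was stored (e.g. after a revoke).
        if exc.args[0].code() != KafkaError._NO_OFFSET:
            raise


def run(topic: str, group_id: str, handle: Callable, commit_every: int = 1000) -> None:
    # Imported here so consumer_config() is usable without the client installed.
    from confluent_kafka import Consumer

    consumer = Consumer(consumer_config(group_id))
    consumer.subscribe([topic])
    since_commit = 0
    try:
        while True:  # shutdown handling omitted for brevity
            for msg in consumer.consume(num_messages=500, timeout=1.0):
                if msg.error():
                    continue  # log or raise in real code
                handle(msg)                          # 1. process
                consumer.store_offsets(message=msg)  # 2. mark as done
                since_commit += 1
            if since_commit >= commit_every:
                _commit_stored(consumer)             # 3. commit periodically
                since_commit = 0
    finally:
        if since_commit:
            _commit_stored(consumer)
        consumer.close()
```

Because an offset is stored only after `handle()` returns, a crash between commits leads to redelivery (at-least-once) rather than silent loss.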
What’s the difference between eager and cooperative rebalancing?
Eager rebalancing revokes every partition from every consumer at the start of a rebalance, so the entire group sits idle until the new assignment is computed and applied; this is the classic “stop-the-world” behavior. Cooperative rebalancing (KIP-429, introduced in Apache Kafka 2.4) revokes only the partitions that actually need to move, letting every other consumer keep processing. Under cooperative rebalancing, a normal scale-up from 5 to 6 consumers might briefly pause one or two partitions instead of halting all five existing consumers. Set partition.assignment.strategy=cooperative-sticky for any new deployment.
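The difference in blast radius can be illustrated with a deliberately simplified sticky model: existing members keep as many of their partitions as the new balanced quota allows, and only the surplus moves to the newcomer. This is a toy, not the real cooperative-sticky assignor (which, among other things, completes the handoff over two rebalance rounds), and the function names are illustrative. With 12 partitions spread round-robin over five consumers, adding a sixth moves two partitions, whereas an eager rebalance revokes all twelve.

```python
from typing import Dict, List

Assignment = Dict[str, List[int]]


def sticky_rebalance(current: Assignment, new_members: List[str]) -> Assignment:
    """Toy sticky rebalance: keep what fits the new quota, move only the surplus."""
    # Members already holding the most partitions get any extra quota slots
    # first, so they are not forced to give up partitions unnecessarily.
    members = sorted(current, key=lambda m: -len(current[m])) + list(new_members)
    total = sum(len(parts) for parts in current.values())
    base, extra = divmod(total, len(members))
    quota = {m: base + (1 if i < extra else 0) for i, m in enumerate(members)}

    result: Assignment = {m: [] for m in members}
    surplus: List[int] = []
    for member, parts in current.items():
        result[member] = list(parts[: quota[member]])
        surplus.extend(parts[quota[member]:])

    # Hand the surplus to whoever is still below quota (here, the newcomer).
    for member in members:
        while len(result[member]) < quota[member] and surplus:
            result[member].append(surplus.pop(0))
    return result


def moved_partitions(before: Assignment, after: Assignment) -> int:
    """Partitions that changed owner: the ones a cooperative rebalance revokes."""
    return sum(len(set(parts) - set(before.get(m, []))) for m, parts in after.items())


def eager_revoked(before: Assignment) -> int:
    """Under the eager protocol every assigned partition is revoked."""
    return sum(len(parts) for parts in before.values())
```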
Can I have more consumers than partitions for more throughput?
No. Extra consumers in the same group beyond the partition count will be idle. Kafka’s parallelism ceiling in a single consumer group is the number of partitions subscribed. If you need more parallel throughput, you have to either increase partition count on the topic or make each consumer do more work per unit time (batching downstream writes usually helps most). You can have extra consumers as hot standbys, but they won’t process anything until someone else dies or leaves.
How do I achieve exactly-once semantics with a Python consumer?
In the strict Kafka-to-Kafka sense, exactly-once in Python requires using the transactional producer API alongside your consumer, with isolation.level=read_committed on downstream consumers. The confluent-kafka-python library supports this, but the surface is narrower and harder to get right than in Java. In practice, most Python consumers achieve “effective” exactly-once by running at-least-once against an idempotent sink: upserting by a natural key, deduplicating by a hash in a dedupe table, or writing to Postgres or TimescaleDB with INSERT ... ON CONFLICT DO UPDATE on a unique key so that a redelivered row overwrites the existing one instead of duplicating it. For true end-to-end EOS across heterogeneous systems, Flink or Kafka Streams is a better foundation than a hand-rolled Python consumer.
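A minimal sketch of the idempotent-upsert approach, using the standard library's `sqlite3` (SQLite 3.24 or newer) so it runs anywhere; Postgres and TimescaleDB accept the same `INSERT ... ON CONFLICT ... DO UPDATE` form. The `metrics` table and the `(host, ts)` natural key are assumptions about what the server-metrics records contain.

```python
import sqlite3


def create_schema(conn: sqlite3.Connection) -> None:
    # The primary key is the natural key that makes redelivery harmless.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS metrics ("
        " host TEXT NOT NULL, ts INTEGER NOT NULL, cpu REAL,"
        " PRIMARY KEY (host, ts))"
    )


def upsert_metric(conn: sqlite3.Connection, host: str, ts: int, cpu: float) -> None:
    """Insert a reading, or overwrite it if this (host, ts) was already written."""
    conn.execute(
        "INSERT INTO metrics (host, ts, cpu) VALUES (?, ?, ?) "
        "ON CONFLICT (host, ts) DO UPDATE SET cpu = excluded.cpu",
        (host, ts, cpu),
    )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_schema(conn)
    upsert_metric(conn, "web-1", 1700000000, 0.42)
    upsert_metric(conn, "web-1", 1700000000, 0.42)  # redelivered duplicate
    print(conn.execute("SELECT COUNT(*) FROM metrics").fetchone()[0])  # 1
```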
When should I use Kafka Streams or Flink instead of a plain consumer?
Use a stream processing framework when your logic needs state that spans multiple records—joining two streams, computing a 5-minute moving average, sessionizing events into user sessions, deduping with a rolling window, or emitting an alert when pattern X is followed by pattern Y within Z seconds. A plain consumer can do these, but you’ll end up writing your own checkpointing, rebalance-aware state restoration, and failure recovery, and it’ll be worse than the ones those frameworks already ship. Stick with a plain consumer when you’re doing stateless per-record transforms or simple sinks, and reach for Flink or Streams the moment you notice “I wish I had a windowed aggregation here.”
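To see why, consider the smallest version of one such operation: a per-host average over 5-minute tumbling windows (a simpler relative of a moving average). The hedged sketch below keeps its state in two in-memory dicts; the class and field names are illustrative. Everything in those dicts is lost on a crash and stranded on the wrong instance after a rebalance, and closing windows, handling late records, and evicting old keys are all still missing, which is the work Flink and Kafka Streams already do.

```python
from typing import Dict, Tuple

WINDOW_MS = 5 * 60 * 1000  # 5-minute tumbling windows

Key = Tuple[str, int]  # (host, window start in epoch milliseconds)


class TumblingAverage:
    """Per-host average over fixed, non-overlapping 5-minute windows."""

    def __init__(self) -> None:
        # This is the state a plain consumer would have to checkpoint,
        # hand off on rebalance, and restore after a crash.
        self._sums: Dict[Key, float] = {}
        self._counts: Dict[Key, int] = {}

    def add(self, host: str, ts_ms: int, value: float) -> Key:
        window_start = ts_ms - ts_ms % WINDOW_MS
        key = (host, window_start)
        self._sums[key] = self._sums.get(key, 0.0) + value
        self._counts[key] = self._counts.get(key, 0) + 1
        return key

    def average(self, host: str, window_start: int) -> float:
        key = (host, window_start)
        if key not in self._counts:
            raise KeyError(key)
        return self._sums[key] / self._counts[key]
```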
Related Reading
- Apache Kafka as a Multivariate Time-Series Engine—the companion producer post covering Avro schemas, partitioning, and producer configuration.
- Complex Event Processing with Apache Flink, for when a plain consumer is not enough and you need stateful stream processing.
- InfluxDB to AWS Iceberg Data Pipeline—a real example of a downstream sink that consumes from Kafka-style pipelines.
- Best Databases for Time-Series Data—choosing the right sink for consumed records.
- Docker Containers Explained, the easiest way to run a local Kafka broker for development.
- Clean Code Principles—keeping a long-running consumer readable and maintainable.
- Python vs Rust—language trade-offs for very high-throughput consumers.
Concluding Observations
The central insight is that a Kafka consumer is not merely a loop that reads messages; it is a small stateful distributed system that happens to call poll(). Almost every notable production failure stems from overlooking this fact. The unhandled rebalance, the offset committed too early, the poison pill that blocked a partition for three hours, the silent lag that consumed the retention window, the heartbeat that stopped firing because the handler was stuck in a synchronous HTTP call — none of these are Kafka bugs. They are consumer-design bugs, and nearly all share the same remedy: manual commits, cooperative rebalancing, an explicit DLQ, a fast handler, and lag alerts that fire before data is lost.
The code presented in this post closely resembles what a real production consumer should look like. The structure — configuration from environment variables, manual commits with store_offsets, cooperative rebalancing, explicit poison-pill versus retriable exceptions, DLQ with header metadata, graceful shutdown on SIGTERM, and rebalance callbacks — is the same whether the consumer is processing server metrics, financial events, user-activity logs, or IoT sensor data. The handler body changes; the scaffolding remains.
For readers arriving from the producer-side material, both halves of the pipeline are now in place: a producer that ships Avro-encoded server metrics with a thoughtful partition key, and a consumer that reads them safely, handles failures without losing data, and scales horizontally without rebalance storms. What follows the consumer’s handler — landing the metrics in a time-series database, aggregating them into windows, feeding them to a FastAPI service that serves real-time dashboards, or piping them into a stream processor — depends on the application. The hardest part, however, the part that causes 3 a.m. pages when handled incorrectly, has been addressed.
References
- Apache Kafka Documentation: Consumer API
- confluent-kafka-python: Python Client Documentation
- KIP-429: Kafka Consumer Incremental Rebalance Protocol
- Confluent: Exactly-Once Semantics in Apache Kafka
- LinkedIn Burrow: Kafka Consumer Lag Monitoring
- Kafka Lag Exporter: Prometheus Metrics for Consumer Lag