kafka-consumer-tests
Tests Apache Kafka consumer and producer logic across KafkaJS (Node.js), kafka-go (Go), and Spring Kafka (Java) - spins up a real broker via the Testcontainers Kafka module, asserts offset management and consumer-group rebalance behavior, distinguishes at-least-once from exactly-once (EOS / transactions), validates idempotent producer configuration, and routes unprocessable messages to a dead-letter topic. Use when the user works with Kafka producers or consumers in any language and needs integration or unit tests that exercise delivery semantics, offset commits, rebalance handling, or dead-letter routing.
Overview
Apache Kafka is a distributed event-streaming platform. Unlike SQS (visibility timeout), RabbitMQ (ack/nack), or BullMQ (job-state machine), Kafka's delivery primitive is the offset each consumer group commits per partition, so every test must control where that read position is. This skill covers that model: offset management, consumer-group rebalance, at-least-once vs exactly-once (EOS / transactions), idempotent producer, and dead-letter routing across KafkaJS (Node.js), kafka-go (Go), and Spring Kafka (Java).
Step 1 - Spin up Kafka with Testcontainers
Per testcontainers.com/modules/kafka:
Node.js / TypeScript
```shell
npm install @testcontainers/kafka --save-dev
```
```typescript
import { KafkaContainer } from '@testcontainers/kafka';

let container: Awaited<ReturnType<KafkaContainer['start']>>;

beforeAll(async () => {
  container = await new KafkaContainer('confluentinc/cp-kafka:7.2.2').start();
}, 120_000); // pulling and starting the broker can exceed Jest's 5 s default timeout

afterAll(async () => {
  await container.stop();
});

function brokers(): string[] {
  return [`${container.getHost()}:${container.getMappedPort(9093)}`];
}
```
Per node.testcontainers.org/modules/kafka, the broker listens on container port 9093. getMappedPort(9093) returns the host-side port it is mapped to, and container.getHost() returns the host to connect to.
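brokers() above builds the broker list directly from the Node container. The broker may instead be started elsewhere: by the Java or Python fixtures later in this step, or by a shared CI broker. In that case its address usually arrives as one comma-separated string, sometimes carrying a listener prefix such as PLAINTEXT://. The sketch below normalizes such a string into the host:port array that the KafkaJS brokers option expects. parseBootstrapServers and the input formats it accepts are assumptions for illustration, not part of KafkaJS or Testcontainers.

```typescript
// Hypothetical helper (not from KafkaJS or Testcontainers): turn a bootstrap
// string such as "PLAINTEXT://localhost:55012,localhost:55013" into the
// ["host:port", ...] array accepted by new Kafka({ brokers }).
function parseBootstrapServers(bootstrap: string): string[] {
  return bootstrap
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0)
    // strip an optional listener / security-protocol prefix like "PLAINTEXT://"
    .map((entry) => entry.replace(/^[A-Za-z_]+:\/\//, ''));
}

// Usage, assuming a hypothetical KAFKA_BOOTSTRAP_SERVERS variable set by CI:
// const kafka = new Kafka({ brokers: parseBootstrapServers(process.env.KAFKA_BOOTSTRAP_SERVERS ?? '') });
```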
Java (Maven)
Per java.testcontainers.org/modules/kafka:
```xml
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>testcontainers-kafka</artifactId>
  <version>2.0.5</version>
  <scope>test</scope>
</dependency>
```
```java
import org.testcontainers.kafka.KafkaContainer;

KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0");
kafka.start();
String bootstrapServers = kafka.getBootstrapServers();
```
Go
Per testcontainers.com/modules/kafka:
```go
import "github.com/testcontainers/testcontainers-go/modules/kafka"

kafkaContainer, err := kafka.Run(
	context.Background(),
	"confluentinc/confluent-local:7.5.0",
)
if err != nil {
	panic(err) // in a test, prefer t.Fatal / require.NoError
}
```
Python
```python
from testcontainers.kafka import KafkaContainer

with KafkaContainer() as kafka:
    bootstrap_server = kafka.get_bootstrap_server()
```
Step 2 - Basic producer test (KafkaJS)
Per kafka.js.org/docs/getting-started:
```typescript
import { Kafka } from 'kafkajs';

it('produces a message to the topic', async () => {
  const kafka = new Kafka({ brokers: brokers() });
  const producer = kafka.producer();
  await producer.connect();
  try {
    const result = await producer.send({
      topic: 'orders',
      messages: [{ key: 'order-1', value: JSON.stringify({ amount: 42 }) }],
    });
    expect(result[0].errorCode).toBe(0);
  } finally {
    await producer.disconnect(); // runs even if the assertion fails
  }
});
```
Per kjs-gs, producer.send() resolves to one metadata entry per partition written; errorCode: 0 confirms the broker accepted the record.
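The producer test above inspects only result[0]. Because send() returns one metadata entry per partition written, a batch spread over several partitions (for example, messages with different keys) can hide a failure in a later entry. Below is a minimal sketch of a stricter check. It assumes only the topicName, partition, and errorCode fields of the KafkaJS RecordMetadata, and the helper names are hypothetical.

```typescript
// Minimal structural subset of the per-partition metadata returned by
// KafkaJS producer.send(); real RecordMetadata objects carry more fields.
interface PartitionAck {
  topicName: string;
  partition: number;
  errorCode: number;
}

// Returns the acks with a non-zero error code (empty array = all accepted).
function rejectedPartitions(acks: PartitionAck[]): PartitionAck[] {
  return acks.filter((ack) => ack.errorCode !== 0);
}

// Throws a readable error instead of checking only the first entry.
function assertAllAccepted(acks: PartitionAck[]): void {
  if (acks.length === 0) {
    throw new Error('producer.send() returned no partition metadata');
  }
  const rejected = rejectedPartitions(acks);
  if (rejected.length > 0) {
    const detail = rejected
      .map((a) => `${a.topicName}[${a.partition}] errorCode=${a.errorCode}`)
      .join(', ');
    throw new Error(`broker rejected: ${detail}`);
  }
}

// In the test: assertAllAccepted(result);  // instead of checking result[0] only
```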
Step 3 - Basic consumer test: offset management
Per kafka.js.org/docs/consuming:
Two commit strategies:
| Strategy | KafkaJS config | Risk |
|---|---|---|
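| Auto-commit | autoCommit: true (default) | resolved (processed) offsets are committed on the library's schedule (autoCommitInterval / autoCommitThreshold, or after each batch), so a test cannot pin down when the committed offset moves; a crash before that commit re-delivers processed messages |
| Manual commit | autoCommit: false + commitOffsetsIfNecessary() | offset committed only where your code commits, after processing - at-least-once with a deterministic commit point |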
```typescript
it('consumes messages and commits offsets manually', async () => {
  const kafka = new Kafka({ brokers: brokers() });
  const received: string[] = [];
  const consumer = kafka.consumer({ groupId: 'test-group-1' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: true });
  await consumer.run({
    autoCommit: false,
    eachBatch: async ({ batch, commitOffsetsIfNecessary, resolveOffset }) => {
      for (const msg of batch.messages) {
        received.push(msg.value!.toString());
        resolveOffset(msg.offset); // mark this message as processed
      }
      // commit the resolved offsets only after the whole batch was processed
      await commitOffsetsIfNecessary();
    },
  });
  // crude wait for consumption; polling for the expected value is more reliable
  await new Promise(r => setTimeout(r, 1000));
  expect(received).toContain(JSON.stringify({ amount: 42 }));
  await consumer.disconnect();
});
```
This test reads the record written by the Step 2 producer test, so it depends on test order.
fromBeginning: true acts like auto.offset.reset=earliest, per kjs-consuming: "starts from earliest [offset] when no committed offset exists."
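The consumer test above waits with a fixed 1000 ms sleep. That is either too short on a slow CI runner or wasted time on a fast one. A common alternative is to poll the assertion's condition until it holds or a deadline passes. The sketch below is a hypothetical helper: waitFor is not a KafkaJS or Jest API, and the 10 s default timeout and 50 ms interval are arbitrary choices.

```typescript
// Hypothetical polling helper (not a KafkaJS or Jest API): resolves once
// condition() returns true, rejects if it is still false after timeoutMs.
async function waitFor(
  condition: () => boolean,
  timeoutMs = 10_000,
  intervalMs = 50,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (!condition()) {
    if (Date.now() >= deadline) {
      throw new Error(`condition not met within ${timeoutMs} ms`);
    }
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}

// In the consumer test, instead of the fixed sleep:
// await waitFor(() => received.includes(JSON.stringify({ amount: 42 })));
```

Polling only helps positive assertions. To assert that something did not arrive, first wait for a later sentinel message, so the check cannot pass merely because nothing was read yet.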
Step 4 - Consumer-group rebalance
Per kafka.js.org/docs/consuming, the three timing parameters that govern rebalance are:
| Parameter | Default | Purpose |
|---|---|---|
| sessionTimeout | 30 000 ms | failure-detection window; the broker removes a member if no heartbeat arrives within this window |
| rebalanceTimeout | 60 000 ms | time allowed for members to rejoin after a rebalance is triggered |
| heartbeatInterval | 3 000 ms | frequency of liveness pings to the broker |
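These constraints can be checked before a test starts. That turns a broker-side rejection or a flapping group into an immediate, readable failure. The minimal sketch below rests on two assumptions: the broker uses its default group.min.session.timeout.ms of 6000 ms, and Kafka's guidance holds that the heartbeat interval must be lower than the session timeout and typically no more than one third of it. checkGroupTimings is a hypothetical helper.

```typescript
// Hypothetical validator for KafkaJS consumer group timings (not a KafkaJS API).
interface GroupTimings {
  sessionTimeout: number;    // ms
  heartbeatInterval: number; // ms
}

// Returns human-readable problems; an empty array means the settings look sane.
// brokerMinSessionTimeoutMs defaults to Kafka's group.min.session.timeout.ms default.
function checkGroupTimings(t: GroupTimings, brokerMinSessionTimeoutMs = 6000): string[] {
  const problems: string[] = [];
  if (t.sessionTimeout < brokerMinSessionTimeoutMs) {
    problems.push(
      `sessionTimeout ${t.sessionTimeout} ms is below the broker minimum of ${brokerMinSessionTimeoutMs} ms`,
    );
  }
  if (t.heartbeatInterval >= t.sessionTimeout) {
    problems.push('heartbeatInterval must be lower than sessionTimeout');
  } else if (t.heartbeatInterval * 3 > t.sessionTimeout) {
    problems.push('heartbeatInterval should be at most one third of sessionTimeout');
  }
  return problems;
}

// e.g. in beforeAll: expect(checkGroupTimings({ sessionTimeout: 6000, heartbeatInterval: 1000 })).toEqual([]);
```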
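A rebalance test starts two consumers in the same group on a topic with at least two partitions, stops one consumer, then asserts that the survivor receives a message produced after the rebalance:
```typescript
it('surviving consumer processes all messages after rebalance', async () => {
  const kafka = new Kafka({ brokers: brokers() });
  const groupId = 'rebalance-test-group';
  // two partitions, so the group really splits them between c1 and c2
  const admin = kafka.admin();
  await admin.connect();
  await admin.createTopics({ topics: [{ topic: 'events', numPartitions: 2 }] });
  await admin.disconnect();
  const c1 = kafka.consumer({ groupId, sessionTimeout: 6000, heartbeatInterval: 1000 });
  const c2 = kafka.consumer({ groupId, sessionTimeout: 6000, heartbeatInterval: 1000 });
  const received: string[] = [];
  for (const c of [c1, c2]) {
    await c.connect();
    await c.subscribe({ topic: 'events', fromBeginning: true });
  }
  // await run() so both members have joined before one of them leaves
  await c1.run({ eachMessage: async ({ message }) => { received.push(message.value!.toString()); } });
  await c2.run({ eachMessage: async ({ message }) => { received.push(message.value!.toString()); } });
  await c1.disconnect(); // leave the group: the rebalance hands every partition to c2
  const producer = kafka.producer();
  await producer.connect();
  await producer.send({ topic: 'events', messages: [{ value: 'post-rebalance' }] });
  await producer.disconnect();
  // poll for up to 10 s instead of sleeping a fixed time
  for (let i = 0; i < 100 && !received.includes('post-rebalance'); i++) {
    await new Promise(r => setTimeout(r, 100));
  }
  expect(received).toContain('post-rebalance');
  await c2.disconnect();
}, 30_000); // group joins and rebalances exceed Jest's 5 s default timeout
```
Lowering sessionTimeout to 6 000 ms shortens failure detection when a consumer dies without leaving the group. A clean disconnect() leaves the group explicitly, so that rebalance starts right away. 6 000 ms is also the broker's default group.min.session.timeout.ms, the lowest session timeout it accepts without configuration changes. Production values are higher to avoid spurious rebalances.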
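The partition count matters because of how partitions are assigned. With a single-partition topic (the broker's default num.partitions when a topic is auto-created), only one group member owns a partition and the other sits idle, so stopping a consumer may hand over nothing. The sketch below is a simplified model of round-robin assignment, the strategy KafkaJS uses by default. It ignores per-member subscriptions and multi-topic ordering, and the member IDs are hypothetical labels rather than broker-generated IDs.

```typescript
// Simplified model of round-robin partition assignment (not KafkaJS code):
// members are sorted, then partitions are dealt out one at a time.
function assignRoundRobin(partitions: number[], members: string[]): Record<string, number[]> {
  const sorted = [...members].sort();
  const assignment: Record<string, number[]> = {};
  for (const member of sorted) {
    assignment[member] = [];
  }
  if (sorted.length === 0) {
    return assignment; // no members: nothing is consumed
  }
  partitions.forEach((partition, i) => {
    assignment[sorted[i % sorted.length]].push(partition);
  });
  return assignment;
}

// Two partitions, two members: each member owns one partition.
const twoMembers = assignRoundRobin([0, 1], ['c1', 'c2']);
// c1 leaves: the rebalance gives c2 both partitions.
const afterLeave = assignRoundRobin([0, 1], ['c2']);
// One partition, two members: c2 owns nothing, so stopping c2 moves no work.
const singlePartition = assignRoundRobin([0], ['c1', 'c2']);
```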
Step 5 - At-least-once vs exactly-once (EOS / transactions)
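At-least-once delivery is what you get with autoCommit: false when offsets are committed only after processing and failed messages are retried. If the consumer crashes before commitOffsetsIfNecessary() runs, the committed offset does not move. The group member that next owns the partition resumes from it and reads every message after the last commit again, including any that were processed but not yet committed.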
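The at-least-once argument can be made concrete with a toy model of one partition and its committed offset. This is not Kafka code: PartitionState and consumeOnce are hypothetical, and a "crash" is simulated by returning before the commit. The model captures the key property that Kafka stores the next offset to read, so anything processed after the last commit is read again by the next owner of the partition.

```typescript
// Toy model (not Kafka): a partition log plus the group's committed offset.
interface PartitionState {
  log: string[];
  committed: number; // next offset to read, as Kafka stores it
}

// Processes from the committed offset and commits after each message,
// unless crashBeforeCommitAt names the offset at which the consumer dies.
function consumeOnce(state: PartitionState, processed: string[], crashBeforeCommitAt?: number): void {
  for (let offset = state.committed; offset < state.log.length; offset++) {
    processed.push(state.log[offset]);          // side effect happens first
    if (offset === crashBeforeCommitAt) return; // crash: the commit never happens
    state.committed = offset + 1;               // commit = next offset to read
  }
}

const state: PartitionState = { log: ['a', 'b', 'c'], committed: 0 };
const processed: string[] = [];
consumeOnce(state, processed, 1); // dies after processing 'b', before committing it
consumeOnce(state, processed);    // next owner resumes from committed offset 1
```

The two passes process a, b, b, c. Message b is handled twice, which is the duplicate an at-least-once consumer must tolerate.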
Exactly-once semantics (EOS) requires the transactional producer. Per kafka.js.org/docs/transactions:
```typescript
const producer = kafka.producer({
  transactionalId: 'my-transactional-producer',
  maxInFlightRequests: 1, // required for EOS
  idempotent: true,       // required for EOS
});
await producer.connect();
const tx = await producer.transaction();
try {
  await tx.send({ topic: 'output', messages: [{ value: 'result' }] });
  // commit the consumer offset atomically with the produced message;
  // topics has the shape [{ topic, partitions: [{ partition, offset }] }],
  // where offset is the next offset to read (last processed + 1)
  await tx.sendOffsets({ consumerGroupId: 'test-group', topics: [...] });
  await tx.commit();
} catch (err) {
  await tx.abort();
  throw err;
}
```
Per kjs-tx: "The producer must have a max in flight requests of 1" and must use acks: -1 (all replicas). The transactionalId must stay stable across restarts of the same logical producer, for example by encoding topic and partition ('myapp-producer-topic-0'), so the broker can fence out a stale duplicate instance.
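The sendOffsets call above elides the topics list. In KafkaJS its shape is [{ topic, partitions: [{ partition, offset }] }]. Because Kafka stores the next offset to read, the value to send is the last processed offset plus one; sending the processed offset itself would make the next consumer re-read that message. The minimal sketch below derives the list from the messages handled inside the transaction. buildSendOffsets and its input type are hypothetical.

```typescript
// Hypothetical helper: build the argument for KafkaJS transaction.sendOffsets()
// from the messages processed in this transaction. Kafka commits the *next*
// offset to read, so each partition's entry is (highest processed offset + 1).
interface ProcessedMessage {
  topic: string;
  partition: number;
  offset: string; // KafkaJS exposes offsets as decimal strings
}

interface SendOffsetsArgs {
  consumerGroupId: string;
  topics: { topic: string; partitions: { partition: number; offset: string }[] }[];
}

function buildSendOffsets(consumerGroupId: string, processed: ProcessedMessage[]): SendOffsetsArgs {
  // keep the highest processed offset per topic-partition (BigInt avoids precision loss)
  const highest = new Map<string, ProcessedMessage>();
  for (const msg of processed) {
    const key = `${msg.topic}:${msg.partition}`; // ':' is not legal in Kafka topic names
    const seen = highest.get(key);
    if (!seen || BigInt(msg.offset) > BigInt(seen.offset)) {
      highest.set(key, msg);
    }
  }
  const byTopic = new Map<string, { partition: number; offset: string }[]>();
  for (const msg of highest.values()) {
    const partitions = byTopic.get(msg.topic) ?? [];
    partitions.push({ partition: msg.partition, offset: (BigInt(msg.offset) + BigInt(1)).toString() });
    byTopic.set(msg.topic, partitions);
  }
  return {
    consumerGroupId,
    topics: [...byTopic.entries()].map(([topic, partitions]) => ({ topic, partitions })),
  };
}

// e.g. await tx.sendOffsets(buildSendOffsets('test-group', handledMessages));
// where handledMessages is whatever your batch loop collects (hypothetical name)
```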
A consumer must keep readUncommitted: false (the default) so that it skips records from open or aborted transactions, per kjs-consuming: "Set readUncommitted: false (default) to exclude uncommitted transactional messages from consumption."
EOS test pattern:
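```typescript
it('exactly-once: aborted transaction produces no visible messages', async () => {
  const kafka = new Kafka({ brokers: brokers() });
  const producer = kafka.producer({
    transactionalId: 'eos-test',
    maxInFlightRequests: 1,
    idempotent: true,
  });
  await producer.connect();
  const aborted = await producer.transaction();
  await aborted.send({ topic: 'eos-topic', messages: [{ value: 'ghost' }] });
  await aborted.abort(); // a read_committed consumer should never see 'ghost'
  // commit a sentinel afterwards: seeing it proves the consumer read past 'ghost',
  // so the negative assertion cannot pass just because nothing was read yet
  const committed = await producer.transaction();
  await committed.send({ topic: 'eos-topic', messages: [{ value: 'sentinel' }] });
  await committed.commit();
  const consumer = kafka.consumer({ groupId: 'eos-verify' }); // readUncommitted: false by default
  await consumer.connect();
  await consumer.subscribe({ topic: 'eos-topic', fromBeginning: true });
  const received: string[] = [];
  await consumer.run({ eachMessage: async ({ message }) => { received.push(message.value!.toString()); } });
  for (let i = 0; i < 100 && !received.includes('sentinel'); i++) {
    await new Promise(r => setTimeout(r, 100));
  }
  expect(received).toContain('sentinel');
  expect(received).not.toContain('ghost');
  await consumer.disconnect();
  await producer.disconnect();
}, 30_000);
```
Step 6 - Idempotent producer (standalone)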
Per kafka.js.org/docs/transactions, idempotent: true can be set without a transactionalId. The broker then deduplicates retried produce requests using the producer ID assigned to this producer session plus a per-partition sequence number:
```typescript
const producer = kafka.producer({ idempotent: true });
```
This is sufficient to prevent duplicate records from network-level retries. It does not deduplicate across producer restarts, because a restarted producer receives a new producer ID. For cross-restart deduplication, use the transactional API (Step 5) or application-level idempotency (see idempotency-test-author).
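The broker-side mechanism can be sketched as a toy model. Assumptions: it keeps only the last accepted sequence per producer ID on one partition, one record per append, whereas real brokers track the last few batches per producer and partition. The class and its return values are hypothetical. The model shows both halves of the claim above: a retried send with the same producer ID and sequence is dropped, while a restarted producer, which gets a new producer ID, writes the same payload again.

```typescript
type AppendResult = 'appended' | 'duplicate' | 'out-of-order';

// Toy model of one partition's deduplication state (not broker code).
class IdempotentPartitionModel {
  readonly records: string[] = [];
  // producerId -> last sequence number accepted from that producer
  private readonly lastSequence = new Map<number, number>();

  append(producerId: number, sequence: number, value: string): AppendResult {
    const last = this.lastSequence.get(producerId) ?? -1;
    if (sequence <= last) {
      return 'duplicate'; // a retry of something already written
    }
    if (sequence > last + 1) {
      return 'out-of-order'; // gap in sequence numbers: the real broker rejects this with an error
    }
    this.lastSequence.set(producerId, sequence);
    this.records.push(value);
    return 'appended';
  }
}

const partition = new IdempotentPartitionModel();
const first = partition.append(1, 0, 'order-1');     // original send
const retry = partition.append(1, 0, 'order-1');     // network retry: same producer ID and sequence
const restarted = partition.append(2, 0, 'order-1'); // restarted producer: new producer ID
```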
Step 7 - Dead-letter topic (DLT)
KafkaJS provides no built-in DLT routing per the consuming documentation - the application must catch errors in eachMessage / eachBatch and produce to a <topic>.DLT topic manually:
```typescript
const consumer = kafka.consumer({ groupId: 'dlt-group' });
const dlqProducer = kafka.producer();
await consumer.connect();
await dlqProducer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });
await consumer.run({
  eachMessage: async ({ topic, message }) => {
    try {
      await processOrder(JSON.parse(message.value!.toString())); // application handler under test
    } catch (err) {
      // route the poison message to the dead-letter topic; returning normally
      // lets KafkaJS resolve the offset so the consumer moves past it
      await dlqProducer.send({
        topic: `${topic}.DLT`,
        messages: [{
          key: message.key,
          value: message.value,
          headers: { 'x-error': String(err) },
        }],
      });
    }
  },
});
```
Test the DLT route:
```typescript
it('routes unprocessable message to DLT', async () => {
  // assumes the DLT-routing consumer above is already running against this broker
  const kafka = new Kafka({ brokers: brokers() });
  const producer = kafka.producer();
  await producer.connect();
  // arrange: produce a poison message
  await producer.send({ topic: 'orders', messages: [{ value: 'NOT-JSON' }] });
  await producer.disconnect();
  // assert: DLT received the record
  const dltConsumer = kafka.consumer({ groupId: 'dlt-verifier' });
  await dltConsumer.connect();
  await dltConsumer.subscribe({ topic: 'orders.DLT', fromBeginning: true });
  const dltMessages: string[] = [];
  await dltConsumer.run({ eachMessage: async ({ message }) => {
    dltMessages.push(message.value!.toString());
  }});
  for (let i = 0; i < 100 && !dltMessages.includes('NOT-JSON'); i++) {
    await new Promise(r => setTimeout(r, 100));
  }
  expect(dltMessages).toContain('NOT-JSON');
  await dltConsumer.disconnect();
}, 30_000);
```
Spring Kafka provides @RetryableTopic for declarative DLT routing per docs.spring.io/spring-kafka/reference/testing.html.
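On the KafkaJS side, the DLT handler above stores only the error text in an x-error header. Recording the source topic, partition, and offset as well makes a dead-lettered record traceable and replayable. Below is a minimal sketch of a pure helper that builds the payload for dlqProducer.send(). toDeadLetter and the x-original-* header names are hypothetical, not a KafkaJS convention. The message is modeled with string key and value for simplicity, whereas KafkaJS delivers Buffers.

```typescript
// Hypothetical helper: build a dead-letter record that keeps the original
// coordinates of the failed message. Header names are illustrative only.
interface FailedMessage {
  key: string | null;
  value: string | null;
  offset: string;
}

interface DeadLetterRecord {
  topic: string;
  messages: { key: string | null; value: string | null; headers: Record<string, string> }[];
}

function toDeadLetter(topic: string, partition: number, message: FailedMessage, err: unknown): DeadLetterRecord {
  return {
    topic: `${topic}.DLT`,
    messages: [{
      key: message.key,
      value: message.value,
      headers: {
        'x-error': err instanceof Error ? err.message : String(err),
        'x-original-topic': topic,
        'x-original-partition': String(partition),
        'x-original-offset': message.offset,
      },
    }],
  };
}

// In eachMessage({ topic, partition, message }) the catch block could call:
// await dlqProducer.send(toDeadLetter(topic, partition,
//   { key: message.key?.toString() ?? null, value: message.value?.toString() ?? null, offset: message.offset }, err));
```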
Step 8 - Other languages
Java (Spring Kafka + EmbeddedKafka)
Per sk-test, since Kafka 4.0 only EmbeddedKafkaKraftBroker is available (KRaft / no ZooKeeper):
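```java
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.kafka.test.assertj.KafkaConditions.value;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

@SpringJUnitConfig
@EmbeddedKafka(partitions = 1, topics = {"orders"},
    bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class OrderConsumerTest {
  @Autowired EmbeddedKafkaBroker embeddedKafka;

  @Test
  void testTemplate() throws Exception {
    Map<String, Object> cp = KafkaTestUtils.consumerProps("tg", "false", embeddedKafka);
    cp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // redundant: consumerProps() sets earliest since 2.5
    Consumer<Integer, String> consumer =
        new DefaultKafkaConsumerFactory<Integer, String>(cp).createConsumer();
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "orders");
    KafkaTemplate<Integer, String> template = new KafkaTemplate<>(
        new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka)), true);
    template.send("orders", "test-order");
    ConsumerRecord<Integer, String> received =
        KafkaTestUtils.getSingleRecord(consumer, "orders");
    assertThat(received).has(value("test-order"));
    consumer.close();
  }
}
```
Go (kafka-go + Testcontainers)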
Per pkg.go.dev/github.com/segmentio/kafka-go:
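```go
import (
	"context"
	"testing"
	"time"

	kafkago "github.com/segmentio/kafka-go"
	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go/modules/kafka"
)

func TestRoundTrip(t *testing.T) {
	// bound the whole test so a missing message fails instead of hanging
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	kc, err := kafka.Run(ctx, "confluentinc/confluent-local:7.5.0")
	require.NoError(t, err)
	defer kc.Terminate(context.Background())
	brokers, err := kc.Brokers(ctx)
	require.NoError(t, err)
	w := &kafkago.Writer{Addr: kafkago.TCP(brokers...), Topic: "events",
		AllowAutoTopicCreation: true}
	// the first write to a new auto-created topic may fail with LeaderNotAvailable; retry if needed
	require.NoError(t, w.WriteMessages(ctx, kafkago.Message{Value: []byte("hello")}))
	require.NoError(t, w.Close())
	r := kafkago.NewReader(kafkago.ReaderConfig{
		Brokers: brokers, GroupID: "tg", Topic: "events"})
	defer r.Close()
	m, err := r.FetchMessage(ctx) // does not commit: commit explicitly below (at-least-once)
	require.NoError(t, err)
	require.Equal(t, "hello", string(m.Value))
	require.NoError(t, r.CommitMessages(ctx, m))
}
```
Per kgo, FetchMessage + CommitMessages is the explicit commit path. ReadMessage commits automatically when a GroupID is set, possibly before your code has finished processing the message, so it is at-most-once on crash.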
Step 9 - CI integration
Testcontainers pulls the broker image at runtime; no services: block is needed. Example for Node.js (GitHub Actions):
```yaml
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-node@v4
        with: { node-version: '20' }
      - run: npm ci
      - run: npx jest --testPathPattern=kafka --testTimeout=60000
        env:
          TESTCONTAINERS_RYUK_DISABLED: 'true'
```
For Java, replace the Node steps with actions/setup-java and mvn test. For Go, replace them with actions/setup-go and go test.
Anti-patterns
| Anti-pattern | Why it fails | Fix |
|---|---|---|
| Use ReadMessage (kafka-go) for at-least-once tests | Commits before the caller processes the message; at-most-once on crash | Use FetchMessage + CommitMessages |
| Share groupId across parallel tests | Rebalance storms; flaky message allocation | Use a unique groupId per test (e.g. append a UUID) |
| Skip await producer.disconnect() / r.Close() | Goroutine / connection leaks; CI hangs | Close in afterAll / defer |
| Assert committed offsets with autoCommit: true | Commits land on the library's schedule (interval, threshold, or batch end), so the assertion races the commit | Use autoCommit: false + manual commit for assertion tests |
| Test EOS with readUncommitted: true | Consumer also reads aborted messages, so the result reflects the consumer setting rather than the transaction | Keep readUncommitted: false (default) |
| Set heartbeatInterval >= sessionTimeout | Members expire between heartbeats; the group keeps rebalancing | Keep heartbeatInterval below sessionTimeout (Kafka recommends at most one third) |
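For the "Share groupId across parallel tests" row, any collision-free suffix works. Below is a minimal sketch of a hypothetical uniqueName helper that combines a timestamp, a per-process counter, and a random part. It sticks to characters that are also legal in topic names (letters, digits, '.', '_', '-'), so the same helper can name per-test topics. randomUUID() from node:crypto is an equally valid suffix.

```typescript
// Hypothetical helper: unique consumer-group or topic names for parallel tests.
let nameCounter = 0;

function uniqueName(prefix: string): string {
  nameCounter += 1; // guarantees uniqueness within one process, even in the same millisecond
  const time = Date.now().toString(36);
  const random = Math.random().toString(36).slice(2, 10); // separates parallel workers
  // base-36 output uses only [0-9a-z], which Kafka also accepts in topic names
  return `${prefix}-${time}-${nameCounter}-${random}`;
}

// Usage in a test:
// const consumer = kafka.consumer({ groupId: uniqueName('orders-test') });
```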