Exactly-once в Kafka – это не магия “событие обработается один раз”, а набор конкретных гарантий, ограничений и эксплуатационных обязательств.
Оглавление
- Введение
- Версии и рамки статьи
- Термины и базовая модель Kafka
- Главная идея
- Где exactly once действительно нужен
- Как Kafka exactly once работает под капотом
- Архитектурный контекст
- Практическая работа в Java / Spring
- Типовые ошибки и анти-паттерны
- Наблюдаемость и диагностика
- Производительность, отказоустойчивость и безопасность
- Сравнение подходов
- Практика внедрения: от MVP до production
- Чек-лист для разработчика
- Что важно запомнить
- Заключение
- Источники и что стоит почитать
Введение
Проблемы с Kafka exactly once обычно начинаются не с падения брокера. Все выглядит куда спокойнее: сервис жив, консьюмер группа активна, lag вроде бы под контролем, но в бизнес-данных появляются странные данные. Один заказ дважды зарезервировал товар. Один платеж дважды попал в книгу учета. Один отчет показал лишнюю операцию. Команда смотрит в логи и видит классическую картину: сообщение обработалось, сервис перезапустился, retry сработал, offset сдвинулся не там, где ожидали.
На бумаге Kafka выглядит просто: producer пишет событие, consumer читает событие, offset показывает позицию. В проде между этими тремя словами живут retry, rebalance, broker failover, сетевые таймауты, частично выполненные операции, внешние API, базы данных и человеческая вера в то, что “Kafka поддерживает exactly once”.
Главная ошибка – понимать exactly once как обещание уровня бизнеса: “действие произойдёт ровно один раз”. Kafka такого обещания сама по себе не даёт. Она даёт гораздо более конкретный механизм:
- идемпотентный producer,
- транзакционный producer,
- атомарную фиксацию записей в Kafka и consumer offsets,
- изоляцию
read_committed, - ограждение старых инстансов продьюсера
- transaction markers в логе.
Эти гарантии сильные, но их границы уже, чем кажется. Документация Kafka прямо формулирует: exactly-once для других целевых-систем обычно требует сотрудничества этих систем, а Kafka предоставляет примитивы, которые помогают это реализовать.
Поэтому статья не про то, как поставить enable.idempotence=true и успокоиться. Она про то, где exactly once действительно работает, где он не работает, как его правильно включать в Java/Spring, какие ошибки чаще всего приводят к ночным инцидентам и какие метрики должны быть на дашбордах до первого прод-релиза.
Версии и рамки статьи
Примеры и рекомендации ниже ориентированы на Apache Kafka 4.2.x, Kafka clients 4.x, Java 21, Spring Boot 3.5/4.0 и Spring for Apache Kafka 3.3/4.0.
Kafka Streams exactly-once v2 требует версию брокера 2.5.0 или новее. Старая первая реализация exactly-once в Kafka Streams была deprecated начиная с Kafka Streams 3.0.
Важное замечание: статья не обещает “глобальный exactly once” для Kafka + PostgreSQL + внешний платежный провайдер + email + HTTP callback. В распределенных системах это почти всегда достигается не одной настройкой, а сочетанием Kafka transactions, идемпотентности, outbox, уникальных ключей, retry-политик, дедупликации и корректной диагностики.
Термины и базовая модель Kafka
Topic (Топик)
Топик – это именованный поток записей. Для разработчика топик важен не как “очередь”, а как контракт между сервисами: формат события, ключ партиционирования, порядок внутри партиции, retention, ACL, схема, правила retry и DLT.
Ошибка начинается там, где топик воспринимают как просто канал передачи JSON. Топик – это часть модели данных.
Partition (партиция)
Партиция – это упорядоченный append-only log внутри топика. Порядок Kafka гарантирует внутри одной партиции, а не глобально по всему топику. Если события одного заказа должны обрабатываться строго последовательно, ключ должен стабильно вести их в одну партицию.
Exactly once не исправляет неправильный partition key (ключ партиционирования). Если OrderCreated, PaymentCaptured и OrderCancelled для одного orderId попали в разные партиции и обрабатываются разными потребителями, у вас уже другая модель консистентности.
Offset (оффсет)
Оффсет – позиция записи в партиции. Consumer offset – это не “сообщение удалено” и не “сообщение обработано”. Это позиция, которую консьюмер группа считает следующей для чтения.
В exactly-once consume-process-produce offset должен фиксироваться атомарно вместе с output-записями. Иначе возможны две классические поломки: output уже записан, offset еще не закоммичен – будет повторная обработка, оффсет закоммичен, output не записан – будет потеря результата.
Producer (Продьюсер)
Продьюсер отправляет записи в Kafka. Важные настройки для надежности:
acksenable.idempotenceretriesmax.in.flight.requests.per.connectiondelivery.timeout.mstransactional.id.
В Kafka 4.2 producer idempotence включен по умолчанию, если нет конфликтующих настроек. Для него нужны:
- acks=all
- retries больше нуля
- max.in.flight.requests.per.connection <= 5.
Consumer (консьюмер)
Консьюмер читает записи из партиции топика. Консьюмер группа распределяет партиции между экземплярами приложения. При ребалансировке “владельцы” партиций меня.тся, поэтому exactly-once не может быть только локальной логикой в коде. Брокер должен понимать, какой продьюсер связан с каким consumer group metadata.
Spring Kafka 3.0+ использует EOSMode.V2, который опирается на fetch-offset-request ограждение и требует версию брокера 2.5 или новее.
Idempotent producer
Idempotent producer (идемпотентный продьюсер) защищает от дублей при повторной отправке записи продьюсером в Kafka. Упрощенно: продьюсер получает producer id, ведет последовательные значения по партиции, а брокер может распознать повторную отправку того же батча. KIP-98 разделяет идемпотентность записи продьюсера и транзакционную атомарность по нескольким TopicPartitions: первое не заменяет второе.
Практический смысл: если продьсюер отправил батч, брокер записал его, но ack потерялся в сети, retry не должен создать второй такой же батч в партиции.
Transactional producer (транзакционный продьюсер)
Транзакционный продьюсер нужен, когда нужно атомарно:
- записать исходящую запись в один или несколько топиков/партиций
- зафиксировать консьюмер оффсеты для входящей записи
- сделать так, чтобы
read_committedпотребители видели только закоммиченные транзакции.
Это не transaction manager для всей вашей системы. Это transaction manager для Kafka-примитивов.
transactional.id
transactional.id – это не id конкретной транзакции. Это стабильная идентичность транзакционный продьюсер. Она нужна, чтобы брокер мог связать сесиию продьюсера, выдать producer epoch и оградить старый продьюсер, который продолжает жить после прерывания сети или перезапуска.
В Spring Kafka при transactionIdPrefix фабрика продьюсеров создает транзационные продьсеры с transactional.id = transactionIdPrefix + n, а для нескольких экземпляров приложения префикс должен быть уникален на экземпляр.
read_committed
Консьюмер с isolation.level=read_committed не возвращает записи прерванных транзакций и не читает дальше Last Stable Offset, пока есть открытая транзакция перед ним. Это важно: долгие транзакции могут визуально увеличивать lag и задерживать чтение последующих записей.
Last Stable Offset
Last Stable Offset, или LSO, – граница, до которой состояние транзакций всех записей уже решено: committed или aborted. Для read_committed потребителя это фактический предел чтения, если впереди есть незавершенная транзакция.
Fencing (ограждение)
Ограждение – механизм защиты от “zombie producer”. Если старый инстанс после прерывания сети продолжает пытаться коммитить транзакции, брокер должен отклонить его операции, потому что новый инстанс уже получил более свежий epoch для того же transactional.id.
Без ограждения exactly-once быстро превращается в “почти exactly once, пока сеть идеальная”.
Главная идея
Kafka exactly once – это атомарность Kafka-операций в сценарии
consume -> process -> produce: исходящие записи и входящие оффсеты фиксируются вместе, ретраи продьюсера не создают дубли, старые producer-инстансы ограждаются, а потребители с read_committed не видят прерванные/открытые транзакционные записи. Но эта гарантия заканчивается на границе Kafka. Как только обработка включает PostgreSQL, HTTP, платежного провайдера, email или файловую систему, корректность надо строить отдельно: через идемпотентность, outbox, уникальные ключи, дедупликацию и понятные режимы отказов.
Где exactly once действительно нужен
Exactly once нужен не везде. Более того, включать транзакции “на всякий случай” – плохая идея: они добавляют latency, оверхед на коррдинацию и новые режимы отказов.
Финансовая книга
Если событие PaymentCaptured создает бухгалтерскую запись, дубль меняет баланс. Здесь нельзя просто сказать “ну переагрегируем потом”. Бухгалтерская книга должна быть построена так, чтобы одно бизнес событие создало одну запись в бухгалтерской книге. Kafka EOS может помочь в Kafka-to-Kafka pipeline, но запись в базу все равно требует уникального event_id и транзакционной модели в БД.
Инвентаризация и бронирование
Если заказ резервирует товар, повторная обработка может дважды уменьшить доступный запас. Здесь exactly once часто пытаются использовать неправильно: Kafka transaction не откатит изменение в PostgreSQL. Правильнее строить команду/событие как идемпотентную операцию: reservationId, unique constraint, compare-and-set, версионирование.
Выставление счетов и формирование счетов-фактур
Если событие измерения использования дважды попадет в счет, клиент получит неверный счет. При этом пайплайн часто состоит из нескольких Kafka топиков:
- raw usage
- normalized usage
- billable events
- invoice candidates.
Внутри Kafka Streams exactly-once v2 подходит хорошо, если хранилища состояний, оффесты и топик вывода участвуют в одной модели.
Антифрод и управление рисками
Дубль может привести не только к лишней записи, но и к лишнему действию: повторно заблокировать карту, повторно отправить кейс на ручную обработку, повторно увеличить уровень риска. Здесь важна не только доставка, но и семантика команды: одно решение должно иметь стабильный decisionId.
Что не требует exactly once
Логи, clickstream, технический аудит событий, телеметрия, debug-события, low-value analytics часто нормально живут с at-least-once и дедупликацией на стороне аналитики. Там важнее пропускная способность, простота и наблюдаемость.
Как Kafka exactly once работает под капотом
Простая модель
В обычном at-least-once пайпалйне потребления есть три независимых действия:
- Прочитать входящую запись.
- Записать исходящую запись.
- Закоммитить входящий оффест.
Если сервис упал между пунктами 2 и 3, вывод уже есть, оффсет еще старый. После рестарта записи будут прочитаны заново. Это обработка дубликатов.
Если оффсет закоммичен до вывода, а сервис упал после коммита оффсета, вывод потерян. Это потеря данных.
Транзакции Kafka решают именно эту связку: исходящая запись и оффсет должны стать видимыми как единый результат.
Что делает идемпотентный продьюсер
Идемпотентный продьюсер защищает запись в Kafka от дублей на retry. Например, продьюсер отправил батч в партицию. Брокер записал batch, но ack потерялся. Producer retry’ит. Без идемпотентности брокер может записать batch повторно. С идемпотентностью брокер видит, что это уже принятая sequence, и не создает второй record batch.
Это сильная гарантия, но она не делает несколько партиций атомарными. Для атомарной записи в несколько партиций/топиков нужны транзакции. KIP-98 прямо разделяет эти уровни: идемпотентный продьюсер не дает гаратний для нескольких TopicPartitions. Для этого нужны транзакционные гарантии.
Что делает transaction coordinator
Transaction coordinator – компонент внутри Kafka брокер. Он управляет transaction state для transactional.id. Transaction log – внутренний Kafka топик, где хранится состояние transactions. Каждый transactional.id мапится к coordinator через transaction log партиции.
Упрощённый lifecycle:
- Producer вызывает
initTransactions(). - Coordinator связывает
transactional.idс producer id и epoch. - Старые producer sessions для этого
transactional.idfenced. - Producer вызывает
beginTransaction(). - Producer пишет records в исходящую партицию.
- Producer вызывает
sendOffsetsToTransaction(). - Producer вызывает
commitTransaction()илиabortTransaction(). - Coordinator пишет transaction markers в партиции.
read_committedпотребители видят только committed data.
Что такое transaction markers
Transaction markers – служебные записи в Kafka log, которые показывают, committed или aborted была transaction. Они не возвращаются приложению как обычные records, но брокер использует их, чтобы read_committed потребитель не видел прерванные записи и записи из еще открытых транзакций.
Почему read_committed может увеличить видимый lag
Если потребитель читает в режиме read_committed, он не может читать дальше LSO. Если перед ним open transaction, даже records после нее могут быть временно недоступны. Поэтому длинная Kafka transaction в одном producer может создать задержку для downstream потребитель, хотя брокер жив, CPU нормальный, а сеть не падает. Это один из самых неприятных production-эффектов: проблема выглядит как консьюмер лаг, но причина – жизненный цикл транзакции.
Таблица механизмов
| Механизм | Что делает | Почему важен | Что ломается при неправильном понимании |
|---|---|---|---|
enable.idempotence | Убирает producer duplicates при retry | Защищает от повторной записи batch | Команда думает, что это уже «глобальный exactly once» |
transactional.id | Даёт producer identity между sessions | Позволяет оградить zombie producers | Один профикс на все pods приводит к ограждению; случайные IDs ухудшают управление |
| Producer epoch | Версия producer для transactional.id | Старый producer не может продолжать работу | Zombie инстансы могут писать, если ограждение не работает |
| Transaction coordinator | Координирует transaction state | Делает commit/abort видимым для партиций | Coordinator latency становится частью end-to-end latency |
| Transaction markers | Помечают commit/abort в логе | read_committed фильтрует прерванные транзакции | Downstream видит грязные данные при read_uncommitted |
sendOffsetsToTransaction | Добавляет offsets в Kafka transaction | Output и input position фиксируются атомарно | Offset commit отдельно от output создаёт duplicates/loss |
read_committed | Читает только зафиксированные записи транзакций | Не видит прерванные/открытые транзакции | Долгие открытые транзакции задерживают чтение до LSO |
Архитектурный контекст

Внутри Kafka pipeline orders.validated.v1 --> inventory.reserved.v1 можно построить настоящий Kafka exactly-once: input offsets и output records фиксируются одной Kafka transaction.
Но Ledger Service работает с PostgreSQL. Kafka transaction не делает PostgreSQL частью Kafka commit. Поэтому здесь правильная модель другая: БД фиксирует ledger entry и outbox event в одной SQL transaction, а публикация outbox в Kafka допускает retry и дубли, которые downstream обязан дедуплицировать по eventId.
Внешний Payment Provider ещё сложнее. Его нельзя rollback’нуть через Kafka. Единственный нормальный путь — idempotency key на стороне провайдера, собственное хранилище попыток, reconciliation и отдельный runbook.

Latency накапливается не только в обработке user code. Она есть в батчинге продьюсера, RPC к координатору транзакции, записи исходящего батча, записи оффсетов, маркер фиксации и реакции downstream потребителя на LSO.
Повторная попытка допустима, но она должен быть встроен в модель транзакции. Если приложение бросает исключение до фиксации, транзакция прерывается, оффсет не фиксируется, записи читаются повторно. Если пользовательский код сделал необратимый HTTP-вызов внутри транзакции, Kafka уже не спасет: HTTP-вызов мог произойти, а Kafka транзакция потом прервалась.
Практическая работа в Java / Spring
Базовая production-конфигурация Spring Boot
spring:
application:
name: inventory-worker
kafka:
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
consumer:
group-id: inventory-reservation-v1
enable-auto-commit: false
isolation-level: read_committed
auto-offset-reset: earliest
properties:
max.poll.records: 100
max.poll.interval.ms: 300000
producer:
transaction-id-prefix: ${POD_NAME:${HOSTNAME:${random.uuid}}}-inventory-tx-
acks: all
properties:
enable.idempotence: true
max.in.flight.requests.per.connection: 5
delivery.timeout.ms: 120000
request.timeout.ms: 30000
linger.ms: 10
compression.type: zstd
properties:
client.id: ${spring.application.name}-${POD_NAME:${HOSTNAME:local}}
Если задан spring.kafka.producer.transaction-id-prefix, Spring Boot автоматически настраивает KafkaTransactionManager; если такой bean есть, он ассоциируется с listener container factory.
Здесь важны не сами значения, а принципы:
enable-auto-commit=false, потому что offset commit должен контролироваться transaction boundaryisolation-level=read_committed, чтобы downstream не видел прерванные транзакцииtransaction-id-prefixуникален для экземпляраacks=all, idempotence иmax.in.flight <= 5не конфликтуют с требованиями producer idempotence.
Правильный Kafka-to-Kafka обработчик в Spring
package com.acme.inventory;
import java.time.Instant;
import java.util.UUID;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Component
public class InventoryReservationListener {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ReservationDecisionService decisionService;
public InventoryReservationListener(
KafkaTemplate<String, Object> kafkaTemplate,
ReservationDecisionService decisionService
) {
this.kafkaTemplate = kafkaTemplate;
this.decisionService = decisionService;
}
@KafkaListener(
topics = "orders.validated.v1",
groupId = "inventory-reservation-v1"
)
public void handle(
OrderValidatedEvent event,
@Header(KafkaHeaders.RECEIVED_KEY) String orderId,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset
) {
ReservationDecision decision = decisionService.decide(event);
InventoryReservedEvent output = new InventoryReservedEvent(
UUID.randomUUID(),
event.orderId(),
decision.status(),
decision.items(),
Instant.now()
);
kafkaTemplate.send("inventory.reserved.v1", orderId, output);
}
}
Этот пример корректен только при важном условии: decisionService.decide() не делает необратимых внешних side effects. Он может вычислять решение из уже прочитанных данных или работать с локальным state store в Kafka Streams-модели, но если он пишет в PostgreSQL или вызывает HTTP, это уже другая консистентность.
При настроенном KafkaTransactionManager listener container начинает Kafka transaction до вызова listener. KafkaTemplate.send() участвует в этой transaction, а offsets отправляются в transaction перед commit. Spring Kafka описывает эту модель как exactly-once для read --> process --> write, при этом сам read/process остаётся at-least-once, а атомарность достигается на уровне результата sequence.
Неправильный пример: Kafka transaction вокруг внешнего платежа
@Component
public class BadPaymentListener {
private final PaymentProviderClient paymentProviderClient;
private final KafkaTemplate<String, Object> kafkaTemplate;
public BadPaymentListener(
PaymentProviderClient paymentProviderClient,
KafkaTemplate<String, Object> kafkaTemplate
) {
this.paymentProviderClient = paymentProviderClient;
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(topics = "payments.requested.v1", groupId = "payment-worker-v1")
public void handle(PaymentRequestedEvent event) {
PaymentCaptureResult result = paymentProviderClient.capture(
event.paymentId(),
event.amount()
);
kafkaTemplate.send(
"payments.captured.v1",
event.paymentId().toString(),
new PaymentCapturedEvent(event.paymentId(), result.providerOperationId())
);
}
}
На первый взгляд код нормальный. На деле он опасен.
Если provider capture прошёл, а Kafka transaction потом abort’илась, событие payments.captured.v1 не появится, offset не зафиксируется, input будет прочитан заново. Второй вызов capture() может повторить списание, если у провайдера нет idempotency key.
Kafka exactly-once не делает внешний HTTP-вызов exactly-once.
Исправление: idempotency key для внешнего провайдера
@Component
public class PaymentListener {
private final PaymentAttemptRepository attemptRepository;
private final PaymentProviderClient paymentProviderClient;
private final KafkaTemplate<String, Object> kafkaTemplate;
public PaymentListener(
PaymentAttemptRepository attemptRepository,
PaymentProviderClient paymentProviderClient,
KafkaTemplate<String, Object> kafkaTemplate
) {
this.attemptRepository = attemptRepository;
this.paymentProviderClient = paymentProviderClient;
this.kafkaTemplate = kafkaTemplate;
}
@KafkaListener(topics = "payments.requested.v1", groupId = "payment-worker-v1")
public void handle(PaymentRequestedEvent event) {
PaymentAttempt attempt = attemptRepository.findOrCreate(
event.paymentId(),
event.requestId()
);
PaymentCaptureResult result = paymentProviderClient.capture(
event.paymentId(),
event.amount(),
attempt.idempotencyKey()
);
attempt.markCaptured(result.providerOperationId());
attemptRepository.save(attempt);
kafkaTemplate.send(
"payments.captured.v1",
event.paymentId().toString(),
new PaymentCapturedEvent(
event.paymentId(),
event.requestId(),
result.providerOperationId()
)
);
}
}
Это всё ещё не «чистый Kafka EOS», потому что есть БД и внешний provider. Но теперь повторная обработка Kafka record не обязана повторять бизнес-эффект. Provider видит тот же idempotency key, а локальная БД хранит состояние попытки.
Чистая Java: transactional consume-process-produce
package com.acme.kafka;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
public class TransactionalOrderCopier {
public static void main(String[] args) {
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-copy-v1");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-copy-instance-1");
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
try (
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)
) {
consumer.subscribe(java.util.List.of("orders.validated.v1"));
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
if (records.isEmpty()) {
continue;
}
try {
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
String transformed = transform(record.value());
producer.send(new ProducerRecord<>(
"orders.normalized.v1",
record.key(),
transformed
));
}
producer.sendOffsetsToTransaction(
offsetsFor(records),
consumer.groupMetadata()
);
producer.commitTransaction();
} catch (ProducerFencedException fenced) {
throw fenced;
} catch (KafkaException failure) {
producer.abortTransaction();
}
}
}
}
private static String transform(String value) {
return value.trim();
}
private static Map<TopicPartition, OffsetAndMetadata> offsetsFor(
ConsumerRecords<String, String> records
) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
var partitionRecords = records.records(partition);
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
return offsets;
}
}
Обратите внимание: consumer.commitSync() здесь не вызывается. Offset отправляется в transaction через sendOffsetsToTransaction().
DB + Kafka: production-friendly outbox
Когда обработка меняет PostgreSQL и должна породить Kafka event, чаще всего правильнее не пытаться сделать «Kafka transaction + DB transaction = distributed transaction». Лучше использовать transactional outbox.
@Entity
@Table(
name = "ledger_entry",
uniqueConstraints = {
@UniqueConstraint(
name = "uk_ledger_entry_source_event",
columnNames = "source_event_id"
)
}
)
public class LedgerEntry {
@Id
private UUID id;
@Column(name = "source_event_id", nullable = false)
private UUID sourceEventId;
@Column(name = "account_id", nullable = false)
private UUID accountId;
@Column(name = "amount_cents", nullable = false)
private long amountCents;
protected LedgerEntry() {
}
public LedgerEntry(UUID id, UUID sourceEventId, UUID accountId, long amountCents) {
this.id = id;
this.sourceEventId = sourceEventId;
this.accountId = accountId;
this.amountCents = amountCents;
}
}
@Entity
@Table(
name = "outbox_event",
indexes = {
@Index(name = "ix_outbox_status_created_at", columnList = "status,created_at")
}
)
public class OutboxEvent {
@Id
private UUID id;
@Column(nullable = false)
private String topic;
@Column(name = "event_key", nullable = false)
private String key;
@Column(nullable = false, columnDefinition = "jsonb")
private String payload;
@Column(nullable = false)
private String status;
@Column(name = "created_at", nullable = false)
private Instant createdAt;
protected OutboxEvent() {
}
public static OutboxEvent newKafkaEvent(String topic, String key, String payload) {
OutboxEvent event = new OutboxEvent();
event.id = UUID.randomUUID();
event.topic = topic;
event.key = key;
event.payload = payload;
event.status = "NEW";
event.createdAt = Instant.now();
return event;
}
public void markSent() {
this.status = "SENT";
}
}
@Service
public class LedgerProjectionService {
private final LedgerEntryRepository ledgerRepository;
private final OutboxRepository outboxRepository;
private final ObjectMapper objectMapper;
public LedgerProjectionService(
LedgerEntryRepository ledgerRepository,
OutboxRepository outboxRepository,
ObjectMapper objectMapper
) {
this.ledgerRepository = ledgerRepository;
this.outboxRepository = outboxRepository;
this.objectMapper = objectMapper;
}
@Transactional
public void apply(PaymentCapturedEvent event) throws JsonProcessingException {
if (ledgerRepository.existsBySourceEventId(event.eventId())) {
return;
}
LedgerEntry entry = new LedgerEntry(
UUID.randomUUID(),
event.eventId(),
event.accountId(),
event.amountCents()
);
ledgerRepository.save(entry);
LedgerEntryCreatedEvent output = new LedgerEntryCreatedEvent(
UUID.randomUUID(),
event.eventId(),
event.accountId(),
event.amountCents(),
Instant.now()
);
outboxRepository.save(OutboxEvent.newKafkaEvent(
"ledger.entries.created.v1",
event.accountId().toString(),
objectMapper.writeValueAsString(output)
));
}
}
В этой модели SQL transaction фиксирует ledger и outbox вместе. Если сервис упал после DB commit, outbox event останется в статусе NEW. Relay позже опубликует его в Kafka. Если relay опубликовал event, но не успел отметить SENT, event может быть опубликован повторно. Поэтому потребители должны дедуплицировать по eventId.
Это честная модель: мы не притворяемся, что PostgreSQL стал частью Kafka transaction. Мы явно проектируем возможные дубли и не допускаем потери события.
Outbox relay
@Component
public class OutboxRelay {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
private final TransactionTemplate transactionTemplate;
public OutboxRelay(
OutboxRepository outboxRepository,
KafkaTemplate<String, String> kafkaTemplate,
TransactionTemplate transactionTemplate
) {
this.outboxRepository = outboxRepository;
this.kafkaTemplate = kafkaTemplate;
this.transactionTemplate = transactionTemplate;
}
@Scheduled(fixedDelayString = "${outbox.relay.fixed-delay:1000}")
public void publishBatch() {
List<OutboxEvent> batch = transactionTemplate.execute(status ->
outboxRepository.lockNextNewEvents(100)
);
if (batch == null || batch.isEmpty()) {
return;
}
kafkaTemplate.executeInTransaction(operations -> {
for (OutboxEvent event : batch) {
operations.send(event.topic(), event.key(), event.payload());
}
return null;
});
transactionTemplate.executeWithoutResult(status -> {
for (OutboxEvent event : batch) {
event.markSent();
}
outboxRepository.saveAll(batch);
});
}
}
Repository-запрос для PostgreSQL:
public interface OutboxRepository extends JpaRepository<OutboxEvent, UUID> {
@Query(value = """
select *
from outbox_event
where status = 'NEW'
order by created_at
for update skip locked
limit :limit
""", nativeQuery = true)
List<OutboxEvent> lockNextNewEvents(@Param("limit") int limit);
}
Outbox relay обычно at-least-once. Это нормально, если event id стабилен, downstream идемпотентен, а runbook объясняет, как переигрывать stuck events.
Интеграционный тест с Testcontainers
@Testcontainers
@SpringBootTest
class InventoryReservationEosIT {
@Container
static KafkaContainer kafka =
new KafkaContainer("apache/kafka-native:3.8.0");
@DynamicPropertySource
static void kafkaProperties(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
registry.add("spring.kafka.consumer.isolation-level", () -> "read_committed");
registry.add("spring.kafka.consumer.enable-auto-commit", () -> "false");
registry.add("spring.kafka.producer.transaction-id-prefix",
() -> "test-" + UUID.randomUUID() + "-tx-");
}
@Autowired
KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
TestReservationConsumer reservationConsumer;
@Test
void shouldPublishSingleReservationEvent() throws Exception {
String orderId = "order-42";
kafkaTemplate.send(
"orders.validated.v1",
orderId,
new OrderValidatedEvent(orderId, List.of("sku-1", "sku-2"))
).get(10, TimeUnit.SECONDS);
await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> {
List<InventoryReservedEvent> events =
reservationConsumer.receivedFor(orderId);
assertThat(events).hasSize(1);
assertThat(events.getFirst().orderId()).isEqualTo(orderId);
});
}
}
Testcontainers Kafka module в актуальной документации рекомендует org.testcontainers.kafka.KafkaContainer для Apache Kafka images; старый org.testcontainers.containers.KafkaContainer deprecated.
В таком тесте важно не просто проверить happy path. Нужны сценарии:
- listener бросает exception до send;
- listener бросает exception после send, но до commit transaction;
- перезапуск потребителя между poll и commit;
- duplicate input event;
- downstream потребитель читает с
read_committed.
Типовые ошибки и анти-паттерны
Ошибка 1. Считать Kafka exactly once глобальной бизнес-гарантией
Как выглядит:
"Мы включили Kafka transactions, значит платёж не спишется дважды."
Почему это плохо:
Kafka transaction не управляет внешним HTTP provider, PostgreSQL commit, email gateway или S3 upload.
К чему приводит в production:
Повторные списания, потерянные события, расхождение ledger и payment provider.
Как диагностировать:
Сравнивайте eventId, provider operation id, ledger entry id и Kafka offsets. Если один Kafka record породил несколько provider operations, проблема не в брокере.
Как исправить:
Использовать idempotency key у внешнего provider, хранить attempts, делать reconciliation, применять outbox для DB –> Kafka.
Ошибка 2. Offset commit отдельно от output publish
Как выглядит:
kafkaTemplate.send("output.topic", key, event);
acknowledgment.acknowledge();
Почему это плохо:
Send и offset commit не атомарны. Между ними может быть crash.
К чему приводит в production:
Дубли output или потеря результата.
Как диагностировать:
Ищите records, где input offset уже committed, но output event отсутствует; или наоборот — output есть, а input offset был переобработан.
Как исправить:
Для Kafka-to-Kafka использовать Kafka transaction и sendOffsetsToTransaction. В Spring – контейнер листнера транзакций
.
Ошибка 3. Потребитель читает транзакционные топики с read_uncommitted
Как выглядит:
spring:
kafka:
consumer:
isolation-level: read_uncommitted
Почему это плохо:
Потребитель может увидеть записи из прерванной транзакции.
К чему приводит в production:
Downstream обрабатывает данные, которые producer потом abort’нул.
Как диагностировать:
Сравнить поведение kafka-console-consumer с --isolation-level read_committed и read_uncommitted.
Как исправить:
Для топиков, где пишут транзакционные продьюсеры, downstream потребители должны использовать read_committed.
Ошибка 4. Один transaction-id-prefix на все pods
Как выглядит:
spring.kafka.producer.transaction-id-prefix: inventory-tx-
и это значение одинаковое во всех replicas.
Почему это плохо:
Транзакционные продьюсеры в разных инстансах начинают ограждать друг друга.
К чему приводит в проде:
ProducerFencedException, внезапные прерывания, нестабильная обработка.
Как диагностировать:
Искать ProducerFencedException, рост частотs прерываний, корреляция с развертыванием или масштабированием.
Как исправить:
Добавлять pod/инстанс identity в prefix: inventory-${POD_NAME}-tx-. Для Kubernetes это обычно StatefulSet ordinal или pod name.
Ошибка 5. Длинные внешние вызовы внутри Kafka transaction
Как выглядит:
@KafkaListener(...)
public void handle(Event event) {
kafkaTemplate.send("step.started", event.id(), event);
slowHttpClient.call(event);
kafkaTemplate.send("step.finished", event.id(), event);
}
Почему это плохо:
Transaction остается open, LSO не двигается, read_committed потребители могут ждать.
К чему приводит в production:
Видимый lag downstream, рост transaction duration, timeouts, aborts.
Как диагностировать:
Смотреть transaction commit/abort latency, консьюмер лаг, latency запроса, трейсы внешнего HTTP.
Как исправить:
Не держать Kafka transaction вокруг долгих побочных эффектов. Делить пайплайн на команды, outbox, идемпотентные внешние вызовы и отдельное событие результата.
Ошибка 6. Transaction per message без необходимости
Как выглядит:
Каждая запись обрабатывается отдельной Kafka транзакцией даже при высокой нагрузке.
Почему это плохо:
Каждая transaction требует coordination, markers и state updates. Transaction overhead не бесплатный: при commit/abort coordinator пишет markers в участвующие партиции и обновляет transaction log.
К чему приводит в production:
Падение throughput, рост latency, нагрузка на transaction coordinator.
Как диагностировать:
Сравнить количество записей в транзакцииr, latency коммита, частоту запросов продьюсера, метрики запроса брокера.
Как исправить:
Batch records внутри разумной transaction boundary. Размер batch подбирать нагрузочным тестированием, не по вкусу.
Ошибка 7. Смешать container transactions и non-blocking retries в Spring Kafka
Как выглядит:
Команда включает контейнер слушателя транзакций и retry топики, ожидая, что все будет атомарно.
Почему это плохо:
Spring Kafka документация прямо указывает, что non-blocking retries не комбинируются с container transactions.
К чему приводит в production:
Неожиданное поведение offset commit, retry топик, rollback и DLT.
Как диагностировать:
Проверить конфигурацию listener container, error handler, retry топик configuration.
Как исправить:
Для транзакционной обработки использовать блокирующий процесс повторной попытки/после отката либо явно проектировать retry топик как отдельный этап без иллюзии общей транзакции.
Ошибка 8. Игнорировать ProducerFencedException
Как выглядит:
catch (Exception e) {
log.warn("Kafka error", e);
}
Почему это плохо:
Ограждение означает, что этот producer больше не имеет права продолжать transaction. Это не обычный transient error.
К чему приводит в production:
Сервис продолжает работать в некорректном состоянии, records зависают, processing ломается.
Как диагностировать:
Логи ProducerFencedException, прерывание транзакций, события перезапуска, дубликаты экземпляров с одинаковым идентификатором транзакции.
Как исправить:
При ограждении закрывать проьюсер и завершать инстанс. В Kubernetes – дать pod рестартовать.
Ошибка 9. Верить, что @Transactional делает Kafka + DB атомарными
Как выглядит:
@Transactional
public void process(Event event) {
repository.save(entity);
kafkaTemplate.send("topic", key, output);
}
Почему это плохо:
Spring может синхронизировать Kafka transaction с DB transaction, но это не полноценный distributed 2PC. Spring Kafka документация отдельно предупреждает о сценариях, где commit synchronized transaction может упасть после primary transaction, и приложение должно компенсировать последствия.
К чему приводит в production:
DB commit есть, Kafka event нет; или Kafka event есть, DB mark не обновился.
Как диагностировать:
Сравнивать таблицы бизнес-состояния и Kafka output по event id; искать commit failures после DB commit.
Как исправить:
Использовать outbox/inbox, уникальные ключи, reconciliation jobs.
Ошибка 10. Мониторить только consumer lag
Как выглядит:
Dashboard показывает records-lag-max, и команда считает этого достаточно.
Почему это плохо:
Lag – симптом, не причина. Причина может быть в открытой транзакции, “rebalance storm”, истощение буффера продьюсера, ACL ошибки.
К чему приводит в production:
Команда масштабирует потребителей, хотя проблема в latency транзакций продьюсера или ISR брокера.
Как диагностировать:
Смотреть метрики транзакций производителя, метрики ребаланса потребителя, недостаточное количество реплик в разделах брокера, сбои при выполнении запросов на создание/получение данных..
Как исправить:
Дашборд должен связывать лаг с продьюсером, потребителем, брокеров и бизнес метриками.
Наблюдаемость и диагностика
Метрики
| Метрика | Где взять | Что означает | Когда тревожиться |
|---|---|---|---|
records-lag-max | Kafka consumer JMX | Максимальный lag потребителя по партициям | Растёт дольше бизнес-SLO |
records-consumed-rate | Kafka consumer JMX | Скорость потребления | Падает при стабильном input rate |
last-poll-seconds-ago | Kafka consumer JMX | Сколько секунд с последнего poll() | Растёт к max.poll.interval.ms |
rebalance-total, failed-rebalance-total | Consumer coordinator metrics | Стабильность консьюмер группы | Растут во время нормальной нагрузки |
record-error-rate | Producer metrics | Ошибки producer sends | Любой ненулевой рост требует анализа |
record-retry-rate | Producer metrics | Retry producer records | Рост вместе с latency |
request-latency-avg/max | Producer/consumer metrics | Latency запросов к брокеру | Рост после раскатки или аварийное переключение брокера |
bufferpool-wait-time, waiting-threads | Producer metrics | Producer ждёт buffer memory | Признак saturation producer |
txn-commit-time-ns-total, txn-abort-time-ns-total | Producer metrics | Время commit/abort transactions | Рост latency или abort storm |
UnderReplicatedPartitions, UnderMinIsrPartitionCount | Broker JMX | Репликация и ISR | В production нормальное значение — 0 |
Kafka клиенты и брокеры экспонируют метрики через JMX; официальная документация Kafka также подчеркивает, что remote JMX по умолчанию выключен и при включении в production должен быть защищен.
Метрики потребителя вроде records-lag-max, last-poll-seconds-ago, fetch-latency, rebalance metrics описаны в документации Confluent по Kafka client JMX metrics.
Producer transaction metrics вроде txn-commit-time-ns-total, txn-abort-time-ns-total, txn-send-offsets-time-ns-total, а также record-error-rate, record-retry-rate, bufferpool-wait-time полезны для диагностики EOS-пайплайнов.
Prometheus alert examples
Имена метрик зависят от JMX exporter и naming rules. Ниже — пример логики, а не универсальные имена.
groups:
- name: kafka-eos
rules:
- alert: KafkaProducerRecordErrors
expr: increase(kafka_producer_record_error_total{client_id=~"inventory-worker.*"}[5m]) > 0
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka producer sends are failing"
description: "Check broker errors, ACLs, ISR, transaction state and recent rollout."
- alert: KafkaConsumerLagGrowing
expr: max_over_time(kafka_consumer_records_lag_max{client_id=~"inventory-worker.*"}[10m]) > 10000
for: 10m
labels:
severity: critical
annotations:
summary: "Consumer lag is growing"
description: "Correlate with transaction commit latency, rebalances and broker health."
- alert: KafkaTransactionAbortSpike
expr: increase(kafka_producer_txn_abort_time_ns_total{client_id=~"inventory-worker.*"}[10m]) > 0
for: 10m
labels:
severity: warning
annotations:
summary: "Kafka transaction abort activity detected"
description: "Check application exceptions, fencing, transaction timeout and downstream broker health."
Логи
Для транзакционных потребителей логируйте:
eventIdtopicpartitionoffsetconsumerGrouptransactionalIdили хотя бы instance id- business key:
orderId,paymentId,accountId - результат обработки
- причину abort
- retry attempt
traceId.
Не логируйте наполнение целиком, если там PII, платежные данные или большие JSON. Не включайте постоянный DEBUG Kafka clients в production без плана: объем логов может сам стать инцидентом.
Трейсы
OpenTelemetry span полезен не на каждый low-level poll, а на business processing:
Kafka message processing
attributes:
messaging.system = kafka
messaging.destination.name = orders.validated.v1
messaging.kafka.partition = 3
messaging.kafka.message.offset = 928381
app.order_id = order-42
app.event_id = 2f4f...
Для внешних calls внутри pipeline trace должен показать, что latency ушла не в Kafka, а в HTTP provider или DB. Если trace показывает долгий HTTP внутри Kafka transaction — это архитектурный запах.
Диагностические команды
# Посмотреть lag и assignment consumer group
kafka-consumer-groups.sh \
--bootstrap-server kafka-1:9092 \
--describe \
--group inventory-reservation-v1
# Прочитать топик только с зафиксированными транзакционными записями
kafka-console-consumer.sh \
--bootstrap-server kafka-1:9092 \
--topic inventory.reserved.v1 \
--from-beginning \
--isolation-level read_committed
# Сравнить с read_uncommitted при расследовании прерванных/открытых записей
kafka-console-consumer.sh \
--bootstrap-server kafka-1:9092 \
--topic inventory.reserved.v1 \
--from-beginning \
--isolation-level read_uncommitted
# Проверить топик configuration
kafka-topics.sh \
--bootstrap-server kafka-1:9092 \
--describe \
--topic inventory.reserved.v1
# Проверить ACL на transactional id prefix
kafka-acls.sh \
--bootstrap-server kafka-1:9092 \
--list \
--transactional-id inventory-
# Проверить, что principal имеет producer/idempotent privileges
kafka-acls.sh \
--bootstrap-server kafka-1:9092 \
--list \
--principal User:inventory-worker
Для транзакционного продьюсера нужны права не только на топик. Модель Kafka ACL включает:
- resource type
TransactionalId - transactional producer с
transactional.id - требует
WriteнаTransactionalId, - idempotent produce требует
IdempotentWriteнаCluster, - обычный produce –
WriteнаTopic.
Производительность, отказоустойчивость и безопасность
Latency
Kafka transactions добавляют координацию. Это не значит, что они медленные «всегда». Это значит, что latency теперь включает:
- батчинг продьюсера
- запросы к координатору транзакций
- запись состояния транзакции
- запись маркеров транзакции
- фиксация оффсета внутри транзакции
- ожидание LSO downstream потребителей.
Если SLA ендпоинта 50 ms, Kafka транзакция вокруг нескольких топиков и партициях может быть плохой идеей. Если это асинхронный пайплайн с SLO в секунды, цена может быть приемлемой.
Throughput (пропускная способность)
Главный способ не убить пропускную способность – не делать транзакции за мелкие операции без необходимости. Группируйте записи в транзакцию, но не держите транзакцию открытой слишком долго.
Компромисс:
- маленькая транзакция – ниже latency, выше накладные расходы на запись
- большая транзакция – выше пропускная способность, но выше задержка видимости и риск прерывания большого батча.
Backpressure
Producer может начать блокироваться на buffer memory. Это проявится через bufferpool-wait-time и waiting-threads. Потребитель может не успевать из-за медленной обработки, тогда last-poll-seconds-ago и records-lag-max растут.
Backpressure должен быть явным: ограничение concurrency, batch size, max.poll.records, circuit breaker для внешних вызовов, bounded executor queues.
Повторные попытки
Повторные попытки внутри Kafka продьюсера нормалбны, если продьюсер идемпотентен. Повторные попытки бизнес операций – отдельный вопрос.
Не выполняйте повторные попытки бесконечно внутри транзакции. Если обработка не может завершиться быстро и предсказуемо, лучше прерывание, роллбек оффсета и дать контроллируемую повторную попытку через обработчик ошибок или retry топик, если транзакционная модель это допускает.
Failover
При rперезапуске транзакционного продьюсера вызывает initTransactions(), брокеры завершает ожидание для того же transactional.id и ограждает старого потребителя. Это причина, почему transactional identity должна быть спроектирована, а не генерироваться как попало.
Security
Для EOS нужны ACL не только на input/output топики и консьюмер группа, но и на TransactionalId. В shared Kafka cluster это часто забывают: приложение читает топик, пишет топик, но падает на transaction coordinator authorization.
Также не включайте remote JMX без authentication и network restrictions. Kafka прямо предупреждает, что production JMX требует security configuration.
Trade-off таблица
| Решение | Плюсы | Минусы | Когда выбирать | Когда не выбирать |
|---|---|---|---|---|
| Idempotent producer only | Просто, защищает producer retry duplicates | Нет атомарности offsets + output | Producer-only запись в один/несколько топиков без consume-process-produce | Нужна атомарность input offsets |
| Kafka transactions | Сильная Kafka-to-Kafka гарантия | Latency, coordinator overhead, сложнее ошибки | Stream processing, read-process-write | Внешние side effects без идемпотентности |
| Kafka Streams EOS v2 | Хорошая интеграция state stores/output/offsets | Требует Streams model | Stateful transformations, aggregations | Императивный service с DB/HTTP side effects |
| Transactional outbox | Хорошо для DB -> Kafka | At-least-once publish, нужна дедупликация | Бизнес-состояние в PostgreSQL | Когда нужен low-latency pure Kafka pipeline |
| Idempotent consumer | Простая защита от дублей | Нужно хранить processed ids | Внешние побочные эффекты , исходящие потребители | Когда event volume огромный и нет retention для dedupe |
| Distributed 2PC | Теоретически атомарно | Сложно, хрупко, редко поддерживается | Очень ограниченные enterprise-сценарии | Большинство микросервисов |
Сравнение подходов
| Подход | Где силён | Где слаб | Риски | Типичный production-сценарий |
|---|---|---|---|---|
| At-most-once | Минимум дублей | Возможна потеря данных | Коммит оффсета до обработки | Низкоценные уведомления |
| At-least-once | Не теряет records при нормальной настройке | Возможны дубли | Нужна идемпотентность | Большинство event-driven сервисов |
| Idempotent producer | Защита от producer retry duplicates | Не решает consumer offsets | Ложное чувство EOS | Producer пишет events в Kafka |
| Kafka transactions | Атомарность output + offsets | Не покрывает DB/HTTP | Длинный транзакции, ограждение, ACL | Kafka-to-Kafka пайплайн |
| Kafka Streams exactly_once_v2 | Интегрированная модель stream processing | Меньше контроля imperative code | Сложность хранилища состояний/журнала изменений | Агрегации, объединения, обогащение |
| Outbox + idempotent consumers | Надёжная DB integration | Дубли возможны, но управляемы | Лаг релея, зависание исходящего сообщения | Order DB -> Kafka events |
| Inbox/dedupe table | Защита external side effects | Storage и cleanup | Высокая кардинальность идентификаторов | Payments, ledger, inventory |
Практика внедрения: от MVP до production
- На MVP начните с at-least-once и идемпотентной бизнес-операции. Не включайте Kafka transactions, пока не ясно, какую конкретно консистентность они должны дать.
- Опишите pipeline: input топик, output топик, консьюмер группа, business key, event id, partition key, retry, DLT, owner.
- Решите, где граница exactly-once: только Kafka-to-Kafka или Kafka + DB + external side effect.
- Для Kafka-to-Kafka включите транзакционный продьюсер,
read_committed,enable-auto-commit=false, uniquetransaction-id-prefix. - Для DB side effects добавьте unique constraints на
eventId/commandId. - Для DB -> Kafka добавьте outbox, relay, idempotent downstream.
- Для external provider добавьте idempotency key и таблицу attempts.
- Проверьте ACL: Topic Read/Write, Group Read, TransactionalId Write/Describe, IdempotentWrite.
- Нагрузочно протестируйте batch size, transaction duration, max.poll.records, producer buffer.
- Протестируйте crash после processing, после send, до commit, после DB commit, до outbox mark sent.
- Настройте dashboard: lag, transaction latency, aborts, retries, rebalances, ISR брокера.
- Добавьте алерты не только на lag, но и на abort spike, producer errors, under min ISR.
- Сделайте rollout на малом traffic slice.
- Во время rollout сравнивайте business counts: input events, output events, unique event ids, duplicate ids.
- Подготовьте rollback: как отключить потребителя, как переиграть offsets, как replay топик, как восстановить outbox.
- Подготовьте runbook для
ProducerFencedException, transaction timeout, authorization failure, зависшая консьюмер группа. - Документируйте ADR: почему выбрали transactions/outbox/idempotency, какие гарантии есть и каких нет.
- Проверьте, что команда поддержки понимает разницу между duplicate Kafka record и duplicate business effect.
- Не используйте exactly once, если occasional duplicates дешевле, чем эксплуатационная сложность transactions.
- Пересмотрите решение после первой реальной нагрузки: многие параметры корректно подбираются только по production-like тесту.
Чек-лист для разработчика
- Проверьте, что input records имеют стабильный
eventId. - Проверьте, что partition key соответствует требуемому порядку обработки.
- Проверьте, что
enable-auto-commit=false. - Проверьте, что транзакционные топики потребителя используют
read_committed. - Проверьте, что producer config не конфликтует с idempotence.
- Проверьте, что
transaction-id-prefixуникален на инстанс. - Проверьте, что разные реплики не ограждают друг друга.
- Проверьте, что внешние HTTP-вызовы не спрятаны внутри долгой Kafka transaction.
- Проверьте, что DB side effects защищены unique constraints.
- Проверьте, что outbox relay допускает повторную публикацию без бизнес-дубля.
- Проверьте, что downstream потребители дедуплицируют по business event id, а не по Kafka offset.
- Проверьте, что retry не выполняет повторно необратимую операцию без idempotency key.
- Проверьте, что DLT содержит достаточно metadata: topic, partition, offset, eventId, exception class.
- Проверьте, что
ProducerFencedExceptionне глушится. - Проверьте, что transaction timeout больше нормального processing time, но не скрывает зависания.
- Проверьте, что
max.poll.interval.msсогласован с worst-case processing time. - Проверьте, что dashboard показывает producer errors/retries, transaction timings, lag и rebalances вместе.
- Проверьте ACL на Topic, Group, TransactionalId и IdempotentWrite.
- Проверьте, что replay топик не создает повторный business effect.
- Проверьте, что есть runbook для stuck outbox и offset reset.
- Проверьте, что integration tests моделируют crash до и после commit.
- Проверьте, что команда может объяснить, где exactly-once заканчивается.
Что важно запомнить
- Kafka exactly once — это не «любое действие выполнится один раз», а атомарность Kafka output records и input offsets.
- Idempotent producer защищает от producer retry duplicates, но не заменяет transactions.
read_committedобязателен для потребителей, которые не должны видеть прерванные транзакционные записи.transactional.id— identity producer, а не id транзакции; его надо проектировать.- Kafka transactions не делают PostgreSQL, HTTP и payment provider частью Kafka commit.
- Для побочных эффектов БД чаще нужен outbox, unique constraints и идемпотентные потребители.
- Долгие транзакции могут задерживать downstream через LSO.
- Exactly-once без observability превращается в сложный механизм, который команда не сможет диагностировать ночью.
Заключение
Kafka exactly once — сильный инструмент, но он требует трезвого отношения. Он отлично работает там, где задача действительно Kafka-native: прочитать records, вычислить результат, записать output records и атомарно зафиксировать offsets. В таких pipelines он убирает целый класс дублей и потерь, которые иначе приходится закрывать ручной логикой.
Но как только pipeline выходит за границу Kafka, магия заканчивается. Платёжный провайдер не знает про Kafka transaction. PostgreSQL не обязан commit’иться вместе с Kafka offset. Email gateway не откатит письмо, если producer abort’нулся. Здесь инженерная зрелость не в том, чтобы везде включить transactions, а в том, чтобы честно нарисовать failure modes и закрыть их правильными механизмами.
Exactly once – это не кнопка надежности. Это часть архитектуры консистентности. Понимание его границ отличает разработчика, который просто использует Kafka API, от инженера, который способен отвечать за данные в production.
Источники и что стоит почитать
- Apache Kafka Design – раздел про exactly-once доставку, транзакционный продьюсер, оффсеты потербителя и границы гарантий Kafka.
- Apache Kafka Producer Configs 4.2 —
acks,enable.idempotence,retries,max.in.flight.requests.per.connectionи связанные ограничения. - Apache Kafka Consumer Configs —
isolation.level=read_committed, Last Stable Offset и поведение consumers при open transactions. - KIP-98: Exactly Once Delivery and Transactional Messaging – исходная мотивация Kafka транзакций, различие идемпотентного продсьюера и гарантии транзакционности.
- KIP-447: Producer scalability for exactly once semantics – EOS v2, ограждение и связь транзакционных продьюсеров с семантикой потребительской группы..
- Apache Kafka Streams Core Concepts — exactly-once processing semantics и
exactly_once_v2. - Spring Kafka Exactly Once Semantics – как контейнер листенера начинает транзакцию, отправляет оффсет в транзакцию и что означает EOS в Spring Kafka.
- Spring Kafka Transactions –
transactionIdPrefix,KafkaTransactionManager, синхронизация транзакций, ограничения повторных попыток. - Spring Boot Apache Kafka Support — auto-configuration
KafkaTemplate,@KafkaListener,KafkaTransactionManagerприtransaction-id-prefix. - Confluent: Transactions in Apache Kafka — transaction coordinator, transaction log, transaction markers и performance overhead.
- Apache Kafka Monitoring – JMX, метрики брокера, вопросы безопасности для удаленного JMX.
- Confluent Producer and Consumer Metrics -метрики транзакций производителя, задержка потребителя, опрос, выборка и метрики перебалансировки.
- Apache Kafka Authorization and ACLs — Topic, Group, TransactionalId, IdempotentWrite и CLI для ACL.
- Testcontainers Kafka Module – актуальный
org.testcontainers.kafka.KafkaContainerи Kafka integration tests.
![]()
You must be logged in to post a comment.