Kafka exactly-once глазами разработчика: как не сломать консистентность в ПРОДе

Exactly-once в Kafka – это не магия “событие обработается один раз”, а набор конкретных гарантий, ограничений и эксплуатационных обязательств.

Оглавление

  1. Введение
  2. Версии и рамки статьи
  3. Термины и базовая модель Kafka
  4. Главная идея
  5. Где exactly once действительно нужен
  6. Как Kafka exactly once работает под капотом
  7. Архитектурный контекст
  8. Практическая работа в Java / Spring
  9. Типовые ошибки и анти-паттерны
  10. Наблюдаемость и диагностика
  11. Производительность, отказоустойчивость и безопасность
  12. Сравнение подходов
  13. Практика внедрения: от MVP до прода
  14. Чек-лист для разработчика
  15. Что важно запомнить
  16. Заключение
  17. Источники и что стоит почитать

Введение

Проблемы с Kafka exactly once обычно начинаются не с падения брокера. Все выглядит куда спокойнее: сервис жив, консьюмер группа активна, lag вроде бы под контролем, но в бизнес-данных появляются странные данные. Один заказ дважды зарезервировал товар. Один платеж дважды попал в книгу учета. Один отчет показал лишнюю операцию. Команда смотрит в логи и видит классическую картину: сообщение обработалось, сервис перезапустился, retry сработал, offset сдвинулся не там, где ожидали.

На бумаге Kafka выглядит просто: producer пишет событие, consumer читает событие, offset показывает позицию. В проде между этими тремя словами живут retry, rebalance, broker failover, сетевые таймауты, частично выполненные операции, внешние API, базы данных и человеческая вера в то, что “Kafka поддерживает exactly once”.

Главная ошибка – понимать exactly once как обещание уровня бизнеса: “действие произойдёт ровно один раз”. Kafka такого обещания сама по себе не даёт. Она даёт гораздо более конкретный механизм:

  • идемпотентный producer,
  • транзакционный producer,
  • атомарную фиксацию записей в Kafka и consumer offsets,
  • изоляцию read_committed,
  • ограждение старых инстансов продьюсера
  • transaction markers в логе.

Эти гарантии сильные, но их границы уже, чем кажется. Документация Kafka прямо формулирует: exactly-once для других целевых-систем обычно требует сотрудничества этих систем, а Kafka предоставляет примитивы, которые помогают это реализовать.

Поэтому статья не про то, как поставить enable.idempotence=true и успокоиться. Она про то, где exactly once действительно работает, где он не работает, как его правильно включать в Java/Spring, какие ошибки чаще всего приводят к ночным инцидентам и какие метрики должны быть на дашбордах до первого прод-релиза.

Версии и рамки статьи

Примеры и рекомендации ниже ориентированы на Apache Kafka 4.2.x, Kafka clients 4.x, Java 21, Spring Boot 3.5/4.0 и Spring for Apache Kafka 3.3/4.0.

Kafka Streams exactly-once v2 требует версию брокера 2.5.0 или новее. Старая первая реализация exactly-once в Kafka Streams была deprecated начиная с Kafka Streams 3.0.

Важное замечание: статья не обещает “глобальный exactly once” для Kafka + PostgreSQL + внешний платежный провайдер + email + HTTP callback. В распределенных системах это почти всегда достигается не одной настройкой, а сочетанием Kafka транзакций, идемпотентности, outbox, уникальных ключей, retry-политик, дедупликации и корректной диагностики.

Термины и базовая модель Kafka

Topic (Топик)

Топик – это именованный поток записей. Для разработчика топик важен не как “очередь”, а как контракт между сервисами: формат события, ключ партиционирования, порядок внутри партиции, retention, ACL, схема, правила retry и DLT.

Ошибка начинается там, где топик воспринимают как просто канал передачи JSON. Топик – это часть модели данных.

Partition (партиция)

Партиция – это упорядоченный append-only log внутри топика. Порядок Kafka гарантирует внутри одной партиции, а не глобально по всему топику. Если события одного заказа должны обрабатываться строго последовательно, ключ должен стабильно вести их в одну партицию.

Exactly once не исправляет неправильный partition key (ключ партиционирования). Если OrderCreated, PaymentCaptured и OrderCancelled для одного orderId попали в разные партиции и обрабатываются разными потребителями, у вас уже другая модель консистентности.

Offset (оффсет)

Оффсет – позиция записи в партиции. Consumer offset – это не “сообщение удалено” и не “сообщение обработано”. Это позиция, которую консьюмер группа считает следующей для чтения.

В exactly-once consume-process-produce offset должен фиксироваться атомарно вместе с output-записями. Иначе возможны две классические поломки: output уже записан, offset еще не закоммичен – будет повторная обработка, оффсет закоммичен, output не записан – будет потеря результата.

Producer (Продьюсер)

Продьюсер отправляет записи в Kafka. Важные настройки для надежности:

  • acks
  • enable.idempotence
  • retries
  • max.in.flight.requests.per.connection
  • delivery.timeout.ms
  • transactional.id.

В Kafka 4.2 producer idempotence включен по умолчанию, если нет конфликтующих настроек. Для него нужны:

  • acks=all
  • retries больше нуля
  • max.in.flight.requests.per.connection <= 5.

Consumer (консьюмер)

Консьюмер читает записи из партиции топика. Консьюмер группа распределяет партиции между экземплярами приложения. При ребалансировке “владельцы” партиций меня.тся, поэтому exactly-once не может быть только локальной логикой в коде. Брокер должен понимать, какой продьюсер связан с каким consumer group metadata.

Spring Kafka 3.0+ использует EOSMode.V2, который опирается на fetch-offset-request ограждение и требует версию брокера 2.5 или новее.

Idempotent producer

Idempotent producer (идемпотентный продьюсер) защищает от дублей при повторной отправке записи продьюсером в Kafka. Упрощенно: продьюсер получает producer id, ведет последовательные значения по партиции, а брокер может распознать повторную отправку того же батча. KIP-98 разделяет идемпотентность записи продьюсера и транзакционную атомарность по нескольким TopicPartitions: первое не заменяет второе.

Практический смысл: если продьсюер отправил батч, брокер записал его, но ack потерялся в сети, retry не должен создать второй такой же батч в партиции.

Transactional producer (транзакционный продьюсер)

Транзакционный продьюсер нужен, когда нужно атомарно:

  • записать исходящую запись в один или несколько топиков/партиций
  • зафиксировать консьюмер оффсеты для входящей записи
  • сделать так, чтобы read_committed потребители видели только закоммиченные транзакции.

Это не transaction manager для всей вашей системы. Это transaction manager для Kafka-примитивов.

transactional.id

transactional.id – это не id конкретной транзакции. Это стабильная идентичность транзакционный продьюсер. Она нужна, чтобы брокер мог связать сесиию продьюсера, выдать producer epoch и оградить старый продьюсер, который продолжает жить после прерывания сети или перезапуска.

В Spring Kafka при transactionIdPrefix фабрика продьюсеров создает транзационные продьсеры с transactional.id = transactionIdPrefix + n, а для нескольких экземпляров приложения префикс должен быть уникален на экземпляр.

read_committed

Консьюмер с isolation.level=read_committed не возвращает записи прерванных транзакций и не читает дальше Last Stable Offset, пока есть открытая транзакция перед ним. Это важно: долгие транзакции могут визуально увеличивать lag и задерживать чтение последующих записей.

Last Stable Offset

Last Stable Offset, или LSO, – граница, до которой состояние транзакций всех записей уже решено: committed или aborted. Для read_committed потребителя это фактический предел чтения, если впереди есть незавершенная транзакция.

Fencing (ограждение)

Ограждение – механизм защиты от “zombie producer”. Если старый инстанс после прерывания сети продолжает пытаться коммитить транзакции, брокер должен отклонить его операции, потому что новый инстанс уже получил более свежий epoch для того же transactional.id.

Без ограждения exactly-once быстро превращается в “почти exactly once, пока сеть идеальная”.

Главная идея

Kafka exactly once – это атомарность Kafka-операций в сценарии

consume -> process -> produce: исходящие записи и входящие оффсеты фиксируются вместе, ретраи продьюсера не создают дубли, старые producer-инстансы ограждаются, а потребители с read_committed не видят прерванные/открытые транзакционные записи. Но эта гарантия заканчивается на границе Kafka. Как только обработка включает PostgreSQL, HTTP, платежного провайдера, email или файловую систему, корректность надо строить отдельно: через идемпотентность, outbox, уникальные ключи, дедупликацию и понятные режимы отказов.

Где exactly once действительно нужен

Exactly once нужен не везде. Более того, включать транзакции “на всякий случай” – плохая идея: они добавляют latency, оверхед на коррдинацию и новые режимы отказов.

Финансовая книга

Если событие PaymentCaptured создает бухгалтерскую запись, дубль меняет баланс. Здесь нельзя просто сказать “ну переагрегируем потом”. Бухгалтерская книга должна быть построена так, чтобы одно бизнес событие создало одну запись в бухгалтерской книге. Kafka EOS может помочь в Kafka-to-Kafka pipeline, но запись в базу все равно требует уникального event_id и транзакционной модели в БД.

Инвентаризация и бронирование

Если заказ резервирует товар, повторная обработка может дважды уменьшить доступный запас. Здесь exactly once часто пытаются использовать неправильно: транзакция Kafka не откатит изменение в PostgreSQL. Правильнее строить команду/событие как идемпотентную операцию: reservationId, unique constraint, compare-and-set, версионирование.

Выставление счетов и формирование счетов-фактур

Если событие измерения использования дважды попадет в счет, клиент получит неверный счет. При этом пайплайн часто состоит из нескольких Kafka топиков:

  • raw usage
  • normalized usage
  • billable events
  • invoice candidates.

Внутри Kafka Streams exactly-once v2 подходит хорошо, если хранилища состояний, оффесты и топик вывода участвуют в одной модели.

Антифрод и управление рисками

Дубль может привести не только к лишней записи, но и к лишнему действию: повторно заблокировать карту, повторно отправить кейс на ручную обработку, повторно увеличить уровень риска. Здесь важна не только доставка, но и семантика команды: одно решение должно иметь стабильный decisionId.

Что не требует exactly once

Логи, clickstream, технический аудит событий, телеметрия, debug-события, low-value analytics часто нормально живут с at-least-once и дедупликацией на стороне аналитики. Там важнее пропускная способность, простота и наблюдаемость.

Как Kafka exactly once работает под капотом

Простая модель

В обычном at-least-once пайпалйне потребления есть три независимых действия:

  1. Прочитать входящую запись.
  2. Записать исходящую запись.
  3. Закоммитить входящий оффест.

Если сервис упал между пунктами 2 и 3, вывод уже есть, оффсет еще старый. После рестарта записи будут прочитаны заново. Это обработка дубликатов.

Если оффсет закоммичен до вывода, а сервис упал после коммита оффсета, вывод потерян. Это потеря данных.

Транзакции Kafka решают именно эту связку: исходящая запись и оффсет должны стать видимыми как единый результат.

Что делает идемпотентный продьюсер

Идемпотентный продьюсер защищает запись в Kafka от дублей на retry. Например, продьюсер отправил батч в партицию. Брокер записал batch, но ack потерялся. Producer retry’ит. Без идемпотентности брокер может записать batch повторно. С идемпотентностью брокер видит, что это уже принятая sequence, и не создает второй батч рекорд.

Это сильная гарантия, но она не делает несколько партиций атомарными. Для атомарной записи в несколько партиций/топиков нужны транзакции. KIP-98 прямо разделяет эти уровни: идемпотентный продьюсер не дает гаратний для нескольких TopicPartitions. Для этого нужны транзакционные гарантии.

Что делает transaction coordinator

Transaction coordinator – компонент внутри Kafka брокер. Он управляет состоянием транзакции для transactional.id. Transaction log – внутренний Kafka топик, где хранится состояние transactions. Каждый transactional.id мапится к координатору через transaction log партиции.

Упрощённый lifecycle:

  1. Producer вызывает initTransactions().
  2. Координатор связывает transactional.id с producer id и epoch.
  3. Старые сессии продьюсеров для этого transactional.id ограждены.
  4. Producer вызывает beginTransaction().
  5. Producer пишет записи в исходящую партицию.
  6. Producer вызывает sendOffsetsToTransaction().
  7. Producer вызывает commitTransaction() или abortTransaction().
  8. Координатор пишет маркеры транзакций в партиции.
  9. read_committed потребители видят только committed data.

Что такое transaction markers

Transaction markers – служебные записи в Kafka log, которые показывают, была ли транзакция committed или aborted. Они не возвращаются приложению как обычные записи, но брокер использует их, чтобы read_committed потребитель не видел “прерванные” записи и записи из еще “открытых” транзакций.

Почему read_committed может увеличить видимый lag

Если потребитель читает в режиме read_committed, он не может читать дальше LSO. Если перед ним открытая транзакция, даже записи после нее могут быть временно недоступны. Поэтому длинная Kafka транзакция в одном продьюсере может создать задержку для downstream потребителя, хотя брокер жив, CPU нормальный, а сеть не падает. Это один из самых неприятных прод-эффектов: проблема выглядит как консьюмер лаг, но причина – жизненный цикл транзакции.

Таблица механизмов

МеханизмЧто делаетПочему важенЧто ломается при неправильном понимании
enable.idempotenceУбирает producer duplicates при retryЗащищает от повторной записи batchКоманда думает, что это уже «глобальный exactly once»
transactional.idДаёт producer identity между sessionsПозволяет оградить zombie producersОдин профикс на все pods приводит к ограждению; случайные IDs ухудшают управление
Producer epochВерсия producer для transactional.idСтарый producer не может продолжать работуZombie инстансы могут писать, если ограждение не работает
Transaction coordinatorКоординирует состояние транзакцииДелает commit/abort видимым для партицийЛетенси координатора становится частью end-to-end latency
Transaction markersПомечают commit/abort в логеread_committed фильтрует прерванные транзакцииDownstream видит грязные данные при read_uncommitted
sendOffsetsToTransactionДобавляет offsets в Kafka transactionИсходящие и входящие позиции фиксируются атомарноКоммит оффеста отдельно от исходящего сообщения создаёт дубликаты/потерю данных
read_committedЧитает только зафиксированные записи транзакцийНе видит прерванные/открытые транзакцииДолгие открытые транзакции задерживают чтение до LSO

Архитектурный контекст

Рис. 1: Kafka exactly-once в системе заказов

Внутри Kafka pipeline orders.validated.v1 --> inventory.reserved.v1 можно построить настоящий Kafka exactly-once: входящие оффсеты и исходящие записи фиксируются одной Kafka транзакцией.

Но Ledger Service работает с PostgreSQL. Kafka транзакция не делает PostgreSQL частью Kafka commit. Поэтому здесь правильная модель другая: БД фиксирует запись книги учета и исходящее событие в одной SQL транзакции, а публикация outbox в Kafka допускает retry и дубли, которые downstream обязан дедуплицировать по eventId.

Внешний Платежный Провайдер ещё сложнее. Его нельзя откатить через Kafka. Единственный нормальный путь – ключ идемпотентности на стороне провайдера, собственное хранилище попыток, согласованность и отдельный runbook.

Рис. 2: процесс потребления Kafka с транзакцией и read_committed

Latency накапливается не только в обработке user code. Она есть в батчинге продьюсера, RPC к координатору транзакции, записи исходящего батча, записи оффсетов, маркер фиксации и реакции downstream потребителя на LSO.

Повторная попытка допустима, но она должен быть встроен в модель транзакции. Если приложение бросает исключение до фиксации, транзакция прерывается, оффсет не фиксируется, записи читаются повторно. Если пользовательский код сделал необратимый HTTP-вызов внутри транзакции, Kafka уже не спасет: HTTP-вызов мог произойти, а Kafka транзакция потом прервалась.

Практическая работа в Java / Spring

Базовая прод-конфигурация Spring Boot

spring:
  application:
    name: inventory-worker

  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}

    consumer:
      group-id: inventory-reservation-v1
      enable-auto-commit: false
      isolation-level: read_committed
      auto-offset-reset: earliest
      properties:
        max.poll.records: 100
        max.poll.interval.ms: 300000

    producer:
      transaction-id-prefix: ${POD_NAME:${HOSTNAME:${random.uuid}}}-inventory-tx-
      acks: all
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
        delivery.timeout.ms: 120000
        request.timeout.ms: 30000
        linger.ms: 10
        compression.type: zstd

    properties:
      client.id: ${spring.application.name}-${POD_NAME:${HOSTNAME:local}}

Если задан spring.kafka.producer.transaction-id-prefix, Spring Boot автоматически настраивает KafkaTransactionManager; если такой bean есть, он ассоциируется с listener container factory.

Здесь важны не сами значения, а принципы:

  • enable-auto-commit=false, потому что коммит оффсета должен контролироваться границами транзакции
  • isolation-level=read_committed, чтобы downstream не видел прерванные транзакции
  • transaction-id-prefix уникален для экземпляра
  • acks=all, idempotence и max.in.flight <= 5 не конфликтуют с требованиями producer idempotence.

Правильный Kafka-to-Kafka обработчик в Spring

package com.acme.inventory;

import java.time.Instant;
import java.util.UUID;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class InventoryReservationListener {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ReservationDecisionService decisionService;

    public InventoryReservationListener(
            KafkaTemplate<String, Object> kafkaTemplate,
            ReservationDecisionService decisionService
    ) {
        this.kafkaTemplate = kafkaTemplate;
        this.decisionService = decisionService;
    }

    @KafkaListener(
            topics = "orders.validated.v1",
            groupId = "inventory-reservation-v1"
    )
    public void handle(
            OrderValidatedEvent event,
            @Header(KafkaHeaders.RECEIVED_KEY) String orderId,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset
    ) {
        ReservationDecision decision = decisionService.decide(event);

        InventoryReservedEvent output = new InventoryReservedEvent(
                UUID.randomUUID(),
                event.orderId(),
                decision.status(),
                decision.items(),
                Instant.now()
        );

        kafkaTemplate.send("inventory.reserved.v1", orderId, output);
    }
}

Этот пример корректен только при важном условии: decisionService.decide() не делает необратимых внешних побочных эффектов. Он может вычислять решение из уже прочитанных данных или работать с локальным хранилищем состояний в Kafka Streams-модели, но если он пишет в PostgreSQL или вызывает HTTP, это уже другая консистентность.

При настроенном KafkaTransactionManager listener container начинает Kafka транзакция до вызова listener. KafkaTemplate.send() участвует в этой transaction, а оффсет отправляются в транзакцию перед коммитом. Spring Kafka описывает эту модель как exactly-once для read --> process --> write, при этом сам read/process остаётся at-least-once, а атомарность достигается на уровне результата sequence.

Неправильный пример: Kafka транзакций вокруг внешнего платежа

@Component
public class BadPaymentListener {

    private final PaymentProviderClient paymentProviderClient;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public BadPaymentListener(
            PaymentProviderClient paymentProviderClient,
            KafkaTemplate<String, Object> kafkaTemplate
    ) {
        this.paymentProviderClient = paymentProviderClient;
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "payments.requested.v1", groupId = "payment-worker-v1")
    public void handle(PaymentRequestedEvent event) {
        PaymentCaptureResult result = paymentProviderClient.capture(
                event.paymentId(),
                event.amount()
        );

        kafkaTemplate.send(
                "payments.captured.v1",
                event.paymentId().toString(),
                new PaymentCapturedEvent(event.paymentId(), result.providerOperationId())
        );
    }
}

На первый взгляд код нормальный. На деле он опасен.

Если захват провайдера прошёл, а Kafka транзакция потом откатилась, событие payments.captured.v1 не появится, оффсет не зафиксируется, входящее сообщение будет прочитан заново. Второй вызов capture() может повторить списание, если у провайдера нет ключа идемпотентности.

Kafka exactly-once не делает внешний HTTP-вызов exactly-once.

Исправление: idempotency key для внешнего провайдера

@Component
public class PaymentListener {

    private final PaymentAttemptRepository attemptRepository;
    private final PaymentProviderClient paymentProviderClient;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public PaymentListener(
            PaymentAttemptRepository attemptRepository,
            PaymentProviderClient paymentProviderClient,
            KafkaTemplate<String, Object> kafkaTemplate
    ) {
        this.attemptRepository = attemptRepository;
        this.paymentProviderClient = paymentProviderClient;
        this.kafkaTemplate = kafkaTemplate;
    }

    @KafkaListener(topics = "payments.requested.v1", groupId = "payment-worker-v1")
    public void handle(PaymentRequestedEvent event) {
        PaymentAttempt attempt = attemptRepository.findOrCreate(
                event.paymentId(),
                event.requestId()
        );

        PaymentCaptureResult result = paymentProviderClient.capture(
                event.paymentId(),
                event.amount(),
                attempt.idempotencyKey()
        );

        attempt.markCaptured(result.providerOperationId());
        attemptRepository.save(attempt);

        kafkaTemplate.send(
                "payments.captured.v1",
                event.paymentId().toString(),
                new PaymentCapturedEvent(
                        event.paymentId(),
                        event.requestId(),
                        result.providerOperationId()
                )
        );
    }
}

Это всё ещё не «чистый Kafka EOS», потому что есть БД и внешний провайдер. Но теперь повторная обработка Kafka запись не обязана повторять бизнес-эффект. Провайдер видит тот же idempotency key, а локальная БД хранит состояние попытки.

Чистая Java: transactional consume-process-produce

package com.acme.kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalOrderCopier {

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-copy-v1");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-copy-instance-1");
        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");

        try (
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)
        ) {
            consumer.subscribe(java.util.List.of("orders.validated.v1"));
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }

                try {
                    producer.beginTransaction();

                    for (ConsumerRecord<String, String> record : records) {
                        String transformed = transform(record.value());
                        producer.send(new ProducerRecord<>(
                                "orders.normalized.v1",
                                record.key(),
                                transformed
                        ));
                    }

                    producer.sendOffsetsToTransaction(
                            offsetsFor(records),
                            consumer.groupMetadata()
                    );

                    producer.commitTransaction();
                } catch (ProducerFencedException fenced) {
                    throw fenced;
                } catch (KafkaException failure) {
                    producer.abortTransaction();
                }
            }
        }
    }

    private static String transform(String value) {
        return value.trim();
    }

    private static Map<TopicPartition, OffsetAndMetadata> offsetsFor(
            ConsumerRecords<String, String> records
    ) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();

        for (TopicPartition partition : records.partitions()) {
            var partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }

        return offsets;
    }
}

Обратите внимание: consumer.commitSync() здесь не вызывается. Оффсет отправляется в транзакцию через sendOffsetsToTransaction().

БД + Kafka: production-friendly outbox

Когда обработка меняет PostgreSQL и должна породить Kafka событие, чаще всего правильнее не пытаться сделать «Kafka транзакция + БД транзакция = распределенная транзакция». Лучше использовать transactional outbox.

@Entity
@Table(
        name = "ledger_entry",
        uniqueConstraints = {
                @UniqueConstraint(
                        name = "uk_ledger_entry_source_event",
                        columnNames = "source_event_id"
                )
        }
)
public class LedgerEntry {

    @Id
    private UUID id;

    @Column(name = "source_event_id", nullable = false)
    private UUID sourceEventId;

    @Column(name = "account_id", nullable = false)
    private UUID accountId;

    @Column(name = "amount_cents", nullable = false)
    private long amountCents;

    protected LedgerEntry() {
    }

    public LedgerEntry(UUID id, UUID sourceEventId, UUID accountId, long amountCents) {
        this.id = id;
        this.sourceEventId = sourceEventId;
        this.accountId = accountId;
        this.amountCents = amountCents;
    }
}
@Entity
@Table(
        name = "outbox_event",
        indexes = {
                @Index(name = "ix_outbox_status_created_at", columnList = "status,created_at")
        }
)
public class OutboxEvent {

    @Id
    private UUID id;

    @Column(nullable = false)
    private String topic;

    @Column(name = "event_key", nullable = false)
    private String key;

    @Column(nullable = false, columnDefinition = "jsonb")
    private String payload;

    @Column(nullable = false)
    private String status;

    @Column(name = "created_at", nullable = false)
    private Instant createdAt;

    protected OutboxEvent() {
    }

    public static OutboxEvent newKafkaEvent(String topic, String key, String payload) {
        OutboxEvent event = new OutboxEvent();
        event.id = UUID.randomUUID();
        event.topic = topic;
        event.key = key;
        event.payload = payload;
        event.status = "NEW";
        event.createdAt = Instant.now();
        return event;
    }

    public void markSent() {
        this.status = "SENT";
    }
}
@Service
public class LedgerProjectionService {

    private final LedgerEntryRepository ledgerRepository;
    private final OutboxRepository outboxRepository;
    private final ObjectMapper objectMapper;

    public LedgerProjectionService(
            LedgerEntryRepository ledgerRepository,
            OutboxRepository outboxRepository,
            ObjectMapper objectMapper
    ) {
        this.ledgerRepository = ledgerRepository;
        this.outboxRepository = outboxRepository;
        this.objectMapper = objectMapper;
    }

    @Transactional
    public void apply(PaymentCapturedEvent event) throws JsonProcessingException {
        if (ledgerRepository.existsBySourceEventId(event.eventId())) {
            return;
        }

        LedgerEntry entry = new LedgerEntry(
                UUID.randomUUID(),
                event.eventId(),
                event.accountId(),
                event.amountCents()
        );

        ledgerRepository.save(entry);

        LedgerEntryCreatedEvent output = new LedgerEntryCreatedEvent(
                UUID.randomUUID(),
                event.eventId(),
                event.accountId(),
                event.amountCents(),
                Instant.now()
        );

        outboxRepository.save(OutboxEvent.newKafkaEvent(
                "ledger.entries.created.v1",
                event.accountId().toString(),
                objectMapper.writeValueAsString(output)
        ));
    }
}

В этой модели SQL транзакция фиксирует изменение записи и outbox вместе. Если сервис упал после коммита БД, исходящее событие останется в статусе NEW. Relay позже опубликует его в Kafka. Если relay опубликовал событие, но не успел отметить SENT, событие может быть опубликовано повторно. Поэтому потребители должны дедуплицировать по eventId.

Это честная модель: мы не притворяемся, что PostgreSQL стал частью Kafka transaction. Мы явно проектируем возможные дубли и не допускаем потери события.

Outbox relay

@Component
public class OutboxRelay {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final TransactionTemplate transactionTemplate;

    public OutboxRelay(
            OutboxRepository outboxRepository,
            KafkaTemplate<String, String> kafkaTemplate,
            TransactionTemplate transactionTemplate
    ) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
        this.transactionTemplate = transactionTemplate;
    }

    @Scheduled(fixedDelayString = "${outbox.relay.fixed-delay:1000}")
    public void publishBatch() {
        List<OutboxEvent> batch = transactionTemplate.execute(status ->
                outboxRepository.lockNextNewEvents(100)
        );

        if (batch == null || batch.isEmpty()) {
            return;
        }

        kafkaTemplate.executeInTransaction(operations -> {
            for (OutboxEvent event : batch) {
                operations.send(event.topic(), event.key(), event.payload());
            }
            return null;
        });

        transactionTemplate.executeWithoutResult(status -> {
            for (OutboxEvent event : batch) {
                event.markSent();
            }
            outboxRepository.saveAll(batch);
        });
    }
}

Repository-запрос для PostgreSQL:

public interface OutboxRepository extends JpaRepository<OutboxEvent, UUID> {

    @Query(value = """
            select *
            from outbox_event
            where status = 'NEW'
            order by created_at
            for update skip locked
            limit :limit
            """, nativeQuery = true)
    List<OutboxEvent> lockNextNewEvents(@Param("limit") int limit);
}

Outbox relay обычно at-least-once. Это нормально, если идентификатор события стабилен, downstream идемпотентен, а runbook объясняет, как переигрывать зависшие события.

Интеграционный тест с Testcontainers

@Testcontainers
@SpringBootTest
class InventoryReservationEosIT {

    @Container
    static KafkaContainer kafka =
            new KafkaContainer("apache/kafka-native:3.8.0");

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
        registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
        registry.add("spring.kafka.consumer.isolation-level", () -> "read_committed");
        registry.add("spring.kafka.consumer.enable-auto-commit", () -> "false");
        registry.add("spring.kafka.producer.transaction-id-prefix",
                () -> "test-" + UUID.randomUUID() + "-tx-");
    }

    @Autowired
    KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    TestReservationConsumer reservationConsumer;

    @Test
    void shouldPublishSingleReservationEvent() throws Exception {
        String orderId = "order-42";

        kafkaTemplate.send(
                "orders.validated.v1",
                orderId,
                new OrderValidatedEvent(orderId, List.of("sku-1", "sku-2"))
        ).get(10, TimeUnit.SECONDS);

        await()
                .atMost(Duration.ofSeconds(10))
                .untilAsserted(() -> {
                    List<InventoryReservedEvent> events =
                            reservationConsumer.receivedFor(orderId);

                    assertThat(events).hasSize(1);
                    assertThat(events.getFirst().orderId()).isEqualTo(orderId);
                });
    }
}

Testcontainers Kafka module в актуальной документации рекомендует org.testcontainers.kafka.KafkaContainer для Apache Kafka images; старый org.testcontainers.containers.KafkaContainer deprecated.

В таком тесте важно не просто проверить happy path. Нужны сценарии:

  • listener бросает exception до отправки;
  • listener бросает исключение после отправки, но до коммита транзакции;
  • перезапуск потребителя между poll и commit;
  • дубликат входящего события;
  • downstream потребитель читает с read_committed.

Типовые ошибки и анти-паттерны

Ошибка 1. Считать Kafka exactly once глобальной бизнес-гарантией

Как выглядит:

"Мы включили Kafka transactions, значит платёж не спишется дважды."

Почему это плохо:

Kafka транзакция не управляет внешним HTTP провайдером, PostgreSQL коммитом, email gateway или загрузкой на S3.

К чему приводит в проде:

Повторные списания, потерянные события, расхождение книги учета и платежного провайдера.

Как диагностировать:

Сравнивайте eventId, ID операции провайдера, ledger entry id и Kafka оффсеты. Если один Kafka рекорд породил несколько операций провайдера, проблема не в брокере.

Как исправить:

Использовать ключ идемпотентности у внешнего провайдера, хранить по ID, делать согласование, применять outbox для БД –> Kafka.

Ошибка 2. Коммит оффсета отдельно от публикации исходящего сообщения

Как выглядит:

kafkaTemplate.send("output.topic", key, event);
acknowledgment.acknowledge();

Почему это плохо:

Send и offset commit не атомарны. Между ними может быть падение системы.

К чему приводит в проде:

Дубли output или потеря результата.

Как диагностировать:

Ищите записи, где входящий оффсет уже зафиксирован, но исходящее сообщение отсутствует; или наоборот – output есть, а входящий оффсет был переобработан.

Как исправить:

Для Kafka-to-Kafka использовать Kafka транзакция и sendOffsetsToTransaction. В Spring – контейнер листнера транзакций

.

Ошибка 3. Потребитель читает транзакционные топики с read_uncommitted

Как выглядит:

spring:
  kafka:
    consumer:
      isolation-level: read_uncommitted

Почему это плохо:

Потребитель может увидеть записи из прерванной транзакции.

К чему приводит в проде:

Downstream обрабатывает данные, которые producer потом abort’нул.

Как диагностировать:

Сравнить поведение kafka-console-consumer с --isolation-level read_committed и read_uncommitted.

Как исправить:

Для топиков, где пишут транзакционные продьюсеры, downstream потребители должны использовать read_committed.

Ошибка 4. Один transaction-id-prefix на все pods

Как выглядит:

spring.kafka.producer.transaction-id-prefix: inventory-tx-

и это значение одинаковое во всех replicas.

Почему это плохо:

Транзакционные продьюсеры в разных инстансах начинают ограждать друг друга.

К чему приводит в проде:

ProducerFencedException, внезапные прерывания, нестабильная обработка.

Как диагностировать:

Искать ProducerFencedException, рост частотs прерываний, корреляция с развертыванием или масштабированием.

Как исправить:

Добавлять pod/инстанс identity в prefix: inventory-${POD_NAME}-tx-. Для Kubernetes это обычно StatefulSet ordinal или pod name.

Ошибка 5. Длинные внешние вызовы внутри Kafka transaction

Как выглядит:

@KafkaListener(...)
public void handle(Event event) {
    kafkaTemplate.send("step.started", event.id(), event);
    slowHttpClient.call(event);
    kafkaTemplate.send("step.finished", event.id(), event);
}

Почему это плохо:

Транзакция остается open, LSO не двигается, read_committed потребители могут ждать.

К чему приводит в проде:

Видимый lag downstream, рост длительности транзакции, таймауты, отмены.

Как диагностировать:

Смотреть лэтенси коммита транзакции/отмены, консьюмер лаг, latency запроса, трейсы внешнего HTTP.

Как исправить:

Не держать Kafka транзакция вокруг долгих побочных эффектов. Делить пайплайн на команды, outbox, идемпотентные внешние вызовы и отдельное событие результата.

Ошибка 6. транзакция на каждое сообщениебез необходимости

Как выглядит:

Каждая запись обрабатывается отдельной Kafka транзакцией даже при высокой нагрузке.

Почему это плохо:

Каждая транзакция требует координации, маркеры и обновлений состояния. Накладные расходы на транзакцию: при commit/abort координатор пишет маркеры в участвующие партиции и обновляет transaction log.

К чему приводит в проде:

Падение пропускной способности, рост latency, нагрузка на координатор транзакций.

Как диагностировать:

Сравнить количество записей в транзакцииr, latency коммита, частоту запросов продьюсера, метрики запроса брокера.

Как исправить:

Бачт записе внутри разумной границы транзакции. Размер batch подбирать нагрузочным тестированием, не по вкусу.

Ошибка 7. Смешать container transactions и non-blocking retries в Spring Kafka

Как выглядит:

Команда включает контейнер слушателя транзакций и retry топики, ожидая, что все будет атомарно.

Почему это плохо:

Spring Kafka документация прямо указывает, что non-blocking retries не комбинируются с container transactions.

К чему приводит в проде:

Неожиданное поведение оффсет коммита, retry топик, rollback и DLT.

Как диагностировать:

Проверить конфигурацию listener container, error handler, retry топик configuration.

Как исправить:

Для транзакционной обработки использовать блокирующий процесс повторной попытки/после отката либо явно проектировать retry топик как отдельный этап без иллюзии общей транзакции.

Ошибка 8. Игнорировать ProducerFencedException

Как выглядит:

catch (Exception e) {
    log.warn("Kafka error", e);
}

Почему это плохо:

Ограждение означает, что этот producer больше не имеет права продолжать транзакцию. Это не обычная transient error.

К чему приводит в проде:

Сервис продолжает работать в некорректном состоянии, записи зависают, процессинг ломается.

Как диагностировать:

Логи ProducerFencedException, прерывание транзакций, события перезапуска, дубликаты экземпляров с одинаковым идентификатором транзакции.

Как исправить:

При ограждении закрывать проьюсер и завершать инстанс. В Kubernetes – дать pod рестартовать.

Ошибка 9. Верить, что @Transactional делает Kafka + БД атомарными

Как выглядит:

@Transactional
public void process(Event event) {
    repository.save(entity);
    kafkaTemplate.send("topic", key, output);
}

Почему это плохо:

Spring может синхронизировать Kafka транзакцию с БД транзакцией, но это не полноценный распределнный 2PC. Spring Kafka документация отдельно предупреждает о сценариях, где коммит транзакции синхронизации может упасть после основной транзакции, и приложение должно компенсировать последствия.

К чему приводит в проде:

БД коммит есть, Kafka события нет, или Kafka событие есть, БД отметка не обновилась.

Как диагностировать:

Сравнивать таблицы бизнес-состояния и Kafka output по идентификатору события, искать ошибки коммита после БД коммита.

Как исправить:

Использовать outbox/inbox, уникальные ключи, reconciliation jobs.

Ошибка 10. Мониторить только consumer lag

Как выглядит:

Dashboard показывает records-lag-max, и команда считает этого достаточно.

Почему это плохо:

Lag – симптом, не причина. Причина может быть в открытой транзакции, “rebalance storm”, истощение буффера продьюсера, ACL ошибки.

К чему приводит в проде:

Команда масштабирует потребителей, хотя проблема в latency транзакций продьюсера или ISR брокера.

Как диагностировать:

Смотреть метрики транзакций производителя, метрики ребаланса потребителя, недостаточное количество реплик в разделах брокера, сбои при выполнении запросов на создание/получение данных..

Как исправить:

Дашборд должен связывать лаг с продьюсером, потребителем, брокеров и бизнес метриками.

Наблюдаемость и диагностика

Метрики

МетрикаГде взятьЧто означаетКогда тревожиться
records-lag-maxKafka consumer JMXМаксимальный lag потребителя по партициямРастёт дольше бизнес-SLO
records-consumed-rateKafka consumer JMXСкорость потребленияПадает при стабильной частоте входящих сообщений
last-poll-seconds-agoKafka consumer JMXСколько секунд с последнего poll()Растёт к max.poll.interval.ms
rebalance-total, failed-rebalance-totalConsumer coordinator metricsСтабильность консьюмер группыРастут во время нормальной нагрузки
record-error-rateProducer metricsОшибки producer sendsЛюбой ненулевой рост требует анализа
record-retry-rateProducer metricsПовтор записи продьюсераРост вместе с latency
request-latency-avg/maxProducer/consumer metricsLatency запросов к брокеруРост после раскатки или аварийное переключение брокера
bufferpool-wait-time, waiting-threadsProducer metricsProducer ждёт buffer memoryПризнак saturation producer
txn-commit-time-ns-total, txn-abort-time-ns-totalProducer metricsВремя commit/abort transactionsРост latency или abort storm
UnderReplicatedPartitions, UnderMinIsrPartitionCountBroker JMXРепликация и ISRВ production нормальное значение — 0

Kafka клиенты и брокеры экспонируют метрики через JMX; официальная документация Kafka также подчеркивает, что remote JMX по умолчанию выключен и при включении в production должен быть защищен.

Метрики потребителя вроде records-lag-max, last-poll-seconds-ago, fetch-latency, rebalance metrics описаны в документации Confluent по Kafka client JMX metrics.

Метрики транзакций продьюсера вроде txn-commit-time-ns-total, txn-abort-time-ns-total, txn-send-offsets-time-ns-total, а также record-error-rate, record-retry-rate, bufferpool-wait-time полезны для диагностики EOS-пайплайнов.

Prometheus alert examples

Имена метрик зависят от JMX exporter и naming rules. Ниже — пример логики, а не универсальные имена.

groups:
  - name: kafka-eos
    rules:
      - alert: KafkaProducerRecordErrors
        expr: increase(kafka_producer_record_error_total{client_id=~"inventory-worker.*"}[5m]) > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka producer sends are failing"
          description: "Check broker errors, ACLs, ISR, transaction state and recent rollout."

      - alert: KafkaConsumerLagGrowing
        expr: max_over_time(kafka_consumer_records_lag_max{client_id=~"inventory-worker.*"}[10m]) > 10000
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "Consumer lag is growing"
          description: "Correlate with transaction commit latency, rebalances and broker health."

      - alert: KafkaTransactionAbortSpike
        expr: increase(kafka_producer_txn_abort_time_ns_total{client_id=~"inventory-worker.*"}[10m]) > 0
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Kafka transaction abort activity detected"
          description: "Check application exceptions, fencing, transaction timeout and downstream broker health."

Логи

Для транзакционных потребителей логируйте:

  • eventId
  • topic
  • partition
  • offset
  • consumerGroup
  • transactionalId или хотя бы instance id
  • business key: orderId, paymentId, accountId
  • результат обработки
  • причину abort
  • retry attempt
  • traceId.

Не логируйте наполнение целиком, если там PII, платежные данные или большие JSON. Не включайте постоянный DEBUG Kafka клиентов в проде без плана: объем логов может сам стать инцидентом.

Трейсы

OpenTelemetry span полезен не на каждый low-level poll, а на business processing:

Kafka message processing
  attributes:
    messaging.system = kafka
    messaging.destination.name = orders.validated.v1
    messaging.kafka.partition = 3
    messaging.kafka.message.offset = 928381
    app.order_id = order-42
    app.event_id = 2f4f...

Для внешних вызовов внутри пайплайна трейс должен показать, что latency ушла не в Kafka, а в HTTP провайдер или БД. Если трейс показывает долгий HTTP вызов внутри Kafka транзакции – это архитектурная ошибка.

Диагностические команды

# Посмотреть lag и assignment consumer group
kafka-consumer-groups.sh \
  --bootstrap-server kafka-1:9092 \
  --describe \
  --group inventory-reservation-v1

# Прочитать топик только с зафиксированными транзакционными записями
kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9092 \
  --topic inventory.reserved.v1 \
  --from-beginning \
  --isolation-level read_committed

# Сравнить с read_uncommitted при расследовании прерванных/открытых записей
kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9092 \
  --topic inventory.reserved.v1 \
  --from-beginning \
  --isolation-level read_uncommitted

# Проверить топик configuration
kafka-topics.sh \
  --bootstrap-server kafka-1:9092 \
  --describe \
  --topic inventory.reserved.v1

# Проверить ACL на transactional id prefix
kafka-acls.sh \
  --bootstrap-server kafka-1:9092 \
  --list \
  --transactional-id inventory-

# Проверить, что principal имеет producer/idempotent privileges
kafka-acls.sh \
  --bootstrap-server kafka-1:9092 \
  --list \
  --principal User:inventory-worker

Для транзакционного продьюсера нужны права не только на топик. Модель Kafka ACL включает:

  • resource type TransactionalId
  • transactional producer с transactional.id
  • требует Write на TransactionalId,
  • idempotent produce требует IdempotentWrite на Cluster,
  • обычный produce – Write на Topic.

Производительность, отказоустойчивость и безопасность

Latency

Kafka transactions добавляют координацию. Это не значит, что они медленные «всегда». Это значит, что latency теперь включает:

  • батчинг продьюсера
  • запросы к координатору транзакций
  • запись состояния транзакции
  • запись маркеров транзакции
  • фиксация оффсета внутри транзакции
  • ожидание LSO downstream потребителей.

Если SLA ендпоинта 50 ms, Kafka транзакция вокруг нескольких топиков и партициях может быть плохой идеей. Если это асинхронный пайплайн с SLO в секунды, цена может быть приемлемой.

Throughput (пропускная способность)

Главный способ не убить пропускную способность – не делать транзакции за мелкие операции без необходимости. Группируйте записи в транзакцию, но не держите транзакцию открытой слишком долго.

Компромисс:

  • маленькая транзакция – ниже latency, выше накладные расходы на запись
  • большая транзакция – выше пропускная способность, но выше задержка видимости и риск прерывания большого батча.

Backpressure

Producer может начать блокироваться на buffer memory. Это проявится через bufferpool-wait-time и waiting-threads. Потребитель может не успевать из-за медленной обработки, тогда last-poll-seconds-ago и records-lag-max растут.

Backpressure должен быть явным: ограничение concurrency, batch size, max.poll.records, circuit breaker для внешних вызовов, bounded executor queues.

Повторные попытки

Повторные попытки внутри Kafka продьюсера нормалбны, если продьюсер идемпотентен. Повторные попытки бизнес операций – отдельный вопрос.

Не выполняйте повторные попытки бесконечно внутри транзакции. Если обработка не может завершиться быстро и предсказуемо, лучше прерывание, роллбек оффсета и дать контроллируемую повторную попытку через обработчик ошибок или retry топик, если транзакционная модель это допускает.

Failover

При rперезапуске транзакционного продьюсера вызывает initTransactions(), брокеры завершает ожидание для того же transactional.id и ограждает старого потребителя. Это причина, почему transactional identity должна быть спроектирована, а не генерироваться как попало.

Security

Для EOS нужны ACL не только на входящие/исходящие топики и консьюмер группу, но и на TransactionalId. В общем Kafka кластере это часто забывают: приложение читает топик, пишет топик, но падает на авторизации координатора транзакций.

Также не включайте remote JMX без аутентификации и ограничений по сети. Kafka прямо предупреждает, что прод JMX требует настроек безопасности configuration.

Trade-off таблица

РешениеПлюсыМинусыКогда выбиратьКогда не выбирать
Idempotent producer onlyПросто, защищает producer retry duplicatesНет атомарности offsets + outputProducer-only запись в один/несколько топиков без consume-process-produceНужна атомарность входящего оффсета
Kafka transactionsСильная Kafka-to-Kafka гарантияLatency, накладные расходы координатора, сложнее ошибкиStream processing, read-process-writeВнешние побочные эффекты без идемпотентности
Kafka Streams EOS v2Хорошая интеграция state stores/output/offsetsТребует Streams modelStateful transformations, aggregationsИмперативный сервис с БД/HTTP побочными эффектами
Transactional outboxХорошо для БД –> KafkaAt-least-once publish, нужна дедупликацияБизнес-состояние в PostgreSQLКогда нужен low-latency pure Kafka pipeline
Idempotent consumerПростая защита от дублейНужно хранить processed idsВнешние побочные эффекты , исходящие потребителиКогда объем событий огромный и нет retention для дедупликации
Distributed 2PCТеоретически атомарноСложно, хрупко, редко поддерживаетсяОчень ограниченные enterprise-сценарииБольшинство микросервисов

Сравнение подходов

ПодходГде силёнГде слабРискиТипичный прод-сценарий
At-most-onceМинимум дублейВозможна потеря данныхКоммит оффсета до обработкиНизкоценные уведомления
At-least-onceНе теряет записи при нормальной настройкеВозможны дублиНужна идемпотентностьБольшинство event-driven сервисов
Idempotent producerЗащита от producer retry duplicatesНе решает оффсет потребителяЛожное чувство EOSProducer пишет события в Kafka
Kafka transactionsАтомарность output + offsetsНе покрывает БД/HTTPДлинный транзакции, ограждение, ACLKafka-to-Kafka пайплайн
Kafka Streams exactly_once_v2Интегрированная модель stream processingМеньше контроля imperative codeСложность хранилища состояний/журнала измененийАгрегации, объединения, обогащение
Outbox + idempotent consumersНадёжная интеграция с БДДубли возможны, но управляемыЛаг релея, зависание исходящего сообщенияЗаказ в БД -> Kafka события
Inbox/dedupe tableЗащита от внешних побочных эффектовStorage и cleanupВысокая кардинальность идентификаторовПлтежи, книги учета, инвентаризация

Практика внедрения: от MVP до прода

  1. На MVP начните с at-least-once и идемпотентной бизнес-операции. Не включайте Kafka transactions, пока не ясно, какую конкретно консистентность они должны дать.
  2. Опишите pipeline: входящий топик, исходящий топик, консьюмер группа, business key, event id, partition key, retry, DLT, owner.
  3. Решите, где граница exactly-once: только Kafka-to-Kafka или Kafka + БД + внешними побочными эффектами.
  4. Для Kafka-to-Kafka включите транзакционный продьюсер, read_committed, enable-auto-commit=false, unique transaction-id-prefix.
  5. Для побочных эффектов БД добавьте констрейнтами уникальности на eventId/commandId.
  6. Для БД -> Kafka добавьте outbox, relay, idempotent downstream.
  7. Для внешнего провайдера добавьте idempotency key и таблицу попыток.
  8. Проверьте ACL: Topic Read/Write, Group Read, TransactionalId Write/Describe, IdempotentWrite.
  9. Нагрузочно протестируйте batch size, transaction duration, max.poll.records, producer buffer.
  10. Протестируйте crash после processing, после send, до commit, после коммита БД, до отметки outbox как SENT.
  11. Настройте dashboard: lag, transaction latency, aborts, retries, rebalances, ISR брокера.
  12. Добавьте алерты не только на lag, но и на abort spike, producer errors, under min ISR.
  13. Сделайте rollout на малом traffic slice.
  14. Во время rollout сравнивайте business counts: взодящие события, исходящие события, уникальные идентификаторы событий, дублирующиеся идентификаторы.
  15. Подготовьте rollback: как отключить потребителя, как переиграть оффсеты, как replay топик, как восстановить outbox.
  16. Подготовьте runbook для ProducerFencedException, transaction timeout, authorization failure, зависшая консьюмер группа.
  17. Документируйте ADR: почему выбрали transactions/outbox/idempotency, какие гарантии есть и каких нет.
  18. Проверьте, что команда поддержки понимает разницу между дубликатами Kafka записи и дубликатами бизнес-события.
  19. Не используйте exactly once, если occasional duplicates дешевле, чем эксплуатационная сложность transactions.
  20. Пересмотрите решение после первой реальной нагрузки: многие параметры корректно подбираются только по production-like теста.

Чек-лист для разработчика

  • Проверьте, что входящие записи имеют стабильный eventId.
  • Проверьте, что partition key соответствует требуемому порядку обработки.
  • Проверьте, что enable-auto-commit=false.
  • Проверьте, что транзакционные топики потребителя используют read_committed.
  • Проверьте, что producer config не конфликтует с idempotence.
  • Проверьте, что transaction-id-prefix уникален на инстанс.
  • Проверьте, что разные реплики не ограждают друг друга.
  • Проверьте, что внешние HTTP-вызовы не спрятаны внутри долгой Kafka transaction.
  • Проверьте, что побочные эффекты БД защищены констрейнтами уникальности.
  • Проверьте, что outbox relay допускает повторную публикацию без бизнес-дубля.
  • Проверьте, что downstream потребители дедуплицируют по идентификатору бизнес события, а не по Kafka оффсету.
  • Проверьте, что retry не выполняет повторно необратимую операцию без idempotency key.
  • Проверьте, что DLT содержит достаточно metadata: topic, partition, offset, eventId, exception class.
  • Проверьте, что ProducerFencedException не глушится.
  • Проверьте, что transaction timeout больше нормального processing time, но не скрывает зависания.
  • Проверьте, что max.poll.interval.ms согласован с worst-case processing time.
  • Проверьте, что dashboard показывает producer errors/retries, transaction timings, lag и rebalances вместе.
  • Проверьте ACL на Topic, Group, TransactionalId и IdempotentWrite.
  • Проверьте, что replay топик не создает повторный business effect.
  • Проверьте, что есть ранбук для зависшего аутбокса и сброс оффсета.
  • Проверьте, что integration tests моделируют crash до и после commit.
  • Проверьте, что команда может объяснить, где exactly-once заканчивается.

Что важно запомнить

  1. Kafka exactly once — это не «любое действие выполнится один раз», а атомарность Kafka исходящие записи и входящие оффсеты.
  2. Idempotent producer защищает от producer retry duplicates, но не заменяет transactions.
  3. read_committed обязателен для потребителей, которые не должны видеть прерванные транзакционные записи.
  4. transactional.id — identity producer, а не id транзакции; его надо проектировать.
  5. Kafka transactions не делают PostgreSQL, HTTP и платежного провайдера частью Kafka коммита.
  6. Для побочных эффектов БД чаще нужен outbox, unique constraints и идемпотентные потребители.
  7. Долгие транзакции могут задерживать downstream через LSO.
  8. Exactly-once без observability превращается в сложный механизм, который команда не сможет диагностировать ночью.

Заключение

Kafka exactly once — сильный инструмент, но он требует трезвого отношения. Он отлично работает там, где задача действительно Kafka-native: прочитать записи, вычислить результат, записать исходящие записи и атомарно зафиксировать оффсеты. В таких pipelines он убирает целый класс дублей и потерь, которые иначе приходится закрывать ручной логикой.

Но как только pipeline выходит за границу Kafka, магия заканчивается. Платёжный провайдер не знает про Kafka transaction. PostgreSQL не обязан commit’иться вместе с Kafka оффсет. Email gateway не откатит письмо, если producer abort’нулся. Здесь инженерная зрелость не в том, чтобы везде включить transactions, а в том, чтобы честно нарисовать failure modes и закрыть их правильными механизмами.

Exactly once – это не кнопка надежности. Это часть архитектуры консистентности. Понимание его границ отличает разработчика, который просто использует Kafka API, от инженера, который способен отвечать за данные в проде.

Источники и что стоит почитать

  1. Apache Kafka Design – раздел про exactly-once доставку, транзакционный продьюсер, оффсеты потербителя и границы гарантий Kafka.
  2. Apache Kafka Producer Configs 4.2acks, enable.idempotence, retries, max.in.flight.requests.per.connection и связанные ограничения.
  3. Apache Kafka Consumer Configs isolation.level=read_committed, Last Stable Offset и поведение consumers при open transactions.
  4. KIP-98: Exactly Once Delivery and Transactional Messaging – исходная мотивация Kafka транзакций, различие идемпотентного продсьюера и гарантии транзакционности.
  5. KIP-447: Producer scalability for exactly once semantics – EOS v2, ограждение и связь транзакционных продьюсеров с семантикой потребительской группы..
  6. Apache Kafka Streams Core Concepts – exactly-once processing semantics и exactly_once_v2.
  7. Spring Kafka Exactly Once Semantics – как контейнер листенера начинает транзакцию, отправляет оффсет в транзакцию и что означает EOS в Spring Kafka.
  8. Spring Kafka TransactionstransactionIdPrefix, KafkaTransactionManager, синхронизация транзакций, ограничения повторных попыток.
  9. Spring Boot Apache Kafka Support – auto-configuration KafkaTemplate, @KafkaListener, KafkaTransactionManager при transaction-id-prefix.
  10. Confluent: Transactions in Apache Kafka – transaction coordinator, transaction log, transaction markers и performance overhead.
  11. Apache Kafka Monitoring – JMX, метрики брокера, вопросы безопасности для удаленного JMX.
  12. Confluent Producer and Consumer Metrics – метрики транзакций производителя, задержка потребителя, опрос, выборка и метрики перебалансировки.
  13. Apache Kafka Authorization and ACLs – Topic, Group, TransactionalId, IdempotentWrite и CLI для ACL.
  14. Testcontainers Kafka Module – актуальный org.testcontainers.kafka.KafkaContainer и Kafka integration tests.

Loading