Оглавление
- Введение
- Что такое паттерн Transactional Outbox и зачем он нужен
- Архитектура паттерна Outbox
- Последовательность работы
- Реализация паттерна Outbox на Java
- Реализация паттерна Outbox на Go
- Консистентность, дедупликация и ретраи: как это обеспечивается
- Заключение
Введение
В современном мире микросервисов часто возникает задача обеспечить согласованность данных между несколькими сервисами без использования тяжелых распределенных транзакций. Например, сервис обработки заказов должен сохранить информацию о новом заказе в своей базе данных и отправить событие о создании заказа в брокер сообщений, чтобы другие сервисы (склад, доставка и т.д.) могли отреагировать. Наивная реализация — выполнить две операции (запись в базу и отправку сообщения) последовательно. Однако такой подход сталкивается с проблемой двойной записи (dual write problem): если после сохранения в базу приложение аварийно завершится до отправки события, получим несогласованность (данные в базе есть, а событие не дошло). Обратная ситуация тоже опасна: сообщение может отправиться до фиксации транзакции в базе, и тогда потребители получат событие о несуществующих данных.
Распределенные транзакции (например, двухфазный коммит 2PC) теоретически решают эту проблему, но на практике мало применимы: не каждый брокер или БД их поддерживает, да и связывать их единым менеджером транзакций затруднительно. Согласно теореме CAP, в распределенной системе невозможно одновременно гарантировать сильную согласованность и высокую доступность в условиях сетевых сбоев. Поэтому в event-driven (событийно-ориентированных) архитектурах приходится идти на компромисс – обеспечивать eventual consistency (итоговую согласованность) вместо мгновенной. Transactional Outbox Pattern (паттерн «транзакционный Outbox») – это шаблон, позволяющий добиться гарантированной доставки событий при межсервисном взаимодействии, избегая потери данных и используя подход at-least-once delivery (минимум один раз). Он решает проблему двойной записи и обеспечивает атомарность: событие будет отправлено только если транзакция с записью в базу данных успешна, и никогда – если она откатилась.
Что такое паттерн Transactional Outbox и зачем он нужен
Transactional Outbox – это архитектурный паттерн, используемый для надёжной интеграции базы данных с системой обмена сообщениями без распределенных транзакций. Идея проста: вместо отправки сообщения напрямую брокеру, сервис сначала сохраняет сообщение (событие) в специальную таблицу базы данных – так называемый Outbox – в рамках той же транзакции, что и основные изменения данных. Затем отдельный процесс (компонент Message Relay) читает эту таблицу Outbox и публикует сохраненные сообщения в брокер сообщений асинхронно. Таким образом достигается атомарность: бизнес-данные и связанное с ними событие фиксируются единой транзакцией – либо вместе, либо не фиксируется ничего. После успешной отправки во внешнюю очередь запись в Outbox помечается обработанной или удаляется.
Этот паттерн нужен в ситуациях, когда сервисы обмениваются событиями и требуется гарантия, что ни одно событие не потеряется, а бизнес-данные и события остаются синхронизированными. Например, при реализации саг или других механизмов, где изменение состояния в одном сервисе должно породить событие для других. В контексте CAP-подхода Outbox-паттерн приносит элемент последовательной (итоговой) согласованности, жертвуя мгновенной доставкой. Он устраняет необходимость в 2PC и снижает связанность сервисов с брокером: каждый сервис отвечает только за свою БД, а доставка событий возлагается на внешний механизм публикации. Паттерн Outbox решает проблему надёжности межсервисной коммуникации: нам не нужно беспокоиться, что брокер или смежный сервис недоступны в момент обработки – сообщение не потеряется, а «полежит» в базе до лучших времён и будет отправлено, когда всё заработает. Также он гарантирует, что сообщение отправится только после успешного коммита транзакции в базе, и никогда – если транзакция откатилась (тем самым предотвращая ложные срабатывания).
Важно понимать, что Outbox-подход обеспечивает семантику at-least-once: каждое событие будет доставлено как минимум один раз. Семантика exactly-once доставки в распределенных системах практически недостижима в чистом виде (проблема соглашения двух генералов), а at-most-once (максимум один раз, но возможно ни разу) нас не устраивает, так как грозит потерей событий. Поэтому выбор at-least-once – оправданный компромисс: мы гарантируем, что событие точно будет доставлено, пусть даже иногда дублем. Появление дубликатов приемлемо, если правильно с ними работать – об этом поговорим далее.
Архитектура паттерна Outbox
Ниже представлена упрощенная C4-диаграмма (уровень Container), иллюстрирующая архитектуру решения. На диаграмме показан сервис Order Service, использующий паттерн Outbox, и взаимодействие с другими компонентами:

Рис. 1: Контейнерная диаграмма архитектуры паттерна Transactional Outbox.
Пользователь инициирует создание заказа через HTTP-запрос к Order Service
. В рамках одной транзакции сохраняется заказ и соответствующее событие в таблице Outbox. Асинхронный компонент Outbox Processor
читает события из базы данных и публикует их в Kafka. Delivery Service
, подписанный на Kafka-топик, получает событие OrderCreated
и выполняет бизнес-логику обработки доставки.
На этой диаграмме Order Service при обработке запроса пользователя сохраняет новый заказ в свою базу данных (Orders DB). В той же транзакции Order Service добавляет запись о событии (например, OrderCreated) в таблицу Outbox. Отдельный поток (Outbox-процессор) затем читает из таблицы Outbox неподтвержденные события и публикует их в брокер сообщений (Kafka). Другой сервис (Delivery Service) получает событие из Kafka и выполняет соответствующие действия (например, резервирует слот доставки). Обратите внимание: Order Service не взаимодействует с Delivery Service напрямую – коммуникация идет через брокер, что обеспечивает слабую связанность и устойчивость к сбоям.
Последовательность работы
Рассмотрим последовательность шагов при использовании Transactional Outbox на примере создания заказа. Диаграмма последовательности ниже отражает ключевые этапы:

Рис. 2: Диаграмма последовательности работы паттерна Transactional Outbox.
Пользователь отправляет HTTP-запрос на создание заказа. OrderController
вызывает OrderService
, который в рамках одной транзакции сохраняет заказ и событие в таблицу Outbox. После коммита транзакции Outbox Processor
асинхронно считывает новые события и публикует их в Kafka. DeliveryConsumer
получает событие OrderCreated
и идемпотентно обрабатывает его. После успешной отправки событие удаляется или помечается как обработанное.
- HTTP-запрос: Пользователь отправляет запрос на создание заказа (например, POST
/orders
). Контроллер OrderController принимает запрос и вызывает бизнес-метод сервиса. - Локальная транзакция: OrderService начинает транзакцию базы данных. В рамках транзакции сначала сохраняется новая сущность заказа (в таблицу заказов). Затем формируется событие (например, JSON с деталями заказа) и вставляется запись в таблицу Outbox той же базы.
- Фиксация транзакции: Если операции прошли успешно, транзакция коммитится. Теперь в базе атомарно зафиксированы оба факта: новый заказ и событие для отправки. Контроллер возвращает пользователю успешный ответ (новый заказ создан). Если же что-то пошло не так (например, ошибка записи), транзакция откатится – ни заказ, ни событие не сохранятся, и внешний мир не узнает о неудачной попытке.
- Публикация события: Отдельный поток или компонент (на диаграмме обозначен как часть Order Service) асинхронно опрашивает таблицу Outbox (например, по расписанию или триггеру). Обнаружив новое событие, он отправляет его в брокер сообщений (Kafka). Отправка вне рамок основной транзакции, но данные надежно хранятся в Outbox, поэтому даже если брокер временно недоступен, событие никуда не пропадет.
- Доставка и обработка: Брокер доставляет сообщение подписчикам. Сервис-потребитель (Delivery Service) получает событие (например,
OrderCreated
) и выполняет необходимую логику – резервирует ресурсы, отправляет уведомления и т.д. Потребитель должен обработать событие идемпотентно, т.е. корректно справиться с возможными дублирующими сообщениями. - Удаление из Outbox: После успешной публикации и, желательно, подтверждения доставки, событие помечается как обработанное – например, удаляется из таблицы Outbox или обновляется его статус. Это предотвращает повторную отправку этого же события при следующем цикле обработки.
Заметим, что в этой последовательности нет единой распределенной транзакции между базой Order Service и брокером Kafka, однако за счет паттерна Outbox достигается аналогичная надежность: либо заказ и событие обоих присутствуют (и событие будет доставлено), либо ничего не произошло. Вместо жесткой синхронизации используется принцип eventual consistency: Delivery Service получит информацию о заказе чуть позже, но гарантированно получит.
Реализация паттерна Outbox на Java (Spring Boot + PostgreSQL + Kafka)
Реализуем данный паттерн в простом микросервисе на Spring Boot (Java). В качестве примера возьмем сервис заказов OrderService, который сохраняет заказы в PostgreSQL и уведомляет другие сервисы через Kafka о создании или отмене заказа. Структура приложения будет разделена на слои: REST-контроллер, сервисный слой и слой доступа к данным (репозиторий).
Кроме того, нам понадобится сущность Outbox (JPA-Entity), соответствующая таблице outbox
в базе, и компонент для фоновой отправки событий из этой таблицы в Kafka. Используем Spring Data JPA для работы с БД, Spring Kafka для интеграции с Kafka, а для интеграционных тестов – Testcontainers (поднимем реальную базу PostgreSQL и Kafka в Docker-контейнерах).
Сущности и репозитории
Начнем с определения JPA-сущностей и репозиториев. Нам нужны как минимум две таблицы: orders
(хранит заказы) и outbox
(хранит события, ожидающие отправки). Для простоты определим сущности Order и OutboxEvent.
@Entity
@Table(name = "orders")
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY) // авто-инкремент ID
private Long id;
private String product;
private int quantity;
private String status;
public Order() {} // конструктор по умолчанию для JPA
public Order(String product, int quantity) {
this.product = product;
this.quantity = quantity;
this.status = "NEW";
}
// getters/setters ниже (можно сгенерировать автоматически)
}
Класс Order
описывает заказ с полями product
(например, название товара), quantity
и status
. При создании нового заказа статус по умолчанию “NEW” (новый).
@Entity
@Table(name = "outbox")
public class OutboxEvent {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "event_type", nullable = false)
private String eventType;
@Column(name = "payload", nullable = false)
private String payload;
@Column(name = "created_at", nullable = false)
private Instant createdAt = Instant.now();
public OutboxEvent() {}
public OutboxEvent(String eventType, String payload) {
this.eventType = eventType;
this.payload = payload;
}
// getters/setters
}
Сущность OutboxEvent
содержит поля: уникальный id
события, тип события (eventType
, например "OrderCreated"
или "OrderCancelled"
), тело события payload
(строка в формате JSON или другой, хранящая детали) и время создания createdAt
. Поле времени нужно для аудита или для упорядочивания, но можно обойтись и без него. Первичный ключ id
будет использоваться как уникальный идентификатор события (мы можем также использовать его как ключ сообщения в Kafka, чтобы обеспечить порядок сообщений по ключу).
Теперь определим репозитории. Благодаря Spring Data JPA они будут довольно простыми интерфейсами:
@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
// базовых методов CRUD достаточно
}
@Repository
public interface OutboxRepository extends JpaRepository<OutboxEvent, Long> {
List<OutboxEvent> findAllByOrderByCreatedAt();
// можно добавить метод для получения событий в порядке возрастания времени
}
Репозиторий OrderRepository
предоставляет стандартные методы для CRUD операций с заказами. OutboxRepository
мы оснастили методом findAllByOrderByCreatedAt()
, чтобы получать события, отсортированные по времени вставки – это поможет сохранять порядок публикации событий в том же порядке, в котором коммитились транзакции с ними (важно для сохранения последовательности событий, связанных с одним агрегатом).
Сервисный слой: транзакционное сохранение заказа и события
Сервисный слой содержит бизнес-логику – в нашем случае создание или отмену заказа. Здесь мы применяем паттерн Outbox: при выполнении операции сохраняем в базу не только объект заказа, но и сопутствующее событие. Сделаем это внутри одной транзакции. В Spring для этого достаточно аннотации @Transactional
на методе. Если метод выполнится без ошибок – Spring коммитит транзакцию, при исключении – откатывает.
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepo;
@Autowired
private OutboxRepository outboxRepo;
@Autowired
private ObjectMapper objectMapper; // для сериализации заказа в JSON
@Transactional
public Order createOrder(String product, int quantity) {
// 1. Сохранение нового заказа
Order order = new Order(product, quantity);
order.setStatus("CREATED");
order = orderRepo.save(order);
// 2. Формирование события
try {
String eventPayload = objectMapper.writeValueAsString(order);
OutboxEvent event = new OutboxEvent("OrderCreated", eventPayload);
outboxRepo.save(event);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize order event", e);
}
return order;
}
@Transactional
public void cancelOrder(Long orderId) {
// Отмена существующего заказа
Order order = orderRepo.findById(orderId)
.orElseThrow(() -> new RuntimeException("Order not found"));
order.setStatus("CANCELLED");
orderRepo.save(order);
try {
String eventPayload = objectMapper.writeValueAsString(order);
OutboxEvent event = new OutboxEvent("OrderCancelled", eventPayload);
outboxRepo.save(event);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize cancel event", e);
}
}
}
В OrderService.createOrder
мы создаем заказ, сохраняем его через orderRepo.save
(после этого у объекта order
уже будет сгенерированный id
). Затем сериализуем объект заказа в JSON и создаем запись OutboxEvent
с типом "OrderCreated"
. Сохраняем событие через outboxRepo.save
. Поскольку метод помечен @Transactional
, оба сохранения (в orders
и в outbox
) произойдут в одной транзакции. Если, к примеру, произойдет ошибка сериализации JSON и выбросится исключение, транзакция откатится – заказ не будет сохранен и событие тоже, что корректно. Если же все прошло успешно, транзакция фиксируется: в базе теперь есть и новый заказ, и соответствующая ему запись в Outbox. Аналогично метод cancelOrder
сохраняет обновление статуса заказа и добавляет событие отмены.
REST-контроллер
Контроллер предоставляет HTTP API для использования нашего сервиса. Напишем простой контроллер с двумя endpoint’ами: создание заказа (POST) и отмена заказа (POST/DELETE). Для простоты отмену сделаем также POST запросом с ID заказа.
@RestController
@RequestMapping("/orders")
public class OrderController {
@Autowired
private OrderService orderService;
@PostMapping
public ResponseEntity<Order> createOrder(@RequestBody Order request) {
Order created = orderService.createOrder(request.getProduct(), request.getQuantity());
return ResponseEntity.status(HttpStatus.CREATED).body(created);
}
@PostMapping("/{id}/cancel")
public ResponseEntity<Void> cancelOrder(@PathVariable Long id) {
orderService.cancelOrder(id);
return ResponseEntity.ok().build();
}
}
Метод createOrder
принимает в теле запроса JSON с информацией о заказе (продукт, количество), вызывает сервис и возвращает созданный заказ вместе с сгенерированным ID. Метод cancelOrder
по URL /orders/{id}/cancel
отменяет заказ с указанным ID. После этих операций записи в Outbox уже находятся в базе, готовы к отправке.
Отправка событий из Outbox в Kafka (Event Publisher)
Теперь реализуем компонент, отвечающий за фактическую пересылку событий в Kafka. В простейшем случае это можно сделать с помощью планового задания (@Scheduled) в самом приложении. Альтернативный вариант – вынести эту логику в отдельный микросервис/процесс, который периодически читает outbox-таблицу (такой подход называют Polling Publisher). Мы воспользуемся способом с @Scheduled для наглядности. Не забудьте включить поддержу планировщика в приложении через аннотацию @EnableScheduling
(например, на классе с @SpringBootApplication
).
@Service
public class OutboxScheduler {
@Autowired
private OutboxRepository outboxRepo;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${app.outbox.topic:order_events}")
private String outboxTopic;
// Запуск каждые 5 секунд (можно настроить в application.yml)
@Scheduled(fixedRateString = "5000")
public void processOutboxEvents() {
List<OutboxEvent> events = outboxRepo.findAllByOrderByCreatedAt();
for (OutboxEvent event : events) {
try {
// Отправляем событие в Kafka (ключ – id события, значение – payload)
kafkaTemplate.send(outboxTopic, event.getId().toString(), event.getPayload()).get();
// Удаляем успешно отправленное событие из базы
outboxRepo.deleteById(event.getId());
} catch (Exception e) {
// Логируем и оставляем запись в Outbox для повторной попытки
System.err.println("Failed to send event " + event.getId() + ": " + e.getMessage());
// Можно реализовать логику увеличения счетчика попыток, DLQ и т.п.
}
}
}
}
Метод processOutboxEvents()
вытаскивает все события из таблицы Outbox (на практике можно ограничивать выборку, если событий очень много, или читать порциями). Для каждого события пытаемся отправить сообщение в Kafka: используем KafkaTemplate
(сконфигурированный продюсер) и вызываем send(topic, key, value)
. Здесь мы передаем ID события как ключ сообщения, а само тело события (payload
) как значение. Ключ позволяет Kafka гарантировать порядок сообщений с одинаковым ключом внутри раздела (partition), что полезно, если, например, все события по одному заказу отправляются с ключом заказа – тогда они будут обработаны последовательно потребителем. В нашем случае ключ – ID outbox-события – уникален для каждого события, но мы все равно получим упорядоченную по createdAt
доставку, поскольку берём события из БД в порядке их записи.
Вызов get()
на результате send()
делает отправку синхронной – метод подождет, пока Kafka подтвердит прием сообщения (или выкинет исключение). В продакшене это может замедлить обработку, но упрощает пример и дает уверенность, что сообщение действительно отправлено. Если отправка прошла успешно, мы удаляем запись из таблицы Outbox, тем самым помечая ее обработанной. Если случилось исключение (например, временный сбой сети или брокера), в блоке catch
мы просто логируем ошибку и не удаляем запись из Outbox. Благодаря этому механизм ретраев реализуется автоматически: при следующем запуске processOutboxEvents()
в списке останутся ранее неудаленные события, и будет новая попытка их отправить. Мы можем также добавить счетчик попыток в таблицу Outbox и прекращать попытки после N неудач (перемещая событие в особый список dead letter), но в простом случае можно повторять бесконечно, пока не получится (атLeast-once гарантия).
? Замечание: Если ваш сервис масштабируется горизонтально (несколько экземпляров Order Service), нужно избегать, чтобы два инстанса одновременно вычитали и отправляли одни и те же события. Для этого обычно реализуют механизм блокировки записей Outbox при выборке (
SELECT ... FOR UPDATE SKIP LOCKED
в PostgreSQL) либо используют координацию (например, распределенный lock или выбравший лидер). В нашем примерe мы предполагаем один поток обработки Outbox. В противном случае, следует расширитьOutboxRepository
методами типаfindFirstNBy...ForUpdate()
и помечать записи, над которыми работает отправитель.
Интеграционные тесты с Testcontainers
Протестируем работу паттерна End-to-End: сохраним заказ, заставим сработать Outbox-процессор и проверим, что сообщение дошло до Kafka. Для этого запускаем встроенные Docker-контейнеры с необходимыми компонентами.
С помощью Testcontainers можно поднять PostgreSQL и Kafka прямо из теста. Например, напишем JUnit 5 тестовый класс:
@Testcontainers
@SpringBootTest
public class OrderServiceIntegrationTest {
@Container // PostgreSQL в контейнере
private static PostgreSQLContainer<?> postgres =
new PostgreSQLContainer<>("postgres:15")
.withDatabaseName("ordersdb")
.withUsername("postgres").withPassword("postgres");
@Container // Kafka в контейнере (Confluent)
private static KafkaContainer kafka =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.1"));
@DynamicPropertySource // Прописываем URL контейнеров в свойства Spring Boot
static void configureSpringProperties(DynamicPropertyRegistry registry) {
registry.add("spring.datasource.url", postgres::getJdbcUrl);
registry.add("spring.datasource.username", postgres::getUsername);
registry.add("spring.datasource.password", postgres::getPassword);
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
}
@Autowired
private OrderService orderService;
@Autowired
private OutboxRepository outboxRepo;
@Test
public void testOutboxFlow() throws Exception {
// 1. Вызываем бизнес-метод создания заказа
Order order = orderService.createOrder("Coffee machine", 1);
Long orderId = order.getId();
// После коммита транзакции должен появиться OutboxEvent
List<OutboxEvent> pendingEvents = outboxRepo.findAll();
assertFalse(pendingEvents.isEmpty());
// 2. Ждем немного, чтобы сработал @Scheduled отправитель
Thread.sleep(6000);
// 3. Проверяем, что Outbox-таблица очистилась (событие отправлено)
pendingEvents = outboxRepo.findAll();
assertTrue(pendingEvents.isEmpty());
// 4. (Опционально) Проверяем, что сообщение реально в Kafka:
Properties props = new Properties();
props.put("bootstrap.servers", kafka.getBootstrapServers());
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(List.of("order_events"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
assertEquals(1, records.count());
ConsumerRecord<String, String> record = records.iterator().next();
assertTrue(record.key().matches("\\d+")); // ключ – число (ID события)
assertTrue(record.value().contains("\"id\":" + orderId));
}
}
}
Здесь мы запускаем контейнеры: PostgreSQLContainer для базы (инициализируется пустой, схема может быть создана автоматически через flyway/liquibase или с помощью spring.jpa.hibernate.ddl-auto=create
для простоты) и KafkaContainer для Kafka. @DynamicPropertySource
настраивает Spring Boot использовать URL-ы контейнеров. Затем в тесте:
- Вызываем
orderService.createOrder(...)
напрямую (минуя REST, для простоты). После этого транзакция завершилась, Order и OutboxEvent записаны в базу. Проверяем, что в Outbox-таблице есть непустая запись. - Ждем немного дольше периода, заданного в @Scheduled (у нас 5 секунд), чтобы Outbox-процессор успел сработать в фоне.
- Снова смотрим Outbox-таблицу – ожидаем, что она пустая, т.к. событие должно было отправиться и удалиться.
- Опционально, подключаемся к Kafka как потребитель (KafkaConsumer) и проверяем, что получили хотя бы одно сообщение в топике
order_events
. В реальном приложении можно вместо этого использовать @Testcontainers библиотеку для Kafka (или заменить Kafka на встроенный Redpanda для быстроты тестов). В проверке мы удостоверяемся, что ключ сообщения выглядит как число (наш ID), а значение содержит подстроку с ID заказа – т.е. это JSON нашего заказа.
Таким образом мы протестировали, что цепочка от сохранения заказа до публикации события в Kafka работает. В более сложных тестах можно поднять consumer-сервис или использовать эхо-консюмер, но это выходит за рамки примера.
Реализация паттерна Outbox на Go (Gin + PostgreSQL + Kafka)
Аналогичный подход реализуем на Go: у нас будет веб-сервис (на фреймворке Gin) с REST API для создания заказов, база данных PostgreSQL и отложенная отправка событий в Kafka. Покажем основные компоненты: модель данных, функции работы с базой (с транзакцией), HTTP-обработчики и фоновый процесс отправки. Кроме того, рассмотрим, как организовать интеграционный тест на Go с использованием dockertest (поднятие контейнеров для БД и Kafka) и, при желании, библиотеки go-testfixtures для подготовки данных.
Предположим, что у нас есть структура заказа и outbox-события. Определим их и необходимые методы:
Модели данных и работа с БД
Для работы с PostgreSQL используем стандартный database/sql
(через драйвер lib/pq
или "github.com/jackc/pgx/v4"
). В примере будем использовать database/sql
. Начнем с описания схемы (DDL):
-- SQL схема (PostgreSQL)
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
product TEXT,
quantity INT,
status TEXT
);
CREATE TABLE outbox (
id SERIAL PRIMARY KEY,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_outbox_createdat ON outbox(created_at);
Таблицы аналогичны тому, что мы сделали в Java: orders
и outbox
. Теперь, структуры на Go для этих таблиц:
type Order struct {
ID int64
Product string
Quantity int
Status string
}
type OutboxEvent struct {
ID int64
EventType string
Payload string
CreatedAt time.Time
}
Для сериализации заказа в JSON можно определить вспомогательную функцию:
import "encoding/json"
func makeOutboxEvent(eventType string, order Order) (OutboxEvent, error) {
data, err := json.Marshal(order)
if err != nil {
return OutboxEvent{}, err
}
return OutboxEvent{
EventType: eventType,
Payload: string(data),
}, nil
}
Теперь, ключевой момент – сохранение заказа и события в одной транзакции. В Go мы получаем транзакцию через db.Begin()
и далее выполняем SQL запросы. Напишем функцию-репозиторий SaveOrderWithEvent
:
import (
"context"
"database/sql"
_ "github.com/lib/pq"
)
func SaveOrderWithEvent(ctx context.Context, db *sql.DB, order Order, eventType string) (Order, error) {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return order, err
}
// Вставляем заказ
err = tx.QueryRowContext(ctx,
"INSERT INTO orders(product, quantity, status) VALUES($1,$2,$3) RETURNING id",
order.Product, order.Quantity, order.Status).Scan(&order.ID)
if err != nil {
tx.Rollback()
return order, err
}
// Формируем событие
outboxEvent, err := makeOutboxEvent(eventType, order)
if err != nil {
tx.Rollback()
return order, err
}
// Вставляем событие
err = tx.QueryRowContext(ctx,
"INSERT INTO outbox(event_type, payload) VALUES($1,$2) RETURNING id",
outboxEvent.EventType, outboxEvent.Payload).Scan(&outboxEvent.ID)
if err != nil {
tx.Rollback()
return order, err
}
// Успешно записали и заказ, и событие
if err := tx.Commit(); err != nil {
return order, err
}
return order, nil
}
Эта функция открывает транзакцию, вставляет заказ (SQL возвращает сгенерированный id
заказа), затем создает JSON событие и вставляет его в таблицу outbox
(тоже получая id
события, если нужно). Если что-то пойдет не так, выполняется tx.Rollback()
. В случае успеха – tx.Commit()
. Таким образом, гарантируется атомарность сохранения.
HTTP-обработчики (Gin) с использованием транзакции Outbox
Теперь построим HTTP API. Используем Gin для маршрутизации. Создадим endpoints для создания и отмены заказа, аналогично Java-кейсу.
import (
"github.com/gin-gonic/gin"
"net/http"
)
func main() {
db, err := sql.Open("postgres", "host=... user=... password=... dbname=ordersdb sslmode=disable")
if err != nil {
panic(err)
}
defer db.Close()
router := gin.Default()
router.POST("/orders", func(c *gin.Context) {
var req struct {
Product string `json:"product"`
Quantity int `json:"quantity"`
}
if err := c.BindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
order := Order{
Product: req.Product,
Quantity: req.Quantity,
Status: "CREATED",
}
savedOrder, err := SaveOrderWithEvent(c.Request.Context(), db, order, "OrderCreated")
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Could not create order"})
} else {
c.JSON(http.StatusCreated, savedOrder)
}
})
router.POST("/orders/:id/cancel", func(c *gin.Context) {
id := c.Param("id")
// получаем order по id
var order Order
err := db.QueryRow("SELECT id, product, quantity, status FROM orders WHERE id=$1", id).
Scan(&order.ID, &order.Product, &order.Quantity, &order.Status)
if err != nil {
c.JSON(http.StatusNotFound, gin.H{"error": "Order not found"})
return
}
if order.Status == "CANCELLED" {
c.JSON(http.StatusOK, gin.H{"message": "Order already cancelled"})
return
}
order.Status = "CANCELLED"
_, err = SaveOrderWithEvent(c.Request.Context(), db, order, "OrderCancelled")
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Could not cancel order"})
} else {
c.Status(http.StatusOK)
}
})
// Запуск HTTP-сервера
router.Run(":8080")
}
В маршруте POST /orders
мы получаем JSON с product
и quantity
, создаем объект Order
со статусом "CREATED"
и вызываем нашу функцию SaveOrderWithEvent
для сохранения заказа и события "OrderCreated"
в одной транзакции. Если успех – отдаем клиенту созданный заказ с ID. В маршруте отмены /orders/:id/cancel
сначала получаем существующий заказ из базы (обычный SELECT), проверяем, что не отменен, присваиваем статус "CANCELLED"
и снова вызываем SaveOrderWithEvent
– здесь он обновит запись заказа (хитрость: в нашей функции мы всегда делаем INSERT. В реальности, лучше сделать UPDATE заказа и отдельный INSERT в outbox в рамках той же транзакции. Чтобы не усложнять, можно переименовать SaveOrderWithEvent
в более общий ExecuteOrderEvent
и определять SQL в зависимости от eventType). Тем не менее, принцип тот же: гарантируем, что и изменение заказа, и событие об этом изменении попадут в базу атомарно.
Фоновый процесс отправки (Kafka Publisher + ретраи)
Для работы с Kafka в Go существует несколько клиентов. Мы воспользуемся популярной библиотекой segmentio/kafka-go (простая в использовании) или Shopify/sarama. В примере покажем с kafka-go, которая позволяет читать и писать сообщения как потоки.
Нам нужен фоновой процесс (goroutine), который периодически просматривает таблицу outbox
и отправляет события. Можно запустить горутину с тикером, либо реализовать отдельный команду/сервис. Для простоты запустим горутину из main()
, но учтите, что в продакшене часто делается отдельный сервис для чтения outbox (во избежание влияния на latency основного сервиса).
import kafka "github.com/segmentio/kafka-go"
// ... внутри main() после router setup, перед router.Run()
go func() {
writer := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"localhost:9092"}, // адрес Kafka брокера
Topic: "order_events",
// можно настроить batch size, etc.
})
defer writer.Close()
ticker := time.NewTicker(5 * time.Second)
for range ticker.C {
rows, err := db.Query("SELECT id, event_type, payload FROM outbox ORDER BY created_at")
if err != nil {
fmt.Println("Outbox query error:", err)
continue
}
var events []OutboxEvent
for rows.Next() {
var e OutboxEvent
if err := rows.Scan(&e.ID, &e.EventType, &e.Payload); err == nil {
events = append(events, e)
}
}
rows.Close()
for _, event := range events {
msg := kafka.Message{
Key: []byte(fmt.Sprintf("%d", event.ID)),
Value: []byte(event.Payload),
}
err := writer.WriteMessages(context.Background(), msg)
if err != nil {
fmt.Printf("Failed to send event %d: %v\n", event.ID, err)
// не удаляем сообщение, оставляем для ретрая
} else {
// Успешно отправлено, удаляем из таблицы
_, _ = db.Exec("DELETE FROM outbox WHERE id=$1", event.ID)
}
}
}
}()
Эта горутина каждые 5 секунд выполняет SELECT из таблицы outbox
, собирает все события и отправляет их в Kafka. Для каждого события формируется сообщение kafka.Message
с ключом (ID события) и значением (payload). Мы используем kafka.Writer
– высокоуровневый writer, который сам буферизует и пишет сообщения. После отправки каждого сообщения, при успехе, выполняем DELETE
соответствующей записи из таблицы. В случае ошибки – печатаем проблему и не удаляем запись, чтобы через следующий тиккер попробовать снова. Это и есть механизм повторных попыток (retry) в действии: пока мы не сможем доставить событие, запись будет лежать в Outbox. Если Kafka недоступен какое-то время, наш процесс будет каждые 5 секунд пытаться отправить и получать ошибки, но данные не потеряются. Как только связь восстановится, отправка пройдет и записи очистятся.
В более продвинутом варианте можно добавить ограничение на число попыток: например, завести поле
attempts
в таблице, увеличивать его при неудаче и если превышен порог – либо прекращать попытки, либо переносить событие в особую таблицу (Dead Letter). Здесь для простоты бесконечные ретраи.
Интеграционные тесты на Go (dockertest, go-testfixtures)
Протестируем корректность реализации. Используем библиотеку dockertest для поднятия PostgreSQL и Kafka в контейнерах во время теста, а также github.com/ory/dockertest/v3/docker для настройки Kafka. Optionally, можно воспользоваться go-testfixtures для подготовки начальных данных, но нам скорее нужно проверить конечный результат (дошло ли сообщение).
Пример теста (с использованием testing
):
import (
"testing"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
kafka "github.com/segmentio/kafka-go"
// ... другие импорты
)
func TestOutboxEndToEnd(t *testing.T) {
pool, err := dockertest.NewPool("")
if err != nil {
t.Fatal(err)
}
// 1. Запускаем PostgreSQL контейнер
pgResource, err := pool.Run("postgres", "15", []string{"POSTGRES_PASSWORD=postgres"})
if err != nil {
t.Fatal(err)
}
// 2. Запускаем Kafka (используем Redpanda для простоты)
kafkaResource, err := pool.Run("docker.redpanda.com/vectorized/redpanda", "latest", nil)
if err != nil {
t.Fatal(err)
}
// Очистка контейнеров после теста
t.Cleanup(func() {
pool.Purge(pgResource)
pool.Purge(kafkaResource)
})
// 3. Настраиваем подключения
pgURL := fmt.Sprintf("postgres://postgres:postgres@localhost:%s/postgres?sslmode=disable", pgResource.GetPort("5432/tcp"))
db, err := sql.Open("postgres", pgURL)
require.NoError(t, err)
require.NoError(t, db.Ping())
// Инициализируем таблицы (можно выполнить SQL схемы)
_, err = db.Exec(schemaSQL) // schemaSQL содержит DDL, как выше
require.NoError(t, err)
// Адрес Kafka брокера
kafkaBroker := fmt.Sprintf("localhost:%s", kafkaResource.GetPort("9092/tcp"))
// 4. Выполняем тестируемую логику
order := Order{Product: "Phone", Quantity: 1, Status: "CREATED"}
saved, err := SaveOrderWithEvent(context.Background(), db, order, "OrderCreated")
require.NoError(t, err)
// Стартуем отдельную горутину, чтобы эмулировать сервис отправки (или вызываем функцию обработки вручную)
go func() {
// В реальности у нас это внутри main, но для теста запустим копию:
writer := kafka.NewWriter(kafka.WriterConfig{Brokers: []string{kafkaBroker}, Topic: "order_events"})
defer writer.Close()
// Пробуем отправить все события
rows, _ := db.Query("SELECT id, event_type, payload FROM outbox")
defer rows.Close()
for rows.Next() {
var e OutboxEvent
_ = rows.Scan(&e.ID, &e.EventType, &e.Payload)
writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(fmt.Sprintf("%d", e.ID)),
Value: []byte(e.Payload),
})
db.Exec("DELETE FROM outbox WHERE id=$1", e.ID)
}
}()
// Подождем чуть, чтобы горутина успела отправить
time.Sleep(2 * time.Second)
// 5. Проверяем, что в outbox больше нет записей
var count int
_ = db.QueryRow("SELECT count(*) FROM outbox").Scan(&count)
require.Equal(t, 0, count)
// 6. Проверяем получение из Kafka
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{kafkaBroker},
GroupID: "test-group",
Topic: "order_events",
MinBytes: 1, MaxBytes: 1e6,
})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
msg, err := r.ReadMessage(ctx)
require.NoError(t, err)
// Проверяем, что в сообщении содержится наш заказ
require.Contains(t, string(msg.Value), `"product":"Phone"`)
// И ключ равен ID события (в строковом виде)
require.Equal(t, fmt.Sprintf("%d", 1), string(msg.Key))
r.Close()
}
В этом тесте мы:
- Поднимаем PostgreSQL и Kafka (здесь для упрощения Kafka заменен на Redpanda – это Kafka-совместимый брокер, который не требует Zookeeper и быстрее стартует, идеально для тестов. Мы указываем образ
vectorized/redpanda
). - Создаем подключение к базе, применяем SQL-схему для создания таблиц.
- Формируем заказ и вызываем
SaveOrderWithEvent
– проверяем, что нет ошибок. - Запускаем горутину, которая прочитает все записи из outbox и отправит их в Kafka (с последующим удалением). В реальном приложении мы бы вместо этого либо вызвали функцию обработки напрямую, либо лучше инициализировали и запустили наш сервис отправки. Здесь мы вручную повторили то, что делает outbox-процессор.
- Даём немного времени на выполнение и проверяем, что таблица outbox пуста.
- Создаем Kafka reader (консюмер) и читаем сообщение из топика
order_events
. Проверяем, что сообщение содержит JSON с нашим заказом (например, product “Phone”) и что ключ сообщения соответствует ID события (в данном случае, первый вставленный outbox ID=1). Используемsegmentio/kafka-go
для чтения сообщения с таймаутом.
Если все ассерты проходят, значит, наш паттерн сработал: заказ сохранен, событие появилось в Kafka, outbox очистился.
Обратите внимание, мы могли бы использовать пакет go-testfixtures для наполнения БД фиктивными данными, но здесь мы генерируем их через код. Dockertest позволил нам поднять реальные сервисы, что дало уверенность в работоспособности интеграции.
Консистентность, дедупликация и ретраи: как это обеспечивается
Консистентность между записью в БД и отправкой события. Паттерн Outbox обеспечивает атомарность изменений в данных и публикации событий за счет использования локальной транзакции базы данных. В примерах выше и OrderService (Java), и SaveOrderWithEvent (Go) выполняют запись бизнес-объекта и outbox-сообщения в рамках одной транзакции. Таким образом, достигается условие: событие в Outbox появляется только если изменения данных успешно зафиксированы, и наоборот – если транзакция откатилась, в Outbox не останется «осиротевших» событий. Далее, внешняя публикация события отделена от транзакции, но за счет надежного хранения в БД мы знаем: если транзакция прошла, событие рано или поздно будет прочитано и отправлено. По сути, получаем тот же эффект, что и при распределенной транзакции, только осуществленный другим способом. Немного теряем во времени доставки (событие доставляется не мгновенно, а с небольшой задержкой), зато резко повышаем надежность.
Дедупликация событий на стороне потребителя. Как отмечалось, наша схема гарантирует at-least-once доставку, что значит – потребитель может получить дубликаты одного и того же события. Причины: сбой Outbox-процессора после отправки, но до удаления записи (тогда при перезапуске событие отправится повторно), либо повторная доставка брокером (Kafka в некоторых режимах тоже может доставлять дважды). Следовательно, потребители событий должны обрабатывать их идемпотентно. Есть несколько стратегий борьбы с дубликатами:
- Хранение обработанных ID: потребитель может сохранять идентификаторы уже обработанных сообщений (например, в памяти или в своей БД). Каждый входящий event имеет уникальный
ID
(у нас этоOutboxEvent.id
или составной ключ, например, тип события + ID агрегата). Перед обработкой потребитель проверяет: нет ли этого ID в списке уже увиденных? Если есть – пропускает как дубликат. При этом важно, чтобы идентификатор события назначался на стороне отправителя и оставался постоянным при повторных отправках (у нас так и есть – ID outbox-записи генерируется единожды и отправляется с событием). - Идемпотентность по сущности: если событие несет информацию о некоторой сущности (например, заказе), потребитель может использовать уникальный ключ этой сущности для проверки. В нашем примере событие
OrderCreated
содержит orderId, и приемник (скажем, сервис доставки) может хранить у себя статус заказа. Если вдруг придет второй раз событие о том же заказе – он увидит, что доставка уже обработана для этого orderId, и проигнорирует дубликат. - Использование возможностей брокера: в Kafka, например, есть режим idempotent producer и транзакционные темы, обеспечивающие exactly-once processing внутри Kafka. В сочетании с логикой Outbox это позволяет снизить вероятность появления дублей. Кроме того, Kafka поддерживает компактификацию (log compaction) по ключу – если настроить Outbox-топик компактифицируемым и использовать уникальный ключ события, то со временем дубликаты с тем же ключом могут схлопнуться на стороне брокера. Однако, полагаться только на это не стоит – потребитель все равно должен быть готов к повторной обработке.
Таким образом, Outbox-паттерн перекладывает задачу устранения дублей на потребителей, что является обычной практикой в распределенных системах. К счастью, реализовать идемпотентность зачастую несложно (как минимум, хранением processed-IDs).
Ретраи и устойчивость к сбоям брокера/сети. Как мы видели, наш Outbox-процессор (и в Java, и в Go) спроектирован так, что в случае неудачи отправки не удаляет событие из базы. Это значит, что он будет пытаться снова и снова, пока не получит подтверждение. Такой подход гарантирует, что временные проблемы с сетью или недоступность брокера не приведут к потере данных — они просто задержат доставку. При восстановлении соединения процесс возобновит отправку с того места, где остановился. В сущности, мы получаем естественный механизм повторных попыток отправки.
Стоит контролировать бесконечные ретраи: если, например, событие не удается доставить очень долго (может, из-за бага или неверных данных), outbox-таблица будет накапливать «мёртвые» сообщения. Поэтому в продакшене обычно делают дополнительную защиту:
- Счетчик попыток и порог: например, поле
attempt_count
, которое увеличивается при каждой неудаче. Если счетчик превысил определенное число, можно либо перестать пытаться (требуется ручное вмешательство), либо переместить запись в отдельную DLQ (Dead Letter Queue) таблицу для проблемных сообщений и удалить из основного outbox. - Уведомление DevOps: если сообщение застряло и постоянно не отправляется, хорошо бы оповестить инженеров (метрики, алерты).
- Экспоненциальная задержка между ретраями: вместо опроса каждые N секунд можно увеличивать интервал для постоянно фейлящихся сообщений, чтобы не перегружать брокер.
Тем не менее, базовый сценарий с периодическим polling’ом и повторной отправкой уже существенно повышает надежность системы. Даже если сам сервис с Outbox упадет, ничего страшного – при новом запуске он прочитает таблицу и продолжит с места остановки. Данные в Outbox таблице хранятся в транзакционно-надежном хранилище (обычно реляционном), которое обычно реплицируется и бэкапится, поэтому шанс потери минимален.
Заключение
Transactional Outbox Pattern отлично подходит для систем, где требуется надежная передача событий или команд между сервисами при отсутствии глобальных транзакций. Подведем итог, когда стоит его использовать, и рассмотрим сильные и слабые стороны.
Когда использовать:
- Когда у вас микросервисная архитектура и сервисы обмениваются событиями (через Kafka, RabbitMQ, etc.), особенно в случаях, критичных к потере данных (финансовые транзакции, заказы, инвентарь и т.п.).
- Если прямое синхронное взаимодействие сервисов приводит к проблемам надежности или производительности, и вы переходите на event-driven коммуникацию.
- Когда необходима гарантия, что изменения в одном сервисе не будут потеряны при передаче другим, но при этом невозможно или нежелательно использовать распределенные транзакции (CAP: жертвуем мгновенной консистентностью ради доступности и устойчивости к сбоям).
- Если нужна атомарность между локальной операцией и генерацией события. Например, паттерн Outbox часто применяется при реализации Саг (Saga Pattern) и Domain Events – т.е. когда изменение агрегата порождает доменное событие.
Плюсы:
- Надежность и отсутствие потери данных. Паттерн Outbox гарантирует, что событие будет отправлено, если транзакция в базе данных состоялась, и не отправлено в противном случае. Все промежуточные данные сохраняются на диске (в БД), поэтому даже при сбоях сервисов или брокера система восстановится без потерь. Он решает проблему связи между сервисами: вам не нужно беспокоиться, что во время вызова другой сервис или брокер упадет – событие не исчезнет, а дождется подходящего момента.
- Локальная простота – избегаем 2PC. Вместо сложных протоколов согласования (двухфазный коммит) или распределенных транзакционных менеджеров, каждый сервис работает с своей базой, используя привычные локальные транзакции. Это упрощает архитектуру и повышает автономность сервисов.
- Масштабируемость и разгрузка. Отправка событий происходит асинхронно и может выполняться в отдельном процессе. Основной сервис (прием запросов) не задерживается на общение с брокером, что снижает время отклика для пользователя. Обработку Outbox можно масштабировать независимо (например, отдельный воркер пул), оптимизируя под нагрузку сообщений.
- Гибкость в реализации. Outbox – это концепция. Реализовать ее можно разными способами: как мы показывали – через периодический опрос таблицы (polling publisher) или с помощью механизмов чтения транзакционного лога БД (например, Debezium CDC – читает журнал транзакций и находит там записи Outbox для публикации). Второй подход еще более прозрачен для приложения – ему достаточно только записать событие, а внешняя система сама его отправит. Есть готовые фреймворки и библиотеки (Eventuate Tram, Outbox patterns для различных языков), упрощающие внедрение.
Минусы:
- Дополнительная сложность и инфраструктура. Паттерн требует вести отдельную таблицу (или коллекцию) Outbox. Если вначале у вас не было БД, а только брокер, вам придется внедрять базу данных для хранения Outbox, что добавляет зависимость. Нужно также писать код для обработки этой таблицы (или настраивать сторонний инструмент). То есть появляется дополнительный контур, который надо разрабатывать, конфигурировать и поддерживать.
- Отложенная (eventual) консистентность. Между изменением в исходном сервисе и получением события в целевом может быть небольшая задержка. В нашем примере мы проверяем Outbox раз в 5 секунд – за это время другой сервис еще может не знать о произошедшем. В большинстве случаев это приемлемо, но все же это не мгновенная консистентность, к этому надо быть готовым (данные расходятся на короткое время).
- Потенциальные дубликаты и сложность обработки на приемнике. Как мы обсуждали, at-least-once означает необходимость разбирать дубляжи событий. Потребители должны быть более “умными”: например, хранить processed-ID или делать проверки. Это немного усложняет логику на стороне потребителей (хотя практические инструменты для этого обычно уже есть, и сами Kafka-консюмеры часто способны работать идемпотентно при правильном дизайне).
- Рост объема базы, нагрузка на БД. Таблица Outbox может расти очень быстро, особенно если сервис генерирует много событий. Необходимо реализовывать механизмы очистки/архивации обработанных записей (например, регулярно удалять старые успешно отправленные события, либо партиционировать таблицу). Дополнительно, запись в Outbox – это лишняя операция ввода-вывода для каждого события. В высоконагруженных системах нужно убедиться, что БД выдержит такой поток. Иногда вместо реляционной БД для Outbox применяют быстрые хранилища (NoSQL, in-memory), но тогда теряется часть гарантированной надежности транзакций.
- Человеческий фактор. Паттерн Outbox требует дисциплины от разработчиков. Нужно не забыть каждый раз при внесении изменений, требующих события, записать это событие в Outbox. Если забыли – получим несогласованность (данные изменились, а другие сервисы не уведомлены). Здесь помогают шаблоны проектирования и обертки: можно реализовать декоратор репозитория или доменные события, которые автоматически пишутся в Outbox (чтобы разработчику не приходилось писать повторяющийся код вручную).
В заключение: Transactional Outbox – мощный паттерн, повышающий надежность микросервисов. Его стоит применять, когда ценой некоторого усложнения системы вы хотите добиться гарантированной доставки событий и консистентности данных между сервисами. В сценариях, где потеря сообщения неприемлема (например, обработка денег, заказов, ключевых метрик бизнеса), Outbox-паттерн часто оказывается лучшим решением. Он прекрасно вписывается в архитектуры с event-driven взаимодействием и помогает строить системы, устойчивые к сбоям отдельных компонентов. С другой стороны, не стоит применять его без необходимости: для простых случаев, где можно допустить редкую потерю события или где достаточно синхронного вызова, данное решение может быть избыточным. Всегда оценивайте плюсы и минусы, но иметь этот паттерн в своём арсенале крайне полезно – в критических местах он способен предотвратить серьёзные баги и потери данных.
You must be logged in to post a comment.