Part 11 - Restoring Transactionality with the Outbox Pattern

In the previous part, we demonstrated a critical flaw: after switching from the in-memory message broker to Kafka, our system lost its transactionality. Specifically, we observed that domain events could be published to Kafka even if the corresponding transaction on the command side failed — leading to potential divergence between services.
In this part, we will solve this using the outbox pattern. We will implement an asymmetric messaging strategy where KafkaJS is still used to set up consumers, but an outbox database table will take on the role of the producer.
Background
The outbox pattern is a widely used approach to ensure reliable message publishing in the face of application errors or database and network failures. Instead of sending messages directly to a message broker (which happens outside the boundaries of a database transaction), the message is first written to an “outbox” table in the same database, as part of the current transaction. If the transaction commits, the message is persisted along with any other changes.
A separate message relay process (commonly called an “outbox processor” or “log miner”) then picks up these messages and forwards them to the actual message broker — ensuring that only successfully committed messages are ever published.
This decoupling guarantees that if a transaction fails, no message will be published, because nothing is ever written to the outbox table.
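To make this concrete, here is a minimal sketch of the idea as a plain TypeORM transaction. The entity names, columns, and topic value are hypothetical and for illustration only; DugongJS ships its own outbox entity and producer, which we will wire up later in this part:
import { DataSource } from "typeorm";
// Hypothetical entities, used purely for illustration.
import { AccountEntity } from "./account.entity.js";
import { OutboxMessageEntity } from "./outbox-message.entity.js";
export async function openAccount(dataSource: DataSource, accountId: string): Promise<void> {
    // Everything inside the callback runs in a single database transaction.
    await dataSource.transaction(async (manager) => {
        // 1. Persist the state change (or the domain event, in an event-sourced system).
        await manager.insert(AccountEntity, { id: accountId, balance: 0 });
        // 2. Write the outgoing message to the outbox table in the SAME transaction.
        //    If the transaction rolls back, this row is never committed, so the
        //    message relay can never publish it.
        await manager.insert(OutboxMessageEntity, {
            aggregateId: accountId,
            channelId: "bank-account-events", // placeholder topic name
            type: "AccountOpened",
            payload: JSON.stringify({ accountId })
        });
    });
}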
To learn more about the theory and practical implementations of the outbox pattern, check out this article by Gunnar Morling.
Setting Up Kafka Connect
We can integrate the outbox pattern with minimal code changes, but it does require a bit of additional infrastructure, as we will be setting up Kafka Connect and Debezium. Let’s begin by updating our docker-compose.yaml:
Docker Compose Changes
We’ll make three important updates:
- Add the PostgreSQL container to the same Docker network as Kafka.
- Adjust PostgreSQL settings to enable logical replication.
- Add a Kafka Connect container that runs Debezium.
networks:
  kafka-net:
    driver: bridge
services:
  postgres:
    image: postgres:14
    container_name: dugongjs_nestjs_tutorial_account_service_db
    restart: unless-stopped
    networks:
      - kafka-net
    environment:
      POSTGRES_USER: postgres_user
      POSTGRES_PASSWORD: postgres_password
      POSTGRES_DB: account_service_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_wal_senders=1"
      - "-c"
      - "max_replication_slots=1"
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: dugongjs_nestjs_tutorial_zookeeper
    networks:
      - kafka-net
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: dugongjs_nestjs_tutorial_kafka
    networks:
      - kafka-net
    ports:
      - "9092:9092"
      - "9093:9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT_INTERNAL://0.0.0.0:29092,PLAINTEXT_C://0.0.0.0:9093,PLAINTEXT_L://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_INTERNAL://kafka:29092,PLAINTEXT_L://localhost:9092,PLAINTEXT_C://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT_INTERNAL:PLAINTEXT,PLAINTEXT_L:PLAINTEXT,PLAINTEXT_C:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
  kafka-connect:
    image: debezium/connect:2.4
    container_name: dugongjs_nestjs_tutorial_kafka_connect
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9093
      GROUP_ID: kafka-connect-group
      CONFIG_STORAGE_TOPIC: kafka-connect-configs
      OFFSET_STORAGE_TOPIC: kafka-connect-offsets
      STATUS_STORAGE_TOPIC: kafka-connect-status
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_PLUGIN_PATH: /kafka/connect,/debezium/connectors
    networks:
      - kafka-net
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: dugongjs_nestjs_tutorial_kafka_ui
    networks:
      - kafka-net
    ports:
      - "8080:8080"
    depends_on:
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
volumes:
  postgres_data:
Setting Up Debezium
Debezium is a change data capture (CDC) tool that integrates with Kafka Connect. We’ll configure it to monitor the outbox table and emit messages based on committed rows.
Create a connector.config.json file with the following contents:
{
    "name": "outbox-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "plugin.name": "pgoutput",
        "topic.prefix": "outbox",
        "table.include.list": "public.outbox",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres_user",
        "database.password": "postgres_password",
        "database.dbname": "account_service_db",
        "transforms": "outbox",
        "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
        "transforms.outbox.route.by.field": "channelId",
        "transforms.outbox.route.topic.replacement": "${routedByValue}",
        "transforms.outbox.table.field.event.id": "id",
        "transforms.outbox.table.field.event.key": "aggregateId",
        "transforms.outbox.table.field.event.payload": "payload",
        "transforms.outbox.table.fields.additional.placement": "origin:header,aggregateType:header,type:header,version:header,sequenceNumber:header,timestamp:header,correlationId:header,triggeredByUserId:header,triggeredByEventId:header,metadata:header",
        "transforms.outbox.table.expand.json.payload": "true",
        "transforms.outbox.table.json.payload.null.behavior": "ignore",
        "transforms.outbox.route.tombstone.on.empty.payload": "true",
        "key.converter": "io.debezium.converters.BinaryDataConverter",
        "key.converter.delegate.converter.type": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.delegate.converter.type.schemas.enable": "false",
        "value.converter": "io.debezium.converters.BinaryDataConverter",
        "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.delegate.converter.type.schemas.enable": "false"
    }
}
Breakdown of the Debezium Configuration
Debezium is a complex platform with many configurable parameters. Here’s a brief summary of what each part of the configuration does:
1. Plugin and Table Selection
{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "pgoutput",
    "topic.prefix": "outbox",
    "table.include.list": "public.outbox"
}
- connector.class: Tells Kafka Connect to use the PostgreSQL connector from Debezium.
- plugin.name: Specifies the logical decoding output plugin used by PostgreSQL. pgoutput is the default used for logical replication.
- topic.prefix: Normally controls the prefix of the Kafka topics Debezium generates. Our outbox transformation overrides the topic routing, so the value is effectively unused, but Debezium still requires it as part of its configuration validation.
- table.include.list: Tells Debezium which tables to monitor. Here, it watches only the outbox table in the public schema.
2. Database Connection
{
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres_user",
    "database.password": "postgres_password",
    "database.dbname": "account_service_db"
}
These settings configure the connection to our PostgreSQL database.
In production, use environment variables or secret stores to manage credentials securely.
We are using the same credentials for both the NestJS application and Debezium. In production, this should be a dedicated user with only the minimal necessary permissions. See the Debezium docs on PostgreSQL permissions.
3. Outbox Plugin Settings
{
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "channelId",
    "transforms.outbox.route.topic.replacement": "${routedByValue}",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregateId",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.fields.additional.placement": "origin:header,aggregateType:header,type:header,version:header,sequenceNumber:header,timestamp:header,correlationId:header,triggeredByUserId:header,triggeredByEventId:header,metadata:header",
    "transforms.outbox.table.expand.json.payload": "true",
    "transforms.outbox.table.json.payload.null.behavior": "ignore",
    "transforms.outbox.route.tombstone.on.empty.payload": "true"
}
This block configures Debezium to use the Outbox Event Router transformation. This is the crucial part that handles the transformation between our outbox table and Kafka messages.
Here’s what each setting does:
- transforms: Enables a named transformation — here called "outbox".
- transforms.outbox.type: Specifies that this transformation uses Debezium's EventRouter, which rewrites database rows into structured Kafka messages.
- transforms.outbox.route.by.field: Specifies the database column used to dynamically determine the target Kafka topic. We're using channelId for this.
- transforms.outbox.route.topic.replacement: Replaces the Kafka topic name with the value in channelId.
- transforms.outbox.table.field.event.id: The column storing the unique event ID.
- transforms.outbox.table.field.event.key: The column used as the Kafka message key — in this case, aggregateId.
- transforms.outbox.table.field.event.payload: The main payload column that contains the domain event data in serialized JSON.
- transforms.outbox.table.fields.additional.placement: Lists other fields to include in the Kafka message and where to place them — in this case, all as headers.
- transforms.outbox.table.expand.json.payload: Tells Debezium to expand the JSON payload into fields.
The configuration shown here exactly mimics the message producer adapter from @dugongjs/kafkajs. Messages emitted to Kafka through the outbox should therefore be indistinguishable from the ones emitted in the previous parts.
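To illustrate the resulting message shape, here is a rough, hypothetical sketch of how a plain KafkaJS consumer would observe a message routed through the outbox. The topic name is a placeholder; in practice it is whatever value the channelId column holds:
import { Kafka } from "kafkajs";
const kafka = new Kafka({ brokers: ["localhost:9092"] });
const consumer = kafka.consumer({ groupId: "outbox-inspection" });
await consumer.connect();
// The EventRouter routes each outbox row to the topic named by its channelId column.
await consumer.subscribe({ topic: "bank-account-events", fromBeginning: true });
await consumer.run({
    eachMessage: async ({ message }) => {
        // Key: the aggregateId column (table.field.event.key).
        console.log("key:", message.key?.toString());
        // Value: the serialized JSON from the payload column (table.field.event.payload).
        console.log("payload:", JSON.parse(message.value!.toString()));
        // Headers: the columns listed in table.fields.additional.placement.
        console.log("type:", message.headers?.type?.toString());
        console.log("sequenceNumber:", message.headers?.sequenceNumber?.toString());
    }
});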
4. Converters
{
    "key.converter": "io.debezium.converters.BinaryDataConverter",
    "key.converter.delegate.converter.type": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.delegate.converter.type.schemas.enable": "false",
    "value.converter": "io.debezium.converters.BinaryDataConverter",
    "value.converter.delegate.converter.type": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.delegate.converter.type.schemas.enable": "false"
}
This section configures how Debezium serializes the Kafka message keys and values.
- key.converter and value.converter are both set to BinaryDataConverter, which allows us to delegate the actual conversion to a simpler underlying converter.
- For the key, we use a plain string converter.
- For the value, we use a JSON converter.
In both cases, we explicitly disable schema inclusion (schemas.enable: false). Without this setting, the converter would embed the full message schema along with each message. While this can be useful in some environments (such as those using schema registries), DugongJS does not use or require this metadata — so we disable it to reduce message size and simplify parsing.
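To illustrate the difference, assuming a simple AccountOpened payload, the two modes produce roughly the following shapes (schematic examples, not verbatim Connect output):
// With schemas.enable: true, the JsonConverter wraps each value in a schema envelope:
const valueWithSchema = {
    schema: {
        type: "struct",
        fields: [{ field: "accountId", type: "string", optional: false }]
    },
    payload: { accountId: "123" }
};
// With schemas.enable: false, only the payload itself is written:
const valueWithoutSchema = { accountId: "123" };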
Creating a Bootstrap Script
Up until now, we’ve used docker compose up to run our containers. However, to include Debezium, we need to make a small adjustment. Kafka Connect exposes a REST API on localhost:8083, which we must call to register the Debezium connector. To automate this process, we’ll create the following bootstrap.sh script:
#!/bin/bash
set -euo pipefail
CONNECTOR_NAME="outbox-connector"
CONFIG_FILE="connector.config.json"
KAFKA_CONNECT_URL="http://localhost:8083/connectors"
echo "🛠️  Starting Docker Compose..."
docker compose up -d
echo "⏳ Waiting for Kafka Connect to be available..."
until curl -s "${KAFKA_CONNECT_URL}" > /dev/null; do
    sleep 2
done
echo "✅ Kafka Connect is up!"
if [[ ! -f "$CONFIG_FILE" ]]; then
    echo "❌ Connector config file '$CONFIG_FILE' not found!"
    exit 1
fi
echo "🚀 Submitting connector config..."
curl -s -X POST \
    -H "Content-Type: application/json" \
    --data @"$CONFIG_FILE" \
    "${KAFKA_CONNECT_URL}"
echo "✅ Debezium connector '${CONNECTOR_NAME}' created."
Make the script executable:
chmod +x ./bootstrap.sh
Now, instead of running:
docker compose up
Use the script:
./bootstrap.sh
To stop the running containers:
docker compose down
Updating the Modules
Supporting the outbox pattern requires minimal changes to our source code. We do not need to install any additional dependencies, as the @dugongjs/typeorm and @dugongjs/nestjs-typeorm packages have everything we need. The first update is to our data-source-options.ts:
import { ConsumedMessageEntity, DomainEventEntity, OutboxEntity, SnapshotEntity } from "@dugongjs/typeorm";
import type { DataSourceOptions } from "typeorm";
import { BankAccountQueryModelEntity } from "../bank-account/adapters/repository/bank-account-query-model.entity.js";
export const dataSourceOptions: DataSourceOptions = {
    type: "postgres",
    host: process.env.DB_HOST,
    port: Number(process.env.DB_PORT),
    username: process.env.DB_USERNAME,
    password: process.env.DB_PASSWORD,
    database: process.env.DB_NAME,
    synchronize: true,
    logging: false,
    entities: [
        DomainEventEntity,
        SnapshotEntity,
        ConsumedMessageEntity,
        OutboxEntity,
        BankAccountQueryModelEntity
    ]
};
Here we’ve added the OutboxEntity, which introduces the outbox table to our database schema.
Next, update AppModule:
import { EventIssuerModule } from "@dugongjs/nestjs";
import { KafkaModule, MessageConsumerKafkaJSModule } from "@dugongjs/nestjs-kafkajs";
import { AggregateQueryMicroserviceModule } from "@dugongjs/nestjs-microservice-query";
import {
    OutboxMessageProducerTypeOrmModule,
    RepositoryTypeOrmModule,
    TransactionManagerTypeOrmModule
} from "@dugongjs/nestjs-typeorm";
import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { BankAccountQueryModelReadRepositoryTypeOrmService } from "./bank-account/adapters/repository/bank-account-query-model-read-repository-typeorm.service.js";
import { BankAccountQueryModelWriteRepositoryTypeOrmService } from "./bank-account/adapters/repository/bank-account-query-model-write-repository-typeorm.service.js";
import { BankAccountQueryModelEntity } from "./bank-account/adapters/repository/bank-account-query-model.entity.js";
import { BankAccountCommandModule } from "./bank-account/application/command/bank-account.command.module.js";
import { BankAccountQueryModelProjectionConsumerModule } from "./bank-account/application/consumer/bank-account-query-model-projection-consumer.module.js";
import { BankAccountQueryModule } from "./bank-account/application/query/bank-account.query.module.js";
import { dataSourceOptions } from "./db/data-source-options.js";
@Module({
    imports: [
        TypeOrmModule.forRoot(dataSourceOptions),
        KafkaModule.forRoot({ brokers: process.env.KAFKA_BROKERS!.split(",") }),
        RepositoryTypeOrmModule.forRoot(),
        TransactionManagerTypeOrmModule.forRoot(),
        EventIssuerModule.forRoot({ currentOrigin: "BankingContext-AccountService" }),
        AggregateQueryMicroserviceModule,
        MessageConsumerKafkaJSModule.forRoot(),
        OutboxMessageProducerTypeOrmModule.forRoot(),
        BankAccountCommandModule,
        BankAccountQueryModelProjectionConsumerModule.register({
            repository: BankAccountQueryModelWriteRepositoryTypeOrmService
        }),
        BankAccountQueryModule.register({
            module: { imports: [TypeOrmModule.forFeature([BankAccountQueryModelEntity])] },
            repository: BankAccountQueryModelReadRepositoryTypeOrmService
        })
    ]
})
export class AppModule {}
Two important changes have been made:
- Switched from MessageBrokerKafkaJSModule to MessageConsumerKafkaJSModule: This module only registers Kafka as a consumer. Previously, we used MessageBrokerKafkaJSModule, which is a wrapper around both MessageConsumerKafkaJSModule and MessageProducerKafkaJSModule. Since message publishing is now handled via the outbox, we no longer need the Kafka producer.
- Added OutboxMessageProducerTypeOrmModule: This enables DugongJS to write messages to the outbox table during the same database transaction as the command handling — ensuring transactionality.
In the next part we will once again evaluate the transactionality of the system.