Part 9 - Switching to Kafka

In this part, we’ll replace the in-memory message broker configured earlier with Kafka — a production-ready inter-process message broker.
Running Kafka Locally
Update your docker-compose.yaml to include Kafka, Zookeeper, and a UI client. Make sure to also set up the kafka-net network:
<!-- prettier-ignore-start -->
networks:
  kafka-net:
    driver: bridge
services:
  postgres:
    image: postgres:14
    container_name: dugongjs_nestjs_tutorial_account_service_db
    restart: unless-stopped
    environment:
      POSTGRES_USER: postgres_user
      POSTGRES_PASSWORD: postgres_password
      POSTGRES_DB: account_service_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: dugongjs_nestjs_tutorial_zookeeper
    networks:
      - kafka-net
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: dugongjs_nestjs_tutorial_kafka
    networks:
      - kafka-net
    ports:
      - "9092:9092"
      - "9093:9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LISTENERS: PLAINTEXT_INTERNAL://0.0.0.0:29092,PLAINTEXT_C://0.0.0.0:9093,PLAINTEXT_L://0.0.0.0:9092,
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_INTERNAL://kafka:29092,PLAINTEXT_L://localhost:9092,PLAINTEXT_C://kafka:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT_INTERNAL:PLAINTEXT,PLAINTEXT_L:PLAINTEXT,PLAINTEXT_C:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: dugongjs_nestjs_tutorial_kafka_ui
    networks:
      - kafka-net
    ports:
      - "8080:8080"
    depends_on:
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9093
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
volumes:
  postgres_data:
<!-- prettier-ignore-end -->
Then run:
docker compose up
This starts
- Zookeeper (required by Kafka).
- Kafka
- Kafka UI (a web interface for monitoring Kafka)
Kafka will be accessible at localhost:9092. To view Kafka messages, navigate to http://localhost:8080 in your browser.
Installing Dependencies
Install the Kafka packages for DugongJS and NestJS:
npm install kafkajs @dugong/kafkajs @dugong/nestjs-kafkajs
Replacing the In-Memory Message Broker With Kafka
First, add the Kafka broker config to your .env file:
DB_HOST=localhost
DB_PORT=5432
DB_USERNAME=postgres_user
DB_PASSWORD=postgres_password
DB_NAME=account_service_db
KAFKA_BROKERS=localhost:9092
Then update your AppModule. Remove the MessageBrokerInMemoryModule and instead import KafkaModule and MessageBrokerKafkaJSModule from @dugongjs/nestjs-kafkajs:
import { EventIssuerModule } from "@dugongjs/nestjs";
import { KafkaModule, MessageBrokerKafkaJSModule } from "@dugongjs/nestjs-kafkajs";
import { AggregateQueryMicroserviceModule } from "@dugongjs/nestjs-microservice-query";
import { RepositoryTypeOrmModule, TransactionManagerTypeOrmModule } from "@dugongjs/nestjs-typeorm";
import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { BankAccountQueryModelReadRepositoryTypeOrmService } from "./bank-account/adapters/repository/bank-account-query-model-read-repository-typeorm.service.js";
import { BankAccountQueryModelWriteRepositoryTypeOrmService } from "./bank-account/adapters/repository/bank-account-query-model-write-repository-typeorm.service.js";
import { BankAccountQueryModelEntity } from "./bank-account/adapters/repository/bank-account-query-model.entity.js";
import { BankAccountCommandModule } from "./bank-account/application/command/bank-account.command.module.js";
import { BankAccountQueryModelProjectionConsumerModule } from "./bank-account/application/consumer/bank-account-query-model-projection-consumer.module.js";
import { BankAccountQueryModule } from "./bank-account/application/query/bank-account.query.module.js";
import { dataSourceOptions } from "./db/data-source-options.js";
@Module({
    imports: [
        TypeOrmModule.forRoot(dataSourceOptions),
        KafkaModule.forRoot({ brokers: process.env.KAFKA_BROKERS!.split(",") }),
        RepositoryTypeOrmModule.forRoot(),
        TransactionManagerTypeOrmModule.forRoot(),
        EventIssuerModule.forRoot({ currentOrigin: "BankingContext-AccountService" }),
        AggregateQueryMicroserviceModule,
        MessageBrokerKafkaJSModule.forRoot(),
        BankAccountCommandModule,
        BankAccountQueryModelProjectionConsumerModule.register({
            repository: BankAccountQueryModelWriteRepositoryTypeOrmService
        }),
        BankAccountQueryModule.register({
            module: { imports: [TypeOrmModule.forFeature([BankAccountQueryModelEntity])] },
            repository: BankAccountQueryModelReadRepositoryTypeOrmService
        })
    ]
})
export class AppModule {}
Restart the application to apply the changes.
Testing the Adapter
Use curl or Postman to invoke some operations — open new accounts, deposit and withdraw money, or close accounts.
Once events are generated, open http://localhost:8080 and go to the Topics section in Kafka UI. You should see a topic named something like:
banking-context-account-service-bank-account
This name is automatically generated from the configured origin and the name of the aggregate.
Select the topic and go to the Messages tab. Here, you’ll see every published domain event. Each message includes:
- Key: the aggregate ID.
- Value: the event payload.
- Headers: remaining event metadata.
Kafka UI is a good tool for monitoring and debugging Kafka related issues. However, its usefulness for debugging domain events and aggregates is limited similarly to database clients. The Dugong CLI configured in part 5 is generally a better fit for understanding the behavior of domain events and aggregates.