Part 6 - Adding a Message Broker
So far, we've focused on the command side of the system. However, we still lack a reliable way to query bank accounts. In this part and the next, we'll implement the query side of CQRS using a message broker to keep read models in sync with domain events.
DugongJS is built on the ports-and-adapters architecture. While it's not strictly necessary to use this pattern in your application, we'll stick to it here to illustrate how it works in practice. This allows us to defer infrastructure decisions until the composition stage in AppModule.
Project Structure Update
We'll start by adding two new folders to the bank-account feature module: ports and adapters. Each folder will contain a repository subfolder. We'll also add a consumer folder to the application folder, where message consumption will be handled:
📁 bank-account
├─ 📁 adapters
│  └─ 📁 repository
│     ├─ 📄 bank-account-query-model.entity.ts
│     └─ 📄 bank-account-query-model-write-repository-typeorm.service.ts
├─ 📁 application
│  ├─ 📁 command
│  ├─ 📁 consumer
│  │  ├─ 📄 bank-account-query-model-projection-handler.service.ts
│  │  └─ 📄 bank-account-query-model-projection-consumer.module.ts
│  ├─ 📁 dtos
│  └─ 📁 query
├─ 📁 domain
└─ 📁 ports
   └─ 📁 repository
      ├─ 📄 bank-account-query-model.ts
      └─ 📄 i-bank-account-query-model-write-repository.ts
Defining the Bank Account Query Model
In the ports/repository folder, we define a basic type that represents the query model:
export type BankAccountQueryModel = {
    id: string;
    owner: string;
    balance: number;
};
Next, we define a TypeORM entity adapter for this port:
import { Column, Entity, PrimaryColumn } from "typeorm";
import { BankAccountQueryModel } from "../../ports/repository/bank-account-query-model.js";

@Entity("bank_account_query_model")
export class BankAccountQueryModelEntity implements BankAccountQueryModel {
    @PrimaryColumn("uuid")
    public id: string;

    @Column()
    public owner: string;

    @Column()
    public balance: number;
}
Make sure to include this entity in data-source-options.ts:
import { ConsumedMessageEntity, DomainEventEntity, SnapshotEntity } from "@dugongjs/typeorm";
import type { DataSourceOptions } from "typeorm";
import { BankAccountQueryModelEntity } from "../bank-account/adapters/repository/bank-account-query-model.entity.js";

export const dataSourceOptions: DataSourceOptions = {
    type: "postgres",
    host: process.env.DB_HOST,
    port: Number(process.env.DB_PORT),
    username: process.env.DB_USERNAME,
    password: process.env.DB_PASSWORD,
    database: process.env.DB_NAME,
    synchronize: true,
    logging: false,
    entities: [
        DomainEventEntity,
        SnapshotEntity,
        ConsumedMessageEntity,
        BankAccountQueryModelEntity
    ]
};
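One detail worth keeping in mind: Number(process.env.DB_PORT) evaluates to NaN when the variable is unset, which may only surface later as a connection error. A small helper along the following lines could fail fast instead. This is only a sketch; readPort, the Env type, and the 5432 fallback are assumptions made for illustration and are not part of the tutorial's code.

```typescript
// Hypothetical helper: parse a TCP port from an environment-like record,
// failing fast on invalid input instead of silently producing NaN.
type Env = Record<string, string | undefined>;

function readPort(env: Env, key: string, fallback: number): number {
    const raw = env[key];

    // Treat a missing or blank variable as "use the fallback".
    if (raw === undefined || raw.trim() === "") {
        return fallback;
    }

    const port = Number(raw);
    if (!Number.isInteger(port) || port < 1 || port > 65535) {
        throw new Error(`${key} must be an integer between 1 and 65535, got "${raw}"`);
    }

    return port;
}

// In a Node.js application, process.env would be passed as the first argument.
const port = readPort({ DB_PORT: "5433" }, "DB_PORT", 5432);
```

With a helper like this, the port line in dataSourceOptions could read the value through readPort rather than calling Number directly.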
Adding an In-Memory Message Broker
To keep our query models in sync with domain events, we'll introduce a message broker. To begin, we'll use an in-memory message broker suitable for local testing and demos.
The in-memory broker is not suitable for production environments. In later steps, we'll show how to replace it with Kafka or another production-ready transport.
Enable it in AppModule by importing MessageBrokerInMemoryModule and calling forRoot():
import { EventIssuerModule, MessageBrokerInMemoryModule } from "@dugongjs/nestjs";
import { AggregateQueryMicroserviceModule } from "@dugongjs/nestjs-microservice-query";
import { RepositoryTypeOrmModule, TransactionManagerTypeOrmModule } from "@dugongjs/nestjs-typeorm";
import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { BankAccountCommandModule } from "./bank-account/application/command/bank-account.command.module.js";
import { dataSourceOptions } from "./db/data-source-options.js";

@Module({
    imports: [
        TypeOrmModule.forRoot(dataSourceOptions),
        RepositoryTypeOrmModule.forRoot(),
        TransactionManagerTypeOrmModule.forRoot(),
        EventIssuerModule.forRoot({ currentOrigin: "BankingContext-AccountService" }),
        AggregateQueryMicroserviceModule,
        MessageBrokerInMemoryModule.forRoot(),
        BankAccountCommandModule
    ]
})
export class AppModule {}
With this, whenever domain events are committed, they'll also be published to the in-memory message broker as messages.
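To build some intuition for what an in-memory broker does, here is a toy publish/subscribe sketch. It is not DugongJS's implementation; InMemoryBroker, Message, the topic name, and the payload shape are all hypothetical.

```typescript
// A toy in-memory publish/subscribe broker (hypothetical, for illustration only).
type Message = { topic: string; payload: unknown };
type Listener = (message: Message) => void;

class InMemoryBroker {
    // Listeners are kept in process memory, grouped by topic.
    private readonly listeners = new Map<string, Listener[]>();

    public subscribe(topic: string, listener: Listener): void {
        const existing = this.listeners.get(topic) ?? [];
        this.listeners.set(topic, [...existing, listener]);
    }

    public publish(message: Message): void {
        // Deliver synchronously to every listener registered for the topic.
        for (const listener of this.listeners.get(message.topic) ?? []) {
            listener(message);
        }
    }
}

const broker = new InMemoryBroker();
const received: Message[] = [];

broker.subscribe("bank-account", (message) => received.push(message));
broker.publish({ topic: "bank-account", payload: { type: "AccountOpened", owner: "Alice" } });
```

Because a broker of this kind keeps everything in a single process's memory, messages do not survive a restart and cannot be shared between service instances, which is the usual reason such brokers are limited to local testing and demos.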
Updating Query Models
To respond to these messages and update the query model, we define a new port and adapter:
📁 bank-account
├─ 📁 adapters
│  └─ 📁 repository
│     └─ 📄 bank-account-query-model-write-repository-typeorm.service.ts
└─ 📁 ports
   └─ 📁 repository
      └─ 📄 i-bank-account-query-model-write-repository.ts
First, we define the port. Notice that both methods receive a transactionContext as their first argument. This is because they will always be called from within a transaction:
import { TransactionContext } from "@dugongjs/core";
import { BankAccount } from "../../domain/bank-account.aggregate.js";

export interface IBankAccountQueryModelWriteRepository {
    update(transactionContext: TransactionContext, bankAccount: BankAccount): Promise<void>;
    delete(transactionContext: TransactionContext, bankAccountId: string): Promise<void>;
}

export const IBankAccountQueryModelWriteRepository = "IBankAccountQueryModelWriteRepository" as const;
Note that we define both an interface and a const named IBankAccountQueryModelWriteRepository. This is valid in TypeScript because types and values live in separate declaration spaces. The const serves as an injection token, which allows us to inject an implementation of the interface in the following way:
@Inject(IBankAccountQueryModelWriteRepository) repository: IBankAccountQueryModelWriteRepository
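The naming trick above can be seen in isolation in the following sketch, which uses a plain Map as a stand-in for NestJS's injector. IGreeter, EnglishGreeter, and the container are hypothetical names chosen for illustration.

```typescript
// The interface lives in the type space...
interface IGreeter {
    greet(name: string): string;
}

// ...while the constant with the same name lives in the value space.
const IGreeter = "IGreeter" as const;

class EnglishGreeter implements IGreeter {
    public greet(name: string): string {
        return `Hello, ${name}!`;
    }
}

// Hypothetical minimal container keyed by string tokens (not NestJS's injector).
const container = new Map<string, unknown>();

// Here IGreeter is used as a value: the injection token.
container.set(IGreeter, new EnglishGreeter());

// Here IGreeter is used first as a value (the key) and then as a type (the cast).
const greeter = container.get(IGreeter) as IGreeter;
const greeting = greeter.greet("Alice");
```

The compiler resolves each use of the name from its position: in a type annotation or cast it refers to the interface, and in an expression it refers to the string constant.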
Next, we implement the adapter using TypeORM:
import { Injectable } from "@nestjs/common";
import { EntityManager } from "typeorm";
import { BankAccount } from "../../domain/bank-account.aggregate.js";
import { IBankAccountQueryModelWriteRepository } from "../../ports/repository/i-bank-account-query-model-write-repository.js";
import { BankAccountQueryModelEntity } from "./bank-account-query-model.entity.js";

@Injectable()
export class BankAccountQueryModelWriteRepositoryTypeOrmService implements IBankAccountQueryModelWriteRepository {
    // This adapter treats the transaction context as a TypeORM EntityManager.
    public async update(transactionContext: EntityManager, bankAccount: BankAccount): Promise<void> {
        const repository = transactionContext.getRepository(BankAccountQueryModelEntity);

        // Copy the aggregate's current state into the flat query model.
        const queryModel = new BankAccountQueryModelEntity();
        queryModel.id = bankAccount.getId();
        queryModel.owner = bankAccount.getOwner();
        queryModel.balance = bankAccount.getBalance();

        // save() inserts the row, or updates it if the primary key already exists.
        await repository.save(queryModel);
    }

    public async delete(transactionContext: EntityManager, bankAccountId: string): Promise<void> {
        const repository = transactionContext.getRepository(BankAccountQueryModelEntity);

        await repository.delete({ id: bankAccountId });
    }
}
Handling Projections
Next, we create a projection handler that listens for messages and calls our write repository by implementing the IQueryModelProjectionHandler interface from @dugongjs/nestjs:
import { TransactionContext } from "@dugongjs/core";
import { IQueryModelProjectionHandler } from "@dugongjs/nestjs";
import { Inject, Injectable } from "@nestjs/common";
import { BankAccount } from "../../domain/bank-account.aggregate.js";
import { IBankAccountQueryModelWriteRepository } from "../../ports/repository/i-bank-account-query-model-write-repository.js";

@Injectable()
export class BankAccountQueryModelProjectionHandlerService implements IQueryModelProjectionHandler<typeof BankAccount> {
    constructor(
        @Inject(IBankAccountQueryModelWriteRepository)
        private readonly queryModelRepository: IBankAccountQueryModelWriteRepository
    ) {}

    public getAggregateClass(): typeof BankAccount {
        return BankAccount;
    }

    public async updateQueryModel(transactionContext: TransactionContext, aggregate: BankAccount): Promise<void> {
        return this.queryModelRepository.update(transactionContext, aggregate);
    }

    public async deleteQueryModel(transactionContext: TransactionContext, aggregateId: string): Promise<void> {
        return this.queryModelRepository.delete(transactionContext, aggregateId);
    }
}
By implementing this interface, query models will automatically be updated and deleted when messages are received.
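As a rough mental model of what happens behind the scenes, a projection consumer might route each rebuilt aggregate to one of the two handler methods, as sketched below. This is an assumption about the general pattern, not DugongJS's actual code: AggregateLike, isDeleted, ProjectionHandlerLike, and project are hypothetical, and the handler is synchronous and takes no transaction context to keep the sketch short.

```typescript
// Hypothetical minimal view of an aggregate, as seen by the consumer.
interface AggregateLike {
    getId(): string;
    isDeleted(): boolean;
}

// Simplified handler shape: synchronous, without a transaction context.
interface ProjectionHandlerLike<TAggregate extends AggregateLike> {
    updateQueryModel(aggregate: TAggregate): void;
    deleteQueryModel(aggregateId: string): void;
}

// After a message arrives and the aggregate has been rebuilt,
// pick the handler method that matches the aggregate's state.
function project<TAggregate extends AggregateLike>(
    handler: ProjectionHandlerLike<TAggregate>,
    aggregate: TAggregate
): void {
    if (aggregate.isDeleted()) {
        handler.deleteQueryModel(aggregate.getId());
    } else {
        handler.updateQueryModel(aggregate);
    }
}

// A recording handler makes the routing visible.
const calls: string[] = [];
const recordingHandler: ProjectionHandlerLike<AggregateLike> = {
    updateQueryModel: (aggregate) => calls.push(`update:${aggregate.getId()}`),
    deleteQueryModel: (aggregateId) => calls.push(`delete:${aggregateId}`)
};

project(recordingHandler, { getId: () => "acc-1", isDeleted: () => false });
project(recordingHandler, { getId: () => "acc-1", isDeleted: () => true });
```

The point of the split is that the handler only describes how to write or remove a query model; deciding when to do so is left to the consumer.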
Registering the Consumer Module
Next, we'll create a module that wraps the built-in QueryModelProjectionConsumerModule:
import { Constructor } from "@dugongjs/core";
import { QueryModelProjectionConsumerModule, QueryModelProjectionConsumerModuleOptions } from "@dugongjs/nestjs";
import { DynamicModule, Module } from "@nestjs/common";
import { IBankAccountQueryModelWriteRepository } from "../../ports/repository/i-bank-account-query-model-write-repository.js";
import { BankAccountQueryModelProjectionHandlerService } from "./bank-account-query-model-projection-handler.service.js";

// Callers supply a repository class instead of choosing the projection handler themselves.
export type BankAccountQueryModelProjectionConsumerModuleOptions = Omit<
    QueryModelProjectionConsumerModuleOptions,
    "queryModelProjectionHandler"
> & {
    repository: Constructor<IBankAccountQueryModelWriteRepository>;
};

@Module({})
export class BankAccountQueryModelProjectionConsumerModule {
    public static register(options: BankAccountQueryModelProjectionConsumerModuleOptions): DynamicModule {
        return QueryModelProjectionConsumerModule.register({
            ...options,
            queryModelProjectionHandler: BankAccountQueryModelProjectionHandlerService,
            module: {
                imports: options.module?.imports,
                providers: [
                    ...(options.module?.providers ?? []),
                    // Bind the port's injection token to the chosen adapter class.
                    {
                        provide: IBankAccountQueryModelWriteRepository,
                        useClass: options.repository
                    }
                ]
            }
        });
    }
}
This module might look complex, but it's actually a simple wrapper around QueryModelProjectionConsumerModule from @dugongjs/nestjs. It wires up the BankAccountQueryModelProjectionHandlerService as the projection handler and ensures the appropriate repository is registered as a provider. It also adds optional support for injecting additional modules or providers, which may be required depending on the dependencies of the repository.
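The option-wrapping pattern used by this module can be reduced to plain TypeScript, as in the sketch below. All names here (BaseOptions, WrapperOptions, FixedHandler, SomeRepository, toBaseOptions, and the "IRepository" token) are hypothetical stand-ins for the library's types; only the shape of the transformation is the point.

```typescript
// Hypothetical stand-ins for the library's option types.
type Ctor = new () => object;
type Provider = { provide: string; useClass: Ctor };
type BaseOptions = {
    handler: Ctor;
    module?: { imports?: unknown[]; providers?: Provider[] };
};

class FixedHandler {}
class SomeRepository {}

// Callers may not choose the handler, but must supply a repository class.
type WrapperOptions = Omit<BaseOptions, "handler"> & { repository: Ctor };

function toBaseOptions(options: WrapperOptions): BaseOptions {
    const { repository, ...rest } = options;

    return {
        ...rest,
        // The wrapper fixes the handler on the caller's behalf.
        handler: FixedHandler,
        module: {
            imports: rest.module?.imports ?? [],
            providers: [
                // Keep any providers the caller passed in...
                ...(rest.module?.providers ?? []),
                // ...and append the binding from token to repository class.
                { provide: "IRepository", useClass: repository }
            ]
        }
    };
}

const resolved = toBaseOptions({ repository: SomeRepository });
```

Omitting the handler key from the public options type means a caller cannot accidentally override the handler, while the repository binding stays a one-line choice at the call site.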
Finally, we'll register this module in AppModule:
import { EventIssuerModule, MessageBrokerInMemoryModule } from "@dugongjs/nestjs";
import { AggregateQueryMicroserviceModule } from "@dugongjs/nestjs-microservice-query";
import { RepositoryTypeOrmModule, TransactionManagerTypeOrmModule } from "@dugongjs/nestjs-typeorm";
import { Module } from "@nestjs/common";
import { TypeOrmModule } from "@nestjs/typeorm";
import { BankAccountQueryModelWriteRepositoryTypeOrmService } from "./bank-account/adapters/repository/bank-account-query-model-write-repository-typeorm.service.js";
import { BankAccountCommandModule } from "./bank-account/application/command/bank-account.command.module.js";
import { BankAccountQueryModelProjectionConsumerModule } from "./bank-account/application/consumer/bank-account-query-model-projection-consumer.module.js";
import { dataSourceOptions } from "./db/data-source-options.js";

@Module({
    imports: [
        TypeOrmModule.forRoot(dataSourceOptions),
        RepositoryTypeOrmModule.forRoot(),
        TransactionManagerTypeOrmModule.forRoot(),
        EventIssuerModule.forRoot({ currentOrigin: "BankingContext-AccountService" }),
        AggregateQueryMicroserviceModule,
        MessageBrokerInMemoryModule.forRoot(),
        BankAccountCommandModule,
        BankAccountQueryModelProjectionConsumerModule.register({
            repository: BankAccountQueryModelWriteRepositoryTypeOrmService
        })
    ]
})
export class AppModule {}
Because we are using the ports-and-adapters architecture, this is where we inject the TypeORM implementation. One of the key benefits of this architecture is that if we ever want to switch out the infrastructure or dependencies (e.g. using Prisma or Drizzle instead of TypeORM), we can do so simply by writing new adapters and updating the injected services in AppModule.
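To make the adapter swap concrete, the sketch below shows an in-memory adapter for a simplified version of the write-repository port, of the kind that might be used in unit tests. It is an illustration under assumptions: the port here is synchronous and omits the transaction context, and QueryModelWriteRepository, InMemoryQueryModelWriteRepository, and applyDeposit are hypothetical names.

```typescript
type BankAccountQueryModel = { id: string; owner: string; balance: number };

// Simplified port: synchronous, without a transaction context.
interface QueryModelWriteRepository {
    update(model: BankAccountQueryModel): void;
    delete(id: string): void;
}

// Hypothetical in-memory adapter, e.g. for unit tests.
class InMemoryQueryModelWriteRepository implements QueryModelWriteRepository {
    public readonly rows = new Map<string, BankAccountQueryModel>();

    public update(model: BankAccountQueryModel): void {
        // Overwrites any existing row with the same id (an upsert).
        this.rows.set(model.id, { ...model });
    }

    public delete(id: string): void {
        this.rows.delete(id);
    }
}

// Application code depends only on the port, so any adapter can be passed in.
function applyDeposit(
    repository: QueryModelWriteRepository,
    model: BankAccountQueryModel,
    amount: number
): void {
    repository.update({ ...model, balance: model.balance + amount });
}

const repository = new InMemoryQueryModelWriteRepository();
applyDeposit(repository, { id: "acc-1", owner: "Alice", balance: 100 }, 50);
```

Since applyDeposit never names a concrete class, replacing the in-memory adapter with a database-backed one would not require touching it; only the place where the adapter is constructed changes.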
If you now test the application with curl or Postman, you should see the messages being published and consumed, and the query models updated in the database.
In the next section, we'll expose the query model through a REST API.