First, define your message shapes. Every publish and consume call will follow these shapes:
// TypeScript is structurally typed ("duck typing"), so the type below satisfies
// the `Messages` constraint without referencing it explicitly.
// For reference, the constraint is declared as:
// type Payload = any;
// type Message = string;
// type Messages = { [k: Message]: Payload };

// Maps each message name to the payload shape carried by that message.
type ReconciliationMessages = {
'upload-file': { ids: number[] };
'update-order-status': { id: number };
};
Next, we create a Channel
to manage the exchange declaration, publish messages, and consume messages:
// Defines the 'ffm-order' exchange, typed by ReconciliationMessages.
// The 'ffm-queue-' prefix is prepended to each message name when building
// routing keys (and queue names for consumers).
export const ffmOrderChannel = ChannelManager.defineExchange<
ReconciliationMessages
>('ffm-order', {
queuePrefix: 'ffm-queue-',
});
After adding ffmOrderChannel.Provider
to the module's providers, we can inject the channel into our services and then publish and consume messages like below:
// Register the channel's provider in the module so the channel can be injected.
// (`...` marks code elided from this example.)
@Module({
...
providers: [
...
ffmOrderChannel.Provider,
...
]
})
/**
 * Example service that publishes and consumes typed messages through
 * `ffmOrderChannel`. Sections marked `// ...` are elided from the example.
 */
@Injectable()
export class ReconciliationService {
  constructor(
    @InjectConnection(orderConnection)
    private connection: Connection,
    @ffmOrderChannel.InjectChannel()
    private channel: Channel<ReconciliationMessages>,
  ) {}

  /**
   * Publishes an 'upload-file' message.
   * Declared `async` because the body awaits `publishMessage`.
   */
  async action() {
    // ... (elided: code that produces `orderIds`)
    // (method) Channel<ReconciliationMessages>.publishMessage<"upload-file">(message: "upload-file", payload: {
    //   ids: number[];
    // }): Promise<void>
    await this.channel.publishMessage('upload-file', { ids: orderIds });
  }

  // ...

  /**
   * Consumes 'upload-file' and publishes one 'update-order-status' message per
   * id, sequentially, in the order the ids were received.
   */
  @ffmOrderChannel.ConsumeMessage('upload-file')
  async processUploadedFile(data: ReconciliationMessages['upload-file']) {
    // Tolerate a missing `ids` field by treating it as an empty list.
    for (const id of data.ids || []) {
      await this.channel.publishMessage('update-order-status', { id });
    }
  }

  // Intentionally wrong, to show the compile-time check:
  // Error: Unable to resolve signature of method decorator
  // when called as an expression.
  // data must be ReconciliationMessages['update-order-status']
  @ffmOrderChannel.ConsumeMessage('update-order-status')
  async processUpdateOrderStatus(data: number) {
    // ...
  }
}
That’s it! That’s all you need to make your pub/sub logic more type-safe!
If you want to see the ugly part that I implemented, here is what happens behind the scenes:
// Payload carried by a message; left unconstrained at this level.
type Payload = any;
// Message name.
type Message = string;
// Map from message name to payload type; used as the constraint for the
// message-shape type parameter of the channel classes.
type Messages = { [k: Message]: Payload };
// Options accepted when defining an exchange or creating a channel.
type AmqpWrapperOptions = {
// When set, prepended to the message name to build the routing key
// (and the queue name for consumers).
queuePrefix?: string;
};
/**
 * Typed handle for one exchange.
 *
 * `T` maps message names to payload types. The instance exposes the provider
 * created for the exchange, a parameter decorator for injecting the channel,
 * and a method decorator factory for consuming a given message.
 */
export class ExchangeChannel<T extends Messages> {
  private provider: Provider;

  /**
   * @param exchange Exchange name.
   * @param options  Wrapper options (e.g. `queuePrefix`).
   */
  constructor(private exchange: string, private options: AmqpWrapperOptions) {
    this.provider = ChannelManager.createProvider(exchange, options);
  }

  /** Provider created for this exchange by `ChannelManager.createProvider`. */
  get Provider() {
    return this.provider;
  }

  /** Exchange name passed to the constructor. */
  get Name() {
    return this.exchange;
  }

  /** Options passed to the constructor. */
  get Options() {
    return this.options;
  }

  /** Returns an `Inject` decorator for this exchange's channel token. */
  InjectChannel() {
    return Inject(ChannelManagerSingleton.AMQP_INJECT_PREFIX + this.exchange);
  }

  /**
   * Builds a method decorator that subscribes the decorated method to `key`.
   *
   * The routing key and queue name are both `queuePrefix + key` when a prefix
   * is configured, otherwise just `key`. The declared return type is what
   * makes the compiler check the handler's parameter against `T[U]`; the
   * decorator itself returns nothing.
   *
   * @param key Message name to consume.
   */
  ConsumeMessage<U extends keyof T>(key: U) {
    return <R extends any[]>(
      target: any,
      nkey: string | symbol,
      descriptor: TypedPropertyDescriptor<(...args: R) => any>,
    ): TypedPropertyDescriptor<(data: T[U]) => Promise<void>> | void => {
      let k: string = key as string;
      if (this.options?.queuePrefix != null) {
        k = this.options.queuePrefix + key;
      }
      RabbitRPC({
        exchange: this.exchange,
        routingKey: k,
        queue: k,
        errorHandler: rmqErrorsHandler,
      })(target, nkey, descriptor);
    };
  }

  /**
   * Returns a standalone version of `ConsumeMessage`.
   *
   * The returned function is bound to this instance through an arrow
   * function; returning the bare method would lose `this` and fail as soon as
   * the decorator reads `this.options`.
   */
  createConsumer() {
    return <U extends keyof T>(key: U) => this.ConsumeMessage(key);
  }
}
/**
 * Singleton registry of the exchanges, providers and channels created by
 * this wrapper. Obtain the instance through `ChannelManagerSingleton.Instance`.
 */
class ChannelManagerSingleton {
  /** Prefix of the injection token built for each exchange. */
  static AMQP_INJECT_PREFIX = 'amqp:';
  private channels: Channel<any>[] = [];
  private providers: FactoryProvider[] = [];
  private exchanges: ExchangeChannel<any>[] = [];
  private static instance: ChannelManagerSingleton;

  private constructor() {}

  /** Lazily created shared instance. */
  static get Instance() {
    return this.instance || (this.instance = new this());
  }

  /** Number of channels and providers created so far. */
  get Count() {
    return { channels: this.channels.length, providers: this.providers.length };
  }

  /**
   * Creates a channel for `exchange` on `connection` and records it.
   *
   * @returns The newly created channel.
   */
  createChannel(connection: AmqpConnection, exchange: string, options: AmqpWrapperOptions = {}) {
    const channel = new Channel(connection, exchange, options);
    this.channels.push(channel);
    return channel;
  }

  /**
   * Creates and records the factory provider for `exchange`.
   *
   * The provider's token is `AMQP_INJECT_PREFIX + exchange`, and its factory
   * creates a channel from the injected `AmqpConnection`.
   *
   * @throws Error if a provider for the same exchange was already created.
   */
  createProvider(exchange: string, options: AmqpWrapperOptions): Provider {
    const token = ChannelManagerSingleton.AMQP_INJECT_PREFIX + exchange;
    // Compare against the full token: stored providers carry the prefixed
    // token, so comparing with the bare exchange name would never match.
    if (this.providers.some(p => p.provide === token)) {
      throw new Error(`Can not redefine the provider for exchange "${exchange}"`);
    }
    const provider = {
      provide: token,
      inject: [AmqpConnection],
      useFactory: (amqpConnection: AmqpConnection) =>
        this.createChannel(amqpConnection, exchange, options),
    };
    this.providers.push(provider);
    return provider;
  }

  /**
   * Returns the exchange channel registered under `exchange`, creating it on
   * first use. When the exchange already exists, the existing instance is
   * returned and the `options` of this call are ignored.
   */
  defineExchange<T extends Messages>(
    exchange: string,
    options: AmqpWrapperOptions,
  ): ExchangeChannel<T> {
    const found = this.exchanges.find(el => el.Name === exchange);
    if (found) return found;
    const exc = new ExchangeChannel<T>(exchange, options);
    this.exchanges.push(exc);
    return exc;
  }
}
// The shared ChannelManagerSingleton instance, used to define exchanges and
// create their providers.
export const ChannelManager = ChannelManagerSingleton.Instance;
/**
 * Typed publisher for one exchange.
 *
 * `T` maps message names to payload types, so `publishMessage` only accepts
 * a payload matching the chosen message name.
 */
export class Channel<T extends Messages> {
  /**
   * @param connection Connection used to publish.
   * @param exchange   Exchange every message is published to.
   * @param options    Wrapper options; `queuePrefix` is prepended to routing keys.
   */
  constructor(
    private connection: AmqpConnection,
    private exchange: string,
    private options: AmqpWrapperOptions = {},
  ) {}

  /**
   * Publishes `payload` to the exchange under the routing key for `message`.
   *
   * The routing key is `queuePrefix + message` when a non-empty prefix is
   * configured, otherwise `message` itself.
   *
   * Rejects with the string 'Connection is not attached to the instance'
   * when the channel has no connection.
   */
  async publishMessage<K extends keyof T>(message: K, payload: T[K]): Promise<void> {
    const name = message as string;
    const prefix = this.options?.queuePrefix;
    const routingKey = prefix ? prefix + name : name;

    if (!this.connection) {
      throw 'Connection is not attached to the instance';
    }
    await this.connection.publish(this.exchange, routingKey, payload);
  }
}