#vthang

NestJS - How I write pub/sub logic in a more type-safe way

May 5, 2023

First, you need to define your message shapes. Every message you publish or consume will follow these shapes:

// TypeScript is structurally typed (duck typing), so our type below satisfies Messages.
// type Payload = any;
// type Message = string;
// type Messages = { [k: Message]: Payload };

// Message contract for the reconciliation flow: each key is a message name and
// its value is the payload type that must accompany that message.
// Kept as a type alias (not an interface) so it stays assignable to the
// index-signature constraint `Messages` used by the channel helpers.
type ReconciliationMessages = {
  // Payload carries a list of numeric ids.
  'upload-file': { ids: number[] };
  // Payload carries a single numeric id.
  'update-order-status': { id: number };
};

Next, we create a Channel to manage the exchange declaration, publish messages, and consume messages:

// Declares the 'ffm-order' exchange, typed by ReconciliationMessages.
// `queuePrefix` is prepended to every message name to build the routing key
// (and, for consumers, the queue name).
export const ffmOrderChannel = ChannelManager.defineExchange<
    ReconciliationMessages
>('ffm-order', {
  queuePrefix: 'ffm-queue-',
});

After adding ffmOrderChannel.Provider to the module's providers, we can inject the channel into our services and then publish and consume messages as shown below:

// Module registration (the decorated module class itself is omitted here).
// ffmOrderChannel.Provider registers the factory that creates the typed
// channel, which makes it injectable into services.
@Module({
    ...
    providers: [
        ...
        ffmOrderChannel.Provider,
        ...
    ]
})
// Example service that publishes and consumes messages on the 'ffm-order'
// exchange through the typed channel.
@Injectable()
export class ReconciliationService {
  constructor(
    @InjectConnection(orderConnection)
    private connection: Connection,

    // Injects the channel created by ffmOrderChannel.Provider.
    @ffmOrderChannel.InjectChannel()
    private channel: Channel<ReconciliationMessages>,
  ) {}

  // Must be `async`: the body awaits the publish call.
  async action() {
    ...
    // (method) Channel<ReconciliationMessages>.publishMessage<"upload-file">(message: "upload-file", payload: {
    //   ids: number[];
    // }): Promise<void>
    await this.channel.publishMessage('upload-file', { ids: orderIds });
  }
  ...

  // Consumes 'upload-file' and fans out one 'update-order-status' message per
  // id, publishing them sequentially.
  @ffmOrderChannel.ConsumeMessage('upload-file')
  async processUploadedFile(data: ReconciliationMessages['upload-file']) {
    // Tolerate a payload that arrives without `ids`.
    for (const id of data.ids ?? []) {
      await this.channel.publishMessage('update-order-status', { id });
    }
  }

  // Intentionally wrong, to show the compile-time check:
  // Error: Unable to resolve signature of method decorator
  // when called as an expression.
  // `data` must be ReconciliationMessages['update-order-status'].
  @ffmOrderChannel.ConsumeMessage('update-order-status')
  async processUpdateOrderStatus(data: number) {
    ...
  }
}

That’s it! That’s all you need to make your pub/sub logic more type-safe!

If you want to see the ugly part that I implemented, here is what happens behind the scenes:


// Payload carried by a message. Left as `any` so that concrete message maps
// may declare any payload shape.
type Payload = any;
// Message name; used to build the routing key.
type Message = string;
// Base constraint for a message map: message name -> payload type.
type Messages = { [k: Message]: Payload };

// Options accepted by ExchangeChannel and Channel.
type AmqpWrapperOptions = {
  // Optional string prepended to the message name when building the routing
  // key / queue name.
  queuePrefix?: string;
};

/**
 * Typed handle for a single AMQP exchange.
 *
 * `T` maps message names to payload types. The handle bundles:
 * - the provider that creates the exchange's channel (`Provider`),
 * - a parameter decorator that injects that channel (`InjectChannel`),
 * - a method decorator that registers a typed consumer (`ConsumeMessage`).
 */
export class ExchangeChannel<T extends Messages> {
  private readonly provider: Provider;

  /**
   * @param exchange Exchange name; also used to derive the injection token.
   * @param options  Wrapper options such as `queuePrefix`.
   */
  constructor(
    private readonly exchange: string,
    private readonly options: AmqpWrapperOptions,
  ) {
    this.provider = ChannelManager.createProvider(exchange, options);
  }

  /** Provider to list in a module's `providers` array. */
  get Provider() {
    return this.provider;
  }

  /** Exchange name this handle was created for. */
  get Name() {
    return this.exchange;
  }

  /** Options this handle was created with. */
  get Options() {
    return this.options;
  }

  /** Parameter decorator that injects the channel registered for this exchange. */
  InjectChannel() {
    return Inject(ChannelManagerSingleton.AMQP_INJECT_PREFIX + this.exchange);
  }

  /**
   * Method decorator that subscribes the decorated handler to `key`.
   *
   * The declared return type forces the handler to accept `T[U]`: decorating a
   * method with a different parameter type fails to compile.
   *
   * @param key Message name; the routing key and queue name are
   *            `queuePrefix + key` (or just `key` when no prefix is set).
   */
  ConsumeMessage<U extends keyof T>(key: U) {
    return <R extends any[]>(
      target: any,
      nkey: string | symbol,
      descriptor: TypedPropertyDescriptor<(...args: R) => any>,
    ): TypedPropertyDescriptor<(data: T[U]) => Promise<void>> | void => {
      // Convert explicitly instead of casting so the key is a real string at runtime.
      const routingKey = (this.options?.queuePrefix ?? '') + String(key);
      RabbitRPC({
        exchange: this.exchange,
        routingKey,
        queue: routingKey,
        errorHandler: rmqErrorsHandler,
      })(target, nkey, descriptor);
    };
  }

  /**
   * Returns a standalone consumer-decorator factory equivalent to
   * `ConsumeMessage`. It is wrapped in an arrow function so it stays bound to
   * this instance when called on its own.
   */
  createConsumer() {
    return <U extends keyof T>(key: U) => this.ConsumeMessage(key);
  }
}

/**
 * Process-wide registry of exchanges, channel providers and channels.
 * Obtain the single instance through `ChannelManagerSingleton.Instance`.
 */
class ChannelManagerSingleton {
  /** Prefix of the injection token generated for each exchange. */
  static AMQP_INJECT_PREFIX = 'amqp:';
  private channels: Channel<any>[] = [];
  private providers: FactoryProvider[] = [];
  private exchanges: ExchangeChannel<any>[] = [];
  private static instance: ChannelManagerSingleton;
  private constructor() {}

  /** Lazily created shared instance. */
  static get Instance() {
    if (!this.instance) {
      this.instance = new this();
    }
    return this.instance;
  }

  /** Number of channels created and providers registered so far. */
  get Count() {
    return { channels: this.channels.length, providers: this.providers.length };
  }

  /** Creates a channel bound to `exchange` and records it. */
  createChannel(connection: AmqpConnection, exchange: string, options: AmqpWrapperOptions = {}) {
    const channel = new Channel(connection, exchange, options);
    this.channels.push(channel);
    return channel;
  }

  /**
   * Builds and registers the factory provider that creates the channel for
   * `exchange` once an `AmqpConnection` is available.
   *
   * @throws Error if a provider for the same exchange was already registered.
   */
  createProvider(exchange: string, options: AmqpWrapperOptions): Provider {
    const token = ChannelManagerSingleton.AMQP_INJECT_PREFIX + exchange;
    // Compare against the full token: providers are stored under
    // `AMQP_INJECT_PREFIX + exchange`, never under the bare exchange name.
    if (this.providers.some(p => p.provide === token)) {
      throw new Error(`A channel provider for exchange "${exchange}" is already registered`);
    }
    const provider: FactoryProvider = {
      provide: token,
      inject: [AmqpConnection],
      useFactory: (amqpConnection: AmqpConnection) =>
        this.createChannel(amqpConnection, exchange, options),
    };
    this.providers.push(provider);
    return provider;
  }

  /**
   * Returns the handle for `exchange`, creating it on first use.
   * If the exchange was already defined, the existing handle is returned and
   * the `options` passed to this call are ignored.
   */
  defineExchange<T extends Messages>(
    exchange: string,
    options: AmqpWrapperOptions,
  ): ExchangeChannel<T> {
    const existing = this.exchanges.find(el => el.Name === exchange);
    if (existing) return existing;
    const created = new ExchangeChannel<T>(exchange, options);
    this.exchanges.push(created);
    return created;
  }
}
// Shared manager instance used to define exchanges and register channel providers.
export const ChannelManager = ChannelManagerSingleton.Instance;

/**
 * Typed publisher bound to a single exchange.
 *
 * `T` maps message names to payload types, so `publishMessage` only accepts a
 * payload matching the chosen message name.
 */
export class Channel<T extends Messages> {
  /**
   * @param connection AMQP connection used for publishing.
   * @param exchange   Exchange every message is published to.
   * @param options    Wrapper options such as `queuePrefix`.
   */
  constructor(
    private connection: AmqpConnection,
    private exchange: string,
    private options: AmqpWrapperOptions = {},
  ) {}

  /**
   * Publishes `payload` to the exchange using `queuePrefix + message` as the
   * routing key (or just `message` when no prefix is configured).
   *
   * @throws Error if the channel has no connection attached.
   */
  async publishMessage<K extends keyof T>(message: K, payload: T[K]): Promise<void> {
    // Fail fast, before doing any work, and with a real Error so a stack trace is kept.
    if (!this.connection) {
      throw new Error('Connection is not attached to the instance');
    }

    // Convert explicitly instead of casting so the routing key is a real string.
    const routingKey = (this.options?.queuePrefix || '') + String(message);
    await this.connection.publish(this.exchange, routingKey, payload);
  }
}