From a7fd47950dd617306173943cdddf25b568d2a4e5 Mon Sep 17 00:00:00 2001 From: Emmanuel Merali Date: Mon, 13 Apr 2026 20:52:32 +0300 Subject: [PATCH] Await on redis.subscribe() to ensure that the method returns only after Redis has acknowledged the registration of the subscriber. Add return number of subscribers on publish Fix MaxListenersExceededWarning when subscribing to too many channels with Redis transport --- src/bus.ts | 8 ++-- src/bus_manager.ts | 8 ++-- src/transports/memory.ts | 6 ++- src/transports/mqtt.ts | 3 +- src/transports/redis.ts | 74 ++++++++++++++++++++------------- src/types/main.ts | 2 +- test_helpers/chaos_transport.ts | 4 +- 7 files changed, 62 insertions(+), 43 deletions(-) diff --git a/src/bus.ts b/src/bus.ts index ac7e849..cbb3eb6 100644 --- a/src/bus.ts +++ b/src/bus.ts @@ -54,10 +54,10 @@ export class Bus { await this.processErrorRetryQueue() } - subscribe(channel: string, handler: SubscribeHandler) { + async subscribe(channel: string, handler: SubscribeHandler) { debug(`subscribing to channel ${channel}`) - return this.#transport.subscribe(channel, async (message) => { + return await this.#transport.subscribe(channel, async (message) => { debug('received message %j from bus', message) // @ts-expect-error - TODO: Weird typing issue handler(message) @@ -94,7 +94,7 @@ export class Bus { return this.#transport.disconnect() } - unsubscribe(channel: string) { - return this.#transport.unsubscribe(channel) + async unsubscribe(channel: string) { + return await this.#transport.unsubscribe(channel) } } diff --git a/src/bus_manager.ts b/src/bus_manager.ts index fa8b494..12e01d8 100644 --- a/src/bus_manager.ts +++ b/src/bus_manager.ts @@ -58,12 +58,12 @@ export class BusManager> return this.use().publish(channel, message) } - subscribe(channel: string, handler: SubscribeHandler) { - return this.use().subscribe(channel, handler) + async subscribe(channel: string, handler: SubscribeHandler) { + return await this.use().subscribe(channel, handler) } - unsubscribe(channel: string) { - return this.use().unsubscribe(channel) + async unsubscribe(channel: string) { + return await this.use().unsubscribe(channel) } disconnect() { diff --git a/src/transports/memory.ts b/src/transports/memory.ts index dec1fef..4febc6f 100644 --- a/src/transports/memory.ts +++ b/src/transports/memory.ts @@ -38,16 +38,20 @@ export class MemoryTransport implements Transport { async publish(channel: string, message: Serializable) { const handlers = MemoryTransport.#subscriptions.get(channel) + let count: number = 0 if (!handlers) { - return + return count } for (const { handler, busId } of handlers) { if (busId === this.#id) continue + count++ handler(message) } + + return count } async subscribe(channel: string, handler: SubscribeHandler) { diff --git a/src/transports/mqtt.ts b/src/transports/mqtt.ts index df12ae3..f40937c 100644 --- a/src/transports/mqtt.ts +++ b/src/transports/mqtt.ts @@ -47,12 +47,13 @@ export class MqttTransport implements Transport { await this.#client.endAsync() } - async publish(channel: string, message: any): Promise { + async publish(channel: string, message: any): Promise { assert(this.#id, 'You must set an id before publishing a message') const encoded = this.#encoder.encode({ payload: message, busId: this.#id }) await this.#client.publishAsync(channel, encoded) + return -1 // undefined } async subscribe( diff --git a/src/transports/redis.ts b/src/transports/redis.ts index af89683..7830481 100644 --- a/src/transports/redis.ts +++ b/src/transports/redis.ts @@ -13,13 +13,14 @@ import { JsonEncoder } from '../encoders/json_encoder.js' import type { Transport, TransportEncoder, - TransportMessage, Serializable, SubscribeHandler, RedisTransportConfig, RedisTransportOptions, } from '../types/main.js' +type Handler = (message: Buffer | string) => void | Promise + export function redis(config: RedisTransportConfig, encoder?: TransportEncoder) { return () => new RedisTransport(config, encoder) } @@ -29,6 +30,7 @@ export class RedisTransport implements Transport { readonly #subscriber: Redis | Cluster readonly #encoder: TransportEncoder readonly #useMessageBuffer: boolean = false + readonly #handlers = new Map>() #id: string | undefined @@ -54,6 +56,7 @@ export class RedisTransport implements Transport { this.#publisher = options.duplicate() this.#subscriber = options.duplicate() this.#useMessageBuffer = transportOptions?.useMessageBuffer ?? false + this.#setupSubscriber() return } @@ -65,6 +68,36 @@ export class RedisTransport implements Transport { if (typeof options === 'object') { this.#useMessageBuffer = options.useMessageBuffer ?? false } + this.#setupSubscriber() + } + + #setupSubscriber = () => { + const event = this.#useMessageBuffer ? 'messageBuffer' : 'message' + this.#subscriber.on(event, this.#onMessage) + } + + #onMessage = async (receivedChannel: Buffer | string, message: Buffer | string) => { + const channel = receivedChannel.toString() + const handlers = this.#handlers.get(channel) + debug('received message for channel "%s"', channel) + if (!handlers || handlers.size === 0) { + debug('no handlers for channel "%s"', channel) + return + } + for (const handler of handlers) { + await handler(message) + } + } + + #makeHandler = (handler: SubscribeHandler) => { + return async (message: Buffer | string) => { + const data = this.#encoder.decode(message) + if (data.busId === this.#id) { + debug('ignoring message published by the same bus instance') + return + } + await handler(data.payload) + } } setId(id: string): Transport { @@ -77,45 +110,25 @@ export class RedisTransport implements Transport { await Promise.all([this.#publisher.quit(), this.#subscriber.quit()]) } - async publish(channel: string, message: Serializable): Promise { + async publish(channel: string, message: Serializable): Promise { assert(this.#id, 'You must set an id before publishing a message') const encoded = this.#encoder.encode({ payload: message, busId: this.#id }) - await this.#publisher.publish(channel, encoded) + return await this.#publisher.publish(channel, encoded) } async subscribe( channel: string, handler: SubscribeHandler ): Promise { - this.#subscriber.subscribe(channel, (err) => { - if (err) { - throw err - } - }) - - const event = this.#useMessageBuffer ? 'messageBuffer' : 'message' - this.#subscriber.on(event, (receivedChannel: Buffer | string, message: Buffer | string) => { - receivedChannel = receivedChannel.toString() - - if (channel !== receivedChannel) return - - debug('received message for channel "%s"', channel) - - const data = this.#encoder.decode>(message) - - /** - * Ignore messages published by this bus instance - */ - if (data.busId === this.#id) { - debug('ignoring message published by the same bus instance') - return - } - - // @ts-expect-error - TODO: Weird typing issue - handler(data.payload) - }) + let handlers = this.#handlers.get(channel) + if (!handlers) { + handlers = new Set() + this.#handlers.set(channel, handlers) + await this.#subscriber.subscribe(channel) + } + handlers.add(this.#makeHandler(handler)) } onReconnect(callback: () => void): void { @@ -123,6 +136,7 @@ export class RedisTransport implements Transport { } async unsubscribe(channel: string): Promise { + this.#handlers.delete(channel) await this.#subscriber.unsubscribe(channel) } } diff --git a/src/types/main.ts b/src/types/main.ts index 5ca8db1..aab0dbe 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -69,7 +69,7 @@ export interface MqttTransportConfig { export interface Transport { setId: (id: string) => Transport onReconnect: (callback: () => void) => void - publish: (channel: string, message: Serializable) => Promise + publish: (channel: string, message: Serializable) => Promise subscribe: ( channel: string, handler: SubscribeHandler diff --git a/test_helpers/chaos_transport.ts b/test_helpers/chaos_transport.ts index ba1a936..b175929 100644 --- a/test_helpers/chaos_transport.ts +++ b/test_helpers/chaos_transport.ts @@ -59,8 +59,8 @@ export class ChaosTransport implements Transport { return this.#innerTransport.subscribe(channel, handler) } - unsubscribe(channel: string) { - return this.#innerTransport.unsubscribe(channel) + async unsubscribe(channel: string) { + return await this.#innerTransport.unsubscribe(channel) } disconnect() {