diff --git a/src/bus.ts b/src/bus.ts index ac7e849..cbb3eb6 100644 --- a/src/bus.ts +++ b/src/bus.ts @@ -54,10 +54,10 @@ export class Bus { await this.processErrorRetryQueue() } - subscribe(channel: string, handler: SubscribeHandler) { + async subscribe(channel: string, handler: SubscribeHandler) { debug(`subscribing to channel ${channel}`) - return this.#transport.subscribe(channel, async (message) => { + return await this.#transport.subscribe(channel, async (message) => { debug('received message %j from bus', message) // @ts-expect-error - TODO: Weird typing issue handler(message) @@ -94,7 +94,7 @@ export class Bus { return this.#transport.disconnect() } - unsubscribe(channel: string) { - return this.#transport.unsubscribe(channel) + async unsubscribe(channel: string) { + return await this.#transport.unsubscribe(channel) } } diff --git a/src/bus_manager.ts b/src/bus_manager.ts index fa8b494..12e01d8 100644 --- a/src/bus_manager.ts +++ b/src/bus_manager.ts @@ -58,12 +58,12 @@ export class BusManager> return this.use().publish(channel, message) } - subscribe(channel: string, handler: SubscribeHandler) { - return this.use().subscribe(channel, handler) + async subscribe(channel: string, handler: SubscribeHandler) { + return await this.use().subscribe(channel, handler) } - unsubscribe(channel: string) { - return this.use().unsubscribe(channel) + async unsubscribe(channel: string) { + return await this.use().unsubscribe(channel) } disconnect() { diff --git a/src/transports/memory.ts b/src/transports/memory.ts index dec1fef..4febc6f 100644 --- a/src/transports/memory.ts +++ b/src/transports/memory.ts @@ -38,16 +38,20 @@ export class MemoryTransport implements Transport { async publish(channel: string, message: Serializable) { const handlers = MemoryTransport.#subscriptions.get(channel) + let count: number = 0 if (!handlers) { - return + return count } for (const { handler, busId } of handlers) { if (busId === this.#id) continue + count++ handler(message) } + + return count } async subscribe(channel: string, handler: SubscribeHandler) { diff --git a/src/transports/mqtt.ts b/src/transports/mqtt.ts index df12ae3..f40937c 100644 --- a/src/transports/mqtt.ts +++ b/src/transports/mqtt.ts @@ -47,12 +47,13 @@ export class MqttTransport implements Transport { await this.#client.endAsync() } - async publish(channel: string, message: any): Promise { + async publish(channel: string, message: any): Promise { assert(this.#id, 'You must set an id before publishing a message') const encoded = this.#encoder.encode({ payload: message, busId: this.#id }) await this.#client.publishAsync(channel, encoded) + return -1 // undefined } async subscribe( diff --git a/src/transports/redis.ts b/src/transports/redis.ts index af89683..7830481 100644 --- a/src/transports/redis.ts +++ b/src/transports/redis.ts @@ -13,13 +13,14 @@ import { JsonEncoder } from '../encoders/json_encoder.js' import type { Transport, TransportEncoder, - TransportMessage, Serializable, SubscribeHandler, RedisTransportConfig, RedisTransportOptions, } from '../types/main.js' +type Handler = (message: Buffer | string) => void | Promise + export function redis(config: RedisTransportConfig, encoder?: TransportEncoder) { return () => new RedisTransport(config, encoder) } @@ -29,6 +30,7 @@ export class RedisTransport implements Transport { readonly #subscriber: Redis | Cluster readonly #encoder: TransportEncoder readonly #useMessageBuffer: boolean = false + readonly #handlers = new Map>() #id: string | undefined @@ -54,6 +56,7 @@ export class RedisTransport implements Transport { this.#publisher = options.duplicate() this.#subscriber = options.duplicate() this.#useMessageBuffer = transportOptions?.useMessageBuffer ?? false + this.#setupSubscriber() return } @@ -65,6 +68,36 @@ export class RedisTransport implements Transport { if (typeof options === 'object') { this.#useMessageBuffer = options.useMessageBuffer ?? false } + this.#setupSubscriber() + } + + #setupSubscriber = () => { + const event = this.#useMessageBuffer ? 'messageBuffer' : 'message' + this.#subscriber.on(event, this.#onMessage) + } + + #onMessage = async (receivedChannel: Buffer | string, message: Buffer | string) => { + const channel = receivedChannel.toString() + const handlers = this.#handlers.get(channel) + debug('received message for channel "%s"', channel) + if (!handlers || handlers.size === 0) { + debug('no handlers for channel "%s"', channel) + return + } + for (const handler of handlers) { + await handler(message) + } + } + + #makeHandler = (handler: SubscribeHandler) => { + return async (message: Buffer | string) => { + const data = this.#encoder.decode(message) + if (data.busId === this.#id) { + debug('ignoring message published by the same bus instance') + return + } + await handler(data.payload) + } } setId(id: string): Transport { @@ -77,45 +110,25 @@ export class RedisTransport implements Transport { await Promise.all([this.#publisher.quit(), this.#subscriber.quit()]) } - async publish(channel: string, message: Serializable): Promise { + async publish(channel: string, message: Serializable): Promise { assert(this.#id, 'You must set an id before publishing a message') const encoded = this.#encoder.encode({ payload: message, busId: this.#id }) - await this.#publisher.publish(channel, encoded) + return await this.#publisher.publish(channel, encoded) } async subscribe( channel: string, handler: SubscribeHandler ): Promise { - this.#subscriber.subscribe(channel, (err) => { - if (err) { - throw err - } - }) - - const event = this.#useMessageBuffer ? 'messageBuffer' : 'message' - this.#subscriber.on(event, (receivedChannel: Buffer | string, message: Buffer | string) => { - receivedChannel = receivedChannel.toString() - - if (channel !== receivedChannel) return - - debug('received message for channel "%s"', channel) - - const data = this.#encoder.decode>(message) - - /** - * Ignore messages published by this bus instance - */ - if (data.busId === this.#id) { - debug('ignoring message published by the same bus instance') - return - } - - // @ts-expect-error - TODO: Weird typing issue - handler(data.payload) - }) + let handlers = this.#handlers.get(channel) + if (!handlers) { + handlers = new Set() + this.#handlers.set(channel, handlers) + await this.#subscriber.subscribe(channel) + } + handlers.add(this.#makeHandler(handler)) } onReconnect(callback: () => void): void { @@ -123,6 +136,7 @@ export class RedisTransport implements Transport { } async unsubscribe(channel: string): Promise { + this.#handlers.delete(channel) await this.#subscriber.unsubscribe(channel) } } diff --git a/src/types/main.ts b/src/types/main.ts index 5ca8db1..aab0dbe 100644 --- a/src/types/main.ts +++ b/src/types/main.ts @@ -69,7 +69,7 @@ export interface MqttTransportConfig { export interface Transport { setId: (id: string) => Transport onReconnect: (callback: () => void) => void - publish: (channel: string, message: Serializable) => Promise + publish: (channel: string, message: Serializable) => Promise subscribe: ( channel: string, handler: SubscribeHandler diff --git a/test_helpers/chaos_transport.ts b/test_helpers/chaos_transport.ts index ba1a936..b175929 100644 --- a/test_helpers/chaos_transport.ts +++ b/test_helpers/chaos_transport.ts @@ -59,8 +59,8 @@ export class ChaosTransport implements Transport { return this.#innerTransport.subscribe(channel, handler) } - unsubscribe(channel: string) { - return this.#innerTransport.unsubscribe(channel) + async unsubscribe(channel: string) { + return await this.#innerTransport.unsubscribe(channel) } disconnect() {