Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ export class Bus {
await this.processErrorRetryQueue()
}

subscribe<T extends Serializable>(channel: string, handler: SubscribeHandler<T>) {
async subscribe<T extends Serializable>(channel: string, handler: SubscribeHandler<T>) {
debug(`subscribing to channel ${channel}`)

return this.#transport.subscribe(channel, async (message) => {
return await this.#transport.subscribe(channel, async (message) => {
debug('received message %j from bus', message)
// @ts-expect-error - TODO: Weird typing issue
handler(message)
Expand Down Expand Up @@ -94,7 +94,7 @@ export class Bus {
return this.#transport.disconnect()
}

unsubscribe(channel: string) {
return this.#transport.unsubscribe(channel)
async unsubscribe(channel: string) {
return await this.#transport.unsubscribe(channel)
}
}
8 changes: 4 additions & 4 deletions src/bus_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ export class BusManager<KnownTransports extends Record<string, TransportConfig>>
return this.use().publish(channel, message)
}

subscribe<T extends Serializable>(channel: string, handler: SubscribeHandler<T>) {
return this.use().subscribe(channel, handler)
async subscribe<T extends Serializable>(channel: string, handler: SubscribeHandler<T>) {
return await this.use().subscribe(channel, handler)
}

unsubscribe(channel: string) {
return this.use().unsubscribe(channel)
async unsubscribe(channel: string) {
return await this.use().unsubscribe(channel)
}

disconnect() {
Expand Down
6 changes: 5 additions & 1 deletion src/transports/memory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,20 @@ export class MemoryTransport implements Transport {

async publish(channel: string, message: Serializable) {
const handlers = MemoryTransport.#subscriptions.get(channel)
let count: number = 0

if (!handlers) {
return
return count
}

for (const { handler, busId } of handlers) {
if (busId === this.#id) continue
count++

handler(message)
}

return count
}

async subscribe<T extends Serializable>(channel: string, handler: SubscribeHandler<T>) {
Expand Down
3 changes: 2 additions & 1 deletion src/transports/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ export class MqttTransport implements Transport {
await this.#client.endAsync()
}

async publish(channel: string, message: any): Promise<void> {
async publish(channel: string, message: any): Promise<number> {
assert(this.#id, 'You must set an id before publishing a message')

const encoded = this.#encoder.encode({ payload: message, busId: this.#id })

await this.#client.publishAsync(channel, encoded)
return -1 // undefined
}

async subscribe<T extends Serializable>(
Expand Down
74 changes: 44 additions & 30 deletions src/transports/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import { JsonEncoder } from '../encoders/json_encoder.js'
import type {
Transport,
TransportEncoder,
TransportMessage,
Serializable,
SubscribeHandler,
RedisTransportConfig,
RedisTransportOptions,
} from '../types/main.js'

type Handler = (message: Buffer | string) => void | Promise<void>

export function redis(config: RedisTransportConfig, encoder?: TransportEncoder) {
return () => new RedisTransport(config, encoder)
}
Expand All @@ -29,6 +30,7 @@ export class RedisTransport implements Transport {
readonly #subscriber: Redis | Cluster
readonly #encoder: TransportEncoder
readonly #useMessageBuffer: boolean = false
readonly #handlers = new Map<string, Set<Handler>>()

#id: string | undefined

Expand All @@ -54,6 +56,7 @@ export class RedisTransport implements Transport {
this.#publisher = options.duplicate()
this.#subscriber = options.duplicate()
this.#useMessageBuffer = transportOptions?.useMessageBuffer ?? false
this.#setupSubscriber()
return
}

Expand All @@ -65,6 +68,36 @@ export class RedisTransport implements Transport {
if (typeof options === 'object') {
this.#useMessageBuffer = options.useMessageBuffer ?? false
}
this.#setupSubscriber()
}

#setupSubscriber = () => {
const event = this.#useMessageBuffer ? 'messageBuffer' : 'message'
this.#subscriber.on(event, this.#onMessage)
}

#onMessage = async (receivedChannel: Buffer | string, message: Buffer | string) => {
const channel = receivedChannel.toString()
const handlers = this.#handlers.get(channel)
debug('received message for channel "%s"', channel)
if (!handlers || handlers.size === 0) {
debug('no handlers for channel "%s"', channel)
return
}
for (const handler of handlers) {
await handler(message)
}
}

#makeHandler = <T extends Serializable>(handler: SubscribeHandler<T>) => {
return async (message: Buffer | string) => {
const data = this.#encoder.decode<T>(message)
if (data.busId === this.#id) {
debug('ignoring message published by the same bus instance')
return
}
await handler(data.payload)
}
}

setId(id: string): Transport {
Expand All @@ -77,52 +110,33 @@ export class RedisTransport implements Transport {
await Promise.all([this.#publisher.quit(), this.#subscriber.quit()])
}

async publish(channel: string, message: Serializable): Promise<void> {
async publish(channel: string, message: Serializable): Promise<number> {
assert(this.#id, 'You must set an id before publishing a message')

const encoded = this.#encoder.encode({ payload: message, busId: this.#id })

await this.#publisher.publish(channel, encoded)
return await this.#publisher.publish(channel, encoded)
}

async subscribe<T extends Serializable>(
channel: string,
handler: SubscribeHandler<T>
): Promise<void> {
this.#subscriber.subscribe(channel, (err) => {
if (err) {
throw err
}
})

const event = this.#useMessageBuffer ? 'messageBuffer' : 'message'
this.#subscriber.on(event, (receivedChannel: Buffer | string, message: Buffer | string) => {
receivedChannel = receivedChannel.toString()

if (channel !== receivedChannel) return

debug('received message for channel "%s"', channel)

const data = this.#encoder.decode<TransportMessage<T>>(message)

/**
* Ignore messages published by this bus instance
*/
if (data.busId === this.#id) {
debug('ignoring message published by the same bus instance')
return
}

// @ts-expect-error - TODO: Weird typing issue
handler(data.payload)
})
let handlers = this.#handlers.get(channel)
if (!handlers) {
handlers = new Set()
this.#handlers.set(channel, handlers)
await this.#subscriber.subscribe(channel)
}
handlers.add(this.#makeHandler(handler))
}

onReconnect(callback: () => void): void {
this.#subscriber.on('reconnecting', callback)
}

async unsubscribe(channel: string): Promise<void> {
this.#handlers.delete(channel)
await this.#subscriber.unsubscribe(channel)
}
}
2 changes: 1 addition & 1 deletion src/types/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export interface MqttTransportConfig {
export interface Transport {
setId: (id: string) => Transport
onReconnect: (callback: () => void) => void
publish: (channel: string, message: Serializable) => Promise<void>
publish: (channel: string, message: Serializable) => Promise<number>
subscribe: <T extends Serializable>(
channel: string,
handler: SubscribeHandler<T>
Expand Down
4 changes: 2 additions & 2 deletions test_helpers/chaos_transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ export class ChaosTransport implements Transport {
return this.#innerTransport.subscribe(channel, handler)
}

unsubscribe(channel: string) {
return this.#innerTransport.unsubscribe(channel)
async unsubscribe(channel: string) {
return await this.#innerTransport.unsubscribe(channel)
}

disconnect() {
Expand Down