From 2562c93bbf814f3ff26da8171105f6de28175b06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Guti=C3=A9rrez?= Date: Thu, 21 May 2026 17:47:52 +0200 Subject: [PATCH] feat(shards): group shard deletions by farmer and enqueue as batch jobs --- lib/core/shards/usecase.ts | 34 +++++--- tests/lib/core/shards/usecase.test.ts | 115 +++++++++++++++++--------- 2 files changed, 96 insertions(+), 53 deletions(-) diff --git a/lib/core/shards/usecase.ts b/lib/core/shards/usecase.ts index 0228a4d1..4ea3ba0b 100644 --- a/lib/core/shards/usecase.ts +++ b/lib/core/shards/usecase.ts @@ -46,27 +46,35 @@ export class ShardsUsecase { } } + const byFarmer = new Map(); + for (const { contact, shardHash } of stillExistentMirrors) { const { address, port } = contact; - const { uuid } = (shards.find(s => s.hash === shardHash) as { hash: string, uuid: string }); - - const url = `http://${address}:${port}/v2/shards/${uuid}`; + const farmerKey = `${address}:${port}`; + const shard = shards.find(s => s.hash === shardHash); + if (!shard) continue; - try { - const q = getQueue(); - if (!q) { - console.error('deleteShards: BullMQ queue not initialized, skipping enqueue for shard %s', uuid); - } else { - console.log('adding removal of shard %s to the queue', uuid) - q.add('delete-shard', { key: uuid, hash: uuid, url }, { + if (!byFarmer.has(farmerKey)) { + byFarmer.set(farmerKey, { url: `http://${address}:${port}/v2/shards`, keys: [] }); + } + byFarmer.get(farmerKey)!.keys.push(shard.uuid); + } + + const q = getQueue(); + if (!q) { + console.error('deleteShards: BullMQ queue not initialized'); + } else { + for (const { url, keys } of Array.from(byFarmer.values())) { + for (let i = 0; i < keys.length; i += 50) { + const chunk = keys.slice(i, i + 50); + log.info('deleteShards: enqueuing batch of %d keys to %s: %s', chunk.length, url, chunk.join(', ')); + q.add('delete-shards-batch', { url, keys: chunk }, { attempts: 3, backoff: { type: 'exponential', delay: 1000 }, }).catch((err) => { - console.error('deleteShards: Error enqueuing BullMQ job for shard %s: %s', uuid, err.message); + console.error('deleteShards: Error enqueuing BullMQ batch job: %s', err.message); }); } - } catch (err: any) { - console.error('deleteShards: Failed to enqueue BullMQ job for shard %s: %s', uuid, err.message); } } diff --git a/tests/lib/core/shards/usecase.test.ts b/tests/lib/core/shards/usecase.test.ts index f8f27bf7..4fd64c22 100644 --- a/tests/lib/core/shards/usecase.test.ts +++ b/tests/lib/core/shards/usecase.test.ts @@ -24,19 +24,17 @@ describe('ShardsUsecase', () => { }); describe('deleteShardsStorageByUuids()', () => { - it('When mirrors exist, then it deletes them properly', async () => { + it('When mirrors exist, then enqueues one batch job per farmer', async () => { const shardsToDelete = [fixtures.getShard(), fixtures.getShard()]; - const [firstShard, secondShard] = shardsToDelete; const contacts = shardsToDelete.map((s) => fixtures.getContact({ id: s.contracts[0].nodeID })); const mirrors: MirrorWithContact[] = contacts.map((c, i) => ({ ...fixtures.getMirror(), shardHash: shardsToDelete[i].hash, contact: c, })); - const [firstMirror, secondMirror] = mirrors; - const findByShardHashes = stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors); - const findContactsByIds = stub(contactsRepository, 'findByIds').resolves(); + stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors); + stub(contactsRepository, 'findByIds').resolves(); const deleteMirrorsByIds = stub(mirrorsRepository, 'deleteByIds').resolves(); const add = stub().resolves({ id: 'job-id-1' }); @@ -44,37 +42,81 @@ describe('ShardsUsecase', () => { await usecase.deleteShardsStorageByUuids(shardsToDelete as (Shard & { uuid: string })[]); - expect(findByShardHashes.calledOnce).toBeTruthy(); - expect(findByShardHashes.firstCall.args).toStrictEqual([shardsToDelete.map((s) => s.hash)]); - expect(findContactsByIds.notCalled).toBeTruthy(); + expect(add.callCount).toEqual(2); + add.args.forEach((args, i) => { + const shard = shardsToDelete[i]; + const contact = contacts[i]; + expect(args[0]).toEqual('delete-shards-batch'); + expect(args[1]).toEqual({ + url: `http://${contact.address}:${contact.port}/v2/shards`, + keys: [shard.uuid], + }); + }); + + expect(deleteMirrorsByIds.calledOnce).toBeTruthy(); + expect(deleteMirrorsByIds.firstCall.args).toStrictEqual([mirrors.map((m) => m.id)]); + }); + + it('When multiple shards share the same farmer, then enqueues a single batch job with all keys', async () => { + const contact = fixtures.getContact(); + const shardsToDelete = [ + fixtures.getShard({ contracts: [{ nodeID: contact.id, contract: {} as any }] }), + fixtures.getShard({ contracts: [{ nodeID: contact.id, contract: {} as any }] }), + ]; + const mirrors: MirrorWithContact[] = shardsToDelete.map((s) => ({ + ...fixtures.getMirror(), + shardHash: s.hash, + contact, + })); - expect(add.callCount).toEqual(mirrors.length); - expect(add.firstCall.args[0]).toEqual('delete-shard'); + stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors); + stub(contactsRepository, 'findByIds').resolves(); + stub(mirrorsRepository, 'deleteByIds').resolves(); + + const add = stub().resolves({ id: 'job-id-1' }); + stub(BullQueueModule, 'getQueue').returns({ add } as any); + + await usecase.deleteShardsStorageByUuids(shardsToDelete as (Shard & { uuid: string })[]); + + expect(add.callCount).toEqual(1); + expect(add.firstCall.args[0]).toEqual('delete-shards-batch'); expect(add.firstCall.args[1]).toEqual({ - key: firstShard.uuid, - hash: firstShard.uuid, - url: `http://${firstMirror.contact.address}:${firstMirror.contact.port}/v2/shards/${firstShard.uuid}`, + url: `http://${contact.address}:${contact.port}/v2/shards`, + keys: shardsToDelete.map((s) => s.uuid), }); + }); - expect(add.secondCall.args[0]).toEqual('delete-shard'); - expect(add.secondCall.args[1]).toEqual({ - key: secondShard.uuid, - hash: secondShard.uuid, - url: `http://${secondMirror.contact.address}:${secondMirror.contact.port}/v2/shards/${secondShard.uuid}`, - }); + it('When a farmer has more than 50 shards, then enqueues multiple chunks of 50', async () => { + const contact = fixtures.getContact(); + const shardsToDelete = Array.from({ length: 55 }, () => + fixtures.getShard({ contracts: [{ nodeID: contact.id, contract: {} as any }] }) + ); + const mirrors: MirrorWithContact[] = shardsToDelete.map((s) => ({ + ...fixtures.getMirror(), + shardHash: s.hash, + contact, + })); - expect(deleteMirrorsByIds.calledOnce).toBeTruthy(); - expect(deleteMirrorsByIds.firstCall.args).toStrictEqual([mirrors.map((m) => m.id)]); + stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors); + stub(contactsRepository, 'findByIds').resolves(); + stub(mirrorsRepository, 'deleteByIds').resolves(); + + const add = stub().resolves({ id: 'job-id-1' }); + stub(BullQueueModule, 'getQueue').returns({ add } as any); + + await usecase.deleteShardsStorageByUuids(shardsToDelete as (Shard & { uuid: string })[]); + + expect(add.callCount).toEqual(2); + expect(add.firstCall.args[1].keys).toHaveLength(50); + expect(add.secondCall.args[1].keys).toHaveLength(5); }); it('When mirrors do not exist, then uses contracts as fallback to delete shards', async () => { const shardsToDelete = [fixtures.getShard(), fixtures.getShard()]; - const [firstShard, secondShard] = shardsToDelete; const contacts = shardsToDelete.map((s) => fixtures.getContact({ id: s.contracts[0].nodeID })); - const [firstContact, secondContact] = contacts; const mirrors: MirrorWithContact[] = []; - const findByShardHashes = stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors); + stub(mirrorsRepository, 'findByShardHashesWithContacts').resolves(mirrors); const findContactsByIds = stub(contactsRepository, 'findByIds').resolves(contacts); const deleteMirrorsByIds = stub(mirrorsRepository, 'deleteByIds').resolves(); @@ -83,27 +125,20 @@ describe('ShardsUsecase', () => { await usecase.deleteShardsStorageByUuids(shardsToDelete as (Shard & { uuid: string })[]); - expect(findByShardHashes.calledOnce).toBeTruthy(); - expect(findByShardHashes.firstCall.args).toStrictEqual([shardsToDelete.map((s) => s.hash)]); expect(findContactsByIds.calledOnce).toBeTruthy(); expect(findContactsByIds.firstCall.args).toStrictEqual([ shardsToDelete.flatMap((s) => s.contracts.flatMap((c) => c.nodeID)), ]); - expect(add.callCount).toEqual(shardsToDelete.reduce((a, s) => a + s.contracts.length, 0)); - - expect(add.firstCall.args[0]).toEqual('delete-shard'); - expect(add.firstCall.args[1]).toEqual({ - key: firstShard.uuid, - hash: firstShard.uuid, - url: `http://${firstContact.address}:${firstContact.port}/v2/shards/${firstShard.uuid}`, - }); - - expect(add.secondCall.args[0]).toEqual('delete-shard'); - expect(add.secondCall.args[1]).toEqual({ - key: secondShard.uuid, - hash: secondShard.uuid, - url: `http://${secondContact.address}:${secondContact.port}/v2/shards/${secondShard.uuid}`, + expect(add.callCount).toEqual(2); + add.args.forEach((args, i) => { + const shard = shardsToDelete[i]; + const contact = contacts[i]; + expect(args[0]).toEqual('delete-shards-batch'); + expect(args[1]).toEqual({ + url: `http://${contact.address}:${contact.port}/v2/shards`, + keys: [shard.uuid], + }); }); expect(deleteMirrorsByIds.notCalled).toBeTruthy();