diff --git a/bin/cli/commands/empty-users-from-csv.command.ts b/bin/cli/commands/empty-users-from-csv.command.ts new file mode 100644 index 00000000..15f911e7 --- /dev/null +++ b/bin/cli/commands/empty-users-from-csv.command.ts @@ -0,0 +1,15 @@ +import { PrepareFunctionReturnType } from '../init'; +import { emptyUsersFromCsv } from '../tasks/empty-users-from-csv.task'; +import { CommandId } from './id'; + +export default { + id: CommandId.EmptyUsersFromCsv, + version: '0.0.1', + fn: async ( + { repo: { usersRepository }, usecase: { bucketsUsecase, bucketEntriesUsecase } }: PrepareFunctionReturnType, + csvPath: string, + concurrency: number, + ): Promise => { + await emptyUsersFromCsv(csvPath, usersRepository, bucketsUsecase, bucketEntriesUsecase, concurrency); + }, +}; diff --git a/bin/cli/commands/id.ts b/bin/cli/commands/id.ts index 0c904c8c..07b12e70 100644 --- a/bin/cli/commands/id.ts +++ b/bin/cli/commands/id.ts @@ -4,4 +4,5 @@ export enum CommandId { EmptyBuckets = 'empty-buckets', CleanStalledFrames = 'clean-stalled-frames', CleanStalledBucketEntries = 'clean-stalled-bucket-entries', + EmptyUsersFromCsv = 'empty-users-from-csv', } diff --git a/bin/cli/commands/index.ts b/bin/cli/commands/index.ts index f978ab79..a5a1ed58 100644 --- a/bin/cli/commands/index.ts +++ b/bin/cli/commands/index.ts @@ -6,6 +6,7 @@ import { default as emptyBucket } from "./empty-bucket.command"; import { default as emptyBuckets } from "./empty-buckets.command"; import { default as cleanStalledFrames } from "./clean-stalled-frames.command"; import { default as cleanStalledBucketEntries } from "./clean-stalled-bucket-entries.command"; +import { default as emptyUsersFromCsv } from "./empty-users-from-csv.command"; export default (resources: PrepareFunctionReturnType, onFinish: () => void) => ({ [destroyUserBuckets.id]: buildCommand({ @@ -57,4 +58,16 @@ export default (resources: PrepareFunctionReturnType, onFinish: () => void) => ( await cleanStalledBucketEntries.fn(resources); onFinish(); }), + + [emptyUsersFromCsv.id]: buildCommand({ + version: emptyUsersFromCsv.version, + command: `${emptyUsersFromCsv.id} `, + description: 'Empties buckets for all users listed in a CSV file (one email per line)', + options: [ + { flags: '-c, --concurrency ', description: 'Number of users to process in parallel', defaultValue: '3' }, + ], + }).action(async (csvPath, { concurrency }) => { + await emptyUsersFromCsv.fn(resources, csvPath, parseInt(concurrency, 10)); + onFinish(); + }), }); diff --git a/bin/cli/init.ts b/bin/cli/init.ts index bf3122c2..5c3b0b40 100644 --- a/bin/cli/init.ts +++ b/bin/cli/init.ts @@ -1,4 +1,3 @@ -import NetworkMessageQueue from '../../lib/server/queues/networkQueue'; import { connectToDatabase } from "../utils/database"; import { MongoDBUsersRepository } from '../../lib/core/users/MongoDBUsersRepository' @@ -32,6 +31,7 @@ import { ContactsRepository } from '../../lib/core/contacts/Repository'; import { MongoDB } from '../delete-objects/temp-shard.model'; import { DatabaseFramesReader, DatabaseBucketEntriesReaderWithoutBucket } from '../delete-objects/ObjectStorage'; import { FileStateRepository } from '../../lib/core/fileState/Repository'; +import * as bullQueueModule from '../../lib/core/queue/bullQueue'; const Config = require('../../lib/config'); @@ -63,28 +63,17 @@ export type PrepareFunctionReturnType = { } export async function prepare(): Promise { - const QUEUE_NAME = 'NETWORK_WORKER_TASKS_QUEUE'; - - const newDbConnection = new MongoDB(process.env.inxtbridge_storage__mongoUri as string); + const newDbConnection = new MongoDB(process.env.inxtbridge_storage__mongoUrl as string); await newDbConnection.connect(); const models = await connectToDatabase('', ''); - const { QUEUE_USERNAME, QUEUE_PASSWORD, QUEUE_HOST } = config; + try { + bullQueueModule.init(config); + console.log('bull queue initialized'); + } catch (err) { + const error = err as any; + console.error('failed to initialize bull queue:', error && error.message ? error.message : error); + } - const networkQueue = new NetworkMessageQueue({ - connection: { - url: `amqp://${QUEUE_USERNAME}:${QUEUE_PASSWORD}@${QUEUE_HOST}`, - }, - exchange: { - name: 'exchangeName', - type: 'direct', - }, - queue: { - name: QUEUE_NAME, - }, - routingKey: { - name: 'routingKeyName', - }, - }); const bucketEntriesRepository = new MongoDBBucketEntriesRepository(models.BucketEntry); const bucketEntryShardsRepository = new MongoDBBucketEntryShardsRepository(models.BucketEntryShard); const bucketsRepository = new MongoDBBucketsRepository(models.Bucket); @@ -101,7 +90,6 @@ export async function prepare(): Promise { const shardsUsecase = new ShardsUsecase( mirrorsRepository, contactsRepository, - networkQueue ); const bucketEntriesUsecase = new BucketEntriesUsecase( bucketEntriesRepository, @@ -133,8 +121,6 @@ export async function prepare(): Promise { const bucketEntriesReader = new DatabaseBucketEntriesReaderWithoutBucket( newDbConnection.getCollections().bucketEntries ); - await networkQueue.connectAndRetry(); - return { readers: { framesReader, diff --git a/bin/cli/tasks/empty-users-from-csv.task.ts b/bin/cli/tasks/empty-users-from-csv.task.ts new file mode 100644 index 00000000..52f52597 --- /dev/null +++ b/bin/cli/tasks/empty-users-from-csv.task.ts @@ -0,0 +1,72 @@ +import fs from 'fs'; +import readline from 'readline'; +import { BucketEntriesUsecase } from '../../../lib/core/bucketEntries/usecase'; +import { BucketsUsecase } from '../../../lib/core/buckets/usecase'; +import { UsersRepository } from '../../../lib/core/users/Repository'; +import { emptyBucket } from './empty-bucket.task'; +import { emptyBuckets } from './empty-buckets.task'; + +const emptyUserBuckets = async ( + email: string, + usersRepository: UsersRepository, + bucketsUsecase: BucketsUsecase, + bucketEntriesUsecase: BucketEntriesUsecase, +): Promise => { + const user = await usersRepository.findByEmail(email); + + if (!user) { + console.log(`User not found: ${email}`); + return; + } + + console.log(`Emptying buckets for user ${email} (${user.id})`); + + const limit = 20; + let offset = 0; + let moreToProcess = true; + + do { + const buckets = await bucketsUsecase.listByUserId(user.id, limit, offset); + + moreToProcess = buckets.length === limit; + offset += buckets.length; + + await emptyBuckets( + buckets.map(({ id }) => id), + (id) => emptyBucket(id, bucketEntriesUsecase), + async (id) => console.log(` Bucket ${id} emptied`), + ); + } while (moreToProcess); + + console.log(`Done with user ${email}`); +}; + +export const emptyUsersFromCsv = async ( + csvPath: string, + usersRepository: UsersRepository, + bucketsUsecase: BucketsUsecase, + bucketEntriesUsecase: BucketEntriesUsecase, + concurrency = 3, +): Promise => { + const fileStream = fs.createReadStream(csvPath); + const rl = readline.createInterface({ input: fileStream, crlfDelay: Infinity }); + + const emails: string[] = []; + for await (const line of rl) { + const email = line.trim(); + if (email && email.includes('@')) emails.push(email); + } + + console.log(`Processing ${emails.length} users with concurrency ${concurrency}`); + + let index = 0; + + const worker = async (): Promise => { + while (index < emails.length) { + const email = emails[index++]; + await emptyUserBuckets(email, usersRepository, bucketsUsecase, bucketEntriesUsecase); + } + }; + + await Promise.all(Array.from({ length: concurrency }, worker)); +}; diff --git a/bin/utils/database.ts b/bin/utils/database.ts index 958dfe5a..cdf65543 100644 --- a/bin/utils/database.ts +++ b/bin/utils/database.ts @@ -1,10 +1,9 @@ import { config as loadEnv } from 'dotenv'; import Config from '../../lib/config'; +import DatabaseConnection from '../../lib/models/database'; loadEnv(); -const Storage = require('storj-service-storage-models') as any; - const wait = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); export interface Models { @@ -22,7 +21,7 @@ export interface Models { FileState: any, }; -export async function connectToDatabase(configJSON: any, mongoURL: string): Promise { +export async function connectToDatabase(configJSON: any, mongoURL: string): Promise { const config = new Config(process.env.NODE_ENV, configJSON, '') as { storage: { mongoUrl: string; @@ -31,13 +30,9 @@ export async function connectToDatabase(configJSON: any, mongoURL: string): Prom QUEUE_USERNAME: string; QUEUE_PASSWORD: string; QUEUE_HOST: string; - }; + } - const storage = new Storage( - mongoURL || config.storage.mongoUrl, - config.storage.mongoOpts, - null - ); + const storage = DatabaseConnection.createFromConfig(config.storage); await wait(5000);