diff --git a/packages/Dbal/tests/Integration/MultiTenant/DeduplicationCleanupMultiTenantTest.php b/packages/Dbal/tests/Integration/MultiTenant/DeduplicationCleanupMultiTenantTest.php new file mode 100644 index 000000000..4ce03ae72 --- /dev/null +++ b/packages/Dbal/tests/Integration/MultiTenant/DeduplicationCleanupMultiTenantTest.php @@ -0,0 +1,101 @@ +connectionForTenantA(), $this->connectionForTenantB()] as $connectionFactory) { + $connectionFactory->createContext()->getDbalConnection() + ->executeStatement('DROP TABLE IF EXISTS ecotone_deduplication'); + } + } + + public function test_reproduces_667_cleanup_without_tenant_header_throws_when_no_default_connection(): void + { + $ecotoneLite = $this->bootstrapEcotone(); + + $this->expectException(InvalidArgumentException::class); + + $ecotoneLite->runConsoleCommand('ecotone:deduplication:remove-expired-messages', []); + } + + public function test_cleanup_with_tenant_header_routes_to_correct_tenant_connection(): void + { + $ecotoneLite = $this->bootstrapEcotone(); + + $ecotoneLite->sendCommandWithRoutingKey( + 'email_event_handler.handle_with_custom_deduplication_header', + metadata: ['tenant' => 'tenant_a', 'emailId' => 'a-1'] + ); + $ecotoneLite->sendCommandWithRoutingKey( + 'email_event_handler.handle_with_custom_deduplication_header', + metadata: ['tenant' => 'tenant_b', 'emailId' => 'b-1'] + ); + + $this->assertSame(1, $this->countDeduplicationRows($this->connectionForTenantA()), 'tenant_a should have one tracked message before cleanup'); + $this->assertSame(1, $this->countDeduplicationRows($this->connectionForTenantB()), 'tenant_b should have one tracked message before cleanup'); + + $ecotoneLite->runConsoleCommand('ecotone:deduplication:remove-expired-messages', ['header' => ['tenant:tenant_a']]); + + $this->assertSame(0, $this->countDeduplicationRows($this->connectionForTenantA()), 'tenant_a expired message should be removed'); + $this->assertSame(1, $this->countDeduplicationRows($this->connectionForTenantB()), 'tenant_b must be untouched - cleanup routed to tenant_a only'); + } + + private function countDeduplicationRows(object $connectionFactory): int + { + return (int) $connectionFactory->createContext()->getDbalConnection() + ->executeQuery('SELECT COUNT(*) FROM ecotone_deduplication') + ->fetchOne(); + } + + private function bootstrapEcotone(): FlowTestSupport + { + return EcotoneLite::bootstrapFlowTesting( + [EmailCommandHandler::class], + [ + new EmailCommandHandler(), + 'tenant_a_connection' => $this->connectionForTenantA(), + 'tenant_b_connection' => $this->connectionForTenantB(), + ], + ServiceConfiguration::createWithDefaults() + ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::DBAL_PACKAGE])) + ->withExtensionObjects([ + MultiTenantConfiguration::create( + tenantHeaderName: 'tenant', + tenantToConnectionMapping: [ + 'tenant_a' => 'tenant_a_connection', + 'tenant_b' => 'tenant_b_connection', + ], + ), + DbalConfiguration::createWithDefaults() + ->withDeduplication(true, expirationTime: 1), + ]), + ); + } +} diff --git a/packages/Ecotone/src/Messaging/Config/ConsoleCommandRunner.php b/packages/Ecotone/src/Messaging/Config/ConsoleCommandRunner.php index ec4b65493..94369c0fd 100644 --- a/packages/Ecotone/src/Messaging/Config/ConsoleCommandRunner.php +++ b/packages/Ecotone/src/Messaging/Config/ConsoleCommandRunner.php @@ -54,7 +54,7 @@ public function run(array $parameters): mixed } } - return $this->entrypoint->sendWithHeaders([], $arguments, $this->commandConfiguration->getChannelName()); + return $this->entrypoint->sendWithHeadersPropagation([], $arguments, $this->commandConfiguration->getChannelName()); } private function hasParameterWithGivenName(int|string $argumentName): bool