From 9b8a27b6252b6101c7cf503beb7d6d4c46ff756b Mon Sep 17 00:00:00 2001 From: Gustavo Freze Date: Thu, 2 Jul 2026 13:51:30 -0300 Subject: [PATCH 1/4] refactor: Reorganize the outbox schema boundary and write pipeline. Reorder the DoctrineOutboxRepository constructor by identifier length and move its per-record write into an OutboxWriter collaborator. Promote IdentityColumn to a public Schema interface and relocate ColumnsBuilder and TableLayoutBuilder out of Internal, so no Internal type appears in a public signature. The remaining edits bring the sources into phpcs compliance: bracketed null-coalescing, a non-empty catch, and inline code cross-references in place of unused imports. --- README.md | 35 +- phpstan.neon.dist | 2 +- src/DoctrineOutboxRepository.php | 63 +- src/Internal/BinaryIdentityColumn.php | 12 +- src/Internal/IdentityColumn.php | 14 - src/Internal/OutboxInsert.php | 4 +- src/Internal/OutboxWriter.php | 71 ++ src/Internal/StringIdentityColumn.php | 13 +- src/OutboxRepository.php | 9 +- src/Schema/Columns.php | 43 +- src/{Internal => Schema}/ColumnsBuilder.php | 132 +- src/Schema/IdentityColumn.php | 26 + src/Schema/IdentityColumnType.php | 1 - src/Schema/TableLayout.php | 42 +- .../TableLayoutBuilder.php | 55 +- tests/Integration/Database.php | 40 +- .../DoctrineOutboxRepositoryTest.php | 1070 ++++++++--------- tests/Integration/OutboxTableFactory.php | 16 +- tests/Models/EventRecordFactory.php | 4 +- tests/Unit/InMemoryOutboxRepositoryMock.php | 128 +- 20 files changed, 969 insertions(+), 811 deletions(-) delete mode 100644 src/Internal/IdentityColumn.php create mode 100644 src/Internal/OutboxWriter.php rename src/{Internal => Schema}/ColumnsBuilder.php (54%) create mode 100644 src/Schema/IdentityColumn.php rename src/{Internal => Schema}/TableLayoutBuilder.php (53%) diff --git a/README.md b/README.md index 330ce0a..0d55a5e 100644 --- a/README.md +++ b/README.md @@ -87,8 +87,9 @@ CREATE TABLE outbox_events ### Wiring the repository -`DoctrineOutboxRepository` requires a Doctrine DBAL `Connection`, an `IntegrationEventTranslators` collection, and a -`PayloadSerializers` collection. The table layout defaults to table `outbox_events` with BINARY(16) identity columns. +`DoctrineOutboxRepository` requires a Doctrine DBAL `Connection`, a `PayloadSerializers` collection, and an +`IntegrationEventTranslators` collection. The table layout defaults to table `outbox_events` with BINARY(16) identity +columns. ```php . - identifier: argument.type - path: src/DoctrineOutboxRepository.php + path: src/Internal/OutboxWriter.php # aggregateId is intentionally typed mixed at the public exception boundary; sprintf at level max flags the mixed argument. - identifier: argument.type diff --git a/src/DoctrineOutboxRepository.php b/src/DoctrineOutboxRepository.php index 0bdf70f..edde723 100644 --- a/src/DoctrineOutboxRepository.php +++ b/src/DoctrineOutboxRepository.php @@ -5,30 +5,32 @@ namespace TinyBlocks\Outbox; use Doctrine\DBAL\Connection; -use Doctrine\DBAL\Exception\UniqueConstraintViolationException; use TinyBlocks\BuildingBlocks\Event\EventRecord; use TinyBlocks\BuildingBlocks\Event\EventRecords; -use TinyBlocks\BuildingBlocks\Event\IntegrationEventRecord; use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslators; -use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion; -use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent; use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction; -use TinyBlocks\Outbox\Exceptions\PayloadSerializerNotConfigured; -use TinyBlocks\Outbox\Internal\OutboxInsert; +use TinyBlocks\Outbox\Internal\OutboxWriter; use TinyBlocks\Outbox\Schema\TableLayout; use TinyBlocks\Outbox\Serialization\PayloadSerializers; final readonly class DoctrineOutboxRepository implements OutboxRepository { private TableLayout $tableLayout; + private OutboxWriter $writer; public function __construct( private Connection $connection, - private IntegrationEventTranslators $translators, - private PayloadSerializers $serializers, + PayloadSerializers $serializers, + IntegrationEventTranslators $translators, ?TableLayout $tableLayout = null ) { - $this->tableLayout = $tableLayout ?? TableLayout::default(); + $this->tableLayout = ($tableLayout ?? TableLayout::default()); + $this->writer = new OutboxWriter( + connection: $connection, + serializers: $serializers, + tableLayout: $this->tableLayout, + translators: $translators + ); } public function push(EventRecords $records): void @@ -38,48 +40,7 @@ public function push(EventRecords $records): void } $records->each(actions: function (EventRecord $eventRecord): void { - $translator = $this->translators->findFor(record: $eventRecord); - - if (is_null($translator)) { - return; - } - - $integrationEventRecord = IntegrationEventRecord::from( - eventRecord: $eventRecord, - integrationEvent: $translator->translate(record: $eventRecord) - ); - - $payloadSerializer = $this->serializers->findFor(record: $integrationEventRecord); - - if (is_null($payloadSerializer)) { - throw PayloadSerializerNotConfigured::forEventClass( - eventClass: $integrationEventRecord->event::class - ); - } - - $insert = OutboxInsert::from( - record: $integrationEventRecord, - payload: $payloadSerializer->serialize(record: $integrationEventRecord), - tableLayout: $this->tableLayout - ); - - try { - $this->connection->executeStatement(sql: $insert->sql, params: $insert->parameters); - } catch (UniqueConstraintViolationException $exception) { - if ($this->tableLayout->uniqueConstraint->isViolatedBy(exception: $exception)) { - throw DuplicateAggregateVersion::forRecord( - previous: $exception, - aggregateId: $integrationEventRecord->aggregateId->identityValue(), - aggregateType: $integrationEventRecord->aggregateType, - aggregateVersion: $integrationEventRecord->aggregateVersion->value - ); - } - - throw DuplicateOutboxEvent::forRecord( - eventId: $integrationEventRecord->id->toString(), - previous: $exception - ); - } + $this->writer->write(eventRecord: $eventRecord); }); } } diff --git a/src/Internal/BinaryIdentityColumn.php b/src/Internal/BinaryIdentityColumn.php index bf71b12..03bce5d 100644 --- a/src/Internal/BinaryIdentityColumn.php +++ b/src/Internal/BinaryIdentityColumn.php @@ -5,14 +5,24 @@ namespace TinyBlocks\Outbox\Internal; use Ramsey\Uuid\Uuid; +use TinyBlocks\Outbox\Schema\IdentityColumn; -final readonly class BinaryIdentityColumn extends IdentityColumn +final readonly class BinaryIdentityColumn implements IdentityColumn { + private function __construct(private string $name) + { + } + public static function named(string $name): BinaryIdentityColumn { return new BinaryIdentityColumn(name: $name); } + public function name(): string + { + return $this->name; + } + public function convert(mixed $identityValue): string { return Uuid::fromString(uuid: (string)$identityValue)->getBytes(); diff --git a/src/Internal/IdentityColumn.php b/src/Internal/IdentityColumn.php deleted file mode 100644 index 3718618..0000000 --- a/src/Internal/IdentityColumn.php +++ /dev/null @@ -1,14 +0,0 @@ -tableName, - $columns->id->name, - $columns->aggregateId->name, + $columns->id->name(), + $columns->aggregateId->name(), $columns->aggregateType, $columns->eventType, $columns->revision, diff --git a/src/Internal/OutboxWriter.php b/src/Internal/OutboxWriter.php new file mode 100644 index 0000000..a042015 --- /dev/null +++ b/src/Internal/OutboxWriter.php @@ -0,0 +1,71 @@ +translators->findFor(record: $eventRecord); + + if (is_null($translator)) { + return; + } + + $record = IntegrationEventRecord::from( + eventRecord: $eventRecord, + integrationEvent: $translator->translate(record: $eventRecord) + ); + + $payloadSerializer = $this->serializers->findFor(record: $record); + + if (is_null($payloadSerializer)) { + throw PayloadSerializerNotConfigured::forEventClass(eventClass: $record->event::class); + } + + $insert = OutboxInsert::from( + record: $record, + payload: $payloadSerializer->serialize(record: $record), + tableLayout: $this->tableLayout + ); + + try { + $this->connection->executeStatement(sql: $insert->sql, params: $insert->parameters); + } catch (UniqueConstraintViolationException $exception) { + if ($this->tableLayout->uniqueConstraint->isViolatedBy(exception: $exception)) { + throw DuplicateAggregateVersion::forRecord( + previous: $exception, + aggregateId: $record->aggregateId->identityValue(), + aggregateType: $record->aggregateType, + aggregateVersion: $record->aggregateVersion->value + ); + } + + throw DuplicateOutboxEvent::forRecord( + eventId: $record->id->toString(), + previous: $exception + ); + } + } +} diff --git a/src/Internal/StringIdentityColumn.php b/src/Internal/StringIdentityColumn.php index 4118075..4223c2d 100644 --- a/src/Internal/StringIdentityColumn.php +++ b/src/Internal/StringIdentityColumn.php @@ -4,13 +4,24 @@ namespace TinyBlocks\Outbox\Internal; -final readonly class StringIdentityColumn extends IdentityColumn +use TinyBlocks\Outbox\Schema\IdentityColumn; + +final readonly class StringIdentityColumn implements IdentityColumn { + private function __construct(private string $name) + { + } + public static function named(string $name): StringIdentityColumn { return new StringIdentityColumn(name: $name); } + public function name(): string + { + return $this->name; + } + public function convert(mixed $identityValue): string { return (string)$identityValue; diff --git a/src/OutboxRepository.php b/src/OutboxRepository.php index de09e2b..f3763c5 100644 --- a/src/OutboxRepository.php +++ b/src/OutboxRepository.php @@ -5,14 +5,11 @@ namespace TinyBlocks\Outbox; use TinyBlocks\BuildingBlocks\Event\EventRecords; -use TinyBlocks\BuildingBlocks\Event\IntegrationEvent; -use TinyBlocks\BuildingBlocks\Event\IntegrationEventTranslator; use TinyBlocks\Outbox\Exceptions\DuplicateAggregateVersion; use TinyBlocks\Outbox\Exceptions\DuplicateOutboxEvent; use TinyBlocks\Outbox\Exceptions\InvalidPayloadJson; use TinyBlocks\Outbox\Exceptions\OutboxRequiresActiveTransaction; use TinyBlocks\Outbox\Exceptions\PayloadSerializerNotConfigured; -use TinyBlocks\Outbox\Serialization\PayloadSerializer; /** * Producer-side contract: persists outbox records as part of the caller's open transaction. @@ -28,14 +25,14 @@ interface OutboxRepository * *

The input carries domain events from the aggregate's recorded-events buffer. * The implementation filters each record through the registered - * {@see IntegrationEventTranslator} collection: domain events without a matching + * IntegrationEventTranslator collection: domain events without a matching * translator are silently skipped, because the absence of a translator is the canonical * declaration that the event is internal to the bounded context and must not cross its * boundary.

* - *

Matched domain events are translated into {@see IntegrationEvent} envelopes via + *

Matched domain events are translated into IntegrationEvent envelopes via * the Anti-Corruption Layer and only then serialized and persisted. The - * {@see PayloadSerializer} operates on the integration event record, never on the + * PayloadSerializer operates on the integration event record, never on the * domain event directly.

* *

The implementation must not open or commit a transaction. It is the caller's diff --git a/src/Schema/Columns.php b/src/Schema/Columns.php index 874e38d..48d14d0 100644 --- a/src/Schema/Columns.php +++ b/src/Schema/Columns.php @@ -4,9 +4,6 @@ namespace TinyBlocks\Outbox\Schema; -use TinyBlocks\Outbox\Internal\ColumnsBuilder; -use TinyBlocks\Outbox\Internal\IdentityColumn; - final readonly class Columns { private function __construct( @@ -22,26 +19,6 @@ private function __construct( ) { } - /** - * Creates a ColumnsBuilder used to customize the outbox column names. - * - * @return ColumnsBuilder A new builder seeded with the default column names. - */ - public static function builder(): ColumnsBuilder - { - return ColumnsBuilder::create(); - } - - /** - * Creates a Columns instance using the default outbox column names. - * - * @return Columns The default column configuration. - */ - public static function default(): Columns - { - return ColumnsBuilder::create()->build(); - } - /** * Builds a Columns with the explicit column names for every outbox field. * @@ -79,4 +56,24 @@ public static function from( aggregateVersion: $aggregateVersion ); } + + /** + * Creates a ColumnsBuilder used to customize the outbox column names. + * + * @return ColumnsBuilder A new builder seeded with the default column names. + */ + public static function builder(): ColumnsBuilder + { + return ColumnsBuilder::create(); + } + + /** + * Creates a Columns instance using the default outbox column names. + * + * @return Columns The default column configuration. + */ + public static function default(): Columns + { + return ColumnsBuilder::create()->build(); + } } diff --git a/src/Internal/ColumnsBuilder.php b/src/Schema/ColumnsBuilder.php similarity index 54% rename from src/Internal/ColumnsBuilder.php rename to src/Schema/ColumnsBuilder.php index 9394043..b56edf6 100644 --- a/src/Internal/ColumnsBuilder.php +++ b/src/Schema/ColumnsBuilder.php @@ -2,11 +2,11 @@ declare(strict_types=1); -namespace TinyBlocks\Outbox\Internal; - -use TinyBlocks\Outbox\Schema\Columns; -use TinyBlocks\Outbox\Schema\IdentityColumnType; +namespace TinyBlocks\Outbox\Schema; +/** + * Fluent builder for the outbox column configuration. + */ final class ColumnsBuilder { private string $idName = 'id'; @@ -26,79 +26,145 @@ private function __construct() { } + /** + * Creates a ColumnsBuilder seeded with the default column names. + * + * @return ColumnsBuilder A new builder with the default column names. + */ public static function create(): ColumnsBuilder { return new ColumnsBuilder(); } + /** + * Builds a Columns from the configured column names. + * + * @return Columns The built column configuration. + */ + public function build(): Columns + { + return Columns::from( + id: $this->idType->toColumn(name: $this->idName), + payload: $this->payload, + revision: $this->revision, + createdAt: $this->createdAt, + eventType: $this->eventType, + occurredAt: $this->occurredAt, + aggregateId: $this->aggregateIdType->toColumn(name: $this->aggregateIdName), + aggregateType: $this->aggregateType, + aggregateVersion: $this->aggregateVersion + ); + } + + /** + * Sets the event id column name and its identity storage type. + * + * @param string $name The event id column name. + * @param IdentityColumnType $type The identity storage type. + * @return ColumnsBuilder The builder for chaining. + */ + public function withId(string $name, IdentityColumnType $type): ColumnsBuilder + { + $this->idName = $name; + $this->idType = $type; + return $this; + } + + /** + * Sets the event payload column name. + * + * @param string $name The payload column name. + * @return ColumnsBuilder The builder for chaining. + */ public function withPayload(string $name): ColumnsBuilder { $this->payload = $name; return $this; } + /** + * Sets the schema revision column name. + * + * @param string $name The revision column name. + * @return ColumnsBuilder The builder for chaining. + */ public function withRevision(string $name): ColumnsBuilder { $this->revision = $name; return $this; } + /** + * Sets the record creation timestamp column name. + * + * @param string $name The creation timestamp column name. + * @return ColumnsBuilder The builder for chaining. + */ public function withCreatedAt(string $name): ColumnsBuilder { $this->createdAt = $name; return $this; } + /** + * Sets the event type column name. + * + * @param string $name The event type column name. + * @return ColumnsBuilder The builder for chaining. + */ public function withEventType(string $name): ColumnsBuilder { $this->eventType = $name; return $this; } + /** + * Sets the event occurrence timestamp column name. + * + * @param string $name The occurrence timestamp column name. + * @return ColumnsBuilder The builder for chaining. + */ public function withOccurredAt(string $name): ColumnsBuilder { $this->occurredAt = $name; return $this; } - public function withAggregateType(string $name): ColumnsBuilder - { - $this->aggregateType = $name; - return $this; - } - - public function withAggregateVersion(string $name): ColumnsBuilder + /** + * Sets the aggregate id column name and its identity storage type. + * + * @param string $name The aggregate id column name. + * @param IdentityColumnType $type The identity storage type. + * @return ColumnsBuilder The builder for chaining. + */ + public function withAggregateId(string $name, IdentityColumnType $type): ColumnsBuilder { - $this->aggregateVersion = $name; + $this->aggregateIdName = $name; + $this->aggregateIdType = $type; return $this; } - public function withId(string $name, IdentityColumnType $type): ColumnsBuilder + /** + * Sets the aggregate type column name. + * + * @param string $name The aggregate type column name. + * @return ColumnsBuilder The builder for chaining. + */ + public function withAggregateType(string $name): ColumnsBuilder { - $this->idName = $name; - $this->idType = $type; + $this->aggregateType = $name; return $this; } - public function withAggregateId(string $name, IdentityColumnType $type): ColumnsBuilder + /** + * Sets the aggregate version column name. + * + * @param string $name The aggregate version column name. + * @return ColumnsBuilder The builder for chaining. + */ + public function withAggregateVersion(string $name): ColumnsBuilder { - $this->aggregateIdName = $name; - $this->aggregateIdType = $type; + $this->aggregateVersion = $name; return $this; } - - public function build(): Columns - { - return Columns::from( - id: $this->idType->toColumn(name: $this->idName), - payload: $this->payload, - revision: $this->revision, - createdAt: $this->createdAt, - eventType: $this->eventType, - occurredAt: $this->occurredAt, - aggregateId: $this->aggregateIdType->toColumn(name: $this->aggregateIdName), - aggregateType: $this->aggregateType, - aggregateVersion: $this->aggregateVersion - ); - } } diff --git a/src/Schema/IdentityColumn.php b/src/Schema/IdentityColumn.php new file mode 100644 index 0000000..2ed0109 --- /dev/null +++ b/src/Schema/IdentityColumn.php @@ -0,0 +1,26 @@ +build(); - } - /** * Builds a TableLayout with the explicit columns, table name, and unique constraint. * @@ -54,4 +32,24 @@ public static function from( uniqueConstraint: $uniqueConstraint ); } + + /** + * Creates a TableLayoutBuilder used to customize the outbox table layout. + * + * @return TableLayoutBuilder A new builder seeded with the default table layout. + */ + public static function builder(): TableLayoutBuilder + { + return TableLayoutBuilder::create(); + } + + /** + * Creates a TableLayout using the default table name, columns, and unique constraint. + * + * @return TableLayout The default table layout. + */ + public static function default(): TableLayout + { + return TableLayoutBuilder::create()->build(); + } } diff --git a/src/Internal/TableLayoutBuilder.php b/src/Schema/TableLayoutBuilder.php similarity index 53% rename from src/Internal/TableLayoutBuilder.php rename to src/Schema/TableLayoutBuilder.php index a021d28..3c91ad1 100644 --- a/src/Internal/TableLayoutBuilder.php +++ b/src/Schema/TableLayoutBuilder.php @@ -2,12 +2,11 @@ declare(strict_types=1); -namespace TinyBlocks\Outbox\Internal; - -use TinyBlocks\Outbox\Schema\Columns; -use TinyBlocks\Outbox\Schema\TableLayout; -use TinyBlocks\Outbox\Schema\UniqueConstraint; +namespace TinyBlocks\Outbox\Schema; +/** + * Fluent builder for the outbox table layout. + */ final class TableLayoutBuilder { private Columns $columns; @@ -20,35 +19,63 @@ private function __construct() $this->uniqueConstraint = UniqueConstraint::default(); } + /** + * Creates a TableLayoutBuilder seeded with the default table layout. + * + * @return TableLayoutBuilder A new builder with the default table layout. + */ public static function create(): TableLayoutBuilder { return new TableLayoutBuilder(); } + /** + * Builds a TableLayout from the configured columns, table name, and unique constraint. + * + * @return TableLayout The built table layout. + */ + public function build(): TableLayout + { + return TableLayout::from( + columns: $this->columns, + tableName: $this->tableName, + uniqueConstraint: $this->uniqueConstraint + ); + } + + /** + * Sets the column configuration. + * + * @param Columns $columns The column configuration to apply. + * @return TableLayoutBuilder The builder for chaining. + */ public function withColumns(Columns $columns): TableLayoutBuilder { $this->columns = $columns; return $this; } + /** + * Sets the outbox table name. + * + * @param string $tableName The physical outbox table name. + * @return TableLayoutBuilder The builder for chaining. + */ public function withTableName(string $tableName): TableLayoutBuilder { $this->tableName = $tableName; return $this; } + /** + * Sets the unique constraint name used to detect duplicate aggregate versions. + * + * @param string $name The unique constraint name. + * @return TableLayoutBuilder The builder for chaining. + */ public function withUniqueConstraint(string $name): TableLayoutBuilder { $this->uniqueConstraint = UniqueConstraint::named(name: $name); return $this; } - - public function build(): TableLayout - { - return TableLayout::from( - columns: $this->columns, - tableName: $this->tableName, - uniqueConstraint: $this->uniqueConstraint - ); - } } diff --git a/tests/Integration/Database.php b/tests/Integration/Database.php index 7159e09..922afc0 100644 --- a/tests/Integration/Database.php +++ b/tests/Integration/Database.php @@ -17,8 +17,8 @@ private function __construct( private string $host, private string $port, private string $database, - private string $username, - private string $password + private string $password, + private string $username ) { } @@ -34,8 +34,8 @@ public static function instance(): Database host: $host, port: $port, database: $database, - username: $username, - password: $password + password: $password, + username: $username ); } @@ -43,25 +43,23 @@ public function start(): void { try { $this->connection()->executeQuery('SELECT 1'); - return; } catch (Exception) { - } - - new Process(['docker', 'rm', '-f', sprintf('tiny-blocks-reaper-%s', $this->host)])->run(); - new Process(['docker', 'rm', '-f', $this->host])->run(); + new Process(['docker', 'rm', '-f', sprintf('tiny-blocks-reaper-%s', $this->host)])->run(); + new Process(['docker', 'rm', '-f', $this->host])->run(); - MySQLDockerContainer::from(image: 'mysql:8.4', name: $this->host) - ->pullImage() - ->withTimezone(timezone: 'UTC') - ->withUsername(user: $this->username) - ->withPassword(password: $this->password) - ->withDatabase(database: $this->database) - ->withPortMapping(portOnHost: (int)$this->port, portOnContainer: 3306) - ->withRootPassword(rootPassword: 'root') - ->withGrantedHosts() - ->withoutAutoRemove() - ->withReadinessTimeout(timeoutInSeconds: 60) - ->run(); + MySQLDockerContainer::from(image: 'mysql:8.4', name: $this->host) + ->pullImage() + ->withTimezone(timezone: 'UTC') + ->withUsername(user: $this->username) + ->withPassword(password: $this->password) + ->withDatabase(database: $this->database) + ->withPortMapping(portOnHost: (int)$this->port, portOnContainer: 3306) + ->withRootPassword(rootPassword: 'root') + ->withGrantedHosts() + ->withoutAutoRemove() + ->withReadinessTimeout(timeoutInSeconds: 60) + ->run(); + } } public function connection(): Connection diff --git a/tests/Integration/DoctrineOutboxRepositoryTest.php b/tests/Integration/DoctrineOutboxRepositoryTest.php index 0e225e2..c370174 100644 --- a/tests/Integration/DoctrineOutboxRepositoryTest.php +++ b/tests/Integration/DoctrineOutboxRepositoryTest.php @@ -43,75 +43,24 @@ protected function setUp(): void ); } - public function testPushWhenNoTransactionThenOutboxRequiresActiveTransaction(): void + public function testPushWhenTwoRecordsThenBothPersisted(): void { /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) - ); - - /** @And a record to push */ - $records = EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order' - ) - ]); - - /** @Then an exception requiring an active transaction is thrown */ - $this->expectException(OutboxRequiresActiveTransaction::class); - $this->expectExceptionMessage('push() must be called within an active transaction.'); - - /** @When pushing without an active transaction */ - $repository->push(records: $records); - } - - public function testPushWhenMultipleSerializersAndFirstMatchesThenFirstIsUsed(): void - { - /** @Given a repository with a translator and two serializers supporting the same integration event */ - $repository = new DoctrineOutboxRepository( - connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [ - new OrderPlacedSerializer(), - new FallbackOrderPlacedSerializer() - ]) + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing an order placed event */ + /** @When pushing two records */ $repository->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), aggregateType: 'Order' - ) - ])); - - /** @And the transaction is committed */ - self::$connection->commit(); - - /** @Then the payload is from the first serializer, not the fallback */ - self::assertSame('{}', self::$connection->fetchOne('SELECT payload FROM outbox_events LIMIT 1')); - } - - public function testPushWhenReflectionPayloadSerializerThenEventPropertiesAreEncoded(): void - { - /** @Given a repository with a translator and a reflection payload serializer */ - $repository = new DoctrineOutboxRepository( - connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]) - ); - - /** @And the connection has an active transaction */ - self::$connection->beginTransaction(); - - /** @When pushing a record */ - $repository->push(records: EventRecords::createFrom(elements: [ + ), EventRecordFactory::create( event: new OrderPlaced(), aggregateType: 'Order' @@ -121,8 +70,8 @@ public function testPushWhenReflectionPayloadSerializerThenEventPropertiesAreEnc /** @And the transaction is committed */ self::$connection->commit(); - /** @Then the payload reflects the integration event's public properties */ - self::assertSame('[]', self::$connection->fetchOne('SELECT payload FROM outbox_events LIMIT 1')); + /** @Then exactly two records are stored */ + self::assertSame(2, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); } public function testPushWhenCallerRollsBackThenNoRecordPersisted(): void @@ -130,8 +79,8 @@ public function testPushWhenCallerRollsBackThenNoRecordPersisted(): void /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); /** @And the caller opens a transaction */ @@ -152,53 +101,22 @@ public function testPushWhenCallerRollsBackThenNoRecordPersisted(): void self::assertSame(0, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); } - public function testPushWhenTwoRecordsThenBothPersisted(): void + public function testPushWhenKnownIdThenPersistedIdMatchesOriginal(): void { /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); - /** @And the connection has an active transaction */ - self::$connection->beginTransaction(); - - /** @When pushing two records */ - $repository->push(records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order' - ), - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order' - ) - ])); - - /** @And the transaction is committed */ - self::$connection->commit(); - - /** @Then exactly two records are stored */ - self::assertSame(2, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); - } - - public function testPushWhenUuidWithNullBytesThenBytesPreservedInStorage(): void - { - /** @Given a UUID whose bytes include a leading null byte */ - $recordId = Uuid::fromBytes(bytes: "\x00\x11\x22\x33\x44\x55\x67\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff"); - - /** @And a repository */ - $repository = new DoctrineOutboxRepository( - connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) - ); + /** @And a record with a known id */ + $recordId = Uuid::uuid4(); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing a record with the null-byte UUID */ + /** @When pushing the record */ $repository->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), @@ -210,24 +128,24 @@ public function testPushWhenUuidWithNullBytesThenBytesPreservedInStorage(): void /** @And the transaction is committed */ self::$connection->commit(); - /** @Then the retrieved bytes are identical to the original UUID bytes */ - $row = self::$connection->fetchAssociative('SELECT id FROM outbox_events LIMIT 1'); + /** @Then the persisted id bytes match the original record id */ + $row = self::$connection->fetchAssociative( + 'SELECT id FROM outbox_events WHERE id = UUID_TO_BIN(?)', + [$recordId->toString()] + ); self::assertSame($recordId->getBytes(), $row['id']); } - public function testPushWhenSerializerReturnsInvalidJsonThenInvalidPayloadJson(): void + public function testPushWhenDuplicateEventIdThenDuplicateOutboxEvent(): void { - /** @Given a repository with a translator and a serializer that produces invalid JSON */ + /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new InvalidPayloadSerializer()]) + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); - /** @And the connection has an active transaction */ - self::$connection->beginTransaction(); - - /** @And a record to push */ + /** @And a record with a fixed id */ $records = EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), @@ -235,150 +153,206 @@ public function testPushWhenSerializerReturnsInvalidJsonThenInvalidPayloadJson() ) ]); - /** @Then an exception indicating invalid JSON payload is thrown */ - $this->expectException(InvalidPayloadJson::class); - $this->expectExceptionMessage('Payload is not valid JSON .'); + /** @And the connection has an active transaction */ + self::$connection->beginTransaction(); - /** @When pushing the record */ + /** @And the record is pushed once */ + $repository->push(records: $records); + + /** @Then an exception indicating a duplicate event is thrown */ + $this->expectException(DuplicateOutboxEvent::class); + + /** @When pushing the same record again */ $repository->push(records: $records); } - public function testPushWhenMultipleSerializersAndSecondMatchesThenCorrectSerializerIsUsed(): void + public function testPushWhenEventRecordsIsEmptyThenNoInsertIsExecuted(): void { - /** @Given a repository with translators for both event types and matching serializers */ + /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [ - new OrderPlacedTranslator(), - new RefundIssuedTranslator() - ]), - serializers: PayloadSerializers::createFrom(elements: [ - new OrderPlacedSerializer(), - new RefundIssuedSerializer() - ]) + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing a refund event */ - $repository->push(records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new RefundIssued(), - aggregateType: 'Refund' - ) - ])); + /** @When pushing an empty EventRecords collection */ + $repository->push(records: EventRecords::createFromEmpty()); /** @And the transaction is committed */ self::$connection->commit(); - /** @Then the payload is from the refund serializer */ - self::assertSame( - '{"type": "refund"}', - self::$connection->fetchOne('SELECT payload FROM outbox_events LIMIT 1') - ); + /** @Then no records are persisted */ + self::assertSame(0, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); } - public function testPushWhenSerializerDoesNotSupportIntegrationEventThenPayloadSerializerNotConfigured(): void + public function testPushWhenNullTableLayoutThenSqlUsesDefaultTableName(): void { - /** @Given a repository with a refund translator but only an order serializer */ - $repository = new DoctrineOutboxRepository( - connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new RefundIssuedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) - ); + /** @Given a mocked connection with an active transaction */ + $connection = $this->createMock(Connection::class); - /** @And the connection has an active transaction */ - self::$connection->beginTransaction(); + /** @And the connection reports an active transaction */ + $connection->method('isTransactionActive')->willReturn(true); - /** @And a refund event that the order serializer does not support */ - $records = EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new RefundIssued(), - aggregateType: 'Refund' - ) - ]); + /** @Then the SQL statement targets the default table name */ + $connection->expects(self::once()) + ->method('executeStatement') + ->with(self::stringContains('outbox_events')) + ->willReturn(1); - /** @Then an exception indicating no serializer is configured is thrown */ - $this->expectException(PayloadSerializerNotConfigured::class); - $this->expectExceptionMessage( - 'No payload serializer configured for event class .' + /** @When pushing a record using the default table layout */ + new DoctrineOutboxRepository( + connection: $connection, + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) + )->push( + records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order' + ) + ]) ); - - /** @When pushing the unsupported event */ - $repository->push(records: $records); } - public function testPushWhenDuplicateEventIdThenDuplicateOutboxEvent(): void + public function testPushWhenStringIdentityTypeStoredThenIdIsUuidString(): void { - /** @Given a repository */ + /** @Given a layout with STRING identity columns */ + $tableLayout = TableLayout::builder() + ->withTableName(tableName: 'string_outbox') + ->withColumns( + columns: Columns::builder() + ->withId(name: 'id', type: IdentityColumnType::STRING) + ->withAggregateId(name: 'aggregate_id', type: IdentityColumnType::STRING) + ->build() + ) + ->build(); + + /** @And any pre-existing string outbox table is dropped */ + self::$connection->executeStatement('DROP TABLE IF EXISTS string_outbox'); + + /** @And the string outbox table is registered for cleanup */ + self::registerTableForCleanup(tableName: 'string_outbox'); + + /** @And the string outbox table is created */ + OutboxTableFactory::createWithStringIdentities( + connection: self::$connection, + tableLayout: $tableLayout + ); + + /** @And a repository using the string layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + tableLayout: $tableLayout ); - /** @And a record with a fixed id */ - $records = EventRecords::createFrom(elements: [ + /** @And the connection has an active transaction */ + self::$connection->beginTransaction(); + + /** @When pushing a record */ + $repository->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), aggregateType: 'Order' ) - ]); + ])); - /** @And the connection has an active transaction */ - self::$connection->beginTransaction(); + /** @And the transaction is committed */ + self::$connection->commit(); - /** @And the record is pushed once */ - $repository->push(records: $records); + /** @Then the id is stored as a 36-character UUID string */ + $row = self::$connection->fetchAssociative('SELECT id FROM string_outbox LIMIT 1'); + self::assertSame(36, strlen($row['id'])); + } - /** @Then an exception indicating a duplicate event is thrown */ - $this->expectException(DuplicateOutboxEvent::class); + public function testPushWhenCustomTableLayoutThenSqlUsesCustomTableName(): void + { + /** @Given a mocked connection with an active transaction */ + $connection = $this->createMock(Connection::class); - /** @When pushing the same record again */ - $repository->push(records: $records); + /** @And the connection reports an active transaction */ + $connection->method('isTransactionActive')->willReturn(true); + + /** @And a custom table layout with a distinct table name */ + $tableLayout = TableLayout::builder()->withTableName(tableName: 'custom_outbox')->build(); + + /** @Then the SQL statement targets the custom table name */ + $connection->expects(self::once()) + ->method('executeStatement') + ->with(self::stringContains('custom_outbox')) + ->willReturn(1); + + /** @When pushing a record using the custom table layout */ + new DoctrineOutboxRepository( + connection: $connection, + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), + tableLayout: $tableLayout + )->push( + records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order' + ) + ]) + ); } - public function testPushWhenDuplicateAggregateVersionThenDuplicateAggregateVersion(): void + public function testPushWhenSingleRecordThenAllFieldsPersistedCorrectly(): void { - /** @Given a repository */ + /** @Given a repository with a translator and an order placed serializer */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); - /** @And a fixed aggregate identity and aggregate version */ - $aggregateId = Uuid::uuid4()->toString(); - /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @And a first record is pushed with that aggregate and aggregate version */ + /** @When pushing the record */ $repository->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), aggregateType: 'Order', - aggregateId: $aggregateId, - aggregateVersion: AggregateVersion::of(value: 1) + occurredAt: Instant::fromString(value: '2024-06-01 12:00:00.000000'), + aggregateVersion: AggregateVersion::of(value: 3) ) ])); - /** @Then an exception indicating a duplicate aggregate version is thrown */ - $this->expectException(DuplicateAggregateVersion::class); - $this->expectExceptionMessage( - sprintf('Duplicate aggregate version for at aggregate version <1>.', $aggregateId) - ); + /** @And the transaction is committed */ + self::$connection->commit(); - /** @When pushing a second record with the same aggregate and aggregate version but a different id */ - $repository->push(records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order', - aggregateId: $aggregateId, - aggregateVersion: AggregateVersion::of(value: 1) - ) - ])); + /** @Then the row is retrievable from the database */ + $row = self::$connection->fetchAssociative('SELECT * FROM outbox_events LIMIT 1'); + + /** @And the id is stored as 16-byte binary */ + self::assertSame(16, strlen($row['id'])); + + /** @And the aggregate_id is stored as 16-byte binary */ + self::assertSame(16, strlen($row['aggregate_id'])); + + /** @And the event_type reflects the integration event class */ + self::assertSame('OrderShipped', $row['event_type']); + + /** @And the revision is correct */ + self::assertSame(1, (int)$row['revision']); + + /** @And the aggregate_version is correct */ + self::assertSame(3, (int)$row['aggregate_version']); + + /** @And the payload matches the serializer output */ + self::assertSame('{}', $row['payload']); + + /** @And the aggregate_type is correct */ + self::assertSame('Order', $row['aggregate_type']); + + /** @And the occurred_at is stored */ + self::assertStringStartsWith('2024-06-01', $row['occurred_at']); } public function testPushWhenCustomTableNameThenRecordStoredInCustomTable(): void @@ -403,8 +377,8 @@ public function testPushWhenCustomTableNameThenRecordStoredInCustomTable(): void /** @And a repository using the custom layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), tableLayout: $tableLayout ); @@ -429,43 +403,79 @@ public function testPushWhenCustomTableNameThenRecordStoredInCustomTable(): void self::assertSame(0, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); } - public function testPushWhenStringIdentityTypeStoredThenIdIsUuidString(): void + public function testPushWhenUuidWithNullBytesThenBytesPreservedInStorage(): void { - /** @Given a layout with STRING identity columns */ - $tableLayout = TableLayout::builder() - ->withTableName(tableName: 'string_outbox') - ->withColumns( - columns: Columns::builder() - ->withId(name: 'id', type: IdentityColumnType::STRING) - ->withAggregateId(name: 'aggregate_id', type: IdentityColumnType::STRING) - ->build() + /** @Given a UUID whose bytes include a leading null byte */ + $recordId = Uuid::fromBytes(bytes: "\x00\x11\x22\x33\x44\x55\x67\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff"); + + /** @And a repository */ + $repository = new DoctrineOutboxRepository( + connection: self::$connection, + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) + ); + + /** @And the connection has an active transaction */ + self::$connection->beginTransaction(); + + /** @When pushing a record with the null-byte UUID */ + $repository->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + id: $recordId ) - ->build(); + ])); - /** @And any pre-existing string outbox table is dropped */ - self::$connection->executeStatement('DROP TABLE IF EXISTS string_outbox'); + /** @And the transaction is committed */ + self::$connection->commit(); - /** @And the string outbox table is registered for cleanup */ - self::registerTableForCleanup(tableName: 'string_outbox'); + /** @Then the retrieved bytes are identical to the original UUID bytes */ + $row = self::$connection->fetchAssociative('SELECT id FROM outbox_events LIMIT 1'); + self::assertSame($recordId->getBytes(), $row['id']); + } - /** @And the string outbox table is created */ - OutboxTableFactory::createWithStringIdentities( + public function testPushWhenNoTransactionThenOutboxRequiresActiveTransaction(): void + { + /** @Given a repository */ + $repository = new DoctrineOutboxRepository( connection: self::$connection, - tableLayout: $tableLayout + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); - /** @And a repository using the string layout */ + /** @And a record to push */ + $records = EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order' + ) + ]); + + /** @Then an exception requiring an active transaction is thrown */ + $this->expectException(OutboxRequiresActiveTransaction::class); + $this->expectExceptionMessage('push() must be called within an active transaction.'); + + /** @When pushing without an active transaction */ + $repository->push(records: $records); + } + + public function testPushWhenMultipleSerializersAndFirstMatchesThenFirstIsUsed(): void + { + /** @Given a repository with a translator and two serializers supporting the same integration event */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), - tableLayout: $tableLayout + serializers: PayloadSerializers::createFrom(elements: [ + new OrderPlacedSerializer(), + new FallbackOrderPlacedSerializer() + ]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing a record */ + /** @When pushing an order placed event */ $repository->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), @@ -476,152 +486,190 @@ public function testPushWhenStringIdentityTypeStoredThenIdIsUuidString(): void /** @And the transaction is committed */ self::$connection->commit(); - /** @Then the id is stored as a 36-character UUID string */ - $row = self::$connection->fetchAssociative('SELECT id FROM string_outbox LIMIT 1'); - self::assertSame(36, strlen($row['id'])); + /** @Then the payload is from the first serializer, not the fallback */ + self::assertSame('{}', self::$connection->fetchOne('SELECT payload FROM outbox_events LIMIT 1')); } - public function testPushWhenNonUuidAggregateIdWithStringTypeThenStoredAsOriginalString(): void + public function testPushWhenNoTranslatorsRegisteredThenRecordIsSilentlySkipped(): void { - /** @Given a layout with STRING identity columns */ - $tableLayout = TableLayout::builder() - ->withTableName(tableName: 'string_outbox') - ->withColumns( - columns: Columns::builder() - ->withId(name: 'id', type: IdentityColumnType::STRING) - ->withAggregateId(name: 'aggregate_id', type: IdentityColumnType::STRING) - ->build() + /** @Given a repository with no translators and a reflection serializer */ + $repository = new DoctrineOutboxRepository( + connection: self::$connection, + serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]), + translators: IntegrationEventTranslators::createFromEmpty() + ); + + /** @And the connection has an active transaction */ + self::$connection->beginTransaction(); + + /** @When pushing an order placed record */ + $repository->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order' + ) + ])); + + /** @And the transaction is committed */ + self::$connection->commit(); + + /** @Then no records are persisted */ + self::assertSame(0, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); + } + + public function testPushWhenSerializerReturnsInvalidJsonThenInvalidPayloadJson(): void + { + /** @Given a repository with a translator and a serializer that produces invalid JSON */ + $repository = new DoctrineOutboxRepository( + connection: self::$connection, + serializers: PayloadSerializers::createFrom(elements: [new InvalidPayloadSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) + ); + + /** @And the connection has an active transaction */ + self::$connection->beginTransaction(); + + /** @And a record to push */ + $records = EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order' ) + ]); + + /** @Then an exception indicating invalid JSON payload is thrown */ + $this->expectException(InvalidPayloadJson::class); + $this->expectExceptionMessage('Payload is not valid JSON .'); + + /** @When pushing the record */ + $repository->push(records: $records); + } + + public function testPushWhenConstraintNameIsCustomThenDuplicateAggregateVersion(): void + { + /** @Given a custom table layout with a distinct unique constraint name */ + $tableLayout = TableLayout::builder() + ->withTableName(tableName: 'custom_constraint_outbox') + ->withUniqueConstraint(name: 'unq_custom_outbox_aggregate_version') ->build(); - /** @And any pre-existing string outbox table is dropped */ - self::$connection->executeStatement('DROP TABLE IF EXISTS string_outbox'); + /** @And any pre-existing table is dropped */ + self::$connection->executeStatement('DROP TABLE IF EXISTS custom_constraint_outbox'); - /** @And the string outbox table is registered for cleanup */ - self::registerTableForCleanup(tableName: 'string_outbox'); + /** @And the table is registered for cleanup */ + self::registerTableForCleanup(tableName: 'custom_constraint_outbox'); - /** @And the string outbox table is created */ - OutboxTableFactory::createWithStringIdentities( + /** @And the table is created with the custom constraint name */ + OutboxTableFactory::createWithBinaryIdentities( connection: self::$connection, tableLayout: $tableLayout ); - /** @And a repository using the string layout */ + /** @And a repository using the custom table layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), tableLayout: $tableLayout ); - /** @And a non-UUID aggregate identity */ - $aggregateId = 'ord-1'; - + /** @And a fixed aggregate identity */ + $aggregateId = Uuid::uuid4()->toString(); + /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing a record with a non-UUID aggregate id */ + /** @And a first record is pushed with that aggregate and aggregate version */ $repository->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), aggregateType: 'Order', - aggregateId: $aggregateId + aggregateId: $aggregateId, + aggregateVersion: AggregateVersion::of(value: 1) ) ])); - /** @And the transaction is committed */ - self::$connection->commit(); + /** @Then an exception indicating a duplicate aggregate version is thrown */ + $this->expectException(DuplicateAggregateVersion::class); - /** @Then the aggregate_id is stored as the original string */ - self::assertSame($aggregateId, self::$connection->fetchOne('SELECT aggregate_id FROM string_outbox LIMIT 1')); + /** @When pushing a second record with the same aggregate and aggregate version */ + $repository->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + aggregateId: $aggregateId, + aggregateVersion: AggregateVersion::of(value: 1) + ) + ])); } - public function testPushWhenSingleRecordThenAllFieldsPersistedCorrectly(): void + public function testPushWhenOnlyOrderTranslatorRegisteredThenRefundEventIsSkipped(): void { - /** @Given a repository with a translator and an order placed serializer */ + /** @Given a repository with only an order translator and both serializers */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + serializers: PayloadSerializers::createFrom(elements: [ + new OrderPlacedSerializer(), + new RefundIssuedSerializer() + ]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing the record */ + /** @When pushing one order placed and one refund issued record */ $repository->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), aggregateType: 'Order', - occurredAt: Instant::fromString(value: '2024-06-01 12:00:00.000000'), - aggregateVersion: AggregateVersion::of(value: 3) + aggregateVersion: AggregateVersion::of(value: 1) + ), + EventRecordFactory::create( + event: new RefundIssued(), + aggregateType: 'Refund', + aggregateVersion: AggregateVersion::of(value: 1) ) ])); /** @And the transaction is committed */ self::$connection->commit(); - /** @Then the row is retrievable from the database */ - $row = self::$connection->fetchAssociative('SELECT * FROM outbox_events LIMIT 1'); - - /** @And the id is stored as 16-byte binary */ - self::assertSame(16, strlen($row['id'])); - - /** @And the aggregate_id is stored as 16-byte binary */ - self::assertSame(16, strlen($row['aggregate_id'])); - - /** @And the event_type reflects the integration event class */ - self::assertSame('OrderShipped', $row['event_type']); - - /** @And the revision is correct */ - self::assertSame(1, (int)$row['revision']); - - /** @And the aggregate_version is correct */ - self::assertSame(3, (int)$row['aggregate_version']); - - /** @And the payload matches the serializer output */ - self::assertSame('{}', $row['payload']); - - /** @And the aggregate_type is correct */ - self::assertSame('Order', $row['aggregate_type']); + /** @Then exactly one row is persisted */ + self::assertSame(1, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); - /** @And the occurred_at is stored */ - self::assertStringStartsWith('2024-06-01', $row['occurred_at']); + /** @And the persisted event_type is the order integration event */ + self::assertSame('OrderShipped', self::$connection->fetchOne('SELECT event_type FROM outbox_events LIMIT 1')); } - public function testPushWhenKnownIdThenPersistedIdMatchesOriginal(): void + public function testPushWhenTwoTranslatorsSupportSameEventThenFirstTranslatorWins(): void { - /** @Given a repository */ + /** @Given a repository with two translators both supporting order placed, and a matching serializer */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [ + new OrderPlacedTranslator(), + new DuplicateOrderPlacedTranslator() + ]) ); - /** @And a record with a known id */ - $recordId = Uuid::uuid4(); - /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing the record */ + /** @When pushing an order placed record */ $repository->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), - aggregateType: 'Order', - id: $recordId + aggregateType: 'Order' ) ])); /** @And the transaction is committed */ self::$connection->commit(); - /** @Then the persisted id bytes match the original record id */ - $row = self::$connection->fetchAssociative( - 'SELECT id FROM outbox_events WHERE id = UUID_TO_BIN(?)', - [$recordId->toString()] - ); - self::assertSame($recordId->getBytes(), $row['id']); + /** @Then the persisted event_type reflects the first translator's output */ + self::assertSame('OrderShipped', self::$connection->fetchOne('SELECT event_type FROM outbox_events LIMIT 1')); } public function testPushWhenAllColumnNamesAreCustomThenRecordStoredInCustomColumns(): void @@ -659,8 +707,8 @@ public function testPushWhenAllColumnNamesAreCustomThenRecordStoredInCustomColum /** @And a repository using this layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), tableLayout: $tableLayout ); @@ -710,113 +758,73 @@ public function testPushWhenAllColumnNamesAreCustomThenRecordStoredInCustomColum self::assertNotNull($row['event_created_at']); } - public function testPushWhenEventRecordsIsEmptyThenNoInsertIsExecuted(): void + public function testPushWhenDuplicateAggregateVersionThenDuplicateAggregateVersion(): void { /** @Given a repository */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); + /** @And a fixed aggregate identity and aggregate version */ + $aggregateId = Uuid::uuid4()->toString(); + /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing an empty EventRecords collection */ - $repository->push(records: EventRecords::createFromEmpty()); + /** @And a first record is pushed with that aggregate and aggregate version */ + $repository->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + aggregateId: $aggregateId, + aggregateVersion: AggregateVersion::of(value: 1) + ) + ])); - /** @And the transaction is committed */ - self::$connection->commit(); + /** @Then an exception indicating a duplicate aggregate version is thrown */ + $this->expectException(DuplicateAggregateVersion::class); + $this->expectExceptionMessage( + sprintf('Duplicate aggregate version for at aggregate version <1>.', $aggregateId) + ); - /** @Then no records are persisted */ - self::assertSame(0, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); + /** @When pushing a second record with the same aggregate and aggregate version but a different id */ + $repository->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + aggregateId: $aggregateId, + aggregateVersion: AggregateVersion::of(value: 1) + ) + ])); } - public function testPushWhenRealEventualAggregateRootThenEventRecordIsPersistedCorrectly(): void + public function testPushWhenReflectionPayloadSerializerThenEventPropertiesAreEncoded(): void { /** @Given a repository with a translator and a reflection payload serializer */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]) + serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing the aggregate's recorded events */ - $repository->push(records: Order::place(orderId: Uuid::uuid4()->toString())->pullEvents()); + /** @When pushing a record */ + $repository->push(records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order' + ) + ])); /** @And the transaction is committed */ self::$connection->commit(); - /** @Then one outbox record is persisted */ - self::assertSame(1, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); - - /** @And the aggregate type is Order */ - self::assertSame('Order', self::$connection->fetchOne('SELECT aggregate_type FROM outbox_events LIMIT 1')); - } - - public function testPushWhenNullTableLayoutThenSqlUsesDefaultTableName(): void - { - /** @Given a mocked connection with an active transaction */ - $connection = $this->createMock(Connection::class); - - /** @And the connection reports an active transaction */ - $connection->method('isTransactionActive')->willReturn(true); - - /** @Then the SQL statement targets the default table name */ - $connection->expects(self::once()) - ->method('executeStatement') - ->with(self::stringContains('outbox_events')) - ->willReturn(1); - - /** @When pushing a record using the default table layout */ - new DoctrineOutboxRepository( - connection: $connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) - )->push( - records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order' - ) - ]) - ); - } - - public function testPushWhenCustomTableLayoutThenSqlUsesCustomTableName(): void - { - /** @Given a mocked connection with an active transaction */ - $connection = $this->createMock(Connection::class); - - /** @And the connection reports an active transaction */ - $connection->method('isTransactionActive')->willReturn(true); - - /** @And a custom table layout with a distinct table name */ - $tableLayout = TableLayout::builder()->withTableName(tableName: 'custom_outbox')->build(); - - /** @Then the SQL statement targets the custom table name */ - $connection->expects(self::once()) - ->method('executeStatement') - ->with(self::stringContains('custom_outbox')) - ->willReturn(1); - - /** @When pushing a record using the custom table layout */ - new DoctrineOutboxRepository( - connection: $connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), - tableLayout: $tableLayout - )->push( - records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order' - ) - ]) - ); + /** @Then the payload reflects the integration event's public properties */ + self::assertSame('[]', self::$connection->fetchOne('SELECT payload FROM outbox_events LIMIT 1')); } public function testPushWhenRecordHasAllFieldsThenExecuteStatementReceivesAllBindings(): void @@ -853,8 +861,8 @@ function (string $sql, array $params) use (&$capturedParameters): int { /** @When pushing a record with all deterministic fields */ new DoctrineOutboxRepository( connection: $connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), tableLayout: $tableLayout )->push( records: EventRecords::createFrom(elements: [ @@ -878,46 +886,9 @@ function (string $sql, array $params) use (&$capturedParameters): int { 'revision' => 1, 'aggregateVersion' => 1, 'payload' => '{}', - 'occurredAt' => '2021-01-01T00:00:00+00:00' - ], - $capturedParameters - ); - } - - public function testPushWhenUniqueConstraintOnAggregateVersionThenDuplicateAggregateVersionIsThrown(): void - { - /** @Given a mocked connection with an active transaction */ - $connection = self::createConfiguredStub(Connection::class, ['isTransactionActive' => true]); - - /** @And the connection raises an aggregate version constraint violation */ - $connection->method('executeStatement')->willThrowException( - new UniqueConstraintViolationException( - new DriverExceptionStub( - 'Duplicate entry for key unq_outbox_events_aggregate_type_aggregate_id_aggregate_version' - ), - null - ) - ); - - /** @Then a duplicate aggregate version exception is thrown */ - $this->expectException(DuplicateAggregateVersion::class); - $this->expectExceptionMessage( - 'Duplicate aggregate version for at aggregate version <1>.' - ); - - /** @When pushing the record */ - new DoctrineOutboxRepository( - connection: $connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) - )->push( - records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order', - aggregateId: '6ba7b810-9dad-11d1-80b4-00c04fd430c8' - ) - ]) + 'occurredAt' => '2021-01-01T00:00:00+00:00' + ], + $capturedParameters ); } @@ -941,45 +912,8 @@ public function testPushWhenUniqueConstraintOnEventIdThenDuplicateOutboxEventIsT /** @When pushing the record */ new DoctrineOutboxRepository( connection: $connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) - )->push( - records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order' - ) - ]) - ); - } - - public function testPushWhenUniqueConstraintWithCustomNameThenDuplicateAggregateVersionIsThrown(): void - { - /** @Given a mocked connection with an active transaction */ - $connection = self::createConfiguredStub(Connection::class, ['isTransactionActive' => true]); - - /** @And a custom table layout with a distinct unique constraint name */ - $tableLayout = TableLayout::builder() - ->withUniqueConstraint(name: 'unq_custom_outbox_aggregate_version') - ->build(); - - /** @And the connection raises a violation on the custom constraint name */ - $connection->method('executeStatement')->willThrowException( - new UniqueConstraintViolationException( - new DriverExceptionStub('Duplicate entry for key unq_custom_outbox_aggregate_version'), - null - ) - ); - - /** @Then a duplicate aggregate version exception is thrown */ - $this->expectException(DuplicateAggregateVersion::class); - - /** @When pushing a record with the custom table layout */ - new DoctrineOutboxRepository( - connection: $connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), - tableLayout: $tableLayout + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) )->push( records: EventRecords::createFrom(elements: [ EventRecordFactory::create( @@ -990,187 +924,253 @@ public function testPushWhenUniqueConstraintWithCustomNameThenDuplicateAggregate ); } - public function testPushWhenConstraintNameIsCustomThenDuplicateAggregateVersion(): void + public function testPushWhenNonUuidAggregateIdWithStringTypeThenStoredAsOriginalString(): void { - /** @Given a custom table layout with a distinct unique constraint name */ + /** @Given a layout with STRING identity columns */ $tableLayout = TableLayout::builder() - ->withTableName(tableName: 'custom_constraint_outbox') - ->withUniqueConstraint(name: 'unq_custom_outbox_aggregate_version') + ->withTableName(tableName: 'string_outbox') + ->withColumns( + columns: Columns::builder() + ->withId(name: 'id', type: IdentityColumnType::STRING) + ->withAggregateId(name: 'aggregate_id', type: IdentityColumnType::STRING) + ->build() + ) ->build(); - /** @And any pre-existing table is dropped */ - self::$connection->executeStatement('DROP TABLE IF EXISTS custom_constraint_outbox'); + /** @And any pre-existing string outbox table is dropped */ + self::$connection->executeStatement('DROP TABLE IF EXISTS string_outbox'); - /** @And the table is registered for cleanup */ - self::registerTableForCleanup(tableName: 'custom_constraint_outbox'); + /** @And the string outbox table is registered for cleanup */ + self::registerTableForCleanup(tableName: 'string_outbox'); - /** @And the table is created with the custom constraint name */ - OutboxTableFactory::createWithBinaryIdentities( + /** @And the string outbox table is created */ + OutboxTableFactory::createWithStringIdentities( connection: self::$connection, tableLayout: $tableLayout ); - /** @And a repository using the custom table layout */ + /** @And a repository using the string layout */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), tableLayout: $tableLayout ); - /** @And a fixed aggregate identity */ - $aggregateId = Uuid::uuid4()->toString(); + /** @And a non-UUID aggregate identity */ + $aggregateId = 'ord-1'; /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @And a first record is pushed with that aggregate and aggregate version */ + /** @When pushing a record with a non-UUID aggregate id */ $repository->push(records: EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), aggregateType: 'Order', - aggregateId: $aggregateId, - aggregateVersion: AggregateVersion::of(value: 1) + aggregateId: $aggregateId ) ])); - /** @Then an exception indicating a duplicate aggregate version is thrown */ - $this->expectException(DuplicateAggregateVersion::class); + /** @And the transaction is committed */ + self::$connection->commit(); - /** @When pushing a second record with the same aggregate and aggregate version */ - $repository->push(records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order', - aggregateId: $aggregateId, - aggregateVersion: AggregateVersion::of(value: 1) - ) - ])); + /** @Then the aggregate_id is stored as the original string */ + self::assertSame($aggregateId, self::$connection->fetchOne('SELECT aggregate_id FROM string_outbox LIMIT 1')); } - public function testPushWhenNoTranslatorsRegisteredThenRecordIsSilentlySkipped(): void + public function testPushWhenRealEventualAggregateRootThenEventRecordIsPersistedCorrectly(): void { - /** @Given a repository with no translators and a reflection serializer */ + /** @Given a repository with a translator and a reflection payload serializer */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFromEmpty(), - serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]) + serializers: PayloadSerializers::createFrom(elements: [new PayloadSerializerReflection()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing an order placed record */ - $repository->push(records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order' - ) - ])); + /** @When pushing the aggregate's recorded events */ + $repository->push(records: Order::place(orderId: Uuid::uuid4()->toString())->pullEvents()); /** @And the transaction is committed */ self::$connection->commit(); - /** @Then no records are persisted */ - self::assertSame(0, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); + /** @Then one outbox record is persisted */ + self::assertSame(1, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); + + /** @And the aggregate type is Order */ + self::assertSame('Order', self::$connection->fetchOne('SELECT aggregate_type FROM outbox_events LIMIT 1')); } - public function testPushWhenOnlyOrderTranslatorRegisteredThenRefundEventIsSkipped(): void + public function testPushWhenMultipleSerializersAndSecondMatchesThenCorrectSerializerIsUsed(): void { - /** @Given a repository with only an order translator and both serializers */ + /** @Given a repository with translators for both event types and matching serializers */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), serializers: PayloadSerializers::createFrom(elements: [ new OrderPlacedSerializer(), new RefundIssuedSerializer() + ]), + translators: IntegrationEventTranslators::createFrom(elements: [ + new OrderPlacedTranslator(), + new RefundIssuedTranslator() ]) ); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing one order placed and one refund issued record */ + /** @When pushing a refund event */ $repository->push(records: EventRecords::createFrom(elements: [ - EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order', - aggregateVersion: AggregateVersion::of(value: 1) - ), EventRecordFactory::create( event: new RefundIssued(), - aggregateType: 'Refund', - aggregateVersion: AggregateVersion::of(value: 1) + aggregateType: 'Refund' ) ])); /** @And the transaction is committed */ self::$connection->commit(); - /** @Then exactly one row is persisted */ - self::assertSame(1, (int)self::$connection->fetchOne('SELECT COUNT(*) FROM outbox_events')); - - /** @And the persisted event_type is the order integration event */ - self::assertSame('OrderShipped', self::$connection->fetchOne('SELECT event_type FROM outbox_events LIMIT 1')); + /** @Then the payload is from the refund serializer */ + self::assertSame( + '{"type": "refund"}', + self::$connection->fetchOne('SELECT payload FROM outbox_events LIMIT 1') + ); } - public function testPushWhenTwoTranslatorsSupportSameEventThenFirstTranslatorWins(): void + public function testPushWhenTranslatorMatchesButNoSerializerThenPayloadSerializerNotConfigured(): void { - /** @Given a repository with two translators both supporting order placed, and a matching serializer */ + /** @Given a repository with a translator but no serializer that matches the produced integration event */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [ - new OrderPlacedTranslator(), - new DuplicateOrderPlacedTranslator() - ]), - serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]) + serializers: PayloadSerializers::createFromEmpty(), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) ); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @When pushing an order placed record */ - $repository->push(records: EventRecords::createFrom(elements: [ + /** @And a record to push */ + $records = EventRecords::createFrom(elements: [ EventRecordFactory::create( event: new OrderPlaced(), aggregateType: 'Order' ) - ])); + ]); - /** @And the transaction is committed */ - self::$connection->commit(); + /** @Then an exception indicating no serializer is configured is thrown */ + $this->expectException(PayloadSerializerNotConfigured::class); + $this->expectExceptionMessage( + 'No payload serializer configured for event class .' + ); - /** @Then the persisted event_type reflects the first translator's output */ - self::assertSame('OrderShipped', self::$connection->fetchOne('SELECT event_type FROM outbox_events LIMIT 1')); + /** @When pushing the record */ + $repository->push(records: $records); } - public function testPushWhenTranslatorMatchesButNoSerializerThenPayloadSerializerNotConfigured(): void + public function testPushWhenUniqueConstraintWithCustomNameThenDuplicateAggregateVersionIsThrown(): void { - /** @Given a repository with a translator but no serializer that matches the produced integration event */ + /** @Given a mocked connection with an active transaction */ + $connection = self::createConfiguredStub(Connection::class, ['isTransactionActive' => true]); + + /** @And a custom table layout with a distinct unique constraint name */ + $tableLayout = TableLayout::builder() + ->withUniqueConstraint(name: 'unq_custom_outbox_aggregate_version') + ->build(); + + /** @And the connection raises a violation on the custom constraint name */ + $connection->method('executeStatement')->willThrowException( + new UniqueConstraintViolationException( + new DriverExceptionStub('Duplicate entry for key unq_custom_outbox_aggregate_version'), + null + ) + ); + + /** @Then a duplicate aggregate version exception is thrown */ + $this->expectException(DuplicateAggregateVersion::class); + + /** @When pushing a record with the custom table layout */ + new DoctrineOutboxRepository( + connection: $connection, + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), + tableLayout: $tableLayout + )->push( + records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order' + ) + ]) + ); + } + + public function testPushWhenUniqueConstraintOnAggregateVersionThenDuplicateAggregateVersionIsThrown(): void + { + /** @Given a mocked connection with an active transaction */ + $connection = self::createConfiguredStub(Connection::class, ['isTransactionActive' => true]); + + /** @And the connection raises an aggregate version constraint violation */ + $connection->method('executeStatement')->willThrowException( + new UniqueConstraintViolationException( + new DriverExceptionStub( + 'Duplicate entry for key unq_outbox_events_aggregate_type_aggregate_id_aggregate_version' + ), + null + ) + ); + + /** @Then a duplicate aggregate version exception is thrown */ + $this->expectException(DuplicateAggregateVersion::class); + $this->expectExceptionMessage( + 'Duplicate aggregate version for at aggregate version <1>.' + ); + + /** @When pushing the record */ + new DoctrineOutboxRepository( + connection: $connection, + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]) + )->push( + records: EventRecords::createFrom(elements: [ + EventRecordFactory::create( + event: new OrderPlaced(), + aggregateType: 'Order', + aggregateId: '6ba7b810-9dad-11d1-80b4-00c04fd430c8' + ) + ]) + ); + } + + public function testPushWhenSerializerDoesNotSupportIntegrationEventThenPayloadSerializerNotConfigured(): void + { + /** @Given a repository with a refund translator but only an order serializer */ $repository = new DoctrineOutboxRepository( connection: self::$connection, - translators: IntegrationEventTranslators::createFrom(elements: [new OrderPlacedTranslator()]), - serializers: PayloadSerializers::createFromEmpty() + serializers: PayloadSerializers::createFrom(elements: [new OrderPlacedSerializer()]), + translators: IntegrationEventTranslators::createFrom(elements: [new RefundIssuedTranslator()]) ); /** @And the connection has an active transaction */ self::$connection->beginTransaction(); - /** @And a record to push */ + /** @And a refund event that the order serializer does not support */ $records = EventRecords::createFrom(elements: [ EventRecordFactory::create( - event: new OrderPlaced(), - aggregateType: 'Order' + event: new RefundIssued(), + aggregateType: 'Refund' ) ]); /** @Then an exception indicating no serializer is configured is thrown */ $this->expectException(PayloadSerializerNotConfigured::class); $this->expectExceptionMessage( - 'No payload serializer configured for event class .' + 'No payload serializer configured for event class .' ); - /** @When pushing the record */ + /** @When pushing the unsupported event */ $repository->push(records: $records); } } diff --git a/tests/Integration/OutboxTableFactory.php b/tests/Integration/OutboxTableFactory.php index 06c60de..424cb6f 100644 --- a/tests/Integration/OutboxTableFactory.php +++ b/tests/Integration/OutboxTableFactory.php @@ -38,19 +38,19 @@ public static function createWithBinaryIdentities(Connection $connection, TableL sprintf( $template, $tableLayout->tableName, - $tableLayout->columns->id->name, + $tableLayout->columns->id->name(), $tableLayout->columns->payload, $tableLayout->columns->revision, $tableLayout->columns->eventType, $tableLayout->columns->occurredAt, - $tableLayout->columns->aggregateId->name, + $tableLayout->columns->aggregateId->name(), $tableLayout->columns->aggregateType, $tableLayout->columns->aggregateVersion, $tableLayout->columns->createdAt, - $tableLayout->columns->id->name, + $tableLayout->columns->id->name(), $tableLayout->uniqueConstraint->name, $tableLayout->columns->aggregateType, - $tableLayout->columns->aggregateId->name, + $tableLayout->columns->aggregateId->name(), $tableLayout->columns->aggregateVersion ) ); @@ -81,19 +81,19 @@ public static function createWithStringIdentities(Connection $connection, TableL sprintf( $template, $tableLayout->tableName, - $tableLayout->columns->id->name, + $tableLayout->columns->id->name(), $tableLayout->columns->payload, $tableLayout->columns->revision, $tableLayout->columns->eventType, $tableLayout->columns->occurredAt, - $tableLayout->columns->aggregateId->name, + $tableLayout->columns->aggregateId->name(), $tableLayout->columns->aggregateType, $tableLayout->columns->aggregateVersion, $tableLayout->columns->createdAt, - $tableLayout->columns->id->name, + $tableLayout->columns->id->name(), $tableLayout->uniqueConstraint->name, $tableLayout->columns->aggregateType, - $tableLayout->columns->aggregateId->name, + $tableLayout->columns->aggregateId->name(), $tableLayout->columns->aggregateVersion ) ); diff --git a/tests/Models/EventRecordFactory.php b/tests/Models/EventRecordFactory.php index fa34fdf..2591991 100644 --- a/tests/Models/EventRecordFactory.php +++ b/tests/Models/EventRecordFactory.php @@ -28,9 +28,9 @@ public static function create( ): EventRecord { return EventRecord::from( event: $event, - aggregateId: new OrderId(value: $aggregateId ?? Uuid::generateV7()->toString()), + aggregateId: new OrderId(value: ($aggregateId ?? Uuid::generateV7()->toString())), aggregateType: $aggregateType, - aggregateVersion: $aggregateVersion ?? AggregateVersion::first(), + aggregateVersion: ($aggregateVersion ?? AggregateVersion::first()), id: is_null($id) ? null : Uuid::from(value: $id->toString()), occurredAt: is_null($occurredAt) ? null : Utc::fromIso8601(value: $occurredAt->toIso8601()) ); diff --git a/tests/Unit/InMemoryOutboxRepositoryMock.php b/tests/Unit/InMemoryOutboxRepositoryMock.php index 4adc232..f973a67 100644 --- a/tests/Unit/InMemoryOutboxRepositoryMock.php +++ b/tests/Unit/InMemoryOutboxRepositoryMock.php @@ -28,86 +28,96 @@ public function __construct( ) { } - public function beginTransaction(): void + public function push(EventRecords $records): void { - $this->transactionActive = true; - } + if (!$this->transactionActive) { + throw OutboxRequiresActiveTransaction::asMissing(); + } - public function commit(): void - { - $this->transactionActive = false; + $records->each(actions: function (EventRecord $eventRecord): void { + $this->record(eventRecord: $eventRecord); + }); } - public function persistedRecords(): array + public function store(IntegrationEventRecord $record): void { - return $this->records; + $aggregateKey = sprintf( + '%s|%s|%d', + $record->aggregateType, + $record->aggregateId->identityValue(), + $record->aggregateVersion->value + ); + + if (isset($this->aggregateVersions[$aggregateKey])) { + throw DuplicateAggregateVersion::forRecord( + previous: null, + aggregateId: $record->aggregateId->identityValue(), + aggregateType: $record->aggregateType, + aggregateVersion: $record->aggregateVersion->value + ); + } + + $eventId = $record->id->toString(); + + if (isset($this->records[$eventId])) { + throw DuplicateOutboxEvent::forRecord( + eventId: $eventId, + previous: new UniqueConstraintViolationException( + new DriverExceptionStub('Duplicate entry for key PRIMARY'), + null + ) + ); + } + + $this->aggregateVersions[$aggregateKey] = true; + $this->records[$eventId] = $record; } - public function rollback(): void + public function commit(): void { $this->transactionActive = false; - $this->records = []; - $this->aggregateVersions = []; } - public function push(EventRecords $records): void + public function record(EventRecord $eventRecord): void { - if (!$this->transactionActive) { - throw OutboxRequiresActiveTransaction::asMissing(); + $translator = $this->translators->findFor(record: $eventRecord); + + if (is_null($translator)) { + return; } - $records->each(actions: function (EventRecord $eventRecord): void { - $translator = $this->translators->findFor(record: $eventRecord); + $integrationEventRecord = IntegrationEventRecord::from( + eventRecord: $eventRecord, + integrationEvent: $translator->translate(record: $eventRecord) + ); - if (is_null($translator)) { - return; - } + $payloadSerializer = $this->payloadSerializers->findFor(record: $integrationEventRecord); - $integrationEventRecord = IntegrationEventRecord::from( - eventRecord: $eventRecord, - integrationEvent: $translator->translate(record: $eventRecord) + if (is_null($payloadSerializer)) { + throw PayloadSerializerNotConfigured::forEventClass( + eventClass: $integrationEventRecord->event::class ); + } - $payloadSerializer = $this->payloadSerializers->findFor(record: $integrationEventRecord); + $payloadSerializer->serialize(record: $integrationEventRecord); - if (is_null($payloadSerializer)) { - throw PayloadSerializerNotConfigured::forEventClass( - eventClass: $integrationEventRecord->event::class - ); - } + $this->store(record: $integrationEventRecord); + } - $payloadSerializer->serialize(record: $integrationEventRecord); + public function rollback(): void + { + $this->transactionActive = false; + $this->records = []; + $this->aggregateVersions = []; + } - $aggregateKey = sprintf( - '%s|%s|%d', - $integrationEventRecord->aggregateType, - $integrationEventRecord->aggregateId->identityValue(), - $integrationEventRecord->aggregateVersion->value - ); + public function beginTransaction(): void + { + $this->transactionActive = true; + } - if (isset($this->aggregateVersions[$aggregateKey])) { - throw DuplicateAggregateVersion::forRecord( - previous: null, - aggregateId: $integrationEventRecord->aggregateId->identityValue(), - aggregateType: $integrationEventRecord->aggregateType, - aggregateVersion: $integrationEventRecord->aggregateVersion->value - ); - } - - $eventId = $integrationEventRecord->id->toString(); - - if (isset($this->records[$eventId])) { - throw DuplicateOutboxEvent::forRecord( - eventId: $eventId, - previous: new UniqueConstraintViolationException( - new DriverExceptionStub('Duplicate entry for key PRIMARY'), - null - ) - ); - } - - $this->aggregateVersions[$aggregateKey] = true; - $this->records[$eventId] = $integrationEventRecord; - }); + public function persistedRecords(): array + { + return $this->records; } } From 4461b1cbf5e523e2b54910697a5bb1bec1a6c73a Mon Sep 17 00:00:00 2001 From: Gustavo Freze Date: Thu, 2 Jul 2026 13:51:49 -0300 Subject: [PATCH 2/4] build: Make the Docker run command TTY-aware. --- Makefile | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index fbabced..efdfa53 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,9 @@ ifeq ($(ARCH),arm64) PLATFORM := --platform=linux/amd64 endif -DOCKER_RUN = docker run ${PLATFORM} --rm -it --net=host \ +TTY := $(shell [ -t 0 ] && echo -it) + +DOCKER_RUN = docker run ${PLATFORM} --rm ${TTY} --net=host \ -e DATABASE_HOST=outbox-test-db \ -e TEST_DB_HOST_PORT=33306 \ -e DATABASE_NAME=outbox_test \ From cbbe49243c063470b134f8c4be7eac1fc764c05c Mon Sep 17 00:00:00 2001 From: Gustavo Freze Date: Thu, 2 Jul 2026 14:06:26 -0300 Subject: [PATCH 3/4] build: Adopt the Slevomat coding standard. --- composer.json | 2 ++ phpcs.xml | 88 ++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 89 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index a163ab8..80ae924 100644 --- a/composer.json +++ b/composer.json @@ -41,6 +41,7 @@ "infection/infection": "^0.33", "phpstan/phpstan": "^2.2", "phpunit/phpunit": "^13.1", + "slevomat/coding-standard": "^8.30", "squizlabs/php_codesniffer": "^4.0", "tiny-blocks/docker-container": "^2.5", "tiny-blocks/environment-variable": "^1.2" @@ -59,6 +60,7 @@ }, "config": { "allow-plugins": { + "dealerdirect/phpcodesniffer-composer-installer": true, "ergebnis/composer-normalize": true, "infection/extension-installer": true }, diff --git a/phpcs.xml b/phpcs.xml index a52372c..8f95184 100644 --- a/phpcs.xml +++ b/phpcs.xml @@ -1,7 +1,93 @@ Code style for the tiny-blocks library. - + src tests + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + From 24911de4dbda3990357b027b939992c6420fbbaf Mon Sep 17 00:00:00 2001 From: Gustavo Freze Date: Thu, 2 Jul 2026 14:07:07 -0300 Subject: [PATCH 4/4] chore: Add the PHP member-ordering and prose-punctuation hooks. Wire the two hooks as PostToolUse commands in settings.json, document the conventions they enforce in the library rules, and mirror the changes in the tiny-blocks-create scaffolding. Keep the Python hook caches out of Git and out of GitHub language statistics. --- .claude/hooks/php-ordering-conformance.py | 607 ++++++++++++++++++ .../php-prose-punctuation-conformance.py | 176 +++++ .claude/rules/php-library-code-style.md | 5 +- .claude/rules/php-library-tooling.md | 27 +- .claude/settings.json | 17 + .claude/skills/tiny-blocks-create/SKILL.md | 5 +- .../assets/config/.gitattributes | 3 + .../assets/config/.gitignore | 4 + .../assets/config/composer.json | 2 + .../assets/config/phpcs.xml | 88 ++- .gitattributes | 3 + .gitignore | 4 + 12 files changed, 927 insertions(+), 14 deletions(-) create mode 100644 .claude/hooks/php-ordering-conformance.py create mode 100644 .claude/hooks/php-prose-punctuation-conformance.py diff --git a/.claude/hooks/php-ordering-conformance.py b/.claude/hooks/php-ordering-conformance.py new file mode 100644 index 0000000..21a3ec0 --- /dev/null +++ b/.claude/hooks/php-ordering-conformance.py @@ -0,0 +1,607 @@ +#!/usr/bin/env python3 +"""PHP ordering conformance hook for tiny-blocks PHP libraries. + +Self-contained PostToolUse hook on Edit|Write|MultiEdit. Verifies the deterministic +ordering conventions for PHP declarations: + +- Parameter ordering: declaration parameters (constructors, factories, methods, + property promotion) in three tiers, required parameters first, then defaulted + parameters, then a variadic, each tier by identifier length ascending, + alphabetical tie-breaker, semantic pairs preserved. A PHPUnit test method fed by + a data provider is exempt, its parameters are the columns of its data set. +- Member ordering: constants, enum cases, constructor, static methods, instance + methods, in that group order, each group length-ascending with alphabetical + tie-breaker. PHPUnit test classes instead order methods as lifecycle hooks (in + execution order), then other methods, then data providers. + +The analysis is pure (FileUnit in, Violation out) and runs in three passes over +well-formed PHP: a lexical pass blanks every comment, string, and heredoc/nowdoc +body (LITERALS), a structural pass maps every bracket to its pair (bracket_spans); +extraction assigns tokens of interest to their containers by flat walks. Control +flow uses guard clauses only and nesting never exceeds two levels. Reports +violations to stderr and exits 2 to prompt Claude, exits 0 silently if no violations +or the file is out of scope. +""" + +import json +import re +import sys +from dataclasses import dataclass +from enum import Enum +from functools import cached_property +from pathlib import Path +from typing import Final + +# --- Configuration ---------------------------------------------------------- + +# In-scope files: PHP sources under src/ or tests/. +SCOPE_PATTERN: Final = re.compile(r"(^|/)(src|tests)/.+\.php$") + +# Semantic pairs (exhaustive). Natural order wins between +# the two members when both appear in the same parameter list. +SEMANTIC_PAIRS: Final = ( + ("start", "end"), + ("from", "to"), + ("startAt", "endAt"), + ("createdAt", "updatedAt"), + ("before", "after"), + ("min", "max"), +) + +# Each member maps to (first, second, position). Both members keep their natural +# order only when both are present, sorting as a unit at the lead member's key. +PAIR_MEMBER: Final = { + member: (first, second, position) + for first, second in SEMANTIC_PAIRS + for position, member in enumerate((first, second)) +} + +MODIFIERS: Final = ("abstract", "final", "private", "protected", "public", "static") + +# The lexical grammar: every PHP construct that must not be scanned as code. +# Alternatives are ordered, the heredoc label closes via backreference. +LITERALS: Final = re.compile( + r""" + /\*.*?\*/ # block comment + | //[^\n]* # line comment + | \#(?!\[)[^\n]* # hash comment, never a #[ attribute + | <<<[ \t]*(?P['"]?)(?P