Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions docs/UPGRADE-4.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,75 @@ Further changes:
* `ProcessedResult` now extends `Result`, so the `execute` method always returns a `Result`. The `Boot` and `Run` commands return a `ProcessedResult`.
* The `DefaultSubscriptionEngine` accepts an optional `EventDispatcherInterface` as last constructor argument to hook into the engine with own listeners.

### SubscriberHelper and SubscriberUtil

The deprecated `Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberHelper`
and the `Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberUtil` trait have been removed.

If you used them inside a projector to keep the projector id and the table name in sync,
use a constant instead:

```php
use Doctrine\DBAL\Connection;
use Patchlevel\EventSourcing\Attribute\Projector;

#[Projector(self::TABLE)]
final class HotelProjector
{
// use a const for easier access in the projector & to keep projector id and table name in sync
private const TABLE = 'hotel';

public function __construct(
private readonly Connection $db,
) {
}

/** @return list<array{id: string, name: string, guests: int}> */
public function getHotels(): array
{
return $this->db->fetchAllAssociative(sprintf('SELECT id, name, guests FROM %s;', self::TABLE));
}

// ...
}
```

If you still need the subscriber id elsewhere, read it from the metadata instead:

```php
use Patchlevel\EventSourcing\Metadata\Subscriber\AttributeSubscriberMetadataFactory;

$metadata = (new AttributeSubscriberMetadataFactory())->metadata($subscriber::class);
$subscriberId = $metadata->id;
```
Comment thread
DavidBadura marked this conversation as resolved.

### SubscriberAccessor and RealSubscriberAccessor

The `Patchlevel\EventSourcing\Subscription\Subscriber\SubscriberAccessor` and
`Patchlevel\EventSourcing\Subscription\Subscriber\RealSubscriberAccessor` interfaces have been removed.
Use `Patchlevel\EventSourcing\Subscription\Subscriber\MetadataSubscriberAccessor` directly.

Accordingly, `SubscriberAccessorRepository::get()` now returns a `MetadataSubscriberAccessor|null`
instead of a `SubscriberAccessor|null`.

The deprecated methods `id()`, `group()` and `runMode()` on `MetadataSubscriberAccessor` have been removed.
Use `->metadata()->id`, `->metadata()->group` and `->metadata()->runMode` instead.

### AggregateIdArgumentResolver

The deprecated `Patchlevel\EventSourcing\Subscription\Subscriber\ArgumentResolver\AggregateIdArgumentResolver`
has been removed. Automatically resolving the aggregate id in a stream store is not possible.
Add the aggregate id to your events instead.

### ArgumentMetadata

The subscriber argument resolver now uses `symfony/type-info` to describe argument types.

`Patchlevel\EventSourcing\Metadata\Subscriber\ArgumentMetadata` no longer carries a `string $type`
and a `bool $allowsNull` property. Instead it now has a single `Symfony\Component\TypeInfo\Type $type` property.

If you implemented a custom `ArgumentResolver`, adjust it to read the type from the new `Type` object.

## Store

### StreamStore
Expand All @@ -160,6 +229,49 @@ And all the associated classes:

`StreamReadOnlyStore` was been merged in `ReadOnlyStore`.

## Stream

The stream handling has been reworked. Previously the `Stream` was an interface that every store had to
implement on its own (`ArrayStream`, `StreamDoctrineDbalStoreStream`, `TaggableDoctrineDbalStoreStream`,
`GeneratorStream`, ...). Now there is a single generic implementation that you can reuse.

### Stream interface

The `Patchlevel\EventSourcing\Store\Stream` interface has been removed and replaced by the concrete final
class `Patchlevel\EventSourcing\Message\Stream`.

All stores now return a `Patchlevel\EventSourcing\Message\Stream` from their `load()` method.
The following store specific stream implementations have been removed:

* `Patchlevel\EventSourcing\Store\ArrayStream`
* `Patchlevel\EventSourcing\Store\StreamDoctrineDbalStoreStream`
* `Patchlevel\EventSourcing\Store\TaggableDoctrineDbalStoreStream`
* `Patchlevel\EventSourcing\Subscription\Engine\GeneratorStream`

The new `Stream` class implements `Iterator` and accepts any `iterable<Message>` in its constructor.
The `index()`, `position()`, `end()` and `close()` methods remain available.
In addition there are now the helper methods `toList()`, `toArray()`, `transform()` and `chunk()`.

### Pipe

`Patchlevel\EventSourcing\Message\Pipe` has been removed. Use `Stream::transform()` instead.

before:

```php
use Patchlevel\EventSourcing\Message\Pipe;

$messages = (new Pipe($messages, $translator))->toArray();
```

after:

```php
use Patchlevel\EventSourcing\Message\Stream;

$messages = (new Stream($messages))->transform($translator)->toList();
```

## Message

### AggregateHeader
Expand Down
2 changes: 1 addition & 1 deletion docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ final class HotelProjector
/** @return list<array{id: string, name: string, guests: int}> */
public function getHotels(): array
{
return $this->db->fetchAllAssociative(sprintf('SELECT id, name, guests FROM %s;'), self::TABLE);
return $this->db->fetchAllAssociative(sprintf('SELECT id, name, guests FROM %s;', self::TABLE));
}

#[Subscribe(HotelCreated::class)]
Expand Down
18 changes: 12 additions & 6 deletions docs/message.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,27 +97,33 @@ use Patchlevel\EventSourcing\Message\Message;
/** @var Message $message */
$message->header(ApplicationHeader::class);
```
## Pipe
## Stream

The `Pipe` is a construct that allows you to chain multiple translators.
A `Stream` wraps an iterable of messages and allows you to chain multiple translators with `transform`.
This can be used to manipulate, filter or expand messages or events.
This can be used for anti-corruption layers, data migration, or to fix errors in the event stream.

```php
use Patchlevel\EventSourcing\Message\Pipe;
use Patchlevel\EventSourcing\Message\Stream;
use Patchlevel\EventSourcing\Message\Translator\ExcludeEventTranslator;
use Patchlevel\EventSourcing\Message\Translator\RecalculatePlayheadTranslator;

$messages = new Pipe(
$messages,
$stream = (new Stream($messages))->transform(
new ExcludeEventTranslator([ProfileCreated::class]),
new RecalculatePlayheadTranslator(),
);

foreach ($messages as $message) {
foreach ($stream as $message) {
// do something with the message
}
```

:::tip
A `Stream` is also what every store returns from its `load` method, so you can apply the same
transformations to the messages you read from the store. Besides iterating, a `Stream` offers
`toList()`, `toArray()` and `chunk()` to consume the messages.
:::

## Translator

Translator can be used to manipulate, filter or expand messages or events.
Expand Down
Loading