Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
permissions: read-all
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v6
with:
fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis
- uses: sonarsource/sonarqube-scan-action@master
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6
- uses: php-actions/composer@v6 # or alternative dependency management
- uses: php-actions/phpunit@v4
- name: Run PHP CS Fixer
Expand Down
1 change: 1 addition & 0 deletions .php-cs-fixer.dist.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
$config->setRules([
'@Symfony' => true,
'@Symfony:risky' => true,
'@PHP8x2Migration:risky' => true,
'@PSR12' => true,
'array_syntax' => [
'syntax' => 'short',
Expand Down
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Version 6.0.0
* Add managing exceptions with a stream

# Version 5.5.0
* Added cleanup commands
* Updated for PHP 8.2
Expand All @@ -6,7 +9,7 @@
* Fix File Exceptions integration

# Version 5.4.0
* Add possibility to save exceptions in file
* Add the possibility to save exceptions in the file

# Version 5.3.1
* Fix interface naming
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ providing an easy way to create import / export dataflow.

| Dataflow | Symfony | Support |
|----------|--------------------------|---------|
| 6.x | ^7.4 | yes |
| 5.x | ^7.3 | yes |
| 4.x | 3.4 \| 4.x \| 5.x \| 6.x | yes |
| 3.x | 3.4 \| 4.x \| 5.x | no |
Expand Down
9 changes: 6 additions & 3 deletions Tests/Processor/JobProcessorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use CodeRhapsodie\DataflowBundle\Entity\Job;
use CodeRhapsodie\DataflowBundle\Event\Events;
use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent;
use CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface;
use CodeRhapsodie\DataflowBundle\Gateway\JobGateway;
use CodeRhapsodie\DataflowBundle\Processor\JobProcessor;
use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface;
Expand All @@ -22,15 +23,17 @@ class JobProcessorTest extends TestCase
private DataflowTypeRegistryInterface|MockObject $registry;
private EventDispatcherInterface|MockObject $dispatcher;
private JobGateway|MockObject $jobGateway;
private ExceptionHandlerInterface|MockObject $exceptionHandler;

protected function setUp(): void
{
$this->repository = $this->createMock(JobRepository::class);
$this->registry = $this->createMock(DataflowTypeRegistryInterface::class);
$this->dispatcher = $this->createMock(EventDispatcherInterface::class);
$this->jobGateway = $this->createMock(JobGateway::class);
$this->exceptionHandler = $this->createMock(ExceptionHandlerInterface::class);

$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher, $this->jobGateway);
$this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher, $this->jobGateway, $this->exceptionHandler);
}

public function testProcess()
Expand Down Expand Up @@ -64,7 +67,7 @@ public function testProcess()
->willReturn($dataflowType)
;

$bag = [new \Exception('message1')];
$bag = 1;

$result = new Result('name', new \DateTimeImmutable(), $end = new \DateTimeImmutable(), $count = 10, $bag);

Expand All @@ -85,6 +88,6 @@ public function testProcess()
$this->assertGreaterThanOrEqual($now, $job->getStartTime());
$this->assertSame(Job::STATUS_COMPLETED, $job->getStatus());
$this->assertSame($end, $job->getEndTime());
$this->assertSame($count - count($bag), $job->getCount());
$this->assertSame($count - $bag, $job->getCount());
}
}
22 changes: 11 additions & 11 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@
"doctrine/doctrine-bundle": "^2.0",
"monolog/monolog": "^2.0||^3.0",
"psr/log": "^1.1||^2.0||^3.0",
"symfony/config": "^7.3",
"symfony/console": "^7.3",
"symfony/dependency-injection": "^7.3",
"symfony/event-dispatcher": "^7.3",
"symfony/http-kernel": "^7.3",
"symfony/lock": "^7.3",
"symfony/monolog-bridge": "^7.3",
"symfony/options-resolver": "^7.3",
"symfony/validator": "^7.3",
"symfony/yaml": "^7.3"
"symfony/config": "^7.4",
"symfony/console": "^7.4",
"symfony/dependency-injection": "^7.4",
"symfony/event-dispatcher": "^7.4",
"symfony/http-kernel": "^7.4",
"symfony/lock": "^7.4",
"symfony/monolog-bridge": "^7.4",
"symfony/options-resolver": "^7.4",
"symfony/validator": "^7.4",
"symfony/yaml": "^7.4"
},
"require-dev": {
"amphp/amp": "^2.5",
Expand All @@ -65,7 +65,7 @@
"phpunit/phpunit": "^11",
"portphp/portphp": "^1.9",
"rector/rector": "^2.0",
"symfony/messenger": "^7.3"
"symfony/messenger": "^7.4"
},
"suggest": {
"amphp/amp": "Provide asynchronous steps for your dataflows",
Expand Down
7 changes: 3 additions & 4 deletions src/DataflowType/AbstractDataflowType.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
namespace CodeRhapsodie\DataflowBundle\DataflowType;

use CodeRhapsodie\DataflowBundle\Repository\JobRepository;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Psr\Log\LoggerInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface, AutoUpdateCountInterface
abstract class AbstractDataflowType implements DataflowTypeInterface, AutoUpdateCountInterface
{
use LoggerAwareTrait;

Expand All @@ -36,7 +35,7 @@ public function process(array $options, ?int $jobId = null): Result

$builder = $this->createDataflowBuilder();
$builder->setName($this->getLabel());
$builder->addAfterItemProcessor(function (int|string $index, mixed $item, int $count) use ($jobId) {
$builder->addAfterItemProcessor(function (int|string $index, mixed $item, int $count) use ($jobId): void {
if ($jobId === null || $this->saveDate > new \DateTime()) {
return;
}
Expand All @@ -46,7 +45,7 @@ public function process(array $options, ?int $jobId = null): Result
});
$this->buildDataflow($builder, $options);
$dataflow = $builder->getDataflow();
if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) {
if ($this->logger instanceof LoggerInterface) {
$dataflow->setLogger($this->logger);
}

Expand Down
23 changes: 9 additions & 14 deletions src/DataflowType/Dataflow/AMPAsyncDataflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;

class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface
class AMPAsyncDataflow implements DataflowInterface
{
use LoggerAwareTrait;

Expand Down Expand Up @@ -65,7 +64,7 @@ public function addWriter(WriterInterface $writer): self
public function process(): Result
{
$count = 0;
$exceptions = [];
$countExceptions = 0;
$startTime = new \DateTime();

try {
Expand All @@ -82,7 +81,7 @@ public function process(): Result
}
});

$watcherId = Loop::repeat($this->loopInterval, function () use ($deferred, &$resolved, $producer, &$count, &$exceptions) {
$watcherId = Loop::repeat($this->loopInterval, function () use ($deferred, &$resolved, $producer, &$count, &$countExceptions) {
if (yield $producer->advance()) {
$it = $producer->getCurrent();
[$index, $item] = $it;
Expand All @@ -93,7 +92,7 @@ public function process(): Result
}

foreach ($this->states as $state) {
$this->processState($state, $count, $exceptions);
$this->processState($state, $count, $countExceptions);
}
});

Expand All @@ -104,18 +103,14 @@ public function process(): Result
$writer->finish();
}
} catch (\Throwable $e) {
$exceptions[] = $e;
++$countExceptions;
$this->logException($e);
}

return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
return new Result($this->name, $startTime, new \DateTime(), $count, $countExceptions);
}

/**
* @param int $count internal count reference
* @param array $exceptions internal exceptions
*/
private function processState(mixed $state, int &$count, array &$exceptions): void
private function processState(mixed $state, int &$count, int &$countExceptions): void
{
[$readIndex, $stepIndex, $item] = $state;
if ($stepIndex < \count($this->steps)) {
Expand All @@ -127,9 +122,9 @@ private function processState(mixed $state, int &$count, array &$exceptions): vo
$this->stepsJobs[$stepIndex][$readIndex] = true;
/** @var Promise<void> $promise */
$promise = coroutine($step)($item);
$promise->onResolve(function (?\Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) {
$promise->onResolve(function (?\Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$countExceptions): void {
if ($exception) {
$exceptions[$stepIndex] = $exception;
++$countExceptions;
$this->logException($exception, (string) $stepIndex);
} elseif ($newItem === false) {
unset($this->states[$readIndex]);
Expand Down
13 changes: 6 additions & 7 deletions src/DataflowType/Dataflow/Dataflow.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@

use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;

class Dataflow implements DataflowInterface, LoggerAwareInterface
class Dataflow implements DataflowInterface
{
use LoggerAwareTrait;

Expand Down Expand Up @@ -73,7 +72,7 @@ public function setAfterItemProcessors(array $processors): self
public function process(): Result
{
$count = 0;
$exceptions = [];
$countExceptions = 0;
$startTime = new \DateTime();

try {
Expand All @@ -91,10 +90,10 @@ public function process(): Result
$exceptionIndex = (string) ($this->customExceptionIndex)($item, $index);
}
} catch (\Throwable $e2) {
$exceptions[$index] = $e2;
++$countExceptions;
$this->logException($e2, $index);
}
$exceptions[$exceptionIndex] = $e;
++$countExceptions;
$this->logException($e, $exceptionIndex);
}

Expand All @@ -109,11 +108,11 @@ public function process(): Result
$writer->finish();
}
} catch (\Throwable $e) {
$exceptions[] = $e;
++$countExceptions;
$this->logException($e);
}

return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions);
return new Result($this->name, $startTime, new \DateTime(), $count, $countExceptions);
}

private function processItem(mixed $item): void
Expand Down
3 changes: 2 additions & 1 deletion src/DataflowType/Dataflow/DataflowInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow;

use CodeRhapsodie\DataflowBundle\DataflowType\Result;
use Psr\Log\LoggerAwareInterface;

/**
* Combines a reader, steps and writers as a data processing workflow.
*/
interface DataflowInterface
interface DataflowInterface extends LoggerAwareInterface
{
/**
* Processes the data.
Expand Down
4 changes: 3 additions & 1 deletion src/DataflowType/DataflowTypeInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

namespace CodeRhapsodie\DataflowBundle\DataflowType;

interface DataflowTypeInterface
use Psr\Log\LoggerAwareInterface;

interface DataflowTypeInterface extends LoggerAwareInterface
{
public function getLabel(): string;

Expand Down
10 changes: 2 additions & 8 deletions src/DataflowType/Result.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,12 @@ class Result
{
private readonly \DateInterval $elapsed;

private int $errorCount = 0;
private int $successCount;

private int $successCount = 0;

private readonly array $exceptions;

public function __construct(private readonly string $name, private readonly \DateTimeInterface $startTime, private readonly \DateTimeInterface $endTime, private readonly int $totalProcessedCount, array $exceptions)
public function __construct(private readonly string $name, private readonly \DateTimeInterface $startTime, private readonly \DateTimeInterface $endTime, private readonly int $totalProcessedCount, private readonly int $errorCount)
{
$this->elapsed = $startTime->diff($endTime);
$this->errorCount = \count($exceptions);
$this->successCount = $totalProcessedCount - $this->errorCount;
$this->exceptions = $exceptions;
}

public function getName(): string
Expand Down
6 changes: 3 additions & 3 deletions src/DataflowType/Writer/CollectionWriter.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ public function __construct(private readonly WriterInterface $writer)
{
}

public function prepare()
public function prepare(): void
{
$this->writer->prepare();
}

public function write($collection)
public function write($collection): void
{
if (!is_iterable($collection)) {
throw new UnsupportedItemTypeException(\sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection)));
Expand All @@ -34,7 +34,7 @@ public function write($collection)
}
}

public function finish()
public function finish(): void
{
$this->writer->finish();
}
Expand Down
6 changes: 3 additions & 3 deletions src/DataflowType/Writer/DelegatorWriter.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ public function __construct()
{
}

public function prepare()
public function prepare(): void
{
foreach ($this->delegates as $delegate) {
$delegate->prepare();
}
}

public function write($item)
public function write($item): void
{
foreach ($this->delegates as $delegate) {
if (!$delegate->supports($item)) {
Expand All @@ -43,7 +43,7 @@ public function write($item)
throw new UnsupportedItemTypeException(\sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item)));
}

public function finish()
public function finish(): void
{
foreach ($this->delegates as $delegate) {
$delegate->finish();
Expand Down
6 changes: 3 additions & 3 deletions src/DataflowType/Writer/PortWriterAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ public function __construct(private readonly \Port\Writer $writer)
{
}

public function prepare()
public function prepare(): void
{
$this->writer->prepare();
}

public function write($item)
public function write($item): void
{
$this->writer->writeItem((array) $item);
}

public function finish()
public function finish(): void
{
$this->writer->finish();
}
Expand Down
Loading