diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index cb7b455..d5d9214 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -12,7 +12,7 @@ jobs: runs-on: ubuntu-latest permissions: read-all steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v6 with: fetch-depth: 0 # Shallow clones should be disabled for a better relevancy of analysis - uses: sonarsource/sonarqube-scan-action@master diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3c02c2b..ac25f8c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - uses: php-actions/composer@v6 # or alternative dependency management - uses: php-actions/phpunit@v4 - name: Run PHP CS Fixer diff --git a/.php-cs-fixer.dist.php b/.php-cs-fixer.dist.php index 385218d..fcdc587 100644 --- a/.php-cs-fixer.dist.php +++ b/.php-cs-fixer.dist.php @@ -9,6 +9,7 @@ $config->setRules([ '@Symfony' => true, '@Symfony:risky' => true, + '@PHP8x2Migration:risky' => true, '@PSR12' => true, 'array_syntax' => [ 'syntax' => 'short', diff --git a/CHANGELOG.md b/CHANGELOG.md index 7ce7ca1..699edf5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +# Version 6.0.0 +* Add managing exceptions with a stream + # Version 5.5.0 * Added cleanup commands * Updated for PHP 8.2 @@ -6,7 +9,7 @@ * Fix File Exceptions integration # Version 5.4.0 -* Add possibility to save exceptions in file +* Add the possibility to save exceptions in the file # Version 5.3.1 * Fix interface naming diff --git a/README.md b/README.md index 5c251a7..04a5465 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ providing an easy way to create import / export dataflow. | Dataflow | Symfony | Support | |----------|--------------------------|---------| +| 6.x | ^7.4 | yes | | 5.x | ^7.3 | yes | | 4.x | 3.4 \| 4.x \| 5.x \| 6.x | yes | | 3.x | 3.4 \| 4.x \| 5.x | no | diff --git a/Tests/Processor/JobProcessorTest.php b/Tests/Processor/JobProcessorTest.php index bd37342..008a7fd 100644 --- a/Tests/Processor/JobProcessorTest.php +++ b/Tests/Processor/JobProcessorTest.php @@ -7,6 +7,7 @@ use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Event\Events; use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent; +use CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface; use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; use CodeRhapsodie\DataflowBundle\Processor\JobProcessor; use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; @@ -22,6 +23,7 @@ class JobProcessorTest extends TestCase private DataflowTypeRegistryInterface|MockObject $registry; private EventDispatcherInterface|MockObject $dispatcher; private JobGateway|MockObject $jobGateway; + private ExceptionHandlerInterface|MockObject $exceptionHandler; protected function setUp(): void { @@ -29,8 +31,9 @@ protected function setUp(): void $this->registry = $this->createMock(DataflowTypeRegistryInterface::class); $this->dispatcher = $this->createMock(EventDispatcherInterface::class); $this->jobGateway = $this->createMock(JobGateway::class); + $this->exceptionHandler = $this->createMock(ExceptionHandlerInterface::class); - $this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher, $this->jobGateway); + $this->processor = new JobProcessor($this->repository, $this->registry, $this->dispatcher, $this->jobGateway, $this->exceptionHandler); } public function testProcess() @@ -64,7 +67,7 @@ public function testProcess() ->willReturn($dataflowType) ; - $bag = [new \Exception('message1')]; + $bag = 1; $result = new Result('name', new \DateTimeImmutable(), $end = new \DateTimeImmutable(), $count = 10, $bag); @@ -85,6 +88,6 @@ public function testProcess() $this->assertGreaterThanOrEqual($now, $job->getStartTime()); $this->assertSame(Job::STATUS_COMPLETED, $job->getStatus()); $this->assertSame($end, $job->getEndTime()); - $this->assertSame($count - count($bag), $job->getCount()); + $this->assertSame($count - $bag, $job->getCount()); } } diff --git a/composer.json b/composer.json index e60f0e0..350d0b9 100644 --- a/composer.json +++ b/composer.json @@ -47,16 +47,16 @@ "doctrine/doctrine-bundle": "^2.0", "monolog/monolog": "^2.0||^3.0", "psr/log": "^1.1||^2.0||^3.0", - "symfony/config": "^7.3", - "symfony/console": "^7.3", - "symfony/dependency-injection": "^7.3", - "symfony/event-dispatcher": "^7.3", - "symfony/http-kernel": "^7.3", - "symfony/lock": "^7.3", - "symfony/monolog-bridge": "^7.3", - "symfony/options-resolver": "^7.3", - "symfony/validator": "^7.3", - "symfony/yaml": "^7.3" + "symfony/config": "^7.4", + "symfony/console": "^7.4", + "symfony/dependency-injection": "^7.4", + "symfony/event-dispatcher": "^7.4", + "symfony/http-kernel": "^7.4", + "symfony/lock": "^7.4", + "symfony/monolog-bridge": "^7.4", + "symfony/options-resolver": "^7.4", + "symfony/validator": "^7.4", + "symfony/yaml": "^7.4" }, "require-dev": { "amphp/amp": "^2.5", @@ -65,7 +65,7 @@ "phpunit/phpunit": "^11", "portphp/portphp": "^1.9", "rector/rector": "^2.0", - "symfony/messenger": "^7.3" + "symfony/messenger": "^7.4" }, "suggest": { "amphp/amp": "Provide asynchronous steps for your dataflows", diff --git a/src/DataflowType/AbstractDataflowType.php b/src/DataflowType/AbstractDataflowType.php index 543b173..8f3dddc 100644 --- a/src/DataflowType/AbstractDataflowType.php +++ b/src/DataflowType/AbstractDataflowType.php @@ -5,12 +5,11 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType; use CodeRhapsodie\DataflowBundle\Repository\JobRepository; -use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; use Psr\Log\LoggerInterface; use Symfony\Component\OptionsResolver\OptionsResolver; -abstract class AbstractDataflowType implements DataflowTypeInterface, LoggerAwareInterface, AutoUpdateCountInterface +abstract class AbstractDataflowType implements DataflowTypeInterface, AutoUpdateCountInterface { use LoggerAwareTrait; @@ -36,7 +35,7 @@ public function process(array $options, ?int $jobId = null): Result $builder = $this->createDataflowBuilder(); $builder->setName($this->getLabel()); - $builder->addAfterItemProcessor(function (int|string $index, mixed $item, int $count) use ($jobId) { + $builder->addAfterItemProcessor(function (int|string $index, mixed $item, int $count) use ($jobId): void { if ($jobId === null || $this->saveDate > new \DateTime()) { return; } @@ -46,7 +45,7 @@ public function process(array $options, ?int $jobId = null): Result }); $this->buildDataflow($builder, $options); $dataflow = $builder->getDataflow(); - if ($dataflow instanceof LoggerAwareInterface && $this->logger instanceof LoggerInterface) { + if ($this->logger instanceof LoggerInterface) { $dataflow->setLogger($this->logger); } diff --git a/src/DataflowType/Dataflow/AMPAsyncDataflow.php b/src/DataflowType/Dataflow/AMPAsyncDataflow.php index 8d64929..cdacac3 100644 --- a/src/DataflowType/Dataflow/AMPAsyncDataflow.php +++ b/src/DataflowType/Dataflow/AMPAsyncDataflow.php @@ -16,10 +16,9 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Result; use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface; -use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; -class AMPAsyncDataflow implements DataflowInterface, LoggerAwareInterface +class AMPAsyncDataflow implements DataflowInterface { use LoggerAwareTrait; @@ -65,7 +64,7 @@ public function addWriter(WriterInterface $writer): self public function process(): Result { $count = 0; - $exceptions = []; + $countExceptions = 0; $startTime = new \DateTime(); try { @@ -82,7 +81,7 @@ public function process(): Result } }); - $watcherId = Loop::repeat($this->loopInterval, function () use ($deferred, &$resolved, $producer, &$count, &$exceptions) { + $watcherId = Loop::repeat($this->loopInterval, function () use ($deferred, &$resolved, $producer, &$count, &$countExceptions) { if (yield $producer->advance()) { $it = $producer->getCurrent(); [$index, $item] = $it; @@ -93,7 +92,7 @@ public function process(): Result } foreach ($this->states as $state) { - $this->processState($state, $count, $exceptions); + $this->processState($state, $count, $countExceptions); } }); @@ -104,18 +103,14 @@ public function process(): Result $writer->finish(); } } catch (\Throwable $e) { - $exceptions[] = $e; + ++$countExceptions; $this->logException($e); } - return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions); + return new Result($this->name, $startTime, new \DateTime(), $count, $countExceptions); } - /** - * @param int $count internal count reference - * @param array $exceptions internal exceptions - */ - private function processState(mixed $state, int &$count, array &$exceptions): void + private function processState(mixed $state, int &$count, int &$countExceptions): void { [$readIndex, $stepIndex, $item] = $state; if ($stepIndex < \count($this->steps)) { @@ -127,9 +122,9 @@ private function processState(mixed $state, int &$count, array &$exceptions): vo $this->stepsJobs[$stepIndex][$readIndex] = true; /** @var Promise $promise */ $promise = coroutine($step)($item); - $promise->onResolve(function (?\Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$exceptions) { + $promise->onResolve(function (?\Throwable $exception = null, $newItem = null) use ($stepIndex, $readIndex, &$countExceptions): void { if ($exception) { - $exceptions[$stepIndex] = $exception; + ++$countExceptions; $this->logException($exception, (string) $stepIndex); } elseif ($newItem === false) { unset($this->states[$readIndex]); diff --git a/src/DataflowType/Dataflow/Dataflow.php b/src/DataflowType/Dataflow/Dataflow.php index 81b2d99..c4d9d7b 100644 --- a/src/DataflowType/Dataflow/Dataflow.php +++ b/src/DataflowType/Dataflow/Dataflow.php @@ -6,10 +6,9 @@ use CodeRhapsodie\DataflowBundle\DataflowType\Result; use CodeRhapsodie\DataflowBundle\DataflowType\Writer\WriterInterface; -use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; -class Dataflow implements DataflowInterface, LoggerAwareInterface +class Dataflow implements DataflowInterface { use LoggerAwareTrait; @@ -73,7 +72,7 @@ public function setAfterItemProcessors(array $processors): self public function process(): Result { $count = 0; - $exceptions = []; + $countExceptions = 0; $startTime = new \DateTime(); try { @@ -91,10 +90,10 @@ public function process(): Result $exceptionIndex = (string) ($this->customExceptionIndex)($item, $index); } } catch (\Throwable $e2) { - $exceptions[$index] = $e2; + ++$countExceptions; $this->logException($e2, $index); } - $exceptions[$exceptionIndex] = $e; + ++$countExceptions; $this->logException($e, $exceptionIndex); } @@ -109,11 +108,11 @@ public function process(): Result $writer->finish(); } } catch (\Throwable $e) { - $exceptions[] = $e; + ++$countExceptions; $this->logException($e); } - return new Result($this->name, $startTime, new \DateTime(), $count, $exceptions); + return new Result($this->name, $startTime, new \DateTime(), $count, $countExceptions); } private function processItem(mixed $item): void diff --git a/src/DataflowType/Dataflow/DataflowInterface.php b/src/DataflowType/Dataflow/DataflowInterface.php index b2ad311..3d50a47 100644 --- a/src/DataflowType/Dataflow/DataflowInterface.php +++ b/src/DataflowType/Dataflow/DataflowInterface.php @@ -5,11 +5,12 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType\Dataflow; use CodeRhapsodie\DataflowBundle\DataflowType\Result; +use Psr\Log\LoggerAwareInterface; /** * Combines a reader, steps and writers as a data processing workflow. */ -interface DataflowInterface +interface DataflowInterface extends LoggerAwareInterface { /** * Processes the data. diff --git a/src/DataflowType/DataflowTypeInterface.php b/src/DataflowType/DataflowTypeInterface.php index d64b111..51989b1 100644 --- a/src/DataflowType/DataflowTypeInterface.php +++ b/src/DataflowType/DataflowTypeInterface.php @@ -4,7 +4,9 @@ namespace CodeRhapsodie\DataflowBundle\DataflowType; -interface DataflowTypeInterface +use Psr\Log\LoggerAwareInterface; + +interface DataflowTypeInterface extends LoggerAwareInterface { public function getLabel(): string; diff --git a/src/DataflowType/Result.php b/src/DataflowType/Result.php index f7cf5fe..d4ff8a9 100644 --- a/src/DataflowType/Result.php +++ b/src/DataflowType/Result.php @@ -11,18 +11,12 @@ class Result { private readonly \DateInterval $elapsed; - private int $errorCount = 0; + private int $successCount; - private int $successCount = 0; - - private readonly array $exceptions; - - public function __construct(private readonly string $name, private readonly \DateTimeInterface $startTime, private readonly \DateTimeInterface $endTime, private readonly int $totalProcessedCount, array $exceptions) + public function __construct(private readonly string $name, private readonly \DateTimeInterface $startTime, private readonly \DateTimeInterface $endTime, private readonly int $totalProcessedCount, private readonly int $errorCount) { $this->elapsed = $startTime->diff($endTime); - $this->errorCount = \count($exceptions); $this->successCount = $totalProcessedCount - $this->errorCount; - $this->exceptions = $exceptions; } public function getName(): string diff --git a/src/DataflowType/Writer/CollectionWriter.php b/src/DataflowType/Writer/CollectionWriter.php index 7dee448..297c4ed 100644 --- a/src/DataflowType/Writer/CollectionWriter.php +++ b/src/DataflowType/Writer/CollectionWriter.php @@ -18,12 +18,12 @@ public function __construct(private readonly WriterInterface $writer) { } - public function prepare() + public function prepare(): void { $this->writer->prepare(); } - public function write($collection) + public function write($collection): void { if (!is_iterable($collection)) { throw new UnsupportedItemTypeException(\sprintf('Item to write was expected to be an iterable, received %s.', get_debug_type($collection))); @@ -34,7 +34,7 @@ public function write($collection) } } - public function finish() + public function finish(): void { $this->writer->finish(); } diff --git a/src/DataflowType/Writer/DelegatorWriter.php b/src/DataflowType/Writer/DelegatorWriter.php index de2ada3..878384c 100644 --- a/src/DataflowType/Writer/DelegatorWriter.php +++ b/src/DataflowType/Writer/DelegatorWriter.php @@ -21,14 +21,14 @@ public function __construct() { } - public function prepare() + public function prepare(): void { foreach ($this->delegates as $delegate) { $delegate->prepare(); } } - public function write($item) + public function write($item): void { foreach ($this->delegates as $delegate) { if (!$delegate->supports($item)) { @@ -43,7 +43,7 @@ public function write($item) throw new UnsupportedItemTypeException(\sprintf('None of the registered delegate writers support the received item of type %s', get_debug_type($item))); } - public function finish() + public function finish(): void { foreach ($this->delegates as $delegate) { $delegate->finish(); diff --git a/src/DataflowType/Writer/PortWriterAdapter.php b/src/DataflowType/Writer/PortWriterAdapter.php index 32bfb58..5c96af7 100644 --- a/src/DataflowType/Writer/PortWriterAdapter.php +++ b/src/DataflowType/Writer/PortWriterAdapter.php @@ -10,17 +10,17 @@ public function __construct(private readonly \Port\Writer $writer) { } - public function prepare() + public function prepare(): void { $this->writer->prepare(); } - public function write($item) + public function write($item): void { $this->writer->writeItem((array) $item); } - public function finish() + public function finish(): void { $this->writer->finish(); } diff --git a/src/Entity/Job.php b/src/Entity/Job.php index 020f839..1558b3d 100644 --- a/src/Entity/Job.php +++ b/src/Entity/Job.php @@ -62,6 +62,11 @@ class Job private ?\DateTimeInterface $endTime = null; + /** + * @var resource|null + */ + private mixed $streamExceptions = null; + public static function createFromScheduledDataflow(ScheduledDataflow $scheduled): self { return (new self()) @@ -244,4 +249,22 @@ public function setEndTime(?\DateTimeInterface $endTime): self return $this; } + + /** + * @return resource|null + */ + public function getStreamExceptions() + { + return $this->streamExceptions; + } + + /** + * @param resource $streamExceptions + */ + public function setStreamExceptions($streamExceptions): self + { + $this->streamExceptions = $streamExceptions; + + return $this; + } } diff --git a/src/ExceptionsHandler/ExceptionHandlerInterface.php b/src/ExceptionsHandler/ExceptionHandlerInterface.php index bb1bfcc..278a855 100644 --- a/src/ExceptionsHandler/ExceptionHandlerInterface.php +++ b/src/ExceptionsHandler/ExceptionHandlerInterface.php @@ -6,9 +6,11 @@ interface ExceptionHandlerInterface { - public function save(?int $jobId, ?array $exceptions): void; + /** @param resource $exceptions */ + public function save(?int $jobId, $exceptions): void; - public function find(int $jobId): ?array; + /** @return resource|null */ + public function find(int $jobId); public function delete(int $jobId): void; } diff --git a/src/ExceptionsHandler/FilesystemExceptionHandler.php b/src/ExceptionsHandler/FilesystemExceptionHandler.php index 20ec200..53cc0c2 100644 --- a/src/ExceptionsHandler/FilesystemExceptionHandler.php +++ b/src/ExceptionsHandler/FilesystemExceptionHandler.php @@ -13,25 +13,49 @@ public function __construct(private readonly Filesystem $filesystem) { } - public function save(?int $jobId, ?array $exceptions): void + public function save(?int $jobId, $exceptions): void { - if ($jobId === null || empty($exceptions)) { + if ($jobId === null || stream_get_contents($exceptions, 1) === false) { return; } - $this->filesystem->write(\sprintf('dataflow-job-%s.log', $jobId), json_encode($exceptions)); + $path = \sprintf('dataflow-job-%s.log', $jobId); + rewind($exceptions); + + if ($this->filesystem->fileExists($path)) { + $existingStream = $this->filesystem->readStream($path); + + $combined = fopen('php://temp', 'r+'); + + stream_copy_to_stream($existingStream, $combined); + stream_copy_to_stream($exceptions, $combined); + + rewind($combined); + + $this->filesystem->delete($path); + $this->filesystem->writeStream($path, $combined); + + fclose($existingStream); + fclose($combined); + + return; + } + + $this->filesystem->writeStream($path, $exceptions); + + fclose($exceptions); } - public function find(int $jobId): ?array + public function find(int $jobId) { try { if (!$this->filesystem->fileExists(\sprintf('dataflow-job-%s.log', $jobId))) { - return []; + return null; } - return json_decode($this->filesystem->read(\sprintf('dataflow-job-%s.log', $jobId)), true); + return $this->filesystem->readStream(\sprintf('dataflow-job-%s.log', $jobId)); } catch (FilesystemException) { - return []; + return null; } } diff --git a/src/ExceptionsHandler/NullExceptionHandler.php b/src/ExceptionsHandler/NullExceptionHandler.php index bc398e5..04bf1ab 100644 --- a/src/ExceptionsHandler/NullExceptionHandler.php +++ b/src/ExceptionsHandler/NullExceptionHandler.php @@ -6,12 +6,12 @@ class NullExceptionHandler implements ExceptionHandlerInterface { - public function save(?int $jobId, ?array $exceptions): void + public function save(?int $jobId, $exceptions): void { // Nothing to do } - public function find(int $jobId): ?array + public function find(int $jobId) { return null; } diff --git a/src/Factory/ConnectionFactory.php b/src/Factory/ConnectionFactory.php index 6daae2b..3e9f6c3 100644 --- a/src/Factory/ConnectionFactory.php +++ b/src/Factory/ConnectionFactory.php @@ -17,7 +17,7 @@ public function __construct(private readonly Container $container, private strin { } - public function setConnectionName(string $connectionName) + public function setConnectionName(string $connectionName): void { $this->connectionName = $connectionName; } diff --git a/src/Gateway/JobGateway.php b/src/Gateway/JobGateway.php index 55ba631..0c88c17 100644 --- a/src/Gateway/JobGateway.php +++ b/src/Gateway/JobGateway.php @@ -19,16 +19,11 @@ public function find(int $jobId): ?Job { $job = $this->repository->find($jobId); - return $this->loadExceptions($job); + return $this->loadStreamExceptions($job); } public function save(Job $job): void { - if (!$this->exceptionHandler instanceof NullExceptionHandler) { - $this->exceptionHandler->save($job->getId(), $job->getExceptions()); - $job->setExceptions([]); - } - $this->repository->save($job); } @@ -36,17 +31,15 @@ public function findLastForDataflowId(int $scheduleId): ?Job { $job = $this->repository->findLastForDataflowId($scheduleId); - return $this->loadExceptions($job); + return $this->loadStreamExceptions($job); } - private function loadExceptions(?Job $job): ?Job + private function loadStreamExceptions(?Job $job): ?Job { if ($job === null || $this->exceptionHandler instanceof NullExceptionHandler) { return $job; } - $this->exceptionHandler->save($job->getId(), $job->getExceptions()); - - return $job->setExceptions($this->exceptionHandler->find($job->getId())); + return $job->setStreamExceptions($this->exceptionHandler->find($job->getId())); } } diff --git a/src/Logger/BufferHandler.php b/src/Logger/BufferHandler.php deleted file mode 100644 index cc6bc0a..0000000 --- a/src/Logger/BufferHandler.php +++ /dev/null @@ -1,35 +0,0 @@ -buffer; - $this->buffer = []; - - return $logs; - } - - protected function write(array|LogRecord $record): void - { - $this->buffer[] = $record['formatted']; - } - - protected function getDefaultFormatter(): FormatterInterface - { - return new LineFormatter(self::FORMAT); - } -} diff --git a/src/MessengerMode/JobMessageHandler.php b/src/MessengerMode/JobMessageHandler.php index 44d2a2a..132a6aa 100644 --- a/src/MessengerMode/JobMessageHandler.php +++ b/src/MessengerMode/JobMessageHandler.php @@ -15,7 +15,7 @@ public function __construct(private readonly JobRepository $repository, private { } - public function __invoke(JobMessage $message) + public function __invoke(JobMessage $message): void { $this->processor->process($this->repository->find($message->getJobId())); } diff --git a/src/Processor/JobProcessor.php b/src/Processor/JobProcessor.php index e7019db..ca31217 100644 --- a/src/Processor/JobProcessor.php +++ b/src/Processor/JobProcessor.php @@ -9,11 +9,14 @@ use CodeRhapsodie\DataflowBundle\Entity\Job; use CodeRhapsodie\DataflowBundle\Event\Events; use CodeRhapsodie\DataflowBundle\Event\ProcessingEvent; +use CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface; +use CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler; use CodeRhapsodie\DataflowBundle\Gateway\JobGateway; -use CodeRhapsodie\DataflowBundle\Logger\BufferHandler; use CodeRhapsodie\DataflowBundle\Logger\DelegatingLogger; use CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface; use CodeRhapsodie\DataflowBundle\Repository\JobRepository; +use Monolog\Formatter\LineFormatter; +use Monolog\Handler\StreamHandler; use Monolog\Logger; use Psr\Log\LoggerAwareInterface; use Psr\Log\LoggerAwareTrait; @@ -22,12 +25,14 @@ class JobProcessor implements JobProcessorInterface, LoggerAwareInterface { use LoggerAwareTrait; + private const FORMAT = "[%datetime%] %level_name% when processing item %context.index%: %message% %context% %extra%\n"; public function __construct( private JobRepository $repository, private DataflowTypeRegistryInterface $registry, private EventDispatcherInterface $dispatcher, private JobGateway $jobGateway, + private ExceptionHandlerInterface $exceptionHandler, ) { } @@ -40,24 +45,19 @@ public function process(Job $job): void $dataflowType->setRepository($this->repository); } - $loggers = [new Logger('dataflow_internal', [$bufferHandler = new BufferHandler()])]; + $handler = new StreamHandler(tempnam(sys_get_temp_dir(), 'dataflow_')); + $handler->setFormatter(new LineFormatter(self::FORMAT)); + + $loggers = [new Logger('dataflow_internal', [$bufferHandler = $handler])]; if (isset($this->logger)) { $loggers[] = $this->logger; } $logger = new DelegatingLogger($loggers); - if ($dataflowType instanceof LoggerAwareInterface) { - $dataflowType->setLogger($logger); - } + $dataflowType->setLogger($logger); $result = $dataflowType->process($job->getOptions(), $job->getId()); - if (!$dataflowType instanceof LoggerAwareInterface) { - foreach ($result->getExceptions() as $index => $e) { - $logger->error($e, ['index' => $index]); - } - } - $this->afterProcessing($job, $result, $bufferHandler); } @@ -72,15 +72,22 @@ private function beforeProcessing(Job $job): void $this->jobGateway->save($job); } - private function afterProcessing(Job $job, Result $result, BufferHandler $bufferLogger): void + private function afterProcessing(Job $job, Result $result, StreamHandler $streamHandler): void { $job ->setEndTime($result->getEndTime()) ->setStatus(Job::STATUS_COMPLETED) ->setCount($result->getSuccessCount()) - ->setExceptions($bufferLogger->clearBuffer()) ; + if (!$this->exceptionHandler instanceof NullExceptionHandler) { + $this->exceptionHandler->save($job->getId(), $streamHandler->getStream()); + $job->setStreamExceptions($streamHandler->getStream()); + } else { + $job->setExceptions(json_decode(stream_get_contents($streamHandler->getStream()), true)); + $streamHandler->reset(); + } + $this->jobGateway->save($job); $this->dispatcher->dispatch(new ProcessingEvent($job), Events::AFTER_PROCESSING); diff --git a/src/Repository/JobRepository.php b/src/Repository/JobRepository.php index 1683243..59bb4fa 100644 --- a/src/Repository/JobRepository.php +++ b/src/Repository/JobRepository.php @@ -116,7 +116,7 @@ public function findForScheduled(int $id): iterable } } - public function save(Job $job) + public function save(Job $job): void { $datas = $job->toArray(); unset($datas['id']); diff --git a/src/Repository/ScheduledDataflowRepository.php b/src/Repository/ScheduledDataflowRepository.php index dd9d1e3..4e3c219 100644 --- a/src/Repository/ScheduledDataflowRepository.php +++ b/src/Repository/ScheduledDataflowRepository.php @@ -85,7 +85,7 @@ public function listAllOrderedByLabel(): array return $query->executeQuery()->fetchAllAssociative(); } - public function save(ScheduledDataflow $scheduledDataflow) + public function save(ScheduledDataflow $scheduledDataflow): void { $datas = $scheduledDataflow->toArray(); unset($datas['id']); diff --git a/src/Resources/config/services.yaml b/src/Resources/config/services.yaml index 83af479..2352ab7 100644 --- a/src/Resources/config/services.yaml +++ b/src/Resources/config/services.yaml @@ -94,6 +94,7 @@ services: $registry: '@CodeRhapsodie\DataflowBundle\Registry\DataflowTypeRegistryInterface' $dispatcher: '@event_dispatcher' $jobGateway: '@CodeRhapsodie\DataflowBundle\Gateway\JobGateway' + $exceptionHandler: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface' CodeRhapsodie\DataflowBundle\ExceptionsHandler\ExceptionHandlerInterface: '@CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler' CodeRhapsodie\DataflowBundle\ExceptionsHandler\NullExceptionHandler: ~ CodeRhapsodie\DataflowBundle\Gateway\JobGateway: diff --git a/src/Validator/Constraints/FrequencyValidator.php b/src/Validator/Constraints/FrequencyValidator.php index 4a03814..cfbcddf 100644 --- a/src/Validator/Constraints/FrequencyValidator.php +++ b/src/Validator/Constraints/FrequencyValidator.php @@ -10,7 +10,7 @@ class FrequencyValidator extends ConstraintValidator { - public function validate(mixed $value, Constraint $constraint) + public function validate(mixed $value, Constraint $constraint): void { if (!$constraint instanceof Frequency) { throw new UnexpectedTypeException($constraint, Frequency::class);