diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 347c56a..c329c2b 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -8,6 +8,7 @@ use Utopia\Servers\Hook; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; +use Utopia\Telemetry\Gauge; use Utopia\Telemetry\Histogram; use Utopia\Validator; @@ -67,6 +68,7 @@ class Server private Histogram $jobWaitTime; private Histogram $processDuration; + private Gauge $queueDepth; /** * Creates an instance of a Queue server. @@ -158,6 +160,30 @@ public function setTelemetry(Telemetry $telemetry): void ], ], ); + + $this->queueDepth = $telemetry->createGauge( + 'messaging.queue.depth', + '{message}', + 'Number of pending messages in the queue.', + ); + } + + private function recordQueueDepth(): void + { + if (!$this->adapter->consumer instanceof Publisher) { + return; + } + + try { + $this->queueDepth->record( + $this->adapter->consumer->getQueueSize($this->adapter->queue), + [ + 'messaging.destination.name' => $this->adapter->queue->name, + 'messaging.destination.namespace' => $this->adapter->queue->namespace, + ], + ); + } catch (Throwable) { + } } /** @@ -216,6 +242,8 @@ public function start(): self $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); } + $this->recordQueueDepth(); + $this->adapter->consumer->consume( $this->adapter->queue, function (Message $message) { @@ -269,6 +297,7 @@ function (Message $message) { $processDuration = microtime(true) - $receivedAtTimestamp; $this->processDuration->record($processDuration); + $this->recordQueueDepth(); } }, function (Message $message) { diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php new file mode 100644 index 0000000..0cb27ae --- /dev/null +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -0,0 +1,196 @@ +setTelemetry($telemetry); + $server + ->job() + ->inject('message') + ->action(fn (Message $message) => null); + + $server->start(); + + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); + /** @var object{values: array} $queueDepth */ + $queueDepth = $telemetry->gauges['messaging.queue.depth']; + $this->assertObjectHasProperty('values', $queueDepth); + $this->assertSame([3, 2], $queueDepth->values); + } + + public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void + { + $consumer = new ServerTelemetryConsumer(); + $adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite'); + $telemetry = new TestTelemetry(); + + $server = new Server($adapter); + $server->setTelemetry($telemetry); + $server + ->job() + ->inject('message') + ->action(fn (Message $message) => null); + + $server->start(); + + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); + /** @var object{values: array} $queueDepth */ + $queueDepth = $telemetry->gauges['messaging.queue.depth']; + $this->assertObjectHasProperty('values', $queueDepth); + $this->assertSame([], $queueDepth->values); + } + + public function testSkipsQueueDepthWhenConsumerCannotReadSize(): void + { + $consumer = new ServerTelemetryFailingPublisherConsumer(); + $adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite'); + $telemetry = new TestTelemetry(); + + $server = new Server($adapter); + $server->setTelemetry($telemetry); + $server + ->job() + ->inject('message') + ->action(fn (Message $message) => null); + + $server->start(); + + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); + /** @var object{values: array} $queueDepth */ + $queueDepth = $telemetry->gauges['messaging.queue.depth']; + $this->assertObjectHasProperty('values', $queueDepth); + $this->assertSame([], $queueDepth->values); + $this->assertArrayNotHasKey('messaging.queue.depth.errors', $telemetry->counters); + } +} + +final class ServerTelemetryAdapter extends Adapter +{ + /** + * @var callable[] + */ + private array $onWorkerStart = []; + + /** + * @var callable[] + */ + private array $onWorkerStop = []; + + public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') + { + parent::__construct($workerNum, $queue, $namespace); + $this->consumer = $consumer; + } + + public function start(): self + { + foreach ($this->onWorkerStart as $callback) { + $callback('0'); + } + + foreach ($this->onWorkerStop as $callback) { + $callback('0'); + } + + return $this; + } + + public function stop(): self + { + return $this; + } + + public function workerStart(callable $callback): self + { + $this->onWorkerStart[] = $callback; + return $this; + } + + public function workerStop(callable $callback): self + { + $this->onWorkerStop[] = $callback; + return $this; + } +} + +class ServerTelemetryConsumer implements Consumer +{ + public function consume( + Queue $queue, + callable $messageCallback, + callable $successCallback, + callable $errorCallback + ): void { + $message = new Message([ + 'pid' => 'test-pid', + 'queue' => $queue->name, + 'timestamp' => time() - 1, + 'payload' => [], + ]); + + $messageCallback($message); + $successCallback($message); + } + + public function close(): void + { + } +} + +final class ServerTelemetryPublisherConsumer extends ServerTelemetryConsumer implements Publisher +{ + /** + * @param int[] $queueSizes + */ + public function __construct(private array $queueSizes) + { + } + + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool + { + return true; + } + + public function retry(Queue $queue, ?int $limit = null): void + { + } + + public function getQueueSize(Queue $queue, bool $failedJobs = false): int + { + return array_shift($this->queueSizes) ?? 0; + } +} + +final class ServerTelemetryFailingPublisherConsumer extends ServerTelemetryConsumer implements Publisher +{ + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool + { + return true; + } + + public function retry(Queue $queue, ?int $limit = null): void + { + } + + public function getQueueSize(Queue $queue, bool $failedJobs = false): int + { + throw new \RuntimeException('Queue size unavailable.'); + } +}