From 1977bc3183dcbadbab6df53992fe56935bf0f0d4 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Mon, 11 May 2026 17:05:10 +0530 Subject: [PATCH 1/3] Add queue depth telemetry gauge --- src/Queue/Server.php | 29 ++++ .../Queue/E2E/Adapter/ServerTelemetryTest.php | 156 ++++++++++++++++++ 2 files changed, 185 insertions(+) create mode 100644 tests/Queue/E2E/Adapter/ServerTelemetryTest.php diff --git a/src/Queue/Server.php b/src/Queue/Server.php index 347c56a..c329c2b 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -8,6 +8,7 @@ use Utopia\Servers\Hook; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; +use Utopia\Telemetry\Gauge; use Utopia\Telemetry\Histogram; use Utopia\Validator; @@ -67,6 +68,7 @@ class Server private Histogram $jobWaitTime; private Histogram $processDuration; + private Gauge $queueDepth; /** * Creates an instance of a Queue server. @@ -158,6 +160,30 @@ public function setTelemetry(Telemetry $telemetry): void ], ], ); + + $this->queueDepth = $telemetry->createGauge( + 'messaging.queue.depth', + '{message}', + 'Number of pending messages in the queue.', + ); + } + + private function recordQueueDepth(): void + { + if (!$this->adapter->consumer instanceof Publisher) { + return; + } + + try { + $this->queueDepth->record( + $this->adapter->consumer->getQueueSize($this->adapter->queue), + [ + 'messaging.destination.name' => $this->adapter->queue->name, + 'messaging.destination.namespace' => $this->adapter->queue->namespace, + ], + ); + } catch (Throwable) { + } } /** @@ -216,6 +242,8 @@ public function start(): self $hook->getAction()(...$this->getArguments($this->getContainer(), $hook)); } + $this->recordQueueDepth(); + $this->adapter->consumer->consume( $this->adapter->queue, function (Message $message) { @@ -269,6 +297,7 @@ function (Message $message) { $processDuration = microtime(true) - $receivedAtTimestamp; $this->processDuration->record($processDuration); + $this->recordQueueDepth(); } }, function (Message $message) { diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php new file mode 100644 index 0000000..eda39d8 --- /dev/null +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -0,0 +1,156 @@ +setTelemetry($telemetry); + $server + ->job() + ->inject('message') + ->action(fn (Message $message) => null); + + $server->start(); + + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); + /** @var object{values: array} $queueDepth */ + $queueDepth = $telemetry->gauges['messaging.queue.depth']; + $this->assertObjectHasProperty('values', $queueDepth); + $this->assertSame([3, 2], $queueDepth->values); + } + + public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void + { + $consumer = new ServerTelemetryConsumer(); + $adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite'); + $telemetry = new TestTelemetry(); + + $server = new Server($adapter); + $server->setTelemetry($telemetry); + $server + ->job() + ->inject('message') + ->action(fn (Message $message) => null); + + $server->start(); + + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); + /** @var object{values: array} $queueDepth */ + $queueDepth = $telemetry->gauges['messaging.queue.depth']; + $this->assertObjectHasProperty('values', $queueDepth); + $this->assertSame([], $queueDepth->values); + } +} + +final class ServerTelemetryAdapter extends Adapter +{ + /** + * @var callable[] + */ + private array $onWorkerStart = []; + + /** + * @var callable[] + */ + private array $onWorkerStop = []; + + public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue') + { + parent::__construct($workerNum, $queue, $namespace); + $this->consumer = $consumer; + } + + public function start(): self + { + foreach ($this->onWorkerStart as $callback) { + $callback('0'); + } + + foreach ($this->onWorkerStop as $callback) { + $callback('0'); + } + + return $this; + } + + public function stop(): self + { + return $this; + } + + public function workerStart(callable $callback): self + { + $this->onWorkerStart[] = $callback; + return $this; + } + + public function workerStop(callable $callback): self + { + $this->onWorkerStop[] = $callback; + return $this; + } +} + +class ServerTelemetryConsumer implements Consumer +{ + public function consume( + Queue $queue, + callable $messageCallback, + callable $successCallback, + callable $errorCallback + ): void { + $message = new Message([ + 'pid' => 'test-pid', + 'queue' => $queue->name, + 'timestamp' => time() - 1, + 'payload' => [], + ]); + + $messageCallback($message); + $successCallback($message); + } + + public function close(): void + { + } +} + +final class ServerTelemetryPublisherConsumer extends ServerTelemetryConsumer implements Publisher +{ + /** + * @param int[] $queueSizes + */ + public function __construct(private array $queueSizes) + { + } + + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool + { + return true; + } + + public function retry(Queue $queue, ?int $limit = null): void + { + } + + public function getQueueSize(Queue $queue, bool $failedJobs = false): int + { + return array_shift($this->queueSizes) ?? 0; + } +} From 025f1e413b9ac4f7206109613d3224806d37a842 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Mon, 11 May 2026 17:17:00 +0530 Subject: [PATCH 2/3] Surface queue depth telemetry errors --- src/Queue/Server.php | 15 ++++++- .../Queue/E2E/Adapter/ServerTelemetryTest.php | 39 +++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/src/Queue/Server.php b/src/Queue/Server.php index c329c2b..e5e399b 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -8,6 +8,7 @@ use Utopia\Servers\Hook; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; +use Utopia\Telemetry\Counter; use Utopia\Telemetry\Gauge; use Utopia\Telemetry\Histogram; use Utopia\Validator; @@ -69,6 +70,7 @@ class Server private Histogram $jobWaitTime; private Histogram $processDuration; private Gauge $queueDepth; + private Counter $queueDepthErrors; /** * Creates an instance of a Queue server. @@ -166,6 +168,12 @@ public function setTelemetry(Telemetry $telemetry): void '{message}', 'Number of pending messages in the queue.', ); + + $this->queueDepthErrors = $telemetry->createCounter( + 'messaging.queue.depth.errors', + '{error}', + 'Number of failed attempts to record queue depth.', + ); } private function recordQueueDepth(): void @@ -182,7 +190,12 @@ private function recordQueueDepth(): void 'messaging.destination.namespace' => $this->adapter->queue->namespace, ], ); - } catch (Throwable) { + } catch (Throwable $error) { + $this->queueDepthErrors->add(1, [ + 'messaging.destination.name' => $this->adapter->queue->name, + 'messaging.destination.namespace' => $this->adapter->queue->namespace, + 'error.type' => $error::class, + ]); } } diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index eda39d8..41cf985 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -56,6 +56,28 @@ public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void $this->assertObjectHasProperty('values', $queueDepth); $this->assertSame([], $queueDepth->values); } + + public function testRecordsQueueDepthErrors(): void + { + $consumer = new ServerTelemetryFailingPublisherConsumer(); + $adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite'); + $telemetry = new TestTelemetry(); + + $server = new Server($adapter); + $server->setTelemetry($telemetry); + $server + ->job() + ->inject('message') + ->action(fn (Message $message) => null); + + $server->start(); + + $this->assertArrayHasKey('messaging.queue.depth.errors', $telemetry->counters); + /** @var object{values: array} $queueDepthErrors */ + $queueDepthErrors = $telemetry->counters['messaging.queue.depth.errors']; + $this->assertObjectHasProperty('values', $queueDepthErrors); + $this->assertSame([1, 1], $queueDepthErrors->values); + } } final class ServerTelemetryAdapter extends Adapter @@ -154,3 +176,20 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int return array_shift($this->queueSizes) ?? 0; } } + +final class ServerTelemetryFailingPublisherConsumer extends ServerTelemetryConsumer implements Publisher +{ + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool + { + return true; + } + + public function retry(Queue $queue, ?int $limit = null): void + { + } + + public function getQueueSize(Queue $queue, bool $failedJobs = false): int + { + throw new \RuntimeException('Queue size unavailable.'); + } +} From 7e6b977a7bc9ea357cc99d7a5dec3cf65b49fa01 Mon Sep 17 00:00:00 2001 From: Chirag Aggarwal Date: Tue, 12 May 2026 10:51:37 +0530 Subject: [PATCH 3/3] Simplify queue depth error telemetry --- src/Queue/Server.php | 15 +-------------- tests/Queue/E2E/Adapter/ServerTelemetryTest.php | 13 +++++++------ 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/src/Queue/Server.php b/src/Queue/Server.php index e5e399b..c329c2b 100644 --- a/src/Queue/Server.php +++ b/src/Queue/Server.php @@ -8,7 +8,6 @@ use Utopia\Servers\Hook; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; -use Utopia\Telemetry\Counter; use Utopia\Telemetry\Gauge; use Utopia\Telemetry\Histogram; use Utopia\Validator; @@ -70,7 +69,6 @@ class Server private Histogram $jobWaitTime; private Histogram $processDuration; private Gauge $queueDepth; - private Counter $queueDepthErrors; /** * Creates an instance of a Queue server. @@ -168,12 +166,6 @@ public function setTelemetry(Telemetry $telemetry): void '{message}', 'Number of pending messages in the queue.', ); - - $this->queueDepthErrors = $telemetry->createCounter( - 'messaging.queue.depth.errors', - '{error}', - 'Number of failed attempts to record queue depth.', - ); } private function recordQueueDepth(): void @@ -190,12 +182,7 @@ private function recordQueueDepth(): void 'messaging.destination.namespace' => $this->adapter->queue->namespace, ], ); - } catch (Throwable $error) { - $this->queueDepthErrors->add(1, [ - 'messaging.destination.name' => $this->adapter->queue->name, - 'messaging.destination.namespace' => $this->adapter->queue->namespace, - 'error.type' => $error::class, - ]); + } catch (Throwable) { } } diff --git a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php index 41cf985..0cb27ae 100644 --- a/tests/Queue/E2E/Adapter/ServerTelemetryTest.php +++ b/tests/Queue/E2E/Adapter/ServerTelemetryTest.php @@ -57,7 +57,7 @@ public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void $this->assertSame([], $queueDepth->values); } - public function testRecordsQueueDepthErrors(): void + public function testSkipsQueueDepthWhenConsumerCannotReadSize(): void { $consumer = new ServerTelemetryFailingPublisherConsumer(); $adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite'); @@ -72,11 +72,12 @@ public function testRecordsQueueDepthErrors(): void $server->start(); - $this->assertArrayHasKey('messaging.queue.depth.errors', $telemetry->counters); - /** @var object{values: array} $queueDepthErrors */ - $queueDepthErrors = $telemetry->counters['messaging.queue.depth.errors']; - $this->assertObjectHasProperty('values', $queueDepthErrors); - $this->assertSame([1, 1], $queueDepthErrors->values); + $this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges); + /** @var object{values: array} $queueDepth */ + $queueDepth = $telemetry->gauges['messaging.queue.depth']; + $this->assertObjectHasProperty('values', $queueDepth); + $this->assertSame([], $queueDepth->values); + $this->assertArrayNotHasKey('messaging.queue.depth.errors', $telemetry->counters); } }