Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/Queue/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Utopia\Servers\Hook;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
use Utopia\Telemetry\Gauge;
use Utopia\Telemetry\Histogram;
use Utopia\Validator;

Expand Down Expand Up @@ -67,6 +68,7 @@ class Server

private Histogram $jobWaitTime;
private Histogram $processDuration;
private Gauge $queueDepth;

/**
* Creates an instance of a Queue server.
Expand Down Expand Up @@ -158,6 +160,30 @@ public function setTelemetry(Telemetry $telemetry): void
],
],
);

$this->queueDepth = $telemetry->createGauge(
'messaging.queue.depth',
'{message}',
'Number of pending messages in the queue.',
);
}

private function recordQueueDepth(): void
{
if (!$this->adapter->consumer instanceof Publisher) {
return;
}

try {
$this->queueDepth->record(
$this->adapter->consumer->getQueueSize($this->adapter->queue),
[
'messaging.destination.name' => $this->adapter->queue->name,
'messaging.destination.namespace' => $this->adapter->queue->namespace,
],
);
} catch (Throwable) {
}
}

/**
Expand Down Expand Up @@ -216,6 +242,8 @@ public function start(): self
$hook->getAction()(...$this->getArguments($this->getContainer(), $hook));
}

$this->recordQueueDepth();

$this->adapter->consumer->consume(
$this->adapter->queue,
function (Message $message) {
Expand Down Expand Up @@ -269,6 +297,7 @@ function (Message $message) {
$processDuration =
microtime(true) - $receivedAtTimestamp;
$this->processDuration->record($processDuration);
$this->recordQueueDepth();
}
},
function (Message $message) {
Expand Down
196 changes: 196 additions & 0 deletions tests/Queue/E2E/Adapter/ServerTelemetryTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
<?php

namespace Tests\E2E\Adapter;

use PHPUnit\Framework\TestCase;
use Utopia\Queue\Adapter;
use Utopia\Queue\Consumer;
use Utopia\Queue\Message;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
use Utopia\Queue\Server;
use Utopia\Telemetry\Adapter\Test as TestTelemetry;

class ServerTelemetryTest extends TestCase
{
public function testRecordsQueueDepth(): void
{
$consumer = new ServerTelemetryPublisherConsumer([3, 2]);
$adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite');
$telemetry = new TestTelemetry();

$server = new Server($adapter);
$server->setTelemetry($telemetry);
$server
->job()
->inject('message')
->action(fn (Message $message) => null);

$server->start();

$this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges);
/** @var object{values: array<int, float|int>} $queueDepth */
$queueDepth = $telemetry->gauges['messaging.queue.depth'];
$this->assertObjectHasProperty('values', $queueDepth);
$this->assertSame([3, 2], $queueDepth->values);
}

public function testSkipsQueueDepthWhenConsumerCannotReportSize(): void
{
$consumer = new ServerTelemetryConsumer();
$adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite');
$telemetry = new TestTelemetry();

$server = new Server($adapter);
$server->setTelemetry($telemetry);
$server
->job()
->inject('message')
->action(fn (Message $message) => null);

$server->start();

$this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges);
/** @var object{values: array<int, float|int>} $queueDepth */
$queueDepth = $telemetry->gauges['messaging.queue.depth'];
$this->assertObjectHasProperty('values', $queueDepth);
$this->assertSame([], $queueDepth->values);
}

public function testSkipsQueueDepthWhenConsumerCannotReadSize(): void
{
$consumer = new ServerTelemetryFailingPublisherConsumer();
$adapter = new ServerTelemetryAdapter($consumer, 1, 'emails', 'appwrite');
$telemetry = new TestTelemetry();

$server = new Server($adapter);
$server->setTelemetry($telemetry);
$server
->job()
->inject('message')
->action(fn (Message $message) => null);

$server->start();

$this->assertArrayHasKey('messaging.queue.depth', $telemetry->gauges);
/** @var object{values: array<int, float|int>} $queueDepth */
$queueDepth = $telemetry->gauges['messaging.queue.depth'];
$this->assertObjectHasProperty('values', $queueDepth);
$this->assertSame([], $queueDepth->values);
$this->assertArrayNotHasKey('messaging.queue.depth.errors', $telemetry->counters);
}
}

final class ServerTelemetryAdapter extends Adapter
{
/**
* @var callable[]
*/
private array $onWorkerStart = [];

/**
* @var callable[]
*/
private array $onWorkerStop = [];

public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
{
parent::__construct($workerNum, $queue, $namespace);
$this->consumer = $consumer;
}

public function start(): self
{
foreach ($this->onWorkerStart as $callback) {
$callback('0');
}

foreach ($this->onWorkerStop as $callback) {
$callback('0');
}

return $this;
}

public function stop(): self
{
return $this;
}

public function workerStart(callable $callback): self
{
$this->onWorkerStart[] = $callback;
return $this;
}

public function workerStop(callable $callback): self
{
$this->onWorkerStop[] = $callback;
return $this;
}
}

class ServerTelemetryConsumer implements Consumer
{
public function consume(
Queue $queue,
callable $messageCallback,
callable $successCallback,
callable $errorCallback
): void {
$message = new Message([
'pid' => 'test-pid',
'queue' => $queue->name,
'timestamp' => time() - 1,
'payload' => [],
]);

$messageCallback($message);
$successCallback($message);
}

public function close(): void
{
}
}

final class ServerTelemetryPublisherConsumer extends ServerTelemetryConsumer implements Publisher
{
/**
* @param int[] $queueSizes
*/
public function __construct(private array $queueSizes)
{
}

public function enqueue(Queue $queue, array $payload, bool $priority = false): bool
{
return true;
}

public function retry(Queue $queue, ?int $limit = null): void
{
}

public function getQueueSize(Queue $queue, bool $failedJobs = false): int
{
return array_shift($this->queueSizes) ?? 0;
}
}

final class ServerTelemetryFailingPublisherConsumer extends ServerTelemetryConsumer implements Publisher
{
public function enqueue(Queue $queue, array $payload, bool $priority = false): bool
{
return true;
}

public function retry(Queue $queue, ?int $limit = null): void
{
}

public function getQueueSize(Queue $queue, bool $failedJobs = false): int
{
throw new \RuntimeException('Queue size unavailable.');
}
}
Loading