Skip to content

Commit 62ca364

Browse files
nficanocursoragent
andcommitted
fix: single-source dispatch routing and bounded client subscription buffer (#104 #108)
- Dispatcher::routeLifecycle/routeWork derive "handled" from a single match via a $handled flag instead of duplicating the type matrix in a parallel || chain, so a new message type cannot silently fall through. - ResponseRouter caps per-subscription pending events for not-yet-registered subscriptions (dropping with a warning past the cap) and clears the buffer on unregisterSubscriber, so a racy/bogus subscription id cannot grow unbounded. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 9dd0c15 commit 62ca364

2 files changed

Lines changed: 22 additions & 18 deletions

File tree

src/Internal/Client/ResponseRouter.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@
2828
*/
2929
final class ResponseRouter
3030
{
31+
/**
32+
* Upper bound on events buffered for a not-yet-registered subscription,
33+
* so a never-registered (racy or bogus) subscription id cannot grow the
34+
* buffer without limit.
35+
*/
36+
private const int MAX_PENDING_SUBSCRIPTION_EVENTS = 1024;
37+
3138
/** @var array<string, \Closure(Envelope): void> */
3239
private array $subscribers = [];
3340

@@ -52,7 +59,8 @@ public function registerSubscriber(SubscriptionId $id, \Closure $onEvent): void
5259

5360
public function unregisterSubscriber(SubscriptionId $id): void
5461
{
55-
unset($this->subscribers[(string) $id]);
62+
$key = (string) $id;
63+
unset($this->subscribers[$key], $this->pendingSubscriptionEvents[$key]);
5664
}
5765

5866
public function handle(Envelope $env): void
@@ -103,6 +111,13 @@ private function routeSubscribeEvent(Envelope $env, SubscribeEvent $msg): void
103111
$this->invokeSubscriber($subscriber, $inner);
104112
return;
105113
}
114+
if (\count($this->pendingSubscriptionEvents[$key] ?? []) >= self::MAX_PENDING_SUBSCRIPTION_EVENTS) {
115+
$this->deps->logger->warning(
116+
'dropping subscription event; pending buffer full for unregistered subscription',
117+
['subscription_id' => $key, 'cap' => self::MAX_PENDING_SUBSCRIPTION_EVENTS],
118+
);
119+
return;
120+
}
106121
$this->pendingSubscriptionEvents[$key][] = $inner;
107122
}
108123

src/Internal/Runtime/Dispatcher.php

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ private function dispatch(Session $session, Envelope $env): void
143143

144144
private function routeLifecycle(Session $session, Envelope $env, MessageType $msg): bool
145145
{
146+
$handled = true;
146147
match (true) {
147148
$msg instanceof Ping => $this->lifecycle->handlePing($session, $env, $msg),
148149
$msg instanceof Pong, $msg instanceof Ack => null,
@@ -152,16 +153,9 @@ private function routeLifecycle(Session $session, Envelope $env, MessageType $ms
152153
$msg instanceof Resume => $this->lifecycle->handleResume($session, $env, $msg),
153154
$msg instanceof LeaseRefresh
154155
=> $this->lifecycle->handleLeaseRefresh($session, $env, $msg),
155-
default => null,
156+
default => $handled = false,
156157
};
157-
return $msg instanceof Ping
158-
|| $msg instanceof Pong
159-
|| $msg instanceof Ack
160-
|| $msg instanceof SessionClose
161-
|| $msg instanceof Cancel
162-
|| $msg instanceof Interrupt
163-
|| $msg instanceof Resume
164-
|| $msg instanceof LeaseRefresh;
158+
return $handled;
165159
}
166160

167161
private function routeWork(Session $session, Envelope $env, MessageType $msg): bool
@@ -176,6 +170,7 @@ private function routeWork(Session $session, Envelope $env, MessageType $msg): b
176170
$this->lifecycle->nack($session, $env, 'UNIMPLEMENTED', 'subscribe not negotiated');
177171
return true;
178172
}
173+
$handled = true;
179174
match (true) {
180175
$msg instanceof ToolInvoke => $this->toolInvocation->handle($session, $env, $msg),
181176
$msg instanceof ListJobs => $this->jobList->handle($session, $env, $msg),
@@ -184,15 +179,9 @@ private function routeWork(Session $session, Envelope $env, MessageType $msg): b
184179
$msg instanceof ArtifactPut => $this->artifacts->put($session, $env, $msg),
185180
$msg instanceof ArtifactFetch => $this->artifacts->fetch($session, $env, $msg),
186181
$msg instanceof ArtifactRelease => $this->artifacts->release($session, $env, $msg),
187-
default => null,
182+
default => $handled = false,
188183
};
189-
return $msg instanceof ToolInvoke
190-
|| $msg instanceof ListJobs
191-
|| $msg instanceof Subscribe
192-
|| $msg instanceof Unsubscribe
193-
|| $msg instanceof ArtifactPut
194-
|| $msg instanceof ArtifactFetch
195-
|| $msg instanceof ArtifactRelease;
184+
return $handled;
196185
}
197186

198187
private function featureEnabled(Session $session, string $feature): bool

0 commit comments

Comments
 (0)