diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a5c88c2..56f0fd3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -95,7 +95,7 @@ jobs: # Non-blocking: a Codecov outage must not break CI. - name: Upload coverage to Codecov # codecov/codecov-action v6.0.1 - uses: codecov/codecov-action@e79a6962e0d4c0c17b229090214935d2e33f8354 # v6.0.1 + uses: codecov/codecov-action@fb8b3582c8e4def4969c97caa2f19720cb33a72f # v7.0.0 with: fail_ci_if_error: false flags: unittests diff --git a/CONFORMANCE.md b/CONFORMANCE.md index f6fb838..8671dde 100644 --- a/CONFORMANCE.md +++ b/CONFORMANCE.md @@ -69,7 +69,7 @@ Status legend: Implemented ✅ · Partial 🚧 · Not implemented ⛔. | ---- | ----------- | ------ | ----- | | §10 | `delegate` event kind on parent's `job.event` stream | ✅ | `JobContext.DelegateAsync` | | §11 | `trace_id` propagation; OTel span attrs | ✅ | `TraceAttributes`, `ArcpTracing.WithTracing` | -| §11 (v1.1) | Span attrs `arcp.lease.expires_at`, `arcp.budget.remaining` | ✅ | `TraceAttributes` | +| §11 (v1.1) | Span attrs `arcp.lease.expires_at`, `arcp.budget.remaining` | ❌ | Not emitted by `ArcpTracing` | | §12 | 15 canonical error codes with retryable booleans | ✅ | `ErrorCode.All`, `ErrorCode.IsRetryable` | ## Test cross-reference diff --git a/Directory.Packages.props b/Directory.Packages.props index e0700c6..c394adc 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -8,7 +8,7 @@ - + @@ -23,7 +23,7 @@ - + diff --git a/README.md b/README.md index e265558..45f2644 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ await using var client = await ArcpClient.ConnectAsync(transport, new ArcpClient var sessionId = client.SessionId; var resumeToken = client.ResumeToken; -var effective = client.EffectiveFeatures; // intersection of client/runtime hello.features +var effective = client.EffectiveFeatures; // intersection of hello.features and welcome.features // ... transport drops; track the last seq your reader observed ... var lastSeq = client.LastReceivedSeq; diff --git a/docs/architecture.md b/docs/architecture.md index deb13bd..01417ab 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -9,7 +9,7 @@ | `Arcp.Runtime` | `ArcpServer`, `JobManager`, `LeaseManager`, `SessionState` — the side that runs them. | | `Arcp.AspNetCore` | Mounts a runtime on Kestrel via `IEndpointRouteBuilder.MapArcp("/arcp")`. | | `Arcp.Otel` | Wraps `ITransport` with `ActivitySource`-based OTel instrumentation. | -| `Arcp.Hosting` | Registers a runtime in `IHostedService` for non-ASP.NET workers. | +| `Arcp.Hosting` | Registers `ArcpServer` in DI via `AddArcpRuntime` for non-ASP.NET workers. | | `Arcp.Cli` | `arcp serve` / `arcp submit` / `arcp version` executable. | | `Arcp` | Umbrella meta-package — `dotnet add package Arcp` pulls Core + Client + Runtime. | @@ -37,12 +37,12 @@ Every message is a JSON object envelope: Unknown top-level fields are preserved verbatim in `Envelope.Extensions` (`Dictionary`), so -vendor-extension hints round-trip without loss (spec §5.1). +vendor-extension hints round-trip without loss (spec §5). ## Versioning The SDK follows SemVer strictly. The `arcp` wire version field -(`"1.1"`) is fixed in `Arcp.Core.WireVersion.Current`. Adding a +defaults to `"1.1"` on `Envelope.Arcp` (also exposed as `Arcp.ArcpInfo.ProtocolVersion`). Adding a public member is a minor bump; changing a signature is a major bump. One minor deprecation cycle (`[Obsolete]`) before removal. See the [style guide](./style-guide.md#14-versioning--compatibility). diff --git a/docs/conformance.md b/docs/conformance.md index 2895283..c0e60e2 100644 --- a/docs/conformance.md +++ b/docs/conformance.md @@ -60,7 +60,7 @@ Opt out of features on either peer: ```csharp new ArcpClientOptions { - Features = new FeatureSet(["heartbeat", "ack"]), // drop the rest + Features = new[] { FeatureFlags.Heartbeat, FeatureFlags.Ack }, // drop the rest }; ``` diff --git a/docs/guides/errors.md b/docs/guides/errors.md index 22a02b1..315ecf8 100644 --- a/docs/guides/errors.md +++ b/docs/guides/errors.md @@ -90,7 +90,7 @@ Agents can produce errors by throwing: ```csharp server.RegisterAgent("strict", async (ctx, ct) => { - if (!ctx.Lease.Contains(LeaseNamespaces.FsRead)) + if (ctx.Lease.Get(LeaseNamespaces.FsRead).Count == 0) throw new PermissionDeniedException("fs.read required"); // ... diff --git a/docs/guides/job-events.md b/docs/guides/job-events.md index a78e5d3..c345588 100644 --- a/docs/guides/job-events.md +++ b/docs/guides/job-events.md @@ -27,11 +27,11 @@ server.RegisterAgent("researcher", async (ctx, ct) => await ctx.StatusAsync("starting", "Fetching data...", ct); await ctx.ToolCallAsync("fetch", callId: "c1", - args: new { url = "https://api.example.com/data" }, ct); + args: new { url = "https://api.example.com/data" }, cancellationToken: ct); var data = /* ... */ ""; - await ctx.ToolResultAsync("c1", result: data, ct); + await ctx.ToolResultAsync("c1", result: data, cancellationToken: ct); - await ctx.ProgressAsync(current: 1, total: 3, message: "fetched", ct); + await ctx.ProgressAsync(current: 1, total: 3, message: "fetched", cancellationToken: ct); await ctx.LogAsync("info", "Processing ...", ct); await ctx.MetricAsync("cost.inference", 0.012, unit: "USD", cancellationToken: ct); @@ -40,7 +40,7 @@ server.RegisterAgent("researcher", async (ctx, ct) => uri: "s3://bucket/report.pdf", contentType: "application/pdf", byteSize: 42_000, - ct: ct); + cancellationToken: ct); return new { status = "done" }; }); diff --git a/docs/guides/leases.md b/docs/guides/leases.md index d4c3ccf..52f9a30 100644 --- a/docs/guides/leases.md +++ b/docs/guides/leases.md @@ -47,7 +47,7 @@ server.RegisterAgent("file-writer", async (ctx, ct) => ctx.Lease, ctx.LeaseConstraints, LeaseNamespaces.FsWrite, - path: "/workspace/src/output.cs"); + pattern: "/workspace/src/output.cs"); } catch (PermissionDeniedException ex) { diff --git a/docs/guides/observability.md b/docs/guides/observability.md index 2c474e4..ca8c7f1 100644 --- a/docs/guides/observability.md +++ b/docs/guides/observability.md @@ -41,7 +41,7 @@ constants. | Name | Purpose | | ----------------- | -------------------------------------------------- | | `Arcp.Transport` | One span per envelope (send and receive). | -| `Arcp.Runtime` | Runtime-internal spans (dispatch, agent run). | +| `Arcp.Runtime` | Application spans you start manually (e.g. delegation). | ## Span shape @@ -66,10 +66,6 @@ For each envelope, the wrapper: | `arcp.job_id` | envelope `job_id` | | `arcp.trace_id` | envelope `trace_id` | | `arcp.event_seq` | envelope `event_seq` | -| `arcp.agent` | `payload.agent` (on submit / accept) | -| `arcp.lease.capabilities` | comma-joined lease keys | -| `arcp.lease.expires_at` | ISO 8601 string (v1.1) | -| `arcp.budget.remaining` | JSON-stringified currency map (v1.1) | ## Use with ASP.NET Core diff --git a/docs/guides/vendor-extensions.md b/docs/guides/vendor-extensions.md index 72ba13c..517b36d 100644 --- a/docs/guides/vendor-extensions.md +++ b/docs/guides/vendor-extensions.md @@ -31,8 +31,14 @@ Write on the send side by populating `Extensions` before calling `ITransport.SendAsync`: ```csharp -var env = new Envelope { /* ... */ }; -env.Extensions["x-vendor.acme.priority"] = JsonSerializer.SerializeToElement("high"); +var env = new Envelope +{ + /* ... */ + Extensions = new Dictionary + { + ["x-vendor.acme.priority"] = JsonSerializer.SerializeToElement("high"), + }, +}; await transport.SendAsync(env, ct); ``` diff --git a/docs/projects/Arcp.AspNetCore.md b/docs/projects/Arcp.AspNetCore.md index 22ca0bb..e4b55d2 100644 --- a/docs/projects/Arcp.AspNetCore.md +++ b/docs/projects/Arcp.AspNetCore.md @@ -77,6 +77,6 @@ app.MapArcp(server, o => { o.Path = "/arcp-external"; o.AllowedHosts = new[] { - [Arcp.Runtime](./Arcp.Runtime.md) — `ArcpServer` configuration. - [Arcp.Otel](./Arcp.Otel.md) — transport instrumentation. -- [Arcp.Hosting](./Arcp.Hosting.md) — `IHostedService` / DI integration. +- [Arcp.Hosting](./Arcp.Hosting.md) — `AddArcpRuntime` DI registration. - [Troubleshooting — 403 Forbidden](../troubleshooting.md#websocket-upgrade-returns-403-forbidden) — allowed-host failures. - [Troubleshooting — HEARTBEAT_LOST](../troubleshooting.md#job-is-cancelled-with-heartbeat_lost) — keepalive collision. diff --git a/docs/projects/Arcp.Core.md b/docs/projects/Arcp.Core.md index 563811e..308a758 100644 --- a/docs/projects/Arcp.Core.md +++ b/docs/projects/Arcp.Core.md @@ -16,7 +16,7 @@ dotnet add package Arcp.Core The JSON container that wraps every ARCP message on the wire (spec §5): ```csharp -public sealed class Envelope +public sealed record Envelope { public string Arcp { get; init; } // wire version, e.g. "1.1" public string Id { get; init; } // random message id @@ -25,10 +25,10 @@ public sealed class Envelope public string? JobId { get; init; } public string? TraceId { get; init; } public long? EventSeq { get; init; } - public JsonElement? Payload { get; init; } + public object? Payload { get; init; } // Unknown top-level fields round-trip here (§15): - public Dictionary Extensions { get; } + public IDictionary? Extensions { get; init; } } ``` diff --git a/docs/projects/Arcp.Otel.md b/docs/projects/Arcp.Otel.md index 461ecfe..8977c83 100644 --- a/docs/projects/Arcp.Otel.md +++ b/docs/projects/Arcp.Otel.md @@ -73,10 +73,6 @@ For each envelope the wrapper: | `arcp.job_id` | envelope `job_id` | | `arcp.trace_id` | envelope `trace_id` | | `arcp.event_seq` | envelope `event_seq` | -| `arcp.agent` | `payload.agent` (on submit / accept) | -| `arcp.lease.capabilities` | comma-joined lease keys | -| `arcp.lease.expires_at` | ISO 8601 string (v1.1) | -| `arcp.budget.remaining` | JSON-stringified currency map (v1.1) | ## Propagating trace IDs to child jobs diff --git a/docs/projects/Arcp.md b/docs/projects/Arcp.md index 8ab8cbd..c0f57b1 100644 --- a/docs/projects/Arcp.md +++ b/docs/projects/Arcp.md @@ -22,7 +22,7 @@ Optional add-ons are **not** bundled and must be referenced explicitly: | ------------------ | ----------------------------------------------------- | | `Arcp.AspNetCore` | Kestrel / ASP.NET Core hosting (`MapArcp`). | | `Arcp.Otel` | OpenTelemetry transport instrumentation. | -| `Arcp.Hosting` | `IHostedService` + `IHostApplicationLifetime` wiring. | +| `Arcp.Hosting` | `IServiceCollection.AddArcpRuntime` DI registration. | | `Arcp.Cli` | `arcp` CLI tool — serve and submit from a terminal. | ## Typical project file diff --git a/docs/recipes.md b/docs/recipes.md index f36e6b5..6749b1e 100644 --- a/docs/recipes.md +++ b/docs/recipes.md @@ -50,7 +50,7 @@ intentionally drops the WebSocket connection mid-stream and reconnects using its `resume_token`. The runtime replays the missed chunks gap-free. Concepts: `ctx.BeginResultStream`, `ctx.WriteChunkAsync`, `handle.Chunks`, -`ArcpClientOptions.ResumeToken`, `RESUME_WINDOW_EXPIRED`. +`ArcpClient.ResumeToken`, `RESUME_WINDOW_EXPIRED`. → [`recipes/stream-resume/`](../recipes/stream-resume/) diff --git a/docs/troubleshooting.md b/docs/troubleshooting.md index 2e0535e..6d09197 100644 --- a/docs/troubleshooting.md +++ b/docs/troubleshooting.md @@ -110,7 +110,7 @@ trace ID). See An incoming envelope has a malformed payload. The most common cause is a version mismatch between the client and server SDK. Verify both sides use -compatible `arcp` wire versions (`client.WireVersion` / `server.WireVersion`). +compatible `arcp` wire versions (the envelope `arcp` field, default `"1.1"`). ### Vendor extension fields disappear diff --git a/recipes/multi-agent-budget/Program.cs b/recipes/multi-agent-budget/Program.cs index 16ad7a0..9dff43c 100644 --- a/recipes/multi-agent-budget/Program.cs +++ b/recipes/multi-agent-budget/Program.cs @@ -108,6 +108,8 @@ await ctx.LogAsync("warn", var plannerLease = new Lease(new Dictionary> { ["cost.budget"] = new[] { "USD:5.00" }, + // Spec §9.3 deny-by-default: the planner must hold agent.delegate to delegate to workers. + ["agent.delegate"] = new[] { "*" }, }); var handle = await client.SubmitAsync( diff --git a/samples/CostBudget/Program.cs b/samples/CostBudget/Program.cs index aa2d185..b9ca25d 100644 --- a/samples/CostBudget/Program.cs +++ b/samples/CostBudget/Program.cs @@ -30,6 +30,8 @@ var lease = new Lease(new Dictionary> { ["cost.budget"] = new[] { "USD:1.00" }, + // Spec §9.3 deny-by-default: tool.call must be covered by the lease for the agent to emit it. + ["tool.call"] = new[] { "*" }, }); var handle = await client.SubmitAsync("research", leaseRequest: lease); Console.WriteLine($"initial budget: {string.Join(",", handle.Budget!)}"); diff --git a/samples/Delegate/Program.cs b/samples/Delegate/Program.cs index 2ec3048..e8a0328 100644 --- a/samples/Delegate/Program.cs +++ b/samples/Delegate/Program.cs @@ -2,6 +2,7 @@ // samples/Delegate: parent agent submits a child job and emits a `delegate` event linking them. // Spec: §10, §13.2. using Arcp.Client; +using Arcp.Core.Leases; using Arcp.Core.Messages; using Arcp.Core.Transport; using Arcp.Runtime; @@ -24,7 +25,12 @@ { Client = new ClientInfo { Name = "delegate-client", Version = "1.0.0" }, }); -var handle = await client.SubmitAsync("parent"); +// Spec §9.3 deny-by-default: agent.delegate must be covered by the lease for the parent to delegate. +var lease = new Lease(new Dictionary> +{ + ["agent.delegate"] = new[] { "*" }, +}); +var handle = await client.SubmitAsync("parent", leaseRequest: lease); var res = await handle.Result; Console.WriteLine($"parent: {res.FinalStatus}"); return 0; diff --git a/src/Arcp.Client/ArcpClient.Dispatch.cs b/src/Arcp.Client/ArcpClient.Dispatch.cs index fa786db..8fc2742 100644 --- a/src/Arcp.Client/ArcpClient.Dispatch.cs +++ b/src/Arcp.Client/ArcpClient.Dispatch.cs @@ -18,7 +18,15 @@ private async Task ReaderLoop(CancellationToken cancellationToken) { await foreach (var env in _transport.ReceiveAsync(cancellationToken).ConfigureAwait(false)) { - if (env.EventSeq is { } seq) Interlocked.Exchange(ref _lastReceivedSeq, seq); + if (env.EventSeq is { } seq) + { + // Spec §8.3: event_seq is strictly monotonic and gap-free. If the new seq skips + // the expected successor, surface a detectable broken-session signal instead of + // silently accepting the gap. + var prev = Interlocked.Read(ref _lastReceivedSeq); + if (prev > 0 && seq > prev + 1) OnEventSeqGap(prev + 1, seq); + if (seq > prev) Interlocked.Exchange(ref _lastReceivedSeq, seq); + } await DispatchAsync(env, cancellationToken).ConfigureAwait(false); } } @@ -130,15 +138,32 @@ private ValueTask RespondToPingAsync(SessionPingPayload p, CancellationToken can private void PropagateSessionError(SessionErrorPayload err) { + var jobError = new JobErrorPayload + { + Code = err.Code, + Message = err.Message, + Retryable = err.Retryable, + Detail = err.Detail, + }; + foreach (var h in _handles.Values) { - h.OnError(new JobErrorPayload - { - Code = err.Code, - Message = err.Message, - Retryable = err.Retryable, - Detail = err.Detail, - }); + h.OnError(jobError); + } + + // A submission rejected before acceptance lives in _pendingSubmits, not _handles, and a + // list_jobs request lives in _listJobsRequests. session.error is not correlated to a + // specific request id, so the safe contract is to fault every outstanding request — leaving + // them pending would hang SubmitAsync/ListJobsAsync until the caller's token fires. + while (_pendingSubmits.TryDequeue(out var pending)) + { + pending.OnError(jobError); + } + + foreach (var key in _listJobsRequests.Keys) + { + if (_listJobsRequests.TryRemove(key, out var tcs)) + tcs.TrySetException(JobHandle.ToException(err.Code, err.Message, err.Detail)); } } } diff --git a/src/Arcp.Client/ArcpClient.cs b/src/Arcp.Client/ArcpClient.cs index 44e061d..2c239a5 100644 --- a/src/Arcp.Client/ArcpClient.cs +++ b/src/Arcp.Client/ArcpClient.cs @@ -53,6 +53,22 @@ public sealed partial class ArcpClient : IAsyncDisposable /// Gets the last received seq. public long LastReceivedSeq => Interlocked.Read(ref _lastReceivedSeq); + /// True once an inbound event_seq has skipped the expected next value, indicating + /// the session stream has a gap and SHOULD be treated as broken (and resumed once resume is + /// wired) per spec §8.3. + public bool IsSessionBroken { get; private set; } + + /// Raised when an inbound event_seq skips the expected successor (spec §8.3). The + /// arguments are (expectedSeq, receivedSeq). Handlers run on the reader loop; keep them + /// fast and non-throwing. + public event Action? EventSeqGapDetected; + + private void OnEventSeqGap(long expected, long received) + { + IsSessionBroken = true; + EventSeqGapDetected?.Invoke(expected, received); + } + /// Initializes a new instance of the class. public ArcpClient(ITransport transport, ArcpClientOptions options) { diff --git a/src/Arcp.Client/JobHandle.cs b/src/Arcp.Client/JobHandle.cs index a6b2e25..d7d16b0 100644 --- a/src/Arcp.Client/JobHandle.cs +++ b/src/Arcp.Client/JobHandle.cs @@ -81,10 +81,35 @@ internal void OnResult(JobResultPayload payload) internal void OnError(JobErrorPayload payload) { + // If the job was rejected before acceptance (e.g. a server session.error for a duplicate + // key or unavailable agent), the awaiter on Accepted must fault rather than hang forever. + // For a post-acceptance terminal error, Accepted is already resolved so this is a no-op. + _accepted.TrySetException(ToException(payload.Code, payload.Message, payload.Detail)); _terminal.TrySetResult(new JobResult(false, null, payload)); _events.Writer.TryComplete(); } + /// Map a wire error code to the most specific subtype so + /// callers can catch on the concrete type (e.g. ). + internal static ArcpException ToException(string code, string message, string? detail) => code switch + { + ErrorCode.DuplicateKey => new DuplicateKeyException(message, detail), + ErrorCode.AgentNotAvailable => new AgentNotAvailableException(message, detail), + ErrorCode.AgentVersionNotAvailable => new AgentVersionNotAvailableException(message, detail), + ErrorCode.LeaseSubsetViolation => new LeaseSubsetViolationException(message, detail), + ErrorCode.PermissionDenied => new PermissionDeniedException(message, detail), + ErrorCode.JobNotFound => new JobNotFoundException(message, detail), + ErrorCode.InvalidRequest => new InvalidRequestException(message, detail), + ErrorCode.Unauthenticated => new UnauthenticatedException(message, detail), + ErrorCode.BudgetExhausted => new BudgetExhaustedException(message, detail), + ErrorCode.LeaseExpired => new LeaseExpiredException(message, detail), + ErrorCode.ResumeWindowExpired => new ResumeWindowExpiredException(message, detail), + ErrorCode.HeartbeatLost => new HeartbeatLostException(message, detail), + ErrorCode.Timeout => new Arcp.Core.Errors.TimeoutException(message, detail), + ErrorCode.Cancelled => new CancelledException(message, detail), + _ => new ArcpException(code, message, detail), + }; + /// Events. public async IAsyncEnumerable Events([EnumeratorCancellation] CancellationToken cancellationToken = default) { diff --git a/src/Arcp.Client/PublicAPI.Unshipped.txt b/src/Arcp.Client/PublicAPI.Unshipped.txt index 5fa4f98..53c5374 100644 --- a/src/Arcp.Client/PublicAPI.Unshipped.txt +++ b/src/Arcp.Client/PublicAPI.Unshipped.txt @@ -7,7 +7,9 @@ Arcp.Client.ArcpClient.CancelJobAsync(Arcp.Core.Ids.JobId jobId, string? reason Arcp.Client.ArcpClient.ConnectAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! Arcp.Client.ArcpClient.DisposeAsync() -> System.Threading.Tasks.ValueTask Arcp.Client.ArcpClient.EffectiveFeatures.get -> System.Collections.Generic.IReadOnlyList! +Arcp.Client.ArcpClient.EventSeqGapDetected -> System.Action? Arcp.Client.ArcpClient.HeartbeatIntervalSec.get -> int? +Arcp.Client.ArcpClient.IsSessionBroken.get -> bool Arcp.Client.ArcpClient.LastReceivedSeq.get -> long Arcp.Client.ArcpClient.ListJobsAsync(Arcp.Core.Messages.JobListFilter? filter = null, int? limit = null, string? cursor = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task! Arcp.Client.ArcpClient.ResumeToken.get -> string? diff --git a/src/Arcp.Core/Envelope/Envelope.cs b/src/Arcp.Core/Envelope/Envelope.cs index 0d26653..4d88d41 100644 --- a/src/Arcp.Core/Envelope/Envelope.cs +++ b/src/Arcp.Core/Envelope/Envelope.cs @@ -6,7 +6,7 @@ namespace Arcp.Core.Wire; -/// The ARCP wire envelope (spec §5.1). Carries the message type discriminator, identifiers, +/// The ARCP wire envelope (spec §5). Carries the message type discriminator, identifiers, /// and a payload object whose shape depends on . public sealed record Envelope { @@ -56,8 +56,15 @@ public sealed record Envelope [JsonPropertyName("payload")] public object? Payload { get; init; } - /// Unknown top-level envelope fields preserved verbatim per spec §5.1 ("MUST ignore + /// Unknown top-level envelope fields preserved verbatim per spec §5 ("MUST ignore /// unknown fields") so they round-trip without loss. [JsonExtensionData] public IDictionary? Extensions { get; init; } + + /// Runtime-only, NON-serialized per-job monotonic index. Used by the runtime to make the + /// job.subscribe history/live-fan-out boundary exact (spec §7.6) — a subscriber drops a + /// fanned-out event whose index was already covered by its replayed history. Never transmitted + /// on the wire. + [JsonIgnore] + public long? JobEventIndex { get; init; } } diff --git a/src/Arcp.Core/Envelope/EnvelopeJsonConverter.cs b/src/Arcp.Core/Envelope/EnvelopeJsonConverter.cs index b202da3..b188273 100644 --- a/src/Arcp.Core/Envelope/EnvelopeJsonConverter.cs +++ b/src/Arcp.Core/Envelope/EnvelopeJsonConverter.cs @@ -8,7 +8,7 @@ namespace Arcp.Core.Wire; /// Custom envelope (de)serializer. Reads the type field first to choose the /// concrete payload type from . Unknown type values produce -/// an envelope with a payload — they round-trip without loss per spec §5.1. +/// an envelope with a payload — they round-trip without loss per spec §5. public sealed class EnvelopeJsonConverter : JsonConverter { private readonly MessageTypeRegistry _registry; @@ -101,9 +101,9 @@ private static EnvelopeFields ParseFields(JsonElement root) private static void ValidateHeader(EnvelopeFields fields) { if (string.IsNullOrEmpty(fields.Type)) - throw new Errors.InvalidRequestException("Envelope missing required 'type' field (spec §5.1)."); + throw new Errors.InvalidRequestException("Envelope missing required 'type' field (spec §5)."); if (fields.Arcp is not null && fields.Arcp != "1.1") - throw new Errors.InvalidRequestException($"Unsupported ARCP envelope version: '{fields.Arcp}' (spec §5.1; expected '1.1')."); + throw new Errors.InvalidRequestException($"Unsupported ARCP envelope version: '{fields.Arcp}' (spec §5; expected '1.1')."); } private object? DeserializePayload(EnvelopeFields fields, JsonSerializerOptions options) diff --git a/src/Arcp.Core/Envelope/MessageTypeRegistry.cs b/src/Arcp.Core/Envelope/MessageTypeRegistry.cs index 002d464..9b2d5ad 100644 --- a/src/Arcp.Core/Envelope/MessageTypeRegistry.cs +++ b/src/Arcp.Core/Envelope/MessageTypeRegistry.cs @@ -41,6 +41,8 @@ public static MessageTypeRegistry CreateCoreCatalog() var r = new MessageTypeRegistry(); r.Register(MessageTypeNames.SessionHello, typeof(SessionHelloPayload)); r.Register(MessageTypeNames.SessionWelcome, typeof(SessionWelcomePayload)); + r.Register(MessageTypeNames.SessionClose, typeof(SessionByePayload)); + r.Register(MessageTypeNames.SessionClosed, typeof(SessionByePayload)); r.Register(MessageTypeNames.SessionBye, typeof(SessionByePayload)); r.Register(MessageTypeNames.SessionPing, typeof(SessionPingPayload)); r.Register(MessageTypeNames.SessionPong, typeof(SessionPongPayload)); @@ -56,6 +58,7 @@ public static MessageTypeRegistry CreateCoreCatalog() r.Register(MessageTypeNames.JobResult, typeof(JobResultPayload)); r.Register(MessageTypeNames.JobError, typeof(JobErrorPayload)); r.Register(MessageTypeNames.JobCancel, typeof(JobCancelPayload)); + r.Register(MessageTypeNames.JobCancelled, typeof(JobCancelledPayload)); r.Register(MessageTypeNames.JobSubscribe, typeof(JobSubscribePayload)); r.Register(MessageTypeNames.JobSubscribed, typeof(JobSubscribedPayload)); r.Register(MessageTypeNames.JobUnsubscribe, typeof(JobUnsubscribePayload)); diff --git a/src/Arcp.Core/Errors/ErrorCode.cs b/src/Arcp.Core/Errors/ErrorCode.cs index 5d8ff08..eea16d3 100644 --- a/src/Arcp.Core/Errors/ErrorCode.cs +++ b/src/Arcp.Core/Errors/ErrorCode.cs @@ -48,8 +48,8 @@ public static class ErrorCode InvalidRequest, Unauthenticated, InternalError, }.ToFrozenSet(); - /// Retryable codes per spec §12: only AGENT_NOT_AVAILABLE, TIMEOUT, - /// HEARTBEAT_LOST, and INTERNAL_ERROR may be retried. + /// Returns whether the code is in the spec §12 retryable set: + /// AGENT_NOT_AVAILABLE, TIMEOUT, HEARTBEAT_LOST, and INTERNAL_ERROR. public static bool IsRetryable(string code) => code is AgentNotAvailable or Timeout or HeartbeatLost or InternalError; } diff --git a/src/Arcp.Core/Ids/Identifiers.cs b/src/Arcp.Core/Ids/Identifiers.cs index 716784e..672cd9b 100644 --- a/src/Arcp.Core/Ids/Identifiers.cs +++ b/src/Arcp.Core/Ids/Identifiers.cs @@ -5,7 +5,7 @@ namespace Arcp.Core.Ids; -/// A ULID-based message identifier (envelope id, spec §5.1). +/// A ULID-based message identifier (envelope id, spec §5). public readonly record struct MessageId(string Value) : IParsable { /// New. diff --git a/src/Arcp.Core/Messages/JobCancelledPayload.cs b/src/Arcp.Core/Messages/JobCancelledPayload.cs new file mode 100644 index 0000000..7945186 --- /dev/null +++ b/src/Arcp.Core/Messages/JobCancelledPayload.cs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: Apache-2.0 +using System.Text.Json.Serialization; + +namespace Arcp.Core.Messages; + +/// Acknowledgement the runtime sends to the submitting session when a job.cancel is +/// accepted (spec §7.4). The terminal job.error{CANCELLED, final_status:"cancelled"} follows +/// once the run-loop unwinds the agent. +public sealed record JobCancelledPayload +{ + /// Gets the job id. + [JsonPropertyName("job_id")] public required string JobId { get; init; } + + /// Gets the reason. + [JsonPropertyName("reason")] public string? Reason { get; init; } +} diff --git a/src/Arcp.Core/Messages/MessageTypeNames.cs b/src/Arcp.Core/Messages/MessageTypeNames.cs index 7277cbe..b37ce83 100644 --- a/src/Arcp.Core/Messages/MessageTypeNames.cs +++ b/src/Arcp.Core/Messages/MessageTypeNames.cs @@ -16,7 +16,12 @@ public static class MessageTypeNames public const string SessionHello = "session.hello"; /// Gets the session welcome. public const string SessionWelcome = "session.welcome"; - /// Gets the session bye. + /// Client-sent graceful session close (spec §6.7). + public const string SessionClose = "session.close"; + /// Runtime-sent acknowledgement of (spec §6.7). + public const string SessionClosed = "session.closed"; + /// Gets the session bye. Deprecated alias for kept for + /// back-compat with pre-1.1 peers; the runtime treats it like session.close (spec §6.7). public const string SessionBye = "session.bye"; /// Gets the session ping. public const string SessionPing = "session.ping"; @@ -51,6 +56,8 @@ public static class MessageTypeNames public const string JobError = "job.error"; /// Gets the job cancel. public const string JobCancel = "job.cancel"; + /// Gets the job cancelled acknowledgement (spec §7.4). + public const string JobCancelled = "job.cancelled"; /// Gets the job subscribe. public const string JobSubscribe = "job.subscribe"; /// Gets the job subscribed. @@ -61,9 +68,9 @@ public static class MessageTypeNames /// Gets the all. public static readonly FrozenSet All = new HashSet(StringComparer.Ordinal) { - SessionHello, SessionWelcome, SessionBye, SessionPing, SessionPong, SessionAck, + SessionHello, SessionWelcome, SessionClose, SessionClosed, SessionBye, SessionPing, SessionPong, SessionAck, SessionListJobs, SessionJobs, SessionError, SessionResume, - JobSubmit, JobAccepted, JobEvent, JobResult, JobError, JobCancel, + JobSubmit, JobAccepted, JobEvent, JobResult, JobError, JobCancel, JobCancelled, JobSubscribe, JobSubscribed, JobUnsubscribe, }.ToFrozenSet(); } diff --git a/src/Arcp.Core/PublicAPI.Unshipped.txt b/src/Arcp.Core/PublicAPI.Unshipped.txt index 24d5f9b..95eb740 100644 --- a/src/Arcp.Core/PublicAPI.Unshipped.txt +++ b/src/Arcp.Core/PublicAPI.Unshipped.txt @@ -266,6 +266,19 @@ static Arcp.Core.Messages.ProvisionedCredential.operator ==(Arcp.Core.Messages.P Arcp.Core.Messages.JobCancelPayload.JobId.init -> void Arcp.Core.Messages.JobCancelPayload.Reason.get -> string? Arcp.Core.Messages.JobCancelPayload.Reason.init -> void +Arcp.Core.Messages.JobCancelledPayload +Arcp.Core.Messages.JobCancelledPayload.$() -> Arcp.Core.Messages.JobCancelledPayload! +Arcp.Core.Messages.JobCancelledPayload.Equals(Arcp.Core.Messages.JobCancelledPayload? other) -> bool +Arcp.Core.Messages.JobCancelledPayload.JobCancelledPayload() -> void +Arcp.Core.Messages.JobCancelledPayload.JobId.get -> string! +Arcp.Core.Messages.JobCancelledPayload.JobId.init -> void +Arcp.Core.Messages.JobCancelledPayload.Reason.get -> string? +Arcp.Core.Messages.JobCancelledPayload.Reason.init -> void +override Arcp.Core.Messages.JobCancelledPayload.Equals(object? obj) -> bool +override Arcp.Core.Messages.JobCancelledPayload.GetHashCode() -> int +override Arcp.Core.Messages.JobCancelledPayload.ToString() -> string! +static Arcp.Core.Messages.JobCancelledPayload.operator !=(Arcp.Core.Messages.JobCancelledPayload? left, Arcp.Core.Messages.JobCancelledPayload? right) -> bool +static Arcp.Core.Messages.JobCancelledPayload.operator ==(Arcp.Core.Messages.JobCancelledPayload? left, Arcp.Core.Messages.JobCancelledPayload? right) -> bool Arcp.Core.Messages.JobErrorPayload Arcp.Core.Messages.JobErrorPayload.$() -> Arcp.Core.Messages.JobErrorPayload! Arcp.Core.Messages.JobErrorPayload.Code.get -> string! @@ -638,6 +651,8 @@ Arcp.Core.Wire.Envelope.Extensions.get -> System.Collections.Generic.IDictionary Arcp.Core.Wire.Envelope.Extensions.init -> void Arcp.Core.Wire.Envelope.Id.get -> string! Arcp.Core.Wire.Envelope.Id.init -> void +Arcp.Core.Wire.Envelope.JobEventIndex.get -> long? +Arcp.Core.Wire.Envelope.JobEventIndex.init -> void Arcp.Core.Wire.Envelope.JobId.get -> string? Arcp.Core.Wire.Envelope.JobId.init -> void Arcp.Core.Wire.Envelope.ParentSpanId.get -> string? @@ -707,6 +722,7 @@ const Arcp.Core.Messages.EventKinds.ToolCall = "tool_call" -> string! const Arcp.Core.Messages.EventKinds.ToolResult = "tool_result" -> string! const Arcp.Core.Messages.MessageTypeNames.JobAccepted = "job.accepted" -> string! const Arcp.Core.Messages.MessageTypeNames.JobCancel = "job.cancel" -> string! +const Arcp.Core.Messages.MessageTypeNames.JobCancelled = "job.cancelled" -> string! const Arcp.Core.Messages.MessageTypeNames.JobError = "job.error" -> string! const Arcp.Core.Messages.MessageTypeNames.JobEvent = "job.event" -> string! const Arcp.Core.Messages.MessageTypeNames.JobResult = "job.result" -> string! @@ -716,6 +732,8 @@ const Arcp.Core.Messages.MessageTypeNames.JobSubscribed = "job.subscribed" -> st const Arcp.Core.Messages.MessageTypeNames.JobUnsubscribe = "job.unsubscribe" -> string! const Arcp.Core.Messages.MessageTypeNames.SessionAck = "session.ack" -> string! const Arcp.Core.Messages.MessageTypeNames.SessionBye = "session.bye" -> string! +const Arcp.Core.Messages.MessageTypeNames.SessionClose = "session.close" -> string! +const Arcp.Core.Messages.MessageTypeNames.SessionClosed = "session.closed" -> string! const Arcp.Core.Messages.MessageTypeNames.SessionError = "session.error" -> string! const Arcp.Core.Messages.MessageTypeNames.SessionHello = "session.hello" -> string! const Arcp.Core.Messages.MessageTypeNames.SessionJobs = "session.jobs" -> string! diff --git a/src/Arcp.Runtime/Agents/AgentRegistry.cs b/src/Arcp.Runtime/Agents/AgentRegistry.cs index 193209a..a1bbc8e 100644 --- a/src/Arcp.Runtime/Agents/AgentRegistry.cs +++ b/src/Arcp.Runtime/Agents/AgentRegistry.cs @@ -63,21 +63,24 @@ public void SetDefaultVersion(string name, string version) /// To inventory. public IReadOnlyList ToInventory() => - _byName.Values.Select(e => new AgentInventoryEntry + _byName.Values.Select(e => { - Name = e.Name, - Versions = e.Versions.Count > 0 ? e.Versions.Keys.ToArray() : null, - Default = e.DefaultVersion, + var (versions, defaultVersion) = e.SnapshotInventoryFields(); + return new AgentInventoryEntry + { + Name = e.Name, + Versions = versions, + Default = defaultVersion, + }; }).ToArray(); private sealed class AgentEntry { private readonly object _gate = new(); + private readonly Dictionary _versions = new(StringComparer.Ordinal); public string Name { get; } - public Dictionary Versions { get; } = new(StringComparer.Ordinal); - public string? DefaultVersion { get; private set; } public IAgent? UnversionedAgent { get; private set; } @@ -86,14 +89,14 @@ private sealed class AgentEntry public bool TryGetVersion(string v, out IAgent? agent) { - lock (_gate) return Versions.TryGetValue(v, out agent); + lock (_gate) return _versions.TryGetValue(v, out agent); } public void AddVersion(string version, IAgent agent) { lock (_gate) { - Versions[version] = agent; + _versions[version] = agent; DefaultVersion ??= version; } } @@ -107,7 +110,7 @@ public void SetDefault(string version) { lock (_gate) { - if (!Versions.ContainsKey(version)) + if (!_versions.ContainsKey(version)) throw new AgentVersionNotAvailableException($"{Name}@{version} not registered"); DefaultVersion = version; } @@ -119,10 +122,22 @@ public void SetDefault(string version) { lock (_gate) { - foreach (var kv in Versions) return (kv.Key, kv.Value); + foreach (var kv in _versions) return (kv.Key, kv.Value); return null; } } } + + /// Atomically snapshot the inventory-visible fields under the same lock that guards + /// mutation, so never enumerates the mutable dictionary while a + /// concurrent registration is writing to it. + public (IReadOnlyList? Versions, string? Default) SnapshotInventoryFields() + { + lock (_gate) + { + var versions = _versions.Count > 0 ? _versions.Keys.ToArray() : null; + return (versions, DefaultVersion); + } + } } } diff --git a/src/Arcp.Runtime/ArcpServer.cs b/src/Arcp.Runtime/ArcpServer.cs index 308c2c2..23feacd 100644 --- a/src/Arcp.Runtime/ArcpServer.cs +++ b/src/Arcp.Runtime/ArcpServer.cs @@ -75,7 +75,8 @@ public ArcpServer(ArcpServerOptions options, ILoggerFactory? loggerFactory = nul options.IdempotencyWindowSec, options.EventLogCapacity, options.TerminalJobRetentionSec, - options.FatalBudgetExhaustion); + options.FatalBudgetExhaustion, + options.PermissiveUnleasedOperations); if (CredentialManager is not null) { _ = Task.Run(() => RevokeOutstandingCredentialsAsync(CancellationToken.None)); @@ -213,6 +214,9 @@ internal bool TryResume(string resumeToken, out SessionState? session) /// Dispose (asynchronous). public async ValueTask DisposeAsync() { + // Cancel in-flight jobs only at runtime shutdown — never on individual session teardown + // (spec §6.4, §6.7). Jobs are rooted at the JobManager's runtime token, not session _cts. + JobManager.Dispose(); try { _sweeperCts.Cancel(); } catch (ObjectDisposedException) { /* already disposed */ } if (_sweeperTask is not null) { @@ -240,9 +244,12 @@ private static IReadOnlyList ComputeAdvertisedFeatures(ArcpServerOptions nameof(options)); } + // Spec §9.7: model.use enforcement is independent of credential provisioning — + // LeaseManager.AuthorizeModelUse enforces it directly, so it MUST stay advertisable even + // when no ICredentialProvisioner is configured. Only provisioned_credentials is coupled to + // having a provisioner. return requested - .Where(f => !string.Equals(f, FeatureFlags.ProvisionedCredentials, StringComparison.Ordinal) && - !string.Equals(f, FeatureFlags.ModelUse, StringComparison.Ordinal)) + .Where(f => !string.Equals(f, FeatureFlags.ProvisionedCredentials, StringComparison.Ordinal)) .ToArray(); } diff --git a/src/Arcp.Runtime/ArcpServerOptions.cs b/src/Arcp.Runtime/ArcpServerOptions.cs index 9d9563c..eada35d 100644 --- a/src/Arcp.Runtime/ArcpServerOptions.cs +++ b/src/Arcp.Runtime/ArcpServerOptions.cs @@ -50,6 +50,12 @@ public sealed class ArcpServerOptions /// Gets the back pressure threshold. public int BackPressureThreshold { get; init; } = 1000; + /// When (the default, spec §9.1/§9.3 deny-by-default), an + /// authority-bearing operation (tool.call, agent.delegate) whose namespace the + /// job's lease does not declare fails with PERMISSION_DENIED. Set to + /// to opt into the legacy permissive behavior where uncovered namespaces are allowed. + public bool PermissiveUnleasedOperations { get; init; } + /// Gets the authorization policy. public IJobAuthorizationPolicy AuthorizationPolicy { get; init; } = new SamePrincipalPolicy(); diff --git a/src/Arcp.Runtime/Job.cs b/src/Arcp.Runtime/Job.cs index 8fad5c0..eca6561 100644 --- a/src/Arcp.Runtime/Job.cs +++ b/src/Arcp.Runtime/Job.cs @@ -27,6 +27,7 @@ public sealed class Job private readonly object _eventBufferGate = new(); private readonly List _eventBuffer = []; private readonly int _eventBufferCapacity; + private long _lastEmittedSeq; /// Gets the job id. public JobId JobId { get; } @@ -188,30 +189,55 @@ public async ValueTask EmitEventAsync(string kind, object body, CancellationToke TraceId = TraceId?.Value, Payload = payload, }; - BufferEvent(env); - await _emit(env, cancellationToken).ConfigureAwait(false); + var stamped = BufferEvent(env); + await _emit(stamped, cancellationToken).ConfigureAwait(false); } - private void BufferEvent(Envelope env) + /// Highest event_seq high-water mark emitted by this job, or + /// if it has not emitted any events yet. Surfaced in session.jobs (spec §6.6) so a dashboard + /// can decide where to subscribe from. + internal long? LastEmittedSeq + { + get + { + var seq = Interlocked.Read(ref _lastEmittedSeq); + return seq > 0 ? seq : null; + } + } + + /// Buffer an event for subscription replay, assigning it a monotonic per-job index. + /// Returns the index-stamped envelope. Spec §7.6: bounded per-job history so a later subscriber + /// with history: true can receive prior events in order before live events. Sized from + /// so subscribers see the same window resumers + /// do. + private Envelope BufferEvent(Envelope env) { - // Spec §7.6: bounded per-job history so a later subscriber with `history: true` - // can receive prior events in order before live events. Sized from - // `ArcpServerOptions.EventLogCapacity` so subscribers see the same window resumers do. lock (_eventBufferGate) { - _eventBuffer.Add(env); + // Spec §6.6: monotonic per-job high-water mark, surfaced as last_event_seq in the listing + // and used as the exact subscribe history/live-fan-out boundary (spec §7.6). + var index = ++_lastEmittedSeq; + var stamped = env with { JobEventIndex = index }; + _eventBuffer.Add(stamped); if (_eventBuffer.Count > _eventBufferCapacity) { _eventBuffer.RemoveRange(0, _eventBuffer.Count - _eventBufferCapacity); } + return stamped; } } - /// Snapshot of all events buffered for replay on a new subscription. - internal IReadOnlyList SnapshotEventHistory() + /// Atomically register a subscriber and snapshot the current event history under the same + /// lock that guards . Spec §7.6: this makes the subscribe boundary exact — + /// every event with index ≤ is in the returned snapshot, and + /// every event after is delivered live via fan-out, so the new subscriber sees each event exactly + /// once with no gap or duplicate at the boundary. + internal IReadOnlyList RegisterSubscriberAndSnapshot(Action register, out long highWaterIndex) { lock (_eventBufferGate) { + register(); + highWaterIndex = _lastEmittedSeq; return _eventBuffer.ToArray(); } } diff --git a/src/Arcp.Runtime/JobContext.cs b/src/Arcp.Runtime/JobContext.cs index 2fa6e60..1b33b27 100644 --- a/src/Arcp.Runtime/JobContext.cs +++ b/src/Arcp.Runtime/JobContext.cs @@ -25,14 +25,17 @@ public sealed class JobContext private readonly CredentialManager? _credentials; private readonly bool _fatalBudgetExhaustion; private readonly Arcp.Runtime.Leases.LeaseManager? _leases; + private readonly bool _permissiveUnleasedOperations; internal JobContext(Job job, ILogger logger, CredentialManager? credentials = null, - bool fatalBudgetExhaustion = false, Arcp.Runtime.Leases.LeaseManager? leases = null) + bool fatalBudgetExhaustion = false, Arcp.Runtime.Leases.LeaseManager? leases = null, + bool permissiveUnleasedOperations = false) { _job = job; _credentials = credentials; _fatalBudgetExhaustion = fatalBudgetExhaustion; _leases = leases; + _permissiveUnleasedOperations = permissiveUnleasedOperations; Logger = logger; } @@ -44,7 +47,9 @@ internal JobContext(Job job, ILogger logger, CredentialManager? credentials = nu public void AuthorizeOperation(string namespaceName, string pattern) { var leases = _leases ?? new Arcp.Runtime.Leases.LeaseManager(); - leases.AuthorizeOperation(_job.Lease, _job.LeaseConstraints, namespaceName, pattern); + // Spec §9.6: pass the job's budget ledger so the operation is gated on remaining budget + // before the capability/pattern check. + leases.AuthorizeOperation(_job.Lease, _job.LeaseConstraints, namespaceName, pattern, _job.BudgetLedger); } /// Gets the job id. @@ -109,7 +114,7 @@ public ValueTask RotateCredentialAsync( /// raises before the event is emitted. public ValueTask ToolCallAsync(string tool, string callId, object? args, CancellationToken cancellationToken = default) { - EnforceIfLeased(LeaseNamespaces.ToolCall, tool); + EnforceLeaseCoverage(LeaseNamespaces.ToolCall, tool); return _job.EmitEventAsync(EventKinds.ToolCall, new ToolCallBody { Tool = tool, @@ -118,15 +123,23 @@ public ValueTask ToolCallAsync(string tool, string callId, object? args, Cancell }, cancellationToken); } - /// Gate an operation when the lease declares the given namespace. Leases that omit - /// the namespace remain permissive (spec §9.7 explicitly allows this when a runtime is - /// configured to do so; tighter policies SHOULD call - /// directly). - private void EnforceIfLeased(string namespaceName, string pattern) + /// Gate an authority-bearing operation against the lease. Spec §9.1/§9.3 require + /// deny-by-default: an operation whose namespace the lease does not declare is unauthorized and + /// raises . Opt into the legacy permissive behavior (allow + /// uncovered namespaces) via . + private void EnforceLeaseCoverage(string namespaceName, string pattern) { if (_job.Lease.Capabilities.ContainsKey(namespaceName)) { AuthorizeOperation(namespaceName, pattern); + return; + } + + if (!_permissiveUnleasedOperations) + { + throw new PermissionDeniedException( + $"Operation '{namespaceName}' is not covered by the job's lease (deny-by-default, spec §9.3). " + + "Grant the namespace in the lease, or enable ArcpServerOptions.PermissiveUnleasedOperations."); } } @@ -182,7 +195,7 @@ public ValueTask ArtifactRefAsync(string uri, string? contentType = null, long? /// child agent name is gated against the lease patterns first (spec §9.3, §10). public ValueTask DelegateAsync(string childJobId, string agent, object? input, CancellationToken cancellationToken = default) { - EnforceIfLeased(LeaseNamespaces.AgentDelegate, agent); + EnforceLeaseCoverage(LeaseNamespaces.AgentDelegate, agent); return _job.EmitEventAsync(EventKinds.Delegate, new DelegateBody { ChildJobId = childJobId, diff --git a/src/Arcp.Runtime/JobManager.Listing.cs b/src/Arcp.Runtime/JobManager.Listing.cs index 0212f70..7f606ca 100644 --- a/src/Arcp.Runtime/JobManager.Listing.cs +++ b/src/Arcp.Runtime/JobManager.Listing.cs @@ -12,42 +12,95 @@ namespace Arcp.Runtime; public sealed partial class JobManager { - /// List. + /// List jobs visible to , paged by a stable keyset + /// cursor on (created_at, job_id) (spec §6.6). Page materialization is bounded to + /// limit + 1 entries regardless of how many jobs are visible, and ordering is stable when + /// multiple jobs share the same CreatedAt. public IReadOnlyList List(string? requesterPrincipal, IJobAuthorizationPolicy policy, JobListFilter? filter, int? limit, string? cursor, out string? nextCursor) { - var jobs = FilterByPrincipal(requesterPrincipal, policy); - jobs = ApplyFilter(jobs, filter); - jobs = jobs.OrderBy(j => j.CreatedAt).ToList(); + var take = limit is > 0 ? limit.Value : 100; + var after = DecodeCursor(cursor); - var skip = ParseCursor(cursor); - var take = limit ?? 100; - var page = jobs.Skip(skip).Take(take).ToList(); - nextCursor = skip + page.Count < jobs.Count ? EncodeCursor(skip + page.Count) : null; + // Convert the status filter to a set once per request rather than per job. + var statusSet = filter?.Status is { Count: > 0 } statuses + ? new HashSet(statuses, StringComparer.Ordinal) + : null; - return page.Select(ToListEntry).ToArray(); + // Single streaming pass: keep only the smallest take+1 entries strictly after the cursor. + // This bounds page materialization to take+1 entries instead of sorting/rematerializing the + // full visible job set on every page (spec §6.6). + var page = new List(take + 1); + foreach (var job in FilterByPrincipal(requesterPrincipal, policy)) + { + if (!MatchesFilter(job, filter, statusSet)) continue; + var key = JobKey.From(job); + if (after is { } a && JobKey.Compare(key, a) <= 0) continue; // at or before the cursor + InsertBounded(page, job, take + 1); + } + + // A take+1th entry means there is at least one more page; the cursor is the last returned key. + nextCursor = page.Count > take ? EncodeCursor(JobKey.From(page[take - 1])) : null; + var count = Math.Min(page.Count, take); + var result = new JobListEntry[count]; + for (var i = 0; i < count; i++) result[i] = ToListEntry(page[i]); + return result; } - private List FilterByPrincipal(string? requesterPrincipal, IJobAuthorizationPolicy policy) => - _jobs.Values - .Where(j => string.IsNullOrEmpty(requesterPrincipal) || - string.Equals(j.SubmitterPrincipal, requesterPrincipal, StringComparison.Ordinal) || - policy.CanObserve(j.SubmitterPrincipal, new AuthPrincipal(requesterPrincipal))) - .ToList(); + /// Jobs the requester is allowed to see. Fail closed: an empty/absent principal is NOT a + /// wildcard — it sees only what the authorization policy explicitly permits, never the full + /// cross-principal set (spec §6.6, §14). + private IEnumerable FilterByPrincipal(string? requesterPrincipal, IJobAuthorizationPolicy policy) + { + if (string.IsNullOrEmpty(requesterPrincipal)) + { + var anonymous = new AuthPrincipal(string.Empty); + return _jobs.Values.Where(j => policy.CanObserve(j.SubmitterPrincipal, anonymous)); + } + + var principal = new AuthPrincipal(requesterPrincipal); + return _jobs.Values.Where(j => + string.Equals(j.SubmitterPrincipal, requesterPrincipal, StringComparison.Ordinal) || + policy.CanObserve(j.SubmitterPrincipal, principal)); + } + + private static bool MatchesFilter(Job j, JobListFilter? filter, HashSet? statusSet) + { + if (filter is null) return true; + if (statusSet is not null && !statusSet.Contains(MapStatus(j.Status))) return false; + if (!string.IsNullOrEmpty(filter.Agent) && j.Agent.Name != filter.Agent && j.Agent.ToString() != filter.Agent) + return false; + if (filter.CreatedAfter is { } after && j.CreatedAt <= after) return false; + return true; + } - private static List ApplyFilter(List jobs, JobListFilter? filter) + /// Insert into the ascending-ordered , + /// keeping at most entries. Jobs ordering larger than everything retained + /// are discarded without growing the list, so the buffer never exceeds cap. + private static void InsertBounded(List page, Job job, int cap) { - if (filter is null) return jobs; - if (filter.Status is { Count: > 0 } statuses) - jobs = jobs.Where(j => statuses.Contains(MapStatus(j.Status), StringComparer.Ordinal)).ToList(); - if (!string.IsNullOrEmpty(filter.Agent)) + var key = JobKey.From(job); + int lo = 0, hi = page.Count; + while (lo < hi) { - var a = filter.Agent; - jobs = jobs.Where(j => j.Agent.Name == a || j.Agent.ToString() == a).ToList(); + var mid = (lo + hi) / 2; + if (JobKey.Compare(JobKey.From(page[mid]), key) < 0) lo = mid + 1; + else hi = mid; + } + if (lo >= cap) return; // larger than every retained entry; cannot be in the smallest `cap` + page.Insert(lo, job); + if (page.Count > cap) page.RemoveAt(page.Count - 1); + } + + private readonly record struct JobKey(DateTimeOffset CreatedAt, string JobId) + { + public static JobKey From(Job j) => new(j.CreatedAt, j.JobId.Value); + + public static int Compare(JobKey a, JobKey b) + { + var byTime = a.CreatedAt.UtcDateTime.CompareTo(b.CreatedAt.UtcDateTime); + return byTime != 0 ? byTime : string.CompareOrdinal(a.JobId, b.JobId); } - if (filter.CreatedAfter is { } after) - jobs = jobs.Where(j => j.CreatedAt > after).ToList(); - return jobs; } private static JobListEntry ToListEntry(Job j) => new() @@ -60,6 +113,7 @@ private static List ApplyFilter(List jobs, JobListFilter? filter) ParentJobId = j.ParentJobId, CreatedAt = j.CreatedAt, TraceId = j.TraceId?.Value, + LastEventSeq = j.LastEmittedSeq, }; private static string MapStatus(JobStatus s) => s switch @@ -73,13 +127,25 @@ private static List ApplyFilter(List jobs, JobListFilter? filter) _ => "unknown", }; - private static int ParseCursor(string? cursor) + private static JobKey? DecodeCursor(string? cursor) { - if (string.IsNullOrEmpty(cursor)) return 0; - try { return int.Parse(Encoding.UTF8.GetString(Convert.FromBase64String(cursor)), CultureInfo.InvariantCulture); } - catch (FormatException) { return 0; } + if (string.IsNullOrEmpty(cursor)) return null; + try + { + var raw = Encoding.UTF8.GetString(Convert.FromBase64String(cursor)); + var sep = raw.IndexOf('|'); + if (sep < 0) return null; + var ticks = long.Parse(raw[..sep], CultureInfo.InvariantCulture); + var jobId = raw[(sep + 1)..]; + return new JobKey(new DateTimeOffset(ticks, TimeSpan.Zero), jobId); + } + catch (FormatException) { return null; } + catch (OverflowException) { return null; } } - private static string EncodeCursor(int offset) => - Convert.ToBase64String(Encoding.UTF8.GetBytes(offset.ToString(CultureInfo.InvariantCulture))); + private static string EncodeCursor(JobKey key) + { + var raw = $"{key.CreatedAt.UtcTicks.ToString(CultureInfo.InvariantCulture)}|{key.JobId}"; + return Convert.ToBase64String(Encoding.UTF8.GetBytes(raw)); + } } diff --git a/src/Arcp.Runtime/JobManager.cs b/src/Arcp.Runtime/JobManager.cs index 36385ea..c810255 100644 --- a/src/Arcp.Runtime/JobManager.cs +++ b/src/Arcp.Runtime/JobManager.cs @@ -25,10 +25,11 @@ namespace Arcp.Runtime; /// Runtime-wide registry of running jobs. Coordinates submit → accept → terminal, idempotency /// dedup, cancellation, lease watchdog, and subscription fan-out. -public sealed partial class JobManager +public sealed partial class JobManager : IDisposable { private readonly ConcurrentDictionary _jobs = new(); private readonly ConcurrentDictionary _idempotency = new(StringComparer.Ordinal); + private readonly CancellationTokenSource _runtimeCts = new(); private readonly AgentRegistry _agents; private readonly LeaseManager _leases; private readonly TimeProvider _time; @@ -38,6 +39,7 @@ public sealed partial class JobManager private readonly int _eventBufferCapacity; private readonly int _terminalRetentionSec; private readonly bool _fatalBudgetExhaustion; + private readonly bool _permissiveUnleasedOperations; /// Stored record for an idempotency key: original submission fingerprint plus issue time. private sealed record IdempotencyRecord(JobId JobId, string Fingerprint, DateTimeOffset CreatedAt); @@ -52,7 +54,8 @@ public JobManager( int idempotencyWindowSec = 3600, int eventBufferCapacity = 4096, int terminalRetentionSec = 600, - bool fatalBudgetExhaustion = false) + bool fatalBudgetExhaustion = false, + bool permissiveUnleasedOperations = false) { _agents = agents; _leases = leases; @@ -63,6 +66,7 @@ public JobManager( _eventBufferCapacity = eventBufferCapacity > 0 ? eventBufferCapacity : 4096; _terminalRetentionSec = terminalRetentionSec; _fatalBudgetExhaustion = fatalBudgetExhaustion; + _permissiveUnleasedOperations = permissiveUnleasedOperations; } /// Initializes a new without credential provisioning. @@ -74,6 +78,27 @@ public JobManager(AgentRegistry agents, LeaseManager leases, TimeProvider time, /// All jobs currently tracked, in arbitrary order. public IEnumerable Jobs => _jobs.Values; + /// Cancellation token rooted at the runtime, NOT at any session. Running jobs are linked + /// to this token so they survive session teardown — a heartbeat timeout, graceful + /// session.close, or transient transport drop MUST NOT terminate in-flight jobs + /// (spec §6.4, §6.7). Only an explicit job.cancel, a lease/budget/runtime limit, or + /// runtime shutdown cancels a job. + internal CancellationToken RuntimeToken => _runtimeCts.Token; + + /// Cancel every running job and stop the runtime. Called on + /// disposal. + internal void ShutdownRuntime() + { + try { _runtimeCts.Cancel(); } catch (ObjectDisposedException) { /* already disposed */ } + } + + /// Cancel all in-flight jobs and release the runtime cancellation source. + public void Dispose() + { + ShutdownRuntime(); + _runtimeCts.Dispose(); + } + /// Look up a job by id. public bool TryGet(JobId id, out Job? job) { @@ -85,7 +110,7 @@ public bool TryGet(JobId id, out Job? job) /// Submit a job. The caller (SessionState) hands in the envelope; this method returns /// the to run asynchronously plus the job.accepted payload. /// propagates the envelope's trace_id per spec §11. - public async Task<(Job Job, JobAcceptedPayload Accepted)> SubmitAsync( + public async Task<(Job Job, JobAcceptedPayload Accepted, bool IsReplay)> SubmitAsync( JobSubmitPayload submit, SessionId sessionId, string? submitterPrincipal, @@ -113,7 +138,9 @@ public bool TryGet(JobId id, out Job? job) } if (_jobs.TryGetValue(existingRecord.JobId, out var existing)) { - return (existing, BuildAccepted(existing)); + // Spec §7.2: an idempotent replay returns the *same* job.accepted and MUST + // NOT re-run the agent. Flag it so the caller skips Resolve/RunAsync. + return (existing, BuildAccepted(existing), true); } } else @@ -153,7 +180,7 @@ public bool TryGet(JobId id, out Job? job) _idempotency[idemKey] = new IdempotencyRecord(jobId, fingerprint, _time.GetUtcNow()); } - return (job, BuildAccepted(job)); + return (job, BuildAccepted(job), false); } private void AssertChildLeaseIsSubset(string parentJobId, Lease child, LeaseConstraints? childConstraints) @@ -187,7 +214,7 @@ private static JobAcceptedPayload BuildAccepted(Job job) public async Task RunAsync(Job job, IAgent agent, Func emit, CancellationToken cancellationToken) { job.MarkRunning(); - var ctx = new JobContext(job, _loggers.CreateLogger($"Arcp.Job.{job.JobId.Value}"), _credentials, _fatalBudgetExhaustion, _leases); + var ctx = new JobContext(job, _loggers.CreateLogger($"Arcp.Job.{job.JobId.Value}"), _credentials, _fatalBudgetExhaustion, _leases, _permissiveUnleasedOperations); // Watchdog cancellation source — cancelled in `finally` so the watchdog never outlives // the job and never emits a late lease-expired event after the terminal result. @@ -440,15 +467,19 @@ private async Task RunRuntimeWatchdog(Job job, TimeSpan limit, CancellationToken private static bool IsTerminal(JobStatus s) => s is JobStatus.Success or JobStatus.Error or JobStatus.Cancelled or JobStatus.TimedOut; - /// Cancel a running job. Only the original submitter may cancel; subscribers may not (spec §7.6). - public bool Cancel(JobId jobId, string? requesterPrincipal, string? reason) + /// Cancel a running job. Cancellation authority is scoped to the *submitting session* + /// (spec §7.6, §13.3, §14): only the session that submitted the job may cancel it. Subscription — + /// even from another session of the same principal — does NOT confer cancel authority. Returns + /// if the job does not exist; throws + /// when the requesting session is not the submitter. + public bool Cancel(JobId jobId, SessionId requesterSession, string? reason) { if (!_jobs.TryGetValue(jobId, out var job)) return false; - // Spec §7.6: subscription does NOT grant cancel authority; only submitter may cancel. - if (requesterPrincipal is not null && job.SubmitterPrincipal is not null && - !string.Equals(requesterPrincipal, job.SubmitterPrincipal, StringComparison.Ordinal)) + // Fail closed: compare the submitting session, never the principal. A null/foreign requester + // must not be able to bypass the check (spec §14: "Subscription MUST NOT confer cancel authority"). + if (!job.SessionId.Equals(requesterSession)) { - throw new PermissionDeniedException("Subscribers MUST NOT cancel jobs (spec §7.6)"); + throw new PermissionDeniedException("Only the submitting session may cancel a job (spec §7.6, §14)"); } job.CancellationSource.Cancel(); return true; diff --git a/src/Arcp.Runtime/Leases/LeaseManager.cs b/src/Arcp.Runtime/Leases/LeaseManager.cs index c917a65..ae1b279 100644 --- a/src/Arcp.Runtime/Leases/LeaseManager.cs +++ b/src/Arcp.Runtime/Leases/LeaseManager.cs @@ -85,7 +85,7 @@ public void AssertSubset(Lease parent, Lease child, IReadOnlyDictionary parentExp) throw new LeaseSubsetViolationException("Child lease_constraints.expires_at MUST NOT exceed parent's (spec §9.4)"); } - // No child constraints → child implicitly inherits parent expiry. (spec §9.4) + // No child constraints ��� child implicitly inherits parent expiry. (spec §9.4) } } @@ -126,11 +126,22 @@ private static void CheckBudgetSubset(IReadOnlyList childPatterns, IRead /// Authorize a single operation against the lease. Throws /// if no parent pattern matches; throws if the lease has /// expired (spec §9.3, §9.5). - public void AuthorizeOperation(Lease lease, LeaseConstraints? constraints, string namespaceName, string pattern) + public void AuthorizeOperation(Lease lease, LeaseConstraints? constraints, string namespaceName, string pattern) => + AuthorizeOperation(lease, constraints, namespaceName, pattern, budget: null); + + /// Authorize a single operation, also checking budget counters first. Spec 9.6: the + /// runtime MUST check all budget counters before authorizing any operation through the lease and + /// fail with BUDGET_EXHAUSTED if any counter is <= 0 ? a pre-operation gate, not only a + /// reactive cost-metric error. + public void AuthorizeOperation(Lease lease, LeaseConstraints? constraints, string namespaceName, string pattern, + Arcp.Runtime.Budget.BudgetLedger? budget) { if (constraints?.ExpiresAt is { } exp && _time.GetUtcNow() >= exp) throw new LeaseExpiredException($"Lease expired at {exp:O}"); + // Spec 9.6: pre-operation budget gate. + budget?.AssertNotExhausted(); + if (!lease.Capabilities.TryGetValue(namespaceName, out var allowed) || allowed.Count == 0) throw new PermissionDeniedException($"No lease for namespace '{namespaceName}'"); foreach (var pat in allowed) @@ -153,8 +164,12 @@ internal static bool GlobMatch(string input, string pattern) // Convert to regex-ish prefix/suffix. if (pattern.EndsWith("/**", StringComparison.Ordinal)) { - var prefix = pattern[..^3]; - return input.StartsWith(prefix, StringComparison.Ordinal); + // Keep the trailing separator so the match respects the path boundary (spec ?9.3): + // "/workspace/myapp/**" authorizes "/workspace/myapp" itself and anything strictly + // beneath "/workspace/myapp/", but NOT siblings like "/workspace/myapp-private/x". + var dir = pattern[..^2]; // "/workspace/myapp/" + return input.StartsWith(dir, StringComparison.Ordinal) + || input.Equals(dir[..^1], StringComparison.Ordinal); // the directory itself } if (pattern.EndsWith("*", StringComparison.Ordinal)) { diff --git a/src/Arcp.Runtime/PublicAPI.Unshipped.txt b/src/Arcp.Runtime/PublicAPI.Unshipped.txt index eca9fa7..1b95bde 100644 --- a/src/Arcp.Runtime/PublicAPI.Unshipped.txt +++ b/src/Arcp.Runtime/PublicAPI.Unshipped.txt @@ -114,7 +114,8 @@ Arcp.Runtime.JobContext.TraceId.get -> Arcp.Core.Ids.TraceId? Arcp.Runtime.JobContext.WriteChunkAsync(Arcp.Core.Ids.ResultId resultId, string! text, bool more, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask Arcp.Runtime.JobContext.WriteChunkAsync(Arcp.Core.Ids.ResultId resultId, System.ReadOnlyMemory bytes, bool more, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask Arcp.Runtime.JobManager -Arcp.Runtime.JobManager.Cancel(Arcp.Core.Ids.JobId jobId, string? requesterPrincipal, string? reason) -> bool +Arcp.Runtime.JobManager.Cancel(Arcp.Core.Ids.JobId jobId, Arcp.Core.Ids.SessionId requesterSession, string? reason) -> bool +Arcp.Runtime.JobManager.Dispose() -> void Arcp.Runtime.JobManager.JobManager(Arcp.Runtime.Agents.AgentRegistry! agents, Arcp.Runtime.Leases.LeaseManager! leases, System.TimeProvider! time, Microsoft.Extensions.Logging.ILoggerFactory! loggers) -> void Arcp.Runtime.JobManager.Jobs.get -> System.Collections.Generic.IEnumerable! Arcp.Runtime.JobManager.List(string? requesterPrincipal, Arcp.Runtime.Authorization.IJobAuthorizationPolicy! policy, Arcp.Core.Messages.JobListFilter? filter, int? limit, string? cursor, out string? nextCursor) -> System.Collections.Generic.IReadOnlyList! @@ -197,7 +198,7 @@ Arcp.Runtime.Credentials.InMemoryCredentialStore.RemoveAsync(Arcp.Core.Ids.JobId Arcp.Runtime.Job.Credentials.get -> System.Collections.Generic.IReadOnlyList! Arcp.Runtime.JobContext.Credentials.get -> System.Collections.Generic.IReadOnlyList! Arcp.Runtime.JobContext.RotateCredentialAsync(string! credentialId, Arcp.Core.Messages.ProvisionedCredential! next, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask -Arcp.Runtime.JobManager.JobManager(Arcp.Runtime.Agents.AgentRegistry! agents, Arcp.Runtime.Leases.LeaseManager! leases, System.TimeProvider! time, Microsoft.Extensions.Logging.ILoggerFactory! loggers, Arcp.Runtime.Credentials.CredentialManager? credentials = null, int idempotencyWindowSec = 3600, int eventBufferCapacity = 4096, int terminalRetentionSec = 600, bool fatalBudgetExhaustion = false) -> void +Arcp.Runtime.JobManager.JobManager(Arcp.Runtime.Agents.AgentRegistry! agents, Arcp.Runtime.Leases.LeaseManager! leases, System.TimeProvider! time, Microsoft.Extensions.Logging.ILoggerFactory! loggers, Arcp.Runtime.Credentials.CredentialManager? credentials = null, int idempotencyWindowSec = 3600, int eventBufferCapacity = 4096, int terminalRetentionSec = 600, bool fatalBudgetExhaustion = false, bool permissiveUnleasedOperations = false) -> void Arcp.Runtime.Job.LeaseExpired.get -> bool Arcp.Runtime.Job.MaxRuntimeSec.get -> int? Arcp.Runtime.Job.RuntimeLimitExceeded.get -> bool @@ -209,8 +210,11 @@ Arcp.Runtime.ArcpServerOptions.IdempotencyWindowSec.get -> int Arcp.Runtime.ArcpServerOptions.IdempotencyWindowSec.init -> void Arcp.Runtime.ArcpServerOptions.TerminalJobRetentionSec.get -> int Arcp.Runtime.ArcpServerOptions.TerminalJobRetentionSec.init -> void -Arcp.Runtime.JobManager.SubmitAsync(Arcp.Core.Messages.JobSubmitPayload! submit, Arcp.Core.Ids.SessionId sessionId, string? submitterPrincipal, System.Func! emit, Arcp.Core.Ids.TraceId? inboundTraceId, System.Threading.CancellationToken parentCancellation, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<(Arcp.Runtime.Job! Job, Arcp.Core.Messages.JobAcceptedPayload! Accepted)>! +Arcp.Runtime.JobManager.SubmitAsync(Arcp.Core.Messages.JobSubmitPayload! submit, Arcp.Core.Ids.SessionId sessionId, string? submitterPrincipal, System.Func! emit, Arcp.Core.Ids.TraceId? inboundTraceId, System.Threading.CancellationToken parentCancellation, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<(Arcp.Runtime.Job! Job, Arcp.Core.Messages.JobAcceptedPayload! Accepted, bool IsReplay)>! Arcp.Runtime.Leases.LeaseManager.AuthorizeModelUse(Arcp.Core.Leases.Lease! lease, Arcp.Core.Leases.LeaseConstraints? constraints, string! modelId) -> void +Arcp.Runtime.Leases.LeaseManager.AuthorizeOperation(Arcp.Core.Leases.Lease! lease, Arcp.Core.Leases.LeaseConstraints? constraints, string! namespaceName, string! pattern, Arcp.Runtime.Budget.BudgetLedger? budget) -> void +Arcp.Runtime.ArcpServerOptions.PermissiveUnleasedOperations.get -> bool +Arcp.Runtime.ArcpServerOptions.PermissiveUnleasedOperations.init -> void override Arcp.Runtime.Credentials.CredentialIssueContext.Equals(object? obj) -> bool override Arcp.Runtime.Credentials.CredentialIssueContext.GetHashCode() -> int override Arcp.Runtime.Credentials.CredentialIssueContext.ToString() -> string! diff --git a/src/Arcp.Runtime/SessionState.Dispatch.cs b/src/Arcp.Runtime/SessionState.Dispatch.cs index 00fea34..09cd713 100644 --- a/src/Arcp.Runtime/SessionState.Dispatch.cs +++ b/src/Arcp.Runtime/SessionState.Dispatch.cs @@ -39,7 +39,27 @@ await SendAsync(new Envelope } catch (Exception ex) { + // Spec §12: surface an unexpected failure as session.error{INTERNAL_ERROR} so the + // peer is not left waiting forever for an acknowledgement that never arrives. _logger.LogError(ex, "Dispatch error for type {Type}", env.Type); + try + { + await SendAsync(new Envelope + { + Type = MessageTypeNames.SessionError, + SessionId = SessionId.Value, + Payload = new SessionErrorPayload + { + Code = ErrorCode.InternalError, + Message = "Internal error while processing request", + Retryable = true, + }, + }, cancellationToken).ConfigureAwait(false); + } + catch (Exception sendEx) + { + _logger.LogError(sendEx, "Failed to send INTERNAL_ERROR for type {Type}", env.Type); + } } } } @@ -74,8 +94,11 @@ private async Task DispatchAsync(Envelope env, CancellationToken cancellationTok case MessageTypeNames.SessionListJobs: await HandleListJobsAsync(env, cancellationToken).ConfigureAwait(false); break; + case MessageTypeNames.SessionClose: case MessageTypeNames.SessionBye: - IsClosed = true; + // Spec §6.7: client-sent session.close (or the deprecated session.bye alias) is + // acknowledged with session.closed (emitted by CloseAsync). In-flight jobs are + // rooted at the runtime token, so this does NOT terminate them. await CloseAsync(reason: (env.Payload as SessionByePayload)?.Reason, cancellationToken).ConfigureAwait(false); break; case MessageTypeNames.JobSubmit: @@ -84,10 +107,7 @@ private async Task DispatchAsync(Envelope env, CancellationToken cancellationTok break; case MessageTypeNames.JobCancel: if (env.Payload is JobCancelPayload cancel) - { - if (JobId.TryParse(cancel.JobId, null, out var jid)) - _server.JobManager.Cancel(jid, Principal?.Subject, cancel.Reason); - } + await HandleJobCancelAsync(cancel, cancellationToken).ConfigureAwait(false); break; case MessageTypeNames.JobSubscribe: if (env.Payload is JobSubscribePayload sub) @@ -97,7 +117,10 @@ private async Task DispatchAsync(Envelope env, CancellationToken cancellationTok if (env.Payload is JobUnsubscribePayload unsub) { if (JobId.TryParse(unsub.JobId, null, out var jid)) + { _server.Subscriptions.Unsubscribe(jid, SessionId); + _subscribeMarks.TryRemove(jid, out _); + } } break; default: @@ -106,6 +129,28 @@ private async Task DispatchAsync(Envelope env, CancellationToken cancellationTok } } + private async Task HandleJobCancelAsync(JobCancelPayload cancel, CancellationToken cancellationToken) + { + if (!JobId.TryParse(cancel.JobId, null, out var jid)) + throw new InvalidRequestException("Invalid job_id"); + + // Cancellation authority is scoped to the submitting session (spec §7.6, §14). A foreign + // session throws PERMISSION_DENIED; an unknown job returns false → JOB_NOT_FOUND (spec §12). + var cancelled = _server.JobManager.Cancel(jid, SessionId, cancel.Reason); + if (!cancelled) + throw new JobNotFoundException($"job {cancel.JobId} not found"); + + // Spec §7.4: acknowledge with job.cancelled before the run-loop emits the terminal + // job.error{CANCELLED, final_status:"cancelled"}. + await SendAsync(new Envelope + { + Type = MessageTypeNames.JobCancelled, + SessionId = SessionId.Value, + JobId = jid.Value, + Payload = new JobCancelledPayload { JobId = jid.Value, Reason = cancel.Reason }, + }, cancellationToken).ConfigureAwait(false); + } + private async Task HandlePingAsync(Envelope env, CancellationToken cancellationToken) { if (env.Payload is not SessionPingPayload p) return; diff --git a/src/Arcp.Runtime/SessionState.Jobs.cs b/src/Arcp.Runtime/SessionState.Jobs.cs index b8f664c..5eff7c2 100644 --- a/src/Arcp.Runtime/SessionState.Jobs.cs +++ b/src/Arcp.Runtime/SessionState.Jobs.cs @@ -23,10 +23,15 @@ private async Task HandleJobSubmitAsync(Envelope env, JobSubmitPayload submit, C if (env.TraceId is { Length: > 0 } incoming && TraceId.TryParse(incoming, null, out var parsed)) inboundTraceId = parsed; + // Spec §6.4/§6.7: a job's lifetime is rooted at the runtime, not this session. Session + // teardown (heartbeat loss, graceful close, transport drop) stops streaming to this + // transport but MUST NOT cancel the job — it keeps running and stays resumable. + var jobLifetime = _server.JobManager.RuntimeToken; + try { var submission = await _server.JobManager - .SubmitAsync(submit, SessionId, Principal?.Subject, emit, inboundTraceId, _cts.Token, cancellationToken) + .SubmitAsync(submit, SessionId, Principal?.Subject, emit, inboundTraceId, jobLifetime, cancellationToken) .ConfigureAwait(false); var job = submission.Job; var accepted = submission.Accepted; @@ -40,9 +45,16 @@ await SendAsync(new Envelope Payload = accepted, }, cancellationToken).ConfigureAwait(false); - // Resolve agent and run. + // Spec §7.2: a replay re-acknowledges the existing job but MUST NOT invoke the agent + // again — re-running would re-emit events, re-emit a terminal result, and reset a + // terminal job back to Running. Only fresh submissions are dispatched to the agent. + if (submission.IsReplay) + return; + + // Resolve agent and run. The run task is rooted at the runtime token (not _cts.Token) + // so it outlives this session (spec §6.7). var resolved = _server.AgentRegistry.Resolve(job.Agent).Agent; - _ = Task.Run(() => _server.JobManager.RunAsync(job, resolved, emit, _cts.Token), _cts.Token); + _ = Task.Run(() => _server.JobManager.RunAsync(job, resolved, emit, jobLifetime), jobLifetime); } catch (ArcpException ex) { @@ -77,46 +89,63 @@ private async Task HandleSubscribeAsync(Envelope env, JobSubscribePayload sub, C if (!_options.AuthorizationPolicy.CanObserve(job.SubmitterPrincipal, Principal)) throw new PermissionDeniedException("Subscriber not authorized to observe job"); - // Snapshot prior events BEFORE subscribing so we can replay them, then attach the live - // subscription. Live events from the buffer's tail to "now" could double-fire; we filter - // by event-seq below when re-sending. - var history = sub.History - ? job.SnapshotEventHistory() - : Array.Empty(); var subscriberSeesOwnerSecrets = string.Equals(Principal?.Subject, job.SubmitterPrincipal, StringComparison.Ordinal); - _server.Subscriptions.Subscribe(job.JobId, SessionId); - - await SendAsync(new Envelope + // Spec §7.6: history-snapshot and live-subscription registration MUST be atomic, otherwise an + // event emitted in the window between them is lost (or duplicated). Hold the subscriber's emit + // gate across the whole replay so no live fan-out for this job can interleave ahead of the + // replayed history, and register + snapshot atomically under the job's buffer lock so the + // boundary (highWaterIndex) is exact. Any fanned-out event with JobEventIndex ≤ the boundary + // was already replayed and is skipped by EmitToSubscriberAsync; everything after is delivered + // live exactly once. + await _emitGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try { - Type = MessageTypeNames.JobSubscribed, - SessionId = SessionId.Value, - JobId = job.JobId.Value, - Payload = new JobSubscribedPayload + var history = job.RegisterSubscriberAndSnapshot( + () => _server.Subscriptions.Subscribe(job.JobId, SessionId), + out var highWater); + + // Everything with index ≤ highWater is delivered here (or suppressed if no history was + // requested); live fan-out delivers only index > highWater. Set the boundary now: any + // concurrent fan-out for this job blocks on _emitGate until we release, then honors it. + _subscribeMarks[jid] = highWater; + if (!sub.History) history = Array.Empty(); + + await WriteToOutboundAsync(new Envelope { + Type = MessageTypeNames.JobSubscribed, + SessionId = SessionId.Value, JobId = job.JobId.Value, - CurrentStatus = job.Status.ToString().ToLowerInvariant(), - Agent = job.Agent.ToString(), - Lease = job.Lease, - LeaseConstraints = job.LeaseConstraints, - ParentJobId = job.ParentJobId, - TraceId = job.TraceId?.Value, - SubscribedFrom = EventLog.HighWatermark, - Replayed = sub.History && history.Count > 0, - Credentials = subscriberSeesOwnerSecrets ? job.Credentials : null, - }, - }, cancellationToken).ConfigureAwait(false); + Payload = new JobSubscribedPayload + { + JobId = job.JobId.Value, + CurrentStatus = job.Status.ToString().ToLowerInvariant(), + Agent = job.Agent.ToString(), + Lease = job.Lease, + LeaseConstraints = job.LeaseConstraints, + ParentJobId = job.ParentJobId, + TraceId = job.TraceId?.Value, + SubscribedFrom = EventLog.HighWatermark, + Replayed = sub.History && history.Count > 0, + Credentials = subscriberSeesOwnerSecrets ? job.Credentials : null, + }, + }, cancellationToken).ConfigureAwait(false); - // Spec §7.6: replay matching prior events, in original order, before live events arrive. - var fromSeq = sub.FromEventSeq; - foreach (var historic in history) + // Replay matching prior events, in original order, before live events arrive. + var fromSeq = sub.FromEventSeq; + foreach (var historic in history) + { + if (fromSeq is { } f && historic.JobEventIndex is { } idx && idx <= f) continue; + var rekeyed = (subscriberSeesOwnerSecrets ? historic : RedactForNonOwner(historic, job)) + with + { SessionId = SessionId.Value }; + var stamped = EventLog.Append(rekeyed); + await WriteToOutboundAsync(stamped, cancellationToken).ConfigureAwait(false); + } + } + finally { - if (fromSeq is { } f && historic.EventSeq is { } seq && seq <= f) continue; - var rekeyed = (subscriberSeesOwnerSecrets ? historic : RedactForNonOwner(historic, job)) - with - { SessionId = SessionId.Value }; - var stamped = EventLog.Append(rekeyed); - await SendAsync(stamped, cancellationToken).ConfigureAwait(false); + _emitGate.Release(); } } diff --git a/src/Arcp.Runtime/SessionState.Outbound.cs b/src/Arcp.Runtime/SessionState.Outbound.cs index 234a0b5..c624850 100644 --- a/src/Arcp.Runtime/SessionState.Outbound.cs +++ b/src/Arcp.Runtime/SessionState.Outbound.cs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 using System; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Arcp.Core.Ids; using Arcp.Core.Messages; @@ -70,16 +71,42 @@ await SendAsync(new Envelope private async ValueTask EmitJobEnvelopeAsync(Envelope env, CancellationToken cancellationToken) { - // Append the event to this owning session's log if it's an event/result/error. - var stamped = env.Type is MessageTypeNames.JobEvent or MessageTypeNames.JobResult or MessageTypeNames.JobError - ? EventLog.Append(env) - : env; + var isEvent = env.Type is MessageTypeNames.JobEvent or MessageTypeNames.JobResult or MessageTypeNames.JobError; - await SendAsync(stamped, cancellationToken).ConfigureAwait(false); - FanOutToSubscribers(env, stamped, cancellationToken); + // Spec §8.3: event_seq assignment and enqueue MUST be atomic so the single-reader outbound + // channel delivers events in strictly monotonic order. Concurrent emitters in one session + // (agent, lease watchdog, back-pressure status) are serialized through this gate so a higher + // seq can never be enqueued before a lower one. + await _emitGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + var stamped = isEvent ? EventLog.Append(env) : env; + await WriteToOutboundAsync(stamped, cancellationToken).ConfigureAwait(false); + } + finally + { + _emitGate.Release(); + } + + await FanOutToSubscribersAsync(env, cancellationToken).ConfigureAwait(false); } - private void FanOutToSubscribers(Envelope env, Envelope stamped, CancellationToken cancellationToken) + /// Write to the outbound channel, tolerating a closed channel. Spec §6.7: when the + /// session's transport has dropped, the job keeps running and the event is retained in the + /// EventLog for resume — so a closed channel is a no-op, not a failure that faults the job. + private async ValueTask WriteToOutboundAsync(Envelope env, CancellationToken cancellationToken) + { + try + { + await _outbound.Writer.WriteAsync(env, cancellationToken).ConfigureAwait(false); + } + catch (ChannelClosedException) + { + // Transport gone; event retained in EventLog for replay on resume (spec §6.7). + } + } + + private async ValueTask FanOutToSubscribersAsync(Envelope env, CancellationToken cancellationToken) { // Spec §7.6. if (env.JobId is not { } jobIdStr) return; @@ -88,16 +115,49 @@ private void FanOutToSubscribers(Envelope env, Envelope stamped, CancellationTok foreach (var sub in _server.Subscriptions.SubscribersOf(jid)) { if (sub.Value == SessionId.Value) continue; - _server.GetSession(sub)?.EmitToSubscriber(stamped, cancellationToken); + var session = _server.GetSession(sub); + if (session is not null) + await session.EmitToSubscriberAsync(env, cancellationToken).ConfigureAwait(false); } } - internal void EmitToSubscriber(Envelope env, CancellationToken cancellationToken) + internal async ValueTask EmitToSubscriberAsync(Envelope env, CancellationToken cancellationToken) { - // Re-stamp for the subscriber's session_id and event_seq. + // Re-stamp for the subscriber's session_id and event_seq under the subscriber's emit gate so + // append+enqueue stay atomic (spec §8.3) and the subscriber's wire order is monotonic. var rekeyed = RedactCredentialSecretsForSubscriber(env) with { SessionId = SessionId.Value }; - var stamped = EventLog.Append(rekeyed); - _outbound.Writer.TryWrite(stamped); + bool overflow; + await _emitGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + // Spec §7.6: drop events already covered by the subscribe history replay. The check runs + // under the gate so it observes the exact boundary set while the gate was held by replay. + if (ShouldSkipReplayedEvent(env)) return; + var stamped = EventLog.Append(rekeyed); + // The subscriber channel is bounded. A silent TryWrite drop would leave a gap in the + // subscriber's event_seq (spec §8.3 requires gap-free). If the channel is full, tear the + // subscription down deterministically instead of dropping the event silently. + overflow = !_outbound.Writer.TryWrite(stamped); + } + finally + { + _emitGate.Release(); + } + + if (overflow) + { + _logger.LogWarning( + "Subscriber session {SessionId} outbound is full; closing to avoid a silent event_seq gap (spec §8.3)", + SessionId); + await CloseAsync(reason: "SUBSCRIBER_OVERFLOW", cancellationToken).ConfigureAwait(false); + } + } + + private bool ShouldSkipReplayedEvent(Envelope env) + { + if (env.JobId is not { } jobId || !JobId.TryParse(jobId, null, out var jid)) return false; + if (env.JobEventIndex is not { } idx) return false; + return _subscribeMarks.TryGetValue(jid, out var mark) && idx <= mark; } private Envelope RedactCredentialSecretsForSubscriber(Envelope env) @@ -116,7 +176,17 @@ private Envelope RedactCredentialSecretsForSubscriber(Envelope env) private async ValueTask EmitEventAsync(Envelope env, CancellationToken cancellationToken) { - var stamped = EventLog.Append(env); - await SendAsync(stamped, cancellationToken).ConfigureAwait(false); + // Route through the same emit gate as job events so back-pressure status events keep the + // session's event_seq strictly monotonic (spec §8.3). + await _emitGate.WaitAsync(cancellationToken).ConfigureAwait(false); + try + { + var stamped = EventLog.Append(env); + await WriteToOutboundAsync(stamped, cancellationToken).ConfigureAwait(false); + } + finally + { + _emitGate.Release(); + } } } diff --git a/src/Arcp.Runtime/SessionState.cs b/src/Arcp.Runtime/SessionState.cs index 01d5631..c3a32ee 100644 --- a/src/Arcp.Runtime/SessionState.cs +++ b/src/Arcp.Runtime/SessionState.cs @@ -26,6 +26,15 @@ public sealed partial class SessionState : IAsyncDisposable private readonly Channel _outbound; private readonly CancellationTokenSource _cts; + /// Serializes event_seq assignment with the outbound enqueue so wire order always matches + /// assigned event_seq under concurrent emitters (spec §8.3). + private readonly SemaphoreSlim _emitGate = new(1, 1); + + /// Per-subscribed-job replay boundary (job-local event index). A fanned-out event whose + /// is ≤ this mark was already delivered by the + /// subscribe history replay and is dropped to avoid a duplicate at the boundary (spec §7.6). + private readonly System.Collections.Concurrent.ConcurrentDictionary _subscribeMarks = new(); + private long _lastAckedSeq; private bool _heartbeatNegotiated; private bool _ackNegotiated; @@ -107,9 +116,12 @@ public async ValueTask CloseAsync(string? reason = null, CancellationToken cance IsClosed = true; try { - await SendAsync(new Envelope + // Spec §6.7: the runtime's graceful-close wire type is session.closed. Write it straight + // to the transport (not the outbound channel) so it is flushed before teardown — enqueuing + // it and then cancelling the sender loop would race the ack away. + await _transport.SendAsync(new Envelope { - Type = MessageTypeNames.SessionBye, + Type = MessageTypeNames.SessionClosed, SessionId = SessionId.Value, Payload = new SessionByePayload { Reason = reason }, }, cancellationToken).ConfigureAwait(false); @@ -136,5 +148,6 @@ public async ValueTask DisposeAsync() if (IsClosed) return; await CloseAsync().ConfigureAwait(false); _cts.Dispose(); + _emitGate.Dispose(); } } diff --git a/tests/Arcp.IntegrationTests/AuditFixesTests.cs b/tests/Arcp.IntegrationTests/AuditFixesTests.cs new file mode 100644 index 0000000..3c7e3ff --- /dev/null +++ b/tests/Arcp.IntegrationTests/AuditFixesTests.cs @@ -0,0 +1,290 @@ +// SPDX-License-Identifier: Apache-2.0 +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Arcp.Client; +using Arcp.Core.Auth; +using Arcp.Core.Caps; +using Arcp.Core.Errors; +using Arcp.Core.Leases; +using Arcp.Core.Messages; +using Arcp.Core.Transport; +using Arcp.Core.Wire; +using Arcp.Runtime; +using Arcp.Runtime.Authorization; +using FluentAssertions; +using Xunit; + +namespace Arcp.IntegrationTests; + +public class AuditFixesTests +{ + private static (ArcpServer server, MemoryTransport clientT) StartServer(Action configure, + IBearerVerifier? auth = null, IJobAuthorizationPolicy? policy = null, TimeProvider? time = null) + { + var opts = new ArcpServerOptions + { + Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" }, + Auth = auth, + AuthorizationPolicy = policy ?? new SamePrincipalPolicy(), + TimeProvider = time ?? TimeProvider.System, + }; + var server = new ArcpServer(opts); + configure(server); + var (client, srv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(srv)); + return (server, client); + } + + // ── #41: session.close / session.closed wire types (spec §6.7) ────────────────────────────── + [Fact] + public void SessionClose_and_SessionClosed_are_registered_wire_types() + { + MessageTypeNames.All.Should().Contain(new[] { MessageTypeNames.SessionClose, MessageTypeNames.SessionClosed }); + MessageTypeRegistry.Default.TryGet(MessageTypeNames.SessionClose, out _).Should().BeTrue(); + MessageTypeRegistry.Default.TryGet(MessageTypeNames.SessionClosed, out _).Should().BeTrue(); + } + + [Fact] + public async Task Runtime_accepts_session_close_and_replies_session_closed() + { + var (_, t) = StartServer(s => s.RegisterAgent("noop", (ctx, ct) => Task.FromResult(null))); + await t.SendAsync(new Envelope + { + Type = MessageTypeNames.SessionHello, + Payload = new SessionHelloPayload + { + Client = new ClientInfo { Name = "t", Version = "1" }, + Capabilities = new Capabilities { Encodings = new[] { "json" }, Features = Array.Empty() }, + }, + }); + + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + Envelope? closed = null; + await foreach (var env in t.ReceiveAsync(cts.Token)) + { + if (env.Type == MessageTypeNames.SessionWelcome) + { + await t.SendAsync(new Envelope + { + Type = MessageTypeNames.SessionClose, + SessionId = env.SessionId, + Payload = new SessionByePayload { Reason = "done" }, + }); + } + else if (env.Type == MessageTypeNames.SessionClosed) + { + closed = env; + break; + } + } + + closed.Should().NotBeNull("the runtime must acknowledge session.close with session.closed"); + } + + // ── #46: model.use advertised independently of credential provisioning (spec §9.7) ────────── + [Fact] + public async Task ModelUse_is_advertised_without_a_credential_provisioner() + { + var (_, t) = StartServer(s => s.RegisterAgent("noop", (ctx, ct) => Task.FromResult(null))); + await using var c = await ArcpClient.ConnectAsync(t, new ArcpClientOptions + { + Client = new ClientInfo { Name = "t", Version = "1" }, + }); + c.EffectiveFeatures.Should().Contain(FeatureFlags.ModelUse); + } + + // ── #40: unexpected dispatch exception surfaces INTERNAL_ERROR (spec §12) ──────────────────── + private sealed class ThrowingPolicy : IJobAuthorizationPolicy + { + public bool CanObserve(string? jobSubmitterPrincipal, AuthPrincipal? requestor) => + throw new InvalidOperationException("boom"); + } + + // ── #45 + #40 with two principals on one runtime ──────────────────────────────────────────── + [Fact] + public async Task Empty_principal_sees_no_jobs_and_throwing_policy_surfaces_INTERNAL_ERROR() + { + var server = new ArcpServer(new ArcpServerOptions + { + Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" }, + Auth = new MappingVerifier(), + AuthorizationPolicy = new SamePrincipalPolicy(), + }); + server.RegisterAgent("sleeper", async (ctx, ct) => { await Task.Delay(3000, ct); return null; }); + + var (aliceT, aliceSrv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(aliceSrv)); + await using var alice = await ArcpClient.ConnectAsync(aliceT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "alice", Version = "1" }, + Token = "alice", + }); + await alice.SubmitAsync("sleeper"); + + // alice sees her own job. + (await alice.ListJobsAsync()).Jobs.Should().ContainSingle(); + + // A session whose principal subject is empty must see nothing (fail-closed, spec §6.6/§14). + var (ghostT, ghostSrv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(ghostSrv)); + await using var ghost = await ArcpClient.ConnectAsync(ghostT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "ghost", Version = "1" }, + Token = "ghost", // MappingVerifier maps this to an EMPTY subject + }); + (await ghost.ListJobsAsync()).Jobs.Should().BeEmpty(); + } + + private sealed class MappingVerifier : IBearerVerifier + { + public ValueTask VerifyAsync(string? token, CancellationToken cancellationToken = default) => + ValueTask.FromResult(token switch + { + "ghost" => new AuthPrincipal(string.Empty), + null or "" => null, + _ => new AuthPrincipal(token), + }); + } + + [Fact] + public async Task ListJobs_rejected_by_throwing_policy_surfaces_INTERNAL_ERROR() + { + var server = new ArcpServer(new ArcpServerOptions + { + Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" }, + Auth = new AllowAnyBearerVerifier(), + AuthorizationPolicy = new ThrowingPolicy(), + }); + server.RegisterAgent("sleeper", async (ctx, ct) => { await Task.Delay(3000, ct); return null; }); + + var (aliceT, aliceSrv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(aliceSrv)); + await using var alice = await ArcpClient.ConnectAsync(aliceT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "alice", Version = "1" }, + Token = "alice", + }); + await alice.SubmitAsync("sleeper"); + + var (bobT, bobSrv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(bobSrv)); + await using var bob = await ArcpClient.ConnectAsync(bobT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "bob", Version = "1" }, + Token = "bob", + }); + + // bob listing forces ThrowingPolicy.CanObserve over alice's job → unexpected exception → + // session.error{INTERNAL_ERROR}; #73 makes the awaiting ListJobsAsync throw it. + var act = async () => await bob.ListJobsAsync().WaitAsync(TimeSpan.FromSeconds(3)); + (await act.Should().ThrowAsync()).Which.Code.Should().Be(ErrorCode.InternalError); + } + + // ── #37: a job survives session teardown and is not cancelled (spec §6.4, §6.7) ────────────── + [Fact] + public async Task Job_keeps_running_after_its_session_transport_drops() + { + var started = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var release = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var finished = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var (_, t) = StartServer(s => s.RegisterAgent("long", async (ctx, ct) => + { + started.TrySetResult(); + // Honors the job token: if the job were cancelled on session close this would throw. + await release.Task.WaitAsync(ct); + finished.TrySetResult(); + return "ok"; + })); + + var c = await ArcpClient.ConnectAsync(t, new ArcpClientOptions { Client = new ClientInfo { Name = "t", Version = "1" } }); + await c.SubmitAsync("long"); + await started.Task.WaitAsync(TimeSpan.FromSeconds(3)); + + // Drop the session. + await c.DisposeAsync(); + + // The job must still be alive: releasing it now lets it complete (it would have thrown on a + // cancelled token if session teardown had terminated it). + release.TrySetResult(); + await finished.Task.WaitAsync(TimeSpan.FromSeconds(3)); + } + + // ── #43: deny-by-default for uncovered tool.call / agent.delegate (spec §9.3) ───────────────── + [Fact] + public async Task ToolCall_without_a_lease_namespace_is_denied_by_default() + { + var (_, t) = StartServer(s => s.RegisterAgent("tooler", async (ctx, ct) => + { + await ctx.ToolCallAsync("fs.write", "c1", new { path = "/x" }, ct); + return "done"; + })); + await using var c = await ArcpClient.ConnectAsync(t, new ArcpClientOptions { Client = new ClientInfo { Name = "t", Version = "1" } }); + var handle = await c.SubmitAsync("tooler"); + var result = await handle.Result.WaitAsync(TimeSpan.FromSeconds(3)); + result.Success.Should().BeFalse(); + result.Error!.Code.Should().Be(ErrorCode.PermissionDenied); + } + + [Fact] + public async Task ToolCall_is_allowed_when_PermissiveUnleasedOperations_is_enabled() + { + var server = new ArcpServer(new ArcpServerOptions + { + Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" }, + PermissiveUnleasedOperations = true, + }); + server.RegisterAgent("tooler", async (ctx, ct) => + { + await ctx.ToolCallAsync("fs.write", "c1", new { path = "/x" }, ct); + return "done"; + }); + var (clientT, srv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(srv)); + await using var c = await ArcpClient.ConnectAsync(clientT, new ArcpClientOptions { Client = new ClientInfo { Name = "t", Version = "1" } }); + var handle = await c.SubmitAsync("tooler"); + var result = await handle.Result.WaitAsync(TimeSpan.FromSeconds(3)); + result.Success.Should().BeTrue(); + } + + // ── #67: keyset pagination is stable and bounded, even with identical CreatedAt ────────────── + private sealed class FixedTimeProvider : TimeProvider + { + private readonly DateTimeOffset _now; + public FixedTimeProvider(DateTimeOffset now) { _now = now; } + public override DateTimeOffset GetUtcNow() => _now; + } + + [Fact] + public async Task ListJobs_pages_stably_through_jobs_that_share_a_CreatedAt() + { + var fixedTime = new FixedTimeProvider(DateTimeOffset.Parse("2026-06-11T00:00:00Z")); + var (_, t) = StartServer( + s => s.RegisterAgent("sleeper", async (ctx, ct) => { await Task.Delay(4000, ct); return null; }), + time: fixedTime); + await using var c = await ArcpClient.ConnectAsync(t, new ArcpClientOptions { Client = new ClientInfo { Name = "t", Version = "1" } }); + + const int total = 5; + for (var i = 0; i < total; i++) await c.SubmitAsync("sleeper"); + + // Page through with a small limit; collect every job id exactly once. + var seen = new List(); + string? cursor = null; + var pages = 0; + do + { + var page = await c.ListJobsAsync(limit: 2, cursor: cursor); + page.Jobs.Count.Should().BeLessThanOrEqualTo(2, "limit must bound page size"); + seen.AddRange(page.Jobs.Select(j => j.JobId)); + cursor = page.NextCursor; + pages++; + } + while (cursor is not null && pages < 10); + + seen.Should().HaveCount(total); + seen.Distinct().Should().HaveCount(total, "pagination must not duplicate or drop jobs across pages"); + } +} diff --git a/tests/Arcp.IntegrationTests/CancellationAckTests.cs b/tests/Arcp.IntegrationTests/CancellationAckTests.cs new file mode 100644 index 0000000..5379b93 --- /dev/null +++ b/tests/Arcp.IntegrationTests/CancellationAckTests.cs @@ -0,0 +1,177 @@ +// SPDX-License-Identifier: Apache-2.0 +using System; +using System.Threading; +using System.Threading.Tasks; +using Arcp.Client; +using Arcp.Core.Auth; +using Arcp.Core.Caps; +using Arcp.Core.Errors; +using Arcp.Core.Messages; +using Arcp.Core.Transport; +using Arcp.Core.Wire; +using Arcp.Runtime; +using FluentAssertions; +using Xunit; + +namespace Arcp.IntegrationTests; + +public class CancellationAckTests +{ + private static (ArcpServer server, MemoryTransport clientT) StartServer(Action configure, IBearerVerifier? auth = null) + { + var server = new ArcpServer(new ArcpServerOptions + { + Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" }, + Auth = auth, + }); + configure(server); + var (client, srv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(srv)); + return (server, client); + } + + private static Envelope Hello() => new() + { + Type = MessageTypeNames.SessionHello, + Payload = new SessionHelloPayload + { + Client = new ClientInfo { Name = "t", Version = "1" }, + Capabilities = new Capabilities { Encodings = new[] { "json" }, Features = Array.Empty() }, + }, + }; + + [Fact] + public async Task Successful_cancel_acks_with_job_cancelled_then_errors_with_CANCELLED() + { + // Spec §7.4: the runtime acknowledges with job.cancelled AND emits job.error{CANCELLED}. + var (_, t) = StartServer(s => s.RegisterAgent("sleeper", async (ctx, ct) => + { + await Task.Delay(Timeout.InfiniteTimeSpan, ct); + return null; + })); + + await t.SendAsync(Hello()); + + string? sessionId = null; + string? jobId = null; + Envelope? cancelledEnv = null; + Envelope? errorEnv = null; + var sawCancelledBeforeError = false; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + await foreach (var env in t.ReceiveAsync(cts.Token)) + { + switch (env.Type) + { + case MessageTypeNames.SessionWelcome: + sessionId = env.SessionId; + await t.SendAsync(new Envelope + { + Type = MessageTypeNames.JobSubmit, + SessionId = sessionId, + Payload = new JobSubmitPayload { Agent = "sleeper" }, + }); + break; + case MessageTypeNames.JobAccepted: + jobId = env.JobId; + await t.SendAsync(new Envelope + { + Type = MessageTypeNames.JobCancel, + SessionId = sessionId, + JobId = jobId, + Payload = new JobCancelPayload { JobId = jobId!, Reason = "stop" }, + }); + break; + case MessageTypeNames.JobCancelled: + cancelledEnv = env; + break; + case MessageTypeNames.JobError: + errorEnv = env; + sawCancelledBeforeError = cancelledEnv is not null; + break; + } + if (cancelledEnv is not null && errorEnv is not null) break; + } + + cancelledEnv.Should().NotBeNull(); + ((JobCancelledPayload)cancelledEnv!.Payload!).JobId.Should().Be(jobId); + sawCancelledBeforeError.Should().BeTrue("the cancel ack must precede the terminal job.error"); + errorEnv.Should().NotBeNull(); + var err = (JobErrorPayload)errorEnv!.Payload!; + err.Code.Should().Be(ErrorCode.Cancelled); + err.FinalStatus.Should().Be("cancelled"); + } + + [Fact] + public async Task Cancel_of_unknown_job_yields_JOB_NOT_FOUND() + { + // Spec §12: cancelling a job the runtime does not know about surfaces JOB_NOT_FOUND + // rather than being silently dropped. + var (_, t) = StartServer(s => s.RegisterAgent("noop", (ctx, ct) => Task.FromResult(null))); + await t.SendAsync(Hello()); + + var unknownJobId = Arcp.Core.Ids.JobId.New().Value; + Envelope? errEnv = null; + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + await foreach (var env in t.ReceiveAsync(cts.Token)) + { + if (env.Type == MessageTypeNames.SessionWelcome) + { + await t.SendAsync(new Envelope + { + Type = MessageTypeNames.JobCancel, + SessionId = env.SessionId, + JobId = unknownJobId, + Payload = new JobCancelPayload { JobId = unknownJobId, Reason = "x" }, + }); + } + else if (env.Type == MessageTypeNames.SessionError) + { + errEnv = env; + break; + } + } + + errEnv.Should().NotBeNull(); + ((SessionErrorPayload)errEnv!.Payload!).Code.Should().Be(ErrorCode.JobNotFound); + } + + [Fact] + public async Task Second_session_of_same_principal_cannot_cancel_anothers_job() + { + // Spec §7.6/§14: cancellation is reserved for the submitting *session*, not merely the + // principal. A second session of the same principal MUST NOT be able to cancel. + var (server, ownerT) = StartServer(s => s.RegisterAgent("longish", async (ctx, ct) => + { + await Task.Delay(500, ct); + return "ok"; + }), new AllowAnyBearerVerifier()); + + await using var owner = await ArcpClient.ConnectAsync(ownerT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "owner", Version = "1" }, + Token = "principal-alice", + }); + var handle = await owner.SubmitAsync("longish"); + + var (otherT, srv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(srv)); + await using var other = await ArcpClient.ConnectAsync(otherT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "other", Version = "1" }, + Token = "principal-alice", // SAME principal, DIFFERENT session. + }); + + await otherT.SendAsync(new Envelope + { + Type = MessageTypeNames.JobCancel, + SessionId = other.SessionId.Value, + JobId = handle.JobId.Value, + Payload = new JobCancelPayload { JobId = handle.JobId.Value, Reason = "nope" }, + }); + + // The cancel is denied (session-scoped authority) → the owner's job completes successfully. + var result = await handle.Result.WaitAsync(TimeSpan.FromSeconds(3)); + result.Success.Should().BeTrue(); + } +} diff --git a/tests/Arcp.IntegrationTests/ClientGapDetectionTests.cs b/tests/Arcp.IntegrationTests/ClientGapDetectionTests.cs new file mode 100644 index 0000000..b78d578 --- /dev/null +++ b/tests/Arcp.IntegrationTests/ClientGapDetectionTests.cs @@ -0,0 +1,81 @@ +// SPDX-License-Identifier: Apache-2.0 +using System; +using System.Threading; +using System.Threading.Tasks; +using Arcp.Client; +using Arcp.Core.Caps; +using Arcp.Core.Messages; +using Arcp.Core.Transport; +using Arcp.Core.Wire; +using FluentAssertions; +using Xunit; + +namespace Arcp.IntegrationTests; + +public class ClientGapDetectionTests +{ + // Spec §8.3: a client that receives an event_seq which skips the expected successor SHOULD treat + // the session as broken. This drives a minimal hand-rolled runtime that emits event_seq 1 then 3 + // (skipping 2) and asserts the client raises a detectable broken-session signal. + [Fact] + public async Task Client_detects_an_event_seq_gap() + { + var (clientT, srv) = MemoryTransport.Pair(); + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5)); + + var server = Task.Run(async () => + { + await foreach (var env in srv.ReceiveAsync(cts.Token)) + { + if (env.Type != MessageTypeNames.SessionHello) continue; + await srv.SendAsync(new Envelope + { + Type = MessageTypeNames.SessionWelcome, + SessionId = "sess_gap", + Payload = new SessionWelcomePayload + { + Runtime = new RuntimeInfo { Name = "rt", Version = "1" }, + ResumeToken = "rt_x", + Capabilities = new Capabilities { Encodings = new[] { "json" }, Features = Array.Empty() }, + }, + }); + + await srv.SendAsync(JobEvent(1)); + await srv.SendAsync(JobEvent(3)); // gap: expected 2 + return; + } + }, cts.Token); + + var gap = new TaskCompletionSource<(long Expected, long Received)>(TaskCreationOptions.RunContinuationsAsynchronously); + var client = new ArcpClient(clientT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "t", Version = "1" }, + Features = Array.Empty(), + }); + client.EventSeqGapDetected += (expected, received) => gap.TrySetResult((expected, received)); + + await client.ConnectAsync(cts.Token); + + var observed = await gap.Task.WaitAsync(TimeSpan.FromSeconds(5)); + observed.Expected.Should().Be(2); + observed.Received.Should().Be(3); + client.IsSessionBroken.Should().BeTrue(); + + await client.DisposeAsync(); + await server; + } + + private static Envelope JobEvent(long seq) => new() + { + Type = MessageTypeNames.JobEvent, + SessionId = "sess_gap", + JobId = "job_1", + EventSeq = seq, + Payload = new JobEventPayload + { + Kind = EventKinds.Log, + Ts = DateTimeOffset.UtcNow, + Body = ArcpJson.ToJsonElement(new { msg = "x" }), + }, + }; +} diff --git a/tests/Arcp.IntegrationTests/EndToEndTests.cs b/tests/Arcp.IntegrationTests/EndToEndTests.cs index 59fcf66..6976f6f 100644 --- a/tests/Arcp.IntegrationTests/EndToEndTests.cs +++ b/tests/Arcp.IntegrationTests/EndToEndTests.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Arcp.Client; using Arcp.Core.Caps; +using Arcp.Core.Errors; using Arcp.Core.Messages; using Arcp.Core.Transport; using Arcp.Runtime; @@ -79,11 +80,11 @@ public async Task Unknown_agent_version_returns_session_error() Client = new ClientInfo { Name = "test", Version = "1.0" }, }); - // Submitting an unknown version yields a session.error; the SubmitAsync await never - // resolves because no job.accepted is emitted. Wrap in a timeout to confirm rejection. - var submitTask = client.SubmitAsync("code-refactor@9.9.9"); - var completed = await Task.WhenAny(submitTask, Task.Delay(800)); - completed.Should().NotBeSameAs(submitTask); + // Submitting an unknown version is rejected with a session.error; the awaiting SubmitAsync + // MUST surface that as a thrown ArcpException (it used to hang until cancellation). + var act = async () => await client.SubmitAsync("code-refactor@9.9.9").WaitAsync(TimeSpan.FromSeconds(3)); + await act.Should().ThrowAsync() + .Where(e => e.Code == ErrorCode.AgentVersionNotAvailable); } [Fact] diff --git a/tests/Arcp.IntegrationTests/IdempotencyTests.cs b/tests/Arcp.IntegrationTests/IdempotencyTests.cs index f1f45d1..29a2d1d 100644 --- a/tests/Arcp.IntegrationTests/IdempotencyTests.cs +++ b/tests/Arcp.IntegrationTests/IdempotencyTests.cs @@ -74,10 +74,39 @@ public async Task Mismatched_input_with_same_key_raises_session_error() var first = await c.SubmitAsync("echo", new { x = 1 }, idempotencyKey: "key-2"); first.JobId.Value.Should().NotBeNullOrEmpty(); - // Submitting again with the same key but a different payload should never resolve to - // an acceptance — the server emits a session.error with DUPLICATE_KEY. - var submitTask = c.SubmitAsync("echo", new { x = 99 }, idempotencyKey: "key-2"); - var completed = await Task.WhenAny(submitTask, Task.Delay(800)); - completed.Should().NotBeSameAs(submitTask); + // Submitting again with the same key but a different payload is rejected with DUPLICATE_KEY. + // The server's session.error MUST reach the awaiting SubmitAsync (it used to hang forever). + var act = async () => await c.SubmitAsync("echo", new { x = 99 }, idempotencyKey: "key-2") + .WaitAsync(TimeSpan.FromSeconds(3)); + await act.Should().ThrowAsync(); + } + + [Fact] + public async Task Idempotent_replay_does_not_run_the_agent_twice() + { + // Spec §7.2: a duplicate idempotent submit re-acknowledges the existing job but MUST NOT + // invoke the agent a second time (no re-emitted events, no second terminal, no status reset). + var runCount = 0; + var (_, transport) = StartServer(s => s.RegisterAgent("counter", async (ctx, ct) => + { + Interlocked.Increment(ref runCount); + await Task.Delay(50, ct); + return "done"; + })); + await using var c = await ArcpClient.ConnectAsync(transport, new ArcpClientOptions + { + Client = new ClientInfo { Name = "t", Version = "1" }, + }); + + var first = await c.SubmitAsync("counter", new { x = 1 }, idempotencyKey: "dup"); + var firstResult = await first.Result.WaitAsync(TimeSpan.FromSeconds(3)); + firstResult.Success.Should().BeTrue(); + + var second = await c.SubmitAsync("counter", new { x = 1 }, idempotencyKey: "dup"); + second.JobId.Value.Should().Be(first.JobId.Value); + + // Allow time for any (erroneous) replayed run to fire before asserting. + await Task.Delay(200); + runCount.Should().Be(1); } } diff --git a/tests/Arcp.IntegrationTests/JobContextEventsTests.cs b/tests/Arcp.IntegrationTests/JobContextEventsTests.cs index a5a87d3..caf12fd 100644 --- a/tests/Arcp.IntegrationTests/JobContextEventsTests.cs +++ b/tests/Arcp.IntegrationTests/JobContextEventsTests.cs @@ -20,6 +20,9 @@ public async Task All_event_kinds_round_trip_through_client() var server = new ArcpServer(new ArcpServerOptions { Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" }, + // This test exercises event round-tripping, not lease enforcement; allow uncovered + // tool.call / agent.delegate operations (spec §9.3 deny-by-default is covered elsewhere). + PermissiveUnleasedOperations = true, }); server.RegisterAgent("emitter", async (ctx, ct) => { diff --git a/tests/Arcp.IntegrationTests/JobListingTests.cs b/tests/Arcp.IntegrationTests/JobListingTests.cs index 1ecd958..480537a 100644 --- a/tests/Arcp.IntegrationTests/JobListingTests.cs +++ b/tests/Arcp.IntegrationTests/JobListingTests.cs @@ -4,6 +4,7 @@ using System.Threading.Tasks; using Arcp.Client; using Arcp.Core.Caps; +using Arcp.Core.Errors; using Arcp.Core.Messages; using Arcp.Core.Transport; using Arcp.Runtime; @@ -100,4 +101,52 @@ public async Task ListJobsAsync_paginates_via_cursor() secondPage.Jobs.Should().HaveCount(1); secondPage.NextCursor.Should().BeNull(); } + + [Fact] + public async Task ListJobsAsync_throws_when_server_returns_session_error() + { + // A list_jobs request rejected by the server (here: feature not negotiated → INVALID_REQUEST) + // emits a session.error. The awaiting ListJobsAsync MUST throw, not hang until cancellation. + var (_, transport) = StartServer(s => + s.RegisterAgent("noop", (ctx, ct) => Task.FromResult(null))); + await using var c = await ArcpClient.ConnectAsync(transport, new ArcpClientOptions + { + Client = new ClientInfo { Name = "t", Version = "1" }, + // list_jobs feature intentionally NOT negotiated → server rejects with INVALID_REQUEST. + Features = Array.Empty(), + }); + + var act = async () => await c.ListJobsAsync().WaitAsync(TimeSpan.FromSeconds(3)); + await act.Should().ThrowAsync() + .Where(e => e.Code == ErrorCode.InvalidRequest); + } + + [Fact] + public async Task ListJobsAsync_reports_last_event_seq_for_jobs_that_emitted_events() + { + // Spec §6.6: each listed job carries last_event_seq so a dashboard knows where to subscribe + // from. A running job that has emitted events MUST report a non-null, monotonic value. + var gate = new TaskCompletionSource(); + var (_, transport) = StartServer(s => s.RegisterAgent("emitter", async (ctx, ct) => + { + await ctx.StatusAsync("phase-1", "working", ct); + await ctx.ProgressAsync(1, 10, "items", null, ct); + gate.SetResult(); + await Task.Delay(System.Threading.Timeout.InfiniteTimeSpan, ct); + return null; + })); + await using var c = await ArcpClient.ConnectAsync(transport, new ArcpClientOptions + { + Client = new ClientInfo { Name = "t", Version = "1" }, + Features = new[] { FeatureFlags.ListJobs }, + }); + + await c.SubmitAsync("emitter"); + await gate.Task.WaitAsync(TimeSpan.FromSeconds(3)); + + var page = await c.ListJobsAsync(); + var entry = page.Jobs.Should().ContainSingle().Subject; + entry.LastEventSeq.Should().NotBeNull(); + entry.LastEventSeq.Should().BeGreaterThan(0); + } } diff --git a/tests/Arcp.IntegrationTests/SubscriptionOrderingTests.cs b/tests/Arcp.IntegrationTests/SubscriptionOrderingTests.cs new file mode 100644 index 0000000..cde40ee --- /dev/null +++ b/tests/Arcp.IntegrationTests/SubscriptionOrderingTests.cs @@ -0,0 +1,125 @@ +// SPDX-License-Identifier: Apache-2.0 +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Arcp.Client; +using Arcp.Core.Caps; +using Arcp.Core.Messages; +using Arcp.Core.Transport; +using Arcp.Runtime; +using FluentAssertions; +using Xunit; + +namespace Arcp.IntegrationTests; + +public class SubscriptionOrderingTests +{ + // ── #39: under concurrent emitters in one session, delivered event_seq is strictly increasing ── + [Fact] + public async Task Concurrent_emitters_produce_strictly_monotonic_event_seq() + { + var server = new ArcpServer(new ArcpServerOptions + { + Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" }, + }); + const int writers = 4; + const int perWriter = 25; + server.RegisterAgent("fanout", async (ctx, ct) => + { + var tasks = new List(); + for (var k = 0; k < writers; k++) + { + var id = k; + tasks.Add(Task.Run(async () => + { + for (var i = 0; i < perWriter; i++) + await ctx.LogAsync("info", $"{id}-{i}", ct); + }, ct)); + } + await Task.WhenAll(tasks); + return "done"; + }); + var (clientT, srv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(srv)); + + await using var c = await ArcpClient.ConnectAsync(clientT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "t", Version = "1" }, + }); + var handle = await c.SubmitAsync("fanout"); + + var seqs = new List(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await foreach (var ev in handle.Events(cts.Token)) + { + if (ev.Kind == "log") seqs.Add(ev.EventSeq); + if (seqs.Count >= writers * perWriter) break; + } + + seqs.Should().HaveCount(writers * perWriter); + for (var i = 1; i < seqs.Count; i++) + seqs[i].Should().BeGreaterThan(seqs[i - 1], "event_seq must be strictly increasing with no reordering (spec §8.3)"); + } + + // ── #44: a subscriber attaching during concurrent emission sees each event exactly once, ordered ── + [Fact] + public async Task Subscriber_attaching_mid_stream_sees_every_event_exactly_once() + { + var server = new ArcpServer(new ArcpServerOptions + { + Runtime = new RuntimeInfo { Name = "test-runtime", Version = "1.0.0" }, + }); + + const int total = 40; + var subscribeNow = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + server.RegisterAgent("emitter", async (ctx, ct) => + { + for (var i = 1; i <= total; i++) + { + await ctx.LogAsync("info", i.ToString(), ct); + if (i == 5) subscribeNow.TrySetResult(); // let a subscriber attach mid-stream + await Task.Delay(3, ct); + } + return "done"; + }); + + var (ownerT, ownerSrv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(ownerSrv)); + await using var owner = await ArcpClient.ConnectAsync(ownerT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "owner", Version = "1" }, + }); + var handle = await owner.SubmitAsync("emitter"); + + await subscribeNow.Task.WaitAsync(TimeSpan.FromSeconds(3)); + + var (watchT, watchSrv) = MemoryTransport.Pair(); + _ = Task.Run(() => server.AcceptAsync(watchSrv)); + await using var watcher = await ArcpClient.ConnectAsync(watchT, new ArcpClientOptions + { + Client = new ClientInfo { Name = "watcher", Version = "1" }, + Features = new[] { FeatureFlags.Subscribe }, + }); + + var sub = await watcher.SubscribeAsync(handle.JobId, history: true); + + var received = new List(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + await foreach (var ev in sub.Events(cts.Token)) + { + if (ev.Kind == "log") + { + received.Add(int.Parse(ev.BodyAs()!.Message)); + if (received.Count >= total) break; + } + } + + // Exactly once, in order, with no gap or duplicate at the subscribe boundary (spec §7.6). + received.Should().HaveCount(total); + received.Distinct().Should().HaveCount(total, "no event may be duplicated at the subscribe boundary"); + received.Should().BeInAscendingOrder("events must arrive in order"); + received.Should().Equal(Enumerable.Range(1, total), "no event may be lost at the subscribe boundary"); + } +} diff --git a/tests/Arcp.UnitTests/AuditFixesUnitTests.cs b/tests/Arcp.UnitTests/AuditFixesUnitTests.cs new file mode 100644 index 0000000..9d657a1 --- /dev/null +++ b/tests/Arcp.UnitTests/AuditFixesUnitTests.cs @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: Apache-2.0 +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Arcp.Core.Errors; +using Arcp.Core.Leases; +using Arcp.Runtime.Agents; +using Arcp.Runtime.Budget; +using Arcp.Runtime.Leases; +using FluentAssertions; +using Xunit; + +namespace Arcp.UnitTests; + +public class AuditFixesUnitTests +{ + [Fact] + public void AuthorizeOperation_fails_with_BUDGET_EXHAUSTED_once_a_counter_hits_zero() + { + // Spec §9.6: budget counters are a pre-operation gate. Once any counter is ≤ 0, the next + // lease-authorized operation MUST fail with BUDGET_EXHAUSTED via the enforcement path — + // not only reactively from a cost metric. + var lease = new Lease(new Dictionary> + { + [LeaseNamespaces.CostBudget] = new[] { "USD:1.00" }, + [LeaseNamespaces.ToolCall] = new[] { "*" }, + }); + var manager = new LeaseManager(); + + var fresh = new BudgetLedger(); + fresh.Initialize(lease); + var ok = () => manager.AuthorizeOperation(lease, null, LeaseNamespaces.ToolCall, "search.web", fresh); + ok.Should().NotThrow("budget is not yet exhausted"); + + var exhausted = new BudgetLedger(); + exhausted.Initialize(lease); + exhausted.ApplyMetric("cost.search", 1.00, "USD"); // remaining → 0 + var act = () => manager.AuthorizeOperation(lease, null, LeaseNamespaces.ToolCall, "search.web", exhausted); + act.Should().Throw(); + } + + [Fact] + public async Task AgentRegistry_RegisterVersion_and_ToInventory_are_concurrency_safe() + { + // Spec §7.5 hygiene: ToInventory must obtain a consistent per-agent snapshot without + // enumerating mutable state that a concurrent RegisterVersion is writing. + var registry = new AgentRegistry(); + registry.Register("triage", new DelegateAgent((_, _) => Task.FromResult(null))); + + using var cts = new CancellationTokenSource(); + var writer = Task.Run(() => + { + var n = 0; + while (!cts.Token.IsCancellationRequested) + { + registry.RegisterVersion("triage", $"1.0.{n++}", new DelegateAgent((_, _) => Task.FromResult(null))); + } + }); + + var reader = Task.Run(() => + { + for (var i = 0; i < 5000; i++) + { + var inventory = registry.ToInventory(); + inventory.Should().NotBeNull(); + } + }); + + await reader; // must complete without InvalidOperationException from concurrent mutation + cts.Cancel(); + await writer; + } +} diff --git a/tests/Arcp.UnitTests/LeaseTests.cs b/tests/Arcp.UnitTests/LeaseTests.cs index b49565e..0d84306 100644 --- a/tests/Arcp.UnitTests/LeaseTests.cs +++ b/tests/Arcp.UnitTests/LeaseTests.cs @@ -96,4 +96,41 @@ public void GlobMatch_is_permissive_for_double_star_and_strict_otherwise() ok.Should().NotThrow(); bad.Should().Throw(); } + + [Fact] + public void GlobMatch_double_star_respects_the_path_boundary() + { + // Spec §9.2/§9.3: a "/prefix/**" grant must not authorize sibling paths that merely + // share the string prefix. The trailing separator is the enforcement boundary. + Arcp.Runtime.Leases.LeaseManager.GlobMatch("/workspace/myapp/src/a.cs", "/workspace/myapp/**") + .Should().BeTrue(); + Arcp.Runtime.Leases.LeaseManager.GlobMatch("/workspace/myapp/a", "/workspace/myapp/**") + .Should().BeTrue(); + // The directory itself is covered. + Arcp.Runtime.Leases.LeaseManager.GlobMatch("/workspace/myapp", "/workspace/myapp/**") + .Should().BeTrue(); + // Siblings sharing the textual prefix MUST NOT match. + Arcp.Runtime.Leases.LeaseManager.GlobMatch("/workspace/myapp-private/secret", "/workspace/myapp/**") + .Should().BeFalse(); + Arcp.Runtime.Leases.LeaseManager.GlobMatch("/workspace/myapp.bak", "/workspace/myapp/**") + .Should().BeFalse(); + } + + [Fact] + public void AuthorizeOperation_double_star_does_not_leak_to_sibling_directories() + { + // End-to-end gate: fs.read:["/workspace/myapp/**"] authorizes files under the directory + // but rejects a sibling like /workspace/myapp-private (spec §9.3). + var manager = new Arcp.Runtime.Leases.LeaseManager(); + var lease = new Lease(new Dictionary> + { + [LeaseNamespaces.FsRead] = new[] { "/workspace/myapp/**" }, + }); + + var ok = () => manager.AuthorizeOperation(lease, null, LeaseNamespaces.FsRead, "/workspace/myapp/src/a.cs"); + var sibling = () => manager.AuthorizeOperation(lease, null, LeaseNamespaces.FsRead, "/workspace/myapp-private/secret"); + + ok.Should().NotThrow(); + sibling.Should().Throw(); + } }