diff --git a/recipes/_common/RecipeHarness.fs b/recipes/_common/RecipeHarness.fs index cc1ad66..97c1688 100644 --- a/recipes/_common/RecipeHarness.fs +++ b/recipes/_common/RecipeHarness.fs @@ -44,7 +44,7 @@ let connectWithOptions Features = features BearerVerifier = DevModeBearerVerifier() :> IBearerVerifier } |> configureOptions - let server = ArcpServer(options) + let server = new ArcpServer(options) configureServer server let clientTransport, serverTransport = MemoryTransport.CreatePair() let serverTask = server.HandleSessionAsync(serverTransport, cts.Token) diff --git a/samples/AspNetCore/Program.fs b/samples/AspNetCore/Program.fs index b70b5c3..a2401c5 100644 --- a/samples/AspNetCore/Program.fs +++ b/samples/AspNetCore/Program.fs @@ -15,7 +15,7 @@ let main argv = let builder = WebApplication.CreateBuilder(argv) let server = - ArcpServer( + new ArcpServer( { ArcpServerOptions.defaults with Features = Features.All } diff --git a/samples/CustomAuth/Program.fs b/samples/CustomAuth/Program.fs index c6f6636..2e9524a 100644 --- a/samples/CustomAuth/Program.fs +++ b/samples/CustomAuth/Program.fs @@ -29,7 +29,7 @@ let main _argv = let cts = new System.Threading.CancellationTokenSource() let server = - ArcpServer( + new ArcpServer( { ArcpServerOptions.defaults with BearerVerifier = FixedBearerVerifier "secret" :> IBearerVerifier Features = Features.All diff --git a/samples/Giraffe/Program.fs b/samples/Giraffe/Program.fs index 62adb71..61f8813 100644 --- a/samples/Giraffe/Program.fs +++ b/samples/Giraffe/Program.fs @@ -15,7 +15,7 @@ let main argv = let builder = WebApplication.CreateBuilder(argv) let server = - ArcpServer( + new ArcpServer( { ArcpServerOptions.defaults with Features = Features.All } diff --git a/samples/LiteLLM/Program.fs b/samples/LiteLLM/Program.fs index f652344..312bc4b 100644 --- a/samples/LiteLLM/Program.fs +++ b/samples/LiteLLM/Program.fs @@ -89,7 +89,7 @@ type LiteLLMProvisioner(baseUrl: Uri, adminKey: string, http: HttpClient) = task { let body = {| key = credentialId |} :> obj let! _ = postJsonAsync "/key/delete" body ct - return true + return RevocationOutcome.Revoked } [] diff --git a/samples/ProvisionedCredentials/Program.fs b/samples/ProvisionedCredentials/Program.fs index 33d9fcd..6a3bc61 100644 --- a/samples/ProvisionedCredentials/Program.fs +++ b/samples/ProvisionedCredentials/Program.fs @@ -31,7 +31,7 @@ type StaticProvisioner(revoked: HashSet) = member _.RevokeAsync(credentialId, _ct) = lock revoked (fun () -> revoked.Add credentialId |> ignore) - Task.FromResult true + Task.FromResult RevocationOutcome.Revoked [] let main _argv = diff --git a/samples/Stdio/Program.fs b/samples/Stdio/Program.fs index b713704..3da8272 100644 --- a/samples/Stdio/Program.fs +++ b/samples/Stdio/Program.fs @@ -12,7 +12,7 @@ open ARCP.Runtime [] let main _argv = let server = - ArcpServer( + new ArcpServer( { ArcpServerOptions.defaults with Features = Features.All AllowAnonymousAuth = true diff --git a/samples/_common/SampleHarness.fs b/samples/_common/SampleHarness.fs index 98e2a4f..3270904 100644 --- a/samples/_common/SampleHarness.fs +++ b/samples/_common/SampleHarness.fs @@ -42,7 +42,7 @@ let private makeServerWithOptions } |> configureOptions - let server = ArcpServer(options) + let server = new ArcpServer(options) configure server server diff --git a/src/Arcp.Cli/Program.fs b/src/Arcp.Cli/Program.fs index ae4df42..7e2176b 100644 --- a/src/Arcp.Cli/Program.fs +++ b/src/Arcp.Cli/Program.fs @@ -65,7 +65,7 @@ let private serveStdio (token: string option) : Task = AllowAnonymousAuth = Option.isNone token } - let server = ArcpServer(options) + let server = new ArcpServer(options) server.RegisterAgent("echo", fun ctx -> task { return Json.serializeToElement "echo" }) let transport = StdioTransport.fromConsole () errorLine "serve --stdio: ready" @@ -88,7 +88,11 @@ let private streamEventsAsync (handle: JobHandle) : Task = else writeLine (sprintf "event: %s" (JobEventBody.kind enumerator.Current)) finally - ignore (enumerator.DisposeAsync().AsTask()) + () + + // §38: await disposal so teardown errors surface and complete + // before this function returns. + do! enumerator.DisposeAsync() } :> Task @@ -162,7 +166,12 @@ let main argv = let env = Environment.GetEnvironmentVariable "ARCP_TOKEN" if String.IsNullOrEmpty env then None else Some env) - (serveStdio token).GetAwaiter().GetResult() + // §39: honor the --stdio flag instead of ignoring it. + if sub.Contains ServeArgs.Stdio then + (serveStdio token).GetAwaiter().GetResult() + else + errorLine "serve requires a transport flag; only --stdio is currently supported (pass --stdio)" + 2 | Send sub :: _ -> let url = sub.GetResult SendArgs.Url diff --git a/src/Arcp.Client/ArcpClient.fs b/src/Arcp.Client/ArcpClient.fs index d5f2165..ac4fc7a 100644 --- a/src/Arcp.Client/ArcpClient.fs +++ b/src/Arcp.Client/ArcpClient.fs @@ -18,8 +18,20 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = let handles = ConcurrentDictionary() let mutable sessionCtx: SessionContext option = None let mutable autoAck: AutoAckScheduler option = None + let mutable receiveLoopTask: Task = Task.CompletedTask let receiveLoopCts = new CancellationTokenSource() + /// Attach a faulted-state observer so fire-and-forget sends do not + /// become unobserved task exceptions (#60). + let observeTask (t: Task) : unit = + t.ContinueWith( + (fun (tt: Task) -> + if tt.IsFaulted then + eprintfn "[ARCP] background send failed: %O" tt.Exception), + TaskContinuationOptions.OnlyOnFaulted + ) + |> ignore + let connectedTcs = TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously) @@ -28,60 +40,98 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = | Some s when s.NegotiatedFeatures.Contains flag -> Ok() | _ -> Error(ARCPError.InvalidRequest(sprintf "Feature %s was not negotiated" flag, None)) - let sendEnvelope (env: Envelope) : Task = + let sendEnvelopeCt (env: Envelope) (ct: CancellationToken) : Task = let env = match sessionCtx with | Some s -> Envelope.withSessionId s.SessionId env | None -> env - transport.SendAsync(env, receiveLoopCts.Token) + transport.SendAsync(env, ct) + + let sendEnvelope (env: Envelope) : Task = sendEnvelopeCt env receiveLoopCts.Token let sendMessage (msg: Message) : Task = let env = Codec.toEnvelope msg sendEnvelope env - let dispatchJobEvent (env: Envelope) (payload: JobEventPayload) : unit = - match env.JobId with - | None -> () - | Some jid -> - match handles.TryGetValue jid with - | true, w -> - match payload.Body with - | JobEventBody.ResultChunk(rid, chunkSeq, data, enc, more) -> - let assembler = w.ChunkIndex.GetOrCreate rid - - match assembler.Append(chunkSeq, data, enc, more) with - | Ok _ -> w.Channel.Writer.TryWrite payload.Body |> ignore - | Error err -> - // Out-of-order or undecodable chunk: tear down - // the handle so callers don't sit on a job that - // will never produce a usable result. - handles.TryRemove jid |> ignore - w.Channel.Writer.TryComplete() |> ignore - w.ResultSetter.TrySetResult(Error err) |> ignore - | other -> w.Channel.Writer.TryWrite other |> ignore - | _ -> () - - let dispatchJobResult (env: Envelope) (payload: JobResultPayload) : unit = - match env.JobId with - | None -> () - | Some jid -> - match handles.TryRemove jid with - | true, w -> - w.Channel.Writer.TryComplete() |> ignore - w.ResultSetter.TrySetResult(Ok payload) |> ignore - | _ -> () + /// Await a correlated response while honoring the caller's token; on + /// cancellation drop the pending entry so it does not leak (#98). + let awaitResponse (requestId: string) (waiter: Task) (ct: CancellationToken) : Task = + task { + try + return! waiter.WaitAsync(ct) + with :? OperationCanceledException as ex -> + pending.Remove requestId + return raise ex + } - let dispatchJobError (env: Envelope) (payload: JobErrorPayload) : unit = + // Job-addressed envelopes can arrive before `SubmitAsync`/ + // `SubscribeAsync` register the handle (the receive loop completes + // the request waiter and races ahead). Buffer such envelopes per + // job id and flush them in order once the handle is registered, all + // under one gate so registration and delivery cannot interleave (#95). + let dispatchGate = obj () + let orphans = ConcurrentDictionary>() + + let deliver (jid: string) (w: JobHandleWriter) (msg: Message) : unit = + match msg with + | Message.JobEvent payload -> + match payload.Body with + | JobEventBody.ResultChunk(rid, chunkSeq, data, enc, more) -> + let assembler = w.ChunkIndex.GetOrCreate rid + + match assembler.Append(chunkSeq, data, enc, more) with + | Ok _ -> w.Channel.Writer.TryWrite payload.Body |> ignore + | Error err -> + // Out-of-order or undecodable chunk: tear down the + // handle so callers don't sit on a job that will never + // produce a usable result. + handles.TryRemove jid |> ignore + w.Channel.Writer.TryComplete() |> ignore + w.ResultSetter.TrySetResult(Error err) |> ignore + | other -> w.Channel.Writer.TryWrite other |> ignore + | Message.JobResult payload -> + handles.TryRemove jid |> ignore + w.Channel.Writer.TryComplete() |> ignore + w.ResultSetter.TrySetResult(Ok payload) |> ignore + | Message.JobError payload -> + handles.TryRemove jid |> ignore + + let err = + JobErrorMapper.ofWireWith payload.Code payload.Message payload.Details payload.Retryable (Some jid) + + w.Channel.Writer.TryComplete() |> ignore + w.ResultSetter.TrySetResult(Error err) |> ignore + | _ -> () + + let dispatchJob (env: Envelope) (msg: Message) : unit = match env.JobId with | None -> () | Some jid -> - match handles.TryRemove jid with - | true, w -> - let err = JobErrorMapper.ofWire payload.Code payload.Message payload.Details jid - w.Channel.Writer.TryComplete() |> ignore - w.ResultSetter.TrySetResult(Error err) |> ignore - | _ -> () + lock dispatchGate (fun () -> + match handles.TryGetValue jid with + | true, w -> deliver jid w msg + | _ -> + // Buffer until the handle appears. + let q = orphans.GetOrAdd(jid, (fun _ -> ResizeArray())) + q.Add env) + + /// Register a job handle and flush any envelopes that arrived before + /// it was known, preserving order (#95). + let registerHandle (jid: string) (w: JobHandleWriter) : unit = + lock dispatchGate (fun () -> + handles.[jid] <- w + + match orphans.TryRemove jid with + | true, q -> + for env in q do + match Codec.toMessage env with + | Ok m -> + match handles.TryGetValue jid with + | true, w2 -> deliver jid w2 m + | _ -> () + | _ -> () + | _ -> ()) let onPing (payload: SessionPingPayload) : Task = let pong: SessionPongPayload = @@ -98,7 +148,7 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = match sched.OnEvent seq with | Some toAck -> let ack: SessionAckPayload = { LastProcessedSeq = toAck } - ignore (sendMessage (Message.SessionAck ack)) + observeTask (sendMessage (Message.SessionAck ack)) | None -> () | _ -> () @@ -128,15 +178,31 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = match msg with | Message.SessionPing p -> do! onPing p - | Message.JobEvent p -> dispatchJobEvent env p - | Message.JobResult p -> dispatchJobResult env p - | Message.JobError p -> dispatchJobError env p + | Message.JobEvent _ + | Message.JobResult _ + | Message.JobError _ -> dispatchJob env msg | _ -> () with | :? OperationCanceledException -> () | ex -> pending.FailAll ex finally - ignore (enumerator.DisposeAsync().AsTask()) + // §97: on any loop exit (clean EOF or cancellation) fault + // every in-flight request waiter and complete every open + // job handle so callers never hang forever. + let closed = ARCPError.InternalError "ARCP transport closed" + pending.FailAll(ArcpException closed) + + lock dispatchGate (fun () -> + for kv in handles do + kv.Value.Channel.Writer.TryComplete() |> ignore + kv.Value.ResultSetter.TrySetResult(Error closed) |> ignore + + handles.Clear() + orphans.Clear()) + + // §62: await enumerator disposal so transport teardown errors + // surface rather than being swallowed on a background thread. + do! enumerator.DisposeAsync() } :> Task @@ -149,7 +215,6 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = Encodings = [ "json" ] Features = options.Features } - Resume = None } let acceptWelcome (welcomeEnv: Envelope) (w: SessionWelcomePayload) : SessionContext = @@ -180,11 +245,14 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = /// receive loop, then resolves with the negotiated session context. member this.ConnectAsync(ct: CancellationToken) : Task = task { - ignore (runReceiveLoop ()) + // §61: retain the receive-loop task so its completion (and any + // fault) is observable via `Completion`. + receiveLoopTask <- runReceiveLoop () + observeTask receiveLoopTask let env = Codec.toEnvelope (Message.SessionHello(buildHello ())) let waiter = pending.Register env.Id do! transport.SendAsync(env, ct) - let! welcomeEnv = waiter + let! welcomeEnv = awaitResponse env.Id waiter ct match Codec.toMessage welcomeEnv with | Ok(Message.SessionWelcome w) -> return acceptWelcome welcomeEnv w @@ -217,8 +285,8 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = | Ok() -> let env = Codec.toEnvelope (Message.JobSubmit payload) let waiter = pending.Register env.Id - do! sendEnvelope env - let! acceptedEnv = waiter + do! sendEnvelopeCt env ct + let! acceptedEnv = awaitResponse env.Id waiter ct match Codec.toMessage acceptedEnv with | Ok(Message.JobAccepted accepted) -> @@ -238,11 +306,17 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = let credentials = accepted.Credentials |> Option.defaultValue [] let handle, writer = mkHandle jid credentials cancelDelegate - handles.[accepted.JobId] <- writer + registerHandle accepted.JobId writer return handle | Ok(Message.JobError errPayload) -> + // §71: no job id context here — pass None. let err = - JobErrorMapper.ofWire errPayload.Code errPayload.Message errPayload.Details "" + JobErrorMapper.ofWireWith + errPayload.Code + errPayload.Message + errPayload.Details + errPayload.Retryable + None return raise (ArcpException err) | _ -> return raise (ArcpException(ARCPError.InvalidRequest("Expected job.accepted", None))) @@ -263,15 +337,27 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = let env = Codec.toEnvelope (Message.JobSubscribe payload) let waiter = pending.Register env.Id - do! sendEnvelope env - let! _subscribed = waiter - - let cancelDelegate (_reason, _ct') = - task { return Error(ARCPError.PermissionDenied("Subscribers cannot cancel", None)) } - - let handle, writer = mkHandle jobId [] cancelDelegate - handles.[jobId.Value] <- writer - return handle + do! sendEnvelopeCt env ct + let! subscribedEnv = awaitResponse env.Id waiter ct + + // §7.6 / #96: surface subscription denials instead of + // returning a live-looking handle. + match Codec.toMessage subscribedEnv with + | Ok(Message.JobSubscribed _) -> + let cancelDelegate (_reason, _ct') = + task { return Error(ARCPError.PermissionDenied("Subscribers cannot cancel", None)) } + + let handle, writer = mkHandle jobId [] cancelDelegate + registerHandle jobId.Value writer + return handle + | Ok(Message.SessionError e) -> + return + raise ( + ArcpException( + JobErrorMapper.ofWireWith e.Code e.Message e.Details e.Retryable (Some jobId.Value) + ) + ) + | _ -> return raise (ArcpException(ARCPError.InvalidRequest("Expected job.subscribed", None))) } /// Stop receiving events for a subscribed job. @@ -306,8 +392,8 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = let env = Codec.toEnvelope (Message.SessionListJobs payload) let waiter = pending.Register env.Id - do! sendEnvelope env - let! respEnv = waiter + do! sendEnvelopeCt env ct + let! respEnv = awaitResponse env.Id waiter ct match Codec.toMessage respEnv with | Ok(Message.SessionJobs jobs) -> return jobs @@ -335,11 +421,20 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) = member _.Session = sessionCtx + /// Completes when the receive loop terminates (clean EOF, cancellation, + /// or fault). Lets callers observe that the client stopped pumping (#61). + member _.Completion: Task = receiveLoopTask + + /// Resolves with the negotiated session once `session.welcome` is + /// received; faults if the handshake fails. A separate handle to the + /// connect result for callers that did not await `ConnectAsync` (#70). + member _.Connected: Task = connectedTcs.Task + /// Close the session cleanly with an optional reason. member this.CloseAsync(reason: string option, ct: CancellationToken) : Task = task { try - do! sendMessage (Message.SessionBye { Reason = reason }) + do! sendMessage (Message.SessionClose { Reason = reason }) with _ -> () diff --git a/src/Arcp.Client/Internal/AutoAck.fs b/src/Arcp.Client/Internal/AutoAck.fs index 3eae91e..e9d149b 100644 --- a/src/Arcp.Client/Internal/AutoAck.fs +++ b/src/Arcp.Client/Internal/AutoAck.fs @@ -10,8 +10,11 @@ type AutoAckOptions = { /// Maximum number of events to receive before forcing an `ack`. EveryEvents: int - /// Maximum time between acks. The scheduler flushes if either - /// threshold is reached. + /// Minimum elapsed time before the next event triggers an ack. + /// NOTE: the scheduler is event-driven — it is evaluated only on + /// `OnEvent`, so this interval gates acks on the next event + /// arrival rather than firing on its own timer. `session.ack` is + /// advisory (spec §6.5), so no standalone timer is used. Interval: TimeSpan } @@ -24,12 +27,14 @@ module AutoAckOptions = } /// Tracks `last_processed_seq` and decides when to emit a -/// `session.ack` based on event count and elapsed time. +/// `session.ack`. Evaluation is event-driven: `OnEvent` returns the +/// seq to ack when the event-count threshold is hit, or when the +/// configured interval has elapsed *as observed on the current event*. +/// There is no background timer — if events stop arriving, no ack is +/// produced (acks are advisory per spec §6.5). /// /// The scheduler does NOT send the ack itself — it returns the seq -/// to send so the client can build/send the envelope. Spec §6.5 -/// notes ack is purely advisory; this implementation matches the -/// TS SDK's behaviour (ack every 32 events / 250 ms by default). +/// to send so the client can build/send the envelope. type internal AutoAckScheduler(options: AutoAckOptions, timeProvider: TimeProvider) = let lockObj = obj () let mutable lastSeq: int64 = 0L diff --git a/src/Arcp.Client/Internal/ChunkAssembler.fs b/src/Arcp.Client/Internal/ChunkAssembler.fs index f3ecd77..fb6f835 100644 --- a/src/Arcp.Client/Internal/ChunkAssembler.fs +++ b/src/Arcp.Client/Internal/ChunkAssembler.fs @@ -11,13 +11,26 @@ open ARCP.Core /// One assembler instance per `result_id`. Chunks MUST arrive in /// `chunk_seq` order per spec §8.4; out-of-order arrivals raise /// `InvalidRequest` and the caller is expected to terminate the job. -type internal ChunkAssembler() = +/// Default caps guarding against unbounded result streams (DoS). +[] +module internal ChunkLimits = + [] + let DefaultMaxBytes: int64 = 256L * 1024L * 1024L // 256 MiB + + [] + let DefaultMaxChunks: int = 1_000_000 + +type internal ChunkAssembler(maxBytes: int64, maxChunks: int) = let buffer = ResizeArray() let mutable expectedSeq: int64 = 0L + let mutable totalBytes: int64 = 0L let mutable closed = false + new() = ChunkAssembler(ChunkLimits.DefaultMaxBytes, ChunkLimits.DefaultMaxChunks) + /// Append a chunk. Returns `Ok finished` where `finished` is - /// `true` once a `more = false` chunk has arrived. + /// `true` once a `more = false` chunk has arrived. A stream that + /// exceeds the byte/chunk cap is rejected to bound memory (§8.4). member _.Append(chunkSeq: int64, data: string, encoding: ChunkEncoding, more: bool) : Result = if closed then Error(ARCPError.InvalidRequest("Chunk arrived after stream closed", None)) @@ -25,6 +38,8 @@ type internal ChunkAssembler() = Error( ARCPError.InvalidRequest(sprintf "Out-of-order chunk: expected %d, got %d" expectedSeq chunkSeq, None) ) + elif int64 buffer.Count >= int64 maxChunks then + Error(ARCPError.InvalidRequest(sprintf "Result stream exceeded max chunk count (%d)" maxChunks, None)) else let bytesResult = try @@ -42,8 +57,11 @@ type internal ChunkAssembler() = match bytesResult with | Error e -> Error e + | Ok bytes when totalBytes + int64 bytes.Length > maxBytes -> + Error(ARCPError.InvalidRequest(sprintf "Result stream exceeded max byte budget (%d)" maxBytes, None)) | Ok bytes -> buffer.Add bytes + totalBytes <- totalBytes + int64 bytes.Length expectedSeq <- expectedSeq + 1L if not more then diff --git a/src/Arcp.Client/Internal/JobErrorMapper.fs b/src/Arcp.Client/Internal/JobErrorMapper.fs index 4fde7ad..a99939a 100644 --- a/src/Arcp.Client/Internal/JobErrorMapper.fs +++ b/src/Arcp.Client/Internal/JobErrorMapper.fs @@ -1,32 +1,90 @@ namespace ARCP.Client.Internal open System +open System.Text.Json open ARCP.Core /// Map a wire `job.error` / `session.error` code string back to an -/// `ARCPError` DU. The reverse direction is `ARCPError.code`. Out -/// of scope: details payloads beyond `message`. +/// `ARCPError` DU, parsing structured fields out of the `details` +/// payload where the DU carries them. The reverse direction is +/// `ARCPError.code`. [] module internal JobErrorMapper = - let ofWire + let private prop (details: JsonElement option) (name: string) : JsonElement option = + match details with + | Some d when d.ValueKind = JsonValueKind.Object -> + match d.TryGetProperty name with + | true, v when v.ValueKind <> JsonValueKind.Null -> Some v + | _ -> None + | _ -> None + + let private strField details name fallback = + prop details name + |> Option.map (fun v -> v.GetString()) + |> Option.defaultValue fallback + + let private intField details name fallback = + prop details name + |> Option.bind (fun v -> + match v.TryGetInt32() with + | true, n -> Some n + | _ -> None) + |> Option.defaultValue fallback + + let private int64Field details name fallback = + prop details name + |> Option.bind (fun v -> + match v.TryGetInt64() with + | true, n -> Some n + | _ -> None) + |> Option.defaultValue fallback + + let private dateField details name fallback = + prop details name + |> Option.bind (fun v -> + match v.TryGetDateTimeOffset() with + | true, d -> Some d + | _ -> None) + |> Option.defaultValue fallback + + /// Map a wire error. `jobId` is the job context when known (used for + /// `JOB_NOT_FOUND`); `None` for non-job dispatch sites (#71). + /// `retryable` is the wire flag, honored for unknown codes (#90). + let ofWireWith (code: string) (message: string) (details: System.Text.Json.JsonElement option) - (jobId: string) + (retryable: bool) + (jobId: string option) : ARCPError = match code with | "PERMISSION_DENIED" -> ARCPError.PermissionDenied(message, details) | "LEASE_SUBSET_VIOLATION" -> ARCPError.LeaseSubsetViolation(message, details) - | "JOB_NOT_FOUND" -> ARCPError.JobNotFound jobId + | "JOB_NOT_FOUND" -> + match jobId with + | Some j -> ARCPError.JobNotFound j + | None -> ARCPError.InvalidRequest(message, details) | "DUPLICATE_KEY" -> ARCPError.DuplicateKey message | "AGENT_NOT_AVAILABLE" -> ARCPError.AgentNotAvailable message - | "AGENT_VERSION_NOT_AVAILABLE" -> ARCPError.AgentVersionNotAvailable(message, "") + | "AGENT_VERSION_NOT_AVAILABLE" -> ARCPError.AgentVersionNotAvailable(message, strField details "version" "") | "CANCELLED" -> ARCPError.Cancelled(Some message) - | "TIMEOUT" -> ARCPError.Timeout 0 + | "TIMEOUT" -> ARCPError.Timeout(intField details "timeout_sec" 0) | "HEARTBEAT_LOST" -> ARCPError.HeartbeatLost - | "LEASE_EXPIRED" -> ARCPError.LeaseExpired DateTimeOffset.MinValue - | "BUDGET_EXHAUSTED" -> ARCPError.BudgetExhausted "USD" + | "LEASE_EXPIRED" -> ARCPError.LeaseExpired(dateField details "expires_at" DateTimeOffset.MinValue) + | "BUDGET_EXHAUSTED" -> ARCPError.BudgetExhausted(strField details "currency" "USD") | "INVALID_REQUEST" -> ARCPError.InvalidRequest(message, details) | "UNAUTHENTICATED" -> ARCPError.Unauthenticated message - | "RESUME_WINDOW_EXPIRED" -> ARCPError.ResumeWindowExpired(0L, 0) - | _ -> ARCPError.InternalError message + | "RESUME_WINDOW_EXPIRED" -> + ARCPError.ResumeWindowExpired(int64Field details "from_seq" 0L, intField details "window_sec" 0) + | other -> ARCPError.Unknown(other, message, retryable) + + /// Backwards-compatible entry point: jobId as a plain string and a + /// default retryable of false for unknown codes. + let ofWire + (code: string) + (message: string) + (details: System.Text.Json.JsonElement option) + (jobId: string) + : ARCPError = + let jid = if String.IsNullOrEmpty jobId then None else Some jobId + ofWireWith code message details false jid diff --git a/src/Arcp.Client/Internal/Pending.fs b/src/Arcp.Client/Internal/Pending.fs index 69db82c..974037c 100644 --- a/src/Arcp.Client/Internal/Pending.fs +++ b/src/Arcp.Client/Internal/Pending.fs @@ -19,7 +19,8 @@ type internal PendingRegistry() = TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously) if not (pending.TryAdd(requestId, tcs)) then - failwithf "Duplicate pending request id: %s" requestId + // §69: surface as a typed ARCP error rather than a bare exn. + raise (ArcpException(ARCPError.InternalError(sprintf "Duplicate pending request id: %s" requestId))) tcs.Task @@ -32,6 +33,10 @@ type internal PendingRegistry() = true | _ -> false + /// Drop a pending request without completing it (e.g. when the + /// caller's cancellation token fires). + member _.Remove(requestId: string) : unit = pending.TryRemove requestId |> ignore + /// Fail all pending operations (e.g. on transport close). member _.FailAll(error: exn) : unit = for kvp in pending do diff --git a/src/Arcp.Client/Transport/Memory.fs b/src/Arcp.Client/Transport/Memory.fs index 0dcac0c..6ce21ac 100644 --- a/src/Arcp.Client/Transport/Memory.fs +++ b/src/Arcp.Client/Transport/Memory.fs @@ -17,12 +17,6 @@ type MemoryTransport private (outgoing: Channel, incoming: Channel Task member _.Receive(ct) = - let enumerable = - seq { - while not ct.IsCancellationRequested do - yield incoming - } - // Convert the channel reader into IAsyncEnumerable via a helper. let reader = incoming.Reader { new IAsyncEnumerable with diff --git a/src/Arcp.Client/Transport/Stdio.fs b/src/Arcp.Client/Transport/Stdio.fs index 56a8ca4..55dd665 100644 --- a/src/Arcp.Client/Transport/Stdio.fs +++ b/src/Arcp.Client/Transport/Stdio.fs @@ -11,7 +11,12 @@ open ARCP.Client /// Newline-delimited JSON over a pair of streams. Used for stdio /// child processes (spec §4: stdio mandatory for in-process children). type StdioTransport(input: TextReader, output: TextWriter, ownsStreams: bool) = + // §68: `closed` is written by CloseAsync (possibly another thread) + // and read in the receive loop; mark it volatile so the write is + // observed promptly on weak memory models. + [] let mutable closed = false + let writeLock = obj () interface ITransport with diff --git a/src/Arcp.Client/Transport/WebSocket.fs b/src/Arcp.Client/Transport/WebSocket.fs index ca97b1f..f0f3fd8 100644 --- a/src/Arcp.Client/Transport/WebSocket.fs +++ b/src/Arcp.Client/Transport/WebSocket.fs @@ -17,7 +17,7 @@ open ARCP.Client /// /// One text frame per envelope. The receive loop reassembles /// continuation frames into a single message. -type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool) = +type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool, maxMessageBytes: int64) = // The BCL WebSocket allows only one outstanding send at a time, // so we serialise concurrent callers (auto-ack, pong, submit, // cancel, close) through an async-aware semaphore — a plain @@ -59,6 +59,16 @@ type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool) = if result.MessageType = WebSocketMessageType.Close then closedRemotely <- true else + if ms.Length + int64 result.Count > maxMessageBytes then + // §66: reject oversized messages before the + // codec ever sees them. + raise ( + WebSocketException( + WebSocketError.HeaderError, + sprintf "Inbound message exceeded %d bytes" maxMessageBytes + ) + ) + ms.Write(buffer, 0, result.Count) endOfMessage <- result.EndOfMessage @@ -80,6 +90,11 @@ type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool) = | Error _ -> return! receiveOne ct } + /// Defaults to a 256 MiB cap on a single reassembled message to + /// bound memory against a peer streaming continuation frames + /// indefinitely. + new(socket: WebSocket, ownsSocket: bool) = WebSocketClientTransport(socket, ownsSocket, 256L * 1024L * 1024L) + interface ITransport with member _.SendAsync(env, ct) = sendOne env ct diff --git a/src/Arcp.Core/Arcp.Core.fsproj b/src/Arcp.Core/Arcp.Core.fsproj index ddb5df3..230d175 100644 --- a/src/Arcp.Core/Arcp.Core.fsproj +++ b/src/Arcp.Core/Arcp.Core.fsproj @@ -11,13 +11,13 @@ - + diff --git a/src/Arcp.Core/Codec.fs b/src/Arcp.Core/Codec.fs index f363f75..3d3fbc1 100644 --- a/src/Arcp.Core/Codec.fs +++ b/src/Arcp.Core/Codec.fs @@ -23,13 +23,15 @@ module Codec = let payload = match msg with | Message.SessionHello p -> payloadElement p + | Message.SessionResume p -> payloadElement p | Message.SessionWelcome p -> payloadElement p | Message.SessionPing p -> payloadElement p | Message.SessionPong p -> payloadElement p | Message.SessionAck p -> payloadElement p | Message.SessionListJobs p -> payloadElement p | Message.SessionJobs p -> payloadElement p - | Message.SessionBye p -> payloadElement p + | Message.SessionClose p -> payloadElement p + | Message.SessionClosed p -> payloadElement p | Message.SessionError p -> payloadElement p | Message.JobSubmit p -> payloadElement p | Message.JobAccepted p -> payloadElement p @@ -37,6 +39,7 @@ module Codec = | Message.JobResult p -> payloadElement p | Message.JobError p -> payloadElement p | Message.JobCancel p -> payloadElement p + | Message.JobCancelled p -> payloadElement p | Message.JobSubscribe p -> payloadElement p | Message.JobSubscribed p -> payloadElement p | Message.JobUnsubscribe p -> payloadElement p @@ -48,13 +51,15 @@ module Codec = try match env.Type with | "session.hello" -> Ok(Message.SessionHello(decodePayload env)) + | "session.resume" -> Ok(Message.SessionResume(decodePayload env)) | "session.welcome" -> Ok(Message.SessionWelcome(decodePayload env)) | "session.ping" -> Ok(Message.SessionPing(decodePayload env)) | "session.pong" -> Ok(Message.SessionPong(decodePayload env)) | "session.ack" -> Ok(Message.SessionAck(decodePayload env)) | "session.list_jobs" -> Ok(Message.SessionListJobs(decodePayload env)) | "session.jobs" -> Ok(Message.SessionJobs(decodePayload env)) - | "session.bye" -> Ok(Message.SessionBye(decodePayload env)) + | "session.close" -> Ok(Message.SessionClose(decodePayload env)) + | "session.closed" -> Ok(Message.SessionClosed(decodePayload env)) | "session.error" -> Ok(Message.SessionError(decodePayload env)) | "job.submit" -> Ok(Message.JobSubmit(decodePayload env)) | "job.accepted" -> Ok(Message.JobAccepted(decodePayload env)) @@ -62,6 +67,7 @@ module Codec = | "job.result" -> Ok(Message.JobResult(decodePayload env)) | "job.error" -> Ok(Message.JobError(decodePayload env)) | "job.cancel" -> Ok(Message.JobCancel(decodePayload env)) + | "job.cancelled" -> Ok(Message.JobCancelled(decodePayload env)) | "job.subscribe" -> Ok(Message.JobSubscribe(decodePayload env)) | "job.subscribed" -> Ok(Message.JobSubscribed(decodePayload env)) | "job.unsubscribe" -> Ok(Message.JobUnsubscribe(decodePayload env)) @@ -75,6 +81,15 @@ module Codec = /// Parse a JSON string from the wire into an envelope. let readEnvelope (json: string) : Result = try - Ok(Json.deserialize json) + let env = Json.deserialize json + + // The JSON literal `null` deserializes to a null record; reject + // it (and missing type/id) so the session loop never NREs (§5, §12). + if obj.ReferenceEquals(env, null) then + Error(ARCPError.InvalidRequest("Envelope must be a JSON object", None)) + elif String.IsNullOrEmpty env.Type || String.IsNullOrEmpty env.Id then + Error(ARCPError.InvalidRequest("Envelope is missing required type/id", None)) + else + Ok env with :? JsonException as ex -> Error(ARCPError.InvalidRequest(sprintf "Malformed envelope: %s" ex.Message, None)) diff --git a/src/Arcp.Core/Errors.fs b/src/Arcp.Core/Errors.fs index 48587d5..b3a809f 100644 --- a/src/Arcp.Core/Errors.fs +++ b/src/Arcp.Core/Errors.fs @@ -31,6 +31,9 @@ type ARCPError = | InvalidRequest of message: string * details: JsonElement option | Unauthenticated of message: string | InternalError of message: string + /// Forward-compatible arm for wire error codes this SDK version + /// does not model, carrying the wire `retryable` flag verbatim (§12). + | Unknown of code: string * message: string * retryable: bool [] module ARCPError = @@ -52,6 +55,7 @@ module ARCPError = | ARCPError.InvalidRequest _ -> "INVALID_REQUEST" | ARCPError.Unauthenticated _ -> "UNAUTHENTICATED" | ARCPError.InternalError _ -> "INTERNAL_ERROR" + | ARCPError.Unknown(c, _, _) -> c /// Human-readable message; suitable for `error.message` on the wire. let message (e: ARCPError) : string = @@ -73,6 +77,7 @@ module ARCPError = | ARCPError.InvalidRequest(m, _) -> m | ARCPError.Unauthenticated m -> m | ARCPError.InternalError m -> m + | ARCPError.Unknown(_, m, _) -> m /// Spec §12: retryable iff a different attempt could succeed. let retryable (e: ARCPError) : bool = @@ -80,6 +85,7 @@ module ARCPError = | ARCPError.Timeout _ | ARCPError.HeartbeatLost | ARCPError.InternalError _ -> true + | ARCPError.Unknown(_, _, r) -> r | _ -> false let details (e: ARCPError) : JsonElement option = @@ -105,8 +111,11 @@ type ArcpException(error: ARCPError, ?inner: exn) = member _.Code = ARCPError.code error member _.Retryable = ARCPError.retryable error +/// Helpers bridging `Result<_, ARCPError>` and the throwing C#-style +/// surface. Named `ArcpResult` (not `Result`) so it does not shadow +/// FSharp.Core's `Result` for consumers that `open ARCP.Core` (#118). [] -module Result = +module ArcpResult = /// Throw `ArcpException` on `Error`, return the value on `Ok`. /// The seam between internal `Result<_, ARCPError>` and the /// public C#-friendly throwing API. diff --git a/src/Arcp.Core/Json.fs b/src/Arcp.Core/Json.fs index b191f4a..dd2403a 100644 --- a/src/Arcp.Core/Json.fs +++ b/src/Arcp.Core/Json.fs @@ -1,16 +1,337 @@ namespace ARCP.Core +open System open System.Text.Json open System.Text.Json.Serialization open System.Text.Encodings.Web +/// Custom converters that pin the spec wire format (§6.2, §7.3, +/// §8.2, §8.4). FSharp.SystemTextJson's union handling would emit +/// case-name wrappers and PascalCase tags; these converters instead +/// produce the flat, lowercase, snake_case shapes the spec mandates +/// so non-F# peers (TS/Go SDKs, the spec examples) interoperate. +module internal JsonConverters = + + let private logLevelToWire (l: LogLevel) = + match l with + | LogLevel.Debug -> "debug" + | LogLevel.Info -> "info" + | LogLevel.Warn -> "warn" + | LogLevel.Error -> "error" + + let private logLevelOfWire (s: string) = + match s with + | "debug" -> LogLevel.Debug + | "info" -> LogLevel.Info + | "warn" -> LogLevel.Warn + | "error" -> LogLevel.Error + | other -> raise (JsonException(sprintf "Unknown log level: %s" other)) + + let private chunkEncodingToWire (e: ChunkEncoding) = + match e with + | ChunkEncoding.Utf8 -> "utf8" + | ChunkEncoding.Base64 -> "base64" + + let private chunkEncodingOfWire (s: string) = + match s with + | "utf8" -> ChunkEncoding.Utf8 + | "base64" -> ChunkEncoding.Base64 + | other -> raise (JsonException(sprintf "Unknown chunk encoding: %s" other)) + + type JobStatusConverter() = + inherit JsonConverter() + + override _.Read(reader, _, _) = + let s = reader.GetString() + + match JobStatus.tryOfWire s with + | Ok v -> v + | Error e -> raise (JsonException e) + + override _.Write(writer, value, _) = + writer.WriteStringValue(JobStatus.toWire value) + + type LogLevelConverter() = + inherit JsonConverter() + override _.Read(reader, _, _) = logLevelOfWire (reader.GetString()) + + override _.Write(writer, value, _) = + writer.WriteStringValue(logLevelToWire value) + + type ChunkEncodingConverter() = + inherit JsonConverter() + + override _.Read(reader, _, _) = + chunkEncodingOfWire (reader.GetString()) + + override _.Write(writer, value, _) = + writer.WriteStringValue(chunkEncodingToWire value) + + /// `LeaseGrant` is wire-encoded as a bare namespace→patterns map + /// (§9.1, §9.2), not a `{ "capabilities": {...} }` wrapper. + type LeaseGrantConverter() = + inherit JsonConverter() + + override _.Read(reader, _, options) = + let map = JsonSerializer.Deserialize>(&reader, options) + { Capabilities = map } + + override _.Write(writer, value, options) = + JsonSerializer.Serialize(writer, value.Capabilities, options) + + /// Credential constraints use dotted lease-namespace keys + /// (`cost.budget`, `model.use`) per §9.8.1, not snake_case. + type CredentialConstraintsConverter() = + inherit JsonConverter() + + override _.Read(reader, _, options) = + let el = JsonElement.ParseValue(&reader) + + let prop name = + match el.TryGetProperty(name: string) with + | true, v when v.ValueKind <> JsonValueKind.Null -> Some v + | _ -> None + + { + CostBudget = + prop "cost.budget" + |> Option.map (fun v -> JsonSerializer.Deserialize(v.GetRawText(), options)) + ModelUse = + prop "model.use" + |> Option.map (fun v -> JsonSerializer.Deserialize(v.GetRawText(), options)) + ExpiresAt = prop "expires_at" |> Option.map (fun v -> v.GetDateTimeOffset()) + } + + override _.Write(writer, value, options) = + writer.WriteStartObject() + + value.CostBudget + |> Option.iter (fun xs -> + writer.WritePropertyName("cost.budget") + JsonSerializer.Serialize(writer, xs, options)) + + value.ModelUse + |> Option.iter (fun xs -> + writer.WritePropertyName("model.use") + JsonSerializer.Serialize(writer, xs, options)) + + value.ExpiresAt + |> Option.iter (fun t -> + writer.WritePropertyName("expires_at") + JsonSerializer.Serialize(writer, t, options)) + + writer.WriteEndObject() + + /// `capabilities.agents` is a plain JSON array in both the flat + /// (string) and rich (object) shapes (§6.2). + type AgentInventoryConverter() = + inherit JsonConverter() + + override _.Read(reader, _, options) = + let el = JsonElement.ParseValue(&reader) + + if el.ValueKind <> JsonValueKind.Array then + raise (JsonException "capabilities.agents must be an array") + + let items = el.EnumerateArray() |> Seq.toList + + match items with + | [] -> AgentInventory.Flat [] + | first :: _ when first.ValueKind = JsonValueKind.String -> + AgentInventory.Flat(items |> List.map (fun e -> e.GetString())) + | _ -> + AgentInventory.Rich( + items + |> List.map (fun e -> JsonSerializer.Deserialize(e.GetRawText(), options)) + ) + + override _.Write(writer, value, options) = + writer.WriteStartArray() + + match value with + | AgentInventory.Flat names -> names |> List.iter writer.WriteStringValue + | AgentInventory.Rich entries -> + entries |> List.iter (fun e -> JsonSerializer.Serialize(writer, e, options)) + + writer.WriteEndArray() + + let private writeBody (writer: Utf8JsonWriter) (options: JsonSerializerOptions) (body: JobEventBody) = + let writeOptStr name (v: string option) = + v |> Option.iter (fun s -> writer.WriteString((name: string), s)) + + match body with + | JobEventBody.XVendor(_, el) -> el.WriteTo(writer) + | _ -> + writer.WriteStartObject() + + match body with + | JobEventBody.XVendor _ -> () + | JobEventBody.Log(level, message) -> + writer.WriteString("level", logLevelToWire level) + writer.WriteString("message", message) + | JobEventBody.Thought text -> writer.WriteString("text", text) + | JobEventBody.ToolCall(tool, args, callId) -> + writer.WriteString("tool", tool) + writer.WritePropertyName("args") + args.WriteTo(writer) + writer.WriteString("call_id", callId) + | JobEventBody.ToolResult(callId, outcome) -> + writer.WriteString("call_id", callId) + + match outcome with + | ToolOutcome.Result v -> + writer.WritePropertyName("result") + v.WriteTo(writer) + | ToolOutcome.Error(code, message, retryable) -> + writer.WritePropertyName("error") + writer.WriteStartObject() + writer.WriteString("code", code) + writer.WriteString("message", message) + writer.WriteBoolean("retryable", retryable) + writer.WriteEndObject() + | JobEventBody.Status(phase, message) -> + writer.WriteString("phase", phase) + writeOptStr "message" message + | JobEventBody.Metric(name, value, unit, dimensions) -> + writer.WriteString("name", name) + writer.WriteNumber("value", value) + writeOptStr "unit" unit + + dimensions + |> Option.iter (fun d -> + writer.WritePropertyName("dimensions") + JsonSerializer.Serialize(writer, d, options)) + | JobEventBody.ArtifactRef(uri, contentType, byteSize, sha256) -> + writer.WriteString("uri", uri) + writer.WriteString("content_type", contentType) + byteSize |> Option.iter (fun b -> writer.WriteNumber("byte_size", b)) + writeOptStr "sha256" sha256 + | JobEventBody.Delegate b -> + writer.WriteString("child_job_id", b.ChildJobId) + writer.WriteString("agent", b.Agent) + writer.WritePropertyName("lease") + JsonSerializer.Serialize(writer, b.Lease, options) + + b.LeaseConstraints + |> Option.iter (fun lc -> + writer.WritePropertyName("lease_constraints") + JsonSerializer.Serialize(writer, lc, options)) + | JobEventBody.Progress(current, total, units, message) -> + writer.WriteNumber("current", current) + total |> Option.iter (fun t -> writer.WriteNumber("total", t)) + writeOptStr "units" units + writeOptStr "message" message + | JobEventBody.ResultChunk(resultId, chunkSeq, data, encoding, more) -> + writer.WriteString("result_id", resultId) + writer.WriteNumber("chunk_seq", chunkSeq) + writer.WriteString("data", data) + writer.WriteString("encoding", chunkEncodingToWire encoding) + writer.WriteBoolean("more", more) + + writer.WriteEndObject() + + let private readBody (kind: string) (el: JsonElement) (options: JsonSerializerOptions) : JobEventBody = + let req name = el.GetProperty(name: string) + + let opt name = + match el.TryGetProperty(name: string) with + | true, v when v.ValueKind <> JsonValueKind.Null -> Some v + | _ -> None + + let optStr name = + opt name |> Option.map (fun v -> v.GetString()) + + match kind with + | "log" -> JobEventBody.Log(logLevelOfWire ((req "level").GetString()), (req "message").GetString()) + | "thought" -> JobEventBody.Thought((req "text").GetString()) + | "tool_call" -> + JobEventBody.ToolCall((req "tool").GetString(), (req "args").Clone(), (req "call_id").GetString()) + | "tool_result" -> + let callId = (req "call_id").GetString() + + let outcome = + match opt "error" with + | Some errEl -> + ToolOutcome.Error( + (errEl.GetProperty("code")).GetString(), + (errEl.GetProperty("message")).GetString(), + (errEl.GetProperty("retryable")).GetBoolean() + ) + | None -> ToolOutcome.Result((req "result").Clone()) + + JobEventBody.ToolResult(callId, outcome) + | "status" -> JobEventBody.Status((req "phase").GetString(), optStr "message") + | "metric" -> + JobEventBody.Metric( + (req "name").GetString(), + (req "value").GetDecimal(), + optStr "unit", + opt "dimensions" + |> Option.map (fun v -> JsonSerializer.Deserialize>(v.GetRawText(), options)) + ) + | "artifact_ref" -> + JobEventBody.ArtifactRef( + (req "uri").GetString(), + (req "content_type").GetString(), + opt "byte_size" |> Option.map (fun v -> v.GetInt64()), + optStr "sha256" + ) + | "delegate" -> + let b: DelegateBody = + { + ChildJobId = (req "child_job_id").GetString() + Agent = (req "agent").GetString() + Lease = JsonSerializer.Deserialize((req "lease").GetRawText(), options) + LeaseConstraints = + opt "lease_constraints" + |> Option.map (fun v -> JsonSerializer.Deserialize(v.GetRawText(), options)) + } + + JobEventBody.Delegate b + | "progress" -> + JobEventBody.Progress( + (req "current").GetDecimal(), + opt "total" |> Option.map (fun v -> v.GetDecimal()), + optStr "units", + optStr "message" + ) + | "result_chunk" -> + JobEventBody.ResultChunk( + (req "result_id").GetString(), + (req "chunk_seq").GetInt64(), + (req "data").GetString(), + chunkEncodingOfWire ((req "encoding").GetString()), + (req "more").GetBoolean() + ) + | other -> JobEventBody.XVendor(other, el.Clone()) + + /// `job.event` payload converter: emits `{ kind, ts, body }` with + /// `body` as the flat kind-specific shape (§8.1, §8.2). + type JobEventPayloadConverter() = + inherit JsonConverter() + + override _.Read(reader, _, options) = + let el = JsonElement.ParseValue(&reader) + let kind = (el.GetProperty("kind")).GetString() + let ts = (el.GetProperty("ts")).GetDateTimeOffset() + let body = readBody kind (el.GetProperty("body")) options + { Kind = kind; Ts = ts; Body = body } + + override _.Write(writer, value, options) = + writer.WriteStartObject() + writer.WriteString("kind", value.Kind) + writer.WritePropertyName("ts") + JsonSerializer.Serialize(writer, value.Ts, options) + writer.WritePropertyName("body") + writeBody writer options value.Body + writer.WriteEndObject() + /// JSON configuration shared across the SDK. /// /// The wire format (spec §5.1, §6, §7, §8) puts the discriminator as -/// a top-level `type` field next to peer fields. Use -/// `JsonFSharpOptions.InternalTag` keyed on `"type"`, not the default -/// `AdjacentTag` which would wrap as `{ "type": "X", "fields": {...} }` -/// and break the wire shape. +/// a top-level `type` field next to peer fields. Custom converters in +/// `JsonConverters` pin the spec-mandated flat shapes for the unions +/// that appear inside payloads. [] module Json = let private buildOptions () : JsonSerializerOptions = @@ -33,6 +354,16 @@ module Json = opts.DefaultIgnoreCondition <- JsonIgnoreCondition.WhenWritingNull opts.WriteIndented <- false opts.Encoder <- JavaScriptEncoder.UnsafeRelaxedJsonEscaping + + // Wire-format converters take priority over the FSharp.SystemTextJson + // union/record handling; insert them at the front. + opts.Converters.Insert(0, JsonConverters.JobStatusConverter()) + opts.Converters.Insert(1, JsonConverters.LogLevelConverter()) + opts.Converters.Insert(2, JsonConverters.ChunkEncodingConverter()) + opts.Converters.Insert(3, JsonConverters.LeaseGrantConverter()) + opts.Converters.Insert(4, JsonConverters.CredentialConstraintsConverter()) + opts.Converters.Insert(5, JsonConverters.AgentInventoryConverter()) + opts.Converters.Insert(6, JsonConverters.JobEventPayloadConverter()) opts /// Default `JsonSerializerOptions` for ARCP-wire serialisation. diff --git a/src/Arcp.Core/Lease.fs b/src/Arcp.Core/Lease.fs index cd4b315..9010b15 100644 --- a/src/Arcp.Core/Lease.fs +++ b/src/Arcp.Core/Lease.fs @@ -47,20 +47,29 @@ module Capabilities = [] module Glob = + // Compiled regexes are cached per glob so the hot authorisation + // path does not recompile (and re-emit IL) on every match (#44). + let private cache = + System.Collections.Concurrent.ConcurrentDictionary() + /// Compile a glob pattern (`?`, `*`, `**`) into a regex. /// `**` matches any path segment including `/`; `*` matches /// any character except `/`; `?` matches a single non-`/` char. let compile (pattern: string) : Regex = - let rec translate (chars: char list) (acc: string list) : string list = - match chars with - | [] -> List.rev acc - | '*' :: '*' :: rest -> translate rest (".*" :: acc) - | '*' :: rest -> translate rest ("[^/]*" :: acc) - | '?' :: rest -> translate rest ("[^/]" :: acc) - | c :: rest -> translate rest (Regex.Escape(string c) :: acc) - - let body = pattern |> List.ofSeq |> (fun cs -> translate cs []) |> String.concat "" - Regex("^" + body + "$", RegexOptions.Compiled ||| RegexOptions.CultureInvariant) + cache.GetOrAdd( + pattern, + fun p -> + let rec translate (chars: char list) (acc: string list) : string list = + match chars with + | [] -> List.rev acc + | '*' :: '*' :: rest -> translate rest (".*" :: acc) + | '*' :: rest -> translate rest ("[^/]*" :: acc) + | '?' :: rest -> translate rest ("[^/]" :: acc) + | c :: rest -> translate rest (Regex.Escape(string c) :: acc) + + let body = p |> List.ofSeq |> (fun cs -> translate cs []) |> String.concat "" + Regex("^" + body + "$", RegexOptions.Compiled ||| RegexOptions.CultureInvariant) + ) let isMatch (pattern: string) (target: string) : bool = // Amount strings used in `cost.budget` and any non-glob @@ -99,35 +108,33 @@ module Lease = | true, d when d >= 0m -> Ok(currency, d) | _ -> Error(sprintf "Invalid cost.budget amount: %s" amount) + /// Aggregate `currency:decimal` amount strings into a per-currency + /// sum. Duplicate currencies are summed (not last-wins) so the same + /// rule applies at acceptance and at subset validation (#116). + let internal amountsPerCurrency (amounts: string list) : Map = + amounts + |> List.choose (fun a -> + match parseBudgetAmount a with + | Ok kv -> Some kv + | Error _ -> None) + |> List.groupBy fst + |> List.map (fun (c, xs) -> c, xs |> List.sumBy snd) + |> Map.ofList + /// Initial budget counters from a lease. let initialBudgets (lease: LeaseGrant) : Map = match Map.tryFind Capabilities.CostBudget lease.Capabilities with | None -> Map.empty - | Some amounts -> - amounts - |> List.choose (fun a -> - match parseBudgetAmount a with - | Ok kv -> Some kv - | Error _ -> None) - |> List.groupBy fst - |> List.map (fun (c, xs) -> c, xs |> List.sumBy snd) - |> Map.ofList + | Some amounts -> amountsPerCurrency amounts let private violation (msg: string) : ARCPError = ARCPError.LeaseSubsetViolation(msg, None) /// Longest prefix of `pattern` containing no glob metacharacters. let private literalPrefix (pattern: string) : string = - let mutable i = 0 - let mutable stop = false - - while not stop && i < pattern.Length do - match pattern.[i] with - | '*' - | '?' -> stop <- true - | _ -> i <- i + 1 - - pattern.Substring(0, i) + match pattern.IndexOfAny([| '*'; '?' |]) with + | -1 -> pattern + | idx -> pattern.Substring(0, idx) let private isLiteralPattern (pattern: string) : bool = not (pattern.Contains '*') && not (pattern.Contains '?') @@ -181,12 +188,7 @@ module Lease = child.Capabilities |> Map.tryFind Capabilities.CostBudget |> Option.bind (fun amts -> - amts - |> List.choose (fun a -> - match parseBudgetAmount a with - | Ok kv -> Some kv - | Error _ -> None) - |> Map.ofList + amountsPerCurrency amts |> Map.toSeq |> Seq.tryPick (fun (currency, requested) -> let remaining = Map.tryFind currency parentRemaining |> Option.defaultValue 0m @@ -220,15 +222,15 @@ module Lease = (parentExpiresAt: DateTimeOffset option) (childExpiresAt: DateTimeOffset option) : Result = - match subsetNamespaces child parent with - | Some e -> Error e - | None -> - match subsetBudget child parentRemainingBudget with + [ + subsetNamespaces child parent + subsetBudget child parentRemainingBudget + subsetExpiry parentExpiresAt childExpiresAt + ] + |> List.tryPick id + |> function | Some e -> Error e - | None -> - match subsetExpiry parentExpiresAt childExpiresAt with - | Some e -> Error e - | None -> Ok() + | None -> Ok() /// Stateless authorisation check. Order: namespace+glob match, /// then expiry (§9.5), then per-currency budget counter (§9.6). diff --git a/src/Arcp.Core/Messages.fs b/src/Arcp.Core/Messages.fs index 436dd10..a44974f 100644 --- a/src/Arcp.Core/Messages.fs +++ b/src/Arcp.Core/Messages.fs @@ -25,7 +25,17 @@ type SessionHelloPayload = Client: ClientIdentity Auth: AuthPayload Capabilities: HelloCapabilities - Resume: ResumeRequest option + } + +/// `session.resume` payload (spec §6.3). Sent in place of +/// `session.hello` to reattach to an existing session and replay +/// buffered events from `last_event_seq`. + +type SessionResumePayload = + { + SessionId: string + ResumeToken: string + LastEventSeq: int64 } /// `session.welcome` payload (spec §6.2). @@ -100,9 +110,15 @@ type SessionJobsPayload = NextCursor: string option } -/// `session.bye` payload (spec §6.7). +/// `session.close` payload (spec §6.7). Sent by the client to +/// terminate a session gracefully. + +type SessionClosePayload = { Reason: string option } + +/// `session.closed` payload (spec §6.7). Runtime acknowledgement of +/// a `session.close`. -type SessionByePayload = { Reason: string option } +type SessionClosedPayload = { Reason: string option } /// `session.error` payload (spec §12). @@ -175,6 +191,11 @@ type JobErrorPayload = type JobCancelPayload = { JobId: string; Reason: string option } +/// `job.cancelled` payload (spec §7.4). Runtime acknowledgement of a +/// `job.cancel` request; the terminal `job.error(CANCELLED)` follows. + +type JobCancelledPayload = { JobId: string } + /// `job.subscribe` payload (spec §7.6). type JobSubscribePayload = @@ -205,13 +226,15 @@ type JobUnsubscribePayload = { JobId: string } [] type Message = | SessionHello of SessionHelloPayload + | SessionResume of SessionResumePayload | SessionWelcome of SessionWelcomePayload | SessionPing of SessionPingPayload | SessionPong of SessionPongPayload | SessionAck of SessionAckPayload | SessionListJobs of SessionListJobsPayload | SessionJobs of SessionJobsPayload - | SessionBye of SessionByePayload + | SessionClose of SessionClosePayload + | SessionClosed of SessionClosedPayload | SessionError of SessionErrorPayload | JobSubmit of JobSubmitPayload | JobAccepted of JobAcceptedPayload @@ -219,6 +242,7 @@ type Message = | JobResult of JobResultPayload | JobError of JobErrorPayload | JobCancel of JobCancelPayload + | JobCancelled of JobCancelledPayload | JobSubscribe of JobSubscribePayload | JobSubscribed of JobSubscribedPayload | JobUnsubscribe of JobUnsubscribePayload @@ -229,13 +253,15 @@ module Message = let typeOf (m: Message) : string = match m with | Message.SessionHello _ -> "session.hello" + | Message.SessionResume _ -> "session.resume" | Message.SessionWelcome _ -> "session.welcome" | Message.SessionPing _ -> "session.ping" | Message.SessionPong _ -> "session.pong" | Message.SessionAck _ -> "session.ack" | Message.SessionListJobs _ -> "session.list_jobs" | Message.SessionJobs _ -> "session.jobs" - | Message.SessionBye _ -> "session.bye" + | Message.SessionClose _ -> "session.close" + | Message.SessionClosed _ -> "session.closed" | Message.SessionError _ -> "session.error" | Message.JobSubmit _ -> "job.submit" | Message.JobAccepted _ -> "job.accepted" @@ -243,6 +269,7 @@ module Message = | Message.JobResult _ -> "job.result" | Message.JobError _ -> "job.error" | Message.JobCancel _ -> "job.cancel" + | Message.JobCancelled _ -> "job.cancelled" | Message.JobSubscribe _ -> "job.subscribe" | Message.JobSubscribed _ -> "job.subscribed" | Message.JobUnsubscribe _ -> "job.unsubscribe" diff --git a/src/Arcp.Core/Trace.fs b/src/Arcp.Core/Trace.fs index b002d00..76b86e1 100644 --- a/src/Arcp.Core/Trace.fs +++ b/src/Arcp.Core/Trace.fs @@ -1,6 +1,7 @@ namespace ARCP.Core open System +open System.Security.Cryptography /// W3C Trace Context identifiers. Trace ids are 32 hex chars, /// span ids 16 hex chars; the strings are kept opaque on the wire. @@ -19,22 +20,21 @@ type SpanId = member this.Value = let (SpanId v) = this in v override this.ToString() = this.Value -[] -module TraceId = - let private hex (count: int) = +/// Cryptographically random hex string. Trace/span ids cross trust +/// boundaries, so guess-resistance matters (W3C Trace Context). +[] +module private TraceRandom = + let cryptoHex (count: int) = let buf = Array.zeroCreate count - Random.Shared.NextBytes(buf) + RandomNumberGenerator.Fill(Span(buf)) Convert.ToHexString(buf).ToLowerInvariant() - let newId () : TraceId = TraceId(hex 16) +[] +module TraceId = + let newId () : TraceId = TraceId(cryptoHex 16) let ofString (s: string) : TraceId = TraceId s [] module SpanId = - let private hex (count: int) = - let buf = Array.zeroCreate count - Random.Shared.NextBytes(buf) - Convert.ToHexString(buf).ToLowerInvariant() - - let newId () : SpanId = SpanId(hex 8) + let newId () : SpanId = SpanId(cryptoHex 8) let ofString (s: string) : SpanId = SpanId s diff --git a/src/Arcp.Giraffe/GiraffeEndpoint.fs b/src/Arcp.Giraffe/GiraffeEndpoint.fs index 6501817..f34bad6 100644 --- a/src/Arcp.Giraffe/GiraffeEndpoint.fs +++ b/src/Arcp.Giraffe/GiraffeEndpoint.fs @@ -19,10 +19,13 @@ module ArcpGiraffe = fun (next: HttpFunc) (ctx: HttpContext) -> task { if ctx.Request.Path.Value <> path then + // Only fall through to the next handler on a path miss. return! next ctx elif not ctx.WebSockets.IsWebSocketRequest then + // §40: short-circuit so downstream handlers cannot + // overwrite the 400 status. ctx.Response.StatusCode <- StatusCodes.Status400BadRequest - return! next ctx + return Some ctx else let! socket = ctx.WebSockets.AcceptWebSocketAsync() @@ -30,5 +33,7 @@ module ArcpGiraffe = new WebSocketClientTransport(socket, ownsSocket = true) :> ITransport do! server.HandleSessionAsync(transport, ctx.RequestAborted) - return! next ctx + // §40: the response is hijacked by the upgrade; do not + // continue the pipeline. + return Some ctx } diff --git a/src/Arcp.Otel/ArcpActivitySource.fs b/src/Arcp.Otel/ArcpActivitySource.fs index 66b9c81..11e0ed1 100644 --- a/src/Arcp.Otel/ArcpActivitySource.fs +++ b/src/Arcp.Otel/ArcpActivitySource.fs @@ -1,13 +1,17 @@ namespace ARCP.Otel open System.Diagnostics +open ARCP.Core /// Shared `ActivitySource` for ARCP-emitted spans (spec §11). module ArcpActivitySource = [] let Name = "ARCP" - let Instance = new ActivitySource(Name, "1.0.0") + /// Lazily-created shared source (#42): not allocated at module load + /// and versioned from `Version.Sdk` so the version has one source of + /// truth. Access the source via `Instance.Value`. + let Instance = lazy (new ActivitySource(Name, Version.Sdk)) /// Canonical attribute keys for ARCP spans (spec §11). module ArcpSpanAttributes = diff --git a/src/Arcp.Otel/ArcpOtel.fs b/src/Arcp.Otel/ArcpOtel.fs index a08f42c..501e2b2 100644 --- a/src/Arcp.Otel/ArcpOtel.fs +++ b/src/Arcp.Otel/ArcpOtel.fs @@ -18,7 +18,7 @@ module ArcpOtel = (constraints: LeaseConstraints option) : Activity option = let activity = - ArcpActivitySource.Instance.StartActivity("arcp.job", ActivityKind.Internal) + ArcpActivitySource.Instance.Value.StartActivity("arcp.job", ActivityKind.Internal) match activity with | null -> None diff --git a/src/Arcp.Runtime/ArcpServer.fs b/src/Arcp.Runtime/ArcpServer.fs index 33ff95e..3ecd507 100644 --- a/src/Arcp.Runtime/ArcpServer.fs +++ b/src/Arcp.Runtime/ArcpServer.fs @@ -65,6 +65,15 @@ type ArcpServer(options: ArcpServerOptions) = "Provisioned credentials require an explicit CredentialStore for revocation reliability." | _ -> () + // §14: credentials MUST only be issued over authenticated + // transports. Allowing anonymous sessions alongside a + // provisioner would leak minted credentials to unauthenticated + // peers, so the combination is rejected at startup. + if options.AllowAnonymousAuth && options.Provisioner.IsSome then + invalidArg + "options" + "AllowAnonymousAuth cannot be combined with a credential Provisioner (§14): credentials must only be issued over authenticated sessions." + let inventory = AgentInventoryStore() let provisioner = @@ -94,8 +103,22 @@ type ArcpServer(options: ArcpServerOptions) = ) let sessions = ConcurrentDictionary() + + // Sessions whose transport dropped but whose buffered events are + // still within the resume window (spec §6.3). A `session.resume` + // reattaches one of these; pruning removes them with the window. + let resumable = ConcurrentDictionary() let agentHandlers = ConcurrentDictionary() + // Highest acked seq for a live session; a gone session can no + // longer ack, so its buffered events age out by the window alone. + let lastAckedFor (sid: string) : int64 = + match sessions.TryGetValue sid with + | true, ctx -> ctx.LastAckedSeq + | _ -> Int64.MaxValue + + let isSessionActive (sid: string) : bool = sessions.ContainsKey sid + // `JobManager` and the real outbox are mutually dependent: the // outbox needs `jobs` (for Subscribers / Terminate) and `jobs` // needs the outbox. We break the cycle with a ref cell that @@ -109,9 +132,41 @@ type ArcpServer(options: ArcpServerOptions) = member _.EmitJobEventAsync(rec0, body) = (!outbox).EmitJobEventAsync(rec0, body) member _.EmitJobResultAsync(rec0, p) = (!outbox).EmitJobResultAsync(rec0, p) member _.EmitJobErrorAsync(rec0, p) = (!outbox).EmitJobErrorAsync(rec0, p) + + member _.EmitCredentialRotatedAsync(rec0, cid, v) = + (!outbox).EmitCredentialRotatedAsync(rec0, cid, v) } ) + // Periodic pruning (spec §6.3 resume window): evict aged/acked + // buffered events, release the buffers of gone sessions, and evict + // terminal job records past the retention window. + let pruneIntervalSec = max 1 (options.ResumeWindowSec / 4) + + let prune () = + try + eventLog.EvictExpired(lastAckedFor) |> ignore + eventLog.PruneEmpty isSessionActive + + let cutoff = + options.TimeProvider.GetUtcNow().AddSeconds(-float options.ResumeWindowSec) + + jobs.EvictTerminated cutoff |> ignore + + for kv in resumable do + if kv.Value.LastInboundAt < cutoff then + resumable.TryRemove kv.Key |> ignore + with _ -> + () + + let pruneTimer = + options.TimeProvider.CreateTimer( + TimerCallback(fun _ -> prune ()), + null, + TimeSpan.FromSeconds(float pruneIntervalSec), + TimeSpan.FromSeconds(float pruneIntervalSec) + ) + let registerHandler (name: string) (version: string) (h: ArcpAgentHandler) = agentHandlers.[name + "@" + version] <- h // The inventory stores an `AgentHandler` purely as a presence @@ -147,34 +202,83 @@ type ArcpServer(options: ArcpServerOptions) = { new IJobOutbox with member _.EmitJobEventAsync(record, body) = task { - do! EnvelopeOut.pushJobEvent sessions options.TimeProvider record.SessionId record.JobId body - - for sid in jobs.Subscriptions.Subscribers record.JobId do - do! EnvelopeOut.pushJobEvent sessions options.TimeProvider sid record.JobId body - - record.LastEventSeq <- record.LastEventSeq + 1L + // §7.3/§9.5: no events after a terminal message. + if not record.TerminalEmitted then + let! ownerSeq = + EnvelopeOut.pushJobEventSeq sessions options.TimeProvider record.SessionId record.JobId body + + for sid in jobs.Subscriptions.Subscribers record.JobId do + do! EnvelopeOut.pushJobEvent sessions options.TimeProvider sid record.JobId body + + // §111: report the owning session's event_seq (not a + // per-job counter); atomic write avoids lost updates. + match ownerSeq with + | Some s -> System.Threading.Interlocked.Exchange(&record.LastEventSeq, s) |> ignore + | None -> () } :> Task member _.EmitJobResultAsync(record, payload) = task { - do! EnvelopeOut.pushJobResult sessions record.SessionId record.JobId payload + // Exactly one terminal message wins (§7.3, §9.5). + if jobs.TryClaimTerminal record then + do! EnvelopeOut.pushJobResult sessions record.SessionId record.JobId payload - for sid in jobs.Subscriptions.Subscribers record.JobId do - do! EnvelopeOut.pushJobResult sessions sid record.JobId payload + for sid in jobs.Subscriptions.Subscribers record.JobId do + do! EnvelopeOut.pushJobResult sessions sid record.JobId payload - jobs.Terminate(record.JobId, payload.FinalStatus) + jobs.Terminate(record.JobId, payload.FinalStatus) } :> Task member _.EmitJobErrorAsync(record, payload) = task { - do! EnvelopeOut.pushJobError sessions record.SessionId record.JobId payload + if jobs.TryClaimTerminal record then + do! EnvelopeOut.pushJobError sessions record.SessionId record.JobId payload + + for sid in jobs.Subscriptions.Subscribers record.JobId do + do! EnvelopeOut.pushJobError sessions sid record.JobId payload + + jobs.Terminate(record.JobId, payload.FinalStatus) + } + :> Task + + member _.EmitCredentialRotatedAsync(record, credentialId, newValue) = + task { + // §14/§9.8.2: the submitting session gets the new + // value; subscribers get a redacted body (id only). + let ownerBody = + JobEventBody.Status( + StatusPhases.CredentialRotated, + Some( + Json.serialize + {| + id = credentialId + value = newValue + |} + ) + ) + + let redactedBody = + JobEventBody.Status( + StatusPhases.CredentialRotated, + Some(Json.serialize {| id = credentialId |}) + ) + + let! ownerSeq = + EnvelopeOut.pushJobEventSeq + sessions + options.TimeProvider + record.SessionId + record.JobId + ownerBody for sid in jobs.Subscriptions.Subscribers record.JobId do - do! EnvelopeOut.pushJobError sessions sid record.JobId payload + do! EnvelopeOut.pushJobEvent sessions options.TimeProvider sid record.JobId redactedBody - jobs.Terminate(record.JobId, JobStatus.Error) + match ownerSeq with + | Some s -> System.Threading.Interlocked.Exchange(&record.LastEventSeq, s) |> ignore + | None -> () } :> Task } @@ -188,6 +292,17 @@ type ArcpServer(options: ArcpServerOptions) = : Task = task { match msg, ctxRef.Value with + | Message.SessionHello _, Some ctx -> + // §106: a second hello on an established session would leak + // the prior context; reject it instead of re-handshaking. + do! + EnvelopeOut.respondWithError + ctx + env.Id + (ARCPError.InvalidRequest("Session already established", None)) + ct + + return true | Message.SessionHello hello, _ -> let! ctxOpt = SessionHandshake.handleAsync @@ -211,8 +326,16 @@ type ArcpServer(options: ArcpServerOptions) = sessions.[ctx.SessionId.Value] <- ctx return true | None -> return false + | Message.SessionResume resume, _ -> return! this.HandleSessionResumeAsync transport ctxRef env.Id resume ct | _, None -> return true - | Message.SessionBye _, Some _ -> return false + | Message.SessionClose _, Some ctx -> + let envOut = + Message.SessionClosed { Reason = None } + |> Codec.toEnvelope + |> Envelope.withSessionId ctx.SessionId + + do! transport.SendAsync(envOut, ct) + return false | Message.SessionPing p, Some ctx -> let pong: SessionPongPayload = { @@ -250,14 +373,7 @@ type ArcpServer(options: ArcpServerOptions) = return true | Message.JobCancel c, Some ctx -> - match jobs.TryGet(JobId.ofString c.JobId) with - | Some r when r.SessionId = ctx.SessionId -> - try - r.Cancellation.Cancel() - with _ -> - () - | _ -> () - + do! this.HandleJobCancelAsync env.Id ctx c ct return true | Message.JobSubscribe s, Some ctx -> do! this.HandleJobSubscribeAsync env.Id ctx s ct @@ -289,7 +405,26 @@ type ArcpServer(options: ArcpServerOptions) = let env = enumerator.Current match Codec.toMessage env with - | Error _ -> () + | Error err -> + // §12: malformed payloads / unknown types get a + // correlated INVALID_REQUEST; the session survives. + let payload: SessionErrorPayload = + { + Code = ARCPError.code err + Message = ARCPError.message err + Retryable = ARCPError.retryable err + Details = ARCPError.details err + } + + let envOut = + Message.SessionError payload |> Codec.toEnvelope |> Envelope.withId env.Id + + let envOut = + match ctxRef.Value with + | Some ctx -> Envelope.withSessionId ctx.SessionId envOut + | None -> envOut + + do! transport.SendAsync(envOut, ct) | Ok msg -> let! keepGoing = this.DispatchMessage transport ctxRef env msg ct @@ -304,6 +439,10 @@ type ArcpServer(options: ArcpServerOptions) = | Some ctx -> jobs.Subscriptions.UnsubscribeAll ctx.SessionId sessions.TryRemove ctx.SessionId.Value |> ignore + // Retain as resumable within the window (spec §6.3, §6.7): + // in-flight jobs keep running and the client may reattach. + ctx.LastInboundAt <- options.TimeProvider.GetUtcNow() + resumable.[ctx.SessionId.Value] <- ctx | None -> () } :> Task @@ -323,29 +462,57 @@ type ArcpServer(options: ArcpServerOptions) = (ARCPError.InvalidRequest("list_jobs feature not negotiated", None)) ct else - let filtered = + // §6.6: bare-name agent filter matches any version; + // `name@version` matches that version exactly. + let agentMatches (filterAgent: string) (jobAgent: string) = + jobAgent = filterAgent || jobAgent.StartsWith(filterAgent + "@") + + // Stable ordering by JobId (ULIDs are time-ordered), so + // repeated requests page deterministically (§6.6, §109). + let ordered = jobs.AllForPrincipal ctx.Principal.Id |> Seq.filter (fun r -> match req.Filter with | None -> true | Some f -> (f.Status |> Option.map (List.contains r.Status) |> Option.defaultValue true) - && (f.Agent |> Option.map (fun a -> r.Agent = a) |> Option.defaultValue true) + && (f.Agent + |> Option.map (fun a -> agentMatches a r.Agent) + |> Option.defaultValue true) && (f.CreatedAfter |> Option.map (fun ca -> r.CreatedAt >= ca) |> Option.defaultValue true)) - |> Seq.toList + |> Seq.sortBy (fun r -> r.JobId.Value) + + // Skip past the cursor (the last JobId of the prior page). + let afterCursor = + match req.Cursor with + | Some c -> ordered |> Seq.filter (fun r -> r.JobId.Value > c) + | None -> ordered - let limited = + let limit = match req.Limit with - | Some n when n > 0 -> List.truncate n filtered - | _ -> filtered + | Some n when n > 0 -> n + | _ -> Int32.MaxValue + + // Take limit+1 to detect whether more pages remain without + // materialising the entire visible set (§91). + let takeCount = if limit = Int32.MaxValue then limit else limit + 1 + let page = afterCursor |> Seq.truncate takeCount |> Seq.toList + let hasMore = List.length page > limit + let pageRows = page |> List.truncate limit + + let nextCursor = + if hasMore then + pageRows |> List.tryLast |> Option.map (fun r -> r.JobId.Value) + else + None let resp: SessionJobsPayload = { RequestId = requestId - Jobs = limited |> List.map jobs.ToSummary - NextCursor = None + Jobs = pageRows |> List.map jobs.ToSummary + NextCursor = nextCursor } let env = @@ -383,27 +550,200 @@ type ArcpServer(options: ArcpServerOptions) = (ARCPError.PermissionDenied("Subscribe denied", None)) ct | Some record -> - jobs.Subscriptions.Subscribe(record.JobId, ctx.SessionId) + let wantHistory = sub.History |> Option.defaultValue false + let fromSeq = sub.FromEventSeq |> Option.defaultValue 0L + + // §7.6: gather buffered `job.event`s for replay (from the + // owning session's log) before registering live delivery. + let replayResult = + if wantHistory then + eventLog.Replay(record.SessionId, fromSeq) + |> Result.map (fun entries -> + entries + |> Seq.filter (fun e -> + e.Envelope.JobId = Some record.JobId.Value && e.Envelope.Type = "job.event") + |> Seq.toList) + else + Ok [] + + match replayResult with + | Error _ -> + // Buffer no longer covers from_event_seq. + do! + EnvelopeOut.respondWithError + ctx + requestId + (ARCPError.ResumeWindowExpired(fromSeq, options.ResumeWindowSec)) + ct + | Ok replayEntries -> + jobs.Subscriptions.Subscribe(record.JobId, ctx.SessionId) + + let payload: JobSubscribedPayload = + { + JobId = record.JobId.Value + CurrentStatus = record.Status + Agent = record.Agent + Lease = record.Lease + ParentJobId = record.ParentJobId + TraceId = record.TraceId + // §111: report position in the subscriber's + // own session seq space. + SubscribedFrom = eventLog.CurrentSeq ctx.SessionId + Replayed = not (List.isEmpty replayEntries) + } + + let env = + Message.JobSubscribed payload + |> Codec.toEnvelope + |> Envelope.withId requestId + |> Envelope.withSessionId ctx.SessionId + |> Envelope.withJobId record.JobId + + do! ctx.Transport.SendAsync(env, ct) + + // Replay buffered events into the subscriber's seq space + // before live events flow. + for e in replayEntries do + match Codec.toMessage e.Envelope with + | Ok(Message.JobEvent p) -> + do! + EnvelopeOut.pushJobEvent + sessions + options.TimeProvider + ctx.SessionId + record.JobId + p.Body + | _ -> () + } + :> Task - let payload: JobSubscribedPayload = + /// Handle `job.cancel` (spec §7.4). Acknowledge with `job.cancelled` + /// then trigger cancellation; the terminal `job.error(CANCELLED)` is + /// emitted by the launcher. Unknown ids return `JOB_NOT_FOUND`; + /// requests from a non-owning session return `PERMISSION_DENIED`. + member private _.HandleJobCancelAsync + (requestId: string) + (ctx: ServerSessionContext) + (cancel: JobCancelPayload) + (ct: CancellationToken) + : Task = + task { + match jobs.TryGet(JobId.ofString cancel.JobId) with + | None -> do! EnvelopeOut.respondWithError ctx requestId (ARCPError.JobNotFound cancel.JobId) ct + | Some r when r.SessionId <> ctx.SessionId -> + do! + EnvelopeOut.respondWithError + ctx + requestId + (ARCPError.PermissionDenied("Only the submitting session may cancel this job", None)) + ct + | Some r -> + let ack = + Message.JobCancelled { JobId = cancel.JobId } + |> Codec.toEnvelope + |> Envelope.withId requestId + |> Envelope.withSessionId ctx.SessionId + |> Envelope.withJobId r.JobId + + do! ctx.Transport.SendAsync(ack, ct) + + try + r.Cancellation.Cancel() + with _ -> + () + } + :> Task + + /// Handle `session.resume` (spec §6.3). Validates the presented + /// `(session_id, resume_token)` against a resumable session, replays + /// buffered events with `seq > last_event_seq`, rotates the resume + /// token, and resends a `session.welcome`. Returns `true` (and sets + /// `ctxRef`) on success; `false` after a `RESUME_WINDOW_EXPIRED`. + member private _.HandleSessionResumeAsync + (transport: ITransport) + (ctxRef: ServerSessionContext option ref) + (requestId: string) + (resume: SessionResumePayload) + (ct: CancellationToken) + : Task = + task { + let windowError = + ARCPError.ResumeWindowExpired(resume.LastEventSeq, options.ResumeWindowSec) + + let writeError (err: ARCPError) : Task = + let payload: SessionErrorPayload = + { + Code = ARCPError.code err + Message = ARCPError.message err + Retryable = ARCPError.retryable err + Details = ARCPError.details err + } + + let envOut = + Message.SessionError payload |> Codec.toEnvelope |> Envelope.withId requestId + + transport.SendAsync(envOut, ct) + + match resumable.TryGetValue resume.SessionId with + | true, ctx when ctx.ResumeToken = resume.ResumeToken -> + match eventLog.Replay(ctx.SessionId, resume.LastEventSeq) with + | Error _ -> + do! writeError windowError + return false + | Ok entries -> + let sid = ctx.SessionId + // Re-point the session at the new transport and rotate + // the resume token (it rotates on every welcome, §6.3). + ctx.Transport <- transport + ctx.ResumeToken <- (MessageId.newId ()).Value + ctx.LastInboundAt <- options.TimeProvider.GetUtcNow() + sessions.[sid.Value] <- ctx + resumable.TryRemove sid.Value |> ignore + + let agents = + if ctx.NegotiatedFeatures.Contains Features.AgentVersions then + AgentInventory.Rich(inventory.ToRichInventory()) + else + AgentInventory.Flat(inventory.ToFlatInventory()) + + let welcome: SessionWelcomePayload = { - JobId = record.JobId.Value - CurrentStatus = record.Status - Agent = record.Agent - Lease = record.Lease - ParentJobId = record.ParentJobId - TraceId = record.TraceId - SubscribedFrom = record.LastEventSeq - Replayed = sub.History |> Option.defaultValue false + Runtime = options.Runtime + ResumeToken = ctx.ResumeToken + ResumeWindowSec = ctx.ResumeWindowSec + HeartbeatIntervalSec = ctx.HeartbeatIntervalSec + Capabilities = + { + Encodings = [ "json" ] + Features = ctx.NegotiatedFeatures + Agents = agents + } } - let env = - Message.JobSubscribed payload + let welcomeEnv = + Message.SessionWelcome welcome |> Codec.toEnvelope + |> Envelope.withSessionId sid |> Envelope.withId requestId - |> Envelope.withSessionId ctx.SessionId - |> Envelope.withJobId record.JobId - do! ctx.Transport.SendAsync(env, ct) + do! transport.SendAsync(welcomeEnv, ct) + + // Replay buffered events the client missed. + for entry in entries do + do! transport.SendAsync(entry.Envelope, ct) + + ctxRef.Value <- Some ctx + return true + | _ -> + // Unknown session or token mismatch: the buffer no longer + // covers the request. + do! writeError windowError + return false } - :> Task + + interface IDisposable with + member _.Dispose() = + try + pruneTimer.Dispose() + with _ -> + () diff --git a/src/Arcp.Runtime/Auth/Bearer.fs b/src/Arcp.Runtime/Auth/Bearer.fs index 6f6c398..b2fbaa0 100644 --- a/src/Arcp.Runtime/Auth/Bearer.fs +++ b/src/Arcp.Runtime/Auth/Bearer.fs @@ -1,20 +1,43 @@ namespace ARCP.Runtime.Auth +open System open System.Collections.Generic +open System.Security.Cryptography +open System.Text open System.Threading open System.Threading.Tasks open ARCP.Core /// Bearer verifier that accepts a static token → principal table. /// Suitable for tests and the developer-mode CLI; production -/// deployments should plug their own `IBearerVerifier`. +/// deployments should plug their own `IBearerVerifier`. The table is +/// expected to be small (token comparison is linear and constant-time). type StaticBearerVerifier(tokens: IReadOnlyDictionary) = interface IBearerVerifier with member _.VerifyAsync(token, _ct) = task { - match tokens.TryGetValue token with - | true, principalId -> return Ok(StringPrincipal(principalId) :> IPrincipal) - | _ -> return Error(ARCPError.Unauthenticated "Invalid bearer token") + // §119: compare with FixedTimeEquals so verification does + // not short-circuit on the first differing character. + let presented = Encoding.UTF8.GetBytes token + + let matched = + tokens + |> Seq.tryPick (fun kv -> + let candidate = Encoding.UTF8.GetBytes kv.Key + + if + CryptographicOperations.FixedTimeEquals( + ReadOnlySpan(presented), + ReadOnlySpan(candidate) + ) + then + Some kv.Value + else + None) + + match matched with + | Some principalId -> return Ok(StringPrincipal(principalId) :> IPrincipal) + | None -> return Error(ARCPError.Unauthenticated "Invalid bearer token") } /// Allows any non-empty token; useful for local development. @@ -22,7 +45,8 @@ type DevModeBearerVerifier() = interface IBearerVerifier with member _.VerifyAsync(token, _ct) = task { - if System.String.IsNullOrEmpty token then + // §54: reject whitespace-only tokens, not just null/empty. + if System.String.IsNullOrWhiteSpace token then return Error(ARCPError.Unauthenticated "Missing bearer token") else return Ok(StringPrincipal("dev:" + token) :> IPrincipal) diff --git a/src/Arcp.Runtime/Auth/IPrincipal.fs b/src/Arcp.Runtime/Auth/IPrincipal.fs index 1842136..d61d49a 100644 --- a/src/Arcp.Runtime/Auth/IPrincipal.fs +++ b/src/Arcp.Runtime/Auth/IPrincipal.fs @@ -21,9 +21,16 @@ type IBearerVerifier = abstract member VerifyAsync: token: string * ct: CancellationToken -> Task> /// Anonymous principal used for `auth.scheme = "none"`. +/// +/// Each instance carries a unique id (§14, §6.6) so two anonymous +/// connections are distinct principals and cannot list or subscribe to +/// each other's jobs. As a consequence anonymous principals cannot +/// observe their own jobs across separate connections. type AnonymousPrincipal() = + let id = "anon:" + System.Guid.NewGuid().ToString("N") + interface IPrincipal with - member _.Id = "anonymous" + member _.Id = id member _.Labels = Map.empty /// Simple principal that wraps an id string. diff --git a/src/Arcp.Runtime/CredentialProvisioner.fs b/src/Arcp.Runtime/CredentialProvisioner.fs index a13ebe5..ef1f237 100644 --- a/src/Arcp.Runtime/CredentialProvisioner.fs +++ b/src/Arcp.Runtime/CredentialProvisioner.fs @@ -17,6 +17,19 @@ type CredentialIssueContext = ParentJobId: JobId option } +/// Outcome of a single upstream revocation attempt (§9.8.2). Splits +/// the old boolean so success and permanent failure are distinct: a +/// credential that permanently failed to revoke must NOT be treated as +/// revoked (it stays outstanding for operator reconciliation). +[] +type RevocationOutcome = + /// Confirmed revoked upstream. + | Revoked + /// Transient failure; the registry should retry. + | Transient + /// Permanent failure; further retries are futile. + | Permanent + /// Vendor-neutral provisioner. Implementations for LiteLLM, Anthropic /// admin keys, or internal gateways live outside core runtime wiring. type ICredentialProvisioner = @@ -24,10 +37,10 @@ type ICredentialProvisioner = /// callers must treat each `Value` as a secret. abstract member IssueAsync: ctx: CredentialIssueContext * ct: CancellationToken -> Task - /// Revoke a credential upstream. Return `false` for transient - /// failures that should be retried, `true` once no further retry - /// is useful. - abstract member RevokeAsync: credentialId: string * ct: CancellationToken -> Task + /// Revoke a credential upstream. Return `Transient` for failures + /// that should be retried, `Revoked` on confirmed success, and + /// `Permanent` when further retries are futile. + abstract member RevokeAsync: credentialId: string * ct: CancellationToken -> Task /// Durable per-credential store. Deployments that need revocation to /// survive process restart should back this with their own database. @@ -56,4 +69,6 @@ type InMemoryCredentialStore() = type NoOpCredentialProvisioner() = interface ICredentialProvisioner with member _.IssueAsync(_ctx, _ct) = Task.FromResult [] - member _.RevokeAsync(_id, _ct) = Task.FromResult true + + member _.RevokeAsync(_id, _ct) = + Task.FromResult RevocationOutcome.Revoked diff --git a/src/Arcp.Runtime/Internal/CredentialRegistry.fs b/src/Arcp.Runtime/Internal/CredentialRegistry.fs index 6bc6633..b5f557a 100644 --- a/src/Arcp.Runtime/Internal/CredentialRegistry.fs +++ b/src/Arcp.Runtime/Internal/CredentialRegistry.fs @@ -11,7 +11,11 @@ open ARCP.Runtime /// best-effort revocation with bounded retry. type internal CredentialRegistry(provisioner: ICredentialProvisioner, store: ICredentialStore) = let perJob = ConcurrentDictionary>() - let retryDelays = [ 200; 1000; 5000 ] + let retryDelays = [| 200; 1000; 5000 |] + + // §9.8.2/§14: credentials whose revocation permanently failed, kept + // for operator inspection (`RevocationFailures`). + let revocationFailures = ConcurrentDictionary() let remember (jobId: JobId) (credentialId: string) = let ids = @@ -31,18 +35,27 @@ type internal CredentialRegistry(provisioner: ICredentialProvisioner, store: ICr let revokeWithRetryAsync (jobIdOpt: JobId option) (credentialId: string) (ct: CancellationToken) = task { let mutable revoked = false + let mutable stop = false let mutable attempt = 0 - while not revoked && attempt < retryDelays.Length do - let! doneOrPermanent = provisioner.RevokeAsync(credentialId, ct) + // §49: only `Revoked` counts as success. `Permanent` stops + // retrying but leaves the credential outstanding; exhausted + // `Transient` retries are also a (non-confirmed) failure. + while not stop && attempt < retryDelays.Length do + let! outcome = provisioner.RevokeAsync(credentialId, ct) - if doneOrPermanent then + match outcome with + | RevocationOutcome.Revoked -> revoked <- true - else + stop <- true + | RevocationOutcome.Permanent -> stop <- true + | RevocationOutcome.Transient -> do! Task.Delay(retryDelays.[attempt], ct) attempt <- attempt + 1 if revoked then + revocationFailures.TryRemove credentialId |> ignore + match jobIdOpt with | Some jobId -> do! store.RecordRevokedAsync(jobId, credentialId) @@ -54,8 +67,26 @@ type internal CredentialRegistry(provisioner: ICredentialProvisioner, store: ICr if id = credentialId then do! store.RecordRevokedAsync(jobId, credentialId) forget jobId credentialId + else + // §9.8.2: permanent revocation failures MUST be logged; §14: + // surfaced to operators. Record for `RevocationFailures`. + let jobLabel = + jobIdOpt |> Option.map (fun j -> j.Value) |> Option.defaultValue "" + + revocationFailures.[credentialId] <- jobLabel + + eprintfn + "[ARCP] WARN credential revocation failed permanently: credential_id=%s job_id=%s" + credentialId + jobLabel } + /// Credentials whose revocation permanently failed, as + /// `(credentialId, jobId)` pairs. Operators can poll this to find + /// dangling credentials (§9.8.2, §14). + member _.RevocationFailures: (string * string) list = + revocationFailures |> Seq.map (fun kv -> kv.Key, kv.Value) |> List.ofSeq + member _.Track(jobId: JobId, cred: Credential) : Task = task { remember jobId cred.Id diff --git a/src/Arcp.Runtime/Internal/EnvelopeOut.fs b/src/Arcp.Runtime/Internal/EnvelopeOut.fs index 1d154a7..422b373 100644 --- a/src/Arcp.Runtime/Internal/EnvelopeOut.fs +++ b/src/Arcp.Runtime/Internal/EnvelopeOut.fs @@ -17,34 +17,56 @@ module internal EnvelopeOut = /// Send `env` over the transport belonging to `sid`. If /// `attachSeq` is true the envelope is recorded in the event /// log first (which assigns its `event_seq`). - let pushEnvelope + /// Returns the session `event_seq` assigned to the sent envelope + /// (when `attachSeq` and the session is live), else `None`. + let pushEnvelopeSeq (sessions: ConcurrentDictionary) (sid: SessionId) (env: Envelope) (attachSeq: bool) - : Task = + : Task = task { match sessions.TryGetValue sid.Value with | true, sctx -> - let envOut = - if attachSeq then - let entry = sctx.EventLog.Append(sid, env) - entry.Envelope - else - Envelope.withSessionId sid env - - do! sctx.Transport.SendAsync(envOut, CancellationToken.None) - | _ -> () + // §8.3: hold the per-session gate across seq assignment and + // the send so concurrent emitters cannot reorder events. + do! sctx.SendGate.WaitAsync() + + try + let envOut, seq = + if attachSeq then + let entry = sctx.EventLog.Append(sid, env) + entry.Envelope, Some entry.EventSeq + else + Envelope.withSessionId sid env, None + + do! sctx.Transport.SendAsync(envOut, CancellationToken.None) + return seq + finally + sctx.SendGate.Release() |> ignore + | _ -> return None + } + + let pushEnvelope + (sessions: ConcurrentDictionary) + (sid: SessionId) + (env: Envelope) + (attachSeq: bool) + : Task = + task { + let! _ = pushEnvelopeSeq sessions sid env attachSeq + return () } :> Task - let pushJobEvent + /// Send a `job.event` and return the owning session's assigned seq. + let pushJobEventSeq (sessions: ConcurrentDictionary) (timeProvider: TimeProvider) (sid: SessionId) (jobId: JobId) (body: JobEventBody) - : Task = + : Task = let payload: JobEventPayload = { Kind = JobEventBody.kind body @@ -58,7 +80,20 @@ module internal EnvelopeOut = |> Envelope.withJobId jobId |> Envelope.withSessionId sid - pushEnvelope sessions sid env true + pushEnvelopeSeq sessions sid env true + + let pushJobEvent + (sessions: ConcurrentDictionary) + (timeProvider: TimeProvider) + (sid: SessionId) + (jobId: JobId) + (body: JobEventBody) + : Task = + task { + let! _ = pushJobEventSeq sessions timeProvider sid jobId body + return () + } + :> Task let pushJobResult (sessions: ConcurrentDictionary) diff --git a/src/Arcp.Runtime/Internal/JobLauncher.fs b/src/Arcp.Runtime/Internal/JobLauncher.fs index 629b20e..fc73847 100644 --- a/src/Arcp.Runtime/Internal/JobLauncher.fs +++ b/src/Arcp.Runtime/Internal/JobLauncher.fs @@ -22,13 +22,30 @@ module internal JobLauncher = Summary = None } - let private buildCancelled () : JobResultPayload = + /// Terminal `job.result` for a streamed result (§8.4): carries + /// `result_id` and omits the inline result. + let private buildStreamedSuccess (resultId: string) : JobResultPayload = { - FinalStatus = JobStatus.Cancelled + FinalStatus = JobStatus.Success Result = None - ResultId = None + ResultId = Some resultId ResultSize = None - Summary = Some "cancelled" + Summary = None + } + + /// True when the handler returned a meaningful inline result (i.e. + /// not JSON `null`/`undefined`). + let private hasInlineResult (result: JsonElement) : bool = + result.ValueKind <> JsonValueKind.Null + && result.ValueKind <> JsonValueKind.Undefined + + let private buildCancelled () : JobErrorPayload = + { + FinalStatus = JobStatus.Cancelled + Code = "CANCELLED" + Message = "Job cancelled" + Retryable = false + Details = None } let private buildError (e: ARCPError) : JobErrorPayload = @@ -62,21 +79,22 @@ module internal JobLauncher = let emit (body: JobEventBody) : Task = jobs.EmitEventAsync(record, body) - let rotateCredential (credentialId: string, newValue: string, ct: CancellationToken) : Task = + let rotateCredential (credentialId: string, newValue: string, _ct: CancellationToken) : Task = task { - let message = - Json.serialize - {| - id = credentialId - value = newValue - |} - - do! emit (JobEventBody.Status(StatusPhases.CredentialRotated, Some message)) - do! credentialRegistry.RevokeCredentialAsync(credentialId, ct) + // §14: the new value goes only to the submitting session; + // subscribers receive a redacted status (id only). + do! jobs.EmitCredentialRotatedAsync(record, credentialId, newValue) + // §9.8.2 / #107: the credential id stays outstanding so the + // rotated (new) value is revoked at job termination. We do not + // erase the registry entry here (which would orphan the new + // value), nor revoke by id (which would invalidate it). } :> Task - let beginStream () : ResultId = ResultId.newId () + let beginStream () : ResultId = + let id = ResultId.newId () + record.StreamResultId <- Some id.Value + id let context = JobContext( @@ -102,16 +120,32 @@ module internal JobLauncher = task { try let! result = handler context - do! jobs.EmitResultAsync(record, buildSuccess result) + + match record.StreamResultId with + | Some resultId when hasInlineResult result -> + // §8.4: mixing inline result and result_chunk is + // forbidden. + do! + jobs.EmitErrorAsync( + record, + buildInternal ( + InvalidOperationException( + "Agent returned an inline result after streaming result_chunk events; mixing is forbidden (§8.4)." + ) + ) + ) + | Some resultId -> do! jobs.EmitResultAsync(record, buildStreamedSuccess resultId) + | None -> do! jobs.EmitResultAsync(record, buildSuccess result) with - | :? OperationCanceledException -> do! jobs.EmitResultAsync(record, buildCancelled ()) + | :? OperationCanceledException -> do! jobs.EmitErrorAsync(record, buildCancelled ()) | :? ArcpException as ax -> do! jobs.EmitErrorAsync(record, buildError ax.Error) | ex -> do! jobs.EmitErrorAsync(record, buildInternal ex) try do! credentialRegistry.RevokeJobAsync(record.JobId, CancellationToken.None) - with _ -> - () + with ex -> + // §53: surface revocation failures rather than swallowing. + eprintfn "[ARCP] credential revocation failed at job termination for %s: %O" record.JobId.Value ex } :> Task) |> ignore diff --git a/src/Arcp.Runtime/Internal/JobSubmitFlow.fs b/src/Arcp.Runtime/Internal/JobSubmitFlow.fs index 6454670..32e5d0d 100644 --- a/src/Arcp.Runtime/Internal/JobSubmitFlow.fs +++ b/src/Arcp.Runtime/Internal/JobSubmitFlow.fs @@ -13,32 +13,37 @@ open ARCP.Runtime [] module internal JobSubmitFlow = - let private replayAccepted (timeProvider: TimeProvider) (jobs: JobManager) (existing: string) : JobAcceptedPayload = + /// Stable fingerprint of the idempotency-relevant submission + /// parameters (§7.2): agent, input, lease request/constraints, and + /// max_runtime_sec. Used to detect conflicting key reuse. + let private fingerprint (submit: JobSubmitPayload) : string = + let canonical = + Json.serialize + {| + agent = submit.Agent + input = submit.Input + lease_request = submit.LeaseRequest + lease_constraints = submit.LeaseConstraints + max_runtime_sec = submit.MaxRuntimeSec + |} + + use sha = System.Security.Cryptography.SHA256.Create() + + canonical + |> System.Text.Encoding.UTF8.GetBytes + |> sha.ComputeHash + |> Convert.ToHexString + + /// Replay the exact original `job.accepted` payload (§7.2). The + /// payload was captured at first acceptance, so the budget reflects + /// the initially granted counters and credentials are preserved. + let private replayAccepted (jobs: JobManager) (existing: string) : Result = match jobs.TryGet(JobId.ofString existing) with | Some r -> - { - JobId = r.JobId.Value - Lease = r.Lease - LeaseConstraints = r.Constraints - Budget = - if r.Budgets.Snapshot() = Map.empty then - None - else - Some(r.Budgets.Snapshot()) - Credentials = None - AcceptedAt = r.CreatedAt - TraceId = r.TraceId - } - | None -> - { - JobId = existing - Lease = Lease.empty - LeaseConstraints = None - Budget = None - Credentials = None - AcceptedAt = timeProvider.GetUtcNow() - TraceId = None - } + match r.AcceptedPayload with + | Some accepted -> Ok accepted + | None -> Error(ARCPError.JobNotFound existing) + | None -> Error(ARCPError.JobNotFound existing) let private validateConstraints (timeProvider: TimeProvider) @@ -75,15 +80,67 @@ module internal JobSubmitFlow = Details = None } + // §89: skip already-terminal jobs (the terminal gate also + // drops the emit, but avoid the work and revoke entirely). match jobs.TryGet jobId with - | Some r -> - ignore ( + | Some r when not r.TerminalEmitted -> + Task.Run(fun () -> task { - do! jobs.EmitErrorAsync(r, payload) - do! credentialRegistry.RevokeJobAsync(jobId, CancellationToken.None) + try + do! jobs.EmitErrorAsync(r, payload) + do! credentialRegistry.RevokeJobAsync(jobId, CancellationToken.None) + with ex -> + eprintfn "[ARCP] lease-expiry watchdog failed for job %s: %O" jobId.Value ex } - ) - | None -> () + :> Task) + |> ignore + | _ -> () + ) + + w) + + /// Watchdog enforcing `max_runtime_sec` (§7.1). On expiry emits + /// `job.error` with code `TIMEOUT` and `final_status: "timed_out"`, + /// then revokes credentials. Guarded by the terminal gate so it + /// never double-terminates. + let private buildRuntimeWatchdog + (timeProvider: TimeProvider) + (jobs: JobManager) + (credentialRegistry: CredentialRegistry) + (jobId: JobId) + (maxRuntimeSec: int option) + : ExpiryWatchdog option = + maxRuntimeSec + |> Option.filter (fun n -> n > 0) + |> Option.map (fun n -> + let w = new ExpiryWatchdog(timeProvider) + let deadline = timeProvider.GetUtcNow().AddSeconds(float n) + + w.Start( + deadline, + fun () -> + let payload: JobErrorPayload = + { + FinalStatus = JobStatus.TimedOut + Code = "TIMEOUT" + Message = sprintf "Job exceeded max_runtime_sec=%d" n + Retryable = true + Details = None + } + + match jobs.TryGet jobId with + | Some r when not r.TerminalEmitted -> + Task.Run(fun () -> + task { + try + do! jobs.EmitErrorAsync(r, payload) + do! credentialRegistry.RevokeJobAsync(jobId, CancellationToken.None) + with ex -> + eprintfn "[ARCP] runtime watchdog failed for job %s: %O" jobId.Value ex + } + :> Task) + |> ignore + | _ -> () ) w) @@ -95,26 +152,31 @@ module internal JobSubmitFlow = (ct: CancellationToken) : Task> = task { - let ctx: CredentialIssueContext = - { - JobId = record.JobId - Principal = record.Principal - Lease = record.Lease - LeaseConstraints = record.Constraints - ParentJobId = record.ParentJobId |> Option.map JobId.ofString - } - - try - let! credentials = provisioner.IssueAsync(ctx, ct) - - for cred in credentials do - do! registry.Track(record.JobId, cred) - - return Ok credentials - with - | :? ArcpException as ax -> return Error ax.Error - | :? UnauthorizedAccessException as ex -> return Error(ARCPError.PermissionDenied(ex.Message, None)) - | ex -> return Error(ARCPError.InternalError ex.Message) + // §14: never mint credentials for an anonymous principal, + // even if a provisioner is configured. + match record.Principal with + | :? ARCP.Runtime.Auth.AnonymousPrincipal -> return Ok [] + | _ -> + let ctx: CredentialIssueContext = + { + JobId = record.JobId + Principal = record.Principal + Lease = record.Lease + LeaseConstraints = record.Constraints + ParentJobId = record.ParentJobId |> Option.map JobId.ofString + } + + try + let! credentials = provisioner.IssueAsync(ctx, ct) + + for cred in credentials do + do! registry.Track(record.JobId, cred) + + return Ok credentials + with + | :? ArcpException as ax -> return Error ax.Error + | :? UnauthorizedAccessException as ex -> return Error(ARCPError.PermissionDenied(ex.Message, None)) + | ex -> return Error(ARCPError.InternalError ex.Message) } let private sendAccepted @@ -149,126 +211,211 @@ module internal JobSubmitFlow = (ct: CancellationToken) : Task = task { - // Idempotency-key short-circuit. - match submit.IdempotencyKey with - | Some key when (jobs.LookupIdempotencyKey key).IsSome -> - let existing = (jobs.LookupIdempotencyKey key).Value - let accepted = replayAccepted timeProvider jobs existing - do! sendAccepted ctx.Transport ctx.SessionId requestId (JobId.ofString existing) accepted ct + // Idempotency-key short-circuit. Single lookup (#52). + let existingForKey = submit.IdempotencyKey |> Option.bind jobs.LookupIdempotencyKey + + match submit.IdempotencyKey, existingForKey with + | Some key, Some existing -> + let newFingerprint = fingerprint submit + + // §7.2: identical params → replay original job.accepted; + // conflicting params under the same key → DUPLICATE_KEY. + let conflicting = + match jobs.TryGet(JobId.ofString existing) with + | Some r -> + match r.IdempotencyFingerprint with + | Some fp -> fp <> newFingerprint + | None -> false + | None -> false + + if conflicting then + do! EnvelopeOut.respondWithError ctx requestId (ARCPError.DuplicateKey key) ct + else + match replayAccepted jobs existing with + | Ok accepted -> + do! sendAccepted ctx.Transport ctx.SessionId requestId (JobId.ofString existing) accepted ct + | Error err -> do! EnvelopeOut.respondWithError ctx requestId err ct | _ -> - match inventory.Resolve submit.Agent with - | Error err -> do! EnvelopeOut.respondWithError ctx requestId err ct - | Ok(name, version, _) -> - let resolvedAgent = AgentRef.format name (Some version) - let lease = submit.LeaseRequest |> Option.defaultValue Lease.empty + let agentVersionsOn = ctx.NegotiatedFeatures.Contains Features.AgentVersions + + if not agentVersionsOn && submit.Agent.Contains '@' then + // §6.2/§7.5: cannot pin a version without negotiating + // agent_versions. + do! + EnvelopeOut.respondWithError + ctx + requestId + (ARCPError.InvalidRequest("agent_versions not negotiated; bare agent name required", None)) + ct + elif + submit.LeaseConstraints.IsSome + && not (ctx.NegotiatedFeatures.Contains Features.LeaseExpiresAt) + then + // §6.2/§114: cannot use lease_constraints.expires_at + // without negotiating lease_expires_at. + do! + EnvelopeOut.respondWithError + ctx + requestId + (ARCPError.InvalidRequest("lease_expires_at not negotiated", None)) + ct + else - match validateConstraints timeProvider submit.LeaseConstraints with + match inventory.Resolve submit.Agent with | Error err -> do! EnvelopeOut.respondWithError ctx requestId err ct - | Ok constraints -> - let jobId = JobId.newId () - - // Claim the idempotency key first so a duplicate - // submission short-circuits before any side effects - // (record registration, watchdog start, provisioner - // call). Without this, two concurrent submits with - // the same key both fell through and created jobs. - let claimResult = - match submit.IdempotencyKey with - | Some key -> jobs.TryClaimIdempotencyKey(key, jobId) - | None -> Ok() - - match claimResult with - | Error err -> do! EnvelopeOut.respondWithError ctx requestId err ct - | Ok() -> - let budgets = BudgetCounters() - budgets.SetInitial(Lease.initialBudgets lease) - let cts = new CancellationTokenSource() - let watchdog = buildWatchdog timeProvider jobs credentialRegistry jobId constraints - - let record: JobRecord = - { - JobId = jobId - SessionId = ctx.SessionId - Principal = ctx.Principal - Agent = resolvedAgent - Input = submit.Input - Lease = lease - Constraints = constraints - Credentials = [] - Budgets = budgets - ParentJobId = None - TraceId = traceIdOpt - CreatedAt = timeProvider.GetUtcNow() - Cancellation = cts - Watchdog = watchdog - Status = JobStatus.Pending - LastEventSeq = 0L - } - - jobs.Register record - let! issued = issueCredentialsAsync provisioner credentialRegistry record ct - - match issued with - | Error err -> - // Acceptance failed after registration — - // unwind state so the failed job does not - // surface in list/get and the idempotency - // key is free for a retry. - jobs.Unregister jobId + | Ok(name, version, _) -> + // The handler is always keyed name@version; the agent + // surfaced on the wire omits the version when the feature + // wasn't negotiated (§6.2). + let handlerKey = AgentRef.format name (Some version) - match submit.IdempotencyKey with - | Some key -> jobs.ReleaseIdempotencyKey(key, jobId) - | None -> () + let resolvedAgent = if agentVersionsOn then handlerKey else name - watchdog |> Option.iter (fun w -> (w :> IDisposable).Dispose()) + let lease = submit.LeaseRequest |> Option.defaultValue Lease.empty - try - cts.Cancel() - with _ -> - () + match validateConstraints timeProvider submit.LeaseConstraints with + | Error err -> do! EnvelopeOut.respondWithError ctx requestId err ct + | Ok constraints -> + let jobId = JobId.newId () - try - cts.Dispose() - with _ -> - () + // Claim the idempotency key first so a duplicate + // submission short-circuits before any side effects + // (record registration, watchdog start, provisioner + // call). Without this, two concurrent submits with + // the same key both fell through and created jobs. + let claimResult = + match submit.IdempotencyKey with + | Some key -> jobs.TryClaimIdempotencyKey(key, jobId) + | None -> Ok() - try - do! credentialRegistry.RevokeJobAsync(jobId, ct) - with _ -> - () + match claimResult with + | Error err -> do! EnvelopeOut.respondWithError ctx requestId err ct + | Ok() -> + let budgets = BudgetCounters() - do! EnvelopeOut.respondWithError ctx requestId err ct - | Ok credentials -> - record.Credentials <- credentials + // §114: only track budget counters when cost.budget + // was negotiated. + if ctx.NegotiatedFeatures.Contains Features.CostBudget then + budgets.SetInitial(Lease.initialBudgets lease) - let initialBudget = - if budgets.Snapshot() = Map.empty then - None - else - Some(budgets.Snapshot()) + let cts = new CancellationTokenSource() + let watchdog = buildWatchdog timeProvider jobs credentialRegistry jobId constraints + + let runtimeWatchdog = + buildRuntimeWatchdog timeProvider jobs credentialRegistry jobId submit.MaxRuntimeSec - let accepted: JobAcceptedPayload = + let record: JobRecord = { - JobId = jobId.Value + JobId = jobId + SessionId = ctx.SessionId + Principal = ctx.Principal + Agent = resolvedAgent + Input = submit.Input Lease = lease - LeaseConstraints = constraints - Budget = initialBudget - Credentials = if List.isEmpty credentials then None else Some credentials - AcceptedAt = record.CreatedAt + Constraints = constraints + Credentials = [] + Budgets = budgets + ParentJobId = None TraceId = traceIdOpt + CreatedAt = timeProvider.GetUtcNow() + Cancellation = cts + Watchdog = watchdog + RuntimeWatchdog = runtimeWatchdog + TerminalEmitted = false + AcceptedPayload = None + Status = JobStatus.Pending + LastEventSeq = 0L + StreamResultId = None + IdempotencyFingerprint = + submit.IdempotencyKey |> Option.map (fun _ -> fingerprint submit) + IdempotencyKey = submit.IdempotencyKey + TerminatedAt = None } - do! sendAccepted ctx.Transport ctx.SessionId requestId jobId accepted ct - - match agentHandlers.TryGetValue resolvedAgent with - | true, handler -> - JobLauncher.launch jobs credentialRegistry timeProvider record handler - | _ -> - do! - EnvelopeOut.respondWithError - ctx - requestId - (ARCPError.AgentNotAvailable resolvedAgent) - ct + jobs.Register record + + // Unwind all acceptance side effects so a failed + // acceptance leaves no record, frees the idempotency + // key, stops timers, and revokes any credentials. + let unwind () : Task = + task { + jobs.Unregister jobId + + match submit.IdempotencyKey with + | Some key -> jobs.ReleaseIdempotencyKey(key, jobId) + | None -> () + + watchdog |> Option.iter (fun w -> (w :> IDisposable).Dispose()) + runtimeWatchdog |> Option.iter (fun w -> (w :> IDisposable).Dispose()) + + try + cts.Cancel() + with _ -> + () + + try + cts.Dispose() + with _ -> + () + + try + do! credentialRegistry.RevokeJobAsync(jobId, ct) + with _ -> + () + } + :> Task + + // §114: only issue provisioned credentials when the + // feature was negotiated. + let! issued = + if ctx.NegotiatedFeatures.Contains Features.ProvisionedCredentials then + issueCredentialsAsync provisioner credentialRegistry record ct + else + Task.FromResult(Ok []) + + match issued with + | Error err -> + do! unwind () + do! EnvelopeOut.respondWithError ctx requestId err ct + | Ok credentials -> + // §47: resolve the handler BEFORE accepting so a + // missing handler does not produce both a + // job.accepted and an error, nor leak the record. + match agentHandlers.TryGetValue handlerKey with + | false, _ -> + do! unwind () + + do! + EnvelopeOut.respondWithError + ctx + requestId + (ARCPError.AgentNotAvailable resolvedAgent) + ct + | true, handler -> + record.Credentials <- credentials + + let initialBudget = + if budgets.Snapshot() = Map.empty then + None + else + Some(budgets.Snapshot()) + + let accepted: JobAcceptedPayload = + { + JobId = jobId.Value + Lease = lease + LeaseConstraints = constraints + Budget = initialBudget + Credentials = + if List.isEmpty credentials then None else Some credentials + AcceptedAt = record.CreatedAt + TraceId = traceIdOpt + } + + // Capture for verbatim idempotent replay (§7.2). + record.AcceptedPayload <- Some accepted + + do! sendAccepted ctx.Transport ctx.SessionId requestId jobId accepted ct + JobLauncher.launch jobs credentialRegistry timeProvider record handler } :> Task diff --git a/src/Arcp.Runtime/Internal/SubscriptionFanout.fs b/src/Arcp.Runtime/Internal/SubscriptionFanout.fs index 85e9768..eb130b2 100644 --- a/src/Arcp.Runtime/Internal/SubscriptionFanout.fs +++ b/src/Arcp.Runtime/Internal/SubscriptionFanout.fs @@ -12,7 +12,6 @@ open ARCP.Core /// what; the seq remap happens at emit time inside the runtime. type internal SubscriptionFanout() = let byJob = ConcurrentDictionary>() - let byPrincipal = ConcurrentDictionary>() let lockObj = obj () /// Register `sessionId` as a subscriber of `jobId`. diff --git a/src/Arcp.Runtime/JobContext.fs b/src/Arcp.Runtime/JobContext.fs index 310a212..c00d5ff 100644 --- a/src/Arcp.Runtime/JobContext.fs +++ b/src/Arcp.Runtime/JobContext.fs @@ -32,6 +32,12 @@ type JobContext streamResultBegin: unit -> ResultId, onCostMetric: string * decimal -> unit ) = + // Per-result_id chunk ordering state (§8.4): next expected chunk_seq + // and whether the stream was closed by a `more=false` chunk. + let chunkNext = System.Collections.Generic.Dictionary() + let chunkClosed = System.Collections.Generic.HashSet() + let chunkLock = obj () + member _.JobId: JobId = jobId member _.SessionId: SessionId = sessionId member _.ParentJobId: JobId option = parentJobId @@ -63,10 +69,21 @@ type JobContext member _.RotateCredentialAsync(credentialId: string, newValue: string, ct: CancellationToken) : Task = rotateCredential (credentialId, newValue, ct) + /// Emit a `progress` event (§8.2.1). `current` MUST be non-negative + /// (rejected with INVALID_REQUEST otherwise); when `total` is present + /// `current` is clamped to `total` so the wire invariant holds. member _.EmitProgressAsync (current: decimal, total: decimal option, units: string option, message: string option, _ct: CancellationToken) : Task = - emit (JobEventBody.Progress(current, total, units, message)) + if current < 0m then + raise (ArcpException(ARCPError.InvalidRequest("progress.current must be non-negative", None))) + + let clamped = + match total with + | Some t when current > t -> t + | _ -> current + + emit (JobEventBody.Progress(clamped, total, units, message)) /// Emit a `metric` event. Names starting with `cost.` and a /// budgeted `unit` decrement the matching budget counter @@ -80,9 +97,17 @@ type JobContext _ct: CancellationToken ) : Task = if value < 0m then - Task.CompletedTask - else + // §9.6: negative cost metrics are rejected; other negative + // metrics are not governed by §9.6 and still flow through. if name.StartsWith("cost.") then + raise (ArcpException(ARCPError.InvalidRequest("cost metric value must be non-negative", None))) + else + emit (JobEventBody.Metric(name, value, unit, dimensions)) + else + // §9.6: `cost.budget.*` is budget telemetry (e.g. + // `cost.budget.remaining`), not a charge — it must not + // decrement the counter. Only genuine `cost.*` spend metrics do. + if name.StartsWith("cost.") && not (name.StartsWith("cost.budget.")) then match unit with | Some u -> onCostMetric (u, value) | None -> () @@ -94,7 +119,22 @@ type JobContext : Task = emit (JobEventBody.ArtifactRef(uri, contentType, byteSize, sha256)) - member _.EmitDelegateAsync(body: DelegateBody, _ct: CancellationToken) : Task = emit (JobEventBody.Delegate body) + /// Emit a `delegate` event after validating that the child lease is + /// a strict subset of this job's lease (spec §9.4). A child lease + /// that names an uncovered capability, exceeds the parent's + /// remaining budget, or extends `expires_at` beyond the parent's is + /// rejected with `LEASE_SUBSET_VIOLATION` before any event is emitted. + member _.EmitDelegateAsync(body: DelegateBody, _ct: CancellationToken) : Task = + match + Lease.isSubset + body.Lease + lease + (budgets.Snapshot()) + (constraints |> Option.map (fun c -> c.ExpiresAt)) + (body.LeaseConstraints |> Option.map (fun c -> c.ExpiresAt)) + with + | Ok() -> emit (JobEventBody.Delegate body) + | Error err -> raise (ArcpException err) member _.EmitVendorEventAsync(kind: string, body: JsonElement, _ct: CancellationToken) : Task = if not (kind.StartsWith("x-vendor.", StringComparison.Ordinal)) then @@ -117,6 +157,42 @@ type JobContext more: bool, _ct: CancellationToken ) : Task = + // §8.4: chunk_seq is 0-based monotonic per result_id and chunks + // MUST be emitted in order; nothing may follow a `more=false` + // chunk. Enforce both before anything reaches the wire. + lock chunkLock (fun () -> + if chunkClosed.Contains resultId.Value then + raise ( + ArcpException( + ARCPError.InternalError( + sprintf "result_id %s already completed; no further chunks allowed" resultId.Value + ) + ) + ) + + let expected = + match chunkNext.TryGetValue resultId.Value with + | true, n -> n + | _ -> 0L + + if chunkSeq <> expected then + raise ( + ArcpException( + ARCPError.InternalError( + sprintf + "out-of-order chunk_seq %d (expected %d) for result_id %s" + chunkSeq + expected + resultId.Value + ) + ) + ) + + chunkNext.[resultId.Value] <- expected + 1L + + if not more then + chunkClosed.Add resultId.Value |> ignore) + let encoded = match encoding with | ChunkEncoding.Utf8 -> Encoding.UTF8.GetString(data.Span) diff --git a/src/Arcp.Runtime/JobManager.fs b/src/Arcp.Runtime/JobManager.fs index f6bfe3d..12bb71b 100644 --- a/src/Arcp.Runtime/JobManager.fs +++ b/src/Arcp.Runtime/JobManager.fs @@ -31,8 +31,30 @@ type internal JobRecord = CreatedAt: DateTimeOffset Cancellation: CancellationTokenSource Watchdog: ExpiryWatchdog option + /// `max_runtime_sec` watchdog (§7.1); fires TIMEOUT. + RuntimeWatchdog: ExpiryWatchdog option + /// Set once a terminal message has been emitted so post-terminal + /// emissions are dropped (§7.3, §9.5). + mutable TerminalEmitted: bool + /// The exact `job.accepted` payload sent at acceptance, replayed + /// verbatim for idempotent resubmits (§7.2). + mutable AcceptedPayload: JobAcceptedPayload option mutable Status: JobStatus mutable LastEventSeq: int64 + /// Set when the agent began a streamed result via + /// `BeginStreamingResult` (§8.4); the terminating `job.result` + /// MUST carry this `result_id` and omit the inline result. + mutable StreamResultId: string option + /// Fingerprint of the original submission parameters for an + /// idempotency-keyed job (§7.2). A replay with a different + /// fingerprint is a conflicting reuse → `DUPLICATE_KEY`. + IdempotencyFingerprint: string option + /// Idempotency key claimed by this job, if any. Used to release + /// the claim when the terminal record is evicted. + IdempotencyKey: string option + /// When the job reached a terminal state. Drives retention-based + /// eviction so terminal records do not accumulate forever. + mutable TerminatedAt: DateTimeOffset option } /// Adapter that lets `JobManager` push a `job.event` (or `job.result`, @@ -43,6 +65,10 @@ type internal IJobOutbox = abstract member EmitJobEventAsync: record: JobRecord * body: JobEventBody -> Task abstract member EmitJobResultAsync: record: JobRecord * payload: JobResultPayload -> Task abstract member EmitJobErrorAsync: record: JobRecord * payload: JobErrorPayload -> Task + /// Emit a `credential_rotated` status. The submitting session + /// receives the new credential `value`; subscribers receive a + /// redacted body (id only) per §14/§9.8.2. + abstract member EmitCredentialRotatedAsync: record: JobRecord * credentialId: string * newValue: string -> Task /// Tracks every running and terminated job for the runtime. type internal JobManager(timeProvider: TimeProvider, outbox: IJobOutbox) = @@ -54,6 +80,17 @@ type internal JobManager(timeProvider: TimeProvider, outbox: IJobOutbox) = member _.Register(record: JobRecord) : unit = byId.[record.JobId.Value] <- record + /// Claim the single terminal emission for `record`. Returns true for + /// exactly one caller; subsequent callers get false and must not + /// emit a second contradictory terminal message (§7.3, §9.5). + member _.TryClaimTerminal(record: JobRecord) : bool = + lock record (fun () -> + if record.TerminalEmitted then + false + else + record.TerminalEmitted <- true + true) + member _.TryGet(jobId: JobId) : JobRecord option = match byId.TryGetValue jobId.Value with | true, r -> Some r @@ -89,19 +126,58 @@ type internal JobManager(timeProvider: TimeProvider, outbox: IJobOutbox) = member _.Unregister(jobId: JobId) : unit = byId.TryRemove(jobId.Value) |> ignore /// Mark `jobId` as terminated. Subsequent emit attempts on this - /// id are dropped. + /// id are dropped. Disposes the watchdog and cancellation source + /// and clears any retained credential values (§14); the record is + /// evicted from `byId` later by `EvictTerminated`. member this.Terminate(jobId: JobId, status: JobStatus) : unit = match this.TryGet jobId with | Some r -> r.Status <- status - r.Watchdog |> Option.iter (fun w -> w.Stop()) + + for w in [ r.Watchdog; r.RuntimeWatchdog ] do + w + |> Option.iter (fun w -> + try + (w :> IDisposable).Dispose() + with _ -> + ()) try r.Cancellation.Cancel() with _ -> () + + try + r.Cancellation.Dispose() + with _ -> + () + + // §14: do not retain credential secrets past termination. + r.Credentials <- [] + r.TerminatedAt <- Some(timeProvider.GetUtcNow()) | _ -> () + /// Evict terminal records whose termination is older than + /// `retentionCutoff`, releasing their idempotency-key claims. Keeps + /// `byId` bounded under sustained churn while leaving recent + /// terminals visible to `list_jobs`/replay. + member _.EvictTerminated(retentionCutoff: DateTimeOffset) : int = + let mutable removed = 0 + + for kv in byId do + match kv.Value.TerminatedAt with + | Some t when t < retentionCutoff -> + if byId.TryRemove kv.Key |> fst then + removed <- removed + 1 + + kv.Value.IdempotencyKey + |> Option.iter (fun key -> + let pair = KeyValuePair(key, kv.Key) + (idempotency :> ICollection>).Remove pair |> ignore) + | _ -> () + + removed + /// Snapshot the record into a `JobSummary` shape for §6.6 /// `session.list_jobs` responses. member _.ToSummary(r: JobRecord) : JobSummary = @@ -124,3 +200,6 @@ type internal JobManager(timeProvider: TimeProvider, outbox: IJobOutbox) = member this.EmitErrorAsync(record: JobRecord, payload: JobErrorPayload) : Task = outbox.EmitJobErrorAsync(record, payload) + + member this.EmitCredentialRotatedAsync(record: JobRecord, credentialId: string, newValue: string) : Task = + outbox.EmitCredentialRotatedAsync(record, credentialId, newValue) diff --git a/src/Arcp.Runtime/SessionContext.fs b/src/Arcp.Runtime/SessionContext.fs index 5aec1a2..3eb5c56 100644 --- a/src/Arcp.Runtime/SessionContext.fs +++ b/src/Arcp.Runtime/SessionContext.fs @@ -22,10 +22,14 @@ type internal ServerSessionContext = Principal: IPrincipal NegotiatedFeatures: Set HeartbeatIntervalSec: int option - ResumeToken: string + mutable ResumeToken: string ResumeWindowSec: int - Transport: ITransport + mutable Transport: ITransport EventLog: EventLog + /// Serializes event_seq assignment with the matching transport + /// send so concurrent emitters cannot interleave out of order + /// (spec §8.3). + SendGate: System.Threading.SemaphoreSlim mutable LastAckedSeq: int64 mutable LastInboundAt: DateTimeOffset } @@ -52,6 +56,7 @@ module internal ServerSessionContext = ResumeWindowSec = resumeWindow Transport = transport EventLog = log + SendGate = new System.Threading.SemaphoreSlim(1, 1) LastAckedSeq = 0L LastInboundAt = now } diff --git a/src/Arcp.Runtime/Store/EventLog.fs b/src/Arcp.Runtime/Store/EventLog.fs index bead978..231c166 100644 --- a/src/Arcp.Runtime/Store/EventLog.fs +++ b/src/Arcp.Runtime/Store/EventLog.fs @@ -110,20 +110,48 @@ type internal EventLog(options: EventLogOptions) = perSession.TryRemove(sessionId.Value) |> ignore seqCounters.TryRemove(sessionId.Value) |> ignore - /// Age out entries whose timestamp is older than the resume - /// window. Caller invokes periodically. - member _.EvictExpired() : int = + /// Age out entries older than the resume window, but never an + /// entry the session has not yet acknowledged (spec §6.5): an + /// entry is only evicted when it is both older than the window and + /// `EventSeq <= lastAcked` for its session. Buffer-count limits are + /// enforced separately at `Append`. Caller invokes periodically. + member _.EvictExpired(lastAckedBySession: string -> int64) : int = let now = options.TimeProvider.GetUtcNow() let cutoff = now.AddSeconds(-float options.ResumeWindowSec) - let evictOne (queue: Queue) : int = + let evictOne (key: string) (queue: Queue) : int = + let lastAcked = lastAckedBySession key + lock queue (fun () -> let mutable removed = 0 - while queue.Count > 0 && (queue.Peek()).Timestamp < cutoff do + while queue.Count > 0 + && (queue.Peek()).Timestamp < cutoff + && (queue.Peek()).EventSeq <= lastAcked do queue.Dequeue() |> ignore removed <- removed + 1 removed) - perSession |> Seq.sumBy (fun kvp -> evictOne kvp.Value) + perSession |> Seq.sumBy (fun kvp -> evictOne kvp.Key kvp.Value) + + /// Age out entries older than the resume window irrespective of ack + /// state. Used for sessions that have permanently disconnected and + /// can no longer acknowledge. + member this.EvictExpired() : int = + this.EvictExpired(fun _ -> Int64.MaxValue) + + /// Drop the buffer and seq counter for any session whose buffer is + /// empty and which `isActive` reports as gone. Releases the per- + /// session dictionary entries so memory does not grow with the + /// historical session count. + member _.PruneEmpty(isActive: string -> bool) : unit = + for kvp in perSession do + let key = kvp.Key + + if not (isActive key) then + let empty = lock kvp.Value (fun () -> kvp.Value.Count = 0) + + if empty then + perSession.TryRemove key |> ignore + seqCounters.TryRemove key |> ignore diff --git a/tests/Arcp.IntegrationTests/Arcp.IntegrationTests.fsproj b/tests/Arcp.IntegrationTests/Arcp.IntegrationTests.fsproj index 6e807e1..df5271d 100644 --- a/tests/Arcp.IntegrationTests/Arcp.IntegrationTests.fsproj +++ b/tests/Arcp.IntegrationTests/Arcp.IntegrationTests.fsproj @@ -15,6 +15,7 @@ + diff --git a/tests/Arcp.IntegrationTests/AuthAndAcceptanceTests.fs b/tests/Arcp.IntegrationTests/AuthAndAcceptanceTests.fs index 40dd251..216d405 100644 --- a/tests/Arcp.IntegrationTests/AuthAndAcceptanceTests.fs +++ b/tests/Arcp.IntegrationTests/AuthAndAcceptanceTests.fs @@ -35,7 +35,7 @@ let private connectClient (server: ArcpServer) (auth: AuthScheme) : Task] let ``provisioner failure unwinds the job — list_jobs returns nothing`` () = diff --git a/tests/Arcp.IntegrationTests/Harness.fs b/tests/Arcp.IntegrationTests/Harness.fs index 98d668e..9f1f84a 100644 --- a/tests/Arcp.IntegrationTests/Harness.fs +++ b/tests/Arcp.IntegrationTests/Harness.fs @@ -31,7 +31,7 @@ let connectWithOptions } |> configureOptions - let server = ArcpServer(serverOptions) + let server = new ArcpServer(serverOptions) configure server let clientT, serverT = MemoryTransport.CreatePair() let serverTask = server.HandleSessionAsync(serverT, cts.Token) diff --git a/tests/Arcp.IntegrationTests/IdempotencyAndCancelTests.fs b/tests/Arcp.IntegrationTests/IdempotencyAndCancelTests.fs index 874d4e6..f74163e 100644 --- a/tests/Arcp.IntegrationTests/IdempotencyAndCancelTests.fs +++ b/tests/Arcp.IntegrationTests/IdempotencyAndCancelTests.fs @@ -57,10 +57,12 @@ let ``list_jobs filter by agent narrows the set`` () = let! _ = p.Client.SubmitAsync(mkRequest "b", CancellationToken.None) let! _ = p.Client.SubmitAsync(mkRequest "b", CancellationToken.None) + // agent_versions not negotiated -> agents are stored bare; the + // bare-name filter matches (§79, §108). let filter: JobListFilter = { Status = None - Agent = Some "a@default" + Agent = Some "a" CreatedAfter = None } diff --git a/tests/Arcp.IntegrationTests/JobLifecycleTests.fs b/tests/Arcp.IntegrationTests/JobLifecycleTests.fs index dfc97e8..f90296b 100644 --- a/tests/Arcp.IntegrationTests/JobLifecycleTests.fs +++ b/tests/Arcp.IntegrationTests/JobLifecycleTests.fs @@ -48,8 +48,10 @@ let ``cancel transitions a job to Cancelled`` () = let! _ = handle.CancelAsync(Some "test", CancellationToken.None) let! r = handle.Result + // §7.4: cancellation terminates with job.error(CANCELLED). match r with - | Ok rp -> rp.FinalStatus |> should equal JobStatus.Cancelled + | Ok rp -> failwithf "expected CANCELLED error, got result %A" rp + | Error(ARCPError.Cancelled _) -> () | Error e -> failwithf "expected Cancelled, got %A" e do! teardown p diff --git a/tests/Arcp.IntegrationTests/ProvisionedCredentialsTests.fs b/tests/Arcp.IntegrationTests/ProvisionedCredentialsTests.fs index 006e800..fe46b8f 100644 --- a/tests/Arcp.IntegrationTests/ProvisionedCredentialsTests.fs +++ b/tests/Arcp.IntegrationTests/ProvisionedCredentialsTests.fs @@ -37,7 +37,7 @@ type private FakeProvisioner() = member _.RevokeAsync(credentialId, _ct) = revocations.Add credentialId - Task.FromResult true + Task.FromResult RevocationOutcome.Revoked let private withProvisioner (fake: FakeProvisioner) (options: ArcpServerOptions) = { options with diff --git a/tests/Arcp.IntegrationTests/ResumeTests.fs b/tests/Arcp.IntegrationTests/ResumeTests.fs new file mode 100644 index 0000000..b1ea3d5 --- /dev/null +++ b/tests/Arcp.IntegrationTests/ResumeTests.fs @@ -0,0 +1,201 @@ +module ARCP.IntegrationTests.ResumeTests + +open System.Threading +open System.Threading.Tasks +open Xunit +open FsUnit.Xunit +open ARCP.Core +open ARCP.Client +open ARCP.Client.Transport +open ARCP.Runtime +open ARCP.Runtime.Auth + +/// Server-level resume tests (spec §6.3). These drive raw envelopes so +/// the `session.resume` flow can be exercised end-to-end. + +let private hello: SessionHelloPayload = + { + Client = { Name = "t"; Version = "1" } + Auth = + { + Scheme = "bearer" + Token = Some "tok" + } + Capabilities = + { + Encodings = [ "json" ] + Features = Features.All + } + } + +let private send (t: ITransport) (msg: Message) (sid: string option) : Task = + let env = Codec.toEnvelope msg + + let env = + match sid with + | Some s -> { env with SessionId = Some s } + | None -> env + + t.SendAsync(env, CancellationToken.None) + +/// Drain envelopes until `stopWhen` matches one or the timeout elapses. +let private drain (t: ITransport) (stopWhen: Envelope -> bool) (timeoutMs: int) : Task = + task { + use cts = new CancellationTokenSource(timeoutMs) + let acc = ResizeArray() + let en = (t.Receive(cts.Token)).GetAsyncEnumerator(cts.Token) + let mutable more = true + + try + while more do + let! has = en.MoveNextAsync().AsTask() + + if has then + acc.Add en.Current + + if stopWhen en.Current then + more <- false + else + more <- false + with _ -> + () + + do! en.DisposeAsync().AsTask() + return List.ofSeq acc + } + +let private emitterServer () : ArcpServer = + let server = + new ArcpServer( + { ArcpServerOptions.defaults with + Features = Features.All + BearerVerifier = DevModeBearerVerifier() + } + ) + + server.RegisterAgent( + "emitter", + fun ctx -> + task { + do! ctx.EmitLogAsync(LogLevel.Info, "e1", ctx.CancellationToken) + do! ctx.EmitLogAsync(LogLevel.Info, "e2", ctx.CancellationToken) + do! ctx.EmitLogAsync(LogLevel.Info, "e3", ctx.CancellationToken) + return Json.serializeToElement "done" + } + ) + + server + +[] +let ``session.resume replays buffered events`` () = + task { + use cts = new CancellationTokenSource() + let server = emitterServer () + + // First connection: handshake, submit, drain through job.result. + let c1, s1 = MemoryTransport.CreatePair() + let st1 = server.HandleSessionAsync(s1, cts.Token) + do! send c1 (Message.SessionHello hello) None + let! welcomeBatch = drain c1 (fun e -> e.Type = "session.welcome") 1000 + let welcomeEnv = welcomeBatch |> List.find (fun e -> e.Type = "session.welcome") + let sid = welcomeEnv.SessionId.Value + + let token = + match Codec.toMessage welcomeEnv with + | Ok(Message.SessionWelcome w) -> w.ResumeToken + | _ -> failwith "expected welcome" + + do! + send + c1 + (Message.JobSubmit + { + Agent = "emitter" + Input = Json.serializeToElement 0 + LeaseRequest = None + LeaseConstraints = None + IdempotencyKey = None + MaxRuntimeSec = None + }) + (Some sid) + + let! _firstEvents = drain c1 (fun e -> e.Type = "job.result") 1500 + + // Simulate a transport drop and let the server move the session + // into the resumable set. + do! c1.CloseAsync CancellationToken.None + do! st1 + + // Reconnect on a fresh transport and resume from seq 0. + let c2, s2 = MemoryTransport.CreatePair() + let st2 = server.HandleSessionAsync(s2, cts.Token) + + do! + send + c2 + (Message.SessionResume + { + SessionId = sid + ResumeToken = token + LastEventSeq = 0L + }) + None + + let! resumed = drain c2 (fun e -> e.Type = "job.result") 1500 + + resumed + |> List.filter (fun e -> e.Type = "session.welcome") + |> List.isEmpty + |> should equal false + + let events = resumed |> List.filter (fun e -> e.Type = "job.event") + events |> List.length |> should be (greaterThanOrEqualTo 3) + + cts.Cancel() + do! st2 + } + +[] +let ``session.resume with unknown token returns RESUME_WINDOW_EXPIRED`` () = + task { + use cts = new CancellationTokenSource() + let server = emitterServer () + + let c1, s1 = MemoryTransport.CreatePair() + let st1 = server.HandleSessionAsync(s1, cts.Token) + do! send c1 (Message.SessionHello hello) None + let! welcomeBatch = drain c1 (fun e -> e.Type = "session.welcome") 1000 + + let sid = + (welcomeBatch |> List.find (fun e -> e.Type = "session.welcome")).SessionId.Value + + do! c1.CloseAsync CancellationToken.None + do! st1 + + let c2, s2 = MemoryTransport.CreatePair() + let st2 = server.HandleSessionAsync(s2, cts.Token) + + do! + send + c2 + (Message.SessionResume + { + SessionId = sid + ResumeToken = "wrong-token" + LastEventSeq = 0L + }) + None + + let! batch = drain c2 (fun e -> e.Type = "session.error") 1000 + let errEnv = batch |> List.tryFind (fun e -> e.Type = "session.error") + + match errEnv with + | Some e -> + match Codec.toMessage e with + | Ok(Message.SessionError p) -> p.Code |> should equal "RESUME_WINDOW_EXPIRED" + | _ -> failwith "expected session.error" + | None -> failwith "expected session.error envelope" + + cts.Cancel() + do! st2 + } diff --git a/tests/Arcp.UnitTests/Arcp.UnitTests.fsproj b/tests/Arcp.UnitTests/Arcp.UnitTests.fsproj index 22159d0..4f118ed 100644 --- a/tests/Arcp.UnitTests/Arcp.UnitTests.fsproj +++ b/tests/Arcp.UnitTests/Arcp.UnitTests.fsproj @@ -8,6 +8,7 @@ + diff --git a/tests/Arcp.UnitTests/AuthTests.fs b/tests/Arcp.UnitTests/AuthTests.fs index 07d6bc2..bc0d73d 100644 --- a/tests/Arcp.UnitTests/AuthTests.fs +++ b/tests/Arcp.UnitTests/AuthTests.fs @@ -51,10 +51,15 @@ let ``AlwaysDenyVerifier always returns Unauthenticated`` () = [] let ``AnonymousPrincipal id is anonymous`` () = + // §110: each anonymous principal gets a unique id so distinct + // anonymous connections are distinct principals. let p = AnonymousPrincipal() :> IPrincipal - p.Id |> should equal "anonymous" + p.Id |> should startWith "anon:" p.Labels.IsEmpty |> should equal true + let p2 = AnonymousPrincipal() :> IPrincipal + p.Id |> should not' (equal p2.Id) + [] let ``StringPrincipal with labels exposes them`` () = let labels = Map.ofList [ "team", "platform" ] diff --git a/tests/Arcp.UnitTests/CodecMessageRoundTripTests.fs b/tests/Arcp.UnitTests/CodecMessageRoundTripTests.fs index 3a11ae6..172d905 100644 --- a/tests/Arcp.UnitTests/CodecMessageRoundTripTests.fs +++ b/tests/Arcp.UnitTests/CodecMessageRoundTripTests.fs @@ -49,7 +49,9 @@ let private jobs = NextCursor = None } -let private bye = Message.SessionBye { Reason = Some "x" } +let private close = Message.SessionClose { Reason = Some "x" } +let private closed = Message.SessionClosed { Reason = Some "x" } +let private cancelled = Message.JobCancelled { JobId = "j" } let private sessionError = Message.SessionError @@ -132,7 +134,9 @@ let private unsubscribe = Message.JobUnsubscribe { JobId = "j" } [] [] [] -[] +[] +[] +[] [] [] [] @@ -148,7 +152,9 @@ let ``every message type round-trips through Codec`` (which: string) = | "welcome" -> welcome | "list_jobs" -> listJobs | "jobs" -> jobs - | "bye" -> bye + | "close" -> close + | "closed" -> closed + | "cancelled" -> cancelled | "session_error" -> sessionError | "submit" -> submit | "accepted" -> accepted diff --git a/tests/Arcp.UnitTests/CodecTests.fs b/tests/Arcp.UnitTests/CodecTests.fs index 417b754..8fa2c62 100644 --- a/tests/Arcp.UnitTests/CodecTests.fs +++ b/tests/Arcp.UnitTests/CodecTests.fs @@ -33,7 +33,6 @@ let ``session.hello roundtrips`` () = Encodings = [ "json" ] Features = Features.All } - Resume = None } match rt (Message.SessionHello hello) with diff --git a/tests/Arcp.UnitTests/ErrorsTests.fs b/tests/Arcp.UnitTests/ErrorsTests.fs index 987709d..93932f3 100644 --- a/tests/Arcp.UnitTests/ErrorsTests.fs +++ b/tests/Arcp.UnitTests/ErrorsTests.fs @@ -113,10 +113,15 @@ let ``Result.unwrapOrThrow throws ArcpException for Error`` () = let err = ARCPError.JobNotFound "j" let ex = - Assert.Throws(fun () -> Result.unwrapOrThrow (Error err) |> ignore) + Assert.Throws(fun () -> ArcpResult.unwrapOrThrow (Error err) |> ignore) ex.Error |> should equal err [] -let ``Result.unwrapOrThrow returns value for Ok`` () = - Result.unwrapOrThrow (Ok 42) |> should equal 42 +let ``ArcpResult.unwrapOrThrow returns value for Ok`` () = + ArcpResult.unwrapOrThrow (Ok 42) |> should equal 42 + +[] +let ``FSharp.Core Result is not shadowed after open ARCP.Core`` () = + // §118: opening ARCP.Core must not hide FSharp.Core's Result. + Result.map (fun x -> x + 1) (Ok 1) |> should equal (Ok 2) diff --git a/tests/Arcp.UnitTests/JobContextTests.fs b/tests/Arcp.UnitTests/JobContextTests.fs index 78ddd67..b0c3a37 100644 --- a/tests/Arcp.UnitTests/JobContextTests.fs +++ b/tests/Arcp.UnitTests/JobContextTests.fs @@ -90,12 +90,26 @@ let ``EmitMetric with positive cost.* decrements budget tracking`` () = costs.[0] |> should equal ("USD", 0.25m) [] -let ``EmitMetric with negative value is silently dropped`` () = +let ``EmitMetric negative cost value is rejected`` () = + // §86: negative cost.* metrics raise INVALID_REQUEST. let ctx, emitted, _, costs, _ = mkContext Lease.empty - ctx.EmitMetricAsync("anything", -1m, None, None, CancellationToken.None).Wait() + + (fun () -> + ctx.EmitMetricAsync("cost.inference", -0.01m, Some "USD", None, CancellationToken.None) + |> ignore) + |> should throw typeof + emitted.Count |> should equal 0 costs.Count |> should equal 0 +[] +let ``EmitMetric negative non-cost value still emits`` () = + // §86: §9.6 only governs cost metrics; negative non-cost metrics flow. + let ctx, emitted, _, costs, _ = mkContext Lease.empty + ctx.EmitMetricAsync("latency.ms", -5m, Some "ms", None, CancellationToken.None).Wait() + emitted.Count |> should equal 1 + costs.Count |> should equal 0 + [] let ``EmitMetric non-cost name does not touch budget`` () = let ctx, emitted, _, costs, _ = mkContext Lease.empty diff --git a/tests/Arcp.UnitTests/JobErrorMapperTests.fs b/tests/Arcp.UnitTests/JobErrorMapperTests.fs index a275145..961ee91 100644 --- a/tests/Arcp.UnitTests/JobErrorMapperTests.fs +++ b/tests/Arcp.UnitTests/JobErrorMapperTests.fs @@ -26,12 +26,19 @@ let ``ofWire round-trips canonical code`` (code: string) = ARCPError.code err |> should equal code [] -let ``ofWire unknown code falls back to InternalError`` () = +let ``ofWire unknown code maps to Unknown preserving code`` () = + // §90: unknown codes round-trip via the Unknown arm (not InternalError). let err = JobErrorMapper.ofWire "FUTURE_CODE" "msg" None "" match err with - | ARCPError.InternalError _ -> () - | other -> failwithf "expected InternalError, got %A" other + | ARCPError.Unknown("FUTURE_CODE", "msg", false) -> () + | other -> failwithf "expected Unknown, got %A" other + +[] +let ``ofWireWith honors wire retryable for unknown code`` () = + match JobErrorMapper.ofWireWith "FUTURE_CODE" "msg" None false None with + | ARCPError.Unknown(_, _, r) -> r |> should equal false + | other -> failwithf "expected Unknown, got %A" other [] let ``BUDGET_EXHAUSTED maps from upstream-style error`` () = diff --git a/tests/Arcp.UnitTests/JobManagerTests.fs b/tests/Arcp.UnitTests/JobManagerTests.fs index 0a9254e..576368d 100644 --- a/tests/Arcp.UnitTests/JobManagerTests.fs +++ b/tests/Arcp.UnitTests/JobManagerTests.fs @@ -16,6 +16,7 @@ let private noopOutbox = member _.EmitJobEventAsync(_, _) = Task.CompletedTask member _.EmitJobResultAsync(_, _) = Task.CompletedTask member _.EmitJobErrorAsync(_, _) = Task.CompletedTask + member _.EmitCredentialRotatedAsync(_, _, _) = Task.CompletedTask } let private mkRecord (jobId: JobId) (principalId: string) : JobRecord = @@ -34,8 +35,15 @@ let private mkRecord (jobId: JobId) (principalId: string) : JobRecord = CreatedAt = DateTimeOffset.UnixEpoch Cancellation = new CancellationTokenSource() Watchdog = None + RuntimeWatchdog = None + TerminalEmitted = false + AcceptedPayload = None Status = JobStatus.Pending LastEventSeq = 0L + StreamResultId = None + IdempotencyFingerprint = None + IdempotencyKey = None + TerminatedAt = None } [] diff --git a/tests/Arcp.UnitTests/PendingRegistryTests.fs b/tests/Arcp.UnitTests/PendingRegistryTests.fs index 3a64b31..d8cc3ef 100644 --- a/tests/Arcp.UnitTests/PendingRegistryTests.fs +++ b/tests/Arcp.UnitTests/PendingRegistryTests.fs @@ -31,7 +31,7 @@ let ``Duplicate Register throws`` () = let reg = PendingRegistry() reg.Register "req-1" |> ignore - Assert.Throws(fun () -> reg.Register "req-1" |> ignore) + Assert.Throws(fun () -> reg.Register "req-1" |> ignore) |> ignore [] diff --git a/tests/Arcp.UnitTests/WireFormatGoldenTests.fs b/tests/Arcp.UnitTests/WireFormatGoldenTests.fs new file mode 100644 index 0000000..ed7a9bf --- /dev/null +++ b/tests/Arcp.UnitTests/WireFormatGoldenTests.fs @@ -0,0 +1,187 @@ +module ARCP.UnitTests.WireFormatGoldenTests + +open System.Text.Json +open Xunit +open FsUnit.Xunit +open ARCP.Core + +/// Golden wire-format tests pinning the spec JSON shapes (§6.2, +/// §7.3, §8.2, §8.4). These guard cross-SDK interop: any non-F# +/// peer must be able to read/write these exact shapes. + +let private ts = System.DateTimeOffset.Parse("2026-05-13T19:42:13Z") + +/// Serialize a payload to its JSON object string. +let private payloadJson (v: 'T) = Json.serialize v + +[] +let ``log event body serializes flat with lowercase level`` () = + let payload: JobEventPayload = + { + Kind = "log" + Ts = ts + Body = JobEventBody.Log(LogLevel.Info, "hello") + } + + let json = payloadJson payload + + json + |> should haveSubstring "\"body\":{\"level\":\"info\",\"message\":\"hello\"}" + + json |> should not' (haveSubstring "Log") + json |> should haveSubstring "\"kind\":\"log\"" + +[] +let ``progress event body matches spec example shape`` () = + let payload: JobEventPayload = + { + Kind = "progress" + Ts = ts + Body = JobEventBody.Progress(47m, Some 120m, Some "files", Some "Refactoring") + } + + let json = payloadJson payload + + json + |> should haveSubstring "\"body\":{\"current\":47,\"total\":120,\"units\":\"files\",\"message\":\"Refactoring\"}" + +[] +let ``result_chunk body uses lowercase encoding and snake_case fields`` () = + let payload: JobEventPayload = + { + Kind = "result_chunk" + Ts = ts + Body = JobEventBody.ResultChunk("res_1", 0L, "abc", ChunkEncoding.Utf8, true) + } + + let json = payloadJson payload + + json + |> should + haveSubstring + "\"body\":{\"result_id\":\"res_1\",\"chunk_seq\":0,\"data\":\"abc\",\"encoding\":\"utf8\",\"more\":true}" + +[] +let ``job.result final_status is lowercase`` () = + let payload: JobResultPayload = + { + FinalStatus = JobStatus.Success + Result = None + ResultId = None + ResultSize = None + Summary = None + } + + payloadJson payload |> should haveSubstring "\"final_status\":\"success\"" + +[] +let ``tool_result with error serializes nested error object`` () = + let payload: JobEventPayload = + { + Kind = "tool_result" + Ts = ts + Body = JobEventBody.ToolResult("call_1", ToolOutcome.Error("INTERNAL_ERROR", "boom", true)) + } + + let json = payloadJson payload + json |> should haveSubstring "\"call_id\":\"call_1\"" + + json + |> should haveSubstring "\"error\":{\"code\":\"INTERNAL_ERROR\",\"message\":\"boom\",\"retryable\":true}" + +[] +let ``welcome capabilities agents serialize as plain array (rich)`` () = + let payload: SessionWelcomePayload = + { + Runtime = { Name = "rt"; Version = "1.1.0" } + ResumeToken = "rt_1" + ResumeWindowSec = 600 + HeartbeatIntervalSec = Some 30 + Capabilities = + { + Encodings = [ "json" ] + Features = set [ "heartbeat" ] + Agents = + AgentInventory.Rich + [ + { + Name = "code-refactor" + Versions = [ "1.0.0"; "2.0.0" ] + Default = Some "2.0.0" + } + ] + } + } + + let json = payloadJson payload + + json + |> should + haveSubstring + "\"agents\":[{\"name\":\"code-refactor\",\"versions\":[\"1.0.0\",\"2.0.0\"],\"default\":\"2.0.0\"}]" + +[] +let ``welcome capabilities agents serialize as plain array (flat)`` () = + let inv = AgentInventory.Flat [ "a"; "b" ] + Json.serialize inv |> should equal "[\"a\",\"b\"]" + +[] +let ``lease serializes as bare namespace map`` () = + let lease = + { + Capabilities = Map.ofList [ "fs.read", [ "/workspace/**" ] ] + } + + Json.serialize lease |> should equal "{\"fs.read\":[\"/workspace/**\"]}" + +[] +let ``credential constraints use dotted lease keys`` () = + let c: CredentialConstraints = + { + CostBudget = Some [ "USD:5.00" ] + ModelUse = Some [ "tier-fast/*" ] + ExpiresAt = None + } + + let json = Json.serialize c + json |> should haveSubstring "\"cost.budget\":[\"USD:5.00\"]" + json |> should haveSubstring "\"model.use\":[\"tier-fast/*\"]" + +[] +let ``job.event round-trips through wire JSON`` () = + let payload: JobEventPayload = + { + Kind = "metric" + Ts = ts + Body = JobEventBody.Metric("cost.inference", 0.42m, Some "USD", Some(Map.ofList [ "model", "gpt" ])) + } + + let json = Json.serialize payload + let rt = Json.deserialize json + + match rt.Body with + | JobEventBody.Metric(name, value, unit, dims) -> + name |> should equal "cost.inference" + value |> should equal 0.42m + unit |> should equal (Some "USD") + dims |> should equal (Some(Map.ofList [ "model", "gpt" ])) + | _ -> failwith "wrong body" + +[] +let ``unknown event kind round-trips via x-vendor`` () = + let body = Json.parseElement "{\"foo\":1}" + + let payload: JobEventPayload = + { + Kind = "x-acme.custom" + Ts = ts + Body = JobEventBody.XVendor("x-acme.custom", body) + } + + let json = Json.serialize payload + json |> should haveSubstring "\"body\":{\"foo\":1}" + let rt = Json.deserialize json + + match rt.Body with + | JobEventBody.XVendor(k, _) -> k |> should equal "x-acme.custom" + | _ -> failwith "wrong body"