Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,17 @@ app.Run("http://localhost:7878")
On the client side use `WebSocketClientTransport.connectAsync`, which
opens a `ClientWebSocket`, attaches the bearer token (if any) as an
`Authorization` header on the upgrade, and returns an `ITransport`.

> **Warning:** The WebSocket `Authorization` header does **not**
> authenticate the ARCP session. Session auth is sent in
> `session.hello.payload.auth` from `ArcpClientOptions.Auth`.
> `ArcpClientOptions.defaults` sets `AuthScheme.None` (`auth.scheme =
> "none"`), so passing a bearer token only to `connectAsync` leaves
> the ARCP principal anonymous unless you also set
> `Auth = AuthScheme.Bearer token` on the client options. Use
> `defaults` only with runtimes that explicitly allow anonymous
> sessions.

That header is host-layer metadata; ARCP session authentication still
comes from `ArcpClientOptions.Auth`:

Expand Down
14 changes: 12 additions & 2 deletions docs/transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ let _ = server.HandleSessionAsync(serverT, CancellationToken.None)
let client = new ArcpClient(clientT, ArcpClientOptions.defaults)
```

> **Warning:** `ArcpClientOptions.defaults` sends `auth.scheme = "none"`.
> Use it only with runtimes that allow anonymous sessions. For bearer
> auth, set `Auth = AuthScheme.Bearer token` on the client options —
> the WebSocket upgrade `Authorization` header alone does not
> authenticate the ARCP session (see [WebSocket](#websocket) below).

Calling `CloseAsync` on either half completes both channels, ending
the paired `Receive` enumerator on the other side.

Expand All @@ -26,8 +32,12 @@ the paired `Receive` enumerator on the other side.
The convenience constructor `connectAsync` opens a `ClientWebSocket`,
adds the bearer token (if any) as the `Authorization` header on the
upgrade, and returns an `ITransport`. Treat that header as host-layer
metadata; ARCP session authentication still comes from
`ArcpClientOptions.Auth`:
metadata only — it does **not** authenticate the ARCP session.
Session auth is sent in `session.hello.payload.auth` from
`ArcpClientOptions.Auth`. When the runtime expects bearer auth, pass
the same token to both `connectAsync` and
`Auth = AuthScheme.Bearer token`; `ArcpClientOptions.defaults` sends
`auth.scheme = "none"` and leaves the ARCP principal anonymous:

```fsharp
open ARCP.Client.Transport
Expand Down
7 changes: 3 additions & 4 deletions src/Arcp.Client/ArcpClient.fs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,10 @@ type ArcpClient(transport: ITransport, options: ArcpClientOptions) =
let assembler = w.ChunkIndex.GetOrCreate rid

match assembler.Append(chunkSeq, data, enc, more) with
| Ok _ -> w.Channel.Writer.TryWrite payload.Body |> ignore
| Ok _ -> ()
| Error err ->
// Out-of-order or undecodable chunk: tear down the
// handle so callers don't sit on a job that will never
// produce a usable result.
// Out-of-order or undecodable chunk: complete the
// result task with this error and drop the handle.
handles.TryRemove jid |> ignore
w.Channel.Writer.TryComplete() |> ignore
w.ResultSetter.TrySetResult(Error err) |> ignore
Expand Down
6 changes: 3 additions & 3 deletions src/Arcp.Client/JobHandle.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ open ARCP.Client.Internal
///
/// `Events` is an `IAsyncEnumerable<JobEventBody>` over every non-
/// `result_chunk` event; consumers iterate with `await foreach`
/// (C#) or `for x in handle.Events do` (F#). `ResultBytes` returns
/// the assembled bytes from any `result_chunk` stream associated
/// with this job.
/// (C#) or `for x in handle.Events do` (F#). `TryReadResultBytes`
/// returns the assembled bytes for a given `ResultId` once the
/// chunk stream has closed (`byte[] option`).
///
/// `Result` resolves once the terminating `job.result` / `job.error`
/// arrives.
Expand Down
7 changes: 4 additions & 3 deletions src/Arcp.Client/Transport/WebSocket.fs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ type WebSocketClientTransport(socket: WebSocket, ownsSocket: bool, maxMessageByt
module WebSocketClientTransport =
/// Connect a new client transport to `uri`. The bearer token (if
/// provided) is added as the `Authorization` header on the
/// upgrade request. That header is host-layer metadata; ARCP
/// session authentication is configured separately on
/// `ArcpClientOptions`.
/// upgrade request. That header is host-layer metadata only and
/// does not authenticate the ARCP session — set
/// `ArcpClientOptions.Auth = AuthScheme.Bearer token` for session
/// auth (`ArcpClientOptions.defaults` sends `auth.scheme = "none"`).
let connectAsync (uri: Uri) (bearerToken: string option) (ct: CancellationToken) : Task<ITransport> =
task {
let client = new ClientWebSocket()
Expand Down
3 changes: 0 additions & 3 deletions src/Arcp.Core/Capabilities.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ type AgentInventoryEntry =
Default: string option
}

/// Agent inventory advertised in `session.welcome.payload.capabilities.agents`.
/// The runtime always emits the rich shape when `agent_versions` is
/// in the negotiated feature set; otherwise the flat shape is used.
/// Agent inventory advertised in
/// `session.welcome.payload.capabilities.agents`. The flat shape is
/// emitted when `agent_versions` is not in the negotiated feature
Expand Down
13 changes: 4 additions & 9 deletions src/Arcp.Core/Errors.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@ namespace ARCP.Core
open System
open System.Text.Json

/// Canonical ARCP error taxonomy (spec §12). Fifteen cases.
/// Canonical ARCP error taxonomy (spec §12). Fifteen DU cases —
/// every wire error code has exactly one DU arm.
///
/// `retryable` follows §12. Three cases must be `retryable = false`
/// because a naive retry will fail identically:
/// `LeaseExpired`, `BudgetExhausted`, `AgentVersionNotAvailable`.
/// Only `Timeout`, `HeartbeatLost`, and `InternalError` are retryable
/// per §12; the remaining twelve arms are not.
///
/// F# consumers prefer `Result<_, ARCPError>` for expected outcomes.
/// `ArcpException` (below) wraps the same value for C# callers and
/// for fatal paths where exceptions are the idiom.
/// Canonical ARCP error taxonomy (spec §12). Fifteen DU cases —
/// every wire error code has exactly one DU arm.
[<RequireQualifiedAccess>]
type ARCPError =
| PermissionDenied of message: string * details: JsonElement option
Expand Down Expand Up @@ -95,9 +93,6 @@ module ARCPError =
| ARCPError.InvalidRequest(_, d) -> d
| _ -> None

/// Exception form for C# interop and for fatal paths in F# where
/// the spec-canonical surface is "throw with code". Carries the
/// underlying `ARCPError`.
/// Convenience alias for `ARCPError`. The protocol uses "ARCP"
/// all-caps, so the spec-named type is `ARCPError`; `SdkError`
/// is the F#-conventional name for callers who prefer it.
Expand Down
10 changes: 7 additions & 3 deletions src/Arcp.Core/Json.fs
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,13 @@ module internal JsonConverters =
/// JSON configuration shared across the SDK.
///
/// The wire format (spec §5.1, §6, §7, §8) puts the discriminator as
/// a top-level `type` field next to peer fields. Custom converters in
/// `JsonConverters` pin the spec-mandated flat shapes for the unions
/// that appear inside payloads.
/// a top-level `type` field next to peer fields. `buildOptions` uses
/// `WithUnionExternalTag()` keyed on `"type"`, combined with
/// `WithUnionUnwrapRecordCases` and related unwrap options, to emit
/// the flat `{ "type": "...", ...fields }` shape rather than a nested
/// case wrapper. Custom converters in `JsonConverters` pin the
/// spec-mandated flat shapes for the unions that appear inside
/// payloads.
[<RequireQualifiedAccess>]
module Json =
let private buildOptions () : JsonSerializerOptions =
Expand Down
7 changes: 3 additions & 4 deletions src/Arcp.Runtime/ArcpServer.fs
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,9 @@ type ArcpServer(options: ArcpServerOptions) =
let registerHandler (name: string) (version: string) (h: ArcpAgentHandler) =
agentHandlers.[name + "@" + version] <- h
// The inventory stores an `AgentHandler` purely as a presence
// marker — `JobSubmitFlow` discards it and dispatches via
// `agentHandlers` keyed by `name@version`. The placeholder
// raises so any regression that routes through it surfaces
// loudly instead of returning a garbage JsonElement.
// marker — `JobSubmitFlow` dispatches via `agentHandlers`
// keyed by `name@version`. The placeholder raises if invoked
// so routing regressions fail fast.
let placeholder: AgentHandler =
fun _ ->
raise (
Expand Down
7 changes: 3 additions & 4 deletions src/Arcp.Runtime/Internal/JobSubmitFlow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,10 @@ module internal JobSubmitFlow =
| Ok constraints ->
let jobId = JobId.newId ()

// Claim the idempotency key first so a duplicate
// submission short-circuits before any side effects
// Claim the idempotency key before any side effects
// (record registration, watchdog start, provisioner
// call). Without this, two concurrent submits with
// the same key both fell through and created jobs.
// call) so concurrent duplicate submits collapse
// to one job.
let claimResult =
match submit.IdempotencyKey with
| Some key -> jobs.TryClaimIdempotencyKey(key, jobId)
Expand Down
3 changes: 2 additions & 1 deletion src/Arcp.Runtime/JobManager.fs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ type internal JobManager(timeProvider: TimeProvider, outbox: IJobOutbox) =
LastEventSeq = r.LastEventSeq
}

/// Emit a `job.event` for `record`. Updates `LastEventSeq`.
/// Emit a `job.event` for `record` via the configured `IJobOutbox`.
/// The outbox is responsible for updating `LastEventSeq`.
member this.EmitEventAsync(record: JobRecord, body: JobEventBody) : Task = outbox.EmitJobEventAsync(record, body)

member this.EmitResultAsync(record: JobRecord, payload: JobResultPayload) : Task =
Expand Down
64 changes: 64 additions & 0 deletions tests/Arcp.IntegrationTests/JobLifecycleTests.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module ARCP.IntegrationTests.JobLifecycleTests

open System
open System.Threading
open System.Threading.Tasks
open Xunit
Expand Down Expand Up @@ -75,3 +76,66 @@ let ``idempotency key returns same JobId on second submit`` () =
h1.JobId.Value |> should equal h2.JobId.Value
do! teardown p
}

[<Fact>]
let ``result_chunk events are not forwarded to Events but assemble via TryReadResultBytes`` () =
task {
let! p =
connect
(fun s ->
s.RegisterAgent(
"chunker",
fun ctx ->
task {
let rid = ctx.BeginStreamingResult()
let payload = System.Text.Encoding.UTF8.GetBytes("hello")

do!
ctx.EmitResultChunkAsync(
rid,
0L,
ReadOnlyMemory<byte>(payload),
ChunkEncoding.Utf8,
false,
ctx.CancellationToken
)

return Json.serializeToElement<string option> None
}
))
(Set.ofList [ Features.ResultChunk ])

let! handle = p.Client.SubmitAsync(mkRequest "chunker", CancellationToken.None)

let events = ResizeArray<JobEventBody>()
let enumerator = handle.Events.GetAsyncEnumerator(CancellationToken.None)

try
let mutable more = true

while more do
let! has = enumerator.MoveNextAsync().AsTask()

if has then events.Add enumerator.Current else more <- false
finally
ignore (enumerator.DisposeAsync().AsTask())

events
|> Seq.exists (function
| JobEventBody.ResultChunk _ -> true
| _ -> false)
|> should equal false

let! result = handle.Result

match result with
| Ok rp ->
rp.ResultId.IsSome |> should equal true

match handle.TryReadResultBytes(ResultId.ofString rp.ResultId.Value) with
| Some bytes -> System.Text.Encoding.UTF8.GetString bytes |> should equal "hello"
| None -> failwith "expected assembled bytes"
| Error e -> failwithf "%A" e

do! teardown p
}