Skip to content
4 changes: 4 additions & 0 deletions arcp-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
</parent>

<artifactId>arcp-client</artifactId>

<properties>
<arcp.skip.coverage.check>false</arcp.skip.coverage.check>
</properties>
<name>arcp-client</name>
<description>ARCP client SDK.</description>

Expand Down
203 changes: 194 additions & 9 deletions arcp-client/src/main/java/dev/arcp/client/ArcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,22 @@ private ArcpClient(Builder b) {
this.lastEventSeq = b.lastEventSeq;
}

/**
* Starts building a client over the given transport.
*
* @param transport connected transport the client will own for the session's lifetime
* @return a new {@link Builder} with default client info, auth, and feature set
*/
public static Builder builder(Transport transport) {
return new Builder(transport);
}

/** Send hello and return a future completing with the negotiated {@link Session}. */
/**
* Sends {@code session.hello} and returns a future completing with the negotiated {@link Session}
* once {@code session.welcome} arrives (§6.2).
*
* @return a future completing with the session, or exceptionally if the handshake is rejected
*/
public CompletableFuture<Session> connect() {
transport.incoming().subscribe(this);
SessionHello hello =
Expand All @@ -157,6 +168,17 @@ public CompletableFuture<Session> connect() {
return sessionFuture;
}

/**
* Blocking variant of {@link #connect()}: sends {@code session.hello} and waits for {@code
* session.welcome} (§6.2).
*
* @param timeout maximum time to wait for the handshake to complete
* @return the negotiated session
* @throws InterruptedException if the calling thread is interrupted while waiting
* @throws TimeoutException if no {@code session.welcome} arrives within {@code timeout}
* @throws ArcpException if the runtime rejects the handshake with a protocol error (e.g. {@code
* RESUME_WINDOW_EXPIRED} for a stale resume token, §6.3)
*/
public Session connect(Duration timeout)
throws InterruptedException, TimeoutException, ArcpException {
try {
Expand All @@ -170,6 +192,12 @@ public Session connect(Duration timeout)
}
}

/**
* Blocking submit without trace context; see {@link #submit(JobSubmit, TraceId)}.
*
* @param submit the {@code job.submit} payload (§7)
* @return a handle to the accepted job
*/
public JobHandle submit(JobSubmit submit) {
return submit(submit, null);
}
Expand All @@ -181,6 +209,10 @@ public JobHandle submit(JobSubmit submit) {
* <p>Must not be called from a dispatch/result callback (i.e. the transport inbound thread);
* doing so would deadlock because the acknowledgement is delivered by that same thread. Such a
* call fails fast with {@link IllegalStateException}.
*
* @param submit the {@code job.submit} payload (§7)
* @param traceId W3C trace context to stamp on the envelope (§11), or {@code null} for none
* @return a handle to the accepted job
*/
public JobHandle submit(JobSubmit submit, @Nullable TraceId traceId) {
if (Boolean.TRUE.equals(inDispatch.get())) {
Expand All @@ -207,11 +239,24 @@ public JobHandle submit(JobSubmit submit, @Nullable TraceId traceId) {
}
}

/** Non-blocking submit. Completes with the {@link JobHandle} on {@code job.accepted} (#106). */
/**
* Non-blocking submit. Completes with the {@link JobHandle} on {@code job.accepted} (#106).
*
* @param submit the {@code job.submit} payload (§7)
* @return a future completing when the runtime accepts (or rejects) the job
*/
public CompletableFuture<JobHandle> submitAsync(JobSubmit submit) {
return submitAsync(submit, null);
}

/**
* Non-blocking submit with trace context. Completes with the {@link JobHandle} on {@code
* job.accepted}, or exceptionally on rejection (#106).
*
* @param submit the {@code job.submit} payload (§7)
* @param traceId W3C trace context to stamp on the envelope (§11), or {@code null} for none
* @return a future completing when the runtime accepts (or rejects) the job
*/
public CompletableFuture<JobHandle> submitAsync(JobSubmit submit, @Nullable TraceId traceId) {
Outstanding o = new Outstanding();
MessageId requestId = MessageId.generate();
Expand All @@ -230,14 +275,32 @@ public CompletableFuture<JobHandle> submitAsync(JobSubmit submit, @Nullable Trac
return o.handleFuture;
}

/**
* Lists the first page of jobs visible to this session via {@code session.list_jobs} (§6.6).
*
* @param filter status/agent/creation-time filter, or {@code null} for all visible jobs
* @return the first page of job summaries
* @throws InterruptedException if the calling thread is interrupted while waiting
* @throws TimeoutException if the runtime does not answer with {@code session.jobs} in time
* @throws ArcpException if the runtime rejects the request with a protocol error
*/
public Page<JobSummary> listJobs(@Nullable JobFilter filter)
throws InterruptedException, TimeoutException, ArcpException {
return listJobs(filter, null, null);
}

/**
* List jobs with optional pagination. Supply {@code cursor} from the previous {@link Page} to
* continue, or {@code null} to fetch the first page. {@code limit} caps the page size.
* Lists jobs with optional pagination via {@code session.list_jobs} (§6.6). Supply {@code cursor}
* from the previous {@link Page} to continue, or {@code null} to fetch the first page. {@code
* limit} caps the page size.
*
* @param filter status/agent/creation-time filter, or {@code null} for all visible jobs
* @param limit maximum number of jobs per page, or {@code null} for the runtime default
* @param cursor {@code next_cursor} from the previous page, or {@code null} for the first page
* @return one page of job summaries plus the continuation cursor, if any
* @throws InterruptedException if the calling thread is interrupted while waiting
* @throws TimeoutException if the runtime does not answer with {@code session.jobs} in time
* @throws ArcpException if the runtime rejects the request with a protocol error
*/
public Page<JobSummary> listJobs(
@Nullable JobFilter filter, @Nullable Integer limit, @Nullable String cursor)
Expand Down Expand Up @@ -267,6 +330,15 @@ public Page<JobSummary> listJobs(
}
}

/**
* Attaches to a job's event stream via {@code job.subscribe} (§7.6), e.g. one submitted in a
* different session. The first call for a {@code jobId} sends the subscribe message; subsequent
* calls share the same publisher and the original options.
*
* @param jobId the job to observe
* @param options live-only or history-replaying subscription, per {@link SubscribeOptions}
* @return a hot publisher of decoded {@code job.event} bodies for the subscribed job
*/
public Flow.Publisher<EventBody> subscribe(JobId jobId, SubscribeOptions options) {
java.util.concurrent.atomic.AtomicBoolean inserted =
new java.util.concurrent.atomic.AtomicBoolean(false);
Expand All @@ -291,6 +363,8 @@ public Flow.Publisher<EventBody> subscribe(JobId jobId, SubscribeOptions options
/**
* Locally unsubscribe from job events and notify the runtime via {@code job.unsubscribe}. Closes
* the local {@link Flow.Publisher} so any downstream subscribers see {@code onComplete}.
*
* @param jobId the job whose subscription to cancel (§7.6)
*/
public void unsubscribe(JobId jobId) {
SubmissionPublisher<EventBody> pub = liveSubscribers.remove(jobId);
Expand All @@ -310,6 +384,12 @@ public void unsubscribe(JobId jobId) {
}
}

/**
* Sends an explicit {@code session.ack} acknowledging processed events (§6.5). Only needed when
* auto-ack is disabled via {@link Builder#autoAck(boolean)}.
*
* @param lastProcessedSeq highest {@code event_seq} the application has fully processed
*/
public void ack(long lastProcessedSeq) {
send(Message.Type.SESSION_ACK, new SessionAck(lastProcessedSeq), sessionId, null, null, null);
}
Expand Down Expand Up @@ -350,12 +430,21 @@ public void close() {
}
}

/** Returns the highest event sequence number seen from the server, or -1 if none. */
/**
* Returns the highest event sequence number seen from the server, or -1 if none. Useful as the
* {@code last_event_seq} when resuming (§6.3).
*
* @return the highest observed {@code event_seq}, or -1 before any sequenced event arrives
*/
public long lastSeenSeq() {
return lastSeenSeq.get();
}

/** Returns the active session after {@link #connect()} completes. */
/**
* Returns the active session after {@link #connect()} completes.
*
* @return the negotiated session snapshot
*/
public Session session() {
Session current = session;
if (current == null) {
Expand Down Expand Up @@ -751,6 +840,10 @@ public void cancel() {
}
}

/**
* Fluent builder for {@link ArcpClient}. Obtain via {@link ArcpClient#builder(Transport)}; every
* setter returns this builder so calls can be chained, ending in {@link #build()}.
*/
public static final class Builder {
private final Transport transport;
private @Nullable ObjectMapper mapper;
Expand All @@ -768,53 +861,117 @@ public static final class Builder {
this.transport = transport;
}

/**
* Overrides the Jackson mapper used for wire encoding; defaults to {@link ArcpMapper#shared()}.
*
* @param m mapper configured for ARCP wire I/O
* @return this builder
*/
public Builder mapper(ObjectMapper m) {
this.mapper = m;
return this;
}

/**
* Sets the client name and version advertised in {@code session.hello} (§6.2).
*
* @param name client implementation name, e.g. {@code examplectl}
* @param version client implementation version, e.g. {@code 0.4.1}
* @return this builder
*/
public Builder client(String name, String version) {
this.info = new ClientInfo(name, version);
return this;
}

/**
* Sets the authentication presented in {@code session.hello} (§6.1); defaults to anonymous.
*
* @param a authentication payload, e.g. {@link Auth#bearer(String)}
* @return this builder
*/
public Builder auth(Auth a) {
this.auth = a;
return this;
}

/**
* Shorthand for {@code auth(Auth.bearer(token))}: authenticates with a bearer token (§6.1).
*
* @param token bearer token sent in {@code session.hello.payload.auth.token}
* @return this builder
*/
public Builder bearer(String token) {
this.auth = Auth.bearer(token);
return this;
}

/**
* Restricts the features requested in {@code session.hello} (§6.2); defaults to all features.
* The effective set is the intersection with what the runtime grants in {@code
* session.welcome}.
*
* @param features features to request; {@code null} or empty requests none
* @return this builder
*/
public Builder features(Set<Feature> features) {
this.features = safeFeatureCopy(features);
return this;
}

/**
* Enables or disables periodic automatic {@code session.ack} emission (§6.5); enabled by
* default. When disabled, the application must call {@link ArcpClient#ack(long)} itself.
*
* @param enabled whether the client acks processed events automatically
* @return this builder
*/
public Builder autoAck(boolean enabled) {
this.autoAck = enabled;
return this;
}

/**
* Sets the period between automatic {@code session.ack} emissions (§6.5); defaults to 200ms.
*
* @param interval delay between ack ticks when auto-ack is enabled
* @return this builder
*/
public Builder ackInterval(Duration interval) {
this.ackInterval = interval;
return this;
}

/** Maximum time blocking {@link #submit(JobSubmit)} waits for {@code job.accepted} (#106). */
/**
* Maximum time blocking {@link #submit(JobSubmit)} waits for {@code job.accepted} (#106).
*
* @param timeout submit timeout; defaults to 30 seconds
* @return this builder
*/
public Builder submitTimeout(Duration timeout) {
this.submitTimeout = timeout;
return this;
}

/**
* Supplies an external scheduler for ack ticks and the heartbeat watchdog. The client does not
* shut down a supplied scheduler on {@link ArcpClient#close()}; by default it creates and owns
* a single-threaded daemon scheduler.
*
* @param s scheduler to run the client's periodic tasks on
* @return this builder
*/
public Builder scheduler(ScheduledExecutorService s) {
this.scheduler = s;
return this;
}

/** Resume a prior session by supplying the token received in {@link Session#resumeToken()}. */
/**
* Resume a prior session by supplying the token received in {@link Session#resumeToken()}.
*
* @param token resume token presented in {@code session.resume} (§6.3)
* @return this builder
*/
public Builder resumeToken(String token) {
this.resumeToken = token;
return this;
Expand All @@ -823,22 +980,50 @@ public Builder resumeToken(String token) {
/**
* Resume from a known event sequence number (§6.3). Used together with {@link
* #resumeToken(String)} to re-subscribe to events the client may have missed.
*
* @param seq the {@code last_event_seq} the client has already processed
* @return this builder
*/
public Builder lastEventSeq(long seq) {
this.lastEventSeq = seq;
return this;
}

/**
* Builds the client. The client does not connect until {@link ArcpClient#connect()} is called.
*
* @return a new {@link ArcpClient} over this builder's transport and settings
*/
public ArcpClient build() {
return new ArcpClient(this);
}
}

/** Construct a job submit payload conveniently. */
/**
* Construct a job submit payload conveniently, with no lease request, constraints, idempotency
* key, or runtime cap (§7).
*
* @param agent agent reference, optionally versioned, e.g. {@code code-refactor@2.0.0}
* @param input agent-defined input document carried in {@code job.submit.payload.input}
* @return a minimal {@code job.submit} payload for {@link #submit(JobSubmit)}
*/
public static JobSubmit jobSubmit(String agent, JsonNode input) {
return new JobSubmit(AgentRef.parse(agent), input, null, null, null, null);
}

/**
* Construct a fully specified job submit payload (§7), validating that any {@code expires_at}
* lease constraint lies in the future.
*
* @param agent agent reference, optionally versioned, e.g. {@code code-refactor@2.0.0}
* @param input agent-defined input document carried in {@code job.submit.payload.input}
* @param lease requested capability lease ({@code lease_request}), or {@code null} for none
* @param constraints lease constraints such as {@code expires_at}, or {@code null} for none
* @param idempotencyKey key making resubmission return the same {@code job.accepted} (§7.2), or
* {@code null} to disable idempotency
* @param maxRuntimeSec maximum job runtime in seconds, or {@code null} for no cap
* @return a {@code job.submit} payload for {@link #submit(JobSubmit)}
*/
public static JobSubmit jobSubmit(
String agent,
JsonNode input,
Expand Down
Loading
Loading