Skip to content
Merged
6 changes: 5 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ jobs:
java-version: "21"
cache: maven
- name: Build javadoc for all published modules
# `package` before javadoc:javadoc so inter-module dependencies resolve
# from this reactor's output instead of a (possibly stale) cached
# SNAPSHOT in the runner's local repository — otherwise javadoc cannot
# see classes a PR adds to an upstream module like arcp-core.
run: |
mvn -B -ntp -am -DskipTests -Darcp.skip.spotless=true \
-pl arcp-core,arcp-client,arcp-runtime,arcp,arcp-otel,arcp-runtime-jetty,arcp-middleware-jakarta,arcp-middleware-spring-boot,arcp-middleware-vertx,arcp-tck \
javadoc:javadoc
package javadoc:javadoc
173 changes: 155 additions & 18 deletions arcp-client/src/main/java/dev/arcp/client/ArcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import dev.arcp.core.auth.Auth;
import dev.arcp.core.capabilities.Capabilities;
import dev.arcp.core.capabilities.Feature;
import dev.arcp.core.credentials.Credential;
import dev.arcp.core.credentials.CredentialScheme;
import dev.arcp.core.error.ArcpException;
import dev.arcp.core.error.ErrorPayload;
import dev.arcp.core.events.EventBody;
Expand All @@ -20,6 +22,7 @@
import dev.arcp.core.messages.ClientInfo;
import dev.arcp.core.messages.JobAccepted;
import dev.arcp.core.messages.JobCancel;
import dev.arcp.core.messages.JobCancelled;
import dev.arcp.core.messages.JobError;
import dev.arcp.core.messages.JobEvent;
import dev.arcp.core.messages.JobFilter;
Expand All @@ -33,6 +36,7 @@
import dev.arcp.core.messages.Messages;
import dev.arcp.core.messages.SessionAck;
import dev.arcp.core.messages.SessionBye;
import dev.arcp.core.messages.SessionClosed;
import dev.arcp.core.messages.SessionHello;
import dev.arcp.core.messages.SessionJobs;
import dev.arcp.core.messages.SessionListJobs;
Expand Down Expand Up @@ -90,6 +94,10 @@ static EnumSet<Feature> safeFeatureCopy(Set<Feature> features) {
private final ScheduledExecutorService scheduler;
private final boolean autoAck;
private final Duration ackInterval;
private final Duration submitTimeout;
// Set while the transport's inbound delivery thread is inside dispatch(); used to fail fast if a
// user completes a future on this thread and then calls blocking submit (#106).
private final ThreadLocal<Boolean> inDispatch = ThreadLocal.withInitial(() -> Boolean.FALSE);
private final AtomicLong lastSeenSeq = new AtomicLong(-1);
private final AtomicLong lastAckedSeq = new AtomicLong(-1);
private final AtomicLong lastInboundMillis = new AtomicLong(System.currentTimeMillis());
Expand Down Expand Up @@ -121,6 +129,7 @@ private ArcpClient(Builder b) {
this.requestedFeatures = safeFeatureCopy(b.features);
this.autoAck = b.autoAck;
this.ackInterval = b.ackInterval;
this.submitTimeout = b.submitTimeout;
if (b.scheduler != null) {
this.scheduler = b.scheduler;
this.ownedScheduler = false;
Expand Down Expand Up @@ -165,7 +174,45 @@ public JobHandle submit(JobSubmit submit) {
return submit(submit, null);
}

/**
 * Submits a job and waits for the outcome of the submission.
 *
 * <p>Delegates to {@link #submitAsync(JobSubmit, TraceId)} and waits on the returned future for
 * at most the configured submit timeout, so the call can never block forever (#106).
 *
 * <p>Must not be invoked while the calling thread is inside inbound dispatch (for example from
 * an event/result callback): the acknowledgement is delivered by that same thread, so waiting
 * there would deadlock. Such a call is rejected immediately.
 *
 * @param submit the submission to send
 * @param traceId trace id passed through to the asynchronous submit; may be {@code null}
 * @return the handle the submit future completed with
 * @throws IllegalStateException if called during inbound dispatch, if the wait times out, or if
 *     the waiting thread is interrupted
 * @throws java.util.concurrent.CompletionException if the submit future failed; the original
 *     failure is available via {@code getCause()}
 */
public JobHandle submit(JobSubmit submit, @Nullable TraceId traceId) {
  boolean calledFromDispatch = Boolean.TRUE.equals(inDispatch.get());
  if (calledFromDispatch) {
    throw new IllegalStateException(
        "submit() must not be called from an event/result callback; use submitAsync()");
  }

  CompletableFuture<JobHandle> acceptance = submitAsync(submit, traceId);
  long waitMillis = submitTimeout.toMillis();
  try {
    return acceptance.get(waitMillis, TimeUnit.MILLISECONDS);
  } catch (TimeoutException timedOut) {
    // Forget the pending entry that belongs to this future, then fail the future itself so any
    // other party holding it is released as well.
    pendingSubmits.removeIf(entry -> entry.outstanding().handleFuture == acceptance);
    acceptance.completeExceptionally(timedOut);
    String message = "submit timed out after " + submitTimeout + " awaiting job.accepted";
    throw new IllegalStateException(message, timedOut);
  } catch (InterruptedException interrupted) {
    // Restore the interrupt flag before translating to an unchecked exception.
    Thread.currentThread().interrupt();
    throw new IllegalStateException("submit interrupted", interrupted);
  } catch (java.util.concurrent.ExecutionException failed) {
    // Mirror what join() used to do: callers receive an unchecked CompletionException and can
    // inspect getCause() for the underlying failure (e.g. an ArcpException).
    Throwable root;
    if (failed.getCause() != null) {
      root = failed.getCause();
    } else {
      root = failed;
    }
    throw new java.util.concurrent.CompletionException(root);
  }
}

/**
 * Non-blocking submit without a trace id (#106).
 *
 * <p>Equivalent to calling {@link #submitAsync(JobSubmit, TraceId)} with a {@code null} trace id.
 *
 * @param submit the submission to send
 * @return the future returned by the two-argument overload
 */
public CompletableFuture<JobHandle> submitAsync(JobSubmit submit) {
  final TraceId noTraceId = null;
  return submitAsync(submit, noTraceId);
}

public CompletableFuture<JobHandle> submitAsync(JobSubmit submit, @Nullable TraceId traceId) {
Outstanding o = new Outstanding();
MessageId requestId = MessageId.generate();
// The put-then-send pair must be atomic w.r.t. other submits so that the
Expand All @@ -180,7 +227,7 @@ public JobHandle submit(JobSubmit submit, @Nullable TraceId traceId) {
} finally {
submitLock.unlock();
}
return o.handleFuture.join();
return o.handleFuture;
}

public Page<JobSummary> listJobs(@Nullable JobFilter filter)
Expand Down Expand Up @@ -330,10 +377,13 @@ public void onNext(Envelope envelope) {
if (seq != null) {
lastSeenSeq.updateAndGet(prev -> Math.max(prev, seq));
}
inDispatch.set(Boolean.TRUE);
try {
dispatch(envelope);
} catch (RuntimeException e) {
log.warn("client dispatch error for {}: {}", envelope.type(), e.toString());
} finally {
inDispatch.set(Boolean.FALSE);
}
}

Expand Down Expand Up @@ -403,6 +453,8 @@ private void dispatch(Envelope envelope) {
case JobSubscribed ignored -> {
/* signal */
}
case JobCancelled cancelled -> handleCancelled(envelope, cancelled);
case SessionClosed ignored -> log.debug("session closed acknowledged by runtime");
Comment thread
nficano marked this conversation as resolved.
case SessionJobs jobs -> handleListResponse(jobs);
case SessionPing ping -> handlePing(ping);
case SessionPong ignored -> log.debug("client ignored: {}", envelope.type());
Expand Down Expand Up @@ -481,10 +533,40 @@ private void handleAccepted(Envelope envelope, JobAccepted accepted) {
if (head == null) {
return;
}
// §9.8.1: drop credentials whose scheme this client does not recognize rather than failing the
// whole acceptance (#97). Unknown schemes decode to CredentialScheme.UNKNOWN.
JobAccepted visible = withRecognizedCredentials(accepted);
Outstanding o = head.outstanding();
o.jobId = accepted.jobId();
outstanding.put(accepted.jobId(), o);
o.handleFuture.complete(new ClientJobHandle(accepted, o));
o.jobId = visible.jobId();
outstanding.put(visible.jobId(), o);
o.handleFuture.complete(new ClientJobHandle(visible, o));
}

/**
 * Returns a view of {@code accepted} that carries only credentials whose scheme is not {@link
 * CredentialScheme#UNKNOWN}.
 *
 * <p>The input instance is returned as-is when it has no credentials or when nothing was
 * filtered out. Otherwise a new {@link JobAccepted} is built with the same fields and the
 * filtered list, or {@code null} in place of the list when every credential was dropped.
 *
 * @param accepted the acceptance message as received
 * @return {@code accepted} itself, or a copy with unrecognized credentials removed
 */
private static JobAccepted withRecognizedCredentials(JobAccepted accepted) {
  List<Credential> offered = accepted.credentials();
  boolean nothingOffered = offered == null || offered.isEmpty();
  if (nothingOffered) {
    return accepted;
  }

  List<Credential> kept =
      offered.stream()
          .filter(credential -> credential.scheme() != CredentialScheme.UNKNOWN)
          .toList();

  if (kept.size() != offered.size()) {
    // At least one credential was dropped: rebuild the message around the survivors.
    return new JobAccepted(
        accepted.jobId(),
        accepted.agent(),
        accepted.lease(),
        accepted.leaseConstraints(),
        accepted.budget(),
        kept.isEmpty() ? null : kept,
        accepted.acceptedAt(),
        accepted.traceId());
  }
  return accepted;
}

/**
 * Handles the runtime's cancellation acknowledgement by logging it at debug level.
 *
 * <p>§7.4: the ack is informational only. The terminal {@code job.error} CANCELLED that follows
 * is what completes the result/subscriber, so no state is touched here.
 *
 * @param envelope envelope carrying the job id of the acknowledged cancellation
 * @param cancelled acknowledgement payload whose reason is logged
 */
private void handleCancelled(Envelope envelope, JobCancelled cancelled) {
  JobId acknowledgedJob = envelope.jobId();
  var reason = cancelled.reason();
  log.debug("job {} cancellation acknowledged: {}", acknowledgedJob, reason);
}

private void handleJobEvent(Envelope envelope, JobEvent event) {
Expand All @@ -509,28 +591,76 @@ private void handleResult(Envelope envelope, JobResult result) {
return;
}
Outstanding o = outstanding.remove(jid);
if (o == null) {
return;
if (o != null) {
o.events.close();
o.resultFuture.complete(result);
}
o.events.close();
o.resultFuture.complete(result);
// Complete the live subscriber publisher (if any) so "subscribe and iterate until complete"
// consumers see onComplete instead of blocking forever, and per-job executors are released
// (#105).
completeLiveSubscriber(jid, null);
}

private void handleError(Envelope envelope, JobError err) {
JobId jid = envelope.jobId();
Outstanding o = jid != null ? outstanding.remove(jid) : null;
if (o == null) {
// Top-level (unassigned) error: fail the oldest pending submit.
PendingSubmit head = pendingSubmits.pollFirst();
if (head != null) {
ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message()));
head.outstanding().handleFuture.completeExceptionally(ex);
ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message()));
if (jid != null) {
// Terminal error for a known job: fail its result and complete its subscriber. A jobful error
// is never treated as a submit rejection (#102).
Outstanding o = outstanding.remove(jid);
if (o != null) {
o.events.close();
o.resultFuture.completeExceptionally(ex);
}
completeLiveSubscriber(jid, ex);
return;
}
o.events.close();
ArcpException ex = ArcpException.from(ErrorPayload.of(err.code(), err.message()));
o.resultFuture.completeExceptionally(ex);
// Top-level (jobless) error: correlate to the originating request via the echoed envelope id so
// a list_jobs/subscribe error never fails an unrelated in-flight submit (#102).
MessageId correlationId = envelope.id();
CompletableFuture<SessionJobs> listFuture = listRequests.remove(correlationId);
if (listFuture != null) {
listFuture.completeExceptionally(ex);
return;
}
PendingSubmit match = removePendingSubmit(correlationId);
if (match != null) {
match.outstanding().handleFuture.completeExceptionally(ex);
return;
}
// A top-level error before session.welcome rejects the handshake itself (e.g.
// RESUME_WINDOW_EXPIRED for an unknown/expired resume token, §6.3); fail connect() so the
// caller can fall back to a fresh session instead of timing out.
if (sessionFuture.completeExceptionally(ex)) {
return;
}
log.warn("dropping uncorrelated top-level error {}: {}", err.code(), err.message());
}

/**
 * Removes and returns the first pending submit whose request id equals {@code requestId}.
 *
 * @param requestId id to look for among the pending submits
 * @return the removed entry, or {@code null} when no pending submit has that request id
 */
private @Nullable PendingSubmit removePendingSubmit(MessageId requestId) {
  java.util.Iterator<PendingSubmit> cursor = pendingSubmits.iterator();
  while (cursor.hasNext()) {
    PendingSubmit candidate = cursor.next();
    if (!candidate.requestId().equals(requestId)) {
      continue;
    }
    // Remove through the iterator so exactly the entry just visited is taken out.
    cursor.remove();
    return candidate;
  }
  return null;
}

/**
 * Tears down the live-subscription resources registered for a job.
 *
 * <p>The job's publisher, if one is registered, is removed and closed: normally when {@code
 * error} is {@code null}, exceptionally with {@code error} otherwise. The job's executor, if one
 * is registered, is removed and shut down.
 *
 * @param jid job whose live subscriber resources are released
 * @param error failure to propagate to the publisher, or {@code null} for normal completion
 */
private void completeLiveSubscriber(JobId jid, @Nullable Throwable error) {
  SubmissionPublisher<EventBody> publisher = liveSubscribers.remove(jid);
  if (publisher != null && error == null) {
    publisher.close();
  } else if (publisher != null) {
    publisher.closeExceptionally(error);
  }

  ExecutorService executor = liveExecutors.remove(jid);
  if (executor == null) {
    return;
  }
  executor.shutdown();
}

private void handleListResponse(SessionJobs jobs) {
Expand Down Expand Up @@ -629,6 +759,7 @@ public static final class Builder {
private Set<Feature> features = EnumSet.allOf(Feature.class);
private boolean autoAck = true;
private Duration ackInterval = Duration.ofMillis(200);
private Duration submitTimeout = Duration.ofSeconds(30);
private @Nullable ScheduledExecutorService scheduler;
private @Nullable String resumeToken;
private @Nullable Long lastEventSeq;
Expand Down Expand Up @@ -672,6 +803,12 @@ public Builder ackInterval(Duration interval) {
return this;
}

/**
 * Maximum time blocking {@link #submit(JobSubmit)} waits for {@code job.accepted} (#106).
 *
 * <p>The value is validated here rather than at submit time: blocking submit reads the timeout
 * only after the request has been handed to the asynchronous submit path, so a null, zero or
 * negative value would otherwise surface as a failure after the job was already sent.
 *
 * @param timeout upper bound for the wait; must be strictly positive
 * @return this builder
 * @throws NullPointerException if {@code timeout} is {@code null}
 * @throws IllegalArgumentException if {@code timeout} is zero or negative
 */
public Builder submitTimeout(Duration timeout) {
  java.util.Objects.requireNonNull(timeout, "timeout");
  if (timeout.isZero() || timeout.isNegative()) {
    throw new IllegalArgumentException("submitTimeout must be positive: " + timeout);
  }
  this.submitTimeout = timeout;
  return this;
}

public Builder scheduler(ScheduledExecutorService s) {
this.scheduler = s;
return this;
Expand Down
9 changes: 9 additions & 0 deletions arcp-client/src/main/java/dev/arcp/client/ResultStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ public synchronized void accept(ResultChunkEvent chunk) throws IOException {
if (chunk.chunkSeq() < nextExpected) {
throw new DuplicateChunkException(chunk.chunkSeq());
}
ResultChunkEvent existing = pending.get(chunk.chunkSeq());
if (existing != null) {
// §8.4: a duplicate of a still-pending chunk is rejected like any other duplicate. A
// byte-identical retransmission is tolerated (idempotent); a divergent copy is an error.
if (existing.equals(chunk)) {
return;
}
throw new DuplicateChunkException(chunk.chunkSeq());
}
pending.put(chunk.chunkSeq(), chunk);
while (pending.containsKey(nextExpected)) {
ResultChunkEvent ready = pending.remove(nextExpected);
Expand Down
Loading
Loading