
Add Support for Batching to Downstreams in Router #60

Open
dchoi-viant wants to merge 52 commits into router-dev from router-batching

Conversation

@dchoi-viant
Contributor

@dchoi-viant dchoi-viant commented Jan 28, 2026

  • Add batching (enabled by default, technically a breaking behavior change) to downstreams in router.
  • Move from worker routines to semaphore-based goroutine bounding.
  • Refactor ReloadIfNeeded() and related code into a separate file.
  • Fix unload counter double-counting.

… descriptions of client batching payload features.
dchoi-viant and others added 22 commits March 2, 2026 15:14
Previously, a partial or aborted body read on a 200 OK response was
silently discarded and httpPost returned (possibly-empty data, nil).
Callers then unmarshaled empty bytes and the failure surfaced
downstream as "Invalid JSON, wrong char ' ' found at position 0",
which is indistinguishable from a server-side encoding bug.

Return the read error wrapped with the partial byte count so the
caller sees an honest transport failure instead of a silent
empty-body success.

Made-with: Cursor
Co-authored-by: Claude <noreply@anthropic.com>
…cated metrics

Replace the implicit-commit pattern in writeResponse where the first
writer.Write call auto-fired WriteHeader(200) at the same time as the
body bytes. Under load this produced a wire trace of "200 OK + empty
body" when the body Write failed (broken pipe due to client cancellation),
and the server-side error was silently discarded by the surrounding code.

writeResponse now:

  - Marshals first; on marshal failure returns a typed
    responseMarshalError so ServeHTTP can emit an explicit 5xx with a
    meaningful body.
  - Sets Content-Type and Content-Length explicitly so clients can
    detect a truncated body via io.ErrUnexpectedEOF instead of
    receiving a silent empty 200 OK.
  - Calls WriteHeader(200) explicitly so the status line is committed
    in a known order, not as a side effect of the first Write.
  - Returns a typed responseCommittedError when Write fails after the
    status was committed, so ServeHTTP knows to log unconditionally
    rather than calling http.Error (which would silently drop the new
    status code with a "superfluous WriteHeader" warning).

Inline the single-caller handleAppRequest helper into ServeHTTP -- the
io.Writer narrowing it provided is no longer useful now that
writeResponse takes an http.ResponseWriter directly.

Add a new gmetric counter provider in service/stat (NewHandler) that
extends NewCtxErrOnly with two response-write failure classes
(responseMarshalError, responseCommittedError) so they can be alerted
independently of the generic error bucket. The first three keys
(error, canceled, deadlineExceeded) preserve their indices so existing
Prometheus dashboards and alerts continue to emit at the same labels.

Add unit tests in service/handler_test.go covering the success path,
total and partial post-commit Write failures, the Content-Length
invariant across a range of Response shapes, and the typed-error
chain participation in errors.As / errors.Is.

Document the response wire contract in README under
/v1/api/model/%s/eval (Content-Type and Content-Length explicit) and
the new HTTPHandler metric keys under /v1/api/metric/operations.

Made-with: Cursor
Co-authored-by: Claude <noreply@anthropic.com>
The /v1/api/model/%s/eval endpoint is the only one with a structured
request/response shape; collapse the previous flat bullet list under
/v1/api/model and promote each model endpoint to a peer top-level
section so eval can grow sub-sections (Methods, Request body,
Successful response, Error response, Example) without distorting the
surrounding density.

Document the current contract:

  - Methods (GET / POST) with single-mode vs batch-mode semantics.
  - Request body shape (input keys + reserved batch_size / cache_key).
  - Successful response shape (status, dictHash, data, serviceTimeMcs)
    with Content-Type and Content-Length set explicitly so clients can
    detect transport failures via short-read against declared length.
  - Error response status-code table (400 / 413 / 429 / 500) with a
    note that the body format is plain text today and is expected to
    align with the success response shape in a future release.
  - Curl examples for both methods.

Promote the meta endpoints to peer ## sections with one-line
descriptions of their purpose for the mly client bootstrap path.

Made-with: Cursor
Co-authored-by: Claude <noreply@anthropic.com>
…us codes

Unify the wire-format contract across success and failure: every
response from /v1/api/model/%s/eval is now a JSON-encoded Response
object with Content-Type: application/json and explicit Content-Length,
regardless of whether the prediction succeeded or any phase failed.
Errors retain their proper HTTP status codes (400 / 413 / 429 / 500);
only the body shape changes, from plain-text http.Error output to the
same Response struct used on success with status="error" and the
error field populated.

writeResponse now takes an http.StatusXxx parameter so the same
explicit-commit machinery (marshal first, declare Content-Length,
WriteHeader, Write) can serve both 200 success and 4xx/5xx error
responses.

writeError is a new helper that:

  - Calls SetError(err) and clears response.Data so the marshal
    cannot fail on a Data value that may have triggered the original
    failure.
  - Delegates to writeResponse with the supplied status code.
  - On post-commit Write failure, appends ResponseCommittedError to
    hStats and logs unconditionally (matching the success-path
    contract).
  - On marshal failure of the cleared error response (essentially
    impossible -- the struct now contains only string + int fields),
    falls back to http.Error so the client at least receives a
    status code.

All five http.Error call sites in ServeHTTP (GET-path query parse,
POST body read, POST body unmarshal, no-request fall-through, and
the post-Service.Do error block) now route through writeError. The
post-Service.Do block keeps its existing responseCommittedError
short-circuit since the success-response status is already on the
wire and cannot be changed.

Cross-repo audit (viant/mly, adelphic/mly, adelphic/mediator) found
no consumer that parses the previous plain-text error body format.
The one near-miss -- mediator's filter/ml/roas/service.go
isTimeoutError -- substring-matches "context deadline exceeded" /
"timeout" / "deadline exceeded" / "context canceled" against the
error string, which keeps working because the substring is now
embedded in the JSON error body wrapped by the client's httpPost
error format.

Update tests:

  - Existing writeResponse tests get the new status parameter.
  - New TestWriteResponse_HonorsStatusParam confirms the supplied
    status reaches the wire across the four error status codes plus
    200.
  - New TestWriteError_EmitsJSONErrorWithStatus locks in the wire
    shape (status code, Content-Type, Content-Length, JSON body
    with status=error and populated error message; cleared Data).
  - New TestWriteError_FallsBackToHTTPErrorOnCommittedFailure
    confirms hStats records the post-commit failure when the body
    write fails after the error status was committed.

Update README error response section to describe the new contract.

Made-with: Cursor
Co-authored-by: Claude <noreply@anthropic.com>
Surface the server-side error message into the caller's Response
struct on non-2xx responses, so callers that defensively check
response.Error (e.g. mediator's ml/fraud and ml/fraudv3 services)
finally observe predict-time errors instead of silently swallowing
them.

Two coordinated changes:

1. httpPost now returns the response body alongside the error on a
   non-2xx terminal status. Previously it returned (nil, err) on
   non-200, which discarded the body and prevented any structured
   parsing. The retry-loop logic is updated to capture the body in
   a postBody variable so terminal-error iterations preserve the
   bytes for the outer return. Successful iterations return data
   directly with err == nil as before.

2. Run() does a best-effort gojay.Unmarshal of the body into the
   caller's response struct before returning the err. For a
   v0.20.0+ server's JSON error body this populates
   response.Status = "error" and response.Error = "<message>". For
   an older server's plain-text body the unmarshal silently fails
   and the response struct stays untouched. The non-nil err return
   is unchanged in either case and remains the source-of-truth
   signal; this is purely additive population of the response
   struct for the callers that want it.

Backward compatibility is preserved both directions:

  - New v0.20.0 client against an older server: response.Error
    stays empty (best-effort unmarshal silently fails), err is
    returned as before, and any consumer that checks err keeps
    working unchanged.
  - Older client against a new v0.20.0 server: client never sees
    the populated response.Error (the older client's httpPost
    returns nil body on non-200), but err is still surfaced and
    consumers that check err keep working unchanged.

Add TestService_Run_ParsesErrorBody covering four cases: 400+JSON,
500+JSON, 400+plain-text, 500+plain-text. Each asserts both the
err return and the response.Error/Status population behavior so
the contract is locked in for both server-version axes.

Made-with: Cursor
Co-authored-by: Claude <noreply@anthropic.com>
Record the v0.20.0 wire-format change: error responses from
/v1/api/model/%s/eval now share the same shape as success responses
(JSON-encoded Response object, Content-Type: application/json,
explicit Content-Length) instead of plain text. HTTP status codes
are unchanged.

The client side picks this up automatically: response.Error is
populated from the parsed body on non-2xx responses, so callers can
rely on either the err return value or response.Error as the error
signal. Older clients connecting to a v0.20.0+ server still work
(the err return is unchanged); newer clients connecting to a pre-
v0.20.0 server still work (the best-effort body parse silently
fails on plain text and response.Error stays empty, with the err
return remaining the source-of-truth signal).

Made-with: Cursor
Co-authored-by: Claude <noreply@anthropic.com>
…requests

Add stat.Shed key to the http counter provider and append it in
postRequest when getHost returns an error wrapping common.ErrNodeDown.
Existing _down (trip event) and _error counters are unchanged; the
new _shed disambiguates breaker-rejected requests from requests that
reached httpPost and failed there.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Made-with: Cursor
FlagUp and FlagDown now use atomic.CompareAndSwapInt32 on Down so the
flag is consistent with IsUp's atomic.LoadInt32, and FlagUp's
resetDuration reset runs under the mutex so it cannot clobber a
concurrent FlagDown's resetDuration *= 2 (lost-update bug that defeated
exponential backoff under flapping).

Adds breaker_test.go (first tests in the package) covering the data
race under -race, backoff accumulation across trips, and CAS-based
idempotency of FlagUp / FlagDown.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Made-with: Cursor
Pending Inc/Dec via metric.EnterThenExit captures the recent-bucket
index at Enter time and decrements that bucket on Exit; rotation
between Enter and Exit lands the decrement in the wrong bucket.
Also, Dir is not a string, so MultiCounter takes a mutex per
Inc/Dec, which serializes hot paths.

Both _pending and _pending_Max remain operationally useful in
different regimes (high-QPS vs low-QPS operations); the doc
comment explains which signal to query in each case.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Made-with: Cursor
…ss-through

Adds LatencyBreaker parallel to the existing connection-failure
Breaker. Trips on latest > LatestThreshold OR rolling > RollingThreshold;
recovers after K consecutive observations satisfy both thresholds. While
ON, allows a configurable fraction of requests through to drive recovery
sensing without committing real load.

Host.IsUp() now requires both breakers to be up. Service.init wires a
LatencyBreaker into each host when at least one threshold is non-zero
(zero -> disabled, backward-compatible). postRequest feeds elapsed time
to LatencyBreaker.Observe after each httpPost.

Caller opts in by setting Config.LatencyBreaker* fields or via
WithLatencyBreaker. Pass-through fraction defaults to 0.01 (1%).

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Made-with: Cursor
Brings in latency-aware breaker, race fixes in circut.Breaker, the
ClientHTTP_shed marker, and prior v0.20.x server response/error
handling work.

Made-with: Cursor

# Conflicts:
#	service/handler.go
#	shared/client/service.go
Add WithPrometheusMetrics(false) for short-lived helper clients that
should not register long-lived native Prometheus series. Use it for
server startup self-test clients so completed self-tests do not leave
zero-valued mly_client_* series in the process-wide registry.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Made-with: Cursor
WithPrometheusMetrics(false) skips native Prometheus registration, leaving
observer and counter fields nil. Guard observation helpers and direct
counter increments so helper clients can opt out without panicking.

Exercise the opt-out path in the client Run test.

Co-authored-by: GPT-5.5 <noreply@openai.com>
Made-with: Cursor
Validate latency-breaker thresholds, rolling window, recovery count, and
pass-through fraction during Service initialization. Normalize defaults in
one path so direct Config usage and WithLatencyBreaker behave the same.

Also wrap Service.init errors with context while preserving the underlying
cause via %w.

Co-authored-by: GPT-5.5 <noreply@openai.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
