From 9fd4eaf14886111b1189eed3b4935c74166936b1 Mon Sep 17 00:00:00 2001 From: couragehong Date: Thu, 28 May 2026 17:30:16 +0900 Subject: [PATCH 1/5] feat(server): bootstrap-aware Server status surface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Until self-bootstrap completes, Embed/EmbedBatch return codes.FailedPrecondition rather than dialing into a nil backend, and Health reports STATUS_LOADING with the current Phase / bytes_done / bytes_total / message that bootstrap can feed in. Mechanics: - backend reference moves to atomic.Pointer[backend.LlamaBackend]; modelIdentity becomes atomic.Value; bootstrapStatus is an atomic.Pointer[bootstrapState] so {phase, bytes, message} publish and observe as one tuple. - New(version) constructs a Server with nil backend. SetBackend(b, modelID) wires it after bootstrap, writing maxTextLength / modelIdentity / backend in that order so a reader seeing backend necessarily sees the other two. - SetBootstrapStatus(phase, bytesDone, bytesTotal, message) is the loading-state sink consumed by the next Health call while backend is still nil. Health priority is SHUTTING_DOWN (shutdownCh closed) → LOADING (backend nil) → DEGRADED (IsHealthy false) → OK. SHUTTING_DOWN outranks LOADING so a drain-in-progress daemon doesn't advertise itself as "still loading" mid-drain. Tests cover the LOADING / SetBootstrapStatus reflection / FAILED_PRECONDITION before-SetBackend / SHUTTING_DOWN priority cases. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/server/server.go | 200 +++++++++++++++++++++++++-------- internal/server/server_test.go | 126 ++++++++++++++++++++- 2 files changed, 272 insertions(+), 54 deletions(-) diff --git a/internal/server/server.go b/internal/server/server.go index 81431db..530ae30 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -15,6 +15,8 @@ import ( runedv1 "github.com/CryptoLabInc/runed/gen/runed/v1" "github.com/CryptoLabInc/runed/internal/backend" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // Plan A constants for the Info RPC. Qwen3-Embedding-0.6B fixes these at @@ -25,20 +27,44 @@ const ( maxBatchSize int32 = 32 ) -// Server implements runedv1.RunedServiceServer. It does not own the backend — -// callers (cmd/runed) are responsible for Start/Stop on the LlamaBackend. +// bootstrapState is the snapshot fed to Health while STATUS_LOADING. +// Treated as immutable once stored — SetBootstrapStatus replaces the +// whole pointer atomically so readers always see a consistent tuple. +type bootstrapState struct { + phase runedv1.HealthResponse_Phase + bytesDone int64 + bytesTotal int64 + message string +} + +// Server implements runedv1.RunedServiceServer. It does not own the backend +// lifecycle — callers (cmd/runed) construct New(), drive self-bootstrap while +// the gRPC socket already listens (Health reports STATUS_LOADING during that +// window), then call SetBackend once llama-server is up. type Server struct { runedv1.UnimplementedRunedServiceServer - backend *backend.LlamaBackend - version string - modelIdentity string - startedAt time.Time - // maxTextLength (chars) is snapshotted from the backend's ctx-size (tokens) - // in New(); chars==tokens is conservative (dense text is ≥~1.27 chars/token), - // so it always fits ctx. 
Advertised via Info → clients cap to whatever ctx - // the daemon booted with, keeping the char limit locked to the token limit. - maxTextLength int32 + // backend is nil until SetBackend is called. Embed/EmbedBatch return + // FAILED_PRECONDITION while nil; Health returns STATUS_LOADING. + backend atomic.Pointer[backend.LlamaBackend] + + version string + startedAt time.Time + + // modelIdentity is "" until SetBackend computes the model SHA. Stored + // via atomic.Value so Info readers don't race the writer. + modelIdentity atomic.Value // string + + // maxTextLength (chars) is sourced from the backend's ctx-size (tokens) + // at SetBackend time; chars==tokens is conservative (dense text is + // ≥~1.27 chars/token), so it always fits ctx. Advertised via Info; reads + // 0 before bootstrap completes since the value depends on llama-server's + // loaded config. + maxTextLength atomic.Int32 + + // bootstrapStatus is updated during self-bootstrap and read by Health + // when backend is still nil. nil before any update. + bootstrapStatus atomic.Pointer[bootstrapState] // requests counts Embed + EmbedBatch calls (post-entry, pre-return). // Exposed through HealthResponse.total_requests so clients can observe @@ -56,22 +82,48 @@ type Server struct { lastActivity atomic.Int64 } -// New returns a Server that delegates Embed/EmbedBatch to backend and fills -// Info metadata from the given version and modelIdentity. max_text_length is -// snapshotted from the backend's ctx-size here (see Server.maxTextLength). -func New(b *backend.LlamaBackend, version, modelIdentity string) *Server { +// New returns a Server with backend unset. Until SetBackend is called, +// Embed/EmbedBatch return FAILED_PRECONDITION and Health reports +// STATUS_LOADING + whatever phase the latest SetBootstrapStatus posted. +// modelIdentity is empty until SetBackend supplies it. 
+func New(version string) *Server { s := &Server{ - backend: b, - version: version, - modelIdentity: modelIdentity, - startedAt: time.Now(), - maxTextLength: int32(b.CtxSize()), - shutdownCh: make(chan struct{}), + version: version, + startedAt: time.Now(), + shutdownCh: make(chan struct{}), } + s.modelIdentity.Store("") s.lastActivity.Store(time.Now().UnixNano()) return s } +// SetBackend wires the backend and model identity after self-bootstrap +// completes. From this point on, Embed/EmbedBatch are accepted and +// Health reports STATUS_OK (or STATUS_DEGRADED if IsHealthy fails). Safe +// to call concurrently with in-flight RPCs — readers see a consistent +// transition because maxTextLength and modelIdentity are written before +// the backend pointer is published. +func (s *Server) SetBackend(b *backend.LlamaBackend, modelIdentity string) { + s.maxTextLength.Store(int32(b.CtxSize())) + s.modelIdentity.Store(modelIdentity) + s.backend.Store(b) +} + +// SetBootstrapStatus records the current self-bootstrap phase and download +// progress. The next Health RPC returns these fields when STATUS_LOADING. +// Callers (cmd/runed + bootstrap reporter) emit one update per phase +// transition and periodically during long downloads. bytesTotal == 0 +// means total size isn't yet known (e.g. before HTTP Content-Length is +// read); clients should render percent-complete only when total > 0. +func (s *Server) SetBootstrapStatus(phase runedv1.HealthResponse_Phase, bytesDone, bytesTotal int64, message string) { + s.bootstrapStatus.Store(&bootstrapState{ + phase: phase, + bytesDone: bytesDone, + bytesTotal: bytesTotal, + message: message, + }) +} + // ShutdownCh returns a channel that closes when a Shutdown RPC is received. // The daemon main() selects on this alongside OS signals to trigger graceful // termination; the channel is never sent on — only closed. 
@@ -151,23 +203,34 @@ const embedMaxAttempts = 2 // called with normalize=true as a harmless default since llama-server always // returns L2-normalized vectors anyway. // -// Backend may be suspended (idle-suspend) when this RPC arrives. EnsureStarted -// resurrects it under the daemon-lifetime context — the first request after a -// suspend pays the llama-server cold-start latency (~hundreds of ms to a few -// seconds for model load); subsequent requests fall through the cheap health- -// probe fast path. +// Returns FAILED_PRECONDITION when the backend hasn't been wired yet +// (self-bootstrap still in progress). codes.FailedPrecondition (not +// Unavailable) intentionally bypasses default-retry policies — bootstrap +// can take minutes, so short exponential backoffs would just exhaust +// retries pre-ready. Whether clients fast-fail or poll Health is the +// client's concern; the error message stays neutral on retry strategy. +// +// Once the backend is wired it may still be suspended by the idle- +// suspend ticker. EnsureStarted resurrects it under the daemon-lifetime +// context — the first request after a suspend pays llama-server cold- +// start latency (~hundreds of ms to a few seconds for model load); +// subsequent requests fall through the cheap health-probe fast path. // -// Retry loop: backend.Embed holds inflightMu.RLock so Stop can't kill an -// in-flight HTTP. The remaining race window is EnsureStarted-return → -// RLock-acquire; if Stop slips into that gap we get ErrNotStarted on the -// first attempt and recover by re-running EnsureStarted once. +// Retry loop: backend.Embed holds inflightMu.RLock so Stop can't kill +// an in-flight HTTP. The remaining race window is EnsureStarted-return +// → RLock-acquire; if Stop slips into that gap we get ErrNotStarted on +// the first attempt and recover by re-running EnsureStarted once. 
func (s *Server) Embed(ctx context.Context, req *runedv1.EmbedRequest) (*runedv1.EmbedResponse, error) { + b := s.backend.Load() + if b == nil { + return nil, status.Error(codes.FailedPrecondition, "daemon is bootstrapping; embed not yet available") + } s.requests.Add(1) for attempt := 0; attempt < embedMaxAttempts; attempt++ { - if err := s.backend.EnsureStarted(); err != nil { + if err := b.EnsureStarted(); err != nil { return nil, fmt.Errorf("backend not ready: %w", err) } - vec, err := s.backend.Embed(ctx, req.Text, true) + vec, err := b.Embed(ctx, req.Text, true) if err == nil { return &runedv1.EmbedResponse{Vector: vec}, nil } @@ -180,15 +243,20 @@ func (s *Server) Embed(ctx context.Context, req *runedv1.EmbedRequest) (*runedv1 // EmbedBatch delegates to the backend's batch path and wraps each vector in // an EmbedResponse so the proto response message stays composable with -// single-text Embed. See Embed godoc on EnsureStarted / cold-start behaviour -// and on the ErrNotStarted retry loop. +// single-text Embed. Returns FAILED_PRECONDITION when the backend hasn't +// been wired yet; see Embed godoc for the EnsureStarted / ErrNotStarted +// retry rationale. 
func (s *Server) EmbedBatch(ctx context.Context, req *runedv1.EmbedBatchRequest) (*runedv1.EmbedBatchResponse, error) { + b := s.backend.Load() + if b == nil { + return nil, status.Error(codes.FailedPrecondition, "daemon is bootstrapping; embed not yet available") + } s.requests.Add(1) for attempt := 0; attempt < embedMaxAttempts; attempt++ { - if err := s.backend.EnsureStarted(); err != nil { + if err := b.EnsureStarted(); err != nil { return nil, fmt.Errorf("backend not ready: %w", err) } - vecs, err := s.backend.EmbedBatch(ctx, req.Texts, true) + vecs, err := b.EmbedBatch(ctx, req.Texts, true) if err == nil { out := &runedv1.EmbedBatchResponse{ Embeddings: make([]*runedv1.EmbedResponse, len(vecs)), @@ -206,30 +274,64 @@ func (s *Server) EmbedBatch(ctx context.Context, req *runedv1.EmbedBatchRequest) } // Info returns static daemon metadata. Does not touch the backend — safe to -// call before Start() or during a DEGRADED state. +// call before SetBackend or during a DEGRADED state. MaxTextLength reads 0 +// before bootstrap completes since the value depends on llama-server's +// loaded ctx-size; clients should re-query Info after Health reports +// STATUS_OK if they need the final value. func (s *Server) Info(ctx context.Context, _ *runedv1.InfoRequest) (*runedv1.InfoResponse, error) { + mid, _ := s.modelIdentity.Load().(string) return &runedv1.InfoResponse{ DaemonVersion: s.version, - ModelIdentity: s.modelIdentity, + ModelIdentity: mid, VectorDim: vectorDim, - MaxTextLength: s.maxTextLength, + MaxTextLength: s.maxTextLength.Load(), MaxBatchSize: maxBatchSize, }, nil } -// Health maps backend readiness onto the proto Status enum. A nil backend or -// unhealthy probe yields DEGRADED; we never return an error from this RPC so -// clients can always read uptime as a liveness signal. 
+// Health maps backend readiness onto the proto Status enum: +// +// - shutdown signalled (Shutdown RPC / TriggerShutdown) → STATUS_SHUTTING_DOWN +// - backend not yet wired → STATUS_LOADING + +// Phase / bytes / message populated from the most recent +// SetBootstrapStatus +// - backend wired but unhealthy → STATUS_DEGRADED +// - backend wired and healthy → STATUS_OK +// +// SHUTTING_DOWN is checked first so a drain-in-progress daemon doesn't +// advertise itself as ready (callers that read OK during the GracefulStop +// race would otherwise send a request just to receive Unavailable). +// +// Never returns an error so clients can always read uptime as a liveness +// signal and treat any RPC success as proof the daemon process exists. func (s *Server) Health(ctx context.Context, _ *runedv1.HealthRequest) (*runedv1.HealthResponse, error) { - status := runedv1.HealthResponse_STATUS_OK - if s.backend == nil || !s.backend.IsHealthy(ctx) { - status = runedv1.HealthResponse_STATUS_DEGRADED - } - return &runedv1.HealthResponse{ - Status: status, + resp := &runedv1.HealthResponse{ UptimeSeconds: int64(time.Since(s.startedAt).Seconds()), TotalRequests: s.requests.Load(), - }, nil + } + select { + case <-s.shutdownCh: + resp.Status = runedv1.HealthResponse_STATUS_SHUTTING_DOWN + return resp, nil + default: + } + b := s.backend.Load() + if b == nil { + resp.Status = runedv1.HealthResponse_STATUS_LOADING + if bs := s.bootstrapStatus.Load(); bs != nil { + resp.Phase = bs.phase + resp.BytesDone = bs.bytesDone + resp.BytesTotal = bs.bytesTotal + resp.Message = bs.message + } + return resp, nil + } + if !b.IsHealthy(ctx) { + resp.Status = runedv1.HealthResponse_STATUS_DEGRADED + return resp, nil + } + resp.Status = runedv1.HealthResponse_STATUS_OK + return resp, nil } // Shutdown signals the daemon to begin graceful termination. 
It closes diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 42d7f2a..0643700 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -11,20 +11,30 @@ import ( runedv1 "github.com/CryptoLabInc/runed/gen/runed/v1" "github.com/CryptoLabInc/runed/internal/backend" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" ) // newInProcessServer wires a Server{} to a loopback listener and returns a // client plus a cleanup that shuts both ends down. It deliberately uses a // random port (":0") so multiple tests can run in parallel without port clash. +// +// b == nil leaves the server in its pre-SetBackend state (Health returns +// STATUS_LOADING; Embed/EmbedBatch return FAILED_PRECONDITION). Non-nil b +// is wired via SetBackend with a synthetic model identity. func newInProcessServer(t *testing.T, b *backend.LlamaBackend) (runedv1.RunedServiceClient, func()) { t.Helper() lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatal(err) } + s := New("v0.1.0-test") + if b != nil { + s.SetBackend(b, "test-model-id") + } gs := grpc.NewServer() - runedv1.RegisterRunedServiceServer(gs, New(b, "v0.1.0-test", "test-model-id")) + runedv1.RegisterRunedServiceServer(gs, s) go gs.Serve(lis) conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -111,7 +121,7 @@ func TestServer_InfoMaxTextLengthTracksCtxSize(t *testing.T) { func TestServer_LastActivity_InitializedAtConstruction(t *testing.T) { before := time.Now() - s := New(backend.NewLlamaBackend(backend.Config{}), "vtest", "model-test") + s := New("vtest") after := time.Now() got := s.LastActivity() if got.Before(before) || got.After(after) { @@ -120,7 +130,7 @@ func TestServer_LastActivity_InitializedAtConstruction(t *testing.T) { } func TestServer_TriggerShutdown_Idempotent(t *testing.T) { - s := 
New(backend.NewLlamaBackend(backend.Config{}), "vtest", "model-test") + s := New("vtest") // Two concurrent TriggerShutdown calls must not panic (sync.Once). var wg sync.WaitGroup for i := 0; i < 4; i++ { @@ -140,7 +150,7 @@ func TestServer_TriggerShutdown_Idempotent(t *testing.T) { } func TestServer_UnaryActivityInterceptor_UpdatesLastActivity(t *testing.T) { - s := New(backend.NewLlamaBackend(backend.Config{}), "vtest", "model-test") + s := New("vtest") initial := s.LastActivity() time.Sleep(2 * time.Millisecond) // ensure new nanosecond bucket @@ -161,7 +171,7 @@ func TestServer_UnaryActivityInterceptor_UpdatesLastActivity(t *testing.T) { } } -// Before self-bootstrap provide progress, values should be zero +// Before self-bootstrap provides progress, values should be zero func TestServer_HealthBootstrapFieldsDefaultZero(t *testing.T) { b := backend.NewLlamaBackend(backend.Config{}) // not started — Health uses the unhealthy path client, cleanup := newInProcessServer(t, b) @@ -187,3 +197,109 @@ func TestServer_HealthBootstrapFieldsDefaultZero(t *testing.T) { t.Errorf("Message = %q, want empty", h.Message) } } + +// Without a backend wired, Health reports STATUS_LOADING. Phase/bytes are +// zero until SetBootstrapStatus is called. +func TestServer_HealthLoadingBeforeSetBackend(t *testing.T) { + client, cleanup := newInProcessServer(t, nil) + defer cleanup() + + h, err := client.Health(context.Background(), &runedv1.HealthRequest{}) + if err != nil { + t.Fatal(err) + } + if h.Status != runedv1.HealthResponse_STATUS_LOADING { + t.Errorf("Status = %v, want STATUS_LOADING (backend not wired)", h.Status) + } +} + +// After SetBootstrapStatus, Health reflects the recorded phase, bytes, and +// message verbatim — the proto fields are wired directly from the snapshot. 
+func TestServer_HealthLoadingReflectsSetBootstrapStatus(t *testing.T) { + s := New("vtest") + s.SetBootstrapStatus( + runedv1.HealthResponse_PHASE_FETCHING_MODEL, + 123, 456, "downloading X") + + resp, err := s.Health(context.Background(), &runedv1.HealthRequest{}) + if err != nil { + t.Fatalf("Health: %v", err) + } + if resp.Status != runedv1.HealthResponse_STATUS_LOADING { + t.Errorf("Status = %v, want STATUS_LOADING", resp.Status) + } + if resp.Phase != runedv1.HealthResponse_PHASE_FETCHING_MODEL { + t.Errorf("Phase = %v, want PHASE_FETCHING_MODEL", resp.Phase) + } + if resp.BytesDone != 123 || resp.BytesTotal != 456 { + t.Errorf("bytes: got done=%d total=%d, want 123/456", resp.BytesDone, resp.BytesTotal) + } + if resp.Message != "downloading X" { + t.Errorf("Message = %q, want %q", resp.Message, "downloading X") + } +} + +// Embed before SetBackend must surface FAILED_PRECONDITION (not Unavailable +// or generic Internal) so clients can branch on the code and switch to +// Health-polling instead of consuming retry budget against a daemon that +// can't yet answer. 
+func TestServer_EmbedFailsBeforeBackendSet(t *testing.T) { + s := New("vtest") + _, err := s.Embed(context.Background(), &runedv1.EmbedRequest{Text: "x"}) + if err == nil { + t.Fatal("expected error before SetBackend") + } + if got := status.Code(err); got != codes.FailedPrecondition { + t.Errorf("code = %v, want %v", got, codes.FailedPrecondition) + } +} + +func TestServer_EmbedBatchFailsBeforeBackendSet(t *testing.T) { + s := New("vtest") + _, err := s.EmbedBatch(context.Background(), &runedv1.EmbedBatchRequest{Texts: []string{"x"}}) + if err == nil { + t.Fatal("expected error before SetBackend") + } + if got := status.Code(err); got != codes.FailedPrecondition { + t.Errorf("code = %v, want %v", got, codes.FailedPrecondition) + } +} + +// After TriggerShutdown (or a Shutdown RPC), Health flips to +// STATUS_SHUTTING_DOWN regardless of backend state — the drain-in-progress +// signal outranks LOADING/DEGRADED/OK so callers don't send fresh work +// just to receive Unavailable mid-drain. +func TestServer_HealthShuttingDownAfterTrigger(t *testing.T) { + s := New("vtest") + s.TriggerShutdown() + + resp, err := s.Health(context.Background(), &runedv1.HealthRequest{}) + if err != nil { + t.Fatalf("Health: %v", err) + } + if resp.Status != runedv1.HealthResponse_STATUS_SHUTTING_DOWN { + t.Errorf("Status = %v, want STATUS_SHUTTING_DOWN", resp.Status) + } +} + +// SHUTTING_DOWN must outrank LOADING — pinning this priority guards +// against future Health refactors accidentally reordering the checks and +// surfacing LOADING to clients during a drain (which would lead them to +// keep polling instead of disconnecting). +func TestServer_HealthShuttingDownOutranksLoading(t *testing.T) { + s := New("vtest") + // backend nil → LOADING candidate; bootstrap status would normally + // shape the LOADING response. 
+ s.SetBootstrapStatus( + runedv1.HealthResponse_PHASE_FETCHING_MODEL, + 50, 100, "downloading model") + s.TriggerShutdown() + + resp, err := s.Health(context.Background(), &runedv1.HealthRequest{}) + if err != nil { + t.Fatalf("Health: %v", err) + } + if resp.Status != runedv1.HealthResponse_STATUS_SHUTTING_DOWN { + t.Errorf("Status = %v, want STATUS_SHUTTING_DOWN (must outrank LOADING)", resp.Status) + } +} From d7295590eee548466b161cbce6beb7af015829bd Mon Sep 17 00:00:00 2001 From: couragehong Date: Thu, 28 May 2026 17:30:28 +0900 Subject: [PATCH 2/5] feat(bootstrap): StatusReporter callback for download progress MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Threads an optional reporter through EnsureAll / EnsureLlamaServer / EnsureModel so download-byte progress and per-stage transitions reach callers without coupling the bootstrap package to a specific status sink (e.g. server.SetBootstrapStatus). Callback shape: type StatusReporter func(stage string, bytesDone, bytesTotal int64) stage is "llama_server" or "model"; the caller maps that to whichever domain enum it cares about (cmd/runed routes to HealthResponse_Phase). Reporter calls run inline on the download goroutine and share the existing 2-second throttle inside makeProgress, so the status sink isn't flooded at full chunk cadence. Stage-transition ticks are emitted by the public entry points *before* AcquireLock so a trailer waiting on the install lock still surfaces the correct stage to clients during the lock-wait window. The internal ensure* helpers no longer emit their own ticks; under the new arrangement they would have produced duplicate transitions, and the EnsureAll path explicitly issues the llama-server → model transition between the two internal calls (still inside the same lock). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/bootstrap/install.go | 88 ++++++++++++++++++++++++------ internal/bootstrap/install_test.go | 74 ++++++++++++++++++++++++- 2 files changed, 143 insertions(+), 19 deletions(-) diff --git a/internal/bootstrap/install.go b/internal/bootstrap/install.go index e49a959..d567273 100644 --- a/internal/bootstrap/install.go +++ b/internal/bootstrap/install.go @@ -42,6 +42,19 @@ var downloadRetryBackoff = 5 * time.Second const retryBackoffMultiplier = 3 +// StatusReporter is the optional progress sink wired through EnsureAll / +// EnsureLlamaServer / EnsureModel. It receives every throttled progress +// tick from the underlying downloads, tagged with the stage string +// ("llama_server" or "model") so callers can map back to a higher-level +// phase (proto Phase enum, UI label, etc.). bytesTotal is 0 when the +// total isn't yet known (no Content-Length header observed yet); render +// "indeterminate" rather than divide-by-zero in that case. +// +// nil is the documented "no reporter" value — bootstrap silently skips +// the callback then. Reporter calls run inline on the download goroutine +// and should return quickly; offload non-trivial work. +type StatusReporter func(stage string, bytesDone, bytesTotal int64) + // EnsureAll resolves the model variant and ensures both llama-server and // the model GGUF are present, downloading any missing pieces while // holding $RUNED_HOME/install.lock. Returns the absolute path to the @@ -54,9 +67,12 @@ const retryBackoffMultiplier = 3 // only to be discarded by the env override. // // logger may be nil; slog.Default() is used in that case. All progress -// and per-step status is emitted through this logger — callers don't -// need to thread a separate ProgressFunc. -func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) (llamaBin, modelPath, variant string, err error) { +// and per-step status is emitted through this logger. 
+// +// reporter is the optional sink for download-byte progress (proto +// HealthResponse Phase/bytes_done/bytes_total in cmd/runed). nil is +// allowed — callers without a status sink can pass nil. +func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (llamaBin, modelPath, variant string, err error) { if logger == nil { logger = slog.Default() } @@ -71,17 +87,30 @@ func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) "variant", variant, "default_model", m.DefaultModel) + // Emit the llama-server stage tick before AcquireLock so a trailer + // waiting on the lock surfaces the correct Phase to clients (rather + // than freezing on whatever stage was set before EnsureAll was + // called). + if reporter != nil { + reporter("llama_server", 0, 0) + } lock, err := AcquireLock(p.InstallLock, InstallLockTimeout) if err != nil { return "", "", "", fmt.Errorf("install lock: %w", err) } defer lock.Release() - llamaBin, err = ensureLlamaServer(ctx, p, m, logger) + llamaBin, err = ensureLlamaServer(ctx, p, m, logger, reporter) if err != nil { return "", "", "", err } - modelPath, err = ensureModel(ctx, p, m, variant, logger) + // Transition to the model stage. We're still inside the install + // lock, so this can't happen at the public-API entry the way the + // initial llama-server tick does. + if reporter != nil { + reporter("model", 0, 0) + } + modelPath, err = ensureModel(ctx, p, m, variant, logger, reporter) if err != nil { return "", "", "", err } @@ -97,19 +126,27 @@ func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) // LlamaServerForCurrentPlatform is consulted unconditionally here, which // is fine because the caller explicitly asked for the manifest's // llama-server. 
-func EnsureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) (string, error) { +// +// reporter is the optional sink for download-byte progress; pass nil if +// no status sink is wired. +func EnsureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (string, error) { if logger == nil { logger = slog.Default() } if err := p.EnsureDirs(); err != nil { return "", err } + // Emit stage tick before AcquireLock so a lock-waiting trailer + // surfaces the correct Phase to clients during the wait window. + if reporter != nil { + reporter("llama_server", 0, 0) + } lock, err := AcquireLock(p.InstallLock, InstallLockTimeout) if err != nil { return "", fmt.Errorf("install lock: %w", err) } defer lock.Release() - return ensureLlamaServer(ctx, p, m, logger) + return ensureLlamaServer(ctx, p, m, logger, reporter) } // EnsureModel ensures only the model GGUF is present and returns the @@ -120,7 +157,10 @@ func EnsureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog. // LlamaServerForCurrentPlatform is not called here, so a caller on a // platform missing from the manifest can still bootstrap a model as // long as their RUNED_LLAMA_SERVER points at a working binary. -func EnsureModel(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) (modelPath, variant string, err error) { +// +// reporter is the optional sink for download-byte progress; pass nil if +// no status sink is wired. +func EnsureModel(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (modelPath, variant string, err error) { if logger == nil { logger = slog.Default() } @@ -135,13 +175,17 @@ func EnsureModel(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger "variant", variant, "default_model", m.DefaultModel) + // Emit stage tick before AcquireLock — see EnsureAll/EnsureLlamaServer. 
+ if reporter != nil { + reporter("model", 0, 0) + } lock, err := AcquireLock(p.InstallLock, InstallLockTimeout) if err != nil { return "", "", fmt.Errorf("install lock: %w", err) } defer lock.Release() - modelPath, err = ensureModel(ctx, p, m, variant, logger) + modelPath, err = ensureModel(ctx, p, m, variant, logger, reporter) if err != nil { return "", "", err } @@ -186,10 +230,13 @@ func downloadWithRetry(ctx context.Context, url, sha string, size int64, dest st } // makeProgress returns a throttled ProgressFunc that logs at most one -// line per progressLogInterval, plus a final 100% line on completion. -// total ≤ 0 means Content-Length wasn't advertised; the function falls -// back to byte-count-only output. -func makeProgress(logger *slog.Logger, stage string, expectedTotal int64) ProgressFunc { +// line per progressLogInterval (plus a final 100% line on completion) +// and, when reporter != nil, forwards the same throttled tick to the +// reporter so an external status sink (server.SetBootstrapStatus etc.) +// shares the cadence. total ≤ 0 means Content-Length wasn't advertised; +// the function falls back to byte-count-only output and forwards +// total=0 to the reporter. +func makeProgress(logger *slog.Logger, reporter StatusReporter, stage string, expectedTotal int64) ProgressFunc { var lastReport time.Time return func(downloaded, observedTotal int64) { total := expectedTotal @@ -207,6 +254,9 @@ func makeProgress(logger *slog.Logger, stage string, expectedTotal int64) Progre attrs = append(attrs, "total", total, "pct", fmt.Sprintf("%.1f%%", pct)) } logger.Info("download progress", attrs...) + if reporter != nil { + reporter(stage, downloaded, total) + } } } @@ -234,7 +284,9 @@ func ResolveModelVariant(p *Paths, m *Manifest) (string, error) { // extracting or downloading the artifact as needed. A sidecar marker // file (.llama_server.sha256) tracks the last-installed tarball hash so // repeat boots don't re-extract. 
-func ensureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) (string, error) { +func ensureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (string, error) { + // Stage-transition tick is emitted by the public Ensure* entry + // points before AcquireLock, so we don't re-emit here. spec, err := m.LlamaServerForCurrentPlatform() if err != nil { return "", err @@ -259,7 +311,7 @@ func ensureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog. "target", target) } - progress := makeProgress(logger, "llama_server", spec.Size) + progress := makeProgress(logger, reporter, "llama_server", spec.Size) switch spec.Extract { case "": if err := os.MkdirAll(filepath.Dir(target), 0o700); err != nil { @@ -314,7 +366,9 @@ func llamaServerTarget(p *Paths, spec *LlamaServerSpec) string { return filepath.Join(p.LlamaDir, filepath.FromSlash(exec)) } -func ensureModel(ctx context.Context, p *Paths, m *Manifest, variant string, logger *slog.Logger) (string, error) { +func ensureModel(ctx context.Context, p *Paths, m *Manifest, variant string, logger *slog.Logger, reporter StatusReporter) (string, error) { + // Stage-transition tick is emitted by the caller (EnsureModel or + // EnsureAll) before invoking this function, so we don't re-emit here. 
spec, err := m.ModelSpec(variant) if err != nil { return "", err @@ -338,7 +392,7 @@ func ensureModel(ctx context.Context, p *Paths, m *Manifest, variant string, log return "", err } logger.Info("ensure model: downloading GGUF", "url", spec.URL) - progress := makeProgress(logger, "model", spec.Size) + progress := makeProgress(logger, reporter, "model", spec.Size) if err := downloadWithRetry(ctx, spec.URL, spec.SHA256, spec.Size, target, progress, logger, "model"); err != nil { return "", fmt.Errorf("ensure model: download: %w", err) } diff --git a/internal/bootstrap/install_test.go b/internal/bootstrap/install_test.go index f3cabfd..2aaed2d 100644 --- a/internal/bootstrap/install_test.go +++ b/internal/bootstrap/install_test.go @@ -178,7 +178,7 @@ func TestEnsureLlamaServer_PlatformMissingFails(t *testing.T) { Models: map[string]ArtifactSpec{}, } - if _, err := EnsureLlamaServer(t.Context(), p, m, slog.Default()); !errors.Is(err, ErrNoArtifactForPlatform) { + if _, err := EnsureLlamaServer(t.Context(), p, m, slog.Default(), nil); !errors.Is(err, ErrNoArtifactForPlatform) { t.Fatalf("expected ErrNoArtifactForPlatform, got: %v", err) } } @@ -220,7 +220,7 @@ func TestEnsureModel_NoLlamaServerEntryNeeded(t *testing.T) { DefaultModel: variant, } - got, gotVariant, err := EnsureModel(t.Context(), p, m, slog.Default()) + got, gotVariant, err := EnsureModel(t.Context(), p, m, slog.Default(), nil) if err != nil { t.Fatalf("EnsureModel must succeed when no llama-server entry is present: %v", err) } @@ -231,3 +231,73 @@ func TestEnsureModel_NoLlamaServerEntryNeeded(t *testing.T) { t.Errorf("got variant %q, want %q", gotVariant, variant) } } + +// Reporter must receive an entry-stage tick from each Ensure* function so a +// Health-surface caller can flip Phase the moment bootstrap enters that +// stage, even when the cache-hit short-circuit (or an early error) means no +// byte ticks follow. 
+func TestEnsureLlamaServer_ReporterReceivesStageTickOnEntry(t *testing.T) { + dir := t.TempDir() + t.Setenv(EnvHome, dir) + p, _ := Resolve() + + // platforms empty → LlamaServerForCurrentPlatform errors after the + // reporter's entry tick. + m := &Manifest{ + Version: 1, + Platforms: map[string]PlatformArtifacts{}, + Models: map[string]ArtifactSpec{}, + } + + var stages []string + reporter := func(stage string, _, _ int64) { + stages = append(stages, stage) + } + _, _ = EnsureLlamaServer(t.Context(), p, m, slog.Default(), reporter) + if len(stages) != 1 || stages[0] != "llama_server" { + t.Errorf("stages = %v, want [llama_server]", stages) + } +} + +func TestEnsureModel_ReporterReceivesStageTickOnEntry(t *testing.T) { + dir := t.TempDir() + t.Setenv(EnvHome, dir) + t.Setenv(EnvModelVariant, "") + p, _ := Resolve() + if err := p.EnsureDirs(); err != nil { + t.Fatalf("EnsureDirs: %v", err) + } + + // Pre-populate model so cache-hit returns without any download path + // running. Only the entry tick should reach the reporter. 
+ variant := "test-variant" + body := []byte("model-content") + sum := sha256.Sum256(body) + wantSHA := hex.EncodeToString(sum[:]) + modelPath := p.ModelPath(variant) + if err := os.MkdirAll(filepath.Dir(modelPath), 0o700); err != nil { + t.Fatalf("mkdir models: %v", err) + } + if err := os.WriteFile(modelPath, body, 0o600); err != nil { + t.Fatalf("write model: %v", err) + } + + m := &Manifest{ + Version: 1, + Models: map[string]ArtifactSpec{ + variant: {URL: "https://ignored", SHA256: wantSHA, Size: int64(len(body))}, + }, + DefaultModel: variant, + } + + var stages []string + reporter := func(stage string, _, _ int64) { + stages = append(stages, stage) + } + if _, _, err := EnsureModel(t.Context(), p, m, slog.Default(), reporter); err != nil { + t.Fatalf("EnsureModel: %v", err) + } + if len(stages) != 1 || stages[0] != "model" { + t.Errorf("stages = %v, want [model]", stages) + } +} From f01512aa064d0e37a6c2a1ef936524fcc703a7eb Mon Sep 17 00:00:00 2001 From: couragehong Date: Thu, 28 May 2026 17:30:47 +0900 Subject: [PATCH 3/5] feat(runed): listen before bootstrap + surface progress via Health RPC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The gRPC UDS used to open *after* self-bootstrap completed, so clients connecting during the multi-minute install window only saw dial failures. They now connect immediately and observe STATUS_LOADING with the current Phase + bytes_done / bytes_total / message — exactly what the proto envisioned when these fields were originally defined. 
Flow rewrite: paths → daemon-check → server.New (backend unset) → ipc.Listen → grpc.Serve [bg] → SetBootstrapStatus(UNSPECIFIED, "fetching manifest") → selfBootstrap with reporter (Phase flips per stage tick) → SetBootstrapStatus(STARTING_LLAMA_SERVER) → backend.Start → srv.SetBackend(b, modelID) ← Health flips to OK → idle ticker / signal wait / drain reporter is a closure that maps each bootstrap stage to its proto Phase + message via stagePhase() and forwards to srv.SetBootstrapStatus. The proto omits PHASE_FETCHING_MANIFEST, so the manifest-fetch interval reports PHASE_UNSPECIFIED with a "fetching manifest" message — clients that surface message render correctly without depending on enum recognition for that brief stage. A new bailBoot(logger, srv, gs, b) helper centralises early-failure cleanup: any boot-time error (selfBootstrap, sha256File, backend.Start, parseIdleTimeout) drives the same TriggerShutdown + GracefulStop + best-effort b.Stop sequence. Clients see one final STATUS_SHUTTING_DOWN before the listener closes instead of an abrupt connection drop, which matches the experience on the normal exit path. main's exit-select also calls srv.TriggerShutdown() unconditionally so OS-signal and serve-error exits flip Health to SHUTTING_DOWN too (sync.Once makes a follow-up Shutdown RPC a no-op). backend.Start failure additionally calls b.Stop — b.Start may have spawned a child that failed health-probe, leaving an orphan llama-server holding ~470MB; b.Stop is idempotent on never-spawned backends. Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/runed/main.go | 168 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 132 insertions(+), 36 deletions(-) diff --git a/cmd/runed/main.go b/cmd/runed/main.go index 1a4c3f1..1a10b22 100644 --- a/cmd/runed/main.go +++ b/cmd/runed/main.go @@ -8,6 +8,11 @@ // match. To bypass self-bootstrap entirely, set both RUNED_LLAMA_SERVER // and RUNED_MODEL to absolute paths. 
// +// The gRPC UDS opens before self-bootstrap so clients can dial immediately +// and poll Health for STATUS_LOADING + Phase/bytes progress instead of +// seeing dial failures during the multi-minute install window. Embed and +// EmbedBatch return FAILED_PRECONDITION until the backend is wired. +// // Optional environment: // // RUNED_HOME Data directory (default: $HOME/.runed). @@ -99,11 +104,64 @@ func run() error { } } + // ctx-size: RUNED_CTX_SIZE if valid, else backend default (2048). + ctxSize := 0 + if v := os.Getenv("RUNED_CTX_SIZE"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + ctxSize = n + } else { + logger.Warn("invalid RUNED_CTX_SIZE, using default", "value", v) + } + } + + // Build the server and start serving the UDS BEFORE self-bootstrap + + // backend.Start so clients can dial immediately and poll Health for + // STATUS_LOADING / Phase / progress instead of seeing dial failures + // during the multi-minute install window. SetBackend wires the + // backend at the end; until then Embed/EmbedBatch return + // FAILED_PRECONDITION. + srv := server.New(daemonVersion) + lis, err := ipc.Listen(sockPath) + if err != nil { + return fmt.Errorf("ipc listen: %w", err) + } + logger.Info("listening", "socket", sockPath) + + gs := grpc.NewServer(grpc.UnaryInterceptor(srv.UnaryActivityInterceptor())) + runedv1.RegisterRunedServiceServer(gs, srv) + + // Serve in a goroutine so main can block on signals/Shutdown/serve error. + // gs.Serve returns nil on graceful stop; any non-nil err is a real fault. + serveErr := make(chan error, 1) + go func() { + if err := gs.Serve(lis); err != nil { + serveErr <- err + } + close(serveErr) + }() + + // Forward bootstrap download progress into the Health surface. Each + // stage name maps to a proto Phase value; bytes flow through with + // matching messages so clients render percent-complete from one tuple. 
+ reporter := bootstrap.StatusReporter(func(stage string, done, total int64) { + phase, msg := stagePhase(stage) + srv.SetBootstrapStatus(phase, done, total, msg) + }) + llamaBin := os.Getenv("RUNED_LLAMA_SERVER") model := os.Getenv("RUNED_MODEL") if llamaBin == "" || model == "" { - bin, mp, err := selfBootstrap(ctx, logger, paths, llamaBin == "", model == "") + // Pre-set status before the reporter's first tick. Manifest fetch + // has no dedicated Phase enum (proto omits PHASE_FETCHING_MANIFEST + // to avoid coupling enum churn to wire-format changes), so we + // leave Phase = UNSPECIFIED and convey the stage through the + // message field — clients that surface message render correctly + // without depending on enum recognition. selfBootstrap overwrites + // Phase once ensure{LlamaServer,Model} enters its own stage. + srv.SetBootstrapStatus(runedv1.HealthResponse_PHASE_UNSPECIFIED, 0, 0, "fetching manifest") + bin, mp, err := selfBootstrap(ctx, logger, paths, llamaBin == "", model == "", reporter) if err != nil { + bailBoot(logger, srv, gs, nil) return err } if llamaBin == "" { @@ -117,18 +175,11 @@ func run() error { "llama_server", llamaBin, "model", model) } - // ctx-size: RUNED_CTX_SIZE if valid, else backend default (2048). - ctxSize := 0 - if v := os.Getenv("RUNED_CTX_SIZE"); v != "" { - if n, err := strconv.Atoi(v); err == nil && n > 0 { - ctxSize = n - } else { - logger.Warn("invalid RUNED_CTX_SIZE, using default", "value", v) - } - } + srv.SetBootstrapStatus(runedv1.HealthResponse_PHASE_STARTING_LLAMA_SERVER, 0, 0, "starting llama-server") modelID, err := sha256File(model) if err != nil { + bailBoot(logger, srv, gs, nil) return fmt.Errorf("model hash: %w", err) } logger.Info("model identity", "sha256", modelID, "path", model) @@ -146,35 +197,21 @@ func run() error { // backend.Start already bounds start-up on its own (~15s health poll + // early-exit detection via the stderr scanner). 
if err := b.Start(ctx); err != nil { + // b.Start may have spawned a child that failed health-probe, + // leaving an orphan llama-server holding ~470MB. bailBoot reaps + // it via b.Stop (idempotent on never-spawned backends). + bailBoot(logger, srv, gs, b) return fmt.Errorf("backend start: %w", err) } logger.Info("llama-server ready", "port", b.Port()) - lis, err := ipc.Listen(sockPath) - if err != nil { - // Best-effort backend cleanup — we have no logger-side context to thread here. - _ = b.Stop(context.Background()) - return fmt.Errorf("ipc listen: %w", err) - } - logger.Info("listening", "socket", sockPath) - - srv := server.New(b, daemonVersion, modelID) - gs := grpc.NewServer(grpc.UnaryInterceptor(srv.UnaryActivityInterceptor())) - runedv1.RegisterRunedServiceServer(gs, srv) - - // Serve in a goroutine so main can block on signals/Shutdown/serve error. - // gs.Serve returns nil on graceful stop; any non-nil err is a real fault. - serveErr := make(chan error, 1) - go func() { - if err := gs.Serve(lis); err != nil { - serveErr <- err - } - close(serveErr) - }() + // Backend up — wire it into the server. From here on Health reports + // STATUS_OK and Embed/EmbedBatch accept work. + srv.SetBackend(b, modelID) idleTimeout, err := parseIdleTimeout() if err != nil { - _ = b.Stop(context.Background()) + bailBoot(logger, srv, gs, b) return fmt.Errorf("RUNED_IDLE_TIMEOUT: %w", err) } if idleTimeout > 0 { @@ -236,6 +273,12 @@ func run() error { } } + // Ensure shutdownCh is closed even when the trigger was an OS signal or + // a serve error — Health then advertises STATUS_SHUTTING_DOWN during + // the drain window. sync.Once makes a second call (after a Shutdown + // RPC) a no-op. + srv.TriggerShutdown() + // Phase 1: drain in-flight RPCs. GracefulStop blocks until all active // handlers return; 10s is a safety net for a wedged client. 
logger.Info("draining in-flight requests") @@ -271,7 +314,10 @@ func run() error { // Only those sides are downloaded — the env-overridden side is skipped // so its manifest artifact isn't fetched only to be discarded. When // both are needed, EnsureAll's single-lock fast path is used. -func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Paths, needLlama, needModel bool) (binPath, modelPath string, err error) { +// +// reporter is the optional status sink for download-byte progress (see +// bootstrap.StatusReporter). Pass nil when no sink is wired. +func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Paths, needLlama, needModel bool, reporter bootstrap.StatusReporter) (binPath, modelPath string, err error) { manifestURL := bootstrap.ResolveManifestURL() if manifestURL == "" { return "", "", fmt.Errorf( @@ -298,7 +344,7 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa switch { case needLlama && needModel: var variant string - binPath, modelPath, variant, err = bootstrap.EnsureAll(ctx, paths, mani, logger) + binPath, modelPath, variant, err = bootstrap.EnsureAll(ctx, paths, mani, logger, reporter) if err != nil { return "", "", fmt.Errorf("bootstrap install: %w", err) } @@ -307,7 +353,7 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa "model", modelPath, "variant", variant) case needLlama: - binPath, err = bootstrap.EnsureLlamaServer(ctx, paths, mani, logger) + binPath, err = bootstrap.EnsureLlamaServer(ctx, paths, mani, logger, reporter) if err != nil { return "", "", fmt.Errorf("bootstrap install: %w", err) } @@ -315,7 +361,7 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa "llama_server", binPath) case needModel: var variant string - modelPath, variant, err = bootstrap.EnsureModel(ctx, paths, mani, logger) + modelPath, variant, err = bootstrap.EnsureModel(ctx, paths, mani, logger, reporter) if err != nil { 
return "", "", fmt.Errorf("bootstrap install: %w", err) } @@ -328,6 +374,56 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa return binPath, modelPath, nil } +// stagePhase maps a bootstrap.StatusReporter stage name to the proto +// HealthResponse_Phase value and a short user-facing message. Returning +// PHASE_UNSPECIFIED on unknown stages keeps the surface forward-compatible +// with future bootstrap stages. +func stagePhase(stage string) (runedv1.HealthResponse_Phase, string) { + switch stage { + case "llama_server": + return runedv1.HealthResponse_PHASE_FETCHING_LLAMA_SERVER, "fetching llama-server" + case "model": + return runedv1.HealthResponse_PHASE_FETCHING_MODEL, "fetching embedding model" + default: + return runedv1.HealthResponse_PHASE_UNSPECIFIED, "" + } +} + +// bailBoot drives the same graceful-shutdown sequence as the normal exit +// path so an early failure during boot still flips Health to +// STATUS_SHUTTING_DOWN, drains in-flight RPCs, and reaps the backend if +// it managed to spawn. b may be nil when boot fails before +// backend.NewLlamaBackend ran; b.Stop is idempotent on never-spawned +// backends and on backends that already returned, so a non-nil b is +// always safe to pass. +func bailBoot(logger *slog.Logger, srv *server.Server, gs *grpc.Server, b *backend.LlamaBackend) { + srv.TriggerShutdown() + stopGRPC(logger, gs) + if b != nil { + _ = b.Stop(context.Background()) + } +} + +// stopGRPC drives a 10s-bounded GracefulStop and falls back to Stop on +// timeout. Used by the early-fail paths in run() (bootstrap / backend +// startup failure) where the listener is already up but no backend has +// been wired — in-flight RPCs are at most Health/Info, so the grace +// window is effectively a courtesy. 
+func stopGRPC(logger *slog.Logger, gs *grpc.Server) { + graceDone := make(chan struct{}) + go func() { + gs.GracefulStop() + close(graceDone) + }() + select { + case <-graceDone: + case <-time.After(10 * time.Second): + logger.Warn("graceful stop timed out, forcing") + gs.Stop() + <-graceDone + } +} + // anotherDaemonReachable returns true if a UDS listener accepts our dial // at sockPath within 500ms. False covers both "file missing" and "file // exists but nobody listening" (the latter is a stale-socket case the From f5bcb673bc208f814d17e679c3556497534a3535 Mon Sep 17 00:00:00 2001 From: couragehong Date: Thu, 28 May 2026 17:53:15 +0900 Subject: [PATCH 4/5] chore(comments): trim verbose docs to WHY-only The three preceding commits accumulated a lot of explanatory prose that restates what well-named identifiers already convey. This pass prunes WHAT-style narration, "added for X" meta notes, and historical context, keeping only the lines that capture a non-obvious WHY: publish order in SetBackend, the FAILED_PRECONDITION-vs-Unavailable rationale in Embed, the SHUTTING_DOWN-outranks-LOADING priority in Health, the chars==tokens conservativism in maxTextLength, the trailer-wait reason for emitting stage ticks before AcquireLock, etc. No behaviour change; ~145 lines net removed across server / bootstrap / runed plus their tests. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/runed/main.go | 85 +++++++------------ internal/bootstrap/install.go | 95 ++++++---------------- internal/bootstrap/install_test.go | 14 +--- internal/server/server.go | 126 +++++++++-------------------- internal/server/server_test.go | 30 ++----- 5 files changed, 103 insertions(+), 247 deletions(-) diff --git a/cmd/runed/main.go b/cmd/runed/main.go index 1a10b22..334d9be 100644 --- a/cmd/runed/main.go +++ b/cmd/runed/main.go @@ -114,12 +114,10 @@ func run() error { } } - // Build the server and start serving the UDS BEFORE self-bootstrap + - // backend.Start so clients can dial immediately and poll Health for - // STATUS_LOADING / Phase / progress instead of seeing dial failures - // during the multi-minute install window. SetBackend wires the - // backend at the end; until then Embed/EmbedBatch return - // FAILED_PRECONDITION. + // Listen before self-bootstrap so clients can poll Health for + // STATUS_LOADING + Phase/progress instead of dial failures during + // the multi-minute install window. SetBackend below flips Health + // to STATUS_OK once llama-server is up. srv := server.New(daemonVersion) lis, err := ipc.Listen(sockPath) if err != nil { @@ -140,9 +138,7 @@ func run() error { close(serveErr) }() - // Forward bootstrap download progress into the Health surface. Each - // stage name maps to a proto Phase value; bytes flow through with - // matching messages so clients render percent-complete from one tuple. + // Forward bootstrap progress into Health via stage→Phase mapping. reporter := bootstrap.StatusReporter(func(stage string, done, total int64) { phase, msg := stagePhase(stage) srv.SetBootstrapStatus(phase, done, total, msg) @@ -151,13 +147,9 @@ func run() error { llamaBin := os.Getenv("RUNED_LLAMA_SERVER") model := os.Getenv("RUNED_MODEL") if llamaBin == "" || model == "" { - // Pre-set status before the reporter's first tick. 
Manifest fetch - // has no dedicated Phase enum (proto omits PHASE_FETCHING_MANIFEST - // to avoid coupling enum churn to wire-format changes), so we - // leave Phase = UNSPECIFIED and convey the stage through the - // message field — clients that surface message render correctly - // without depending on enum recognition. selfBootstrap overwrites - // Phase once ensure{LlamaServer,Model} enters its own stage. + // Manifest fetch has no dedicated Phase enum; use UNSPECIFIED and + // convey the stage through message. selfBootstrap overwrites + // Phase on entering ensure{LlamaServer,Model}. srv.SetBootstrapStatus(runedv1.HealthResponse_PHASE_UNSPECIFIED, 0, 0, "fetching manifest") bin, mp, err := selfBootstrap(ctx, logger, paths, llamaBin == "", model == "", reporter) if err != nil { @@ -197,16 +189,14 @@ func run() error { // backend.Start already bounds start-up on its own (~15s health poll + // early-exit detection via the stderr scanner). if err := b.Start(ctx); err != nil { - // b.Start may have spawned a child that failed health-probe, - // leaving an orphan llama-server holding ~470MB. bailBoot reaps - // it via b.Stop (idempotent on never-spawned backends). + // b.Start may have spawned a child that failed health-probe; + // bailBoot's b.Stop reaps it. bailBoot(logger, srv, gs, b) return fmt.Errorf("backend start: %w", err) } logger.Info("llama-server ready", "port", b.Port()) - // Backend up — wire it into the server. From here on Health reports - // STATUS_OK and Embed/EmbedBatch accept work. + // Flip Health to STATUS_OK and accept Embed/EmbedBatch. srv.SetBackend(b, modelID) idleTimeout, err := parseIdleTimeout() @@ -273,10 +263,9 @@ func run() error { } } - // Ensure shutdownCh is closed even when the trigger was an OS signal or - // a serve error — Health then advertises STATUS_SHUTTING_DOWN during - // the drain window. sync.Once makes a second call (after a Shutdown - // RPC) a no-op. 
+ // Flip Health to STATUS_SHUTTING_DOWN for all exit triggers (OS + // signal / serve error / Shutdown RPC). sync.Once makes a redundant + // call a no-op. srv.TriggerShutdown() // Phase 1: drain in-flight RPCs. GracefulStop blocks until all active @@ -305,18 +294,11 @@ func run() error { return nil } -// selfBootstrap fetches the manifest and ensures llama-server and the -// resolved model variant are present under paths. Called whenever -// RUNED_LLAMA_SERVER or RUNED_MODEL is unset; bypassed entirely when -// both are provided. -// -// needLlama / needModel reflect which side(s) the caller still needs. -// Only those sides are downloaded — the env-overridden side is skipped -// so its manifest artifact isn't fetched only to be discarded. When -// both are needed, EnsureAll's single-lock fast path is used. -// -// reporter is the optional status sink for download-byte progress (see -// bootstrap.StatusReporter). Pass nil when no sink is wired. +// selfBootstrap fetches the manifest and installs the side(s) the +// caller needs. needLlama / needModel reflect which env paths are +// unset; only those sides are downloaded so an env override doesn't +// pay for an artifact it will discard. Both needed → EnsureAll's +// single-lock fast path. func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Paths, needLlama, needModel bool, reporter bootstrap.StatusReporter) (binPath, modelPath string, err error) { manifestURL := bootstrap.ResolveManifestURL() if manifestURL == "" { @@ -324,10 +306,8 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa "self-bootstrap needed (RUNED_LLAMA_SERVER or RUNED_MODEL unset) but no manifest URL: set %s or rebuild with DEFAULT_MANIFEST_URL", bootstrap.EnvManifest) } - // HTTPS isn't enforced because a private network may legitimately serve - // the manifest over plain HTTP. 
But callers should know: a MITM that - // rewrites the manifest can also rewrite the SHA256s, so artifact - // integrity collapses to "trust the manifest channel." + // A MITM that rewrites the manifest can also rewrite the SHA256s, + // so artifact integrity collapses to "trust the manifest channel." if !strings.HasPrefix(manifestURL, "https://") { logger.Warn("manifest URL is not HTTPS; artifact integrity rests on manifest channel trust", "url", manifestURL) @@ -374,10 +354,9 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa return binPath, modelPath, nil } -// stagePhase maps a bootstrap.StatusReporter stage name to the proto -// HealthResponse_Phase value and a short user-facing message. Returning -// PHASE_UNSPECIFIED on unknown stages keeps the surface forward-compatible -// with future bootstrap stages. +// stagePhase maps a bootstrap stage name to its proto Phase + message. +// PHASE_UNSPECIFIED on unknown stages keeps the surface forward- +// compatible with future stages. func stagePhase(stage string) (runedv1.HealthResponse_Phase, string) { switch stage { case "llama_server": @@ -389,13 +368,9 @@ func stagePhase(stage string) (runedv1.HealthResponse_Phase, string) { } } -// bailBoot drives the same graceful-shutdown sequence as the normal exit -// path so an early failure during boot still flips Health to -// STATUS_SHUTTING_DOWN, drains in-flight RPCs, and reaps the backend if -// it managed to spawn. b may be nil when boot fails before -// backend.NewLlamaBackend ran; b.Stop is idempotent on never-spawned -// backends and on backends that already returned, so a non-nil b is -// always safe to pass. +// bailBoot runs the normal-exit cleanup sequence for boot-time failures +// so clients see one final STATUS_SHUTTING_DOWN instead of a sudden +// disconnect. b may be nil; b.Stop is idempotent. 
func bailBoot(logger *slog.Logger, srv *server.Server, gs *grpc.Server, b *backend.LlamaBackend) { srv.TriggerShutdown() stopGRPC(logger, gs) @@ -404,11 +379,7 @@ func bailBoot(logger *slog.Logger, srv *server.Server, gs *grpc.Server, b *backe } } -// stopGRPC drives a 10s-bounded GracefulStop and falls back to Stop on -// timeout. Used by the early-fail paths in run() (bootstrap / backend -// startup failure) where the listener is already up but no backend has -// been wired — in-flight RPCs are at most Health/Info, so the grace -// window is effectively a courtesy. +// stopGRPC drives a 10s-bounded GracefulStop with force-Stop fallback. func stopGRPC(logger *slog.Logger, gs *grpc.Server) { graceDone := make(chan struct{}) go func() { diff --git a/internal/bootstrap/install.go b/internal/bootstrap/install.go index d567273..c1f7e8f 100644 --- a/internal/bootstrap/install.go +++ b/internal/bootstrap/install.go @@ -42,36 +42,20 @@ var downloadRetryBackoff = 5 * time.Second const retryBackoffMultiplier = 3 -// StatusReporter is the optional progress sink wired through EnsureAll / -// EnsureLlamaServer / EnsureModel. It receives every throttled progress -// tick from the underlying downloads, tagged with the stage string -// ("llama_server" or "model") so callers can map back to a higher-level -// phase (proto Phase enum, UI label, etc.). bytesTotal is 0 when the -// total isn't yet known (no Content-Length header observed yet); render -// "indeterminate" rather than divide-by-zero in that case. -// -// nil is the documented "no reporter" value — bootstrap silently skips -// the callback then. Reporter calls run inline on the download goroutine -// and should return quickly; offload non-trivial work. +// StatusReporter receives throttled progress ticks from Ensure* tagged +// with stage ("llama_server" or "model"). bytesTotal == 0 means the +// total isn't yet known (no Content-Length observed). 
nil is a valid +// "no sink" value; reporter calls run inline on the download goroutine +// and should return quickly. type StatusReporter func(stage string, bytesDone, bytesTotal int64) -// EnsureAll resolves the model variant and ensures both llama-server and -// the model GGUF are present, downloading any missing pieces while -// holding $RUNED_HOME/install.lock. Returns the absolute path to the -// llama-server executable and the GGUF file the daemon should load. -// -// This is the normal-path entry point: both RUNED_LLAMA_SERVER and -// RUNED_MODEL unset → manifest-driven install of everything. When only -// one env var is set, callers should use EnsureLlamaServer or -// EnsureModel directly so the side they already have isn't redownloaded -// only to be discarded by the env override. +// EnsureAll ensures both llama-server and the model GGUF are installed +// under a single lock acquisition. Use when both RUNED_LLAMA_SERVER and +// RUNED_MODEL are unset; for the partial-set case, call +// EnsureLlamaServer / EnsureModel individually so the env-overridden +// side isn't redownloaded only to be discarded. // -// logger may be nil; slog.Default() is used in that case. All progress -// and per-step status is emitted through this logger. -// -// reporter is the optional sink for download-byte progress (proto -// HealthResponse Phase/bytes_done/bytes_total in cmd/runed). nil is -// allowed — callers without a status sink can pass nil. +// logger may be nil (slog.Default used). reporter may be nil. 
func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (llamaBin, modelPath, variant string, err error) { if logger == nil { logger = slog.Default() @@ -87,10 +71,8 @@ func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, "variant", variant, "default_model", m.DefaultModel) - // Emit the llama-server stage tick before AcquireLock so a trailer - // waiting on the lock surfaces the correct Phase to clients (rather - // than freezing on whatever stage was set before EnsureAll was - // called). + // Stage tick before AcquireLock so a lock-waiting trailer surfaces + // the right Phase while it waits. if reporter != nil { reporter("llama_server", 0, 0) } @@ -104,9 +86,6 @@ func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, if err != nil { return "", "", "", err } - // Transition to the model stage. We're still inside the install - // lock, so this can't happen at the public-API entry the way the - // initial llama-server tick does. if reporter != nil { reporter("model", 0, 0) } @@ -117,18 +96,9 @@ func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, return llamaBin, modelPath, variant, nil } -// EnsureLlamaServer ensures only the llama-server binary is present and -// returns its absolute path. Use when the caller already has a model -// path (e.g. RUNED_MODEL is set) and only the llama-server side needs -// the manifest install. -// -// The manifest must include an entry for the current platform — -// LlamaServerForCurrentPlatform is consulted unconditionally here, which -// is fine because the caller explicitly asked for the manifest's -// llama-server. -// -// reporter is the optional sink for download-byte progress; pass nil if -// no status sink is wired. +// EnsureLlamaServer ensures the llama-server binary is installed. 
The +// manifest must have an entry for the current platform (this is the +// caller's intent — they explicitly asked for the manifest's binary). func EnsureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (string, error) { if logger == nil { logger = slog.Default() @@ -136,8 +106,6 @@ func EnsureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog. if err := p.EnsureDirs(); err != nil { return "", err } - // Emit stage tick before AcquireLock so a lock-waiting trailer - // surfaces the correct Phase to clients during the wait window. if reporter != nil { reporter("llama_server", 0, 0) } @@ -149,17 +117,10 @@ func EnsureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog. return ensureLlamaServer(ctx, p, m, logger, reporter) } -// EnsureModel ensures only the model GGUF is present and returns the -// absolute path plus the resolved variant ID. Use when the caller -// already has a llama-server path (e.g. RUNED_LLAMA_SERVER is set) and -// only the model side needs the manifest install. -// -// LlamaServerForCurrentPlatform is not called here, so a caller on a -// platform missing from the manifest can still bootstrap a model as -// long as their RUNED_LLAMA_SERVER points at a working binary. -// -// reporter is the optional sink for download-byte progress; pass nil if -// no status sink is wired. +// EnsureModel ensures the model GGUF is installed. Skips +// LlamaServerForCurrentPlatform so a caller on a platform missing from +// the manifest can still bootstrap a model when RUNED_LLAMA_SERVER +// points at a working binary. 
func EnsureModel(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (modelPath, variant string, err error) { if logger == nil { logger = slog.Default() @@ -175,7 +136,6 @@ func EnsureModel(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger "variant", variant, "default_model", m.DefaultModel) - // Emit stage tick before AcquireLock — see EnsureAll/EnsureLlamaServer. if reporter != nil { reporter("model", 0, 0) } @@ -230,12 +190,9 @@ func downloadWithRetry(ctx context.Context, url, sha string, size int64, dest st } // makeProgress returns a throttled ProgressFunc that logs at most one -// line per progressLogInterval (plus a final 100% line on completion) -// and, when reporter != nil, forwards the same throttled tick to the -// reporter so an external status sink (server.SetBootstrapStatus etc.) -// shares the cadence. total ≤ 0 means Content-Length wasn't advertised; -// the function falls back to byte-count-only output and forwards -// total=0 to the reporter. +// line per progressLogInterval and, if reporter != nil, forwards the +// same throttled tick. total ≤ 0 means Content-Length wasn't advertised; +// falls back to byte-count-only output. func makeProgress(logger *slog.Logger, reporter StatusReporter, stage string, expectedTotal int64) ProgressFunc { var lastReport time.Time return func(downloaded, observedTotal int64) { @@ -285,8 +242,7 @@ func ResolveModelVariant(p *Paths, m *Manifest) (string, error) { // file (.llama_server.sha256) tracks the last-installed tarball hash so // repeat boots don't re-extract. func ensureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (string, error) { - // Stage-transition tick is emitted by the public Ensure* entry - // points before AcquireLock, so we don't re-emit here. + // Caller emits the stage tick before AcquireLock. 
spec, err := m.LlamaServerForCurrentPlatform() if err != nil { return "", err @@ -367,8 +323,7 @@ func llamaServerTarget(p *Paths, spec *LlamaServerSpec) string { } func ensureModel(ctx context.Context, p *Paths, m *Manifest, variant string, logger *slog.Logger, reporter StatusReporter) (string, error) { - // Stage-transition tick is emitted by the caller (EnsureModel or - // EnsureAll) before invoking this function, so we don't re-emit here. + // Caller emits the stage tick before invoking us. spec, err := m.ModelSpec(variant) if err != nil { return "", err diff --git a/internal/bootstrap/install_test.go b/internal/bootstrap/install_test.go index 2aaed2d..29e22d3 100644 --- a/internal/bootstrap/install_test.go +++ b/internal/bootstrap/install_test.go @@ -164,9 +164,6 @@ func writeConfig(t *testing.T, path, body string) { } } -// EnsureLlamaServer surfaces the manifest-platform-missing error directly: -// callers that explicitly asked for the manifest's llama-server can't be -// rescued by env override of the model side. func TestEnsureLlamaServer_PlatformMissingFails(t *testing.T) { dir := t.TempDir() t.Setenv(EnvHome, dir) @@ -183,9 +180,8 @@ func TestEnsureLlamaServer_PlatformMissingFails(t *testing.T) { } } -// EnsureModel must not consult LlamaServerForCurrentPlatform: a caller on -// a platform missing from the manifest who already has RUNED_LLAMA_SERVER -// set should still be able to bootstrap a model. +// A caller on a platform missing from the manifest can still bootstrap +// the model side (LlamaServerForCurrentPlatform is not consulted). 
func TestEnsureModel_NoLlamaServerEntryNeeded(t *testing.T) { dir := t.TempDir() t.Setenv(EnvHome, dir) @@ -232,10 +228,8 @@ func TestEnsureModel_NoLlamaServerEntryNeeded(t *testing.T) { } } -// Reporter must receive an entry-stage tick from each Ensure* function so a -// Health-surface caller can flip Phase the moment bootstrap enters that -// stage, even when the cache-hit short-circuit (or an early error) means no -// byte ticks follow. +// Stage tick fires on Ensure* entry so Health flips Phase even when +// cache hit (or early error) means no byte ticks follow. func TestEnsureLlamaServer_ReporterReceivesStageTickOnEntry(t *testing.T) { dir := t.TempDir() t.Setenv(EnvHome, dir) diff --git a/internal/server/server.go b/internal/server/server.go index 530ae30..d55cc0d 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -27,9 +27,8 @@ const ( maxBatchSize int32 = 32 ) -// bootstrapState is the snapshot fed to Health while STATUS_LOADING. -// Treated as immutable once stored — SetBootstrapStatus replaces the -// whole pointer atomically so readers always see a consistent tuple. +// bootstrapState is replaced atomically so Health sees a consistent +// {phase, bytes, message} tuple in a single load. type bootstrapState struct { phase runedv1.HealthResponse_Phase bytesDone int64 @@ -37,55 +36,42 @@ type bootstrapState struct { message string } -// Server implements runedv1.RunedServiceServer. It does not own the backend -// lifecycle — callers (cmd/runed) construct New(), drive self-bootstrap while -// the gRPC socket already listens (Health reports STATUS_LOADING during that -// window), then call SetBackend once llama-server is up. +// Server implements runedv1.RunedServiceServer. Callers (cmd/runed) +// construct New(), then SetBackend once llama-server is up; Health +// reports STATUS_LOADING in between. type Server struct { runedv1.UnimplementedRunedServiceServer - // backend is nil until SetBackend is called. 
Embed/EmbedBatch return - // FAILED_PRECONDITION while nil; Health returns STATUS_LOADING. + // nil until SetBackend; while nil Embed returns FAILED_PRECONDITION + // and Health returns STATUS_LOADING. backend atomic.Pointer[backend.LlamaBackend] version string startedAt time.Time - // modelIdentity is "" until SetBackend computes the model SHA. Stored - // via atomic.Value so Info readers don't race the writer. + // "" until SetBackend; atomic so concurrent Info reads don't race. modelIdentity atomic.Value // string - // maxTextLength (chars) is sourced from the backend's ctx-size (tokens) - // at SetBackend time; chars==tokens is conservative (dense text is - // ≥~1.27 chars/token), so it always fits ctx. Advertised via Info; reads - // 0 before bootstrap completes since the value depends on llama-server's - // loaded config. + // chars equal to backend ctx-size in tokens — chars/token ratio is + // ≥~1.27 so chars==tokens always fits ctx. 0 before SetBackend. maxTextLength atomic.Int32 - // bootstrapStatus is updated during self-bootstrap and read by Health - // when backend is still nil. nil before any update. + // nil before any SetBootstrapStatus call. bootstrapStatus atomic.Pointer[bootstrapState] - // requests counts Embed + EmbedBatch calls (post-entry, pre-return). - // Exposed through HealthResponse.total_requests so clients can observe - // daemon throughput without scraping logs. + // Embed + EmbedBatch counter. Surfaced via HealthResponse.total_requests. requests atomic.Int64 - // shutdownOnce guarantees close(shutdownCh) runs exactly once even under - // a flurry of concurrent Shutdown RPCs (double-close panics). + // sync.Once guards close(shutdownCh) against double-close. shutdownOnce sync.Once shutdownCh chan struct{} - // lastActivity records the UnixNano timestamp of the most recent RPC - // entry (set by UnaryActivityInterceptor). Used by the idle-exit ticker - // in cmd/runed to decide when to call TriggerShutdown. 
+ // UnixNano of the most recent RPC; read by cmd/runed's idle ticker. lastActivity atomic.Int64 } -// New returns a Server with backend unset. Until SetBackend is called, -// Embed/EmbedBatch return FAILED_PRECONDITION and Health reports -// STATUS_LOADING + whatever phase the latest SetBootstrapStatus posted. -// modelIdentity is empty until SetBackend supplies it. +// New returns a Server with backend unset; SetBackend wires it after +// bootstrap completes. func New(version string) *Server { s := &Server{ version: version, @@ -97,24 +83,18 @@ func New(version string) *Server { return s } -// SetBackend wires the backend and model identity after self-bootstrap -// completes. From this point on, Embed/EmbedBatch are accepted and -// Health reports STATUS_OK (or STATUS_DEGRADED if IsHealthy fails). Safe -// to call concurrently with in-flight RPCs — readers see a consistent -// transition because maxTextLength and modelIdentity are written before -// the backend pointer is published. +// SetBackend wires the backend after bootstrap. maxTextLength and +// modelIdentity are written before backend.Store so any reader seeing +// the backend pointer necessarily sees the other two. func (s *Server) SetBackend(b *backend.LlamaBackend, modelIdentity string) { s.maxTextLength.Store(int32(b.CtxSize())) s.modelIdentity.Store(modelIdentity) s.backend.Store(b) } -// SetBootstrapStatus records the current self-bootstrap phase and download -// progress. The next Health RPC returns these fields when STATUS_LOADING. -// Callers (cmd/runed + bootstrap reporter) emit one update per phase -// transition and periodically during long downloads. bytesTotal == 0 -// means total size isn't yet known (e.g. before HTTP Content-Length is -// read); clients should render percent-complete only when total > 0. +// SetBootstrapStatus updates the Phase / bytes / message that the next +// Health call returns under STATUS_LOADING. 
bytesTotal == 0 means the +// total size isn't yet known (no Content-Length observed yet). func (s *Server) SetBootstrapStatus(phase runedv1.HealthResponse_Phase, bytesDone, bytesTotal int64, message string) { s.bootstrapStatus.Store(&bootstrapState{ phase: phase, @@ -198,28 +178,13 @@ func shortMethod(fullMethod string) string { // loop forever. const embedMaxAttempts = 2 -// Embed delegates to the backend's single-text embedding path. -// The proto dropped the normalize field (see commit 816ef81); the backend is -// called with normalize=true as a harmless default since llama-server always -// returns L2-normalized vectors anyway. +// Embed returns FAILED_PRECONDITION (not Unavailable) before SetBackend +// so client retry policies don't burn budget against a multi-minute +// bootstrap. // -// Returns FAILED_PRECONDITION when the backend hasn't been wired yet -// (self-bootstrap still in progress). codes.FailedPrecondition (not -// Unavailable) intentionally bypasses default-retry policies — bootstrap -// can take minutes, so short exponential backoffs would just exhaust -// retries pre-ready. Whether clients fast-fail or poll Health is the -// client's concern; the error message stays neutral on retry strategy. -// -// Once the backend is wired it may still be suspended by the idle- -// suspend ticker. EnsureStarted resurrects it under the daemon-lifetime -// context — the first request after a suspend pays llama-server cold- -// start latency (~hundreds of ms to a few seconds for model load); -// subsequent requests fall through the cheap health-probe fast path. -// -// Retry loop: backend.Embed holds inflightMu.RLock so Stop can't kill -// an in-flight HTTP. The remaining race window is EnsureStarted-return -// → RLock-acquire; if Stop slips into that gap we get ErrNotStarted on -// the first attempt and recover by re-running EnsureStarted once. +// After wiring, EnsureStarted handles idle-suspend wake-up. 
The retry +// loop covers the EnsureStarted-return → RLock-acquire race where Stop +// slips in and surfaces ErrNotStarted. func (s *Server) Embed(ctx context.Context, req *runedv1.EmbedRequest) (*runedv1.EmbedResponse, error) { b := s.backend.Load() if b == nil { @@ -241,11 +206,8 @@ func (s *Server) Embed(ctx context.Context, req *runedv1.EmbedRequest) (*runedv1 return nil, fmt.Errorf("backend kept suspending between EnsureStarted and Embed") } -// EmbedBatch delegates to the backend's batch path and wraps each vector in -// an EmbedResponse so the proto response message stays composable with -// single-text Embed. Returns FAILED_PRECONDITION when the backend hasn't -// been wired yet; see Embed godoc for the EnsureStarted / ErrNotStarted -// retry rationale. +// EmbedBatch is the batch variant of Embed; see Embed for behaviour +// during bootstrap and idle-suspend. func (s *Server) EmbedBatch(ctx context.Context, req *runedv1.EmbedBatchRequest) (*runedv1.EmbedBatchResponse, error) { b := s.backend.Load() if b == nil { @@ -273,11 +235,9 @@ func (s *Server) EmbedBatch(ctx context.Context, req *runedv1.EmbedBatchRequest) return nil, fmt.Errorf("backend kept suspending between EnsureStarted and EmbedBatch") } -// Info returns static daemon metadata. Does not touch the backend — safe to -// call before SetBackend or during a DEGRADED state. MaxTextLength reads 0 -// before bootstrap completes since the value depends on llama-server's -// loaded ctx-size; clients should re-query Info after Health reports -// STATUS_OK if they need the final value. +// Info returns static daemon metadata. MaxTextLength reads 0 before +// SetBackend; clients needing the final value should re-query after +// Health reports STATUS_OK. 
func (s *Server) Info(ctx context.Context, _ *runedv1.InfoRequest) (*runedv1.InfoResponse, error) { mid, _ := s.modelIdentity.Load().(string) return &runedv1.InfoResponse{ @@ -289,21 +249,13 @@ func (s *Server) Info(ctx context.Context, _ *runedv1.InfoRequest) (*runedv1.Inf }, nil } -// Health maps backend readiness onto the proto Status enum: -// -// - shutdown signalled (Shutdown RPC / TriggerShutdown) → STATUS_SHUTTING_DOWN -// - backend not yet wired → STATUS_LOADING + -// Phase / bytes / message populated from the most recent -// SetBootstrapStatus -// - backend wired but unhealthy → STATUS_DEGRADED -// - backend wired and healthy → STATUS_OK -// -// SHUTTING_DOWN is checked first so a drain-in-progress daemon doesn't -// advertise itself as ready (callers that read OK during the GracefulStop -// race would otherwise send a request just to receive Unavailable). +// Health maps backend state to the proto Status enum. SHUTTING_DOWN +// outranks LOADING/DEGRADED/OK so a drain-in-progress daemon doesn't +// advertise itself as ready (the GracefulStop race would otherwise +// surface Unavailable right after the OK response). // -// Never returns an error so clients can always read uptime as a liveness -// signal and treat any RPC success as proof the daemon process exists. +// Never returns an error — clients can read uptime as a liveness signal +// even when the other fields are zero-valued. func (s *Server) Health(ctx context.Context, _ *runedv1.HealthRequest) (*runedv1.HealthResponse, error) { resp := &runedv1.HealthResponse{ UptimeSeconds: int64(time.Since(s.startedAt).Seconds()), diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 0643700..169c0e7 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -16,13 +16,9 @@ import ( "google.golang.org/grpc/status" ) -// newInProcessServer wires a Server{} to a loopback listener and returns a -// client plus a cleanup that shuts both ends down. 
It deliberately uses a -// random port (":0") so multiple tests can run in parallel without port clash. -// -// b == nil leaves the server in its pre-SetBackend state (Health returns -// STATUS_LOADING; Embed/EmbedBatch return FAILED_PRECONDITION). Non-nil b -// is wired via SetBackend with a synthetic model identity. +// newInProcessServer wires a Server to a loopback listener. b == nil +// leaves the server pre-SetBackend (Health = LOADING, Embed = +// FAILED_PRECONDITION); non-nil b is wired with a synthetic model id. func newInProcessServer(t *testing.T, b *backend.LlamaBackend) (runedv1.RunedServiceClient, func()) { t.Helper() lis, err := net.Listen("tcp", "127.0.0.1:0") @@ -198,8 +194,6 @@ func TestServer_HealthBootstrapFieldsDefaultZero(t *testing.T) { } } -// Without a backend wired, Health reports STATUS_LOADING. Phase/bytes are -// zero until SetBootstrapStatus is called. func TestServer_HealthLoadingBeforeSetBackend(t *testing.T) { client, cleanup := newInProcessServer(t, nil) defer cleanup() @@ -213,8 +207,6 @@ func TestServer_HealthLoadingBeforeSetBackend(t *testing.T) { } } -// After SetBootstrapStatus, Health reflects the recorded phase, bytes, and -// message verbatim — the proto fields are wired directly from the snapshot. func TestServer_HealthLoadingReflectsSetBootstrapStatus(t *testing.T) { s := New("vtest") s.SetBootstrapStatus( @@ -239,10 +231,8 @@ func TestServer_HealthLoadingReflectsSetBootstrapStatus(t *testing.T) { } } -// Embed before SetBackend must surface FAILED_PRECONDITION (not Unavailable -// or generic Internal) so clients can branch on the code and switch to -// Health-polling instead of consuming retry budget against a daemon that -// can't yet answer. +// FAILED_PRECONDITION (not Unavailable) so client retry policies don't +// consume budget against a non-ready daemon. 
func TestServer_EmbedFailsBeforeBackendSet(t *testing.T) { s := New("vtest") _, err := s.Embed(context.Background(), &runedv1.EmbedRequest{Text: "x"}) @@ -265,10 +255,6 @@ func TestServer_EmbedBatchFailsBeforeBackendSet(t *testing.T) { } } -// After TriggerShutdown (or a Shutdown RPC), Health flips to -// STATUS_SHUTTING_DOWN regardless of backend state — the drain-in-progress -// signal outranks LOADING/DEGRADED/OK so callers don't send fresh work -// just to receive Unavailable mid-drain. func TestServer_HealthShuttingDownAfterTrigger(t *testing.T) { s := New("vtest") s.TriggerShutdown() @@ -282,10 +268,8 @@ func TestServer_HealthShuttingDownAfterTrigger(t *testing.T) { } } -// SHUTTING_DOWN must outrank LOADING — pinning this priority guards -// against future Health refactors accidentally reordering the checks and -// surfacing LOADING to clients during a drain (which would lead them to -// keep polling instead of disconnecting). +// Pins SHUTTING_DOWN > LOADING priority so future Health refactors +// don't surface LOADING during drain. func TestServer_HealthShuttingDownOutranksLoading(t *testing.T) { s := New("vtest") // backend nil → LOADING candidate; bootstrap status would normally From c6839d5816f6d88ea6d04ba313afe76eee2d5ddc Mon Sep 17 00:00:00 2001 From: couragehong Date: Fri, 29 May 2026 14:03:47 +0900 Subject: [PATCH 5/5] fix(runed): route serve-error exit through shared shutdown cleanup --- cmd/runed/main.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/runed/main.go b/cmd/runed/main.go index 334d9be..a77a10f 100644 --- a/cmd/runed/main.go +++ b/cmd/runed/main.go @@ -249,17 +249,20 @@ func run() error { // SIGKILL cannot be intercepted from user space. signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + // All exit triggers share the cleanup below. 
Only a serve fault yields a + // non-zero process exit; capture it and return after cleanup rather than + // bailing early, so the drain-then-stop ordering still applies. + var exitErr error select { case s := <-sigCh: logger.Info("received signal", "signal", s.String()) case <-srv.ShutdownCh(): logger.Info("received Shutdown RPC") case err := <-serveErr: + // Non-nil err is a real fault (Serve returns nil only on graceful stop). if err != nil { logger.Error("gRPC serve error", "err", err) - // Still fall through to backend cleanup below. - _ = b.Stop(context.Background()) - return fmt.Errorf("serve: %w", err) + exitErr = fmt.Errorf("serve: %w", err) } } @@ -291,7 +294,7 @@ func run() error { logger.Warn("backend stop returned error", "err", err) } logger.Info("shutdown complete") - return nil + return exitErr } // selfBootstrap fetches the manifest and installs the side(s) the