diff --git a/cmd/runed/main.go b/cmd/runed/main.go index 1a4c3f1..a77a10f 100644 --- a/cmd/runed/main.go +++ b/cmd/runed/main.go @@ -8,6 +8,11 @@ // match. To bypass self-bootstrap entirely, set both RUNED_LLAMA_SERVER // and RUNED_MODEL to absolute paths. // +// The gRPC UDS opens before self-bootstrap so clients can dial immediately +// and poll Health for STATUS_LOADING + Phase/bytes progress instead of +// seeing dial failures during the multi-minute install window. Embed and +// EmbedBatch return FAILED_PRECONDITION until the backend is wired. +// // Optional environment: // // RUNED_HOME Data directory (default: $HOME/.runed). @@ -99,11 +104,56 @@ func run() error { } } + // ctx-size: RUNED_CTX_SIZE if valid, else backend default (2048). + ctxSize := 0 + if v := os.Getenv("RUNED_CTX_SIZE"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + ctxSize = n + } else { + logger.Warn("invalid RUNED_CTX_SIZE, using default", "value", v) + } + } + + // Listen before self-bootstrap so clients can poll Health for + // STATUS_LOADING + Phase/progress instead of dial failures during + // the multi-minute install window. SetBackend below flips Health + // to STATUS_OK once llama-server is up. + srv := server.New(daemonVersion) + lis, err := ipc.Listen(sockPath) + if err != nil { + return fmt.Errorf("ipc listen: %w", err) + } + logger.Info("listening", "socket", sockPath) + + gs := grpc.NewServer(grpc.UnaryInterceptor(srv.UnaryActivityInterceptor())) + runedv1.RegisterRunedServiceServer(gs, srv) + + // Serve in a goroutine so main can block on signals/Shutdown/serve error. + // gs.Serve returns nil on graceful stop; any non-nil err is a real fault. + serveErr := make(chan error, 1) + go func() { + if err := gs.Serve(lis); err != nil { + serveErr <- err + } + close(serveErr) + }() + + // Forward bootstrap progress into Health via stage→Phase mapping. + reporter := bootstrap.StatusReporter(func(stage string, done, total int64) { + phase, msg := stagePhase(stage) + srv.SetBootstrapStatus(phase, done, total, msg) + }) + llamaBin := os.Getenv("RUNED_LLAMA_SERVER") model := os.Getenv("RUNED_MODEL") if llamaBin == "" || model == "" { - bin, mp, err := selfBootstrap(ctx, logger, paths, llamaBin == "", model == "") + // Manifest fetch has no dedicated Phase enum; use UNSPECIFIED and + // convey the stage through message. selfBootstrap overwrites + // Phase on entering ensure{LlamaServer,Model}. + srv.SetBootstrapStatus(runedv1.HealthResponse_PHASE_UNSPECIFIED, 0, 0, "fetching manifest") + bin, mp, err := selfBootstrap(ctx, logger, paths, llamaBin == "", model == "", reporter) if err != nil { + bailBoot(logger, srv, gs, nil) return err } if llamaBin == "" { @@ -117,18 +167,11 @@ func run() error { "llama_server", llamaBin, "model", model) } - // ctx-size: RUNED_CTX_SIZE if valid, else backend default (2048). - ctxSize := 0 - if v := os.Getenv("RUNED_CTX_SIZE"); v != "" { - if n, err := strconv.Atoi(v); err == nil && n > 0 { - ctxSize = n - } else { - logger.Warn("invalid RUNED_CTX_SIZE, using default", "value", v) - } - } + srv.SetBootstrapStatus(runedv1.HealthResponse_PHASE_STARTING_LLAMA_SERVER, 0, 0, "starting llama-server") modelID, err := sha256File(model) if err != nil { + bailBoot(logger, srv, gs, nil) return fmt.Errorf("model hash: %w", err) } logger.Info("model identity", "sha256", modelID, "path", model) @@ -146,35 +189,19 @@ func run() error { // backend.Start already bounds start-up on its own (~15s health poll + // early-exit detection via the stderr scanner). if err := b.Start(ctx); err != nil { + // b.Start may have spawned a child that failed health-probe; + // bailBoot's b.Stop reaps it. + bailBoot(logger, srv, gs, b) return fmt.Errorf("backend start: %w", err) } logger.Info("llama-server ready", "port", b.Port()) - lis, err := ipc.Listen(sockPath) - if err != nil { - // Best-effort backend cleanup — we have no logger-side context to thread here. - _ = b.Stop(context.Background()) - return fmt.Errorf("ipc listen: %w", err) - } - logger.Info("listening", "socket", sockPath) - - srv := server.New(b, daemonVersion, modelID) - gs := grpc.NewServer(grpc.UnaryInterceptor(srv.UnaryActivityInterceptor())) - runedv1.RegisterRunedServiceServer(gs, srv) - - // Serve in a goroutine so main can block on signals/Shutdown/serve error. - // gs.Serve returns nil on graceful stop; any non-nil err is a real fault. - serveErr := make(chan error, 1) - go func() { - if err := gs.Serve(lis); err != nil { - serveErr <- err - } - close(serveErr) - }() + // Flip Health to STATUS_OK and accept Embed/EmbedBatch. + srv.SetBackend(b, modelID) idleTimeout, err := parseIdleTimeout() if err != nil { - _ = b.Stop(context.Background()) + bailBoot(logger, srv, gs, b) return fmt.Errorf("RUNED_IDLE_TIMEOUT: %w", err) } if idleTimeout > 0 { @@ -222,20 +249,28 @@ func run() error { // SIGKILL cannot be intercepted from user space. signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + // All exit triggers share the cleanup below. Only a serve fault yields a + // non-zero process exit; capture it and return after cleanup rather than + // bailing early, so the drain-then-stop ordering still applies. + var exitErr error select { case s := <-sigCh: logger.Info("received signal", "signal", s.String()) case <-srv.ShutdownCh(): logger.Info("received Shutdown RPC") case err := <-serveErr: + // Non-nil err is a real fault (Serve returns nil only on graceful stop). if err != nil { logger.Error("gRPC serve error", "err", err) - // Still fall through to backend cleanup below. - _ = b.Stop(context.Background()) - return fmt.Errorf("serve: %w", err) + exitErr = fmt.Errorf("serve: %w", err) } } + // Flip Health to STATUS_SHUTTING_DOWN for all exit triggers (OS + // signal / serve error / Shutdown RPC). sync.Once makes a redundant + // call a no-op. + srv.TriggerShutdown() + // Phase 1: drain in-flight RPCs. GracefulStop blocks until all active // handlers return; 10s is a safety net for a wedged client. logger.Info("draining in-flight requests") @@ -259,29 +294,23 @@ func run() error { logger.Warn("backend stop returned error", "err", err) } logger.Info("shutdown complete") - return nil + return exitErr } -// selfBootstrap fetches the manifest and ensures llama-server and the -// resolved model variant are present under paths. Called whenever -// RUNED_LLAMA_SERVER or RUNED_MODEL is unset; bypassed entirely when -// both are provided. -// -// needLlama / needModel reflect which side(s) the caller still needs. -// Only those sides are downloaded — the env-overridden side is skipped -// so its manifest artifact isn't fetched only to be discarded. When -// both are needed, EnsureAll's single-lock fast path is used. -func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Paths, needLlama, needModel bool) (binPath, modelPath string, err error) { +// selfBootstrap fetches the manifest and installs the side(s) the +// caller needs. needLlama / needModel reflect which env paths are +// unset; only those sides are downloaded so an env override doesn't +// pay for an artifact it will discard. Both needed → EnsureAll's +// single-lock fast path. +func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Paths, needLlama, needModel bool, reporter bootstrap.StatusReporter) (binPath, modelPath string, err error) { manifestURL := bootstrap.ResolveManifestURL() if manifestURL == "" { return "", "", fmt.Errorf( "self-bootstrap needed (RUNED_LLAMA_SERVER or RUNED_MODEL unset) but no manifest URL: set %s or rebuild with DEFAULT_MANIFEST_URL", bootstrap.EnvManifest) } - // HTTPS isn't enforced because a private network may legitimately serve - // the manifest over plain HTTP. But callers should know: a MITM that - // rewrites the manifest can also rewrite the SHA256s, so artifact - // integrity collapses to "trust the manifest channel." + // A MITM that rewrites the manifest can also rewrite the SHA256s, + // so artifact integrity collapses to "trust the manifest channel." if !strings.HasPrefix(manifestURL, "https://") { logger.Warn("manifest URL is not HTTPS; artifact integrity rests on manifest channel trust", "url", manifestURL) @@ -298,7 +327,7 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa switch { case needLlama && needModel: var variant string - binPath, modelPath, variant, err = bootstrap.EnsureAll(ctx, paths, mani, logger) + binPath, modelPath, variant, err = bootstrap.EnsureAll(ctx, paths, mani, logger, reporter) if err != nil { return "", "", fmt.Errorf("bootstrap install: %w", err) } @@ -307,7 +336,7 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa "model", modelPath, "variant", variant) case needLlama: - binPath, err = bootstrap.EnsureLlamaServer(ctx, paths, mani, logger) + binPath, err = bootstrap.EnsureLlamaServer(ctx, paths, mani, logger, reporter) if err != nil { return "", "", fmt.Errorf("bootstrap install: %w", err) } @@ -315,7 +344,7 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa "llama_server", binPath) case needModel: var variant string - modelPath, variant, err = bootstrap.EnsureModel(ctx, paths, mani, logger) + modelPath, variant, err = bootstrap.EnsureModel(ctx, paths, mani, logger, reporter) if err != nil { return "", "", fmt.Errorf("bootstrap install: %w", err) } @@ -328,6 +357,47 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa return binPath, modelPath, nil } +// stagePhase maps a bootstrap stage name to its proto Phase + message. +// PHASE_UNSPECIFIED on unknown stages keeps the surface forward- +// compatible with future stages. +func stagePhase(stage string) (runedv1.HealthResponse_Phase, string) { + switch stage { + case "llama_server": + return runedv1.HealthResponse_PHASE_FETCHING_LLAMA_SERVER, "fetching llama-server" + case "model": + return runedv1.HealthResponse_PHASE_FETCHING_MODEL, "fetching embedding model" + default: + return runedv1.HealthResponse_PHASE_UNSPECIFIED, "" + } +} + +// bailBoot runs the normal-exit cleanup sequence for boot-time failures +// so clients see one final STATUS_SHUTTING_DOWN instead of a sudden +// disconnect. b may be nil; b.Stop is idempotent. +func bailBoot(logger *slog.Logger, srv *server.Server, gs *grpc.Server, b *backend.LlamaBackend) { + srv.TriggerShutdown() + stopGRPC(logger, gs) + if b != nil { + _ = b.Stop(context.Background()) + } +} + +// stopGRPC drives a 10s-bounded GracefulStop with force-Stop fallback. +func stopGRPC(logger *slog.Logger, gs *grpc.Server) { + graceDone := make(chan struct{}) + go func() { + gs.GracefulStop() + close(graceDone) + }() + select { + case <-graceDone: + case <-time.After(10 * time.Second): + logger.Warn("graceful stop timed out, forcing") + gs.Stop() + <-graceDone + } +} + // anotherDaemonReachable returns true if a UDS listener accepts our dial // at sockPath within 500ms. False covers both "file missing" and "file // exists but nobody listening" (the latter is a stale-socket case the diff --git a/internal/bootstrap/install.go b/internal/bootstrap/install.go index e49a959..c1f7e8f 100644 --- a/internal/bootstrap/install.go +++ b/internal/bootstrap/install.go @@ -42,21 +42,21 @@ var downloadRetryBackoff = 5 * time.Second const retryBackoffMultiplier = 3 -// EnsureAll resolves the model variant and ensures both llama-server and -// the model GGUF are present, downloading any missing pieces while -// holding $RUNED_HOME/install.lock. Returns the absolute path to the -// llama-server executable and the GGUF file the daemon should load. -// -// This is the normal-path entry point: both RUNED_LLAMA_SERVER and -// RUNED_MODEL unset → manifest-driven install of everything. When only -// one env var is set, callers should use EnsureLlamaServer or -// EnsureModel directly so the side they already have isn't redownloaded -// only to be discarded by the env override. +// StatusReporter receives throttled progress ticks from Ensure* tagged +// with stage ("llama_server" or "model"). bytesTotal == 0 means the +// total isn't yet known (no Content-Length observed). nil is a valid +// "no sink" value; reporter calls run inline on the download goroutine +// and should return quickly. +type StatusReporter func(stage string, bytesDone, bytesTotal int64) + +// EnsureAll ensures both llama-server and the model GGUF are installed +// under a single lock acquisition. Use when both RUNED_LLAMA_SERVER and +// RUNED_MODEL are unset; for the partial-set case, call +// EnsureLlamaServer / EnsureModel individually so the env-overridden +// side isn't redownloaded only to be discarded. // -// logger may be nil; slog.Default() is used in that case. All progress -// and per-step status is emitted through this logger — callers don't -// need to thread a separate ProgressFunc. -func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) (llamaBin, modelPath, variant string, err error) { +// logger may be nil (slog.Default used). reporter may be nil. +func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (llamaBin, modelPath, variant string, err error) { if logger == nil { logger = slog.Default() } @@ -71,56 +71,57 @@ func EnsureAll(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) "variant", variant, "default_model", m.DefaultModel) + // Stage tick before AcquireLock so a lock-waiting trailer surfaces + // the right Phase while it waits. + if reporter != nil { + reporter("llama_server", 0, 0) + } lock, err := AcquireLock(p.InstallLock, InstallLockTimeout) if err != nil { return "", "", "", fmt.Errorf("install lock: %w", err) } defer lock.Release() - llamaBin, err = ensureLlamaServer(ctx, p, m, logger) + llamaBin, err = ensureLlamaServer(ctx, p, m, logger, reporter) if err != nil { return "", "", "", err } - modelPath, err = ensureModel(ctx, p, m, variant, logger) + if reporter != nil { + reporter("model", 0, 0) + } + modelPath, err = ensureModel(ctx, p, m, variant, logger, reporter) if err != nil { return "", "", "", err } return llamaBin, modelPath, variant, nil } -// EnsureLlamaServer ensures only the llama-server binary is present and -// returns its absolute path. Use when the caller already has a model -// path (e.g. RUNED_MODEL is set) and only the llama-server side needs -// the manifest install. -// -// The manifest must include an entry for the current platform — -// LlamaServerForCurrentPlatform is consulted unconditionally here, which -// is fine because the caller explicitly asked for the manifest's -// llama-server. -func EnsureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) (string, error) { +// EnsureLlamaServer ensures the llama-server binary is installed. The +// manifest must have an entry for the current platform (this is the +// caller's intent — they explicitly asked for the manifest's binary). +func EnsureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (string, error) { if logger == nil { logger = slog.Default() } if err := p.EnsureDirs(); err != nil { return "", err } + if reporter != nil { + reporter("llama_server", 0, 0) + } lock, err := AcquireLock(p.InstallLock, InstallLockTimeout) if err != nil { return "", fmt.Errorf("install lock: %w", err) } defer lock.Release() - return ensureLlamaServer(ctx, p, m, logger) + return ensureLlamaServer(ctx, p, m, logger, reporter) } -// EnsureModel ensures only the model GGUF is present and returns the -// absolute path plus the resolved variant ID. Use when the caller -// already has a llama-server path (e.g. RUNED_LLAMA_SERVER is set) and -// only the model side needs the manifest install. -// -// LlamaServerForCurrentPlatform is not called here, so a caller on a -// platform missing from the manifest can still bootstrap a model as -// long as their RUNED_LLAMA_SERVER points at a working binary. -func EnsureModel(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) (modelPath, variant string, err error) { +// EnsureModel ensures the model GGUF is installed. Skips +// LlamaServerForCurrentPlatform so a caller on a platform missing from +// the manifest can still bootstrap a model when RUNED_LLAMA_SERVER +// points at a working binary. +func EnsureModel(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (modelPath, variant string, err error) { if logger == nil { logger = slog.Default() } @@ -135,13 +136,16 @@ func EnsureModel(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger "variant", variant, "default_model", m.DefaultModel) + if reporter != nil { + reporter("model", 0, 0) + } lock, err := AcquireLock(p.InstallLock, InstallLockTimeout) if err != nil { return "", "", fmt.Errorf("install lock: %w", err) } defer lock.Release() - modelPath, err = ensureModel(ctx, p, m, variant, logger) + modelPath, err = ensureModel(ctx, p, m, variant, logger, reporter) if err != nil { return "", "", err } @@ -186,10 +190,10 @@ func downloadWithRetry(ctx context.Context, url, sha string, size int64, dest st } // makeProgress returns a throttled ProgressFunc that logs at most one -// line per progressLogInterval, plus a final 100% line on completion. -// total ≤ 0 means Content-Length wasn't advertised; the function falls -// back to byte-count-only output. -func makeProgress(logger *slog.Logger, stage string, expectedTotal int64) ProgressFunc { +// line per progressLogInterval and, if reporter != nil, forwards the +// same throttled tick. total ≤ 0 means Content-Length wasn't advertised; +// falls back to byte-count-only output. +func makeProgress(logger *slog.Logger, reporter StatusReporter, stage string, expectedTotal int64) ProgressFunc { var lastReport time.Time return func(downloaded, observedTotal int64) { total := expectedTotal @@ -207,6 +211,9 @@ func makeProgress(logger *slog.Logger, stage string, expectedTotal int64) Progre attrs = append(attrs, "total", total, "pct", fmt.Sprintf("%.1f%%", pct)) } logger.Info("download progress", attrs...) + if reporter != nil { + reporter(stage, downloaded, total) + } } } @@ -234,7 +241,8 @@ func ResolveModelVariant(p *Paths, m *Manifest) (string, error) { // extracting or downloading the artifact as needed. A sidecar marker // file (.llama_server.sha256) tracks the last-installed tarball hash so // repeat boots don't re-extract. -func ensureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger) (string, error) { +func ensureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog.Logger, reporter StatusReporter) (string, error) { + // Caller emits the stage tick before AcquireLock. spec, err := m.LlamaServerForCurrentPlatform() if err != nil { return "", err @@ -259,7 +267,7 @@ func ensureLlamaServer(ctx context.Context, p *Paths, m *Manifest, logger *slog. "target", target) } - progress := makeProgress(logger, "llama_server", spec.Size) + progress := makeProgress(logger, reporter, "llama_server", spec.Size) switch spec.Extract { case "": if err := os.MkdirAll(filepath.Dir(target), 0o700); err != nil { @@ -314,7 +322,8 @@ func llamaServerTarget(p *Paths, spec *LlamaServerSpec) string { return filepath.Join(p.LlamaDir, filepath.FromSlash(exec)) } -func ensureModel(ctx context.Context, p *Paths, m *Manifest, variant string, logger *slog.Logger) (string, error) { +func ensureModel(ctx context.Context, p *Paths, m *Manifest, variant string, logger *slog.Logger, reporter StatusReporter) (string, error) { + // Caller emits the stage tick before invoking us. spec, err := m.ModelSpec(variant) if err != nil { return "", err @@ -338,7 +347,7 @@ func ensureModel(ctx context.Context, p *Paths, m *Manifest, variant string, log return "", err } logger.Info("ensure model: downloading GGUF", "url", spec.URL) - progress := makeProgress(logger, "model", spec.Size) + progress := makeProgress(logger, reporter, "model", spec.Size) if err := downloadWithRetry(ctx, spec.URL, spec.SHA256, spec.Size, target, progress, logger, "model"); err != nil { return "", fmt.Errorf("ensure model: download: %w", err) } diff --git a/internal/bootstrap/install_test.go b/internal/bootstrap/install_test.go index f3cabfd..29e22d3 100644 --- a/internal/bootstrap/install_test.go +++ b/internal/bootstrap/install_test.go @@ -164,9 +164,6 @@ func writeConfig(t *testing.T, path, body string) { } } -// EnsureLlamaServer surfaces the manifest-platform-missing error directly: -// callers that explicitly asked for the manifest's llama-server can't be -// rescued by env override of the model side. func TestEnsureLlamaServer_PlatformMissingFails(t *testing.T) { dir := t.TempDir() t.Setenv(EnvHome, dir) @@ -178,14 +175,13 @@ func TestEnsureLlamaServer_PlatformMissingFails(t *testing.T) { Models: map[string]ArtifactSpec{}, } - if _, err := EnsureLlamaServer(t.Context(), p, m, slog.Default()); !errors.Is(err, ErrNoArtifactForPlatform) { + if _, err := EnsureLlamaServer(t.Context(), p, m, slog.Default(), nil); !errors.Is(err, ErrNoArtifactForPlatform) { t.Fatalf("expected ErrNoArtifactForPlatform, got: %v", err) } } -// EnsureModel must not consult LlamaServerForCurrentPlatform: a caller on -// a platform missing from the manifest who already has RUNED_LLAMA_SERVER -// set should still be able to bootstrap a model. +// A caller on a platform missing from the manifest can still bootstrap +// the model side (LlamaServerForCurrentPlatform is not consulted). func TestEnsureModel_NoLlamaServerEntryNeeded(t *testing.T) { dir := t.TempDir() t.Setenv(EnvHome, dir) @@ -220,7 +216,7 @@ func TestEnsureModel_NoLlamaServerEntryNeeded(t *testing.T) { DefaultModel: variant, } - got, gotVariant, err := EnsureModel(t.Context(), p, m, slog.Default()) + got, gotVariant, err := EnsureModel(t.Context(), p, m, slog.Default(), nil) if err != nil { t.Fatalf("EnsureModel must succeed when no llama-server entry is present: %v", err) } @@ -231,3 +227,71 @@ func TestEnsureModel_NoLlamaServerEntryNeeded(t *testing.T) { t.Errorf("got variant %q, want %q", gotVariant, variant) } } + +// Stage tick fires on Ensure* entry so Health flips Phase even when +// cache hit (or early error) means no byte ticks follow. +func TestEnsureLlamaServer_ReporterReceivesStageTickOnEntry(t *testing.T) { + dir := t.TempDir() + t.Setenv(EnvHome, dir) + p, _ := Resolve() + + // platforms empty → LlamaServerForCurrentPlatform errors after the + // reporter's entry tick. + m := &Manifest{ + Version: 1, + Platforms: map[string]PlatformArtifacts{}, + Models: map[string]ArtifactSpec{}, + } + + var stages []string + reporter := func(stage string, _, _ int64) { + stages = append(stages, stage) + } + _, _ = EnsureLlamaServer(t.Context(), p, m, slog.Default(), reporter) + if len(stages) != 1 || stages[0] != "llama_server" { + t.Errorf("stages = %v, want [llama_server]", stages) + } +} + +func TestEnsureModel_ReporterReceivesStageTickOnEntry(t *testing.T) { + dir := t.TempDir() + t.Setenv(EnvHome, dir) + t.Setenv(EnvModelVariant, "") + p, _ := Resolve() + if err := p.EnsureDirs(); err != nil { + t.Fatalf("EnsureDirs: %v", err) + } + + // Pre-populate model so cache-hit returns without any download path + // running. Only the entry tick should reach the reporter. + variant := "test-variant" + body := []byte("model-content") + sum := sha256.Sum256(body) + wantSHA := hex.EncodeToString(sum[:]) + modelPath := p.ModelPath(variant) + if err := os.MkdirAll(filepath.Dir(modelPath), 0o700); err != nil { + t.Fatalf("mkdir models: %v", err) + } + if err := os.WriteFile(modelPath, body, 0o600); err != nil { + t.Fatalf("write model: %v", err) + } + + m := &Manifest{ + Version: 1, + Models: map[string]ArtifactSpec{ + variant: {URL: "https://ignored", SHA256: wantSHA, Size: int64(len(body))}, + }, + DefaultModel: variant, + } + + var stages []string + reporter := func(stage string, _, _ int64) { + stages = append(stages, stage) + } + if _, _, err := EnsureModel(t.Context(), p, m, slog.Default(), reporter); err != nil { + t.Fatalf("EnsureModel: %v", err) + } + if len(stages) != 1 || stages[0] != "model" { + t.Errorf("stages = %v, want [model]", stages) + } +} diff --git a/internal/server/server.go b/internal/server/server.go index 81431db..d55cc0d 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -15,6 +15,8 @@ import ( runedv1 "github.com/CryptoLabInc/runed/gen/runed/v1" "github.com/CryptoLabInc/runed/internal/backend" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // Plan A constants for the Info RPC. Qwen3-Embedding-0.6B fixes these at @@ -25,53 +27,83 @@ const ( maxBatchSize int32 = 32 ) -// Server implements runedv1.RunedServiceServer. It does not own the backend — -// callers (cmd/runed) are responsible for Start/Stop on the LlamaBackend. +// bootstrapState is replaced atomically so Health sees a consistent +// {phase, bytes, message} tuple in a single load. +type bootstrapState struct { + phase runedv1.HealthResponse_Phase + bytesDone int64 + bytesTotal int64 + message string +} + +// Server implements runedv1.RunedServiceServer. Callers (cmd/runed) +// construct New(), then SetBackend once llama-server is up; Health +// reports STATUS_LOADING in between. type Server struct { runedv1.UnimplementedRunedServiceServer - backend *backend.LlamaBackend - version string - modelIdentity string - startedAt time.Time - - // maxTextLength (chars) is snapshotted from the backend's ctx-size (tokens) - // in New(); chars==tokens is conservative (dense text is ≥~1.27 chars/token), - // so it always fits ctx. Advertised via Info → clients cap to whatever ctx - // the daemon booted with, keeping the char limit locked to the token limit. - maxTextLength int32 - - // requests counts Embed + EmbedBatch calls (post-entry, pre-return). - // Exposed through HealthResponse.total_requests so clients can observe - // daemon throughput without scraping logs. + + // nil until SetBackend; while nil Embed returns FAILED_PRECONDITION + // and Health returns STATUS_LOADING. + backend atomic.Pointer[backend.LlamaBackend] + + version string + startedAt time.Time + + // "" until SetBackend; atomic so concurrent Info reads don't race. + modelIdentity atomic.Value // string + + // chars equal to backend ctx-size in tokens — chars/token ratio is + // ≥~1.27 so chars==tokens always fits ctx. 0 before SetBackend. + maxTextLength atomic.Int32 + + // nil before any SetBootstrapStatus call. + bootstrapStatus atomic.Pointer[bootstrapState] + + // Embed + EmbedBatch counter. Surfaced via HealthResponse.total_requests. requests atomic.Int64 - // shutdownOnce guarantees close(shutdownCh) runs exactly once even under - // a flurry of concurrent Shutdown RPCs (double-close panics). + // sync.Once guards close(shutdownCh) against double-close. shutdownOnce sync.Once shutdownCh chan struct{} - // lastActivity records the UnixNano timestamp of the most recent RPC - // entry (set by UnaryActivityInterceptor). Used by the idle-exit ticker - // in cmd/runed to decide when to call TriggerShutdown. + // UnixNano of the most recent RPC; read by cmd/runed's idle ticker. lastActivity atomic.Int64 } -// New returns a Server that delegates Embed/EmbedBatch to backend and fills -// Info metadata from the given version and modelIdentity. max_text_length is -// snapshotted from the backend's ctx-size here (see Server.maxTextLength). -func New(b *backend.LlamaBackend, version, modelIdentity string) *Server { +// New returns a Server with backend unset; SetBackend wires it after +// bootstrap completes. +func New(version string) *Server { s := &Server{ - backend: b, - version: version, - modelIdentity: modelIdentity, - startedAt: time.Now(), - maxTextLength: int32(b.CtxSize()), - shutdownCh: make(chan struct{}), + version: version, + startedAt: time.Now(), + shutdownCh: make(chan struct{}), } + s.modelIdentity.Store("") s.lastActivity.Store(time.Now().UnixNano()) return s } +// SetBackend wires the backend after bootstrap. maxTextLength and +// modelIdentity are written before backend.Store so any reader seeing +// the backend pointer necessarily sees the other two. +func (s *Server) SetBackend(b *backend.LlamaBackend, modelIdentity string) { + s.maxTextLength.Store(int32(b.CtxSize())) + s.modelIdentity.Store(modelIdentity) + s.backend.Store(b) +} + +// SetBootstrapStatus updates the Phase / bytes / message that the next +// Health call returns under STATUS_LOADING. bytesTotal == 0 means the +// total size isn't yet known (no Content-Length observed yet). +func (s *Server) SetBootstrapStatus(phase runedv1.HealthResponse_Phase, bytesDone, bytesTotal int64, message string) { + s.bootstrapStatus.Store(&bootstrapState{ + phase: phase, + bytesDone: bytesDone, + bytesTotal: bytesTotal, + message: message, + }) +} + // ShutdownCh returns a channel that closes when a Shutdown RPC is received. // The daemon main() selects on this alongside OS signals to trigger graceful // termination; the channel is never sent on — only closed. @@ -146,28 +178,24 @@ func shortMethod(fullMethod string) string { // loop forever. const embedMaxAttempts = 2 -// Embed delegates to the backend's single-text embedding path. -// The proto dropped the normalize field (see commit 816ef81); the backend is -// called with normalize=true as a harmless default since llama-server always -// returns L2-normalized vectors anyway. +// Embed returns FAILED_PRECONDITION (not Unavailable) before SetBackend +// so client retry policies don't burn budget against a multi-minute +// bootstrap. // -// Backend may be suspended (idle-suspend) when this RPC arrives. EnsureStarted -// resurrects it under the daemon-lifetime context — the first request after a -// suspend pays the llama-server cold-start latency (~hundreds of ms to a few -// seconds for model load); subsequent requests fall through the cheap health- -// probe fast path. -// -// Retry loop: backend.Embed holds inflightMu.RLock so Stop can't kill an -// in-flight HTTP. The remaining race window is EnsureStarted-return → -// RLock-acquire; if Stop slips into that gap we get ErrNotStarted on the -// first attempt and recover by re-running EnsureStarted once. +// After wiring, EnsureStarted handles idle-suspend wake-up. The retry +// loop covers the EnsureStarted-return → RLock-acquire race where Stop +// slips in and surfaces ErrNotStarted. func (s *Server) Embed(ctx context.Context, req *runedv1.EmbedRequest) (*runedv1.EmbedResponse, error) { + b := s.backend.Load() + if b == nil { + return nil, status.Error(codes.FailedPrecondition, "daemon is bootstrapping; embed not yet available") + } s.requests.Add(1) for attempt := 0; attempt < embedMaxAttempts; attempt++ { - if err := s.backend.EnsureStarted(); err != nil { + if err := b.EnsureStarted(); err != nil { return nil, fmt.Errorf("backend not ready: %w", err) } - vec, err := s.backend.Embed(ctx, req.Text, true) + vec, err := b.Embed(ctx, req.Text, true) if err == nil { return &runedv1.EmbedResponse{Vector: vec}, nil } @@ -178,17 +206,19 @@ func (s *Server) Embed(ctx context.Context, req *runedv1.EmbedRequest) (*runedv1 return nil, fmt.Errorf("backend kept suspending between EnsureStarted and Embed") } -// EmbedBatch delegates to the backend's batch path and wraps each vector in -// an EmbedResponse so the proto response message stays composable with -// single-text Embed. See Embed godoc on EnsureStarted / cold-start behaviour -// and on the ErrNotStarted retry loop. +// EmbedBatch is the batch variant of Embed; see Embed for behaviour +// during bootstrap and idle-suspend. func (s *Server) EmbedBatch(ctx context.Context, req *runedv1.EmbedBatchRequest) (*runedv1.EmbedBatchResponse, error) { + b := s.backend.Load() + if b == nil { + return nil, status.Error(codes.FailedPrecondition, "daemon is bootstrapping; embed not yet available") + } s.requests.Add(1) for attempt := 0; attempt < embedMaxAttempts; attempt++ { - if err := s.backend.EnsureStarted(); err != nil { + if err := b.EnsureStarted(); err != nil { return nil, fmt.Errorf("backend not ready: %w", err) } - vecs, err := s.backend.EmbedBatch(ctx, req.Texts, true) + vecs, err := b.EmbedBatch(ctx, req.Texts, true) if err == nil { out := &runedv1.EmbedBatchResponse{ Embeddings: make([]*runedv1.EmbedResponse, len(vecs)), @@ -205,31 +235,55 @@ func (s *Server) EmbedBatch(ctx context.Context, req *runedv1.EmbedBatchRequest) return nil, fmt.Errorf("backend kept suspending between EnsureStarted and EmbedBatch") } -// Info returns static daemon metadata. Does not touch the backend — safe to -// call before Start() or during a DEGRADED state. +// Info returns static daemon metadata. MaxTextLength reads 0 before +// SetBackend; clients needing the final value should re-query after +// Health reports STATUS_OK. func (s *Server) Info(ctx context.Context, _ *runedv1.InfoRequest) (*runedv1.InfoResponse, error) { + mid, _ := s.modelIdentity.Load().(string) return &runedv1.InfoResponse{ DaemonVersion: s.version, - ModelIdentity: s.modelIdentity, + ModelIdentity: mid, VectorDim: vectorDim, - MaxTextLength: s.maxTextLength, + MaxTextLength: s.maxTextLength.Load(), MaxBatchSize: maxBatchSize, }, nil } -// Health maps backend readiness onto the proto Status enum. A nil backend or -// unhealthy probe yields DEGRADED; we never return an error from this RPC so -// clients can always read uptime as a liveness signal. +// Health maps backend state to the proto Status enum. SHUTTING_DOWN +// outranks LOADING/DEGRADED/OK so a drain-in-progress daemon doesn't +// advertise itself as ready (the GracefulStop race would otherwise +// surface Unavailable right after the OK response). +// +// Never returns an error — clients can read uptime as a liveness signal +// even when the other fields are zero-valued. func (s *Server) Health(ctx context.Context, _ *runedv1.HealthRequest) (*runedv1.HealthResponse, error) { - status := runedv1.HealthResponse_STATUS_OK - if s.backend == nil || !s.backend.IsHealthy(ctx) { - status = runedv1.HealthResponse_STATUS_DEGRADED - } - return &runedv1.HealthResponse{ - Status: status, + resp := &runedv1.HealthResponse{ UptimeSeconds: int64(time.Since(s.startedAt).Seconds()), TotalRequests: s.requests.Load(), - }, nil + } + select { + case <-s.shutdownCh: + resp.Status = runedv1.HealthResponse_STATUS_SHUTTING_DOWN + return resp, nil + default: + } + b := s.backend.Load() + if b == nil { + resp.Status = runedv1.HealthResponse_STATUS_LOADING + if bs := s.bootstrapStatus.Load(); bs != nil { + resp.Phase = bs.phase + resp.BytesDone = bs.bytesDone + resp.BytesTotal = bs.bytesTotal + resp.Message = bs.message + } + return resp, nil + } + if !b.IsHealthy(ctx) { + resp.Status = runedv1.HealthResponse_STATUS_DEGRADED + return resp, nil + } + resp.Status = runedv1.HealthResponse_STATUS_OK + return resp, nil } // Shutdown signals the daemon to begin graceful termination. It closes diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 42d7f2a..169c0e7 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -11,20 +11,26 @@ import ( runedv1 "github.com/CryptoLabInc/runed/gen/runed/v1" "github.com/CryptoLabInc/runed/internal/backend" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" ) -// newInProcessServer wires a Server{} to a loopback listener and returns a -// client plus a cleanup that shuts both ends down. It deliberately uses a -// random port (":0") so multiple tests can run in parallel without port clash. +// newInProcessServer wires a Server to a loopback listener. b == nil +// leaves the server pre-SetBackend (Health = LOADING, Embed = +// FAILED_PRECONDITION); non-nil b is wired with a synthetic model id. func newInProcessServer(t *testing.T, b *backend.LlamaBackend) (runedv1.RunedServiceClient, func()) { t.Helper() lis, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatal(err) } + s := New("v0.1.0-test") + if b != nil { + s.SetBackend(b, "test-model-id") + } gs := grpc.NewServer() - runedv1.RegisterRunedServiceServer(gs, New(b, "v0.1.0-test", "test-model-id")) + runedv1.RegisterRunedServiceServer(gs, s) go gs.Serve(lis) conn, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -111,7 +117,7 @@ func TestServer_InfoMaxTextLengthTracksCtxSize(t *testing.T) { func TestServer_LastActivity_InitializedAtConstruction(t *testing.T) { before := time.Now() - s := New(backend.NewLlamaBackend(backend.Config{}), "vtest", "model-test") + s := New("vtest") after := time.Now() got := s.LastActivity() if got.Before(before) || got.After(after) { @@ -120,7 +126,7 @@ func TestServer_LastActivity_InitializedAtConstruction(t *testing.T) { } func TestServer_TriggerShutdown_Idempotent(t *testing.T) { - s := New(backend.NewLlamaBackend(backend.Config{}), "vtest", "model-test") + s := New("vtest") // Two concurrent TriggerShutdown calls must not panic (sync.Once). var wg sync.WaitGroup for i := 0; i < 4; i++ { @@ -140,7 +146,7 @@ func TestServer_TriggerShutdown_Idempotent(t *testing.T) { } func TestServer_UnaryActivityInterceptor_UpdatesLastActivity(t *testing.T) { - s := New(backend.NewLlamaBackend(backend.Config{}), "vtest", "model-test") + s := New("vtest") initial := s.LastActivity() time.Sleep(2 * time.Millisecond) // ensure new nanosecond bucket @@ -161,7 +167,7 @@ func TestServer_UnaryActivityInterceptor_UpdatesLastActivity(t *testing.T) { } } -// Before self-bootstrap provide progress, values should be zero +// Before self-bootstrap provides progress, values should be zero func TestServer_HealthBootstrapFieldsDefaultZero(t *testing.T) { b := backend.NewLlamaBackend(backend.Config{}) // not started — Health uses the unhealthy path client, cleanup := newInProcessServer(t, b) @@ -187,3 +193,97 @@ func TestServer_HealthBootstrapFieldsDefaultZero(t *testing.T) { t.Errorf("Message = %q, want empty", h.Message) } } + +func TestServer_HealthLoadingBeforeSetBackend(t *testing.T) { + client, cleanup := newInProcessServer(t, nil) + defer cleanup() + + h, err := client.Health(context.Background(), &runedv1.HealthRequest{}) + if err != nil { + t.Fatal(err) + } + if h.Status != runedv1.HealthResponse_STATUS_LOADING { + t.Errorf("Status = %v, want STATUS_LOADING (backend not wired)", h.Status) + } +} + +func TestServer_HealthLoadingReflectsSetBootstrapStatus(t *testing.T) { + s := New("vtest") + s.SetBootstrapStatus( + runedv1.HealthResponse_PHASE_FETCHING_MODEL, + 123, 456, "downloading X") + + resp, err := s.Health(context.Background(), &runedv1.HealthRequest{}) + if err != nil { + t.Fatalf("Health: %v", err) + } + if resp.Status != runedv1.HealthResponse_STATUS_LOADING { + t.Errorf("Status = %v, want STATUS_LOADING", resp.Status) + } + if resp.Phase != runedv1.HealthResponse_PHASE_FETCHING_MODEL { + t.Errorf("Phase = %v, want PHASE_FETCHING_MODEL", resp.Phase) + } + if resp.BytesDone != 123 || resp.BytesTotal != 456 { + t.Errorf("bytes: got done=%d total=%d, want 123/456", resp.BytesDone, resp.BytesTotal) + } + if resp.Message != "downloading X" { + t.Errorf("Message = %q, want %q", resp.Message, "downloading X") + } +} + +// FAILED_PRECONDITION (not Unavailable) so client retry policies don't +// consume budget against a non-ready daemon. +func TestServer_EmbedFailsBeforeBackendSet(t *testing.T) { + s := New("vtest") + _, err := s.Embed(context.Background(), &runedv1.EmbedRequest{Text: "x"}) + if err == nil { + t.Fatal("expected error before SetBackend") + } + if got := status.Code(err); got != codes.FailedPrecondition { + t.Errorf("code = %v, want %v", got, codes.FailedPrecondition) + } +} + +func TestServer_EmbedBatchFailsBeforeBackendSet(t *testing.T) { + s := New("vtest") + _, err := s.EmbedBatch(context.Background(), &runedv1.EmbedBatchRequest{Texts: []string{"x"}}) + if err == nil { + t.Fatal("expected error before SetBackend") + } + if got := status.Code(err); got != codes.FailedPrecondition { + t.Errorf("code = %v, want %v", got, codes.FailedPrecondition) + } +} + +func TestServer_HealthShuttingDownAfterTrigger(t *testing.T) { + s := New("vtest") + s.TriggerShutdown() + + resp, err := s.Health(context.Background(), &runedv1.HealthRequest{}) + if err != nil { + t.Fatalf("Health: %v", err) + } + if resp.Status != runedv1.HealthResponse_STATUS_SHUTTING_DOWN { + t.Errorf("Status = %v, want STATUS_SHUTTING_DOWN", resp.Status) + } +} + +// Pins SHUTTING_DOWN > LOADING priority so future Health refactors +// don't surface LOADING during drain. +func TestServer_HealthShuttingDownOutranksLoading(t *testing.T) { + s := New("vtest") + // backend nil → LOADING candidate; bootstrap status would normally + // shape the LOADING response. + s.SetBootstrapStatus( + runedv1.HealthResponse_PHASE_FETCHING_MODEL, + 50, 100, "downloading model") + s.TriggerShutdown() + + resp, err := s.Health(context.Background(), &runedv1.HealthRequest{}) + if err != nil { + t.Fatalf("Health: %v", err) + } + if resp.Status != runedv1.HealthResponse_STATUS_SHUTTING_DOWN { + t.Errorf("Status = %v, want STATUS_SHUTTING_DOWN (must outrank LOADING)", resp.Status) + } +}