Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 123 additions & 53 deletions cmd/runed/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
// match. To bypass self-bootstrap entirely, set both RUNED_LLAMA_SERVER
// and RUNED_MODEL to absolute paths.
//
// The gRPC UDS opens before self-bootstrap so clients can dial immediately
// and poll Health for STATUS_LOADING + Phase/bytes progress instead of
// seeing dial failures during the multi-minute install window. Embed and
// EmbedBatch return FAILED_PRECONDITION until the backend is wired.
//
// Optional environment:
//
// RUNED_HOME Data directory (default: $HOME/.runed).
Expand Down Expand Up @@ -99,11 +104,56 @@ func run() error {
}
}

// ctx-size: RUNED_CTX_SIZE if valid, else backend default (2048).
ctxSize := 0
if v := os.Getenv("RUNED_CTX_SIZE"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
ctxSize = n
} else {
logger.Warn("invalid RUNED_CTX_SIZE, using default", "value", v)
}
}

// Listen before self-bootstrap so clients can poll Health for
// STATUS_LOADING + Phase/progress instead of dial failures during
// the multi-minute install window. SetBackend below flips Health
// to STATUS_OK once llama-server is up.
srv := server.New(daemonVersion)
lis, err := ipc.Listen(sockPath)
if err != nil {
return fmt.Errorf("ipc listen: %w", err)
}
logger.Info("listening", "socket", sockPath)

gs := grpc.NewServer(grpc.UnaryInterceptor(srv.UnaryActivityInterceptor()))
runedv1.RegisterRunedServiceServer(gs, srv)

// Serve in a goroutine so main can block on signals/Shutdown/serve error.
// gs.Serve returns nil on graceful stop; any non-nil err is a real fault.
serveErr := make(chan error, 1)
go func() {
if err := gs.Serve(lis); err != nil {
serveErr <- err
}
close(serveErr)
}()

// Forward bootstrap progress into Health via stage→Phase mapping.
reporter := bootstrap.StatusReporter(func(stage string, done, total int64) {
phase, msg := stagePhase(stage)
srv.SetBootstrapStatus(phase, done, total, msg)
})

llamaBin := os.Getenv("RUNED_LLAMA_SERVER")
model := os.Getenv("RUNED_MODEL")
if llamaBin == "" || model == "" {
bin, mp, err := selfBootstrap(ctx, logger, paths, llamaBin == "", model == "")
// Manifest fetch has no dedicated Phase enum; use UNSPECIFIED and
// convey the stage through message. selfBootstrap overwrites
// Phase on entering ensure{LlamaServer,Model}.
srv.SetBootstrapStatus(runedv1.HealthResponse_PHASE_UNSPECIFIED, 0, 0, "fetching manifest")
bin, mp, err := selfBootstrap(ctx, logger, paths, llamaBin == "", model == "", reporter)
if err != nil {
bailBoot(logger, srv, gs, nil)
return err
}
if llamaBin == "" {
Expand All @@ -117,18 +167,11 @@ func run() error {
"llama_server", llamaBin, "model", model)
}

// ctx-size: RUNED_CTX_SIZE if valid, else backend default (2048).
ctxSize := 0
if v := os.Getenv("RUNED_CTX_SIZE"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
ctxSize = n
} else {
logger.Warn("invalid RUNED_CTX_SIZE, using default", "value", v)
}
}
srv.SetBootstrapStatus(runedv1.HealthResponse_PHASE_STARTING_LLAMA_SERVER, 0, 0, "starting llama-server")

modelID, err := sha256File(model)
if err != nil {
bailBoot(logger, srv, gs, nil)
return fmt.Errorf("model hash: %w", err)
}
logger.Info("model identity", "sha256", modelID, "path", model)
Expand All @@ -146,35 +189,19 @@ func run() error {
// backend.Start already bounds start-up on its own (~15s health poll +
// early-exit detection via the stderr scanner).
if err := b.Start(ctx); err != nil {
// b.Start may have spawned a child that failed health-probe;
// bailBoot's b.Stop reaps it.
bailBoot(logger, srv, gs, b)
return fmt.Errorf("backend start: %w", err)
}
logger.Info("llama-server ready", "port", b.Port())

lis, err := ipc.Listen(sockPath)
if err != nil {
// Best-effort backend cleanup — we have no logger-side context to thread here.
_ = b.Stop(context.Background())
return fmt.Errorf("ipc listen: %w", err)
}
logger.Info("listening", "socket", sockPath)

srv := server.New(b, daemonVersion, modelID)
gs := grpc.NewServer(grpc.UnaryInterceptor(srv.UnaryActivityInterceptor()))
runedv1.RegisterRunedServiceServer(gs, srv)

// Serve in a goroutine so main can block on signals/Shutdown/serve error.
// gs.Serve returns nil on graceful stop; any non-nil err is a real fault.
serveErr := make(chan error, 1)
go func() {
if err := gs.Serve(lis); err != nil {
serveErr <- err
}
close(serveErr)
}()
// Flip Health to STATUS_OK and accept Embed/EmbedBatch.
srv.SetBackend(b, modelID)

idleTimeout, err := parseIdleTimeout()
if err != nil {
_ = b.Stop(context.Background())
bailBoot(logger, srv, gs, b)
return fmt.Errorf("RUNED_IDLE_TIMEOUT: %w", err)
}
if idleTimeout > 0 {
Expand Down Expand Up @@ -222,20 +249,28 @@ func run() error {
// SIGKILL cannot be intercepted from user space.
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

// All exit triggers share the cleanup below. Only a serve fault yields a
// non-zero process exit; capture it and return after cleanup rather than
// bailing early, so the drain-then-stop ordering still applies.
var exitErr error
select {
case s := <-sigCh:
logger.Info("received signal", "signal", s.String())
case <-srv.ShutdownCh():
logger.Info("received Shutdown RPC")
case err := <-serveErr:
// Non-nil err is a real fault (Serve returns nil only on graceful stop).
if err != nil {
logger.Error("gRPC serve error", "err", err)
// Still fall through to backend cleanup below.
_ = b.Stop(context.Background())
return fmt.Errorf("serve: %w", err)
exitErr = fmt.Errorf("serve: %w", err)
}
}

// Flip Health to STATUS_SHUTTING_DOWN for all exit triggers (OS
// signal / serve error / Shutdown RPC). sync.Once makes a redundant
// call a no-op.
srv.TriggerShutdown()
Comment thread
jh-lee-cryptolab marked this conversation as resolved.

// Phase 1: drain in-flight RPCs. GracefulStop blocks until all active
// handlers return; 10s is a safety net for a wedged client.
logger.Info("draining in-flight requests")
Expand All @@ -259,29 +294,23 @@ func run() error {
logger.Warn("backend stop returned error", "err", err)
}
logger.Info("shutdown complete")
return nil
return exitErr
}

// selfBootstrap fetches the manifest and ensures llama-server and the
// resolved model variant are present under paths. Called whenever
// RUNED_LLAMA_SERVER or RUNED_MODEL is unset; bypassed entirely when
// both are provided.
//
// needLlama / needModel reflect which side(s) the caller still needs.
// Only those sides are downloaded — the env-overridden side is skipped
// so its manifest artifact isn't fetched only to be discarded. When
// both are needed, EnsureAll's single-lock fast path is used.
func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Paths, needLlama, needModel bool) (binPath, modelPath string, err error) {
// selfBootstrap fetches the manifest and installs the side(s) the
// caller needs. needLlama / needModel reflect which env paths are
// unset; only those sides are downloaded so an env override doesn't
// pay for an artifact it will discard. Both needed → EnsureAll's
// single-lock fast path.
func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Paths, needLlama, needModel bool, reporter bootstrap.StatusReporter) (binPath, modelPath string, err error) {
manifestURL := bootstrap.ResolveManifestURL()
if manifestURL == "" {
return "", "", fmt.Errorf(
"self-bootstrap needed (RUNED_LLAMA_SERVER or RUNED_MODEL unset) but no manifest URL: set %s or rebuild with DEFAULT_MANIFEST_URL",
bootstrap.EnvManifest)
}
// HTTPS isn't enforced because a private network may legitimately serve
// the manifest over plain HTTP. But callers should know: a MITM that
// rewrites the manifest can also rewrite the SHA256s, so artifact
// integrity collapses to "trust the manifest channel."
// A MITM that rewrites the manifest can also rewrite the SHA256s,
// so artifact integrity collapses to "trust the manifest channel."
if !strings.HasPrefix(manifestURL, "https://") {
logger.Warn("manifest URL is not HTTPS; artifact integrity rests on manifest channel trust",
"url", manifestURL)
Expand All @@ -298,7 +327,7 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa
switch {
case needLlama && needModel:
var variant string
binPath, modelPath, variant, err = bootstrap.EnsureAll(ctx, paths, mani, logger)
binPath, modelPath, variant, err = bootstrap.EnsureAll(ctx, paths, mani, logger, reporter)
if err != nil {
return "", "", fmt.Errorf("bootstrap install: %w", err)
}
Expand All @@ -307,15 +336,15 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa
"model", modelPath,
"variant", variant)
case needLlama:
binPath, err = bootstrap.EnsureLlamaServer(ctx, paths, mani, logger)
binPath, err = bootstrap.EnsureLlamaServer(ctx, paths, mani, logger, reporter)
if err != nil {
return "", "", fmt.Errorf("bootstrap install: %w", err)
}
logger.Info("self-bootstrap ready (llama-server only; model from env)",
"llama_server", binPath)
case needModel:
var variant string
modelPath, variant, err = bootstrap.EnsureModel(ctx, paths, mani, logger)
modelPath, variant, err = bootstrap.EnsureModel(ctx, paths, mani, logger, reporter)
if err != nil {
return "", "", fmt.Errorf("bootstrap install: %w", err)
}
Expand All @@ -328,6 +357,47 @@ func selfBootstrap(ctx context.Context, logger *slog.Logger, paths *bootstrap.Pa
return binPath, modelPath, nil
}

// stagePhase maps a bootstrap stage name to its proto Phase + message.
// PHASE_UNSPECIFIED on unknown stages keeps the surface forward-
// compatible with future stages.
func stagePhase(stage string) (runedv1.HealthResponse_Phase, string) {
switch stage {
case "llama_server":
return runedv1.HealthResponse_PHASE_FETCHING_LLAMA_SERVER, "fetching llama-server"
case "model":
return runedv1.HealthResponse_PHASE_FETCHING_MODEL, "fetching embedding model"
default:
return runedv1.HealthResponse_PHASE_UNSPECIFIED, ""
}
}

// bailBoot runs the normal-exit cleanup sequence for boot-time failures
// so clients see one final STATUS_SHUTTING_DOWN instead of a sudden
// disconnect. b may be nil; b.Stop is idempotent.
func bailBoot(logger *slog.Logger, srv *server.Server, gs *grpc.Server, b *backend.LlamaBackend) {
srv.TriggerShutdown()
stopGRPC(logger, gs)
if b != nil {
_ = b.Stop(context.Background())
}
}

// stopGRPC drives a 10s-bounded GracefulStop with force-Stop fallback.
func stopGRPC(logger *slog.Logger, gs *grpc.Server) {
graceDone := make(chan struct{})
go func() {
gs.GracefulStop()
close(graceDone)
}()
select {
case <-graceDone:
case <-time.After(10 * time.Second):
logger.Warn("graceful stop timed out, forcing")
gs.Stop()
<-graceDone
}
}

// anotherDaemonReachable returns true if a UDS listener accepts our dial
// at sockPath within 500ms. False covers both "file missing" and "file
// exists but nobody listening" (the latter is a stale-socket case the
Expand Down
Loading
Loading