Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion envd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,25 @@ func (rw *recordingWriter) Flush() {
}
}

// maxUnaryBody caps a Connect-RPC unary request body. Sized for the largest
// unary payload Safari sends: a sync_skill zip_data of MaxSkillZipBytes
// (10 MiB) base64-encodes to ~13.3 MiB, leaving >2 MiB of JSON-envelope
// headroom under 16 MiB. Safari's knowledge staging chunks itself well below
// this (5 MiB raw per batch).
const maxUnaryBody = 16 << 20

// readUnary decodes a Connect-RPC unary request body into dst.
func readUnary(r *http.Request, dst any) error {
if r.Method != http.MethodPost {
return errBadMethod
}
body, err := io.ReadAll(io.LimitReader(r.Body, 8<<20)) // 8 MB cap on unary body
body, err := io.ReadAll(io.LimitReader(r.Body, maxUnaryBody+1))
if err != nil {
return fmt.Errorf("read request body: %w", err)
}
if len(body) > maxUnaryBody {
return fmt.Errorf("request body exceeds %d-byte unary cap", maxUnaryBody)
}
if len(body) == 0 {
return nil // empty body == zero-valued request
}
Expand Down
28 changes: 28 additions & 0 deletions envd/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,31 @@ func TestReadUnary_Decodes(t *testing.T) {
t.Fatalf("dst=%v", dst)
}
}

// The unary cap must clear the protocol's largest payload: a sync_skill
// zip_data at Safari's MaxSkillZipBytes (10 MiB) is ~13.3 MiB after base64
// plus the JSON envelope. 14 MiB of real payload must decode fine.
func TestReadUnary_AcceptsLargestProtocolPayload(t *testing.T) {
big := strings.Repeat("a", 14<<20)
body, _ := json.Marshal(map[string]string{"zip_data": big})
r := httptest.NewRequestWithContext(context.Background(), http.MethodPost, "/svc/m", bytes.NewReader(body))
var dst map[string]string
if err := readUnary(r, &dst); err != nil {
t.Fatal(err)
}
if len(dst["zip_data"]) != len(big) {
t.Fatalf("payload truncated: got %d bytes", len(dst["zip_data"]))
}
}

// Over the cap the failure must say so, not surface as a cryptic JSON decode
// error from a silently truncated body.
func TestReadUnary_RejectsOversizedBodyExplicitly(t *testing.T) {
r := httptest.NewRequestWithContext(context.Background(), http.MethodPost, "/svc/m",
bytes.NewReader(make([]byte, maxUnaryBody+1)))
var dst map[string]string
err := readUnary(r, &dst)
if err == nil || !strings.Contains(err.Error(), "unary cap") {
t.Fatalf("want explicit cap error, got %v", err)
}
}
4 changes: 2 additions & 2 deletions envd/safari_knowledge.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func (s *Server) handleSyncSkill(w http.ResponseWriter, r *http.Request) {
if !s.requireWorkspace(w) {
return
}
// TODO: readUnary 8MB cap — switch to streaming if skill payloads outgrow it.
// Largest unary in the protocol: zip_data is capped at Safari's
// MaxSkillZipBytes (10 MiB), which fits maxUnaryBody after base64.
var req protocol.SyncSkillArgs
if err := readUnary(r, &req); err != nil {
writeUnary(w, nil, err)
Expand All @@ -63,7 +64,6 @@ func (s *Server) handlePackFolder(w http.ResponseWriter, r *http.Request) {
if !s.requireWorkspace(w) {
return
}
// TODO: readUnary 8MB cap — switch to streaming if folder payloads outgrow it.
var req protocol.PackFolderArgs
if err := readUnary(r, &req); err != nil {
writeUnary(w, nil, err)
Expand Down
53 changes: 44 additions & 9 deletions environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/bmatcuk/doublestar/v4"
Expand All @@ -33,6 +34,11 @@ type Environment struct {
root string
checker *permission.Checker
mcpMgr *mcp.ClientManager
// skillInstallMu serializes skill installs: two concurrent installs of
// the same skill would interleave their RemoveAll+Rename swaps (the
// loser failing with ENOTEMPTY), and the orphaned-staging sweep must
// never race a live install's staging dir.
skillInstallMu sync.Mutex
}

// New creates a new workspace with the given root directory and permission checker.
Expand Down Expand Up @@ -1082,32 +1088,61 @@ func (e *Environment) SyncSkill(ctx context.Context, args *protocol.SyncSkillArg
return &protocol.SyncSkillResult{Success: true, Cached: false}, nil
}

// Install: decode, wipe, unzip, write .checksum.
// Install via staging dir + swap: unzip and .checksum land in a sibling
// temp dir, which replaces the live dir only once complete. A bad archive
// or mid-extract crash never destroys the previously installed version;
// a reader in the brief RemoveAll→Rename window sees a missing dir and
// re-probes — never a half-extracted one.
zipData, err := base64.StdEncoding.DecodeString(args.ZipData)
if err != nil {
return nil, fmt.Errorf("failed to decode zip data: %w", err)
}
if err := e.unzipSkill(zipData, skillDir); err != nil {

e.skillInstallMu.Lock()
defer e.skillInstallMu.Unlock()

parentDir := filepath.Dir(skillDir)
if mkErr := os.MkdirAll(parentDir, 0o755); mkErr != nil {
return nil, fmt.Errorf("failed to create skills directory: %w", mkErr)
}
// Sweep staging dirs orphaned by a hard crash (the defer below never ran).
// Safe under skillInstallMu — no other install is live.
if leftovers, _ := filepath.Glob(filepath.Join(parentDir, ".installing-*")); len(leftovers) > 0 {
for _, d := range leftovers {
_ = os.RemoveAll(d)
}
}
// Deterministic staging name is safe: installs are serialized by
// skillInstallMu and the sweep above just cleared any leftover.
tmpDir := filepath.Join(parentDir, ".installing-"+args.SkillName)
if mkErr := os.Mkdir(tmpDir, 0o755); mkErr != nil {
return nil, fmt.Errorf("failed to create staging directory: %w", mkErr)
}
defer func() { _ = os.RemoveAll(tmpDir) }() // no-op after a successful rename
if err := e.unzipSkill(zipData, tmpDir); err != nil {
return nil, fmt.Errorf("failed to unzip skill: %w", err)
}
if err := os.WriteFile(filepath.Join(skillDir, ".checksum"), []byte(args.Checksum), 0o644); err != nil {
if err := os.WriteFile(filepath.Join(tmpDir, ".checksum"), []byte(args.Checksum), 0o644); err != nil {
return nil, fmt.Errorf("failed to write checksum: %w", err)
}
if err := os.RemoveAll(skillDir); err != nil {
return nil, fmt.Errorf("failed to remove existing directory: %w", err)
}
if err := os.Rename(tmpDir, skillDir); err != nil {
return nil, fmt.Errorf("failed to activate skill directory: %w", err)
}
return &protocol.SyncSkillResult{Success: true, Path: skillDir, Cached: false}, nil
}

// unzipSkill extracts a zip archive to the destination directory.
// unzipSkill extracts a zip archive into dest, which the caller supplies as a
// fresh staging directory — installs become visible only via the caller's
// rename, so this function never deletes anything.
func (e *Environment) unzipSkill(data []byte, dest string) error {
reader, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
if err != nil {
return fmt.Errorf("failed to read zip archive: %w", err)
}

// Remove existing directory if exists
if rmErr := os.RemoveAll(dest); rmErr != nil {
return fmt.Errorf("failed to remove existing directory: %w", rmErr)
}

if mkErr := os.MkdirAll(dest, 0o755); mkErr != nil {
return fmt.Errorf("failed to create destination directory: %w", mkErr)
}
Expand Down
47 changes: 46 additions & 1 deletion environment/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ func TestSyncSkill_InstallOverwrites(t *testing.T) {
assert.False(t, res.Cached, "install must report Cached=false")
assert.Equal(t, resolveDir(t, dir), res.Path)

// Old file should be gone (unzipSkill does RemoveAll before extracting).
// Old file should be gone (install swaps in a freshly extracted dir).
_, err = os.Stat(filepath.Join(dir, "old-file.txt"))
assert.True(t, errors.Is(err, os.ErrNotExist), "old file should be wiped")

Expand All @@ -580,3 +580,48 @@ func TestSyncSkill_InstallOverwrites(t *testing.T) {
sum, _ := os.ReadFile(filepath.Join(dir, ".checksum"))
assert.Equal(t, "new", string(sum))
}

// A failed install must not destroy the previously installed version: the new
// archive is extracted into a staging dir and only swapped in on success, so
// a corrupt zip leaves the old skill fully usable (probe still hits) and no
// staging litter behind.
func TestSyncSkill_BadZipKeepsExistingInstall(t *testing.T) {
ws := newTestEnvironment(t)
dir := filepath.Join(ws.Root(), "skills", "demo")

zipBytes := buildTestZip(t, map[string]string{
"SKILL.md": "---\nname: demo\ndescription: x\n---\nv1",
})
_, err := ws.SyncSkill(context.Background(), &protocol.SyncSkillArgs{
SkillName: "demo",
Checksum: "v1",
ZipData: base64.StdEncoding.EncodeToString(zipBytes),
})
require.NoError(t, err)

_, err = ws.SyncSkill(context.Background(), &protocol.SyncSkillArgs{
SkillName: "demo",
Checksum: "v2",
ZipData: base64.StdEncoding.EncodeToString([]byte("not a zip archive")),
})
require.Error(t, err, "corrupt zip must fail the install")

body, readErr := os.ReadFile(filepath.Join(dir, "SKILL.md"))
require.NoError(t, readErr, "old SKILL.md must survive a failed install")
assert.Contains(t, string(body), "v1")
sum, _ := os.ReadFile(filepath.Join(dir, ".checksum"))
assert.Equal(t, "v1", string(sum), "old checksum must survive so the probe still hits")

res, err := ws.SyncSkill(context.Background(), &protocol.SyncSkillArgs{
SkillName: "demo",
Checksum: "v1",
})
require.NoError(t, err)
assert.True(t, res.Cached, "probe must still hit the surviving v1 install")

entries, err := os.ReadDir(filepath.Join(ws.Root(), "skills"))
require.NoError(t, err)
for _, e := range entries {
assert.NotContains(t, e.Name(), ".installing-", "staging dir must be cleaned up on failure")
}
}
39 changes: 39 additions & 0 deletions environment/knowledge.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ func (e *Environment) StageKnowledgeFiles(ctx context.Context, args *protocol.St
// triggers a fresh lazy install from Safari). Files whose checksum already
// matches are left in place.
//
// The orphan prune is scope-bounded when the caller sends scope lists (see
// ReconcileKnowledgeManifestArgs): inside a staged scope the manifest is
// authoritative and orphans go; a scope that is valid account-wide but not
// staged by this session is another session's territory and is left alone; a
// scope outside ValidScopes no longer exists upstream and is pruned whole.
// Without scope lists (an older Safari) every orphan is pruned, as before.
//
// The runner does NOT pre-stage anything in this call: the manifest declares
// what *should* exist if it were read, not what *must* be cached. Cold packs
// stay cold; only drift is corrected. The whole pass runs under the sentinel
Expand All @@ -261,6 +268,18 @@ func (e *Environment) ReconcileKnowledgeManifest(ctx context.Context, args *prot
}
expected[f.RelPath] = f.Checksum
}
staged := make(map[string]struct{}, len(args.StagedScopes))
for _, s := range args.StagedScopes {
staged[s] = struct{}{}
}
valid := make(map[string]struct{}, len(args.ValidScopes))
for _, s := range args.ValidScopes {
valid[s] = struct{}{}
}
// ValidScopes empty means either an older Safari (no scope info) or an
// account with zero packs (everything on disk is residue) — the legacy
// global orphan prune is the right behavior for both.
legacyPrune := len(valid) == 0

result := &protocol.ReconcileKnowledgeManifestResult{}
knowledgeRoot := filepath.Join(e.root, "knowledge")
Expand All @@ -285,6 +304,16 @@ func (e *Environment) ReconcileKnowledgeManifest(ctx context.Context, args *prot
expectedSum, want := expected[relPath]
switch {
case !want:
if !legacyPrune {
scope := knowledgeScope(relPath)
_, isValid := valid[scope]
_, isStaged := staged[scope]
if isValid && !isStaged {
// Another session's scope on this shared runner —
// the manifest has no authority over it.
continue
}
}
// Orphan: not declared in the current manifest.
if rmErr := os.Remove(filepath.Join(e.root, relPath)); rmErr != nil && !os.IsNotExist(rmErr) {
slog.Warn("failed to prune orphan knowledge file", "rel_path", relPath, "error", rmErr)
Expand Down Expand Up @@ -332,6 +361,16 @@ func (e *Environment) ReconcileKnowledgeManifest(ctx context.Context, args *prot
return result, nil
}

// knowledgeScope extracts the scope segment ("account" or "team_<id>") from a
// validated `knowledge/<scope>/<leaf…>` rel-path.
func knowledgeScope(relPath string) string {
parts := strings.SplitN(relPath, "/", 3)
if len(parts) < 2 {
return ""
}
return parts[1]
}

// walkKnowledgeTree returns every leaf file anywhere under
// <knowledgeRoot>/<scope>/… as `knowledge/<scope>/<leaf…>` rel-paths,
// recursing into subdirectories so nested files are reconciled too. Hidden
Expand Down
92 changes: 92 additions & 0 deletions environment/knowledge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,95 @@ func TestReconcileKnowledgeManifest_IgnoresSentinelAndUnknownDirs(t *testing.T)
_, err = os.Stat(filepath.Join(rogueScope, "stuff.md"))
assert.NoError(t, err)
}

// Scope-bounded reconcile: on a runner shared by concurrent sessions, an
// over-cap manifest covers only its own session's scopes. Orphans inside a
// staged scope are pruned, a valid-but-unstaged scope (another session's
// mount) is left untouched, and a scope absent from ValidScopes (its pack was
// deleted upstream) is pruned whole.
func TestReconcileKnowledgeManifest_ScopedPrune(t *testing.T) {
ws := newTestEnvironment(t)
ctx := context.Background()

stageOne(t, ws, "knowledge/account/runbook.md", "sum-a", "A")
stageOne(t, ws, "knowledge/account/orphan.md", "sum-o", "O")
stageOne(t, ws, "knowledge/team_1/notes.md", "sum-n", "N")
stageOne(t, ws, "knowledge/team_2/other.md", "sum-x", "X") // another session's mount
stageOne(t, ws, "knowledge/team_9/gone.md", "sum-g", "G") // pack deleted upstream

res, err := ws.ReconcileKnowledgeManifest(ctx, &protocol.ReconcileKnowledgeManifestArgs{
Files: []protocol.KnowledgeManifestEntry{
{RelPath: "knowledge/account/runbook.md", Checksum: "sum-a"},
{RelPath: "knowledge/team_1/notes.md", Checksum: "sum-n"},
},
StagedScopes: []string{"account", "team_1"},
ValidScopes: []string{"account", "team_1", "team_2"},
})
require.NoError(t, err)
assert.ElementsMatch(t, []string{"knowledge/account/orphan.md", "knowledge/team_9/gone.md"}, res.Pruned)
assert.Equal(t, 2, res.KeptCount)
assert.Empty(t, res.NeedsStage)

// The foreign scope survives byte-for-byte, sentinel entry intact — this
// is the cross-session thrash fix.
assert.Equal(t, "X", fileContent(t, ws.Root(), "knowledge/team_2/other.md"))
sentinel := readSentinelMap(t, ws.Root())
assert.Equal(t, "sum-x", sentinel["knowledge/team_2/other.md"])
_, hasGone := sentinel["knowledge/team_9/gone.md"]
assert.False(t, hasGone, "deleted-pack residue must be scrubbed from the sentinel")
_, err = os.Stat(filepath.Join(ws.Root(), "knowledge/team_9/gone.md"))
assert.True(t, os.IsNotExist(err))
}

// A session that staged nothing (over-cap, unbound, no mounts) still sends
// the account-wide ValidScopes list. Nothing it doesn't cover may be orphan-
// pruned — only deleted-pack residue goes.
func TestReconcileKnowledgeManifest_EmptyStagedScopesKeepsValidScopes(t *testing.T) {
ws := newTestEnvironment(t)
ctx := context.Background()

stageOne(t, ws, "knowledge/team_2/other.md", "sum-x", "X")
stageOne(t, ws, "knowledge/team_9/gone.md", "sum-g", "G")

res, err := ws.ReconcileKnowledgeManifest(ctx, &protocol.ReconcileKnowledgeManifestArgs{
ValidScopes: []string{"team_2"},
})
require.NoError(t, err)
assert.ElementsMatch(t, []string{"knowledge/team_9/gone.md"}, res.Pruned)
assert.Equal(t, "X", fileContent(t, ws.Root(), "knowledge/team_2/other.md"))
}

// Without scope lists (an older Safari) the orphan prune stays global — the
// pre-scope behavior is the explicit compatibility contract.
func TestReconcileKnowledgeManifest_NoScopesIsLegacyGlobalPrune(t *testing.T) {
ws := newTestEnvironment(t)
ctx := context.Background()

stageOne(t, ws, "knowledge/account/keep.md", "sum-k", "K")
stageOne(t, ws, "knowledge/team_2/other.md", "sum-x", "X")

res, err := ws.ReconcileKnowledgeManifest(ctx, &protocol.ReconcileKnowledgeManifestArgs{
Files: []protocol.KnowledgeManifestEntry{
{RelPath: "knowledge/account/keep.md", Checksum: "sum-k"},
},
})
require.NoError(t, err)
assert.ElementsMatch(t, []string{"knowledge/team_2/other.md"}, res.Pruned)
}

// Defensive: a scope listed in StagedScopes but missing from ValidScopes
// (degenerate caller input — Safari always sends staged ⊆ valid) is treated
// as authoritative, so its orphans are pruned like any staged scope.
func TestReconcileKnowledgeManifest_StagedButNotValidStillPrunes(t *testing.T) {
ws := newTestEnvironment(t)
ctx := context.Background()

stageOne(t, ws, "knowledge/team_1/orphan.md", "sum-o", "O")

res, err := ws.ReconcileKnowledgeManifest(ctx, &protocol.ReconcileKnowledgeManifestArgs{
StagedScopes: []string{"team_1"},
ValidScopes: []string{"account"},
})
require.NoError(t, err)
assert.ElementsMatch(t, []string{"knowledge/team_1/orphan.md"}, res.Pruned)
}
Loading
Loading