Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions backend/internal/adapters/telemetry/posthog.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ var remotePayloadAllowlist = map[string]map[string]struct{}{
"status": {},
"status_family": {},
},
"ao.onboarding.first_pr_merged": {},
"ao.onboarding.prereqs_checked": {
"all_ok": {},
"git_ok": {},
"github_ok": {},
"harness_ok": {},
"runtime_ok": {},
},
"ao.onboarding.prereqs_ready": {},
"ao.onboarding.first_pr_raised": {
"state": {},
},
"ao.onboarding.first_pr_reviewed": {
"decision": {},
},
"ao.onboarding.first_pr_revised": {},
"ao.onboarding.first_project_added": {
"has_git_remote": {},
"kind": {},
Expand All @@ -80,6 +96,23 @@ var remotePayloadAllowlist = map[string]map[string]struct{}{
"has_git_remote": {},
"kind": {},
},
"ao.session.first_agent_output": {
"harness": {},
"state": {},
},
"ao.session.pr_merged": {
"state": {},
},
"ao.session.pr_raised": {
"ci": {},
"mergeability": {},
"review": {},
"state": {},
},
"ao.session.pr_reviewed": {
"decision": {},
},
"ao.session.pr_revised": {},
"ao.session.spawn_failed": {
"component": {},
"duration_ms": {},
Expand Down
45 changes: 45 additions & 0 deletions backend/internal/adapters/telemetry/posthog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,51 @@ func TestPostHogSinkSanitizesPayloads(t *testing.T) {
}
}

func TestSanitizeRemotePayload_FunnelEventsAllowlisted(t *testing.T) {
cases := []struct {
event string
payload map[string]any
want map[string]any
}{
{
event: "ao.session.first_agent_output",
payload: map[string]any{"state": "active", "harness": "claude-code", "secret": "drop"},
want: map[string]any{"state": "active", "harness": "claude-code"},
},
{
event: "ao.session.pr_raised",
payload: map[string]any{"state": "open", "ci": "pending", "review": "none", "mergeability": "unknown", "url": "https://x/y"},
want: map[string]any{"state": "open", "ci": "pending", "review": "none", "mergeability": "unknown"},
},
{
event: "ao.onboarding.first_pr_raised",
payload: map[string]any{"state": "open", "url": "https://x/y"},
want: map[string]any{"state": "open"},
},
{
event: "ao.session.pr_reviewed",
payload: map[string]any{"decision": "approved", "url": "https://x/y"},
want: map[string]any{"decision": "approved"},
},
{
event: "ao.onboarding.prereqs_checked",
payload: map[string]any{"git_ok": true, "runtime_ok": true, "harness_ok": false, "github_ok": true, "all_ok": false, "path": "/Users/x"},
want: map[string]any{"git_ok": true, "runtime_ok": true, "harness_ok": false, "github_ok": true, "all_ok": false},
},
}
for _, tc := range cases {
got := sanitizeRemotePayload(tc.event, tc.payload)
if len(got) != len(tc.want) {
t.Fatalf("%s: sanitized = %#v, want %#v", tc.event, got, tc.want)
}
for k, v := range tc.want {
if got[k] != v {
t.Fatalf("%s: key %q = %#v, want %#v", tc.event, k, got[k], v)
}
}
}
}

type roundTripClient func(*http.Request) (*http.Response, error)

func (f roundTripClient) Do(req *http.Request) (*http.Response, error) { return f(req) }
Expand Down
15 changes: 15 additions & 0 deletions backend/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,21 @@ func Run() error {
return err
}

// Onboarding funnel telemetry. A single durable milestone marker gates the
// once-per-install events across the CDC subscriber and the prereqs probe.
onboardingMilestones := newMilestoneStore(cfg.DataDir)
// Turn live PR row changes into funnel telemetry (pr_raised = activation,
// pr_merged = success) plus their once-per-install onboarding milestones.
// Unsubscribe on shutdown so the broadcaster drops the closure.
unsubscribeOnboarding := startOnboardingCDC(cdcPipe.Broadcaster, telemetrySink, onboardingMilestones, log)
defer unsubscribeOnboarding()
// Stage 4: probe local prereqs off the boot path and emit prereqs_checked /
// prereqs_ready. The app supervisor starts the daemon on first launch, so
// this seeds the funnel without a first-run wizard.
go emitPrereqsTelemetry(ctx, telemetrySink, onboardingMilestones, func() bool {
return onboardingMilestones.claimed("prereqs_ready")
})

// Terminal streaming: the selected runtime (tmux on macOS/Linux, conpty on Windows) supplies the
// attach Stream and liveness; the CDC broadcaster feeds the session-state channel. The manager
// is handed to httpd, which mounts it at /mux. Raw PTY bytes never flow
Expand Down
206 changes: 206 additions & 0 deletions backend/internal/daemon/onboarding_cdc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package daemon

import (
"context"
"encoding/json"
"log/slog"
"os"
"path/filepath"
"sort"
"sync"
"time"

"github.com/aoagents/agent-orchestrator/backend/internal/cdc"
"github.com/aoagents/agent-orchestrator/backend/internal/domain"
"github.com/aoagents/agent-orchestrator/backend/internal/ports"
)

// milestoneStore claims one-time onboarding milestones durably. The CDC
// subscriber runs in a single poller-driven goroutine, but the marker is
// persisted so a milestone already reached in a prior daemon run is never
// re-emitted after a restart. Keyed by an opaque name (e.g. "first_pr_raised"
// or "pr_merged:<url>"). It is the funnel's once-per-install gate, the CDC
// analogue of the store-derived first-ness checks in the session service.
type milestoneStore struct {
mu sync.Mutex
path string
seen map[string]struct{}
}

func newMilestoneStore(dataDir string) *milestoneStore {
s := &milestoneStore{
path: filepath.Join(dataDir, "telemetry_milestones.json"),
seen: map[string]struct{}{},
}
if data, err := os.ReadFile(s.path); err == nil {
var names []string
if json.Unmarshal(data, &names) == nil {
for _, n := range names {
s.seen[n] = struct{}{}
}
}
}
return s
}

// claimed reports whether name was already recorded, without claiming it.
func (s *milestoneStore) claimed(name string) bool {
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.seen[name]
return ok
}

// claim records name and returns true only the first time it is seen.
func (s *milestoneStore) claim(name string) bool {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.seen[name]; ok {
return false
}
s.seen[name] = struct{}{}
names := make([]string, 0, len(s.seen))
for n := range s.seen {
names = append(names, n)
}
sort.Strings(names)
if data, err := json.Marshal(names); err == nil {
_ = os.WriteFile(s.path, data, 0o600)
}
return true
}

// prCDCPayload is the shape the pr_created/pr_updated triggers write into
// change_log (migration 0006). Only the fields the funnel needs are decoded.
type prCDCPayload struct {
URL string `json:"url"`
Session string `json:"session"`
State string `json:"state"`
CI string `json:"ci"`
Review string `json:"review"`
Mergeability string `json:"mergeability"`
}

// startOnboardingCDC subscribes to the CDC broadcaster and turns PR row changes
// into funnel telemetry: pr_created -> pr_raised (activation), pr_updated with
// state=merged -> pr_merged (success), each paired with a once-per-install
// onboarding milestone. The broadcaster only pushes live events, so this is a
// best-effort live signal; the milestone marker keeps first-* events exactly
// once across restarts. The subscriber callback must not block.
func startOnboardingCDC(bcast *cdc.Broadcaster, sink ports.EventSink, milestones *milestoneStore, log *slog.Logger) func() {
if bcast == nil || sink == nil || milestones == nil {
return func() {}
}
return bcast.Subscribe(func(ev cdc.Event) {
switch ev.Type {
case cdc.EventPRCreated:
emitPRRaised(sink, milestones, ev, log)
case cdc.EventPRUpdated:
emitPRMerged(sink, milestones, ev, log)
emitPRReviewed(sink, milestones, ev, log)
case cdc.EventPRReviewThreadResolved:
emitPRRevised(sink, milestones, ev)
}
})
}

func decodePRPayload(ev cdc.Event, log *slog.Logger) (prCDCPayload, bool) {
var p prCDCPayload
if err := json.Unmarshal(ev.Payload, &p); err != nil {
if log != nil {
log.Warn("onboarding cdc: decode pr payload", "type", ev.Type, "seq", ev.Seq, "err", err)
}
return prCDCPayload{}, false
}
return p, true
}

func emitPRRaised(sink ports.EventSink, milestones *milestoneStore, ev cdc.Event, log *slog.Logger) {
p, ok := decodePRPayload(ev, log)
if !ok {
return
}
payload := map[string]any{"state": p.State, "ci": p.CI, "review": p.Review, "mergeability": p.Mergeability}
emitCDCTelemetry(sink, "ao.session.pr_raised", ev, payload)
if milestones.claim("first_pr_raised") {
emitCDCTelemetry(sink, "ao.onboarding.first_pr_raised", ev, map[string]any{"state": p.State})
}
}

func emitPRMerged(sink ports.EventSink, milestones *milestoneStore, ev cdc.Event, log *slog.Logger) {
p, ok := decodePRPayload(ev, log)
if !ok || p.State != string(domain.PRStateMerged) {
return
}
// pr_updated fires on any tracked-field change, and a merged PR can still
// emit later updates (CI/review). Dedup the merge fact per PR URL so
// pr_merged is one event per PR.
if p.URL != "" && !milestones.claim("pr_merged:"+p.URL) {
return
}
emitCDCTelemetry(sink, "ao.session.pr_merged", ev, map[string]any{"state": p.State})
if milestones.claim("first_pr_merged") {
emitCDCTelemetry(sink, "ao.onboarding.first_pr_merged", ev, map[string]any{})
}
}

func emitPRReviewed(sink ports.EventSink, milestones *milestoneStore, ev cdc.Event, log *slog.Logger) {
p, ok := decodePRPayload(ev, log)
if !ok {
return
}
if p.Review != string(domain.ReviewApproved) && p.Review != string(domain.ReviewChangesRequest) {
return
}
// pr_updated fires on any tracked-field change; dedup per (PR, decision) so
// each distinct human verdict is one pr_reviewed event.
if p.URL != "" && !milestones.claim("pr_reviewed:"+p.URL+":"+p.Review) {
return
}
emitCDCTelemetry(sink, "ao.session.pr_reviewed", ev, map[string]any{"decision": p.Review})
if milestones.claim("first_pr_reviewed") {
emitCDCTelemetry(sink, "ao.onboarding.first_pr_reviewed", ev, map[string]any{"decision": p.Review})
}
}

// prThreadPayload is the pr_review_thread_resolved trigger shape (migration
// 0004): a resolved review thread is the cleanest "agent addressed feedback"
// signal available without tracking review history.
type prThreadPayload struct {
PR string `json:"pr"`
Thread string `json:"thread"`
}

func emitPRRevised(sink ports.EventSink, milestones *milestoneStore, ev cdc.Event) {
var p prThreadPayload
if json.Unmarshal(ev.Payload, &p) != nil {
return
}
// One revision signal per resolved thread.
if p.Thread != "" && !milestones.claim("pr_revised:"+p.PR+":"+p.Thread) {
return
}
emitCDCTelemetry(sink, "ao.session.pr_revised", ev, map[string]any{})
if milestones.claim("first_pr_revised") {
emitCDCTelemetry(sink, "ao.onboarding.first_pr_revised", ev, map[string]any{})
}
}

func emitCDCTelemetry(sink ports.EventSink, name string, ev cdc.Event, payload map[string]any) {
out := ports.TelemetryEvent{
Name: name,
Source: "cdc",
OccurredAt: time.Now().UTC(),
Level: ports.TelemetryLevelInfo,
Payload: payload,
}
if ev.ProjectID != "" {
projectID := domain.ProjectID(ev.ProjectID)
out.ProjectID = &projectID
}
if ev.SessionID != "" {
sessionID := domain.SessionID(ev.SessionID)
out.SessionID = &sessionID
}
sink.Emit(context.Background(), out)
}
Loading
Loading