Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions cmd/ateapi/internal/controlapi/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func (s *WorkerPoolSyncer) Start(ctx context.Context) {
return
}
slog.InfoContext(ctx, "Syncer: removing worker from store", slog.String("worker", pod.Namespace+"/"+pod.Name))
if err := s.releaseActorOnDeadWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name); err != nil {
slog.ErrorContext(ctx, "Failed to release actor bound to deleted worker", slog.Any("err", err))
}
err := s.persistence.DeleteWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name)
if err != nil {
slog.ErrorContext(ctx, "Failed to delete worker from store during delete event", slog.Any("err", err))
Expand Down Expand Up @@ -97,6 +100,9 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po

if pod.DeletionTimestamp != nil {
slog.InfoContext(ctx, "Syncer: removing worker from store (pod deleting)", slog.String("worker", pod.Namespace+"/"+pod.Name))
if err := s.releaseActorOnDeadWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name); err != nil {
slog.ErrorContext(ctx, "Failed to release actor bound to soft-deleting worker", slog.Any("err", err))
}
err := s.persistence.DeleteWorker(ctx, pod.Namespace, pod.Labels[workerPodLabel], pod.Name)
if err != nil {
slog.ErrorContext(ctx, "Failed to delete worker from store during update event (deleting)", slog.Any("err", err))
Expand Down Expand Up @@ -136,3 +142,50 @@ func (s *WorkerPoolSyncer) syncWorkerToStore(ctx context.Context, pod *corev1.Po
func isWorkerEligible(pod *corev1.Pod) bool {
	// A pod counts as eligible as soon as its status reports a non-empty IP.
	podIP := pod.Status.PodIP
	return podIP != ""
}

// releaseActorOnDeadWorker puts the actor bound to a disappearing worker pod
// back into STATUS_SUSPENDED and clears its pod binding, so that the next
// request reassigns it.
//
// Nothing is written, and nil is returned, when the worker row is missing,
// the worker has no actor, the actor is missing, or the actor no longer
// points at this pod.
//
// UpdateActor performs an optimistic version check. When a concurrent
// SuspendActor or ResumeActor gets there first, that write wins and this
// attempt is dropped without reporting an error.
//
// This is best-effort only. The caller goes on to DeleteWorker regardless
// of the result, so any failure other than version contention leaves the
// actor stranded: STATUS_RUNNING and pointing at a pod that no longer
// exists. Recovering from that state requires a manual SuspendActor.
//
// The long-term fix is a finalizer-based controller that keeps the pod in
// Terminating state until the actor has been suspended gracefully. Tracked
// in https://github.com/agent-substrate/substrate/issues/23.
func (s *WorkerPoolSyncer) releaseActorOnDeadWorker(ctx context.Context, namespace, pool, podName string) error {
	worker, err := s.persistence.GetWorker(ctx, namespace, pool, podName)
	switch {
	case err == nil:
		// Worker row exists; keep going.
	case errors.Is(err, store.ErrNotFound):
		// Worker row is already gone, so there is no binding to follow.
		return nil
	default:
		return err
	}

	actorID := worker.GetActorId()
	if actorID == "" {
		// The worker was idle; no actor needs releasing.
		return nil
	}

	actor, err := s.persistence.GetActor(ctx, actorID)
	switch {
	case err == nil:
		// Actor row exists; keep going.
	case errors.Is(err, store.ErrNotFound):
		return nil
	default:
		return err
	}

	// Only touch the actor while it still points at this pod. A concurrent
	// SuspendActor may already have cleared the pointer.
	boundHere := actor.GetAteomPodNamespace() == namespace && actor.GetAteomPodName() == podName
	if !boundHere {
		return nil
	}

	// Drop the pod binding and the partial snapshot, then mark the actor
	// suspended.
	actor.AteomPodNamespace = ""
	actor.AteomPodName = ""
	actor.AteomPodIp = ""
	actor.InProgressSnapshot = ""
	actor.Status = ateapipb.Actor_STATUS_SUSPENDED

	err = s.persistence.UpdateActor(ctx, actor, actor.GetVersion())
	if err == nil || errors.Is(err, store.ErrPersistenceRetry) {
		// ErrPersistenceRetry means a concurrent writer won the version
		// check; that is deliberately not treated as a failure here.
		return nil
	}
	return err
}
60 changes: 60 additions & 0 deletions cmd/ateapi/internal/controlapi/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/agent-substrate/substrate/cmd/ateapi/internal/store"
"github.com/agent-substrate/substrate/cmd/ateapi/internal/store/storetest"
"github.com/agent-substrate/substrate/pkg/proto/ateapipb"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -154,3 +155,62 @@ func TestSyncer_Lifecycle(t *testing.T) {
t.Fatalf("Worker still found in Redis after deletion: %v", err)
}
}

// TestSyncer_DeleteBoundWorker_ClearsActor checks that deleting a worker pod
// which has an actor bound to it resets that actor to STATUS_SUSPENDED,
// clears its pod binding and in-progress snapshot, and leaves LastSnapshot
// unchanged.
func TestSyncer_DeleteBoundWorker_ClearsActor(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	persistence, fakeK8s, cleanup := setupSyncerTest(t, ctx)
	defer cleanup()

	const lastSnapshot = "gs://snapshots/last"

	// Create a worker pod with an IP and wait for its row to appear in the
	// store.
	ns, pool, pod, ip := "ns-orphan", "pool1", "worker-orphan", "10.0.0.1"
	if _, err := fakeK8s.CoreV1().Pods(ns).Create(ctx, &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: pod, Namespace: ns,
			Labels: map[string]string{workerPodLabel: pool}},
		Status: corev1.PodStatus{Phase: corev1.PodRunning, PodIP: ip,
			PodIPs: []corev1.PodIP{{IP: ip}}},
	}, metav1.CreateOptions{}); err != nil {
		t.Fatalf("create pod: %v", err)
	}
	if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(c context.Context) (bool, error) {
		// Lookup errors are treated as "not there yet" and polled again.
		_, gerr := persistence.GetWorker(c, ns, pool, pod)
		return gerr == nil, nil
	}); err != nil {
		t.Fatalf("worker row not materialised: %v", err)
	}

	// Bind a RUNNING actor to that worker, on both the actor and worker rows.
	actorID := "actor-orphan"
	if err := persistence.CreateActor(ctx, &ateapipb.Actor{
		ActorId: actorID, ActorTemplateNamespace: ns, ActorTemplateName: "tmpl",
		Status:            ateapipb.Actor_STATUS_RUNNING,
		AteomPodNamespace: ns, AteomPodName: pod, AteomPodIp: ip,
		LastSnapshot: lastSnapshot, InProgressSnapshot: "gs://snapshots/partial",
	}); err != nil {
		t.Fatalf("create actor: %v", err)
	}
	// Fail with a clear message if the read fails, rather than dereferencing
	// a nil worker on the next line.
	w, err := persistence.GetWorker(ctx, ns, pool, pod)
	if err != nil {
		t.Fatalf("get worker: %v", err)
	}
	w.ActorId, w.ActorNamespace, w.ActorTemplate = actorID, ns, "tmpl"
	if err := persistence.UpdateWorker(ctx, w, w.Version); err != nil {
		t.Fatalf("update worker: %v", err)
	}

	// Delete the pod and wait for the actor to be reported as SUSPENDED.
	if err := fakeK8s.CoreV1().Pods(ns).Delete(ctx, pod, metav1.DeleteOptions{}); err != nil {
		t.Fatalf("delete pod: %v", err)
	}
	var got *ateapipb.Actor
	if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 2*time.Second, true, func(c context.Context) (bool, error) {
		a, gerr := persistence.GetActor(c, actorID)
		if gerr != nil {
			// Any lookup error aborts the poll.
			return false, gerr
		}
		got = a
		return a.GetStatus() == ateapipb.Actor_STATUS_SUSPENDED, nil
	}); err != nil {
		t.Fatalf("actor not reset to SUSPENDED: %v", err)
	}
	if got.AteomPodName != "" || got.AteomPodNamespace != "" || got.AteomPodIp != "" || got.InProgressSnapshot != "" {
		t.Errorf("bind fields not cleared: %+v", got)
	}
	// "Preserved" means unchanged, so compare against the seeded value
	// instead of only checking for non-empty.
	if got.LastSnapshot != lastSnapshot {
		t.Errorf("LastSnapshot must be preserved: got %q, want %q", got.LastSnapshot, lastSnapshot)
	}
}
Loading