From 08674abca515e5c0dd83fff69b0d95c722bec089 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 17 Apr 2026 00:13:10 +0000 Subject: [PATCH 1/2] rewrite sfcompute provider for sfc-go Co-authored-by: Brian Lechthaler --- go.mod | 2 + go.sum | 4 + v1/providers/sfcompute/client.go | 27 +- v1/providers/sfcompute/instance.go | 914 +++++++++++++++++------- v1/providers/sfcompute/instance_test.go | 63 ++ v1/providers/sfcompute/instancetype.go | 137 ++-- v1/providers/sfcompute/raw_api.go | 123 ++++ 7 files changed, 941 insertions(+), 329 deletions(-) create mode 100644 v1/providers/sfcompute/instance_test.go create mode 100644 v1/providers/sfcompute/raw_api.go diff --git a/go.mod b/go.mod index 92a51b28..6fdfb694 100644 --- a/go.mod +++ b/go.mod @@ -82,9 +82,11 @@ require ( github.com/prometheus/common v0.66.1 // indirect github.com/prometheus/procfs v0.17.0 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/sfcompute/sfc-go v0.0.0-20260416201327-643324aa73f0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/afero v1.15.0 // indirect github.com/spf13/pflag v1.0.10 // indirect + github.com/spyzhov/ajson v0.8.0 // indirect github.com/tidwall/gjson v1.18.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect diff --git a/go.sum b/go.sum index 55a8123a..d0e4d6d2 100644 --- a/go.sum +++ b/go.sum @@ -162,12 +162,16 @@ github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0t github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/sfcompute/nodes-go v0.1.0-alpha.4 h1:oFBWcMPSpqLYm/NDs5I1jTvzgx9rsXDL9Ghsm30Hc0Q= github.com/sfcompute/nodes-go v0.1.0-alpha.4/go.mod h1:nUviHgK+Fgt2hDFcRL3M8VoyiypC8fc0dsY8C30QU8M= +github.com/sfcompute/sfc-go v0.0.0-20260416201327-643324aa73f0 h1:nBXgsBKgPRWpY2zVtHBceMG+RuEcE41Uvzb5U4gVdks= +github.com/sfcompute/sfc-go v0.0.0-20260416201327-643324aa73f0/go.mod h1:vhUpRpAHKitZzzWPg87RjreC+pzK57PGe4ZuSIQSk94= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I= github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg= github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spyzhov/ajson v0.8.0 h1:sFXyMbi4Y/BKjrsfkUZHSjA2JM1184enheSjjoT/zCc= +github.com/spyzhov/ajson v0.8.0/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= diff --git a/v1/providers/sfcompute/client.go b/v1/providers/sfcompute/client.go index 7dc20317..796ec1ad 100644 --- a/v1/providers/sfcompute/client.go +++ b/v1/providers/sfcompute/client.go @@ -2,11 +2,12 @@ package v1 import ( "context" + "net/http" + "sync" + "time" v1 "github.com/brevdev/cloud/v1" - "github.com/sfcompute/nodes-go/option" - - sfcnodes "github.com/sfcompute/nodes-go" + sfc "github.com/sfcompute/sfc-go" ) const CloudProviderID = "sfcompute" @@ -47,8 +48,13 @@ type SFCClient struct { refID string location string apiKey string - client sfcnodes.Client + client *sfc.Sfc + baseURL string + httpClient *http.Client logger v1.Logger + + workspaceMu sync.Mutex + defaultWorkspace string } var _ v1.CloudClient = &SFCClient{} @@ -62,12 +68,15 @@ func WithLogger(logger v1.Logger) SFCClientOption { } func (c *SFCCredential) MakeClientWithOptions(_ context.Context, location string, opts ...SFCClientOption) (v1.CloudClient, error) { + httpClient := &http.Client{Timeout: 60 * time.Second} sfcClient := &SFCClient{ - refID: c.RefID, - apiKey: c.APIKey, - client: sfcnodes.NewClient(option.WithBearerToken(c.APIKey)), - location: location, - logger: &v1.NoopLogger{}, + refID: c.RefID, + apiKey: c.APIKey, + client: sfc.New(sfc.WithSecurity(c.APIKey), sfc.WithClient(httpClient), sfc.WithTimeout(60*time.Second)), + baseURL: "https://api.sfcompute.com", + httpClient: httpClient, + location: location, + logger: &v1.NoopLogger{}, } for _, opt := range opts { diff --git a/v1/providers/sfcompute/instance.go b/v1/providers/sfcompute/instance.go index dfaf2447..520ff5cd 100644 --- a/v1/providers/sfcompute/instance.go +++ b/v1/providers/sfcompute/instance.go @@ -11,59 +11,152 @@ import ( "github.com/alecthomas/units" "github.com/brevdev/cloud/internal/errors" v1 "github.com/brevdev/cloud/v1" - sfcnodes "github.com/sfcompute/nodes-go" - "github.com/sfcompute/nodes-go/packages/param" + sfc "github.com/sfcompute/sfc-go" + "github.com/sfcompute/sfc-go/models/components" + "github.com/sfcompute/sfc-go/models/operations" + "github.com/sfcompute/sfc-go/optionalnullable" ) const ( - maxPricePerNodeHour = 1600 - defaultPort = 2222 - defaultSSHUsername = "ubuntu" - vmStatusRunning = "running" + defaultSSHPort = 22 + defaultSSHUsername = "root" + defaultImageName = "ubuntu-22.04.5-cuda-12.7" + defaultManagedWindowMinutes = 60 + defaultMinSellPriceDollarsPerNodeHr = "0.010000" + + tagManagedBy = "brev-managed-by" + tagManagedByValue = "brev-cloud-sfcompute" + tagCloudCredRefID = "brev-cloud-cred-ref-id" + tagStage = "brev-stage" + tagRefID = "brev-ref-id" + tagInstanceName = "brev-instance-name" + tagLocation = "brev-location" + tagInstanceType = "brev-instance-type" ) func (c *SFCClient) CreateInstance(ctx context.Context, attrs v1.CreateInstanceAttrs) (*v1.Instance, error) { - // Get the zone for the location (do not include unavailable zones) - zone, err := c.getZone(ctx, attrs.Location, false) + if existing, err := c.findManagedInstanceByRefID(ctx, attrs.RefID); err == nil && existing != nil { + return existing, nil + } + + location := attrs.Location + if location == "" { + location = c.location + } + if location == "" { + return nil, errors.WrapAndTrace(fmt.Errorf("location is required")) + } + + zone, err := c.getZone(ctx, location, false) + if err != nil { + return nil, errors.WrapAndTrace(err) + } + + selectedType, err := getInstanceTypeForZone(*zone) + if err != nil { + return nil, errors.WrapAndTrace(err) + } + if attrs.InstanceType != "" && attrs.InstanceType != selectedType.Type { + return nil, errors.WrapAndTrace(fmt.Errorf("instance type %q is not available in zone %q", attrs.InstanceType, zone.Name)) + } + + workspace, err := c.getDefaultWorkspace(ctx) + if err != nil { + return nil, errors.WrapAndTrace(err) + } + + imageID, err := c.resolveImageID(ctx, attrs.ImageID) if err != nil { return nil, errors.WrapAndTrace(err) } - // Pack cloud cred ref ID, brev stage, instance ref ID, and name into the SFC node name. - // SFC has no tags API, so the node name is the only place to persist this metadata. stage := getStageFromTags(attrs.Tags) - name := brevDataToSFCName(c.refID, stage, attrs.RefID, attrs.Name) - - // Create the node - resp, err := c.client.Nodes.New(ctx, sfcnodes.NodeNewParams{ - CreateNodesRequest: sfcnodes.CreateNodesRequestParam{ - DesiredCount: 1, - MaxPricePerNodeHour: maxPricePerNodeHour, - Zone: zone.Name, - Names: []string{name}, - CloudInitUserData: param.Opt[string]{Value: sshKeyCloudInit(attrs.PublicKey)}, - }, - }) + nodeName := brevDataToSFCName(c.refID, stage, attrs.RefID, attrs.Name) + nodeTags := makeManagedNodeTags(c.refID, stage, attrs, location, selectedType.Type) + resourceTags := makeManagedResourceTags(c.refID, stage, attrs.RefID, attrs.Name, location, selectedType.Type) + + capacityName := makeManagedPoolName("brev-cap", c.refID, stage, location) + procurementName := makeManagedPoolName("brev-proc", c.refID, stage, location) + + var createdCapacityID string + var createdProcurementID string + var createdNodeID string + cleanup := true + defer func() { + if cleanup { + c.cleanupFailedInstanceCreate(ctx, createdNodeID, createdProcurementID, createdCapacityID) + } + }() + + capacityID, procurementID, createdCapacity, createdProcurement, err := c.ensureManagedCapacityAndProcurement( + ctx, + workspace, + zone.Name, + stage, + selectedType.Type, + resourceTags, + capacityName, + procurementName, + selectedType.BasePrice.Number(), + ) + if err != nil { + return nil, errors.WrapAndTrace(err) + } + if createdCapacity { + createdCapacityID = capacityID + } + if createdProcurement { + createdProcurementID = procurementID + } + + nodeReq := components.CreateNodeRequest{ + Name: optionalnullable.From(&nodeName), + Capacity: capacityID, + Image: imageID, + Tags: optionalnullable.From(&nodeTags), + } + if userData := cloudInitUserDataForCreate(attrs); userData != nil { + nodeReq.CloudInitUserData = userData + } + + nodeResp, err := c.client.Nodes.Create(ctx, nodeReq) if err != nil { return nil, errors.WrapAndTrace(err) } - if len(resp.Data) == 0 { - return nil, errors.WrapAndTrace(fmt.Errorf("no nodes returned")) + node := nodeResp.GetNodeResponse() + if node == nil { + return nil, errors.WrapAndTrace(fmt.Errorf("node response missing node")) } - node := resp.Data[0] + createdNodeID = node.ID - // Get the instance instance, err := c.GetInstance(ctx, v1.CloudProviderInstanceID(node.ID)) if err != nil { return nil, errors.WrapAndTrace(err) } + cleanup = false return instance, nil } -func sshKeyCloudInit(sshKey string) string { - script := fmt.Sprintf("#cloud-config\nssh_authorized_keys:\n - %s", sshKey) - return base64.StdEncoding.EncodeToString([]byte(script)) +func cloudInitUserDataForCreate(attrs v1.CreateInstanceAttrs) *string { + if attrs.UserDataBase64 != "" { + return &attrs.UserDataBase64 + } + if attrs.PublicKey == "" { + return nil + } + + script := fmt.Sprintf(`#!/bin/bash +set -e +mkdir -p /root/.ssh +chmod 700 /root/.ssh +cat >>/root/.ssh/authorized_keys <<'EOF' +%s +EOF +chmod 600 /root/.ssh/authorized_keys +`, strings.TrimSpace(attrs.PublicKey)) + encoded := base64.StdEncoding.EncodeToString([]byte(script)) + return &encoded } func (c *SFCClient) GetInstance(ctx context.Context, id v1.CloudProviderInstanceID) (*v1.Instance, error) { @@ -72,19 +165,16 @@ func (c *SFCClient) GetInstance(ctx context.Context, id v1.CloudProviderInstance v1.LogField("location", c.location), ) - // Get the node from the API - node, err := c.client.Nodes.Get(ctx, string(id)) + resp, err := c.client.Nodes.Fetch(ctx, string(id), nil) if err != nil { return nil, errors.WrapAndTrace(err) } - - // Get the zone for the location (include unavailable zones, in case the zone is not available but the node is still running) - zone, err := c.getZone(ctx, node.Zone, true) - if err != nil { - return nil, errors.WrapAndTrace(err) + node := resp.GetNodeResponse() + if node == nil { + return nil, errors.WrapAndTrace(fmt.Errorf("node response missing node")) } - nodeInfo, err := c.sfcNodeInfoFromNode(ctx, node, zone) + nodeInfo, err := c.sfcNodeInfoFromNode(ctx, node) if err != nil { return nil, errors.WrapAndTrace(err) } @@ -102,99 +192,72 @@ func (c *SFCClient) GetInstance(ctx context.Context, id v1.CloudProviderInstance return instance, nil } -func (c *SFCClient) getZone(ctx context.Context, location string, includeUnavailable bool) (*sfcnodes.ZoneListResponseData, error) { - // Fetch the zones to ensure the location is valid - zones, err := c.getZones(ctx, includeUnavailable) - if err != nil { - return nil, errors.WrapAndTrace(err) - } - if len(zones) == 0 { - return nil, errors.WrapAndTrace(fmt.Errorf("no zones available")) - } - - // Find the zone that matches the location - var zone *sfcnodes.ZoneListResponseData - for _, z := range zones { - if z.Name == location { - zone = &z - break - } - } - if zone == nil { - return nil, errors.WrapAndTrace(fmt.Errorf("zone not found in location %s", location)) - } - - return zone, nil -} - func (c *SFCClient) ListInstances(ctx context.Context, args v1.ListInstancesArgs) ([]v1.Instance, error) { c.logger.Debug(ctx, "sfc: ListInstances start", v1.LogField("location", c.location), v1.LogField("args", fmt.Sprintf("%+v", args)), ) - resp, err := c.client.Nodes.List(ctx, sfcnodes.NodeListParams{}) + req := operations.ListNodesRequest{ + Limit: sfc.Int64(200), + } + if len(args.InstanceIDs) > 0 { + req.ID = make([]string, 0, len(args.InstanceIDs)) + for _, id := range args.InstanceIDs { + req.ID = append(req.ID, string(id)) + } + } + + resp, err := c.client.Nodes.List(ctx, req) if err != nil { return nil, errors.WrapAndTrace(err) } - c.logger.Debug(ctx, "sfc: ListInstances nodes list", - v1.LogField("node count", len(resp.Data)), - ) + var instances []v1.Instance + for resp != nil { + list := resp.GetListNodesResponse() + if list == nil { + break + } - zoneCache := make(map[string]*sfcnodes.ZoneListResponseData) + for _, node := range list.Data { + nodeInfo, err := c.sfcNodeInfoFromNode(ctx, &node) + if err != nil { + c.logger.Error(ctx, err, + v1.LogField("msg", "sfc: ListInstances skipping node due to error"), + v1.LogField("nodeID", node.ID), + v1.LogField("nodeName", node.Name), + ) + continue + } - var instances []v1.Instance - for _, node := range resp.Data { - // Get the zone for the node, checking the cache first - zone, ok := zoneCache[node.Zone] - if !ok { - z, err := c.getZone(ctx, node.Zone, true) + inst, err := c.sfcNodeToBrevInstance(*nodeInfo) if err != nil { - return nil, errors.WrapAndTrace(err) + c.logger.Error(ctx, err, + v1.LogField("msg", "sfc: ListInstances skipping node due to conversion error"), + v1.LogField("nodeID", node.ID), + v1.LogField("nodeName", node.Name), + ) + continue } - zoneCache[node.Zone] = z - zone = z - } - // Filter by locations - if args.Locations != nil && !args.Locations.IsAllowed(zone.Name) { - c.logger.Debug(ctx, "sfc: ListInstances node filtered out by location", - v1.LogField("nodeID", node.ID), - v1.LogField("location", zone.Name), - ) - continue - } + if args.Locations != nil && !args.Locations.IsAllowed(inst.Location) { + continue + } + if len(args.TagFilters) > 0 && !matchesTagFilters(inst.Tags, args.TagFilters) { + continue + } - // Filter by instance IDs - if len(args.InstanceIDs) > 0 && !slices.Contains(args.InstanceIDs, v1.CloudProviderInstanceID(node.ID)) { - c.logger.Debug(ctx, "sfc: ListInstances node filtered out by instance ID", - v1.LogField("nodeID", node.ID), - v1.LogField("instanceID", v1.CloudProviderInstanceID(node.ID)), - ) - continue + instances = append(instances, *inst) } - nodeInfo, err := c.sfcNodeInfoFromNodeListResponseData(ctx, &node, zone) - if err != nil { - c.logger.Error(ctx, err, - v1.LogField("msg", "sfc: ListInstances skipping node due to error"), - v1.LogField("nodeID", node.ID), - v1.LogField("nodeName", node.Name), - ) - continue + if !list.HasMore || resp.Next == nil { + break } - - inst, err := c.sfcNodeToBrevInstance(*nodeInfo) + resp, err = resp.Next() if err != nil { - c.logger.Error(ctx, err, - v1.LogField("msg", "sfc: ListInstances skipping node due to conversion error"), - v1.LogField("nodeID", node.ID), - v1.LogField("nodeName", node.Name), - ) - continue + return nil, errors.WrapAndTrace(err) } - instances = append(instances, *inst) } c.logger.Debug(ctx, "sfc: ListInstances end", @@ -209,7 +272,7 @@ func (c *SFCClient) TerminateInstance(ctx context.Context, id v1.CloudProviderIn v1.LogField("instanceID", id), ) - _, err := c.client.Nodes.Release(ctx, string(id)) + _, err := c.client.Nodes.TerminateNode(ctx, string(id)) if err != nil { return errors.WrapAndTrace(err) } @@ -222,29 +285,22 @@ func (c *SFCClient) TerminateInstance(ctx context.Context, id v1.CloudProviderIn } type sfcNodeInfo struct { - id string - name string - createdAt time.Time - status v1.LifecycleStatus - gpuType string - sshUsername string - sshHostname string - zone *sfcnodes.ZoneListResponseData + id string + name string + refID string + cloudCredRefID string + location string + instanceType string + createdAt time.Time + status v1.LifecycleStatus + sshUsername string + sshHostname string + sshPort int + tags v1.Tags } func (c *SFCClient) sfcNodeToBrevInstance(node sfcNodeInfo) (*v1.Instance, error) { - // Parse cloud cred ref ID, brev stage, instance ref ID, and name from the node name. - // Old-format names (refID_name) return empty cloudCredRefID — fall back to c.refID. - cloudCredRefID, _, refID, name, err := sfcNameToBrevData(node.name) - if err != nil { - return nil, errors.WrapAndTrace(err) - } - if cloudCredRefID == "" { - cloudCredRefID = c.refID - } - - // Get the instance type for the zone - instanceType, err := getInstanceTypeForZone(*node.zone) + instanceType, err := getInstanceTypeForLocationAndType(node.location, node.instanceType) if err != nil { return nil, errors.WrapAndTrace(err) } @@ -255,154 +311,355 @@ func (c *SFCClient) sfcNodeToBrevInstance(node sfcNodeInfo) (*v1.Instance, error } diskSize := units.Base2Bytes(diskSizeInt64 * int64(units.Gibibyte)) - // Create the instance inst := &v1.Instance{ - Name: name, + Name: node.name, CloudID: v1.CloudProviderInstanceID(node.id), - RefID: refID, + RefID: node.refID, PublicDNS: node.sshHostname, PublicIP: node.sshHostname, SSHUser: node.sshUsername, - SSHPort: defaultPort, + SSHPort: node.sshPort, CreatedAt: node.createdAt, DiskSize: diskSize, - DiskSizeBytes: instanceType.SupportedStorage[0].SizeBytes, // TODO: this should be pulled from the node itself + DiskSizeBytes: instanceType.SupportedStorage[0].SizeBytes, Status: v1.Status{ LifecycleStatus: node.status, }, InstanceTypeID: instanceType.ID, InstanceType: instanceType.Type, - Location: node.zone.Name, + Location: node.location, Spot: false, Stoppable: false, Rebootable: false, - CloudCredRefID: cloudCredRefID, + CloudCredRefID: node.cloudCredRefID, + Tags: node.tags, } return inst, nil } -func (c *SFCClient) sfcNodeInfoFromNode(ctx context.Context, node *sfcnodes.Node, zone *sfcnodes.ZoneListResponseData) (*sfcNodeInfo, error) { - nodeStatus := sfcStatusToLifecycleStatus(fmt.Sprint(node.Status)) - - // Check node-level status first before inspecting individual VMs. - // A node in a terminal state (e.g. "released") may still have VMs reporting as "Running" - // because SFC keeps the VM alive until the end of its allotted time window. The node status - // is the source of truth — if the node is released/destroyed/deleted, the instance is - // terminated; if the node is failed, the instance is failed. VM status should not be - // consulted in either case. - // Additionally, terminal nodes can accumulate multiple VM records (previous + current), - // which would otherwise cause a "multiple VMs found" error and break ListInstances entirely. - if isTerminalNodeStatus(fmt.Sprint(node.Status)) { - if nodeStatus == v1.LifecycleStatusFailed { - for _, vm := range node.VMs.Data { - if strings.ToLower(vm.Status) == vmStatusRunning { - c.logger.Warn(ctx, "sfc: node is failed but VM is still running", - v1.LogField("node_id", node.ID), - v1.LogField("node_status", fmt.Sprint(node.Status)), - v1.LogField("vm_id", vm.ID), - v1.LogField("vm_status", vm.Status), - ) - } - } - } - return &sfcNodeInfo{ - id: node.ID, - name: node.Name, - createdAt: time.Unix(node.CreatedAt, 0), - status: nodeStatus, - gpuType: string(node.GPUType), - sshUsername: defaultSSHUsername, - sshHostname: "", - zone: zone, - }, nil +func (c *SFCClient) sfcNodeInfoFromNode(ctx context.Context, node *components.NodeResponse) (*sfcNodeInfo, error) { + tags := optionalTagsToMap(node.Tags) + + cloudCredRefID, refID, name, err := c.instanceMetadataFromNode(node.Name, tags) + if err != nil { + return nil, err } - sshHostname, err := c.sshHostnameFromVMs(ctx, node.VMs.Data) + location, err := c.locationFromNode(node, tags) if err != nil { - return nil, errors.WrapAndTrace(err) + return nil, err + } + + instanceType, err := c.instanceTypeFromNode(ctx, node, tags, location) + if err != nil { + return nil, err + } + + status := sfcStatusToLifecycleStatus(string(node.Status)) + sshHostname := "" + sshPort := defaultSSHPort + if status == v1.LifecycleStatusRunning { + hostname, port, ok := c.getSSHInfoForNode(ctx, node.ID) + if ok { + sshHostname = hostname + sshPort = port + } else { + // The v2 API reports nodes as running before the VM has fully booted. + // Treat the instance as pending until SSH information is available. + status = v1.LifecycleStatusPending + } } return &sfcNodeInfo{ - id: node.ID, - name: node.Name, - createdAt: time.Unix(node.CreatedAt, 0), - status: nodeStatus, - gpuType: string(node.GPUType), - sshUsername: defaultSSHUsername, - sshHostname: sshHostname, - zone: zone, + id: node.ID, + name: name, + refID: refID, + cloudCredRefID: cloudCredRefID, + location: location, + instanceType: instanceType, + createdAt: time.Unix(node.CreatedAt, 0), + status: status, + sshUsername: defaultSSHUsername, + sshHostname: sshHostname, + sshPort: sshPort, + tags: tags, }, nil } -func (c *SFCClient) sfcNodeInfoFromNodeListResponseData(ctx context.Context, node *sfcnodes.ListResponseNodeData, zone *sfcnodes.ZoneListResponseData) (*sfcNodeInfo, error) { - sfcNode := sfcListResponseNodeDataToNode(node) - return c.sfcNodeInfoFromNode(ctx, sfcNode, zone) -} - -// Convert the sfcnodes.ListResponseNodeData into a node *sfcnodes.Node -- these are fundamentally the same object, but they -// lack a common interface. One type is returned from a single "get" call, the other is the type of each object returned by -// a "list" call. This conversion function allows the rest of our business logic to treat these as the same type. -func sfcListResponseNodeDataToNode(node *sfcnodes.ListResponseNodeData) *sfcnodes.Node { - vms := make([]sfcnodes.NodeVMsData, len(node.VMs.Data)) - for i, vm := range node.VMs.Data { - vms[i] = sfcnodes.NodeVMsData{ //nolint:staticcheck // ok - ID: vm.ID, - CreatedAt: vm.CreatedAt, - EndAt: vm.EndAt, - Object: vm.Object, - StartAt: vm.StartAt, - Status: vm.Status, - UpdatedAt: vm.UpdatedAt, - ImageID: vm.ImageID, - JSON: vm.JSON, - } - } - - return &sfcnodes.Node{ - ID: node.ID, - GPUType: node.GPUType, - Name: node.Name, - NodeType: node.NodeType, - Object: node.Object, - Owner: node.Owner, - Status: node.Status, - CreatedAt: node.CreatedAt, - DeletedAt: node.DeletedAt, - EndAt: node.EndAt, - MaxPricePerNodeHour: node.MaxPricePerNodeHour, - ProcurementID: node.ProcurementID, - StartAt: node.StartAt, - UpdatedAt: node.UpdatedAt, - Zone: node.Zone, - JSON: node.JSON, - VMs: sfcnodes.NodeVMs{ - Data: vms, - Object: node.VMs.Object, - JSON: node.VMs.JSON, +func (c *SFCClient) getSSHInfoForNode(ctx context.Context, nodeID string) (string, int, bool) { + resp, err := c.client.Nodes.GetSSHInfoForNode(ctx, nodeID) + if err != nil { + c.logger.Debug(ctx, "sfc: SSH info unavailable yet", + v1.LogField("nodeID", nodeID), + v1.LogField("error", err.Error()), + ) + return "", 0, false + } + + info := resp.GetNodeSSHInfo() + if info == nil || info.Hostname == "" { + return "", 0, false + } + + port := int(info.Port) + if port == 0 { + port = defaultSSHPort + } + return info.Hostname, port, true +} + +func (c *SFCClient) locationFromNode(node *components.NodeResponse, tags v1.Tags) (string, error) { + if location := tags[tagLocation]; location != "" { + return location, nil + } + if zone, ok := optionalStringValue(node.Zone); ok && zone != "" { + return zone, nil + } + if c.location != "" { + return c.location, nil + } + return "", fmt.Errorf("node %s missing location metadata", node.ID) +} + +func (c *SFCClient) instanceTypeFromNode(ctx context.Context, node *components.NodeResponse, tags v1.Tags, location string) (string, error) { + if instanceType := tags[tagInstanceType]; instanceType != "" { + return instanceType, nil + } + + zoneName := location + if zone, ok := optionalStringValue(node.Zone); ok && zone != "" { + zoneName = zone + } + + zone, err := c.getZone(ctx, zoneName, true) + if err != nil { + return "", err + } + return makeInstanceTypeName(*zone), nil +} + +func (c *SFCClient) instanceMetadataFromNode(nodeName string, tags v1.Tags) (string, string, string, error) { + if tags[tagManagedBy] == tagManagedByValue && tags[tagRefID] != "" { + cloudCredRefID := tags[tagCloudCredRefID] + if cloudCredRefID == "" { + cloudCredRefID = c.refID + } + return cloudCredRefID, tags[tagRefID], tags[tagInstanceName], nil + } + + cloudCredRefID, _, refID, name, err := sfcNameToBrevData(nodeName) + if err != nil { + return "", "", "", err + } + if cloudCredRefID == "" { + cloudCredRefID = c.refID + } + return cloudCredRefID, refID, name, nil +} + +func (c *SFCClient) resolveImageID(ctx context.Context, requested string) (string, error) { + if strings.HasPrefix(requested, "image_") || strings.HasPrefix(requested, "sfc:image:") { + return requested, nil + } + + targetName := requested + if targetName == "" { + targetName = defaultImageName + } + + req := operations.ListImagesRequest{ + Limit: sfc.Int64(200), + } + resp, err := c.client.Images.List(ctx, req) + if err != nil { + return "", err + } + + var fallbackImageID string + for resp != nil { + list := resp.GetListImagesResponse() + if list == nil { + break + } + + for _, image := range list.Data { + if image.UploadStatus != components.ImageUploadStatusCompleted { + continue + } + if fallbackImageID == "" { + fallbackImageID = image.ID + } + if image.ID == requested || image.ResourcePath == requested || image.Name == targetName { + return image.ID, nil + } + } + + if !list.HasMore || resp.Next == nil { + break + } + resp, err = resp.Next() + if err != nil { + return "", err + } + } + + if requested != "" { + return "", fmt.Errorf("image %q not found", requested) + } + if fallbackImageID != "" { + return fallbackImageID, nil + } + return "", fmt.Errorf("no completed SF Compute images available") +} + +func (c *SFCClient) findManagedInstanceByRefID(ctx context.Context, refID string) (*v1.Instance, error) { + if refID == "" { + return nil, nil + } + + resp, err := c.client.Nodes.List(ctx, operations.ListNodesRequest{ + Limit: sfc.Int64(50), + Tag: []string{ + fmt.Sprintf("%s=%s", tagManagedBy, tagManagedByValue), + fmt.Sprintf("%s=%s", tagRefID, refID), }, + }) + if err != nil { + return nil, err + } + for resp != nil { + list := resp.GetListNodesResponse() + if list == nil { + break + } + + for _, node := range list.Data { + nodeInfo, err := c.sfcNodeInfoFromNode(ctx, &node) + if err != nil { + return nil, err + } + if nodeInfo.status == v1.LifecycleStatusTerminated { + continue + } + return c.sfcNodeToBrevInstance(*nodeInfo) + } + + if !list.HasMore || resp.Next == nil { + break + } + resp, err = resp.Next() + if err != nil { + return nil, err + } } + + return nil, nil } -// sfcStatusToLifecycleStatus maps SFC node-level statuses to Brev lifecycle statuses. -// -// SFC node statuses (from the nodes-go SDK): -// - "pending" → node is being provisioned -// - "awaitingcapacity" → node is waiting for capacity (auto-reserved nodes) -// - "running" → node is active with a VM provisioned -// - "released" → node was released via Nodes.Release(). This is a TERMINAL state. -// VMs may still report "Running" underneath until their allotted time ends, -// but the node lease is over. "released" means terminated, NOT terminating. -// - "terminated" → node has been terminated -// - "deleted" → node has been deleted -// - "destroyed" → node has been destroyed -// - "failed" → node provisioning or operation failed -// - "unknown" → unknown status -// -// Note: SFC does NOT have a transitional "releasing" or "terminating" status. -// The Release API transitions a node directly from "running" to "released". +func (c *SFCClient) cleanupFailedInstanceCreate(ctx context.Context, nodeID string, procurementID string, capacityID string) { + if nodeID != "" { + if _, err := c.client.Nodes.TerminateNode(ctx, nodeID); err != nil { + c.logger.Error(ctx, err, v1.LogField("msg", "sfc: failed to clean up node after create failure"), v1.LogField("nodeID", nodeID)) + } + } + if procurementID != "" { + if _, err := c.client.Procurements.Delete(ctx, procurementID); err != nil { + c.logger.Error(ctx, err, v1.LogField("msg", "sfc: failed to clean up procurement after create failure"), v1.LogField("procurementID", procurementID)) + } + } + if capacityID != "" { + if _, err := c.client.Capacities.Delete(ctx, capacityID); err != nil { + c.logger.Error(ctx, err, v1.LogField("msg", "sfc: failed to clean up capacity after create failure"), v1.LogField("capacityID", capacityID)) + } + } +} + +func makeManagedNodeTags(cloudCredRefID string, stage string, attrs v1.CreateInstanceAttrs, location string, instanceType string) map[string]string { + tags := make(map[string]string, len(attrs.Tags)+7) + for k, v := range attrs.Tags { + tags[k] = v + } + + tags[tagManagedBy] = tagManagedByValue + tags[tagCloudCredRefID] = cloudCredRefID + tags[tagStage] = stage + tags[tagRefID] = attrs.RefID + tags[tagInstanceName] = attrs.Name + tags[tagLocation] = location + tags[tagInstanceType] = instanceType + return tags +} + +func makeManagedResourceTags(cloudCredRefID string, stage string, refID string, name string, location string, instanceType string) map[string]string { + return map[string]string{ + tagManagedBy: tagManagedByValue, + tagCloudCredRefID: cloudCredRefID, + tagStage: stage, + tagRefID: refID, + tagInstanceName: name, + tagLocation: location, + tagInstanceType: instanceType, + } +} + +func makeManagedPoolName(prefix string, cloudCredRefID string, stage string, location string) string { + sanitized := sanitizeManagedNamePart(fmt.Sprintf("%s-%s-%s", cloudCredRefID, stage, location)) + return fmt.Sprintf("%s-%s", prefix, sanitized) +} + +func sanitizeManagedNamePart(value string) string { + sanitized := strings.Map(func(r rune) rune { + switch { + case r >= 'a' && r <= 'z': + return r + case r >= 'A' && r <= 'Z': + return r + case r >= '0' && r <= '9': + return r + case r == '-', r == '_', r == '.': + return r + default: + return '-' + } + }, value) + sanitized = strings.TrimLeft(sanitized, "-_.") + if sanitized == "" { + sanitized = "default" + } + return sanitized +} + +func optionalTagsToMap(tags optionalnullable.OptionalNullable[map[string]string]) v1.Tags { + if tags == nil { + return nil + } + value, ok := tags.GetOrZero() + if !ok || tags.IsNull() { + return nil + } + + copied := make(v1.Tags, len(value)) + for k, v := range value { + copied[k] = v + } + return copied +} + +func optionalStringValue(value optionalnullable.OptionalNullable[string]) (string, bool) { + if value == nil { + return "", false + } + s, ok := value.GetOrZero() + if !ok || value.IsNull() { + return "", false + } + return s, true +} + +// sfcStatusToLifecycleStatus maps v2 node statuses to Brev lifecycle statuses. func sfcStatusToLifecycleStatus(status string) v1.LifecycleStatus { switch strings.ToLower(status) { - case "pending", "unspecified", "awaitingcapacity", "unknown": + case "awaiting_allocation", "pending", "unspecified", "unknown": return v1.LifecycleStatusPending case "running": return v1.LifecycleStatusRunning @@ -410,7 +667,7 @@ func sfcStatusToLifecycleStatus(status string) v1.LifecycleStatus { return v1.LifecycleStatusStopped case "terminating": return v1.LifecycleStatusTerminating - case "released", "destroyed", "deleted": + case "terminated", "released", "destroyed", "deleted": return v1.LifecycleStatusTerminated case "nodefailure", "failed": return v1.LifecycleStatusFailed @@ -419,39 +676,17 @@ func sfcStatusToLifecycleStatus(status string) v1.LifecycleStatus { } } -// isTerminalNodeStatus returns true if the SFC node status is a terminal state where -// the node is no longer active. When a node is terminal, its VM statuses should not be -// inspected — they may be stale (e.g. VM still "Running" after the node was released). -func isTerminalNodeStatus(status string) bool { - lifecycleStatus := sfcStatusToLifecycleStatus(status) - return lifecycleStatus == v1.LifecycleStatusTerminated || lifecycleStatus == v1.LifecycleStatusFailed -} - -// sshHostnameFromVMs finds the first running VM and returns its SSH hostname. -// Nodes can accumulate multiple VM records (e.g. awaitingcapacity nodes with previous -// destroyed VMs); only a running VM provides usable SSH info. -func (c *SFCClient) sshHostnameFromVMs(ctx context.Context, vms []sfcnodes.NodeVMsData) (string, error) { - for _, vm := range vms { - if strings.ToLower(vm.Status) == vmStatusRunning { - return c.getSSHHostnameFromVM(ctx, vm.ID, vm.Status) +func matchesTagFilters(instanceTags map[string]string, tagFilters map[string][]string) bool { + for filterKey, acceptableValues := range tagFilters { + instanceValue, hasTag := instanceTags[filterKey] + if !hasTag { + return false + } + if !slices.Contains(acceptableValues, instanceValue) { + return false } } - return "", nil -} - -func (c *SFCClient) getSSHHostnameFromVM(ctx context.Context, vmID string, vmStatus string) (string, error) { - // If the VM is not running, set the SSH username and hostname to empty strings - if strings.ToLower(vmStatus) != vmStatusRunning { - return "", nil - } - - // If the VM is running, get the SSH username and hostname - sshResponse, err := c.client.VMs.SSH(ctx, sfcnodes.VMSSHParams{VMID: vmID}) - if err != nil { - return "", errors.WrapAndTrace(err) - } - - return sshResponse.SSHHostname, nil + return true } // brevDataToSFCName packs cloud credential ref ID, brev stage, instance ref ID, and instance @@ -495,6 +730,133 @@ func getStageFromTags(tags v1.Tags) string { return "unknown" } +func (c *SFCClient) ensureManagedCapacityAndProcurement( + ctx context.Context, + workspace string, + location string, + stage string, + instanceType string, + resourceTags map[string]string, + capacityName string, + procurementName string, + maxBuyPrice string, +) (capacityID string, procurementID string, createdCapacity bool, createdProcurement bool, err error) { + capacityID, err = c.findManagedCapacityID(ctx, stage, location, instanceType) + if err != nil { + return "", "", false, false, err + } + + if capacityID == "" { + capacityResp, err := c.client.Capacities.Create(ctx, components.CreateCapacityRequest{ + Name: optionalnullable.From(&capacityName), + Workspace: workspace, + Zones: []string{location}, + Tags: optionalnullable.From(&resourceTags), + }) + if err != nil { + return "", "", false, false, err + } + capacity := capacityResp.GetCapacityResponse() + if capacity == nil { + return "", "", false, false, fmt.Errorf("capacity response missing capacity") + } + capacityID = capacity.ID + createdCapacity = true + } + + procurementID, err = c.findManagedProcurementID(ctx, capacityID) + if err != nil { + return "", "", createdCapacity, false, err + } + if procurementID != "" { + return capacityID, procurementID, createdCapacity, false, nil + } + + enabled := true + procurementResp, err := c.client.Procurements.Create(ctx, components.CreateProcurementRequest{ + Name: optionalnullable.From(&procurementName), + Target: components.CreateProcurementTargetNodeCountTag("node_count"), + Capacity: capacityID, + MinSellPriceDollarsPerNodeHour: defaultMinSellPriceDollarsPerNodeHr, + MaxBuyPriceDollarsPerNodeHour: maxBuyPrice, + ManagedWindowMinutes: defaultManagedWindowMinutes, + Enabled: &enabled, + }) + if err != nil { + return "", "", createdCapacity, false, err + } + procurement := procurementResp.GetProcurementResponse() + if procurement == nil { + return "", "", createdCapacity, false, fmt.Errorf("procurement response missing procurement") + } + + return capacityID, procurement.ID, createdCapacity, true, nil +} + +func (c *SFCClient) findManagedCapacityID(ctx context.Context, stage string, location string, instanceType string) (string, error) { + resp, err := c.client.Capacities.List(ctx, operations.ListCapacitiesRequest{ + Limit: sfc.Int64(100), + Tag: []string{ + fmt.Sprintf("%s=%s", tagManagedBy, tagManagedByValue), + fmt.Sprintf("%s=%s", tagCloudCredRefID, c.refID), + fmt.Sprintf("%s=%s", tagStage, stage), + fmt.Sprintf("%s=%s", tagLocation, location), + fmt.Sprintf("%s=%s", tagInstanceType, instanceType), + }, + }) + if err != nil { + return "", err + } + + for resp != nil { + list := resp.GetListCapacitiesResponse() + if list == nil { + break + } + if len(list.Data) > 0 { + return list.Data[0].ID, nil + } + if !list.HasMore || resp.Next == nil { + break + } + resp, err = resp.Next() + if err != nil { + return "", err + } + } + + return "", nil +} + +func (c *SFCClient) findManagedProcurementID(ctx context.Context, capacityID string) (string, error) { + resp, err := c.client.Procurements.List(ctx, operations.ListProcurementsRequest{ + Capacity: &capacityID, + Limit: sfc.Int64(100), + }) + if err != nil { + return "", err + } + + for resp != nil { + list := resp.GetListProcurementsResponse() + if list == nil { + break + } + if len(list.Data) > 0 { + return list.Data[0].ID, nil + } + if !list.HasMore || resp.Next == nil { + break + } + resp, err = resp.Next() + if err != nil { + return "", err + } + } + + return "", nil +} + // Optional if supported: func (c *SFCClient) RebootInstance(_ context.Context, _ v1.CloudProviderInstanceID) error { return v1.ErrNotImplemented diff --git a/v1/providers/sfcompute/instance_test.go b/v1/providers/sfcompute/instance_test.go new file mode 100644 index 00000000..20fc64c6 --- /dev/null +++ b/v1/providers/sfcompute/instance_test.go @@ -0,0 +1,63 @@ +package v1 + +import ( + "encoding/base64" + "strings" + "testing" + + v1 "github.com/brevdev/cloud/v1" + "github.com/stretchr/testify/require" +) + +func TestInstanceMetadataFromNodePrefersManagedTags(t *testing.T) { + t.Parallel() + + client := &SFCClient{refID: "fallback-ref"} + tags := v1.Tags{ + tagManagedBy: tagManagedByValue, + tagCloudCredRefID: "cloud-cred-ref", + tagRefID: "instance-ref", + tagInstanceName: "instance-name", + } + + cloudCredRefID, refID, name, err := client.instanceMetadataFromNode("legacy_name", tags) + require.NoError(t, err) + require.Equal(t, "cloud-cred-ref", cloudCredRefID) + require.Equal(t, "instance-ref", refID) + require.Equal(t, "instance-name", name) +} + +func TestSFCStatusToLifecycleStatus(t *testing.T) { + t.Parallel() + + cases := map[string]v1.LifecycleStatus{ + "awaiting_allocation": v1.LifecycleStatusPending, + "running": v1.LifecycleStatusRunning, + "terminated": v1.LifecycleStatusTerminated, + "failed": v1.LifecycleStatusFailed, + } + + for input, expected := range cases { + t.Run(input, func(t *testing.T) { + t.Parallel() + require.Equal(t, expected, sfcStatusToLifecycleStatus(input)) + }) + } +} + +func TestCloudInitUserDataForCreateUsesShellScript(t *testing.T) { + t.Parallel() + + userData := cloudInitUserDataForCreate(v1.CreateInstanceAttrs{ + PublicKey: "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAITest user@example.com", + }) + require.NotNil(t, userData) + + decoded, err := base64.StdEncoding.DecodeString(*userData) + require.NoError(t, err) + + script := string(decoded) + require.True(t, strings.HasPrefix(script, "#!/bin/bash")) + require.Contains(t, script, "/root/.ssh/authorized_keys") + require.Contains(t, script, "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAITest user@example.com") +} diff --git a/v1/providers/sfcompute/instancetype.go b/v1/providers/sfcompute/instancetype.go index 34b7f3e5..6d7b6745 100644 --- a/v1/providers/sfcompute/instancetype.go +++ b/v1/providers/sfcompute/instancetype.go @@ -8,7 +8,6 @@ import ( "github.com/alecthomas/units" "github.com/bojanz/currency" - sfcnodes "github.com/sfcompute/nodes-go" v1 "github.com/brevdev/cloud/v1" ) @@ -42,9 +41,7 @@ func (c *SFCClient) GetInstanceTypes(ctx context.Context, args v1.GetInstanceTyp v1.LogField("args", fmt.Sprintf("%+v", args)), ) - // Fetch all available zones - includeUnavailable := false - zones, err := c.getZones(ctx, includeUnavailable) + zones, err := c.getZones(ctx, false) if err != nil { return nil, err } @@ -86,8 +83,8 @@ func (c *SFCClient) GetInstanceTypes(ctx context.Context, args v1.GetInstanceTyp return instanceTypes, nil } -func getInstanceTypeForZone(zone sfcnodes.ZoneListResponseData) (*v1.InstanceType, error) { - gpuType := strings.ToLower(string(zone.HardwareType)) +func getInstanceTypeForZone(zone sfcZone) (*v1.InstanceType, error) { + gpuType := strings.ToLower(zone.HardwareType) gpuMetadata, err := getInstanceTypeMetadata(gpuType) if err != nil { @@ -123,6 +120,7 @@ func getInstanceTypeForZone(zone sfcnodes.ZoneListResponseData) (*v1.InstanceTyp Rebootable: false, IsContainer: false, Provider: CloudProviderID, + Cloud: CloudProviderID, BasePrice: &gpuMetadata.price, EstimatedDeployTime: &gpuMetadata.estimatedDeployTime, SupportedGPUs: []v1.GPU{{ @@ -152,12 +150,12 @@ func gpuTypeIsAllowed(gpuType string) bool { return gpuType == gpuTypeH100 || gpuType == gpuTypeH200 } -func makeInstanceTypeName(zone sfcnodes.ZoneListResponseData) string { +func makeInstanceTypeName(zone sfcZone) string { interconnect := "" if strings.ToLower(zone.InterconnectType) == interconnectInfiniband { interconnect = ".ib" } - return fmt.Sprintf("%s%s", strings.ToLower(string(zone.HardwareType)), interconnect) + return fmt.Sprintf("%s%s", strings.ToLower(zone.HardwareType), interconnect) } func (c *SFCClient) GetLocations(ctx context.Context, args v1.GetLocationsArgs) ([]v1.Location, error) { @@ -175,42 +173,9 @@ func (c *SFCClient) GetLocations(ctx context.Context, args v1.GetLocationsArgs) return locations, nil } -func (c *SFCClient) getZones(ctx context.Context, includeUnavailable bool) ([]sfcnodes.ZoneListResponseData, error) { - // Fetch the zones from the API - resp, err := c.client.Zones.List(ctx) - if err != nil { - return nil, err - } - - // If there are no zones, return an empty list - if resp == nil || len(resp.Data) == 0 { - return []sfcnodes.ZoneListResponseData{}, nil - } - - zones := make([]sfcnodes.ZoneListResponseData, 0, len(resp.Data)) - for _, zone := range resp.Data { - // If there is no current available capacity, skip it. - // AvailableCapacity contains time-windowed rectangles; we must check - // that at least one rectangle covers the current time with quantity > 0. - if !hasCurrentCapacity(zone.AvailableCapacity) && !includeUnavailable { - continue - } - - // If the delivery type is not VM, skip it - if zone.DeliveryType != deliveryTypeVM { - continue - } - - // Add the zone to the list - zones = append(zones, zone) - } - - return zones, nil -} - // hasCurrentCapacity returns true if any availability rectangle covers the // current time and has quantity > 0. -func hasCurrentCapacity(capacity []sfcnodes.ZoneListResponseDataAvailableCapacity) bool { +func hasCurrentCapacity(capacity []sfcZoneAvailability) bool { now := time.Now().Unix() for _, c := range capacity { if c.StartTimestamp <= now && now < c.EndTimestamp && c.Quantity > 0 { @@ -220,14 +185,98 @@ func hasCurrentCapacity(capacity []sfcnodes.ZoneListResponseDataAvailableCapacit return false } -func zoneToLocation(zone sfcnodes.ZoneListResponseData) v1.Location { +func zoneToLocation(zone sfcZone) v1.Location { return v1.Location{ Name: zone.Name, - Description: fmt.Sprintf("sfc_%s_%s", zone.Name, string(zone.HardwareType)), + Description: fmt.Sprintf("sfc_%s_%s", zone.Name, zone.HardwareType), Available: true, } } +func (c *SFCClient) getZone(ctx context.Context, location string, includeUnavailable bool) (*sfcZone, error) { + zones, err := c.getZones(ctx, includeUnavailable) + if err != nil { + return nil, err + } + if len(zones) == 0 { + return nil, fmt.Errorf("no zones available") + } + + for _, zone := range zones { + if zone.Name == location { + matched := zone + return &matched, nil + } + } + + return nil, fmt.Errorf("zone not found in location %s", location) +} + +func getInstanceTypeForLocationAndType(location string, instanceTypeName string) (*v1.InstanceType, error) { + gpuType := strings.ToLower(instanceTypeName) + if idx := strings.Index(gpuType, "."); idx > 0 { + gpuType = gpuType[:idx] + } + + gpuMetadata, err := getInstanceTypeMetadata(gpuType) + if err != nil { + return nil, err + } + + ramInt64, err := gpuMetadata.memoryBytes.ByteCountInUnitInt64(v1.Gibibyte) + if err != nil { + return nil, err + } + ram := units.Base2Bytes(ramInt64 * int64(units.Gibibyte)) + + memoryInt64, err := gpuMetadata.gpuVRAM.ByteCountInUnitInt64(v1.Gibibyte) + if err != nil { + return nil, err + } + memory := units.Base2Bytes(memoryInt64 * int64(units.Gibibyte)) + + diskSizeInt64, err := gpuMetadata.diskBytes.ByteCountInUnitInt64(v1.Gibibyte) + if err != nil { + return nil, err + } + diskSize := units.Base2Bytes(diskSizeInt64 * int64(units.Gibibyte)) + + instanceType := v1.InstanceType{ + IsAvailable: true, + Type: instanceTypeName, + Memory: ram, + MemoryBytes: gpuMetadata.memoryBytes, + VCPU: gpuMetadata.vcpu, + Location: location, + Stoppable: false, + Rebootable: false, + IsContainer: false, + Provider: CloudProviderID, + Cloud: CloudProviderID, + BasePrice: &gpuMetadata.price, + EstimatedDeployTime: &gpuMetadata.estimatedDeployTime, + SupportedGPUs: []v1.GPU{{ + Count: gpuMetadata.gpuCount, + Type: strings.ToUpper(gpuType), + Manufacturer: gpuMetadata.gpuManufacturer, + Name: strings.ToUpper(gpuType), + Memory: memory, + MemoryBytes: gpuMetadata.gpuVRAM, + NetworkDetails: gpuMetadata.formFactor, + }}, + SupportedStorage: []v1.Storage{{ + Type: diskTypeSSD, + Count: 1, + Size: diskSize, + SizeBytes: gpuMetadata.diskBytes, + }}, + SupportedArchitectures: []v1.Architecture{gpuMetadata.architecture}, + } + + instanceType.ID = v1.MakeGenericInstanceTypeID(instanceType) + return &instanceType, nil +} + // sfcInstanceTypeMetadata is a struct that contains the metadata for a given instance type. // These values are not currently provided by the SFCompute API, so we need to hardcode them. type sfcInstanceTypeMetadata struct { diff --git a/v1/providers/sfcompute/raw_api.go b/v1/providers/sfcompute/raw_api.go new file mode 100644 index 00000000..1dc5fffb --- /dev/null +++ b/v1/providers/sfcompute/raw_api.go @@ -0,0 +1,123 @@ +package v1 + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" +) + +type sfcAccountMeResponse struct { + ID string `json:"id"` +} + +type sfcZoneListResponse struct { + Data []sfcZone `json:"data"` +} + +type sfcZone struct { + Name string `json:"name"` + Region string `json:"region"` + HardwareType string `json:"hardware_type"` + InterconnectType string `json:"interconnect_type"` + DeliveryType string `json:"delivery_type"` + AvailableCapacity []sfcZoneAvailability `json:"available_capacity"` +} + +type sfcZoneAvailability struct { + StartTimestamp int64 `json:"start_timestamp"` + EndTimestamp int64 `json:"end_timestamp"` + Quantity int64 `json:"quantity"` +} + +func (c *SFCClient) doJSON(ctx context.Context, method string, path string, body any, out any) error { + fullURL := strings.TrimRight(c.baseURL, "/") + path + + var requestBody io.Reader + if body != nil { + payload, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("marshal request body: %w", err) + } + requestBody = bytes.NewReader(payload) + } + + req, err := http.NewRequestWithContext(ctx, method, fullURL, requestBody) + if err != nil { + return fmt.Errorf("build request: %w", err) + } + + req.Header.Set("Authorization", "Bearer "+c.apiKey) + req.Header.Set("Accept", "application/json") + if body != nil { + req.Header.Set("Content-Type", "application/json") + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("execute request: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + rawBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("read response body: %w", err) + } + + if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { + return fmt.Errorf("sfc api %s %s failed with status %d: %s", method, path, resp.StatusCode, strings.TrimSpace(string(rawBody))) + } + + if out == nil || len(rawBody) == 0 { + return nil + } + + if err := json.Unmarshal(rawBody, out); err != nil { + return fmt.Errorf("decode response body: %w", err) + } + + return nil +} + +func (c *SFCClient) getDefaultWorkspace(ctx context.Context) (string, error) { + c.workspaceMu.Lock() + defer c.workspaceMu.Unlock() + + if c.defaultWorkspace != "" { + return c.defaultWorkspace, nil + } + + var account sfcAccountMeResponse + if err := c.doJSON(ctx, http.MethodGet, "/v1/account/me", nil, &account); err != nil { + return "", err + } + if account.ID == "" { + return "", fmt.Errorf("account response missing id") + } + + c.defaultWorkspace = fmt.Sprintf("sfc:workspace:%s:default", account.ID) + return c.defaultWorkspace, nil +} + +func (c *SFCClient) getZones(ctx context.Context, includeUnavailable bool) ([]sfcZone, error) { + var resp sfcZoneListResponse + if err := c.doJSON(ctx, http.MethodGet, "/v0/zones", nil, &resp); err != nil { + return nil, err + } + + zones := make([]sfcZone, 0, len(resp.Data)) + for _, zone := range resp.Data { + if !hasCurrentCapacity(zone.AvailableCapacity) && !includeUnavailable { + continue + } + if strings.ToUpper(zone.DeliveryType) != deliveryTypeVM { + continue + } + zones = append(zones, zone) + } + + return zones, nil +} From d3b41e47630afed381ba1d3715ab0976f705671e Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Fri, 17 Apr 2026 21:11:46 +0000 Subject: [PATCH 2/2] remove sfcompute create cleanup side effects Co-authored-by: Brian Lechthaler --- v1/providers/sfcompute/instance.go | 38 +----------------------------- 1 file changed, 1 insertion(+), 37 deletions(-) diff --git a/v1/providers/sfcompute/instance.go b/v1/providers/sfcompute/instance.go index 520ff5cd..05b79886 100644 --- a/v1/providers/sfcompute/instance.go +++ b/v1/providers/sfcompute/instance.go @@ -78,17 +78,7 @@ func (c *SFCClient) CreateInstance(ctx context.Context, attrs v1.CreateInstanceA capacityName := makeManagedPoolName("brev-cap", c.refID, stage, location) procurementName := makeManagedPoolName("brev-proc", c.refID, stage, location) - var createdCapacityID string - var createdProcurementID string - var createdNodeID string - cleanup := true - defer func() { - if cleanup { - c.cleanupFailedInstanceCreate(ctx, createdNodeID, createdProcurementID, createdCapacityID) - } - }() - - capacityID, procurementID, createdCapacity, createdProcurement, err := c.ensureManagedCapacityAndProcurement( + capacityID, _, _, _, err := c.ensureManagedCapacityAndProcurement( ctx, workspace, zone.Name, @@ -102,12 +92,6 @@ func (c *SFCClient) CreateInstance(ctx context.Context, attrs v1.CreateInstanceA if err != nil { return nil, errors.WrapAndTrace(err) } - if createdCapacity { - createdCapacityID = capacityID - } - if createdProcurement { - createdProcurementID = procurementID - } nodeReq := components.CreateNodeRequest{ Name: optionalnullable.From(&nodeName), @@ -127,14 +111,12 @@ func (c *SFCClient) CreateInstance(ctx context.Context, attrs v1.CreateInstanceA if node == nil { return nil, errors.WrapAndTrace(fmt.Errorf("node response missing node")) } - createdNodeID = node.ID instance, err := c.GetInstance(ctx, v1.CloudProviderInstanceID(node.ID)) if err != nil { return nil, errors.WrapAndTrace(err) } - cleanup = false return instance, nil } @@ -556,24 +538,6 @@ func (c *SFCClient) findManagedInstanceByRefID(ctx context.Context, refID string return nil, nil } -func (c *SFCClient) cleanupFailedInstanceCreate(ctx context.Context, nodeID string, procurementID string, capacityID string) { - if nodeID != "" { - if _, err := c.client.Nodes.TerminateNode(ctx, nodeID); err != nil { - c.logger.Error(ctx, err, v1.LogField("msg", "sfc: failed to clean up node after create failure"), v1.LogField("nodeID", nodeID)) - } - } - if procurementID != "" { - if _, err := c.client.Procurements.Delete(ctx, procurementID); err != nil { - c.logger.Error(ctx, err, v1.LogField("msg", "sfc: failed to clean up procurement after create failure"), v1.LogField("procurementID", procurementID)) - } - } - if capacityID != "" { - if _, err := c.client.Capacities.Delete(ctx, capacityID); err != nil { - c.logger.Error(ctx, err, v1.LogField("msg", "sfc: failed to clean up capacity after create failure"), v1.LogField("capacityID", capacityID)) - } - } -} - func makeManagedNodeTags(cloudCredRefID string, stage string, attrs v1.CreateInstanceAttrs, location string, instanceType string) map[string]string { tags := make(map[string]string, len(attrs.Tags)+7) for k, v := range attrs.Tags {