From c4ac5da1b210cdb7ae5914fe4f5612c236222273 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 18 Jun 2026 11:36:55 +0500 Subject: [PATCH 1/2] Replace lsblk fs detection with blkid --- runner/README.md | 3 +- runner/internal/shim/backends/backends.go | 15 ++ runner/internal/shim/docker.go | 165 -------------- runner/internal/shim/volumes.go | 250 ++++++++++++++++++++++ 4 files changed, 267 insertions(+), 166 deletions(-) create mode 100644 runner/internal/shim/backends/backends.go create mode 100644 runner/internal/shim/volumes.go diff --git a/runner/README.md b/runner/README.md index be91a9b178..056031fe0c 100644 --- a/runner/README.md +++ b/runner/README.md @@ -94,13 +94,14 @@ These are non-exhaustive lists of external dependencies (executables, libraries) * `umount` * `mountpoint` * `lsblk` +* `blkid` * `mkfs.ext4` * (NVIDIA GPU SSH fleet instances only) `nvidia-smi` * (AMD SSH fleet instances only) `docker` (used for `amd-smi` container) * (Intel Gaudi SSH fleet instances only) `hl-smi` * ... -Debian/Ubuntu packages: `mount` (`mount`, `umount`), `util-linux` (`mountpoint`, `lsblk`), `e2fsprogs` (`mkfs.ext4`) +Debian/Ubuntu packages: `mount` (`mount`, `umount`), `util-linux` (`mountpoint`, `lsblk`, `blkid`), `e2fsprogs` (`mkfs.ext4`) ### `dstack-runner` diff --git a/runner/internal/shim/backends/backends.go b/runner/internal/shim/backends/backends.go new file mode 100644 index 0000000000..e99910db1d --- /dev/null +++ b/runner/internal/shim/backends/backends.go @@ -0,0 +1,15 @@ +package backends + +import ( + "fmt" +) + +func GetBackend(backendType string) (Backend, error) { + switch backendType { + case "aws": + return NewAWSBackend(), nil + case "gcp": + return NewGCPBackend(), nil + } + return nil, fmt.Errorf("unknown backend: %q", backendType) +} diff --git a/runner/internal/shim/docker.go b/runner/internal/shim/docker.go index a1318e4243..81381125cf 100644 --- a/runner/internal/shim/docker.go +++ b/runner/internal/shim/docker.go @@ -10,7 +10,6 @@ import ( "fmt" "io" "os" - "os/exec" "os/user" "path/filepath" "strconv" @@ -35,7 +34,6 @@ import ( "github.com/dstackai/dstack/runner/internal/common/gpu" "github.com/dstackai/dstack/runner/internal/common/log" "github.com/dstackai/dstack/runner/internal/common/types" - "github.com/dstackai/dstack/runner/internal/shim/backends" "github.com/dstackai/dstack/runner/internal/shim/host" ) @@ -612,169 +610,6 @@ func (d *DockerRunner) remove(ctx context.Context, task *Task) (err error) { return nil } -func getBackend(backendType string) (backends.Backend, error) { - switch backendType { - case "aws": - return backends.NewAWSBackend(), nil - case "gcp": - return backends.NewGCPBackend(), nil - } - return nil, fmt.Errorf("unknown backend: %q", backendType) -} - -func prepareVolumes(ctx context.Context, taskConfig TaskConfig) error { - for _, volume := range taskConfig.Volumes { - err := formatAndMountVolume(ctx, volume) - if err != nil { - return fmt.Errorf("format and mount volume: %w", err) - } - } - return nil -} - -func unmountVolumes(ctx context.Context, taskConfig TaskConfig) error { - if len(taskConfig.Volumes) == 0 { - return nil - } - log.Debug(ctx, "Unmounting volumes...") - var failed []string - for _, volume := range taskConfig.Volumes { - mountPoint := getVolumeMountPoint(volume.Name) - cmd := exec.CommandContext(ctx, "mountpoint", mountPoint) - if output, err := cmd.CombinedOutput(); err != nil { - log.Info(ctx, "skipping", "mountpoint", mountPoint, "output", output) - continue - } - cmd = exec.CommandContext(ctx, "umount", "-qf", mountPoint) - if output, err := cmd.CombinedOutput(); err != nil { - log.Error(ctx, "failed to unmount", "mountpoint", mountPoint, "output", output) - failed = append(failed, mountPoint) - } else { - log.Debug(ctx, "unmounted", "mountpoint", mountPoint) - } - } - if len(failed) > 0 { - return fmt.Errorf("failed to unmount volume(s): %v", failed) - } - return nil -} - -func formatAndMountVolume(ctx context.Context, volume VolumeInfo) error { - backend, err := getBackend(volume.Backend) - if err != nil { - return fmt.Errorf("get backend: %w", err) - } - deviceName, err := backend.GetRealDeviceName(volume.VolumeId, volume.DeviceName) - if err != nil { - return fmt.Errorf("get real device name: %w", err) - } - fsCreated, err := initFileSystem(ctx, deviceName, !volume.InitFs) - if err != nil { - return fmt.Errorf("init file system: %w", err) - } - // Make FS root directory world-writable (0777) to give any job user - // a permission to create new files - // NOTE: mke2fs (that is, mkfs.ext4) supports `-E root_perms=0777` since 1.47.1: - // https://e2fsprogs.sourceforge.net/e2fsprogs-release.html#1.47.1 - // but, as of 2024-12-04, this version is too new to rely on, for example, - // Ubuntu 24.04 LTS has only 1.47.0 - // 0 means "do not chmod root directory" - var fsRootPerms os.FileMode = 0 - // Change permissions only if the FS was created by us, don't mess with - // user-formatted volumes - if fsCreated { - fsRootPerms = 0o777 - } - err = mountDisk(ctx, deviceName, getVolumeMountPoint(volume.Name), fsRootPerms) - if err != nil { - return fmt.Errorf("mount disk: %w", err) - } - return nil -} - -func getVolumeMountPoint(volumeName string) string { - // Put volumes in dstack-specific dir to avoid clashes with host dirs. - // /mnt/disks is used since on some VM images other places may not be writable (e.g. GCP COS). - return fmt.Sprintf("/mnt/disks/dstack-volumes/%s", volumeName) -} - -func prepareInstanceMountPoints(taskConfig TaskConfig) error { - // If the instance volume directory doesn't exist, create it with world-writable permissions (0777) - // to give any job user a permission to create new files - // If the directory already exists, do nothing, don't mess with already set permissions, especially - // on SSH fleets where permissions are managed by the host admin - for _, mountPoint := range taskConfig.InstanceMounts { - if _, err := os.Stat(mountPoint.InstancePath); errors.Is(err, os.ErrNotExist) { - // All missing parent dirs are created with 0755 permissions - if err = os.MkdirAll(mountPoint.InstancePath, 0o755); err != nil { - return fmt.Errorf("create instance mount directory: %w", err) - } - if err = os.Chmod(mountPoint.InstancePath, 0o777); err != nil { - return fmt.Errorf("chmod instance mount directory: %w", err) - } - } else if err != nil { - return fmt.Errorf("stat instance mount directory: %w", err) - } - } - return nil -} - -// initFileSystem creates an ext4 file system on a disk only if the disk is not already has a file system. -// Returns true if the file system is created. -func initFileSystem(ctx context.Context, deviceName string, errorIfNotExists bool) (bool, error) { - // Run the lsblk command to get filesystem type - cmd := exec.CommandContext(ctx, "lsblk", "-no", "FSTYPE", deviceName) - var out bytes.Buffer - cmd.Stdout = &out - if err := cmd.Run(); err != nil { - return false, fmt.Errorf("failed to check if disk is formatted: %w", err) - } - - // If the output is not empty, the disk is already formatted - fsType := strings.TrimSpace(out.String()) - if fsType != "" { - return false, nil - } - - if errorIfNotExists { - return false, fmt.Errorf("disk has no file system") - } - - log.Debug(ctx, "formatting disk with ext4 filesystem...", "device", deviceName) - cmd = exec.CommandContext(ctx, "mkfs.ext4", "-F", deviceName) - if output, err := cmd.CombinedOutput(); err != nil { - return false, fmt.Errorf("failed to format disk: %w, output: %s", err, string(output)) - } - log.Debug(ctx, "disk formatted succesfully!", "device", deviceName) - return true, nil -} - -func mountDisk(ctx context.Context, deviceName, mountPoint string, fsRootPerms os.FileMode) error { - // Create the mount point directory if it doesn't exist - if _, err := os.Stat(mountPoint); os.IsNotExist(err) { - log.Debug(ctx, "creating mount point...", "mountpoint", mountPoint) - if err := os.MkdirAll(mountPoint, 0o755); err != nil { - return fmt.Errorf("failed to create mount point: %w", err) - } - } - - // Mount the disk to the mount point - log.Debug(ctx, "mounting disk...", "device", deviceName, "mountpoint", mountPoint) - cmd := exec.CommandContext(ctx, "mount", deviceName, mountPoint) - if output, err := cmd.CombinedOutput(); err != nil { - return fmt.Errorf("failed to mount disk: %w, output: %s", err, string(output)) - } - - if fsRootPerms != 0 { - if err := os.Chmod(mountPoint, fsRootPerms); err != nil { - return fmt.Errorf("failed to chmod volume root directory %s: %w", mountPoint, err) - } - } - - log.Debug(ctx, "disk mounted successfully!") - return nil -} - func pullImage(ctx context.Context, client docker.APIClient, taskConfig TaskConfig, logPath string, tracker *PullTracker) error { if !strings.Contains(taskConfig.ImageName, ":") { taskConfig.ImageName += ":latest" diff --git a/runner/internal/shim/volumes.go b/runner/internal/shim/volumes.go new file mode 100644 index 0000000000..b517774a2a --- /dev/null +++ b/runner/internal/shim/volumes.go @@ -0,0 +1,250 @@ +package shim + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "os/exec" + "strings" + "time" + + "golang.org/x/sys/unix" + + "github.com/dstackai/dstack/runner/internal/common/log" + "github.com/dstackai/dstack/runner/internal/shim/backends" +) + +func prepareVolumes(ctx context.Context, taskConfig TaskConfig) error { + for _, volume := range taskConfig.Volumes { + err := formatAndMountVolume(ctx, volume) + if err != nil { + return fmt.Errorf("format and mount volume: %w", err) + } + } + return nil +} + +func unmountVolumes(ctx context.Context, taskConfig TaskConfig) error { + if len(taskConfig.Volumes) == 0 { + return nil + } + log.Debug(ctx, "Unmounting volumes...") + var failed []string + for _, volume := range taskConfig.Volumes { + mountPoint := getVolumeMountPoint(volume.Name) + cmd := exec.CommandContext(ctx, "mountpoint", mountPoint) + if output, err := cmd.CombinedOutput(); err != nil { + log.Info(ctx, "skipping", "mountpoint", mountPoint, "output", output) + continue + } + cmd = exec.CommandContext(ctx, "umount", "-qf", mountPoint) + if output, err := cmd.CombinedOutput(); err != nil { + log.Error(ctx, "failed to unmount", "mountpoint", mountPoint, "output", output) + failed = append(failed, mountPoint) + } else { + log.Debug(ctx, "unmounted", "mountpoint", mountPoint) + } + } + if len(failed) > 0 { + return fmt.Errorf("failed to unmount volume(s): %v", failed) + } + return nil +} + +func formatAndMountVolume(ctx context.Context, volume VolumeInfo) error { + backend, err := backends.GetBackend(volume.Backend) + if err != nil { + return fmt.Errorf("get backend: %w", err) + } + deviceName, err := backend.GetRealDeviceName(volume.VolumeId, volume.DeviceName) + if err != nil { + return fmt.Errorf("get real device name: %w", err) + } + fsCreated, err := initFileSystem(ctx, deviceName, !volume.InitFs) + if err != nil { + return fmt.Errorf("init file system: %w", err) + } + // Make FS root directory world-writable (0777) to give any job user + // a permission to create new files + // NOTE: mke2fs (that is, mkfs.ext4) supports `-E root_perms=0777` since 1.47.1: + // https://e2fsprogs.sourceforge.net/e2fsprogs-release.html#1.47.1 + // but, as of 2024-12-04, this version is too new to rely on, for example, + // Ubuntu 24.04 LTS has only 1.47.0 + // 0 means "do not chmod root directory" + var fsRootPerms os.FileMode = 0 + // Change permissions only if the FS was created by us, don't mess with + // user-formatted volumes + if fsCreated { + fsRootPerms = 0o777 + } + err = mountDisk(ctx, deviceName, getVolumeMountPoint(volume.Name), fsRootPerms) + if err != nil { + return fmt.Errorf("mount disk: %w", err) + } + return nil +} + +func getVolumeMountPoint(volumeName string) string { + // Put volumes in dstack-specific dir to avoid clashes with host dirs. + // /mnt/disks is used since on some VM images other places may not be writable (e.g. GCP COS). + return fmt.Sprintf("/mnt/disks/dstack-volumes/%s", volumeName) +} + +func prepareInstanceMountPoints(taskConfig TaskConfig) error { + // If the instance volume directory doesn't exist, create it with world-writable permissions (0777) + // to give any job user a permission to create new files + // If the directory already exists, do nothing, don't mess with already set permissions, especially + // on SSH fleets where permissions are managed by the host admin + for _, mountPoint := range taskConfig.InstanceMounts { + if _, err := os.Stat(mountPoint.InstancePath); errors.Is(err, os.ErrNotExist) { + // All missing parent dirs are created with 0755 permissions + if err = os.MkdirAll(mountPoint.InstancePath, 0o755); err != nil { + return fmt.Errorf("create instance mount directory: %w", err) + } + if err = os.Chmod(mountPoint.InstancePath, 0o777); err != nil { + return fmt.Errorf("chmod instance mount directory: %w", err) + } + } else if err != nil { + return fmt.Errorf("stat instance mount directory: %w", err) + } + } + return nil +} + +// initFileSystem creates an ext4 file system on a disk only if it does not +// already have one. Returns true if the file system was created. +// +// Safety contract: mkfs is reached ONLY after the device is confirmed to be a +// real, ready, non-zero-sized block device AND a direct superblock probe +// positively reports no signature. +func initFileSystem(ctx context.Context, deviceName string, errorIfNotExists bool) (bool, error) { + if err := waitForBlockDevice(ctx, deviceName, 10*time.Second); err != nil { + return false, fmt.Errorf("device %s not ready: %w", deviceName, err) + } + + fsType, hasFS, err := probeFilesystem(ctx, deviceName) + if err != nil { + return false, fmt.Errorf("failed to check if disk is formatted: %w", err) + } + if hasFS { + log.Debug(ctx, "disk already has a filesystem, skipping format", + "device", deviceName, "fstype", fsType) + return false, nil + } + + if errorIfNotExists { + return false, fmt.Errorf("disk %s has no file system", deviceName) + } + + log.Debug(ctx, "formatting disk with ext4 filesystem...", "device", deviceName) + cmd := exec.CommandContext(ctx, "mkfs.ext4", "-F", deviceName) + if output, err := cmd.CombinedOutput(); err != nil { + return false, fmt.Errorf("failed to format disk: %w, output: %s", err, string(output)) + } + log.Debug(ctx, "disk formatted succesfully!", "device", deviceName) + return true, nil +} + +// waitForBlockDevice blocks until deviceName is a block device with non-zero +// size, or until timeout. The retry loop is for availability (don't fail a job +// on a transient mid-attach state); the non-zero-block-device requirement is +// for safety (don't make a format decision about a not-ready device). +func waitForBlockDevice(ctx context.Context, deviceName string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + var lastErr error + for { + size, err := blockDeviceSize(deviceName) + if err == nil && size > 0 { + return nil + } + if err != nil { + lastErr = err + } else { + lastErr = fmt.Errorf("device has zero size") + } + if time.Now().After(deadline) { + return fmt.Errorf("not a ready non-zero block device within %s: %w", timeout, lastErr) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(200 * time.Millisecond): + } + } +} + +// blockDeviceSize returns the size in bytes of a block device, erroring if the +// path is not a block device or cannot be opened/queried. +func blockDeviceSize(deviceName string) (uint64, error) { + fi, err := os.Stat(deviceName) + if err != nil { + return 0, err + } + if fi.Mode()&os.ModeDevice == 0 || fi.Mode()&os.ModeCharDevice != 0 { + return 0, fmt.Errorf("%s is not a block device", deviceName) + } + f, err := os.OpenFile(deviceName, os.O_RDONLY, 0) + if err != nil { + return 0, err + } + defer func() { _ = f.Close() }() + // BLKGETSIZE64 returns the device size in bytes. + size, err := unix.IoctlGetInt(int(f.Fd()), unix.BLKGETSIZE64) + if err != nil { + return 0, fmt.Errorf("BLKGETSIZE64 ioctl on %s: %w", deviceName, err) + } + return uint64(size), nil +} + +// probeFilesystem reports the filesystem type on deviceName via a direct +// superblock probe (blkid -p), independent of the udev/lsblk cache. +func probeFilesystem(ctx context.Context, deviceName string) (string, bool, error) { + cmd := exec.CommandContext(ctx, "blkid", "-p", "-o", "value", "-s", "TYPE", deviceName) + var out bytes.Buffer + cmd.Stdout = &out + runErr := cmd.Run() + fsType := strings.TrimSpace(out.String()) + if fsType != "" { + return fsType, true, nil // a filesystem signature was found + } + + var exitErr *exec.ExitError + if errors.As(runErr, &exitErr) && exitErr.ExitCode() == 2 { + return "", false, nil // exit 2: no signature at all -> genuinely blank + } + if runErr == nil { + return "", false, fmt.Errorf( + "device %s has a non-filesystem signature but no filesystem; likely wrong device resolved", + deviceName) + } + return "", false, fmt.Errorf("blkid probe of %s failed: %w (output: %q)", + deviceName, runErr, out.String()) +} + +func mountDisk(ctx context.Context, deviceName, mountPoint string, fsRootPerms os.FileMode) error { + // Create the mount point directory if it doesn't exist + if _, err := os.Stat(mountPoint); os.IsNotExist(err) { + log.Debug(ctx, "creating mount point...", "mountpoint", mountPoint) + if err := os.MkdirAll(mountPoint, 0o755); err != nil { + return fmt.Errorf("failed to create mount point: %w", err) + } + } + + // Mount the disk to the mount point + log.Debug(ctx, "mounting disk...", "device", deviceName, "mountpoint", mountPoint) + cmd := exec.CommandContext(ctx, "mount", deviceName, mountPoint) + if output, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("failed to mount disk: %w, output: %s", err, string(output)) + } + + if fsRootPerms != 0 { + if err := os.Chmod(mountPoint, fsRootPerms); err != nil { + return fmt.Errorf("failed to chmod volume root directory %s: %w", mountPoint, err) + } + } + + log.Debug(ctx, "disk mounted successfully!") + return nil +} From 92158e026807a650fb180ccb1d78821f61c7e78d Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 18 Jun 2026 14:27:43 +0500 Subject: [PATCH 2/2] Probe filesystem multiple times --- runner/.justfile | 58 ++++++++++--------- runner/internal/shim/volumes.go | 36 +++++++++++- .../pipeline_tasks/jobs_terminating.py | 2 + 3 files changed, 68 insertions(+), 28 deletions(-) diff --git a/runner/.justfile b/runner/.justfile index 5419f9eb4d..e07a8c8eb7 100644 --- a/runner/.justfile +++ b/runner/.justfile @@ -5,11 +5,13 @@ # Configuration: # - DSTACK_SHIM_UPLOAD_VERSION: Version of the runner and shim to upload # - DSTACK_SHIM_UPLOAD_S3_BUCKET: S3 bucket to upload binaries to +# - DSTACK_SHIM_BUILD_ARCH: Target architecture for runner and shim (defaults to amd64) # # Build Process: -# - Runner is always built for linux/amd64 -# - Shim can be built for any platform (defaults to host platform) -# - When uploading, shim is automatically built for linux/amd64 +# - Runner and shim are always built for linux (GOOS=linux is the only supported OS) +# - The target architecture is configurable via DSTACK_SHIM_BUILD_ARCH (or `just --set arch ...`) +# - CGO is enabled only for native builds (Linux host with a matching architecture); +# otherwise it is disabled and DCGM support is dropped # # Development Workflows: # - Local Development: @@ -31,13 +33,12 @@ export version := env("DSTACK_SHIM_UPLOAD_VERSION", "0.0.0") # S3 bucket to upload binaries to export s3_bucket := env("DSTACK_SHIM_UPLOAD_S3_BUCKET", "dstack-runner-downloads-stgn") -# Download URLs -export runner_download_url := "s3://" + s3_bucket + "/" + version + "/binaries/dstack-runner-linux-amd64" -export shim_download_url := "s3://" + s3_bucket + "/" + version + "/binaries/dstack-shim-linux-amd64" +# Target architecture for runner and shim (GOOS is always linux) +export arch := env("DSTACK_SHIM_BUILD_ARCH", "amd64") -# Shim build configuration -export shim_os := "" -export shim_arch := "" +# Download URLs +export runner_download_url := "s3://" + s3_bucket + "/" + version + "/binaries/dstack-runner-linux-" + arch +export shim_download_url := "s3://" + s3_bucket + "/" + version + "/binaries/dstack-shim-linux-" + arch # Go toolchain image for running tests in a container (keep in sync with go.mod) export go_version := env("DSTACK_GO_VERSION", "1.25") @@ -47,8 +48,8 @@ export go_version := env("DSTACK_GO_VERSION", "1.25") build-runner-binary: #!/usr/bin/env bash set -e - echo "Building runner for linux/amd64" - cd {{source_directory()}}/cmd/runner && CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "-X 'main.Version=$version' -extldflags '-static'" + echo "Building runner for linux/$arch" + cd {{source_directory()}}/cmd/runner && CGO_ENABLED=0 GOOS=linux GOARCH=$arch go build -ldflags "-X 'main.Version=$version' -extldflags '-static'" echo "Runner build complete!" # Build shim @@ -57,23 +58,23 @@ build-shim-binary: #!/usr/bin/env bash set -e cd {{source_directory()}}/cmd/shim - if [ -n "$shim_os" ] && [ -n "$shim_arch" ]; then - echo "Building shim for $shim_os/$shim_arch" - if [ "$shim_os" = "linux" ] && [ "$(uname -s)" != "Linux" ]; then - echo "WARNING: Cross-compiling to Linux, disabling CGO (DCGM unavailable)" - CGO_ENABLED=0 GOOS=$shim_os GOARCH=$shim_arch go build -ldflags "-X 'main.Version=$version' -extldflags '-static'" - else - CGO_ENABLED=1 GOOS=$shim_os GOARCH=$shim_arch go build -ldflags "-X 'main.Version=$version'" - fi + echo "Building shim for linux/$arch" + host_arch=$(uname -m) + case "$host_arch" in + x86_64) host_arch=amd64 ;; + aarch64 | arm64) host_arch=arm64 ;; + esac + if [ "$(uname -s)" = "Linux" ] && [ "$host_arch" = "$arch" ]; then + CGO_ENABLED=1 GOOS=linux GOARCH=$arch go build -ldflags "-X 'main.Version=$version'" else - echo "Building shim for current platform" - go build -ldflags "-X 'main.Version=$version' -extldflags '-static'" + echo "WARNING: Cross-compiling to linux/$arch, disabling CGO (DCGM unavailable)" + CGO_ENABLED=0 GOOS=linux GOARCH=$arch go build -ldflags "-X 'main.Version=$version' -extldflags '-static'" fi echo "Shim build (version: $version) complete!" # Build both runner and shim build-runner: build-runner-binary build-shim-binary - echo "Build complete! Linux AMD64 binaries are in their respective cmd directories." + echo "Build complete! linux/$arch binaries are in their respective cmd directories." # Clean build artifacts clean-runner: @@ -98,13 +99,18 @@ test-runner-in-container *args="-short ./...": golang:{{go_version}} \ go test -race {{args}} -# Validate shim is built for linux/amd64 +# Validate shim is built for the configured linux architecture [private] validate-shim-binary: #!/usr/bin/env bash set -e - if ! file {{source_directory()}}/cmd/shim/shim | grep -q "ELF 64-bit LSB executable, x86-64"; then - echo "Error: Shim must be built for linux/amd64 for upload" + case "$arch" in + amd64) expected="x86-64" ;; + arm64) expected="ARM aarch64" ;; + *) echo "Error: Unsupported arch '$arch'"; exit 1 ;; + esac + if ! file {{source_directory()}}/cmd/shim/shim | grep -q "ELF 64-bit LSB executable, $expected"; then + echo "Error: Shim must be built for linux/$arch for upload" exit 1 fi @@ -125,7 +131,7 @@ upload-runner-binary: upload-shim-binary: #!/usr/bin/env bash set -e - just --set shim_os linux --set shim_arch amd64 build-shim-binary + just build-shim-binary just validate-shim-binary aws s3 cp {{source_directory()}}/cmd/shim/shim "{{shim_download_url}}" --acl public-read echo "Uploaded shim to S3" diff --git a/runner/internal/shim/volumes.go b/runner/internal/shim/volumes.go index b517774a2a..eb6fe024f6 100644 --- a/runner/internal/shim/volumes.go +++ b/runner/internal/shim/volumes.go @@ -118,13 +118,13 @@ func prepareInstanceMountPoints(taskConfig TaskConfig) error { // // Safety contract: mkfs is reached ONLY after the device is confirmed to be a // real, ready, non-zero-sized block device AND a direct superblock probe -// positively reports no signature. +// repeatedly confirms no signature. func initFileSystem(ctx context.Context, deviceName string, errorIfNotExists bool) (bool, error) { if err := waitForBlockDevice(ctx, deviceName, 10*time.Second); err != nil { return false, fmt.Errorf("device %s not ready: %w", deviceName, err) } - fsType, hasFS, err := probeFilesystem(ctx, deviceName) + fsType, hasFS, err := hasFilesystem(ctx, deviceName) if err != nil { return false, fmt.Errorf("failed to check if disk is formatted: %w", err) } @@ -198,6 +198,38 @@ func blockDeviceSize(deviceName string) (uint64, error) { return uint64(size), nil } +// hasFilesystem reports whether deviceName has a filesystem, re-confirming a +// "no filesystem" verdict before believing it. +// +// The check is asymmetric on purpose: it prevents a hypothetical +// transient false "no-fs" from leading to a destructive mkfs. +func hasFilesystem(ctx context.Context, deviceName string) (string, bool, error) { + const confirmAttempts = 3 + const confirmInterval = 1 * time.Second + + fsType, hasFS, err := probeFilesystem(ctx, deviceName) + if err != nil || hasFS { + return fsType, hasFS, err + } + for attempt := range confirmAttempts { + select { + case <-ctx.Done(): + return "", false, ctx.Err() + case <-time.After(confirmInterval): + } + fsType, hasFS, err = probeFilesystem(ctx, deviceName) + if err != nil { + return "", false, err + } + if hasFS { + log.Warning(ctx, "filesystem appeared on re-probe, not formatting", + "fstype", fsType, "attempt", attempt) + return fsType, true, nil + } + } + return "", false, nil +} + // probeFilesystem reports the filesystem type on deviceName via a direct // superblock probe (blkid -p), independent of the udev/lsblk cache. func probeFilesystem(ctx context.Context, deviceName string) (string, bool, error) { diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index adedf9bb4b..be8d80948d 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -530,6 +530,8 @@ async def _apply_process_result( ) if result.volume_update_rows: + # Safe to update volumes without lock as long as no other pipeline/task + # updates active attached volumes and/or the races are accepted. await session.execute(update(VolumeModel), result.volume_update_rows) if result.detached_volume_ids and instance_model is not None: