Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 89 additions & 15 deletions app/routes/v1/proxmox/vms.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import json
from typing import Literal
from urllib.parse import quote

import httpx
from fastapi import APIRouter, Depends
Expand All @@ -21,7 +22,7 @@
from app.core.vmid_guard import VmidProtectedError as GuardVmidProtected
from app.core.vmid_guard import assert_vmid_safe
from app.schemas.v1.common import Page
from app.schemas.v1.proxmox import VmActionResult, VmSummary
from app.schemas.v1.proxmox import TaskStatus, VmActionResult, VmSummary

router = APIRouter()
log = get_logger(__name__)
Expand Down Expand Up @@ -57,6 +58,23 @@ def _auth_headers(row: ProxmoxHost) -> dict[str, str]:
return {"Authorization": f"PVEAPIToken={row.token_ref}"}


def _assert_vmid_safe(row: ProxmoxHost, vmid: int, action: str) -> None:
"""Refuse destructive actions on protected VMIDs (canonical ranges + per-host
overrides). `action` only feeds the error message."""
overrides = (
json.loads(row.protected_vmids_override_json)
if row.protected_vmids_override_json
else None
)
try:
assert_vmid_safe(vmid, host_overrides=overrides)
except GuardVmidProtected as e:
raise VmidProtectedError(
message=f"VMID {vmid} is protected; '{action}' is refused",
details=e.details,
) from e


@router.get("/hosts/{host_id}/vms", response_model=Page[VmSummary])
async def list_host_vms(host_id: str, session: AsyncSession = Depends(_session)):
row = await _get_host(host_id, session)
Expand Down Expand Up @@ -118,21 +136,8 @@ async def vm_status_action(
details=[{"field": "action", "reason": f"one of {sorted(_ALLOWED_ACTIONS)}"}],
)
row = await _get_host(host_id, session)
# Destructive actions must respect the canonical protected-VMID ranges plus
# any per-host overrides (single guard, shared with deploy preflight).
if action in _DESTRUCTIVE_ACTIONS:
overrides = (
json.loads(row.protected_vmids_override_json)
if row.protected_vmids_override_json
else None
)
try:
assert_vmid_safe(vmid, host_overrides=overrides)
except GuardVmidProtected as e:
raise VmidProtectedError(
message=f"VMID {vmid} is protected; '{action}' is refused",
details=e.details,
) from e
_assert_vmid_safe(row, vmid, action)
base = row.api_url.rstrip("/")
url = f"{base}/api2/json/nodes/{row.node_name}/{vmtype}/{vmid}/status/{action}"
try:
Expand Down Expand Up @@ -167,3 +172,72 @@ def _unreachable(row: ProxmoxHost, err: Exception) -> Range42Error:
message=f"Proxmox host {row.name} is unreachable",
details=[{"field": "api_url", "reason": str(err)[:200]}],
)


@router.delete("/hosts/{host_id}/vms/{vmid}", response_model=VmActionResult)
async def vm_delete(
host_id: str,
vmid: int,
vmtype: Literal["qemu", "lxc"] = "qemu",
purge: bool = True,
session: AsyncSession = Depends(_session),
):
row = await _get_host(host_id, session)
_assert_vmid_safe(row, vmid, "delete")
base = row.api_url.rstrip("/")
url = f"{base}/api2/json/nodes/{row.node_name}/{vmtype}/{vmid}"
params: dict[str, int] = {}
if purge:
params["purge"] = 1
params["destroy-unreferenced-disks"] = 1
try:
async with httpx.AsyncClient(verify=False, timeout=15) as cli:
r = await cli.delete(url, headers=_auth_headers(row), params=params)
except httpx.RequestError as e:
raise _unreachable(row, e) from e
if r.status_code in (401, 403):
raise AuthFailedError(
details=[{"field": "token_ref", "reason": f"Proxmox API rejected credentials ({r.status_code})"}]
)
if r.status_code != 200:
text = (r.text or "").lower()
# PVE phrasing varies by version/guest type; also honor an explicit 409.
running_markers = ("running", "stop it first", "qemu process", "container is running")
if r.status_code == 409 or any(m in text for m in running_markers):
raise Range42Error(error="conflict", code="VM_RUNNING", status=409,
message="Stop the VM before deleting",
details=[{"field": "vmid", "reason": (r.text or "")[:300]}])
raise Range42Error(error="upstream_error", code="PROXMOX_ERROR", status=502,
message=f"Proxmox returned {r.status_code} for delete",
details=[{"field": "vmid", "reason": (r.text or "")[:300]}])
return VmActionResult(status="accepted", upid=r.json().get("data"))


@router.get("/hosts/{host_id}/tasks/{upid:path}/status", response_model=TaskStatus)
async def task_status(host_id: str, upid: str, session: AsyncSession = Depends(_session)):
row = await _get_host(host_id, session)
base = row.api_url.rstrip("/")
enc = quote(upid, safe="")
url = f"{base}/api2/json/nodes/{row.node_name}/tasks/{enc}/status"
try:
async with httpx.AsyncClient(verify=False, timeout=15) as cli:
r = await cli.get(url, headers=_auth_headers(row))
except httpx.RequestError as e:
raise _unreachable(row, e) from e
if r.status_code in (401, 403):
raise AuthFailedError(
details=[{"field": "token_ref", "reason": f"Proxmox API rejected credentials ({r.status_code})"}]
)
if r.status_code != 200:
raise Range42Error(
error="upstream_error",
code="PROXMOX_ERROR",
status=502,
message=f"Proxmox returned {r.status_code} for task status",
details=[{"field": "upid", "reason": (r.text or "")[:300]}],
)
data = r.json().get("data", {}) or {}
raw_status = data.get("status", "running")
status = raw_status if raw_status in ("running", "stopped") else "stopped"
return TaskStatus(upid=upid, status=status,
exitstatus=data.get("exitstatus"), node=row.node_name)
1 change: 1 addition & 0 deletions app/routes/vms.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import os
from pathlib import Path
from typing import Any

from fastapi import APIRouter, HTTPException
from fastapi.responses import JSONResponse
Expand Down
8 changes: 8 additions & 0 deletions app/schemas/v1/proxmox.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import annotations

from datetime import datetime
from typing import Literal

from pydantic import BaseModel, HttpUrl

Expand Down Expand Up @@ -47,3 +48,10 @@ class VmSummary(BaseModel):
class VmActionResult(BaseModel):
status: str = "accepted"
upid: str | None = None


class TaskStatus(BaseModel):
upid: str
status: Literal["running", "stopped"]
exitstatus: str | None = None
node: str
182 changes: 178 additions & 4 deletions tests/routes/test_proxmox_vms.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def __aexit__(self, *a):
return False

async def get(self, url, headers=None):
_FakeProxmox.calls.append(("GET", url))
_FakeProxmox.calls.append(("GET", url, None))
if url.endswith("/qemu"):
return _FakeResp(200, [
{"vmid": 4001, "name": "vuln-box-01", "status": "running",
Expand All @@ -68,12 +68,23 @@ async def get(self, url, headers=None):
return _FakeResp(200, [
{"vmid": 200, "name": "ct-1", "status": "stopped"},
])
if "/tasks/" in url and url.endswith("/status"):
return _FakeResp(200, {
"upid": "UPID:pve01:0001:delete::",
"status": "stopped",
"exitstatus": "OK",
"pid": 1,
})
return _FakeResp(404, [])

async def post(self, url, headers=None, json=None):
_FakeProxmox.calls.append(("POST", url))
return _FakeResp(200, "UPID:pve01:0000:start::")

async def delete(self, url, headers=None, params=None):
_FakeProxmox.calls.append(("DELETE", url, params))
return _FakeResp(200, "UPID:pve01:0001:delete::")


async def _create_host(c):
r = await c.post("/v1/proxmox/hosts", json={
Expand Down Expand Up @@ -102,7 +113,7 @@ async def test_list_host_vms_merges_qemu_and_lxc(tmp_path, monkeypatch):
assert byid[200]["type"] == "lxc"
# trailing slash in api_url must not produce a double slash
assert any(u == "https://pve01:8006/api2/json/nodes/pve01/qemu"
for (_, u) in _FakeProxmox.calls)
for (_, u, *_) in _FakeProxmox.calls)
finally:
await dbmod.dispose_engine()

Expand Down Expand Up @@ -133,7 +144,7 @@ async def test_vm_action_start_posts_to_pve_status_endpoint(tmp_path, monkeypatc
assert r.status_code == 200, r.text
assert r.json()["status"] == "accepted"
assert r.json()["upid"].startswith("UPID")
posts = [u for (m, u) in _FakeProxmox.calls if m == "POST"]
posts = [u for (m, u, *_) in _FakeProxmox.calls if m == "POST"]
assert posts == [
"https://pve01:8006/api2/json/nodes/pve01/qemu/4001/status/start"
]
Expand Down Expand Up @@ -169,9 +180,172 @@ async def test_vm_action_guards_protected_vmids_on_destructive(tmp_path, monkeyp
r = await c.post(f"/v1/proxmox/hosts/{hid}/vms/9000/status/shutdown")
assert r.status_code == 409
# no destructive POST reached Proxmox
assert [m for (m, _) in _FakeProxmox.calls if m == "POST"] == []
assert [m for (m, *_) in _FakeProxmox.calls if m == "POST"] == []
# non-destructive start IS allowed on a protected vmid
r = await c.post(f"/v1/proxmox/hosts/{hid}/vms/101/status/start")
assert r.status_code == 200, r.text
finally:
await dbmod.dispose_engine()


@pytest.mark.asyncio
async def test_vm_delete_happy_path(tmp_path, monkeypatch):
app, dbmod = await _boot(tmp_path, monkeypatch)
monkeypatch.setattr(httpx, "AsyncClient", _FakeProxmox)
_FakeProxmox.calls = []
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://t") as c:
hid = await _create_host(c)
r = await c.delete(f"/v1/proxmox/hosts/{hid}/vms/2001?vmtype=qemu&purge=true")
assert r.status_code == 200, r.text
assert r.json()["status"] == "accepted"
assert r.json()["upid"].startswith("UPID")
deletes = [(u, p) for (m, u, p) in _FakeProxmox.calls if m == "DELETE"]
assert deletes, "expected a DELETE to Proxmox"
url, params = deletes[0]
assert url == "https://pve01:8006/api2/json/nodes/pve01/qemu/2001"
assert params.get("purge") == 1
assert params.get("destroy-unreferenced-disks") == 1
finally:
await dbmod.dispose_engine()


@pytest.mark.asyncio
async def test_vm_delete_refuses_protected_vmid(tmp_path, monkeypatch):
app, dbmod = await _boot(tmp_path, monkeypatch)
monkeypatch.setattr(httpx, "AsyncClient", _FakeProxmox)
_FakeProxmox.calls = []
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://t") as c:
hid = await _create_host(c)
r = await c.delete(f"/v1/proxmox/hosts/{hid}/vms/100?vmtype=qemu")
assert r.status_code == 409
assert r.json()["code"] == "VMID_PROTECTED"
assert [m for (m, *_) in _FakeProxmox.calls if m == "DELETE"] == []
finally:
await dbmod.dispose_engine()


@pytest.mark.asyncio
async def test_vm_delete_running_guest_conflict(tmp_path, monkeypatch):
app, dbmod = await _boot(tmp_path, monkeypatch)

class _RunningProxmox(_FakeProxmox):
async def delete(self, url, headers=None, params=None):
_FakeProxmox.calls.append(("DELETE", url, params))
return _FakeResp(500, "can't remove VM 4001 - running, stop it first")

monkeypatch.setattr(httpx, "AsyncClient", _RunningProxmox)
_FakeProxmox.calls = []
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://t") as c:
hid = await _create_host(c)
r = await c.delete(f"/v1/proxmox/hosts/{hid}/vms/2001?vmtype=qemu")
assert r.status_code == 409, r.text
finally:
await dbmod.dispose_engine()


@pytest.mark.asyncio
async def test_task_status_stopped_ok(tmp_path, monkeypatch):
app, dbmod = await _boot(tmp_path, monkeypatch)
monkeypatch.setattr(httpx, "AsyncClient", _FakeProxmox)
_FakeProxmox.calls = []
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://t") as c:
hid = await _create_host(c)
upid = "UPID:pve01:0001:delete::"
r = await c.get(f"/v1/proxmox/hosts/{hid}/tasks/{upid}/status")
assert r.status_code == 200, r.text
body = r.json()
assert body["status"] == "stopped"
assert body["exitstatus"] == "OK"
assert body["node"] == "pve01"
gets = [u for (m, u, *_) in _FakeProxmox.calls if m == "GET"]
assert any("/tasks/UPID%3Apve01%3A0001%3Adelete%3A%3A/status" in u for u in gets)
finally:
await dbmod.dispose_engine()


@pytest.mark.asyncio
async def test_vm_delete_running_guest_qemu_process_phrasing(tmp_path, monkeypatch):
"""Proxmox sometimes emits 'qemu process is still running' instead of 'stop it first';
the route must still return 409 VM_RUNNING for this phrasing."""
app, dbmod = await _boot(tmp_path, monkeypatch)

class _QemuProcessProxmox(_FakeProxmox):
async def delete(self, url, headers=None, params=None):
_FakeProxmox.calls.append(("DELETE", url, params))
return _FakeResp(500, "VM 2001 qemu process is still running")

monkeypatch.setattr(httpx, "AsyncClient", _QemuProcessProxmox)
_FakeProxmox.calls = []
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://t") as c:
hid = await _create_host(c)
r = await c.delete(f"/v1/proxmox/hosts/{hid}/vms/2001?vmtype=qemu")
assert r.status_code == 409, r.text
assert r.json()["code"] == "VM_RUNNING"
finally:
await dbmod.dispose_engine()


@pytest.mark.asyncio
async def test_task_status_unexpected_status_coerced_to_stopped(tmp_path, monkeypatch):
"""An unknown Proxmox status value must not raise a Pydantic ValidationError;
it should be coerced to 'stopped' and return HTTP 200."""
app, dbmod = await _boot(tmp_path, monkeypatch)

class _WeirdStatusProxmox(_FakeProxmox):
async def get(self, url, headers=None):
_FakeProxmox.calls.append(("GET", url, None))
if "/tasks/" in url and url.endswith("/status"):
return _FakeResp(200, {
"upid": "UPID:pve01:0001:delete::",
"status": "weird",
"exitstatus": "OK",
"pid": 1,
})
return await super().get(url, headers=headers)

monkeypatch.setattr(httpx, "AsyncClient", _WeirdStatusProxmox)
_FakeProxmox.calls = []
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://t") as c:
hid = await _create_host(c)
r = await c.get(
f"/v1/proxmox/hosts/{hid}/tasks/UPID:pve01:0001:delete::/status"
)
assert r.status_code == 200, r.text
assert r.json()["status"] == "stopped"
finally:
await dbmod.dispose_engine()


@pytest.mark.asyncio
async def test_task_status_stopped_error(tmp_path, monkeypatch):
app, dbmod = await _boot(tmp_path, monkeypatch)

class _FailedTaskProxmox(_FakeProxmox):
async def get(self, url, headers=None):
_FakeProxmox.calls.append(("GET", url, None))
if "/tasks/" in url and url.endswith("/status"):
return _FakeResp(200, {
"upid": "UPID:pve01:0001:delete::",
"status": "stopped",
"exitstatus": "command failed",
"pid": 1,
})
return _FakeResp(404, [])

monkeypatch.setattr(httpx, "AsyncClient", _FailedTaskProxmox)
_FakeProxmox.calls = []
try:
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://t") as c:
hid = await _create_host(c)
r = await c.get(f"/v1/proxmox/hosts/{hid}/tasks/UPID:pve01:0001:delete::/status")
assert r.status_code == 200, r.text
assert r.json()["status"] == "stopped"
assert r.json()["exitstatus"] == "command failed"
finally:
await dbmod.dispose_engine()
Loading