Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions backend_py/src/routes/recordings.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,23 @@ def get_recordings(request: Request) -> list[RecordingInfo]:
return recordings_service.get_recordings()


@recordings_router.get("/zip", summary="Download all recordings as a zip file")
def zip_recordings(request: Request) -> FileResponse:
recordings_service: RecordingsService = request.app.state.recordings_service

zip_file_path = recordings_service.zip_recordings()
if not zip_file_path:
raise HTTPException(status_code=404, detail="No recordings to zip")

resp = FileResponse(
zip_file_path,
media_type="application/zip",
filename="recordings.zip",
headers={"Content-Disposition": "attachment; filename=recordings.zip"},
)
return resp


@recordings_router.get("/{recording_path}", summary="Get a specific recording")
def get_recording(request: Request, recording_path: str) -> FileResponse:

Expand Down Expand Up @@ -65,20 +82,3 @@ def rename_recording(
)

return response


@recordings_router.get("/zip", summary="Download all recordings as a zip file")
def zip_recordings(request: Request) -> FileResponse:
recordings_service: RecordingsService = request.app.state.recordings_service

zip_file_path = recordings_service.zip_recordings()
if not zip_file_path:
raise HTTPException(status_code=404, detail="No recordings to zip")

resp = FileResponse(
zip_file_path,
media_type="application/zip",
filename="recordings.zip",
headers={"Content-Disposition": "attachment; filename=recordings.zip"},
)
return resp
24 changes: 21 additions & 3 deletions backend_py/src/services/cameras/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import fcntl
import logging
import struct
import threading
from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import Any
Expand All @@ -28,6 +29,7 @@
ControlTypeEnum,
DeviceType,
FormatSizeModel,
FrameDropStats,
IntervalModel,
MenuItemModel,
StreamEncodeTypeEnum,
Expand Down Expand Up @@ -314,6 +316,7 @@ def _clear(self) -> None:
class Device(events.EventEmitter):
def __init__(self, device_info: DeviceInfo) -> None:
super().__init__()

self.cameras: list[Camera] = []
for device_path in device_info.device_paths:
self.cameras.append(Camera(device_path))
Expand All @@ -334,9 +337,14 @@ def __init__(self, device_info: DeviceInfo) -> None:
self.nickname = ""
self.stream = Stream()

# frame stats is touched by both the main thread and the capture thread
self._frame_stats_lock = threading.Lock()
self.frame_stats = FrameDropStats(num_drops=0)

# each device has a streamrunner, but not all of them are used if
# they are a follower (shd)
self.stream_runner = StreamRunner(self.stream)
self.stream_runner.on("frame_drop", self._update_drop_stats)

for camera in self.cameras:
for encoding in camera.formats:
Expand Down Expand Up @@ -369,6 +377,11 @@ def __init__(self, device_info: DeviceInfo) -> None:

self._get_controls()

def _update_drop_stats(self) -> None:
with self._frame_stats_lock:
self.frame_stats.num_drops += 1
self.emit("frame_stats")

def _on_stream_error(self, err: str) -> None:
self.logger.error(err)
# TODO
Expand Down Expand Up @@ -485,9 +498,9 @@ def add_control_from_option(
option_name: str,
default_value: Any,
control_type: ControlTypeEnum,
max_value: float = 0,
min_value: float = 0,
step: float = 0,
max_value: float | int = 0,
min_value: float | int = 0,
step: float | int = 0,
) -> None:
try:
option = self._options[option_name]
Expand Down Expand Up @@ -521,6 +534,10 @@ def start_stream(self) -> None:
self.stream.enabled = True
self.stream_runner.start()

with self._frame_stats_lock:
self.frame_stats = FrameDropStats(num_drops=0)
self.emit("frame_stats")

def stop_stream(self) -> None:
self.stream.enabled = False
self.stream_runner.stop()
Expand All @@ -529,6 +546,7 @@ def close(self) -> None:
"""
Cleanup resources of the device
"""
self.stream_runner.stop()
for camera in self.cameras:
camera.close()
self.v4l2_device.close()
Expand Down
126 changes: 82 additions & 44 deletions backend_py/src/services/cameras/device_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ def __init__(

self.logger = logging.getLogger("dwe_os_2.cameras.DeviceManager")

# Captured in start_monitoring
self._loop: asyncio.AbstractEventLoop | None = None

def start_monitoring(self) -> None:
"""
Begin monitoring for devices in the background
"""
self._is_monitoring = True
self._loop = asyncio.get_running_loop()
asyncio.create_task(self._monitor())

def stop_monitoring(self) -> None:
Expand Down Expand Up @@ -125,6 +129,8 @@ def create_device(self, device_info: DeviceInfo) -> Device | None:
lambda _: self._append_stream_error(DeviceModel.model_validate(device)),
)

device.on("frame_stats", lambda: self._schedule_emit_frame_stats(device))

if self.serial:
device.on("pwm_frequency", lambda fps: self.serial.apply_from_fps(fps))

Expand Down Expand Up @@ -278,7 +284,7 @@ async def _get_devices(self, old_devices: list[DeviceInfo]) -> list[DeviceInfo]:
new_devices = list_diff(devices_info, old_devices)

# find the removed devices
removed_devices = list_diff(old_devices, devices_info)
removed_devices: list[DeviceInfo] = list_diff(old_devices, devices_info)

device_added = False

Expand Down Expand Up @@ -313,51 +319,52 @@ async def _get_devices(self, old_devices: list[DeviceInfo]) -> list[DeviceInfo]:

# remove the old devices
for device_info in removed_devices:
for device in self.devices:
if device.device_info == device_info:
device.stream_runner.stop()

# What to do when a device is unplugged
# Remove unplugged followers from leaders, and unplugged leaders
# as leaders
if (
device.device_type == DeviceType.STELLARHD_LEADER
or device.device_type == DeviceType.STELLARHD_FOLLOWER
):
leader_casted = cast(SHDDevice, device)
for follower_bus_info in leader_casted.followers:
# This can be optimized, but it truly does not matter
follower = self._find_device_with_bus_info(
follower_bus_info
)
# Remember, follower might not exist now - never inherent
# truth to its existance
if follower:
follower_casted = cast(SHDDevice, follower)
removed_device = find_device_with_bus_info(
self.devices, device_info.bus_info
)

if not removed_device:
continue

removed_device.stream_runner.stop()

# What to do when a device is unplugged
# Remove unplugged followers from leaders, and unplugged leaders
# as leaders
if (
removed_device.device_type == DeviceType.STELLARHD_LEADER
or removed_device.device_type == DeviceType.STELLARHD_FOLLOWER
):
leader_casted = cast(SHDDevice, removed_device)
for follower_bus_info in leader_casted.followers:
# This can be optimized, but it truly does not matter
try:
follower = self._find_device_with_bus_info(follower_bus_info)
# Remember, follower might not exist now - never inherent
# truth to its existence
follower_casted = cast(SHDDevice, follower)
leader_casted.remove_follower(follower_casted)
self.settings_manager.save_device(leader_casted)
except DeviceNotFoundException:
continue

if removed_device.device_type == DeviceType.STELLARHD_FOLLOWER:
follower_casted = cast(SHDDevice, removed_device)
if follower_casted.is_managed:
for device in self.devices:
if (
device.device_type == DeviceType.STELLARHD_LEADER
or device.device_type == DeviceType.STELLARHD_FOLLOWER
):
leader_casted = cast(SHDDevice, device)
if follower_casted.bus_info in leader_casted.followers:
leader_casted.remove_follower(follower_casted)
self.settings_manager.save_device(leader_casted)
if device.device_type == DeviceType.STELLARHD_FOLLOWER:
follower_casted = cast(SHDDevice, device)
if follower_casted.is_managed:
# TODO: Fix this
for device in self.devices:
if (
device.device_type == DeviceType.STELLARHD_LEADER
or device.device_type
== DeviceType.STELLARHD_FOLLOWER
):
leader_casted = cast(SHDDevice, device)
if (
follower_casted.bus_info
in leader_casted.followers
):
leader_casted.remove_follower(follower_casted)
self.settings_manager.save_device(leader_casted)

self.devices.remove(device)
self.logger.info(f"Device Removed: {device_info.bus_info}")

await self.sio.emit("device_removed", device_info.bus_info)

self.devices.remove(removed_device)
self.logger.info(f"Device Removed: {device_info.bus_info}")

await self.sio.emit("device_removed", device_info.bus_info)

if device_added:
# FIXME: Issue where sometimes frontend updates too quickly before the
Expand All @@ -366,6 +373,37 @@ async def _get_devices(self, old_devices: list[DeviceInfo]) -> list[DeviceInfo]:

return devices_info

def _schedule_emit_frame_stats(self, device: Device) -> None:
"""
Schedule a frame_stats emit from any thread onto the main asyncio loop.
"""
loop = self._loop
if loop is None or loop.is_closed():
return
try:
asyncio.run_coroutine_threadsafe(self._emit_frame_stats(device), loop)
except RuntimeError:
return

async def _emit_frame_stats(self, device: Device) -> None:
"""
Emit frame stats to the frontend via SocketIO
"""
# Snapshot under the lock so we don't race with the capture thread's
# increment or with start_stream's reset.
# NOTE: This may cause minor perf issues when dropping a lot of frames
with device._frame_stats_lock:
frame_stats_payload = device.frame_stats.model_dump()

# TODO: switch more to use namespace
await self.sio.emit(
"device.frame_stats",
{
"bus_info": device.bus_info,
"frame_stats": frame_stats_payload,
},
)

async def _monitor(self) -> None:
"""
Internal code to monitor devices for changes
Expand Down
3 changes: 2 additions & 1 deletion backend_py/src/services/cameras/ehd.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ def _get_options(self) -> dict[str, BaseOption]:
lambda bitrate: int(
round(bitrate * 1000000)
), # convert to bps from mpbs (round for float imprecision)
lambda bitrate: cast(int, bitrate) / 1000000.0, # convert to mpbs from bps
# convert to mpbs from bps
lambda bitrate: cast(int, bitrate) / 1000000.0,
)

# UVC xu gop control
Expand Down
17 changes: 12 additions & 5 deletions backend_py/src/services/cameras/pydantic_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ class Config:


class ControlFlagsModel(BaseModel):
default_value: float
max_value: float
min_value: float
step: float
default_value: float | int
max_value: float | int
min_value: float | int
step: float | int
control_type: ControlTypeEnum = Field(...)
menu: list[MenuItemModel] = Field(default_factory=list)

Expand All @@ -117,7 +117,7 @@ class ControlModel(BaseModel):
flags: ControlFlagsModel
control_id: int
name: str
value: float
value: float | int

class Config:
from_attributes = True
Expand Down Expand Up @@ -165,6 +165,10 @@ class Config:
from_attributes = True


class FrameDropStats(BaseModel):
num_drops: int


class DeviceModel(BaseModel):
# List of cameras, e.g. /dev/video0, /dev/video2
cameras: list[CameraModel] | None = None
Expand All @@ -191,6 +195,9 @@ class DeviceModel(BaseModel):
followers: list[str] = []
# True if is a follower and stream is managed by the leader
is_managed: bool = False
# Per-stream drop stats. Resets every time the stream is restarted so
# the count is "drops in the current stream", not cumulative.
frame_stats: FrameDropStats = FrameDropStats(num_drops=0)

class Config:
from_attributes = True
Expand Down
Loading
Loading