Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions dimos/robot/cli/dimos.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def run(
from dimos.utils.logging_config import set_run_log_dir, setup_exception_handler

setup_exception_handler()
_setup_wizard()

cli_config_overrides: dict[str, Any] = ctx.obj

Expand Down Expand Up @@ -822,5 +823,18 @@ def rerun_bridge_cmd(
)


def _setup_wizard() -> None:
try:
from dimwizard.setup import setup_wizard
except ImportError:
return
try:
setup_wizard()
except KeyboardInterrupt:
raise
except Exception as e:
print(f" dimwizard setup skipped: {e}")


if __name__ == "__main__":
cli_main()
Empty file.
41 changes: 41 additions & 0 deletions dimwizard/dimwizard/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from __future__ import annotations

import signal
import sys
import threading

from dimwizard.advertise import Advertiser
from dimwizard.install import is_installed


def _run_beacon() -> None:
advertiser = Advertiser()

stop_event = threading.Event()

def _handle_signal(sig: int, _frame: object) -> None:
stop_event.set()

signal.signal(signal.SIGTERM, _handle_signal)
signal.signal(signal.SIGINT, _handle_signal)

try:
advertiser.start()
stop_event.wait()
finally:
advertiser.stop()


def main() -> None:
if not is_installed():
print("dimwizard: not installed, exiting.", file=sys.stderr)
sys.exit(0)
try:
_run_beacon()
except Exception as e:
print(f"dimwizard: beacon error: {e}", file=sys.stderr)
sys.exit(1)


if __name__ == "__main__":
main()
63 changes: 63 additions & 0 deletions dimwizard/dimwizard/advertise.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from __future__ import annotations

import os
import socket

from zeroconf import ServiceInfo, Zeroconf

SERVICE_TYPE = "_dimensional._tcp.local."
_PORT = 7667
_VIRTUAL_IFACE_PREFIXES = ("docker", "virbr", "lo", "tun", "veth", "br-")


def local_ip() -> str:
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
if not ip.startswith("127."):
return ip
except OSError:
pass
try:
import psutil
for iface, addrs in psutil.net_if_addrs().items():
if any(iface.startswith(p) for p in _VIRTUAL_IFACE_PREFIXES):
continue
for addr in addrs:
if addr.family == socket.AF_INET and not addr.address.startswith("127."):
return addr.address
except ImportError:
pass
raise OSError("no non-loopback IPv4 address found")


class Advertiser:
"""Registers a static mDNS beacon so the harness can discover this robot."""

def __init__(self) -> None:
self._robot_name = os.environ.get("DIMENSIONAL_ROBOT_NAME", socket.gethostname().split(".")[0])
self._lcm_url = os.environ.get("LCM_DEFAULT_URL", "udpm://239.255.76.67:7667?ttl=1")
self._zeroconf: Zeroconf | None = None
self._info: ServiceInfo | None = None

def start(self) -> None:
ip = local_ip()
self._zeroconf = Zeroconf()
self._info = ServiceInfo(
SERVICE_TYPE,
f"{self._robot_name}.{SERVICE_TYPE}",
addresses=[socket.inet_aton(ip)],
port=_PORT,
properties={b"lcm_url": self._lcm_url.encode()},
)
self._zeroconf.register_service(self._info)

def stop(self) -> None:
if self._zeroconf is None:
return
if self._info is not None:
self._zeroconf.unregister_service(self._info)
self._info = None
self._zeroconf.close()
self._zeroconf = None
42 changes: 42 additions & 0 deletions dimwizard/dimwizard/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import annotations

import os
import socket

import typer

from dimwizard.advertise import SERVICE_TYPE, local_ip
from dimwizard.install import is_installed, is_running, uninstall

app = typer.Typer(help="dimwizard - robot network beacon")
Comment thread
johnny-kantaros marked this conversation as resolved.


@app.command()
def status() -> None:
"""Show beacon status."""
installed = is_installed()
running = is_running()

print(f"installed: {'yes' if installed else 'no'}")
print(f"running: {'yes' if running else 'no'}")

if installed:
robot_name = os.environ.get("DIMENSIONAL_ROBOT_NAME", socket.gethostname().split(".")[0])
lcm_url = os.environ.get("LCM_DEFAULT_URL", "udpm://239.255.76.67:7667?ttl=1")
print(f"robot: {robot_name}")
print(f"mdns: {robot_name}.{SERVICE_TYPE}")
print(f"lcm url: {lcm_url}")
try:
print(f"ip: {local_ip()}")
except OSError as e:
print(f"ip: unavailable ({e})")


@app.command()
def kill() -> None:
"""Remove the dimwizard beacon service."""
uninstall()


def main() -> None:
app()
190 changes: 190 additions & 0 deletions dimwizard/dimwizard/install.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
from __future__ import annotations

import os
import platform
import shlex
import socket
import subprocess
import sys
from pathlib import Path
from xml.sax.saxutils import escape

_DEFAULT_LCM_URL = "udpm://239.255.76.67:7667?ttl=1"

_LABEL = "com.dimensional.dimwizard"
_PLIST_PATH = Path.home() / "Library" / "LaunchAgents" / f"{_LABEL}.plist"
_SYSTEMD_PATH = Path.home() / ".config" / "systemd" / "user" / "dimwizard.service"
_LOG_PATH = Path.home() / "Library" / "Logs" / "dimwizard.log"


def _find_executable() -> list[str]:
return [sys.executable, "-m", "dimwizard"]


def is_installed() -> bool:
if platform.system() == "Darwin":
return _PLIST_PATH.exists()
if platform.system() == "Linux":
return _SYSTEMD_PATH.exists()
return False


def is_running() -> bool:
if platform.system() == "Darwin":
result = subprocess.run(
["launchctl", "list", _LABEL],
capture_output=True,
text=True,
)
return result.returncode == 0 and '"PID"' in result.stdout
if platform.system() == "Linux":
try:
result = subprocess.run(
["systemctl", "--user", "is-active", "dimwizard"],
capture_output=True,
text=True,
)
return result.stdout.strip() == "active"
except FileNotFoundError:
return False
return False


def install() -> bool:
if platform.system() == "Darwin":
return _install_mac()
if platform.system() == "Linux":
return _install_linux()
print(f" Unsupported platform: {platform.system()}")
return False


def uninstall() -> None:
if platform.system() == "Darwin":
_uninstall_mac()
elif platform.system() == "Linux":
_uninstall_linux()


def _install_mac() -> bool:
_PLIST_PATH.parent.mkdir(parents=True, exist_ok=True)
_LOG_PATH.parent.mkdir(parents=True, exist_ok=True)

robot_name = os.environ.get("DIMENSIONAL_ROBOT_NAME", socket.gethostname().split(".")[0])
lcm_url = os.environ.get("LCM_DEFAULT_URL", _DEFAULT_LCM_URL)

executable = _find_executable()
plist_args = "\n".join(f" <string>{escape(a)}</string>" for a in executable)
log_path = escape(str(_LOG_PATH))

plist = f"""\
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN"
"http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>{_LABEL}</string>
<key>ProgramArguments</key>
<array>
{plist_args}
</array>
<key>EnvironmentVariables</key>
<dict>
<key>DIMENSIONAL_ROBOT_NAME</key>
<string>{escape(robot_name)}</string>
<key>LCM_DEFAULT_URL</key>
<string>{escape(lcm_url)}</string>
</dict>
<key>KeepAlive</key>
<dict>
<key>SuccessfulExit</key>
<false/>
</dict>
<key>RunAtLoad</key>
<true/>
<key>StandardOutPath</key>
<string>{log_path}</string>
<key>StandardErrorPath</key>
<string>{log_path}</string>
</dict>
</plist>
"""
_PLIST_PATH.write_text(plist)

subprocess.run(["launchctl", "unload", str(_PLIST_PATH)], capture_output=True)
result = subprocess.run(
["launchctl", "load", "-w", str(_PLIST_PATH)],
capture_output=True,
text=True,
)
if result.returncode != 0:
print(f" Warning: launchctl load failed: {result.stderr.strip()}")
_PLIST_PATH.unlink(missing_ok=True)
return False
print(f" dimwizard installed — logs at {_LOG_PATH}")
return True


def _uninstall_mac() -> None:
if not _PLIST_PATH.exists():
print("dimwizard is not installed.")
return
subprocess.run(["launchctl", "unload", "-w", str(_PLIST_PATH)], capture_output=True)
_PLIST_PATH.unlink(missing_ok=True)
print(" dimwizard removed.")


def _install_linux() -> bool:
_SYSTEMD_PATH.parent.mkdir(parents=True, exist_ok=True)

robot_name = os.environ.get("DIMENSIONAL_ROBOT_NAME", socket.gethostname().split(".")[0])
lcm_url = os.environ.get("LCM_DEFAULT_URL", _DEFAULT_LCM_URL)

exec_start = " ".join(shlex.quote(a) for a in _find_executable())

unit = f"""\
[Unit]
Description=DimWizard — Dimensional robot network beacon
After=network.target

[Service]
Type=simple
ExecStart={exec_start}
Restart=on-failure
RestartSec=5
Environment=PYTHONUNBUFFERED=1
Environment="DIMENSIONAL_ROBOT_NAME={robot_name}"
Environment="LCM_DEFAULT_URL={lcm_url}"

[Install]
WantedBy=default.target
"""
_SYSTEMD_PATH.write_text(unit)

try:
subprocess.run(["systemctl", "--user", "daemon-reload"], check=True)
subprocess.run(["systemctl", "--user", "enable", "--now", "dimwizard"], check=True)
print(" dimwizard installed.")
return True
except FileNotFoundError:
print(f" systemctl not found — start manually: {exec_start}")
_SYSTEMD_PATH.unlink(missing_ok=True)
return False
except subprocess.CalledProcessError as e:
print(f" Failed to enable service: {e}")
_SYSTEMD_PATH.unlink(missing_ok=True)
return False


def _uninstall_linux() -> None:
if not _SYSTEMD_PATH.exists():
print("dimwizard is not installed.")
return
try:
subprocess.run(["systemctl", "--user", "disable", "--now", "dimwizard"], check=True)
except subprocess.CalledProcessError:
pass
_SYSTEMD_PATH.unlink(missing_ok=True)
subprocess.run(["systemctl", "--user", "daemon-reload"], capture_output=True)
print(" dimwizard removed.")
31 changes: 31 additions & 0 deletions dimwizard/dimwizard/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from __future__ import annotations

import os
import socket

import questionary

from dimwizard.install import install, is_installed


def setup_wizard() -> None:
"""Hook for dimos run - runs setup on invocation, skips if already installed."""
if is_installed():
return
Comment thread
johnny-kantaros marked this conversation as resolved.

robot_name = os.environ.get("DIMENSIONAL_ROBOT_NAME", socket.gethostname().split(".")[0])
print()
confirmed = questionary.confirm(
"Set up robot network discovery? (recommended)",
default=True,
).ask()

Comment thread
johnny-kantaros marked this conversation as resolved.
if confirmed is None:
return
if not confirmed:
return

if not install():
print(" ✗ Service installation failed — re-run `dimos run` to retry.\n")
return
print(f" ✓ {robot_name} is now discoverable on the network\n")
Loading
Loading