Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ lark
numpy<2.0
opencv-contrib-python==4.10.0.84
ahrs
pyserial
10 changes: 10 additions & 0 deletions src/msgs/msg/LedStatus.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
uint8 CMD_SOLID = 1
uint8 CMD_FLASH = 2
uint8 CMD_PULSE = 3
uint8 CMD_OFF = 255

uint8 cmd
uint8 r
uint8 g
uint8 b
uint8 param # rate in tenths-of-Hz for FLASH/PULSE; 0 = default (2 Hz)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from launch import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description():
return LaunchDescription([
Node(
package="led_indicator",
executable="led_indicator_node",
name="led_indicator",
output="screen",
parameters=[{
"serial_port": "auto",
"baud_rate": 115200,
}],
)
])
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy

from msgs.msg import LedStatus

import serial
import serial.tools.list_ports

_SYNC = 0xAA


def _packet(cmd: int, r: int = 0, g: int = 0, b: int = 0, param: int = 0) -> bytes:
xor = cmd ^ r ^ g ^ b ^ param
return bytes([_SYNC, cmd, r, g, b, param, xor])


def _find_data_port() -> str | None:
ports = sorted(
p.device
for p in serial.tools.list_ports.comports()
if "trinkey" in (p.description or "").lower()
or "trinkey" in (p.product or "").lower()
or "adafruit" in (p.manufacturer or "").lower()
)
return ports[-1] if ports else None


class LedIndicatorNode(Node):
def __init__(self):
super().__init__("led_indicator")

self.declare_parameter("serial_port", "auto")
self.declare_parameter("baud_rate", 115200)

port = self.get_parameter("serial_port").value
baud = self.get_parameter("baud_rate").value

if port == "auto":
port = _find_data_port()
if port is None:
self.get_logger().warn(
"Trinkey data port not found — defaulting to /dev/ttyACM1. "
"Set 'serial_port' explicitly if wrong."
)
port = "/dev/ttyACM1"
else:
self.get_logger().info(f"Auto-detected Trinkey data port: {port}")

self._ser: serial.Serial | None = None
try:
self._ser = serial.Serial(port, baud, timeout=1, dsrdtr=False, rtscts=False)
self.get_logger().info(f"Connected to Pixel Trinkey on {port}")
except serial.SerialException as e:
self.get_logger().error(f"Could not open {port}: {e}. LED output disabled.")

reliable_qos = QoSProfile(depth=1, reliability=ReliabilityPolicy.RELIABLE)
self.create_subscription(LedStatus, "/led_status", self._cb, reliable_qos)

self._send(_packet(LedStatus.CMD_OFF))

def _cb(self, msg: LedStatus) -> None:
self._send(_packet(msg.cmd, msg.r, msg.g, msg.b, msg.param))

def _send(self, pkt: bytes) -> None:
if self._ser is None or not self._ser.is_open:
return
try:
self._ser.write(pkt)
except serial.SerialException as e:
self.get_logger().warn(f"Serial write failed: {e}", throttle_duration_sec=5.0)

def destroy_node(self) -> None:
self._send(_packet(LedStatus.CMD_OFF))
if self._ser and self._ser.is_open:
self._ser.close()
super().destroy_node()


def main(args=None):
rclpy.init(args=args)
node = LedIndicatorNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()


if __name__ == "__main__":
main()
23 changes: 23 additions & 0 deletions src/subsystems/navigation/led_indicator/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>led_indicator</name>
<version>0.0.0</version>
<description>LED status indicator for navigation mode (Red=autonomous, Blue=teleop, Flashing Green=arrived)</description>
<maintainer email="mdurrani808@gmail.com">mdurrani</maintainer>
<license>MIT</license>

<depend>rclpy</depend>
<depend>msgs</depend>
<depend>std_msgs</depend>
<exec_depend>python3-serial</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
4 changes: 4 additions & 0 deletions src/subsystems/navigation/led_indicator/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/led_indicator
[install]
install_scripts=$base/lib/led_indicator
28 changes: 28 additions & 0 deletions src/subsystems/navigation/led_indicator/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from setuptools import find_packages, setup
from glob import glob
import os

package_name = "led_indicator"

setup(
name=package_name,
version="0.0.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name, ["package.xml"]),
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
],
install_requires=["setuptools", "pyserial"],
zip_safe=True,
maintainer="mdurrani",
maintainer_email="mdurrani808@gmail.com",
description="LED status indicator via Adafruit Pixel Trinkey + NeoPixels",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"led_indicator_node = led_indicator.led_indicator_node:main",
],
},
)
Loading