diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ae8ee57 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +.vscode* \ No newline at end of file diff --git a/rover_ws/vision/.gitignore b/rover_ws/vision/.gitignore new file mode 100644 index 0000000..29e437e --- /dev/null +++ b/rover_ws/vision/.gitignore @@ -0,0 +1,2 @@ +*.pcd +__pycache__ \ No newline at end of file diff --git a/rover_ws/vision/README.md b/rover_ws/vision/README.md new file mode 100644 index 0000000..7438954 --- /dev/null +++ b/rover_ws/vision/README.md @@ -0,0 +1,237 @@ +# 🛰️ Rover Vision — Road Feature Detection & BEV Mapping + +A real-time computer vision pipeline for an autonomous rover competition, performing **lane detection**, **road marker recognition**, and **bird's-eye-view (BEV) ground projection** from a single monocular camera. + +--- + +## Overview + +This system takes a forward-facing camera feed and produces: + +- Detected **lane lines** in pixel space and metric ground coordinates +- Detected **circular road markers** (e.g.
roundabouts, stop circles) with real-world center and radius +- A **bird's-eye-view warp** of the scene for top-down situational awareness +- **3D point clouds** (`.pcd`) of lane lines and circle boundaries on the ground plane + +--- + +## Architecture + +``` +Camera Frame + │ + ▼ +RoadFeatureDetector + ├── detect_edges() → White-line mask (HSV threshold + morphology + thinning) + ├── detect_lines() → Hough line segments (lane markings) + └── _detect_circles() → Hough circles + contour fallback (road markers) + │ + ▼ +HomographyBEV + ├── pixel_to_ground() → Single pixel → (X, Y) metric ground coords + ├── pixels_to_ground() → Batch pixel projection + ├── mask_to_pointcloud()→ Full mask → (N, 3) XYZ point cloud + └── warp_to_bev() → Undistort + perspective warp to top-down view + │ + ▼ +RoadFeatureBEVPipeline + ├── Lane point cloud → (N, 3) float64 [X, Y, 0] in metres + ├── Circle list → [(X, Y, radius_m), ...] + ├── Circle clouds → [(N, 3), ...] ring/disc point clouds + └── BEV image → Warped top-down BGR frame +``` + +--- + +## Modules + +| File | Description | +|---|---| +| `homography.py` | Camera model, homography computation, lens undistortion, ground projection, BEV warping, PCD export | +| `road_features_detector.py` | Lane and circle detection pipeline on raw frames | +| `pipeline.py` | End-to-end orchestration; outputs annotated frames, BEV, and point clouds | + +--- + +## Setup + +### Requirements + +```bash +pip install opencv-python opencv-contrib-python numpy +``` + +> `opencv-contrib-python` is required for `cv2.ximgproc.thinning` (skeletonisation of lane masks).
+ +### Folder Structure + +``` +project/ +├── data/ +│ └── raw/ +│ ├── test_lane.mp4 +│ └── ground.jpeg +├── homography.py +├── road_features_detector.py +└── pipeline.py +``` + +--- + +## Camera Calibration + +The pipeline requires intrinsic camera parameters. Provide the **3×3 intrinsic matrix K**: + +```python +K = np.array([ + [fx, 0, cx], + [ 0, fy, cy], + [ 0, 0, 1] +], dtype=np.float64) +``` + +Optionally provide a **distortion coefficient vector** (OpenCV 5-parameter model): + +```python +dist_coeffs = np.array([k1, k2, p1, p2, k3]) +``` + +If omitted, distortion is assumed to be zero. When provided, lens undistortion is applied automatically before every projection and BEV warp. + +Also provide the camera **mounting parameters**: + +| Parameter | Description | +|---|---| +| `camera_height` | Height of camera above ground plane (metres) | +| `pitch_deg` | Camera pitch angle (degrees, negative = downward tilt) | +| `yaw_deg` | Camera yaw angle (degrees, positive = rotated right). Default `0.0` | +| `roll_deg` | Camera roll angle (degrees, positive = tilted right). Default `0.0` | + +Example values used in competition testing: + +```python +camera_height = 1.33 # metres +pitch_deg = -45 # degrees +yaw_deg = -2 # degrees — small left offset from mount +roll_deg = -7 # degrees — slight sideways tilt +``` + +### Rotation Convention + +The full world-to-camera rotation is composed as: + +``` +R = M · R_pitch · R_yaw · R_roll +``` + +where `M = diag(-1, 1, 1)` mirrors the X axis so that +X is rightward in the image. Each angle has an independent, physically meaningful effect: + +| Angle | Axis | Effect | +|---|---|---| +| `pitch_deg` | X | Tilts camera up/down — primary mounting angle | +| `yaw_deg` | Z | Rotates camera left/right relative to rover heading | +| `roll_deg` | Y | Tilts camera sideways — corrects lateral mounting error | + +For a perfectly aligned camera only `pitch_deg` is non-zero.
In practice, small yaw/roll corrections (±2–7°) compensate for mounting tolerances that would otherwise produce a skewed BEV. + +--- + +## Usage + +### Run the full pipeline on a video + +```bash +python pipeline.py +``` + +This opens a video feed and displays three windows: + +- **Road Features** — annotated frame with detected lanes and circles +- **Lane Mask** — binary mask of detected lane lines +- **BEV** — bird's-eye-view warp of the current frame + +### Keyboard Controls + +| Key | Action | +|---|---| +| `ESC` | Quit | +| `s` | Save current frame's lane point cloud as `.pcd` | + +On exit, the full merged lane point cloud across all frames is saved to `lane_cloud_full.pcd`. + +### Run homography standalone (single image) + +```bash +python homography.py +``` + +Loads `data/raw/ground.jpeg`, projects a pixel to metric ground, warps to BEV, generates a point cloud from a thresholded mask, and saves `ground_plane.pcd`. + +--- + +## Outputs + +### Point Cloud (`.pcd`) + +Ground-plane point clouds are saved in **PCD ASCII format** compatible with CloudCompare, Open3D, and ROS: + +``` +FIELDS x y z +TYPE F F F +DATA ascii +``` + +Each point is a 3D position `(X, Y, 0)` in the rover's ground coordinate frame, where: + +- **X** — lateral axis (left/right) +- **Y** — forward axis (depth from rover) +- **Z** — always 0 (ground plane) + +### Circle Detection Output + +Each detected circle is reported as: + +``` +center = (X, Y) # metric ground position in metres +radius = r # estimated real-world radius in metres +``` + +--- + +## Coordinate Frame + +``` + ▲ Y (forward) + │ + │ + ────────┼────────▶ X (right) + │ + Camera/Rover origin +``` + +The ground plane is `Z = 0`. All projections assume a flat ground surface.
+ +--- + +## Tuning + +Key parameters to adjust for different environments: + +| Parameter | Location | Effect | +|---|---|---| +| `lower_white` / `upper_white` | `RoadFeatureDetector.__init__` | HSV range for white line detection | +| `min_radius` / `max_radius` | `RoadFeatureDetector.__init__` | Circle size filter (pixels) | +| Hough `threshold`, `minLineLength`, `maxLineGap` | `detect_lines()` | Lane line sensitivity | +| `circularity` threshold (0.6) | `_detect_circles()` | Roundness filter for contour fallback | +| `white_ratio` threshold (0.35) | `_detect_circles()` | Minimum white fill inside detected circle | +| `yaw_deg` / `roll_deg` | `HomographyBEV.__init__` | Fine-tune mounting misalignment; adjust until BEV lanes appear straight and parallel | + +--- + +## Competition Notes + +- The pipeline runs **frame-by-frame** with no temporal filtering — adding a Kalman filter or frame-to-frame tracking would improve stability. +- BEV warping assumes a **flat, level ground plane**. Uneven terrain will introduce projection errors. +- Point clouds accumulate across frames (`all_points` list in `pipeline.py`) and are merged on exit — useful for building a local map of the course. +- For real-time performance, consider downscaling input frames before processing. +- Lens undistortion is applied on every frame and every pixel projection. If `dist_coeffs` are inaccurate, straight lines will appear curved in the BEV — re-run calibration with a checkerboard to obtain reliable coefficients.
class HomographyBEV:
    """Flat-ground homography between image pixels and metric ground coordinates.

    The camera is a pinhole with intrinsics ``K`` placed ``camera_height``
    above the plane ``Z = 0``.  Its rotation is
    ``R = M @ R_pitch @ R_yaw @ R_roll`` with ``M = diag(-1, 1, 1)``.
    From that pose the class derives:

    * ``H``     - ground ``(X, Y, 1)`` -> pixel ``(u, v, 1)``
    * ``H_inv`` - pixel -> ground
    * ``S``     - ground metres -> BEV image pixels (``S_inv`` is its inverse)
    * ``H_bev`` - pixel -> BEV image (``S @ H_inv``)

    Lens distortion (OpenCV coefficient vector) is removed before every
    pixel projection and before the BEV warp.
    """

    def __init__(
        self,
        K,
        camera_height,
        pitch_deg,
        yaw_deg=0.0,
        roll_deg=0.0,
        image_size=None,
        dist_coeffs=None
    ):
        """Store the camera model and build all derived matrices.

        Parameters
        ----------
        K : array-like, shape (3, 3)
            Intrinsic matrix.
        camera_height : float
            Height of the camera above the ground plane.
        pitch_deg, yaw_deg, roll_deg : float
            Rotation angles in degrees about the X, Z and Y axes.
        image_size : (width, height)
            Size of the frames that will be projected.  Required.
        dist_coeffs : array-like or None
            OpenCV distortion coefficients; ``None`` means no distortion.

        Raises
        ------
        ValueError
            If ``image_size`` is missing or the BEV extent is degenerate.
        """
        if image_size is None:
            # Previously this surfaced as an opaque "'NoneType' object is not
            # subscriptable"; the argument has no usable default.
            raise ValueError("image_size must be given as (width, height)")

        # np.array copies, so later edits by the caller cannot alias our state,
        # and plain lists are accepted as well as ndarrays.
        self.K = np.array(K, dtype=np.float64)
        self.dist_coeffs = (
            np.zeros(5, dtype=np.float64)
            if dist_coeffs is None
            else np.array(dist_coeffs, dtype=np.float64)
        )

        self.camera_height = camera_height
        self.pitch_deg = pitch_deg
        self.yaw_deg = yaw_deg
        self.roll_deg = roll_deg

        self.img_w = int(image_size[0])
        self.img_h = int(image_size[1])

        self.rebuild()

    def rebuild(self):
        """Recompute every derived matrix from the current pose attributes.

        Call this after changing ``camera_height``, ``pitch_deg``, ``yaw_deg``,
        ``roll_deg``, ``K`` or the image size; those attributes are only read
        while the matrices are being built.
        """
        self.pitch = np.deg2rad(self.pitch_deg)
        self.yaw = np.deg2rad(self.yaw_deg)
        self.roll = np.deg2rad(self.roll_deg)

        self._build_extrinsics()
        self._build_homography()
        self._build_bev_scaling()

    # =========================================================
    # BUILD EXTRINSICS
    # =========================================================

    def _build_extrinsics(self):
        """Build rotation ``R`` and translation ``t`` from the pose in radians."""
        cp, sp = np.cos(self.pitch), np.sin(self.pitch)
        cy, sy = np.cos(self.yaw), np.sin(self.yaw)
        cr, sr = np.cos(self.roll), np.sin(self.roll)

        # Rotation around X-axis (pitch)
        R_pitch = np.array([
            [1, 0, 0],
            [0, cp, -sp],
            [0, sp, cp],
        ], dtype=np.float64)

        # Rotation around Z-axis (yaw)
        R_yaw = np.array([
            [cy, -sy, 0],
            [sy, cy, 0],
            [0, 0, 1],
        ], dtype=np.float64)

        # Rotation around Y-axis (roll)
        R_roll = np.array([
            [cr, 0, sr],
            [0, 1, 0],
            [-sr, 0, cr],
        ], dtype=np.float64)

        # Mirror X so +X is rightward in the image.
        M = np.diag([-1.0, 1.0, 1.0])
        self.R = M @ (R_pitch @ R_yaw @ R_roll)

        # Camera centre in the world frame; t = -R C.
        C = np.array([[0], [0], [self.camera_height]], dtype=np.float64)
        self.t = -self.R @ C

    # =========================================================
    # BUILD HOMOGRAPHY
    # =========================================================

    def _build_homography(self):
        """Build ``H = K [r1 r2 t]`` and its inverse.

        The third rotation column is dropped because every ground point
        has ``Z = 0``.
        """
        H = np.column_stack((self.R[:, 0], self.R[:, 1], self.t))

        self.H = self.K @ H
        self.H_inv = np.linalg.inv(self.H)

    # =========================================================
    # BUILD BEV SCALE
    # =========================================================

    def _build_bev_scaling(self):
        """Fit the ground footprint of the lower image half into the BEV canvas.

        The four corners of the bottom half of the image are projected to
        the ground; their bounding box is scaled uniformly into an
        ``img_w x (img_h // 2)`` canvas with the Y axis flipped so that
        larger ground Y is higher in the BEV image.
        """
        corners_px = np.array([
            [0, self.img_h * 0.5, 1.0],
            [self.img_w - 1, self.img_h * 0.5, 1.0],
            [0, self.img_h - 1, 1.0],
            [self.img_w - 1, self.img_h - 1, 1.0],
        ], dtype=np.float64)

        self.out_w = self.img_w
        self.out_h = self.img_h // 2

        # A corner on the horizon has a zero homogeneous weight; let the
        # division produce inf/nan and reject it explicitly below.
        with np.errstate(divide="ignore", invalid="ignore"):
            ground = self.H_inv @ corners_px.T
            world = ground[:2] / ground[2]

            x_min, y_min = world.min(axis=1)
            x_max, y_max = world.max(axis=1)

            scale = float(np.min([
                self.out_w / (x_max - x_min),
                self.out_h / (y_max - y_min),
            ]))

        if not np.isfinite(scale) or scale <= 0:
            raise ValueError(
                "Degenerate BEV extent: the lower half of the image does not "
                "map to a finite ground region for this camera pose"
            )

        self.S = np.array([
            [scale, 0, -scale * x_min],
            [0, -scale, scale * y_max],
            [0, 0, 1],
        ], dtype=np.float64)

        self.S_inv = np.linalg.inv(self.S)
        self.H_bev = self.S @ self.H_inv

    # =========================================================
    # UNDISTORTION HELPER
    # =========================================================

    def _undistort_pixels(self, pixels):
        """Return distortion-corrected pixel coordinates as (N, 2) float64.

        Empty input and an all-zero (or ``None``) coefficient vector are
        returned unchanged without calling OpenCV: with no distortion the
        correction is the identity, and ``cv2.undistortPoints`` does not
        accept empty arrays.
        """
        pixels = np.asarray(pixels, dtype=np.float64).reshape(-1, 2)

        if len(pixels) == 0 or self.dist_coeffs is None:
            return pixels

        dist = np.asarray(self.dist_coeffs, dtype=np.float64)
        if not np.any(dist):
            return pixels

        pts = pixels.astype(np.float32).reshape(-1, 1, 2)
        undist = cv2.undistortPoints(pts, self.K, dist, P=self.K)
        return undist.reshape(-1, 2).astype(np.float64)

    # =========================================================
    # PIXEL -> GROUND
    # =========================================================

    def pixel_to_ground(self, u, v):
        """Project one pixel ``(u, v)`` to ground coordinates ``(X, Y)``."""
        X, Y = self.pixels_to_ground([[u, v]])[0]
        return X, Y

    # =========================================================
    # MULTIPLE PIXELS -> GROUND
    # =========================================================

    def pixels_to_ground(self, pixels):
        """Project an (N, 2) array of ``(u, v)`` pixels to (N, 2) ground ``(X, Y)``.

        An empty input yields an empty ``(0, 2)`` array.
        """
        undist = self._undistort_pixels(pixels)

        homog = np.column_stack([undist, np.ones(len(undist))])
        ground = homog @ self.H_inv.T

        return ground[:, :2] / ground[:, 2:3]

    # =========================================================
    # MASK -> POINT CLOUD
    # =========================================================

    def mask_to_pointcloud(self, mask):
        """Project every non-zero pixel of a 2-D mask to an (N, 3) ``[X, Y, 0]`` cloud.

        A mask with no positive pixels yields an empty ``(0, 3)`` array.
        """
        ys, xs = np.where(mask > 0)

        xy = self.pixels_to_ground(np.column_stack([xs, ys]))

        return np.column_stack([xy, np.zeros(len(xy))])

    # =========================================================
    # WARP IMAGE TO BEV
    # =========================================================

    def warp_to_bev(self, image):
        """Undistort ``image`` and warp it with ``H_bev`` to an ``out_w x out_h`` view."""
        undistorted = cv2.undistort(image, self.K, self.dist_coeffs)

        bev = cv2.warpPerspective(
            undistorted,
            self.H_bev,
            (self.out_w, self.out_h),
            flags=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_CONSTANT,
            borderValue=(0, 0, 0)
        )

        return bev

    # =========================================================
    # SAVE PCD
    # =========================================================

    def save_pcd(self, points, filename):
        """Write an (N, 3) point array to ``filename`` as an ASCII PCD v0.7 file.

        Raises
        ------
        ValueError
            If ``points`` is non-empty and not of shape (N, 3); the header
            always declares three fields, so any other shape would give a
            malformed file.
        """
        pts = np.asarray(points, dtype=np.float64)
        if pts.size == 0:
            pts = pts.reshape(0, 3)
        elif pts.ndim != 2 or pts.shape[1] != 3:
            raise ValueError(f"points must have shape (N, 3), got {pts.shape}")

        n = len(pts)

        header = (
            f"# .PCD v0.7\n"
            f"FIELDS x y z\n"
            f"SIZE 4 4 4\n"
            f"TYPE F F F\n"
            f"COUNT 1 1 1\n"
            f"WIDTH {n}\n"
            f"HEIGHT 1\n"
            f"VIEWPOINT 0 0 0 1 0 0 0\n"
            f"POINTS {n}\n"
            f"DATA ascii\n"
        )

        with open(filename, 'w') as f:
            f.write(header)
            np.savetxt(f, pts, fmt="%.4f")

        print(f"Saved: {filename}")


# =============================================================
# EXAMPLE USAGE
# =============================================================

def main():
    """Demo: project one pixel, warp an image to BEV and export a point cloud."""
    image_path = "../data/raw/ground.jpeg"
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread signals failure by returning None rather than raising.
        raise SystemExit(f"Cannot read image: {image_path}")

    h, w = image.shape[:2]

    K = np.array([
        [793.79768697, 0, 290.78702859],
        [0, 813.96117996, 241.57106901],
        [0, 0, 1],
    ], dtype=np.float64)

    bev = HomographyBEV(
        K=K,
        camera_height=1.33,
        pitch_deg=-45,
        yaw_deg=-2,
        roll_deg=-7,
        image_size=(w, h),
        dist_coeffs=np.array([
            -4.97661814e-01,
            8.05356640e+00,
            9.44660547e-03,
            -2.64434172e-02,
            -4.33974203e+01
        ])
    )

    print(f"w={w}\nh={h}")

    # Single pixel
    X, Y = bev.pixel_to_ground(585, 375)
    print(f"Ground point: X={X:.3f}, Y={Y:.3f}")

    # Warp full image
    bird_eye = bev.warp_to_bev(image)

    # Mask -> point cloud
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
    _, mask = cv2.threshold(gray, 120, 255, cv2.THRESH_BINARY)

    points = bev.mask_to_pointcloud(mask)
    print("Point cloud shape:", points.shape)

    bev.save_pcd(points, "ground_plane.pcd")

    # Display
    cv2.imshow("Original", image)
    cv2.imshow("Mask", mask)
    cv2.imshow("Bird Eye", bird_eye)

    cv2.waitKey(0)
    cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
# =============================================================
# HELPERS
# =============================================================

def lines_to_mask(lines, shape):
    """Rasterise line segments into a binary uint8 mask.

    Parameters
    ----------
    lines : array of ``[[x1, y1, x2, y2]]`` rows as returned by
        ``cv2.HoughLinesP``, or ``None`` when nothing was detected.
    shape : image shape; only the first two entries (height, width) are used.

    Returns
    -------
    (height, width) uint8 mask with 255 on the drawn segments.
    """
    mask = np.zeros(shape[:2], dtype=np.uint8)
    if lines is None:
        return mask
    for x1, y1, x2, y2 in np.asarray(lines).reshape(-1, 4):
        cv2.line(mask, (int(x1), int(y1)), (int(x2), int(y2)), 255, thickness=2)
    return mask


def mask_to_pixels(mask):
    """Return the ``(x, y)`` coordinates of all positive mask pixels as (N, 2) float64."""
    ys, xs = np.where(mask > 0)
    return np.stack([xs, ys], axis=1).astype(np.float64)


# =============================================================
# PIPELINE CLASS
# =============================================================

class RoadFeatureBEVPipeline:
    """Runs the road-feature detector and projects its output to the ground plane.

    The pipeline owns a ``RoadFeatureDetector`` and shares that detector's
    ``HomographyBEV`` instance (``self.bev``), so pose changes made through
    the setters below affect both.
    """

    def __init__(self, K, camera_height, pitch_deg,yaw_deg,roll_deg,dist_coeffs, image_size, min_radius=10, max_radius=200):
        """Create the detector; all arguments are forwarded to it unchanged."""
        self.detector = RoadFeatureDetector(
            K=K,
            camera_height=camera_height,
            pitch_deg=pitch_deg,
            yaw_deg=yaw_deg,
            roll_deg=roll_deg,
            dist_coeffs=dist_coeffs,
            image_size=image_size,
            min_radius=min_radius,
            max_radius=max_radius
        )

        self.bev = self.detector.bev   # reuse same HomographyBEV instance

    # ----------------------------------------------------------
    # PROCESS A SINGLE FRAME
    # ----------------------------------------------------------

    def process_frame(self, frame):
        """
        Returns
        -------
        output         : annotated BGR frame (lanes + circles drawn)
        bev_image      : bird's-eye-view warp of the frame
        lane_mask      : rasterised detected-line mask
        lane_points    : (N, 3) float64 lane ground point cloud [X, Y, 0]
        ground_circles : list of (X, Y, radius_m) center tuples
        circle_clouds  : list of (N, 3) arrays, one per detected circle
        """

        # 1. Run detector (lanes + circles)
        output, edges, lines, ground_circles, circle_clouds, _ = \
            self.detector.process(frame, draw_bev=False)

        # 2. Rasterise detected lines -> mask
        lane_mask = lines_to_mask(lines, frame.shape)

        # 3. Project lane mask pixels -> ground plane
        pixels = mask_to_pixels(lane_mask)

        if len(pixels) == 0:
            # Nothing detected; skip the projection, which is not guaranteed
            # to accept empty input.
            lane_points = np.zeros((0, 3), dtype=np.float64)
        else:
            xy = self.bev.pixels_to_ground(pixels)
            z = np.zeros((len(xy), 1), dtype=np.float64)
            lane_points = np.hstack([xy, z])

        # 4. BEV warp
        bev_image = self.bev.warp_to_bev(frame)

        return output, bev_image, lane_mask, lane_points, ground_circles, circle_clouds

    # ----------------------------------------------------------
    # SAVE HELPERS
    # ----------------------------------------------------------

    def save_pcd(self, points, filename="lane_cloud.pcd"):
        """Write ``points`` to ``filename`` via the shared ``HomographyBEV``."""
        self.bev.save_pcd(points, filename)

    # ----------------------------------------------------------
    # DYNAMIC RECONFIGURE
    # ----------------------------------------------------------

    def _refresh_bev(self):
        """Rebuild the shared homography after a pose attribute changed.

        ``HomographyBEV`` reads ``camera_height`` and the angles only while
        building its matrices, so assigning an attribute is not enough: the
        radian angles and every derived matrix must be recomputed.
        """
        bev = self.bev
        bev.pitch = np.deg2rad(bev.pitch_deg)
        bev.yaw = np.deg2rad(bev.yaw_deg)
        bev.roll = np.deg2rad(bev.roll_deg)
        bev._build_extrinsics()
        bev._build_homography()
        bev._build_bev_scaling()

    def set_camera_height(self, value):
        """Set the camera height and rebuild the homography."""
        self.bev.camera_height = float(value)   # same object shared with detector
        self._refresh_bev()

    def set_pitch_deg(self, value):
        """Set the pitch angle (degrees) and rebuild the homography."""
        self.bev.pitch_deg = float(value)
        self._refresh_bev()

    def set_yaw_deg(self, value):
        """Set the yaw angle (degrees) and rebuild the homography."""
        self.bev.yaw_deg = float(value)
        self._refresh_bev()

    def set_roll_deg(self, value):
        """Set the roll angle (degrees) and rebuild the homography."""
        self.bev.roll_deg = float(value)
        self._refresh_bev()

    def set_dist_coeffs(self, value):
        """Set the distortion coefficients; ``None`` means no distortion.

        No rebuild is needed: the coefficients are read on every projection.
        """
        self.bev.dist_coeffs = (
            np.zeros(5, dtype=np.float64)
            if value is None
            else np.array(value, dtype=np.float64)
        )

    def set_min_radius(self, value):
        """Set the minimum circle radius (pixels) used by the detector."""
        self.detector.min_radius = int(value)

    def set_max_radius(self, value):
        """Set the maximum circle radius (pixels) used by the detector."""
        self.detector.max_radius = int(value)


# =============================================================
# MAIN LOOP
# =============================================================

def main():
    """Loop a test video through the pipeline, show the results, export clouds.

    ESC quits; ``s`` saves the current frame's lane cloud.  On exit the
    lane clouds of the first pass through the video are merged and saved.
    """
    K = np.array([
        [793.79768697, 0, 290.78702859],
        [0, 813.96117996, 241.57106901],
        [0, 0, 1],
    ], dtype=np.float64)

    cap = cv2.VideoCapture("../data/raw/test_lane.mp4")

    if not cap.isOpened():
        print("Cannot open video file")
        return

    try:
        ret, first_frame = cap.read()
        if not ret:
            print("Cannot read video")
            return

        h, w = first_frame.shape[:2]
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        pipeline = RoadFeatureBEVPipeline(
            K=K,
            camera_height=1.33,
            pitch_deg=-45,
            yaw_deg=-2,
            roll_deg=-7,
            image_size=(w, h),
            dist_coeffs=np.array([
                -4.97661814e-01,
                8.05356640e+00,
                9.44660547e-03,
                -2.64434172e-02,
                -4.33974203e+01
            ])
        )
        frame_idx = 0
        all_points = []
        first_pass = True

        while True:

            ret, frame = cap.read()
            if not ret:
                # End of file: rewind and keep playing.  If the rewind does
                # not yield a frame either, stop instead of spinning forever.
                first_pass = False
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                if not ret:
                    print("Cannot read video")
                    break

            output, bev_image, lane_mask, lane_points, ground_circles, circle_clouds = \
                pipeline.process_frame(frame)

            print(f"[frame {frame_idx:04d}]  "
                  f"lane pixels: {len(lane_points):6d}   "
                  f"circles: {len(ground_circles)}")

            for i, (cloud, (X, Y, rm)) in enumerate(zip(circle_clouds, ground_circles)):
                print(f"  circle {i+1}: center=({X:.2f}m, {Y:.2f}m)  "
                      f"r={rm:.2f}m  cloud_pts={len(cloud)}")

            # Replayed frames only duplicate points already collected, so
            # accumulate during the first pass only; this also bounds memory.
            if first_pass and len(lane_points) > 0:
                all_points.append(lane_points)

            cv2.imshow("Road Features", output)
            cv2.imshow("Lane Mask", lane_mask)
            cv2.imshow("BEV", bev_image)

            # Mask to the low byte: some backends set high bits in the key code.
            key = cv2.waitKey(1) & 0xFF
            if key == 27:
                break
            if key == ord('s') and len(lane_points) > 0:
                pipeline.save_pcd(lane_points, f"lane_cloud_manual_{frame_idx:04d}.pcd")
                print(f"  → saved lane_cloud_manual_{frame_idx:04d}.pcd")

            frame_idx += 1

        # np.vstack raises on an empty list, so only merge when at least one
        # frame actually produced lane points.
        if all_points:
            merged = np.vstack(all_points)
            pipeline.save_pcd(merged, "lane_cloud_full.pcd")
            print(f"Saved merged cloud: {len(merged)} points")

    finally:
        cap.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()
class RoadFeatureDetector:
    """Detects white lane lines and circular road markers in a BGR frame.

    Lane candidates come from an HSV white threshold that is cleaned with
    morphology, thinned, and passed to a probabilistic Hough transform.
    Circles come from a Hough circle transform on the white mask plus a
    contour-circularity fallback.  Detected circles are projected to the
    ground plane through the owned ``HomographyBEV`` (``self.bev``).
    """

    # Minimum fraction of white pixels inside a Hough circle's bounding box.
    WHITE_RATIO_MIN = 0.35
    # Minimum 4*pi*area/perimeter^2 for a contour to count as a circle.
    CIRCULARITY_MIN = 0.6
    # Circles whose centres are closer than this (pixels) are merged.
    MERGE_DIST_PX = 20

    def __init__(self, K, camera_height, pitch_deg,yaw_deg,roll_deg, image_size, dist_coeffs=None,
                 min_radius=10, max_radius=200):
        """Build the ground-projection model and set the detection thresholds.

        ``K``, ``camera_height``, the three angles, ``image_size`` and
        ``dist_coeffs`` are forwarded to ``HomographyBEV``; ``min_radius`` /
        ``max_radius`` bound the accepted circle radius in pixels.
        """
        self.bev = HomographyBEV(
            K=np.array(K, dtype=np.float64),
            camera_height=float(camera_height),
            pitch_deg=float(pitch_deg),
            yaw_deg=float(yaw_deg),
            roll_deg=float(roll_deg),
            image_size=tuple(image_size),
            dist_coeffs=None if dist_coeffs is None else np.array(dist_coeffs, dtype=np.float64)
        )

        self.min_radius = min_radius
        self.max_radius = max_radius

        # HSV bounds for "white": any hue, low saturation, high value.
        self.lower_white = np.array([0, 0, 200])
        self.upper_white = np.array([180, 50, 255])
        self.kernel = np.ones((5, 5), np.uint8)

    # ======================================================
    # ROI (kept separate, NOT used in pipeline by default)
    # ======================================================
    def region_of_interest(self, frame):
        """Return ``frame`` with everything outside a bottom trapezoid blacked out.

        The trapezoid spans the full bottom edge and narrows to 300 px
        around the image centre at half height.
        """
        height, width = frame.shape[:2]

        polygon = np.array([[
            (0, height),
            (width, height),
            (width // 2 + 150, height // 2),
            (width // 2 - 150, height // 2)
        ]], np.int32)

        mask = np.zeros_like(frame)
        # A scalar colour fills only the first channel, which would keep just
        # the blue plane of a BGR frame; fill every channel instead.
        fill = 255 if frame.ndim == 2 else (255,) * frame.shape[2]
        cv2.fillPoly(mask, polygon, fill)

        return cv2.bitwise_and(frame, mask)

    # ======================================================
    # EDGE DETECTION
    # ======================================================
    def detect_edges(self, frame):
        """Return ``(thin, mask)``: the thinned white mask and the full white mask."""
        blur = cv2.GaussianBlur(frame, (5, 5), 0)
        hsv = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV)

        mask = cv2.inRange(hsv, self.lower_white, self.upper_white)

        # Open removes speckle, close fills small gaps in the markings.
        mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, self.kernel)
        mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, self.kernel)

        thin = ximgproc.thinning(mask)

        return thin, mask

    # ======================================================
    # HOUGH LINES
    # ======================================================
    def detect_lines(self, edges):
        """Run ``cv2.HoughLinesP`` on ``edges`` and return its result unchanged.

        ``draw_lines`` and the pipeline both treat a ``None`` result as
        "no lines".
        """
        return cv2.HoughLinesP(
            edges,
            1,
            np.pi / 180,
            50,
            minLineLength=50,
            maxLineGap=30
        )

    # ======================================================
    # DRAW LINES
    # ======================================================
    def draw_lines(self, frame, lines):
        """Draw ``lines`` in green on ``frame`` in place and return ``frame``."""
        if lines is None:
            return frame

        for x1, y1, x2, y2 in np.asarray(lines).reshape(-1, 4):
            cv2.line(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 5)

        return frame

    # ======================================================
    # CIRCLE DETECTION
    # ======================================================
    def _hough_circle_candidates(self, image, white_mask):
        """Hough circles on the blurred mask, kept only if mostly white inside."""
        blurred = cv2.GaussianBlur(white_mask, (9, 9), 2)
        circles = cv2.HoughCircles(
            blurred, cv2.HOUGH_GRADIENT, dp=1.2, minDist=30,
            param1=50, param2=30,
            # HoughCircles requires integer radii; the limits may have been
            # reconfigured with floats.
            minRadius=int(self.min_radius), maxRadius=int(self.max_radius)
        )
        if circles is None:
            return []

        found = []
        for x, y, r in np.round(circles[0]).astype(int):
            y0, y1 = max(0, y - r), min(image.shape[0], y + r)
            x0, x1 = max(0, x - r), min(image.shape[1], x + r)
            roi = white_mask[y0:y1, x0:x1]
            if roi.size == 0:
                continue
            # The ratio is measured over the bounding box, not the disc.
            white_ratio = np.sum(roi > 0) / roi.size
            if white_ratio > self.WHITE_RATIO_MIN:
                found.append((int(x), int(y), int(r)))
        return found

    def _contour_circle_candidates(self, white_mask):
        """External contours of the mask that are round and within the radius range."""
        min_area = np.pi * (self.min_radius ** 2)
        max_area = np.pi * (self.max_radius ** 2)

        found = []
        contours, _ = cv2.findContours(white_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        for cnt in contours:
            area = cv2.contourArea(cnt)
            if area < min_area or area > max_area:
                continue
            perimeter = cv2.arcLength(cnt, True)
            if perimeter == 0:
                continue
            circularity = 4 * np.pi * area / (perimeter * perimeter)
            if circularity < self.CIRCULARITY_MIN:
                continue
            (cx, cy), radius = cv2.minEnclosingCircle(cnt)
            if self.min_radius <= radius <= self.max_radius:
                found.append((int(cx), int(cy), int(radius)))
        return found

    @staticmethod
    def _merge_close(circles, min_dist=20):
        """Drop every circle whose centre is within ``min_dist`` of an earlier kept one."""
        merged = []
        for x, y, r in circles:
            if all(np.hypot(mx - x, my - y) >= min_dist for mx, my, _ in merged):
                merged.append((x, y, r))
        return merged

    def _detect_circles(self, image, white_mask):
        """Return de-duplicated ``(x, y, r)`` pixel circles found in ``white_mask``.

        Hough candidates come first, so on a near-duplicate the Hough
        estimate is the one that survives the merge.
        """
        detected = self._hough_circle_candidates(image, white_mask)
        detected += self._contour_circle_candidates(white_mask)
        return self._merge_close(detected, self.MERGE_DIST_PX)

    # ======================================================
    # MAP CIRCLE CENTER TO GROUND
    # ======================================================
    def circle_to_ground(self, circle):
        """Project a pixel circle ``(x, y, r)`` to ground ``(X, Y, radius_m)``.

        The radius is the ground distance between the projected centre and
        the projected point ``r`` pixels to its right.
        """
        x, y, r = circle
        X, Y = self.bev.pixel_to_ground(x, y)
        Xp, Yp = self.bev.pixel_to_ground(x + r, y)
        radius_m = np.hypot(Xp - X, Yp - Y)

        return (X, Y, radius_m)

    # ======================================================
    # CIRCLE POINT CLOUD
    # ======================================================
    def circle_to_ground_cloud(self, circle, num_points=36, filled=False):
        """Sample a ground-plane cloud for a pixel circle.

        Returns ``num_points`` ring points, or ``10 * num_points`` points on
        ten concentric rings from radius 0 to ``radius_m`` when ``filled`` is
        true, as an (N, 3) float64 array with ``Z = 0``.
        """
        X, Y, radius_m = self.circle_to_ground(circle)

        angles = np.linspace(0, 2 * np.pi, num_points, endpoint=False)
        radii = np.linspace(0, radius_m, num=10) if filled else np.array([radius_m])

        # Row-major ravel keeps the "for each radius, for each angle" order.
        xs = (X + np.outer(radii, np.cos(angles))).ravel()
        ys = (Y + np.outer(radii, np.sin(angles))).ravel()

        return np.column_stack([xs, ys, np.zeros(len(xs))]).astype(np.float64)   # (N, 3)

    # ======================================================
    # FULL PIPELINE
    # ======================================================
    def process(self, frame, draw_bev=False):
        """Detect lanes and circles in ``frame``.

        Returns
        -------
        output         : copy of ``frame`` with lanes and circles drawn
        edges          : thinned white mask fed to the Hough transform
        lines          : raw ``cv2.HoughLinesP`` result (may be ``None``)
        ground_circles : list of ``(X, Y, radius_m)``
        circle_clouds  : list of (N, 3) ring clouds, one per circle
        bev_image      : BEV warp of ``output`` if ``draw_bev`` else ``None``
        """
        # --- Lanes ---
        edges, white_mask = self.detect_edges(frame)
        lines = self.detect_lines(edges)
        output = self.draw_lines(frame.copy(), lines)

        # --- Circles ---
        circles = self._detect_circles(frame, white_mask)
        ground_circles = [self.circle_to_ground(c) for c in circles]
        circle_clouds = [self.circle_to_ground_cloud(c) for c in circles]

        for x, y, r in circles:
            cv2.circle(output, (x, y), r, (0, 255, 0), 2)

        bev_image = self.bev.warp_to_bev(output) if draw_bev else None

        return output, edges, lines, ground_circles, circle_clouds, bev_image


# ======================================================
# MAIN LOOP
# ======================================================

def main():
    """Loop a test video through the detector and display the results (ESC quits)."""
    K = np.array([
        [1000, 0, 960],
        [0, 1000, 540],
        [0, 0, 1]
    ], dtype=np.float64)

    camera_height = 1.2
    pitch_deg = -30

    cap = cv2.VideoCapture("../data/raw/test_lane.mp4")
    if not cap.isOpened():
        print("Cannot open video file")
        raise SystemExit(1)

    try:
        ret, frame = cap.read()
        if not ret:
            print("Cannot read frame from camera")
            raise SystemExit(1)

        img_h, img_w = frame.shape[:2]
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        detector = RoadFeatureDetector(
            K=K,
            camera_height=camera_height,
            pitch_deg=pitch_deg,
            image_size=(img_w, img_h),
            yaw_deg=0,
            roll_deg=0
        )

        while True:
            ret, frame = cap.read()
            if not ret:
                # End of file: rewind; stop if the rewind yields no frame.
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()
                if not ret:
                    print("Cannot read frame from camera")
                    break

            output, edges, lines, ground_circles, circle_clouds, bev = \
                detector.process(frame, draw_bev=True)

            print("Ground circles:", ground_circles)
            for i, cloud in enumerate(circle_clouds):
                print(f"  circle {i+1} cloud points: {len(cloud)}")

            cv2.imshow("Road Features", output)
            cv2.imshow("Edges", edges)
            if bev is not None:
                cv2.imshow("BEV", bev)

            if cv2.waitKey(1) & 0xFF == 27:
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    main()