|
""" |
|
Visual Odometry for Drone Navigation |
|
===================================== |
|
A lightweight 2D visual odometry system for aerial drone navigation. |
|
Built for the 2024 AI Transportation Competition. |
|
|
|
This module estimates drone position using only a downward-facing camera, |
|
solving two key challenges: |
|
1. Monocular Scale Ambiguity - using GPS calibration |
|
2. Coordinate System Alignment - using brute-force auto-alignment |
|
|
|
Author: myavuzokumus |
|
License: MIT |
|
""" |
|
|
|
import cv2 |
|
import numpy as np |
|
from undistort_image import undistort_image |
|
from const import CameraParams |
|
|
|
|
|
class DroneTrajectoryEstimator:
    """
    Estimates drone trajectory using monocular visual odometry.

    The algorithm works in two phases:
    1. Calibration Phase (GPS available): Collects data to calculate scale factor
       and determine correct coordinate transformation
    2. Navigation Phase (GPS unavailable): Uses calibrated parameters to estimate
       position from visual features alone

    Attributes:
        MATCH_DISTANCE_THRESHOLD: Fraction of the mean match distance used to
            keep a match (matches significantly worse than average are dropped)
        RANSAC_REPROJ_THRESHOLD: RANSAC reprojection error threshold in pixels
    """

    # Match filter threshold: keep matches whose distance is below this
    # fraction of the mean match distance. (Note: this is a mean-distance
    # filter, not Lowe's best-vs-second-best ratio test.)
    MATCH_DISTANCE_THRESHOLD = 0.75

    # RANSAC reprojection threshold for outlier rejection
    RANSAC_REPROJ_THRESHOLD = 3.0

    def __init__(self):
        # Previous frame's keypoints and descriptors for matching.
        # prev_des stays None until a frame with detectable features arrives.
        self.prev_keypoints = None
        self.prev_des = None

        # ORB (Oriented FAST and Rotated BRIEF) feature detector
        # - nfeatures: Maximum number of features to retain
        # - scaleFactor: Pyramid decimation ratio (>1)
        # - nlevels: Number of pyramid levels
        # - WTA_K: Number of points for oriented BRIEF descriptor
        self.orb = cv2.ORB_create(
            nfeatures=1000,
            scaleFactor=1.2,
            nlevels=8,
            edgeThreshold=31,
            firstLevel=0,
            WTA_K=2,
            scoreType=cv2.ORB_HARRIS_SCORE,
            patchSize=31,
            fastThreshold=20
        )

        # Per-axis scale factor: converts pixel displacement to meters.
        # Calculated once, at the end of the calibration phase.
        self.scale = None

        # Current estimated position [x, y] in meters
        self.current_position = np.zeros(2)

        # GPS positions recorded during calibration (ground truth)
        self.real_recorded_positions = np.empty((0, 2))

        # Accumulated visual odometry positions in pixel space (before scaling)
        self.vo_recorded_positions = np.zeros((1, 2))

        # Best coordinate transformation found during calibration.
        # Format: (flip_x, flip_y, switch_axes)
        self.best_transformation = (1, 1, False)

    def process_frame(self, frame, health_status, gt_translation=None):
        """
        Process a single frame and update position estimate.

        Args:
            frame: BGR image from drone camera
            health_status: 1 = GPS available, 0 = GPS unavailable
            gt_translation: Ground truth position [x, y] from GPS (if available)

        Returns:
            np.ndarray: Current estimated position [x, y] in meters
        """
        # Convert to grayscale for feature detection
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # No reference descriptors yet (first frame, or every frame so far was
        # featureless): initialize and wait for the next frame. Checking only
        # prev_des ensures we retry if detectAndCompute returned des=None.
        if self.prev_des is None:
            img_undistorted = undistort_image(frame)
            self.prev_keypoints, self.prev_des = self.orb.detectAndCompute(
                img_undistorted, None
            )
            return self.current_position

        # Remove lens distortion for accurate feature positions
        img_undistorted = undistort_image(frame)

        # Detect features in current frame
        keypoints, des = self.orb.detectAndCompute(img_undistorted, None)

        # Estimate 2D similarity transform (rotation + translation + scale)
        # between the previous and current frame, using RANSAC for robustness.
        # cv2.estimateAffinePartial2D needs at least 2 correspondences; on a
        # featureless frame or with too few matches we skip the motion update
        # (M stays None) instead of crashing.
        M = None
        if des is not None:
            matches = self.match_features(self.prev_des, des)
            good_matches = self.filter_good_matches(matches)
            src_pts, dst_pts = self.get_matched_points(good_matches, keypoints)
            if len(good_matches) >= 2:
                M, _ = cv2.estimateAffinePartial2D(
                    src_pts, dst_pts,
                    method=cv2.RANSAC,
                    ransacReprojThreshold=self.RANSAC_REPROJ_THRESHOLD
                )

        # End of calibration phase: GPS just dropped and scale is not yet set.
        if self.scale is None and health_status == 0:
            self.calculate_scale()
            # Copy: without it, the in-place += in update_position() would
            # corrupt the recorded GPS calibration history.
            self.current_position = self.real_recorded_positions[-1].copy()

        # Update position using the estimated transformation (if any)
        if M is not None:
            self.update_position(M)

        # Store current frame data for next iteration. Keep the previous
        # reference frame when the current frame yielded no descriptors so
        # tracking can resume on the next good frame.
        if des is not None:
            self.prev_keypoints = keypoints
            self.prev_des = des

        # During calibration: record GPS ground truth
        if health_status == 1 and gt_translation is not None:
            self.real_recorded_positions = np.concatenate(
                (self.real_recorded_positions, gt_translation.copy().reshape(1, -1)),
                axis=0
            )
            # Float copy: the in-place += in update_position() must not mutate
            # the caller's array, and an int-dtype input would reject float +=.
            self.current_position = np.array(gt_translation, dtype=float)

        return self.current_position

    def filter_good_matches(self, matches):
        """
        Filter matches using an adaptive mean-distance threshold.

        Uses the mean distance as baseline - matches significantly worse than
        average are likely incorrect.

        Args:
            matches: List of cv2.DMatch objects

        Returns:
            List of filtered matches (empty list if no matches were given)
        """
        # Guard: np.mean([]) would emit a RuntimeWarning and yield NaN.
        if not matches:
            return []
        mean_distance = np.mean([match.distance for match in matches])
        return [m for m in matches if m.distance < self.MATCH_DISTANCE_THRESHOLD * mean_distance]

    def get_matched_points(self, good_matches, keypoints):
        """
        Extract point coordinates from matches.

        Args:
            good_matches: Filtered list of cv2.DMatch objects
            keypoints: Keypoints from current frame

        Returns:
            Tuple of (source_points, destination_points) as Nx1x2 float32 arrays
        """
        # queryIdx indexes the previous frame's keypoints (first matcher arg),
        # trainIdx indexes the current frame's keypoints (second matcher arg).
        src_pts = np.float32(
            [self.prev_keypoints[m.queryIdx].pt for m in good_matches]
        ).reshape(-1, 1, 2)

        dst_pts = np.float32(
            [keypoints[m.trainIdx].pt for m in good_matches]
        ).reshape(-1, 1, 2)

        return src_pts, dst_pts

    def calculate_scale(self):
        """
        Calculate scale factor using GPS ground truth.

        Compares total displacement in pixel space (VO) against real-world
        displacement (GPS) to derive a meters-per-pixel ratio per axis.

        This solves the monocular scale ambiguity problem by using the
        calibration phase GPS data as reference.
        """
        # Real-world displacement from GPS (meters)
        delta_x_real = (self.real_recorded_positions[-1][0] -
                        self.real_recorded_positions[0][0])
        delta_y_real = (self.real_recorded_positions[-1][1] -
                        self.real_recorded_positions[0][1])

        # Visual displacement in pixels (with axis correction applied)
        corrected_data = self.correct_sign_and_axis(self.vo_recorded_positions[-1])
        delta_x_vo = corrected_data[0]
        delta_y_vo = corrected_data[1]

        # Scale = meters / pixels (separate for X and Y axes).
        # NOTE(review): if the drone did not move along an axis during
        # calibration, the corresponding denominator is 0 and numpy yields
        # inf for that axis — calibration flights should cover both axes.
        self.scale = np.array([
            abs(delta_x_real / delta_x_vo),
            abs(delta_y_real / delta_y_vo)
        ])

    def update_position(self, M):
        """
        Update position estimate using affine transformation matrix.

        The affine matrix M has structure:
            | a b tx |
            | c d ty |

        Where tx = M[0,2] and ty = M[1,2] are the translation components.

        Args:
            M: 2x3 affine transformation matrix from cv2.estimateAffinePartial2D
        """
        if self.scale is None:
            # Calibration phase: accumulate raw pixel displacements
            vo_current = self.vo_recorded_positions[-1].copy()
            vo_current[0] += M[0, 2]  # tx: horizontal displacement
            vo_current[1] += M[1, 2]  # ty: vertical displacement

            self.vo_recorded_positions = np.concatenate(
                (self.vo_recorded_positions, vo_current.reshape(1, -1)),
                axis=0
            )
        else:
            # Navigation phase: convert pixels to meters using scale
            corrected = self.apply_transformations(np.array([M[0, 2], M[1, 2]]))
            self.current_position[0] += corrected[0] * self.scale[0]
            self.current_position[1] += corrected[1] * self.scale[1]

    def match_features(self, desc1, desc2, method='bf'):
        """
        Match feature descriptors between two frames.

        Args:
            desc1: Descriptors from previous frame
            desc2: Descriptors from current frame
            method: 'bf' for BruteForce, 'flann' for FLANN-based matching

        Returns:
            List of cv2.DMatch objects, sorted by distance (best first)

        Raises:
            ValueError: If an unsupported matching method is requested
        """
        if method == 'bf':
            # BruteForce matcher with Hamming distance (for binary descriptors)
            matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
        elif method == 'flann':
            # FLANN-based matcher with LSH index (suited to binary descriptors)
            FLANN_INDEX_LSH = 6
            index_params = dict(
                algorithm=FLANN_INDEX_LSH,
                table_number=6,
                key_size=12,
                multi_probe_level=1
            )
            search_params = dict(checks=50)
            matcher = cv2.FlannBasedMatcher(index_params, search_params)
        else:
            raise ValueError(f"Unsupported matching method: {method}")

        matches = matcher.match(desc1, desc2)
        return sorted(matches, key=lambda x: x.distance)

    def undistort_points(self, points):
        """
        Remove lens distortion from keypoint coordinates.

        Args:
            points: List of cv2.KeyPoint objects

        Returns:
            np.ndarray: Nx2 array of undistorted pixel coordinates
        """
        points = np.array([kp.pt for kp in points], dtype=np.float32)
        # Passing the intrinsic matrix as the new camera matrix keeps the
        # result in pixel coordinates instead of normalized coordinates.
        points_undistorted = cv2.undistortPoints(
            np.expand_dims(points, axis=1),
            CameraParams['IntrinsicMatrix'],
            CameraParams['DistCoeffs'],
            None,
            CameraParams['IntrinsicMatrix']
        )
        return points_undistorted[:, 0, :]

    def correct_sign_and_axis(self, sim_point):
        """
        Find optimal coordinate transformation by brute-force search.

        The camera coordinate system may not align with GPS coordinates:
        - X might be inverted
        - Y might be inverted
        - X and Y might be swapped

        This method tests all 8 possible combinations (2x2x2) and selects the
        one whose transformed point lies closest to any recorded GPS position.
        The winning combination is also stored in self.best_transformation for
        later use by apply_transformations().

        Args:
            sim_point: Point in visual odometry coordinate space

        Returns:
            np.ndarray: Transformed point that best matches GPS coordinates
        """
        min_distance = float('inf')
        best_correction = sim_point

        # Test all 8 geometric transformations
        for flip_x in [-1, 1]:  # X-axis sign
            for flip_y in [-1, 1]:  # Y-axis sign
                for switch_axes in [False, True]:  # Swap X and Y

                    # Apply candidate transformation
                    test_point = np.array([
                        sim_point[0] * flip_x,
                        sim_point[1] * flip_y
                    ])
                    if switch_axes:
                        test_point = test_point[::-1]

                    # Error = distance to the nearest recorded GPS position
                    distances = np.linalg.norm(
                        self.real_recorded_positions - test_point, axis=1
                    )
                    distance = np.min(distances)

                    # Keep transformation with minimum error
                    if distance < min_distance:
                        min_distance = distance
                        best_correction = test_point
                        self.best_transformation = (flip_x, flip_y, switch_axes)

        return np.array(best_correction)

    def apply_transformations(self, pixel_displacement):
        """
        Apply the locked-in coordinate transformation.

        After calibration, the best transformation is fixed and applied
        to all subsequent measurements.

        Args:
            pixel_displacement: Raw [dx, dy] from affine matrix

        Returns:
            np.ndarray: Corrected displacement in GPS coordinate system
        """
        flip_x, flip_y, switch_axes = self.best_transformation

        corrected = np.array([
            pixel_displacement[0] * flip_x,
            pixel_displacement[1] * flip_y
        ])

        if switch_axes:
            corrected = corrected[::-1]

        return corrected

    def show_result(self, frame1, frame2, keypoints1, keypoints2, matches):
        """
        Visualize feature matches between two frames (for debugging).

        Blocks until a key is pressed, then closes the window.

        Args:
            frame1, frame2: Input images
            keypoints1, keypoints2: Detected keypoints
            matches: List of matches to visualize
        """
        # Resize for display
        scale_percent = 50
        width = int(frame1.shape[1] * scale_percent / 100)
        height = int(frame2.shape[0] * scale_percent / 100)
        dim = (width, height)

        resized_img1 = cv2.resize(frame1, dim, interpolation=cv2.INTER_AREA)
        resized_img2 = cv2.resize(frame2, dim, interpolation=cv2.INTER_AREA)

        # Draw top 10 matches
        img_matches = cv2.drawMatches(
            resized_img1, keypoints1,
            resized_img2, keypoints2,
            matches[:10], None,
            flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS
        )

        cv2.imshow('Matches', img_matches)
        cv2.waitKey(0)
        cv2.destroyAllWindows()