sensor-fusion

Verified

Expert skill for multi-sensor fusion and state estimation using Kalman filtering. Implement EKF/UKF, configure robot_localization, fuse IMU, GPS, odometry, and visual sensors for robust localization.

@a5c-ai
MIT · 1404/12/03

Install the skill

Skills are third-party code from public GitHub repositories. SkillHub scans for known malicious patterns but cannot guarantee safety. Review the source code before installing.

Global install (user level):

npx skillhub install a5c-ai/babysitter/sensor-fusion

Install into the current project:

npx skillhub install a5c-ai/babysitter/sensor-fusion --project

Suggested path: ~/.claude/skills/sensor-fusion/

SKILL.md contents

---
name: sensor-fusion
description: Expert skill for multi-sensor fusion and state estimation using Kalman filtering. Implement EKF/UKF, configure robot_localization, fuse IMU, GPS, odometry, and visual sensors for robust localization.
allowed-tools: Bash(*), Read, Write, Edit, Glob, Grep, WebFetch
metadata:
  author: babysitter-sdk
  version: "1.0.0"
  category: state-estimation
  backlog-id: SK-009
---

# sensor-fusion

You are **sensor-fusion** - a specialized skill for multi-sensor fusion and state estimation using Kalman filtering and factor graph optimization.

## Overview

This skill enables AI-powered sensor fusion including:
- Implementing Extended Kalman Filter (EKF) for state estimation
- Configuring Unscented Kalman Filter (UKF) for nonlinear systems
- Setting up robot_localization package configuration
- Implementing IMU preintegration and bias estimation
- Configuring GPS/RTK integration with local coordinate frames
- Implementing wheel odometry fusion with slip compensation
- Setting up visual odometry integration
- Configuring outlier rejection (Mahalanobis, chi-squared)
- Tuning process and measurement noise covariances
- Implementing sensor delay compensation

## Prerequisites

- ROS2 with robot_localization package
- Calibrated sensors (IMU, cameras, wheel encoders)
- Understanding of coordinate frames (REP-105)
- Sensor noise characteristics

## Capabilities

### 1. robot_localization Configuration

Configure the ROS2 robot_localization package for EKF/UKF:

```yaml
# ekf_localization.yaml
ekf_filter_node:
  ros__parameters:
    # Coordinate frames
    map_frame: map
    odom_frame: odom
    base_link_frame: base_link
    world_frame: odom

    # Frequencies
    frequency: 50.0
    sensor_timeout: 0.1
    two_d_mode: false

    # Transform settings
    transform_time_offset: 0.0
    transform_timeout: 0.0
    print_diagnostics: true
    debug: false
    publish_tf: true
    publish_acceleration: false

    # IMU input
    imu0: /imu/data
    imu0_config: [false, false, false,  # x, y, z position
                  true,  true,  true,   # roll, pitch, yaw
                  false, false, false,  # vx, vy, vz
                  true,  true,  true,   # vroll, vpitch, vyaw
                  true,  true,  true]   # ax, ay, az
    imu0_differential: false
    imu0_relative: false
    imu0_queue_size: 10
    imu0_remove_gravitational_acceleration: true

    # Wheel odometry input
    odom0: /wheel_odom
    odom0_config: [true,  true,  false,  # x, y, z position
                   false, false, true,   # roll, pitch, yaw
                   true,  true,  false,  # vx, vy, vz
                   false, false, true,   # vroll, vpitch, vyaw
                   false, false, false]  # ax, ay, az
    odom0_differential: false
    odom0_relative: false
    odom0_queue_size: 10

    # GPS input (for map frame)
    # odom1: /gps/odom
    # odom1_config: [true,  true,  true,
    #                false, false, false,
    #                false, false, false,
    #                false, false, false,
    #                false, false, false]
    # odom1_differential: false

    # Process noise covariance (Q). Listed here as the 15-element diagonal;
    # note that some robot_localization versions expect the full 15x15
    # matrix (225 values) - check your installed version's parameter docs.
    process_noise_covariance: [
      0.05,   # x
      0.05,   # y
      0.06,   # z
      0.03,   # roll
      0.03,   # pitch
      0.06,   # yaw
      0.025,  # vx
      0.025,  # vy
      0.04,   # vz
      0.01,   # vroll
      0.01,   # vpitch
      0.02,   # vyaw
      0.01,   # ax
      0.01,   # ay
      0.015   # az
    ]

    # Initial estimate covariance (P0 matrix diagonal)
    initial_estimate_covariance: [
      1e-9, 1e-9, 1e-9,     # position
      1e-9, 1e-9, 1e-9,     # orientation
      1e-9, 1e-9, 1e-9,     # velocity
      1e-9, 1e-9, 1e-9,     # angular velocity
      1e-9, 1e-9, 1e-9      # acceleration
    ]
```
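The 15-element config vectors above are easy to mis-order. A small hypothetical helper (not part of robot_localization) can decode which state dimensions a given vector actually fuses:

```python
# State ordering used by robot_localization's 15-element config vectors
STATE_NAMES = ['x', 'y', 'z', 'roll', 'pitch', 'yaw',
               'vx', 'vy', 'vz', 'vroll', 'vpitch', 'vyaw',
               'ax', 'ay', 'az']

def fused_states(config):
    """Return the state names a sensor config vector marks as fused."""
    if len(config) != 15:
        raise ValueError(f"expected 15 entries, got {len(config)}")
    return [name for name, flag in zip(STATE_NAMES, config) if flag]

# imu0_config from the YAML above
imu0_config = [False, False, False,
               True,  True,  True,
               False, False, False,
               True,  True,  True,
               True,  True,  True]
print(fused_states(imu0_config))
# ['roll', 'pitch', 'yaw', 'vroll', 'vpitch', 'vyaw', 'ax', 'ay', 'az']
```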

### 2. Two-EKF Setup (Odom + Map Frames)

Configure two EKF instances for continuous odometry and global localization:

```yaml
# dual_ekf_navsat.yaml

# EKF for continuous odometry (odom frame)
ekf_filter_node_odom:
  ros__parameters:
    frequency: 50.0
    two_d_mode: false

    map_frame: map
    odom_frame: odom
    base_link_frame: base_link
    world_frame: odom

    # Fuse IMU and wheel odometry only
    imu0: /imu/data
    imu0_config: [false, false, false,
                  true,  true,  true,
                  false, false, false,
                  true,  true,  true,
                  true,  true,  true]
    imu0_remove_gravitational_acceleration: true

    odom0: /wheel_odom
    odom0_config: [true,  true,  false,
                   false, false, true,
                   true,  true,  false,
                   false, false, true,
                   false, false, false]

# EKF for global localization (map frame)
ekf_filter_node_map:
  ros__parameters:
    frequency: 50.0
    two_d_mode: false

    map_frame: map
    odom_frame: odom
    base_link_frame: base_link
    world_frame: map

    # Fuse the same raw inputs as the odom EKF, plus GPS below.
    # (The robot_localization docs advise against feeding one filter's
    # output into another, since the estimates are correlated.)
    imu0: /imu/data
    imu0_config: [false, false, false,
                  true,  true,  true,
                  false, false, false,
                  true,  true,  true,
                  true,  true,  true]
    imu0_remove_gravitational_acceleration: true

    odom0: /wheel_odom
    odom0_config: [true,  true,  false,
                   false, false, true,
                   true,  true,  false,
                   false, false, true,
                   false, false, false]

    odom1: /gps/odom
    odom1_config: [true,  true,  true,
                   false, false, false,
                   false, false, false,
                   false, false, false,
                   false, false, false]
    odom1_differential: false

# NavSat transform node
navsat_transform_node:
  ros__parameters:
    frequency: 50.0
    delay: 0.0
    magnetic_declination_radians: 0.0
    yaw_offset: 0.0
    zero_altitude: true
    broadcast_utm_transform: true
    publish_filtered_gps: true
    use_odometry_yaw: false
    wait_for_datum: false
```
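navsat_transform_node performs the GPS-to-local conversion via UTM. For intuition (or for quick tests over small areas without the node), the conversion can be approximated with a flat-earth ENU projection around a datum. This is a sketch under that small-area assumption, not a replacement for navsat_transform_node:

```python
import math

WGS84_RADIUS = 6378137.0  # WGS-84 equatorial radius, meters

def gps_to_local_enu(lat, lon, alt, ref_lat, ref_lon, ref_alt):
    """Flat-earth ENU approximation around a datum (small areas only)."""
    d_lat = math.radians(lat - ref_lat)
    d_lon = math.radians(lon - ref_lon)
    east = WGS84_RADIUS * d_lon * math.cos(math.radians(ref_lat))
    north = WGS84_RADIUS * d_lat
    up = alt - ref_alt
    return east, north, up

# 0.001 degrees of latitude is roughly 111.32 m of northing
east, north, up = gps_to_local_enu(0.001, 0.0, 10.0, 0.0, 0.0, 0.0)
```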

### 3. Custom EKF Implementation

Implement a custom EKF for state estimation:

```python
import numpy as np

class RobotEKF:
    """Extended Kalman Filter for robot localization."""

    def __init__(self, dt=0.02):
        self.dt = dt

        # State: [x, y, z, roll, pitch, yaw, vx, vy, vz, wx, wy, wz]
        self.n_states = 12

        # State vector
        self.x = np.zeros(self.n_states)

        # State covariance
        self.P = np.eye(self.n_states) * 0.1

        # Process noise
        self.Q = np.diag([
            0.01, 0.01, 0.01,    # position noise
            0.001, 0.001, 0.001, # orientation noise
            0.1, 0.1, 0.1,       # velocity noise
            0.01, 0.01, 0.01     # angular velocity noise
        ])

    def predict(self, u=None):
        """Predict step using a constant-velocity motion model (u unused)."""
        dt = self.dt
        yaw = self.x[5]
        vx, vy, vz = self.x[6:9]
        wx, wy, wz = self.x[9:12]

        # Position update (body-frame velocity rotated into the world by yaw)
        self.x[0] += vx * np.cos(yaw) * dt - vy * np.sin(yaw) * dt
        self.x[1] += vx * np.sin(yaw) * dt + vy * np.cos(yaw) * dt
        self.x[2] += vz * dt

        # Orientation update (small-angle Euler integration)
        self.x[3] += wx * dt
        self.x[4] += wy * dt
        self.x[5] += wz * dt

        # Jacobian of state transition
        F = self._compute_jacobian()

        # Covariance prediction
        self.P = F @ self.P @ F.T + self.Q

    def _compute_jacobian(self):
        """Compute Jacobian of state transition."""
        dt = self.dt
        yaw = self.x[5]
        vx, vy = self.x[6:8]

        F = np.eye(self.n_states)

        # Position derivatives w.r.t. yaw
        F[0, 5] = -vx * np.sin(yaw) * dt - vy * np.cos(yaw) * dt
        F[1, 5] = vx * np.cos(yaw) * dt - vy * np.sin(yaw) * dt

        # Position derivatives w.r.t. velocity
        F[0, 6] = np.cos(yaw) * dt
        F[0, 7] = -np.sin(yaw) * dt
        F[1, 6] = np.sin(yaw) * dt
        F[1, 7] = np.cos(yaw) * dt
        F[2, 8] = dt

        # Orientation derivatives w.r.t. angular velocity
        F[3, 9] = dt
        F[4, 10] = dt
        F[5, 11] = dt

        return F

    def update_imu(self, imu_data):
        """Update with IMU measurement (orientation and angular velocity)."""
        # Measurement: [roll, pitch, yaw, wx, wy, wz]
        z = np.array([
            imu_data['roll'], imu_data['pitch'], imu_data['yaw'],
            imu_data['wx'], imu_data['wy'], imu_data['wz']
        ])

        # Measurement matrix
        H = np.zeros((6, self.n_states))
        H[0:3, 3:6] = np.eye(3)  # Orientation
        H[3:6, 9:12] = np.eye(3)  # Angular velocity

        # Measurement noise
        R = np.diag([0.01, 0.01, 0.02, 0.001, 0.001, 0.001])

        self._ekf_update(z, H, R)

    def update_odom(self, odom_data):
        """Update with odometry measurement (velocity)."""
        z = np.array([odom_data['vx'], odom_data['vy'], odom_data['wz']])

        # Measurement matrix
        H = np.zeros((3, self.n_states))
        H[0, 6] = 1  # vx
        H[1, 7] = 1  # vy
        H[2, 11] = 1  # wz

        # Measurement noise
        R = np.diag([0.05, 0.05, 0.02])

        self._ekf_update(z, H, R)

    def update_gps(self, gps_data):
        """Update with GPS measurement (position)."""
        z = np.array([gps_data['x'], gps_data['y'], gps_data['z']])

        # Measurement matrix
        H = np.zeros((3, self.n_states))
        H[0:3, 0:3] = np.eye(3)

        # Measurement noise (GPS typically 1-5m accuracy)
        R = np.diag([2.0, 2.0, 5.0])

        # Outlier rejection using the Mahalanobis distance of the innovation
        y = z - H @ self.x
        S = H @ self.P @ H.T + R
        mahal_dist = np.sqrt(y.T @ np.linalg.inv(S) @ y)

        # Gate at the square root of a chi-squared critical value (3 DOF);
        # 5.0 is a deliberately permissive threshold
        if mahal_dist < 5.0:
            self._ekf_update(z, H, R)
        else:
            print(f"GPS outlier rejected: Mahalanobis distance = {mahal_dist:.2f}")

    def _ekf_update(self, z, H, R):
        """Standard EKF update step."""
        # Innovation
        y = z - H @ self.x

        # Innovation covariance
        S = H @ self.P @ H.T + R

        # Kalman gain
        K = self.P @ H.T @ np.linalg.inv(S)

        # State update
        self.x = self.x + K @ y

        # Covariance update (Joseph form for numerical stability)
        I_KH = np.eye(self.n_states) - K @ H
        self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T

    def get_state(self):
        """Return current state estimate."""
        return {
            'position': self.x[0:3],
            'orientation': self.x[3:6],
            'velocity': self.x[6:9],
            'angular_velocity': self.x[9:12],
            'covariance': np.diag(self.P)
        }
```
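Hand-derived Jacobians are a common source of silent filter bugs. The analytic F can be verified against finite differences; this standalone sketch reproduces the motion model and Jacobian from RobotEKF for the check:

```python
import numpy as np

def motion_model(x, dt=0.02):
    """Constant-velocity motion model matching RobotEKF.predict."""
    x = x.copy()
    yaw = x[5]
    vx, vy, vz = x[6:9]
    wx, wy, wz = x[9:12]
    x[0] += vx * np.cos(yaw) * dt - vy * np.sin(yaw) * dt
    x[1] += vx * np.sin(yaw) * dt + vy * np.cos(yaw) * dt
    x[2] += vz * dt
    x[3] += wx * dt
    x[4] += wy * dt
    x[5] += wz * dt
    return x

def analytic_jacobian(x, dt=0.02):
    """The hand-derived Jacobian from RobotEKF._compute_jacobian."""
    yaw = x[5]
    vx, vy = x[6:8]
    F = np.eye(12)
    F[0, 5] = -vx * np.sin(yaw) * dt - vy * np.cos(yaw) * dt
    F[1, 5] = vx * np.cos(yaw) * dt - vy * np.sin(yaw) * dt
    F[0, 6] = np.cos(yaw) * dt
    F[0, 7] = -np.sin(yaw) * dt
    F[1, 6] = np.sin(yaw) * dt
    F[1, 7] = np.cos(yaw) * dt
    F[2, 8] = dt
    F[3, 9] = F[4, 10] = F[5, 11] = dt
    return F

def numerical_jacobian(f, x, eps=1e-6):
    """Forward-difference Jacobian of f at x."""
    fx = f(x)
    J = np.zeros((len(fx), len(x)))
    for i in range(len(x)):
        xp = x.copy()
        xp[i] += eps
        J[:, i] = (f(xp) - fx) / eps
    return J

x0 = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.3,
               1.0, 0.5, 0.1, 0.01, 0.02, 0.03])
err = np.max(np.abs(numerical_jacobian(motion_model, x0) - analytic_jacobian(x0)))
```

Run this check whenever the motion model changes; a mismatch larger than a few times the finite-difference step indicates a derivation error.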

### 4. IMU Preintegration

Implement IMU preintegration for efficient optimization:

```python
import numpy as np
from scipy.spatial.transform import Rotation

class IMUPreintegration:
    """IMU preintegration for factor graph optimization."""

    def __init__(self, acc_noise=0.01, gyro_noise=0.001,
                 acc_bias_noise=0.0001, gyro_bias_noise=0.00001):
        self.acc_noise = acc_noise
        self.gyro_noise = gyro_noise
        self.acc_bias_noise = acc_bias_noise
        self.gyro_bias_noise = gyro_bias_noise

        self.reset()

    def reset(self):
        """Reset preintegration."""
        self.delta_R = np.eye(3)
        self.delta_v = np.zeros(3)
        self.delta_p = np.zeros(3)
        self.delta_t = 0.0

        # Jacobians for bias correction
        self.dR_dbg = np.zeros((3, 3))
        self.dv_dba = np.zeros((3, 3))
        self.dv_dbg = np.zeros((3, 3))
        self.dp_dba = np.zeros((3, 3))
        self.dp_dbg = np.zeros((3, 3))

        # Covariance
        self.cov = np.zeros((9, 9))

    def integrate(self, acc, gyro, dt, acc_bias=None, gyro_bias=None):
        """Integrate IMU measurement."""
        if acc_bias is None:
            acc_bias = np.zeros(3)
        if gyro_bias is None:
            gyro_bias = np.zeros(3)

        # Remove bias
        acc_unbiased = acc - acc_bias
        gyro_unbiased = gyro - gyro_bias

        # Rotation increment
        theta = gyro_unbiased * dt
        dR = Rotation.from_rotvec(theta).as_matrix()

        # Update bias Jacobians first: they use the pre-update delta_R
        self._update_jacobians(acc_unbiased, theta, dR, dt)

        # Update preintegrated measurements (delta_p and delta_v before
        # delta_R, so each term uses the previous rotation)
        self.delta_p += self.delta_v * dt + 0.5 * self.delta_R @ acc_unbiased * dt**2
        self.delta_v += self.delta_R @ acc_unbiased * dt
        self.delta_R = self.delta_R @ dR
        self.delta_t += dt

        # Update covariance
        self._update_covariance(dt)

    def _update_jacobians(self, acc, theta, dR, dt):
        """Update bias Jacobians (Forster-style preintegration recursions)."""
        Jr = self._right_jacobian(theta)

        # Position Jacobians use the pre-update velocity Jacobians
        self.dp_dba += self.dv_dba * dt - 0.5 * self.delta_R * dt**2
        self.dp_dbg += self.dv_dbg * dt \
            - 0.5 * self.delta_R @ self._skew(acc) @ self.dR_dbg * dt**2

        # Velocity Jacobians
        self.dv_dba -= self.delta_R * dt
        self.dv_dbg -= self.delta_R @ self._skew(acc) @ self.dR_dbg * dt

        # Rotation Jacobian: rotated by the increment, not the accumulated delta_R
        self.dR_dbg = dR.T @ self.dR_dbg - Jr * dt

    def _right_jacobian(self, theta):
        """Right Jacobian of SO(3) for rotation vector theta."""
        angle = np.linalg.norm(theta)
        if angle < 1e-8:
            return np.eye(3)

        axis = theta / angle
        s = np.sin(angle)
        c = np.cos(angle)

        # Jr = (s/a) I + (1 - s/a) a a^T - ((1 - c)/a) [a]x
        # (note the minus sign on the skew term; plus gives the left Jacobian)
        return (s / angle) * np.eye(3) + \
               (1 - s / angle) * np.outer(axis, axis) - \
               ((1 - c) / angle) * self._skew(axis)

    def _skew(self, v):
        """Skew-symmetric matrix."""
        return np.array([
            [0, -v[2], v[1]],
            [v[2], 0, -v[0]],
            [-v[1], v[0], 0]
        ])

    def _update_covariance(self, dt):
        """Update preintegration covariance."""
        # Simplified covariance propagation
        A = np.eye(9)
        B = np.eye(9) * dt

        noise_cov = np.diag([
            self.gyro_noise**2, self.gyro_noise**2, self.gyro_noise**2,
            self.acc_noise**2, self.acc_noise**2, self.acc_noise**2,
            self.gyro_bias_noise**2, self.gyro_bias_noise**2, self.gyro_bias_noise**2
        ])

        self.cov = A @ self.cov @ A.T + B @ noise_cov @ B.T

    def get_preintegrated(self):
        """Return preintegrated measurements."""
        return {
            'delta_R': self.delta_R,
            'delta_v': self.delta_v,
            'delta_p': self.delta_p,
            'delta_t': self.delta_t,
            'covariance': self.cov,
            'jacobians': {
                'dR_dbg': self.dR_dbg,
                'dv_dba': self.dv_dba,
                'dv_dbg': self.dv_dbg,
                'dp_dba': self.dp_dba,
                'dp_dbg': self.dp_dbg
            }
        }
```
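As a quick sanity check of the preintegration recursions, integrating a constant body-frame acceleration with zero rotation should reproduce basic kinematics (v = a·t, p = ½·a·t²). A standalone sketch of the same update equations:

```python
import numpy as np

dt = 0.01
acc = np.array([1.0, 0.0, 0.0])  # constant 1 m/s^2 along body x
delta_R = np.eye(3)              # no rotation (zero gyro)
delta_v = np.zeros(3)
delta_p = np.zeros(3)

for _ in range(100):  # integrate for 1 second
    # Same ordering as IMUPreintegration.integrate: position first,
    # so it uses the pre-update velocity
    delta_p += delta_v * dt + 0.5 * delta_R @ acc * dt**2
    delta_v += delta_R @ acc * dt

# Expect delta_v close to [1, 0, 0] and delta_p close to [0.5, 0, 0]
```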

### 5. Noise Covariance Tuning

Guidelines for tuning process and measurement noise:

```python
def tune_noise_covariances(sensor_data_log, initial_Q, initial_R):
    """
    Auto-tune noise covariances via innovation consistency analysis.

    True NEES requires ground truth; the Normalized Innovation Squared
    (NIS) only needs the filter's own innovations, so it is used here.

    Parameters:
    - sensor_data_log: list of measurement dicts with 'z' and 'H'
    - initial_Q: initial process noise covariance matrix
    - initial_R: initial measurement noise covariance matrix

    Returns:
    - Tuned Q and R matrices
    """
    from scipy.optimize import minimize

    def compute_nis(Q_diag, R_diag, data):
        """Compute the mean Normalized Innovation Squared (NIS)."""
        ekf = RobotEKF()
        ekf.Q = np.diag(Q_diag)

        nis_values = []
        for measurement in data:
            ekf.predict()

            # Compute innovation
            z = measurement['z']
            H = measurement['H']
            R = np.diag(R_diag[:len(z)])

            y = z - H @ ekf.x
            S = H @ ekf.P @ H.T + R

            # NIS, normalized by measurement dimension
            nis = y.T @ np.linalg.inv(S) @ y / len(z)
            nis_values.append(nis)

            ekf._ekf_update(z, H, R)

        return np.mean(nis_values)

    def objective(params):
        n_Q = len(initial_Q)
        Q_diag = params[:n_Q]
        R_diag = params[n_Q:]

        nis = compute_nis(Q_diag, R_diag, sensor_data_log)
        # Per-dimension NIS should be close to 1 for a consistent filter
        return (nis - 1.0)**2

    initial_params = np.concatenate([np.diag(initial_Q), np.diag(initial_R)])

    result = minimize(objective, initial_params,
                     method='L-BFGS-B',
                     bounds=[(1e-6, 10)] * len(initial_params))

    n_Q = len(initial_Q)
    Q_tuned = np.diag(result.x[:n_Q])
    R_tuned = np.diag(result.x[n_Q:])

    return Q_tuned, R_tuned
```
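The NIS target of 1 can be demonstrated on a toy 1-D filter: when the assumed Q and R match the true noise, per-dimension NIS averages close to 1. A self-contained sketch:

```python
import numpy as np

rng = np.random.default_rng(42)

# 1-D random-walk system: x_k = x_{k-1} + w,  z_k = x_k + v
Q, R = 0.01, 0.25  # true (and assumed) noise variances
x_true = 0.0
x_est, P = 0.0, 1.0

nis_values = []
for _ in range(500):
    x_true += rng.normal(0.0, np.sqrt(Q))
    z = x_true + rng.normal(0.0, np.sqrt(R))

    # Predict
    P += Q

    # Innovation and NIS (1 DOF, so the expected per-step value is 1)
    y = z - x_est
    S = P + R
    nis_values.append(y * y / S)

    # Update
    K = P / S
    x_est += K * y
    P *= (1.0 - K)

mean_nis = float(np.mean(nis_values))
# A mean well above 1 means the filter is overconfident (inflate Q or R);
# a mean well below 1 means the opposite.
```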

### 6. Launch Configuration

Launch robot_localization with sensor fusion:

```python
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare

def generate_launch_description():
    pkg_share = FindPackageShare('my_robot_localization')

    ekf_config = PathJoinSubstitution([pkg_share, 'config', 'ekf.yaml'])

    return LaunchDescription([
        # EKF for odometry frame
        Node(
            package='robot_localization',
            executable='ekf_node',
            name='ekf_filter_node_odom',
            output='screen',
            parameters=[ekf_config],
            remappings=[
                ('odometry/filtered', 'odometry/local'),
                ('accel/filtered', 'accel/local')
            ]
        ),

        # EKF for map frame (with GPS)
        Node(
            package='robot_localization',
            executable='ekf_node',
            name='ekf_filter_node_map',
            output='screen',
            parameters=[ekf_config],
            remappings=[
                ('odometry/filtered', 'odometry/global'),
                ('accel/filtered', 'accel/global')
            ]
        ),

        # NavSat transform for GPS
        Node(
            package='robot_localization',
            executable='navsat_transform_node',
            name='navsat_transform_node',
            output='screen',
            parameters=[ekf_config],
            remappings=[
                ('odometry/filtered', 'odometry/global'),
                ('gps/fix', 'gps/data'),
                ('imu/data', 'imu/data')
            ]
        )
    ])
```

## MCP Server Integration

This skill can leverage the following MCP servers for enhanced capabilities:

| Server | Description | Reference |
|--------|-------------|-----------|
| ros-mcp-server | ROS topic access | [GitHub](https://github.com/robotmcp/ros-mcp-server) |

## Best Practices

1. **Sensor calibration** - Accurate calibration is essential for fusion quality
2. **Noise characterization** - Measure actual sensor noise statistics
3. **Frame conventions** - Follow REP-105 for coordinate frames
4. **Outlier rejection** - Implement Mahalanobis distance checks
5. **Time synchronization** - Ensure sensors are time-synchronized
6. **Graceful degradation** - Handle sensor failures gracefully

## Process Integration

This skill integrates with the following processes:
- `sensor-fusion-framework.js` - Primary fusion framework
- `visual-slam-implementation.js` - VIO fusion
- `lidar-mapping-localization.js` - LiDAR-inertial fusion
- `robot-calibration.js` - Sensor calibration

## Output Format

When executing operations, provide structured output:

```json
{
  "operation": "configure-fusion",
  "filterType": "EKF",
  "status": "success",
  "sensors": {
    "imu": {"fused": true, "rate": 200},
    "odom": {"fused": true, "rate": 50},
    "gps": {"fused": true, "rate": 5}
  },
  "artifacts": [
    "config/ekf_localization.yaml",
    "launch/localization.launch.py"
  ],
  "tuningRecommendations": [
    "Consider increasing IMU trust (lower R values)",
    "GPS noise may need adjustment for urban environments"
  ]
}
```

## Constraints

- Verify sensor time synchronization before fusion
- Ensure coordinate frame consistency (REP-105)
- Monitor filter divergence indicators
- Test outlier rejection with ground truth
- Validate covariance growth during sensor dropout