diff --git a/leo_fw/leo_fw/nodes/__init__.py b/leo_fw/leo_fw/nodes/__init__.py index a8dfa76..11fc4f2 100644 --- a/leo_fw/leo_fw/nodes/__init__.py +++ b/leo_fw/leo_fw/nodes/__init__.py @@ -19,5 +19,6 @@ # THE SOFTWARE. from .parameter_bridge import ParameterBridge +from .imu_calibrator import ImuCalibrator -__all__ = ["ParameterBridge"] +__all__ = ["ParameterBridge", "ImuCalibrator"] diff --git a/leo_fw/leo_fw/nodes/imu_calibrator.py b/leo_fw/leo_fw/nodes/imu_calibrator.py new file mode 100644 index 0000000..b426d5f --- /dev/null +++ b/leo_fw/leo_fw/nodes/imu_calibrator.py @@ -0,0 +1,83 @@ +# Copyright 2023 Fictionlab sp. z o.o. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +#!/usr/bin/env python3 + +import numpy as np + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, QoSDurabilityPolicy, QoSReliabilityPolicy +from leo_msgs.srv import SetImuCalibration +from leo_msgs.msg import Imu + + +class ImuCalibrator(Node): + def __init__(self, time=2.0): + super().__init__("imu_calibrator") + + self.service_client = self.create_client( + SetImuCalibration, "set_imu_calibration" + ) + + while not self.service_client.wait_for_service(timeout_sec=1.0): + self.get_logger().info("service not available, waiting again...") + + self.data = [] + self.end_time = self.get_clock().now() + rclpy.duration.Duration(seconds=time) + + qos = QoSProfile( + depth=1, + durability=QoSDurabilityPolicy.VOLATILE, + reliability=QoSReliabilityPolicy.BEST_EFFORT, + ) + + self.running = True + + self.imu_sub = self.create_subscription( + Imu, "firmware/imu", self.imu_sub_callback, qos + ) + + self.get_logger().info(f"Collecting IMU measurements for {time} seconds...") + + def imu_sub_callback(self, data: Imu): + if self.get_clock().now() >= self.end_time: + self.running = False + self.destroy_subscription(self.imu_sub) + + self.data.append([data.gyro_x, data.gyro_y, data.gyro_z]) + + def send_bias(self): + self.get_logger().info(f"Calculating bias from {len(self.data)} samples.") + + matrix = np.matrix(self.data) + bias = matrix.mean(0) * -1.0 + bias = bias.tolist()[0] + + self.get_logger().info(f"Calculated bias: {bias}") + + req = SetImuCalibration.Request() + req.gyro_bias_x, req.gyro_bias_y, req.gyro_bias_z = bias + + future = self.service_client.call_async(req) + rclpy.spin_until_future_complete(self, future) + + if not future.result().success: + self.get_logger().error("Failed to set new imu calibration.") diff --git a/leo_fw/package.xml b/leo_fw/package.xml index 8a31d50..77775ab 100644 --- a/leo_fw/package.xml +++ b/leo_fw/package.xml @@ -22,6 +22,7 @@ ament_index_python geometry_msgs python3-dbus + python3-numpy python3-whichcraft python3-yaml rclpy diff --git a/leo_fw/scripts/calibrate_imu b/leo_fw/scripts/calibrate_imu index b83cf68..cff6a01 100755 --- a/leo_fw/scripts/calibrate_imu +++ b/leo_fw/scripts/calibrate_imu @@ -2,95 +2,34 @@ import argparse -import numpy as np +from leo_fw.nodes import ImuCalibrator import rclpy -from rclpy.node import Node -from rclpy.qos import QoSProfile, QoSDurabilityPolicy, QoSReliabilityPolicy -from leo_msgs.srv import SetImuCalibration -from leo_msgs.msg import Imu +rclpy.init() -class Calibrator(Node): - def __init__(self, time=2.0): - super().__init__("calibrate_imu") +parser = argparse.ArgumentParser( + description="Calculate imu bias and send it to firmware_message_converter.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, +) - self.service_client = self.create_client( - SetImuCalibration, "set_imu_calibration" - ) +parser.add_argument( + "duration", + default=10.0, + type=float, + nargs="?", + help="The duration in seconds for which the IMU data is collected.", +) - while not self.service_client.wait_for_service(timeout_sec=1.0): - self.get_logger().info("service not available, waiting again...") +args = parser.parse_args() - self.data = [] - self.end_time = self.get_clock().now() + rclpy.duration.Duration(seconds=time) +calib = ImuCalibrator(args.duration) - qos = QoSProfile( - depth=1, - durability=QoSDurabilityPolicy.VOLATILE, - reliability=QoSReliabilityPolicy.BEST_EFFORT, - ) +while calib.running and rclpy.ok(): + rclpy.spin_once(calib) - self.running = True +calib.send_bias() - self.imu_sub = self.create_subscription( - Imu, "firmware/imu", self.imu_sub_callback, qos - ) - - self.get_logger().info(f"Collecting IMU measurements for {time} seconds...") - - def imu_sub_callback(self, data: Imu): - if self.get_clock().now() >= self.end_time: - self.running = False - self.destroy_subscription(self.imu_sub) - - self.data.append([data.gyro_x, data.gyro_y, data.gyro_z]) - - def send_bias(self): - self.get_logger().info(f"Calculating bias from {len(self.data)} samples.") - - matrix = np.matrix(self.data) - bias = matrix.mean(0) * -1.0 - bias = bias.tolist()[0] - - self.get_logger().info(f"Calculated bias: {bias}") - - req = SetImuCalibration.Request() - req.gyro_bias_x, req.gyro_bias_y, req.gyro_bias_z = bias - - self.future = self.service_client.call_async(req) - rclpy.spin_until_future_complete(self, self.future) - - if not self.future.result().success: - self.get_logger().error(f"Failed to set new imu calibration.") - - -def add_arguments(parser: argparse.ArgumentParser): - parser.add_argument( - "duration", - default=10.0, - type=float, - nargs="?", - help="The duration in seconds for which the IMU data is collected.", - ) - - -if __name__ == "__main__": - rclpy.init() - - parser = argparse.ArgumentParser( - description="Calculate imu bias and send it to firmware_message_converter.", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - add_arguments(parser) - args = parser.parse_args() - - calib = Calibrator(args.duration) - - while calib.running and rclpy.ok(): - rclpy.spin_once(calib) - - calib.send_bias() - calib.get_logger().info("Finishing node.") - calib.destroy_node() - rclpy.shutdown() +calib.get_logger().info("Finishing node.") +calib.destroy_node() +rclpy.shutdown()