From 7db4069bb0f355c698497fb5d35e70908850b7b3 Mon Sep 17 00:00:00 2001 From: NishanthJKumar Date: Wed, 26 Jul 2023 17:28:24 -0400 Subject: [PATCH] success! it works! --- predicators/envs/spot_env.py | 6 +- predicators/spot_utils/perception_utils.py | 407 +++++++++++---------- predicators/spot_utils/spot_utils.py | 310 +++++++--------- 3 files changed, 344 insertions(+), 379 deletions(-) diff --git a/predicators/envs/spot_env.py b/predicators/envs/spot_env.py index 130a47e311..b7cfc9304d 100644 --- a/predicators/envs/spot_env.py +++ b/predicators/envs/spot_env.py @@ -313,20 +313,20 @@ def _build_observation(self, may vary per environment. """ # Get the camera images. - rgb_img_dict, rgb_img_response_dict, _, depth_img_response_dict = self._spot_interface.get_camera_images( + rgb_img_dict, rgb_img_response_dict, depth_img_dict, depth_img_response_dict = self._spot_interface.get_camera_images( ) # Detect objects using AprilTags (which we need for surfaces anyways). object_names_in_view_by_camera = {} object_names_in_view_by_camera_apriltag = \ self._spot_interface.get_objects_in_view_by_camera\ - (from_apriltag=True, rgb_image_dict=rgb_img_dict, rgb_image_response_dict=rgb_img_response_dict, depth_image_response_dict=depth_img_response_dict) + (from_apriltag=True, rgb_image_dict=rgb_img_dict, rgb_image_response_dict=rgb_img_response_dict, depth_image_dict=depth_img_dict, depth_image_response_dict=depth_img_response_dict) object_names_in_view_by_camera.update( object_names_in_view_by_camera_apriltag) # Additionally, if we're using SAM, then update using that. if CFG.spot_grasp_use_sam: object_names_in_view_by_camera_sam = self._spot_interface.\ - get_objects_in_view_by_camera(from_apriltag=False, rgb_image_dict=rgb_img_dict, rgb_image_response_dict=rgb_img_response_dict, depth_image_response_dict=depth_img_response_dict) + get_objects_in_view_by_camera(from_apriltag=False, rgb_image_dict=rgb_img_dict, rgb_image_response_dict=rgb_img_response_dict, depth_image_dict=depth_img_dict, depth_image_response_dict=depth_img_response_dict) # Combine these together to get all objects in view. for k, v in object_names_in_view_by_camera.items(): v.update(object_names_in_view_by_camera_sam[k]) diff --git a/predicators/spot_utils/perception_utils.py b/predicators/spot_utils/perception_utils.py index fdec14b585..b7c4db8c1b 100644 --- a/predicators/spot_utils/perception_utils.py +++ b/predicators/spot_utils/perception_utils.py @@ -12,13 +12,14 @@ import matplotlib import matplotlib.pyplot as plt import numpy as np +import PIL import requests from bosdyn.api import image_pb2 from numpy.typing import NDArray -from PIL import Image from scipy import ndimage from predicators.settings import CFG +from predicators.structs import Image # NOTE: uncomment this line if trying to visualize stuff locally # and matplotlib isn't displaying. 
@@ -32,9 +33,21 @@
     'left_fisheye_image': 0,
     'right_fisheye_image': 180
 }
+CAMERA_NAMES = [
+    "hand_color_image", "left_fisheye_image", "right_fisheye_image",
+    "frontleft_fisheye_image", "frontright_fisheye_image", "back_fisheye_image"
+]
+RGB_TO_DEPTH_CAMERAS = {
+    "hand_color_image": "hand_depth_in_hand_color_frame",
+    "left_fisheye_image": "left_depth_in_visual_frame",
+    "right_fisheye_image": "right_depth_in_visual_frame",
+    "frontleft_fisheye_image": "frontleft_depth_in_visual_frame",
+    "frontright_fisheye_image": "frontright_depth_in_visual_frame",
+    "back_fisheye_image": "back_depth_in_visual_frame"
+}
 
 
-def image_to_bytes(img: Image.Image) -> io.BytesIO:
+def image_to_bytes(img: PIL.Image.Image) -> io.BytesIO:
     """Helper function to convert from a PIL image into a bytes object."""
     buf = io.BytesIO()
     img.save(buf, format="PNG")
@@ -42,7 +55,7 @@ def image_to_bytes(img: Image.Image) -> io.BytesIO:
     return buf
 
 
-def visualize_output(im: Image.Image, masks: NDArray, input_boxes: NDArray,
+def visualize_output(im: PIL.Image.Image, masks: NDArray, input_boxes: NDArray,
                      classes: NDArray, scores: NDArray) -> None:
     """Visualizes the output of SAM; useful for debugging.
 
@@ -98,73 +111,87 @@ def show_box(box: NDArray, ax: matplotlib.axes.Axes) -> None:
                           lw=2))
 
 
-def query_detic_sam(image_in: NDArray, classes: List[str],
-                    viz: bool) -> Dict[str, List[NDArray]]:
+def query_detic_sam(rgb_image_dict_in: Dict[str, Image], classes: List[str],
+                    viz: bool) -> Dict[str, Dict[str, List[NDArray]]]:
     """Send a query to SAM and return the response.
 
-    The response is a dictionary that contains 4 keys: 'boxes',
-    'classes', 'masks' and 'scores'.
+    The response maps each input camera source name to a dictionary
+    that contains 4 keys: 'boxes', 'classes', 'masks' and 'scores'.
     """
-    image = Image.fromarray(image_in)
-    buf = image_to_bytes(image)
-    r = requests.post("http://localhost:5550/predict",
-                      files={"file": buf},
+    buf_dict = {}
+    for source_name in rgb_image_dict_in.keys():
+        image = PIL.Image.fromarray(rgb_image_dict_in[source_name])
+        buf_dict[source_name] = image_to_bytes(image)
+
+    r = requests.post("http://localhost:5550/batch_predict",
+                      files=buf_dict,
                       data={"classes": ",".join(classes)})
 
-    d_filtered: Dict[str, List[NDArray]] = {
-        "boxes": [],
-        "classes": [],
-        "masks": [],
-        "scores": []
-    }
+    detic_sam_results: Dict[str, Dict[str, List[NDArray]]] = {}
+    for source_name in rgb_image_dict_in.keys():
+        detic_sam_results[source_name] = {
+            "boxes": [],
+            "classes": [],
+            "masks": [],
+            "scores": []
+        }
+
     # If the status code is not 200, then fail.
     if r.status_code != 200:
-        return d_filtered
+        return detic_sam_results
 
     with io.BytesIO(r.content) as f:
         try:
             arr = np.load(f, allow_pickle=True)
         except pkl.UnpicklingError:
-            return d_filtered
-
-    boxes = arr['boxes']
-    ret_classes = arr['classes']
-    masks = arr['masks']
-    scores = arr['scores']
-
-    d = {
-        "boxes": boxes,
-        "classes": ret_classes,
-        "masks": masks,
-        "scores": scores
-    }
-
-    if viz:
-        # Optional visualization useful for debugging.
-        visualize_output(image, d["masks"], d["boxes"], d["classes"],
-                         d["scores"])
-
-    # Filter out detections by confidence. We threshold detections
-    # at a set confidence level minimum, and if there are multiple
-    #, we only select the most confident one. This structure makes
-    # it easy for us to select multiple detections if that's ever
-    # necessary in the future.
- for obj_class in classes: - class_mask = (d['classes'] == obj_class) - if not np.any(class_mask): - continue - max_score = np.max(d['scores'][class_mask]) - max_score_idx = np.where(d['scores'] == max_score)[0] - if d['scores'][max_score_idx] < CFG.spot_vision_detection_threshold: - continue - for key, value in d.items(): - # Sanity check to ensure that we're selecting a value from - # the class we're looking for. - if key == "classes": - assert value[max_score_idx] == obj_class - d_filtered[key].append(value[max_score_idx]) - - return d_filtered + return detic_sam_results + for source_name in rgb_image_dict_in.keys(): + curr_boxes = arr[source_name + '_boxes'] + curr_ret_classes = arr[source_name + '_classes'] + curr_masks = arr[source_name + '_masks'] + curr_scores = arr[source_name + '_scores'] + + # If there were no detections (which means all the + # returned values will be numpy arrays of shape (0, 0)) + # then just skip this source. + if curr_ret_classes.size == 0: + continue + + d = { + "boxes": curr_boxes, + "classes": curr_ret_classes, + "masks": curr_masks, + "scores": curr_scores + } + + if viz: + image = PIL.Image.fromarray(rgb_image_dict_in[source_name]) + # Optional visualization useful for debugging. + visualize_output(image, d["masks"], d["boxes"], d["classes"], + d["scores"]) + + # Filter out detections by confidence. We threshold detections + # at a set confidence level minimum, and if there are multiple + #, we only select the most confident one. This structure makes + # it easy for us to select multiple detections if that's ever + # necessary in the future. + for obj_class in classes: + class_mask = (d['classes'] == obj_class) + if not np.any(class_mask): + continue + max_score = np.max(d['scores'][class_mask]) + max_score_idx = np.where(d['scores'] == max_score)[0] + if d['scores'][ + max_score_idx] < CFG.spot_vision_detection_threshold: + continue + for key, value in d.items(): + # Sanity check to ensure that we're selecting a value from + # the class we're looking for. + if key == "classes": + assert value[max_score_idx] == obj_class + detic_sam_results[source_name][key].append( + value[max_score_idx]) + + return detic_sam_results # NOTE: the below function is useful for visualization; uncomment @@ -249,50 +276,6 @@ def query_detic_sam(image_in: NDArray, classes: List[str], # return np.vstack((x, y, z)).T, valid_inds -def process_image_response(image_response: bosdyn.api.image_pb2.ImageResponse, - to_rgb: bool = False) -> NDArray: - """Given a Boston Dynamics SDK image response, extract the correct np array - corresponding to the image.""" - # pylint: disable=no-member - num_bytes = 1 # Assume a default of 1 byte encodings. - if image_response.shot.image.pixel_format == \ - image_pb2.Image.PIXEL_FORMAT_DEPTH_U16: - dtype = np.uint16 - else: - if image_response.shot.image.pixel_format == \ - image_pb2.Image.PIXEL_FORMAT_RGB_U8: - num_bytes = 3 - elif image_response.shot.image.pixel_format == \ - image_pb2.Image.PIXEL_FORMAT_RGBA_U8: - num_bytes = 4 - elif image_response.shot.image.pixel_format == \ - image_pb2.Image.PIXEL_FORMAT_GREYSCALE_U8: - num_bytes = 1 - elif image_response.shot.image.pixel_format == \ - image_pb2.Image.PIXEL_FORMAT_GREYSCALE_U16: - num_bytes = 2 - dtype = np.uint8 # type: ignore - - img = np.frombuffer(image_response.shot.image.data, dtype=dtype) - if image_response.shot.image.format == image_pb2.Image.FORMAT_RAW: - try: - # Attempt to reshape array into a RGB rows X cols shape. 
-            img = img.reshape((image_response.shot.image.rows,
-                               image_response.shot.image.cols, num_bytes))
-        except ValueError:
-            # Unable to reshape the image data, trying a regular decode.
-            img = cv2.imdecode(img, -1)
-    else:
-        img = cv2.imdecode(img, -1)
-
-    # Convert to RGB color, as some perception models assume RGB format
-    # By default, still use BGR to keep backward compability
-    if to_rgb:
-        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
-
-    return img
-
-
 def get_xyz_from_depth(image_response: bosdyn.api.image_pb2.ImageResponse,
                        depth_value: float, point_x: float,
                        point_y: float) -> Tuple[float, float, float]:
@@ -327,26 +310,29 @@ def get_xyz_from_depth(image_response: bosdyn.api.image_pb2.ImageResponse,
 
 def get_pixel_locations_with_detic_sam(
         obj_class: str,
-        in_res_image: Dict[str, NDArray],
+        rgb_image_dict: Dict[str, NDArray],
         plot: bool = False) -> List[Tuple[float, float]]:
     """Method to get the pixel locations of specific objects with class names
     listed in 'classes' within an input image."""
-    res_segment = query_detic_sam(image_in=in_res_image['rgb'],
+    res_segment = query_detic_sam(rgb_image_dict_in=rgb_image_dict,
                                   classes=[obj_class],
                                   viz=plot)
-    # return: 'masks', 'boxes', 'classes'
-    if len(res_segment['classes']) == 0:
-        return []
+
+    assert len(rgb_image_dict.keys()) == 1
+    camera_name = list(rgb_image_dict.keys())[0]
+    if len(res_segment[camera_name]['classes']) == 0:
+        # No detections; return an empty list of pixel locations.
+        return []
+
     pixel_locations = []
     # Compute geometric center of object bounding box
-    x1, y1, x2, y2 = res_segment['boxes'][0].squeeze()
+    x1, y1, x2, y2 = res_segment[camera_name]['boxes'][0].squeeze()
     x_c = (x1 + x2) / 2
     y_c = (y1 + y2) / 2
     # Plot center and segmentation mask
     if plot:
-        plt.imshow(res_segment['masks'][0][0].squeeze())
+        plt.imshow(res_segment[camera_name]['masks'][0][0].squeeze())
         plt.show()
 
     pixel_locations.append((x_c, y_c))
 
     return pixel_locations
@@ -355,9 +341,10 @@ def get_pixel_locations_with_detic_sam(
 
 
 def get_object_locations_with_detic_sam(
         classes: List[str],
-        res_image: Dict[str, NDArray],
-        res_image_responses: Dict[str, bosdyn.api.image_pb2.ImageResponse],
-        source_name: str,
+        rgb_image_dict: Dict[str, Image],
+        depth_image_dict: Dict[str, Image],
+        depth_image_response_dict: Dict[str,
+                                        bosdyn.api.image_pb2.ImageResponse],
         plot: bool = False) -> Dict[str, Tuple[float, float, float]]:
     """Given a list of string queries (classes), call SAM on these and return
     the positions of the centroids of these detections in the camera frame.
@@ -371,97 +358,111 @@ def get_object_locations_with_detic_sam(
     # First, rotate the rgb and depth images by the correct angle.
     # Importantly, DO NOT reshape the image, because this will
     # lead to a bunch of problems when we reverse the rotation later.
-    rotated_rgb = ndimage.rotate(res_image['rgb'],
-                                 ROTATION_ANGLE[source_name],
-                                 reshape=False)
-    rotated_depth = ndimage.rotate(res_image['depth'],
-                                   ROTATION_ANGLE[source_name],
-                                   reshape=False)
-
-    # Plot the rotated image before querying SAM.
-    if plot:
-        plt.imshow(rotated_rgb)
-        plt.show()
-
-    # Start by querying SAM
-    res_segment = query_detic_sam(image_in=rotated_rgb,
-                                  classes=classes,
-                                  viz=plot)
-
-    if len(res_segment['classes']) == 0:
-        return {}
-
-    ret_obj_positions: Dict[str, Tuple[float, float, float]] = {}
-    for i, obj_class in enumerate(res_segment['classes']):
-        # Check that this particular class is one of the
-        # classes we passed in, and that there was only one
-        # instance of this class that was found.
- assert obj_class.item() in classes - assert res_segment['classes'].count(obj_class) == 1 - # Compute median value of depth - depth_median = np.median( - rotated_depth[res_segment['masks'][i][0].squeeze() - & (rotated_depth > 2)[:, :, 0]]) - # Compute geometric center of object bounding box - x1, y1, x2, y2 = res_segment['boxes'][i].squeeze() - x_c = (x1 + x2) / 2 - y_c = (y1 + y2) / 2 - # Create a transformation matrix for the rotation. Be very - # careful to use radians, since np.cos and np.sin expect - # angles in radians and not degrees. - rotation_radians = np.radians(ROTATION_ANGLE[source_name]) - transform_matrix = np.array( - [[np.cos(rotation_radians), -np.sin(rotation_radians)], - [np.sin(rotation_radians), - np.cos(rotation_radians)]]) - # Subtract the center of the image from the pixel location to - # translate the rotation to the origin. - center = np.array( - [rotated_rgb.shape[1] / 2., rotated_rgb.shape[0] / 2.]) - pixel_centered = np.array([x_c, y_c]) - center - # Apply the rotation - rotated_pixel_centered = np.matmul(transform_matrix, pixel_centered) - # Add the center of the image back to the pixel location to - # translate the rotation back from the origin. - rotated_pixel = rotated_pixel_centered + center - # Now rotated_pixel is the location of the centroid pixel after the - # inverse rotation. - x_c_rotated = rotated_pixel[0] - y_c_rotated = rotated_pixel[1] - - # Plot (1) the original RGB image, (2) the rotated - # segmentation mask from SAM on top of it, (3) the - # center of the image, (4) the centroid of the detected - # object that comes from SAM, and (5) the centroid - # after we rotate it back to align with the original - # RGB image. + rotated_rgb_image_dict = {} + rotated_depth_image_dict = {} + for source_name in CAMERA_NAMES: + assert source_name in rgb_image_dict and source_name in depth_image_dict + rotated_rgb = ndimage.rotate(rgb_image_dict[source_name], + ROTATION_ANGLE[source_name], + reshape=False) + rotated_depth = ndimage.rotate(depth_image_dict[source_name], + ROTATION_ANGLE[source_name], + reshape=False) + rotated_rgb_image_dict[source_name] = rotated_rgb + rotated_depth_image_dict[source_name] = rotated_depth + + # Plot the rotated image before querying DETIC-SAM. if plot: - inverse_rotation_angle = -ROTATION_ANGLE[source_name] - plt.imshow(res_image['depth']) - plt.imshow(ndimage.rotate(res_segment['masks'][i][0].squeeze(), - inverse_rotation_angle, - reshape=False), - alpha=0.5, - cmap='Reds') - plt.scatter(x=x_c_rotated, - y=y_c_rotated, - marker='*', - color='red', - zorder=3) - plt.scatter(x=center[0], - y=center[1], - marker='.', - color='blue', - zorder=3) - plt.scatter(x=x_c, y=y_c, marker='*', color='green', zorder=3) + plt.imshow(rotated_rgb) plt.show() - # Get XYZ of the point at center of bounding box and median depth value. - x0, y0, z0 = get_xyz_from_depth(res_image_responses['depth'], - depth_value=depth_median, - point_x=x_c_rotated, - point_y=y_c_rotated) - if not math.isnan(x0) and not math.isnan(y0) and not math.isnan(z0): - ret_obj_positions[obj_class.item()] = (x0, y0, z0) - - return ret_obj_positions + # Start by querying the DETIC-SAM model. 
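+    # NOTE: this is a single batched request covering all of the camera
+    # images at once; the server returns per-camera results keyed by the
+    # camera source name.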
+    deticsam_results_all_cameras = query_detic_sam(
+        rgb_image_dict_in=rotated_rgb_image_dict, classes=classes, viz=plot)
+
+    ret_camera_to_obj_positions: Dict[str,
+                                      Dict[str,
+                                           Tuple[float, float, float]]] = {
+                                               source_name: {}
+                                               for source_name in CAMERA_NAMES
+                                           }
+    for source_name in CAMERA_NAMES:
+        curr_res_segment = deticsam_results_all_cameras[source_name]
+        # Use the rotated images for *this* camera; the loop variables from
+        # the rotation loop above only hold the last camera's images.
+        rotated_rgb = rotated_rgb_image_dict[source_name]
+        rotated_depth = rotated_depth_image_dict[source_name]
+        for i, obj_class in enumerate(curr_res_segment['classes']):
+            # Check that this particular class is one of the
+            # classes we passed in, and that there was only one
+            # instance of this class that was found.
+            assert obj_class.item() in classes
+            assert curr_res_segment['classes'].count(obj_class) == 1
+            # Compute median value of depth
+            depth_median = np.median(
+                rotated_depth[curr_res_segment['masks'][i][0].squeeze()
+                              & (rotated_depth > 2)[:, :, 0]])
+            # Compute geometric center of object bounding box
+            x1, y1, x2, y2 = curr_res_segment['boxes'][i].squeeze()
+            x_c = (x1 + x2) / 2
+            y_c = (y1 + y2) / 2
+            # Create a transformation matrix for the rotation. Be very
+            # careful to use radians, since np.cos and np.sin expect
+            # angles in radians and not degrees.
+            rotation_radians = np.radians(ROTATION_ANGLE[source_name])
+            transform_matrix = np.array(
+                [[np.cos(rotation_radians), -np.sin(rotation_radians)],
+                 [np.sin(rotation_radians),
+                  np.cos(rotation_radians)]])
+            # Subtract the center of the image from the pixel location to
+            # translate the rotation to the origin.
+            center = np.array(
+                [rotated_rgb.shape[1] / 2., rotated_rgb.shape[0] / 2.])
+            pixel_centered = np.array([x_c, y_c]) - center
+            # Apply the rotation
+            rotated_pixel_centered = np.matmul(transform_matrix,
+                                               pixel_centered)
+            # Add the center of the image back to the pixel location to
+            # translate the rotation back from the origin.
+            rotated_pixel = rotated_pixel_centered + center
+            # Now rotated_pixel is the location of the centroid pixel after
+            # the inverse rotation.
+            x_c_rotated = rotated_pixel[0]
+            y_c_rotated = rotated_pixel[1]
+
+            # Plot (1) the original RGB image, (2) the rotated
+            # segmentation mask from SAM on top of it, (3) the
+            # center of the image, (4) the centroid of the detected
+            # object that comes from SAM, and (5) the centroid
+            # after we rotate it back to align with the original
+            # RGB image.
+            if plot:
+                inverse_rotation_angle = -ROTATION_ANGLE[source_name]
+                plt.imshow(depth_image_dict[source_name])
+                plt.imshow(ndimage.rotate(
+                    curr_res_segment['masks'][i][0].squeeze(),
+                    inverse_rotation_angle,
+                    reshape=False),
+                           alpha=0.5,
+                           cmap='Reds')
+                plt.scatter(x=x_c_rotated,
+                            y=y_c_rotated,
+                            marker='*',
+                            color='red',
+                            zorder=3)
+                plt.scatter(x=center[0],
+                            y=center[1],
+                            marker='.',
+                            color='blue',
+                            zorder=3)
+                plt.scatter(x=x_c, y=y_c, marker='*', color='green', zorder=3)
+                plt.show()
+
+            # Get XYZ of the point at the center of the bounding box,
+            # using the median depth value.
+ x0, y0, z0 = get_xyz_from_depth( + depth_image_response_dict[source_name], + depth_value=depth_median, + point_x=x_c_rotated, + point_y=y_c_rotated) + if not math.isnan(x0) and not math.isnan(y0) and not math.isnan( + z0): + ret_camera_to_obj_positions[source_name][obj_class.item()] = ( + x0, y0, z0) + + return ret_camera_to_obj_positions diff --git a/predicators/spot_utils/spot_utils.py b/predicators/spot_utils/spot_utils.py index 0ed4c6afd5..2cd01ccf0a 100644 --- a/predicators/spot_utils/spot_utils.py +++ b/predicators/spot_utils/spot_utils.py @@ -34,9 +34,9 @@ from predicators.settings import CFG from predicators.spot_utils.helpers.graph_nav_command_line import \ GraphNavInterface -from predicators.spot_utils.perception_utils import \ - get_object_locations_with_detic_sam, get_pixel_locations_with_detic_sam, \ - process_image_response +from predicators.spot_utils.perception_utils import CAMERA_NAMES, \ + RGB_TO_DEPTH_CAMERAS, get_object_locations_with_detic_sam, \ + get_pixel_locations_with_detic_sam from predicators.structs import Array, Image, Object g_image_click = None @@ -116,19 +116,6 @@ def get_memorized_waypoint(obj_name: str) -> Optional[Tuple[str, Array]]: COMMAND_TIMEOUT = 20.0 -CAMERA_NAMES = [ - "hand_color_image", "left_fisheye_image", "right_fisheye_image", - "frontleft_fisheye_image", "frontright_fisheye_image", "back_fisheye_image" -] -RGB_TO_DEPTH_CAMERAS = { - "hand_color_image": "hand_depth_in_hand_color_frame", - "left_fisheye_image": "left_depth_in_visual_frame", - "right_fisheye_image": "right_depth_in_visual_frame", - "frontleft_fisheye_image": "frontleft_depth_in_visual_frame", - "frontright_fisheye_image": "frontright_depth_in_visual_frame", - "back_fisheye_image": "back_depth_in_visual_frame" -} - def _find_object_center(img: Image, obj_name: str) -> Optional[Tuple[int, int]]: @@ -197,7 +184,7 @@ def __init__(self) -> None: self._force_horizontal_grasp = False self._force_squeeze_grasp = False self._force_top_down_grasp = False - self._image_source = "hand_color_image" + self._grasp_image_source = "hand_color_image" self.hand_x, self.hand_y, self.hand_z = (0.80, 0, 0.45) self.hand_x_bounds = (0.3, 0.9) @@ -328,40 +315,34 @@ def get_single_camera_image( def get_objects_in_view_by_camera( self, from_apriltag: bool, rgb_image_dict: Dict[str, Image], + depth_image_dict: Dict[str, Image], rgb_image_response_dict: Dict[str, bosdyn.api.image_pb2.ImageResponse], depth_image_response_dict: Dict[str, bosdyn.api.image_pb2.ImageResponse] ) -> Dict[str, Dict[str, Tuple[float, float, float]]]: """Get objects currently in view for each camera.""" - tag_to_pose: Dict[str, - Dict[int, - Tuple[float, float, - float]]] = {k: {} - for k in CAMERA_NAMES} - for source_name in CAMERA_NAMES: - if from_apriltag: - viewable_obj_poses = self.get_apriltag_pose_from_img( - rgb_image_dict[source_name], - rgb_image_response_dict[source_name]) - else: - # First, get a dictionary mapping vision prompts - # to the corresponding location of that object in the - # scene by camera - sam_pose_results = self.get_deticsam_pose_from_imgs( - rgb_image_response=rgb_image_response_dict[source_name], - depth_image_response=depth_image_response_dict[ - source_name], - source_rgb=source_name, - classes=list(obj_name_to_vision_prompt.values())) - # Next, convert the keys of this dictionary to be april - # tag id's instead. 
- viewable_obj_poses: Dict[int, # type: ignore - Tuple[float, float, float]] = {} - for k, v in sam_pose_results.items(): + tag_to_pose = {source_name: {} for source_name in CAMERA_NAMES} + if from_apriltag: + tag_to_pose = self.get_apriltag_poses_from_imgs( + rgb_image_dict, rgb_image_response_dict) + else: + # First, get a dictionary mapping vision prompts + # to the corresponding location of that object in the + # scene by camera + detic_sam_pose_results = self.get_deticsam_pose_from_imgs( + classes=list(obj_name_to_vision_prompt.values()), + rgb_image_dict=rgb_image_dict, + depth_image_dict=depth_image_dict, + rgb_image_response_dict=rgb_image_response_dict, + depth_image_response_dict=depth_image_response_dict) + # Next, convert the keys of this dictionary to be april + # tag id's instead. + for source_name, obj_pose_dict in detic_sam_pose_results.items(): + viewable_obj_poses: Dict[int, Tuple[float, float, float]] = {} + for k, v in obj_pose_dict.items(): viewable_obj_poses[obj_name_to_apriltag_id[ vision_prompt_to_obj_name[k]]] = v - - tag_to_pose[source_name].update(viewable_obj_poses) + tag_to_pose[source_name].update(viewable_obj_poses) apriltag_id_to_obj_name = { v: k @@ -427,111 +408,123 @@ def get_localized_state(self) -> Any: logging.warning("WARNING: Localization timed out!") return state - def get_apriltag_pose_from_img( + def get_apriltag_poses_from_imgs( self, - rgb_image: Image, - rgb_image_response: bosdyn.api.image_pb2.ImageResponse, + rgb_image_dict: Dict[str, Image], + rgb_image_response_dict: Dict[str, bosdyn.api.image_pb2.ImageResponse], fiducial_size: float = CFG.spot_fiducial_size, - ) -> Dict[int, Tuple[float, float, float]]: + ) -> Dict[str, Dict[int, Tuple[float, float, float]]]: """Get the poses of all fiducials in camera view. - This only works with these camera sources: "hand_color_image", - "back_fisheye_image", "left_fisheye_image". Also, the fiducial - size has to be correctly defined in arguments (in mm). Also, it - only works for tags that start with "40" in their ID. + The fiducial size has to be correctly defined in arguments (in mm). + Also, it only works for tags that start with "40" in their ID. Returns a dict mapping the integer of the tag id to an (x, y, z) position tuple in the map frame. """ - # Camera body transform. - camera_tform_body = get_a_tform_b( - rgb_image_response.shot.transforms_snapshot, - rgb_image_response.shot.frame_name_image_sensor, BODY_FRAME_NAME) - - # Camera intrinsics for the given source camera. - intrinsics = rgb_image_response.source.pinhole.intrinsics - - # Convert the image to grayscale. - image_grey = cv2.cvtColor(rgb_image, cv2.COLOR_BGR2GRAY) - - # Create apriltag detector and get all apriltag locations. - options = apriltag.DetectorOptions(families="tag36h11") - options.refine_pose = 1 - detector = apriltag.Detector(options) - detections = detector.detect(image_grey) - obj_poses: Dict[int, Tuple[float, float, float]] = {} - # For every detection find location in graph_nav frame. 
- for detection in detections: - pose = detector.detection_pose( - detection, - (intrinsics.focal_length.x, intrinsics.focal_length.y, - intrinsics.principal_point.x, intrinsics.principal_point.y), - fiducial_size)[0] - tx, ty, tz, tw = pose[:, -1] - assert np.isclose(tw, 1.0) - fiducial_rt_camera_frame = np.array( - [float(tx) / 1000.0, - float(ty) / 1000.0, - float(tz) / 1000.0]) - - body_tform_fiducial = ( - camera_tform_body.inverse()).transform_point( - fiducial_rt_camera_frame[0], fiducial_rt_camera_frame[1], - fiducial_rt_camera_frame[2]) - - # Get graph_nav to body frame. - state = self.get_localized_state() - gn_origin_tform_body = math_helpers.SE3Pose.from_obj( - state.localization.seed_tform_body) - - # Apply transform to fiducial to body location - fiducial_rt_gn_origin = gn_origin_tform_body.transform_point( - body_tform_fiducial[0], body_tform_fiducial[1], - body_tform_fiducial[2]) - - # This only works for small fiducials because of initial size. - if detection.tag_id in obj_name_to_apriltag_id.values(): - obj_poses[detection.tag_id] = fiducial_rt_gn_origin + tag_to_pose: Dict[str, Dict[int, Tuple[float, float, float]]] = { + source_name: {} + for source_name in CAMERA_NAMES + } + for source_name in CAMERA_NAMES: + assert source_name in rgb_image_dict + assert source_name in rgb_image_response_dict + rgb_image = rgb_image_dict[source_name] + rgb_image_response = rgb_image_response_dict[source_name] - return obj_poses + # Camera body transform. + camera_tform_body = get_a_tform_b( + rgb_image_response.shot.transforms_snapshot, + rgb_image_response.shot.frame_name_image_sensor, + BODY_FRAME_NAME) + + # Camera intrinsics for the given source camera. + intrinsics = rgb_image_response.source.pinhole.intrinsics + + # Convert the image to grayscale. + image_grey = cv2.cvtColor(rgb_image, cv2.COLOR_BGR2GRAY) + + # Create apriltag detector and get all apriltag locations. + options = apriltag.DetectorOptions(families="tag36h11") + options.refine_pose = 1 + detector = apriltag.Detector(options) + detections = detector.detect(image_grey) + obj_poses: Dict[int, Tuple[float, float, float]] = {} + # For every detection find location in graph_nav frame. + for detection in detections: + pose = detector.detection_pose( + detection, + (intrinsics.focal_length.x, intrinsics.focal_length.y, + intrinsics.principal_point.x, + intrinsics.principal_point.y), fiducial_size)[0] + tx, ty, tz, tw = pose[:, -1] + assert np.isclose(tw, 1.0) + fiducial_rt_camera_frame = np.array([ + float(tx) / 1000.0, + float(ty) / 1000.0, + float(tz) / 1000.0 + ]) + + body_tform_fiducial = ( + camera_tform_body.inverse()).transform_point( + fiducial_rt_camera_frame[0], + fiducial_rt_camera_frame[1], + fiducial_rt_camera_frame[2]) + + # Get graph_nav to body frame. + state = self.get_localized_state() + gn_origin_tform_body = math_helpers.SE3Pose.from_obj( + state.localization.seed_tform_body) + + # Apply transform to fiducial to body location + fiducial_rt_gn_origin = gn_origin_tform_body.transform_point( + body_tform_fiducial[0], body_tform_fiducial[1], + body_tform_fiducial[2]) + + # This only works for small fiducials because of initial size. 
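+                # Only record tags whose id corresponds to a known object.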
+ if detection.tag_id in obj_name_to_apriltag_id.values(): + obj_poses[detection.tag_id] = fiducial_rt_gn_origin + + tag_to_pose[source_name].update(obj_poses) + + return tag_to_pose def get_deticsam_pose_from_imgs( self, classes: List[str], - rgb_image_response: bosdyn.api.image_pb2.ImageResponse, - depth_image_response: bosdyn.api.image_pb2.ImageResponse, - source_rgb: str, + rgb_image_dict: Dict[str, Image], + depth_image_dict: Dict[str, Image], + rgb_image_response_dict: Dict[str, bosdyn.api.image_pb2.ImageResponse], + depth_image_response_dict: Dict[str, + bosdyn.api.image_pb2.ImageResponse], ) -> Dict[str, Tuple[float, float, float]]: """Get object location in 3D (no orientation) estimated using - pretrained SAM model.""" - image = { - 'rgb': process_image_response(rgb_image_response, to_rgb=True), - 'depth': process_image_response(depth_image_response, - to_rgb=False), - } - image_responses = { - 'rgb': rgb_image_response, - 'depth': depth_image_response, - } - + pretrained DETIC-SAM model.""" res_location_dict = get_object_locations_with_detic_sam( classes=classes, - res_image=image, - res_image_responses=image_responses, - source_name=source_rgb, + rgb_image_dict=rgb_image_dict, + depth_image_dict=depth_image_dict, + depth_image_response_dict=depth_image_response_dict, plot=CFG.spot_visualize_vision_model_outputs) - transformed_location_dict: Dict[str, Tuple[float, float, float]] = {} - for obj_class in classes: - if obj_class in res_location_dict: - camera_tform_body = get_a_tform_b( - image_responses['rgb'].shot.transforms_snapshot, - image_responses['rgb'].shot.frame_name_image_sensor, - BODY_FRAME_NAME) - x, y, z = res_location_dict[obj_class] - object_rt_gn_origin = self.convert_obj_location( - camera_tform_body, x, y, z) - transformed_location_dict[obj_class] = object_rt_gn_origin + transformed_location_dict: Dict[str, Dict[str, Tuple[ + float, float, + float]]] = {source_name: {} + for source_name in CAMERA_NAMES} + for source_name in CAMERA_NAMES: + assert source_name in res_location_dict + for obj_class in classes: + if obj_class in res_location_dict[source_name]: + camera_tform_body = get_a_tform_b( + rgb_image_response_dict[source_name].shot. + transforms_snapshot, + rgb_image_response_dict[source_name].shot. + frame_name_image_sensor, BODY_FRAME_NAME) + x, y, z = res_location_dict[source_name][obj_class] + object_rt_gn_origin = self.convert_obj_location( + camera_tform_body, x, y, z) + transformed_location_dict[source_name][ + obj_class] = object_rt_gn_origin # Use the input class name as the identifier for object(s) and # their positions. @@ -736,18 +729,18 @@ def _scan_for_objects( self.navigate_to(waypoint_id, offset) for _ in range(8): objects_in_view: Dict[str, Tuple[float, float, float]] = {} - rgb_img_dict, rgb_img_response_dict, _, depth_img_response_dict = self.get_camera_images( + rgb_img_dict, rgb_img_response_dict, depth_img_dict, depth_img_response_dict = self.get_camera_images( ) # We want to get objects in view both using AprilTags and # using SAM potentially. 
objects_in_view_by_camera = {} objects_in_view_by_camera_apriltag = \ - self.get_objects_in_view_by_camera(from_apriltag=True, rgb_image_dict=rgb_img_dict, rgb_image_response_dict=rgb_img_response_dict, depth_image_response_dict=depth_img_response_dict) + self.get_objects_in_view_by_camera(from_apriltag=True, rgb_image_dict=rgb_img_dict, rgb_image_response_dict=rgb_img_response_dict, depth_image_dict=depth_img_dict, depth_image_response_dict=depth_img_response_dict) objects_in_view_by_camera.update( objects_in_view_by_camera_apriltag) if CFG.spot_grasp_use_sam: objects_in_view_by_camera_sam = \ - self.get_objects_in_view_by_camera(from_apriltag=False, rgb_image_dict=rgb_img_dict, rgb_image_response_dict=rgb_img_response_dict, depth_image_response_dict=depth_img_response_dict) + self.get_objects_in_view_by_camera(from_apriltag=False, rgb_image_dict=rgb_img_dict, rgb_image_response_dict=rgb_img_response_dict, depth_image_dict=depth_img_dict, depth_image_response_dict=depth_img_response_dict) # Combine these together to get all objects in view. for k, v in objects_in_view_by_camera.items(): v.update(objects_in_view_by_camera_sam[k]) @@ -900,33 +893,17 @@ def arm_object_grasp(self, obj: Object) -> None: assert basic_command_pb2.StandCommand.Feedback.STATUS_IS_STANDING # Take a picture with a camera - self.robot.logger.debug(f'Getting an image from: {self._image_source}') - image_responses = self.image_client.get_image_from_sources( - [self._image_source]) - - if len(image_responses) != 1: - print(f'Got invalid number of images: {str(len(image_responses))}') - print(image_responses) - assert False - - image = image_responses[0] - if image.shot.image.pixel_format == image_pb2.Image.\ - PIXEL_FORMAT_DEPTH_U16: - dtype = np.uint16 # type: ignore - else: - dtype = np.uint8 # type: ignore - img = np.fromstring(image.shot.image.data, dtype=dtype) # type: ignore - if image.shot.image.format == image_pb2.Image.FORMAT_RAW: - img = img.reshape(image.shot.image.rows, image.shot.image.cols) - else: - img = cv2.imdecode(img, -1) + self.robot.logger.debug( + f'Getting an image from: {self._grasp_image_source}') + rgb_img, rgb_img_response = self.get_single_camera_image( + self._grasp_image_source) # pylint: disable=global-variable-not-assigned, global-statement global g_image_click, g_image_display if CFG.spot_grasp_use_apriltag: # Convert Image to grayscale - gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + gray = cv2.cvtColor(rgb_img, cv2.COLOR_BGR2GRAY) # Define the AprilTags detector options and then detect the tags. 
self.robot.logger.info("[INFO] detecting AprilTags...") @@ -940,25 +917,12 @@ def arm_object_grasp(self, obj: Object) -> None: elif CFG.spot_grasp_use_cv2: if obj.name in ["hammer", "hex_key", "brush", "hex_screwdriver"]: - g_image_click = _find_object_center(img, obj.name) + g_image_click = _find_object_center(rgb_img, obj.name) elif CFG.spot_grasp_use_sam: - image_sources = [ - "hand_color_image", "hand_depth_in_hand_color_frame" - ] - image_request = [ - build_image_request(source, pixel_format=None) - for source in image_sources - ] - image_responses = self.image_client.get_image(image_request) - - image_for_sam = { - 'rgb': process_image_response(image_responses[0]), - 'depth': process_image_response(image_responses[1]), - } results = get_pixel_locations_with_detic_sam( obj_class=obj_name_to_vision_prompt[obj.name], - in_res_image=image_for_sam, + rgb_image_dict={self._grasp_image_source: rgb_img}, plot=CFG.spot_visualize_vision_model_outputs) if len(results) > 0: @@ -975,7 +939,7 @@ def arm_object_grasp(self, obj: Object) -> None: image_title = 'Click to grasp' cv2.namedWindow(image_title) cv2.setMouseCallback(image_title, self.cv_mouse_callback) - g_image_display = img + g_image_display = rgb_img cv2.imshow(image_title, g_image_display) while g_image_click is None: @@ -1002,9 +966,9 @@ def arm_object_grasp(self, obj: Object) -> None: # Build the proto grasp = manipulation_api_pb2.PickObjectInImage( pixel_xy=pick_vec, - transforms_snapshot_for_camera=image.shot.transforms_snapshot, - frame_name_image_sensor=image.shot.frame_name_image_sensor, - camera_model=image.source.pinhole) + transforms_snapshot_for_camera=rgb_img_response.shot.transforms_snapshot, + frame_name_image_sensor=rgb_img_response.shot.frame_name_image_sensor, + camera_model=rgb_img_response.source.pinhole) # Optionally add a grasp constraint. This lets you tell the robot you # only want top-down grasps or side-on grasps.