WIP: feat: add manipulation planning groups#2489
Conversation
❌ 6 Tests Failed:
View the top 3 failed test(s) by shortest run time
To view more test analytics, go to the Test Analytics Dashboard |
f157926 to
31731b1
Compare
| return result | ||
|
|
||
| @rpc | ||
| def plan_to_pose(self, pose: Pose, robot_name: RobotName | None = None) -> bool: |
There was a problem hiding this comment.
plan_to_pose and plan_to_joints should be just wrapper of the more generic functions, no duplication. plan_to_poses and plan_to_joints
| if self._world_monitor is None or self._kinematics is None: | ||
| return False | ||
| if not pose_targets: | ||
| return self._fail("At least one pose target is required") | ||
| with self._lock: | ||
| if self._state not in (ManipulationState.IDLE, ManipulationState.COMPLETED): | ||
| logger.warning(f"Cannot plan: state is {self._state.name}") | ||
| return False | ||
| self._state = ManipulationState.PLANNING |
There was a problem hiding this comment.
should reuse begin_planning here
Co-authored-by: cc <55869557+TomCC7@users.noreply.github.com>
| if self._world_monitor is None or self._kinematics is None: | ||
| return False | ||
| if not pose_targets: | ||
| return self._fail("At least one pose target is required") | ||
| with self._lock: | ||
| if self._state not in (ManipulationState.IDLE, ManipulationState.COMPLETED): | ||
| logger.warning(f"Cannot plan: state is {self._state.name}") | ||
| return False | ||
| self._state = ManipulationState.PLANNING |
| self._world_monitor.world.list_planning_groups(), robot_name | ||
| ) | ||
|
|
||
| def _current_positions_by_name( |
There was a problem hiding this comment.
this function is basically a noop check, remove
| ) -> bool: | ||
| """Preview planned path in Meshcat.""" | ||
| return _client.preview_path(duration, robot_name, target_fps) | ||
| """Preview the last generated plan in Meshcat.""" |
There was a problem hiding this comment.
| """Preview the last generated plan in Meshcat.""" | |
| """Preview the last generated plan in Visualizer.""" |
| """Dual-arm coordinator blueprints with trajectory control. | ||
|
|
||
| Usage: | ||
| dimos run coordinator-dual-mock # Mock 7+6 DOF arms |
There was a problem hiding this comment.
ok we shouldn't remove this... bring it back
| return fallback_result | ||
| return _failure(IKStatus.NO_SOLUTION, f"Pink IK failed after {max_attempts} attempts") | ||
|
|
||
| def _solve_single( |
There was a problem hiding this comment.
again, solve single should be thin wrapper of the more general case (solve pose target)
| planning_groups: list[PlanningGroupInfo] | ||
|
|
||
|
|
||
| class PlanningGroupInfo(TypedDict): |
There was a problem hiding this comment.
this is just duplication... why not use original type?
| def _selected_joint_state(self, group_ids: tuple[PlanningGroupID, ...]) -> JointState | None: | ||
| """Collect current state for exactly the selected global joints.""" | ||
| assert self._world_monitor is not None | ||
| selection = self._world_monitor.planning_groups.select(group_ids) | ||
|
|
||
| robot_ids_by_name: dict[RobotName, WorldRobotID] = {} | ||
| for robot_name in selection.robot_names: | ||
| try: | ||
| robot_ids_by_name[robot_name] = self._robots[robot_name][0] | ||
| except KeyError: | ||
| logger.error("Robot '%s' is not registered", robot_name) | ||
| return None | ||
|
|
||
| current_by_robot: dict[RobotName, dict[str, float]] = {} | ||
| for robot_name, robot_id in robot_ids_by_name.items(): | ||
| current = self._world_monitor.get_current_joint_state(robot_id) | ||
| if current is None: | ||
| logger.error("No joint state for robot '%s'", robot_name) | ||
| return None | ||
| indexed_current = self._current_positions_by_name(robot_name, current) | ||
| if indexed_current is None: | ||
| return None | ||
| current_by_robot[robot_name] = indexed_current | ||
|
|
||
| names: list[str] = [] | ||
| positions: list[float] = [] | ||
| for group in selection.groups: | ||
| robot_state = current_by_robot[group.robot_name] | ||
| for resolved_name, local_name in zip( | ||
| group.joint_names, group.local_joint_names, strict=True | ||
| ): | ||
| if local_name not in robot_state: | ||
| logger.error("Current state missing selected joint '%s'", resolved_name) | ||
| return None | ||
| position = robot_state[local_name] | ||
| names.append(resolved_name) | ||
| positions.append(position) | ||
|
|
||
| return JointState(name=names, position=positions) |
There was a problem hiding this comment.
really can't simplify this?
| } | ||
|
|
||
| def evaluate_pose_target(self, pose: Pose, robot_name: RobotName) -> TargetEvaluation: | ||
| def evaluate_pose_target( |
There was a problem hiding this comment.
maybe we should just remove this and use IK + collision checking function instead of the all-in-one function
| logger.error(f"No coordinator_task_name for '{robot_name}'") | ||
| @rpc | ||
| def execute_plan( | ||
| self, plan: GeneratedPlan | None = None, robot_name: RobotName | None = None |
There was a problem hiding this comment.
execute a plan with only one robot makes zero sense. a plan is a whole and should always be executed together, just remove the robot_name variable
| def execute_plan( | ||
| self, plan: GeneratedPlan | None = None, robot_name: RobotName | None = None | ||
| ) -> bool: | ||
| """Project and execute a generated plan through affected trajectory tasks.""" |
There was a problem hiding this comment.
| """Project and execute a generated plan through affected trajectory tasks.""" | |
| """Project and execute a generated plan through affected trajectory tasks. TODO: proper time parametrization """ |
Summary
Verification
Notes