Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/overview/sim/planners/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The `embodichain` project provides a unified interface for robot trajectory plan

- **MotionGenerator**: A unified trajectory planning interface that supports joint/Cartesian interpolation, automatic constraint handling, flexible planner selection, and is easily extensible for collision checking and additional planners.
- **ToppraPlanner**: A time-optimal trajectory planner based on the TOPPRA library, supporting joint trajectory generation under velocity and acceleration constraints.
- **NeuralPlanner** (experimental): A learning-based EEF waypoint planner for Franka Panda.
- **NeuralPlanner**: An experimental neural motion generation backend for end-effector waypoint trajectories, backed by a trained waypoint-conditioned policy.
- **TrajectorySampleMethod**: An enumeration for trajectory sampling strategies, supporting sampling by time, quantity, or distance.

These tools can be used to generate smooth and dynamically feasible robot trajectories, and are extensible for future collision checking and various sampling requirements.
Expand Down
44 changes: 41 additions & 3 deletions docs/source/overview/sim/planners/motion_generator.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# MotionGenerator

`MotionGenerator` provides a unified interface for robot trajectory planning, supporting both joint space and Cartesian space interpolation. It is designed to work with different planners (such as ToppraPlanner) and can be extended to support collision checking in the future.
`MotionGenerator` provides a unified interface for robot trajectory planning, supporting both joint space and Cartesian space interpolation. It selects a backend planner from `MotionGenCfg.planner_cfg`, such as `ToppraPlanner` for joint waypoint time-parameterization or `NeuralPlanner` for experimental end-effector waypoint motion generation.

## Features

* **Unified planning interface**: Supports trajectory planning with or without collision checking (collision checking is reserved for future implementation).
* **Flexible planner selection**: Supports TOPPRA and NeuralPlanner (experimental).
* **Flexible planner selection**: Allows selection of different planners through `planner_cfg.planner_type`.
* **Automatic constraint handling**: Retrieves velocity and acceleration limits from the robot or uses user-specified/default values.
* **Supports both joint and Cartesian interpolation**: Generates discrete trajectories using either joint space or Cartesian space interpolation.
* **Convenient sampling**: Supports various sampling strategies via `TrajectorySampleMethod`.
Expand Down Expand Up @@ -110,6 +110,43 @@ result = motion_gen.generate(
)
```

#### Neural EEF Waypoint Planning

```python
from embodichain.lab.sim.planners import (
MotionGenCfg,
MotionGenOptions,
MotionGenerator,
MoveType,
NeuralPlannerCfg,
PlanState,
)

motion_gen = MotionGenerator(
cfg=MotionGenCfg(
planner_cfg=NeuralPlannerCfg(
robot_uid="Franka",
checkpoint_path="/path/to/franka.pt",
control_part="main_arm",
)
)
)

target_states = [
PlanState(move_type=MoveType.EEF_MOVE, xpos=target_pose),
]

result = motion_gen.generate(
target_states=target_states,
options=MotionGenOptions(
control_part="main_arm",
start_qpos=start_qpos,
),
)
```

The neural backend only supports `EEF_MOVE` waypoint inputs and currently assumes a compatible 7-DoF checkpoint. It returns a policy rollout rather than a TOPPRA-constrained trajectory.

#### Cartesian Space Planning

```python
Expand Down Expand Up @@ -178,8 +215,9 @@ print(f"Estimated sample count: {sample_count}")

## Notes

* The planner type can be specified as a string or `PlannerType` enum.
* The planner type is selected by `planner_cfg.planner_type`.
* If the robot provides its own joint limits, those will be used; otherwise, default or user-specified limits are applied.
* For Cartesian interpolation, inverse kinematics (IK) is used to compute joint configurations for each interpolated pose.
* In atomic actions, the non-neural EEF path is IK plus joint interpolation. The neural planner is used only when the active planner type is `neural`.
* The class is designed to be extensible for additional planners and collision checking in the future.
* The sample count estimation is useful for predicting computational load and memory requirements.
173 changes: 156 additions & 17 deletions docs/source/overview/sim/planners/neural_planner.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,50 @@
````{admonition} Experimental
:class: warning

`NeuralPlanner` is an **experimental** feature. The API, checkpoint format,
and default parameters may change without a deprecation cycle. It is currently
only validated on the **Franka Panda** robot.
`NeuralPlanner` is an experimental feature. The API, checkpoint format, and
default parameters may change without a deprecation cycle. It is currently
validated only on the Franka Panda checkpoint shipped for neural motion
generation.
````

`NeuralPlanner` is a learning-based EEF waypoint planner. It rolls out a
trained APG checkpoint through `MotionGenerator` to reach Cartesian targets.
`NeuralPlanner` is a learning-based motion generation backend for end-effector
waypoint trajectories. It rolls out a trained waypoint-conditioned APG policy
from the current arm joint state toward one or more target TCP poses.

## Configuration
Unlike `ToppraPlanner`, this planner is not a time-parameterization solver for
joint waypoints. It is intended for `MoveType.EEF_MOVE` inputs and returns a
policy rollout as joint positions plus the corresponding forward-kinematics
poses.

## Features

- **End-effector waypoint planning**: Accepts
`PlanState(move_type=MoveType.EEF_MOVE, xpos=...)` waypoints.
- **MotionGenerator integration**: Select it with
`MotionGenCfg(planner_cfg=NeuralPlannerCfg(...))`.
- **Atomic action integration**: EEF-based atomic actions route through
`TrajectoryBuilder.plan_arm_traj()` and use the neural backend when the active
planner type is `neural` or `neural_refine`.
- **Checkpoint-backed inference**: Loads a trained transformer waypoint
checkpoint and runs it in evaluation mode.

## Usage

Pre-trained checkpoints are hosted on HuggingFace and can be downloaded with
`download_neural_planner_checkpoint()` (requires `HF_TOKEN` environment variable).
`download_neural_planner_checkpoint()`. The repository is gated, so the process
must have access to an authenticated token through `HF_TOKEN` or
`huggingface-cli login` when no local checkpoint is provided.

If you already have `franka.pt`, pass it explicitly or place it at:

```text
~/.cache/embodichain_data/checkpoints/dexforce/neural_motion_generator/franka/franka.pt
```

You can also set `EMBODICHAIN_NEURAL_PLANNER_CHECKPOINT=/path/to/franka.pt`.

```python
from embodichain.data.assets.planner_assets import download_neural_planner_checkpoint
from embodichain.data.assets import download_neural_planner_checkpoint
from embodichain.lab.sim.planners import (
MotionGenCfg,
MotionGenOptions,
Expand All @@ -26,11 +55,10 @@ from embodichain.lab.sim.planners import (
NeuralPlannerCfg,
PlanState,
)
from embodichain.lab.sim.planners.neural_planner import NeuralPlanOptions

checkpoint_path = download_neural_planner_checkpoint()

motion_generator = MotionGenerator(
motion_gen = MotionGenerator(
cfg=MotionGenCfg(
planner_cfg=NeuralPlannerCfg(
robot_uid=robot.uid,
Expand All @@ -40,24 +68,135 @@ motion_generator = MotionGenerator(
)
)

result = motion_generator.generate(
result = motion_gen.generate(
target_states=[
PlanState(move_type=MoveType.EEF_MOVE, xpos=waypoint)
for waypoint in waypoints
],
options=MotionGenOptions(
plan_opts=NeuralPlanOptions(
control_part="main_arm",
start_qpos=start_qpos,
),
control_part="main_arm",
start_qpos=start_qpos,
),
)
```

## Example
## Atomic Actions

When the active planner is `neural`, EEF-based atomic actions such as
`MoveEndEffector`, `PickUp`, `MoveHeldObject`, `Place`, and the down phase of
`Press` call the neural planner through `MotionGenerator.generate()`. The raw
policy rollout is then resampled to the fixed waypoint count requested by the
action config.

When the active planner is `neural_refine`, the atomic-action trajectory builder
uses the neural rollout and appends a final IK-refined waypoint before
resampling. Joint-space actions and joint-space return phases, such as
`MoveJoints` and the return phase of `Press`, continue to use joint
interpolation.

## Limitations

- Only `MoveType.EEF_MOVE` waypoint inputs are supported.
- Current transformer checkpoints assume a compatible arm/checkpoint pair; the
default checkpoint targets Franka Panda 7-DoF.
- Multi-env atomic actions are handled by calling the neural planner once per
environment, not by a batched policy rollout.
- The planner does not enforce TOPPRA-style velocity or acceleration
constraints.
- Collision checking and self-collision constraints are not provided by this
backend.
- Planning failure is explicit; the neural planner does not silently fall back
to IK interpolation or TOPPRA.

## Examples

Run the standalone planner example:

```bash
python examples/sim/planners/neural_planner.py --headless --device cuda
```

The example downloads the checkpoint automatically on first run.
Use a local checkpoint without HuggingFace:

```bash
python examples/sim/planners/neural_planner.py --headless --device cuda --checkpoint-path /path/to/franka.pt
```

The example downloads the checkpoint automatically on first run only when no
local checkpoint is provided or found in the default cache path.

For deciding whether NMG can replace the IK interpolation path in atomic
actions, compare several success notions:

- `action_success`: the atomic action produced a trajectory.
- `strict_pose_success`: the final TCP pose is within the script's strict
threshold, defaulting to 1 mm and 0.05 rad.
- `all_waypoint_strict_success`: every target waypoint is reached by some
trajectory sample within the strict threshold. Use this as the primary planner
quality signal for multi-waypoint NMG comparisons.
- `nmg_threshold_success`: the final TCP pose is within the NMG waypoint
threshold, defaulting to 5 cm and 0.3 rad.
- downstream task success: the simulated task outcome after trajectory replay.

Use `strict_pose_success`, `final_pos_error`, `final_rot_error`, and the
downstream task outcome rather than `action_success` alone.

## Benchmarks

Use the Franka benchmark in three layers:

1. Demo-matched planner benchmark: mirrors this example's fixed start qpos and
compact relative TCP offsets.
2. Reachable-FK planner benchmark: uses a broader bank of FK-generated reachable
target poses.
3. Atomic-action benchmark: runs the Franka `PickUp -> Place -> MoveEndEffector`
integration path with planner-only and optional physical replay modes.

Run the first two layers with `franka_planner.py`:

```bash
PYTHONPATH="$PWD" conda run -n embodichain040 python -m scripts.benchmark.robotics.nmg.franka_planner \
--device cuda \
--planner all \
--trial_source demo_offsets \
--neural_checkpoint franka.pt \
--sample_interval 120
```

```bash
PYTHONPATH="$PWD" conda run -n embodichain040 python -m scripts.benchmark.robotics.nmg.franka_planner \
--device cuda \
--planner all \
--trial_source fk_bank \
--neural_checkpoint franka.pt \
--sample_interval 120
```

Run the downstream atomic-action layer separately:

```bash
PYTHONPATH="$PWD" conda run -n embodichain040 python -m scripts.benchmark.robotics.nmg.franka_pick_place \
--device cuda \
--planner all \
--mode planner \
--neural_checkpoint franka.pt \
--object sugar_box \
--support_surface ground
```

For visual inspection, run the physical layer with `--open_window`. The
`attached` object replay mode is useful when you want to inspect the planned
held-object transform without requiring a successful contact grasp:

```bash
PYTHONPATH="$PWD" conda run -n embodichain040 python -m scripts.benchmark.robotics.nmg.franka_pick_place \
--device cuda \
--planner ik_interpolate \
--mode physical \
--neural_checkpoint franka.pt \
--object sugar_box \
--support_surface ground \
--object_replay_mode attached \
--replay_control target \
--open_window
```
69 changes: 65 additions & 4 deletions embodichain/data/assets/planner_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,82 @@
from __future__ import annotations

import os
from pathlib import Path

from huggingface_hub import hf_hub_download

from embodichain.data.constants import EMBODICHAIN_DEFAULT_DATA_ROOT

# HuggingFace endpoint. Mirrors (e.g. hf-mirror.com) often redirect to the
# real hub without forwarding the required commit-hash response headers, so we
# default to the canonical endpoint and rely on the system proxy when needed.
_HF_ENDPOINT = "https://huggingface.co"
_NEURAL_PLANNER_LOCAL_CHECKPOINT = Path(
"checkpoints/dexforce/neural_motion_generator/franka/franka.pt"
)
NEURAL_PLANNER_CHECKPOINT_ENV = "EMBODICHAIN_NEURAL_PLANNER_CHECKPOINT"

__all__ = [
"NEURAL_PLANNER_CHECKPOINT_ENV",
"download_neural_planner_checkpoint",
"get_default_neural_planner_checkpoint_path",
]


def get_default_neural_planner_checkpoint_path(
data_root: str | os.PathLike[str] | None = None,
) -> str:
"""Return the default local NeuralPlanner checkpoint path."""
root = EMBODICHAIN_DEFAULT_DATA_ROOT if data_root is None else data_root
return str(Path(root).expanduser() / _NEURAL_PLANNER_LOCAL_CHECKPOINT)


def _normalize_existing_checkpoint_path(
checkpoint_path: str | os.PathLike[str],
) -> str:
path = os.path.abspath(os.path.expanduser(os.fspath(checkpoint_path)))
if not os.path.isfile(path):
raise FileNotFoundError(f"NeuralPlanner checkpoint not found: {path}")
return path

__all__ = ["download_neural_planner_checkpoint"]

def _resolve_local_neural_planner_checkpoint(
checkpoint_path: str | os.PathLike[str] | None,
) -> str | None:
if checkpoint_path is not None:
return _normalize_existing_checkpoint_path(checkpoint_path)

env_checkpoint_path = os.environ.get(NEURAL_PLANNER_CHECKPOINT_ENV)
if env_checkpoint_path:
return _normalize_existing_checkpoint_path(env_checkpoint_path)

default_checkpoint_path = get_default_neural_planner_checkpoint_path()
if os.path.isfile(default_checkpoint_path):
return default_checkpoint_path

return None


def download_neural_planner_checkpoint(
repo_id: str = "dexforce/neural_motion_generator",
filename: str = "franka/franka.pt",
token: str | None = None,
endpoint: str = _HF_ENDPOINT,
checkpoint_path: str | os.PathLike[str] | None = None,
) -> str:
"""Download a neural planner checkpoint from HuggingFace.
"""Resolve or download a neural planner checkpoint.

The repository is gated. Either set the ``HF_TOKEN`` environment variable or
run ``huggingface-cli login`` before calling this function.
Local checkpoint resolution is tried first, in this order:

1. The explicit ``checkpoint_path`` argument.
2. The ``EMBODICHAIN_NEURAL_PLANNER_CHECKPOINT`` environment variable.
3. ``~/.cache/embodichain_data/checkpoints/dexforce/neural_motion_generator/franka/franka.pt``
unless ``EMBODICHAIN_DATA_ROOT`` overrides the data root.

If no local checkpoint is found, the checkpoint is downloaded from
HuggingFace. The repository is gated. Either set the ``HF_TOKEN``
environment variable or run ``huggingface-cli login`` before calling this
function.

If your network requires an HTTP proxy, set ``HTTPS_PROXY`` or
``https_proxy`` in the environment before launching Python.
Expand All @@ -50,13 +105,19 @@ def download_neural_planner_checkpoint(
endpoint: HuggingFace-compatible endpoint URL. Defaults to
``https://huggingface.co``. Mirrors that merely redirect to the
real hub are not supported.
checkpoint_path: Optional local checkpoint path. If provided, this path
must exist and HuggingFace is not used.

Returns:
str: Local path to the downloaded checkpoint file.

Raises:
RuntimeError: If the download fails, with authentication instructions.
"""
local_checkpoint_path = _resolve_local_neural_planner_checkpoint(checkpoint_path)
if local_checkpoint_path is not None:
return local_checkpoint_path

# Normalize proxy env vars: the ``requests`` library on Linux requires the
# lowercase form (``https_proxy``), but users typically export the uppercase
# form (``HTTPS_PROXY``).
Expand Down
Loading
Loading