From 034339eb0eb270f2414a76fa20cf6817436efa5b Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Fri, 29 Nov 2024 16:21:52 +0100 Subject: [PATCH 1/8] added new eval_robot.py script to add human intervention in rollout --- lerobot/configs/robot/so100.yaml | 2 +- lerobot/scripts/eval_robot.py | 637 +++++++++++++++++++++++++++++++ 2 files changed, 638 insertions(+), 1 deletion(-) create mode 100644 lerobot/scripts/eval_robot.py diff --git a/lerobot/configs/robot/so100.yaml b/lerobot/configs/robot/so100.yaml index ec6f3e3fe..0978de64e 100644 --- a/lerobot/configs/robot/so100.yaml +++ b/lerobot/configs/robot/so100.yaml @@ -18,7 +18,7 @@ max_relative_target: null leader_arms: main: _target_: lerobot.common.robot_devices.motors.feetech.FeetechMotorsBus - port: /dev/tty.usbmodem585A0077581 + port: /dev/tty.usbmodem58760433331 motors: # name: (index, model) shoulder_pan: [1, "sts3215"] diff --git a/lerobot/scripts/eval_robot.py b/lerobot/scripts/eval_robot.py new file mode 100644 index 000000000..c467b902e --- /dev/null +++ b/lerobot/scripts/eval_robot.py @@ -0,0 +1,637 @@ +#!/usr/bin/env python + +# Copyright 2024 The HuggingFace Inc. team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Evaluate a policy on an environment by running rollouts and computing metrics. + +Usage examples: + +You want to evaluate a model from the hub (eg: https://huggingface.co/lerobot/diffusion_pusht) +for 10 episodes. + +``` +python lerobot/scripts/eval.py -p lerobot/diffusion_pusht eval.n_episodes=10 +``` + +OR, you want to evaluate a model checkpoint from the LeRobot training script for 10 episodes. + +``` +python lerobot/scripts/eval.py \ + -p outputs/train/diffusion_pusht/checkpoints/005000/pretrained_model \ + eval.n_episodes=10 +``` + +Note that in both examples, the repo/folder should contain at least `config.json`, `config.yaml` and +`model.safetensors`. + +Note the formatting for providing the number of episodes. Generally, you may provide any number of arguments +with `qualified.parameter.name=value`. In this case, the parameter eval.n_episodes appears as `n_episodes` +nested under `eval` in the `config.yaml` found at +https://huggingface.co/lerobot/diffusion_pusht/tree/main. +""" + +import argparse +import json +import logging +import threading +import time +from contextlib import nullcontext +from copy import deepcopy +from datetime import datetime as dt +from pathlib import Path +from typing import Callable + +import einops +import gymnasium as gym +import numpy as np +import torch +from huggingface_hub import snapshot_download +from huggingface_hub.errors import RepositoryNotFoundError +from huggingface_hub.utils._validators import HFValidationError +from torch import Tensor, nn +from tqdm import trange + +from lerobot.common.datasets.factory import make_dataset +from lerobot.common.logger import log_output_dir +from lerobot.common.policies.factory import make_policy +from lerobot.common.policies.policy_protocol import Policy +from lerobot.common.policies.utils import get_device_from_parameters +from lerobot.common.utils.io_utils import write_video +from lerobot.common.utils.utils import ( + get_safe_torch_device, + init_hydra_config, + init_logging, + inside_slurm, + set_global_seed, +) + +from lerobot.common.robot_devices.robots.factory import make_robot, Robot +from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot +from lerobot.scripts.eval import get_pretrained_policy_path +from lerobot.common.utils.utils import log_say +from lerobot.common.robot_devices.control_utils import is_headless, predict_action, busy_wait + + +def rollout( + robot: Robot, + policy: Policy, + fps: int, + control_time_s: float = 20, + num_rollouts: int = 2, + use_amp: bool = True + +) -> dict: + """Run a batched policy rollout once through a batch of environments. + + Note that all environments in the batch are run until the last environment is done. This means some + data will probably need to be discarded (for environments that aren't the first one to be done). + + The return dictionary contains: + (optional) "observation": A a dictionary of (batch, sequence + 1, *) tensors mapped to observation + keys. NOTE the that this has an extra sequence element relative to the other keys in the + dictionary. This is because an extra observation is included for after the environment is + terminated or truncated. + "action": A (batch, sequence, action_dim) tensor of actions applied based on the observations (not + including the last observations). + "reward": A (batch, sequence) tensor of rewards received for applying the actions. + "success": A (batch, sequence) tensor of success conditions (the only time this can be True is upon + environment termination/truncation). + "done": A (batch, sequence) tensor of **cumulative** done conditions. For any given batch element, + the first True is followed by True's all the way till the end. This can be used for masking + extraneous elements from the sequences above. + + Args: + robot: + policy: The policy. Must be a PyTorch nn module. + + Returns: + The dictionary described above. + """ + #assert isinstance(policy, nn.Module), "Policy must be a PyTorch nn module." + #device = get_device_from_parameters(policy) + + # define keyboard listener + listener, events = init_keyboard_listener() + + # Reset the policy. + #policy.reset() + + # Get observation from real robot + observation = robot.capture_observation() + + # Calculate reward + # in HIL-SERL it will be with a reward classifier + reward = calculate_reward(observation) + all_observations = [] + all_actions = [] + all_rewards = [] + + step = 0 + while step < num_rollouts: + start_episode_t = time.perf_counter() + timestamp = 0.0 + while timestamp < control_time_s: + start_loop_t = time.perf_counter() + + all_observations.append(deepcopy(observation)) + #observation = {key: observation[key].to(device, non_blocking=True) for key in observation} + + # Apply the next action. + while events["pause_policy"] and not events["human_intervention_step"]: + busy_wait(0.5) + + if events["human_intervention_step"]: + # take over the robot's actions + observation, action = robot.teleop_step(record_data=True) + else: + # explore with policy + with torch.inference_mode(): + action = robot.follower_arms["main"].read("Present_Position") + robot.send_action(torch.from_numpy(action)) + #action = predict_action(observation, policy, device, use_amp) + + observation = robot.capture_observation() + # Calculate reward + # in HIL-SERL it will be with a reward classifier + reward = calculate_reward(observation) + + #all_actions.append(torch.from_numpy(action)) + #all_rewards.append(torch.from_numpy(reward)) + + dt_s = time.perf_counter() - start_loop_t + busy_wait(1 / fps - dt_s) + + timestamp = time.perf_counter() - start_episode_t + if events["exit_early"]: + events["exit_early"] = False + events["human_intervention_step"] = False + events["pause_policy"] = False + break + step += 1 + + all_observations.append(deepcopy(observation)) + + # Stack the sequence along the first dimension so that we have (batch, sequence, *) tensors. + ret = {} + #ret = { + # "action": torch.stack(all_actions, dim=1), + # "reward": torch.stack(all_rewards, dim=1), + # "observation": torch.stack([obs[key] for obs in all_observations for key in obs], dim=1) + #} + + listener.stop() + if robot.is_connected: + robot.disconnect() + + return ret + + +def eval_policy( + robot: Robot, + policy: torch.nn.Module, + n_episodes: int, + max_episodes_rendered: int = 0, + videos_dir: Path | None = None, + return_episode_data: bool = False, + start_seed: int | None = None, +) -> dict: + """ + Args: + env: The batch of environments. + policy: The policy. + n_episodes: The number of episodes to evaluate. + max_episodes_rendered: Maximum number of episodes to render into videos. + videos_dir: Where to save rendered videos. + return_episode_data: Whether to return episode data for online training. Incorporates the data into + the "episodes" key of the returned dictionary. + start_seed: The first seed to use for the first individual rollout. For all subsequent rollouts the + seed is incremented by 1. If not provided, the environments are not manually seeded. + Returns: + Dictionary with metrics and data regarding the rollouts. + """ + if max_episodes_rendered > 0 and not videos_dir: + raise ValueError("If max_episodes_rendered > 0, videos_dir must be provided.") + + #assert isinstance(policy, Policy) + start = time.time() + #policy.eval() + + # Determine how many batched rollouts we need to get n_episodes. Note that if n_episodes is not evenly + # divisible by env.num_envs we end up discarding some data in the last batch. + #n_batches = n_episodes // env.num_envs + int((n_episodes % env.num_envs) != 0) + + # Keep track of some metrics. + sum_rewards = [] + max_rewards = [] + all_successes = [] + all_seeds = [] + threads = [] # for video saving threads + n_episodes_rendered = 0 # for saving the correct number of videos + + # Callback for visualization. + def render_frame(env: gym.vector.VectorEnv): + # noqa: B023 + if n_episodes_rendered >= max_episodes_rendered: + return + n_to_render_now = min(max_episodes_rendered - n_episodes_rendered, env.num_envs) + if isinstance(env, gym.vector.SyncVectorEnv): + ep_frames.append(np.stack([env.envs[i].render() for i in range(n_to_render_now)])) # noqa: B023 + elif isinstance(env, gym.vector.AsyncVectorEnv): + # Here we must render all frames and discard any we don't need. + ep_frames.append(np.stack(env.call("render")[:n_to_render_now])) + + if max_episodes_rendered > 0: + video_paths: list[str] = [] + + if return_episode_data: + episode_data: dict | None = None + + # we dont want progress bar when we use slurm, since it clutters the logs + #progbar = trange(n_batches, desc="Stepping through eval batches", disable=inside_slurm()) + for batch_ix in progbar: + # Cache frames for rendering videos. Each item will be (b, h, w, c), and the list indexes the rollout + # step. + if max_episodes_rendered > 0: + ep_frames: list[np.ndarray] = [] + + if start_seed is None: + seeds = None + else: + seeds = range( + start_seed + (batch_ix * env.num_envs), start_seed + ((batch_ix + 1) * env.num_envs) + ) + rollout_data = rollout( + env, + policy, + seeds=list(seeds) if seeds else None, + return_observations=return_episode_data, + render_callback=render_frame if max_episodes_rendered > 0 else None, + ) + + # Figure out where in each rollout sequence the first done condition was encountered (results after + # this won't be included). + n_steps = rollout_data["done"].shape[1] + # Note: this relies on a property of argmax: that it returns the first occurrence as a tiebreaker. + done_indices = torch.argmax(rollout_data["done"].to(int), dim=1) + + # Make a mask with shape (batch, n_steps) to mask out rollout data after the first done + # (batch-element-wise). Note the `done_indices + 1` to make sure to keep the data from the done step. + mask = (torch.arange(n_steps) <= einops.repeat(done_indices + 1, "b -> b s", s=n_steps)).int() + # Extend metrics. + batch_sum_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "sum") + sum_rewards.extend(batch_sum_rewards.tolist()) + batch_max_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "max") + max_rewards.extend(batch_max_rewards.tolist()) + batch_successes = einops.reduce((rollout_data["success"] * mask), "b n -> b", "any") + all_successes.extend(batch_successes.tolist()) + if seeds: + all_seeds.extend(seeds) + else: + all_seeds.append(None) + + # FIXME: episode_data is either None or it doesn't exist + if return_episode_data: + this_episode_data = _compile_episode_data( + rollout_data, + done_indices, + start_episode_index=batch_ix * env.num_envs, + start_data_index=(0 if episode_data is None else (episode_data["index"][-1].item() + 1)), + fps=env.unwrapped.metadata["render_fps"], + ) + if episode_data is None: + episode_data = this_episode_data + else: + # Some sanity checks to make sure we are correctly compiling the data. + assert episode_data["episode_index"][-1] + 1 == this_episode_data["episode_index"][0] + assert episode_data["index"][-1] + 1 == this_episode_data["index"][0] + # Concatenate the episode data. + episode_data = {k: torch.cat([episode_data[k], this_episode_data[k]]) for k in episode_data} + + # Maybe render video for visualization. + if max_episodes_rendered > 0 and len(ep_frames) > 0: + batch_stacked_frames = np.stack(ep_frames, axis=1) # (b, t, *) + for stacked_frames, done_index in zip( + batch_stacked_frames, done_indices.flatten().tolist(), strict=False + ): + if n_episodes_rendered >= max_episodes_rendered: + break + + videos_dir.mkdir(parents=True, exist_ok=True) + video_path = videos_dir / f"eval_episode_{n_episodes_rendered}.mp4" + video_paths.append(str(video_path)) + thread = threading.Thread( + target=write_video, + args=( + str(video_path), + stacked_frames[: done_index + 1], # + 1 to capture the last observation + env.unwrapped.metadata["render_fps"], + ), + ) + thread.start() + threads.append(thread) + n_episodes_rendered += 1 + + progbar.set_postfix( + {"running_success_rate": f"{np.mean(all_successes[:n_episodes]).item() * 100:.1f}%"} + ) + + # Wait till all video rendering threads are done. + for thread in threads: + thread.join() + + # Compile eval info. + info = { + "per_episode": [ + { + "episode_ix": i, + "sum_reward": sum_reward, + "max_reward": max_reward, + "success": success, + "seed": seed, + } + for i, (sum_reward, max_reward, success, seed) in enumerate( + zip( + sum_rewards[:n_episodes], + max_rewards[:n_episodes], + all_successes[:n_episodes], + all_seeds[:n_episodes], + strict=True, + ) + ) + ], + "aggregated": { + "avg_sum_reward": float(np.nanmean(sum_rewards[:n_episodes])), + "avg_max_reward": float(np.nanmean(max_rewards[:n_episodes])), + "pc_success": float(np.nanmean(all_successes[:n_episodes]) * 100), + "eval_s": time.time() - start, + "eval_ep_s": (time.time() - start) / n_episodes, + }, + } + + if return_episode_data: + info["episodes"] = episode_data + + if max_episodes_rendered > 0: + info["video_paths"] = video_paths + + return info + + +def _compile_episode_data( + rollout_data: dict, done_indices: Tensor, start_episode_index: int, start_data_index: int, fps: float +) -> dict: + """Convenience function for `eval_policy(return_episode_data=True)` + + Compiles all the rollout data into a Hugging Face dataset. + + Similar logic is implemented when datasets are pushed to hub (see: `push_to_hub`). + """ + ep_dicts = [] + total_frames = 0 + for ep_ix in range(rollout_data["action"].shape[0]): + # + 2 to include the first done frame and the last observation frame. + num_frames = done_indices[ep_ix].item() + 2 + total_frames += num_frames + + # Here we do `num_frames - 1` as we don't want to include the last observation frame just yet. + ep_dict = { + "action": rollout_data["action"][ep_ix, : num_frames - 1], + "episode_index": torch.tensor([start_episode_index + ep_ix] * (num_frames - 1)), + "frame_index": torch.arange(0, num_frames - 1, 1), + "timestamp": torch.arange(0, num_frames - 1, 1) / fps, + "next.done": rollout_data["done"][ep_ix, : num_frames - 1], + "next.success": rollout_data["success"][ep_ix, : num_frames - 1], + "next.reward": rollout_data["reward"][ep_ix, : num_frames - 1].type(torch.float32), + } + + # For the last observation frame, all other keys will just be copy padded. + for k in ep_dict: + ep_dict[k] = torch.cat([ep_dict[k], ep_dict[k][-1:]]) + + for key in rollout_data["observation"]: + ep_dict[key] = rollout_data["observation"][key][ep_ix, :num_frames] + + ep_dicts.append(ep_dict) + + data_dict = {} + for key in ep_dicts[0]: + data_dict[key] = torch.cat([x[key] for x in ep_dicts]) + + data_dict["index"] = torch.arange(start_data_index, start_data_index + total_frames, 1) + + return data_dict + + +def main( + robot_path, + robot_overrides, + pretrained_policy_path: Path | None = None, + hydra_cfg_path: str | None = None, + out_dir: str | None = None, + config_overrides: list[str] | None = None, +): + assert (pretrained_policy_path is None) ^ (hydra_cfg_path is None) + if pretrained_policy_path is not None: + hydra_cfg = init_hydra_config(str(pretrained_policy_path / "config.yaml"), config_overrides) + else: + hydra_cfg = init_hydra_config(hydra_cfg_path, config_overrides) + + if hydra_cfg.eval.batch_size > hydra_cfg.eval.n_episodes: + raise ValueError( + "The eval batch size is greater than the number of eval episodes " + f"({hydra_cfg.eval.batch_size} > {hydra_cfg.eval.n_episodes}). As a result, {hydra_cfg.eval.batch_size} " + f"eval environments will be instantiated, but only {hydra_cfg.eval.n_episodes} will be used. " + "This might significantly slow down evaluation. To fix this, you should update your command " + f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={hydra_cfg.eval.batch_size}`), " + f"or lower the batch size (e.g. `eval.batch_size={hydra_cfg.eval.n_episodes}`)." + ) + + if out_dir is None: + out_dir = f"outputs/eval/{dt.now().strftime('%Y-%m-%d/%H-%M-%S')}_{hydra_cfg.env.name}_{hydra_cfg.policy.name}" + + # Check device is available + device = get_safe_torch_device(hydra_cfg.device, log=True) + + torch.backends.cudnn.benchmark = True + torch.backends.cuda.matmul.allow_tf32 = True + set_global_seed(hydra_cfg.seed) + + log_output_dir(out_dir) + + logging.info("Making environment.") + robot_cfg = init_hydra_config(robot_path, robot_overrides) + robot = make_robot(robot_cfg) + if not robot.is_connected: + robot.connect() + + logging.info("Making policy.") + if hydra_cfg_path is None: + policy = make_policy(hydra_cfg=hydra_cfg, pretrained_policy_name_or_path=str(pretrained_policy_path)) + else: + # Note: We need the dataset stats to pass to the policy's normalization modules. + policy = make_policy(hydra_cfg=hydra_cfg, dataset_stats=make_dataset(hydra_cfg).meta.stats) + + assert isinstance(policy, nn.Module) + policy.eval() + + with torch.no_grad(), torch.autocast(device_type=device.type) if hydra_cfg.use_amp else nullcontext(): + info = eval_policy( + robot, + policy, + 10,#hydra_cfg.eval.n_episodes, + max_episodes_rendered=10, + videos_dir=Path(out_dir) / "videos", + #start_seed=hydra_cfg.seed, + ) + print(info["aggregated"]) + + # Save info + with open(Path(out_dir) / "eval_info.json", "w") as f: + json.dump(info, f, indent=2) + + logging.info("End of eval") + if robot.is_connected: + robot.disconnect() + +def calculate_reward(observation): + """ + Method to calculate reward function in some way. + In HIL-SERL this is done through defining a reward classifier + """ + #reward = reward_classifier(observation) + return 0. + +def init_keyboard_listener(): + # Allow to exit early while recording an episode or resetting the environment, + # by tapping the right arrow key '->'. This might require a sudo permission + # to allow your terminal to monitor keyboard events. + events = {} + events["exit_early"] = False + events["rerecord_episode"] = False + events["stop_recording"] = False + events["pause_policy"] = False + events["human_intervention_step"] = False + + if is_headless(): + logging.warning( + "Headless environment detected. On-screen cameras display and keyboard inputs will not be available." + ) + listener = None + return listener, events + + # Only import pynput if not in a headless environment + from pynput import keyboard + + def on_press(key): + try: + if key == keyboard.Key.right: + print("Right arrow key pressed. Exiting loop...") + events["exit_early"] = True + elif key == keyboard.Key.left: + print("Left arrow key pressed. Exiting loop and rerecord the last episode...") + events["rerecord_episode"] = True + events["exit_early"] = True + elif key == keyboard.Key.esc: + print("Escape key pressed. Stopping data recording...") + events["stop_recording"] = True + events["exit_early"] = True + elif key == keyboard.Key.space: + # check if first space press then pause the policy for the user to get ready + # if second space press then the user is ready to start intervention + if not events["pause_policy"]: + print("Space key pressed. Human intervention required.\n" \ + "Place the leader in similar pose to the follower and press space again.") + events["pause_policy"] = True + log_say("Get ready to take over.", play_sounds=True) + else: + events["human_intervention_step"] = True + log_say("Starting human intervention.", play_sounds=True) + + except Exception as e: + print(f"Error handling key press: {e}") + + listener = keyboard.Listener(on_press=on_press) + listener.start() + + return listener, events + + +if __name__ == "__main__": + init_logging() + + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter + ) + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "--robot-path", + type=str, + default="lerobot/configs/robot/koch.yaml", + help="Path to robot yaml file used to instantiate the robot using `make_robot` factory function.", + ) + group.add_argument( + "--robot-overrides", + type=str, + nargs="*", + help="Any key=value arguments to override config values (use dots for.nested=overrides)", + ) + group.add_argument( + "-p", + "--pretrained-policy-name-or-path", + help=( + "Either the repo ID of a model hosted on the Hub or a path to a directory containing weights " + "saved using `Policy.save_pretrained`. If not provided, the policy is initialized from scratch " + "(useful for debugging). This argument is mutually exclusive with `--config`." + ), + ) + group.add_argument( + "--config", + help=( + "Path to a yaml config you want to use for initializing a policy from scratch (useful for " + "debugging). This argument is mutually exclusive with `--pretrained-policy-name-or-path` (`-p`)." + ), + ) + parser.add_argument("--revision", help="Optionally provide the Hugging Face Hub revision ID.") + parser.add_argument( + "--out-dir", + help=( + "Where to save the evaluation outputs. If not provided, outputs are saved in " + "outputs/eval/{timestamp}_{env_name}_{policy_name}" + ), + ) + parser.add_argument( + "overrides", + nargs="*", + help="Any key=value arguments to override config values (use dots for.nested=overrides)", + ) + args = parser.parse_args() + + robot_cfg = init_hydra_config(args.robot_path, args.robot_overrides) + robot = make_robot(robot_cfg) + if not robot.is_connected: + robot.connect() + + rollout(robot, None, fps=40, control_time_s=100, num_rollouts=10) + #if args.pretrained_policy_name_or_path is None: + # main(hydra_cfg_path=args.config, out_dir=args.out_dir, config_overrides=args.overrides) + #else: + # pretrained_policy_path = get_pretrained_policy_path( + # arg s.pretrained_policy_name_or_path, revision=args.revision + # ) + + # main( + # pretrained_policy_path=pretrained_policy_path, + # out_dir=args.out_dir, + # config_overrides=args.overrides, + # ) From 90091cf86f2bae8ad2242551306c0732cedff2b0 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 2 Dec 2024 20:26:32 +0100 Subject: [PATCH 2/8] fixed eval_policy function in eval_robot.py --- lerobot/configs/robot/koch.yaml | 4 +- lerobot/scripts/eval_robot.py | 448 +++++++------------------------- 2 files changed, 92 insertions(+), 360 deletions(-) diff --git a/lerobot/configs/robot/koch.yaml b/lerobot/configs/robot/koch.yaml index 40969dc73..334db8302 100644 --- a/lerobot/configs/robot/koch.yaml +++ b/lerobot/configs/robot/koch.yaml @@ -10,7 +10,7 @@ max_relative_target: null leader_arms: main: _target_: lerobot.common.robot_devices.motors.dynamixel.DynamixelMotorsBus - port: /dev/tty.usbmodem575E0031751 + port: /dev/tty.usbmodem58760430441 motors: # name: (index, model) shoulder_pan: [1, "xl330-m077"] @@ -23,7 +23,7 @@ leader_arms: follower_arms: main: _target_: lerobot.common.robot_devices.motors.dynamixel.DynamixelMotorsBus - port: /dev/tty.usbmodem575E0032081 + port: /dev/tty.usbmodem585A0083391 motors: # name: (index, model) shoulder_pan: [1, "xl430-w250"] diff --git a/lerobot/scripts/eval_robot.py b/lerobot/scripts/eval_robot.py index c467b902e..3185688b6 100644 --- a/lerobot/scripts/eval_robot.py +++ b/lerobot/scripts/eval_robot.py @@ -42,42 +42,26 @@ """ import argparse -import json import logging import threading import time -from contextlib import nullcontext from copy import deepcopy -from datetime import datetime as dt from pathlib import Path -from typing import Callable import einops -import gymnasium as gym import numpy as np import torch -from huggingface_hub import snapshot_download -from huggingface_hub.errors import RepositoryNotFoundError -from huggingface_hub.utils._validators import HFValidationError from torch import Tensor, nn from tqdm import trange -from lerobot.common.datasets.factory import make_dataset -from lerobot.common.logger import log_output_dir from lerobot.common.policies.factory import make_policy from lerobot.common.policies.policy_protocol import Policy -from lerobot.common.policies.utils import get_device_from_parameters -from lerobot.common.utils.io_utils import write_video from lerobot.common.utils.utils import ( - get_safe_torch_device, init_hydra_config, init_logging, - inside_slurm, - set_global_seed, ) from lerobot.common.robot_devices.robots.factory import make_robot, Robot -from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot from lerobot.scripts.eval import get_pretrained_policy_path from lerobot.common.utils.utils import log_say from lerobot.common.robot_devices.control_utils import is_headless, predict_action, busy_wait @@ -88,7 +72,6 @@ def rollout( policy: Policy, fps: int, control_time_s: float = 20, - num_rollouts: int = 2, use_amp: bool = True ) -> dict: @@ -136,63 +119,66 @@ def rollout( all_observations = [] all_actions = [] all_rewards = [] - - step = 0 - while step < num_rollouts: - start_episode_t = time.perf_counter() - timestamp = 0.0 - while timestamp < control_time_s: - start_loop_t = time.perf_counter() - - all_observations.append(deepcopy(observation)) - #observation = {key: observation[key].to(device, non_blocking=True) for key in observation} - - # Apply the next action. - while events["pause_policy"] and not events["human_intervention_step"]: - busy_wait(0.5) - - if events["human_intervention_step"]: - # take over the robot's actions - observation, action = robot.teleop_step(record_data=True) - else: - # explore with policy - with torch.inference_mode(): - action = robot.follower_arms["main"].read("Present_Position") - robot.send_action(torch.from_numpy(action)) - #action = predict_action(observation, policy, device, use_amp) - - observation = robot.capture_observation() - # Calculate reward - # in HIL-SERL it will be with a reward classifier - reward = calculate_reward(observation) + all_successes = [] - #all_actions.append(torch.from_numpy(action)) - #all_rewards.append(torch.from_numpy(reward)) + start_episode_t = time.perf_counter() + timestamp = 0.0 + while timestamp < control_time_s: + start_loop_t = time.perf_counter() - dt_s = time.perf_counter() - start_loop_t - busy_wait(1 / fps - dt_s) - - timestamp = time.perf_counter() - start_episode_t - if events["exit_early"]: - events["exit_early"] = False - events["human_intervention_step"] = False - events["pause_policy"] = False - break - step += 1 - all_observations.append(deepcopy(observation)) - - # Stack the sequence along the first dimension so that we have (batch, sequence, *) tensors. - ret = {} - #ret = { - # "action": torch.stack(all_actions, dim=1), - # "reward": torch.stack(all_rewards, dim=1), - # "observation": torch.stack([obs[key] for obs in all_observations for key in obs], dim=1) - #} + #observation = {key: observation[key].to(device, non_blocking=True) for key in observation} + + # Apply the next action. + while events["pause_policy"] and not events["human_intervention_step"]: + busy_wait(0.5) + + if events["human_intervention_step"]: + # take over the robot's actions + observation, action = robot.teleop_step(record_data=True) + action = action['action'] # teleop step returns torch tensors but in a dict + else: + # explore with policy + with torch.inference_mode(): + action = robot.follower_arms["main"].read("Present_Position") + action = torch.from_numpy(action) + robot.send_action(action) + #action = predict_action(observation, policy, device, use_amp) + + observation = robot.capture_observation() + # Calculate reward + # in HIL-SERL it will be with a reward classifier + reward = calculate_reward(observation) + + all_actions.append(action) + all_rewards.append(torch.from_numpy(reward)) + all_successes.append(torch.tensor([False])) + + dt_s = time.perf_counter() - start_loop_t + busy_wait(1 / fps - dt_s) + timestamp = time.perf_counter() - start_episode_t + if events["exit_early"]: + events["exit_early"] = False + events["human_intervention_step"] = False + events["pause_policy"] = False + break + all_observations.append(deepcopy(observation)) + + dones = torch.tensor([False] * len(all_actions)) + dones[-1] = True + # Stack the sequence along the first dimension so that we have (batch, sequence, *) tensors. + ret = { + "action": torch.stack(all_actions, dim=1), + "next.reward": torch.stack(all_rewards, dim=1), + "next.success": torch.stack(all_successes, dim=1), + "done": dones, + } + stacked_observations = {} + for key in all_observations[0]: + stacked_observations[key] = torch.stack([obs[key] for obs in all_observations], dim=1) + ret["observation"] = stacked_observations listener.stop() - if robot.is_connected: - robot.disconnect() return ret @@ -200,317 +186,82 @@ def rollout( def eval_policy( robot: Robot, policy: torch.nn.Module, + fps: float, n_episodes: int, - max_episodes_rendered: int = 0, - videos_dir: Path | None = None, - return_episode_data: bool = False, - start_seed: int | None = None, + control_time_s: int = 20, + use_amp: bool = True ) -> dict: """ Args: env: The batch of environments. policy: The policy. n_episodes: The number of episodes to evaluate. - max_episodes_rendered: Maximum number of episodes to render into videos. - videos_dir: Where to save rendered videos. - return_episode_data: Whether to return episode data for online training. Incorporates the data into - the "episodes" key of the returned dictionary. - start_seed: The first seed to use for the first individual rollout. For all subsequent rollouts the - seed is incremented by 1. If not provided, the environments are not manually seeded. Returns: Dictionary with metrics and data regarding the rollouts. """ - if max_episodes_rendered > 0 and not videos_dir: - raise ValueError("If max_episodes_rendered > 0, videos_dir must be provided.") - + # TODO (michel-aractingi) comment this out for testing with a fixed policy #assert isinstance(policy, Policy) - start = time.time() #policy.eval() - # Determine how many batched rollouts we need to get n_episodes. Note that if n_episodes is not evenly - # divisible by env.num_envs we end up discarding some data in the last batch. - #n_batches = n_episodes // env.num_envs + int((n_episodes % env.num_envs) != 0) - - # Keep track of some metrics. sum_rewards = [] max_rewards = [] - all_successes = [] - all_seeds = [] - threads = [] # for video saving threads - n_episodes_rendered = 0 # for saving the correct number of videos - - # Callback for visualization. - def render_frame(env: gym.vector.VectorEnv): - # noqa: B023 - if n_episodes_rendered >= max_episodes_rendered: - return - n_to_render_now = min(max_episodes_rendered - n_episodes_rendered, env.num_envs) - if isinstance(env, gym.vector.SyncVectorEnv): - ep_frames.append(np.stack([env.envs[i].render() for i in range(n_to_render_now)])) # noqa: B023 - elif isinstance(env, gym.vector.AsyncVectorEnv): - # Here we must render all frames and discard any we don't need. - ep_frames.append(np.stack(env.call("render")[:n_to_render_now])) - - if max_episodes_rendered > 0: - video_paths: list[str] = [] - - if return_episode_data: - episode_data: dict | None = None - - # we dont want progress bar when we use slurm, since it clutters the logs - #progbar = trange(n_batches, desc="Stepping through eval batches", disable=inside_slurm()) - for batch_ix in progbar: - # Cache frames for rendering videos. Each item will be (b, h, w, c), and the list indexes the rollout - # step. - if max_episodes_rendered > 0: - ep_frames: list[np.ndarray] = [] - - if start_seed is None: - seeds = None - else: - seeds = range( - start_seed + (batch_ix * env.num_envs), start_seed + ((batch_ix + 1) * env.num_envs) - ) + successes = [] + rollouts = [] + + start_eval = time.perf_counter() + progbar = trange(n_episodes, desc="Evaluating policy on real robot") + for batch_idx in progbar: + rollout_data = rollout( - env, + robot, policy, - seeds=list(seeds) if seeds else None, - return_observations=return_episode_data, - render_callback=render_frame if max_episodes_rendered > 0 else None, - ) - - # Figure out where in each rollout sequence the first done condition was encountered (results after - # this won't be included). - n_steps = rollout_data["done"].shape[1] - # Note: this relies on a property of argmax: that it returns the first occurrence as a tiebreaker. - done_indices = torch.argmax(rollout_data["done"].to(int), dim=1) - - # Make a mask with shape (batch, n_steps) to mask out rollout data after the first done - # (batch-element-wise). Note the `done_indices + 1` to make sure to keep the data from the done step. - mask = (torch.arange(n_steps) <= einops.repeat(done_indices + 1, "b -> b s", s=n_steps)).int() - # Extend metrics. - batch_sum_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "sum") - sum_rewards.extend(batch_sum_rewards.tolist()) - batch_max_rewards = einops.reduce((rollout_data["reward"] * mask), "b n -> b", "max") - max_rewards.extend(batch_max_rewards.tolist()) - batch_successes = einops.reduce((rollout_data["success"] * mask), "b n -> b", "any") - all_successes.extend(batch_successes.tolist()) - if seeds: - all_seeds.extend(seeds) - else: - all_seeds.append(None) - - # FIXME: episode_data is either None or it doesn't exist - if return_episode_data: - this_episode_data = _compile_episode_data( - rollout_data, - done_indices, - start_episode_index=batch_ix * env.num_envs, - start_data_index=(0 if episode_data is None else (episode_data["index"][-1].item() + 1)), - fps=env.unwrapped.metadata["render_fps"], - ) - if episode_data is None: - episode_data = this_episode_data - else: - # Some sanity checks to make sure we are correctly compiling the data. - assert episode_data["episode_index"][-1] + 1 == this_episode_data["episode_index"][0] - assert episode_data["index"][-1] + 1 == this_episode_data["index"][0] - # Concatenate the episode data. - episode_data = {k: torch.cat([episode_data[k], this_episode_data[k]]) for k in episode_data} - - # Maybe render video for visualization. - if max_episodes_rendered > 0 and len(ep_frames) > 0: - batch_stacked_frames = np.stack(ep_frames, axis=1) # (b, t, *) - for stacked_frames, done_index in zip( - batch_stacked_frames, done_indices.flatten().tolist(), strict=False - ): - if n_episodes_rendered >= max_episodes_rendered: - break - - videos_dir.mkdir(parents=True, exist_ok=True) - video_path = videos_dir / f"eval_episode_{n_episodes_rendered}.mp4" - video_paths.append(str(video_path)) - thread = threading.Thread( - target=write_video, - args=( - str(video_path), - stacked_frames[: done_index + 1], # + 1 to capture the last observation - env.unwrapped.metadata["render_fps"], - ), - ) - thread.start() - threads.append(thread) - n_episodes_rendered += 1 - - progbar.set_postfix( - {"running_success_rate": f"{np.mean(all_successes[:n_episodes]).item() * 100:.1f}%"} + fps, + control_time_s, + use_amp ) - - # Wait till all video rendering threads are done. - for thread in threads: - thread.join() - - # Compile eval info. + + rollouts.append(rollout_data) + sum_rewards.append(sum(rollout_data['next.reward'])) + max_rewards.append(max(rollout_data['next.reward'])) + successes.append(rollout_data["next.success"][-1]) + info = { "per_episode": [ { "episode_ix": i, "sum_reward": sum_reward, "max_reward": max_reward, - "success": success, - "seed": seed, + "pc_success": float(np.nanmean(successes[:n_episodes]) * 100), } - for i, (sum_reward, max_reward, success, seed) in enumerate( + for i, (sum_reward, max_reward) in enumerate( zip( sum_rewards[:n_episodes], max_rewards[:n_episodes], - all_successes[:n_episodes], - all_seeds[:n_episodes], - strict=True, + successes[:n_episodes] ) ) ], "aggregated": { "avg_sum_reward": float(np.nanmean(sum_rewards[:n_episodes])), "avg_max_reward": float(np.nanmean(max_rewards[:n_episodes])), - "pc_success": float(np.nanmean(all_successes[:n_episodes]) * 100), - "eval_s": time.time() - start, - "eval_ep_s": (time.time() - start) / n_episodes, + "pc_success": float(np.mean(successes) * 100), + "eval_s": time.time() - start_eval, + "eval_ep_s": (time.time() - start_eval) / n_episodes, }, } - if return_episode_data: - info["episodes"] = episode_data - - if max_episodes_rendered > 0: - info["video_paths"] = video_paths - - return info - - -def _compile_episode_data( - rollout_data: dict, done_indices: Tensor, start_episode_index: int, start_data_index: int, fps: float -) -> dict: - """Convenience function for `eval_policy(return_episode_data=True)` - - Compiles all the rollout data into a Hugging Face dataset. - - Similar logic is implemented when datasets are pushed to hub (see: `push_to_hub`). - """ - ep_dicts = [] - total_frames = 0 - for ep_ix in range(rollout_data["action"].shape[0]): - # + 2 to include the first done frame and the last observation frame. - num_frames = done_indices[ep_ix].item() + 2 - total_frames += num_frames - - # Here we do `num_frames - 1` as we don't want to include the last observation frame just yet. - ep_dict = { - "action": rollout_data["action"][ep_ix, : num_frames - 1], - "episode_index": torch.tensor([start_episode_index + ep_ix] * (num_frames - 1)), - "frame_index": torch.arange(0, num_frames - 1, 1), - "timestamp": torch.arange(0, num_frames - 1, 1) / fps, - "next.done": rollout_data["done"][ep_ix, : num_frames - 1], - "next.success": rollout_data["success"][ep_ix, : num_frames - 1], - "next.reward": rollout_data["reward"][ep_ix, : num_frames - 1].type(torch.float32), - } - - # For the last observation frame, all other keys will just be copy padded. - for k in ep_dict: - ep_dict[k] = torch.cat([ep_dict[k], ep_dict[k][-1:]]) - - for key in rollout_data["observation"]: - ep_dict[key] = rollout_data["observation"][key][ep_ix, :num_frames] - - ep_dicts.append(ep_dict) - - data_dict = {} - for key in ep_dicts[0]: - data_dict[key] = torch.cat([x[key] for x in ep_dicts]) - - data_dict["index"] = torch.arange(start_data_index, start_data_index + total_frames, 1) - - return data_dict - - -def main( - robot_path, - robot_overrides, - pretrained_policy_path: Path | None = None, - hydra_cfg_path: str | None = None, - out_dir: str | None = None, - config_overrides: list[str] | None = None, -): - assert (pretrained_policy_path is None) ^ (hydra_cfg_path is None) - if pretrained_policy_path is not None: - hydra_cfg = init_hydra_config(str(pretrained_policy_path / "config.yaml"), config_overrides) - else: - hydra_cfg = init_hydra_config(hydra_cfg_path, config_overrides) - - if hydra_cfg.eval.batch_size > hydra_cfg.eval.n_episodes: - raise ValueError( - "The eval batch size is greater than the number of eval episodes " - f"({hydra_cfg.eval.batch_size} > {hydra_cfg.eval.n_episodes}). As a result, {hydra_cfg.eval.batch_size} " - f"eval environments will be instantiated, but only {hydra_cfg.eval.n_episodes} will be used. " - "This might significantly slow down evaluation. To fix this, you should update your command " - f"to increase the number of episodes to match the batch size (e.g. `eval.n_episodes={hydra_cfg.eval.batch_size}`), " - f"or lower the batch size (e.g. `eval.batch_size={hydra_cfg.eval.n_episodes}`)." - ) - - if out_dir is None: - out_dir = f"outputs/eval/{dt.now().strftime('%Y-%m-%d/%H-%M-%S')}_{hydra_cfg.env.name}_{hydra_cfg.policy.name}" - - # Check device is available - device = get_safe_torch_device(hydra_cfg.device, log=True) - - torch.backends.cudnn.benchmark = True - torch.backends.cuda.matmul.allow_tf32 = True - set_global_seed(hydra_cfg.seed) - - log_output_dir(out_dir) - - logging.info("Making environment.") - robot_cfg = init_hydra_config(robot_path, robot_overrides) - robot = make_robot(robot_cfg) - if not robot.is_connected: - robot.connect() - - logging.info("Making policy.") - if hydra_cfg_path is None: - policy = make_policy(hydra_cfg=hydra_cfg, pretrained_policy_name_or_path=str(pretrained_policy_path)) - else: - # Note: We need the dataset stats to pass to the policy's normalization modules. - policy = make_policy(hydra_cfg=hydra_cfg, dataset_stats=make_dataset(hydra_cfg).meta.stats) - - assert isinstance(policy, nn.Module) - policy.eval() - - with torch.no_grad(), torch.autocast(device_type=device.type) if hydra_cfg.use_amp else nullcontext(): - info = eval_policy( - robot, - policy, - 10,#hydra_cfg.eval.n_episodes, - max_episodes_rendered=10, - videos_dir=Path(out_dir) / "videos", - #start_seed=hydra_cfg.seed, - ) - print(info["aggregated"]) - - # Save info - with open(Path(out_dir) / "eval_info.json", "w") as f: - json.dump(info, f, indent=2) - - logging.info("End of eval") if robot.is_connected: robot.disconnect() + return info + def calculate_reward(observation): """ Method to calculate reward function in some way. In HIL-SERL this is done through defining a reward classifier """ #reward = reward_classifier(observation) - return 0. + return np.array([0.]) def init_keyboard_listener(): # Allow to exit early while recording an episode or resetting the environment, @@ -519,7 +270,6 @@ def init_keyboard_listener(): events = {} events["exit_early"] = False events["rerecord_episode"] = False - events["stop_recording"] = False events["pause_policy"] = False events["human_intervention_step"] = False @@ -542,10 +292,6 @@ def on_press(key): print("Left arrow key pressed. Exiting loop and rerecord the last episode...") events["rerecord_episode"] = True events["exit_early"] = True - elif key == keyboard.Key.esc: - print("Escape key pressed. Stopping data recording...") - events["stop_recording"] = True - events["exit_early"] = True elif key == keyboard.Key.space: # check if first space press then pause the policy for the user to get ready # if second space press then the user is ready to start intervention @@ -553,9 +299,10 @@ def on_press(key): print("Space key pressed. Human intervention required.\n" \ "Place the leader in similar pose to the follower and press space again.") events["pause_policy"] = True - log_say("Get ready to take over.", play_sounds=True) + log_say("Human intervention stage. Get ready to take over.", play_sounds=True) else: events["human_intervention_step"] = True + print("Space key pressed. Human intervention starting.") log_say("Starting human intervention.", play_sounds=True) except Exception as e: @@ -610,11 +357,7 @@ def on_press(key): "outputs/eval/{timestamp}_{env_name}_{policy_name}" ), ) - parser.add_argument( - "overrides", - nargs="*", - help="Any key=value arguments to override config values (use dots for.nested=overrides)", - ) + args = parser.parse_args() robot_cfg = init_hydra_config(args.robot_path, args.robot_overrides) @@ -622,16 +365,5 @@ def on_press(key): if not robot.is_connected: robot.connect() - rollout(robot, None, fps=40, control_time_s=100, num_rollouts=10) - #if args.pretrained_policy_name_or_path is None: - # main(hydra_cfg_path=args.config, out_dir=args.out_dir, config_overrides=args.overrides) - #else: - # pretrained_policy_path = get_pretrained_policy_path( - # arg s.pretrained_policy_name_or_path, revision=args.revision - # ) - - # main( - # pretrained_policy_path=pretrained_policy_path, - # out_dir=args.out_dir, - # config_overrides=args.overrides, - # ) + #rollout(robot, None, fps=40, control_time_s=100) + eval_policy(robot, None, fps=40, n_episodes=20, control_time_s=100) From 8d08f972afb600d3e995dfdb9a4776a1962d68c0 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 9 Dec 2024 11:08:36 +0100 Subject: [PATCH 3/8] Update lerobot/scripts/eval_robot.py Co-authored-by: Yoel --- lerobot/scripts/eval_robot.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lerobot/scripts/eval_robot.py b/lerobot/scripts/eval_robot.py index 3185688b6..e164a20f4 100644 --- a/lerobot/scripts/eval_robot.py +++ b/lerobot/scripts/eval_robot.py @@ -242,9 +242,9 @@ def eval_policy( ) ], "aggregated": { - "avg_sum_reward": float(np.nanmean(sum_rewards[:n_episodes])), - "avg_max_reward": float(np.nanmean(max_rewards[:n_episodes])), - "pc_success": float(np.mean(successes) * 100), + "avg_sum_reward": float(np.nanmean(torch.cat(sum_rewards[:n_episodes]))), + "avg_max_reward": float(np.nanmean(torch.cat(max_rewards[:n_episodes]))), + "pc_success": float(np.nanmean(torch.cat(successes[:n_episodes])) * 100), "eval_s": time.time() - start_eval, "eval_ep_s": (time.time() - start_eval) / n_episodes, }, From a3c2cdd668c4e12544a728ade342067bd11ce33d Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 9 Dec 2024 17:24:06 +0100 Subject: [PATCH 4/8] Update lerobot/scripts/eval_robot.py Co-authored-by: Yoel --- lerobot/scripts/eval_robot.py | 1 - 1 file changed, 1 deletion(-) diff --git a/lerobot/scripts/eval_robot.py b/lerobot/scripts/eval_robot.py index e164a20f4..e113603cc 100644 --- a/lerobot/scripts/eval_robot.py +++ b/lerobot/scripts/eval_robot.py @@ -237,7 +237,6 @@ def eval_policy( zip( sum_rewards[:n_episodes], max_rewards[:n_episodes], - successes[:n_episodes] ) ) ], From 2269b44d7f5d49610cbc11ce3f6f02735dd43e4d Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 9 Dec 2024 17:24:13 +0100 Subject: [PATCH 5/8] Update lerobot/scripts/eval_robot.py Co-authored-by: Yoel --- lerobot/scripts/eval_robot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lerobot/scripts/eval_robot.py b/lerobot/scripts/eval_robot.py index e113603cc..d78305b58 100644 --- a/lerobot/scripts/eval_robot.py +++ b/lerobot/scripts/eval_robot.py @@ -231,7 +231,7 @@ def eval_policy( "episode_ix": i, "sum_reward": sum_reward, "max_reward": max_reward, - "pc_success": float(np.nanmean(successes[:n_episodes]) * 100), + "pc_success": float(np.nanmean(torch.cat(successes[:n_episodes])) * 100), } for i, (sum_reward, max_reward) in enumerate( zip( From 08c2abbfb0f5d1e565fa4f06af9fb87e88a320b6 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 9 Dec 2024 17:25:29 +0100 Subject: [PATCH 6/8] eval_robot.py --> eval_on_robot.py --- lerobot/scripts/{eval_robot.py => eval_on_robot.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename lerobot/scripts/{eval_robot.py => eval_on_robot.py} (100%) diff --git a/lerobot/scripts/eval_robot.py b/lerobot/scripts/eval_on_robot.py similarity index 100% rename from lerobot/scripts/eval_robot.py rename to lerobot/scripts/eval_on_robot.py From da353bad3200231020ae94ebab6aa5d911af8a87 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 9 Dec 2024 19:04:52 +0100 Subject: [PATCH 7/8] notes --- lerobot/scripts/eval_on_robot.py | 141 ++++++++++++------------------- 1 file changed, 54 insertions(+), 87 deletions(-) diff --git a/lerobot/scripts/eval_on_robot.py b/lerobot/scripts/eval_on_robot.py index d78305b58..485e79b63 100644 --- a/lerobot/scripts/eval_on_robot.py +++ b/lerobot/scripts/eval_on_robot.py @@ -13,75 +13,44 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Evaluate a policy on an environment by running rollouts and computing metrics. +"""Evaluate a policy by running rollouts on the real robot and computing metrics. -Usage examples: +Usage examples: evaluate a checkpoint from the LeRobot training script for 10 episodes. -You want to evaluate a model from the hub (eg: https://huggingface.co/lerobot/diffusion_pusht) -for 10 episodes. - -``` -python lerobot/scripts/eval.py -p lerobot/diffusion_pusht eval.n_episodes=10 ``` - -OR, you want to evaluate a model checkpoint from the LeRobot training script for 10 episodes. - -``` -python lerobot/scripts/eval.py \ - -p outputs/train/diffusion_pusht/checkpoints/005000/pretrained_model \ +python lerobot/scripts/eval_on_robot.py \ + -p outputs/train/model/checkpoints/005000/pretrained_model \ eval.n_episodes=10 ``` -Note that in both examples, the repo/folder should contain at least `config.json`, `config.yaml` and -`model.safetensors`. - -Note the formatting for providing the number of episodes. Generally, you may provide any number of arguments -with `qualified.parameter.name=value`. In this case, the parameter eval.n_episodes appears as `n_episodes` -nested under `eval` in the `config.yaml` found at -https://huggingface.co/lerobot/diffusion_pusht/tree/main. +**NOTE** (michel-aractingi): This script is incomplete and it is being prepared +for running training on the real robot. """ import argparse import logging -import threading import time from copy import deepcopy -from pathlib import Path -import einops import numpy as np import torch -from torch import Tensor, nn from tqdm import trange -from lerobot.common.policies.factory import make_policy from lerobot.common.policies.policy_protocol import Policy +from lerobot.common.robot_devices.control_utils import busy_wait, is_headless +from lerobot.common.robot_devices.robots.factory import Robot, make_robot from lerobot.common.utils.utils import ( init_hydra_config, init_logging, + log_say, ) -from lerobot.common.robot_devices.robots.factory import make_robot, Robot -from lerobot.scripts.eval import get_pretrained_policy_path -from lerobot.common.utils.utils import log_say -from lerobot.common.robot_devices.control_utils import is_headless, predict_action, busy_wait - -def rollout( - robot: Robot, - policy: Policy, - fps: int, - control_time_s: float = 20, - use_amp: bool = True - -) -> dict: - """Run a batched policy rollout once through a batch of environments. - - Note that all environments in the batch are run until the last environment is done. This means some - data will probably need to be discarded (for environments that aren't the first one to be done). +def rollout(robot: Robot, policy: Policy, fps: int, control_time_s: float = 20, use_amp: bool = True) -> dict: + """Run a batched policy rollout on the real robot. The return dictionary contains: - (optional) "observation": A a dictionary of (batch, sequence + 1, *) tensors mapped to observation + "robot": A a dictionary of (batch, sequence + 1, *) tensors mapped to observation keys. NOTE the that this has an extra sequence element relative to the other keys in the dictionary. This is because an extra observation is included for after the environment is terminated or truncated. @@ -95,56 +64,56 @@ def rollout( extraneous elements from the sequences above. Args: - robot: + robot: The robot class that defines the interface with the real robot. policy: The policy. Must be a PyTorch nn module. - + Returns: The dictionary described above. """ - #assert isinstance(policy, nn.Module), "Policy must be a PyTorch nn module." - #device = get_device_from_parameters(policy) + # assert isinstance(policy, nn.Module), "Policy must be a PyTorch nn module." + # device = get_device_from_parameters(policy) # define keyboard listener listener, events = init_keyboard_listener() - # Reset the policy. - #policy.reset() - + # Reset the policy. TODO (michel-aractingi) add real policy evaluation once the code is ready. + # policy.reset() + # Get observation from real robot observation = robot.capture_observation() - - # Calculate reward + + # Calculate reward. TODO (michel-aractingi) # in HIL-SERL it will be with a reward classifier reward = calculate_reward(observation) all_observations = [] all_actions = [] all_rewards = [] all_successes = [] - + start_episode_t = time.perf_counter() timestamp = 0.0 while timestamp < control_time_s: start_loop_t = time.perf_counter() - + all_observations.append(deepcopy(observation)) - #observation = {key: observation[key].to(device, non_blocking=True) for key in observation} + # observation = {key: observation[key].to(device, non_blocking=True) for key in observation} # Apply the next action. while events["pause_policy"] and not events["human_intervention_step"]: busy_wait(0.5) - + if events["human_intervention_step"]: - # take over the robot's actions + # take over the robot's actions observation, action = robot.teleop_step(record_data=True) - action = action['action'] # teleop step returns torch tensors but in a dict - else: + action = action["action"] # teleop step returns torch tensors but in a dict + else: # explore with policy with torch.inference_mode(): action = robot.follower_arms["main"].read("Present_Position") action = torch.from_numpy(action) robot.send_action(action) - #action = predict_action(observation, policy, device, use_amp) - + # action = predict_action(observation, policy, device, use_amp) + observation = robot.capture_observation() # Calculate reward # in HIL-SERL it will be with a reward classifier @@ -189,7 +158,7 @@ def eval_policy( fps: float, n_episodes: int, control_time_s: int = 20, - use_amp: bool = True + use_amp: bool = True, ) -> dict: """ Args: @@ -200,31 +169,24 @@ def eval_policy( Dictionary with metrics and data regarding the rollouts. """ # TODO (michel-aractingi) comment this out for testing with a fixed policy - #assert isinstance(policy, Policy) - #policy.eval() + # assert isinstance(policy, Policy) + # policy.eval() sum_rewards = [] max_rewards = [] successes = [] - rollouts = [] - + rollouts = [] + start_eval = time.perf_counter() progbar = trange(n_episodes, desc="Evaluating policy on real robot") - for batch_idx in progbar: - - rollout_data = rollout( - robot, - policy, - fps, - control_time_s, - use_amp - ) - + for _batch_idx in progbar: + rollout_data = rollout(robot, policy, fps, control_time_s, use_amp) + rollouts.append(rollout_data) - sum_rewards.append(sum(rollout_data['next.reward'])) - max_rewards.append(max(rollout_data['next.reward'])) + sum_rewards.append(sum(rollout_data["next.reward"])) + max_rewards.append(max(rollout_data["next.reward"])) successes.append(rollout_data["next.success"][-1]) - + info = { "per_episode": [ { @@ -237,6 +199,7 @@ def eval_policy( zip( sum_rewards[:n_episodes], max_rewards[:n_episodes], + strict=False, ) ) ], @@ -254,13 +217,15 @@ def eval_policy( return info + def calculate_reward(observation): """ Method to calculate reward function in some way. In HIL-SERL this is done through defining a reward classifier """ - #reward = reward_classifier(observation) - return np.array([0.]) + # reward = reward_classifier(observation) + return np.array([0.0]) + def init_keyboard_listener(): # Allow to exit early while recording an episode or resetting the environment, @@ -293,16 +258,18 @@ def on_press(key): events["exit_early"] = True elif key == keyboard.Key.space: # check if first space press then pause the policy for the user to get ready - # if second space press then the user is ready to start intervention + # if second space press then the user is ready to start intervention if not events["pause_policy"]: - print("Space key pressed. Human intervention required.\n" \ - "Place the leader in similar pose to the follower and press space again.") + print( + "Space key pressed. Human intervention required.\n" + "Place the leader in similar pose to the follower and press space again." + ) events["pause_policy"] = True log_say("Human intervention stage. Get ready to take over.", play_sounds=True) else: events["human_intervention_step"] = True print("Space key pressed. Human intervention starting.") - log_say("Starting human intervention.", play_sounds=True) + log_say("Starting human intervention.", play_sounds=True) except Exception as e: print(f"Error handling key press: {e}") @@ -358,11 +325,11 @@ def on_press(key): ) args = parser.parse_args() - + robot_cfg = init_hydra_config(args.robot_path, args.robot_overrides) robot = make_robot(robot_cfg) if not robot.is_connected: robot.connect() - #rollout(robot, None, fps=40, control_time_s=100) + # rollout(robot, None, fps=40, control_time_s=100) eval_policy(robot, None, fps=40, n_episodes=20, control_time_s=100) From a0f3eba8e340639731455d1350d0002505375674 Mon Sep 17 00:00:00 2001 From: Michel Aractingi Date: Mon, 9 Dec 2024 19:17:08 +0100 Subject: [PATCH 8/8] updated stats --- lerobot/scripts/eval_on_robot.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lerobot/scripts/eval_on_robot.py b/lerobot/scripts/eval_on_robot.py index 485e79b63..6a790f0a5 100644 --- a/lerobot/scripts/eval_on_robot.py +++ b/lerobot/scripts/eval_on_robot.py @@ -193,12 +193,13 @@ def eval_policy( "episode_ix": i, "sum_reward": sum_reward, "max_reward": max_reward, - "pc_success": float(np.nanmean(torch.cat(successes[:n_episodes])) * 100), + "pc_success": success * 100, } - for i, (sum_reward, max_reward) in enumerate( + for i, (sum_reward, max_reward, success) in enumerate( zip( sum_rewards[:n_episodes], max_rewards[:n_episodes], + successes[:n_episodes], strict=False, ) ) @@ -331,5 +332,4 @@ def on_press(key): if not robot.is_connected: robot.connect() - # rollout(robot, None, fps=40, control_time_s=100) - eval_policy(robot, None, fps=40, n_episodes=20, control_time_s=100) + eval_policy(robot, None, fps=40, n_episodes=2, control_time_s=100)