diff --git a/alf/algorithms/data_transformer.py b/alf/algorithms/data_transformer.py index b7e783abd..cc5bfbb1b 100644 --- a/alf/algorithms/data_transformer.py +++ b/alf/algorithms/data_transformer.py @@ -22,10 +22,12 @@ import alf from alf.data_structures import AlgStep, Experience, namedtuple, StepType, TimeStep +from alf.environments import suite_socialbot from alf.experience_replayers.replay_buffer import ReplayBuffer, BatchInfo from alf.nest.utils import convert_device from alf.utils.normalizers import WindowNormalizer, EMNormalizer, AdaptiveNormalizer from alf.utils import common +from alf.utils.math_ops import l2_dist_close_reward_fn from alf.utils.normalizers import ScalarAdaptiveNormalizer FrameStackState = namedtuple('FrameStackState', ['steps', 'prev_frames']) @@ -137,13 +139,18 @@ def __init__(self, data_transformer_ctors, observation_spec): @staticmethod def _validate_order(data_transformers): + # Hindsight should probably not be used together with FrameStacker, + # unless done really carefully. Hindsight after FrameStacker is + # simply wrong, because Hindsight would read ``achieved_goal`` field + # of a future step directly from the replay buffer without stacking. def _tier_of(data_transformer): if isinstance(data_transformer, UntransformedTimeStep): return 1 - if isinstance(data_transformer, - (HindsightExperienceTransformer, FrameStacker)): + if isinstance(data_transformer, HindsightExperienceTransformer): return 2 - return 3 + if isinstance(data_transformer, FrameStacker): + return 3 + return 4 prev_tier = 0 for i in range(len(data_transformers)): @@ -180,6 +187,7 @@ def __init__(self, observation_spec, stack_size=4, stack_axis=0, + convert_only_minibatch_to_device=False, fields=None): """Create a FrameStacker object. @@ -187,6 +195,8 @@ def __init__(self, observation_spec (nested TensorSpec): describing the observation in timestep stack_size (int): stack so many frames stack_axis (int): the dimension to stack the observation. 
+ convert_only_minibatch_to_device (bool): whether to convert only the + minibatch or the whole batch of data to the default device. fields (list[str]): fields to be stacked, A field str is a multi-level path denoted by "A.B.C". If None, then non-nested observation is stacked. """ @@ -198,6 +208,7 @@ def __init__(self, self._frames = dict() self._fields = fields if (fields is not None) else [None] self._exp_fields = [] + self._convert_only_minibatch_to_device = convert_only_minibatch_to_device prev_frames_spec = [] stacked_observation_spec = observation_spec for field in self._fields: @@ -350,9 +361,14 @@ def transform_experience(self, experience: Experience): B = torch.arange(batch_size) obs_index = (B.unsqueeze(-1).unsqueeze(-1), obs_index.unsqueeze(0)) + if self._convert_only_minibatch_to_device: + obs_index = convert_device(obs_index, device=replay_buffer.device) + def _stack_frame(obs, i): prev_obs = replay_buffer.get_field(self._exp_fields[i], env_ids, prev_positions) + if not self._convert_only_minibatch_to_device: + prev_obs = convert_device(prev_obs) stacked_shape = alf.nest.get_field( self._transformed_observation_spec, self._fields[i]).shape # [batch_size, mini_batch_length + stack_size - 1, ...] @@ -702,27 +718,6 @@ def forward(self, reward): return reward * self._scale -@alf.configurable -def l2_dist_close_reward_fn(achieved_goal, goal, threshold=.05): - """Giving -1/0 reward based on how close the achieved state is to the goal state. - - Args: - achieved_goal (Tensor): achieved state, of shape ``[batch_size, batch_length, ...]`` - goal (Tensor): goal state, of shape ``[batch_size, batch_length, ...]`` - threshold (float): L2 distance threshold for the reward. - - Returns: - Tensor for -1/0 reward of shape ``[batch_size, batch_length]``. 
- """ - - if goal.dim() == 2: # when goals are 1-dimensional - assert achieved_goal.dim() == goal.dim() - achieved_goal = achieved_goal.unsqueeze(2) - goal = goal.unsqueeze(2) - return -(torch.norm(achieved_goal - goal, dim=2) >= threshold).to( - torch.float32) - - @alf.configurable class HindsightExperienceTransformer(DataTransformer): """Randomly transform her_proportion of `batch_size` trajectories with hindsight relabel. @@ -734,14 +729,31 @@ class HindsightExperienceTransformer(DataTransformer): of the current timestep. The exact field names can be provided via arguments to the class ``__init__``. + NOTE: When the experience reward is multi-dimensional, the 0th dimension is assumed + to be the goal reward dimension, and is relabed by the HindsightExperienceTransformer. + All other reward dimensions are untouched. + TODO: Change the reward field into a nested field to be able to support multi-dimensional + goal rewards. + + NOTE: The HindsightExperienceTransformer has to happen before any transformer which changes + reward or achieved_goal fields, e.g. observation normalizer, reward clipper, etc.. + See `documentation <../../docs/notes/knowledge_base.rst#datatransformers>`_ for details. + To use this class, add it to any existing data transformers, e.g. use this config if ``ObservationNormalizer`` is an existing data transformer: .. code-block:: python - ReplayBuffer.keep_episodic_info=True - HindsightExperienceTransformer.her_proportion=0.8 - TrainerConfig.data_transformer_ctor=[@HindsightExperienceTransformer, @ObservationNormalizer] + alf.config('ReplayBuffer', keep_episodic_info=True) + alf.config( + 'HindsightExperienceTransformer', + her_proportion=0.8 + ) + alf.config( + 'TrainerConfig', + data_transformer_ctor=[ + HindsightExperienceTransformer, ObservationNormalizer + ]) See unit test for more details on behavior. 
""" @@ -751,7 +763,10 @@ def __init__(self, her_proportion=0.8, achieved_goal_field="time_step.observation.achieved_goal", desired_goal_field="time_step.observation.desired_goal", - reward_fn=l2_dist_close_reward_fn): + relabel_with_episodic_rewards=False, + relabeled_goal_noise=0, + reward_fn=l2_dist_close_reward_fn, + episodic_reward_transform=alf.utils.math_ops.identity): """ Args: her_proportion (float): proportion of hindsight relabeled experience. @@ -759,10 +774,16 @@ def __init__(self, exp nest. desired_goal_field (str): path to the desired_goal field in the exp nest. + relabel_with_episodic_rewards (bool): Whether to transform reward from -1/0 to 0/1. + This also makes the task episodic by relabeling rewarding step as LAST + and setting discount to 0. + relabeled_goal_noise (float): if positive, the noise added to relabeled goals. reward_fn (Callable): function to recompute reward based on achieve_goal and desired_goal. Default gives reward 0 when L2 distance less than 0.05 and -1 otherwise, same as is done in suite_robotics environments. + episodic_reward_transform (Callable): transforms reward from -1/0 to 0/1. + Only used when sparse_reward is True. """ super().__init__( transformed_observation_spec=transformed_observation_spec, @@ -770,11 +791,98 @@ def __init__(self, self._her_proportion = her_proportion self._achieved_goal_field = achieved_goal_field self._desired_goal_field = desired_goal_field + self._relabel_with_episodic_rewards = relabel_with_episodic_rewards + self._relabeled_goal_noise = relabeled_goal_noise self._reward_fn = reward_fn + self._episodic_reward_transform = episodic_reward_transform def transform_timestep(self, timestep: TimeStep, state): return timestep, state + def _verify_reward_function(self, her_cond, result, relabeled_rewards, + result_ag, result_desired_goal, env_ids, + start_pos, shape): + # Verify reward function is the same as used by the environment. + + # Handle multi dim reward, assumes 0th dim is goal reward. 
+ goal_rewards = result.reward + if result.reward.ndim > 2: + goal_rewards = result.reward[:, :, 0] + + non_her_or_fst = ~her_cond.unsqueeze(1) & (result.step_type != + StepType.FIRST) + if not torch.allclose(relabeled_rewards[non_her_or_fst], + goal_rewards[non_her_or_fst]): + not_close = torch.abs(relabeled_rewards[non_her_or_fst] - + goal_rewards[non_her_or_fst]) > 0.01 + msg = ("hindsight_relabel:\nrelabeled_reward\n{}\n!=\n" + + "env_reward\n{}\nag:\n{}\ndg:\n{}\nenv_ids:\n{}\nstart_pos:" + + "\n{}").format( + relabeled_rewards[non_her_or_fst][not_close], + goal_rewards[non_her_or_fst][not_close], + result_ag[non_her_or_fst][not_close], + result_desired_goal[non_her_or_fst][not_close], + env_ids.unsqueeze(1).expand( + shape[:2])[non_her_or_fst][not_close], + start_pos.unsqueeze(1).expand( + shape[:2])[non_her_or_fst][not_close]) + logging.warning(msg) + # assert False, msg + # relabeled_rewards[non_her_or_fst] = goal_rewards[non_her_or_fst] + + def _add_noise(self, t): + # rejection sample from unit ball + if self._relabeled_goal_noise <= 0: + return t + bs, bl, dim = t.shape + assert dim < 20, "Cannot rejection sample from high dim ball yet." + n_samples, i = 0, 0 + while n_samples == 0: + _sample = torch.rand((bs * 2, dim)) + in_ball = torch.norm(_sample, dim=1) < 1. + if torch.any(in_ball): + sample = _sample[in_ball] + nsample = sample.shape[0] + if nsample < bs: + sample = sample.expand(bs // nsample + 1, nsample, + dim).reshape(-1, dim) + if sample.shape[0] > bs: + sample = sample[:bs, :] + break + assert i < 10, "shouldn't take 10 iterations" + i += 1 + return t + self._relabeled_goal_noise * sample.reshape(bs, 1, dim) + + def _episodic_relabel(self, result, relabeled_rewards, buffer): + # Assumes that original reward is -1/0 and 0 when goal is reached. + reward_achieved = relabeled_rewards >= 0 + # Cut off episode for any goal reached, making the task episodic. 
+ end = reward_achieved + discount = torch.where(end, torch.tensor(0.), result.discount) + step_type = torch.where(end, torch.tensor(StepType.LAST), + result.step_type) + # Also relabel ``LAST``` steps to ``MID``` where aux goals were not + # achieved but env ended episode due to position goal achieved. + # -1/0 reward doesn't end episode on achieving position goal, and + # doesn't need to do this relabeling. + goal_reward = result.reward + if len(result.reward.shape) > 2: + goal_reward = result.reward[..., 0] + mid = (result.step_type == StepType.LAST) & ~reward_achieved & ( + goal_reward > 0) # assumes no multi dim goal reward. + discount = torch.where(mid, torch.tensor(1.), discount) + step_type = torch.where(mid, torch.tensor(StepType.MID), step_type) + + if alf.summary.should_record_summaries(): + alf.summary.scalar( + "replayer/" + buffer._name + ".discount_mean_after_relabel", + torch.mean(discount[:, 1:])) + + result = result._replace(discount=discount) + result = result._replace(step_type=step_type) + relabeled_rewards = self._episodic_reward_transform(relabeled_rewards) + return result, relabeled_rewards + def transform_experience(self, experience: Experience): """Hindsight relabel experience Note: The environments where the samples are from are ordered in the @@ -819,9 +927,10 @@ def transform_experience(self, experience: Experience): her_cond = torch.rand(batch_size) < her_proportion (her_indices, ) = torch.where(her_cond) - last_step_pos = start_pos[her_indices] + batch_length - 1 - last_env_ids = env_ids[her_indices] - # Get x, y indices of LAST steps + has_her = torch.any(her_cond) + last_step_pos = start_pos + batch_length - 1 + last_env_ids = env_ids + # Get x, y indices of LAST steps for the whole batch, not just the HER part. 
dist = buffer.steps_to_episode_end(last_step_pos, last_env_ids) if alf.summary.should_record_summaries(): alf.summary.scalar( @@ -829,66 +938,72 @@ def transform_experience(self, experience: Experience): torch.mean(dist.type(torch.float32))) # get random future state - future_idx = last_step_pos + (torch.rand(*dist.shape) * - (dist + 1)).to(torch.int64) - future_ag = buffer.get_field(self._achieved_goal_field, - last_env_ids, future_idx).unsqueeze(1) + future_dist = (torch.rand(*dist.shape) * (dist + 1)).to( + torch.int64) + future_idx = last_step_pos + future_dist + future_ag = self._add_noise( + buffer.get_field(self._achieved_goal_field, last_env_ids, + future_idx).unsqueeze(1)) # relabel desired goal result_desired_goal = alf.nest.get_field(result, self._desired_goal_field) - relabed_goal = result_desired_goal.clone() + relabeled_goal = result_desired_goal.clone() her_batch_index_tuple = (her_indices.unsqueeze(1), torch.arange(batch_length).unsqueeze(0)) - relabed_goal[her_batch_index_tuple] = future_ag + if has_her: + relabeled_goal[her_batch_index_tuple] = future_ag[her_indices] # recompute rewards result_ag = alf.nest.get_field(result, self._achieved_goal_field) - relabeled_rewards = self._reward_fn(result_ag, relabed_goal) - - non_her_or_fst = ~her_cond.unsqueeze(1) & (result.step_type != - StepType.FIRST) - # assert reward function is the same as used by the environment. 
- if not torch.allclose(relabeled_rewards[non_her_or_fst], - result.reward[non_her_or_fst]): - not_close = torch.abs(relabeled_rewards[non_her_or_fst] - - result.reward[non_her_or_fst]) > 0.01 - msg = ( - "hindsight_relabel:\nrelabeled_reward\n{}\n!=\n" + - "env_reward\n{}\nag:\n{}\ndg:\n{}\nenv_ids:\n{}\nstart_pos:" - + "\n{}").format( - relabeled_rewards[non_her_or_fst][not_close], - result.reward[non_her_or_fst][not_close], - result_ag[non_her_or_fst][not_close], - result_desired_goal[non_her_or_fst][not_close], - env_ids.unsqueeze(1).expand( - shape[:2])[non_her_or_fst][not_close], - start_pos.unsqueeze(1).expand( - shape[:2])[non_her_or_fst][not_close]) - logging.warning(msg) - # assert False, msg - relabeled_rewards[non_her_or_fst] = result.reward[ - non_her_or_fst] + relabeled_rewards = self._reward_fn(result_ag, relabeled_goal) + if alf.summary.should_record_summaries(): + alf.summary.scalar( + "replayer/" + buffer._name + + ".discount_mean_before_relabel", + torch.mean(result.discount[:, 1:])) + + self._verify_reward_function(her_cond, result, relabeled_rewards, + result_ag, result_desired_goal, + env_ids, start_pos, shape) + + if self._relabel_with_episodic_rewards: + result, relabeled_rewards = self._episodic_relabel( + result, relabeled_rewards, buffer) + + # Multi dimensional env reward. Assumes 0th dim is goal reward. 
+ final_relabeled_rewards = relabeled_rewards + if result.reward.ndim > 2: + final_relabeled_rewards = result.reward.clone() + final_relabeled_rewards[:, :, 0] = relabeled_rewards if alf.summary.should_record_summaries(): alf.summary.scalar( "replayer/" + buffer._name + ".reward_mean_before_relabel", torch.mean(result.reward[her_indices][:-1])) - alf.summary.scalar( - "replayer/" + buffer._name + ".reward_mean_after_relabel", - torch.mean(relabeled_rewards[her_indices][:-1])) + if has_her: + alf.summary.scalar( + "replayer/" + buffer._name + ".reward_mean_after_relabel", + torch.mean(relabeled_rewards[her_indices][:-1])) + alf.summary.scalar("replayer/" + buffer._name + ".future_distance", + torch.mean(future_dist.float())) result = alf.nest.transform_nest( - result, self._desired_goal_field, lambda _: relabed_goal) - - result = result.update_time_step_field('reward', relabeled_rewards) - + result, self._desired_goal_field, lambda _: relabeled_goal) + result = result.update_time_step_field('reward', + final_relabeled_rewards) + info = info._replace(her=her_cond, future_distance=future_dist) if alf.get_default_device() != buffer.device: for f in accessed_fields: result = alf.nest.transform_nest( result, f, lambda t: convert_device(t)) - result = alf.nest.transform_nest( - result, "batch_info.replay_buffer", lambda _: buffer) + info = convert_device(info) + info = info._replace( + her=info.her.unsqueeze(1).expand(result.reward.shape[:2]), + future_distance=info.future_distance.unsqueeze(1).expand( + result.reward.shape[:2]), + replay_buffer=buffer) + result = alf.data_structures.add_batch_info(result, info) return result diff --git a/alf/algorithms/ddpg_algorithm.py b/alf/algorithms/ddpg_algorithm.py index 7c0678998..3191cc2da 100644 --- a/alf/algorithms/ddpg_algorithm.py +++ b/alf/algorithms/ddpg_algorithm.py @@ -40,9 +40,20 @@ DdpgActorState = namedtuple("DdpgActorState", ['actor', 'critics']) DdpgState = namedtuple("DdpgState", ['actor', 'critics']) DdpgInfo = 
namedtuple( - "DdpgInfo", [ - "reward", "step_type", "discount", "action", "action_distribution", - "actor_loss", "critic", "discounted_return" + "DdpgInfo", + [ + "reward", + "step_type", + "discount", + "action", + "action_distribution", + "actor_loss", + "critic", + # Optional fields for value target lower bounding or Hindsight relabeling. + # TODO: Extract these into a HerAlgorithm wrapper for easier adoption of HER. + "discounted_return", + "future_distance", + "her" ], default_value=()) DdpgLossInfo = namedtuple('DdpgLossInfo', ('actor', 'critic')) @@ -237,10 +248,12 @@ def _sample(a, ou): noisy_action, self._action_spec) state = empty_state._replace( actor=DdpgActorState(actor=state, critics=())) + # action_distribution is not supported for continuous actions for now. + # Returns empty action_distribution to fail early. return AlgStep( output=noisy_action, state=state, - info=DdpgInfo(action=noisy_action, action_distribution=action)) + info=DdpgInfo(action=noisy_action, action_distribution=())) def rollout_step(self, time_step: TimeStep, state=None): if self.need_full_rollout_state(): @@ -330,7 +343,8 @@ def train_step(self, inputs: TimeStep, state: DdpgState, reward=inputs.reward, step_type=inputs.step_type, discount=inputs.discount, - action_distribution=policy_step.output, + action=policy_step.output, + action_distribution=(), critic=critic_info, actor_loss=policy_step.info, discounted_return=rollout_info.discounted_return)) diff --git a/alf/algorithms/dqn_algorithm.py b/alf/algorithms/dqn_algorithm.py new file mode 100644 index 000000000..8b0a3c1bc --- /dev/null +++ b/alf/algorithms/dqn_algorithm.py @@ -0,0 +1,128 @@ +# Copyright (c) 2020 Horizon Robotics and ALF Contributors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""DQN Algorithm.""" + +import torch + +import alf +from alf.algorithms.config import TrainerConfig +from alf.algorithms.sac_algorithm import SacAlgorithm, ActionType, \ + SacState as DqnState, SacCriticState as DqnCriticState, \ + SacInfo as DqnInfo +from alf.data_structures import TimeStep +from alf.networks import QNetwork +from alf.optimizers import AdamTF +from alf.tensor_specs import TensorSpec, BoundedTensorSpec +from alf.utils.schedulers import as_scheduler + + +@alf.configurable +class DqnAlgorithm(SacAlgorithm): + r"""DQN/DDQN algorithm: + + :: + + Mnih et al "Playing Atari with Deep Reinforcement Learning", arXiv:1312.5602 + Hasselt et al "Deep Reinforcement Learning with Double Q-learning", arXiv:1509.06461 + + The difference with DDQN is that a minimum is taken from the two critics, + similar to TD3, instead of using one critic as the target of the other. + + The implementation is based on the SAC algorithm. + """ + + def __init__(self, + observation_spec, + action_spec: BoundedTensorSpec, + reward_spec=TensorSpec(()), + q_network_cls=QNetwork, + q_optimizer=None, + rollout_epsilon_greedy=0.1, + num_critic_replicas=2, + env=None, + config: TrainerConfig = None, + critic_loss_ctor=None, + debug_summaries=False, + name="DqnAlgorithm"): + """ + Args: + observation_spec (nested TensorSpec): representing the observations. + action_spec (nested BoundedTensorSpec): representing the actions; can + be a mixture of discrete and continuous actions. The number of + continuous actions can be arbitrary while only one discrete + action is allowed currently. 
If it's a mixture, then it must be + a tuple/list ``(discrete_action_spec, continuous_action_spec)``. + reward_spec (TensorSpec): a rank-1 or rank-0 tensor spec representing + the reward(s). + q_network (Callable): is used to construct QNetwork for estimating ``Q(s,a)`` + given that the action is discrete. Its output spec must be consistent with + the discrete action in ``action_spec``. + q_optimizer (torch.optim.optimizer): A custom optimizer for the q network. + Uses the enclosing algorithm's optimizer if None. + rollout_epsilon_greedy (float|Scheduler): epsilon greedy policy for rollout. + Together with the following two parameters, the SAC algorithm + can be converted to a DQN or DDQN algorithm when e.g. + ``rollout_epsilon_greedy=0.3``, ``max_target_action=True``, and + ``use_entropy_reward=False``. + num_critic_replicas (int): number of critics to be used. Default is 2. + env (Environment): The environment to interact with. ``env`` is a + batched environment, which means that it runs multiple simulations + simultateously. ``env` only needs to be provided to the root + algorithm. + config (TrainerConfig): config for training. It only needs to be + provided to the algorithm which performs ``train_iter()`` by + itself. + critic_loss_ctor (None|OneStepTDLoss|MultiStepLoss): a critic loss + constructor. If ``None``, a default ``OneStepTDLoss`` will be used. + debug_summaries (bool): True if debug summaries should be created. + name (str): The name of this algorithm. 
+ """ + self._rollout_epsilon_greedy = as_scheduler(rollout_epsilon_greedy) + # Disable alpha learning: + alpha_optimizer = AdamTF(lr=0) + + super().__init__( + observation_spec=observation_spec, + action_spec=action_spec, + reward_spec=reward_spec, + actor_network_cls=None, + critic_network_cls=None, + q_network_cls=q_network_cls, + # Do not use entropy reward: + use_entropy_reward=False, + num_critic_replicas=num_critic_replicas, + env=env, + config=config, + critic_loss_ctor=critic_loss_ctor, + # Allow custom optimizer for q_network: + critic_optimizer=q_optimizer, + alpha_optimizer=alpha_optimizer, + debug_summaries=debug_summaries, + name=name) + assert self._act_type == ActionType.Discrete + + def rollout_step(self, inputs: TimeStep, state: DqnState): + return super().rollout_step( + inputs, state, eps=self._rollout_epsilon_greedy()) + + def _critic_train_step(self, inputs: TimeStep, state: DqnCriticState, + rollout_info: DqnInfo, action, action_distribution): + return super()._critic_train_step( + inputs, + state, + rollout_info, + action, + action_distribution, + # Pick the greedy target action: + target_action_picker=lambda t: torch.max(t, dim=1)[0]) diff --git a/alf/algorithms/one_step_loss.py b/alf/algorithms/one_step_loss.py index 34ee329ad..e687f9dc5 100644 --- a/alf/algorithms/one_step_loss.py +++ b/alf/algorithms/one_step_loss.py @@ -16,12 +16,12 @@ from typing import Union, List, Callable import alf -from alf.algorithms.td_loss import TDLoss, TDQRLoss +from alf.algorithms.td_loss import LowerBoundedTDLoss, TDQRLoss from alf.utils import losses @alf.configurable -class OneStepTDLoss(TDLoss): +class OneStepTDLoss(LowerBoundedTDLoss): def __init__(self, gamma: Union[float, List[float]] = 0.99, td_error_loss_fn: Callable = losses.element_wise_squared_loss, diff --git a/alf/algorithms/rl_algorithm.py b/alf/algorithms/rl_algorithm.py index c0d8e2a41..760dfdda4 100644 --- a/alf/algorithms/rl_algorithm.py +++ b/alf/algorithms/rl_algorithm.py @@ -223,12 
+223,13 @@ def __init__(self, replay_buffer_length = adjust_replay_buffer_length( config, self._num_earliest_frames_ignored) + total_replay_size = replay_buffer_length * self._env.batch_size if config.whole_replay_buffer_training and config.clear_replay_buffer: # For whole replay buffer training, we would like to be sure # that the replay buffer have enough samples in it to perform # the training, which will most likely happen in the 2nd # iteration. The minimum_initial_collect_steps guarantees that. - minimum_initial_collect_steps = replay_buffer_length * self._env.batch_size + minimum_initial_collect_steps = total_replay_size if config.initial_collect_steps < minimum_initial_collect_steps: common.info( 'Set the initial_collect_steps to minimum required ' @@ -236,6 +237,9 @@ def __init__(self, 'whole_replay_buffer_training is on.') config.initial_collect_steps = minimum_initial_collect_steps + assert config.initial_collect_steps <= total_replay_size, \ + "Training will not happen - insufficient replay buffer size." 
+ self.set_replay_buffer(self._env.batch_size, replay_buffer_length, config.priority_replay) diff --git a/alf/algorithms/sac_algorithm.py b/alf/algorithms/sac_algorithm.py index 91dd59134..845bec9c6 100644 --- a/alf/algorithms/sac_algorithm.py +++ b/alf/algorithms/sac_algorithm.py @@ -37,6 +37,7 @@ from alf.tensor_specs import TensorSpec, BoundedTensorSpec from alf.utils import losses, common, dist_utils, math_ops from alf.utils.normalizers import ScalarAdaptiveNormalizer +from alf.utils.schedulers import as_scheduler ActionType = Enum('ActionType', ('Discrete', 'Continuous', 'Mixed')) @@ -54,9 +55,22 @@ "SacActorInfo", ["actor_loss", "neg_entropy"], default_value=()) SacInfo = namedtuple( - "SacInfo", [ - "reward", "step_type", "discount", "action", "action_distribution", - "actor", "critic", "alpha", "log_pi", "discounted_return" + "SacInfo", + [ + "reward", + "step_type", + "discount", + "action", + "action_distribution", + "actor", + "critic", + "alpha", + "log_pi", + # Optional fields for value target lower bounding or Hindsight relabeling. + # TODO: Extract these into a HerAlgorithm wrapper for easier adoption of HER. + "discounted_return", + "future_distance", + "her" ], default_value=()) @@ -541,7 +555,7 @@ def predict_step(self, inputs: TimeStep, state: SacState): state=SacState(action=action_state), info=SacInfo(action_distribution=action_dist)) - def rollout_step(self, inputs: TimeStep, state: SacState): + def rollout_step(self, inputs: TimeStep, state: SacState, eps: float = 1.): """``rollout_step()`` basically predicts actions like what is done by ``predict_step()``. 
Additionally, if states are to be stored a in replay buffer, then this function also call ``_critic_networks`` and @@ -550,7 +564,7 @@ def rollout_step(self, inputs: TimeStep, state: SacState): action_dist, action, _, action_state = self._predict_action( inputs.observation, state=state.action, - epsilon_greedy=1.0, + epsilon_greedy=eps, eps_greedy_sampling=True, rollout=True) @@ -694,8 +708,13 @@ def _select_q_value(self, action, q_values): *self._reward_spec.shape).long() return q_values.gather(2, action).squeeze(2) - def _critic_train_step(self, inputs: TimeStep, state: SacCriticState, - rollout_info: SacInfo, action, action_distribution): + def _critic_train_step(self, + inputs: TimeStep, + state: SacCriticState, + rollout_info: SacInfo, + action, + action_distribution, + target_action_picker: Callable = None): critics, critics_state = self._compute_critics( self._critic_networks, inputs.observation, @@ -717,7 +736,10 @@ def _critic_train_step(self, inputs: TimeStep, state: SacCriticState, probs = common.expand_dims_as(action_distribution.probs, target_critics) # [B, reward_dim] - target_critics = torch.sum(probs * target_critics, dim=1) + if target_action_picker is not None: + target_critics = target_action_picker(target_critics) + else: + target_critics = torch.sum(probs * target_critics, dim=1) elif self._act_type == ActionType.Mixed: critics = self._select_q_value(rollout_info.action[0], critics) discrete_act_dist = action_distribution[0] diff --git a/alf/algorithms/td_loss.py b/alf/algorithms/td_loss.py index 80c2c0a93..8c6c32c57 100644 --- a/alf/algorithms/td_loss.py +++ b/alf/algorithms/td_loss.py @@ -31,9 +31,11 @@ class TDLoss(nn.Module): def __init__(self, gamma: Union[float, List[float]] = 0.99, td_error_loss_fn: Callable = element_wise_squared_loss, + clip: float = 0., td_lambda: float = 0.95, normalize_target: bool = False, debug_summaries: bool = False, + use_retrace: bool = False, name: str = "TDLoss"): r""" Let :math:`G_{t:T}` be the bootstraped 
return from t to T: @@ -80,10 +82,20 @@ def __init__(self, td_error_loss_fn: A function for computing the TD errors loss. This function takes as input the target and the estimated Q values and returns the loss for each element of the batch. + clip: When positive, loss clipping to the range [-clip, clip]. td_lambda: Lambda parameter for TD-lambda computation. normalize_target (bool): whether to normalize target. Note that the effect of this is to change the loss. The critic value itself is not normalized. + use_retrace: turn on retrace loss + + .. math:: + + \mathcal{R} Q(x, a) := Q(x, a) + \mathbb{E}_{\mu}\left[ + \sum_{t \geq 0} \gamma^{t}\left(\prod_{s=1}^{t} c_{s}\right) + \left(r_{t} + \gamma \mathbb{E}_{\pi} Q\left(x_{t+1}, \cdot\right)-Q\left(x_{t}, a_{t}\right) + \right)\right] + debug_summaries: True if debug summaries should be created. name: The name of this loss. """ @@ -92,7 +104,9 @@ def __init__(self, self._name = name self._gamma = torch.tensor(gamma) self._td_error_loss_fn = td_error_loss_fn + self._clip = clip self._lambda = td_lambda + self._use_retrace = use_retrace self._debug_summaries = debug_summaries self._normalize_target = normalize_target self._target_normalizer = None @@ -106,7 +120,11 @@ def gamma(self): """ return self._gamma.clone() - def compute_td_target(self, info: namedtuple, target_value: torch.Tensor): + def compute_td_target(self, + info: namedtuple, + value: torch.Tensor, + target_value: torch.Tensor, + qr: bool = False): """Calculate the td target. The first dimension of all the tensors is time dimension and the second @@ -119,46 +137,73 @@ def compute_td_target(self, info: namedtuple, target_value: torch.Tensor): - reward: - step_type: - discount: + value (torch.Tensor): the time-major tensor for the value at + each time step. Some of its value can be overwritten and passed + back to the caller. target_value (torch.Tensor): the time-major tensor for the value at each time step. This is used to calculate return. 
``target_value`` - can be same as ``value``. + can be same as ``value``, except for Retrace. Returns: - td_target + td_target, updated value, optional constraint_loss """ + if not qr and info.reward.ndim == 3: + # Multi-dim reward, not quantile regression. + # [T, B, D] or [T, B, 1] + discounts = info.discount.unsqueeze(-1) * self._gamma + else: + # [T, B] + discounts = info.discount * self._gamma + if self._lambda == 1.0: returns = value_ops.discounted_return( rewards=info.reward, values=target_value, step_types=info.step_type, - discounts=info.discount * self._gamma) + discounts=discounts) elif self._lambda == 0.0: returns = value_ops.one_step_discounted_return( rewards=info.reward, values=target_value, step_types=info.step_type, - discounts=info.discount * self._gamma) - else: + discounts=discounts) + elif not self._use_retrace: advantages = value_ops.generalized_advantage_estimation( rewards=info.reward, values=target_value, step_types=info.step_type, - discounts=info.discount * self._gamma, + discounts=discounts, td_lambda=self._lambda) returns = advantages + target_value[:-1] + else: # Retrace + scope = alf.summary.scope(self.__class__.__name__) + assert info.rollout_info.action_distribution != (), \ + "Algorithm does not provide rollout action_distribution" + importance_ratio, importance_ratio_clipped = value_ops. 
\ + action_importance_ratio( + action_distribution=info.action_distribution, + rollout_action_distribution=info.rollout_info.action_distribution, + action=info.action, + clipping_mode='capping', + importance_ratio_clipping=0.0, + log_prob_clipping=0.0, + scope=scope, + check_numerics=False, + debug_summaries=self._debug_summaries) + advantages = value_ops.generalized_advantage_estimation_retrace( + importance_ratio=importance_ratio_clipped, + rewards=info.reward, + values=value, + target_value=target_value, + step_types=info.step_type, + discounts=discounts, + use_retrace=True, + time_major=True, + td_lambda=self._lambda) - disc_ret = () - if hasattr(info, "discounted_return"): - disc_ret = info.discounted_return - if disc_ret != (): - with alf.summary.scope(self._name): - episode_ended = disc_ret > self._default_return - alf.summary.scalar("episodic_discounted_return_all", - torch.mean(disc_ret[episode_ended])) - alf.summary.scalar( - "value_episode_ended_all", - torch.mean(value[:-1][:, episode_ended[0, :]])) + returns = advantages + value[:-1] + returns = returns.detach() - return returns + return returns, value, None def forward(self, info: namedtuple, value: torch.Tensor, target_value: torch.Tensor): @@ -182,7 +227,8 @@ def forward(self, info: namedtuple, value: torch.Tensor, Returns: LossInfo: with the ``extra`` field same as ``loss``. """ - returns = self.compute_td_target(info, target_value) + returns, value, constraint_loss = self.compute_td_target( + info, value, target_value) value = value[:-1] if self._normalize_target: @@ -219,17 +265,272 @@ def _summarize(v, r, td, suffix): suffix) loss = self._td_error_loss_fn(returns.detach(), value) + if self._clip > 0: + loss = torch.clamp(loss, min=-self._clip, max=self._clip) if loss.ndim == 3: # Multidimensional reward. 
Average over the critic loss for all dimensions loss = loss.mean(dim=2) + # For the subclass LowerBoundedTDLoss + if constraint_loss is not None: + assert constraint_loss.shape == loss.shape[1:], \ + f"{constraint_loss.shape} != {loss.shape}[1:]" + c_loss = constraint_loss.clone().unsqueeze(0).repeat( + (loss.shape[0], 1)) + c_loss[1:] = 0 + if self._lb_loss_scale: + scale = (torch.sum(loss) / torch.sum(c_loss + loss)).detach() + else: + scale = 1 + loss = (c_loss + loss) * scale + # The shape of the loss expected by Algorith.update_with_gradient is # [T, B], so we need to augment it with additional zeros. loss = tensor_utils.tensor_extend_zero(loss) return LossInfo(loss=loss, extra=loss) +@alf.configurable +class LowerBoundedTDLoss(TDLoss): + """Temporal difference loss with value target lower bounding.""" + + def __init__(self, + gamma: Union[float, List[float]] = 0.99, + td_error_loss_fn: Callable = element_wise_squared_loss, + clip: float = 0., + td_lambda: float = 0.95, + normalize_target: bool = False, + use_retrace: bool = False, + lb_target_q: float = 0., + default_return: float = -1000., + improve_w_goal_return: bool = False, + improve_w_nstep_bootstrap: bool = False, + improve_w_nstep_only: bool = False, + lower_bound_constraint: float = 0., + lb_loss_scale: bool = False, + reward_multiplier: float = 1., + positive_reward: bool = True, + debug_summaries: bool = False, + name: str = "LbTDLoss"): + r""" + Args: + gamma .. use_retrace: pass through to TDLoss. + lb_target_q: between 0 and 1. When not zero, use this mixing rate for the + lower bounded value target. Only supports batch_length == 2, one step td. + default_return: Keep it the same as replay_buffer.default_return to plot to + tensorboard episodic_discounted_return only for the timesteps whose + episode already ended. + improve_w_goal_return: Use return calculated from the distance to hindsight + goals. Only supports batch_length == 2, one step td. 
+ improve_w_nstep_bootstrap: Look ahead 2 to n steps, and take the largest + bootstrapped return to lower bound the value target of the 1st step. + improve_w_nstep_only: Only use the n-th step bootstrapped return as + value target lower bound. + lower_bound_constraint: Use n-step bootstrapped return as lower bound + constraints of the value. See reference: + He, F. S., Liu, Y., Schwing, A. G., and Peng, J. + Learning to play in a day: Faster deep reinforcement learning + by optimality tightening. In 5th International Conference + on Learning Representations, ICLR 2017, Toulon, France, + April 24-26, 2017. https://openreview.net/forum?id=rJ8Je4clg + lb_loss_scale: Parameter for lower_bound_constraint. + reward_multiplier: Weight on the hindsight goal return. + positive_reward: If True, assumes 0/1 goal reward, otherwise, -1/0. + debug_summaries: True if debug summaries should be created. + name: The name of this loss. + """ + super().__init__( + gamma=gamma, + td_error_loss_fn=td_error_loss_fn, + clip=clip, + td_lambda=td_lambda, + normalize_target=normalize_target, + use_retrace=use_retrace, + name=name, + debug_summaries=debug_summaries) + + self._lb_target_q = lb_target_q + self._default_return = default_return + self._improve_w_goal_return = improve_w_goal_return + self._improve_w_nstep_bootstrap = improve_w_nstep_bootstrap + self._improve_w_nstep_only = improve_w_nstep_only + self._lower_bound_constraint = lower_bound_constraint + self._lb_loss_scale = lb_loss_scale + self._reward_multiplier = reward_multiplier + self._positive_reward = positive_reward + + def compute_td_target(self, + info: namedtuple, + value: torch.Tensor, + target_value: torch.Tensor, + qr: bool = False): + """Calculate the td target. + + The first dimension of all the tensors is time dimension and the second + dimesion is the batch dimension. + + Args: + info (namedtuple): experience collected from ``unroll()`` or + a replay buffer. All tensors are time-major. 
``info`` should + contain the following fields: + - reward: + - step_type: + - discount: + value (torch.Tensor): the time-major tensor for the value at + each time step. Some of its value can be overwritten and passed + back to the caller. + target_value (torch.Tensor): the time-major tensor for the value at + each time step. This is used to calculate return. ``target_value`` + can be same as ``value``, except for Retrace. + Returns: + td_target, updated value, optional constraint_loss + """ + returns, value, _ = super().compute_td_target(info, value, + target_value, qr) + + constraint_loss = None + if self._improve_w_nstep_bootstrap: + assert self._lambda == 1.0, "td lambda does not work with this" + future_returns = value_ops.first_step_future_discounted_returns( + rewards=info.reward, + values=target_value, + step_types=info.step_type, + discounts=discounts) + returns = value_ops.one_step_discounted_return( + rewards=info.reward, + values=target_value, + step_types=info.step_type, + discounts=discounts) + assert torch.all((returns[0] == future_returns[0]) | ( + info.step_type[0] == alf.data_structures.StepType.LAST)), \ + str(returns[0]) + " ne\n" + str(future_returns[0]) + \ + '\nrwd: ' + str(info.reward[0:2]) + \ + '\nlast: ' + str(info.step_type[0:2]) + \ + '\ndisct: ' + str(discounts[0:2]) + \ + '\nv: ' + str(target_value[0:2]) + if self._improve_w_nstep_only: + future_returns = future_returns[ + -1] # last is the n-step return + else: + future_returns = torch.max(future_returns, dim=0)[0] + + with alf.summary.scope(self._name): + alf.summary.scalar( + "max_1_to_n_future_return_gt_td", + torch.mean((returns[0] < future_returns).float())) + if self._lower_bound_constraint > 0: + alf.summary.scalar( + "max_1_to_n_future_return_gt_value", + torch.mean((value[0] < future_returns).float())) + alf.summary.scalar("first_step_discounted_return", + torch.mean(returns[0])) + + if self._lower_bound_constraint > 0: + constraint_loss = self._lower_bound_constraint * 
torch.max( + torch.zeros_like(future_returns), + future_returns.detach() - value[0])**2 + else: + returns[0] = torch.max(future_returns, returns[0]).detach() + returns[1:] = 0 + value = value.clone() + value[1:] = 0 + + disc_ret = () + if hasattr(info, "discounted_return"): + disc_ret = info.discounted_return + if disc_ret != (): + with alf.summary.scope(self._name): + episode_ended = disc_ret > self._default_return + alf.summary.scalar("episodic_discounted_return_all", + torch.mean(disc_ret[episode_ended])) + alf.summary.scalar( + "value_episode_ended_all", + torch.mean(value[:-1][:, episode_ended[0, :]])) + + if self._lb_target_q > 0 and disc_ret != (): + her_cond = info.her + mask = torch.ones(returns.shape, dtype=torch.bool) + if her_cond != () and torch.any(~her_cond): + mask = ~her_cond[:-1] + disc_ret = disc_ret[ + 1:] # it's expanded in ddpg_algorithm, need to revert back. + assert returns.shape == disc_ret.shape, "%s %s" % (returns.shape, + disc_ret.shape) + with alf.summary.scope(self._name): + alf.summary.scalar( + "episodic_return_gt_td", + torch.mean((returns < disc_ret).float()[mask])) + alf.summary.scalar( + "episodic_discounted_return", + torch.mean( + disc_ret[mask & (disc_ret > self._default_return)])) + returns[mask] = (1 - self._lb_target_q) * returns[mask] + \ + self._lb_target_q * torch.max(returns, disc_ret)[mask] + + if self._improve_w_goal_return: + batch_length, batch_size = returns.shape[:2] + her_cond = info.her + if her_cond != () and torch.any(her_cond): + dist = info.future_distance + if self._positive_reward: + goal_return = torch.pow( + self._gamma * torch.ones(her_cond.shape), dist) + else: + goal_return = -(1. - torch.pow(self._gamma, dist)) / ( + 1. 
- self._gamma) + goal_return *= self._reward_multiplier + goal_return = goal_return[:-1] + returns_0 = returns + # Multi-dim reward: + if len(returns.shape) > 2: + returns_0 = returns[:, :, 0] + returns_0 = torch.where(her_cond[:-1], + torch.max(returns_0, goal_return), + returns_0) + with alf.summary.scope(self._name): + alf.summary.scalar( + "goal_return_gt_td", + torch.mean((returns_0 < goal_return).float())) + alf.summary.scalar("goal_return", torch.mean(goal_return)) + if len(returns.shape) > 2: + returns[:, :, 0] = returns_0 + else: + returns = returns_0 + + return returns, value, constraint_loss + + def forward(self, info: namedtuple, value: torch.Tensor, + target_value: torch.Tensor): + """Calculate the loss. + + The first dimension of all the tensors is time dimension and the second + dimesion is the batch dimension. + + Args: + info: experience collected from ``unroll()`` or + a replay buffer. All tensors are time-major. ``info`` should + contain the following fields: + - reward: + - step_type: + - discount: + value: the time-major tensor for the value at each time + step. The loss is between this and the calculated return. + target_value: the time-major tensor for the value at + each time step. This is used to calculate return. ``target_value`` + can be same as ``value``. + Returns: + LossInfo: with the ``extra`` field same as ``loss``. + """ + loss_info = super().forward(info, value, target_value) + loss = loss_info.loss + if self._improve_w_nstep_bootstrap: + # Ignore 2nd to n-th step losses. + loss[1:] = 0 + + return LossInfo(loss=loss, extra=loss) + + @alf.configurable class TDQRLoss(TDLoss): """Temporal difference quantile regression loss. @@ -301,7 +602,8 @@ def forward(self, info: namedtuple, value: torch.Tensor, assert target_value.shape[-1] == self._num_quantiles, ( "The input target_value should have same num_quantiles as pre-defiend." 
) - returns = self.compute_td_target(info, target_value) + returns, value, constraint_loss = self.compute_td_target( + info, value, target_value, qr=True) value = value[:-1] # for quantile regression TD, the value and target both have shape diff --git a/alf/algorithms/td_loss_test.py b/alf/algorithms/td_loss_test.py new file mode 100644 index 000000000..2458fb89e --- /dev/null +++ b/alf/algorithms/td_loss_test.py @@ -0,0 +1,65 @@ +# Copyright (c) 2019 Horizon Robotics. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import numpy as np +import torch + +import alf +from alf.algorithms.td_loss import LowerBoundedTDLoss +from alf.data_structures import TimeStep, StepType, namedtuple + +DataItem = namedtuple( + "DataItem", ["reward", "step_type", "discount"], default_value=()) + + +class LowerBoundedTDLossTest(unittest.TestCase): + """Tests for alf.algorithms.td_loss.LowerBoundedTDLoss + """ + + def _check(self, res, expected): + np.testing.assert_array_almost_equal(res, expected) + + def test_compute_td_target_nstep_bootstrap_lowerbound(self): + loss = LowerBoundedTDLoss( + gamma=1., improve_w_nstep_bootstrap=True, td_lambda=1) + # Tensors are transposed to be time_major [T, B, ...] + step_types = torch.tensor([[StepType.MID] * 5], + dtype=torch.int64).transpose(0, 1) + rewards = torch.tensor([[2.] 
* 5], dtype=torch.float32).transpose(0, 1) + discounts = torch.tensor([[0.9] * 5], dtype=torch.float32).transpose( + 0, 1) + values = torch.tensor([[1.] * 5], dtype=torch.float32).transpose(0, 1) + info = DataItem( + reward=rewards, step_type=step_types, discount=discounts) + returns, value, _ = loss.compute_td_target(info, values, values) + expected_return = torch.tensor( + [[2 + 0.9 * (2 + 0.9 * (2 + 0.9 * (2 + 0.9))), 0, 0, 0]], + dtype=torch.float32).transpose(0, 1) + self._check(res=returns, expected=expected_return) + + expected_value = torch.tensor([[1, 0, 0, 0, 0]], + dtype=torch.float32).transpose(0, 1) + self._check(res=value, expected=expected_value) + + # n-step return is below 1-step + values[2:] = -10 + expected_return[0] = 2 + 0.9 + returns, value, _ = loss.compute_td_target(info, values, values) + self._check(res=returns, expected=expected_return) + + +if __name__ == '__main__': + alf.test.main() diff --git a/alf/environments/gym_wrappers.py b/alf/environments/gym_wrappers.py index 6e0c634e3..e77c7bc8e 100644 --- a/alf/environments/gym_wrappers.py +++ b/alf/environments/gym_wrappers.py @@ -714,3 +714,13 @@ def __init__(self, env): def step(self, action): ob, reward, done, info = self.env.step(action) return ob, reward, False, info + + +@alf.configurable +class RemoveInfoWrapper(gym.Wrapper): + """Remove all the info from environment return. 
+ """ + + def step(self, action): + obs, reward, done, info = self.env.step(action) + return obs, reward, done, {} diff --git a/alf/environments/suite_robotics.py b/alf/environments/suite_robotics.py index 3bcd2b70c..cdac169f1 100644 --- a/alf/environments/suite_robotics.py +++ b/alf/environments/suite_robotics.py @@ -34,6 +34,7 @@ import alf from alf.environments import suite_gym, alf_wrappers, process_environment +from alf.environments.gym_wrappers import RemoveInfoWrapper from alf.environments.utils import UnwrappedEnvChecker _unwrapped_env_checker_ = UnwrappedEnvChecker() @@ -43,19 +44,35 @@ def is_available(): return mujoco_py is not None +@alf.configurable class SparseReward(gym.Wrapper): """Convert the original :math:`-1/0` rewards to :math:`0/1`. """ - def __init__(self, env): + def __init__(self, + env, + reward_weight: float = 1., + positive_reward: bool = True): + """ + Args: + reward_weight: weight of output reward. + positive_reward: if True, returns 0/1 reward, otherwise, -1/0 reward. + """ gym.Wrapper.__init__(self, env) + self._reward_weight = reward_weight + self._positive_reward = positive_reward def step(self, action): # openai Robotics env will always return ``done=False`` ob, reward, done, info = self.env.step(action) if reward == 0: done = True - return ob, reward + 1, done, info + if self._positive_reward: + return_reward = reward + 1 + else: + return_reward = reward + return_reward *= self._reward_weight + return ob, return_reward, done, info @alf.configurable @@ -84,6 +101,41 @@ def step(self, action): return obs, reward, done, info +@alf.configurable +class TransformGoals(gym.Wrapper): + """Convert the original achieved_goal and desired_goal to first two dims, and produce sparse reward. + + It ignores original reward which is a multi dimensional negative distance to goal. 
+ """ + + def __init__(self, env): + super().__init__(env) + goal_space = gym.spaces.Box( + env.observation_space["achieved_goal"].low[:2], + env.observation_space["achieved_goal"].high[:2]) + self.observation_space = gym.spaces.Dict({ + "achieved_goal": goal_space, + "desired_goal": goal_space, + "observation": env.observation_space["observation"] + }) + + def reset(self): + ob = self.env.reset() + ob["achieved_goal"] = ob["achieved_goal"][:2] + ob["desired_goal"] = ob["desired_goal"][:2] + return ob + + def step(self, action): + # openai Robotics env will always return ``done=False`` + ob, reward, done, info = self.env.step(action) + ob["achieved_goal"] = ob["achieved_goal"][:2] + ob["desired_goal"] = ob["desired_goal"][:2] + return_reward = alf.utils.math_ops.l2_dist_close_reward_fn_np( + ob["achieved_goal"], ob["desired_goal"]) + return_reward = return_reward[0] + return ob, return_reward, done, info + + @alf.configurable class ObservationClipWrapper(gym.ObservationWrapper): """Clip observation values according to OpenAI's baselines. @@ -124,6 +176,8 @@ def load(environment_name, Args: environment_name: Name for the environment to load. env_id: A scalar ``Tensor`` of the environment ID of the time step. + concat_desired_goal (bool): Whether to concat robot's observation and the goal + location. discount: Discount to use for the environment. max_episode_steps: If None the ``max_episode_steps`` will be set to the default step limit defined in the environment's spec. No limit is applied if set @@ -141,14 +195,28 @@ def load(environment_name, Returns: An AlfEnvironment instance. 
""" - assert (environment_name.startswith("Fetch") - or environment_name.startswith("HandManipulate")), ( - "This suite only supports OpenAI's Fetch and ShadowHand envs!") + assert ( + environment_name.startswith("Fetch") + or environment_name.startswith("HandManipulate") + or environment_name.startswith("Ant") + ), ("This suite only supports OpenAI's Fetch, ShadowHand and multiworld Ant envs!" + ) _unwrapped_env_checker_.check_and_update(wrap_with_process) + kwargs = {} + if environment_name.startswith("Ant"): + from multiworld.envs.mujoco import register_custom_envs + register_custom_envs() + gym_spec = gym.spec(environment_name) - env = gym_spec.make() + env = gym_spec.make(**kwargs) + if environment_name.startswith("Ant"): + from gym.wrappers import FilterObservation + env = RemoveInfoWrapper( + FilterObservation( + env, ["desired_goal", "achieved_goal", "observation"])) + env = TransformGoals(env) if max_episode_steps is None: if gym_spec.max_episode_steps is not None: diff --git a/alf/environments/suite_socialbot.py b/alf/environments/suite_socialbot.py index 5cae9d688..e1a7e749e 100644 --- a/alf/environments/suite_socialbot.py +++ b/alf/environments/suite_socialbot.py @@ -23,7 +23,9 @@ from fasteners.process_lock import InterProcessLock import functools import gym +import numpy as np import socket +import torch import alf from alf.environments import suite_gym, alf_wrappers, process_environment @@ -38,11 +40,61 @@ def is_available(): return social_bot is not None +@alf.configurable +def transform_reward(reward, reward_cap=1., positive_reward=True): + goal_reward = reward + if isinstance(reward, (np.ndarray, list)): + goal_reward = reward[0] + if positive_reward: + goal_reward = goal_reward >= 0 + goal_reward = goal_reward * reward_cap + if isinstance(reward, (np.ndarray, list)): + reward[0] = goal_reward + else: + reward = goal_reward + return reward + + +@alf.configurable +def transform_reward_tensor(reward, reward_cap=1., positive_reward=True): + 
goal_reward = reward + if reward.ndim > 2: + goal_reward = reward[:, :, 0] + if positive_reward: + goal_reward = torch.where(goal_reward >= 0, torch.ones(()), + torch.zeros(())) + goal_reward *= reward_cap + if reward.ndim > 2: + reward[:, :, 0] = goal_reward + else: + reward = goal_reward + return reward + + +class SparseReward(gym.Wrapper): + """Convert the original :math:`-1/0` rewards to :math:`0/1`. + """ + + def __init__(self, env): + gym.Wrapper.__init__(self, env) + + def step(self, action): + ob, reward, done, info = self.env.step(action) + goal_reward = reward + if isinstance(reward, (np.ndarray, list)): + goal_reward = reward[0] + if goal_reward == 0: + done = True + reward = transform_reward(reward) + return ob, reward, done, info + + @alf.configurable def load(environment_name, env_id=None, port=None, wrap_with_process=False, + sparse_reward=False, discount=1.0, max_episode_steps=None, gym_env_wrappers=(), @@ -57,6 +109,7 @@ def load(environment_name, env_id (int): (optional) ID of the environment. port (int): Port used for the environment wrap_with_process (bool): Whether wrap environment in a new process + sparse_reward (bool): Whether to use 0/1 instead of -1/0 reward. discount (float): Discount to use for the environment. max_episode_steps (int): If None the max_episode_steps will be set to the default step limit defined in the environment's spec. 
No limit is applied if set @@ -84,6 +137,8 @@ def load(environment_name, def env_ctor(port, env_id=None): gym_env = gym_spec.make(port=port) + if sparse_reward: + gym_env = SparseReward(gym_env) return suite_gym.wrap_env( gym_env, env_id=env_id, diff --git a/alf/examples/ac_target_navigation_states.gin b/alf/examples/ac_target_navigation_states.gin new file mode 100644 index 000000000..fb3b0fcd8 --- /dev/null +++ b/alf/examples/ac_target_navigation_states.gin @@ -0,0 +1,53 @@ +include 'ac_simple_navigation.gin' + +batch_size=30 +create_environment.num_parallel_environments=%batch_size + +create_environment.env_name='SocialBot-PlayGround-v0' + +suite_socialbot.load.gym_env_wrappers=[] +import alf.utils.math_ops + +conv_layer_params=None +actor/ActorDistributionNetwork.continuous_projection_net_ctor=@NormalProjectionNetwork +actor/ActorDistributionNetwork.conv_layer_params=%conv_layer_params +value/ValueNetwork.conv_layer_params=%conv_layer_params + +ac/AdamTF.lr=2e-4 +ac/AdamTF.gradient_clipping=0.5 + +ActorCriticLoss.entropy_regularization=0.0005 + +# Episodic limits +suite_socialbot.load.max_episode_steps=100 +GoalTask.max_steps=1000 +PlayGround.max_steps=1000 +GoalTask.fail_distance_thresh=1000 +GoalTask.end_episode_after_success=True +GoalTask.end_on_hitting_distraction=True + +# Goal & distraction object setup +GoalTask.random_goal=False +GoalTask.goal_name='ball' +GoalTask.distraction_list=['coke_can', 'car_wheel'] +GoalTask.distraction_penalty_distance_thresh=0.4 + +# Curriculum +GoalTask.use_curriculum_training=True +GoalTask.start_range=1 +GoalTask.max_reward_q_length=100 +GoalTask.reward_thresh_to_increase_range=0.9 +GoalTask.increase_range_by_percent=0.1 +GoalTask.percent_full_range_in_curriculum=0.2 + +# Observation +GoalTask.polar_coord=True +PlayGround.use_image_observation=False +PlayGround.with_language=False +TrainerConfig.data_transformer_ctor=None + +TrainerConfig.summary_interval=20 +TrainerConfig.num_checkpoints=10 +TrainerConfig.evaluate=True 
+TrainerConfig.eval_interval=100 +TrainerConfig.num_eval_episodes=50 diff --git a/alf/examples/ddpg_push_states.gin b/alf/examples/ddpg_push_states.gin new file mode 100644 index 000000000..46cb62f0e --- /dev/null +++ b/alf/examples/ddpg_push_states.gin @@ -0,0 +1,4 @@ +include 'ddpg_target_navigation_states.gin' + +import social_bot.tasks +PlayGround.tasks=[@PushReachTask] diff --git a/alf/examples/ddpg_target_navigation_states.gin b/alf/examples/ddpg_target_navigation_states.gin new file mode 100644 index 000000000..24f75b919 --- /dev/null +++ b/alf/examples/ddpg_target_navigation_states.gin @@ -0,0 +1,61 @@ +include 'ac_target_navigation_states.gin' +include 'ddpg.gin' + +# Goal conditioned task setup +GoalTask.success_with_angle_requirement=False +GazeboAgent.goal_conditioned=True +GoalTask.goal_conditioned=True +GoalTask.distraction_penalty_distance_thresh=0.4 +GoalTask.end_episode_after_success=0 +GoalTask.end_on_hitting_distraction=0 +GoalTask.goal_name="target_ball" +GoalTask.max_steps=50 +GoalTask.move_goal_during_episode=0 +GoalTask.multi_dim_reward=True +GoalTask.reset_time_limit_on_success=0 +GoalTask.use_aux_achieved=True +GoalTask.use_curriculum_training=0 +PlayGround.max_steps=50 + +suite_gym.wrap_env.image_channel_first=False + +# Networks +import alf.nest.utils +actor/ActorNetwork.preprocessing_combiner=@NestConcat() +critic/CriticNetwork.observation_preprocessing_combiner=@NestConcat() +critic/CriticNetwork.action_preprocessing_combiner=@NestConcat() +critic/CriticNetwork.output_tensor_spec=@get_reward_spec() + +hidden_layers=(256,256,256) + +value/ValueNetwork.preprocessing_combiner=@NestConcat() +value/ValueNetwork.output_tensor_spec=@get_reward_spec() +value/ValueNetwork.fc_layer_params=%hidden_layers + +actor/ActorNetwork.fc_layer_params=%hidden_layers +critic/CriticNetwork.joint_fc_layer_params=%hidden_layers + +Agent.rl_algorithm_cls=@DdpgAlgorithm +TrainerConfig.algorithm_ctor=@Agent +DdpgAlgorithm.actor_network_ctor=@actor/ActorNetwork 
+DdpgAlgorithm.critic_network_ctor=@critic/CriticNetwork +DdpgAlgorithm.use_parallel_network=False +DdpgAlgorithm.rollout_random_action=0.3 +DdpgAlgorithm.target_update_period=8 + +# GoalGenerator +observation_spec=@get_observation_spec() + +# TrainerConfig +TrainerConfig.initial_collect_steps=10000 +TrainerConfig.mini_batch_length=2 +TrainerConfig.mini_batch_size=5000 +TrainerConfig.num_updates_per_train_iter=40 +TrainerConfig.replay_buffer_length=200000 + +TrainerConfig.summary_interval=20 +TrainerConfig.num_iterations=1500 +TrainerConfig.num_checkpoints=10 +TrainerConfig.evaluate=True +TrainerConfig.eval_interval=500 +TrainerConfig.num_eval_episodes=50 diff --git a/alf/examples/dqn_breakout_conf-lbtq-Qbert.png b/alf/examples/dqn_breakout_conf-lbtq-Qbert.png new file mode 100644 index 000000000..89e9c8bc2 Binary files /dev/null and b/alf/examples/dqn_breakout_conf-lbtq-Qbert.png differ diff --git a/alf/examples/dqn_breakout_conf.py b/alf/examples/dqn_breakout_conf.py new file mode 100644 index 000000000..22dcee671 --- /dev/null +++ b/alf/examples/dqn_breakout_conf.py @@ -0,0 +1,36 @@ +# Copyright (c) 2022 Horizon Robotics. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# NOTE: for lower bound value target improvement, add these flags: +# --conf_param='ReplayBuffer.keep_episodic_info=True' +# --conf_param='ReplayBuffer.record_episodic_return=True' +# --conf_param='LowerBoundedTDLoss.lb_target_q=True' + +import alf +from alf.algorithms.dqn_algorithm import DqnAlgorithm +from alf.utils.schedulers import LinearScheduler + +# Much of the network and critic loss parameters are the same as in sac_breakout. +from alf.examples.sac_breakout_conf import q_network_cls, critic_loss_ctor, \ + critic_optimizer + +alf.config( + 'DqnAlgorithm', + q_network_cls=q_network_cls, + rollout_epsilon_greedy=LinearScheduler( + progress_type="percent", schedule=[(0, 1.), (0.1, 0.1), (1., 0.1)]), + critic_loss_ctor=critic_loss_ctor, + q_optimizer=critic_optimizer) + +alf.config('Agent', rl_algorithm_cls=DqnAlgorithm) diff --git a/alf/examples/her_fetchpush_conf.py b/alf/examples/her_fetchpush_conf.py index e4a15ade0..e59092b2f 100644 --- a/alf/examples/her_fetchpush_conf.py +++ b/alf/examples/her_fetchpush_conf.py @@ -16,7 +16,7 @@ from alf.algorithms.data_transformer import HindsightExperienceTransformer, \ ObservationNormalizer from alf.algorithms.ddpg_algorithm import DdpgAlgorithm -from alf.environments import suite_robotics +from alf.environments import suite_robotics, suite_socialbot from alf.nest.utils import NestConcat from alf.examples import ddpg_fetchpush_conf @@ -29,7 +29,10 @@ action_preprocessing_combiner=NestConcat()) alf.config('ReplayBuffer', keep_episodic_info=True) -alf.config('HindsightExperienceTransformer', her_proportion=0.8) +alf.config( + 'HindsightExperienceTransformer', + her_proportion=0.8, + episodic_reward_transform=suite_socialbot.transform_reward_tensor) alf.config( 'TrainerConfig', data_transformer_ctor=[ diff --git a/alf/examples/her_push_states.gin b/alf/examples/her_push_states.gin new file mode 100644 index 000000000..4a56e3aa2 --- /dev/null +++ b/alf/examples/her_push_states.gin @@ -0,0 +1,2 @@ +include 
'her_target_navigation_states.gin' +include 'ddpg_push_states.gin' diff --git a/alf/examples/her_target_navigation_states.gin b/alf/examples/her_target_navigation_states.gin index 22aea2506..b20d0753f 100644 --- a/alf/examples/her_target_navigation_states.gin +++ b/alf/examples/her_target_navigation_states.gin @@ -90,7 +90,7 @@ TrainerConfig.num_eval_episodes=50 # HER ReplayBuffer.keep_episodic_info=True HindsightExperienceTransformer.her_proportion=0.8 -l2_dist_close_reward_fn.threshold=0.5 +HindsightExperienceTransformer.episodic_reward_transform=@suite_socialbot.transform_reward_tensor TrainerConfig.data_transformer_ctor=@HindsightExperienceTransformer # Finer grain tensorboard summaries plus local action distribution diff --git a/alf/examples/sac_breakout_conf-lbtq-Qbert.png b/alf/examples/sac_breakout_conf-lbtq-Qbert.png new file mode 100644 index 000000000..f839c95cb Binary files /dev/null and b/alf/examples/sac_breakout_conf-lbtq-Qbert.png differ diff --git a/alf/examples/sac_breakout_conf.py b/alf/examples/sac_breakout_conf.py index e6b163393..8c754d56e 100644 --- a/alf/examples/sac_breakout_conf.py +++ b/alf/examples/sac_breakout_conf.py @@ -12,10 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+# NOTE: to use this on a different atari game, add this flag: +# --conf_param='create_environment.env_name="QbertNoFrameskip-v4"' + +# NOTE: for lower bound value target improvement, add these flags: +# --conf_param='ReplayBuffer.keep_episodic_info=True' +# --conf_param='ReplayBuffer.record_episodic_return=True' +# --conf_param='LowerBoundedTDLoss.lb_target_q=True' + import functools import alf -from alf.algorithms.td_loss import TDLoss +from alf.algorithms.td_loss import LowerBoundedTDLoss from alf.environments.alf_wrappers import AtariTerminalOnLifeLossWrapper from alf.networks import QNetwork from alf.optimizers import AdamTF @@ -42,7 +50,7 @@ def define_config(name, default_value): fc_layer_params=FC_LAYER_PARAMS, conv_layer_params=CONV_LAYER_PARAMS) -critic_loss_ctor = functools.partial(TDLoss, td_lambda=0.95) +critic_loss_ctor = functools.partial(LowerBoundedTDLoss, td_lambda=0.95) lr = define_config('lr', 5e-4) critic_optimizer = AdamTF(lr=lr) @@ -82,7 +90,8 @@ def define_config(name, default_value): num_env_steps=12000000, evaluate=True, num_eval_episodes=100, - num_evals=10, + num_evals=50, + num_eval_environments=20, num_checkpoints=5, num_summaries=100, debug_summaries=True, diff --git a/alf/experience_replayers/replay_buffer.py b/alf/experience_replayers/replay_buffer.py index 6e95fe334..af2da86f3 100644 --- a/alf/experience_replayers/replay_buffer.py +++ b/alf/experience_replayers/replay_buffer.py @@ -29,12 +29,20 @@ from .segment_tree import SumSegmentTree, MaxSegmentTree +# her (Tensor): of shape (batch_size, batch_length) indicating which transitions are relabeled +# with hindsight. +# future_distance (Tensor): of shape (batch_size, batch_length), is the distance from +# the transition's end state to the sampled future state in terms of number of +# environment steps. future_distance[:, 0] == future_distance[:, n], so only the first step +# is accurate. 
BatchInfo = namedtuple( "BatchInfo", [ "env_ids", "positions", "importance_weights", "replay_buffer", + "her", + "future_distance", "discounted_return", ], default_value=()) @@ -71,6 +79,7 @@ def __init__(self, gamma=.99, reward_clip=None, enable_checkpoint=False, + convert_only_minibatch_to_device=False, name="ReplayBuffer"): """ Args: @@ -118,6 +127,9 @@ def __init__(self, Usually consistent with ``TDLoss.gamma``. reward_clip (tuple|None): None or (min, max) for reward clipping. enable_checkpoint (bool): whether checkpointing this replay buffer. + convert_only_minibatch_to_device (bool): when True, only convert a minibatch + of experience to GPU (if GPU is used), to save GPU memory. Fractional unroll + is also able to save GPU memory, similarly. name (string): name of the replay buffer object. """ super().__init__( @@ -151,6 +163,7 @@ def __init__(self, self._recent_data_steps = recent_data_steps self._recent_data_ratio = recent_data_ratio self._with_replacement = with_replacement + self._convert_only_minibatch_to_device = convert_only_minibatch_to_device if self._keep_episodic_info: # _indexed_pos records for each timestep of experience in the # buffer the raw position of the first step of the episode in @@ -426,7 +439,8 @@ def get_batch(self, batch_size, batch_length): # Taking first timestep's return, to lowerbound training value. 
disc_ret = self._episodic_discounted_return[(env_ids, idx[:, 0])] info = info._replace(discounted_return=disc_ret) - if alf.get_default_device() != self._device: + if (not self._convert_only_minibatch_to_device + and alf.get_default_device() != self._device): result, info = convert_device((result, info)) info = info._replace(replay_buffer=self) return result, info @@ -836,7 +850,10 @@ def get_field(self, field_name, env_ids, positions): lambda name: alf.nest.get_field(self._buffer, name), field_name) indices = (env_ids, self.circular(positions)) result = alf.nest.map_structure(lambda x: x[indices], field) - return convert_device(result) + if self._convert_only_minibatch_to_device: + return result + else: + return convert_device(result) @property def total_size(self): diff --git a/alf/utils/data_buffer_test.py b/alf/utils/data_buffer_test.py index 28acc99ba..50f06ee61 100644 --- a/alf/utils/data_buffer_test.py +++ b/alf/utils/data_buffer_test.py @@ -29,7 +29,7 @@ DataItem = alf.data_structures.namedtuple( "DataItem", [ "env_id", "x", "o", "reward", "step_type", "batch_info", - "replay_buffer", "rollout_info_field" + "replay_buffer", "rollout_info_field", "discount" ], default_value=()) @@ -40,12 +40,20 @@ def get_batch(env_ids, dim, t, x): batch_size = len(env_ids) x = torch.as_tensor(x, dtype=torch.float32, device="cpu") t = torch.as_tensor(t, dtype=torch.int32, device="cpu") - ox = (x * torch.arange( - batch_size, dtype=torch.float32, requires_grad=True, - device="cpu").unsqueeze(1) * torch.arange( - dim, dtype=torch.float32, requires_grad=True, - device="cpu").unsqueeze(0)) - a = x * torch.ones(batch_size, dtype=torch.float32, device="cpu") + # ox = (x * torch.arange( + # batch_size, dtype=torch.float32, requires_grad=True, + # device="cpu").unsqueeze(1) * torch.arange( + # dim, dtype=torch.float32, requires_grad=True, + # device="cpu").unsqueeze(0)) + if batch_size > 1 and x.ndim > 0 and batch_size == x.shape[0]: + a = x + else: + a = x * torch.ones(batch_size, 
@alf.configurable
def l2_dist_close_reward_fn(achieved_goal, goal, threshold=.05):
    """Giving -1/0 reward based on how close the achieved state is to the goal state.

    Args:
        achieved_goal (Tensor): achieved state, of shape
            ``[batch_size, batch_length, ...]``
        goal (Tensor): goal state, of shape ``[batch_size, batch_length, ...]``
        threshold (float): L2 distance threshold for the reward.

    Returns:
        Tensor for -1/0 reward of shape ``[batch_size, batch_length]``.
    """

    if goal.dim() == 2:  # when goals are 1-dimensional
        assert achieved_goal.dim() == goal.dim()
        achieved_goal = achieved_goal.unsqueeze(2)
        goal = goal.unsqueeze(2)
    # 0 when the L2 distance is below ``threshold``, -1 otherwise.
    return -(torch.norm(achieved_goal - goal, dim=2) >= threshold).to(
        torch.float32)


@alf.configurable
def l2_dist_close_reward_fn_np(achieved_goal, goal, threshold=.05):
    """Numpy counterpart of :func:`l2_dist_close_reward_fn`.

    Only used in non-batched cases.

    Args:
        achieved_goal (np.ndarray): achieved state.
        goal (np.ndarray): goal state, same shape as ``achieved_goal``.
        threshold (float): L2 distance threshold for the reward.

    Returns:
        np.ndarray of shape ``[1]``: ``[0.]`` if within ``threshold``,
        otherwise ``[-1.]``.
    """
    return l2_dist_close_np(achieved_goal, goal, threshold)


def l2_dist_close_np(achieved_goal, goal, threshold):
    """Return ``[0.]`` if ``achieved_goal`` is within ``threshold`` L2
    distance of ``goal``, else ``[-1.]`` (numpy, non-batched)."""
    return np.where(
        np.linalg.norm(achieved_goal - goal) < threshold,
        np.zeros(1, dtype=np.float32), -np.ones(1, dtype=np.float32))
def generalized_advantage_estimation_retrace(rewards,
                                             values,
                                             step_types,
                                             discounts,
                                             target_value,
                                             importance_ratio,
                                             use_retrace=False,
                                             td_lambda=1.0,
                                             time_major=True):
    """Computes generalized advantage estimation (GAE) with Retrace for the first T-1 steps.

    For Retrace, see
    "Safe and Efficient Off-Policy Reinforcement Learning"
    by Remi Munos, Tom Stepleton, Anna Harutyunyan, and Marc G. Bellemare, NeurIPS 2016.
    See https://proceedings.neurips.cc/paper/2016/hash/c3992e9a68c5ae12bd18488bc579b30d-Abstract.html for full paper.

    Args:
        rewards (Tensor): shape is [T, B] (or [T]) representing rewards.
        values (Tensor): shape is [T,B] (or [T]) representing values.
        step_types (Tensor): shape is [T,B] (or [T]) representing step types.
        discounts (Tensor): shape is [T, B] (or [T]) representing discounts.
        target_value (Tensor): shape is [T, B] (or [T]) representing values
            used for bootstrapping the TD error when ``use_retrace`` is True
            (presumably from a target network -- confirm against callers).
        importance_ratio (Tensor): shape is [T, B] (or [T]) representing action importance ratios.
        use_retrace (bool): When True, uses Retrace to compute advantage.
        td_lambda (float): A scalar between [0, 1]. It's used for variance
            reduction in temporal difference.
        time_major (bool): Whether input tensors are time major.
            False means input tensors have shape [B, T].

    Returns:
        A tensor with shape [T-1, B] representing advantages. Shape is [B, T-1]
        when time_major is false.
    """

    if not time_major:
        discounts = discounts.transpose(0, 1)
        rewards = rewards.transpose(0, 1)
        values = values.transpose(0, 1)
        step_types = step_types.transpose(0, 1)
        if use_retrace:
            importance_ratio = importance_ratio.transpose(0, 1)
            target_value = target_value.transpose(0, 1)

    assert values.shape[0] >= 2, ("The sequence length needs to be "
                                  "at least 2. Got {s}".format(
                                      s=values.shape[0]))

    is_lasts = (step_types == StepType.LAST).to(dtype=torch.float32)
    is_lasts = common.expand_dims_as(is_lasts, values)
    discounts = common.expand_dims_as(discounts, values)

    if use_retrace:
        # Retrace: bootstrap the TD error from ``target_value`` and scale the
        # recursive accumulation by the importance ratio of the earlier step.
        delta = rewards[1:] + discounts[1:] * target_value[1:] - values[:-1]
        weighted_discounts = discounts[1:] * td_lambda * importance_ratio[:-1]
    else:
        # Plain GAE: one-step TD error with lambda-weighted discounting.
        delta = rewards[1:] + discounts[1:] * values[1:] - values[:-1]
        weighted_discounts = discounts[1:] * td_lambda

    # The backward recursion is identical for both variants; only ``delta``
    # and ``weighted_discounts`` differ. The accumulated TD is reset to 0 at
    # LAST steps.
    advs = torch.zeros_like(values)
    with torch.no_grad():
        for t in reversed(range(rewards.shape[0] - 1)):
            advs[t] = (1 - is_lasts[t]) * \
                (delta[t] + weighted_discounts[t] * advs[t + 1])
    advs = advs[:-1]

    if not time_major:
        advs = advs.transpose(0, 1)

    return advs.detach()
def first_step_future_discounted_returns(rewards,
                                         values,
                                         step_types,
                                         discounts,
                                         time_major=True):
    """Computes future 1 to n step discounted returns for the first step.

    Element ``k`` (0 <= k < T-1) of the result is the (k+1)-step discounted
    return of the FIRST step, bootstrapped with ``values[k + 1]``; once a
    LAST step is crossed the return stays frozen for the remaining entries.

    Define abbreviations:

    - B: batch size representing number of trajectories
    - T: number of steps per trajectory

    Args:
        rewards (Tensor): shape is [T, B] (or [T]) representing rewards.
        values (Tensor): shape is [T,B] (or [T]) representing values.
        step_types (Tensor): shape is [T,B] (or [T]) representing step types.
        discounts (Tensor): shape is [T, B] (or [T]) representing discounts.
        time_major (bool): Whether input tensors are time major.
            False means input tensors have shape [B, T].

    Returns:
        A tensor with shape [T-1, B] (or [T-1]) representing the discounted
        returns. Shape is [B, T-1] when time_major is false.
    """
    if not time_major:
        discounts = discounts.transpose(0, 1)
        rewards = rewards.transpose(0, 1)
        values = values.transpose(0, 1)
        step_types = step_types.transpose(0, 1)

    assert values.shape[0] >= 2, ("The sequence length needs to be "
                                  "at least 2. Got {s}".format(
                                      s=values.shape[0]))

    is_lasts = (step_types == StepType.LAST).to(dtype=torch.float32)
    is_lasts = common.expand_dims_as(is_lasts, values)
    discounts = common.expand_dims_as(discounts, values)

    # ``accw[t]`` is the accumulated discount weight applied to the bootstrap
    # ``values[t + 1]``: the product of discounts[1..t+1], forced to 0 once a
    # LAST step has been crossed. ``rets[t]`` is the (t+1)-step return of the
    # first step.
    accw = torch.ones_like(values)
    accw[0] = (1 - is_lasts[0]) * discounts[1]
    rets = torch.zeros_like(values)
    rets[0] = rewards[1] * (1 - is_lasts[0]) + accw[0] * values[1]
    # Invariants of the recursion below:
    # - when step i is LAST, v[i+1] is not used in computing ret[i];
    # - when disc[i] == 0, v[i] is not used in computing ret[i-1].
    # Worked example kept from the author (indices presumably 0-based with
    # step 1 being the LAST step -- verify against the unit test):
    #   ret[0] = r[1] + disc[1] * v[1]
    #   ret[1] = r[1] + disc[1] * (r[2] + disc[2] * v[2])
    #   ret[2] = r[1] + disc[1] * (r[2] + disc[2] * v[2])  # frozen past LAST
    # Each iteration extends the horizon by one step: add the newly revealed
    # reward (weighted by the old accw), swap the old bootstrap value out and
    # the new one in.
    with torch.no_grad():
        for t in range(rewards.shape[0] - 2):
            accw[t + 1] = accw[t] * (1 - is_lasts[t + 1]) * discounts[t + 2]
            rets[t + 1] = (
                rets[t] + rewards[t + 2] * (1 - is_lasts[t + 1]) * accw[t] +
                values[t + 2] * accw[t + 1] -
                accw[t] * values[t + 1] * (1 - is_lasts[t + 1]))
    rets = rets[:-1]

    if not time_major:
        rets = rets.transpose(0, 1)

    return rets.detach()
def one_step_discounted_return(rewards, values, step_types, discounts):
    """Calculate the one step discounted return for the first T-1 steps.

    For step t (0 <= t < T-1) the return is
    ``rewards[t+1] + discounts[t+1] * values[t+1]`` when step t is not LAST;
    for a LAST step the return falls back to ``discounts[t] * values[t]``.

    Note: Input tensors must be time major
    Args:
        rewards (Tensor): shape is [T, B] (or [T]) representing rewards.
        values (Tensor): shape is [T, B] (or [T]) when representing values,
            [T, B, n_quantiles] or [T, n_quantiles] when representing quantiles
            of value distributions.
        step_types (Tensor): shape is [T, B] (or [T]) representing step types.
        discounts (Tensor): shape is [T, B] (or [T]) representing discounts.
    Returns:
        A tensor with shape [T-1, B] (or [T-1]) representing the discounted
        returns.
    """
    num_steps = values.shape[0]
    assert num_steps >= 2, ("The sequence length needs to be "
                            "at least 2. Got {s}".format(s=num_steps))

    # Broadcast the per-step tensors up to the rank of ``values`` so the
    # quantile case ([T, B, n_quantiles]) works transparently.
    last_mask = common.expand_dims_as(
        (step_types == StepType.LAST).to(dtype=torch.float32), values)
    discounts = common.expand_dims_as(discounts, values)
    rewards = common.expand_dims_as(rewards, values)

    bootstrap = discounts * values
    not_last = 1 - last_mask[:-1]
    rets = not_last * (rewards[1:] + bootstrap[1:]) \
        + last_mask[:-1] * bootstrap[:-1]
    return rets.detach()
rewards=torch.stack([rewards, 2 * rewards], dim=2), values=torch.stack([values, 2 * values], dim=2), step_types=step_types, discounts=discounts, - time_major=False), torch.stack([expected, 2 * expected], - dim=2)) + time_major=False) + else: + res = value_ops.discounted_return( + rewards=torch.stack([rewards, 2 * rewards], dim=2), + values=torch.stack([values, 2 * values], dim=2), + step_types=step_types, + discounts=discounts, + time_major=False) + np.testing.assert_array_almost_equal( + res, torch.stack([expected, 2 * expected], dim=2)) def test_discounted_return(self): values = torch.tensor([[1.] * 5], dtype=torch.float32) @@ -74,7 +97,7 @@ def test_discounted_return(self): discounts=discounts, expected=expected) - # tow episodes, and end normal (discount=0) + # two episodes, and end normal (discount=0) step_types = torch.tensor([[ StepType.MID, StepType.MID, StepType.LAST, StepType.MID, StepType.MID @@ -91,6 +114,100 @@ def test_discounted_return(self): discounts=discounts, expected=expected) + def test_first_step_future_discounted_returns(self): + values = torch.tensor([[1.] * 5], dtype=torch.float32) + step_types = torch.tensor([[StepType.MID] * 5], dtype=torch.int64) + rewards = torch.tensor([[2.] 
    def test_first_step_future_discounted_returns(self):
        """Cover ``first_step_future_discounted_returns`` on a single 5-step
        trajectory under several episode-boundary patterns."""
        # Single episode, all MID steps: entry k is the (k+1)-step
        # bootstrapped return of the first step.
        values = torch.tensor([[1.] * 5], dtype=torch.float32)
        step_types = torch.tensor([[StepType.MID] * 5], dtype=torch.int64)
        rewards = torch.tensor([[2.] * 5], dtype=torch.float32)
        discounts = torch.tensor([[0.9] * 5], dtype=torch.float32)
        expected = torch.tensor([[
            2 + 0.9, 2 + 0.9 * (2 + 0.9), 2 + 0.9 * (2 + 0.9 * (2 + 0.9)),
            2 + 0.9 * (2 + 0.9 * (2 + 0.9 * (2 + 0.9)))
        ]],
                                dtype=torch.float32)
        self._check(
            rewards=rewards,
            values=values,
            step_types=step_types,
            discounts=discounts,
            expected=expected,
            future=True)

        # two episodes, and exceed by time limit (discount=1): the return is
        # frozen once the LAST step (index 2) is crossed.
        step_types = torch.tensor([[
            StepType.MID, StepType.MID, StepType.LAST, StepType.MID,
            StepType.MID
        ]],
                                  dtype=torch.int32)
        expected = torch.tensor([[
            2 + 0.9, 2 + 0.9 * (2 + 0.9), 2 + 0.9 * (2 + 0.9),
            2 + 0.9 * (2 + 0.9)
        ]],
                                dtype=torch.float32)
        self._check(
            rewards=rewards,
            values=values,
            step_types=step_types,
            discounts=discounts,
            expected=expected,
            future=True)

        # two episodes, and end normal (discount=0): no bootstrap value past
        # the terminal step.
        step_types = torch.tensor([[
            StepType.MID, StepType.MID, StepType.LAST, StepType.MID,
            StepType.MID
        ]],
                                  dtype=torch.int32)
        discounts = torch.tensor([[0.9, 0.9, 0.0, 0.9, 0.9]])
        expected = torch.tensor(
            [[2 + 0.9, 2 + 0.9 * 2, 2 + 0.9 * 2, 2 + 0.9 * 2]],
            dtype=torch.float32)

        self._check(
            rewards=rewards,
            values=values,
            step_types=step_types,
            discounts=discounts,
            expected=expected,
            future=True)

        # two episodes with discount 0 LAST: only the first reward is seen
        # before the episode ends at step 1.
        values = torch.tensor([[1.] * 5], dtype=torch.float32)
        step_types = torch.tensor([[
            StepType.MID, StepType.LAST, StepType.LAST, StepType.MID,
            StepType.MID
        ]],
                                  dtype=torch.int32)
        rewards = torch.tensor([[2.] * 5], dtype=torch.float32)
        discounts = torch.tensor([[0.9, 0.0, 0.0, 0.9, 0.9]])
        expected = torch.tensor([[2, 2, 2, 2]], dtype=torch.float32)

        self._check(
            rewards=rewards,
            values=values,
            step_types=step_types,
            discounts=discounts,
            expected=expected,
            future=True)

        # two episodes with discount 0 LAST: the very first step is LAST, so
        # no future reward or value contributes at all.
        values = torch.tensor([[1.] * 5], dtype=torch.float32)
        step_types = torch.tensor([[
            StepType.LAST, StepType.LAST, StepType.LAST, StepType.MID,
            StepType.MID
        ]],
                                  dtype=torch.int32)
        rewards = torch.tensor([[2.] * 5], dtype=torch.float32)
        discounts = torch.tensor([[0.0, 0.0, 0.0, 0.9, 0.9]])
        expected = torch.tensor([[0, 0, 0, 0]], dtype=torch.float32)

        self._check(
            rewards=rewards,
            values=values,
            step_types=step_types,
            discounts=discounts,
            expected=expected,
            future=True)