Source code for meta_policy_search.samplers.dice_sample_processor

from meta_policy_search.utils import utils, logger
from meta_policy_search.samplers.base import SampleProcessor
import numpy as np


class DiceSampleProcessor(SampleProcessor):
    """
    Sample processor for DICE implementations
        - fits a reward baseline (use a zero baseline to skip this step)
        - computes adjusted rewards (reward - baseline)
        - normalizes adjusted rewards if desired
        - zero-pads paths to max_path_length
        - stacks the padded path data

    Args:
        baseline (Baseline): a time dependent reward baseline object
        max_path_length (int): maximum path length
        discount (float): reward discount factor
        gae_lambda (float): GAE lambda (only used for the GAE advantage estimates when return_baseline is provided)
        normalize_adv (bool): indicates whether to normalize the estimated advantages (zero mean and unit std)
        positive_adv (bool): indicates whether to shift the (normalized) advantages so that they are all positive
        return_baseline (Baseline): (optional) a state(-time) dependent baseline - if provided it is also fitted
                                    and used to calculate GAE advantage estimates
    """

    def __init__(
            self,
            baseline,
            max_path_length,
            discount=0.99,
            gae_lambda=1,
            normalize_adv=True,
            positive_adv=False,
            return_baseline=None,
    ):

        assert 0 <= discount <= 1.0, 'discount factor must be in [0,1]'
        assert max_path_length > 0
        assert hasattr(baseline, 'fit') and hasattr(baseline, 'predict')

        self.max_path_length = max_path_length
        self.baseline = baseline
        self.discount = discount
        self.gae_lambda = gae_lambda
        self.normalize_adv = normalize_adv
        self.positive_adv = positive_adv
        self.return_baseline = return_baseline
    def process_samples(self, paths, log=False, log_prefix=''):
        """
        Processes sampled paths. This involves:
            - computing discounted rewards
            - fitting a reward baseline
            - computing adjusted rewards (reward - baseline)
            - normalizing adjusted rewards if desired
            - stacking the padded path data
            - creating a mask which indicates padded values by zero and original values by one
            - logging statistics of the paths

        Args:
            paths (list): A list of paths of size (batch_size) x [5] x (max_path_length)
            log (boolean): indicates whether to log
            log_prefix (str): prefix for the logging keys

        Returns:
            (dict): Processed sample data. A dict containing the following items with respective shapes:
                - mask: (batch_size, max_path_length)
                - observations: (batch_size, max_path_length, ndim_obs)
                - actions: (batch_size, max_path_length, ndim_act)
                - rewards: (batch_size, max_path_length)
                - adjusted_rewards: (batch_size, max_path_length)
                - env_infos: dict of ndarrays of shape (batch_size, max_path_length, ?)
                - agent_infos: dict of ndarrays of shape (batch_size, max_path_length, ?)
        """
        assert type(paths) == list, 'paths must be a list'
        assert paths[0].keys() >= {'observations', 'actions', 'rewards'}
        assert self.baseline, 'baseline must be specified - use self.build_sample_processor(baseline_obj)'

        # fit baseline, compute adjusted rewards and stack path data
        samples_data, paths = self._compute_samples_data(paths)

        # 7) log statistics if desired
        self._log_path_stats(paths, log=log, log_prefix=log_prefix)

        assert samples_data.keys() >= {'observations', 'actions', 'rewards', 'adjusted_rewards', 'mask'}
        return samples_data
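    # Worked example of what the helpers below produce: with discount=0.9,
    # max_path_length=5 and a path with rewards [1., 1., 1.],
    # _compute_discounted_rewards yields [1., 0.9, 0.81] (element-wise gamma^t * r_t,
    # not a cumulative return), while _pad_and_stack_paths produces the mask row
    # [1., 1., 1., 0., 0.] and zero-pads the raw reward row to [1., 1., 1., 0., 0.].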
""" helper functions """ def _compute_samples_data(self, paths): assert type(paths) == list # 1) compute discounted rewards and return paths = self._compute_discounted_rewards(paths) # 2) fit baseline estimator using the path returns and predict the return baselines self.baseline.fit(paths, target_key='discounted_rewards') all_path_baselines = [self.baseline.predict(path) for path in paths] # 3) compute adjusted rewards (r - b) paths = self._compute_adjusted_rewards(paths, all_path_baselines) # 4) stack path data mask, observations, actions, rewards, adjusted_rewards, env_infos, agent_infos = self._pad_and_stack_paths(paths) # 5) if desired normalize / shift adjusted_rewards if self.normalize_adv: adjusted_rewards = utils.normalize_advantages(adjusted_rewards) if self.positive_adv: adjusted_rewards = utils.shift_advantages_to_positive(adjusted_rewards) # 6) create samples_data object samples_data = dict( mask=mask, observations=observations, actions=actions, rewards=rewards, env_infos=env_infos, agent_infos=agent_infos, adjusted_rewards=adjusted_rewards, ) # if return baseline is provided also compute GAE advantage estimates if self.return_baseline is not None: paths, advantages = self._fit_reward_baseline_compute_advantages(paths) samples_data['advantages'] = advantages return samples_data, paths def _log_path_stats(self, paths, log=False, log_prefix=''): # compute log stats average_discounted_return = [sum(path["discounted_rewards"]) for path in paths] undiscounted_returns = [sum(path["rewards"]) for path in paths] if log == 'reward': logger.logkv(log_prefix + 'AverageReturn', np.mean(undiscounted_returns)) elif log == 'all' or log is True: logger.logkv(log_prefix + 'AverageDiscountedReturn', np.mean(average_discounted_return)) logger.logkv(log_prefix + 'AverageReturn', np.mean(undiscounted_returns)) logger.logkv(log_prefix + 'NumTrajs', len(paths)) logger.logkv(log_prefix + 'StdReturn', np.std(undiscounted_returns)) logger.logkv(log_prefix + 'MaxReturn', np.max(undiscounted_returns)) logger.logkv(log_prefix + 'MinReturn', np.min(undiscounted_returns)) def _compute_discounted_rewards(self, paths): discount_array = np.cumprod(np.concatenate([np.ones(1), np.ones(self.max_path_length - 1) * self.discount])) for path in paths: path_length = path['rewards'].shape[0] path["discounted_rewards"] = path['rewards'] * discount_array[:path_length] return paths def _compute_adjusted_rewards(self, paths, all_path_baselines): assert len(paths) == len(all_path_baselines) for idx, path in enumerate(paths): path_baselines = all_path_baselines[idx] deltas = path["discounted_rewards"] - path_baselines path["adjusted_rewards"] = deltas return paths def _pad_and_stack_paths(self, paths): mask, observations, actions, rewards, adjusted_rewards, env_infos, agent_infos = [], [], [], [], [], [], [] for path in paths: # zero-pad paths if they don't have full length + create mask path_length = path["observations"].shape[0] assert self.max_path_length >= path_length mask.append(self._pad(np.ones(path_length), path_length)) observations.append(self._pad(path["observations"], path_length)) actions.append(self._pad(path["actions"], path_length)) rewards.append(self._pad(path["rewards"], path_length)) adjusted_rewards.append(self._pad(path["adjusted_rewards"], path_length)) env_infos.append(dict([(key, self._pad(array, path_length)) for key, array in path["env_infos"].items()])) agent_infos.append((dict([(key, self._pad(array, path_length)) for key, array in path["agent_infos"].items()]))) # stack mask = np.stack(mask, 
axis=0) # shape: (batch_size, max_path_length) observations = np.stack(observations, axis=0) # shape: (batch_size, max_path_length, ndim_act) actions = np.stack(actions, axis=0) # shape: (batch_size, max_path_length, ndim_obs) rewards = np.stack(rewards, axis=0) # shape: (batch_size, max_path_length) adjusted_rewards = np.stack(adjusted_rewards, axis=0) # shape: (batch_size, max_path_length) env_infos = utils.stack_tensor_dict_list(env_infos) # dict of ndarrays of shape: (batch_size, max_path_length, ?) agent_infos = utils.stack_tensor_dict_list(agent_infos) # dict of ndarrays of shape: (batch_size, max_path_length, ?) return mask, observations, actions, rewards, adjusted_rewards, env_infos, agent_infos def _pad(self, array, path_length): assert path_length == array.shape[0] if array.ndim == 2: return np.pad(array, ((0, self.max_path_length - path_length), (0, 0)), mode='constant') elif array.ndim == 1: return np.pad(array, (0, self.max_path_length - path_length), mode='constant') else: raise NotImplementedError def _fit_reward_baseline_compute_advantages(self, paths): """ only to be called if return_baseline is provided. Computes GAE advantage estimates """ assert self.return_baseline is not None # a) compute returns for idx, path in enumerate(paths): path["returns"] = utils.discount_cumsum(path["rewards"], self.discount) # b) fit return baseline estimator using the path returns and predict the return baselines self.return_baseline.fit(paths, target_key='returns') all_path_baselines = [self.return_baseline.predict(path) for path in paths] # c) generalized advantage estimation for idx, path in enumerate(paths): path_baselines = np.append(all_path_baselines[idx], 0) deltas = path["rewards"] + \ self.discount * path_baselines[1:] - \ path_baselines[:-1] path["advantages"] = utils.discount_cumsum( deltas, self.discount * self.gae_lambda) # d) pad paths and stack them advantages = [] for path in paths: path_length = path["observations"].shape[0] advantages.append(self._pad(path["advantages"], path_length)) advantages = np.stack(advantages, axis=0) # e) desired normalize / shift advantages if self.normalize_adv: advantages = utils.normalize_advantages(advantages) if self.positive_adv: advantages = utils.shift_advantages_to_positive(advantages) return paths, advantages
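

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): drives the
# processor with two synthetic rollouts of different lengths. `_ZeroBaseline`
# is a hypothetical stand-in for any baseline object exposing `fit`/`predict`;
# in real code one of the package's baseline classes would be used instead.
# ---------------------------------------------------------------------------
if __name__ == '__main__':

    class _ZeroBaseline:
        """Minimal baseline stub: predicts a zero baseline for every time step."""

        def fit(self, paths, target_key='discounted_rewards'):
            pass  # nothing to fit

        def predict(self, path):
            return np.zeros(path['rewards'].shape[0])

    # two synthetic paths of different lengths (ndim_obs=3, ndim_act=2);
    # env_infos / agent_infos are left empty here for brevity
    paths = [
        dict(
            observations=np.random.randn(path_length, 3),
            actions=np.random.randn(path_length, 2),
            rewards=np.random.randn(path_length),
            env_infos={},
            agent_infos={},
        )
        for path_length in (5, 8)
    ]

    processor = DiceSampleProcessor(baseline=_ZeroBaseline(), max_path_length=10, discount=0.99)
    samples_data = processor.process_samples(paths)

    # every array is zero-padded to max_path_length and stacked along the batch axis
    print(samples_data['mask'].shape)              # (2, 10)
    print(samples_data['observations'].shape)      # (2, 10, 3)
    print(samples_data['adjusted_rewards'].shape)  # (2, 10)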