Source code for meta_policy_search.samplers.base

from meta_policy_search.utils import utils, logger
import numpy as np


class Sampler(object):
    """
    Sampler interface

    Args:
        env (gym.Env) : environment object
        policy (meta_policy_search.policies.policy) : policy object
        batch_size (int) : number of trajectories per task
        max_path_length (int) : max number of steps per trajectory
    """

    def __init__(self, env, policy, batch_size, max_path_length):
        assert hasattr(env, 'reset') and hasattr(env, 'step')

        self.env = env
        self.policy = policy
        self.batch_size = batch_size
        self.max_path_length = max_path_length

    def obtain_samples(self):
        """
        Collect batch_size trajectories

        Returns:
            (list) : A list of paths.
        """
        raise NotImplementedError
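
As a point of reference, the sketch below shows how a concrete Sampler subclass might implement obtain_samples. It is not part of this module: the SimpleSampler name, the policy.get_action(obs) call returning an (action, agent_info) pair, and the four-value env.step return are all assumptions about the surrounding interfaces. It also only records the observations, actions and rewards keys that process_samples asserts on; a full implementation would additionally store env_infos and agent_infos in the format expected by utils.concat_tensor_dict_list.

class SimpleSampler(Sampler):
    """Hypothetical example subclass -- a minimal rollout collector, not part of this module."""

    def obtain_samples(self):
        paths = []
        for _ in range(self.batch_size):
            observations, actions, rewards = [], [], []
            obs = self.env.reset()
            for _ in range(self.max_path_length):
                # assumed policy interface: returns an action and an agent info dict
                action, agent_info = self.policy.get_action(obs)
                next_obs, reward, done, env_info = self.env.step(action)
                observations.append(obs)
                actions.append(action)
                rewards.append(reward)
                obs = next_obs
                if done:
                    break
            paths.append(dict(
                observations=np.asarray(observations),
                actions=np.asarray(actions),
                rewards=np.asarray(rewards),
            ))
        return paths
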
class SampleProcessor(object):
    """
    Sample processor interface
        - fits a reward baseline (use zero baseline to skip this step)
        - performs Generalized Advantage Estimation to provide advantages
          (see Schulman et al. 2015 - https://arxiv.org/abs/1506.02438)

    Args:
        baseline (Baseline) : a reward baseline object
        discount (float) : reward discount factor
        gae_lambda (float) : Generalized Advantage Estimation lambda
        normalize_adv (bool) : indicates whether to normalize the estimated advantages (zero mean and unit std)
        positive_adv (bool) : indicates whether to shift the (normalized) advantages so that they are all positive
    """

    def __init__(
            self,
            baseline,
            discount=0.99,
            gae_lambda=1,
            normalize_adv=False,
            positive_adv=False,
    ):
        assert 0 <= discount <= 1.0, 'discount factor must be in [0,1]'
        assert 0 <= gae_lambda <= 1.0, 'gae_lambda must be in [0,1]'
        assert hasattr(baseline, 'fit') and hasattr(baseline, 'predict')

        self.baseline = baseline
        self.discount = discount
        self.gae_lambda = gae_lambda
        self.normalize_adv = normalize_adv
        self.positive_adv = positive_adv
    def process_samples(self, paths, log=False, log_prefix=''):
        """
        Processes sampled paths. This involves:

            - computing discounted rewards (returns)
            - fitting a baseline estimator using the path returns and predicting the return baselines
            - estimating the advantages using GAE (+ advantage normalization if desired)
            - stacking the path data
            - logging statistics of the paths

        Args:
            paths (list): A list of paths of size (batch_size) x [5] x (max_path_length)
            log (boolean): indicates whether to log
            log_prefix (str): prefix for the logging keys

        Returns:
            (dict) : Processed sample data of size [7] x (batch_size x max_path_length)
        """
        assert isinstance(paths, list), 'paths must be a list'
        assert paths[0].keys() >= {'observations', 'actions', 'rewards'}
        assert self.baseline, 'baseline must be specified - use self.build_sample_processor(baseline_obj)'

        # fit baseline, compute advantages and stack the path data
        samples_data, paths = self._compute_samples_data(paths)

        # log statistics if desired
        self._log_path_stats(paths, log=log, log_prefix=log_prefix)

        assert samples_data.keys() >= {'observations', 'actions', 'rewards', 'advantages', 'returns'}
        return samples_data
""" helper functions """ def _compute_samples_data(self, paths): assert type(paths) == list # 1) compute discounted rewards (returns) for idx, path in enumerate(paths): path["returns"] = utils.discount_cumsum(path["rewards"], self.discount) # 2) fit baseline estimator using the path returns and predict the return baselines self.baseline.fit(paths, target_key="returns") all_path_baselines = [self.baseline.predict(path) for path in paths] # 3) compute advantages and adjusted rewards paths = self._compute_advantages(paths, all_path_baselines) # 4) stack path data observations, actions, rewards, returns, advantages, env_infos, agent_infos = self._stack_path_data(paths) # 5) if desired normalize / shift advantages if self.normalize_adv: advantages = utils.normalize_advantages(advantages) if self.positive_adv: advantages = utils.shift_advantages_to_positive(advantages) # 6) create samples_data object samples_data = dict( observations=observations, actions=actions, rewards=rewards, returns=returns, advantages=advantages, env_infos=env_infos, agent_infos=agent_infos, ) return samples_data, paths def _log_path_stats(self, paths, log=False, log_prefix=''): # compute log stats average_discounted_return = np.mean([path["returns"][0] for path in paths]) undiscounted_returns = [sum(path["rewards"]) for path in paths] if log == 'reward': logger.logkv(log_prefix + 'AverageReturn', np.mean(undiscounted_returns)) elif log == 'all' or log is True: logger.logkv(log_prefix + 'AverageDiscountedReturn', average_discounted_return) logger.logkv(log_prefix + 'AverageReturn', np.mean(undiscounted_returns)) logger.logkv(log_prefix + 'NumTrajs', len(paths)) logger.logkv(log_prefix + 'StdReturn', np.std(undiscounted_returns)) logger.logkv(log_prefix + 'MaxReturn', np.max(undiscounted_returns)) logger.logkv(log_prefix + 'MinReturn', np.min(undiscounted_returns)) def _compute_advantages(self, paths, all_path_baselines): assert len(paths) == len(all_path_baselines) for idx, path in enumerate(paths): path_baselines = np.append(all_path_baselines[idx], 0) deltas = path["rewards"] + \ self.discount * path_baselines[1:] - \ path_baselines[:-1] path["advantages"] = utils.discount_cumsum( deltas, self.discount * self.gae_lambda) return paths def _stack_path_data(self, paths): observations = np.concatenate([path["observations"] for path in paths]) actions = np.concatenate([path["actions"] for path in paths]) rewards = np.concatenate([path["rewards"] for path in paths]) returns = np.concatenate([path["returns"] for path in paths]) advantages = np.concatenate([path["advantages"] for path in paths]) env_infos = utils.concat_tensor_dict_list([path["env_infos"] for path in paths]) agent_infos = utils.concat_tensor_dict_list([path["agent_infos"] for path in paths]) return observations, actions, rewards, returns, advantages, env_infos, agent_infos