1 2""" 3PPO: Proximal Policy Optimization 4with data parallelism 5""" 6from typing import List, Tuple 7import sys 8sys.path.append('../envs') 9import chtrain as gym 10import numpy as np 11from policy import Policy 12from value_function import NNValueFunction 13import scipy.signal 14from utils import Logger, Scaler 15from datetime import datetime 16import argparse 17import signal 18from multiprocessing import Pool 19import run_episode 20 21"""" THIS VARIABLE TOGGLES TIME IN STATE. 22 REMEMBER THAT CHECKPOINT ARE NOT COMPATIBLE IF THE NUMBER OF STATE IS INCONSISTENT 23 """ 24time_state = True 25 26# guard in the main module to avoid creating subprocesses recursively. 27if __name__ == "__main__": 28 pool = Pool(processes=6) 29 30class GracefulKiller: 31 """ Gracefully exit program on CTRL-C """ 32 def __init__(self): 33 self.kill_now = False 34 signal.signal(signal.SIGINT, self.exit_gracefully) 35 signal.signal(signal.SIGTERM, self.exit_gracefully) 36 37 def exit_gracefully(self, signum, frame): 38 self.kill_now = True 39 40 41def init_gym(env_name, render): 42 """ 43 Initialize gym environment, return dimension of observation 44 and action spaces. 45 46 Args: 47 render: True to toggle on visualization 48 49 Returns: 3-tuple 50 environment (object) 51 number of observation dimensions (int) 52 number of action dimensions (int) 53 """ 54 env = gym.Init(env_name, render) 55 obs_dim = env.observation_space.shape[0] 56 act_dim = env.action_space.shape[0] 57 58 return env, obs_dim, act_dim 59 60 61 62 63def run_policy(env, policy, scaler, logger, args, episodes): 64 """ Run policy and collect data 65 66 Args: 67 env: environment (object) 68 policy: policy object with sample() method 69 scaler: scaler object, scales/offsets each observation 70 logger: logger object, used to save stats from episodes 71 episodes: total episodes to run 72 73 Returns: list of trajectory dictionaries, list length = number of episodes 74 'observes' : NumPy array of states from episode 75 'actions' : NumPy array of actions from episode 76 'rewards' : NumPy array of (un-discounted) rewards from episode 77 'unscaled_obs' : NumPy array of (un-scaled) states from episode 78 """ 79 arg = [] 80 for i in range(episodes): 81 arg.append(args) 82 trajectories = pool.map(run_episode.run_parallel_episodes, arg) 83 84 unscaled = np.concatenate([t['unscaled_obs'] for t in trajectories]) 85 scaler.update(unscaled) # update running statistics for scaling observations 86 logger.log({'_MeanReward': np.mean([t['rewards'].sum() for t in trajectories])}) 87 88 return trajectories 89 90 91def discount(x, gamma): 92 """ Calculate discounted forward sum of a sequence at each point """ 93 return scipy.signal.lfilter([1.0], [1.0, -gamma], x[::-1])[::-1] 94 95 96def add_disc_sum_rew(trajectories, gamma): 97 """ Adds discounted sum of rewards to all time steps of all trajectories 98 99 Args: 100 trajectories: as returned by run_policy() 101 gamma: discount 102 103 Returns: 104 None (mutates trajectories dictionary to add 'disc_sum_rew') 105 """ 106 for trajectory in trajectories: 107 if gamma < 0.999: # don't scale for gamma ~= 1 108 rewards = trajectory['rewards'] * (1 - gamma) 109 else: 110 rewards = trajectory['rewards'] 111 disc_sum_rew = discount(rewards, gamma) 112 trajectory['disc_sum_rew'] = disc_sum_rew 113 114 115def add_value(trajectories, val_func): 116 """ Adds estimated value to all time steps of all trajectories 117 118 Args: 119 trajectories: as returned by run_policy() 120 val_func: object with predict() method, takes observations 
121 and returns predicted state value 122 123 Returns: 124 None (mutates trajectories dictionary to add 'values') 125 """ 126 for trajectory in trajectories: 127 observes = trajectory['observes'] 128 values = val_func.predict(observes) 129 trajectory['values'] = values 130 131 132def add_gae(trajectories, gamma, lam): 133 """ Add generalized advantage estimator. 134 https://arxiv.org/pdf/1506.02438.pdf 135 136 Args: 137 trajectories: as returned by run_policy(), must include 'values' 138 key from add_value(). 139 gamma: reward discount 140 lam: lambda (see paper). 141 lam=0 : use TD residuals 142 lam=1 : A = Sum Discounted Rewards - V_hat(s) 143 144 Returns: 145 None (mutates trajectories dictionary to add 'advantages') 146 """ 147 for trajectory in trajectories: 148 if gamma < 0.999: # don't scale for gamma ~= 1 149 rewards = trajectory['rewards'] * (1 - gamma) 150 else: 151 rewards = trajectory['rewards'] 152 values = trajectory['values'] 153 # temporal differences 154 # values[1:] deletes the first element (Vo) and attachs a 0 at the end (the future state value function at the end of the trajectory is 0) 155 # r - Vs + gamma*Vst+1 156 tds = rewards - values + np.append(values[1:] * gamma, 0) 157 advantages = discount(tds, gamma * lam) 158 trajectory['advantages'] = advantages 159 160 161def build_train_set(trajectories): 162 """ 163 164 Args: 165 trajectories: trajectories after processing by add_disc_sum_rew(), 166 add_value(), and add_gae() 167 168 Returns: 4-tuple of NumPy arrays 169 observes: shape = (N, obs_dim) 170 actions: shape = (N, act_dim) 171 advantages: shape = (N,) 172 disc_sum_rew: shape = (N,) 173 """ 174 observes = np.concatenate([t['observes'] for t in trajectories]) 175 actions = np.concatenate([t['actions'] for t in trajectories]) 176 disc_sum_rew = np.concatenate([t['disc_sum_rew'] for t in trajectories]) 177 advantages = np.concatenate([t['advantages'] for t in trajectories]) 178 # normalize advantages 179 advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-6) 180 181 return observes, actions, advantages, disc_sum_rew 182 183 184def log_batch_stats(observes, actions, advantages, disc_sum_rew, logger, episode): 185 """ Log various batch statistics """ 186 logger.log({'_mean_obs': np.mean(observes), 187 '_min_obs': np.min(observes), 188 '_max_obs': np.max(observes), 189 '_std_obs': np.mean(np.var(observes, axis=0)), 190 '_mean_act': np.mean(actions), 191 '_min_act': np.min(actions), 192 '_max_act': np.max(actions), 193 '_std_act': np.mean(np.var(actions, axis=0)), 194 '_mean_adv': np.mean(advantages), 195 '_min_adv': np.min(advantages), 196 '_max_adv': np.max(advantages), 197 '_std_adv': np.var(advantages), 198 '_mean_discrew': np.mean(disc_sum_rew), 199 '_min_discrew': np.min(disc_sum_rew), 200 '_max_discrew': np.max(disc_sum_rew), 201 '_std_discrew': np.var(disc_sum_rew), 202 '_Episode': episode 203 }) 204 205 206def main(env_name, num_episodes, gamma, lam, kl_targ, batch_size): 207 """ Main training loop 208 209 Args: 210 env_name: OpenAI Gym environment name, e.g. 
'Hopper-v1' 211 num_episodes: maximum number of episodes to run 212 gamma: reward discount factor (float) 213 lam: lambda from Generalized Advantage Estimate 214 kl_targ: D_KL target for policy update [D_KL(pi_old || pi_new) 215 batch_size: number of episodes per policy training batch 216 """ 217 218 killer = GracefulKiller() 219 env, obs_dim, act_dim = init_gym(env_name, False) 220 if time_state: 221 obs_dim += 1 # add 1 to obs dimension for time step feature (see run_episode()) 222 now = datetime.utcnow().strftime("%b-%d_%H-%M-%S") # create unique directories 223 logger = Logger(logname=env_name, now=now) 224 225 scaler = Scaler(obs_dim, env_name) 226 val_func = NNValueFunction(obs_dim, env_name, True) 227 arg = [obs_dim, act_dim, kl_targ, time_state, env_name] 228 policy = Policy(obs_dim, act_dim, kl_targ, env_name, True) 229 230 episode = 0 231 232 while episode < num_episodes: 233 trajectories = run_policy(env, policy, scaler, logger, arg, episodes=batch_size) 234 episode += len(trajectories) 235 add_value(trajectories, val_func) # add estimated values to episodes 236 add_disc_sum_rew(trajectories, gamma) # calculated discounted sum of Rs 237 add_gae(trajectories, gamma, lam) # calculate advantage 238 # concatenate all episodes into single NumPy arrays 239 observes, actions, advantages, disc_sum_rew = build_train_set(trajectories) 240 # add various stats to training log: 241 log_batch_stats(observes, actions, advantages, disc_sum_rew, logger, episode) 242 policy.update(observes, actions, advantages, logger) # update policy 243 val_func.fit(observes, disc_sum_rew, logger) # update value function 244 logger.write(display=True) # write logger results to file and stdout 245 scaler.save() 246 if killer.kill_now: 247 if input('Terminate training (y/[n])? ') == 'y': 248 break 249 killer.kill_now = False 250 logger.close() 251 policy.close_sess() 252 val_func.close_sess() 253 254 255if __name__ == "__main__": 256 parser = argparse.ArgumentParser(description=('Train policy on OpenAI Gym environment ' 257 'using Proximal Policy Optimizer')) 258 parser.add_argument('env_name', type=str, help='OpenAI Gym environment name') 259 parser.add_argument('-n', '--num_episodes', type=int, help='Number of episodes to run', 260 default=1000) 261 #parser.add_argument('--renderON',action='store_true', default=False, dest='render', help='Toggle ON video') 262 #parser.add_argument('--renderOFF',action='store_false', default=False, dest='render', help='Toggle OFF video') 263 #parser.add_argument('-r', '--render', type=bool, help='Display Video', default=False) 264 parser.add_argument('-g', '--gamma', type=float, help='Discount factor', default=0.995) 265 parser.add_argument('-l', '--lam', type=float, help='Lambda for Generalized Advantage Estimation', 266 default=0.98) 267 parser.add_argument('-k', '--kl_targ', type=float, help='D_KL target value', 268 default=0.003) 269 parser.add_argument('-b', '--batch_size', type=int, 270 help='Number of episodes per training batch', 271 default=20) 272 273 args = parser.parse_args() 274 main(**vars(args)) 275
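

# --- Illustrative sketch (not part of the training pipeline) ----------------
# A minimal, self-contained sanity check of the discount()/add_gae() math
# above, using a toy 3-step trajectory with constant reward 1 and a zero
# value function. This helper is purely illustrative and is never called by
# the training code; invoke it manually to verify the advantage computation.
def _gae_sanity_check():
    # Toy trajectory: only the 'rewards' and 'values' keys that add_gae() reads.
    traj = {'rewards': np.ones(3), 'values': np.zeros(3)}
    add_gae([traj], gamma=1.0, lam=1.0)
    # With V(s) = 0 and gamma = lam = 1, the advantage at each step reduces to
    # the forward sum of the remaining rewards: [3., 2., 1.]
    print(traj['advantages'])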