"""
PPO: Proximal Policy Optimization
with data parallelism
"""
from typing import List, Tuple
import sys
sys.path.append('../envs')
import chtrain as gym
import numpy as np
from policy import Policy
from value_function import NNValueFunction
import scipy.signal
from utils import Logger, Scaler
from datetime import datetime
import argparse
import signal
from multiprocessing import Pool
import run_episode

""" THIS VARIABLE TOGGLES TIME IN STATE.
    REMEMBER THAT CHECKPOINTS ARE NOT COMPATIBLE IF THE NUMBER OF STATES IS INCONSISTENT
"""
time_state = True

# Create the worker pool only when running as the main module, so that worker
# processes do not recursively spawn pools of their own.
if __name__ == "__main__":
    pool = Pool(processes=6)


class GracefulKiller:
    """ Gracefully exit program on CTRL-C """
    def __init__(self):
        self.kill_now = False
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        self.kill_now = True


def init_gym(env_name, render):
    """
    Initialize gym environment, return dimension of observation
    and action spaces.

    Args:
        env_name: name of the environment to initialize
        render: True to toggle on visualization

    Returns: 3-tuple
        environment (object)
        number of observation dimensions (int)
        number of action dimensions (int)
    """
    env = gym.Init(env_name, render)
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]

    return env, obs_dim, act_dim


def run_policy(env, policy, scaler, logger, args, episodes):
    """ Run policy and collect data

    Args:
        env: environment (object)
        policy: policy object with sample() method
        scaler: scaler object, scales/offsets each observation
        logger: logger object, used to save stats from episodes
        args: argument list forwarded to every worker process (see main())
        episodes: total episodes to run

    Returns: list of trajectory dictionaries, list length = number of episodes
        'observes' : NumPy array of states from episode
        'actions' : NumPy array of actions from episode
        'rewards' : NumPy array of (un-discounted) rewards from episode
        'unscaled_obs' : NumPy array of (un-scaled) states from episode
    """
    arg = [args for _ in range(episodes)]  # one copy of the shared arguments per episode
    trajectories = pool.map(run_episode.run_parallel_episodes, arg)

    unscaled = np.concatenate([t['unscaled_obs'] for t in trajectories])
    scaler.update(unscaled)  # update running statistics for scaling observations
    logger.log({'_MeanReward': np.mean([t['rewards'].sum() for t in trajectories])})

    return trajectories
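
# Hedged sketch (not part of the training loop): run_episode.run_parallel_episodes lives
# in a separate module, so its exact contents are an assumption here, but from the call
# above it is expected to take the shared argument list built in main() and return one
# trajectory dictionary per call, roughly:
#
#   def run_parallel_episodes(args):
#       obs_dim, act_dim, kl_targ, time_state, env_name = args
#       ...  # build env/policy inside the worker and roll out a single episode
#       return {'observes': obs, 'actions': acts,
#               'rewards': rews, 'unscaled_obs': raw_obs}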


def discount(x, gamma):
    """ Calculate discounted forward sum of a sequence at each point """
    return scipy.signal.lfilter([1.0], [1.0, -gamma], x[::-1])[::-1]
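
# Worked example of discount() (comments only, nothing here is executed):
#   discount(np.array([1.0, 1.0, 1.0]), 0.9)  ->  [2.71, 1.9, 1.0]
# i.e. each entry is r_t + gamma * r_{t+1} + gamma^2 * r_{t+2} + ...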


def add_disc_sum_rew(trajectories, gamma):
    """ Adds discounted sum of rewards to all time steps of all trajectories

    Args:
        trajectories: as returned by run_policy()
        gamma: discount

    Returns:
        None (mutates trajectories dictionary to add 'disc_sum_rew')
    """
    for trajectory in trajectories:
        if gamma < 0.999:  # don't scale for gamma ~= 1
            rewards = trajectory['rewards'] * (1 - gamma)
        else:
            rewards = trajectory['rewards']
        disc_sum_rew = discount(rewards, gamma)
        trajectory['disc_sum_rew'] = disc_sum_rew
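
# Illustration of the (1 - gamma) scaling above (comments only): for gamma = 0.995 the
# rewards are multiplied by 0.005, which keeps the discounted-return targets for the
# value function roughly on the scale of a single reward. For example, with
# rewards = [1.0, 1.0, 1.0]:
#   discount(np.array([0.005, 0.005, 0.005]), 0.995)  ~=  [0.01493, 0.00998, 0.005]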


def add_value(trajectories, val_func):
    """ Adds estimated value to all time steps of all trajectories

    Args:
        trajectories: as returned by run_policy()
        val_func: object with predict() method, takes observations
            and returns predicted state value

    Returns:
        None (mutates trajectories dictionary to add 'values')
    """
    for trajectory in trajectories:
        observes = trajectory['observes']
        values = val_func.predict(observes)
        trajectory['values'] = values


def add_gae(trajectories, gamma, lam):
    """ Add generalized advantage estimator.
    https://arxiv.org/pdf/1506.02438.pdf

    Args:
        trajectories: as returned by run_policy(), must include 'values'
            key from add_value().
        gamma: reward discount
        lam: lambda (see paper).
            lam=0 : use TD residuals
            lam=1 : A = Sum Discounted Rewards - V_hat(s)

    Returns:
        None (mutates trajectories dictionary to add 'advantages')
    """
    for trajectory in trajectories:
        if gamma < 0.999:  # don't scale for gamma ~= 1
            rewards = trajectory['rewards'] * (1 - gamma)
        else:
            rewards = trajectory['rewards']
        values = trajectory['values']
        # temporal-difference residuals: td_t = r_t - V(s_t) + gamma * V(s_{t+1})
        # values[1:] drops the first element (V(s_0)) and a 0 is appended at the end
        # (the value of the state beyond the end of the trajectory is taken to be 0)
        tds = rewards - values + np.append(values[1:] * gamma, 0)
        advantages = discount(tds, gamma * lam)
        trajectory['advantages'] = advantages
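
# Worked sketch of the GAE step above (comments only, nothing is executed): with
# gamma * lam = 0.9 and TD residuals tds = [0.5, -0.2, 0.1], discount(tds, 0.9) gives
#   advantages = [0.5 + 0.9*(-0.2) + 0.81*0.1,  -0.2 + 0.9*0.1,  0.1]
#              = [0.401, -0.11, 0.1]
# i.e. A_t = sum_l (gamma*lam)^l * delta_{t+l}, matching the paper linked in the docstring.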


def build_train_set(trajectories):
    """ Concatenate processed trajectories into a single training set

    Args:
        trajectories: trajectories after processing by add_disc_sum_rew(),
            add_value(), and add_gae()

    Returns: 4-tuple of NumPy arrays
        observes: shape = (N, obs_dim)
        actions: shape = (N, act_dim)
        advantages: shape = (N,)
        disc_sum_rew: shape = (N,)
    """
    observes = np.concatenate([t['observes'] for t in trajectories])
    actions = np.concatenate([t['actions'] for t in trajectories])
    disc_sum_rew = np.concatenate([t['disc_sum_rew'] for t in trajectories])
    advantages = np.concatenate([t['advantages'] for t in trajectories])
    # normalize advantages to zero mean and unit variance
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-6)

    return observes, actions, advantages, disc_sum_rew


def log_batch_stats(observes, actions, advantages, disc_sum_rew, logger, episode):
    """ Log various batch statistics """
    logger.log({'_mean_obs': np.mean(observes),
                '_min_obs': np.min(observes),
                '_max_obs': np.max(observes),
                '_std_obs': np.mean(np.var(observes, axis=0)),
                '_mean_act': np.mean(actions),
                '_min_act': np.min(actions),
                '_max_act': np.max(actions),
                '_std_act': np.mean(np.var(actions, axis=0)),
                '_mean_adv': np.mean(advantages),
                '_min_adv': np.min(advantages),
                '_max_adv': np.max(advantages),
                '_std_adv': np.var(advantages),
                '_mean_discrew': np.mean(disc_sum_rew),
                '_min_discrew': np.min(disc_sum_rew),
                '_max_discrew': np.max(disc_sum_rew),
                '_std_discrew': np.var(disc_sum_rew),
                '_Episode': episode
                })


def main(env_name, num_episodes, gamma, lam, kl_targ, batch_size):
    """ Main training loop

    Args:
        env_name: OpenAI Gym environment name, e.g. 'Hopper-v1'
        num_episodes: maximum number of episodes to run
        gamma: reward discount factor (float)
        lam: lambda from Generalized Advantage Estimate
        kl_targ: D_KL target for policy update, D_KL(pi_old || pi_new)
        batch_size: number of episodes per policy training batch
    """

    killer = GracefulKiller()
    env, obs_dim, act_dim = init_gym(env_name, False)
    if time_state:
        obs_dim += 1  # add 1 to obs dimension for time step feature (see run_episode())
    now = datetime.utcnow().strftime("%b-%d_%H-%M-%S")  # create unique directories
    logger = Logger(logname=env_name, now=now)

    scaler = Scaler(obs_dim, env_name)
    val_func = NNValueFunction(obs_dim, env_name, True)
    arg = [obs_dim, act_dim, kl_targ, time_state, env_name]
    policy = Policy(obs_dim, act_dim, kl_targ, env_name, True)

    episode = 0

    while episode < num_episodes:
        trajectories = run_policy(env, policy, scaler, logger, arg, episodes=batch_size)
        episode += len(trajectories)
        add_value(trajectories, val_func)  # add estimated values to episodes
        add_disc_sum_rew(trajectories, gamma)  # calculate discounted sum of rewards
        add_gae(trajectories, gamma, lam)  # calculate advantages
        # concatenate all episodes into single NumPy arrays
        observes, actions, advantages, disc_sum_rew = build_train_set(trajectories)
        # add various stats to training log:
        log_batch_stats(observes, actions, advantages, disc_sum_rew, logger, episode)
        policy.update(observes, actions, advantages, logger)  # update policy
        val_func.fit(observes, disc_sum_rew, logger)  # update value function
        logger.write(display=True)  # write logger results to file and stdout
        scaler.save()
        if killer.kill_now:
            if input('Terminate training (y/[n])? ') == 'y':
                break
            killer.kill_now = False
    logger.close()
    policy.close_sess()
    val_func.close_sess()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=('Train policy on OpenAI Gym environment '
                                                  'using Proximal Policy Optimization'))
    parser.add_argument('env_name', type=str, help='OpenAI Gym environment name')
    parser.add_argument('-n', '--num_episodes', type=int, help='Number of episodes to run',
                        default=1000)
    #parser.add_argument('--renderON',action='store_true', default=False, dest='render', help='Toggle ON video')
    #parser.add_argument('--renderOFF',action='store_false', default=False, dest='render', help='Toggle OFF video')
    #parser.add_argument('-r', '--render', type=bool, help='Display Video', default=False)
    parser.add_argument('-g', '--gamma', type=float, help='Discount factor', default=0.995)
    parser.add_argument('-l', '--lam', type=float, help='Lambda for Generalized Advantage Estimation',
                        default=0.98)
    parser.add_argument('-k', '--kl_targ', type=float, help='D_KL target value',
                        default=0.003)
    parser.add_argument('-b', '--batch_size', type=int,
                        help='Number of episodes per training batch',
                        default=20)
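
    # Example invocation (a hedged sketch: the script's file name and the environments
    # actually available through chtrain are assumptions, not taken from this file):
    #   python train.py Hopper-v1 -n 30000 -b 20 -g 0.995 -l 0.98 -k 0.003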

    args = parser.parse_args()
    main(**vars(args))