-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_ppo_cartpole.py
102 lines (74 loc) · 2.47 KB
/
test_ppo_cartpole.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import torch
import numpy as np
from vectorenv.dummy import VectorEnv
from collector import Collector
from network import Network
from ppo import PPO
from test.test_envs.cartpole_continous import CartPoleContinousEnv
from gym.wrappers.time_limit import TimeLimit
import gym
import time
from tqdm import tqdm
#C_avg_runs = 10
C_avg_runs = 1
H_num_epochs = 10000
H_steps_per_iter = 2048
H_test_episodes = 100
H_repeat = 2
def train_policy(seed):
#construct envs
def MakeEnv():
return TimeLimit(CartPoleContinousEnv(), max_episode_steps=200)
def IsStop(reward):
return reward >= 200
train_env = VectorEnv([MakeEnv for _ in range(16)])
test_env = VectorEnv([MakeEnv for _ in range(100)])
#seed
np.random.seed(seed)
torch.manual_seed(seed)
train_env.seed(seed)
test_env.seed(seed)
#construct policy
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net = Network((4), (1), 1.0, device).to(device)
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)
policy = PPO(net, optimizer)
#construct collector
train_collector = Collector(train_env, policy)
test_collector = Collector(test_env, policy)
total_duration = 0
#train policy
for _ in range(H_num_epochs):
start_time = time.time()
#collect experience
train_collector.clear_buffer()
result = train_collector.collect(n_step=H_steps_per_iter)
batch = train_collector.get_experience()
#train model
train_metric = policy.train(batch, H_repeat)
total_duration += time.time() - start_time
avg_metric = {}
for k, v in train_metric.items():
avg_metric[k] = np.mean(v)
tqdm.write(str(result))
tqdm.write(str(avg_metric))
#need to stop?
if IsStop(result["rew"]):
#test
test_result = test_collector.collect(n_episode=H_test_episodes)
if IsStop(test_result["rew"]):
break
train_env.close()
test_env.close()
#visualize result
render_env = VectorEnv([MakeEnv for _ in range(1)])
render_collector = Collector(render_env, policy)
render_collector.collect(n_episode=1, render=True)
render_env.close()
return total_duration
#Average all runs
run_time = []
for run_id in tqdm(range(C_avg_runs)):
torch.cuda.empty_cache()
run_time.append(train_policy(run_id))
tqdm.write("{}s +/- {}s".format(np.mean(run_time), np.std(run_time)))