test_env.py
import numpy as np
from envs.JSBSim.envs import SingleCombatEnv, SingleControlEnv, MultipleCombatEnv
from envs.env_wrappers import SubprocVecEnv, ShareDummyVecEnv, ShareSubprocVecEnv, DummyVecEnv
from envs.JSBSim.core.catalog import Catalog as c
import logging
import time
logging.basicConfig(level=logging.DEBUG)


def test_env():
    parallel_num = 1
    envs = DummyVecEnv([lambda: SingleCombatEnv("1v1/NoWeapon/HierarchySelfplay") for _ in range(parallel_num)])
    obss = envs.reset()

    # DataType test: expected shapes of the vectorized rollout tensors.
    obs_shape = (parallel_num, envs.num_agents, *envs.observation_space.shape)
    # act_shape = (parallel_num, envs.num_agents, *envs.action_space.shape)
    reward_shape = (parallel_num, envs.num_agents, 1)
    done_shape = (parallel_num, envs.num_agents, 1)
    assert obss.shape == obs_shape

    def convert(sample):
        # Stack a (batch, extra) pair along the first axis.
        return np.concatenate((sample[0], np.expand_dims(sample[1], axis=0)))

    episode_reward = 0
    step = 0
    while True:
        actions = np.array([[envs.action_space.sample() for _ in range(envs.num_agents)]
                            for _ in range(parallel_num)])
        obss, rewards, dones, infos = envs.step(actions)
        assert obss.shape == obs_shape and rewards.shape == reward_shape and dones.shape == done_shape
        bloods = [envs.envs[0].agents[agent_id].bloods for agent_id in envs.envs[0].agents.keys()]
        print(f"step:{step}, bloods:{bloods}")
        episode_reward += rewards[:, 0, :]
        envs.render(mode='txt', filepath='JSBSimRecording.txt.acmi')
        # Terminate once every parallel env has finished its episode.
        if np.all(dones):
            print(episode_reward)
            break
        step += 1
    envs.close()


def test_multi_env():
    parallel_num = 1
    envs = ShareDummyVecEnv([lambda: MultipleCombatEnv('2v2/NoWeapon/HierarchySelfplay') for _ in range(parallel_num)])
    assert envs.num_agents == 4

    # Expected shapes of the vectorized rollout tensors.
    obs_shape = (parallel_num, envs.num_agents, *envs.observation_space.shape)
    share_obs_shape = (parallel_num, envs.num_agents, *envs.share_observation_space.shape)
    reward_shape = (parallel_num, envs.num_agents, 1)
    done_shape = (parallel_num, envs.num_agents, 1)

    # DataType test: check the shapes returned by reset() and step().
    obs, share_obs = envs.reset()
    step = 0
    envs.render(mode='txt', filepath='JSBSimRecording.txt.acmi')
    assert obs.shape == obs_shape and share_obs.shape == share_obs_shape
    while True:
        actions = np.array([[envs.action_space.sample() for _ in range(envs.num_agents)]
                            for _ in range(parallel_num)])
        start = time.time()
        obs, share_obs, rewards, dones, info = envs.step(actions)
        bloods = [envs.envs[0].agents[agent_id].bloods for agent_id in envs.envs[0].agents.keys()]
        print(f"step:{step}, bloods:{bloods}")
        end = time.time()
        # print(rewards)
        envs.render(mode='txt', filepath='JSBSimRecording.txt.acmi')
        assert obs.shape == obs_shape and share_obs.shape == share_obs_shape \
            and rewards.shape == reward_shape and dones.shape == done_shape
        if np.all(dones):
            break
        step += 1
    envs.close()
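

# A minimal sketch of driving the same single-combat rollout across several
# worker processes, assuming SubprocVecEnv (imported above) shares the
# list-of-thunks constructor and reset/step/close/num_agents API used with
# DummyVecEnv; the function name and its parallel_num argument are illustrative.
def test_env_parallel(parallel_num=4):
    envs = SubprocVecEnv([lambda: SingleCombatEnv("1v1/NoWeapon/HierarchySelfplay")
                          for _ in range(parallel_num)])
    envs.reset()
    for step in range(10):
        # Sample one random action per agent in each parallel env.
        actions = np.array([[envs.action_space.sample() for _ in range(envs.num_agents)]
                            for _ in range(parallel_num)])
        obss, rewards, dones, infos = envs.step(actions)
        if np.all(dones):
            break
    envs.close()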


if __name__ == "__main__":
    test_multi_env()