This repository has been archived by the owner on Jul 22, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathevaluate.py
147 lines (120 loc) · 4.61 KB
/
evaluate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import engine
import json
import pandas as pd
import numpy as np
from sim_setting import sim_setting_control
import argparse
from utility import parse_arguments
baseline_tt = {
"hangzhou_1x1_bc-tyc_18041610_1h": 479.19,
"hangzhou_1x1_bc-tyc_18041607_1h": 340.15,
"hangzhou_1x1_bc-tyc_18041608_1h": 573.18,
"hangzhou_1x1_kn-hz_18041607_1h": 161.48,
"hangzhou_1x1_sb-sx_18041607_1h": 258.61,
}
def main():
args = parse_arguments()
sim_setting = sim_setting_control
sim_setting["num_step"] = args.num_step
evaluate_one_traffic(sim_setting, args.scenario)
def evaluate_one_traffic(dic_sim_setting, scenario):
roadnetFile = "data/{}/roadnet.json".format(scenario)
flowFile = "data/{}/flow.json".format(scenario)
planFile = "data/{}/signal_plan_template.txt".format(scenario)
outFile = "data/{}/evaluation.txt".format(scenario)
if check(planFile, dic_sim_setting["num_step"]):
tt = cal_travel_time(dic_sim_setting, roadnetFile, flowFile, planFile)
print("====================== travel time ======================")
print("scenario_{0}: {1:.2f} s".format(scenario, tt))
print("====================== travel time ======================\n")
b = baseline_tt[scenario]
score = (b - tt)/b
print("====================== score ======================")
print("scenario_{0}: {1}".format(scenario, score))
print("====================== score ======================")
with open(outFile, "w") as f:
f.write(str(score))
else:
print("planFile is invalid, Rejected!")
def cal_travel_time(dic_sim_setting, roadnetFile, flowFile, planFile):
eng = engine.Engine(dic_sim_setting["interval"], dic_sim_setting["threadNum"],
dic_sim_setting["saveReplay"], dic_sim_setting["rlTrafficLight"],
dic_sim_setting["changeLane"])
eng.load_roadnet(roadnetFile)
eng.load_flow(flowFile)
plan = pd.read_csv(planFile, sep="\t", header=0, dtype=int)
intersection_id = plan.columns[0]
for step in range(dic_sim_setting["num_step"]):
phase = int(plan.loc[step])
eng.set_tl_phase(intersection_id, phase)
eng.next_step()
current_time = eng.get_current_time()
if current_time % 100 == 0:
print("Time: {} / {}".format(current_time, dic_sim_setting["num_step"]))
return eng.get_score()
def check(planFile, num_step):
flag = True
error_info = ''
try:
plan = pd.read_csv(planFile, sep='\t', header=0, dtype=int)
except:
flag = False
error_info = 'The format of signal plan is not valid and cannot be read by pd.read_csv!'
print(error_info)
return flag
intersection_id = plan.columns[0]
if intersection_id != 'intersection_1_1':
flag = False
error_info = 'The header intersection_id is wrong (for example: intersection_1_1)!'
print(error_info)
return flag
phases = plan.values
current_phase = phases[0][0]
if len(phases) < num_step:
flag = False
error_info = 'The time of signal plan is less than the default time!'
print(error_info)
return flag
if current_phase == 0:
yellow_time = 1
else:
yellow_time = 0
# get first green phase and check
last_green_phase = '*'
for next_phase in phases[1:]:
next_phase = next_phase[0]
# check phase itself
if next_phase == '':
continue
if next_phase not in [0, 1, 2, 3, 4, 5, 6, 7, 8]:
flag = False
error_info = 'Phase must be in [0, 1, 2, 3, 4, 5, 6, 7, 8]!'
break
# check changing phase
if next_phase != current_phase and next_phase != 0 and current_phase != 0:
flag = False
error_info = '5 seconds of yellow time must be inserted between two different phase!'
break
# check unchangeable phase
if next_phase != 0 and next_phase == last_green_phase:
flag = False
error_info = 'No yellow light is allowed between the same phase!'
break
# check yellow time
if next_phase != 0 and yellow_time != 0 and yellow_time != 5:
flag = False
error_info = 'Yellow time must be 5 seconds!'
break
# normal
if next_phase == 0:
yellow_time += 1
if current_phase != 0:
last_green_phase = current_phase
else:
yellow_time = 0
current_phase = next_phase
if not flag:
print(error_info)
return flag
if __name__ == "__main__":
main()