Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update TrafficLightFlow.py #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 39 additions & 67 deletions TrafficLightFlow.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
"""Traffic Light Grid example."""

from flow.core.params import SumoParams, EnvParams, InitialConfig, NetParams, \
InFlows, SumoCarFollowingParams
from flow.core.params import VehicleParams
from flow.controllers import SimCarFollowingController, GridRouter
from flow.envs import TrafficLightGridPOEnv, TrafficLightGridEnv
from flow.envs import TrafficLightGridPOEnv, TrafficLightGridEnv, AccelEnv
from flow.networks import TrafficLightGridNetwork
from flow.core.experiment import Experiment
from StaticAgent import static_rl_actions
from flow.utils.registry import make_create_env

# time horizon of a single rollout
HORIZON = 200
Expand All @@ -18,17 +19,14 @@
# from the edges, and False otherwise
USE_INFLOWS = True


def gen_edges(col_num, row_num):
"""Generate the names of the outer edges in the traffic light grid network.

Parameters
----------
col_num : int
number of columns in the traffic light grid
row_num : int
number of rows in the traffic light grid

Returns
-------
list of str
Expand All @@ -38,18 +36,13 @@ def gen_edges(col_num, row_num):
for i in range(col_num):
edges += ['left' + str(row_num) + '_' + str(i)]
edges += ['right' + '0' + '_' + str(i)]

# build the left and then the right edges
for i in range(row_num):
edges += ['bot' + str(i) + '_' + '0']
edges += ['top' + str(i) + '_' + str(col_num)]

return edges


def get_inflow_params(col_num, row_num, additional_net_params):
"""Define the network and initial params in the presence of inflows.

Parameters
----------
col_num : int
Expand All @@ -58,7 +51,7 @@ def get_inflow_params(col_num, row_num, additional_net_params):
number of rows in the traffic light grid
additional_net_params : dict
network-specific parameters that are unique to the traffic light grid

Returns
-------
flow.core.params.InitialConfig
Expand All @@ -69,55 +62,22 @@ def get_inflow_params(col_num, row_num, additional_net_params):
"""
initial = InitialConfig(
spacing='custom', lanes_distribution=float('inf'), shuffle=True)

inflow = InFlows()
outer_edges = gen_edges(col_num, row_num)
for i in range(len(outer_edges)):
inflow.add(
veh_type='idm',
edge=outer_edges[i],
probability=0.25,
departLane='free',
departSpeed=10)

probability=0.05*(i+1),
depart_lane='free',
depart_speed=10)
net = NetParams(
inflows=inflow,
additional_params=additional_net_params)

return initial, net


def get_non_flow_params(enter_speed, add_net_params):
"""Define the network and initial params in the absence of inflows.

Note that when a vehicle leaves a network in this case, it is immediately
returns to the start of the row/column it was traversing, and in the same
direction as it was before.

Parameters
----------
enter_speed : float
initial speed of vehicles as they enter the network.
add_net_params: dict
additional network-specific parameters (unique to the traffic light grid)

Returns
-------
flow.core.params.InitialConfig
parameters specifying the initial configuration of vehicles in the
network
flow.core.params.NetParams
network-specific parameters used to generate the network
"""
additional_init_params = {'enter_speed': enter_speed}
initial = InitialConfig(
spacing='custom', additional_params=additional_init_params)
net = NetParams(additional_params=add_net_params)

return initial, net


V_ENTER = 15
V_ENTER = 10
INNER_LENGTH = 300
LONG_LENGTH = 100
SHORT_LENGTH = 300
Expand All @@ -141,35 +101,35 @@ def get_non_flow_params(enter_speed, add_net_params):
"cars_top": NUM_CARS_TOP,
"cars_bot": NUM_CARS_BOT
}

additional_env_params = {
'target_velocity': 50,
'switch_time': 3.0,
'num_observed': 2,
'discrete': False,
'tl_type': 'controlled'
}

additional_net_params = {
'speed_limit': 35,
'grid_array': grid_array,
'horizontal_lanes': 1,
'vertical_lanes': 1
}

vehicles = VehicleParams()
vehicles.add(
veh_id='idm',
acceleration_controller=(SimCarFollowingController, {}),
car_following_params=SumoCarFollowingParams(
minGap=2.5,
min_gap=2.5,
decel=7.5, # avoid collisions at emergency stops
max_speed=V_ENTER,
speed_mode="all_checks",
),
routing_controller=(GridRouter, {}),
num_vehicles=tot_cars)

# collect the initialization and network-specific parameters based on the
# choice to use inflows or not
if USE_INFLOWS:
Expand All @@ -181,51 +141,63 @@ def get_non_flow_params(enter_speed, add_net_params):
initial_config, net_params = get_non_flow_params(
enter_speed=V_ENTER,
add_net_params=additional_net_params)


flow_params = dict(
# name of the experiment
exp_tag='traffic_light_grid',

# name of the flow environment the experiment is running on
env_name=TrafficLightGridPOEnv,

# name of the network class the experiment is running on
network=TrafficLightGridNetwork,

# simulator that is used by the experiment
simulator='traci',

# sumo-related parameters (see flow.core.params.SumoParams)
sim=SumoParams(
sim_step=1,
render=True,
emission_path="Results"
),

# environment related parameters (see flow.core.params.EnvParams)
env=EnvParams(
horizon=HORIZON,
additional_params=additional_env_params,
),

# network-related parameters (see flow.core.params.NetParams and the
# network's documentation or ADDITIONAL_NET_PARAMS component). This is
# filled in by the setup_exps method below.
net=net_params,

# vehicles to be placed in the network at the start of a rollout (see
# flow.core.params.VehicleParams)
veh=vehicles,

# parameters specifying the positioning of vehicles upon initialization/
# reset (see flow.core.params.InitialConfig). This is filled in by the
# setup_exps method below.
initial=initial_config,
)



exp = Experiment(flow_params)
env = exp.env
t = 0

def rl_actions(state):
global t
t += 1
return [t%30 == 0]

exp.run(1, convert_to_csv=True)

def GetTrafficLightEnv():
# Get the env name and a creator for the environment.
create_env, _ = make_create_env(flow_params)
# Create the environment.
env = create_env()
return env

exp = Experiment(flow_params)
exp.run(1, static_rl_actions, convert_to_csv=True)