forked from xicko7/reisa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulation.yml
186 lines (161 loc) · 7.97 KB
/
simulation.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# PDI specification: describes the values the simulation exposes and the
# plugins (mpi, pycall) that react to them.
pdi:
logging:
level: "warn"
# Metadata: scalar/array values referenced through $name expressions below.
metadata:
# Bound to `rank` in the init and finish scripts.
pcoord_1d: int
# pcoord and psize are declared but not referenced in the visible part of this file.
pcoord: { type: array, subtype: int, size: 2 }
psize: { type: array, subtype: int, size: 2 }
# Extents of the local_t array (see `size` / `subsize` below).
dsize: { type: array, subtype: int, size: 2 }
# Bound to `iterations` in the init and finish scripts.
MaxtimeSteps: int
# Bound to `i` in the Available script.
timestep: int
# Bound to `mpi_per_node` in the init and finish scripts.
mpi_per_node: int
data:
# 2-D array of doubles with extents dsize[0] x dsize[1]. Only the sub-region
# starting at [1, 1] with extents (dsize[0] - 2) x (dsize[1] - 2) is selected,
# i.e. the first and last index of each dimension are left out
# (presumably ghost cells -- TODO confirm against the simulation code).
local_t:
type: array
subtype: double
size: ['$dsize[0]', '$dsize[1]']
subsize: ['$dsize[0] - 2', '$dsize[1] - 2']
start: [1, 1]
plugins:
mpi:
pycall:
# Python snippets executed when the named event is raised.
on_event:
# "init" event: the script sees $pcoord_1d as `rank`, $MaxtimeSteps as
# `iterations` and $mpi_per_node as `mpi_per_node`.
init:
with: {rank: $pcoord_1d, iterations: $MaxtimeSteps, mpi_per_node: $mpi_per_node}
exec: |
from ray._private.internal_api import free
from datetime import datetime
from reisa import RayList
import numpy as np
import netifaces
import logging
import time
import ray
import sys
import os
import gc
def eprint(*values, **print_options):
    """Write *values* to standard error using the built-in ``print``.

    Any keyword accepted by ``print`` (``sep``, ``end``, ``flush``) may be
    passed through, except ``file``: the stream is fixed to ``sys.stderr``,
    so supplying ``file`` raises ``TypeError``.
    """
    print(*values, **print_options, file=sys.stderr)
# "init" event setup: connect to Ray and record the values that the actor
# definition, the actor creation code and the later events rely on.
# Normalise the value received from PDI to a plain Python int.
mpi_per_node = int(mpi_per_node)
# IPv4 address of the 'ib0' interface; passed to ray.init() as this node's address.
ib = netifaces.ifaddresses('ib0')[netifaces.AF_INET][0]['addr'] # Infiniband address
# Passed as max_concurrency when the actor is created; also reported by the finish event.
concurrency=8 # Actor concurrency value
# Fraction of the baseline resources that may be in use before the Available
# event stalls the simulation.
threshold = 0.5
# address="auto" attaches to an already running Ray cluster instead of starting one.
ray.init(address="auto", namespace="mpi", logging_level=logging.ERROR, _node_ip_address=ib)
# Baseline of per-node available resources; the Available event compares the
# current values against it. NOTE(review): this relies on a private Ray API.
initial_status=ray._private.state.state._available_resources_per_node()
# Thread count for each of the "store" and "exec" concurrency groups (3/4 of `concurrency`).
cg = int(concurrency*3/4)
@ray.remote(
    max_restarts=-1,
    max_task_retries=-1,
    # Custom "actor" resource: per the original author's note, it marks the
    # simulation nodes on which this actor is allowed to run.
    resources={"actor": 1},
    concurrency_groups={"store": cg, "exec": cg, "free": 1},
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        # Pin the actor to the node of the process that defines it.
        node_id=ray.get_runtime_context().get_node_id(),
        soft=False,
    ),
)
class ProcessActor:
    """Per-node actor that keeps the object references put by the local MPI processes.

    One actor is created per simulation node. Every local process stores one
    reference per iteration through ``add_value``; a client then calls
    ``trigger`` to launch one task per local process over the references
    gathered so far, and ``free_mem`` to release an iteration's objects.

    The class reads ``rank``, ``iterations`` and ``mpi_per_node`` from the
    enclosing script.
    """

    def __init__(self):
        # Local import: the surrounding script does not import threading.
        import threading

        # data_queues[local_process][iteration] -> stored reference (or None).
        self.data_queues = [
            [None for _ in range(iterations)] for _ in range(mpi_per_node)
        ]
        # Rank of the process that creates the actor.
        self.offset = int(rank)
        # control[iteration] -> number of processes that have stored that iteration.
        self.control = [0 for _ in range(iterations)]
        self.node_id = ray.get_runtime_context().get_node_id()
        # add_value runs on several threads of the "store" group; the counter
        # update is a read-modify-write and must not lose increments, or
        # trigger() would wait forever.
        self._control_lock = threading.Lock()

    # NOTE(review): a "free" concurrency group is declared on the actor, but
    # this method is assigned to "store" -- confirm which one is intended.
    @ray.method(concurrency_group="store")
    def free_mem(self, dep, i):
        """Release the objects stored for iteration ``i`` once ``dep`` is done.

        Args:
            dep: list of object references to wait for before freeing.
            i: iteration whose stored references are freed and cleared.
        """
        ray.wait(dep, num_returns=len(dep))
        refs = [self.data_queues[mpid][i] for mpid in range(mpi_per_node)]
        free(refs, local_only=True)
        for mpid in range(mpi_per_node):
            self.data_queues[mpid][i] = None
        del refs
        gc.collect()

    def ready(self):
        """Return immediately; callers ``ray.get`` it to block until the actor is up."""
        return

    @ray.method(concurrency_group="store")
    def add_value(self, mpid, result, i):
        """Store the reference produced by process ``mpid`` for iteration ``i``.

        Called by each MPI process once per iteration.

        Args:
            mpid: rank of the calling process; mapped to a local slot with
                ``mpid % mpi_per_node``.
            result: one-element list whose first item is the stored value
                (presumably wrapped so Ray passes the reference itself rather
                than the resolved object -- TODO confirm).
            i: iteration index.
        """
        # Store the data before bumping the counter so that trigger() never
        # observes a complete count with a missing entry.
        self.data_queues[mpid % mpi_per_node][i] = result[0]
        with self._control_lock:
            self.control[i] += 1
        gc.collect()

    @ray.method(concurrency_group="exec")
    def trigger(self, process_task: ray.remote_function.RemoteFunction, i: int):
        """Launch ``process_task`` once per local process for iterations ``0..i``.

        Blocks until every local process has stored its value for all
        iterations up to and including ``i``.

        Args:
            process_task: remote function invoked as
                ``process_task(global_rank, i, RayList(refs[0..i]))``.
            i: last iteration to include.

        Returns:
            List with one task reference per local process.
        """
        expected = mpi_per_node * (i + 1)
        # Sleep between checks instead of spinning: the threads running
        # add_value() live in this same process and need CPU time to progress.
        while np.sum(self.control[:i + 1]) < expected:
            time.sleep(0.001)
        # Prefer (soft affinity) the node that already holds the data.
        local_node = ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
            node_id=self.node_id,
            soft=True,
        )
        tasks = [
            process_task.options(scheduling_strategy=local_node).remote(
                mpid + self.offset, i, RayList(self.data_queues[mpid][:i + 1])
            )
            for mpid in range(mpi_per_node)
        ]
        gc.collect()
        return tasks
# Accumulated seconds spent in ray.put(); updated by the Available event and
# reported by the finish event.
put_time = 0
actor = None
# Common actor name for the processes within the same simulation node: it is
# derived from the rank of the first process of the node.
actorname = "ranktor"+str(rank - (rank%mpi_per_node))
if rank % mpi_per_node == 0:
    # One process per node creates the (detached, named) actor.
    actor = ProcessActor.options(
        max_concurrency=concurrency,
        name=actorname,
        namespace="mpi",
        lifetime="detached",
    ).remote()
else:
    # The other processes look the actor up by name, retrying until the
    # creating process has registered it or `timeout` seconds have passed.
    timeout = 10
    start = time.time()
    while actor is None:
        try:
            actor = ray.get_actor(actorname, namespace="mpi")
        except Exception as e:
            if time.time() - start >= timeout:
                raise Exception("Cannot get the Ray actors. Simulation may break.") from e
            # Back off briefly instead of issuing lookups in a tight loop.
            time.sleep(0.1)
# Block until the actor has finished starting and can serve calls.
ray.get(actor.ready.remote())
Available:
# "Available" event: the script sees $timestep as `i` and $local_t as `data`.
with: { i: $timestep, data: $local_t}
exec: |
# Per-iteration event: wait until the cluster has memory headroom, put this
# timestep's data in the object store and hand the reference to the node actor.

# Stall the simulation while any node uses more than `threshold` of the
# 'memory' or 'object_store_memory' it had available at init time.
while True:
    status = ray._private.state.state._available_resources_per_node()
    under_pressure = False
    for node, available in status.items():
        baseline = initial_status.get(node)
        if baseline is None:
            # Node was not present when the baseline was taken: nothing to compare.
            continue
        for resource in ('object_store_memory', 'memory'):
            total = baseline.get(resource, 0)
            # A resource missing from the current snapshot counts as none available.
            used = total - available.get(resource, 0)
            if used > total * threshold:
                under_pressure = True
                break
        if under_pressure:
            break
    if not under_pressure:
        break
    # Give the consumers time to free memory instead of polling in a tight loop.
    time.sleep(0.05)
gc.collect()

# Measure time spent in ray.put(), including any retries.
start = time.time()
while True:
    try:
        # The node actor owns the object.
        d = ray.put(data, _owner=actor)
        break
    except Exception:
        # NOTE(review): every failure is retried indefinitely, as in the
        # original code; a persistent error will hang here silently.
        continue
put_time = put_time + (time.time() - start)
del data

# Tell the actor to save the reference of the put object. The reference is
# passed inside a list; add_value() reads it back as result[0].
ray.get(actor.add_value.remote(rank, [d], i))
gc.collect()
finish:
# "finish" event: the script sees $pcoord_1d as `rank`, $MaxtimeSteps as
# `iterations` and $mpi_per_node as `mpi_per_node`.
with: {rank: $pcoord_1d, iterations: $MaxtimeSteps, mpi_per_node: $mpi_per_node}
exec: |
# End-of-run event: process 0 reports its measurements on stderr, then the
# process disconnects from Ray.
if rank == 0:
    # Guard the average so that a run with zero iterations still reaches
    # ray.shutdown() instead of failing with ZeroDivisionError.
    avg_put_time = put_time / iterations if iterations else 0.0
    # Values measured in process 0 only.
    eprint("{:<21}".format("GLOBAL_PUT_TIME:") + str(put_time) + " (avg:" + "{:.5f}".format(avg_put_time) + ")")
    eprint("{:<21}".format("ACTOR_CONCURRENCY:") + str(concurrency))
# Optional: uncomment to dump a Ray timeline for this client.
# ray.timeline(filename="timeline-client.json")
ray.shutdown()