-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpush_worker.py
54 lines (47 loc) · 1.76 KB
/
push_worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# python3 push_worker.py <num_worker_processors> <dispatcher url>
import argparse
import zmq
import json
from model import Task
from util import new_task_handler
from local_worker import local_worker
def main(num, url, name):
t, task_queue, result_queue = new_task_handler(local_worker, num_processes=num)
context = zmq.Context()
router = context.socket(zmq.DEALER)
router.identity = b"WORKER" + name.encode()
# Should we maybe use this?
# router.setsockopt(zmq.IDENTITY, b"WORKER" + name.encode())
print(router.identity)
router.connect(url)
poller = zmq.Poller()
poller.register(router, zmq.POLLIN)
# executor = ThreadPoolExecutor(max_workers=num)
taskCnt = 0.0
while True:
if poller.poll(1000):
response = router.recv_multipart()
task = Task(**json.loads(response[1]))
# Process pool
task_queue.put(task)
taskCnt += 1
# Thread Pool
# future = executor.submit(execute_task, task)
# # Callback to send the result back to the dispatcher
# future.add_done_callback(lambda f: router.send_multipart([router.identity, json.dumps(f.result().dict()).encode()]))
else:
#Process pool
try:
result = result_queue.get(block=False)
taskCnt -= 1
router.send_multipart([router.identity, json.dumps(result.dict()).encode()])
except Exception:
pass
router.send_multipart([router.identity, b"REGISTER", str(taskCnt/num).encode()])
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-n', '--number', type=int, default=2)
parser.add_argument('-u', '--url', type=str, default="tcp://127.0.0.1:5555")
parser.add_argument('-k', '--name', type=str, default="0")
args = parser.parse_args()
main(args.number, args.url, args.name)