forked from bassemmarji/flask_sync_async
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtasks.py
80 lines (68 loc) · 3.08 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import time
from celery import Celery, signals
from celery.utils.log import get_task_logger
from flask_socketio import SocketIO
import config
import common
# Setup the logger (compatible with celery version 4)
logger = get_task_logger(__name__)
# Setup the celery client
celery = Celery(__name__)
# Load celery configurations from celeryconfig.py
celery.config_from_object("celeryconfig")
# Setup and connect the socket instance to Redis Server
socketio = SocketIO(message_queue=config.BROKER_URL)
# Initialize OTEL on worker connect
@signals.worker_process_init.connect(weak=False)
def initialize_honeycomb(**kwargs):
common.otel_init()
###############################################################################
def long_sync_task(n):
print(f"This task will take {n} seconds.")
for i in range(n):
print(f"i = {i}")
time.sleep(1)
###############################################################################
@celery.task(name = 'tasks.long_async_task')
def long_async_task(n,session):
print(f"The task of session {session} will take {n} seconds.")
for i in range(n):
print(f"i = {i}")
time.sleep(1)
###############################################################################
def send_message(event, namespace, room, message):
print("Message = ", message)
socketio.emit(event, {'msg': message}, namespace=namespace, room=room)
@celery.task(name = 'tasks.long_async_taskf')
def long_async_taskf(data):
room = data['sessionid']
namespace = data['namespase']
n = data['waittime']
#Send messages signaling the lifecycle of the task
send_message('status', namespace, room, 'Begin')
send_message('msg', namespace, room, 'Begin Task {}'.format(long_async_taskf.request.id))
send_message('msg', namespace, room, 'This task will take {} seconds'.format(n))
print(f"This task will take {n} seconds.")
for i in range(n):
msg = f"{i}"
send_message('msg', namespace, room, msg )
time.sleep(1)
send_message('msg', namespace, room, 'End Task {}'.format(long_async_taskf.request.id))
send_message('status', namespace, room, 'End')
###############################################################################
@celery.task(name = 'tasks.long_async_sch_task')
def long_async_sch_task(data):
room = data['sessionid']
namespace = data['namespase']
n = data['waittime']
send_message('status', namespace, room, 'Begin')
send_message('msg' , namespace, room, 'Begin Task {}'.format(long_async_sch_task.request.id))
send_message('msg' , namespace, room, 'This task will take {} seconds'.format(n))
print(f"This task will take {n} seconds.")
for i in range(n):
msg = f"{i}"
send_message('msg', namespace, room, msg )
time.sleep(1)
send_message('msg' , namespace, room, 'End Task {}'.format(long_async_sch_task.request.id))
send_message('status', namespace, room, 'End')
###############################################################################