-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocesser.py
121 lines (92 loc) · 3.69 KB
/
processer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Event processing state machine
Two basic layers: request, response
worker.request.next = response
Plugin layers: journal
worker.request.next = journal
worker.journal.next = response
Fixme: How to detect an exception in the processing thread and restart it?
"""
import thread
import threading
from logging import info, debug
from oodict import OODict
from exception import *
from util import *
class Processer:
    """One stage of the event-processing state machine.

    Each stage owns a FIFO queue of items. ``mainloop`` takes items from the
    head of the queue, hands each one to ``processing`` and forwards the
    result to ``self.next`` (the following stage) when one is set.
    """

    def __init__(self):
        self.queue = []
        self.next = None  # Next processer in the state machine
        # Condition guarding self.queue; waiters sleep on it while the
        # queue is empty and are woken by submit().
        self.queue_empty = threading.Condition()

    def start(self):
        """Run ``mainloop`` in a new thread via ``thread.start_new_thread``."""
        thread.start_new_thread(self.mainloop, ())

    def mainloop(self):
        """Process queued items forever.

        Blocks while the queue is empty. For every item:

        * ``processing(item)`` returning ``None`` queues the item back at
          the tail so it is retried later;
        * any other result is submitted to ``self.next`` if it is set.

        Raises:
            Exception: anything raised by ``processing`` or by the next
                stage's ``submit`` is logged and re-raised, which ends the
                loop. The item being processed has already been removed
                from the queue at that point.
        """
        while True:
            # Take the item while still holding the lock, so the emptiness
            # check and the pop are one atomic step with respect to
            # submit() and to any other consumer of this queue.
            with self.queue_empty:
                while not self.queue:
                    self.queue_empty.wait()
                item = self.queue.pop(0)  # Processing from head

            # The lock is released here so producers are never blocked
            # while an item is being processed.
            try:
                result = self.processing(item)
                # Item is not done yet, queue it back and try again later
                if result is None:
                    self.submit(item)
                    continue
                # If we are not the last step, pass result to the next processer
                if self.next:
                    self.next.submit(result)
            except Exception as err:
                debug('Bug found in processer, unexcepted exception: %s, please restart me', err)
                raise  # Fixme: restart processer, should raise or just igore this silently?

    def processing(self, item):
        """Process one item; subclasses are expected to override this.

        Return ``None`` to have the item queued back for another attempt.
        Any other return value is passed to the next processer and the item
        is not queued again. The default implementation returns ``item``
        unchanged.
        """
        return item

    def submit(self, item):
        """Append ``item`` to the tail of the queue and wake one waiter."""
        with self.queue_empty:
            self.queue.append(item)
            self.queue_empty.notify()
class RequestProcesser(Processer):
    """Processer stage that dispatches requests to registered services.

    A request names its target as ``"<service>.<method>"`` in ``req.method``.
    The result of the call, or the error, is wrapped in a response object
    that is returned for the next stage.
    """

    def __init__(self):
        self.services = {}  # service name -> object providing the methods
        Processer.__init__(self)

    def register_service(self, name, ops):
        """Register ``ops`` as the object serving requests for ``name``."""
        self.services[name] = ops

    def handle_req(self, req):
        """Look up the handler named by ``req.method`` and call it with ``req``.

        Returns:
            Whatever the handler returns.

        Raises:
            RequestHandleError: ``req.method`` is not of the form
                ``"service.method"``, the service is not registered, or the
                service has no callable attribute with that method name.
        """
        # A name without exactly one dot used to raise ValueError from tuple
        # unpacking; processing() does not catch that, so it would reach
        # Processer.mainloop and end the processing thread. Report it as a
        # request error instead.
        parts = req.method.split('.')
        if len(parts) != 2:
            raise RequestHandleError('invalid method name %s' % req.method)
        service, method = parts

        if service not in self.services:
            raise RequestHandleError('unknown service %s' % service)

        # NOTE(review): any callable attribute of the service object can be
        # reached this way, including underscore-prefixed ones. If requests
        # come from an untrusted peer, consider rejecting such names.
        handler = getattr(self.services[service], method, None)
        if not callable(handler):
            raise RequestHandleError('unknown method %s' % method)
        return handler(req)

    def processing(self, req):
        """Handle one request and build its response.

        Returns:
            ``None`` when the handler returned ``None`` (request not done,
            it will be queued back), otherwise an ``OODict`` carrying
            ``_id`` (copied from the request), ``_req`` (the request itself)
            and either ``value`` (plus ``payload`` when the handler returned
            a tuple) or ``error`` (the ``RequestHandleError`` message).
        """
        response = OODict()
        response._id = req._id
        try:
            # NOTE(review): the original comment here says the handler
            # "must set req._status"; nothing in this class reads it --
            # confirm against the service implementations.
            result = self.handle_req(req)
            if result is None:
                return None  # Request not done, need to queue back
            # For complicated calls, read for example, should return a tuple: value, payload
            if isinstance(result, tuple):
                response.value, response.payload = result
            else:
                response.value = result
        except RequestHandleError as err:
            response.error = str(err)
        #debug('Processed, request: %s response: %s', filter_req(req), filter_req(response))
        # Journal processer needs req, but response processer needs resp!
        response._req = req
        return response
class ResponseProcesser(Processer):
    """Final stage: hands each response to ``self.handler``.

    ``self.handler`` is not set by this class; assign the real function that
    sends a response to the network before items are processed.
    """

    def processing(self, response):
        """Call ``self.handler`` with ``response`` and return its result."""
        send = self.handler
        return send(response)