This repository has been archived by the owner on Jun 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathfeatures.py
295 lines (220 loc) · 9.47 KB
/
features.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
"""
Registers and gets features added to Aiohttp by brewblox services.
"""
from abc import ABC
from enum import Enum, auto
from functools import wraps
from typing import Any, Hashable, Optional, Type, Union
from aiohttp import web
from brewblox_service import brewblox_logger
FEATURES_KEY = '#features'
LOGGER = brewblox_logger(__name__)
def add(app: web.Application,
        feature: Any,
        key: Optional[Union[Hashable, Type[Any]]] = None,
        exist_ok: bool = False
        ):
    """
    Adds a new feature to the app.

    Features can either be registered as the default feature for the class,
    or be given an explicit name.

    Args:
        app (web.Application):
            The current Aiohttp application.

        feature (Any):
            The new feature that should be registered.
            It is recommended, but not required to use a `ServiceFeature`.

        key (Hashable, optional):
            The key under which the feature should be registered.
            Defaults to `type(feature)` when it is `None`.
            Falsy but valid keys (such as `0` or `''`) are used as given.

        exist_ok (bool):
            If truthy, this function will do nothing if a feature was already registered for `key`.
            Otherwise, an exception is raised.

    Raises:
        KeyError:
            A feature was already registered for `key`, and `exist_ok` is falsy.
    """
    if FEATURES_KEY not in app:
        app[FEATURES_KEY] = {}

    # Only `None` means "no key given".
    # A truthiness test (`key or type(feature)`) would also discard 0, '' and ().
    if key is None:
        key = type(feature)

    if key in app[FEATURES_KEY]:
        if exist_ok:
            return
        raise KeyError(f'Feature "{key}" already registered')

    app[FEATURES_KEY][key] = feature


def get(app: web.Application,
        feature_type: Optional[Type[Any]] = None,
        key: Optional[Hashable] = None
        ) -> Any:
    """
    Finds declared feature.
    Identification is done based on feature type and key.

    Args:
        app (web.Application):
            The current Aiohttp application.

        feature_type (Type[Any]):
            The Python type of the desired feature.
            If specified, it will be checked against the found feature.

        key (Hashable):
            A specific identifier for the desired feature.
            Defaults to `feature_type` when it is `None`.
            Falsy but valid keys (such as `0` or `''`) are used as given.

    Returns:
        Any: The feature found for the combination of `feature_type` and `key`

    Raises:
        AssertionError:
            Neither `feature_type` nor `key` was provided,
            or the found feature is not an instance of `feature_type`.

        KeyError:
            No feature is registered under the resolved identifier.
    """
    # Mirror the key resolution used by `add()`: only `None` counts as "not provided".
    actual: Union[Hashable, Type[Any], None] = feature_type if key is None else key

    if actual is None:
        raise AssertionError('No feature identifier provided')

    try:
        # Also raises KeyError when no feature was ever registered for this app.
        found = app[FEATURES_KEY][actual]
    except KeyError as ex:
        raise KeyError(f'No feature found for "{actual}"') from ex

    if feature_type is not None and not isinstance(found, feature_type):
        raise AssertionError(f'Found {found} did not match type "{feature_type}"')

    return found
class Startup(Enum):
    """Lifecycle management modes accepted by `ServiceFeature.__init__()`."""

    # Values are spelled out; they equal what `auto()` assigns in declaration order.
    MANAGED = 1  # always register lifecycle hooks
    MANUAL = 2  # never register lifecycle hooks
    AUTODETECT = 3  # register lifecycle hooks only if the app is not yet frozen
class ServiceFeature(ABC):
    """Base class for long-lived Aiohttp handler classes.

    Classes with async functionality often can't do all their setup and teardown
    in the (synchronous) `__init__()` and `__del__()` functions.
    Aiohttp offers comparable init/deinit hooks that run inside the context of a running event loop.

    ServiceFeature registers `startup(app)`, `before_shutdown(app)`, and `shutdown(app)`
    as lifecycle callbacks on the application's
    `on_startup`, `on_shutdown`, and `on_cleanup` signals respectively.
    Subclasses override these functions to perform
    initialization/deinitialization that requires an event loop.

    Note: Aiohttp will not accept registration of new callbacks after it started running.
    Startup management can be adjusted by using the `startup` argument in `ServiceFeature.__init__()`

    Example class:

        import asyncio
        import random
        from aiohttp import web
        from brewblox_service import scheduler, service
        from brewblox_service.features import ServiceFeature

        class MyFeature(ServiceFeature):

            def __init__(self, app: web.Application):
                super().__init__(app)
                self._task: asyncio.Task = None

            async def startup(self, app: web.Application):
                # Schedule a long-running background task
                self._task = await scheduler.create(app, self._hello())

            async def before_shutdown(self, app: web.Application):
                print('Any minute now...')

            async def shutdown(self, app: web.Application):
                # Orderly cancel the background task
                await scheduler.cancel(app, self._task)

            async def _hello(self):
                while True:
                    await asyncio.sleep(5)
                    print(random.choice([
                        'Hellooo',
                        'Searching',
                        'Sentry mode activated',
                        'Is anyone there?',
                        'Could you come over here?',
                    ]))

    Example use:

        parser = service.create_parser('my_service')
        config = service.create_config(parser)
        app = service.create_app(config)

        async def setup():
            scheduler.setup(app)
            greeter = MyFeature(app)

        service.run_app(app, setup())
        # greeter.startup(app) is called now

        # Press Ctrl+C to quit
        # greeter.before_shutdown(app) will be called
        # greeter.shutdown(app) will be called
    """

    def __init__(self, app: web.Application, startup=Startup.AUTODETECT):
        """
        ServiceFeature constructor.

        Args:
            app (web.Application):
                The Aiohttp application with which the feature should be associated.

            startup (Startup):
                How feature lifecycle management should be handled. Default is AUTODETECT.

                MANAGED:    Feature always registers lifecycle hooks.
                            This will raise an exception when creating
                            the feature while the application is running.

                MANUAL:     Feature will not register lifecycle hooks.
                            startup() and shutdown() must be called manually.

                AUTODETECT: Feature will register lifecycle hooks only if app is not running.
                            Behaves like MANAGED before application start,
                            and like MANUAL after application start.
        """
        self.__active_app: web.Application = app

        always_managed = startup == Startup.MANAGED
        detected_managed = startup == Startup.AUTODETECT and not app.frozen

        if not (always_managed or detected_managed):
            # Lifecycle functions are the caller's responsibility.
            return

        # The internal `_xxx` hooks are registered, not the public ones,
        # so intermediate classes can extend the lifecycle.
        app.on_startup.append(self.__hook(self._startup, 'startup'))
        app.on_shutdown.append(self.__hook(self._before_shutdown, 'before_shutdown'))
        app.on_cleanup.append(self.__hook(self._shutdown, 'shutdown'))

    def __str__(self):
        return f'<{type(self).__name__}>'

    @property
    def app(self) -> web.Application:
        """The `web.Application` this feature was constructed with.

        Returns:
            web.Application: The current app.
        """
        return self.__active_app

    def __hook(self, func, evt):
        # Wraps a lifecycle coroutine function with debug logging on entry and exit.
        @wraps(func)
        async def logged(app):
            LOGGER.debug(f'--> {evt} {self}')
            result = await func(app)
            LOGGER.debug(f'<-- {evt} {self}')
            return result

        return logged

    async def _startup(self, app: web.Application):
        """
        Internal lifecycle hook: awaits `startup(app)`.

        Intermediate classes can extend this to add their own startup behavior,
        without requiring the end user to explicitly call the super() implementation.

        Only override this if you know what you are doing.
        """
        await self.startup(app)

    async def _before_shutdown(self, app: web.Application):
        """
        Internal lifecycle hook: awaits `before_shutdown(app)`.

        Intermediate classes can extend this to add their own pre-shutdown behavior,
        without requiring the end user to explicitly call the super() implementation.

        Only override this if you know what you are doing.
        """
        await self.before_shutdown(app)

    async def _shutdown(self, app: web.Application):
        """
        Internal lifecycle hook: awaits `shutdown(app)`.

        Intermediate classes can extend this to add their own shutdown behavior,
        without requiring the end user to explicitly call the super() implementation.

        Only override this if you know what you are doing.
        """
        await self.shutdown(app)

    async def startup(self, app: web.Application):
        """Lifecycle hook for initializing the feature in an async context.

        This function is a no-op, unless overridden by subclasses.

        Depending on the `startup` argument in `__init__()`,
        `startup()` will be called when Aiohttp starts running.

        Args:
            app (web.Application):
                Current Aiohttp application.
        """

    async def before_shutdown(self, app: web.Application):
        """Lifecycle hook for preparing to shut down the feature.

        This function is a no-op, unless overridden by subclasses.

        Depending on the `startup` argument in `__init__()`,
        `before_shutdown()` will be called when Aiohttp is closing.

        Args:
            app (web.Application):
                Current Aiohttp application.
        """

    async def shutdown(self, app: web.Application):
        """Lifecycle hook for shutting down the feature before the event loop is closed.

        This function is a no-op, unless overridden by subclasses.

        Depending on the `startup` argument in `__init__()`,
        `shutdown()` will be called when Aiohttp is closing.

        Args:
            app (web.Application):
                Current Aiohttp application.
        """