-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathlistener
executable file
·111 lines (96 loc) · 3.93 KB
/
listener
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
#!/usr/bin/env python
#
# Copyright 2016 Kenneth A. Giusti
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
import time
import uuid
from oslo_config import cfg
import oslo_messaging
from oslo_log import log as logging
_options = [
cfg.StrOpt("topic",
default="my-topic",
help="target topic, default 'my-topic'"),
cfg.StrOpt("url",
required=True,
default="rabbit://localhost:5672",
help="transport address, default 'rabbit://localhost:5672'"),
cfg.BoolOpt("quiet",
default=False,
help="Suppress all stdout output"),
cfg.StrOpt("pool",
help="Notification pool identifier"),
cfg.StrOpt("log_levels",
help="Set module specific log levels, e.g."
" 'amqp=WARN,oslo.messaging=INFO,...'")
]
class TestNotificationEndpoint(object):
def __init__(self, name, quiet=False):
self.name = name
self.quiet = quiet
def _report(self, severity, ctx, publisher, event_type, payload, metadata):
if not self.quiet:
print("%s Notification %s:%s:%s:%s:%s"
% (self.name, severity, publisher,
event_type, payload, metadata))
def debug(self, ctx, publisher, event_type, payload, metadata):
self._report("debug", ctx, publisher, event_type, payload, metadata)
def audit(self, ctx, publisher, event_type, payload, metadata):
self._report("audit", ctx, publisher, event_type, payload, metadata)
def critical(self, ctx, publisher, event_type, payload, metadata):
self._report("critical", ctx, publisher, event_type, payload, metadata)
def error(self, ctx, publisher, event_type, payload, metadata):
self._report("error", ctx, publisher, event_type, payload, metadata)
def info(self, ctx, publisher, event_type, payload, metadata):
self._report("info", ctx, publisher, event_type, payload, metadata)
def warn(self, ctx, publisher, event_type, payload, metadata):
self._report("warn", ctx, publisher, event_type, payload, metadata)
def main(argv=None):
logging.register_options(cfg.CONF)
cfg.CONF.register_cli_opts(_options)
cfg.CONF(sys.argv[1:])
if cfg.CONF.log_levels:
logging.set_defaults(
default_log_levels=cfg.CONF.log_levels.split(',')
)
logging.setup(cfg.CONF, "listener")
quiet = cfg.CONF.quiet
name = "listener-" + uuid.uuid4().hex
topic = cfg.CONF.topic
url = cfg.CONF.url
pool = cfg.CONF.pool
targets = [oslo_messaging.Target(topic=topic)]
transport = oslo_messaging.get_notification_transport(cfg.CONF, url=url)
listener = oslo_messaging.get_notification_listener(transport,
targets,
[TestNotificationEndpoint(name,
quiet)],
executor="threading",
pool=pool)
listener.start()
if not quiet:
print("listener %s: url=%s, topics=%s" % (name, url, topic))
try:
while True:
time.sleep(10)
except KeyboardInterrupt:
print("Stopping..")
listener.stop()
listener.wait()
transport.cleanup()
return 0
if __name__ == "__main__":
sys.exit(main())