diff --git a/.gitignore b/.gitignore index 4e08d9e..bc58aec 100644 --- a/.gitignore +++ b/.gitignore @@ -15,9 +15,9 @@ __pycache__ .coverage coverage.xml test/poke.py -build/* -dist/* -compare/* +build* +dist* +compare* !.gitkeep !/.gitignore diff --git a/README.md b/README.md index 309fe97..6b114c0 100644 --- a/README.md +++ b/README.md @@ -59,7 +59,9 @@ Metrics with no labels are initialized at creation time. This can have unpleasan To avoid that we'll have to properly isolate each task's metrics, which can be impossible or rather tricky, or we can create metrics with default, non-changing labels (like `hostname`). Such metrics will be initialized on first use (inc), and we'll be pushing only those we actually utilized. -## Batch clients +## Clients + +### Batch clients Batch clients spawn synchronization jobs "in background" (meaning in a thread or asyncio task) to periodically send all metrics from `ppc.PUSH_REGISTRY` to the destination. @@ -107,7 +109,7 @@ async def main(urls): ``` -## Streaming clients +### Streaming clients If for some reason every metric change needs to be synced, UDP streaming clients are implemented in this library. @@ -120,4 +122,9 @@ def statsd_udp_stream(host, port): Usage is completely identical to batch clients' decorators / context managers. -:warning: Histogram and Summary `.time() decorator` doesn't work in this mode atm, because it can't be monkey-patched easily. \ No newline at end of file +:warning: Histogram and Summary `.time() decorator` doesn't work in this mode atm, because it can't be monkey-patched easily. + + +## Transports + +Main goal is not to interrupt measured jobs with errors from monitoring code. Therefor all transports will attempt to catch all network errors, logging error info and corresponding tracebacks to stdout. \ No newline at end of file diff --git a/prometheus_push_client/clients/batch.py b/prometheus_push_client/clients/batch.py index 93ccbf6..9909395 100644 --- a/prometheus_push_client/clients/batch.py +++ b/prometheus_push_client/clients/batch.py @@ -17,6 +17,7 @@ def __init__(self, format, transport, period=15.0, *args, **kwargs): self.stop_event = None self._period_step = 0.25 # check event every 0.25 seconds + self._min_period = 0.1 # sleep no less than super().__init__(*args, **kwargs) @@ -56,7 +57,7 @@ def run(self): self.transport.push_all(data_gen) except Exception: log.error("push crashed", exc_info=True) - period = self.period - (time.time() - ts_start) + period = max(self._min_period, self.period - (time.time() - ts_start)) class AsyncBatchClient(BaseBatchClient): @@ -90,4 +91,4 @@ async def run(self): await self.transport.push_all(data_gen) except Exception: log.error("push crashed", exc_info=True) - period = self.period - (time.time() - ts_start) \ No newline at end of file + period = max(self._min_period, self.period - (time.time() - ts_start)) \ No newline at end of file diff --git a/prometheus_push_client/transports/udp.py b/prometheus_push_client/transports/udp.py index ed14592..e13d448 100644 --- a/prometheus_push_client/transports/udp.py +++ b/prometheus_push_client/transports/udp.py @@ -1,11 +1,14 @@ import asyncio import socket +import logging from prometheus_push_client import compat -class BaseUdpTransport: +log = logging.getLogger("prometheus.udp") + +class BaseUdpTransport: def __init__(self, host, port, mtu=508, datagram_lines=25): self.host = host self.port = int(port) @@ -40,28 +43,42 @@ def push_all_sync(self, iterable): self.push_one(data) def push_one(self, data): - raise NotImplementedError() + try: + return self.transport.sendto(data, (self.host, self.port)) + except socket.gaierror: # name resolution error + pass -# TODO: crashes on creation time DNS errors -- retry? +# TODO: ipv6 support? class SyncUdpTransport(BaseUdpTransport): def start(self): + self._getaddrinfo() self.transport = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) push_all = BaseUdpTransport.push_all_sync - def push_one(self, data): - self.transport.sendto(data, (self.host, self.port)) + def _getaddrinfo(self): + try: + return socket.getaddrinfo( + self.host, + self.port, + family=socket.AF_INET, + type=socket.SOCK_DGRAM, + ) + except socket.gaierror as e: + log.error("%s -- %s:%s", e, self.host, self.port) class AioUdpTransport(BaseUdpTransport): async def start(self, loop=None): loop = loop or compat.get_running_loop() + await self._getaddrinfo(loop) self.transport, _ = await loop.create_datagram_endpoint( lambda: asyncio.DatagramProtocol(), - remote_addr=(self.host, self.port) + family=socket.AF_INET, + allow_broadcast=False, ) async def stop(self): @@ -70,5 +87,13 @@ async def stop(self): async def push_all(self, iterable): self.push_all_sync(iterable) - def push_one(self, data): - self.transport.sendto(data) \ No newline at end of file + async def _getaddrinfo(self, loop): + try: + return await loop.getaddrinfo( + self.host, + self.port, + family=socket.AF_INET, + type=socket.SOCK_DGRAM, + ) + except socket.gaierror as e: + log.error("%s -- %s:%s", e, self.host, self.port) \ No newline at end of file diff --git a/test/test_offline/test_udp.py b/test/test_offline/test_udp.py new file mode 100644 index 0000000..181a9d2 --- /dev/null +++ b/test/test_offline/test_udp.py @@ -0,0 +1,36 @@ +import pytest +import logging +import socket + +import prometheus_push_client as ppc + + +def test_sync_udp_gaierror(caplog): + transport = ppc.SyncUdpTransport("X-does-not-exist-X-123", 1) + with caplog.at_level(logging.INFO): + transport.start() + + assert any( + lr.name == "prometheus.udp" and + any(isinstance(a, socket.gaierror) for a in lr.args) + for lr in caplog.records + ) + + # does not raise + transport.push_all([b"1", b"2"]) + + +@pytest.mark.asyncio +async def test_async_udp_gaierror(caplog): + transport = ppc.AioUdpTransport("X-does-not-exist-X-123", 1) + with caplog.at_level(logging.INFO): + await transport.start() + + assert any( + lr.name == "prometheus.udp" and + any(isinstance(a, socket.gaierror) for a in lr.args) + for lr in caplog.records + ) + + # does not raise + await transport.push_all([b"1", b"2"]) \ No newline at end of file