Skip to content

Commit

Permalink
Merge pull request #1 from gistart/robust-udp
Browse files Browse the repository at this point in the history
robust udp + udp dns round robin support?
  • Loading branch information
gistart authored May 12, 2021
2 parents 1e292c6 + 317097d commit 73abc55
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 16 deletions.
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ __pycache__
.coverage
coverage.xml
test/poke.py
build/*
dist/*
compare/*
build*
dist*
compare*

!.gitkeep
!/.gitignore
13 changes: 10 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ Metrics with no labels are initialized at creation time. This can have unpleasan

To avoid that we'll have to properly isolate each task's metrics, which can be impossible or rather tricky, or we can create metrics with default, non-changing labels (like `hostname`). Such metrics will be initialized on first use (inc), and we'll be pushing only those we actually utilized.

## Batch clients
## Clients

### Batch clients

Batch clients spawn synchronization jobs "in background" (meaning in a thread or asyncio task) to periodically send all metrics from `ppc.PUSH_REGISTRY` to the destination.

Expand Down Expand Up @@ -107,7 +109,7 @@ async def main(urls):
```


## Streaming clients
### Streaming clients

If for some reason every metric change needs to be synced, UDP streaming clients are implemented in this library.

Expand All @@ -120,4 +122,9 @@ def statsd_udp_stream(host, port):

Usage is completely identical to batch clients' decorators / context managers.

:warning: Histogram and Summary `.time() decorator` doesn't work in this mode atm, because it can't be monkey-patched easily.
:warning: The Histogram and Summary `.time()` decorators don't work in this mode at the moment, because they can't be monkey-patched easily.


## Transports

The main goal is not to interrupt measured jobs with errors from monitoring code. Therefore, all transports will attempt to catch all network errors, logging error info and corresponding tracebacks to stdout.
5 changes: 3 additions & 2 deletions prometheus_push_client/clients/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, format, transport, period=15.0, *args, **kwargs):
self.stop_event = None

self._period_step = 0.25 # check event every 0.25 seconds
self._min_period = 0.1 # sleep no less than

super().__init__(*args, **kwargs)

Expand Down Expand Up @@ -56,7 +57,7 @@ def run(self):
self.transport.push_all(data_gen)
except Exception:
log.error("push crashed", exc_info=True)
period = self.period - (time.time() - ts_start)
period = max(self._min_period, self.period - (time.time() - ts_start))


class AsyncBatchClient(BaseBatchClient):
Expand Down Expand Up @@ -90,4 +91,4 @@ async def run(self):
await self.transport.push_all(data_gen)
except Exception:
log.error("push crashed", exc_info=True)
period = self.period - (time.time() - ts_start)
period = max(self._min_period, self.period - (time.time() - ts_start))
41 changes: 33 additions & 8 deletions prometheus_push_client/transports/udp.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import asyncio
import socket
import logging

from prometheus_push_client import compat


class BaseUdpTransport:
log = logging.getLogger("prometheus.udp")


class BaseUdpTransport:
def __init__(self, host, port, mtu=508, datagram_lines=25):
self.host = host
self.port = int(port)
Expand Down Expand Up @@ -40,28 +43,42 @@ def push_all_sync(self, iterable):
self.push_one(data)

def push_one(self, data):
raise NotImplementedError()
try:
return self.transport.sendto(data, (self.host, self.port))
except socket.gaierror: # name resolution error
pass


# TODO: crashes on creation time DNS errors -- retry?
# TODO: ipv6 support?


class SyncUdpTransport(BaseUdpTransport):
def start(self):
self._getaddrinfo()
self.transport = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

push_all = BaseUdpTransport.push_all_sync

def push_one(self, data):
self.transport.sendto(data, (self.host, self.port))
def _getaddrinfo(self):
try:
return socket.getaddrinfo(
self.host,
self.port,
family=socket.AF_INET,
type=socket.SOCK_DGRAM,
)
except socket.gaierror as e:
log.error("%s -- %s:%s", e, self.host, self.port)


class AioUdpTransport(BaseUdpTransport):
async def start(self, loop=None):
loop = loop or compat.get_running_loop()
await self._getaddrinfo(loop)
self.transport, _ = await loop.create_datagram_endpoint(
lambda: asyncio.DatagramProtocol(),
remote_addr=(self.host, self.port)
family=socket.AF_INET,
allow_broadcast=False,
)

async def stop(self):
Expand All @@ -70,5 +87,13 @@ async def stop(self):
async def push_all(self, iterable):
self.push_all_sync(iterable)

def push_one(self, data):
self.transport.sendto(data)
async def _getaddrinfo(self, loop):
try:
return await loop.getaddrinfo(
self.host,
self.port,
family=socket.AF_INET,
type=socket.SOCK_DGRAM,
)
except socket.gaierror as e:
log.error("%s -- %s:%s", e, self.host, self.port)
36 changes: 36 additions & 0 deletions test/test_offline/test_udp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import pytest
import logging
import socket

import prometheus_push_client as ppc


def test_sync_udp_gaierror(caplog):
    """Check that the sync UDP transport logs an unresolvable host.

    ``start()`` is called on a transport pointing at a host name that is
    expected not to resolve. The test then requires a record from the
    ``prometheus.udp`` logger carrying a ``socket.gaierror`` among its
    arguments, and finally pushes data to confirm that no exception
    escapes.
    """
    transport = ppc.SyncUdpTransport("X-does-not-exist-X-123", 1)
    with caplog.at_level(logging.INFO):
        transport.start()

    # Only records emitted by the UDP transport logger are of interest.
    udp_records = [
        record for record in caplog.records
        if record.name == "prometheus.udp"
    ]
    assert any(
        isinstance(arg, socket.gaierror)
        for record in udp_records
        for arg in record.args
    )

    # Pushing through the unresolved transport must not raise either.
    transport.push_all([b"1", b"2"])


@pytest.mark.asyncio
async def test_async_udp_gaierror(caplog):
    """Check that the asyncio UDP transport logs an unresolvable host.

    ``start()`` is awaited on a transport pointing at a host name that is
    expected not to resolve. The test then requires a record from the
    ``prometheus.udp`` logger carrying a ``socket.gaierror`` among its
    arguments, and finally awaits a push to confirm that no exception
    escapes.
    """
    transport = ppc.AioUdpTransport("X-does-not-exist-X-123", 1)
    with caplog.at_level(logging.INFO):
        await transport.start()

    # Only records emitted by the UDP transport logger are of interest.
    udp_records = [
        record for record in caplog.records
        if record.name == "prometheus.udp"
    ]
    assert any(
        isinstance(arg, socket.gaierror)
        for record in udp_records
        for arg in record.args
    )

    # Pushing through the unresolved transport must not raise either.
    await transport.push_all([b"1", b"2"])

0 comments on commit 73abc55

Please sign in to comment.