Skip to content
This repository has been archived by the owner on Apr 22, 2020. It is now read-only.

Metrics sampling #382

Merged
merged 14 commits into from
Dec 6, 2018
5 changes: 5 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,8 @@ plugin.appdynamics.url: 'https://appdynamics'

plugin.saml.url: 'https://idp.example.com/profile/SAML2/Unsolicited/SSO?providerId=urn:self'
plugin.openam.url: 'https://auth.zalando.com/z'


# sampling
zmon.sampling.rate: 100
zmon.critical.checks: []
47 changes: 47 additions & 0 deletions tests/test_main_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,50 @@ def test_main_task_configure_tags(monkeypatch, tags, result):
task = MainTask()

assert task._entity_tags == result


@pytest.mark.parametrize('sampling_config,check_id,interval,is_alert,is_changed,is_sampled', [
    # Default rate of 100 -> sampled.
    ({'default_sampling': 100, 'critical_checks': []}, 11, '60', False, False, True),
    # Interval reaches the sampling threshold -> always sampled, even at rate 0.
    ({'default_sampling': 0, 'critical_checks': []}, 11, '300', False, False, True),
    # Check is in alert state -> always sampled.
    ({'default_sampling': 0, 'critical_checks': []}, 11, 60, True, False, True),
    # Alert state changed -> always sampled.
    ({'default_sampling': 0, 'critical_checks': []}, 11, 60, False, True, True),
    # Critical check listed as a string id.
    ({'default_sampling': 100, 'critical_checks': ['11']}, 11, 60, False, False, True),
    # Rate 0 and nothing forcing the sample -> dropped.
    ({'default_sampling': 0, 'critical_checks': []}, 11, 60, False, False, False),
    # Critical check wins over a rate of 0.
    ({'default_sampling': 0, 'critical_checks': ['11']}, 11, 60, False, False, True),
    # Empty worker_sampling -> the default rate applies.
    ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {}}, 11, 60, False, False, True),
    # Worker (account) specific rate of 0 overrides the default.
    ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, 60, False, False, False),
    # Interval above the threshold wins over the worker specific rate.
    ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, 360, False, False, True),
    # Critical check (listed as an int id) wins over the worker specific rate.
    ({'default_sampling': 100, 'critical_checks': [11], 'worker_sampling': {'123': '0'}}, 11, 60, False, False, True),
    # Worker specific rate of 100 overrides a default of 0.
    ({'default_sampling': 0, 'critical_checks': [], 'worker_sampling': {'123': 100}}, 11, 60, False, False, True),
])
def test_main_task_sampling(monkeypatch, sampling_config, check_id, interval, is_alert, is_changed, is_sampled):
    """Check the deterministic outcomes of ``MainTask.is_sampled`` (rates of 0/100 and forced sampling)."""
    reload(plugin_manager)
    plugin_manager.init_plugin_manager()  # init plugin manager

    # The worker account is '123', matching the keys used in 'worker_sampling' above.
    MainTask.configure({'account': '123'})
    main_task = MainTask()
    tracing_span = MagicMock()

    decision = main_task.is_sampled(sampling_config, check_id, interval, is_alert, is_changed, tracing_span)

    assert decision is is_sampled


@pytest.mark.parametrize('sampling_config,check_id,is_alert,is_changed', (
    ({'default_sampling': 10, 'critical_checks': []}, 11, False, False),
    ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 10}}, 11, False, False),
    ({'default_sampling': 0, 'critical_checks': [], 'worker_sampling': {'123': 10}}, 11, False, False),
))
def test_main_task_sampling_rate(monkeypatch, sampling_config, check_id, is_alert, is_changed):
    """Check that an effective sampling rate of 10 samples roughly 10% of the check results."""
    reload(plugin_manager)
    plugin_manager.init_plugin_manager()  # init plugin manager

    span = MagicMock()

    MainTask.configure({'account': '123'})
    task = MainTask()

    # Every case above resolves to an effective rate of 10 for account '123'.
    # With only 100 draws and bounds of 5..20 the assertion fails by pure chance in a few percent of runs,
    # which makes the test flaky. 1000 draws have an expected count of 100 with a standard deviation of ~9.5,
    # so bounds of 50..150 are about five standard deviations wide and a spurious failure is negligible.
    trials = 1000
    sampled = sum(
        1 for _ in range(trials)
        if task.is_sampled(sampling_config, check_id, 60, is_alert, is_changed, span)
    )

    assert 50 <= sampled <= 150
4 changes: 2 additions & 2 deletions zmon_worker_monitor/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ def configure_tasks(config):
def check_and_notify(req, alerts, task_context=None, **kwargs):
    """
    Task entry point: log the incoming request and delegate to ``zmontask.check_and_notify``.

    Any extra keyword arguments are forwarded unchanged to the task implementation.
    The delegate is invoked exactly once; the older call that dropped ``**kwargs`` is removed.
    """
    logger.debug('check_and_notify received req=%s, alerts=%s, task_context=%s, ', req, alerts, task_context)

    zmontask.check_and_notify(req, alerts, task_context=task_context, **kwargs)


def trial_run(req, alerts, task_context=None, **kwargs):
    """
    Task entry point: log the incoming trial run and delegate to ``zmontask.trial_run``.

    Any extra keyword arguments are forwarded unchanged to the task implementation.
    The delegate is invoked exactly once; the older call that dropped ``**kwargs`` is removed.
    """
    logger.info('trial_run received <== check_id=%s', req['check_id'])
    logger.debug('trial_run received req=%s, alerts=%s, task_context=%s, ', req, alerts, task_context)

    zmontask.trial_run(req, alerts, task_context=task_context, **kwargs)


def cleanup(*args, **kwargs):
Expand Down
83 changes: 79 additions & 4 deletions zmon_worker_monitor/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import json
import snappy
import opentracing
import requests
import tokens

import settings

Expand All @@ -38,10 +40,65 @@
OPENTRACING_QUEUE_OPERATION = 'worker_task_processing'
OPENTRACING_TASK_EXPIRATION = 'worker_task_expire_time'

SAMPLING_RATE_UPDATE_DURATION = 60
SAMPLING_RATE_ENTITY_ID = 'zmon-sampling-rate'


__config = None


def get_sampling_rate_config(config, current_span):
    """
    Get sampling rate config from a ZMON entity or config vars.

    Defaults come from the ``zmon.sampling.rate`` and ``zmon.critical.checks`` config vars. If ``zmon.url`` is
    configured, the ``zmon-sampling-rate`` entity is fetched and its fields are merged over those defaults.
    Any failure while fetching the entity is logged on the span and the config-derived values are returned.

    Entity:
    {
        "id": "zmon-sampling-rate",
        "type": "zmon_config",
        "default_sampling": 100,
        "critical_checks": [13, 14, 19],
        "worker_sampling": {
            "account-1": 50,
            "account-2": 60,
            "account-3": 0
        }
    }

    :param config: Mapping of config vars. ``zmon.critical.checks`` may be a list or a comma separated string.
    :type config: dict

    :param current_span: Current OpenTracing span; tagged with ``sampling_entity_used``.
    :type current_span: opentracing.Span

    :return: Dict holding at least ``default_sampling`` and ``critical_checks``.
    :rtype: dict
    """
    default_sampling = int(config.get('zmon.sampling.rate', 100))

    critical_checks = config.get('zmon.critical.checks')
    if critical_checks is None:
        # Config var not set at all: no critical checks (previously this crashed on None.replace).
        critical_checks = []
    elif not isinstance(critical_checks, list):
        # Comma separated string, e.g. '11, 12'. Empty items are dropped so that '' yields [] and not [''].
        critical_checks = [item for item in critical_checks.replace(' ', '').split(',') if item]

    sampling_config = {
        'default_sampling': default_sampling,
        'critical_checks': critical_checks
    }

    # We try to get sampling rate entity!
    zmon_url = config.get('zmon.url')
    if not zmon_url:
        current_span.set_tag('sampling_entity_used', False)
        return sampling_config

    current_span.set_tag('sampling_entity_used', True)

    try:
        tokens.configure()
        tokens.manage('uid', ['uid'])

        url = '{}/api/v1/entities/{}'.format(zmon_url, SAMPLING_RATE_ENTITY_ID)
        headers = {'Authorization': 'Bearer {}'.format(tokens.get('uid'))}
        resp = requests.get(url, headers=headers, timeout=2)

        resp.raise_for_status()

        entity = resp.json()
        sampling_config.update(entity)
    except Exception:
        # Best effort: fall back to the config-derived values and leave a trace of the failure on the span.
        current_span.set_tag('sampling_entity_used', False)
        current_span.log_kv({'exception': format_exc()})

    return sampling_config


def get_config():
global __config
if __config is None:
Expand Down Expand Up @@ -117,6 +174,9 @@ def flow_simple_queue_processor(queue='', **execution_context):
expired_count = 0
count = 0

sampling_rate_last_updated = datetime.utcnow()
sampling_config = None

while True:
try:

Expand All @@ -131,7 +191,7 @@ def flow_simple_queue_processor(queue='', **execution_context):

queue, msg = encoded_task

if not msg[:1] == '{':
if msg[:1] != '{':
msg = snappy.decompress(msg)

msg_obj = json.loads(msg)
Expand All @@ -142,9 +202,21 @@ def flow_simple_queue_processor(queue='', **execution_context):
span = extract_tracing_span(trace)
span.set_operation_name(OPENTRACING_QUEUE_OPERATION)

# Get sampling rates. We update every minute.
if sampling_config is None or (
(datetime.utcnow() - sampling_rate_last_updated).seconds > SAMPLING_RATE_UPDATE_DURATION):
try:
sampling_config = get_sampling_rate_config(config, span)
span.log_kv({'sampling_config': sampling_config})
span.set_tag('sampling_rate_updated', True)
except Exception:
span.set_tag('sampling_rate_updated', False)
span.log_kv({'exception': format_exc()})

with span:
try:
is_processed = process_message(queue, known_tasks, reactor, msg_obj, current_span=span)
is_processed = process_message(
queue, known_tasks, reactor, msg_obj, current_span=span, sampling_config=sampling_config)
if is_processed:
span.set_tag(OPENTRACING_TAG_QUEUE_RESULT, 'success')
else:
Expand Down Expand Up @@ -173,7 +245,7 @@ def flow_simple_queue_processor(queue='', **execution_context):
# TODO: some exit condition on failure: maybe when number of consecutive failures > n ?


def process_message(queue, known_tasks, reactor, msg_obj, current_span):
def process_message(queue, known_tasks, reactor, msg_obj, current_span, sampling_config=None):
"""
Process and execute a task.
Expand All @@ -192,6 +264,9 @@ def process_message(queue, known_tasks, reactor, msg_obj, current_span):
:param current_span: Current OpenTracing span.
:type current_span: opentracing.Span
:param sampling_config: Dict holding all sampling rate info.
:type sampling_config: dict
:return: Return True if the message was processed successfully
:rtype: bool
"""
Expand Down Expand Up @@ -247,7 +322,7 @@ def process_message(queue, known_tasks, reactor, msg_obj, current_span):
return False

with reactor.enter_task_context(taskname, t_hard, t_soft):
known_tasks[taskname](*func_args, task_context=task_context, **func_kwargs)
known_tasks[taskname](*func_args, task_context=task_context, sampling_config=sampling_config, **func_kwargs)
return True


Expand Down
65 changes: 63 additions & 2 deletions zmon_worker_monitor/zmon_worker/tasks/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import urllib
from urllib3.util import parse_url
import math
import numpy
from base64 import b64decode
from cryptography import x509
from cryptography.hazmat.backends import default_backend
Expand Down Expand Up @@ -66,6 +67,9 @@
# interval in seconds for storing metrics in Redis
METRICS_INTERVAL = 15

# Checks with an interval below this threshold are eligible for sampling.
# Checks with an interval at or above this threshold are always sampled.
SAMPLING_INTERVAL_THRESHOLD = 300

DEFAULT_CHECK_RESULTS_HISTORY_LENGTH = 20

TRIAL_RUN_RESULT_EXPIRY_TIME = 300
Expand Down Expand Up @@ -956,11 +960,49 @@ def send_to_dataservice(cls, check_results, timeout=10):
logger.error('Error in data service send: url={} ex={}'.format(cls._dataservice_url, ex))
raise

def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_changed, current_span):
    """
    Return sampling bool flag.

    We always return True if:
        - No sampling_config specified.
        - is_alert or alert_changed is True.
        - interval is at or above SAMPLING_INTERVAL_THRESHOLD.
        - check_id is critical (exists in sampling_config['critical_checks'], compared as strings).

    Otherwise the flag is drawn at random with probability ``rate / 100``. The rate is
    sampling_config['worker_sampling'][self._account] if present, else sampling_config['default_sampling']
    (100 if missing). Rates are coerced to int and clamped to the range 0..100.

    :param sampling_config: Dict holding all sampling rate info.
    :type sampling_config: dict

    :param check_id: ID of the check being evaluated.

    :param interval: Check interval; anything accepted by ``int()``.

    :param is_alert: True if any alert of the check is currently active.
    :type is_alert: bool

    :param alert_changed: True if any alert of the check changed state.
    :type alert_changed: bool

    :param current_span: Current OpenTracing span, tagged with the reason for the decision.
    :type current_span: opentracing.Span

    :return: True if the check result should be sampled.
    :rtype: bool
    """
    if not sampling_config or is_alert or alert_changed or int(interval) >= SAMPLING_INTERVAL_THRESHOLD:
        current_span.set_tag('interval', interval)
        current_span.set_tag('is_alert', is_alert)
        current_span.set_tag('alert_changed', alert_changed)
        current_span.set_tag('sampling_ignored', True)
        return True

    # Values may come from an entity where a field can be null; fall back to empty containers.
    critical_checks = sampling_config.get('critical_checks') or []

    # Compare as strings so int and str IDs match regardless of which side uses which type.
    critical_ids = set(str(critical_id) for critical_id in critical_checks)
    if str(check_id) in critical_ids:
        current_span.set_tag('sampling_ignored', True)
        current_span.set_tag('critical_check', True)
        return True

    # A specific sampling for this worker (account) takes precedence over the default.
    sampling_rate = (sampling_config.get('worker_sampling') or {}).get(self._account)
    if sampling_rate is None:
        sampling_rate = sampling_config.get('default_sampling')
    if sampling_rate is None:
        sampling_rate = 100
    sampling_rate = int(sampling_rate)

    # Clamp to a valid probability; a negative or >100 rate must not break the random draw.
    sampling = max(0, min(sampling_rate, 100)) / 100.

    current_span.log_kv({'sampling_rate': sampling_rate})

    # random() is uniform in [0.0, 1.0): never True for 0, always True for 1.
    return bool(numpy.random.random() < sampling)

@trace(pass_span=True)
def check_and_notify(self, req, alerts, task_context=None, **kwargs):
# Current OpenTracing span.
current_span = extract_span_from_kwargs(**kwargs)

sampling_config = kwargs.get('sampling_config', {})

self.task_context = task_context
start_time = time.time()
# soft_time_limit = req['interval']
Expand Down Expand Up @@ -1008,7 +1050,7 @@ def check_and_notify(self, req, alerts, task_context=None, **kwargs):
'exc': 1}, req, alerts,
force_alert=True)
else:
self.notify(val, req, alerts)
self.notify(val, req, alerts, sampling_config=sampling_config)

@trace(pass_span=True)
def trial_run(self, req, alerts, task_context=None, **kwargs):
Expand Down Expand Up @@ -1501,6 +1543,8 @@ def notify(self, val, req, alerts, force_alert=False, **kwargs):
# OpenTracing current span!
current_span = extract_span_from_kwargs(**kwargs)

sampling_config = kwargs.get('sampling_config', {})

def ts_serialize(ts):
return datetime.fromtimestamp(ts, tz=self._timezone).isoformat(' ') if ts else None

Expand All @@ -1517,10 +1561,13 @@ def ts_serialize(ts):
'check_result': val,
'exception': True if isinstance(val, dict) and val.get('exc') else False,
'alerts': {},
'is_sampled': True, # By default we consider a check result sampled!
}

try:
setp(req['check_id'], entity_id, 'notify loop')
all_alerts_state = []
all_alerts_changed_state = []
for alert in alerts:
alert_id = alert['id']
alert_entities_key = 'zmon:alerts:{}'.format(alert_id)
Expand All @@ -1535,6 +1582,10 @@ def ts_serialize(ts):
func = getattr(self.con, ('sadd' if is_alert else 'srem'))
changed = bool(func(alert_entities_key, entity_id))

# Used later in sampling evaluation
all_alerts_state.append(is_alert)
all_alerts_changed_state.append(changed)

if is_alert:
# bubble up: also update global set of alerts
alert_changed = func('zmon:alerts', alert_id)
Expand All @@ -1549,7 +1600,7 @@ def ts_serialize(ts):
if alert_changed:
_log_event('ALERT_ENDED', alert, val)

# PF-3318 If an alert has malformed time period, we should evaluate it anyway and continue with
# If an alert has malformed time period, we should evaluate it anyway and continue with
# the remaining alert definitions.
try:
is_in_period = in_period(alert.get('period', ''))
Expand All @@ -1572,6 +1623,7 @@ def ts_serialize(ts):
capt_json = json.dumps(captures, cls=JsonDataEncoder)
except Exception, e:
self.logger.exception('failed to serialize captures')
current_span.log_kv({'exception': traceback.format_exc()})
captures = {'exception': str(e)}
capt_json = json.dumps(captures, cls=JsonDataEncoder)
# FIXME - set is_alert = True?
Expand Down Expand Up @@ -1700,6 +1752,15 @@ def ts_serialize(ts):
if self._dataservice_poster:
check_result['entity'] = {'id': req['entity']['id']}

# Get sampling flag
check_result['is_sampled'] = self.is_sampled(
sampling_config, req['check_id'], req['interval'], any(all_alerts_state),
any(all_alerts_changed_state), current_span)

current_span.log_kv({'alerts_state': all_alerts_state})
current_span.log_kv({'changed_state': all_alerts_changed_state})
current_span.set_tag('is_sampled', check_result['is_sampled'])

for k in self._entity_tags:
if k in req['entity']:
check_result['entity'][k] = req['entity'][k]
Expand Down