From 18b77ff46c5045d1d333cdf1390615ee2e93a9fa Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Thu, 6 Dec 2018 13:41:54 +0100 Subject: [PATCH] Metrics sampling (#382) * Add check results sampling - Sampling can be used by ZMON backend to throttle certain account metrics ingestion - Method used: random non-uniform sampling * fix typo Signed-off-by: Mohab Usama * Low frequency check excluded from sampling Signed-off-by: Mohab Usama * Add span Signed-off-by: Mohab Usama * Enhance OT span with sampling info Signed-off-by: Mohab Usama * log sampling config Signed-off-by: Mohab Usama * Add sampling ignored tag Signed-off-by: Mohab Usama * More logs Signed-off-by: Mohab Usama * Fix interval condition Signed-off-by: Mohab Usama * Add interval tag Signed-off-by: Mohab Usama * Get sampling config on first run Signed-off-by: Mohab Usama * Fix task args Signed-off-by: Mohab Usama * Fix url Signed-off-by: Mohab Usama * log correct sampling rate Signed-off-by: Mohab Usama --- config.yaml | 5 ++ tests/test_main_task.py | 47 +++++++++++ zmon_worker_monitor/tasks.py | 4 +- zmon_worker_monitor/workflow.py | 83 ++++++++++++++++++- zmon_worker_monitor/zmon_worker/tasks/main.py | 65 ++++++++++++++- 5 files changed, 196 insertions(+), 8 deletions(-) diff --git a/config.yaml b/config.yaml index eae3e36d..4c8c5555 100644 --- a/config.yaml +++ b/config.yaml @@ -60,3 +60,8 @@ plugin.appdynamics.url: 'https://appdynamics' plugin.saml.url: 'https://idp.example.com/profile/SAML2/Unsolicited/SSO?providerId=urn:self' plugin.openam.url: 'https://auth.zalando.com/z' + + +# sampling +zmon.sampling.rate: 100 +zmon.critical.checks: [] diff --git a/tests/test_main_task.py b/tests/test_main_task.py index b0ecf6a9..dd884629 100644 --- a/tests/test_main_task.py +++ b/tests/test_main_task.py @@ -298,3 +298,50 @@ def test_main_task_configure_tags(monkeypatch, tags, result): task = MainTask() assert task._entity_tags == result + + +@pytest.mark.parametrize('sampling_config,check_id,interval,is_alert,is_changed,is_sampled', ( + ({'default_sampling': 100, 'critical_checks': []}, 11, '60', False, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, '300', False, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, 60, True, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, 60, False, True, True), + ({'default_sampling': 100, 'critical_checks': ['11']}, 11, 60, False, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, 60, False, False, False), + ({'default_sampling': 0, 'critical_checks': ['11']}, 11, 60, False, False, True), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {}}, 11, 60, False, False, True), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, 60, False, False, False), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, 360, False, False, True), + ({'default_sampling': 100, 'critical_checks': [11], 'worker_sampling': {'123': '0'}}, 11, 60, False, False, True), + ({'default_sampling': 0, 'critical_checks': [], 'worker_sampling': {'123': 100}}, 11, 60, False, False, True), +)) +def test_main_task_sampling(monkeypatch, sampling_config, check_id, interval, is_alert, is_changed, is_sampled): + reload(plugin_manager) + plugin_manager.init_plugin_manager() # init plugin manager + + span = MagicMock() + + MainTask.configure({'account': '123'}) + task = MainTask() + + assert task.is_sampled(sampling_config, check_id, interval, is_alert, is_changed, span) is is_sampled + 
+ +@pytest.mark.parametrize('sampling_config,check_id,is_alert,is_changed', ( + ({'default_sampling': 10, 'critical_checks': []}, 11, False, False), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 10}}, 11, False, False), + ({'default_sampling': 0, 'critical_checks': [], 'worker_sampling': {'123': 10}}, 11, False, False), +)) +def test_main_task_sampling_rate(monkeypatch, sampling_config, check_id, is_alert, is_changed): + reload(plugin_manager) + plugin_manager.init_plugin_manager() # init plugin manager + + span = MagicMock() + + MainTask.configure({'account': '123'}) + task = MainTask() + + results = [task.is_sampled(sampling_config, check_id, 60, is_alert, is_changed, span) for _ in range(100)] + sampled = len([s for s in results if s]) + + # We give some margin of error due to probabilistic non-uniform sampling + assert sampled >= 5 and sampled <= 20 diff --git a/zmon_worker_monitor/tasks.py b/zmon_worker_monitor/tasks.py index bc7457a2..a4bba2db 100644 --- a/zmon_worker_monitor/tasks.py +++ b/zmon_worker_monitor/tasks.py @@ -24,14 +24,14 @@ def configure_tasks(config): def check_and_notify(req, alerts, task_context=None, **kwargs): logger.debug('check_and_notify received req=%s, alerts=%s, task_context=%s, ', req, alerts, task_context) - zmontask.check_and_notify(req, alerts, task_context=task_context) + zmontask.check_and_notify(req, alerts, task_context=task_context, **kwargs) def trial_run(req, alerts, task_context=None, **kwargs): logger.info('trial_run received <== check_id=%s', req['check_id']) logger.debug('trial_run received req=%s, alerts=%s, task_context=%s, ', req, alerts, task_context) - zmontask.trial_run(req, alerts, task_context=task_context) + zmontask.trial_run(req, alerts, task_context=task_context, **kwargs) def cleanup(*args, **kwargs): diff --git a/zmon_worker_monitor/workflow.py b/zmon_worker_monitor/workflow.py index 24bbb7c1..7c4d0122 100644 --- a/zmon_worker_monitor/workflow.py +++ b/zmon_worker_monitor/workflow.py @@ -19,6 +19,8 @@ import json import snappy import opentracing +import requests +import tokens import settings @@ -38,10 +40,65 @@ OPENTRACING_QUEUE_OPERATION = 'worker_task_processing' OPENTRACING_TASK_EXPIRATION = 'worker_task_expire_time' +SAMPLING_RATE_UPDATE_DURATION = 60 +SAMPLING_RATE_ENTITY_ID = 'zmon-sampling-rate' + __config = None +def get_sampling_rate_config(config, current_span): + """ + Get sampling rate config from a ZMON entity or config vars. + Entity: + { + "id": "zmon-sampling-rate", + "type": "zmon_config", + "default_sampling": 100, + "critical_checks": [13, 14, 19], + "worker_sampling": { + "account-1": 50, + "account-2": 60, + "account-3": 0 + } + } + """ + default_sampling = int(config.get('zmon.sampling.rate', 100)) + critical_checks = config.get('zmon.critical.checks') + if type(critical_checks) is not list: + critical_checks = critical_checks.replace(' ', '').split(',') + + sampling_config = { + 'default_sampling': default_sampling, + 'critical_checks': critical_checks + } + + # We try to get sampling rate entity! 
+ zmon_url = config.get('zmon.url') + if not zmon_url: + current_span.set_tag('sampling_entity_used', False) + else: + current_span.set_tag('sampling_entity_used', True) + + try: + tokens.configure() + tokens.manage('uid', ['uid']) + + url = '{}/api/v1/entities/{}'.format(zmon_url, SAMPLING_RATE_ENTITY_ID) + headers = {'Authorization': 'Bearer {}'.format(tokens.get('uid'))} + resp = requests.get(url, headers=headers, timeout=2) + + resp.raise_for_status() + + entity = resp.json() + sampling_config.update(entity) + except Exception: + current_span.set_tag('sampling_entity_used', False) + current_span.log_kv({'exception': format_exc()}) + + return sampling_config + + def get_config(): global __config if __config is None: @@ -117,6 +174,9 @@ def flow_simple_queue_processor(queue='', **execution_context): expired_count = 0 count = 0 + sampling_rate_last_updated = datetime.utcnow() + sampling_config = None + while True: try: @@ -131,7 +191,7 @@ def flow_simple_queue_processor(queue='', **execution_context): queue, msg = encoded_task - if not msg[:1] == '{': + if msg[:1] != '{': msg = snappy.decompress(msg) msg_obj = json.loads(msg) @@ -142,9 +202,21 @@ def flow_simple_queue_processor(queue='', **execution_context): span = extract_tracing_span(trace) span.set_operation_name(OPENTRACING_QUEUE_OPERATION) + # Get sampling rates. We update every minute. + if sampling_config is None or ( + (datetime.utcnow() - sampling_rate_last_updated).seconds > SAMPLING_RATE_UPDATE_DURATION): + try: + sampling_config = get_sampling_rate_config(config, span) + span.log_kv({'sampling_config': sampling_config}) + span.set_tag('sampling_rate_updated', True) + except Exception: + span.set_tag('sampling_rate_updated', False) + span.log_kv({'exception': format_exc()}) + with span: try: - is_processed = process_message(queue, known_tasks, reactor, msg_obj, current_span=span) + is_processed = process_message( + queue, known_tasks, reactor, msg_obj, current_span=span, sampling_config=sampling_config) if is_processed: span.set_tag(OPENTRACING_TAG_QUEUE_RESULT, 'success') else: @@ -173,7 +245,7 @@ def flow_simple_queue_processor(queue='', **execution_context): # TODO: some exit condition on failure: maybe when number of consecutive failures > n ? -def process_message(queue, known_tasks, reactor, msg_obj, current_span): +def process_message(queue, known_tasks, reactor, msg_obj, current_span, sampling_config=None): """ Proccess and execute a task. @@ -192,6 +264,9 @@ def process_message(queue, known_tasks, reactor, msg_obj, current_span): :param current_span: Current OpenTracing span. :type current_span: opentracing.Span + :param sampling_config: Dict holding all sampling rate info. 
+ :type sampling_config: dict + :return: Return True if the message was processed successfully :rtype: bool """ @@ -247,7 +322,7 @@ def process_message(queue, known_tasks, reactor, msg_obj, current_span): return False with reactor.enter_task_context(taskname, t_hard, t_soft): - known_tasks[taskname](*func_args, task_context=task_context, **func_kwargs) + known_tasks[taskname](*func_args, task_context=task_context, sampling_config=sampling_config, **func_kwargs) return True diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index d90ccffd..649808a1 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -16,6 +16,7 @@ import urllib from urllib3.util import parse_url import math +import numpy from base64 import b64decode from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -66,6 +67,9 @@ # interval in seconds for storing metrics in Redis METRICS_INTERVAL = 15 +# Any check interval below this threshold is eligible for sampling. Above this threshold check will be always sampled. +SAMPLING_INTERVAL_THRESHOLD = 300 + DEFAULT_CHECK_RESULTS_HISTORY_LENGTH = 20 TRIAL_RUN_RESULT_EXPIRY_TIME = 300 @@ -956,11 +960,49 @@ def send_to_dataservice(cls, check_results, timeout=10): logger.error('Error in data service send: url={} ex={}'.format(cls._dataservice_url, ex)) raise + def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_changed, current_span): + """ + Return sampling bool flag. Sampling flag is computed via random non-uniform sampling. + + We always return True if: + - interval is above the SAMPLING_INTERVAL_THRESHOLD + - check_id is critical (exists in smapling_config['critical_checks'] list). + - is_alert or alert_changed is True. + - No sampling_config specified. + + If our self._account exists in smapling_config['worker_sampling'], then we always take this value. + """ + if not sampling_config or is_alert or alert_changed or int(interval) >= SAMPLING_INTERVAL_THRESHOLD: + current_span.set_tag('interval', interval) + current_span.set_tag('is_alert', is_alert) + current_span.set_tag('alert_changed', alert_changed) + current_span.set_tag('sampling_ignored', True) + return True + + # check if we have a specific sampling for this worker (account) + worker_sampling = sampling_config.get('worker_sampling', {}).get(self._account, None) + default_sampling = sampling_config.get('default_sampling', 100) + critical_checks = sampling_config.get('critical_checks', []) + + if str(check_id) in critical_checks or check_id in critical_checks: + current_span.set_tag('sampling_ignored', True) + current_span.set_tag('critical_check', True) + return True + + sampling_rate = default_sampling if worker_sampling is None else int(worker_sampling) + sampling = min(sampling_rate, 100) / 100. + + current_span.log_kv({'sampling_rate': sampling_rate}) + + return bool(numpy.random.choice([False, True], 1, p=(1 - sampling, sampling))) + @trace(pass_span=True) def check_and_notify(self, req, alerts, task_context=None, **kwargs): # Current OpenTracing span. 
current_span = extract_span_from_kwargs(**kwargs) + sampling_config = kwargs.get('sampling_config', {}) + self.task_context = task_context start_time = time.time() # soft_time_limit = req['interval'] @@ -1008,7 +1050,7 @@ def check_and_notify(self, req, alerts, task_context=None, **kwargs): 'exc': 1}, req, alerts, force_alert=True) else: - self.notify(val, req, alerts) + self.notify(val, req, alerts, sampling_config=sampling_config) @trace(pass_span=True) def trial_run(self, req, alerts, task_context=None, **kwargs): @@ -1501,6 +1543,8 @@ def notify(self, val, req, alerts, force_alert=False, **kwargs): # OpenTracing current span! current_span = extract_span_from_kwargs(**kwargs) + sampling_config = kwargs.get('sampling_config', {}) + def ts_serialize(ts): return datetime.fromtimestamp(ts, tz=self._timezone).isoformat(' ') if ts else None @@ -1517,10 +1561,13 @@ def ts_serialize(ts): 'check_result': val, 'exception': True if isinstance(val, dict) and val.get('exc') else False, 'alerts': {}, + 'is_sampled': True, # By default we consider a check result sampled! } try: setp(req['check_id'], entity_id, 'notify loop') + all_alerts_state = [] + all_alerts_changed_state = [] for alert in alerts: alert_id = alert['id'] alert_entities_key = 'zmon:alerts:{}'.format(alert_id) @@ -1535,6 +1582,10 @@ def ts_serialize(ts): func = getattr(self.con, ('sadd' if is_alert else 'srem')) changed = bool(func(alert_entities_key, entity_id)) + # Used later in sampling evaluation + all_alerts_state.append(is_alert) + all_alerts_changed_state.append(changed) + if is_alert: # bubble up: also update global set of alerts alert_changed = func('zmon:alerts', alert_id) @@ -1549,7 +1600,7 @@ def ts_serialize(ts): if alert_changed: _log_event('ALERT_ENDED', alert, val) - # PF-3318 If an alert has malformed time period, we should evaluate it anyway and continue with + # If an alert has malformed time period, we should evaluate it anyway and continue with # the remaining alert definitions. try: is_in_period = in_period(alert.get('period', '')) @@ -1572,6 +1623,7 @@ def ts_serialize(ts): capt_json = json.dumps(captures, cls=JsonDataEncoder) except Exception, e: self.logger.exception('failed to serialize captures') + current_span.log_kv({'exception': traceback.format_exc()}) captures = {'exception': str(e)} capt_json = json.dumps(captures, cls=JsonDataEncoder) # FIXME - set is_alert = True? @@ -1700,6 +1752,15 @@ def ts_serialize(ts): if self._dataservice_poster: check_result['entity'] = {'id': req['entity']['id']} + # Get sampling flag + check_result['is_sampled'] = self.is_sampled( + sampling_config, req['check_id'], req['interval'], any(all_alerts_state), + any(all_alerts_changed_state), current_span) + + current_span.log_kv({'alerts_state': all_alerts_state}) + current_span.log_kv({'changed_state': all_alerts_changed_state}) + current_span.set_tag('is_sampled', check_result['is_sampled']) + for k in self._entity_tags: if k in req['entity']: check_result['entity'][k] = req['entity'][k]
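
For reference, once the bypass conditions in MainTask.is_sampled are ruled out (no sampling config, an alerting or state-changing result, an interval at or above SAMPLING_INTERVAL_THRESHOLD, or a critical check id), the decision reduces to a weighted coin flip on the per-account or default sampling rate. Below is a minimal, self-contained sketch of that decision path, not the worker code itself: the decide() helper, the account ids and the check ids are illustrative, while the entity shape follows the example documented in get_sampling_rate_config.

import numpy

# Mirrors SAMPLING_INTERVAL_THRESHOLD in zmon_worker_monitor/zmon_worker/tasks/main.py
SAMPLING_INTERVAL_THRESHOLD = 300


def decide(sampling_config, account, check_id, interval, is_alert, alert_changed):
    # Bypass sampling when there is no config, the result is alerting or has just
    # changed alert state, or the check runs infrequently (low-frequency checks
    # are always shipped).
    if not sampling_config or is_alert or alert_changed or int(interval) >= SAMPLING_INTERVAL_THRESHOLD:
        return True

    # Critical checks are always shipped, whether listed as ints or strings.
    critical_checks = sampling_config.get('critical_checks', [])
    if check_id in critical_checks or str(check_id) in critical_checks:
        return True

    # A per-account (worker) rate overrides the default rate; both are percentages.
    rate = sampling_config.get('worker_sampling', {}).get(account)
    if rate is None:
        rate = sampling_config.get('default_sampling', 100)
    p = min(int(rate), 100) / 100.0

    # Non-uniform random sampling: a Bernoulli trial with success probability p.
    return bool(numpy.random.choice([False, True], 1, p=(1 - p, p)))


# Hypothetical zmon-sampling-rate entity, shaped like the docstring example in
# get_sampling_rate_config above.
sampling_config = {
    'default_sampling': 100,
    'critical_checks': [13, 14, 19],
    'worker_sampling': {'account-1': 50, 'account-2': 60, 'account-3': 0},
}

# Workers for account-2 ship roughly 60% of eligible check results.
print(decide(sampling_config, 'account-2', check_id=11, interval=60,
             is_alert=False, alert_changed=False))

Running the 10% cases from test_main_task_sampling_rate through a sketch like this 100 times should land in the same 5-20 sampled range the test allows for, since the draw is probabilistic rather than exact.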