From 9e8f86d34a8ec1a74a2305a2fbb8aeefc90f657d Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Thu, 29 Nov 2018 19:30:58 +0100 Subject: [PATCH 01/14] Add check results sampling - Sampling can be used by ZMON backend to throttle certain account metrics ingestion - Method used: random non-uniform sampling --- config.yaml | 5 ++ tests/test_main_task.py | 41 ++++++++++ zmon_worker_monitor/workflow.py | 81 ++++++++++++++++++- zmon_worker_monitor/zmon_worker/tasks/main.py | 49 ++++++++++- 4 files changed, 170 insertions(+), 6 deletions(-) diff --git a/config.yaml b/config.yaml index eae3e36d..4c8c5555 100644 --- a/config.yaml +++ b/config.yaml @@ -60,3 +60,8 @@ plugin.appdynamics.url: 'https://appdynamics' plugin.saml.url: 'https://idp.example.com/profile/SAML2/Unsolicited/SSO?providerId=urn:self' plugin.openam.url: 'https://auth.zalando.com/z' + + +# sampling +zmon.sampling.rate: 100 +zmon.critical.checks: [] diff --git a/tests/test_main_task.py b/tests/test_main_task.py index b0ecf6a9..f032d90a 100644 --- a/tests/test_main_task.py +++ b/tests/test_main_task.py @@ -298,3 +298,44 @@ def test_main_task_configure_tags(monkeypatch, tags, result): task = MainTask() assert task._entity_tags == result + + +@pytest.mark.parametrize('sampling_config,check_id,is_alert,is_changed,is_sampled', ( + ({'default_sampling': 100, 'critical_checks': []}, 11, False, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, True, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, False, True, True), + ({'default_sampling': 100, 'critical_checks': [11]}, 11, False, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, False, False, False), + ({'default_sampling': 0, 'critical_checks': [11]}, 11, False, False, True), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {}}, 11, False, False, True), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, False, False, False), + ({'default_sampling': 100, 'critical_checks': [11], 'worker_sampling': {'123': 0}}, 11, False, False, True), + ({'default_sampling': 0, 'critical_checks': [], 'worker_sampling': {'123': 100}}, 11, False, False, True), +)) +def test_main_task_sampling(monkeypatch, sampling_config, check_id, is_alert, is_changed, is_sampled): + reload(plugin_manager) + plugin_manager.init_plugin_manager() # init plugin manager + + MainTask.configure({'account': '123'}) + task = MainTask() + + assert task.is_sampled(sampling_config, check_id, is_alert, is_changed) is is_sampled + + +@pytest.mark.parametrize('sampling_config,check_id,is_alert,is_changed', ( + ({'default_sampling': 10, 'critical_checks': []}, 11, False, False), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 10}}, 11, False, False), + ({'default_sampling': 0, 'critical_checks': [], 'worker_sampling': {'123': 10}}, 11, False, False), +)) +def test_main_task_sampling_rate(monkeypatch, sampling_config, check_id, is_alert, is_changed): + reload(plugin_manager) + plugin_manager.init_plugin_manager() # init plugin manager + + MainTask.configure({'account': '123'}) + task = MainTask() + + results = [task.is_sampled(sampling_config, check_id, is_alert, is_changed) for _ in range(100)] + sampled = len([s for s in results if s]) + + # We give some margin of error due to probabilistic sampling non-uniform sampling + assert sampled >= 5 and sampled <= 20 diff --git a/zmon_worker_monitor/workflow.py b/zmon_worker_monitor/workflow.py index 24bbb7c1..4d989d46 100644 --- a/zmon_worker_monitor/workflow.py +++ b/zmon_worker_monitor/workflow.py @@ -19,6 +19,8 @@ import json import snappy import opentracing +import requests +import tokens import settings @@ -38,10 +40,65 @@ OPENTRACING_QUEUE_OPERATION = 'worker_task_processing' OPENTRACING_TASK_EXPIRATION = 'worker_task_expire_time' +SAMPLING_RATE_UPDATE_DURATION = 60 +SAMPLING_RATE_ENTITY_ID = 'zmon-sampling-rate' + __config = None +def get_sampling_rate_config(config, current_span): + """ + Get sampling rate config from a ZMON entity or config vars. + Entity: + { + "id": "zmon-sampling-rate", + "type": "zmon_config", + "default_sampling": 100, + "critical_checks": [13, 14, 19], + "worker_sampling": { + "account-1": 50, + "account-2": 60, + "account-3": 0 + } + } + """ + default_sampling = config.get('zmon.sampling.rate') + critical_checks = config.get('zmon.critical.checks') + if type(critical_checks) is not list: + critical_checks = critical_checks.replace(' ', '').split(',') + + sampling_config = { + 'default_sampling': default_sampling, + 'critical_checks': critical_checks + } + + # We try to get sampling rate entity! + zmon_url = config.get('zmon.url') + if not zmon_url: + current_span.set_tag('sampling_entity_used', False) + else: + current_span.set_tag('sampling_entity_used', True) + + try: + tokens.configure() + tokens.manage('uid', ['uid']) + + url = '{}/api/v1/{}'.format(zmon_url, SAMPLING_RATE_ENTITY_ID) + headers = {'Authorization': 'Bearer {}'.format(tokens.get('uid'))} + resp = requests.get(url, headers=headers, timeout=2) + + resp.raise_for_status() + + entity = resp.json() + sampling_config.update(entity) + except Exception: + current_span.set_tag('sampling_entity_used', False) + current_span.log_kv({'exception': format_exc()}) + + return sampling_config + + def get_config(): global __config if __config is None: @@ -117,6 +174,9 @@ def flow_simple_queue_processor(queue='', **execution_context): expired_count = 0 count = 0 + sampling_rate_last_updated = datetime.utcnow() + sampling_config = {} + while True: try: @@ -131,7 +191,7 @@ def flow_simple_queue_processor(queue='', **execution_context): queue, msg = encoded_task - if not msg[:1] == '{': + if msg[:1] != '{': msg = snappy.decompress(msg) msg_obj = json.loads(msg) @@ -142,9 +202,19 @@ def flow_simple_queue_processor(queue='', **execution_context): span = extract_tracing_span(trace) span.set_operation_name(OPENTRACING_QUEUE_OPERATION) + # Get sampling rates. We update every minute. + if (datetime.utcnow() - sampling_rate_last_updated).seconds > SAMPLING_RATE_UPDATE_DURATION: + try: + sampling_config = get_sampling_rate_config(config) + span.set_tag('sampling_rate_updated', True) + except Exception: + span.set_tag('sampling_rate_updated', False) + span.log_kv({'exception': format_exc()}) + with span: try: - is_processed = process_message(queue, known_tasks, reactor, msg_obj, current_span=span) + is_processed = process_message( + queue, known_tasks, reactor, msg_obj, current_span=span, sampling_config=sampling_config) if is_processed: span.set_tag(OPENTRACING_TAG_QUEUE_RESULT, 'success') else: @@ -173,7 +243,7 @@ def flow_simple_queue_processor(queue='', **execution_context): # TODO: some exit condition on failure: maybe when number of consecutive failures > n ? -def process_message(queue, known_tasks, reactor, msg_obj, current_span): +def process_message(queue, known_tasks, reactor, msg_obj, current_span, sampling_config=None): """ Proccess and execute a task. @@ -192,6 +262,9 @@ def process_message(queue, known_tasks, reactor, msg_obj, current_span): :param current_span: Current OpenTracing span. :type current_span: opentracing.Span + :param sampling_config: Dict holding all sampling rate info. + :type sampling_config: dict + :return: Return True if the message was processed successfully :rtype: bool """ @@ -247,7 +320,7 @@ def process_message(queue, known_tasks, reactor, msg_obj, current_span): return False with reactor.enter_task_context(taskname, t_hard, t_soft): - known_tasks[taskname](*func_args, task_context=task_context, **func_kwargs) + known_tasks[taskname](*func_args, task_context=task_context, sampling_config=sampling_config, **func_kwargs) return True diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index d90ccffd..2d0b499f 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -16,6 +16,7 @@ import urllib from urllib3.util import parse_url import math +import numpy from base64 import b64decode from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -956,11 +957,39 @@ def send_to_dataservice(cls, check_results, timeout=10): logger.error('Error in data service send: url={} ex={}'.format(cls._dataservice_url, ex)) raise + def is_sampled(self, sampling_config, check_id, is_alert, alert_changed): + """ + Return sampling bool flag. Sampling flag is computed via random non-uniform sampling. + + We always return True if: + - check_id is critical (exists in smapling_config['critical_checks'] list). + - is_alert or alert_changed is True. + - No sampling_config specified. + + If our self._account exists in smapling_config['worker_sampling'], then we always take this value. + """ + if not sampling_config or is_alert or alert_changed: + return True + + # check if we have a specific sampling for this worker (account) + worker_sampling = sampling_config.get('worker_sampling', {}).get(self._account, None) + default_sampling = sampling_config.get('default_sampling', 100) + critical_checks = sampling_config.get('critical_checks', []) + + if check_id in critical_checks: + return True + + sampling = min(default_sampling if worker_sampling is None else worker_sampling, 100) / 100. + + return bool(numpy.random.choice([False, True], 1, p=(1 - sampling, sampling))) + @trace(pass_span=True) def check_and_notify(self, req, alerts, task_context=None, **kwargs): # Current OpenTracing span. current_span = extract_span_from_kwargs(**kwargs) + sampling_config = kwargs.get('sampling_config', {}) + self.task_context = task_context start_time = time.time() # soft_time_limit = req['interval'] @@ -1008,7 +1037,7 @@ def check_and_notify(self, req, alerts, task_context=None, **kwargs): 'exc': 1}, req, alerts, force_alert=True) else: - self.notify(val, req, alerts) + self.notify(val, req, alerts, sampling_config=sampling_config) @trace(pass_span=True) def trial_run(self, req, alerts, task_context=None, **kwargs): @@ -1501,6 +1530,8 @@ def notify(self, val, req, alerts, force_alert=False, **kwargs): # OpenTracing current span! current_span = extract_span_from_kwargs(**kwargs) + sampling_config = kwargs.get('sampling_config', {}) + def ts_serialize(ts): return datetime.fromtimestamp(ts, tz=self._timezone).isoformat(' ') if ts else None @@ -1517,10 +1548,13 @@ def ts_serialize(ts): 'check_result': val, 'exception': True if isinstance(val, dict) and val.get('exc') else False, 'alerts': {}, + 'is_sampled': True, # By default we consider a check result sampled! } try: setp(req['check_id'], entity_id, 'notify loop') + all_alerts_state = [] + all_alerts_changed_state = [] for alert in alerts: alert_id = alert['id'] alert_entities_key = 'zmon:alerts:{}'.format(alert_id) @@ -1535,6 +1569,10 @@ def ts_serialize(ts): func = getattr(self.con, ('sadd' if is_alert else 'srem')) changed = bool(func(alert_entities_key, entity_id)) + # Used later in sampling evaluation + all_alerts_state.append(is_alert) + all_alerts_changed_state.append(changed) + if is_alert: # bubble up: also update global set of alerts alert_changed = func('zmon:alerts', alert_id) @@ -1549,7 +1587,7 @@ def ts_serialize(ts): if alert_changed: _log_event('ALERT_ENDED', alert, val) - # PF-3318 If an alert has malformed time period, we should evaluate it anyway and continue with + # If an alert has malformed time period, we should evaluate it anyway and continue with # the remaining alert definitions. try: is_in_period = in_period(alert.get('period', '')) @@ -1572,6 +1610,7 @@ def ts_serialize(ts): capt_json = json.dumps(captures, cls=JsonDataEncoder) except Exception, e: self.logger.exception('failed to serialize captures') + current_span.log_kv({'exception': traceback.format_exc()}) captures = {'exception': str(e)} capt_json = json.dumps(captures, cls=JsonDataEncoder) # FIXME - set is_alert = True? @@ -1700,6 +1739,12 @@ def ts_serialize(ts): if self._dataservice_poster: check_result['entity'] = {'id': req['entity']['id']} + # Get sampling flag + check_result['is_sampled'] = self.is_sampled( + sampling_config, req['check_id'], any(all_alerts_state), any(all_alerts_changed_state)) + + current_span.set_tag('is_sampled', check_result['is_sampled']) + for k in self._entity_tags: if k in req['entity']: check_result['entity'][k] = req['entity'][k] From 31df441014218ad25ff578a9459a4e0c543db206 Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Thu, 29 Nov 2018 19:34:02 +0100 Subject: [PATCH 02/14] fix typo Signed-off-by: Mohab Usama --- tests/test_main_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_main_task.py b/tests/test_main_task.py index f032d90a..ec037616 100644 --- a/tests/test_main_task.py +++ b/tests/test_main_task.py @@ -337,5 +337,5 @@ def test_main_task_sampling_rate(monkeypatch, sampling_config, check_id, is_aler results = [task.is_sampled(sampling_config, check_id, is_alert, is_changed) for _ in range(100)] sampled = len([s for s in results if s]) - # We give some margin of error due to probabilistic sampling non-uniform sampling + # We give some margin of error due to probabilistic non-uniform sampling assert sampled >= 5 and sampled <= 20 From b37a56e9e348c540fd04d93d9ef5d49daa9d152e Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Fri, 30 Nov 2018 16:22:39 +0100 Subject: [PATCH 03/14] Low frequency check excluded from sampling Signed-off-by: Mohab Usama --- tests/test_main_task.py | 30 ++++++++++--------- zmon_worker_monitor/zmon_worker/tasks/main.py | 13 +++++--- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/tests/test_main_task.py b/tests/test_main_task.py index ec037616..bf190411 100644 --- a/tests/test_main_task.py +++ b/tests/test_main_task.py @@ -300,26 +300,28 @@ def test_main_task_configure_tags(monkeypatch, tags, result): assert task._entity_tags == result -@pytest.mark.parametrize('sampling_config,check_id,is_alert,is_changed,is_sampled', ( - ({'default_sampling': 100, 'critical_checks': []}, 11, False, False, True), - ({'default_sampling': 0, 'critical_checks': []}, 11, True, False, True), - ({'default_sampling': 0, 'critical_checks': []}, 11, False, True, True), - ({'default_sampling': 100, 'critical_checks': [11]}, 11, False, False, True), - ({'default_sampling': 0, 'critical_checks': []}, 11, False, False, False), - ({'default_sampling': 0, 'critical_checks': [11]}, 11, False, False, True), - ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {}}, 11, False, False, True), - ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, False, False, False), - ({'default_sampling': 100, 'critical_checks': [11], 'worker_sampling': {'123': 0}}, 11, False, False, True), - ({'default_sampling': 0, 'critical_checks': [], 'worker_sampling': {'123': 100}}, 11, False, False, True), +@pytest.mark.parametrize('sampling_config,check_id,interval,is_alert,is_changed,is_sampled', ( + ({'default_sampling': 100, 'critical_checks': []}, 11, 60, False, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, 300, False, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, 60, True, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, 60, False, True, True), + ({'default_sampling': 100, 'critical_checks': ['11']}, 11, 60, False, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, 60, False, False, False), + ({'default_sampling': 0, 'critical_checks': ['11']}, 11, 60, False, False, True), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {}}, 11, 60, False, False, True), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, 60, False, False, False), + ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, 360, False, False, True), + ({'default_sampling': 100, 'critical_checks': [11], 'worker_sampling': {'123': 0}}, 11, 60, False, False, True), + ({'default_sampling': 0, 'critical_checks': [], 'worker_sampling': {'123': 100}}, 11, 60, False, False, True), )) -def test_main_task_sampling(monkeypatch, sampling_config, check_id, is_alert, is_changed, is_sampled): +def test_main_task_sampling(monkeypatch, sampling_config, check_id, interval, is_alert, is_changed, is_sampled): reload(plugin_manager) plugin_manager.init_plugin_manager() # init plugin manager MainTask.configure({'account': '123'}) task = MainTask() - assert task.is_sampled(sampling_config, check_id, is_alert, is_changed) is is_sampled + assert task.is_sampled(sampling_config, check_id, interval, is_alert, is_changed) is is_sampled @pytest.mark.parametrize('sampling_config,check_id,is_alert,is_changed', ( @@ -334,7 +336,7 @@ def test_main_task_sampling_rate(monkeypatch, sampling_config, check_id, is_aler MainTask.configure({'account': '123'}) task = MainTask() - results = [task.is_sampled(sampling_config, check_id, is_alert, is_changed) for _ in range(100)] + results = [task.is_sampled(sampling_config, check_id, 60, is_alert, is_changed) for _ in range(100)] sampled = len([s for s in results if s]) # We give some margin of error due to probabilistic non-uniform sampling diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index 2d0b499f..ae531bbf 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -67,6 +67,9 @@ # interval in seconds for storing metrics in Redis METRICS_INTERVAL = 15 +# Any check interval below this threshold is eligible for sampling. Above this threshold check will be always sampled. +SAMPLING_INTERVAL_THRESHOLD = 300 + DEFAULT_CHECK_RESULTS_HISTORY_LENGTH = 20 TRIAL_RUN_RESULT_EXPIRY_TIME = 300 @@ -957,18 +960,19 @@ def send_to_dataservice(cls, check_results, timeout=10): logger.error('Error in data service send: url={} ex={}'.format(cls._dataservice_url, ex)) raise - def is_sampled(self, sampling_config, check_id, is_alert, alert_changed): + def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_changed): """ Return sampling bool flag. Sampling flag is computed via random non-uniform sampling. We always return True if: + - interval is above the SAMPLING_INTERVAL_THRESHOLD - check_id is critical (exists in smapling_config['critical_checks'] list). - is_alert or alert_changed is True. - No sampling_config specified. If our self._account exists in smapling_config['worker_sampling'], then we always take this value. """ - if not sampling_config or is_alert or alert_changed: + if not sampling_config or is_alert or alert_changed or interval >= SAMPLING_INTERVAL_THRESHOLD: return True # check if we have a specific sampling for this worker (account) @@ -976,7 +980,7 @@ def is_sampled(self, sampling_config, check_id, is_alert, alert_changed): default_sampling = sampling_config.get('default_sampling', 100) critical_checks = sampling_config.get('critical_checks', []) - if check_id in critical_checks: + if str(check_id) in critical_checks or check_id in critical_checks: return True sampling = min(default_sampling if worker_sampling is None else worker_sampling, 100) / 100. @@ -1741,7 +1745,8 @@ def ts_serialize(ts): # Get sampling flag check_result['is_sampled'] = self.is_sampled( - sampling_config, req['check_id'], any(all_alerts_state), any(all_alerts_changed_state)) + sampling_config, req['check_id'], req['interval'], any(all_alerts_state), + any(all_alerts_changed_state)) current_span.set_tag('is_sampled', check_result['is_sampled']) From f9bfa019a51d4357d06abbca030087ccc164d96d Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Mon, 3 Dec 2018 11:31:27 +0100 Subject: [PATCH 04/14] Add span Signed-off-by: Mohab Usama --- zmon_worker_monitor/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zmon_worker_monitor/workflow.py b/zmon_worker_monitor/workflow.py index 4d989d46..f7cf16c2 100644 --- a/zmon_worker_monitor/workflow.py +++ b/zmon_worker_monitor/workflow.py @@ -205,7 +205,7 @@ def flow_simple_queue_processor(queue='', **execution_context): # Get sampling rates. We update every minute. if (datetime.utcnow() - sampling_rate_last_updated).seconds > SAMPLING_RATE_UPDATE_DURATION: try: - sampling_config = get_sampling_rate_config(config) + sampling_config = get_sampling_rate_config(config, span) span.set_tag('sampling_rate_updated', True) except Exception: span.set_tag('sampling_rate_updated', False) From b05cad4534ca855e8ce5ccd25085daa9c40d8ba9 Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Mon, 3 Dec 2018 12:52:42 +0100 Subject: [PATCH 05/14] Enhance OT span with sampling info Signed-off-by: Mohab Usama --- tests/test_main_task.py | 10 +++++++--- zmon_worker_monitor/workflow.py | 2 +- zmon_worker_monitor/zmon_worker/tasks/main.py | 11 ++++++++--- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/tests/test_main_task.py b/tests/test_main_task.py index bf190411..3bbf188b 100644 --- a/tests/test_main_task.py +++ b/tests/test_main_task.py @@ -311,17 +311,19 @@ def test_main_task_configure_tags(monkeypatch, tags, result): ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {}}, 11, 60, False, False, True), ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, 60, False, False, False), ({'default_sampling': 100, 'critical_checks': [], 'worker_sampling': {'123': 0}}, 11, 360, False, False, True), - ({'default_sampling': 100, 'critical_checks': [11], 'worker_sampling': {'123': 0}}, 11, 60, False, False, True), + ({'default_sampling': 100, 'critical_checks': [11], 'worker_sampling': {'123': '0'}}, 11, 60, False, False, True), ({'default_sampling': 0, 'critical_checks': [], 'worker_sampling': {'123': 100}}, 11, 60, False, False, True), )) def test_main_task_sampling(monkeypatch, sampling_config, check_id, interval, is_alert, is_changed, is_sampled): reload(plugin_manager) plugin_manager.init_plugin_manager() # init plugin manager + span = MagicMock() + MainTask.configure({'account': '123'}) task = MainTask() - assert task.is_sampled(sampling_config, check_id, interval, is_alert, is_changed) is is_sampled + assert task.is_sampled(sampling_config, check_id, interval, is_alert, is_changed, span) is is_sampled @pytest.mark.parametrize('sampling_config,check_id,is_alert,is_changed', ( @@ -333,10 +335,12 @@ def test_main_task_sampling_rate(monkeypatch, sampling_config, check_id, is_aler reload(plugin_manager) plugin_manager.init_plugin_manager() # init plugin manager + span = MagicMock() + MainTask.configure({'account': '123'}) task = MainTask() - results = [task.is_sampled(sampling_config, check_id, 60, is_alert, is_changed) for _ in range(100)] + results = [task.is_sampled(sampling_config, check_id, 60, is_alert, is_changed, span) for _ in range(100)] sampled = len([s for s in results if s]) # We give some margin of error due to probabilistic non-uniform sampling diff --git a/zmon_worker_monitor/workflow.py b/zmon_worker_monitor/workflow.py index f7cf16c2..dd98026a 100644 --- a/zmon_worker_monitor/workflow.py +++ b/zmon_worker_monitor/workflow.py @@ -63,7 +63,7 @@ def get_sampling_rate_config(config, current_span): } } """ - default_sampling = config.get('zmon.sampling.rate') + default_sampling = int(config.get('zmon.sampling.rate', 100)) critical_checks = config.get('zmon.critical.checks') if type(critical_checks) is not list: critical_checks = critical_checks.replace(' ', '').split(',') diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index ae531bbf..0e2d1a65 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -960,7 +960,7 @@ def send_to_dataservice(cls, check_results, timeout=10): logger.error('Error in data service send: url={} ex={}'.format(cls._dataservice_url, ex)) raise - def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_changed): + def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_changed, current_span): """ Return sampling bool flag. Sampling flag is computed via random non-uniform sampling. @@ -973,6 +973,7 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change If our self._account exists in smapling_config['worker_sampling'], then we always take this value. """ if not sampling_config or is_alert or alert_changed or interval >= SAMPLING_INTERVAL_THRESHOLD: + current_span.log_kv({'sampling_rate': 100}) return True # check if we have a specific sampling for this worker (account) @@ -981,9 +982,13 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change critical_checks = sampling_config.get('critical_checks', []) if str(check_id) in critical_checks or check_id in critical_checks: + current_span.log_kv({'sampling_rate': 100}) + current_span.set_tag('critical_check', True) return True - sampling = min(default_sampling if worker_sampling is None else worker_sampling, 100) / 100. + sampling = min(default_sampling if worker_sampling is None else int(worker_sampling), 100) / 100. + + current_span.log_kv({'sampling_rate': sampling}) return bool(numpy.random.choice([False, True], 1, p=(1 - sampling, sampling))) @@ -1746,7 +1751,7 @@ def ts_serialize(ts): # Get sampling flag check_result['is_sampled'] = self.is_sampled( sampling_config, req['check_id'], req['interval'], any(all_alerts_state), - any(all_alerts_changed_state)) + any(all_alerts_changed_state), current_span) current_span.set_tag('is_sampled', check_result['is_sampled']) From a98eaadb13994931803700a3369c2bdedb15fee8 Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Mon, 3 Dec 2018 14:34:37 +0100 Subject: [PATCH 06/14] log sampling config Signed-off-by: Mohab Usama --- zmon_worker_monitor/workflow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/zmon_worker_monitor/workflow.py b/zmon_worker_monitor/workflow.py index dd98026a..ddf895bf 100644 --- a/zmon_worker_monitor/workflow.py +++ b/zmon_worker_monitor/workflow.py @@ -206,6 +206,7 @@ def flow_simple_queue_processor(queue='', **execution_context): if (datetime.utcnow() - sampling_rate_last_updated).seconds > SAMPLING_RATE_UPDATE_DURATION: try: sampling_config = get_sampling_rate_config(config, span) + span.log_kv({'sampling_config': sampling_config}) span.set_tag('sampling_rate_updated', True) except Exception: span.set_tag('sampling_rate_updated', False) From 0e9e376c121c7d64c7999bd4ada09fdd06a53ed3 Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Mon, 3 Dec 2018 14:45:26 +0100 Subject: [PATCH 07/14] Add sampling ignored tag Signed-off-by: Mohab Usama --- zmon_worker_monitor/zmon_worker/tasks/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index 0e2d1a65..344f64c1 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -974,6 +974,7 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change """ if not sampling_config or is_alert or alert_changed or interval >= SAMPLING_INTERVAL_THRESHOLD: current_span.log_kv({'sampling_rate': 100}) + current_span.set_tag('sampling_ignored', True) return True # check if we have a specific sampling for this worker (account) @@ -983,6 +984,7 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change if str(check_id) in critical_checks or check_id in critical_checks: current_span.log_kv({'sampling_rate': 100}) + current_span.set_tag('sampling_ignored', True) current_span.set_tag('critical_check', True) return True From 3c85a7adf1371853a13992efeb1cf80d2ac92226 Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Mon, 3 Dec 2018 15:21:47 +0100 Subject: [PATCH 08/14] More logs Signed-off-by: Mohab Usama --- zmon_worker_monitor/zmon_worker/tasks/main.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index 344f64c1..b876c8e8 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -1755,6 +1755,8 @@ def ts_serialize(ts): sampling_config, req['check_id'], req['interval'], any(all_alerts_state), any(all_alerts_changed_state), current_span) + current_span.log_kv({'alerts_state': all_alerts_state}) + current_span.log_kv({'changed_state': all_alerts_changed_state}) current_span.set_tag('is_sampled', check_result['is_sampled']) for k in self._entity_tags: From 74326b4a61dfa916a11aee3a3108cec1709d8214 Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Mon, 3 Dec 2018 15:46:31 +0100 Subject: [PATCH 09/14] Fix interval condition Signed-off-by: Mohab Usama --- tests/test_main_task.py | 4 ++-- zmon_worker_monitor/zmon_worker/tasks/main.py | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/test_main_task.py b/tests/test_main_task.py index 3bbf188b..dd884629 100644 --- a/tests/test_main_task.py +++ b/tests/test_main_task.py @@ -301,8 +301,8 @@ def test_main_task_configure_tags(monkeypatch, tags, result): @pytest.mark.parametrize('sampling_config,check_id,interval,is_alert,is_changed,is_sampled', ( - ({'default_sampling': 100, 'critical_checks': []}, 11, 60, False, False, True), - ({'default_sampling': 0, 'critical_checks': []}, 11, 300, False, False, True), + ({'default_sampling': 100, 'critical_checks': []}, 11, '60', False, False, True), + ({'default_sampling': 0, 'critical_checks': []}, 11, '300', False, False, True), ({'default_sampling': 0, 'critical_checks': []}, 11, 60, True, False, True), ({'default_sampling': 0, 'critical_checks': []}, 11, 60, False, True, True), ({'default_sampling': 100, 'critical_checks': ['11']}, 11, 60, False, False, True), diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index b876c8e8..9718a1a6 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -972,8 +972,7 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change If our self._account exists in smapling_config['worker_sampling'], then we always take this value. """ - if not sampling_config or is_alert or alert_changed or interval >= SAMPLING_INTERVAL_THRESHOLD: - current_span.log_kv({'sampling_rate': 100}) + if not sampling_config or is_alert or alert_changed or int(interval) >= SAMPLING_INTERVAL_THRESHOLD: current_span.set_tag('sampling_ignored', True) return True @@ -983,7 +982,6 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change critical_checks = sampling_config.get('critical_checks', []) if str(check_id) in critical_checks or check_id in critical_checks: - current_span.log_kv({'sampling_rate': 100}) current_span.set_tag('sampling_ignored', True) current_span.set_tag('critical_check', True) return True From bb95239af028096c1a02c59b44f98c9cc1cc1848 Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Mon, 3 Dec 2018 16:00:59 +0100 Subject: [PATCH 10/14] Add interval tag Signed-off-by: Mohab Usama --- zmon_worker_monitor/zmon_worker/tasks/main.py | 1 + 1 file changed, 1 insertion(+) diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index 9718a1a6..4d9301d3 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -973,6 +973,7 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change If our self._account exists in smapling_config['worker_sampling'], then we always take this value. """ if not sampling_config or is_alert or alert_changed or int(interval) >= SAMPLING_INTERVAL_THRESHOLD: + current_span.set_tag('interval', interval) current_span.set_tag('sampling_ignored', True) return True From 72c0bc0be04f2b697db372b791bfee2a86e4efce Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Mon, 3 Dec 2018 16:22:16 +0100 Subject: [PATCH 11/14] Get sampling config on first run Signed-off-by: Mohab Usama --- zmon_worker_monitor/workflow.py | 5 +++-- zmon_worker_monitor/zmon_worker/tasks/main.py | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/zmon_worker_monitor/workflow.py b/zmon_worker_monitor/workflow.py index ddf895bf..1b0fe29c 100644 --- a/zmon_worker_monitor/workflow.py +++ b/zmon_worker_monitor/workflow.py @@ -175,7 +175,7 @@ def flow_simple_queue_processor(queue='', **execution_context): count = 0 sampling_rate_last_updated = datetime.utcnow() - sampling_config = {} + sampling_config = None while True: try: @@ -203,7 +203,8 @@ def flow_simple_queue_processor(queue='', **execution_context): span.set_operation_name(OPENTRACING_QUEUE_OPERATION) # Get sampling rates. We update every minute. - if (datetime.utcnow() - sampling_rate_last_updated).seconds > SAMPLING_RATE_UPDATE_DURATION: + if sampling_config is None or ( + (datetime.utcnow() - sampling_rate_last_updated).seconds > SAMPLING_RATE_UPDATE_DURATION): try: sampling_config = get_sampling_rate_config(config, span) span.log_kv({'sampling_config': sampling_config}) diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index 4d9301d3..90b1d529 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -973,7 +973,10 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change If our self._account exists in smapling_config['worker_sampling'], then we always take this value. """ if not sampling_config or is_alert or alert_changed or int(interval) >= SAMPLING_INTERVAL_THRESHOLD: + current_span.log_kv({'sampling_config': sampling_config}) current_span.set_tag('interval', interval) + current_span.set_tag('is_alert', is_alert) + current_span.set_tag('alert_changed', alert_changed) current_span.set_tag('sampling_ignored', True) return True From 6a34b654c65fc8c51517a22c2503ac81baf9a813 Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Mon, 3 Dec 2018 16:40:15 +0100 Subject: [PATCH 12/14] Fix task args Signed-off-by: Mohab Usama --- zmon_worker_monitor/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zmon_worker_monitor/tasks.py b/zmon_worker_monitor/tasks.py index bc7457a2..a4bba2db 100644 --- a/zmon_worker_monitor/tasks.py +++ b/zmon_worker_monitor/tasks.py @@ -24,14 +24,14 @@ def configure_tasks(config): def check_and_notify(req, alerts, task_context=None, **kwargs): logger.debug('check_and_notify received req=%s, alerts=%s, task_context=%s, ', req, alerts, task_context) - zmontask.check_and_notify(req, alerts, task_context=task_context) + zmontask.check_and_notify(req, alerts, task_context=task_context, **kwargs) def trial_run(req, alerts, task_context=None, **kwargs): logger.info('trial_run received <== check_id=%s', req['check_id']) logger.debug('trial_run received req=%s, alerts=%s, task_context=%s, ', req, alerts, task_context) - zmontask.trial_run(req, alerts, task_context=task_context) + zmontask.trial_run(req, alerts, task_context=task_context, **kwargs) def cleanup(*args, **kwargs): From 2f0f0d45a6a0eb506f268710b7a9207c09de903a Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Tue, 4 Dec 2018 11:49:08 +0100 Subject: [PATCH 13/14] Fix url Signed-off-by: Mohab Usama --- zmon_worker_monitor/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zmon_worker_monitor/workflow.py b/zmon_worker_monitor/workflow.py index 1b0fe29c..7c4d0122 100644 --- a/zmon_worker_monitor/workflow.py +++ b/zmon_worker_monitor/workflow.py @@ -84,7 +84,7 @@ def get_sampling_rate_config(config, current_span): tokens.configure() tokens.manage('uid', ['uid']) - url = '{}/api/v1/{}'.format(zmon_url, SAMPLING_RATE_ENTITY_ID) + url = '{}/api/v1/entities/{}'.format(zmon_url, SAMPLING_RATE_ENTITY_ID) headers = {'Authorization': 'Bearer {}'.format(tokens.get('uid'))} resp = requests.get(url, headers=headers, timeout=2) From e4dc705a6a399f84498e78b1e2366044e9ef2bdc Mon Sep 17 00:00:00 2001 From: Mohab Usama Date: Tue, 4 Dec 2018 18:22:01 +0100 Subject: [PATCH 14/14] log correct sampling rate Signed-off-by: Mohab Usama --- zmon_worker_monitor/zmon_worker/tasks/main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zmon_worker_monitor/zmon_worker/tasks/main.py b/zmon_worker_monitor/zmon_worker/tasks/main.py index 90b1d529..649808a1 100755 --- a/zmon_worker_monitor/zmon_worker/tasks/main.py +++ b/zmon_worker_monitor/zmon_worker/tasks/main.py @@ -973,7 +973,6 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change If our self._account exists in smapling_config['worker_sampling'], then we always take this value. """ if not sampling_config or is_alert or alert_changed or int(interval) >= SAMPLING_INTERVAL_THRESHOLD: - current_span.log_kv({'sampling_config': sampling_config}) current_span.set_tag('interval', interval) current_span.set_tag('is_alert', is_alert) current_span.set_tag('alert_changed', alert_changed) @@ -990,9 +989,10 @@ def is_sampled(self, sampling_config, check_id, interval, is_alert, alert_change current_span.set_tag('critical_check', True) return True - sampling = min(default_sampling if worker_sampling is None else int(worker_sampling), 100) / 100. + sampling_rate = default_sampling if worker_sampling is None else int(worker_sampling) + sampling = min(sampling_rate, 100) / 100. - current_span.log_kv({'sampling_rate': sampling}) + current_span.log_kv({'sampling_rate': sampling_rate}) return bool(numpy.random.choice([False, True], 1, p=(1 - sampling, sampling)))