diff --git a/numalogic/udfs/inference.py b/numalogic/udfs/inference.py
index 96a5c2f6..38ea614e 100644
--- a/numalogic/udfs/inference.py
+++ b/numalogic/udfs/inference.py
@@ -104,7 +104,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             Messages instance
         """
         _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)
 
         # Construct payload object
         json_data_payload = orjson.loads(datum.value)
@@ -121,7 +121,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         _stream_conf = self.get_stream_conf(payload.config_id)
         _conf = _stream_conf.ml_pipelines[payload.pipeline_id]
 
-        log = log_data_payload_values(log, json_data_payload)
+        logger = log_data_payload_values(logger, json_data_payload)
 
         artifact_data, payload = _load_artifact(
             skeys=[_ckey for _, _ckey in zip(_stream_conf.composite_keys, payload.composite_keys)],
@@ -137,7 +137,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
             if _conf.numalogic_conf.score.adjust:
                 msgs.append(get_static_thresh_message(keys, payload))
-            log.exception("Artifact model not loaded!")
+            logger.exception("Artifact model not loaded!")
             return msgs
 
         # Perform inference
@@ -146,7 +146,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             _update_info_metric(x_inferred, payload.metrics, _metric_label_values)
         except RuntimeError:
             _increment_counter(counter=RUNTIME_ERROR_COUNTER, labels=_metric_label_values)
-            log.exception(
+            logger.exception(
                 "Runtime inference error!",
                 keys=payload.composite_keys,
                 metrics=payload.metrics,
@@ -174,13 +174,13 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         )
         # Send trainer message if artifact is stale
         if status == Status.ARTIFACT_STALE:
-            log.info("Inference artifact found is stale")
+            logger.info("Inference artifact found is stale")
             msgs.append(get_trainer_message(keys, _stream_conf, payload, *_metric_label_values))
 
         _increment_counter(counter=MSG_PROCESSED_COUNTER, labels=_metric_label_values)
         msgs.append(Message(keys=keys, value=payload.to_json(), tags=["postprocess"]))
 
-        log.info(
+        logger.info(
             "Successfully inferred!",
             keys=payload.composite_keys,
             metrics=payload.metrics,
diff --git a/numalogic/udfs/payloadtx.py b/numalogic/udfs/payloadtx.py
index a7e9fef2..7251a433 100644
--- a/numalogic/udfs/payloadtx.py
+++ b/numalogic/udfs/payloadtx.py
@@ -45,13 +45,13 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         """
         _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)
 
         # check message sanity
         try:
             data_payload = orjson.loads(datum.value)
         except (orjson.JSONDecodeError, KeyError):  # catch json decode error only
-            log.exception("Error while decoding input json")
+            logger.exception("Error while decoding input json")
             return Messages(Message.to_drop())
 
         _stream_conf = self.get_stream_conf(data_payload["config_id"])
@@ -62,8 +62,8 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             data_payload["pipeline_id"] = pipeline
             messages.append(Message(keys=keys, value=orjson.dumps(data_payload)))
 
-        log = log_data_payload_values(log, data_payload)
-        log.info(
+        logger = log_data_payload_values(logger, data_payload)
+        logger.info(
             "Appended pipeline id to the payload",
             keys=keys,
             execution_time_ms=round((time.perf_counter() - _start_time) * 1000, 4),
diff --git a/numalogic/udfs/postprocess.py b/numalogic/udfs/postprocess.py
index 7528be91..36af8195 100644
--- a/numalogic/udfs/postprocess.py
+++ b/numalogic/udfs/postprocess.py
@@ -87,7 +87,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         """
         _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)
 
         # Construct payload object
         json_payload = orjson.loads(datum.value)
@@ -111,7 +111,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         thresh_cfg = _conf.numalogic_conf.threshold
         postprocess_cfg = _conf.numalogic_conf.postprocess
 
-        log = log_data_payload_values(log, json_payload)
+        logger = log_data_payload_values(logger, json_payload)
 
         # load artifact
         thresh_artifact, payload = _load_artifact(
@@ -135,7 +135,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             return msgs
 
         if payload.header == Header.STATIC_INFERENCE:
-            log.warning("Static inference not supported in postprocess yet")
+            logger.warning("Static inference not supported in postprocess yet")
 
         # Postprocess payload
         try:
@@ -157,7 +157,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         except RuntimeError:
             _increment_counter(RUNTIME_ERROR_COUNTER, _metric_label_values)
-            log.exception(
+            logger.exception(
                 "Runtime postprocess error!",
                 uuid=payload.uuid,
                 composite_keys=payload.composite_keys,
@@ -199,7 +199,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             labels=_metric_label_values,
         )
 
-        log.info(
+        logger.info(
             "Successfully post-processed!",
             composite_keys=out_payload.composite_keys,
             unified_anomaly=out_payload.unified_anomaly,
diff --git a/numalogic/udfs/preprocess.py b/numalogic/udfs/preprocess.py
index 4893ab97..ee1cec8f 100644
--- a/numalogic/udfs/preprocess.py
+++ b/numalogic/udfs/preprocess.py
@@ -101,20 +101,20 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         """
         _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)
 
         # check message sanity
         try:
             data_payload = orjson.loads(datum.value)
         except (orjson.JSONDecodeError, KeyError):  # catch json decode error only
-            log.exception("Error while decoding input json")
+            logger.exception("Error while decoding input json")
             return Messages(Message.to_drop())
 
         _stream_conf = self.get_stream_conf(data_payload["config_id"])
         _conf = _stream_conf.ml_pipelines[data_payload.get("pipeline_id", "default")]
         raw_df, timestamps = get_df(data_payload=data_payload, stream_conf=_stream_conf)
-        log = log_data_payload_values(log, data_payload)
+        logger = log_data_payload_values(logger, data_payload)
 
         source = NUMALOGIC_METRICS
         if (
@@ -134,9 +134,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         _increment_counter(counter=MSG_IN_COUNTER, labels=_metric_label_values)
         # Drop message if dataframe shape conditions are not met
         if raw_df.shape[0] < _stream_conf.window_size or raw_df.shape[1] != len(_conf.metrics):
-            log.critical(
-                "Dataframe shape: (%f, %f) conditions not met ", raw_df.shape[0], raw_df.shape[1]
-            )
+            logger.critical("Dataframe shape conditions not met", raw_df_shape=raw_df.shape)
             _increment_counter(
                 counter=DATASHAPE_ERROR_COUNTER,
                 labels=_metric_label_values,
@@ -168,12 +166,12 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             if preproc_artifact:
                 preproc_clf = preproc_artifact.artifact
                 payload = replace(payload, status=Status.ARTIFACT_FOUND)
-                log = log.bind(artifact_source=preproc_artifact.extras.get("source"))
+                logger = logger.bind(artifact_source=preproc_artifact.extras.get("source"))
             else:
                 msgs = Messages(get_trainer_message(keys, _stream_conf, payload))
                 if _conf.numalogic_conf.score.adjust:
                     msgs.append(get_static_thresh_message(keys, payload))
-                log.exception("Artifact model not loaded!")
+                logger.exception("Artifact model not loaded!")
                 return msgs
         # Model will not be in registry
         else:
@@ -181,7 +179,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             _increment_counter(SOURCE_COUNTER, labels=("config", *_metric_label_values))
             preproc_clf = self._load_model_from_config(_conf.numalogic_conf.preprocess)
             payload = replace(payload, status=Status.ARTIFACT_FOUND)
-            log = log.bind(model_from_config=preproc_clf)
+            logger = logger.bind(model_from_config=preproc_clf)
 
         try:
             x_scaled = self.compute(model=preproc_clf, input_=payload.get_data())
@@ -197,7 +195,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
                 status=Status.ARTIFACT_FOUND,
                 header=Header.MODEL_INFERENCE,
             )
-            log.info(
+            logger.info(
                 "Successfully preprocessed!",
                 keys=keys,
                 payload_metrics=payload.metrics,
@@ -209,7 +207,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
                 counter=RUNTIME_ERROR_COUNTER,
                 labels=_metric_label_values,
             )
-            log.exception(
+            logger.exception(
                 "Runtime preprocess error!",
                 status=Status.RUNTIME_ERROR,
                 payload_metrics=payload.metrics,
diff --git a/numalogic/udfs/staticthresh.py b/numalogic/udfs/staticthresh.py
index bc983842..1be7e3ab 100644
--- a/numalogic/udfs/staticthresh.py
+++ b/numalogic/udfs/staticthresh.py
@@ -46,11 +46,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         conf = self.get_ml_pipeline_conf(payload.config_id, payload.pipeline_id)
         adjust_conf = conf.numalogic_conf.score.adjust
 
-        log = _struct_log.bind(udf_vertex=self._vtx)
-        log = log_data_payload_values(log, json_data_payload)
+        logger = _struct_log.bind(udf_vertex=self._vtx)
+        logger = log_data_payload_values(logger, json_data_payload)
 
         if not adjust_conf:
-            log.warning("No score adjust config found")
+            logger.warning("No score adjust config found")
             return Messages(Message.to_drop())
 
         try:
@@ -60,7 +60,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             )
             y_unified = self.compute_unified_score(y_features, adjust_conf.feature_agg)
         except RuntimeError:
-            log.exception("Error occurred while computing static anomaly scores")
+            logger.exception("Error occurred while computing static anomaly scores")
             return Messages(Message.to_drop())
 
         out_payload = OutputPayload(
@@ -73,7 +73,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             data=self._additional_scores(adjust_conf, y_features, y_unified),
             metadata=payload.metadata,
         )
-        log.info(
+        logger.info(
             "Sending output payload",
             keys=out_payload.composite_keys,
             y_unified=y_unified,
diff --git a/numalogic/udfs/tools.py b/numalogic/udfs/tools.py
index 4fe9b6a4..b5541db4 100644
--- a/numalogic/udfs/tools.py
+++ b/numalogic/udfs/tools.py
@@ -167,7 +167,7 @@ def _load_artifact(
         payload.pipeline_id,
     )
 
-    log = _struct_log.bind(
+    logger = _struct_log.bind(
         uuid=payload.uuid, skeys=skeys, dkeys=dkeys, payload_metrics=payload.metrics
     )
 
@@ -177,11 +177,11 @@ def _load_artifact(
         key = ":".join(dkeys)
         if key in artifact_version:
             version_to_load = artifact_version[key]
-            log.debug("Found version info for keys")
+            logger.debug("Found version info for keys")
         else:
-            log.debug("Could not find what version of model to load")
+            logger.debug("Could not find what version of model to load")
     else:
-        log.debug(
+        logger.debug(
             "No version info passed on! Loading latest artifact version, "
             "if one present in the registry"
         )
@@ -195,19 +195,19 @@ def _load_artifact(
         )
     except RedisRegistryError:
         _increment_counter(REDIS_ERROR_COUNTER, labels=_metric_label_values)
-        log.warning("Error while fetching artifact")
+        logger.warning("Error while fetching artifact")
        return None, payload
     except Exception:
         _increment_counter(EXCEPTION_COUNTER, labels=_metric_label_values)
-        log.exception("Unhandled exception while fetching artifact")
+        logger.exception("Unhandled exception while fetching artifact")
        return None, payload
     else:
-        log = log.bind(
+        logger = logger.bind(
             artifact_source=artifact_data.extras.get("source"),
             artifact_version=artifact_data.extras.get("version"),
         )
-        log.debug("Loaded Model!")
+        logger.debug("Loaded Model!")
         _increment_counter(
             counter=SOURCE_COUNTER,
             labels=(artifact_data.extras.get("source"), *_metric_label_values),
@@ -339,10 +339,10 @@ def ack_read(
         ):
             _struct_log.debug(
                 "There was insufficient data for the key in the past. Retrying fetching"
-                " and training after %s secs",
-                uuid,
-                key,
-                ((min_train_records - int(_msg_train_records)) * data_freq)
+                " and training after a delay",
+                uuid=uuid,
+                key=key,
+                secs=((min_train_records - int(_msg_train_records)) * data_freq)
                 - _curr_time
                 + float(_msg_read_ts),
             )
diff --git a/numalogic/udfs/trainer/_base.py b/numalogic/udfs/trainer/_base.py
index d2d2fbcc..8ab91cd3 100644
--- a/numalogic/udfs/trainer/_base.py
+++ b/numalogic/udfs/trainer/_base.py
@@ -151,7 +151,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         Messages instance (no forwarding)
         """
         _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)
 
         # Construct payload object
         json_payload = orjson.loads(datum.value)
@@ -171,7 +171,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             labels=[self._vtx, *_metric_label_values],
         )
 
-        log = log_data_payload_values(log, json_payload)
+        logger = log_data_payload_values(logger, json_payload)
 
         # set the retry and retrain_freq
         retrain_freq_ts = _conf.numalogic_conf.trainer.retrain_freq_hr
@@ -200,7 +200,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
                 counter=MSG_DROPPED_COUNTER,
                 labels=(self._vtx, *_metric_label_values),
             )
-            log.warning(
+            logger.warning(
                 "Caught exception/error while fetching from source",
                 uuid=payload.uuid,
                 keys=payload.composite_keys,
@@ -210,7 +210,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
 
         # Check if data is sufficient
         if not self._is_data_sufficient(payload, df):
-            log.warning(
+            logger.warning(
                 "Insufficient data found",
                 uuid=payload.uuid,
                 keys=payload.composite_keys,
@@ -226,7 +226,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             )
             return Messages(Message.to_drop())
 
-        log.debug("Data fetched", uuid=payload.uuid, shape=df.shape)
+        logger.debug("Data fetched", uuid=payload.uuid, shape=df.shape)
 
         # Construct feature array
         x_train, nan_counter, inf_counter = self.get_feature_arr(df, _conf.metrics)
@@ -264,14 +264,14 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             model_registry=self.model_registry,
             payload=payload,
             vertex_name=self._vtx,
-            log=log,
+            logger=logger,
         )
         if self.train_msg_deduplicator.ack_train(
             key=[*payload.composite_keys, payload.pipeline_id], uuid=payload.uuid
         ):
-            log.info("Model trained and saved successfully", uuid=payload.uuid)
+            logger.info("Model trained and saved successfully", uuid=payload.uuid)
 
-        log.debug(
+        logger.debug(
             "Time taken in trainer", execution_time_secs=round(time.perf_counter() - _start_time, 4)
         )
         _increment_counter(
@@ -303,7 +303,7 @@ def artifacts_to_save(
         model_registry,
         payload: TrainerPayload,
         vertex_name: str,
-        log,
+        logger,
     ) -> None:
         """
         Save artifacts.
@@ -335,10 +335,14 @@ def artifacts_to_save(
                 counter=REDIS_ERROR_COUNTER,
                 labels=(vertex_name, ":".join(payload.composite_keys), payload.config_id),
             )
-            log.exception("Error while saving artifact with skeys", uuid=payload.uuid, skeys=skeys)
+            logger.exception(
+                "Error while saving artifact with skeys", uuid=payload.uuid, skeys=skeys
+            )
         else:
-            log.info("Artifact saved with with versions", uuid=payload.uuid, version_dict=ver_dict)
+            logger.info(
+                "Artifact saved with versions", uuid=payload.uuid, version_dict=ver_dict
+            )
 
     def _is_data_sufficient(self, payload: TrainerPayload, df: pd.DataFrame) -> bool:
         _conf = self.get_ml_pipeline_conf(
diff --git a/numalogic/udfs/trainer/_druid.py b/numalogic/udfs/trainer/_druid.py
index db20f276..f21f9d77 100644
--- a/numalogic/udfs/trainer/_druid.py
+++ b/numalogic/udfs/trainer/_druid.py
@@ -92,7 +92,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
             Dataframe
         """
         _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)
 
         _metric_label_values = (
             payload.composite_keys,
@@ -133,7 +133,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
                 counter=FETCH_EXCEPTION_COUNTER,
                 labels=_metric_label_values,
             )
-            log.exception("Error while fetching data from druid")
+            logger.exception("Error while fetching data from druid")
             return None
         _end_time = time.perf_counter() - _start_time
         _add_summary(
@@ -142,7 +142,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
             data=_end_time,
         )
 
-        log.info(
+        logger.info(
             "Fetched data from druid",
             uuid=payload.uuid,
             config_id=payload.config_id,
diff --git a/numalogic/udfs/trainer/_prom.py b/numalogic/udfs/trainer/_prom.py
index 1f1d6beb..07c4e549 100644
--- a/numalogic/udfs/trainer/_prom.py
+++ b/numalogic/udfs/trainer/_prom.py
@@ -60,7 +60,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
             Dataframe
         """
         _start_time = time.perf_counter()
-        log = _struct_log.bind(udf_vertex=self._vtx)
+        logger = _struct_log.bind(udf_vertex=self._vtx)
 
         _metric_label_values = (
             payload.composite_keys,
@@ -92,7 +92,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
                 counter=FETCH_EXCEPTION_COUNTER,
                 labels=_metric_label_values,
            )
-            log.exception("Error while fetching data from Prometheus", uuid=payload.uuid)
+            logger.exception("Error while fetching data from Prometheus", uuid=payload.uuid)
             return None
         _end_time = time.perf_counter() - _start_time
         _add_summary(
@@ -100,7 +100,7 @@ def fetch_data(self, payload: TrainerPayload) -> Optional[pd.DataFrame]:
             labels=_metric_label_values,
             data=_end_time,
         )
-        log.info(
+        logger.info(
             "Fetched data from Prometheus",
             uuid=payload.uuid,
             config_id=payload.config_id,
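
Note for reviewers: the rename standardizes on structlog's bound-logger idiom, in which `bind()` returns a *new* logger carrying extra context (hence the repeated `logger = logger.bind(...)` reassignments), and event strings stay constant while variable data moves into keyword arguments (as in the `log.critical("... (%f, %f) ...")` to `raw_df_shape=` conversion above). A minimal sketch of that pattern, using only the public structlog API; `handle`, `vertex_name`, and `df_shape` are illustrative names, not part of the numalogic codebase:

```python
import structlog

_struct_log = structlog.get_logger()


def handle(vertex_name: str, df_shape: tuple[int, int]) -> None:
    # bind() does not mutate the logger; it returns a new one carrying the
    # extra context, so the result must be assigned back to be kept.
    logger = _struct_log.bind(udf_vertex=vertex_name)

    # The event string stays constant; variable data travels as key-value
    # pairs instead of printf-style "%s"/"%f" interpolation, which keeps
    # log events machine-parseable and easy to aggregate.
    logger.info("Dataframe shape conditions not met", raw_df_shape=df_shape)
```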