diff --git a/dbt_common/events/base_types.py b/dbt_common/events/base_types.py index 78b03682..f5a30f1b 100644 --- a/dbt_common/events/base_types.py +++ b/dbt_common/events/base_types.py @@ -151,6 +151,22 @@ def msg_from_base_event(event: BaseEvent, level: Optional[EventLevel] = None): return new_event +def msg_to_dict(msg: EventMsg) -> dict: + msg_dict = MessageToDict( + msg, + preserving_proto_field_name=True, + including_default_value_fields=True, # type: ignore + ) + # We don't want an empty NodeInfo in output + if ( + "data" in msg_dict + and "node_info" in msg_dict["data"] + and msg_dict["data"]["node_info"]["node_name"] == "" + ): + del msg_dict["data"]["node_info"] + return msg_dict + + # DynamicLevel requires that the level be supplied on the # event construction call using the "info" function from functions.py class DynamicLevel(BaseEvent): diff --git a/dbt_common/events/cookie.py b/dbt_common/events/cookie.py new file mode 100644 index 00000000..fb659c47 --- /dev/null +++ b/dbt_common/events/cookie.py @@ -0,0 +1,32 @@ +from pathlib import Path +import uuid +from typing import Any, Dict + +import yaml + +# the C version is faster, but it doesn't always exist +try: + from yaml import CSafeLoader as SafeLoader +except ImportError: + from yaml import SafeLoader + + +class Cookie: + def __init__(self, directory: Path) -> None: + self.id: str = str(uuid.uuid4()) + self.path: Path = directory / ".user.yml" + self.save() + + def as_dict(self) -> Dict[str, Any]: + return {"id": self.id} + + def save(self) -> None: + with open(self.path, "w") as fh: + yaml.dump(self.as_dict(), fh) + + def load(self) -> Dict[str, Any]: + with open(self.path, "r") as fh: + try: + return yaml.load(fh, Loader=SafeLoader) + except yaml.reader.ReaderError: + return {} diff --git a/dbt_common/events/event_manager.py b/dbt_common/events/event_manager.py index 507588f3..b2eb5b72 100644 --- a/dbt_common/events/event_manager.py +++ b/dbt_common/events/event_manager.py @@ -3,13 +3,18 @@ from typing import List, Optional, Protocol, Tuple from dbt_common.events.base_types import BaseEvent, EventLevel, msg_from_base_event, TCallback +from dbt_common.events.functions import track, tracker_factory from dbt_common.events.logger import LoggerConfig, _Logger, _TextLogger, _JsonLogger, LineFormat +from dbt_common.events.tracker import TrackerConfig, Tracker +from dbt_common.events.user import User class EventManager: def __init__(self) -> None: self.loggers: List[_Logger] = [] + self.trackers: List[Tracker] = [] self.callbacks: List[TCallback] = [] + self.user: Optional[User] = None def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: msg = msg_from_base_event(e, level=level) @@ -28,6 +33,9 @@ def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: if logger.filter(msg): # type: ignore logger.write_line(msg) + for tracker in self.trackers: + track(tracker, self.user, msg) + for callback in self.callbacks: callback(msg) @@ -37,9 +45,15 @@ def add_logger(self, config: LoggerConfig) -> None: ) self.loggers.append(logger) + def add_tracker(self, config: TrackerConfig) -> None: + self.trackers.append(tracker_factory(config)) + def add_callback(self, callback: TCallback) -> None: self.callbacks.append(callback) + def add_user(self, user: User) -> None: + self.user = user + def flush(self) -> None: for logger in self.loggers: logger.flush() @@ -48,6 +62,7 @@ def flush(self) -> None: class IEventManager(Protocol): callbacks: List[TCallback] loggers: List[_Logger] + trackers: List[Tracker] def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: ... @@ -55,6 +70,9 @@ def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None: def add_logger(self, config: LoggerConfig) -> None: ... + def add_tracker(self, config: TrackerConfig) -> None: + ... + def add_callback(self, callback: TCallback) -> None: ... diff --git a/dbt_common/events/event_manager_client.py b/dbt_common/events/event_manager_client.py index 538d3199..c617d0fa 100644 --- a/dbt_common/events/event_manager_client.py +++ b/dbt_common/events/event_manager_client.py @@ -17,6 +17,11 @@ def add_logger_to_manager(logger) -> None: _EVENT_MANAGER.add_logger(logger) +def add_tracker_to_manager(tracker) -> None: + global _EVENT_MANAGER + _EVENT_MANAGER.add_tracker(tracker) + + def add_callback_to_manager(callback: TCallback) -> None: global _EVENT_MANAGER _EVENT_MANAGER.add_callback(callback) @@ -32,4 +37,5 @@ def cleanup_event_logger() -> None: # especially important for tests, since pytest replaces the stdout stream # during test runs, and closes the stream after the test is over. _EVENT_MANAGER.loggers.clear() + _EVENT_MANAGER.trackers.clear() _EVENT_MANAGER.callbacks.clear() diff --git a/dbt_common/events/functions.py b/dbt_common/events/functions.py index 4e055aa4..5126fd6f 100644 --- a/dbt_common/events/functions.py +++ b/dbt_common/events/functions.py @@ -1,20 +1,27 @@ -from pathlib import Path - -from dbt_common.events.event_manager_client import get_event_manager -from dbt_common.exceptions import EventCompilationError -from dbt_common.invocation import get_invocation_id -from dbt_common.helper_types import WarnErrorOptions -from dbt_common.utils.encoding import ForgivingJSONEncoder -from dbt_common.events.base_types import BaseEvent, EventLevel, EventMsg -from dbt_common.events.logger import LoggerConfig, LineFormat -from dbt_common.exceptions import scrub_secrets, env_secrets -from dbt_common.events.types import Note from functools import partial import json import os +from pathlib import Path import sys -from typing import Callable, Dict, Optional, TextIO, Union -from google.protobuf.json_format import MessageToDict +from typing import Any, Callable, Dict, Optional, TextIO, Union + +from dbt_common.helper_types import WarnErrorOptions +from dbt_common.invocation import get_invocation_id +from dbt_common.events.base_types import ( + BaseEvent, + EventLevel, + EventMsg, + msg_to_dict as _msg_to_dict, +) +from dbt_common.events.cookie import Cookie +from dbt_common.events.event_manager_client import get_event_manager +from dbt_common.events.logger import LoggerConfig, LineFormat +from dbt_common.events.tracker import FileTracker, SnowplowTracker, Tracker, TrackerConfig +from dbt_common.events.types import Note, SendingEvent, SendEventFailure +from dbt_common.events.user import User +from dbt_common.exceptions import EventCompilationError, scrub_secrets, env_secrets +from dbt_common.utils.encoding import ForgivingJSONEncoder + LOG_VERSION = 3 metadata_vars: Optional[Dict[str, str]] = None @@ -22,6 +29,7 @@ WARN_ERROR_OPTIONS = WarnErrorOptions(include=[], exclude=[]) WARN_ERROR = False + # This global, and the following two functions for capturing stdout logs are # an unpleasant hack we intend to remove as part of API-ification. The GitHub # issue #6350 was opened for that work. @@ -92,26 +100,14 @@ def msg_to_json(msg: EventMsg) -> str: def msg_to_dict(msg: EventMsg) -> dict: - msg_dict = dict() try: - msg_dict = MessageToDict( - msg, - preserving_proto_field_name=True, - including_default_value_fields=True, # type: ignore - ) + return _msg_to_dict(msg) except Exception as exc: event_type = type(msg).__name__ fire_event( Note(msg=f"type {event_type} is not serializable. {str(exc)}"), level=EventLevel.WARN ) - # We don't want an empty NodeInfo in output - if ( - "data" in msg_dict - and "node_info" in msg_dict["data"] - and msg_dict["data"]["node_info"]["node_name"] == "" - ): - del msg_dict["data"]["node_info"] - return msg_dict + return {} def warn_or_error(event, node=None) -> None: @@ -153,3 +149,57 @@ def get_metadata_vars() -> Dict[str, str]: def reset_metadata_vars() -> None: global metadata_vars metadata_vars = None + + +def tracker_factory(config: TrackerConfig) -> Tracker: + if all([config.invocation_id, config.endpoint, config.msg_schemas]): + return SnowplowTracker.from_config(config) + elif all([config.invocation_id, config.name, config.output_file_name]): + return FileTracker.from_config(config) + raise Exception("Invalid tracking configuration.") + + +def enable_tracking(tracker: Tracker, user: User): + cookie = _get_cookie(user) + user.enable_tracking(cookie) + tracker.enable_tracking(cookie) + + +def disable_tracking(tracker: Tracker, user: User): + user.disable_tracking() + tracker.disable_tracking() + + +def _get_cookie(user: User) -> Dict[str, Any]: + if cookie := user.cookie: + return cookie + return _set_cookie(user) + + +def _set_cookie(user: User) -> Dict[str, Any]: + """ + If the user points dbt to a profile directory which exists AND + contains a profiles.yml file, then we can set a cookie. If the + specified folder does not exist, or if there is not a profiles.yml + file in this folder, then an inconsistent cookie can be used. This + will change in every dbt invocation until the user points to a + profile dir file which contains a valid profiles.yml file. + + See: https://github.com/dbt-labs/dbt-core/issues/1645 + """ + if user.profile.exists(): + cookie = Cookie(user.directory) + user.cookie = cookie.as_dict() + return user.cookie + return {} + + +def track(tracker: Tracker, user: User, msg: EventMsg) -> None: + if user.do_not_track: + return + + fire_event(SendingEvent(kwargs=str(**msg_to_dict(msg)))) + try: + tracker.track(msg) + except Exception: + fire_event(SendEventFailure()) diff --git a/dbt_common/events/tracker.py b/dbt_common/events/tracker.py new file mode 100644 index 00000000..a99f3af8 --- /dev/null +++ b/dbt_common/events/tracker.py @@ -0,0 +1,121 @@ +from dataclasses import dataclass +import logging +from logging.handlers import RotatingFileHandler +from typing import Any, Dict, Optional, Protocol, Self + +import snowplow_tracker +from snowplow_tracker.typing import FailureCallback + +from dbt_common.events.base_types import EventMsg, msg_to_dict +from dbt_common.events.format import timestamp_to_datetime_string + + +@dataclass +class TrackerConfig: + invocation_id: Optional[str] = None + msg_schemas: Optional[Dict[str, str]] = None + endpoint: Optional[str] = None + protocol: Optional[str] = "https" + on_failure: Optional[FailureCallback] = None + name: Optional[str] = None + output_file_name: Optional[str] = None + output_file_max_bytes: Optional[int] = 10 * 1024 * 1024 # 10 mb + + +class Tracker(Protocol): + @classmethod + def from_config(cls, config: TrackerConfig) -> Self: + ... + + def track(self, msg: EventMsg) -> None: + ... + + def enable_tracking(self, cookie: Dict[str, Any]) -> None: + ... + + def disable_tracking(self) -> None: + ... + + +class FileTracker(Tracker): + def __init__(self, logger: logging.Logger, invocation_id: Optional[str]) -> None: + self.logger = logger + self.invocation_id = invocation_id + + @classmethod + def from_config(cls, config: TrackerConfig) -> Self: + file_handler = RotatingFileHandler( + filename=config.output_file_name, + maxBytes=config.output_file_max_bytes, # type: ignore + backupCount=5, + encoding="utf8", + ) + file_handler.setFormatter(logging.Formatter(fmt="%(message)s")) + + logger = logging.getLogger(config.name) + logger.setLevel(logging.DEBUG) + logger.handlers.clear() + logger.propagate = False + logger.addHandler(file_handler) + return cls(logger, config.invocation_id) + + def track(self, msg: EventMsg) -> None: + ts: str = timestamp_to_datetime_string(msg.info.ts) + log_line = f"{ts} | {msg.info.msg}" + self.logger.debug(log_line) + + def enable_tracking(self, cookie: Dict[str, Any]) -> None: + pass + + def disable_tracking(self) -> None: + pass + + +class SnowplowTracker(Tracker): + def __init__( + self, + tracker: snowplow_tracker.Tracker, + msg_schemas: Dict[str, str], + invocation_id: Optional[str], + ) -> None: + self.tracker = tracker + self.msg_schemas = msg_schemas + self.invocation_id = invocation_id + + @classmethod + def from_config(cls, config: TrackerConfig) -> Self: + emitter = snowplow_tracker.Emitter( + config.endpoint, + config.protocol, + method="post", + batch_size=30, + on_failure=config.on_failure, + byte_limit=None, + request_timeout=5.0, + ) + tracker = snowplow_tracker.Tracker( + emitters=emitter, + namespace="cf", + app_id="dbt", + ) + return cls(tracker, config.msg_schemas, config.invocation_id) + + def track(self, msg: EventMsg) -> None: + data = msg_to_dict(msg) + schema = self.msg_schemas.get(msg.info.name) + context = [snowplow_tracker.SelfDescribingJson(schema, data)] + event = snowplow_tracker.StructuredEvent( + category="dbt", + action=msg.info.name, + label=self.invocation_id, + context=context, + ) + self.tracker.track(event) + + def enable_tracking(self, cookie: Dict[str, Any]) -> None: + subject = snowplow_tracker.Subject() + subject.set_user_id(cookie.get("id")) + self.tracker.set_subject(subject) + + def disable_tracking(self) -> None: + self.tracker.set_subject(None) diff --git a/dbt_common/events/types.proto b/dbt_common/events/types.proto index 3826a0f5..0ba2aea9 100644 --- a/dbt_common/events/types.proto +++ b/dbt_common/events/types.proto @@ -125,6 +125,34 @@ message FormattingMsg { Formatting data = 2; } +// Z039 +message DisableTracking { +} + +message DisableTrackingMsg { + CoreEventInfo info = 1; + DisableTracking data = 2; +} + +// Z040 +message SendingEvent { + string kwargs = 1; +} + +message SendingEventMsg { + CoreEventInfo info = 1; + SendingEvent data = 2; +} + +// Z041 +message SendEventFailure { +} + +message SendEventFailureMsg { + CoreEventInfo info = 1; + SendEventFailure data = 2; +} + // Z050 message Note { string msg = 1; diff --git a/dbt_common/events/types.py b/dbt_common/events/types.py index e098c0c3..af841726 100644 --- a/dbt_common/events/types.py +++ b/dbt_common/events/types.py @@ -132,6 +132,34 @@ def message(self) -> str: return self.msg +class DisableTracking(DebugLevel): + def code(self) -> str: + return "Z039" + + def message(self) -> str: + return ( + "Error sending anonymous usage statistics. Disabling tracking for this execution. " + "If you wish to permanently disable tracking, see: " + "https://docs.getdbt.com/reference/global-configs#send-anonymous-usage-stats." + ) + + +class SendingEvent(DebugLevel): + def code(self) -> str: + return "Z040" + + def message(self) -> str: + return f"Sending event: {self.kwargs}" + + +class SendEventFailure(DebugLevel): + def code(self) -> str: + return "Z041" + + def message(self) -> str: + return "An error was encountered while trying to send an event" + + class Note(InfoLevel): """Unstructured events. diff --git a/dbt_common/events/user.py b/dbt_common/events/user.py new file mode 100644 index 00000000..c7ee29a5 --- /dev/null +++ b/dbt_common/events/user.py @@ -0,0 +1,37 @@ +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Optional, Union + +import pytz + +from dbt_common.events.functions import get_invocation_id + + +class User: + def __init__(self, directory: Union[str, Path]) -> None: + self.cookie: Dict[str, Any] = {} + self.directory: Path = Path(directory) + self.invocation_id: str = get_invocation_id() + self.run_started_at: datetime = datetime.now(tz=pytz.utc) + + @property + def id(self) -> Optional[str]: + if self.cookie: + return self.cookie.get("id") + + @property + def do_not_track(self) -> bool: + return self.cookie != {} + + def state(self): + return "do not track" if self.do_not_track else "tracking" + + @property + def profile(self) -> Path: + return Path(self.directory) / "profiles.yml" + + def enable_tracking(self, cookie: Dict[str, Any]): + self.cookie = cookie + + def disable_tracking(self): + self.cookie = {} diff --git a/tests/unit/test_tracker.py b/tests/unit/test_tracker.py new file mode 100644 index 00000000..e69de29b