From 28e5c93255c06f471168c4069bb4119bd1c3e091 Mon Sep 17 00:00:00 2001
From: "Matthew M. Keeler"
Date: Mon, 20 Nov 2023 16:42:20 -0500
Subject: [PATCH] feat: Introduce flag change tracker api (#229)

The client instance will now provide access to a `flag_tracker`. This
tracker allows developers to be notified when a flag configuration
changes (or optionally when the /value/ of a flag changes for a
particular context).
---
 ldclient/client.py                             |  25 +-
 ldclient/impl/datasource/status.py             |  82 ++++-
 ldclient/impl/dependency_tracker.py            | 119 +++++++
 ldclient/impl/flag_tracker.py                  |  50 +++
 ldclient/impl/listeners.py                     |  14 +-
 ldclient/interfaces.py                         | 137 ++++++-
 ldclient/versioned_data_kind.py                |  12 +-
 .../impl/datasource/test_polling_processor.py  |   6 +-
 testing/impl/datasource/test_streaming.py      |   6 +-
 testing/impl/test_data_sink.py                 | 333 ++++++++++++++++++
 testing/impl/test_flag_tracker.py              |  80 +++++
 testing/test_file_data_source.py               |   4 +-
 12 files changed, 838 insertions(+), 30 deletions(-)
 create mode 100644 ldclient/impl/dependency_tracker.py
 create mode 100644 ldclient/impl/flag_tracker.py
 create mode 100644 testing/impl/test_data_sink.py
 create mode 100644 testing/impl/test_flag_tracker.py

diff --git a/ldclient/client.py b/ldclient/client.py
index 500e3cea..b566dd4b 100644
--- a/ldclient/client.py
+++ b/ldclient/client.py
@@ -29,10 +29,11 @@
 from ldclient.impl.listeners import Listeners
 from ldclient.impl.stubs import NullEventProcessor, NullUpdateProcessor
 from ldclient.impl.util import check_uwsgi, log
-from ldclient.interfaces import BigSegmentStoreStatusProvider, DataSourceStatusProvider, FeatureRequester, FeatureStore
+from ldclient.interfaces import BigSegmentStoreStatusProvider, DataSourceStatusProvider, FeatureRequester, FeatureStore, FlagTracker
 from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind
 from ldclient.feature_store import FeatureStore
 from ldclient.migrations import Stage, OpTracker
+from ldclient.impl.flag_tracker import FlagTrackerImpl
 
 from threading import Lock
 
@@ -103,9 +104,13 @@ def __init__(self, config: Config, start_wait: float=5):
 
         store = _FeatureStoreClientWrapper(self._config.feature_store)
 
-        listeners = Listeners()
-        self._config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners)
-        self.__data_source_status_provider = DataSourceStatusProviderImpl(listeners, self._config._data_source_update_sink)
+        data_source_listeners = Listeners()
+        flag_change_listeners = Listeners()
+
+        self.__flag_tracker = FlagTrackerImpl(flag_change_listeners, lambda key, context: self.variation(key, context, None))
+
+        self._config._data_source_update_sink = DataSourceUpdateSinkImpl(store, data_source_listeners, flag_change_listeners)
+        self.__data_source_status_provider = DataSourceStatusProviderImpl(data_source_listeners, self._config._data_source_update_sink)
 
         self._store = store  # type: FeatureStore
 
         big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments)
@@ -510,5 +515,17 @@ def data_source_status_provider(self) -> DataSourceStatusProvider:
         """
         return self.__data_source_status_provider
 
+    @property
+    def flag_tracker(self) -> FlagTracker:
+        """
+        Returns an interface for tracking changes in feature flag configurations.
+
+        The :class:`ldclient.interfaces.FlagTracker` contains methods for
+        requesting notifications about feature flag changes using an event
+        listener model.
+ """ + return self.__flag_tracker + + __all__ = ['LDClient', 'Config'] diff --git a/ldclient/impl/datasource/status.py b/ldclient/impl/datasource/status.py index 89034202..348fd174 100644 --- a/ldclient/impl/datasource/status.py +++ b/ldclient/impl/datasource/status.py @@ -1,16 +1,21 @@ +from ldclient.versioned_data_kind import FEATURES, SEGMENTS +from ldclient.impl.dependency_tracker import DependencyTracker from ldclient.impl.listeners import Listeners -from ldclient.interfaces import DataSourceStatusProvider, DataSourceUpdateSink, DataSourceStatus, FeatureStore, DataSourceState, DataSourceErrorInfo, DataSourceErrorKind +from ldclient.interfaces import DataSourceStatusProvider, DataSourceUpdateSink, DataSourceStatus, FeatureStore, DataSourceState, DataSourceErrorInfo, DataSourceErrorKind, FlagChange from ldclient.impl.rwlock import ReadWriteLock from ldclient.versioned_data_kind import VersionedDataKind +from ldclient.impl.dependency_tracker import KindAndKey import time -from typing import Callable, Mapping, Optional +from typing import Callable, Mapping, Optional, Set class DataSourceUpdateSinkImpl(DataSourceUpdateSink): - def __init__(self, store: FeatureStore, listeners: Listeners): + def __init__(self, store: FeatureStore, status_listeners: Listeners, flag_change_listeners: Listeners): self.__store = store - self.__listeners = listeners + self.__status_listeners = status_listeners + self.__flag_change_listeners = flag_change_listeners + self.__tracker = DependencyTracker() self.__lock = ReadWriteLock() self.__status = DataSourceStatus( @@ -28,13 +33,38 @@ def status(self) -> DataSourceStatus: self.__lock.runlock() def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]): - self.__monitor_store_update(lambda: self.__store.init(all_data)) + old_data = None + + def init_store(): + nonlocal old_data + if self.__flag_change_listeners.has_listeners(): + old_data = {} + for kind in [FEATURES, SEGMENTS]: + old_data[kind] = self.__store.all(kind, lambda x: x) + + self.__store.init(all_data) + + self.__monitor_store_update(init_store) + self.__reset_tracker_with_new_data(all_data) + + if old_data is None: + return + + self.__send_change_events( + self.__compute_changed_items_for_full_data_set(old_data, all_data) + ) def upsert(self, kind: VersionedDataKind, item: dict): self.__monitor_store_update(lambda: self.__store.upsert(kind, item)) + # TODO(sc-212471): We only want to do this if the store successfully + # updates the record. 
+ key = item.get('key', '') + self.__update_dependency_for_single_item(kind, key, item) + def delete(self, kind: VersionedDataKind, key: str, version: int): self.__monitor_store_update(lambda: self.__store.delete(kind, key, version)) + self.__update_dependency_for_single_item(kind, key, None) def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): status_to_broadcast = None @@ -60,7 +90,7 @@ def update_status(self, new_state: DataSourceState, new_error: Optional[DataSour self.__lock.unlock() if status_to_broadcast is not None: - self.__listeners.notify(status_to_broadcast) + self.__status_listeners.notify(status_to_broadcast) def __monitor_store_update(self, fn: Callable[[], None]): try: @@ -75,6 +105,46 @@ def __monitor_store_update(self, fn: Callable[[], None]): self.update_status(DataSourceState.INTERRUPTED, error_info) raise + def __update_dependency_for_single_item(self, kind: VersionedDataKind, key: str, item: Optional[dict]): + self.__tracker.update_dependencies_from(kind, key, item) + if self.__flag_change_listeners.has_listeners(): + affected_items: Set[KindAndKey] = set() + self.__tracker.add_affected_items(affected_items, KindAndKey(kind=kind, key=key)) + self.__send_change_events(affected_items) + + def __reset_tracker_with_new_data(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]): + self.__tracker.reset() + + for kind, items in all_data.items(): + for key, item in items.items(): + self.__tracker.update_dependencies_from(kind, key, item) + + def __send_change_events(self, affected_items: Set[KindAndKey]): + for item in affected_items: + if item.kind == FEATURES: + self.__flag_change_listeners.notify(FlagChange(item.key)) + + def __compute_changed_items_for_full_data_set(self, old_data: Mapping[VersionedDataKind, Mapping[str, dict]], new_data: Mapping[VersionedDataKind, Mapping[str, dict]]): + affected_items: Set[KindAndKey] = set() + + for kind in [FEATURES, SEGMENTS]: + old_items = old_data.get(kind, {}) + new_items = new_data.get(kind, {}) + + keys: Set[str] = set() + + for key in keys.union(old_items.keys(), new_items.keys()): + old_item = old_items.get(key) + new_item = new_items.get(key) + + if old_item is None and new_item is None: + continue + + if old_item is None or new_item is None or old_item['version'] < new_item['version']: + self.__tracker.add_affected_items(affected_items, KindAndKey(kind=kind, key=key)) + + return affected_items + class DataSourceStatusProviderImpl(DataSourceStatusProvider): def __init__(self, listeners: Listeners, updates_sink: DataSourceUpdateSinkImpl): diff --git a/ldclient/impl/dependency_tracker.py b/ldclient/impl/dependency_tracker.py new file mode 100644 index 00000000..5ec0762a --- /dev/null +++ b/ldclient/impl/dependency_tracker.py @@ -0,0 +1,119 @@ +from ldclient.impl.model.feature_flag import FeatureFlag +from ldclient.impl.model.segment import Segment +from ldclient.impl.model.clause import Clause +from ldclient.versioned_data_kind import VersionedDataKind, SEGMENTS, FEATURES + +from typing import Set, List, Dict, NamedTuple, Union, Optional + + +class KindAndKey(NamedTuple): + kind: VersionedDataKind + key: str + + +class DependencyTracker: + """ + The DependencyTracker is responsible for tracking both up and downstream + dependency relationships. Managing a bi-directional mapping allows us to + more easily perform updates to the tracker, and to determine affected items + when a downstream item is modified. 
+ """ + + def __init__(self): + self.__children: Dict[KindAndKey, Set[KindAndKey]] = {} + self.__parents: Dict[KindAndKey, Set[KindAndKey]] = {} + + def update_dependencies_from(self, from_kind: VersionedDataKind, from_key: str, from_item: Optional[Union[dict, FeatureFlag, Segment]]): + """ + Updates the dependency graph when an item has changed. + + :param from_kind: the changed item's kind + :param from_key: the changed item's key + :param from_item: the changed item + + """ + from_what = KindAndKey(kind=from_kind, key=from_key) + updated_dependencies = DependencyTracker.compute_dependencies_from(from_kind, from_item) + + old_children_set = self.__children.get(from_what) + + if old_children_set is not None: + for kind_and_key in old_children_set: + parents_of_this_old_dep = self.__parents.get(kind_and_key, set()) + if from_what in parents_of_this_old_dep: + parents_of_this_old_dep.remove(from_what) + + self.__children[from_what] = updated_dependencies + for kind_and_key in updated_dependencies: + parents_of_this_new_dep = self.__parents.get(kind_and_key) + if parents_of_this_new_dep is None: + parents_of_this_new_dep = set() + self.__parents[kind_and_key] = parents_of_this_new_dep + + parents_of_this_new_dep.add(from_what) + + def add_affected_items(self, items_out: Set[KindAndKey], initial_modified_item: KindAndKey): + """ + + Populates the given set with the union of the initial item and all items that directly or indirectly + depend on it (based on the current state of the dependency graph). + + @param items_out [Set] + @param initial_modified_item [Object] + + """ + + if initial_modified_item in items_out: + return + + items_out.add(initial_modified_item) + + parents = self.__parents.get(initial_modified_item) + if parents is None: + return + + for parent in parents: + self.add_affected_items(items_out, parent) + + def reset(self): + """ + Clear any tracked dependencies and reset the tracking state to a clean slate. 
+ """ + self.__children.clear() + self.__parents.clear() + + @staticmethod + def compute_dependencies_from(from_kind: VersionedDataKind, from_item: Optional[Union[dict, FeatureFlag, Segment]]) -> Set[KindAndKey]: + """ + @param from_kind [String] + @param from_item [LaunchDarkly::Impl::Model::FeatureFlag, LaunchDarkly::Impl::Model::Segment] + @return [Set] + """ + if from_item is None: + return set() + + from_item = from_kind.decode(from_item) if isinstance(from_item, dict) else from_item + + if from_kind == FEATURES and isinstance(from_item, FeatureFlag): + prereq_keys = [KindAndKey(kind=from_kind, key=p.key) for p in from_item.prerequisites] + segment_keys = [kindAndKey for rule in from_item.rules for kindAndKey in DependencyTracker.segment_keys_from_clauses(rule.clauses)] + + results = set(prereq_keys) + results.update(segment_keys) + + return results + elif from_kind == SEGMENTS and isinstance(from_item, Segment): + kind_and_keys = [key for rule in from_item.rules for key in DependencyTracker.segment_keys_from_clauses(rule.clauses)] + return set(kind_and_keys) + else: + return set() + + @staticmethod + def segment_keys_from_clauses(clauses: List[Clause]) -> List[KindAndKey]: + results = [] + for clause in clauses: + if clause.op == 'segmentMatch': + pairs = [KindAndKey(kind=SEGMENTS, key=value) for value in clause.values] + results.extend(pairs) + + return results diff --git a/ldclient/impl/flag_tracker.py b/ldclient/impl/flag_tracker.py new file mode 100644 index 00000000..35c185c7 --- /dev/null +++ b/ldclient/impl/flag_tracker.py @@ -0,0 +1,50 @@ +from ldclient.interfaces import FlagTracker, FlagChange, FlagValueChange +from ldclient.impl.listeners import Listeners +from ldclient.context import Context +from ldclient.impl.rwlock import ReadWriteLock + +from typing import Callable + + +class FlagValueChangeListener: + def __init__(self, key: str, context: Context, listener: Callable[[FlagValueChange], None], eval_fn: Callable): + self.__key = key + self.__context = context + self.__listener = listener + self.__eval_fn = eval_fn + + self.__lock = ReadWriteLock() + self.__value = eval_fn(key, context) + + def __call__(self, flag_change: FlagChange): + if flag_change.key != self.__key: + return + + new_value = self.__eval_fn(self.__key, self.__context) + + self.__lock.lock() + old_value, self.__value = self.__value, new_value + self.__lock.unlock() + + if new_value == old_value: + return + + self.__listener(FlagValueChange(self.__key, old_value, new_value)) + + +class FlagTrackerImpl(FlagTracker): + def __init__(self, listeners: Listeners, eval_fn: Callable): + self.__listeners = listeners + self.__eval_fn = eval_fn + + def add_listener(self, listener: Callable[[FlagChange], None]): + self.__listeners.add(listener) + + def remove_listener(self, listener: Callable[[FlagChange], None]): + self.__listeners.remove(listener) + + def add_flag_value_change_listener(self, key: str, context: Context, fn: Callable[[FlagValueChange], None]) -> Callable[[FlagChange], None]: + listener = FlagValueChangeListener(key, context, fn, self.__eval_fn) + self.add_listener(listener) + + return listener diff --git a/ldclient/impl/listeners.py b/ldclient/impl/listeners.py index dda5bf52..ef3930cc 100644 --- a/ldclient/impl/listeners.py +++ b/ldclient/impl/listeners.py @@ -3,26 +3,32 @@ from threading import RLock from typing import Any, Callable + class Listeners: """ Simple abstraction for a list of callbacks that can receive a single value. Callbacks are done synchronously on the caller's thread. 
""" + def __init__(self): self.__listeners = [] self.__lock = RLock() - + + def has_listeners(self) -> bool: + with self.__lock: + return len(self.__listeners) > 0 + def add(self, listener: Callable): with self.__lock: self.__listeners.append(listener) - + def remove(self, listener: Callable): with self.__lock: try: self.__listeners.remove(listener) except ValueError: - pass # removing a listener that wasn't in the list is a no-op - + pass # removing a listener that wasn't in the list is a no-op + def notify(self, value: Any): with self.__lock: listeners_copy = self.__listeners.copy() diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index ba3595d2..f2b7f48d 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -3,6 +3,7 @@ They may be useful in writing new implementations of these components, or for testing. """ +from ldclient.context import Context from abc import ABCMeta, abstractmethod, abstractproperty from .versioned_data_kind import VersionedDataKind @@ -28,7 +29,7 @@ class FeatureStore: __metaclass__ = ABCMeta @abstractmethod - def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any]=lambda x: x) -> Any: + def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] = lambda x: x) -> Any: """ Retrieves the object to which the specified key is mapped, or None if the key is not found or the associated object has a ``deleted`` property of True. The retrieved object, if any (a @@ -41,7 +42,7 @@ def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any]= """ @abstractmethod - def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any]=lambda x: x) -> Any: + def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any] = lambda x: x) -> Any: """ Retrieves a dictionary of all associated objects of a given kind. The retrieved dict of keys to objects can be transformed by the specified callback. @@ -258,6 +259,7 @@ class BigSegmentStoreMetadata: """ Values returned by :func:`BigSegmentStore.get_metadata()`. """ + def __init__(self, last_up_to_date: Optional[int]): self.__last_up_to_date = last_up_to_date pass @@ -324,6 +326,7 @@ def stop(self): """ pass + class BigSegmentStoreStatus: """ Information about the state of a Big Segment store, provided by :class:`BigSegmentStoreStatusProvider`. @@ -331,6 +334,7 @@ class BigSegmentStoreStatus: Big Segments are a specific type of user segments. For more information, read the LaunchDarkly documentation: https://docs.launchdarkly.com/home/users/big-segments """ + def __init__(self, available: bool, stale: bool): self.__available = available self.__stale = stale @@ -695,3 +699,132 @@ def update_status(self, new_state: DataSourceState, new_error: Optional[DataSour :param new_error: An optional error if the new state is an error condition """ pass + + +class FlagChange: + """ + Change event fired when some aspect of the flag referenced by the key has changed. + """ + + def __init__(self, key: str): + self.__key = key + + @property + def key(self) -> str: + """ + :return: The flag key that was modified by the store. + """ + return self.__key + + +class FlagValueChange: + """ + Change event fired when the evaluated value for the specified flag key has changed. + """ + + def __init__(self, key, old_value, new_value): + self.__key = key + self.__old_value = old_value + self.__new_value = new_value + + @property + def key(self): + """ + :return: The flag key that was modified by the store. 
+ """ + return self.__key + + @property + def old_value(self): + """ + :return: The old evaluation result prior to the flag changing + """ + return self.__old_value + + @property + def new_value(self): + """ + :return: The new evaluation result after to the flag was changed + """ + return self.__new_value + + +class FlagTracker: + """ + An interface for tracking changes in feature flag configurations. + + An implementation of this interface is returned by :class:`ldclient.client.LDClient.flag_tracker`. + Application code never needs to implement this interface. + """ + __metaclass__ = ABCMeta + + @abstractmethod + def add_listener(self, listener: Callable[[FlagChange], None]): + """ + Registers a listener to be notified of feature flag changes in general. + + The listener will be notified whenever the SDK receives any change to any feature flag's configuration, + or to a user segment that is referenced by a feature flag. If the updated flag is used as a prerequisite + for other flags, the SDK assumes that those flags may now behave differently and sends flag change events + for them as well. + + Note that this does not necessarily mean the flag's value has changed for any particular evaluation + context, only that some part of the flag configuration was changed so that it may return a + different value than it previously returned for some context. If you want to track flag value changes, + use :func:`add_flag_value_change_listener` instead. + + It is possible, given current design restrictions, that a listener might be notified when no change has + occurred. This edge case will be addressed in a later version of the SDK. It is important to note this issue + does not affect :func:`add_flag_value_change_listener` listeners. + + If using the file data source, any change in a data file will be treated as a change to every flag. Again, + use :func:`add_flag_value_change_listener` (or just re-evaluate the flag # yourself) if you want to know whether + this is a change that really affects a flag's value. + + Change events only work if the SDK is actually connecting to LaunchDarkly (or using the file data source). + If the SDK is only reading flags from a database then it cannot know when there is a change, because + flags are read on an as-needed basis. + + The listener will be called from a worker thread. + + Calling this method for an already-registered listener has no effect. + + :param listener: listener to call when flag has changed + """ + pass + + @abstractmethod + def remove_listener(self, listener: Callable[[FlagChange], None]): + """ + Unregisters a listener so that it will no longer be notified of feature flag changes. + + Calling this method for a listener that was not previously registered has no effect. + + :param listener: the listener to remove + """ + pass + + @abstractmethod + def add_flag_value_change_listener(self, key: str, context: Context, listener: Callable[[FlagValueChange], None]): + """ + Registers a listener to be notified of a change in a specific feature flag's value for a specific + evaluation context. + + When you call this method, it first immediately evaluates the feature flag. It then uses + :func:`add_listener` to start listening for feature flag configuration + changes, and whenever the specified feature flag changes, it re-evaluates the flag for the same context. + It then calls your listener if and only if the resulting value has changed. + + All feature flag evaluations require an instance of :class:`ldclient.context.Context`. 
If the feature flag you are + tracking does not have any context targeting rules, you must still pass a dummy context such as + :func:`ldclient.context.Context.create("for-global-flags")`. If you do not want the user to appear on your dashboard, + use the anonymous property which can be set via the context builder. + + The returned listener represents the subscription that was created by this method + call; to unsubscribe, pass that object (not your listener) to :func:`remove_listener`. + + :param key: The flag key to monitor + :param context: The context to evaluate against the flag + :param listener: The listener to trigger if the value has changed + """ + pass diff --git a/ldclient/versioned_data_kind.py b/ldclient/versioned_data_kind.py index 93d3ca83..ac5e10d6 100644 --- a/ldclient/versioned_data_kind.py +++ b/ldclient/versioned_data_kind.py @@ -27,20 +27,20 @@ def __init__(self, namespace: str, request_api_path: str, stream_api_path: str, @property def namespace(self) -> str: return self._namespace - + @property def request_api_path(self) -> str: return self._request_api_path - + @property def stream_api_path(self) -> str: return self._stream_api_path - + def decode(self, data: Any) -> Any: if self._decoder is None or isinstance(data, ModelEntity): return data return self._decoder(data) - + def encode(self, item: Any) -> dict: return item.to_json_dict() if isinstance(item, ModelEntity) else item @@ -51,11 +51,11 @@ def __init__(self, namespace: str, request_api_path: str, stream_api_path: str, super().__init__(namespace, request_api_path, stream_api_path, decoder) self._priority = priority self._get_dependency_keys = get_dependency_keys - + @property def priority(self) -> int: return self._priority - + @property def get_dependency_keys(self) -> Optional[Callable[[dict], Iterable[str]]]: return self._get_dependency_keys diff --git a/testing/impl/datasource/test_polling_processor.py b/testing/impl/datasource/test_polling_processor.py index 24076557..bc9dae80 100644 --- a/testing/impl/datasource/test_polling_processor.py +++ b/testing/impl/datasource/test_polling_processor.py @@ -53,7 +53,7 @@ def test_successful_request_puts_feature_data_in_store(): listeners.add(spy) config = Config("SDK_KEY") - config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) setup_processor(config) ready.wait() assert store.get(FEATURES, "flagkey", lambda x: x) == flag @@ -99,7 +99,7 @@ def verify_unrecoverable_http_error(http_status_code, ignore_mock): listeners.add(spy) config = Config("SDK_KEY") - config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) mock_requester.exception = UnsuccessfulResponseException(http_status_code) setup_processor(config) @@ -120,7 +120,7 @@ def verify_recoverable_http_error(http_status_code, ignore_mock): listeners.add(spy) config = Config("SDK_KEY") - config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) mock_requester.exception = UnsuccessfulResponseException(http_status_code) setup_processor(config) diff --git a/testing/impl/datasource/test_streaming.py b/testing/impl/datasource/test_streaming.py index 1f52d73c..42787228 100644 --- a/testing/impl/datasource/test_streaming.py +++ b/testing/impl/datasource/test_streaming.py @@ -301,7 +301,7 
@@ def test_status_includes_http_code(status): listeners = Listeners() listeners.add(spy) - config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) server.for_path('/all', two_errors_then_success) with StreamingUpdateProcessor(config, store, ready, None) as sp: @@ -341,7 +341,7 @@ def listener(s): statuses.append(s) listeners.add(listener) - config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) server.for_path('/all', SequentialHandler(invalid_stream, valid_stream)) with StreamingUpdateProcessor(config, store, ready, None) as sp: @@ -369,7 +369,7 @@ def test_failure_transitions_from_valid(): listeners = Listeners() listeners.add(spy) - config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners) + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) # The sink has special handling for failures before the state is valid. So we manually set this to valid so we # can exercise the other branching logic within the sink. diff --git a/testing/impl/test_data_sink.py b/testing/impl/test_data_sink.py new file mode 100644 index 00000000..458dca06 --- /dev/null +++ b/testing/impl/test_data_sink.py @@ -0,0 +1,333 @@ +import pytest +import mock + +from typing import Dict, Callable + +from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl +from ldclient.feature_store import InMemoryFeatureStore +from ldclient.interfaces import DataSourceState, DataSourceErrorKind +from ldclient.impl.listeners import Listeners +from ldclient.versioned_data_kind import FEATURES, SEGMENTS + +from testing.test_util import SpyListener +from testing.builders import FlagBuilder, FlagRuleBuilder, make_clause, SegmentBuilder, SegmentRuleBuilder + + +@pytest.fixture +def basic_data() -> Dict: + flag1 = FlagBuilder('flag1').version(1).on(False).build() + flag2 = FlagBuilder('flag2').version(1).on(False).build() + flag3 = FlagBuilder('flag3').version(1).rules( + FlagRuleBuilder().variation(0).id('rule_id').track_events(True).clauses( + make_clause('user', 'segmentMatch', 'segmentMatch', 'segment2') + ).build() + ).build() + segment1 = SegmentBuilder('segment1').version(1).build() + segment2 = SegmentBuilder('segment2').version(1).build() + + return { + FEATURES: { + flag1.key: flag1.to_json_dict(), + flag2.key: flag2.to_json_dict(), + flag3.key: flag3.to_json_dict(), + }, + SEGMENTS: { + segment1.key: segment1.to_json_dict(), + segment2.key: segment2.to_json_dict(), + }, + } + + +@pytest.fixture +def prereq_data() -> Dict: + flag1 = FlagBuilder('flag1').version(1).on(False).prerequisite('flag2', 0).build() + flag2 = FlagBuilder('flag2').version(1).on(False).prerequisite('flag3', 0).prerequisite('flag4', 0).prerequisite('flag6', 0).build() + flag3 = FlagBuilder('flag3').version(1).on(False).build() + flag4 = FlagBuilder('flag4').version(1).on(False).build() + flag5 = FlagBuilder('flag5').version(1).on(False).build() + flag6 = FlagBuilder('flag6').version(1).rules( + FlagRuleBuilder().variation(0).id('rule_id').track_events(True).clauses( + make_clause('user', 'segmentMatch', 'segmentMatch', 'segment2') + ).build() + ).build() + segment1 = SegmentBuilder('segment1').version(1).build() + segment2 = SegmentBuilder('segment2').version(1).rules( + SegmentRuleBuilder().clauses( + make_clause('user', 'segmentMatch', 'segmentMatch', 'segment1') + 
).build() + ).build() + + return { + FEATURES: { + flag1.key: flag1.to_json_dict(), + flag2.key: flag2.to_json_dict(), + flag3.key: flag3.to_json_dict(), + flag4.key: flag4.to_json_dict(), + flag5.key: flag5.to_json_dict(), + flag6.key: flag6.to_json_dict(), + }, + SEGMENTS: { + segment1.key: segment1.to_json_dict(), + segment2.key: segment2.to_json_dict(), + }, + } + + +def test_defaults_to_initializing(): + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), Listeners()) + assert sink.status.state == DataSourceState.INITIALIZING + + +def test_interrupting_initializing_stays_initializing(): + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), Listeners()) + sink.update_status(DataSourceState.INTERRUPTED, None) + assert sink.status.state == DataSourceState.INITIALIZING + assert sink.status.error is None + + +def test_listener_is_only_triggered_for_state_changes(): + spy = SpyListener() + status_listener = Listeners() + status_listener.add(spy) + + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), status_listener, Listeners()) + sink.update_status(DataSourceState.VALID, None) + sink.update_status(DataSourceState.VALID, None) + assert len(spy.statuses) == 1 + + sink.update_status(DataSourceState.INTERRUPTED, None) + sink.update_status(DataSourceState.INTERRUPTED, None) + assert len(spy.statuses) == 2 + + +def test_all_listeners_triggered_for_single_change(): + spy1 = SpyListener() + spy2 = SpyListener() + + status_listener = Listeners() + status_listener.add(spy1) + status_listener.add(spy2) + + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), status_listener, Listeners()) + sink.update_status(DataSourceState.VALID, None) + + assert len(spy1.statuses) == 1 + assert len(spy2.statuses) == 1 + + +def test_is_called_once_per_flag_during_init(basic_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(basic_data) + + flag1 = FlagBuilder('flag1').version(2).on(False).build() + flag4 = FlagBuilder('flag4').version(1).on(False).build() + + spy = SpyListener() + flag_change_listener.add(spy) + sink.init({ + FEATURES: { + flag1.key: flag1, + flag4.key: flag4, + } + }) + + assert len(spy.statuses) == 4 + keys = set(s.key for s in spy.statuses) # No guaranteed order + + assert 'flag1' in keys # Version update + assert 'flag2' in keys # Deleted + assert 'flag3' in keys # Deleted + assert 'flag4' in keys # Newly created + + +def test_upsert_triggers_flag_listener(basic_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(basic_data) + + spy = SpyListener() + flag_change_listener.add(spy) + sink.upsert(FEATURES, FlagBuilder('flag1').version(2).on(False).build()) + + assert len(spy.statuses) == 1 + assert spy.statuses[0].key == 'flag1' + + +def test_delete_triggers_flag_listener(basic_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(basic_data) + + spy = SpyListener() + flag_change_listener.add(spy) + sink.delete(FEATURES, 'flag1', 2) + + # TODO(sc-212471): Once the store starts returning a success status on delete, the flag change + # notification can start ignoring duplicate requests like this. 
+ # sink.delete(FEATURES, 'flag1', 2) + + assert len(spy.statuses) == 1 + assert spy.statuses[0].key == 'flag1' + + +def test_triggers_if_segment_changes(basic_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(basic_data) + + spy = SpyListener() + flag_change_listener.add(spy) + sink.upsert(SEGMENTS, SegmentBuilder('segment2').version(2).build()) + + assert len(spy.statuses) == 1 + assert spy.statuses[0].key == 'flag3' + + +def test_dependency_stack_if_top_of_chain_is_changed(prereq_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(prereq_data) + + spy = SpyListener() + flag_change_listener.add(spy) + + sink.upsert(FEATURES, FlagBuilder('flag4').version(2).on(False).build()) + + assert len(spy.statuses) == 3 + + keys = set(s.key for s in spy.statuses) + assert 'flag1' in keys + assert 'flag2' in keys + assert 'flag4' in keys + + +def test_triggers_when_new_prereqs_added(prereq_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(prereq_data) + + spy = SpyListener() + flag_change_listener.add(spy) + + sink.upsert(FEATURES, FlagBuilder('flag3').version(2).on(False).prerequisite('flag4', 0).build()) + + assert len(spy.statuses) == 3 + + keys = set(s.key for s in spy.statuses) + assert 'flag1' in keys + assert 'flag2' in keys + assert 'flag3' in keys + + +def test_triggers_when_prereqs_removed(prereq_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(prereq_data) + + spy = SpyListener() + flag_change_listener.add(spy) + + sink.upsert(FEATURES, FlagBuilder('flag2').version(2).on(False).prerequisite('flag3', 0).build()) + + assert len(spy.statuses) == 2 + + keys = set(s.key for s in spy.statuses) + assert 'flag1' in keys + assert 'flag2' in keys + + +def test_triggers_dependency_stack_if_top_of_chain_is_deleted(prereq_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(prereq_data) + + spy = SpyListener() + flag_change_listener.add(spy) + + sink.delete(FEATURES, 'flag4', 2) + + assert len(spy.statuses) == 3 + + keys = set(s.key for s in spy.statuses) + assert 'flag1' in keys + assert 'flag2' in keys + assert 'flag4' in keys + + +def test_triggers_dependent_segment_is_modified(prereq_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(prereq_data) + + spy = SpyListener() + flag_change_listener.add(spy) + + sink.upsert(SEGMENTS, SegmentBuilder('segment1').version(2).build()) + # TODO(sc-212471): Once the store starts returning a success status on upsert, the flag change + # notification can start ignoring duplicate requests like this. 
+ # sink.upsert(SEGMENTS, SegmentBuilder('segment1').version(2).build()) + + assert len(spy.statuses) == 3 + + keys = set(s.key for s in spy.statuses) + assert 'flag1' in keys + assert 'flag2' in keys + assert 'flag6' in keys + + +def test_triggers_if_dependent_segment_removed(prereq_data): + flag_change_listener = Listeners() + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener) + sink.init(prereq_data) + + spy = SpyListener() + flag_change_listener.add(spy) + + sink.delete(SEGMENTS, 'segment2', 2) + # TODO(sc-212471): Once the store starts returning a success status on delete, the flag change + # notification can start ignoring duplicate requests like this. + # sink.delete(SEGMENTS, 'segment2', 2) + + assert len(spy.statuses) == 3 + + keys = set(s.key for s in spy.statuses) + assert 'flag1' in keys + assert 'flag2' in keys + assert 'flag6' in keys + + +def confirm_store_error(fn: Callable[[DataSourceUpdateSinkImpl], None], expected_error: str): + status_listeners = Listeners() + + sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), status_listeners, Listeners()) + # Make it valid first so the error changes from initializing + sink.update_status(DataSourceState.VALID, None) + + spy = SpyListener() + status_listeners.add(spy) + + try: + fn(sink) + except (Exception,): + pass + + assert len(spy.statuses) == 1 + assert spy.statuses[0].state == DataSourceState.INTERRUPTED + assert spy.statuses[0].error.kind == DataSourceErrorKind.STORE_ERROR + assert spy.statuses[0].error.message == expected_error + + +@mock.patch('ldclient.feature_store.InMemoryFeatureStore.init', side_effect=[Exception('cannot init')]) +def test_listener_is_triggered_for_init_error(prereq_data): + confirm_store_error(lambda sink: sink.init(prereq_data), 'cannot init') + + +@mock.patch('ldclient.feature_store.InMemoryFeatureStore.upsert', side_effect=[Exception('cannot upsert')]) +def test_listener_is_triggered_for_upsert_error(prereq_data): + confirm_store_error(lambda sink: sink.upsert(FEATURES, {}), 'cannot upsert') + + +@mock.patch('ldclient.feature_store.InMemoryFeatureStore.delete', side_effect=[Exception('cannot delete')]) +def test_listener_is_triggered_for_delete_error(prereq_data): + confirm_store_error(lambda sink: sink.delete(FEATURES, 'key', 1), 'cannot delete') diff --git a/testing/impl/test_flag_tracker.py b/testing/impl/test_flag_tracker.py new file mode 100644 index 00000000..bcdaba85 --- /dev/null +++ b/testing/impl/test_flag_tracker.py @@ -0,0 +1,80 @@ +from ldclient.impl.flag_tracker import FlagTrackerImpl +from testing.test_util import SpyListener +from ldclient.impl.listeners import Listeners +from ldclient.interfaces import FlagChange + + +def test_can_add_and_remove_listeners(): + spy = SpyListener() + listeners = Listeners() + + tracker = FlagTrackerImpl(listeners, lambda: None) + tracker.add_listener(spy) + + listeners.notify(FlagChange('flag-1')) + listeners.notify(FlagChange('flag-2')) + + tracker.remove_listener(spy) + + listeners.notify(FlagChange('flag-3')) + + assert len(spy.statuses) == 2 + assert spy.statuses[0].key == 'flag-1' + assert spy.statuses[1].key == 'flag-2' + + +def test_flag_change_listener_notified_when_value_changes(): + responses = ['initial', 'second', 'second', 'final'] + + def eval_fn(key, context): + return responses.pop(0) + + listeners = Listeners() + tracker = FlagTrackerImpl(listeners, eval_fn) + + spy = SpyListener() + tracker.add_flag_value_change_listener('flag-key', None, spy) + assert len(spy.statuses) == 0 + + 
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 1
+
+    # The evaluated value did not change here ('second' -> 'second'), so expect no notification
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 1
+
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 2
+
+    assert spy.statuses[0].key == 'flag-key'
+    assert spy.statuses[0].old_value == 'initial'
+    assert spy.statuses[0].new_value == 'second'
+
+    assert spy.statuses[1].key == 'flag-key'
+    assert spy.statuses[1].old_value == 'second'
+    assert spy.statuses[1].new_value == 'final'
+
+
+def test_flag_change_listener_returns_listener_we_can_unregister():
+    responses = ['first', 'second', 'third']
+
+    def eval_fn(key, context):
+        return responses.pop(0)
+
+    listeners = Listeners()
+    tracker = FlagTrackerImpl(listeners, eval_fn)
+
+    spy = SpyListener()
+    created_listener = tracker.add_flag_value_change_listener('flag-key', None, spy)
+    assert len(spy.statuses) == 0
+
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 1
+
+    tracker.remove_listener(created_listener)
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 1
+
+    assert spy.statuses[0].key == 'flag-key'
+    assert spy.statuses[0].old_value == 'first'
+    assert spy.statuses[0].new_value == 'second'
diff --git a/testing/test_file_data_source.py b/testing/test_file_data_source.py
index 74789450..432d8bb6 100644
--- a/testing/test_file_data_source.py
+++ b/testing/test_file_data_source.py
@@ -138,7 +138,7 @@ def test_loads_flags_on_start_from_json():
 
     try:
         config = Config("SDK_KEY")
-        config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners)
+        config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners())
         source = make_data_source(config, paths = path)
         source.start()
         assert store.initialized is True
@@ -158,7 +158,7 @@ def test_handles_invalid_format_correctly():
 
     try:
         config = Config("SDK_KEY")
-        config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners)
+        config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners())
         source = make_data_source(config, paths = path)
         source.start()
         assert store.initialized is False
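
Reviewer note: a minimal usage sketch of the API this patch adds. The method
names match the diff above; the SDK key, flag key, and context key are
placeholders invented for illustration.

from ldclient import LDClient, Config
from ldclient.context import Context
from ldclient.interfaces import FlagChange, FlagValueChange

client = LDClient(Config('placeholder-sdk-key'))

# Fires for any configuration change to any flag (or to a segment or
# prerequisite it depends on), whether or not the evaluated value changed.
def on_flag_change(change: FlagChange):
    print('flag %s was updated' % change.key)

client.flag_tracker.add_listener(on_flag_change)

# Fires only when the evaluated value changes for this specific context.
context = Context.create('placeholder-context-key')

def on_value_change(change: FlagValueChange):
    print('%s went from %s to %s' % (change.key, change.old_value, change.new_value))

value_listener = client.flag_tracker.add_flag_value_change_listener(
    'placeholder-flag-key', context, on_value_change)

# To unsubscribe, pass back the listener returned above (not on_value_change).
client.flag_tracker.remove_listener(value_listener)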
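
And a small sketch of the dependency bookkeeping behind the fan-out seen in
the tests above: the flag keys here are invented, and FlagBuilder is the
helper from testing/builders.py already used by this patch's tests (assumed
to build a flag model accepted by update_dependencies_from).

from ldclient.impl.dependency_tracker import DependencyTracker, KindAndKey
from ldclient.versioned_data_kind import FEATURES
from testing.builders import FlagBuilder

tracker = DependencyTracker()

# flag-a lists flag-b as a prerequisite, so flag-b gains flag-a as a parent.
flag_a = FlagBuilder('flag-a').version(1).prerequisite('flag-b', 0).build()
tracker.update_dependencies_from(FEATURES, 'flag-a', flag_a)
tracker.update_dependencies_from(FEATURES, 'flag-b', FlagBuilder('flag-b').version(1).build())

# A change to flag-b therefore affects flag-b itself and, transitively, flag-a,
# which is why the sink emits FlagChange events for the whole chain.
affected = set()
tracker.add_affected_items(affected, KindAndKey(kind=FEATURES, key='flag-b'))
assert affected == {
    KindAndKey(kind=FEATURES, key='flag-b'),
    KindAndKey(kind=FEATURES, key='flag-a'),
}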