diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index 3a1ec744..00000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,190 +0,0 @@ -version: 2.1 - -orbs: - win: circleci/windows@1.0.0 - -workflows: - test: - jobs: - - test-linux: - name: Python 3.8 - docker-image: cimg/python:3.8 - test-build-docs: true - skip-sse-contract-tests: true - skip-contract-tests: true - - test-linux: - name: Python 3.9 - docker-image: cimg/python:3.9 - - test-linux: - name: Python 3.10 - docker-image: cimg/python:3.10 - - test-linux: - name: Python 3.11 - docker-image: cimg/python:3.11 - - test-linux: - name: Python 3.12 - docker-image: cimg/python:3.12 - - test-windows: - name: Windows Python 3 - py3: true - -jobs: - test-linux: - parameters: - docker-image: - type: string - test-packaging: - type: boolean - default: true - test-build-docs: - type: boolean - default: false - test-with-mypy: - type: boolean - default: true - skip-sse-contract-tests: - type: boolean - default: false - skip-contract-tests: - type: boolean - default: false - docker: - - image: <<parameters.docker-image>> - - image: redis - - image: amazon/dynamodb-local - - image: hashicorp/consul - steps: - - checkout - - run: python --version - - run: - name: install requirements - command: | - pip install --upgrade pip - pip install setuptools - pip install -r test-requirements.txt; - pip install -r test-filesource-optional-requirements.txt; - pip install -r consul-requirements.txt; - python setup.py install; - pip freeze - - run: - name: run tests - command: | - mkdir test-reports - pytest -s --cov=ldclient --cov-report=html:build/html --junitxml=test-reports/junit.xml testing -W error::SyntaxWarning - - when: - condition: <<parameters.test-packaging>> - steps: - - run: - name: test packaging/install - command: | - sudo rm -rf dist *.egg-info - ./test-packaging/test-packaging.sh - - when: - condition: <<parameters.test-with-mypy>> - steps: - - run: - name: verify typehints - command: | - export PATH="/home/circleci/.local/bin:$PATH" - make lint - - when: - condition: <<parameters.test-build-docs>> - steps: - - run: - name: verify docs can be built successfully - command: | - make docs - - - unless: - condition: <<parameters.skip-sse-contract-tests>> - steps: - - run: - name: build SSE contract test service - command: cd sse-contract-tests && make build-test-service - - run: - name: start SSE contract test service - command: cd sse-contract-tests && make start-test-service - background: true - - run: - name: run SSE contract tests - command: cd sse-contract-tests && make run-contract-tests - - - unless: - condition: <<parameters.skip-contract-tests>> - steps: - - run: make build-contract-tests - - run: - command: make start-contract-test-service - background: true - - run: - name: run contract tests - command: TEST_HARNESS_PARAMS="-junit test-reports/contract-tests-junit.xml" make run-contract-tests - - - store_test_results: - path: test-reports - - store_artifacts: - path: build/html - - - test-windows: - executor: - name: win/vs2019 - shell: powershell.exe - parameters: - py3: - type: boolean - steps: - - checkout - - run: - name: install Python 3 - command: choco install python --no-progress - - run: python --version - - run: - name: set up DynamoDB - command: | - $ProgressPreference = "SilentlyContinue" - iwr -outf dynamo.zip https://s3-us-west-2.amazonaws.com/dynamodb-local/dynamodb_local_latest.zip - mkdir dynamo - Expand-Archive -Path dynamo.zip -DestinationPath dynamo - cd dynamo - javaw -D"java.library.path=./DynamoDBLocal_lib" -jar DynamoDBLocal.jar - background: true - - run: - name: set up Consul - command: | - $ProgressPreference = "SilentlyContinue" - iwr -outf 
consul.zip https://releases.hashicorp.com/consul/1.4.2/consul_1.4.2_windows_amd64.zip - mkdir consul - Expand-Archive -Path consul.zip -DestinationPath consul - cd consul - sc.exe create "Consul" binPath="$(Get-Location)/consul.exe agent -dev" - sc.exe start "Consul" - - run: - name: start Redis - command: | - $ProgressPreference = "SilentlyContinue" - iwr -outf redis.zip https://github.com/MicrosoftArchive/redis/releases/download/win-3.0.504/Redis-x64-3.0.504.zip - mkdir redis - Expand-Archive -Path redis.zip -DestinationPath redis - cd redis - ./redis-server --service-install - ./redis-server --service-start - Start-Sleep -s 5 - ./redis-cli ping - - run: - name: install requirements - command: | - python --version - pip install setuptools - pip install -r test-requirements.txt - pip install -r consul-requirements.txt - python setup.py install - - run: - name: run tests - command: | - mkdir test-reports - python -m pytest -s --junitxml=test-reports/junit.xml testing; - - store_test_results: - path: test-reports - - store_artifacts: - path: test-reports diff --git a/.github/actions/publish/action.yml b/.github/actions/publish/action.yml new file mode 100644 index 00000000..207670cb --- /dev/null +++ b/.github/actions/publish/action.yml @@ -0,0 +1,34 @@ +name: Publish Package +description: 'Publish the package to PyPI' +inputs: + token: + description: 'Token to use for publishing.' + required: true + dry_run: + description: 'Is this a dry run? If so no package will be published.' + required: true + +runs: + using: composite + steps: + - name: Set up Python 3.11 + uses: actions/setup-python@v4 + with: + python-version: 3.11 + + - name: Install dependencies + shell: bash + run: | + pip install -r requirements.txt + pip install wheel + pip install setuptools + + - name: Build publishable packages + shell: bash + run: python setup.py sdist bdist_wheel + + - name: Publish package distributions to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + if: ${{ inputs.dry_run == 'false' }} + with: + password: ${{inputs.token}} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..91ca0c6a --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,134 @@ +name: Quality control checks + +on: + push: + branches: [ main ] + paths-ignore: + - '**.md' # Do not need to run CI for markdown changes. 
+ pull_request: + branches: [ main ] + paths-ignore: + - '**.md' + +jobs: + linux: + runs-on: ubuntu-latest + + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + + services: + redis: + image: redis + ports: + - 6379:6379 + dynamodb: + image: amazon/dynamodb-local + ports: + - 8000:8000 + consul: + image: hashicorp/consul + ports: + - 8500:8500 + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install requirements + run: | + pipx install virtualenv + pip install setuptools + pip install -r test-requirements.txt + pip install -r test-filesource-optional-requirements.txt + pip install -r consul-requirements.txt + python setup.py install + pip freeze + + - name: Run tests + run: pytest -s testing -W error::SyntaxWarning + + - name: Test packaging + run: | + sudo rm -rf dist *.egg-info + ./test-packaging/test-packaging.sh + + - name: Verify typehints + run: make lint + + - name: Verify docs can be successfully built + run: make docs + + - name: Build contract tests + run: make build-contract-tests + + - name: Start contract test service + run: make start-contract-test-service & + + - name: run contract tests + run: make run-contract-tests + + windows: + runs-on: windows-latest + + defaults: + run: + shell: powershell + + strategy: + matrix: + python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Setup DynamoDB + run: | + $ProgressPreference = "SilentlyContinue" + iwr -outf dynamo.zip https://s3-us-west-2.amazonaws.com/dynamodb-local/dynamodb_local_latest.zip + mkdir dynamo + Expand-Archive -Path dynamo.zip -DestinationPath dynamo + cd dynamo + cmd /c "START /b java -Djava.library.path=./DynamoDBLocal_lib -jar ./DynamoDBLocal.jar" + + - name: Setup Consul + run: | + $ProgressPreference = "SilentlyContinue" + iwr -outf consul.zip https://releases.hashicorp.com/consul/1.4.2/consul_1.4.2_windows_amd64.zip + mkdir consul + Expand-Archive -Path consul.zip -DestinationPath consul + cd consul + sc.exe create "Consul" binPath="$(Get-Location)/consul.exe agent -dev" + sc.exe start "Consul" + + - name: Setup Redis + run: | + $ProgressPreference = "SilentlyContinue" + iwr -outf redis.zip https://github.com/MicrosoftArchive/redis/releases/download/win-3.0.504/Redis-x64-3.0.504.zip + mkdir redis + Expand-Archive -Path redis.zip -DestinationPath redis + cd redis + ./redis-server --service-install + ./redis-server --service-start + Start-Sleep -s 5 + ./redis-cli ping + + - name: Install requirements + run: | + pip install setuptools + pip install -r test-requirements.txt + pip install -r test-filesource-optional-requirements.txt + pip install -r consul-requirements.txt + python setup.py install + pip freeze + + - name: Run tests + run: pytest -s testing -W error::SyntaxWarning diff --git a/.github/workflows/lint-pr-title.yml b/.github/workflows/lint-pr-title.yml new file mode 100644 index 00000000..4ba79c13 --- /dev/null +++ b/.github/workflows/lint-pr-title.yml @@ -0,0 +1,12 @@ +name: Lint PR title + +on: + pull_request_target: + types: + - opened + - edited + - synchronize + +jobs: + lint-pr-title: + uses: launchdarkly/gh-actions/.github/workflows/lint-pr-title.yml@main diff --git a/.github/workflows/manual-publish.yml 
b/.github/workflows/manual-publish.yml new file mode 100644 index 00000000..edd263ea --- /dev/null +++ b/.github/workflows/manual-publish.yml @@ -0,0 +1,31 @@ +name: Publish Package +on: + workflow_dispatch: + inputs: + dry_run: + description: 'Is this a dry run? If so no package will be published.' + type: boolean + required: true + +jobs: + build-publish: + runs-on: ubuntu-latest + # Needed to get tokens during publishing. + permissions: + id-token: write + contents: read + steps: + - uses: actions/checkout@v4 + + - uses: launchdarkly/gh-actions/actions/release-secrets@release-secrets-v1.0.0 + name: 'Get PyPI token' + with: + aws_assume_role: ${{ vars.AWS_ROLE_ARN }} + ssm_parameter_pairs: '/production/common/releasing/pypi/token = PYPI_AUTH_TOKEN' + + - id: publish + name: Publish Package + uses: ./.github/actions/publish + with: + token: ${{env.PYPI_AUTH_TOKEN}} + dry_run: ${{ inputs.dry_run }} diff --git a/.github/workflows/release-please.yml b/.github/workflows/release-please.yml new file mode 100644 index 00000000..c36e79c9 --- /dev/null +++ b/.github/workflows/release-please.yml @@ -0,0 +1,40 @@ +name: Run Release Please + +on: + push: + branches: [ main ] + +jobs: + release-package: + runs-on: ubuntu-latest + permissions: + id-token: write # Needed if using OIDC to get release secrets. + contents: write # Contents and pull-requests are for release-please to make releases. + pull-requests: write + steps: + - uses: google-github-actions/release-please-action@v3 + id: release + with: + command: manifest + token: ${{secrets.GITHUB_TOKEN}} + default-branch: main + + - uses: actions/checkout@v4 + if: ${{ steps.release.outputs.releases_created }} + with: + fetch-depth: 0 # If you only need the current version keep this. + + - uses: launchdarkly/gh-actions/actions/release-secrets@release-secrets-v1.0.0 + if: ${{ steps.release.outputs.releases_created }} + name: 'Get PyPI token' + with: + aws_assume_role: ${{ vars.AWS_ROLE_ARN }} + ssm_parameter_pairs: '/production/common/releasing/pypi/token = PYPI_AUTH_TOKEN' + + - id: publish + name: Publish Package + uses: ./.github/actions/publish + if: ${{ steps.release.outputs.releases_created }} + with: + token: ${{env.PYPI_AUTH_TOKEN}} + dry_run: false diff --git a/.ldrelease/build.sh b/.ldrelease/build.sh deleted file mode 100755 index c826c9f1..00000000 --- a/.ldrelease/build.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/bash - -set -ue - -echo "Installing requirements" -pip install -r requirements.txt || { echo "installing requirements.txt failed" >&2; exit 1; } -pip install wheel || { echo "installing wheel failed" >&2; exit 1; } - -echo "Running setup.py sdist bdist_wheel" -python setup.py sdist bdist_wheel || { echo "setup.py sdist bdist_wheel failed" >&2; exit 1; } diff --git a/.ldrelease/config.yml b/.ldrelease/config.yml deleted file mode 100644 index 400a239f..00000000 --- a/.ldrelease/config.yml +++ /dev/null @@ -1,28 +0,0 @@ -version: 2 - -repo: - public: python-server-sdk - private: python-server-sdk-private - -publications: - - url: https://pypi.org/project/launchdarkly-server-sdk/ - description: PyPI - - url: https://launchdarkly-python-sdk.readthedocs.io/en/latest/ - description: documentation (readthedocs.io) - -branches: - - name: main - description: 9.x - - name: 8.x - - name: 7.x - - name: 6.x - -jobs: - - docker: {} - template: - name: python - env: - LD_SKIP_DATABASE_TESTS: 1 - -sdk: - displayName: "Python" diff --git a/.release-please-manifest.json b/.release-please-manifest.json new file mode 100644 index 00000000..256be89a --- 
/dev/null +++ b/.release-please-manifest.json @@ -0,0 +1,3 @@ +{ + ".": "9.0.1" +} diff --git a/Makefile b/Makefile index e007f6d9..9a9af080 100644 --- a/Makefile +++ b/Makefile @@ -22,8 +22,7 @@ TEMP_TEST_OUTPUT=/tmp/contract-test-service.log # TEST_HARNESS_PARAMS := $(TEST_HARNESS_PARAMS) \ -# port 8000 and 9000 is already used in the CI environment because we're -# running a DynamoDB container and an SSE contract test +# port 8000 is already used in the CI environment because we're running a DynamoDB container PORT=10000 build-contract-tests: diff --git a/README.md b/README.md index 34d36eff..d659e85f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # LaunchDarkly Server-side SDK for Python -[![Circle CI](https://img.shields.io/circleci/project/launchdarkly/python-server-sdk.png)](https://circleci.com/gh/launchdarkly/python-server-sdk) +[![Actions Status](https://github.com/launchdarkly/python-server-sdk/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/launchdarkly/python-server-sdk/actions/workflows/ci.yml) [![readthedocs](https://readthedocs.org/projects/launchdarkly-python-sdk/badge/)](https://launchdarkly-python-sdk.readthedocs.io/en/latest/) [![PyPI](https://img.shields.io/pypi/v/launchdarkly-server-sdk.svg?maxAge=2592000)](https://pypi.python.org/pypi/launchdarkly-server-sdk) diff --git a/docs/requirements.txt b/docs/requirements.txt index 37aab242..108af708 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -2,7 +2,6 @@ sphinx sphinx_rtd_theme sphinx-autodoc-typehints -backoff>=1.4.3 certifi>=2018.4.16 expiringdict>=1.1.4 pyRFC3339>=1.0 diff --git a/ldclient/client.py b/ldclient/client.py index d7a24941..d2767d48 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -2,7 +2,7 @@ This submodule contains the client class that provides most of the SDK functionality. 
""" -from typing import Optional, Any, Dict, Mapping, Union, Tuple +from typing import Optional, Any, Dict, Mapping, Union, Tuple, Callable from .impl import AnyNum @@ -20,50 +20,143 @@ from ldclient.impl.datasource.feature_requester import FeatureRequesterImpl from ldclient.impl.datasource.polling import PollingUpdateProcessor from ldclient.impl.datasource.streaming import StreamingUpdateProcessor +from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl, DataSourceStatusProviderImpl +from ldclient.impl.datastore.status import DataStoreUpdateSinkImpl, DataStoreStatusProviderImpl from ldclient.impl.evaluator import Evaluator, error_reason from ldclient.impl.events.diagnostics import create_diagnostic_id, _DiagnosticAccumulator from ldclient.impl.events.event_processor import DefaultEventProcessor from ldclient.impl.events.types import EventFactory from ldclient.impl.model.feature_flag import FeatureFlag +from ldclient.impl.listeners import Listeners +from ldclient.impl.rwlock import ReadWriteLock from ldclient.impl.stubs import NullEventProcessor, NullUpdateProcessor from ldclient.impl.util import check_uwsgi, log -from ldclient.interfaces import BigSegmentStoreStatusProvider, FeatureRequester, FeatureStore +from ldclient.impl.repeating_task import RepeatingTask +from ldclient.interfaces import BigSegmentStoreStatusProvider, DataSourceStatusProvider, FeatureStore, FlagTracker, DataStoreUpdateSink, DataStoreStatus, DataStoreStatusProvider from ldclient.versioned_data_kind import FEATURES, SEGMENTS, VersionedDataKind -from ldclient.feature_store import FeatureStore from ldclient.migrations import Stage, OpTracker +from ldclient.impl.flag_tracker import FlagTrackerImpl from threading import Lock + class _FeatureStoreClientWrapper(FeatureStore): """Provides additional behavior that the client requires before or after feature store operations. - Currently this just means sorting the data set for init(). In the future we may also use this - to provide an update listener capability. + Currently this just means sorting the data set for init() and dealing with data store status listeners. 
""" - def __init__(self, store: FeatureStore): + def __init__(self, store: FeatureStore, store_update_sink: DataStoreUpdateSink): self.store = store + self.__store_update_sink = store_update_sink + self.__monitoring_enabled = self.is_monitoring_enabled() + + # Covers the following variables + self.__lock = ReadWriteLock() + self.__last_available = True + self.__poller: Optional[RepeatingTask] = None def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]): - return self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data)) + return self.__wrapper(lambda: self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data))) def get(self, kind, key, callback): - return self.store.get(kind, key, callback) + return self.__wrapper(lambda: self.store.get(kind, key, callback)) def all(self, kind, callback): - return self.store.all(kind, callback) + return self.__wrapper(lambda: self.store.all(kind, callback)) def delete(self, kind, key, version): - return self.store.delete(kind, key, version) + return self.__wrapper(lambda: self.store.delete(kind, key, version)) def upsert(self, kind, item): - return self.store.upsert(kind, item) + return self.__wrapper(lambda: self.store.upsert(kind, item)) @property def initialized(self) -> bool: return self.store.initialized + def __wrapper(self, fn: Callable): + try: + return fn() + except BaseException: + if self.__monitoring_enabled: + self.__update_availability(False) + raise + + def __update_availability(self, available: bool): + try: + self.__lock.lock() + if available == self.__last_available: + return + self.__last_available = available + finally: + self.__lock.unlock() + + status = DataStoreStatus(available, False) + + if available: + log.warn("Persistent store is available again") + + self.__store_update_sink.update_status(status) + + if available: + try: + self.__lock.lock() + if self.__poller is not None: + self.__poller.stop() + self.__poller = None + finally: + self.__lock.unlock() + + return + + log.warn("Detected persistent store unavailability; updates will be cached until it recovers") + task = RepeatingTask(0.5, 0, self.__check_availability) + + self.__lock.lock() + self.__poller = task + self.__poller.start() + self.__lock.unlock() + + def __check_availability(self): + try: + if self.store.available: + self.__update_availability(True) + except BaseException as e: + log.error("Unexpected error from data store status function: %s", e) + + def is_monitoring_enabled(self) -> bool: + """ + This methods determines whether the wrapped store can support enabling monitoring. + + The wrapped store must provide a monitoring_enabled method, which must + be true. But this alone is not sufficient. + + Because this class wraps all interactions with a provided store, it can + technically "monitor" any store. However, monitoring also requires that + we notify listeners when the store is available again. + + We determine this by checking the store's `available?` method, so this + is also a requirement for monitoring support. + + These extra checks won't be necessary once `available` becomes a part + of the core interface requirements and this class no longer wraps every + feature store. 
+ """ + + if not hasattr(self.store, 'is_monitoring_enabled'): + return False + + if not hasattr(self.store, 'is_available'): + return False + + monitoring_enabled = getattr(self.store, 'is_monitoring_enabled') + if not callable(monitoring_enabled): + return False + + return monitoring_enabled() + def _get_store_item(store, kind: VersionedDataKind, key: str) -> Any: # This decorator around store.get provides backward compatibility with any custom data @@ -99,7 +192,19 @@ def __init__(self, config: Config, start_wait: float=5): self._event_factory_default = EventFactory(False) self._event_factory_with_reasons = EventFactory(True) - store = _FeatureStoreClientWrapper(self._config.feature_store) + data_store_listeners = Listeners() + store_sink = DataStoreUpdateSinkImpl(data_store_listeners) + store = _FeatureStoreClientWrapper(self._config.feature_store, store_sink) + + self.__data_store_status_provider = DataStoreStatusProviderImpl(store, store_sink) + + data_source_listeners = Listeners() + flag_change_listeners = Listeners() + + self.__flag_tracker = FlagTrackerImpl(flag_change_listeners, lambda key, context: self.variation(key, context, None)) + + self._config._data_source_update_sink = DataSourceUpdateSinkImpl(store, data_source_listeners, flag_change_listeners) + self.__data_source_status_provider = DataSourceStatusProviderImpl(data_source_listeners, self._config._data_source_update_sink) self._store = store # type: FeatureStore big_segment_store_manager = BigSegmentStoreManager(self._config.big_segments) @@ -489,5 +594,47 @@ def big_segment_store_status_provider(self) -> BigSegmentStoreStatusProvider: """ return self.__big_segment_store_manager.status_provider + @property + def data_source_status_provider(self) -> DataSourceStatusProvider: + """ + Returns an interface for tracking the status of the data source. + + The data source is the mechanism that the SDK uses to get feature flag configurations, such + as a streaming connection (the default) or poll requests. The + :class:`ldclient.interfaces.DataSourceStatusProvider` has methods for checking whether the + data source is (as far as the SDK knows) currently operational and tracking changes in this + status. + + :return: The data source status provider + """ + return self.__data_source_status_provider + + @property + def data_store_status_provider(self) -> DataStoreStatusProvider: + """ + Returns an interface for tracking the status of a persistent data store. + + The provider has methods for checking whether the data store is (as far + as the SDK knows) currently operational, tracking changes in this + status, and getting cache statistics. These are only relevant for a + persistent data store; if you are using an in-memory data store, then + this method will return a stub object that provides no information. + + :return: The data store status provider + """ + return self.__data_store_status_provider + + @property + def flag_tracker(self) -> FlagTracker: + """ + Returns an interface for tracking changes in feature flag configurations. + + The :class:`ldclient.interfaces.FlagTracker` contains methods for + requesting notifications about feature flag changes using an event + listener model. 
+ """ + return self.__flag_tracker + + __all__ = ['LDClient', 'Config'] diff --git a/ldclient/config.py b/ldclient/config.py index a84a8419..47d747eb 100644 --- a/ldclient/config.py +++ b/ldclient/config.py @@ -8,7 +8,7 @@ from ldclient.feature_store import InMemoryFeatureStore from ldclient.impl.util import log, validate_application_info -from ldclient.interfaces import BigSegmentStore, EventProcessor, FeatureStore, UpdateProcessor +from ldclient.interfaces import BigSegmentStore, EventProcessor, FeatureStore, UpdateProcessor, DataSourceUpdateSink GET_LATEST_FEATURES_PATH = '/sdk/latest-flags' STREAM_FLAGS_PATH = '/flags' @@ -269,6 +269,7 @@ def __init__(self, self.__http = http self.__big_segments = BigSegmentsConfig() if not big_segments else big_segments self.__application = validate_application_info(application or {}, log) + self._data_source_update_sink: Optional[DataSourceUpdateSink] = None def copy_with_new_sdk_key(self, new_sdk_key: str) -> 'Config': """Returns a new ``Config`` instance that is the same as this one, except for having a different SDK key. @@ -440,6 +441,20 @@ def application(self) -> dict: """ return self.__application + @property + def data_source_update_sink(self) -> Optional[DataSourceUpdateSink]: + """ + Returns the component that allows a data source to push data into the SDK. + + This property should only be set by the SDK. Long term access of this + property is not supported; it is temporarily being exposed to maintain + backwards compatibility while the SDK structure is updated. + + Custom data source implementations should integrate with this sink if + they want to provide support for data source status listeners. + """ + return self._data_source_update_sink + def _validate(self): if self.offline is False and self.sdk_key is None or self.sdk_key == '': log.warning("Missing or blank sdk_key.") diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py index 401010db..88829724 100644 --- a/ldclient/feature_store.py +++ b/ldclient/feature_store.py @@ -78,6 +78,12 @@ def __init__(self): self._initialized = False self._items = defaultdict(dict) + def is_monitoring_enabled(self) -> bool: + return False + + def is_available(self) -> bool: + return True + def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any]=lambda x: x) -> Any: """ """ diff --git a/ldclient/feature_store_helpers.py b/ldclient/feature_store_helpers.py index 2ed911f9..99fb43ca 100644 --- a/ldclient/feature_store_helpers.py +++ b/ldclient/feature_store_helpers.py @@ -32,12 +32,21 @@ def __init__(self, core: FeatureStoreCore, cache_config: CacheConfig): :param cache_config: the caching parameters """ self._core = core + self.__has_available_method = callable(getattr(core, 'is_available', None)) + if cache_config.enabled: self._cache = ExpiringDict(max_len=cache_config.capacity, max_age_seconds=cache_config.expiration) else: self._cache = None self._inited = False + def is_monitoring_enabled(self) -> bool: + return self.__has_available_method + + def is_available(self) -> bool: + # We know is_available exists since we are checking __has_available_method + return self._core.is_available() if self.__has_available_method else False # type: ignore + def init(self, all_encoded_data: Mapping[VersionedDataKind, Mapping[str, Dict[Any, Any]]]): """ """ diff --git a/ldclient/impl/datasource/polling.py b/ldclient/impl/datasource/polling.py index 68f61ebc..b53dcc2c 100644 --- a/ldclient/impl/datasource/polling.py +++ b/ldclient/impl/datasource/polling.py @@ -8,12 +8,16 @@ from 
ldclient.config import Config from ldclient.impl.repeating_task import RepeatingTask from ldclient.impl.util import UnsuccessfulResponseException, http_error_message, is_http_error_recoverable, log -from ldclient.interfaces import FeatureRequester, FeatureStore, UpdateProcessor +from ldclient.interfaces import FeatureRequester, FeatureStore, UpdateProcessor, DataSourceUpdateSink, DataSourceErrorInfo, DataSourceErrorKind, DataSourceState + +import time +from typing import Optional class PollingUpdateProcessor(UpdateProcessor): def __init__(self, config: Config, requester: FeatureRequester, store: FeatureStore, ready: Event): self._config = config + self._data_source_update_sink: Optional[DataSourceUpdateSink] = config.data_source_update_sink self._requester = requester self._store = store self._ready = ready @@ -27,24 +31,74 @@ def initialized(self): return self._ready.is_set() is True and self._store.initialized is True def stop(self): + self.__stop_with_error_info(None) + + def __stop_with_error_info(self, error: Optional[DataSourceErrorInfo]): log.info("Stopping PollingUpdateProcessor") self._task.stop() + if self._data_source_update_sink is None: + return + + self._data_source_update_sink.update_status( + DataSourceState.OFF, + error + ) + + def _sink_or_store(self): + """ + The original implementation of this class relied on the feature store + directly, which we are trying to move away from. Customers who might have + instantiated this directly for some reason wouldn't know they have to set + the config's sink manually, so we have to fall back to the store if the + sink isn't present. + + The next major release should be able to simplify this structure and + remove the need for fall back to the data store because the update sink + should always be present. + """ + if self._data_source_update_sink is None: + return self._store + + return self._data_source_update_sink + def _poll(self): try: all_data = self._requester.get_all_data() - self._store.init(all_data) + self._sink_or_store().init(all_data) if not self._ready.is_set() and self._store.initialized: log.info("PollingUpdateProcessor initialized ok") self._ready.set() + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status(DataSourceState.VALID, None) except UnsuccessfulResponseException as e: + error_info = DataSourceErrorInfo( + DataSourceErrorKind.ERROR_RESPONSE, + e.status, + time.time(), + str(e) + ) + http_error_message_result = http_error_message(e.status, "polling request") - if is_http_error_recoverable(e.status): - log.warning(http_error_message_result) - else: + if not is_http_error_recoverable(e.status): log.error(http_error_message_result) - self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited - self.stop() + self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited + self.__stop_with_error_info(error_info) + else: + log.warning(http_error_message_result) + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + error_info + ) except Exception as e: log.exception( 'Error: Exception encountered when updating flags. 
%s' % e) + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + DataSourceErrorInfo(DataSourceErrorKind.UNKNOWN, 0, time.time(), str(e)) + ) diff --git a/ldclient/impl/datasource/status.py b/ldclient/impl/datasource/status.py new file mode 100644 index 00000000..011c5aa7 --- /dev/null +++ b/ldclient/impl/datasource/status.py @@ -0,0 +1,162 @@ +from ldclient.versioned_data_kind import FEATURES, SEGMENTS +from ldclient.impl.dependency_tracker import DependencyTracker +from ldclient.impl.listeners import Listeners +from ldclient.interfaces import DataSourceStatusProvider, DataSourceUpdateSink, DataSourceStatus, FeatureStore, DataSourceState, DataSourceErrorInfo, DataSourceErrorKind, FlagChange +from ldclient.impl.rwlock import ReadWriteLock +from ldclient.versioned_data_kind import VersionedDataKind +from ldclient.impl.dependency_tracker import KindAndKey + +import time +from typing import Callable, Mapping, Optional, Set + + +class DataSourceUpdateSinkImpl(DataSourceUpdateSink): + def __init__(self, store: FeatureStore, status_listeners: Listeners, flag_change_listeners: Listeners): + self.__store = store + self.__status_listeners = status_listeners + self.__flag_change_listeners = flag_change_listeners + self.__tracker = DependencyTracker() + + self.__lock = ReadWriteLock() + self.__status = DataSourceStatus( + DataSourceState.INITIALIZING, + time.time(), + None + ) + + @property + def status(self) -> DataSourceStatus: + try: + self.__lock.rlock() + return self.__status + finally: + self.__lock.runlock() + + def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]): + old_data = None + + def init_store(): + nonlocal old_data + if self.__flag_change_listeners.has_listeners(): + old_data = {} + for kind in [FEATURES, SEGMENTS]: + old_data[kind] = self.__store.all(kind, lambda x: x) + + self.__store.init(all_data) + + self.__monitor_store_update(init_store) + self.__reset_tracker_with_new_data(all_data) + + if old_data is None: + return + + self.__send_change_events( + self.__compute_changed_items_for_full_data_set(old_data, all_data) + ) + + def upsert(self, kind: VersionedDataKind, item: dict): + self.__monitor_store_update(lambda: self.__store.upsert(kind, item)) + + # TODO(sc-212471): We only want to do this if the store successfully + # updates the record. 
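On the consuming side, the status transitions pushed through this sink surface via the client's `data_source_status_provider`. A short sketch, assuming an `LDClient` instance named `client` (the reactions are illustrative):

```python
from ldclient.interfaces import DataSourceState

def on_data_source_status(status):
    # status carries .state, .since, and an optional .error
    if status.state == DataSourceState.VALID:
        print("data source healthy")
    elif status.state == DataSourceState.INTERRUPTED:
        print("data source interrupted:", status.error)

client.data_source_status_provider.add_listener(on_data_source_status)
print(client.data_source_status_provider.status.state)
```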
+ key = item.get('key', '') + self.__update_dependency_for_single_item(kind, key, item) + + def delete(self, kind: VersionedDataKind, key: str, version: int): + self.__monitor_store_update(lambda: self.__store.delete(kind, key, version)) + self.__update_dependency_for_single_item(kind, key, None) + + def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]): + status_to_broadcast = None + + try: + self.__lock.lock() + old_status = self.__status + + if new_state == DataSourceState.INTERRUPTED and old_status.state == DataSourceState.INITIALIZING: + new_state = DataSourceState.INITIALIZING + + if new_state == old_status.state and new_error is None: + return + + self.__status = DataSourceStatus( + new_state, + self.__status.since if new_state == self.__status.state else time.time(), + self.__status.error if new_error is None else new_error + ) + + status_to_broadcast = self.__status + finally: + self.__lock.unlock() + + if status_to_broadcast is not None: + self.__status_listeners.notify(status_to_broadcast) + + def __monitor_store_update(self, fn: Callable[[], None]): + try: + fn() + except Exception as e: + error_info = DataSourceErrorInfo( + DataSourceErrorKind.STORE_ERROR, + 0, + time.time(), + str(e) + ) + self.update_status(DataSourceState.INTERRUPTED, error_info) + raise + + def __update_dependency_for_single_item(self, kind: VersionedDataKind, key: str, item: Optional[dict]): + self.__tracker.update_dependencies_from(kind, key, item) + if self.__flag_change_listeners.has_listeners(): + affected_items: Set[KindAndKey] = set() + self.__tracker.add_affected_items(affected_items, KindAndKey(kind=kind, key=key)) + self.__send_change_events(affected_items) + + def __reset_tracker_with_new_data(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]): + self.__tracker.reset() + + for kind, items in all_data.items(): + for key, item in items.items(): + self.__tracker.update_dependencies_from(kind, key, item) + + def __send_change_events(self, affected_items: Set[KindAndKey]): + for item in affected_items: + if item.kind == FEATURES: + self.__flag_change_listeners.notify(FlagChange(item.key)) + + def __compute_changed_items_for_full_data_set(self, old_data: Mapping[VersionedDataKind, Mapping[str, dict]], new_data: Mapping[VersionedDataKind, Mapping[str, dict]]): + affected_items: Set[KindAndKey] = set() + + for kind in [FEATURES, SEGMENTS]: + old_items = old_data.get(kind, {}) + new_items = new_data.get(kind, {}) + + keys: Set[str] = set() + + for key in keys.union(old_items.keys(), new_items.keys()): + old_item = old_items.get(key) + new_item = new_items.get(key) + + if old_item is None and new_item is None: + continue + + if old_item is None or new_item is None or old_item['version'] < new_item['version']: + self.__tracker.add_affected_items(affected_items, KindAndKey(kind=kind, key=key)) + + return affected_items + + +class DataSourceStatusProviderImpl(DataSourceStatusProvider): + def __init__(self, listeners: Listeners, update_sink: DataSourceUpdateSinkImpl): + self.__listeners = listeners + self.__update_sink = update_sink + + @property + def status(self) -> DataSourceStatus: + return self.__update_sink.status + + def add_listener(self, listener: Callable[[DataSourceStatus], None]): + self.__listeners.add(listener) + + def remove_listener(self, listener: Callable[[DataSourceStatus], None]): + self.__listeners.remove(listener) diff --git a/ldclient/impl/datasource/streaming.py b/ldclient/impl/datasource/streaming.py index 052e7744..ec8debe9 100644 
--- a/ldclient/impl/datasource/streaming.py +++ b/ldclient/impl/datasource/streaming.py @@ -1,23 +1,21 @@ -""" -Default implementation of the streaming component. -""" -# currently excluded from documentation - see docs/README.md - from collections import namedtuple - import json from threading import Thread +from typing import Optional -import logging import time +from ldclient.interfaces import DataSourceErrorInfo, DataSourceErrorKind, DataSourceState from ldclient.impl.http import HTTPFactory, _http_factory -from ldclient.impl.retry_delay import RetryDelayStrategy, DefaultBackoffStrategy, DefaultJitterStrategy -from ldclient.impl.sse import SSEClient -from ldclient.impl.util import log, UnsuccessfulResponseException, http_error_message, is_http_error_recoverable +from ldclient.impl.util import http_error_message, is_http_error_recoverable, log from ldclient.interfaces import UpdateProcessor from ldclient.versioned_data_kind import FEATURES, SEGMENTS +from ld_eventsource import SSEClient +from ld_eventsource.actions import Event, Fault +from ld_eventsource.config import ConnectStrategy, ErrorStrategy, RetryDelayStrategy +from ld_eventsource.errors import HTTPStatusError + # allows for up to 5 minutes to elapse without any data sent across the stream. The heartbeats sent as comments on the # stream will keep this from triggering stream_read_timeout = 5 * 60 @@ -37,94 +35,120 @@ def __init__(self, config, store, ready, diagnostic_accumulator): self.daemon = True self._uri = config.stream_base_uri + STREAM_ALL_PATH self._config = config + self._data_source_update_sink = config.data_source_update_sink self._store = store self._running = False self._ready = ready self._diagnostic_accumulator = diagnostic_accumulator - self._es_started = None - self._retry_delay = RetryDelayStrategy( - config.initial_reconnect_delay, - BACKOFF_RESET_INTERVAL, - DefaultBackoffStrategy(MAX_RETRY_DELAY), - DefaultJitterStrategy(JITTER_RATIO)) - - # We need to suppress the default logging behavior of the backoff package, because - # it logs messages at ERROR level with variable content (the delay time) which will - # prevent monitors from coalescing multiple messages. The backoff package attempts - # to suppress its own output by default by giving the logger a NullHandler, but it - # will still propagate up to the root logger unless we do this: - logging.getLogger('backoff').propagate = False - - # Retry/backoff logic: - # Upon any error establishing the stream connection we retry with backoff + jitter. - # Upon any error processing the results of the stream we reconnect after one second. 
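The hand-rolled retry/backoff machinery described above is replaced by the launchdarkly-eventsource package's SSEClient, which owns reconnection and backoff itself. Reduced to a skeleton, the consumption pattern the new run() follows looks roughly like this (the URL is a placeholder):

```python
from ld_eventsource import SSEClient
from ld_eventsource.actions import Event, Fault
from ld_eventsource.config import ConnectStrategy, ErrorStrategy

sse = SSEClient(
    connect=ConnectStrategy.http(url="https://stream.example.com"),
    # surface problems as Fault actions instead of raising, so the loop
    # can decide per-error whether to keep the connection alive
    error_strategy=ErrorStrategy.always_continue(),
)
for action in sse.all:
    if isinstance(action, Event):
        print(action.event, action.data)
    elif isinstance(action, Fault) and action.error is not None:
        sse.close()  # or continue looping and let the client reconnect
        break
```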
+ self._connection_attempt_start_time = None + def run(self): log.info("Starting StreamingUpdateProcessor connecting to uri: " + self._uri) self._running = True - attempts = 0 - while self._running: - if attempts > 0: - delay = self._retry_delay.next_retry_delay(time.time()) - log.info("Will reconnect after delay of %fs" % delay) - time.sleep(delay) - attempts += 1 - try: - self._es_started = int(time.time() * 1000) - messages = self._connect() - for msg in messages: - if not self._running: - break - self._retry_delay.set_good_since(time.time()) - message_ok = self.process_message(self._store, msg) - if message_ok: - self._record_stream_init(False) - self._es_started = None - if message_ok is True and self._ready.is_set() is False: + self._sse = self._create_sse_client() + self._connection_attempt_start_time = time.time() + for action in self._sse.all: + if isinstance(action, Event): + message_ok = False + try: + message_ok = self._process_message(self._sink_or_store(), action) + except json.decoder.JSONDecodeError as e: + log.info("Error while handling stream event; will restart stream: %s" % e) + self._sse.interrupt() + + self._handle_error(e) + except Exception as e: + log.info("Error while handling stream event; will restart stream: %s" % e) + self._sse.interrupt() + + if self._data_source_update_sink is not None: + error_info = DataSourceErrorInfo( + DataSourceErrorKind.UNKNOWN, + 0, + time.time(), + str(e) + ) + + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + error_info + ) + + if message_ok: + self._record_stream_init(False) + self._connection_attempt_start_time = None + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status(DataSourceState.VALID, None) + + if not self._ready.is_set(): log.info("StreamingUpdateProcessor initialized ok.") self._ready.set() - except UnsuccessfulResponseException as e: - self._record_stream_init(True) - self._es_started = None - - http_error_message_result = http_error_message(e.status, "stream connection") - if is_http_error_recoverable(e.status): - log.warning(http_error_message_result) - else: - log.error(http_error_message_result) - self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited - self.stop() + elif isinstance(action, Fault): + # If the SSE client detects the stream has closed, then it will emit a fault with no-error. We can + # ignore this since we want the connection to continue. 
+ if action.error is None: + continue + + if not self._handle_error(action.error): break - except Exception as e: - log.warning("Unexpected error on stream connection: %s, will retry" % e) - self._record_stream_init(True) - self._es_started = None - # no stacktrace here because, for a typical connection error, it'll just be a lengthy tour of urllib3 internals - - def _record_stream_init(self, failed): - if self._diagnostic_accumulator and self._es_started: + self._sse.close() + + def _record_stream_init(self, failed: bool): + if self._diagnostic_accumulator and self._connection_attempt_start_time: current_time = int(time.time() * 1000) - self._diagnostic_accumulator.record_stream_init(current_time, current_time - self._es_started, failed) + elapsed = current_time - int(self._connection_attempt_start_time * 1000) + self._diagnostic_accumulator.record_stream_init(current_time, elapsed if elapsed >= 0 else 0, failed) - def _connect(self): + def _create_sse_client(self) -> SSEClient: # We don't want the stream to use the same read timeout as the rest of the SDK. http_factory = _http_factory(self._config) - stream_http_factory = HTTPFactory(http_factory.base_headers, http_factory.http_config, override_read_timeout=stream_read_timeout) - client = SSEClient( - self._uri, - http_factory = stream_http_factory + stream_http_factory = HTTPFactory(http_factory.base_headers, http_factory.http_config, + override_read_timeout=stream_read_timeout) + return SSEClient( + connect=ConnectStrategy.http( + url=self._uri, + headers=http_factory.base_headers, + pool=stream_http_factory.create_pool_manager(1, self._uri), + urllib3_request_options={"timeout": stream_http_factory.timeout} + ), + error_strategy=ErrorStrategy.always_continue(), # we'll make error-handling decisions when we see a Fault + initial_retry_delay=self._config.initial_reconnect_delay, + retry_delay_strategy=RetryDelayStrategy.default( + max_delay=MAX_RETRY_DELAY, + backoff_multiplier=2, + jitter_multiplier=JITTER_RATIO + ), + retry_delay_reset_threshold=BACKOFF_RESET_INTERVAL, + logger=log ) - return client.events def stop(self): + self.__stop_with_error_info(None) + + def __stop_with_error_info(self, error: Optional[DataSourceErrorInfo]): log.info("Stopping StreamingUpdateProcessor") self._running = False + if self._data_source_update_sink is None: + return + + self._data_source_update_sink.update_status( + DataSourceState.OFF, + error + ) + + def _sink_or_store(self): + if self._data_source_update_sink is None: + return self._store + + return self._data_source_update_sink + def initialized(self): return self._running and self._ready.is_set() is True and self._store.initialized is True # Returns True if we initialized the feature store - @staticmethod - def process_message(store, msg): + def _process_message(self, store, msg: Event) -> bool: if msg.event == 'put': all_data = json.loads(msg.data) init_data = { @@ -132,7 +156,7 @@ def process_message(store, msg): SEGMENTS: all_data['data']['segments'] } log.debug("Received put event with %d flags and %d segments", - len(init_data[FEATURES]), len(init_data[SEGMENTS])) + len(init_data[FEATURES]), len(init_data[SEGMENTS])) store.init(init_data) return True elif msg.event == 'patch': @@ -160,8 +184,70 @@ def process_message(store, msg): log.warning('Unhandled event in stream processor: ' + msg.event) return False + # Returns true to continue, false to stop + def _handle_error(self, error: Exception) -> bool: + if not self._running: + return False # don't retry if we've been deliberately stopped + 
+ if isinstance(error, json.decoder.JSONDecodeError): + error_info = DataSourceErrorInfo( + DataSourceErrorKind.INVALID_DATA, + 0, + time.time(), + str(error) + ) + + log.error("Unexpected error on stream connection: %s, will retry" % error) + self._record_stream_init(True) + self._connection_attempt_start_time = None + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + error_info + ) + elif isinstance(error, HTTPStatusError): + self._record_stream_init(True) + self._connection_attempt_start_time = None + + error_info = DataSourceErrorInfo( + DataSourceErrorKind.ERROR_RESPONSE, + error.status, + time.time(), + str(error) + ) + + http_error_message_result = http_error_message(error.status, "stream connection") + if not is_http_error_recoverable(error.status): + log.error(http_error_message_result) + self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited + self.__stop_with_error_info(error_info) + self.stop() + return False + else: + log.warning(http_error_message_result) + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + error_info + ) + else: + log.warning("Unexpected error on stream connection: %s, will retry" % error) + self._record_stream_init(True) + self._connection_attempt_start_time = None + + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + DataSourceErrorInfo(DataSourceErrorKind.UNKNOWN, 0, time.time(), str(error)) + ) + # no stacktrace here because, for a typical connection error, it'll just be a lengthy tour of urllib3 internals + self._connection_attempt_start_time = time.time() + self._sse.next_retry_delay + return True + @staticmethod - def _parse_path(path): + def _parse_path(path: str): for kind in [FEATURES, SEGMENTS]: if path.startswith(kind.stream_api_path): return ParsedPath(kind = kind, key = path[len(kind.stream_api_path):]) @@ -170,6 +256,6 @@ def _parse_path(path): # magic methods for "with" statement (used in testing) def __enter__(self): return self - + def __exit__(self, type, value, traceback): self.stop() diff --git a/ldclient/impl/datastore/__init__.py b/ldclient/impl/datastore/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ldclient/impl/datastore/status.py b/ldclient/impl/datastore/status.py new file mode 100644 index 00000000..3a698b0f --- /dev/null +++ b/ldclient/impl/datastore/status.py @@ -0,0 +1,56 @@ +from __future__ import annotations +from typing import Callable, TYPE_CHECKING +from copy import copy + +from ldclient.interfaces import DataStoreStatusProvider, DataStoreStatus, DataStoreUpdateSink +from ldclient.impl.listeners import Listeners +from ldclient.impl.rwlock import ReadWriteLock + +if TYPE_CHECKING: + from ldclient.client import _FeatureStoreClientWrapper + + +class DataStoreUpdateSinkImpl(DataStoreUpdateSink): + def __init__(self, listeners: Listeners): + self.__listeners = listeners + + self.__lock = ReadWriteLock() + self.__status = DataStoreStatus(True, False) + + @property + def listeners(self) -> Listeners: + return self.__listeners + + def status(self) -> DataStoreStatus: + self.__lock.rlock() + status = copy(self.__status) + self.__lock.runlock() + + return status + + def update_status(self, status: DataStoreStatus): + self.__lock.lock() + old_value, self.__status = self.__status, status + self.__lock.unlock() + + if old_value != status: 
+ self.__listeners.notify(status) + + +class DataStoreStatusProviderImpl(DataStoreStatusProvider): + def __init__(self, store: _FeatureStoreClientWrapper, update_sink: DataStoreUpdateSinkImpl): + self.__store = store + self.__update_sink = update_sink + + @property + def status(self) -> DataStoreStatus: + return self.__update_sink.status() + + def is_monitoring_enabled(self) -> bool: + return self.__store.is_monitoring_enabled() + + def add_listener(self, listener: Callable[[DataStoreStatus], None]): + self.__update_sink.listeners.add(listener) + + def remove_listener(self, listener: Callable[[DataStoreStatus], None]): + self.__update_sink.listeners.remove(listener) diff --git a/ldclient/impl/dependency_tracker.py b/ldclient/impl/dependency_tracker.py new file mode 100644 index 00000000..5ec0762a --- /dev/null +++ b/ldclient/impl/dependency_tracker.py @@ -0,0 +1,119 @@ +from ldclient.impl.model.feature_flag import FeatureFlag +from ldclient.impl.model.segment import Segment +from ldclient.impl.model.clause import Clause +from ldclient.versioned_data_kind import VersionedDataKind, SEGMENTS, FEATURES + +from typing import Set, List, Dict, NamedTuple, Union, Optional + + +class KindAndKey(NamedTuple): + kind: VersionedDataKind + key: str + + +class DependencyTracker: + """ + The DependencyTracker is responsible for tracking both upstream and downstream + dependency relationships. Managing a bi-directional mapping allows us to + more easily perform updates to the tracker, and to determine affected items + when a downstream item is modified. + """ + + def __init__(self): + self.__children: Dict[KindAndKey, Set[KindAndKey]] = {} + self.__parents: Dict[KindAndKey, Set[KindAndKey]] = {} + + def update_dependencies_from(self, from_kind: VersionedDataKind, from_key: str, from_item: Optional[Union[dict, FeatureFlag, Segment]]): + """ + Updates the dependency graph when an item has changed. + + :param from_kind: the changed item's kind + :param from_key: the changed item's key + :param from_item: the changed item + + """ + from_what = KindAndKey(kind=from_kind, key=from_key) + updated_dependencies = DependencyTracker.compute_dependencies_from(from_kind, from_item) + + old_children_set = self.__children.get(from_what) + + if old_children_set is not None: + for kind_and_key in old_children_set: + parents_of_this_old_dep = self.__parents.get(kind_and_key, set()) + if from_what in parents_of_this_old_dep: + parents_of_this_old_dep.remove(from_what) + + self.__children[from_what] = updated_dependencies + for kind_and_key in updated_dependencies: + parents_of_this_new_dep = self.__parents.get(kind_and_key) + if parents_of_this_new_dep is None: + parents_of_this_new_dep = set() + self.__parents[kind_and_key] = parents_of_this_new_dep + + parents_of_this_new_dep.add(from_what) + + def add_affected_items(self, items_out: Set[KindAndKey], initial_modified_item: KindAndKey): + """ + Populates the given set with the union of the initial item and all items that directly or indirectly + depend on it (based on the current state of the dependency graph). + + :param items_out: the set of affected items to populate + :param initial_modified_item: the item that was modified + + """ + + if initial_modified_item in items_out: + return + + items_out.add(initial_modified_item) + + parents = self.__parents.get(initial_modified_item) + if parents is None: + return + + for parent in parents: + self.add_affected_items(items_out, parent) + + def reset(self): + """ + Clear any tracked dependencies and reset the tracking state to a clean slate. 
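An application consumes these store-status updates through `client.data_store_status_provider`, backed by the provider implementation above. A usage sketch, assuming `DataStoreStatus` exposes the `available` flag it is constructed with:

```python
def on_store_status(status):
    # available flips to False when a store operation fails, and back to
    # True once the half-second poller sees the store recover
    if not status.available:
        print("persistent store is down; updates will be cached until it recovers")

if client.data_store_status_provider.is_monitoring_enabled():
    client.data_store_status_provider.add_listener(on_store_status)
```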
+ """ + self.__children.clear() + self.__parents.clear() + + @staticmethod + def compute_dependencies_from(from_kind: VersionedDataKind, from_item: Optional[Union[dict, FeatureFlag, Segment]]) -> Set[KindAndKey]: + """ + @param from_kind [String] + @param from_item [LaunchDarkly::Impl::Model::FeatureFlag, LaunchDarkly::Impl::Model::Segment] + @return [Set] + """ + if from_item is None: + return set() + + from_item = from_kind.decode(from_item) if isinstance(from_item, dict) else from_item + + if from_kind == FEATURES and isinstance(from_item, FeatureFlag): + prereq_keys = [KindAndKey(kind=from_kind, key=p.key) for p in from_item.prerequisites] + segment_keys = [kindAndKey for rule in from_item.rules for kindAndKey in DependencyTracker.segment_keys_from_clauses(rule.clauses)] + + results = set(prereq_keys) + results.update(segment_keys) + + return results + elif from_kind == SEGMENTS and isinstance(from_item, Segment): + kind_and_keys = [key for rule in from_item.rules for key in DependencyTracker.segment_keys_from_clauses(rule.clauses)] + return set(kind_and_keys) + else: + return set() + + @staticmethod + def segment_keys_from_clauses(clauses: List[Clause]) -> List[KindAndKey]: + results = [] + for clause in clauses: + if clause.op == 'segmentMatch': + pairs = [KindAndKey(kind=SEGMENTS, key=value) for value in clause.values] + results.extend(pairs) + + return results diff --git a/ldclient/impl/flag_tracker.py b/ldclient/impl/flag_tracker.py new file mode 100644 index 00000000..35c185c7 --- /dev/null +++ b/ldclient/impl/flag_tracker.py @@ -0,0 +1,50 @@ +from ldclient.interfaces import FlagTracker, FlagChange, FlagValueChange +from ldclient.impl.listeners import Listeners +from ldclient.context import Context +from ldclient.impl.rwlock import ReadWriteLock + +from typing import Callable + + +class FlagValueChangeListener: + def __init__(self, key: str, context: Context, listener: Callable[[FlagValueChange], None], eval_fn: Callable): + self.__key = key + self.__context = context + self.__listener = listener + self.__eval_fn = eval_fn + + self.__lock = ReadWriteLock() + self.__value = eval_fn(key, context) + + def __call__(self, flag_change: FlagChange): + if flag_change.key != self.__key: + return + + new_value = self.__eval_fn(self.__key, self.__context) + + self.__lock.lock() + old_value, self.__value = self.__value, new_value + self.__lock.unlock() + + if new_value == old_value: + return + + self.__listener(FlagValueChange(self.__key, old_value, new_value)) + + +class FlagTrackerImpl(FlagTracker): + def __init__(self, listeners: Listeners, eval_fn: Callable): + self.__listeners = listeners + self.__eval_fn = eval_fn + + def add_listener(self, listener: Callable[[FlagChange], None]): + self.__listeners.add(listener) + + def remove_listener(self, listener: Callable[[FlagChange], None]): + self.__listeners.remove(listener) + + def add_flag_value_change_listener(self, key: str, context: Context, fn: Callable[[FlagValueChange], None]) -> Callable[[FlagChange], None]: + listener = FlagValueChangeListener(key, context, fn, self.__eval_fn) + self.add_listener(listener) + + return listener diff --git a/ldclient/impl/integrations/consul/consul_feature_store.py b/ldclient/impl/integrations/consul/consul_feature_store.py index 497828a3..5f6f8130 100644 --- a/ldclient/impl/integrations/consul/consul_feature_store.py +++ b/ldclient/impl/integrations/consul/consul_feature_store.py @@ -45,6 +45,13 @@ def __init__(self, host, port, prefix, consul_opts): self._prefix = ("launchdarkly" if prefix is None 
else prefix) + "/" self._client = consul.Consul(**opts) + def is_available(self) -> bool: + try: + self._client.kv.get(self._inited_key()) + return True + except BaseException: + return False + def init_internal(self, all_data): # Start by reading the existing keys; we will later delete any of these that weren't in all_data. index, keys = self._client.kv.get(self._prefix, recurse=True, keys=True) diff --git a/ldclient/impl/integrations/dynamodb/dynamodb_feature_store.py b/ldclient/impl/integrations/dynamodb/dynamodb_feature_store.py index 98963e72..3c7a6ba7 100644 --- a/ldclient/impl/integrations/dynamodb/dynamodb_feature_store.py +++ b/ldclient/impl/integrations/dynamodb/dynamodb_feature_store.py @@ -53,6 +53,14 @@ def __init__(self, table_name, prefix, dynamodb_opts): self._prefix = (prefix + ":") if prefix else "" self._client = boto3.client('dynamodb', **dynamodb_opts) + def is_available(self) -> bool: + try: + inited_key = self._inited_key() + self._get_item_by_keys(inited_key, inited_key) + return True + except BaseException: + return False + def init_internal(self, all_data): # Start by reading the existing keys; we will later delete any of these that weren't in all_data. unused_old_keys = self._read_existing_keys(all_data.keys()) diff --git a/ldclient/impl/integrations/files/file_data_source.py b/ldclient/impl/integrations/files/file_data_source.py index 1b292fee..d02d5b28 100644 --- a/ldclient/impl/integrations/files/file_data_source.py +++ b/ldclient/impl/integrations/files/file_data_source.py @@ -1,6 +1,8 @@ import json import os import traceback +import time +from typing import Optional have_yaml = False try: @@ -20,16 +22,19 @@ from ldclient.impl.repeating_task import RepeatingTask from ldclient.impl.util import log -from ldclient.interfaces import UpdateProcessor +from ldclient.interfaces import UpdateProcessor, DataSourceUpdateSink, DataSourceState, DataSourceErrorInfo, DataSourceErrorKind from ldclient.versioned_data_kind import FEATURES, SEGMENTS + def _sanitize_json_item(item): if not ('version' in item): item['version'] = 1 + class _FileDataSource(UpdateProcessor): - def __init__(self, store, ready, paths, auto_update, poll_interval, force_polling): + def __init__(self, store, data_source_update_sink: Optional[DataSourceUpdateSink], ready, paths, auto_update, poll_interval, force_polling): self._store = store + self._data_source_update_sink = data_source_update_sink self._ready = ready self._inited = False self._paths = paths @@ -40,6 +45,23 @@ def __init__(self, store, ready, paths, auto_update, poll_interval, force_pollin self._poll_interval = poll_interval self._force_polling = force_polling + def _sink_or_store(self): + """ + The original implementation of this class relied on the feature store + directly, which we are trying to move away from. Customers who might have + instantiated this directly for some reason wouldn't know they have to set + the config's sink manually, so we have to fall back to the store if the + sink isn't present. + + The next major release should be able to simplify this structure and + remove the need for fall back to the data store because the update sink + should always be present. 
+ """ + if self._data_source_update_sink is None: + return self._store + + return self._data_source_update_sink + def start(self): self._load_all() @@ -65,13 +87,25 @@ def _load_all(self): except Exception as e: log.error('Unable to load flag data from "%s": %s' % (path, repr(e))) traceback.print_exc() + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + DataSourceErrorInfo(DataSourceErrorKind.INVALID_DATA, 0, time.time, str(e)) + ) return try: - self._store.init(all_data) + self._sink_or_store().init(all_data) self._inited = True + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status(DataSourceState.VALID, None) except Exception as e: log.error('Unable to store data: %s' % repr(e)) traceback.print_exc() + if self._data_source_update_sink is not None: + self._data_source_update_sink.update_status( + DataSourceState.INTERRUPTED, + DataSourceErrorInfo(DataSourceErrorKind.UNKNOWN, 0, time.time, str(e)) + ) def _load_file(self, path, all_data): content = None diff --git a/ldclient/impl/integrations/redis/redis_feature_store.py b/ldclient/impl/integrations/redis/redis_feature_store.py index 7467b239..442e34a3 100644 --- a/ldclient/impl/integrations/redis/redis_feature_store.py +++ b/ldclient/impl/integrations/redis/redis_feature_store.py @@ -24,6 +24,13 @@ def __init__(self, url, prefix, redis_opts: Dict[str, Any]): self.test_update_hook = None # exposed for testing log.info("Started RedisFeatureStore connected to URL: " + redact_password(url) + " using prefix: " + self._prefix) + def is_available(self) -> bool: + try: + self.initialized_internal() + return True + except BaseException: + return False + def _items_key(self, kind): return "{0}:{1}".format(self._prefix, kind.namespace) diff --git a/ldclient/impl/listeners.py b/ldclient/impl/listeners.py index dda5bf52..ef3930cc 100644 --- a/ldclient/impl/listeners.py +++ b/ldclient/impl/listeners.py @@ -3,26 +3,32 @@ from threading import RLock from typing import Any, Callable + class Listeners: """ Simple abstraction for a list of callbacks that can receive a single value. Callbacks are done synchronously on the caller's thread. """ + def __init__(self): self.__listeners = [] self.__lock = RLock() - + + def has_listeners(self) -> bool: + with self.__lock: + return len(self.__listeners) > 0 + def add(self, listener: Callable): with self.__lock: self.__listeners.append(listener) - + def remove(self, listener: Callable): with self.__lock: try: self.__listeners.remove(listener) except ValueError: - pass # removing a listener that wasn't in the list is a no-op - + pass # removing a listener that wasn't in the list is a no-op + def notify(self, value: Any): with self.__lock: listeners_copy = self.__listeners.copy() diff --git a/ldclient/impl/retry_delay.py b/ldclient/impl/retry_delay.py deleted file mode 100644 index f07d8405..00000000 --- a/ldclient/impl/retry_delay.py +++ /dev/null @@ -1,93 +0,0 @@ -from random import Random - -# This implementation is based on the equivalent code in the Go eventsource library. - -class RetryDelayStrategy: - """Encapsulation of configurable backoff/jitter behavior, used for stream connections. - - - The system can either be in a "good" state or a "bad" state. The initial state is "bad"; the - caller is responsible for indicating when it transitions to "good". When we ask for a new retry - delay, that implies the state is now transitioning to "bad". 
- - - There is a configurable base delay, which can be changed at any time (if the SSE server sends - us a "retry:" directive). - - - There are optional strategies for applying backoff and jitter to the delay. - - This object is meant to be used from a single thread once it's been created; its methods are - not safe for concurrent use. - """ - def __init__(self, base_delay, reset_interval, backoff_strategy, jitter_strategy): - self.__base_delay = base_delay - self.__reset_interval = reset_interval - self.__backoff = backoff_strategy - self.__jitter = jitter_strategy - self.__retry_count = 0 - self.__good_since = None - - def next_retry_delay(self, current_time): - """Computes the next retry interval. This also sets the current state to "bad". - - Note that current_time is passed as a parameter instead of computed by this function to - guarantee predictable behavior in tests. - - :param float current_time: the current time, in seconds - """ - if self.__good_since and self.__reset_interval and (current_time - self.__good_since >= self.__reset_interval): - self.__retry_count = 0 - self.__good_since = None - delay = self.__base_delay - if self.__backoff: - delay = self.__backoff.apply_backoff(delay, self.__retry_count) - self.__retry_count += 1 - if self.__jitter: - delay = self.__jitter.apply_jitter(delay) - return delay - - def set_good_since(self, good_since): - """Marks the current state as "good" and records the time. - - :param float good_since: the time that the state became "good", in seconds - """ - self.__good_since = good_since - - def set_base_delay(self, base_delay): - """Changes the initial retry delay and resets the backoff (if any) so the next retry will use - that value. - - This is used to implement the optional SSE behavior where the server sends a "retry:" command to - set the base retry to a specific value. Note that we will still apply a jitter, if jitter is enabled, - and subsequent retries will still increase exponentially. - """ - self.__base_delay = base_delay - self.__retry_count = 0 - -class DefaultBackoffStrategy: - """The default implementation of exponential backoff, which doubles the delay each time up to - the specified maximum. - - If a reset_interval was specified for the RetryDelayStrategy, and the system has been in a "good" - state for at least that long, the delay is reset back to the base. This avoids perpetually increasing - delays in a situation where failures are rare). - """ - def __init__(self, max_delay): - self.__max_delay = max_delay - - def apply_backoff(self, delay, retry_count): - d = delay * (2 ** retry_count) - return d if d <= self.__max_delay else self.__max_delay - -class DefaultJitterStrategy: - """The default implementation of jitter, which subtracts a pseudo-random amount from each delay. - """ - def __init__(self, ratio, rand_seed = None): - """Creates an instance. 
- - :param float ratio: a number in the range [0.0, 1.0] representing 0%-100% jitter - :param int rand_seed: if not None, will use this random seed (for test determinacy) - """ - self.__ratio = ratio - self.__random = Random(rand_seed) - - def apply_jitter(self, delay): - return delay - (self.__random.random() * self.__ratio * delay) diff --git a/ldclient/impl/sse.py b/ldclient/impl/sse.py deleted file mode 100644 index 1e37b659..00000000 --- a/ldclient/impl/sse.py +++ /dev/null @@ -1,189 +0,0 @@ -from ldclient.config import HTTPConfig -from ldclient.impl.http import HTTPFactory -from ldclient.impl.util import throw_if_unsuccessful_response - - -class _BufferedLineReader: - """ - Helper class that encapsulates the logic for reading UTF-8 stream data as a series of text lines, - each of which can be terminated by \n, \r, or \r\n. - """ - def lines_from(chunks): - """ - Takes an iterable series of encoded chunks (each of "bytes" type) and parses it into an iterable - series of strings, each of which is one line of text. The line does not include the terminator. - """ - last_char_was_cr = False - partial_line = None - - for chunk in chunks: - if len(chunk) == 0: - continue - - # bytes.splitlines() will correctly break lines at \n, \r, or \r\n, and is faster than - # iterating through the characters in Python code. However, we have to adjust the results - # in several ways as described below. - lines = chunk.splitlines() - if last_char_was_cr: - last_char_was_cr = False - if chunk[0] == 10: - # If the last character we saw was \r, and then the first character in buf is \n, then - # that's just a single \r\n terminator, so we should remove the extra blank line that - # splitlines added for that first \n. - lines.pop(0) - if len(lines) == 0: - continue # ran out of data, continue to get next chunk - if partial_line is not None: - # On our last time through the loop, we ended up with an unterminated line, so we should - # treat our first parsed line here as a continuation of that. - lines[0] = partial_line + lines[0] - partial_line = None - # Check whether the buffer really ended in a terminator. If it did not, then the last line in - # lines is a partial line and should not be emitted yet. - last_char = chunk[len(chunk)-1] - if last_char == 13: - last_char_was_cr = True # remember this in case the next chunk starts with \n - elif last_char != 10: - partial_line = lines.pop() # remove last element which is the partial line - for line in lines: - yield line.decode() - - -class Event: - """ - An event received by SSEClient. - """ - def __init__(self, event='message', data='', last_event_id=None): - self._event = event - self._data = data - self._id = last_event_id - - @property - def event(self): - """ - The event type, or "message" if not specified. - """ - return self._event - - @property - def data(self): - """ - The event data. - """ - return self._data - - @property - def last_event_id(self): - """ - The last non-empty "id" value received from this stream so far. - """ - return self._id - - def dump(self): - lines = [] - if self.id: - lines.append('id: %s' % self.id) - - # Only include an event line if it's not the default already. - if self.event != 'message': - lines.append('event: %s' % self.event) - - lines.extend('data: %s' % d for d in self.data.split('\n')) - return '\n'.join(lines) + '\n\n' - - -class SSEClient: - """ - A simple Server-Sent Events client. - - This implementation does not include automatic retrying of a dropped connection; the caller will do that. 
- If a connection ends, the events iterator will simply end. - """ - def __init__(self, url, last_id=None, http_factory=None, **kwargs): - self.url = url - self.last_id = last_id - self._chunk_size = 10000 - - if http_factory is None: - http_factory = HTTPFactory({}, HTTPConfig()) - self._timeout = http_factory.timeout - base_headers = http_factory.base_headers - - self.http = http_factory.create_pool_manager(1, url) - - # Any extra kwargs will be fed into the request call later. - self.requests_kwargs = kwargs - - # The SSE spec requires making requests with Cache-Control: nocache - if 'headers' not in self.requests_kwargs: - self.requests_kwargs['headers'] = {} - - self.requests_kwargs['headers'].update(base_headers) - - self.requests_kwargs['headers']['Cache-Control'] = 'no-cache' - - # The 'Accept' header is not required, but explicit > implicit - self.requests_kwargs['headers']['Accept'] = 'text/event-stream' - - self._connect() - - def _connect(self): - if self.last_id: - self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id - - # Use session if set. Otherwise fall back to requests module. - self.resp = self.http.request( - 'GET', - self.url, - timeout=self._timeout, - preload_content=False, - retries=0, # caller is responsible for implementing appropriate retry semantics, e.g. backoff - **self.requests_kwargs) - - # Raw readlines doesn't work because we may be missing newline characters until the next chunk - # For some reason, we also need to specify a chunk size because stream=True doesn't seem to guarantee - # that we get the newlines in a timeline manner - self.resp_file = self.resp.stream(amt=self._chunk_size) - - # TODO: Ensure we're handling redirects. Might also stick the 'origin' - # attribute on Events like the Javascript spec requires. - throw_if_unsuccessful_response(self.resp) - - @property - def events(self): - """ - An iterable series of Event objects received from the stream. 
- """ - event_type = "" - event_data = None - for line in _BufferedLineReader.lines_from(self.resp_file): - if line == "": - if event_data is not None: - yield Event("message" if event_type == "" else event_type, event_data, self.last_id) - event_type = "" - event_data = None - continue - colon_pos = line.find(':') - if colon_pos < 0: - continue # malformed line - ignore - if colon_pos == 0: - continue # comment - currently we're not surfacing these - name = line[0:colon_pos] - if colon_pos < (len(line) - 1) and line[colon_pos + 1] == ' ': - colon_pos += 1 - value = line[colon_pos+1:] - if name == 'event': - event_type = value - elif name == 'data': - event_data = value if event_data is None else (event_data + "\n" + value) - elif name == 'id': - self.last_id = value - elif name == 'retry': - pass # auto-reconnect is not implemented in this simplified client - # unknown field names are ignored in SSE - - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self.close() diff --git a/ldclient/integrations/__init__.py b/ldclient/integrations/__init__.py index de2b10f8..79735fe7 100644 --- a/ldclient/integrations/__init__.py +++ b/ldclient/integrations/__init__.py @@ -251,4 +251,4 @@ def new_data_source(paths: List[str], :return: an object (actually a lambda) to be stored in the ``update_processor_class`` configuration property """ - return lambda config, store, ready : _FileDataSource(store, ready, paths, auto_update, poll_interval, force_polling) + return lambda config, store, ready : _FileDataSource(store, config.data_source_update_sink, ready, paths, auto_update, poll_interval, force_polling) diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index c3b1f2f7..30180e5a 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -3,10 +3,14 @@ They may be useful in writing new implementations of these components, or for testing. """ +from ldclient.context import Context +from ldclient.impl.listeners import Listeners from abc import ABCMeta, abstractmethod, abstractproperty from .versioned_data_kind import VersionedDataKind from typing import Any, Callable, Mapping, Optional +from enum import Enum + class FeatureStore: """ @@ -26,7 +30,7 @@ class FeatureStore: __metaclass__ = ABCMeta @abstractmethod - def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any]=lambda x: x) -> Any: + def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any] = lambda x: x) -> Any: """ Retrieves the object to which the specified key is mapped, or None if the key is not found or the associated object has a ``deleted`` property of True. The retrieved object, if any (a @@ -39,7 +43,7 @@ def get(self, kind: VersionedDataKind, key: str, callback: Callable[[Any], Any]= """ @abstractmethod - def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any]=lambda x: x) -> Any: + def all(self, kind: VersionedDataKind, callback: Callable[[Any], Any] = lambda x: x) -> Any: """ Retrieves a dictionary of all associated objects of a given kind. The retrieved dict of keys to objects can be transformed by the specified callback. @@ -87,6 +91,58 @@ def initialized(self) -> bool: Returns whether the store has been initialized yet or not """ + # WARN: This isn't a required method on a FeatureStore yet. The SDK will + # currently check if the provided store responds to this method, and if + # it does, will take appropriate action based on the documented behavior + # below. 
This will become required in a future major version release of + # the SDK. + # + # @abstractmethod + # def is_monitoring_enabled(self) -> bool: + # """ + # Returns true if this data store implementation supports status + # monitoring. + # + # This is normally only true for persistent data stores but it could also + # be true for any custom :class:`FeatureStore` implementation. + # + # Returning true means that the store guarantees that if it ever enters + # an invalid state (that is, an operation has failed or it knows that + # operations cannot succeed at the moment), it will publish a status + # update, and will then publish another status update once it has + # returned to a valid state. + # + # Custom implementations must implement :func:`FeatureStore.is_available` + # which synchronously checks if the store is available. Without this + # method, the SDK cannot ensure status updates will occur once the store + # has gone offline. + # + # The same value will be returned from + # :func:`DataStoreStatusProvider.is_monitoring_enabled`. + # """ + + # WARN: This isn't a required method on a FeatureStore. The SDK will + # check if the provided store responds to this method, and if it does, + # will take appropriate action based on the documented behavior below. + # Usage of this method will be dropped in a future version of the SDK. + # + # @abstractmethod + # def is_available(self) -> bool: + # """ + # Tests whether the data store seems to be functioning normally. + # + # This should not be a detailed test of different kinds of operations, + # but just the smallest possible operation to determine whether (for + # instance) we can reach the database. + # + # Whenever one of the store's other methods throws an exception, the SDK + # will assume that it may have become unavailable (e.g. the database + # connection was lost). The SDK will then call is_available at intervals + # until it returns true. + # + # :return: true if the underlying data store is reachable + # """ + class FeatureStoreCore: """ @@ -155,6 +211,28 @@ def initialized_internal(self) -> bool: this value; ``CachingStoreWrapper`` will only call it when necessary. """ + # WARN: This isn't a required method on a FeatureStoreCore. The SDK will + # check if the provided store responds to this method, and if it does, + # will take appropriate action based on the documented behavior below. + # Usage of this method will be dropped in a future version of the SDK. + # + # @abstractmethod + # def is_available(self) -> bool: + # """ + # Tests whether the data store seems to be functioning normally. + # + # This should not be a detailed test of different kinds of operations, + # but just the smallest possible operation to determine whether (for + # instance) we can reach the database. + # + # Whenever one of the store's other methods throws an exception, the SDK + # will assume that it may have become unavailable (e.g. the database + # connection was lost). The SDK will then call is_available at intervals + # until it returns true. + # + # :return: true if the underlying data store is reachable + # """ + # Internal use only. Common methods for components that perform a task in the background. class BackgroundOperation: @@ -256,6 +334,7 @@ class BigSegmentStoreMetadata: """ Values returned by :func:`BigSegmentStore.get_metadata()`. 
""" + def __init__(self, last_up_to_date: Optional[int]): self.__last_up_to_date = last_up_to_date pass @@ -291,12 +370,12 @@ def get_metadata(self) -> BigSegmentStoreMetadata: def get_membership(self, context_hash: str) -> Optional[dict]: """ Queries the store for a snapshot of the current segment state for a specific context. - + The context_hash is a base64-encoded string produced by hashing the context key as defined by the Big Segments specification; the store implementation does not need to know the details of how this is done, because it deals only with already-hashed keys, but the string can be assumed to only contain characters that are valid in base64. - + The return value should be either a ``dict``, or None if the context is not referenced in any big segments. Each key in the dictionary is a "segment reference", which is how segments are identified in Big Segment data. This string is not identical to the segment key-- the SDK @@ -306,7 +385,7 @@ def get_membership(self, context_hash: str) -> Optional[dict]: explicitly included (that is, if both an include and an exclude existed in the data, the include would take precedence). If the context's status in a particular segment is undefined, there should be no key or value for that segment. - + This dictionary may be cached by the SDK, so it should not be modified after it is created. It is a snapshot of the segment membership state at one point in time. @@ -322,6 +401,7 @@ def stop(self): """ pass + class BigSegmentStoreStatus: """ Information about the state of a Big Segment store, provided by :class:`BigSegmentStoreStatusProvider`. @@ -329,6 +409,7 @@ class BigSegmentStoreStatus: Big Segments are a specific type of user segments. For more information, read the LaunchDarkly documentation: https://docs.launchdarkly.com/home/users/big-segments """ + def __init__(self, available: bool, stale: bool): self.__available = available self.__stale = stale @@ -338,7 +419,7 @@ def available(self) -> bool: """ True if the Big Segment store is able to respond to queries, so that the SDK can evaluate whether a user is in a segment or not. - + If this property is False, the store is not able to make queries (for instance, it may not have a valid database connection). In this case, the SDK will treat any reference to a Big Segment as if no users are included in that segment. Also, the :func:`ldclient.evaluation.EvaluationDetail.reason` @@ -346,7 +427,7 @@ def available(self) -> bool: available will have a ``bigSegmentsStatus`` of ``"STORE_ERROR"``. """ return self.__available - + @property def stale(self) -> bool: """ @@ -365,19 +446,19 @@ def stale(self) -> bool: class BigSegmentStoreStatusProvider: """ An interface for querying the status of a Big Segment store. - + The Big Segment store is the component that receives information about Big Segments, normally from a database populated by the LaunchDarkly Relay Proxy. Big Segments are a specific type of user segments. For more information, read the LaunchDarkly documentation: https://docs.launchdarkly.com/home/users/big-segments - + An implementation of this abstract class is returned by :func:`ldclient.client.LDClient.big_segment_store_status_provider`. Application code never needs to implement this interface. - + There are two ways to interact with the status. One is to simply get the current status; if its ``available`` property is true, then the SDK is able to evaluate user membership in Big Segments, and the ``stale`` property indicates whether the data might be out of date. 
- + The other way is to subscribe to status change notifications. Applications may wish to know if there is an outage in the Big Segment store, or if it has become stale (the Relay Proxy has stopped updating it with new data), since then flag evaluations that reference a Big Segment @@ -414,3 +495,556 @@ def remove_listener(self, listener: Callable[[BigSegmentStoreStatus], None]) -> this method does nothing """ pass + + +class DataSourceState(Enum): + """ + Enumeration representing the states a data source can be in at any given time. + """ + + INITIALIZING = 'initializing' + """ + The initial state of the data source when the SDK is being initialized. + + If it encounters an error that requires it to retry initialization, the state will remain at + :class:`DataSourceState.INITIALIZING` until it either succeeds and becomes {VALID}, or permanently fails and + becomes {OFF}. + """ + + VALID = 'valid' + """ + Indicates that the data source is currently operational and has not had any problems since the + last time it received data. + + In streaming mode, this means that there is currently an open stream connection and that at least + one initial message has been received on the stream. In polling mode, it means that the last poll + request succeeded. + """ + + INTERRUPTED = 'interrupted' + """ + Indicates that the data source encountered an error that it will attempt to recover from. + + In streaming mode, this means that the stream connection failed, or had to be dropped due to some + other error, and will be retried after a backoff delay. In polling mode, it means that the last poll + request failed, and a new poll request will be made after the configured polling interval. + """ + + OFF = 'off' + """ + Indicates that the data source has been permanently shut down. + + This could be because it encountered an unrecoverable error (for instance, the LaunchDarkly service + rejected the SDK key; an invalid SDK key will never become valid), or because the SDK client was + explicitly shut down. + """ + + +class DataSourceErrorKind(Enum): + """ + Enumeration representing the types of errors a data source can encounter. + """ + + UNKNOWN = 'unknown' + """ + An unexpected error, such as an uncaught exception. + """ + + NETWORK_ERROR = 'network_error' + """ + An I/O error such as a dropped connection. + """ + + ERROR_RESPONSE = 'error_response' + """ + The LaunchDarkly service returned an HTTP response with an error status. + """ + + INVALID_DATA = 'invalid_data' + """ + The SDK received malformed data from the LaunchDarkly service. + """ + + STORE_ERROR = 'store_error' + """ + The data source itself is working, but when it tried to put an update into the data store, the data + store failed (so the SDK may not have the latest data). + + Data source implementations do not need to report this kind of error; it will be automatically + reported by the SDK when exceptions are detected. + """ + + +class DataSourceErrorInfo: + """ + A description of an error condition that the data source encountered. + """ + + def __init__(self, kind: DataSourceErrorKind, status_code: int, time: float, message: Optional[str]): + self.__kind = kind + self.__status_code = status_code + self.__time = time + self.__message = message + + @property + def kind(self) -> DataSourceErrorKind: + """ + :return: The general category of the error + """ + return self.__kind + + @property + def status_code(self) -> int: + """ + :return: An HTTP status or zero. 
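+
+        A status code of zero means the failure was not an HTTP error response
+        (for example, a network error or invalid stream data); in that case
+        ``kind`` identifies the kind of failure.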
+ """ + return self.__status_code + + @property + def time(self) -> float: + """ + :return: Unix timestamp when the error occurred + """ + return self.__time + + @property + def message(self) -> Optional[str]: + """ + :return: Message an error message if applicable, or None + """ + return self.__message + + +class DataSourceStatus: + """ + Information about the data source's status and about the last status change. + """ + + def __init__(self, state: DataSourceState, state_since: float, last_error: Optional[DataSourceErrorInfo]): + self.__state = state + self.__state_since = state_since + self.__last_error = last_error + + @property + def state(self) -> DataSourceState: + """ + :return: The basic state of the data source. + """ + return self.__state + + @property + def since(self) -> float: + """ + :return: Unix timestamp of the last state transition. + """ + return self.__state_since + + @property + def error(self) -> Optional[DataSourceErrorInfo]: + """ + :return: A description of the last error, or None if there are no errors since startup + """ + return self.__last_error + + +class DataSourceStatusProvider: + """ + An interface for querying the status of the SDK's data source. The data source is the component + that receives updates to feature flag data; normally this is a streaming connection, but it + could be polling or file data depending on your configuration. + + An implementation of this interface is returned by + :func:`ldclient.client.LDClient.data_source_status_provider`. Application code never needs to + implement this interface. + """ + __metaclass__ = ABCMeta + + @abstractproperty + def status(self) -> DataSourceStatus: + """ + Returns the current status of the data source. + + All the built-in data source implementations are guaranteed to update this status whenever they + successfully initialize, encounter an error, or recover after an error. + + For a custom data source implementation, it is the responsibility of the data source to push + status updates to the SDK; if it does not do so, the status will always be reported as + :class:`DataSourceState.INITIALIZING`. + + :return: the status + """ + pass + + @abstractmethod + def add_listener(self, listener: Callable[[DataSourceStatus], None]): + """ + Subscribes for notifications of status changes. + + The listener is a function or method that will be called with a single parameter: the + new ``DataSourceStatus``. + + :param listener: the listener to add + """ + pass + + @abstractmethod + def remove_listener(self, listener: Callable[[DataSourceStatus], None]): + """ + Unsubscribes from notifications of status changes. + + :param listener: a listener that was previously added with :func:`add_listener()`; if it was not, + this method does nothing + """ + pass + + +class DataSourceUpdateSink: + """ + Interface that a data source implementation will use to push data into + the SDK. + + The data source interacts with this object, rather than manipulating + the data store directly, so that the SDK can perform any other + necessary operations that must happen when data is updated. + """ + __metaclass__ = ABCMeta + + @abstractmethod + def init(self, all_data: Mapping[VersionedDataKind, Mapping[str, dict]]): + """ + Initializes (or re-initializes) the store with the specified set of entities. Any + existing entries will be removed. Implementations can assume that this data set is up to + date-- there is no need to perform individual version comparisons between the existing + objects and the supplied features. 
+
+        If possible, the store should update the entire data set atomically. If that is not possible,
+        it should iterate through the outer dictionary and then the inner dictionary using the existing
+        iteration order of those dictionaries (the SDK will ensure that the items were inserted into
+        them in the correct order), storing each item, and then delete any leftover items at the very end.
+
+        :param all_data: All objects to be stored
+        """
+        pass
+
+    @abstractmethod
+    def upsert(self, kind: VersionedDataKind, item: dict):
+        """
+        Attempt to add an entity, or update an existing entity with the same key. An update
+        should only succeed if the new item's version is greater than the old one;
+        otherwise, the method should do nothing.
+
+        :param kind: The kind of object to update
+        :param item: The object to update or insert
+        """
+        pass
+
+    @abstractmethod
+    def delete(self, kind: VersionedDataKind, key: str, version: int):
+        """
+        Attempt to delete an entity if it exists. Deletion should only succeed if the
+        version parameter is greater than the existing entity's version; otherwise, the
+        method should do nothing.
+
+        :param kind: The kind of object to delete
+        :param key: The key of the object to be deleted
+        :param version: The version for the delete operation
+        """
+        pass
+
+    @abstractmethod
+    def update_status(self, new_state: DataSourceState, new_error: Optional[DataSourceErrorInfo]):
+        """
+        Informs the SDK of a change in the data source's status.
+
+        Data source implementations should use this method if they have any
+        concept of being in a valid state, a temporarily disconnected state,
+        or a permanently stopped state.
+
+        If `new_state` is different from the previous state, and/or
+        `new_error` is not None, the SDK will start returning the new status
+        (adding a timestamp for the change) from :class:`DataSourceStatusProvider.status`, and
+        will trigger status change events to any registered listeners.
+
+        A special case is that if `new_state` is :class:`DataSourceState.INTERRUPTED`, but the
+        previous state was :class:`DataSourceState.INITIALIZING`, the state will remain at
+        :class:`DataSourceState.INITIALIZING`, because :class:`DataSourceState.INTERRUPTED` is only
+        meaningful after a successful startup.
+
+        :param new_state: The updated state of the data source
+        :param new_error: An optional error if the new state is an error condition
+        """
+        pass
+
+
+class FlagChange:
+    """
+    Change event fired when some aspect of the flag referenced by the key has changed.
+    """
+
+    def __init__(self, key: str):
+        self.__key = key
+
+    @property
+    def key(self) -> str:
+        """
+        :return: The flag key that was modified by the store.
+        """
+        return self.__key
+
+
+class FlagValueChange:
+    """
+    Change event fired when the evaluated value for the specified flag key has changed.
+    """
+
+    def __init__(self, key, old_value, new_value):
+        self.__key = key
+        self.__old_value = old_value
+        self.__new_value = new_value
+
+    @property
+    def key(self):
+        """
+        :return: The flag key that was modified by the store.
+        """
+        return self.__key
+
+    @property
+    def old_value(self):
+        """
+        :return: The old evaluation result prior to the flag changing
+        """
+        return self.__old_value
+
+    @property
+    def new_value(self):
+        """
+        :return: The new evaluation result after the flag was changed
+        """
+        return self.__new_value
+
+
+class FlagTracker:
+    """
+    An interface for tracking changes in feature flag configurations.
+
+    An implementation of this interface is returned by :class:`ldclient.client.LDClient.flag_tracker`.
+    Application code never needs to implement this interface.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def add_listener(self, listener: Callable[[FlagChange], None]):
+        """
+        Registers a listener to be notified of feature flag changes in general.
+
+        The listener will be notified whenever the SDK receives any change to any feature flag's configuration,
+        or to a user segment that is referenced by a feature flag. If the updated flag is used as a prerequisite
+        for other flags, the SDK assumes that those flags may now behave differently and sends flag change events
+        for them as well.
+
+        Note that this does not necessarily mean the flag's value has changed for any particular evaluation
+        context, only that some part of the flag configuration was changed so that it may return a
+        different value than it previously returned for some context. If you want to track flag value changes,
+        use :func:`add_flag_value_change_listener` instead.
+
+        It is possible, given current design restrictions, that a listener might be notified when no change has
+        occurred. This edge case will be addressed in a later version of the SDK. It is important to note this issue
+        does not affect :func:`add_flag_value_change_listener` listeners.
+
+        If using the file data source, any change in a data file will be treated as a change to every flag. Again,
+        use :func:`add_flag_value_change_listener` (or just re-evaluate the flag yourself) if you want to know whether
+        this is a change that really affects a flag's value.
+
+        Change events only work if the SDK is actually connecting to LaunchDarkly (or using the file data source).
+        If the SDK is only reading flags from a database then it cannot know when there is a change, because
+        flags are read on an as-needed basis.
+
+        The listener will be called from a worker thread.
+
+        Calling this method for an already-registered listener has no effect.
+
+        :param listener: listener to call when flag has changed
+        """
+        pass
+
+    @abstractmethod
+    def remove_listener(self, listener: Callable[[FlagChange], None]):
+        """
+        Unregisters a listener so that it will no longer be notified of feature flag changes.
+
+        Calling this method for a listener that was not previously registered has no effect.
+
+        :param listener: the listener to remove
+        """
+        pass
+
+    @abstractmethod
+    def add_flag_value_change_listener(self, key: str, context: Context, listener: Callable[[FlagValueChange], None]):
+        """
+        Registers a listener to be notified of a change in a specific feature flag's value for a specific
+        evaluation context.
+
+        When you call this method, it first immediately evaluates the feature flag. It then uses
+        :func:`add_listener` to start listening for feature flag configuration
+        changes, and whenever the specified feature flag changes, it re-evaluates the flag for the same context.
+        It then calls your listener if and only if the resulting value has changed.
+
+        All feature flag evaluations require an instance of :class:`ldclient.context.Context`. If the feature flag
+        you are tracking does not have any context targeting rules, you must still pass a dummy context such as
+        ``Context.create("for-global-flags")``. If you do not want the context to appear on your dashboard,
+        use the anonymous property, which can be set via the context builder.
+
+        The returned listener represents the subscription that was created by this method
+        call; to unsubscribe, pass that object (not your listener) to :func:`remove_listener`.
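+
+        For example (assuming ``client`` is an initialized ``LDClient`` and the
+        flag key is illustrative)::
+
+            def on_change(change):
+                print('%s changed from %s to %s' %
+                      (change.key, change.old_value, change.new_value))
+
+            subscription = client.flag_tracker.add_flag_value_change_listener(
+                'my-flag-key', Context.create('example-context-key'), on_change)
+
+            # later, to stop listening:
+            client.flag_tracker.remove_listener(subscription)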
+
+        :param key: The flag key to monitor
+        :param context: The context to evaluate against the flag
+        :param listener: The listener to trigger if the value has changed
+        """
+        pass
+
+
+class DataStoreStatus:
+    """
+    Information about the data store's status.
+    """
+    __metaclass__ = ABCMeta
+
+    def __init__(self, available: bool, stale: bool):
+        self.__available = available
+        self.__stale = stale
+
+    @property
+    def available(self) -> bool:
+        """
+        Returns true if the SDK believes the data store is now available.
+
+        This property is normally true. If the SDK receives an exception while
+        trying to query or update the data store, then it sets this property to
+        false (notifying listeners, if any) and polls the store at intervals
+        until a query succeeds. Once it succeeds, it sets the property back to
+        true (again notifying listeners).
+
+        :return: if store is available
+        """
+        return self.__available
+
+    @property
+    def stale(self) -> bool:
+        """
+        Returns true if the store may be out of date due to a previous
+        outage, so the SDK should attempt to refresh all feature flag data
+        and rewrite it to the store.
+
+        This property is not meaningful to application code.
+
+        :return: true if data should be rewritten
+        """
+        return self.__stale
+
+
+class DataStoreUpdateSink:
+    """
+    Interface that a data store implementation can use to report information
+    back to the SDK.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def status(self) -> DataStoreStatus:
+        """
+        Inspect the data store's operational status.
+        """
+        pass
+
+    @abstractmethod
+    def update_status(self, status: DataStoreStatus):
+        """
+        Reports a change in the data store's operational status.
+
+        This is what makes the status monitoring mechanisms in
+        :class:`DataStoreStatusProvider` work.
+
+        :param status: the updated status properties
+        """
+        pass
+
+    @abstractproperty
+    def listeners(self) -> Listeners:
+        """
+        Access the listeners associated with this sink instance.
+        """
+        pass
+
+
+class DataStoreStatusProvider:
+    """
+    An interface for querying the status of a persistent data store.
+
+    An implementation of this interface is returned by :func:`ldclient.client.LDClient.data_store_status_provider`.
+    Application code should not implement this interface.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractproperty
+    def status(self) -> DataStoreStatus:
+        """
+        Returns the current status of the store.
+
+        This is only meaningful for persistent stores, or any custom data store implementation that makes use of
+        the status reporting mechanism provided by the SDK. For the default in-memory store, the status will always
+        be reported as "available".
+
+        :return: the latest status
+        """
+
+    @abstractmethod
+    def is_monitoring_enabled(self) -> bool:
+        """
+        Indicates whether the current data store implementation supports status
+        monitoring.
+
+        This is normally true for all persistent data stores, and false for the
+        default in-memory store. A true value means that any listeners added
+        with :func:`add_listener` can expect to be notified if there is any error in
+        storing data, and then notified again when the error condition is
+        resolved. A false value means that the status is not meaningful and
+        listeners should not expect to be notified.
+
+        :return: true if status monitoring is enabled
+        """
+
+    @abstractmethod
+    def add_listener(self, listener: Callable[[DataStoreStatus], None]):
+        """
+        Subscribes for notifications of status changes.
+ + Applications may wish to know if there is an outage in a persistent + data store, since that could mean that flag evaluations are unable to + get the flag data from the store (unless it is currently cached) and + therefore might return default values. + + If the SDK receives an exception while trying to query or update the + data store, then it notifies listeners that the store appears to be + offline ({Status#available} is false) and begins polling the store at + intervals until a query succeeds. Once it succeeds, it notifies + listeners again with {Status#available} set to true. + + This method has no effect if the data store implementation does not + support status tracking, such as if you are using the default in-memory + store rather than a persistent store. + + :param listener: the listener to add + """ + + @abstractmethod + def remove_listener(self, listener: Callable[[DataStoreStatus], None]): + """ + Unsubscribes from notifications of status changes. + + This method has no effect if the data store implementation does not + support status tracking, such as if you are using the default in-memory + store rather than a persistent store. + + :param listener: the listener to remove; if no such listener was added, this does nothing + """ diff --git a/ldclient/versioned_data_kind.py b/ldclient/versioned_data_kind.py index 93d3ca83..ac5e10d6 100644 --- a/ldclient/versioned_data_kind.py +++ b/ldclient/versioned_data_kind.py @@ -27,20 +27,20 @@ def __init__(self, namespace: str, request_api_path: str, stream_api_path: str, @property def namespace(self) -> str: return self._namespace - + @property def request_api_path(self) -> str: return self._request_api_path - + @property def stream_api_path(self) -> str: return self._stream_api_path - + def decode(self, data: Any) -> Any: if self._decoder is None or isinstance(data, ModelEntity): return data return self._decoder(data) - + def encode(self, item: Any) -> dict: return item.to_json_dict() if isinstance(item, ModelEntity) else item @@ -51,11 +51,11 @@ def __init__(self, namespace: str, request_api_path: str, stream_api_path: str, super().__init__(namespace, request_api_path, stream_api_path, decoder) self._priority = priority self._get_dependency_keys = get_dependency_keys - + @property def priority(self) -> int: return self._priority - + @property def get_dependency_keys(self) -> Optional[Callable[[dict], Iterable[str]]]: return self._get_dependency_keys diff --git a/release-please-config.json b/release-please-config.json new file mode 100644 index 00000000..90edd09a --- /dev/null +++ b/release-please-config.json @@ -0,0 +1,11 @@ +{ + "packages": { + ".": { + "release-type": "python", + "versioning": "default", + "include-v-in-tag": false, + "extra-files": ["ldclient/version.py"], + "include-component-in-tag": false + } + } +} diff --git a/requirements.txt b/requirements.txt index 796ed901..27a65bcd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ expiringdict>=1.1.4 pyRFC3339>=1.0 semver>=2.10.2 urllib3>=1.22.0,<3 +launchdarkly-eventsource>=1.1.0,<2.0.0 diff --git a/scripts/release.sh b/scripts/release.sh deleted file mode 100755 index d2b24e73..00000000 --- a/scripts/release.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env bash -# This script updates the version for the ldclient library and releases it to PyPi -# It will only work if you have the proper credentials set up in ~/.pypirc - -# It takes exactly one argument: the new version. 
-# It should be run from the root of this git repo like this: -# ./scripts/release.sh 4.0.9 - -# When done you should commit and push the changes made. - -set -uxe -echo "Starting python-server-sdk release." - -VERSION=$1 - -# Update version in ldclient/version.py - setup.py references this constant -echo "VERSION = \"${VERSION}\"" > ldclient/version.py - -# Prepare distribution -python setup.py sdist - -# Upload with Twine -pip install twine -python -m twine upload dist/* - -echo "Done with python-server-sdk release" diff --git a/sse-contract-tests/Makefile b/sse-contract-tests/Makefile deleted file mode 100644 index 1d4da244..00000000 --- a/sse-contract-tests/Makefile +++ /dev/null @@ -1,27 +0,0 @@ - -TEMP_TEST_OUTPUT=/tmp/sse-contract-test-service.log - -# port 8000 is already used in the CI environment because we're running a DynamoDB container -PORT=9000 - -# we're skipping the "reconnection" test group because the simplified SSE client we're currently using -# does not do automatic retrying of connections - that is done at a higher level in the SDK -EXTRA_TEST_PARAMS=-skip reconnection - -build-test-service: - @pip install -r requirements.txt - -start-test-service: - @python service.py $(PORT) - -start-test-service-bg: - @echo "Test service output will be captured in $(TEMP_TEST_OUTPUT)" - @make start-test-service >$(TEMP_TEST_OUTPUT) 2>&1 & - -run-contract-tests: - @curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/v2.0.0/downloader/run.sh \ - | VERSION=v1 PARAMS="-url http://localhost:$(PORT) -debug -stop-service-at-end $(EXTRA_TEST_PARAMS)" sh - -contract-tests: build-test-service start-test-service-bg run-contract-tests - -.PHONY: build-test-service start-test-service start-test-service-bg run-contract-tests contract-tests diff --git a/sse-contract-tests/README.md b/sse-contract-tests/README.md deleted file mode 100644 index f5892c91..00000000 --- a/sse-contract-tests/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# SSE client contract test service - -This directory contains an implementation of the cross-platform SSE testing protocol defined by https://github.com/launchdarkly/sse-contract-tests. See that project's `README` for details of this protocol, and the kinds of SSE client capabilities that are relevant to the contract tests. This code should not need to be updated unless the SSE client has added or removed such capabilities. - -To run these tests locally, run `make contract-tests`. This downloads the correct version of the test harness tool automatically. 
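Taken together, the status interfaces added above let an application observe the health of the SDK's data source. A minimal usage sketch (assuming an initialized LDClient named `client`; the listener body is illustrative):

    from ldclient.interfaces import DataSourceState

    def on_data_source_status(status):
        # status is a DataSourceStatus: VALID while connected and up to date,
        # INTERRUPTED while retrying after an error, OFF once permanently stopped
        if status.state == DataSourceState.OFF and status.error is not None:
            print('data source stopped: %s' % status.error.message)

    client.data_source_status_provider.add_listener(on_data_source_status)
    print(client.data_source_status_provider.status.state)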
diff --git a/sse-contract-tests/requirements.txt b/sse-contract-tests/requirements.txt deleted file mode 100644 index 46a07968..00000000 --- a/sse-contract-tests/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -Flask==2.3.2 -urllib3>=1.22.0,<3 diff --git a/sse-contract-tests/service.py b/sse-contract-tests/service.py deleted file mode 100644 index 389b1a1f..00000000 --- a/sse-contract-tests/service.py +++ /dev/null @@ -1,91 +0,0 @@ -from stream_entity import StreamEntity - -import json -import logging -import os -import sys -import urllib3 -from flask import Flask, request -from flask.logging import default_handler -from logging.config import dictConfig - -default_port = 8000 - -# logging configuration -dictConfig({ - 'version': 1, - 'formatters': { - 'default': { - 'format': '[%(asctime)s] [%(name)s] %(levelname)s: %(message)s', - } - }, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', - 'formatter': 'default' - } - }, - 'root': { - 'level': 'INFO', - 'handlers': ['console'] - }, - 'loggers': { - 'werkzeug': { 'level': 'ERROR' } # disable irrelevant Flask app logging - } -}) - -app = Flask(__name__) -app.logger.removeHandler(default_handler) - -stream_counter = 0 -streams = {} -global_log = logging.getLogger('testservice') - -http_client = urllib3.PoolManager() - -@app.route('/', methods=['GET']) -def status(): - body = { - 'capabilities': [ - 'headers', - 'last-event-id' - ] - } - return (json.dumps(body), 200, {'Content-type': 'application/json'}) - -@app.route('/', methods=['DELETE']) -def delete_stop_service(): - print("Test service has told us to exit") - quit() - -@app.route('/', methods=['POST']) -def post_create_stream(): - global stream_counter, streams - - options = json.loads(request.data) - - stream_counter += 1 - stream_id = str(stream_counter) - resource_url = '/streams/%s' % stream_id - - stream = StreamEntity(options) - streams[stream_id] = stream - - return ('', 201, {'Location': resource_url}) - -@app.route('/streams/', methods=['DELETE']) -def delete_stream(id): - global streams - - stream = streams[id] - if stream is None: - return ('', 404) - stream.close() - return ('', 202) - -if __name__ == "__main__": - port = default_port - if sys.argv[len(sys.argv) - 1] != 'service.py': - port = int(sys.argv[len(sys.argv) - 1]) - global_log.info('Listening on port %d', port) - app.run(host='0.0.0.0', port=port) diff --git a/sse-contract-tests/stream_entity.py b/sse-contract-tests/stream_entity.py deleted file mode 100644 index ac5c7d00..00000000 --- a/sse-contract-tests/stream_entity.py +++ /dev/null @@ -1,99 +0,0 @@ -import json -import logging -import os -import sys -import threading -import traceback -import urllib3 - -# Import ldclient from parent directory -sys.path.insert(1, os.path.join(sys.path[0], '..')) -from ldclient.config import HTTPConfig -from ldclient.impl.http import HTTPFactory -from ldclient.impl.sse import SSEClient - -port = 8000 - -stream_counter = 0 -streams = {} - -http_client = urllib3.PoolManager() - -class StreamEntity: - def __init__(self, options): - self.options = options - self.callback_url = options["callbackUrl"] - self.log = logging.getLogger(options["tag"]) - self.closed = False - self.callback_counter = 0 - - thread = threading.Thread(target=self.run) - thread.start() - - def run(self): - stream_url = self.options["streamUrl"] - http_factory = HTTPFactory( - self.options.get("headers", {}), - HTTPConfig(read_timeout = - None if self.options.get("readTimeoutMs") is None else - self.options["readTimeoutMs"] / 1000) - ) - try: - 
self.log.info('Opening stream from %s', stream_url) - sse = SSEClient( - stream_url, - # Currently this client implementation does not support automatic retry - # retry = - # None if self.options.get("initialDelayMs") is None else - # self.options.get("initialDelayMs") / 1000, - last_id = self.options.get("lastEventId"), - http_factory = http_factory - ) - self.sse = sse - for message in sse.events: - self.log.info('Received event from stream (%s)', message.event) - self.send_message({ - 'kind': 'event', - 'event': { - 'type': message.event, - 'data': message.data, - 'id': message.last_event_id - } - }) - self.send_message({ - 'kind': 'error', - 'error': 'Stream closed' - }) - except Exception as e: - self.log.info('Received error from stream: %s', e) - self.log.info(traceback.format_exc()) - self.send_message({ - 'kind': 'error', - 'error': str(e) - }) - - def send_message(self, message): - global http_client - - if self.closed: - return - self.callback_counter += 1 - callback_url = "%s/%d" % (self.options["callbackUrl"], self.callback_counter) - - try: - resp = http_client.request( - 'POST', - callback_url, - headers = {'Content-Type': 'application/json'}, - body = json.dumps(message) - ) - if resp.status >= 300 and not self.closed: - self.log.error('Callback request returned HTTP error %d', resp.status) - except Exception as e: - if not self.closed: - self.log.error('Callback request failed: %s', e) - - def close(self): - # how to close the stream?? - self.closed = True - self.log.info('Test ended') diff --git a/testing/impl/datasource/test_polling_processor.py b/testing/impl/datasource/test_polling_processor.py index 068d1684..bc9dae80 100644 --- a/testing/impl/datasource/test_polling_processor.py +++ b/testing/impl/datasource/test_polling_processor.py @@ -5,11 +5,15 @@ from ldclient.config import Config from ldclient.feature_store import InMemoryFeatureStore from ldclient.impl.datasource.polling import PollingUpdateProcessor +from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl +from ldclient.impl.listeners import Listeners from ldclient.impl.util import UnsuccessfulResponseException +from ldclient.interfaces import DataSourceStatus, DataSourceState, DataSourceErrorKind from ldclient.versioned_data_kind import FEATURES, SEGMENTS from testing.builders import * from testing.stub_util import MockFeatureRequester, MockResponse +from testing.test_util import SpyListener pp = None mock_requester = None @@ -43,19 +47,28 @@ def test_successful_request_puts_feature_data_in_store(): "segkey": segment.to_json_dict() } } - setup_processor(Config("SDK_KEY")) + + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + config = Config("SDK_KEY") + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) + setup_processor(config) ready.wait() assert store.get(FEATURES, "flagkey", lambda x: x) == flag assert store.get(SEGMENTS, "segkey", lambda x: x) == segment assert store.initialized assert pp.initialized() + assert len(spy.statuses) == 1 + assert spy.statuses[0].state == DataSourceState.VALID + assert spy.statuses[0].error is None # Note that we have to mock Config.poll_interval because Config won't let you set a value less than 30 seconds @mock.patch('ldclient.config.Config.poll_interval', new_callable=mock.PropertyMock, return_value=0.1) def test_general_connection_error_does_not_cause_immediate_failure(ignore_mock): mock_requester.exception = Exception("bad") - start_time = time.time() setup_processor(Config("SDK_KEY")) 
ready.wait(0.3) assert not pp.initialized() @@ -80,19 +93,45 @@ def test_http_503_error_does_not_cause_immediate_failure(): verify_recoverable_http_error(503) @mock.patch('ldclient.config.Config.poll_interval', new_callable=mock.PropertyMock, return_value=0.1) -def verify_unrecoverable_http_error(status, ignore_mock): - mock_requester.exception = UnsuccessfulResponseException(status) - setup_processor(Config("SDK_KEY")) +def verify_unrecoverable_http_error(http_status_code, ignore_mock): + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + config = Config("SDK_KEY") + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) + + mock_requester.exception = UnsuccessfulResponseException(http_status_code) + setup_processor(config) finished = ready.wait(0.5) assert finished assert not pp.initialized() assert mock_requester.request_count == 1 + assert len(spy.statuses) == 1 + assert spy.statuses[0].state == DataSourceState.OFF + assert spy.statuses[0].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert spy.statuses[0].error.status_code == http_status_code + @mock.patch('ldclient.config.Config.poll_interval', new_callable=mock.PropertyMock, return_value=0.1) -def verify_recoverable_http_error(status, ignore_mock): - mock_requester.exception = UnsuccessfulResponseException(status) - setup_processor(Config("SDK_KEY")) +def verify_recoverable_http_error(http_status_code, ignore_mock): + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + config = Config("SDK_KEY") + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) + + mock_requester.exception = UnsuccessfulResponseException(http_status_code) + setup_processor(config) finished = ready.wait(0.5) assert not finished assert not pp.initialized() assert mock_requester.request_count >= 2 + + assert len(spy.statuses) > 1 + + for status in spy.statuses: + assert status.state == DataSourceState.INITIALIZING + assert status.error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert status.error.status_code == http_status_code diff --git a/testing/impl/datasource/test_streaming.py b/testing/impl/datasource/test_streaming.py index b017b9a8..a3cfe60f 100644 --- a/testing/impl/datasource/test_streaming.py +++ b/testing/impl/datasource/test_streaming.py @@ -1,19 +1,23 @@ -import json import pytest from threading import Event +from typing import List import time from ldclient.config import Config from ldclient.feature_store import InMemoryFeatureStore from ldclient.impl.datasource.streaming import StreamingUpdateProcessor from ldclient.impl.events.diagnostics import _DiagnosticAccumulator +from ldclient.impl.listeners import Listeners from ldclient.version import VERSION from ldclient.versioned_data_kind import FEATURES, SEGMENTS +from ldclient.interfaces import DataSourceStatus, DataSourceState, DataSourceErrorKind +from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl from testing.builders import * from testing.http_util import start_server, BasicResponse, CauseNetworkError, SequentialHandler from testing.proxy_test_util import do_proxy_tests -from testing.stub_util import make_delete_event, make_patch_event, make_put_event, stream_content +from testing.stub_util import make_delete_event, make_patch_event, make_put_event, make_invalid_put_event, stream_content +from testing.test_util import SpyListener brief_delay = 0.001 @@ -21,7 +25,7 @@ # the test server running at localhost tests are *extremely* slow. 
It looks like a similar issue to what's # described at https://stackoverflow.com/questions/2617615/slow-python-http-server-on-localhost but we had no # luck with the advice that was given there. -start_wait = 5 +start_wait = 10 update_wait = 3 def test_request_properties(): @@ -189,7 +193,7 @@ def test_retries_on_network_error(): server.for_path('/all', two_errors_then_success) with StreamingUpdateProcessor(config, store, ready, None) as sp: - sp.start() + sp.start() ready.wait(start_wait) assert sp.initialized() server.await_request @@ -207,7 +211,7 @@ def test_recoverable_http_error(status): server.for_path('/all', two_errors_then_success) with StreamingUpdateProcessor(config, store, ready, None) as sp: - sp.start() + sp.start() ready.wait(start_wait) assert sp.initialized() server.should_have_requests(3) @@ -224,7 +228,7 @@ def test_unrecoverable_http_error(status): server.for_path('/all', error_then_success) with StreamingUpdateProcessor(config, store, ready, None) as sp: - sp.start() + sp.start() ready.wait(5) assert not sp.initialized() server.should_have_requests(1) @@ -283,6 +287,108 @@ def test_records_diagnostic_on_stream_init_failure(): assert len(recorded_inits) == 2 assert recorded_inits[0]['failed'] is True assert recorded_inits[1]['failed'] is False +@pytest.mark.parametrize("status", [ 400, 408, 429, 500, 503 ]) +def test_status_includes_http_code(status): + error_handler = BasicResponse(status) + store = InMemoryFeatureStore() + ready = Event() + with start_server() as server: + with stream_content(make_put_event()) as stream: + two_errors_then_success = SequentialHandler(error_handler, error_handler, stream) + config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay) + + spy = SpyListener() + listeners = Listeners() + listeners.add(spy) + + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) + server.for_path('/all', two_errors_then_success) + + with StreamingUpdateProcessor(config, store, ready, None) as sp: + sp.start() + ready.wait(start_wait) + assert sp.initialized() + server.should_have_requests(3) + + assert len(spy.statuses) == 3 + + assert spy.statuses[0].state == DataSourceState.INITIALIZING + assert spy.statuses[0].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert spy.statuses[0].error.status_code == status + + assert spy.statuses[1].state == DataSourceState.INITIALIZING + assert spy.statuses[1].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert spy.statuses[1].error.status_code == status + + assert spy.statuses[2].state == DataSourceState.VALID + assert spy.statuses[2].error.kind == DataSourceErrorKind.ERROR_RESPONSE + assert spy.statuses[2].error.status_code == status + + +def test_invalid_json_triggers_listener(): + store = InMemoryFeatureStore() + ready = Event() + with start_server() as server: + with stream_content(make_put_event()) as valid_stream, stream_content(make_invalid_put_event()) as invalid_stream: + config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay) + + statuses: List[DataSourceStatus] = [] + listeners = Listeners() + + def listener(s): + if len(statuses) == 0: + invalid_stream.close() + statuses.append(s) + listeners.add(listener) + + config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners()) + server.for_path('/all', SequentialHandler(invalid_stream, valid_stream)) + + with StreamingUpdateProcessor(config, store, ready, None) as sp: + sp.start() + 
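+                # the listener above closes the invalid stream on its first
+                # status update, forcing a reconnect to the valid stream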
+                assert spy.statuses[2].state == DataSourceState.VALID
+                assert spy.statuses[2].error.kind == DataSourceErrorKind.ERROR_RESPONSE
+                assert spy.statuses[2].error.status_code == status
+
+
+def test_invalid_json_triggers_listener():
+    store = InMemoryFeatureStore()
+    ready = Event()
+    with start_server() as server:
+        with stream_content(make_put_event()) as valid_stream, stream_content(make_invalid_put_event()) as invalid_stream:
+            config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay)
+
+            statuses: List[DataSourceStatus] = []
+            listeners = Listeners()
+
+            def listener(s):
+                if len(statuses) == 0:
+                    invalid_stream.close()
+                statuses.append(s)
+            listeners.add(listener)
+
+            config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners())
+            server.for_path('/all', SequentialHandler(invalid_stream, valid_stream))
+
+            with StreamingUpdateProcessor(config, store, ready, None) as sp:
+                sp.start()
+                ready.wait(start_wait)
+                assert sp.initialized()
+                server.should_have_requests(2)
+
+                assert len(statuses) == 2
+
+                assert statuses[0].state == DataSourceState.INITIALIZING
+                assert statuses[0].error.kind == DataSourceErrorKind.INVALID_DATA
+                assert statuses[0].error.status_code == 0
+
+                assert statuses[1].state == DataSourceState.VALID
+
+def test_failure_transitions_from_valid():
+    store = InMemoryFeatureStore()
+    ready = Event()
+    error_handler = BasicResponse(401)
+    with start_server() as server:
+        config = Config(sdk_key = 'sdk-key', stream_uri = server.uri, initial_reconnect_delay = brief_delay)
+
+        spy = SpyListener()
+        listeners = Listeners()
+        listeners.add(spy)
+
+        config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners())
+
+        # The sink has special handling for failures before the state is valid. So we manually set this to valid so we
+        # can exercise the other branching logic within the sink.
+        config.data_source_update_sink.update_status(DataSourceState.VALID, None)
+        server.for_path('/all', error_handler)
+
+        with StreamingUpdateProcessor(config, store, ready, None) as sp:
+            sp.start()
+            ready.wait(start_wait)
+            server.should_have_requests(1)
+
+            assert len(spy.statuses) == 2
+
+            assert spy.statuses[0].state == DataSourceState.VALID
+
+            assert spy.statuses[1].state == DataSourceState.OFF
+            assert spy.statuses[1].error.kind == DataSourceErrorKind.ERROR_RESPONSE
+            assert spy.statuses[1].error.status_code == 401
+

 def expect_item(store, kind, item):
     assert store.get(kind, item['key'], lambda x: x) == item
diff --git a/testing/impl/test_data_sink.py b/testing/impl/test_data_sink.py
new file mode 100644
index 00000000..458dca06
--- /dev/null
+++ b/testing/impl/test_data_sink.py
@@ -0,0 +1,333 @@
+import pytest
+import mock
+
+from typing import Dict, Callable
+
+from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl
+from ldclient.feature_store import InMemoryFeatureStore
+from ldclient.interfaces import DataSourceState, DataSourceErrorKind
+from ldclient.impl.listeners import Listeners
+from ldclient.versioned_data_kind import FEATURES, SEGMENTS
+
+from testing.test_util import SpyListener
+from testing.builders import FlagBuilder, FlagRuleBuilder, make_clause, SegmentBuilder, SegmentRuleBuilder
+
+
+@pytest.fixture
+def basic_data() -> Dict:
+    flag1 = FlagBuilder('flag1').version(1).on(False).build()
+    flag2 = FlagBuilder('flag2').version(1).on(False).build()
+    flag3 = FlagBuilder('flag3').version(1).rules(
+        FlagRuleBuilder().variation(0).id('rule_id').track_events(True).clauses(
+            make_clause('user', 'segmentMatch', 'segmentMatch', 'segment2')
+        ).build()
+    ).build()
+    segment1 = SegmentBuilder('segment1').version(1).build()
+    segment2 = SegmentBuilder('segment2').version(1).build()
+
+    return {
+        FEATURES: {
+            flag1.key: flag1.to_json_dict(),
+            flag2.key: flag2.to_json_dict(),
+            flag3.key: flag3.to_json_dict(),
+        },
+        SEGMENTS: {
+            segment1.key: segment1.to_json_dict(),
+            segment2.key: segment2.to_json_dict(),
+        },
+    }
+
+
+@pytest.fixture
+def prereq_data() -> Dict:
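+    # Dependency graph: flag1 depends on flag2; flag2 depends on flag3, flag4, and flag6;
+    # flag6 matches segment2, which in turn matches segment1. flag5 depends on nothing.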
+    flag1 = FlagBuilder('flag1').version(1).on(False).prerequisite('flag2', 0).build()
+    flag2 = FlagBuilder('flag2').version(1).on(False).prerequisite('flag3', 0).prerequisite('flag4', 0).prerequisite('flag6', 0).build()
+    flag3 = FlagBuilder('flag3').version(1).on(False).build()
+    flag4 = FlagBuilder('flag4').version(1).on(False).build()
+    flag5 = FlagBuilder('flag5').version(1).on(False).build()
+    flag6 = FlagBuilder('flag6').version(1).rules(
+        FlagRuleBuilder().variation(0).id('rule_id').track_events(True).clauses(
+            make_clause('user', 'segmentMatch', 'segmentMatch', 'segment2')
+        ).build()
+    ).build()
+    segment1 = SegmentBuilder('segment1').version(1).build()
+    segment2 = SegmentBuilder('segment2').version(1).rules(
+        SegmentRuleBuilder().clauses(
+            make_clause('user', 'segmentMatch', 'segmentMatch', 'segment1')
+        ).build()
+    ).build()
+
+    return {
+        FEATURES: {
+            flag1.key: flag1.to_json_dict(),
+            flag2.key: flag2.to_json_dict(),
+            flag3.key: flag3.to_json_dict(),
+            flag4.key: flag4.to_json_dict(),
+            flag5.key: flag5.to_json_dict(),
+            flag6.key: flag6.to_json_dict(),
+        },
+        SEGMENTS: {
+            segment1.key: segment1.to_json_dict(),
+            segment2.key: segment2.to_json_dict(),
+        },
+    }
+
+
+def test_defaults_to_initializing():
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), Listeners())
+    assert sink.status.state == DataSourceState.INITIALIZING
+
+
+def test_interrupting_initializing_stays_initializing():
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), Listeners())
+    sink.update_status(DataSourceState.INTERRUPTED, None)
+    assert sink.status.state == DataSourceState.INITIALIZING
+    assert sink.status.error is None
+
+
+def test_listener_is_only_triggered_for_state_changes():
+    spy = SpyListener()
+    status_listener = Listeners()
+    status_listener.add(spy)
+
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), status_listener, Listeners())
+    sink.update_status(DataSourceState.VALID, None)
+    sink.update_status(DataSourceState.VALID, None)
+    assert len(spy.statuses) == 1
+
+    sink.update_status(DataSourceState.INTERRUPTED, None)
+    sink.update_status(DataSourceState.INTERRUPTED, None)
+    assert len(spy.statuses) == 2
+
+
+def test_all_listeners_triggered_for_single_change():
+    spy1 = SpyListener()
+    spy2 = SpyListener()
+
+    status_listener = Listeners()
+    status_listener.add(spy1)
+    status_listener.add(spy2)
+
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), status_listener, Listeners())
+    sink.update_status(DataSourceState.VALID, None)
+
+    assert len(spy1.statuses) == 1
+    assert len(spy2.statuses) == 1
+
+
+def test_is_called_once_per_flag_during_init(basic_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(basic_data)
+
+    flag1 = FlagBuilder('flag1').version(2).on(False).build()
+    flag4 = FlagBuilder('flag4').version(1).on(False).build()
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
+    sink.init({
+        FEATURES: {
+            flag1.key: flag1,
+            flag4.key: flag4,
+        }
+    })
+
+    assert len(spy.statuses) == 4
+    keys = set(s.key for s in spy.statuses)  # No guaranteed order
+
+    assert 'flag1' in keys  # Version update
+    assert 'flag2' in keys  # Deleted
+    assert 'flag3' in keys  # Deleted
+    assert 'flag4' in keys  # Newly created
+
+
+def test_upsert_triggers_flag_listener(basic_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(basic_data)
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
+    sink.upsert(FEATURES, FlagBuilder('flag1').version(2).on(False).build())
+
+    assert len(spy.statuses) == 1
+    assert spy.statuses[0].key == 'flag1'
+
+
+def test_delete_triggers_flag_listener(basic_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(basic_data)
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
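+    # Deleting a flag is itself a change to that flag, so the listener fires once.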
+    sink.delete(FEATURES, 'flag1', 2)
+
+    # TODO(sc-212471): Once the store starts returning a success status on delete, the flag change
+    # notification can start ignoring duplicate requests like this.
+    # sink.delete(FEATURES, 'flag1', 2)
+
+    assert len(spy.statuses) == 1
+    assert spy.statuses[0].key == 'flag1'
+
+
+def test_triggers_if_segment_changes(basic_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(basic_data)
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
+    sink.upsert(SEGMENTS, SegmentBuilder('segment2').version(2).build())
+
+    assert len(spy.statuses) == 1
+    assert spy.statuses[0].key == 'flag3'
+
+
+def test_dependency_stack_if_top_of_chain_is_changed(prereq_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(prereq_data)
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
+
+    sink.upsert(FEATURES, FlagBuilder('flag4').version(2).on(False).build())
+
+    assert len(spy.statuses) == 3
+
+    keys = set(s.key for s in spy.statuses)
+    assert 'flag1' in keys
+    assert 'flag2' in keys
+    assert 'flag4' in keys
+
+
+def test_triggers_when_new_prereqs_added(prereq_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(prereq_data)
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
+
+    sink.upsert(FEATURES, FlagBuilder('flag3').version(2).on(False).prerequisite('flag4', 0).build())
+
+    assert len(spy.statuses) == 3
+
+    keys = set(s.key for s in spy.statuses)
+    assert 'flag1' in keys
+    assert 'flag2' in keys
+    assert 'flag3' in keys
+
+
+def test_triggers_when_prereqs_removed(prereq_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(prereq_data)
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
+
+    sink.upsert(FEATURES, FlagBuilder('flag2').version(2).on(False).prerequisite('flag3', 0).build())
+
+    assert len(spy.statuses) == 2
+
+    keys = set(s.key for s in spy.statuses)
+    assert 'flag1' in keys
+    assert 'flag2' in keys
+
+
+def test_triggers_dependency_stack_if_top_of_chain_is_deleted(prereq_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(prereq_data)
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
+
+    sink.delete(FEATURES, 'flag4', 2)
+
+    assert len(spy.statuses) == 3
+
+    keys = set(s.key for s in spy.statuses)
+    assert 'flag1' in keys
+    assert 'flag2' in keys
+    assert 'flag4' in keys
+
+
+def test_triggers_dependent_segment_is_modified(prereq_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(prereq_data)
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
+
+    sink.upsert(SEGMENTS, SegmentBuilder('segment1').version(2).build())
+    # TODO(sc-212471): Once the store starts returning a success status on upsert, the flag change
+    # notification can start ignoring duplicate requests like this.
+    # sink.upsert(SEGMENTS, SegmentBuilder('segment1').version(2).build())
+
+    assert len(spy.statuses) == 3
+
+    keys = set(s.key for s in spy.statuses)
+    assert 'flag1' in keys
+    assert 'flag2' in keys
+    assert 'flag6' in keys
+
+
+def test_triggers_if_dependent_segment_removed(prereq_data):
+    flag_change_listener = Listeners()
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), Listeners(), flag_change_listener)
+    sink.init(prereq_data)
+
+    spy = SpyListener()
+    flag_change_listener.add(spy)
+
+    sink.delete(SEGMENTS, 'segment2', 2)
+    # TODO(sc-212471): Once the store starts returning a success status on delete, the flag change
+    # notification can start ignoring duplicate requests like this.
+    # sink.delete(SEGMENTS, 'segment2', 2)
+
+    assert len(spy.statuses) == 3
+
+    keys = set(s.key for s in spy.statuses)
+    assert 'flag1' in keys
+    assert 'flag2' in keys
+    assert 'flag6' in keys
+
+
+def confirm_store_error(fn: Callable[[DataSourceUpdateSinkImpl], None], expected_error: str):
+    status_listeners = Listeners()
+
+    sink = DataSourceUpdateSinkImpl(InMemoryFeatureStore(), status_listeners, Listeners())
+    # Make it valid first so the error changes from initializing
+    sink.update_status(DataSourceState.VALID, None)
+
+    spy = SpyListener()
+    status_listeners.add(spy)
+
+    try:
+        fn(sink)
+    except Exception:
+        pass
+
+    assert len(spy.statuses) == 1
+    assert spy.statuses[0].state == DataSourceState.INTERRUPTED
+    assert spy.statuses[0].error.kind == DataSourceErrorKind.STORE_ERROR
+    assert spy.statuses[0].error.message == expected_error
+
+
+@mock.patch('ldclient.feature_store.InMemoryFeatureStore.init', side_effect=[Exception('cannot init')])
+def test_listener_is_triggered_for_init_error(mock_init, prereq_data):
+    confirm_store_error(lambda sink: sink.init(prereq_data), 'cannot init')
+
+
+@mock.patch('ldclient.feature_store.InMemoryFeatureStore.upsert', side_effect=[Exception('cannot upsert')])
+def test_listener_is_triggered_for_upsert_error(mock_upsert):
+    confirm_store_error(lambda sink: sink.upsert(FEATURES, {}), 'cannot upsert')
+
+
+@mock.patch('ldclient.feature_store.InMemoryFeatureStore.delete', side_effect=[Exception('cannot delete')])
+def test_listener_is_triggered_for_delete_error(mock_delete):
+    confirm_store_error(lambda sink: sink.delete(FEATURES, 'key', 1), 'cannot delete')
diff --git a/testing/impl/test_flag_tracker.py b/testing/impl/test_flag_tracker.py
new file mode 100644
index 00000000..bcdaba85
--- /dev/null
+++ b/testing/impl/test_flag_tracker.py
@@ -0,0 +1,80 @@
+from ldclient.impl.flag_tracker import FlagTrackerImpl
+from testing.test_util import SpyListener
+from ldclient.impl.listeners import Listeners
+from ldclient.interfaces import FlagChange
+
+
+def test_can_add_and_remove_listeners():
+    spy = SpyListener()
+    listeners = Listeners()
+
+    tracker = FlagTrackerImpl(listeners, lambda: None)
+    tracker.add_listener(spy)
+
+    listeners.notify(FlagChange('flag-1'))
+    listeners.notify(FlagChange('flag-2'))
+
+    tracker.remove_listener(spy)
+
+    listeners.notify(FlagChange('flag-3'))
+
+    assert len(spy.statuses) == 2
+    assert spy.statuses[0].key == 'flag-1'
+    assert spy.statuses[1].key == 'flag-2'
+
+
+def test_flag_change_listener_notified_when_value_changes():
+    responses = ['initial', 'second', 'second', 'final']
+
+    def eval_fn(key, context):
+        return responses.pop(0)
+
+    listeners = Listeners()
+    tracker = FlagTrackerImpl(listeners, eval_fn)
+
+    spy = SpyListener()
+    tracker.add_flag_value_change_listener('flag-key', None, spy)
+    assert len(spy.statuses) == 0
+
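+    # Registering the listener consumed 'initial'; this notification evaluates
+    # 'second', so the value has changed and the listener fires.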
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 1
+
+    # The evaluated value did not change this time ('second' -> 'second'), so no notification is expected
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 1
+
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 2
+
+    assert spy.statuses[0].key == 'flag-key'
+    assert spy.statuses[0].old_value == 'initial'
+    assert spy.statuses[0].new_value == 'second'
+
+    assert spy.statuses[1].key == 'flag-key'
+    assert spy.statuses[1].old_value == 'second'
+    assert spy.statuses[1].new_value == 'final'
+
+
+def test_flag_change_listener_returns_listener_we_can_unregister():
+    responses = ['first', 'second', 'third']
+
+    def eval_fn(key, context):
+        return responses.pop(0)
+
+    listeners = Listeners()
+    tracker = FlagTrackerImpl(listeners, eval_fn)
+
+    spy = SpyListener()
+    created_listener = tracker.add_flag_value_change_listener('flag-key', None, spy)
+    assert len(spy.statuses) == 0
+
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 1
+
+    tracker.remove_listener(created_listener)
+    listeners.notify(FlagChange('flag-key'))
+    assert len(spy.statuses) == 1
+
+    assert spy.statuses[0].key == 'flag-key'
+    assert spy.statuses[0].old_value == 'first'
+    assert spy.statuses[0].new_value == 'second'
diff --git a/testing/impl/test_retry_delay.py b/testing/impl/test_retry_delay.py
deleted file mode 100644
index 0538f34f..00000000
--- a/testing/impl/test_retry_delay.py
+++ /dev/null
@@ -1,81 +0,0 @@
-from ldclient.impl.retry_delay import RetryDelayStrategy, DefaultBackoffStrategy, DefaultJitterStrategy
-
-import math
-import time
-
-def test_fixed_retry_delay():
-    d0 = 10
-    r = RetryDelayStrategy(d0, 0, None, None)
-    t0 = time.time() - 60
-    d1 = r.next_retry_delay(t0)
-    d2 = r.next_retry_delay(t0 + 1)
-    d3 = r.next_retry_delay(t0 + 2)
-    assert d1 == d0
-    assert d2 == d0
-    assert d3 == d0
-
-def test_backoff_without_jitter():
-    d0 = 10
-    max = 60
-    r = RetryDelayStrategy(d0, 0, DefaultBackoffStrategy(max), None)
-    t0 = time.time() - 60
-    d1 = r.next_retry_delay(t0)
-    d2 = r.next_retry_delay(t0 + 1)
-    d3 = r.next_retry_delay(t0 + 2)
-    d4 = r.next_retry_delay(t0 + 3)
-    assert d1 == d0
-    assert d2 == d0 * 2
-    assert d3 == d0 * 4
-    assert d4 == max
-
-def test_jitter_without_backoff():
-    d0 = 1
-    seed = 1000
-    r = RetryDelayStrategy(d0, 0, None, DefaultJitterStrategy(0.5, seed))
-    t0 = time.time() - 60
-    d1 = r.next_retry_delay(t0)
-    d2 = r.next_retry_delay(t0 + 1)
-    d3 = r.next_retry_delay(t0 + 2)
-    assert math.trunc(d1 * 1000) == 611  # these are the randomized values we expect from that fixed seed value
-    assert math.trunc(d2 * 1000) == 665
-    assert math.trunc(d3 * 1000) == 950
-
-def test_jitter_with_backoff():
-    d0 = 1
-    max = 60
-    seed = 1000
-    r = RetryDelayStrategy(d0, 0, DefaultBackoffStrategy(max), DefaultJitterStrategy(0.5, seed))
-    t0 = time.time() - 60
-    d1 = r.next_retry_delay(t0)
-    d2 = r.next_retry_delay(t0 + 1)
-    d3 = r.next_retry_delay(t0 + 2)
-    assert math.trunc(d1 * 1000) == 611
-    assert math.trunc(d2 / 2 * 1000) == 665
-    assert math.trunc(d3 / 4 * 1000) == 950
-
-def test_backoff_reset_interval():
-    d0 = 10
-    max = 60
-    reset_interval = 45
-    r = RetryDelayStrategy(d0, reset_interval, DefaultBackoffStrategy(max), None)
-
-    t0 = time.time() - 60
-    r.set_good_since(50)
-
-    t1 = t0 + 1
-    d1 = r.next_retry_delay(t1)
-    assert d1 == d0
-
-    t2 = t1 + 1
-    r.set_good_since(t2)
-
-    t3 = t2 + 10
-    d2 = r.next_retry_delay(t3)
-    assert d2 == d0 * 2
-
-    t4 = t3 + d2
-    r.set_good_since(t4)
-
-    t5 = t4 + reset_interval
-    d3 = r.next_retry_delay(t5)
-    assert d3 == d0  # it's gone back to the initial delay because reset_interval has elapsed since t4
diff --git a/testing/impl/test_sse.py b/testing/impl/test_sse.py
deleted file mode 100644
index 9e006531..00000000
--- a/testing/impl/test_sse.py
+++ /dev/null
@@ -1,89 +0,0 @@
-from ldclient.impl.sse import _BufferedLineReader, SSEClient
-
-from testing.http_util import ChunkedResponse, start_server
-
-import pytest
-
-
-class TestBufferedLineReader:
-    @pytest.fixture(params = ["\r", "\n", "\r\n"])
-    def terminator(self, request):
-        return request.param
-
-    @pytest.fixture(params = [
-        [
-            [ "first line*", "second line*", "3rd line*" ],
-            [ "first line", "second line", "3rd line"]
-        ],
-        [
-            [ "*", "second line*", "3rd line*" ],
-            [ "", "second line", "3rd line"]
-        ],
-        [
-            [ "first line*", "*", "3rd line*" ],
-            [ "first line", "", "3rd line"]
-        ],
-        [
-            [ "first line*", "*", "*", "*", "3rd line*" ],
-            [ "first line", "", "", "", "3rd line" ]
-        ],
-        [
-            [ "first line*second line*third", " line*fourth line*"],
-            [ "first line", "second line", "third line", "fourth line" ]
-        ],
-    ])
-    def inputs_outputs(self, terminator, request):
-        inputs = list(s.replace("*", terminator).encode() for s in request.param[0])
-        return [inputs, request.param[1]]
-
-    def test_parsing(self, inputs_outputs):
-        assert list(_BufferedLineReader.lines_from(inputs_outputs[0])) == inputs_outputs[1]
-
-    def test_mixed_terminators(self):
-        chunks = [
-            b"first line\nsecond line\r\nthird line\r",
-            b"\nfourth line\r",
-            b"\r\nlast\r\n"
-        ]
-        expected = [
-            "first line",
-            "second line",
-            "third line",
-            "fourth line",
-            "",
-            "last"
-        ]
-        assert list(_BufferedLineReader.lines_from(chunks)) == expected
-
-
-# The tests for SSEClient are fairly basic, just ensuring that it is really making HTTP requests and that the
-# API works as expected. The contract test suite is much more thorough - see sse-contract-tests.
-
-class TestSSEClient:
-    def test_sends_expected_headers(self):
-        with start_server() as server:
-            with ChunkedResponse({ 'Content-Type': 'text/event-stream' }) as stream:
-                server.for_path('/', stream)
-                client = SSEClient(server.uri)
-
-                r = server.await_request()
-                assert r.headers['Accept'] == 'text/event-stream'
-                assert r.headers['Cache-Control'] == 'no-cache'
-
-    def test_receives_messages(self):
-        with start_server() as server:
-            with ChunkedResponse({ 'Content-Type': 'text/event-stream' }) as stream:
-                server.for_path('/', stream)
-                client = SSEClient(server.uri)
-
-                stream.push("event: event1\ndata: data1\n\nevent: event2\ndata: data2\n\n")
-
-                events = client.events
-
-                event1 = next(events)
-                assert event1.event == 'event1'
-                assert event1.data == 'data1'
-
-                event2 = next(events)
-                assert event2.event == 'event2'
-                assert event2.data == 'data2'
diff --git a/testing/integrations/persistent_feature_store_test_base.py b/testing/integrations/persistent_feature_store_test_base.py
index bb02bda0..82dae2da 100644
--- a/testing/integrations/persistent_feature_store_test_base.py
+++ b/testing/integrations/persistent_feature_store_test_base.py
@@ -1,5 +1,4 @@
 from abc import abstractmethod, abstractproperty
-from os import environ
 import pytest

 from ldclient.feature_store import CacheConfig
@@ -7,8 +6,7 @@
 from ldclient.versioned_data_kind import FEATURES
 from testing.feature_store_test_base import FeatureStoreTestBase, FeatureStoreTester, StoreTestScope
-
-skip_database_tests = environ.get('LD_SKIP_DATABASE_TESTS') == '1'
+from testing.test_util import skip_database_tests

 # The standard test suite to be run against all persistent feature store implementations. See
diff --git a/testing/integrations/test_consul.py b/testing/integrations/test_consul.py
index d70b8174..4292b445 100644
--- a/testing/integrations/test_consul.py
+++ b/testing/integrations/test_consul.py
@@ -1,6 +1,8 @@
 from ldclient.integrations import Consul
 from testing.integrations.persistent_feature_store_test_base import *
+from testing.test_util import skip_database_tests
+import pytest

 have_consul = False
 try:
@@ -12,6 +14,20 @@
 pytestmark = pytest.mark.skipif(not have_consul, reason="skipping Consul tests because consul module is not installed")

+@pytest.mark.skipif(skip_database_tests, reason="skipping database tests")
+def test_consul_defaults_to_available():
+    consul = Consul.new_feature_store()
+    assert consul.is_monitoring_enabled() is True
+    assert consul.is_available() is True
+
+
+@pytest.mark.skipif(skip_database_tests, reason="skipping database tests")
+def test_consul_detects_nonexistent_store():
+    consul = Consul.new_feature_store(host='http://i-mean-what-are-the-odds')
+    assert consul.is_monitoring_enabled() is True
+    assert consul.is_available() is False
+
+
 class ConsulFeatureStoreTester(PersistentFeatureStoreTester):
     def create_persistent_feature_store(self, prefix, caching) -> FeatureStore:
         return Consul.new_feature_store(prefix=prefix, caching=caching)
diff --git a/testing/integrations/test_dynamodb.py b/testing/integrations/test_dynamodb.py
index 4c314ad7..e558a8c6 100644
--- a/testing/integrations/test_dynamodb.py
+++ b/testing/integrations/test_dynamodb.py
@@ -5,6 +5,7 @@
 from testing.integrations.big_segment_store_test_base import *
 from testing.integrations.persistent_feature_store_test_base import *
+from testing.test_util import skip_database_tests

 import time

@@ -18,6 +19,24 @@
 pytestmark = pytest.mark.skipif(not have_dynamodb, reason="skipping DynamoDB tests because boto3 module is not installed")

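+# The availability checks below talk to a live local instance, which is why they
+# also honor the LD_SKIP_DATABASE_TESTS escape hatch.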
+@pytest.mark.skipif(skip_database_tests, reason="skipping database tests")
+def test_dynamodb_defaults_to_available():
+    dynamodb = DynamoDB.new_feature_store(DynamoDBTestHelper.table_name,
+        dynamodb_opts=DynamoDBTestHelper.options)
+    assert dynamodb.is_monitoring_enabled() is True
+    assert dynamodb.is_available() is True
+
+
+@pytest.mark.skipif(skip_database_tests, reason="skipping database tests")
+def test_dynamodb_detects_nonexistent_store():
+    options = dict(DynamoDBTestHelper.options)  # copy so the shared options are not modified
+    options['endpoint_url'] = 'http://i-mean-what-are-the-odds'
+    dynamodb = DynamoDB.new_feature_store(DynamoDBTestHelper.table_name,
+        dynamodb_opts=options)
+    assert dynamodb.is_monitoring_enabled() is True
+    assert dynamodb.is_available() is False
+
+
 class DynamoDBTestHelper:
     table_name = 'LD_DYNAMODB_TEST_TABLE'
     table_created = False
diff --git a/testing/integrations/test_redis.py b/testing/integrations/test_redis.py
index 9301092e..52e571cc 100644
--- a/testing/integrations/test_redis.py
+++ b/testing/integrations/test_redis.py
@@ -4,7 +4,9 @@
 from testing.integrations.big_segment_store_test_base import *
 from testing.integrations.persistent_feature_store_test_base import *
+from testing.test_util import skip_database_tests

+import pytest
 import json

 have_redis = False
@@ -17,11 +19,25 @@
 pytestmark = pytest.mark.skipif(not have_redis, reason="skipping Redis tests because redis module is not installed")

+@pytest.mark.skipif(skip_database_tests, reason="skipping database tests")
+def test_redis_defaults_to_available():
+    store = Redis.new_feature_store()
+    assert store.is_monitoring_enabled() is True
+    assert store.is_available() is True
+
+
+@pytest.mark.skipif(skip_database_tests, reason="skipping database tests")
+def test_redis_detects_nonexistent_store():
+    store = Redis.new_feature_store(url='http://i-mean-what-are-the-odds')
+    assert store.is_monitoring_enabled() is True
+    assert store.is_available() is False
+
+
 class RedisTestHelper:
     @staticmethod
     def make_client() -> redis.StrictRedis:
         return redis.StrictRedis(host="localhost", port=6379, db=0)
-
+
     def clear_data_for_prefix(prefix):
         r = RedisTestHelper.make_client()
         for key in r.keys("%s:*" % prefix):
diff --git a/testing/stub_util.py b/testing/stub_util.py
index 8bddcdad..2a94d9e8 100644
--- a/testing/stub_util.py
+++ b/testing/stub_util.py
@@ -20,6 +20,9 @@ def make_put_event(flags = [], segments = []):
     data = { "data": { "flags": make_items_map(flags), "segments": make_items_map(segments) } }
     return 'event:put\ndata: %s\n\n' % json.dumps(data)

+def make_invalid_put_event():
+    return 'event:put\ndata: {"data": {\n\n'
+
 def make_patch_event(kind, item):
     path = '%s%s' % (kind.stream_api_path, item['key'])
     data = { "path": path, "data": item_as_json(item) }
diff --git a/testing/test_feature_store_client_wrapper.py b/testing/test_feature_store_client_wrapper.py
new file mode 100644
index 00000000..134c268d
--- /dev/null
+++ b/testing/test_feature_store_client_wrapper.py
@@ -0,0 +1,105 @@
+import pytest
+
+from unittest.mock import Mock
+from typing import Callable, List
+from threading import Event
+
+from ldclient.client import _FeatureStoreClientWrapper
+from ldclient.impl.listeners import Listeners
+from ldclient.impl.datastore.status import DataStoreUpdateSinkImpl
+
+
+class CallbackListener:
+    def __init__(self, fn: Callable):
+        self.__fn = fn
+
+    def __call__(self, status):
+        self.__fn(status)
+
+
+class RecordStatusListener:
+    def __init__(self):
+        self.__status = []
+
+    def __call__(self, status):
+        self.__status.append(status)
+
+    @property
+    def statuses(self) -> List:
+        return self.__status
+
+
+def raise_an_error():
+    raise Exception('init error')
+
+
+def test_store_will_not_notify_if_wrapped_store_does_not_support_monitoring():
+    store = Mock()
+    store.is_monitoring_enabled = lambda: False
+    store.init = raise_an_error
+
+    listener = RecordStatusListener()
+    listeners = Listeners()
+    listeners.add(listener)
+    sink = DataStoreUpdateSinkImpl(listeners)
+
+    wrapper = _FeatureStoreClientWrapper(store, sink)
+    with pytest.raises(Exception):
+        wrapper.init({})
+
+    assert len(listener.statuses) == 0
+
+
+def test_store_will_not_notify_if_wrapped_store_cannot_come_back_online():
+    store = Mock()
+    store.is_monitoring_enabled = lambda: True
+    store.init = raise_an_error
+
+    listener = RecordStatusListener()
+    listeners = Listeners()
+    listeners.add(listener)
+    sink = DataStoreUpdateSinkImpl(listeners)
+
+    wrapper = _FeatureStoreClientWrapper(store, sink)
+    with pytest.raises(Exception):
+        wrapper.init({})
+
+    assert len(listener.statuses) == 1
+
+
+def test_sink_will_be_notified_when_store_is_back_online():
+    event = Event()
+    statuses = []
+
+    def set_event(status):
+        statuses.append(status)
+        if status.available:
+            event.set()
+
+    # is_available() reports False on the first poll and True on the second
+    results = [False, True]
+    store = Mock()
+    store.is_monitoring_enabled = lambda: True
+    store.is_available = lambda: results.pop(0)
+    store.init = raise_an_error
+
+    listener = CallbackListener(set_event)
+    listeners = Listeners()
+    listeners.add(listener)
+    sink = DataStoreUpdateSinkImpl(listeners)
+
+    wrapper = _FeatureStoreClientWrapper(store, sink)
+    with pytest.raises(Exception):
+        wrapper.init({})
+
+    event.wait(2)
+
+    assert len(statuses) == 2
+    assert statuses[0].available is False
+    assert statuses[1].available is True
diff --git a/testing/test_feature_store_helpers.py b/testing/test_feature_store_helpers.py
index 0e2da11b..0896313d 100644
--- a/testing/test_feature_store_helpers.py
+++ b/testing/test_feature_store_helpers.py
@@ -1,5 +1,6 @@
 import pytest
 from time import sleep
+from unittest.mock import Mock

 from ldclient.feature_store import CacheConfig
 from ldclient.feature_store_helpers import CachingStoreWrapper
@@ -71,6 +72,21 @@ class CustomError(Exception): pass

 class TestCachingStoreWrapper:
+    @pytest.mark.parametrize("available", [False, True])
+    def test_monitoring_enabled_if_available_is_defined(self, available: bool):
+        core = Mock()
+        core.is_available = lambda: available
+
+        wrapper = make_wrapper(core, False)
+
+        assert wrapper.is_monitoring_enabled() is True
+        assert wrapper.is_available() is available
+
+    def test_monitoring_not_enabled_if_available_is_not_defined(self):
+        wrapper = make_wrapper(MockCore(), False)
+        assert wrapper.is_monitoring_enabled() is False
+
     @pytest.mark.parametrize("cached", [False, True])
     def test_get_item(self, cached):
         core = MockCore()
diff --git a/testing/test_file_data_source.py b/testing/test_file_data_source.py
index 0ff3b0d6..b84e986b 100644
--- a/testing/test_file_data_source.py
+++ b/testing/test_file_data_source.py
@@ -1,5 +1,7 @@
 import json
 import os
+from typing import List
+
 import pytest
 import tempfile
 import threading
@@ -8,9 +10,14 @@
 from ldclient.client import LDClient, Context
 from ldclient.config import Config
 from ldclient.feature_store import InMemoryFeatureStore
+from ldclient.impl.datasource.status import DataSourceUpdateSinkImpl
+from ldclient.impl.listeners import Listeners
 from ldclient.integrations import Files
+from ldclient.interfaces import DataSourceStatus, DataSourceState, DataSourceErrorKind
 from ldclient.versioned_data_kind import FEATURES, SEGMENTS

+from testing.test_util import SpyListener
+
 have_yaml = False
 try:
     import yaml
@@ -98,9 +105,9 @@ def teardown_function():
     if data_source is not None:
         data_source.stop()

-def make_data_source(**kwargs):
+def make_data_source(config, **kwargs):
     global data_source
-    data_source = Files.new_data_source(**kwargs)(Config("SDK_KEY"), store, ready)
+    data_source = Files.new_data_source(**kwargs)(config, store, ready)
     return data_source

 def make_temp_file(content):
@@ -116,7 +123,7 @@ def replace_file(path, content):
 def test_does_not_load_data_prior_to_start():
     path = make_temp_file('{"flagValues":{"key":"value"}}')
     try:
-        source = make_data_source(paths = path)
+        source = make_data_source(Config("SDK_KEY"), paths = path)
         assert ready.is_set() is False
         assert source.initialized() is False
         assert store.initialized is False
@@ -125,11 +132,40 @@ def test_loads_flags_on_start_from_json():
     path = make_temp_file(all_properties_json)
+    spy = SpyListener()
+    listeners = Listeners()
+    listeners.add(spy)
+
     try:
-        source = make_data_source(paths = path)
+        config = Config("SDK_KEY")
+        config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners())
+        source = make_data_source(config, paths = path)
         source.start()
         assert store.initialized is True
         assert sorted(list(store.all(FEATURES, lambda x: x).keys())) == all_flag_keys
+
+        assert len(spy.statuses) == 1
+        assert spy.statuses[0].state == DataSourceState.VALID
+        assert spy.statuses[0].error is None
     finally:
         os.remove(path)
+
+def test_handles_invalid_format_correctly():
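+    # A file with truncated JSON should leave the store uninitialized and report
+    # INVALID_DATA to status listeners rather than raising.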
+    path = make_temp_file('{"flagValues":{')
+    spy = SpyListener()
+    listeners = Listeners()
+    listeners.add(spy)
+
+    try:
+        config = Config("SDK_KEY")
+        config._data_source_update_sink = DataSourceUpdateSinkImpl(store, listeners, Listeners())
+        source = make_data_source(config, paths = path)
+        source.start()
+        assert store.initialized is False
+
+        assert len(spy.statuses) == 1
+        assert spy.statuses[0].state == DataSourceState.INITIALIZING
+        assert spy.statuses[0].error.kind == DataSourceErrorKind.INVALID_DATA
+    finally:
+        os.remove(path)

@@ -138,7 +174,7 @@ def test_loads_flags_on_start_from_yaml():
         pytest.skip("skipping file source test with YAML because pyyaml isn't available")
     path = make_temp_file(all_properties_yaml)
     try:
-        source = make_data_source(paths = path)
+        source = make_data_source(Config("SDK_KEY"), paths = path)
         source.start()
         assert store.initialized is True
         assert sorted(list(store.all(FEATURES, lambda x: x).keys())) == all_flag_keys
@@ -148,7 +184,7 @@ def test_sets_ready_event_and_initialized_on_successful_load():
     path = make_temp_file(all_properties_json)
     try:
-        source = make_data_source(paths = path)
+        source = make_data_source(Config("SDK_KEY"), paths = path)
         source.start()
         assert source.initialized() is True
         assert ready.is_set() is True
@@ -157,7 +193,7 @@ def test_sets_ready_event_and_does_not_set_initialized_on_unsuccessful_load():
     bad_file_path = 'no-such-file'
-    source = make_data_source(paths = bad_file_path)
+    source = make_data_source(Config("SDK_KEY"), paths = bad_file_path)
     source.start()
     assert source.initialized() is False
     assert ready.is_set() is True
@@ -166,7 +202,7 @@ def test_can_load_multiple_files():
     path1 = make_temp_file(flag_only_json)
     path2 = make_temp_file(segment_only_json)
     try:
-        source = make_data_source(paths = [ path1, path2 ])
+        source = make_data_source(Config("SDK_KEY"), paths = [ path1, path2 ])
         source.start()
         assert len(store.all(FEATURES, lambda x: x)) == 1
         assert len(store.all(SEGMENTS, lambda x: x)) == 1
@@ -178,7 +214,7 @@ def test_does_not_allow_duplicate_keys():
     path1 = make_temp_file(flag_only_json)
     path2 = make_temp_file(flag_only_json)
     try:
-        source = make_data_source(paths = [ path1, path2 ])
+        source = make_data_source(Config("SDK_KEY"), paths = [ path1, path2 ])
         source.start()
         assert len(store.all(FEATURES, lambda x: x)) == 0
     finally:
@@ -188,7 +224,7 @@ def test_does_not_reload_modified_file_if_auto_update_is_off():
     path = make_temp_file(flag_only_json)
     try:
-        source = make_data_source(paths = path)
+        source = make_data_source(Config("SDK_KEY"), paths = path)
         source.start()
         assert len(store.all(SEGMENTS, lambda x: x)) == 0
         time.sleep(0.5)
@@ -202,17 +238,17 @@ def do_auto_update_test(options):
     path = make_temp_file(flag_only_json)
     options['paths'] = path
     try:
-        source = make_data_source(**options)
+        source = make_data_source(Config("SDK_KEY"), **options)
         source.start()
         assert len(store.all(SEGMENTS, lambda x: x)) == 0
         time.sleep(0.5)
         replace_file(path, segment_only_json)
-        deadline = time.time() + 10
+        deadline = time.time() + 20
         while time.time() < deadline:
             time.sleep(0.1)
             if len(store.all(SEGMENTS, lambda x: x)) == 1:
                 return
-        assert False, "Flags were not reloaded after 10 seconds"
+        assert False, "Flags were not reloaded after 20 seconds"
     finally:
         os.remove(path)
diff --git a/testing/test_in_memory_feature_store.py b/testing/test_in_memory_feature_store.py
index 5cd8ba8c..72076897 100644
--- a/testing/test_in_memory_feature_store.py
+++ b/testing/test_in_memory_feature_store.py
@@ -5,6 +5,12 @@
 from testing.feature_store_test_base import FeatureStoreTestBase, FeatureStoreTester

+def test_in_memory_status_checks():
+    store = InMemoryFeatureStore()
+
+    assert store.is_monitoring_enabled() is False
+    assert store.is_available() is True
+

 class InMemoryFeatureStoreTester(FeatureStoreTester):
     def create_feature_store(self) -> FeatureStore:
diff --git a/testing/test_ldclient_evaluation.py b/testing/test_ldclient_evaluation.py
index 3e66e855..02ecd0a9 100644
--- a/testing/test_ldclient_evaluation.py
+++ b/testing/test_ldclient_evaluation.py
@@ -323,7 +323,7 @@ def test_all_flags_state_can_omit_details_for_untracked_flags():
     store.init({ FEATURES: { 'key1': flag1, 'key2': flag2, 'key3': flag3 } })
     client = make_client(store)
     state = client.all_flags_state(user, with_reasons=True, details_only_for_tracked_flags=True)
-    assert state.valid == True
+    assert state.valid is True
     result = state.to_json_dict()
     assert result == {
         'key1': 'value1',
diff --git a/testing/test_ldclient_listeners.py b/testing/test_ldclient_listeners.py
index b160135e..2a7798b7 100644
--- a/testing/test_ldclient_listeners.py
+++ b/testing/test_ldclient_listeners.py
@@ -1,10 +1,13 @@
 from ldclient.client import LDClient, Config
+from ldclient.interfaces import DataSourceState
 from ldclient.config import BigSegmentsConfig
 from testing.mock_components import MockBigSegmentStore
-from testing.stub_util import MockEventProcessor, MockUpdateProcessor
+from testing.stub_util import MockEventProcessor, MockUpdateProcessor, make_put_event, stream_content
+from testing.http_util import start_server
 from queue import Queue

+
 def test_big_segment_store_status_unavailable():
     config=Config(
         sdk_key='SDK_KEY',
@@ -45,3 +48,23 @@ def test_big_segment_store_status_updates():
     assert status3.stale == False

     assert client.big_segment_store_status_provider.status.available == True
+
+def test_data_source_status_default():
+    config=Config(
+        sdk_key='SDK_KEY',
+        event_processor_class=MockEventProcessor,
+        update_processor_class=MockUpdateProcessor
+    )
+    client = LDClient(config)
+    assert client.data_source_status_provider.status.state == DataSourceState.INITIALIZING
+
+
+def test_data_source_status_updates():
+    with start_server() as stream_server:
+        with stream_content(make_put_event()) as stream_handler:
+            stream_server.for_path('/all', stream_handler)
+            config = Config(sdk_key='sdk-key', stream_uri=stream_server.uri, send_events=False)
+
+            with LDClient(config=config) as client:
+                assert client.data_source_status_provider.status.state == DataSourceState.VALID
+                assert client.data_source_status_provider.status.error is None
diff --git a/testing/test_util.py b/testing/test_util.py
index 5329d018..86f044f0 100644
--- a/testing/test_util.py
+++ b/testing/test_util.py
@@ -1,5 +1,8 @@
 from ldclient.impl.util import redact_password
 import pytest
+import os
+
+skip_database_tests = os.environ.get('LD_SKIP_DATABASE_TESTS') == '1'

 @pytest.fixture(params = [
     ("rediss://user:password=@redis-server-url:6380/0?ssl_cert_reqs=CERT_REQUIRED", "rediss://user:xxxx@redis-server-url:6380/0?ssl_cert_reqs=CERT_REQUIRED"),
@@ -14,3 +17,15 @@ def test_can_redact_password(password_redaction_tests):
     input, expected = password_redaction_tests

     assert redact_password(input) == expected
+
+
+class SpyListener:
+    def __init__(self):
+        self._statuses = []
+
+    def __call__(self, status):
+        self._statuses.append(status)
+
+    @property
+    def statuses(self):
+        return self._statuses
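
A minimal consumer-side sketch of the surface these tests exercise (illustrative only: it assumes the status provider exposes an add_listener method; any callable, such as the SpyListener helper above, works as the listener):

from ldclient.client import LDClient, Config
from ldclient.interfaces import DataSourceState

client = LDClient(Config(sdk_key='sdk-key'))

def on_status_change(status):
    # Receives a DataSourceStatus on each state transition, e.g.
    # INITIALIZING -> VALID, or -> OFF after an unrecoverable error.
    if status.state == DataSourceState.OFF:
        print('data source shut down:', status.error)

client.data_source_status_provider.add_listener(on_status_change)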