Skip to content

Commit

Permalink
[EDS-631] Prevents long queries from consuming duplicate resources if…
Browse files Browse the repository at this point in the history
… queries are resubmitted by users (#137)

* Optimize imports

* Implement an object storage cache layer

Provides an interface and two implementations.

* Implement query synchronizer to allow processes to share a single query.

* Update search interface to take advantage of new query synchronizer to attach to ongoing queries and return their results.

* [Bot] Update version to 2.2.0

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
Thomas Thorogood and github-actions[bot] authored Jul 6, 2022
1 parent 62f2ad0 commit b4280d0
Show file tree
Hide file tree
Showing 12 changed files with 551 additions and 141 deletions.
30 changes: 21 additions & 9 deletions husky_directory/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)
from husky_directory.blueprints.search import SearchBlueprint
from husky_directory.models.search import SearchDirectoryInput
from husky_directory.services.object_store import ObjectStoreInjectorModule
from husky_directory.util import MetricsClient
from .app_config import (
ApplicationConfig,
Expand All @@ -45,6 +46,7 @@ def get_app_injector_modules() -> List[Type[Module]]:
return [
ApplicationConfigInjectorModule,
IdentityProviderModule,
ObjectStoreInjectorModule,
]


Expand Down Expand Up @@ -133,6 +135,7 @@ def provide_app(
app_blueprint: AppBlueprint,
saml_blueprint: SAMLBlueprint,
mock_saml_blueprint: MockSAMLBlueprint,
redis: Redis,
) -> Flask:
# First we have to do some logging configuration, before the
# app instance is created.
Expand Down Expand Up @@ -160,7 +163,7 @@ def provide_app(
# our dependencies appropriate for each request.
FlaskInjector(app=app, injector=injector)
FlaskJSONLogger(app)
self._configure_app_session(app, app_settings)
self._configure_app_session(app, app_settings, redis)
self._configure_prometheus(app, app_settings, injector)
attach_app_error_handlers(app)
self.register_jinja_extensions(app)
Expand Down Expand Up @@ -208,7 +211,9 @@ def verify_credentials(username: str, password: str):
injector.binder.bind(MetricsClient, metrics, scope=singleton)

@staticmethod
def _configure_app_session(app: Flask, app_settings: ApplicationConfig) -> NoReturn:
def _configure_app_session(
app: Flask, app_settings: ApplicationConfig, redis: Redis
) -> NoReturn:
# There is something wrong with the flask_session implementation that
# is supposed to translate flask config values into redis settings;
# also, it doesn't support authorization (what?!) so we have to
Expand All @@ -220,20 +225,27 @@ def _configure_app_session(app: Flask, app_settings: ApplicationConfig) -> NoRet
if app.config["SESSION_TYPE"] == "redis":
redis_settings = app_settings.redis_settings
app.logger.info(
f"Setting up redis cache with settings: {redis_settings.flask_config_values}"
f"Setting up redis session cache with settings: {redis_settings.flask_config_values}"
)
app.session_interface = RedisSessionInterface(
redis=Redis(
host=redis_settings.host,
port=redis_settings.port,
username=redis_settings.namespace,
password=redis_settings.password.get_secret_value(),
),
redis,
key_prefix=redis_settings.flask_config_values["SESSION_KEY_PREFIX"],
)
else:
Session(app)

@provider
def provide_redis(self, app_settings: ApplicationConfig) -> Redis:
redis_settings = app_settings.redis_settings
if not redis_settings.password:
return None
return Redis(
host=redis_settings.host,
port=redis_settings.port,
username=redis_settings.namespace,
password=redis_settings.password.get_secret_value(),
)


def create_app(injector: Optional[Injector] = None) -> Flask:
injector = injector or create_app_injector()
Expand Down
139 changes: 60 additions & 79 deletions husky_directory/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
import string
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Optional, Type, TypeVar, Union, cast
from typing import Any, Dict, Optional, TypeVar, cast

import yaml
from injector import Module, inject, provider, singleton
from injector import Module, provider, singleton
from pydantic import BaseSettings, Field, SecretStr, validator

logger = logging.getLogger("app_config")
Expand Down Expand Up @@ -102,6 +101,9 @@ class RedisSettings(FlaskConfigurationSettings):
port: str = Field("6379", env="REDIS_PORT")
namespace: str = Field(None, env="REDIS_NAMESPACE")
password: SecretStr = Field(None, env="REDIS_PASSWORD")
default_cache_expire_seconds: Optional[int] = Field(
None, env="REDIS_CACHE_DEFAULT_EXPIRE_SECONDS"
)

@property
def flask_config_values(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -177,6 +179,59 @@ class ApplicationSecrets(BaseSettings):
prometheus_password: Optional[SecretStr] = Field(None, env="PROMETHEUS_PASSWORD")


class CacheExpirationSettings(BaseSettings):
class Config:
"""
Any of these settings can be tweaked by updating
the environment variables in the gcp-k8 helm release
for this app using this prefix (or in any environment
running this app).
e.g.: QUERY_CACHE_IN_PROGRESS_STATUS_EXPIRATION=60
"""

env_prefix = "QUERY_CACHE_"

# We should never expect a query to take more than
# 5 minutes to complete. If a query has taken /that/ long,
# we simply delete the lock. This can mean that if a
# query does take longer than 5 minutes,
# the next request for it will be allowed
# to proceed in its own process.
in_progress_status_expiration: int = 300 # Five minutes

# We do not want to cache completed queries for very long, because
# we want updates to user profiles to be reflected in
# near real-time. But, we want the value to persist
# long enough that if the user mashes the 'search' button
# while their browser is rendering thousands of
# entries as HTML, it'll still be there, and the results will
# seem to come faster to the user the second time around.
# Therefore, the value is 7.
completed_status_expiration: int = 7

# The error status expiration lets us check for and/or
# alarm on issues directly, without relying on logs, if the
# issue happens during the query process (which is the
# most likely place for an issue to occur). Remembering that
# if the query is re-attempted, its status will revert back to
# 'in progress', there is not really a point to keeping these for
# super long.
error_status_expiration: int = 3600 # 1 hour

# The error message expiration lasts longer so that event
# responders can access the error messages that were logged
# via the cache, instead of poring through logs, if necessary,
# for easier investigation. However, this is only a minor
# convenience, as the JSON logging will already contain a
# lot of information.

# To find errors in a redis cache, you can use the command:
# 'keys *:status:message'
# to get a list of all relevant keys in the shared cache.
error_message_expiration: int = 3600 * 24 # 24 hours


@singleton
class ApplicationConfig(FlaskConfigurationSettings):
"""
Expand Down Expand Up @@ -208,8 +263,9 @@ class ApplicationConfig(FlaskConfigurationSettings):
pws_settings: PWSSettings = PWSSettings()
auth_settings: AuthSettings = AuthSettings()
session_settings: SessionSettings = SessionSettings()
redis_settings: Optional[RedisSettings]
redis_settings: RedisSettings = RedisSettings()
metrics_settings: MetricsSettings = MetricsSettings()
cache_expiration_settings: CacheExpirationSettings = CacheExpirationSettings()
secrets: ApplicationSecrets = ApplicationSecrets()

@validator("redis_settings")
Expand Down Expand Up @@ -256,78 +312,3 @@ def provide_application_config(self) -> ApplicationConfig:
SettingsType = TypeVar(
"SettingsType", bound=BaseSettings
) # Used to type hint the return value of load_settings below


@singleton
class YAMLSettingsLoader:
"""
Complex configuration is hard to express as environment variables; so, for everything else, there's YAML.
YAML files loaded this way expect stage-based configuration.
Here is an example of a simple YAML file:
# foo.yml
base: &base
foo: bar
baz: boop
development: &development
<<: *base # Development uses all values from base
eval: &eval # Eval uses all settings from development, but overrides the 'baz' setting.
<<: *development
baz: snap
special: # Here is a special one-off stage that doesn't use anyone else's values
foo: blah
baz: also blah
prod:
<<: *eval
foo: AH!
The above configuration could be modeled and loaded:
class FooSettings:
foo: str
baz: str
settings = loader.load_settings('foo', output_type=FooSettings)
settings.foo # 'bar'
settings = loader.load_settings('foo')
settings['foo'] # 'bar'
"""

@inject
def __init__(self, app_config: ApplicationConfig):
self.app_config = app_config

@property
def settings_dir(self) -> str:
return self.app_config.settings_dir

def load_settings(
self,
settings_name: str,
output_type: Union[Type[SettingsType], Type[Dict]] = Dict,
) -> Union[Dict, SettingsType]:
"""
Given a configuration name, looks up the setting file from ApplicationConfig.settings_dir,
and loads the stage declared by ApplicationConfig.stage
If no output type is provided, the results will be in dict form.
"""
filename = os.path.join(self.settings_dir, f"{settings_name}.yml")
stage = self.app_config.stage
with open(filename) as f:
try:
settings = yaml.load(f, yaml.SafeLoader)[stage]
except KeyError as e:
raise KeyError(
f"{filename} has no configuration for stage '{stage}': {str(e)}"
)

if output_type is Dict:
return settings
return output_type.parse_obj(settings)
122 changes: 122 additions & 0 deletions husky_directory/services/object_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import json
import time
from abc import ABC, abstractmethod
from copy import copy
from enum import Enum
from typing import Any, Optional

from flask_injector import request
from injector import Module, provider
from pydantic import BaseModel
from redis import Redis

from husky_directory.app_config import ApplicationConfig
from husky_directory.util import AppLoggerMixIn


class ObjectStorageInterface(AppLoggerMixIn, ABC):
"""
Basic interface that does nothing but declare
abstractions.
It also provides a utility method that can convert
anything* into a string.
*if the thing you want to convert can't be converted,
add a case in the normalize_object_data implementation below.
"""

@abstractmethod
def get(self, key: str) -> Optional[str]:
return None

@abstractmethod
def put(self, key: str, obj: Any, expire_after_seconds: Optional[int] = None):
pass

@staticmethod
def normalize_object_data(obj: Any) -> str:
if isinstance(obj, BaseModel):
return obj.json()
if isinstance(obj, Enum):
obj = obj.value
if not isinstance(obj, str):
return json.dumps(obj)
return obj


class InMemoryObjectStorage(ObjectStorageInterface):
"""
Used when testing locally using flask itself,
cannot be shared between processes. This is a very
basic implementation which checks for key expiration
on every `put`.
"""

def __init__(self):
self.__store__ = {}
self.__key_expirations__ = {}

def validate_key_expiration(self, key: str):
expiration = self.__key_expirations__.get(key)
now = time.time()
if expiration:
max_elapsed = expiration["max"]
if not max_elapsed:
return
elapsed = now - expiration["stored"]
if elapsed > max_elapsed:
del self.__key_expirations__[key]
if key in self.__store__:
del self.__store__[key]

def expire_keys(self):
for key in copy(self.__key_expirations__):
self.validate_key_expiration(key)

def get(self, key: str) -> Optional[str]:
self.validate_key_expiration(key)
return self.__store__.get(key)

def put(self, key: str, obj: Any, expire_after_seconds: Optional[int] = None):
self.expire_keys()
self.__store__[key] = self.normalize_object_data(obj)
now = time.time()
self.__key_expirations__[key] = {"stored": now, "max": expire_after_seconds}
return key


class RedisObjectStorage(ObjectStorageInterface):
def __init__(self, redis: Redis, config: ApplicationConfig):
self.redis = redis
self.prefix = f"{config.redis_settings.namespace}:obj"

def normalize_key(self, key: str) -> str:
"""Normalizes the key using the configured namespace."""
if not key.startswith(self.prefix):
key = f"{self.prefix}:{key}"
return key

def put(self, key: str, obj: Any, expire_after_seconds: Optional[int] = None):
key = self.normalize_key(key)
self.redis.set(key, self.normalize_object_data(obj), ex=expire_after_seconds)
return key

def get(self, key: str) -> Optional[str]:
val = self.redis.get(self.normalize_key(key))
if val:
if isinstance(val, bytes):
return val.decode("UTF-8")
return val
return None


class ObjectStoreInjectorModule(Module):
@request
@provider
def provide_object_store(
self, redis: Redis, config: ApplicationConfig
) -> ObjectStorageInterface:
if config.redis_settings.host:
return RedisObjectStorage(redis, config)
return InMemoryObjectStorage()
Loading

0 comments on commit b4280d0

Please sign in to comment.