-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconfig.py
111 lines (77 loc) · 3.99 KB
/
config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import logging
import os
import re
from typing import Callable, Dict, List, Optional, Protocol, Union
import attrs
from openeo_driver.config import OpenEoBackendConfig
from openeo_driver.config.load import ConfigGetter
from openeo_driver.server import build_backend_deploy_metadata
from openeo_driver.utils import smart_bool
import openeo_aggregator.about
_log = logging.getLogger(__name__)

# Default cache TTL: 6 hours, presumably expressed in seconds.
CACHE_TTL_DEFAULT = 60 * 60 * 6

# Timeouts for requests to upstream back-ends (presumably in seconds, given the `N * 60` forms).
CONNECTION_TIMEOUT_DEFAULT = 60
CONNECTION_TIMEOUT_INIT = 12.5
CONNECTION_TIMEOUT_RESULT = 60 * 15
CONNECTION_TIMEOUT_JOB_START = 60 * 5
CONNECTION_TIMEOUT_JOB_LOGS = 60 * 2

# Default chunk size for streaming: 10 KiB, assuming the unit is bytes — TODO confirm.
STREAM_CHUNK_SIZE_DEFAULT = 1024 * 10
class ConfigException(ValueError):
    """
    Exception type for configuration problems.

    Subclasses ``ValueError``, so callers catching ``ValueError`` also catch it.
    NOTE(review): not raised anywhere in this part of the module; presumably used by
    config loading/validation code elsewhere — confirm.
    """
class JobOptionsUpdater(Protocol):
    """
    Structural type of the callable accepted by the ``job_options_update`` config option.

    An implementation is called with the current job options (a dict, or ``None``)
    and a backend id, and hands back the job options to use (a dict, or ``None``).
    """

    def __call__(self, job_options: Union[dict, None], backend_id: str) -> Union[dict, None]:
        """Produce the (possibly updated) job options dict for the given backend id."""
class ProcessAllowed(Protocol):
    """
    Structural type of a process allow/deny predicate.

    Implemented as a callable that decides, from a process id, a backend id and
    an "experimental" flag, whether the process should be allowed.
    """

    def __call__(self, process_id: str, backend_id: str, experimental: bool) -> bool:
        """Decide whether ``process_id`` is allowed for ``backend_id`` (taking ``experimental`` into account)."""
@attrs.frozen(kw_only=True)
class AggregatorBackendConfig(OpenEoBackendConfig):
    """
    Aggregator-specific backend configuration.

    Extends ``OpenEoBackendConfig`` (from ``openeo_driver``) with the settings used by
    the openEO aggregator. Declared with ``attrs.frozen(kw_only=True)``: fields are
    passed as keyword arguments and cannot be reassigned after construction.
    ``aggregator_backends`` is the only field here without a default and must be given.
    """

    capabilities_backend_version: str = openeo_aggregator.about.__version__
    # Built through a factory (like `memoizer` below) so that each config instance gets
    # its own dict, instead of all instances sharing one mutable dict created at import time.
    capabilities_deploy_metadata: dict = attrs.Factory(
        lambda: build_backend_deploy_metadata(
            packages=["openeo", "openeo_driver", "openeo_aggregator"],
        )
    )

    # Upstream back-ends, keyed by backend id (the ids referred to in e.g. `collection_allow_list`).
    # NOTE(review): values are presumably the back-end root URLs — confirm.
    # Required: the `min_len(1)` validator rejects an empty mapping.
    aggregator_backends: Dict[str, str] = attrs.field(validator=attrs.validators.min_len(1))

    # See `ZooKeeperPartitionedJobDB.from_config` for supported fields.
    partitioned_job_tracking: Optional[dict] = None

    streaming_chunk_size: int = STREAM_CHUNK_SIZE_DEFAULT

    auth_entitlement_check: Union[bool, dict] = False

    # TTL for connection caching (presumably in seconds: the default reads as 5 minutes).
    connections_cache_ttl: float = 5 * 60.0

    # Allow list for collection ids to cover with the aggregator.
    # By default (value `None`): support union of all upstream collections.
    # To enable a real allow list, use a list of items as illustrated:
    #   [
    #       # Regular string: match exactly
    #       "COPERNICUS_30",
    #       # Regex pattern object: match collection id with regex (`fullmatch` mode)
    #       re.compile(r"CGLS_.*"),
    #       # Dict: match collection id (again as string or with regex pattern)
    #       # and additionally only consider specific backends by id (per `aggregator_backends` config)
    #       {"collection_id": "SENTINEL2_L2A", "allowed_backends": ["b2"]},
    #   ]
    # A collection+backend combo will be included if any item matches.
    # This means that if you want to exclude one backend from a particular collection,
    # while still keeping all other collections (like the default behavior),
    # you need to add an item that matches everything but that collection, e.g.:
    #       {"collection_id": "SENTINEL2_L2A", "allowed_backends": ["b2"]},
    #       re.compile("(?!SENTINEL2_L2A).*"),
    collection_allow_list: Optional[List[Union[str, re.Pattern, dict]]] = None

    # Process allow list (as callable, see `ProcessAllowed`) for process ids to cover
    # with the aggregator. Accept all by default.
    process_allowed: ProcessAllowed = lambda process_id, backend_id, experimental: True

    zookeeper_prefix: str = "/openeo-aggregator/"

    # See `memoizer_from_config` for details.
    memoizer: Dict = attrs.Factory(lambda: {"type": "dict"})

    # NOTE: this environment variable is read once, when this class body executes
    # (i.e. at module import), not each time a config instance is created.
    zk_memoizer_tracking: bool = smart_bool(os.environ.get("OPENEO_AGGREGATOR_ZK_MEMOIZER_TRACKING"))

    # Optional hook to adjust job options per backend (see `JobOptionsUpdater`).
    job_options_update: Optional[JobOptionsUpdater] = None

    # NOTE(review): presumably timeouts (seconds) for listing jobs: one for a single
    # back-end request and one for the overall operation — confirm at the call sites.
    request_timeout_list_jobs: float = 10
    request_timeout_list_jobs_overall: float = 20
# Module-private singleton: a single ConfigGetter for this module, expecting an AggregatorBackendConfig.
_config_getter = ConfigGetter(expected_class=AggregatorBackendConfig)


def get_backend_config() -> AggregatorBackendConfig:
    """
    Public accessor for the aggregator backend config.

    Delegates to the module-level ``ConfigGetter`` singleton and returns the result
    of its ``get()``. NOTE(review): how the config is located, loaded and cached is
    implemented in ``ConfigGetter`` (openeo_driver) and is not visible here.
    """
    config = _config_getter.get()
    return config