-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmodels.py
114 lines (89 loc) · 3.82 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import enum
from dataclasses import asdict, dataclass, field
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional
from backends import BaseBackend, RequestBackend, SocketBackend
from persistence import BaseRepository
logger = logging.getLogger(__name__)
HEALTHCHECK_BACKENDS = {
'socket': SocketBackend,
'request': RequestBackend,
}
class ServiceStatus(enum.Enum):
UNKNOWN = 'unknown'
HEALTHY = 'healthy'
UNHEALTHY = 'unhealthy'
def service_asdict_factory(data):
def convert_value(obj):
if isinstance(obj, ServiceStatus):
return obj.value
elif isinstance(obj, datetime):
return obj.strftime('%Y-%m-%dT%H:%M:%S.%f')
return obj
return dict((k, convert_value(v)) for k, v in data)
@dataclass
class Service:
service_type: str = field()
name: str = field()
domain: str = field()
port: int = field()
enabled: bool = field(default=True)
last_time_healthy: Optional[datetime] = field(default=None)
time_to_first_byte: float = field(default=0.0)
status: ServiceStatus = field(init=True, default=ServiceStatus.UNKNOWN)
expire_date: Optional[datetime] = field(default=None)
@property
def healthcheck_backend(self) -> BaseBackend:
return HEALTHCHECK_BACKENDS[self.service_type](self)
def __repr__(self) -> str: # pragma: no cover
return f'{self.name} <{self.domain}:{self.port}>'
def __str__(self) -> str: # pragma: no cover
return f'{self.name} <{self.domain}>'
def to_dict(self) -> Dict:
return asdict(self, dict_factory=service_asdict_factory)
class ServiceManager:
def __init__(self, persistence_backend: BaseRepository) -> None:
self.persistence_backend = persistence_backend
def update_service_status(self, service: Service, time_to_first_byte):
service.last_time_healthy = datetime.now(timezone.utc)
service.time_to_first_byte = time_to_first_byte
self.persistence_backend.update(service.to_dict())
def mark_as_healthy(self, service: Service):
service.status = ServiceStatus.HEALTHY
self.persistence_backend.update(service.to_dict())
def mark_as_unhealthy(self, service: Service):
service.status = ServiceStatus.UNHEALTHY
self.persistence_backend.update(service.to_dict())
def update(self, services: List[Dict]):
self.persistence_backend.bulk_update(services)
def fetch_all(self) -> List[Service]:
services = []
for service_data in self.persistence_backend.fetch_all():
status = service_data.pop('status')
service_status = ServiceStatus(status)
try:
service_data['last_time_healthy'] = datetime.strptime(
service_data['last_time_healthy'],
'%Y-%m-%dT%H:%M:%S.%f'
)
except (TypeError, KeyError) as e:
logger.debug(f'Exception occurred {e}')
try:
service_data['expire_date'] = datetime.strptime(
service_data['expire_date'],
'%Y-%m-%dT%H:%M:%S.%f'
)
except (TypeError, KeyError) as e:
logger.debug(f'Exception occurred {e}')
services.append(Service(status=service_status, **service_data))
return services
def fetch_active(self) -> List[Service]:
return [service for service in self.fetch_all() if service.enabled is True]
def add(self, service_type: str, name: str, domain: str, port: int, enabled: bool = True) -> Service:
service = Service(service_type.lower(), name, domain, int(port), enabled)
self.persistence_backend.add(service.to_dict())
return service
def remove(self, name) -> None:
self.persistence_backend.remove(name)
self.fetch_all()