forked from microsoft/soundscape
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkubescape.py
51 lines (44 loc) · 1.61 KB
/
kubescape.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
"""
This is a minimal shim to replace missing code referenced by ingest.py.
"""
import os
from psycopg2.extensions import make_dsn, parse_dsn
class SoundscapeKube:
def __init__(self, arg1, arg2):
self.databases = {
'osm': {
'name': 'osm',
'dsn2': make_dsn(
user=os.environ['POSTGIS_USER'],
password=os.environ['POSTGIS_PASSWORD'],
host=os.environ['POSTGIS_HOST'],
port=os.environ['POSTGIS_PORT'],
dbname=os.environ['POSTGIS_DBNAME'],
),
'dbstatus': None,
}
}
def connect(self):
pass
def enumerate_databases(self):
return self.databases.values()
def get_database_status(self, db_name):
return self.databases[db_name]['dbstatus']
def set_database_status(self, db_name, status):
self.databases[db_name]['dbstatus'] = status
def get_url_dsn(self, dsn):
args = parse_dsn(dsn)
user = args.get('user', '')
password = args.get('password', '')
host = args.get('host', '')
port = args.get('port', '')
dbname = args.get('dbname', '')
return f"postgis://{user}:{password}@{host}:{port}/{dbname}"
def get_connstring_dsn(self, dsn):
args = parse_dsn(dsn)
user = args.get('user', '')
password = args.get('password', '')
host = args.get('host', '')
port = args.get('port', '')
dbname = args.get('dbname', '')
return f"dbname={dbname} user={user} password={password} host={host}"