Skip to content

Commit

Permalink
Add "groups" and "comment" to user info
Browse files Browse the repository at this point in the history
  • Loading branch information
thomas-vogels committed Nov 14, 2021
1 parent 0caac56 commit 76eac1b
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 74 deletions.
4 changes: 2 additions & 2 deletions python/etl/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ def __init__(self):
super().__init__(
"list_users",
"list users as they are configured",
"List all users and their groups in the way that they are configurd.",
"List all users and their groups in the way that they are configured.",
)

def add_arguments(self, parser):
Expand Down Expand Up @@ -756,7 +756,7 @@ def add_arguments(self, parser):
parser.add_argument(
"-a", "--add-user-schema", help="add new schema, writable for the user", action="store_true"
)
parser.add_argument("username", help="name of existing user")
parser.add_argument("name", help="name of user")

def callback(self, args):
with etl.db.log_error():
Expand Down
122 changes: 76 additions & 46 deletions python/etl/config/dw.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Data warehouse configuration based on config files for setup, sources, transformations, users."""

from typing import Dict, List
from typing import Dict, Iterable, List

import etl.config.env
import etl.db
Expand All @@ -13,14 +13,20 @@ class DataWarehouseUser:
"""
Data warehouse users have always a name and group associated with them.
Users may have a schema "belong" to them which they then have write access to.
This is useful for system users, mostly, since end users should treat the
data warehouse as read-only.
Users may have a schema "belong" to them which they then have write access to. This is useful
for system users, mostly, since end users should treat the data warehouse as read-only.
"""

def __init__(self, user_info) -> None:
self.name = user_info["name"]
self.group = user_info["group"]
self.comment = user_info.get("comment")
# TODO(tom): Support an array of group names using "#/$defs/identifier_list".
if "group" in user_info:
# Backward compatibility: support single group "group".
self.group = user_info["group"]
else:
# Forward compatibility: for now, just pick the only element from the list.
[self.group] = user_info["groups"]
self.schema = user_info.get("schema")


Expand All @@ -29,13 +35,16 @@ class S3DataFormat:
Enable specifying a data format (and options) for all source tables from one schema.
The expected format in the configuration file is something like this:
"s3_data_format": {
"format": "JSON",
"compression": "GZIP"
}
>>> sdf = S3DataFormat({
... "s3_data_format": {
... "format": "JSON",
... "compression": "GZIP"
... }
... })
"""

def __init__(self, s3_data_format) -> None:
def __init__(self, s3_data_format: dict) -> None:
self.format = s3_data_format.get("format")
self.format_option = s3_data_format.get("format_option")
self.compression = s3_data_format.get("compression")
Expand Down Expand Up @@ -179,56 +188,77 @@ def __init__(self, settings) -> None:
self._admin_access = dw_settings["admin_access"]
self._etl_access = dw_settings["etl_access"]
self._check_access_to_cluster()
root = DataWarehouseUser(dw_settings["owner"])
# Users are in the order from the config
other_users = [
DataWarehouseUser(user) for user in dw_settings.get("users", []) if user["name"] != "default"
]

# Note that the "owner," which is our super-user of sorts, comes first.
self.users = [root] + other_users
schema_owner_map = {u.schema: u.name for u in self.users if u.schema}
self.users = self.parse_users(dw_settings)
schema_owner_map = {user.schema: user.name for user in self.users if user.schema}

# Schemas (upstream sources followed by transformations, keeps order of settings file)
self.schemas = [
DataWarehouseSchema(
dict(info, owner=schema_owner_map.get(info["name"], root.name)), self._etl_access
)
for info in schema_settings
if not info.get("external", False)
]
self.schemas = self.parse_schemas(
filter(lambda info: not info.get("external"), schema_settings), schema_owner_map
)
self._schema_lookup = {schema.name: schema for schema in self.schemas}

# External schemas are kept separate (for example, we don't back them up).
self.external_schemas = [
DataWarehouseSchema(
dict(info, owner=schema_owner_map.get(info["name"], root.name)), self._etl_access
)
for info in schema_settings
if info.get("external", False)
]
self.external_schemas = self.parse_schemas(
filter(lambda info: info.get("external", False), schema_settings), schema_owner_map
)

# Schemas may grant access to groups that have no bootstrapped users, so create all
# mentioned user groups.
other_groups = {u.group for u in other_users} | {
g for schema in self.schemas for g in schema.reader_groups
}
# Schemas may grant access to groups that have no bootstrapped users.
# So we "union" groups mentioned for users and groups mentioned for schemas.
self.groups = sorted(
{user.group for user in self.users}.union(
{group for schema in self.schemas for group in schema.groups}
)
)
self.default_group = self.parse_default_group(dw_settings)

# Groups are in sorted order after the root group
self.groups = [root.group] + sorted(other_groups)
try:
[self.default_group] = [
user["group"] for user in dw_settings["users"] if user["name"] == "default"
]
except ValueError:
raise ETLConfigError("Failed to find group of default user")
# Relation glob patterns indicating unacceptable load failures; matches everything if unset
required_patterns = dw_settings.get("required_for_success", [])
self.required_in_full_load_selector = etl.names.TableSelector(required_patterns)

# Map of SQL types to be able to automatically insert "expressions" into table design files.
self.type_maps = settings["type_maps"]

@staticmethod
def parse_default_group(dw_settings: dict) -> str:
    """
    Return the default group for users, taken from the pseudo user called "default".

    The entry is passed through DataWarehouseUser, so the group may be configured in
    whichever way that class accepts.

    Raises ETLConfigError if there is no user named "default" or if there is more than one.
    """
    default_users = [user for user in dw_settings["users"] if user["name"] == "default"]
    if not default_users:
        raise ETLConfigError("failed to find user 'default'")
    if len(default_users) > 1:
        # Picking one silently would make the default group depend on the order in the
        # config, so report the duplicates instead of claiming the user is missing.
        raise ETLConfigError(
            f"found {len(default_users)} users named 'default' but expected exactly one"
        )
    [user_settings] = default_users
    return DataWarehouseUser(user_settings).group

def parse_schemas(
    self, partial_settings: Iterable[dict], schema_owner_map: dict
) -> List[DataWarehouseSchema]:
    """
    Build one DataWarehouseSchema for every entry in the given schema settings.

    A schema's owner is looked up by the schema's name in schema_owner_map, which ties
    schemas back to users. Any schema that is not explicitly claimed that way belongs to
    the owner of the data warehouse.
    """
    schemas = []
    for info in partial_settings:
        owner_name = schema_owner_map.get(info["name"], self.owner.name)
        schemas.append(DataWarehouseSchema(dict(info, owner=owner_name), self._etl_access))
    return schemas

@staticmethod
def parse_users(dw_settings: dict) -> List[DataWarehouseUser]:
    """
    Build the list of data warehouse users from the settings.

    The "owner," which is our super-user of sorts, must always come first. The other
    users follow in the order from the config, leaving out the pseudo user "default"
    and any entry that repeats the owner's name.
    """
    owner = DataWarehouseUser(dw_settings["owner"])
    skipped_names = (owner.name, "default")
    users = [owner]
    for user_settings in dw_settings["users"]:
        if user_settings["name"] in skipped_names:
            continue
        users.append(DataWarehouseUser(user_settings))
    return users

def _check_access_to_cluster(self):
"""
Make sure that ETL user and admin may connect and connect to different databases.
Expand Down
19 changes: 15 additions & 4 deletions python/etl/config/settings.schema
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,26 @@
"uniqueItems": true
},
"user_info": {
"type": "object",
"additionalProperties": false,
"not": { "required": [ "group", "groups" ] },
"oneOf": [
{"required": [ "name", "groups" ]},
{"required": [ "name", "group" ]}
],
"properties": {
"name": { "$ref": "#/$defs/identifier" },
"comment": { "type": "string" },
"description": { "type": "string" },
"group": { "$ref": "#/$defs/identifier" },
"groups": {
"items": { "$ref": "#/$defs/identifier" },
"maxItems": 1,
"minItems": 1,
"type": "array"
},
"name": { "$ref": "#/$defs/identifier" },
"schema": { "$ref": "#/$defs/identifier" }
},
"required": [ "name", "group" ],
"additionalProperties": false
"type": "object"
},
"glob_pattern_list": {
"type": "array",
Expand Down
27 changes: 15 additions & 12 deletions python/etl/data_warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import etl.config.dw
import etl.db
import etl.dialect.redshift
from etl.config.dw import DataWarehouseSchema
from etl.config.dw import DataWarehouseSchema, DataWarehouseUser
from etl.errors import ETLConfigError, ETLRuntimeError
from etl.text import join_with_single_quotes

Expand Down Expand Up @@ -215,11 +215,14 @@ def revoke_schema_permissions(conn: Connection, schema: DataWarehouseSchema) ->


def create_groups(dry_run=False) -> None:
"""Create all groups from the data warehouse configuration or just those passed in."""
"""
Create all groups from the data warehouse configuration.
This is a callback of a command.
"""
config = etl.config.get_dw_config()
groups = sorted(frozenset(group for schema in config.schemas for group in schema.groups))
with closing(etl.db.connection(config.dsn_admin_on_etl_db, readonly=dry_run)) as conn:
_create_groups(conn, groups, dry_run=dry_run)
_create_groups(conn, sorted(config.groups), dry_run=dry_run)


def _create_groups(conn: Connection, groups: Iterable[str], dry_run=False) -> None:
Expand All @@ -240,7 +243,7 @@ def _create_groups(conn: Connection, groups: Iterable[str], dry_run=False) -> No
etl.db.create_group(conn, group)


def _create_or_update_user(conn: Connection, user, only_update=False, dry_run=False):
def _create_or_update_user(conn: Connection, user: DataWarehouseUser, only_update=False, dry_run=False):
"""
Create user in its group, or add user to its group.
Expand All @@ -264,14 +267,14 @@ def _create_or_update_user(conn: Connection, user, only_update=False, dry_run=Fa
etl.db.create_user(conn, user.name, user.group)


def _create_schema_for_user(conn, user, etl_group, dry_run=False):
def _create_schema_for_user(conn: Connection, user: DataWarehouseUser, etl_group: str, dry_run=False):
user_schema = etl.config.dw.DataWarehouseSchema(
{"name": user.schema, "owner": user.name, "readers": [user.group, etl_group]}
)
create_schema_and_grant_access(conn, user_schema, owner=user.name, dry_run=dry_run)


def _update_search_path(conn, user, dry_run=False):
def _update_search_path(conn: Connection, user: DataWarehouseUser, dry_run=False):
"""Non-system users have their schema in the search path, others get nothing (only "public")."""
search_path = ["public"]
if user.schema == user.name:
Expand All @@ -283,7 +286,7 @@ def _update_search_path(conn, user, dry_run=False):
etl.db.alter_search_path(conn, user.name, search_path)


def initial_setup(with_user_creation=False, force=False, dry_run=False):
def initial_setup(with_user_creation=False, force=False, dry_run=False) -> None:
"""
Place named data warehouse database into initial state.
Expand Down Expand Up @@ -346,8 +349,8 @@ def initial_setup(with_user_creation=False, force=False, dry_run=False):


def create_or_update_user(
user_name, group_name=None, add_user_schema=False, only_update=False, dry_run=False
):
user_name: str, group_name=None, add_user_schema=False, only_update=False, dry_run=False
) -> None:
"""
Add new user to cluster or update existing user.
Expand Down Expand Up @@ -415,8 +418,8 @@ def list_users(transpose=False) -> None:
groups[user.group].append(user.name)
rows = [[group, ", ".join(sorted(groups[group]))] for group in sorted(groups)]
else:
header = ["user", "groups"]
rows = [[user.name, user.group] for user in config.users]
header = ["user", "groups", "comment"]
rows = [[user.name, user.group, user.comment or ""] for user in config.users]
print(etl.text.format_lines(rows, header_row=header))


Expand Down
20 changes: 10 additions & 10 deletions python/etl/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,36 +420,36 @@ def _get_encrypted_password(cx, user) -> Optional[str]:
return "md5" + md5.hexdigest()


def create_user(cx, user, group):
def create_user(cx: Connection, user: str, group: str) -> None:
password = _get_encrypted_password(cx, user)
if password is None:
logger.warning("Missing entry in PGPASSFILE file for '%s'", user)
# SuppressWarnings: We need the user name in the output to be actionable feedback.
raise ETLRuntimeError(
f"password missing from PGPASSFILE for user '{user}'"
) # lgtm[py/clear-text-logging-sensitive-data]
execute(cx, """CREATE USER "{}" IN GROUP "{}" PASSWORD %s""".format(user, group), (password,))
execute(cx, f"""CREATE USER "{user}" IN GROUP "{group}" PASSWORD %s""", (password,))


def alter_password(cx, user, ignore_missing_password=False):
def alter_password(cx: Connection, user: str, ignore_missing_password=False) -> None:
password = _get_encrypted_password(cx, user)
if password is None:
logger.warning("Failed to find password in PGPASSFILE for '%s'", user)
if not ignore_missing_password:
raise ETLRuntimeError("password missing from PGPASSFILE for user '{}'".format(user))
return
execute(cx, """ALTER USER "{}" PASSWORD %s""".format(user), (password,))
execute(cx, f"""ALTER USER "{user}" PASSWORD %s""", (password,))


def alter_group_add_user(cx, group, user):
execute(cx, """ALTER GROUP "{}" ADD USER "{}" """.format(group, user))
def alter_group_add_user(cx: Connection, group: str, user: str) -> None:
execute(cx, f"""ALTER GROUP "{group}" ADD USER "{user}" """)


def alter_search_path(cx, user, schemas):
execute(cx, """ALTER USER "{}" SET SEARCH_PATH TO {}""".format(user, ", ".join(schemas)))
def alter_search_path(cx: Connection, user: str, schemas: Iterable[str]) -> None:
execute(cx, f"""ALTER USER "{user}" SET SEARCH_PATH TO {", ".join(schemas)}""")


def user_exists(cx, user) -> bool:
def user_exists(cx: Connection, user: str) -> bool:
rows = query(
cx,
"""
Expand Down Expand Up @@ -536,7 +536,7 @@ def revoke_all_on_all_tables_in_schema(cx: Connection, schema: str, groups: Iter
# ---- TABLES ----


def relation_kind(cx, schema, table) -> Optional[str]:
def relation_kind(cx: Connection, schema: str, table: str) -> Optional[str]:
"""
Return "kind" of relation, either 'TABLE' or 'VIEW' for relations that actually exist.
Expand Down
Empty file added tests/config/__init__.py
Empty file.
41 changes: 41 additions & 0 deletions tests/config/test_dw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import unittest

import etl.config.dw
import etl.errors


class TestConfigDW(unittest.TestCase):
    """Exercise the static parsers of DataWarehouseConfig with a minimal settings dict."""

    def setUp(self) -> None:
        # The pseudo user "default" uses the single "group" setting while the regular
        # user uses the "groups" list, so both spellings are covered.
        owner = {"name": "etl", "group": "etl"}
        default_user = {"name": "default", "group": "some_group"}
        regular_user = {"name": "a_user", "groups": ["a_group"]}
        self.settings = {"owner": owner, "users": [default_user, regular_user]}

    def test_parse_default_group(self):
        """Make sure the default group is taken from the user "default"."""
        default_group = etl.config.dw.DataWarehouseConfig.parse_default_group(self.settings)
        self.assertEqual(default_group, "some_group")

    def test_parse_default_missing_group(self):
        """Make sure we error out when there is no user "default"."""
        with self.assertRaises(etl.errors.ETLConfigError):
            etl.config.dw.DataWarehouseConfig.parse_default_group({"users": []})

    def test_parse_users(self):
        """Make sure the owner comes first and the pseudo user "default" is skipped."""
        users = etl.config.dw.DataWarehouseConfig.parse_users(self.settings)
        self.assertEqual(
            [(user.name, user.group) for user in users],
            [("etl", "etl"), ("a_user", "a_group")],
        )

0 comments on commit 76eac1b

Please sign in to comment.