diff --git a/Dockerfile b/Dockerfile
index 30a47402c..8d9ecf091 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -52,8 +52,8 @@ RUN python3 -m venv "/opt/local/$PROJ_NAME/venv" && \
     python3 -m pip install --upgrade pip==20.3.4 --disable-pip-version-check --no-cache-dir && \
     python3 -m pip install --requirement /tmp/requirements-dev.txt --disable-pip-version-check --no-cache-dir
 
-# Create an empty .pgpass file to help with create_user and update_user commands.
-RUN echo '# Format to set password (used by create_user and update_user): *:5439:*:<user>:<password>' > /home/arthur/.pgpass \
+# Create an empty .pgpass file to help with the update_user command.
+RUN echo '# Format to set password (used by update_user): *:5439:*:<user>:<password>' > /home/arthur/.pgpass \
     && chmod go= /home/arthur/.pgpass
 
 # Note that at runtime we (can or may) mount the local directory here.
diff --git a/README.md b/README.md
index 2d712fc48..db9ce5379 100644
--- a/README.md
+++ b/README.md
@@ -55,7 +55,7 @@
 you can simply use `../arthur-redshift-etl/` to find your way back to this ETL code base.
 
 Although the Redshift cluster can be administered using the AWS console and `psql`,
 some helper scripts will make setting up the cluster consistently much easier.
-(See below for `initialize` and `create_user`.)
+(See below for `initialize`, `create_groups`, and `create_users`.)
 
 Also, add the AWS IAM role that the database owner may assume within Redshift
 to your settings file so that Redshift has the needed permissions to access the
@@ -163,7 +163,8 @@ Don't forget to run `terminate_emr_cluster.sh` when you're done.
 
 | Sub-command | Goal |
 | ---- | ---- |
 | `initialize` | Create schemas, groups and users |
-| `create_user` | Create (or configure) users that are not mentioned in the configuration file |
+| `create_groups` | Create groups that are mentioned in the configuration file |
+| `create_users` | Create users that are mentioned in the configuration file |
 
 ```shell
 # The commands to setup the data warehouse users and groups or any database is by ADMIN (connected to `dev`)
diff --git a/etc/arthur_completion.sh b/etc/arthur_completion.sh
index 26dfb3c12..43986753a 100644
--- a/etc/arthur_completion.sh
+++ b/etc/arthur_completion.sh
@@ -22,13 +22,14 @@ _arthur_completion()
         create_groups
         create_index
         create_schemas
-        create_user
+        create_users
         delete_finished_pipelines
         design
         explain
         extract
         help
         initialize
+        list_users
         load
         ls
         ping
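The `commands.py` changes that follow rename `create_user` to `create_users` while keeping the old spelling alive as an alias, so existing scripts keep working during the phase-out. A minimal stand-alone sketch of the argparse mechanism behind that (the `prog` name and `dest` below are illustrative, not the project's `SubCommand` wrapper):

```python
import argparse

# Sub-parsers accept aliases: the same parser is registered under both names.
parser = argparse.ArgumentParser(prog="arthur.py")
subparsers = parser.add_subparsers(dest="sub_command")
subparsers.add_parser("create_users", aliases=["create_user"])

# Both spellings parse; dest records whichever name was typed.
print(parser.parse_args(["create_users"]).sub_command)  # -> create_users
print(parser.parse_args(["create_user"]).sub_command)   # -> create_user
```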
diff --git a/python/etl/commands.py b/python/etl/commands.py
index 93efb97d2..4d4d0d67b 100644
--- a/python/etl/commands.py
+++ b/python/etl/commands.py
@@ -351,7 +351,8 @@ def build_full_parser(prog_name):
             InitializeSetupCommand,
             ShowRandomPassword,
             CreateGroupsCommand,
-            CreateUserCommand,
+            CreateUsersCommand,
+            ListUsersCommand,
             UpdateUserCommand,
             RunSqlCommand,
             # Commands to help with table designs and uploading them
@@ -671,7 +672,7 @@ def __init__(self):
             "Make sure that all groups mentioned in the configuration file actually exist."
             " (This allows to specify a group (as reader or writer) on a schema when that"
             " group does not appear with a user and thus may not have been previously"
-            " created using a 'create_user' call.)",
+            " created with 'create_users'.)",
         )
 
     def add_arguments(self, parser):
@@ -682,42 +683,57 @@ def callback(self, args):
         etl.data_warehouse.create_groups(dry_run=args.dry_run)
 
 
-class CreateUserCommand(SubCommand):
+class CreateUsersCommand(SubCommand):
     def __init__(self):
         super().__init__(
-            "create_user",
-            "add new user",
-            "Add new user and set group membership, optionally create a personal schema."
-            " It is ok to re-initialize a user defined in a settings file."
-            " Note that you have to set a password for the user in your '~/.pgpass' file"
-            " before invoking this command. The password must be valid in Redshift,"
-            " so must contain upper-case and lower-case characters as well as numbers.",
+            "create_users",
+            "add users",
+            "Add users and set group membership."
+            " It is ok to re-initialize a user defined in a settings file.",
+            # Old command name that we want to phase out:
+            aliases=["create_user"],
         )
 
     def add_arguments(self, parser):
         add_standard_arguments(parser, ["dry-run"])
-        parser.add_argument("-g", "--group", help="add user to specified group")
+        parser.add_argument("-g", "--group", help="DEPRECATED (specify group in configuration file)")
         parser.add_argument(
-            "-a", "--add-user-schema", help="add new schema, writable for the user", action="store_true"
+            "-a", "--add-user-schema", help="DEPRECATED (use 'update_user' instead)", action="store_true"
         )
-        parser.add_argument("username", help="name for new user")
+        parser.add_argument("name", help="name of user (default: all configured users)", nargs="*")
 
     def callback(self, args):
+        if args.group:
+            logger.warning("Ignoring specified group, using configuration instead")
+        if args.add_user_schema:
+            logger.warning("Ignoring request to add user schema, use 'update_user' instead.")
+
         with etl.db.log_error():
-            etl.data_warehouse.create_new_user(
-                args.username,
-                group=args.group,
-                add_user_schema=args.add_user_schema,
-                dry_run=args.dry_run,
-            )
+            etl.data_warehouse.create_users(args.name, dry_run=args.dry_run)
+
+
+class ListUsersCommand(SubCommand):
+    def __init__(self):
+        super().__init__(
+            "list_users",
+            "list users as they are configured",
+            "List all users and their groups in the way that they are configured.",
+        )
+
+    def add_arguments(self, parser):
+        parser.add_argument("-t", "--transpose", help="group the list by the users' groups", action="store_true")
+
+    def callback(self, args):
+        with etl.db.log_error():
+            etl.data_warehouse.list_users(transpose=args.transpose)
 
 
 class UpdateUserCommand(SubCommand):
     def __init__(self):
         super().__init__(
             "update_user",
-            "update user's group, password, and path",
-            "For an existing user, update group membership, password, and search path."
+            "update user's password, their schema and search path",
+            "For an existing user, update the password, create a schema, and update the search path."
             " Note that you have to set a password for the user in your '~/.pgpass' file"
             " before invoking this command if you want to update the password. The password must"
             " be valid in Redshift, so must contain upper-case and lower-case characters as well"
@@ -726,17 +742,19 @@ def add_arguments(self, parser):
         add_standard_arguments(parser, ["dry-run"])
-        parser.add_argument("-g", "--group", help="add user to specified group")
+        parser.add_argument("-g", "--group", help="DEPRECATED (specify group in configuration file)")
         parser.add_argument(
             "-a", "--add-user-schema", help="add new schema, writable for the user", action="store_true"
         )
-        parser.add_argument("username", help="name of existing user")
+        parser.add_argument("name", help="name of user")
 
     def callback(self, args):
+        if args.group:
+            logger.warning("Ignoring specified group, using configuration instead")
+
         with etl.db.log_error():
             etl.data_warehouse.update_user(
-                args.username,
-                group=args.group,
+                args.name,
                 add_user_schema=args.add_user_schema,
                 dry_run=args.dry_run,
             )
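Both `create_users` (via `create_user` in `db.py` below) and `update_user` expect the password to already be present in `~/.pgpass`. A rough sketch of such a lookup (a hypothetical helper for illustration, not the project's actual `etl.db` code; it also ignores the backslash-escaping rules of real `.pgpass` files):

```python
from typing import Optional

def pgpass_password(path: str, user: str, port: str = "5439") -> Optional[str]:
    """Return password from a pgpass-style file for the given user, if any."""
    # Lines look like host:port:database:user:password; '*' matches anything.
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            fields = line.split(":", 4)
            if len(fields) != 5:
                continue
            _host, pgport, _dbname, pguser, password = fields
            if pgport in ("*", port) and pguser in ("*", user):
                return password
    return None
```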
diff --git a/python/etl/config/__init__.py b/python/etl/config/__init__.py
index b34a6ea2c..37a69851a 100644
--- a/python/etl/config/__init__.py
+++ b/python/etl/config/__init__.py
@@ -116,9 +116,8 @@ def set_safe_config_value(name: str, value: str) -> None:
 def get_config_map() -> Dict[str, str]:
     if _mapped_config is None:
         return {}
-    else:
-        # Since the mapped config is flattened, we don't worry about a deep copy here.
-        return dict(_mapped_config)
+    # Since the mapped config is flattened, we don't worry about a deep copy here.
+    return dict(_mapped_config)
 
 
 def _flatten_hierarchy(prefix, props):
diff --git a/python/etl/config/default_settings.yaml b/python/etl/config/default_settings.yaml
index 94442fb79..2e69448a3 100644
--- a/python/etl/config/default_settings.yaml
+++ b/python/etl/config/default_settings.yaml
@@ -34,7 +34,7 @@
         {
             # Default group specified as group of pseudo-user "default"
             "name": "default",
-            "group": "analyst_ro"
+            "group": "analyst"
         }
     ]
 },
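The pseudo-user "default" above exists only to name the default group; `_parse_default_group` in `dw.py` (further down in this diff) digs it out of the settings. The lookup in isolation, with made-up sample settings:

```python
# Exactly one user named "default" is expected; unpacking into a one-element
# list raises ValueError for zero or several matches, which the real code
# turns into an ETLConfigError.
users = [
    {"name": "default", "group": "analyst"},  # pseudo-user, names the default group
    {"name": "etl", "group": "etl_rw"},
]

[default_user] = [user for user in users if user["name"] == "default"]
print(default_user["group"])  # -> analyst
```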
""" def __init__(self, user_info): self.name = user_info["name"] - self.group = user_info["group"] + if "group" in user_info: + self.groups = [user_info["group"]] + elif "groups" in user_info: + self.groups = user_info["groups"] + else: + self.groups = [] self.schema = user_info.get("schema") @@ -155,22 +159,22 @@ def dsn(self): return etl.db.parse_connection_string(etl.config.env.get(self._dsn_env_var)) @property - def groups(self): + def groups(self) -> List[str]: return self.reader_groups + self.writer_groups @property - def backup_name(self): + def backup_name(self) -> str: return etl.names.as_backup_name(self.name) @property - def staging_name(self): + def staging_name(self) -> str: return etl.names.as_staging_name(self.name) class DataWarehouseConfig: """Pretty interface to create objects from the settings files.""" - def __init__(self, settings): + def __init__(self, settings) -> None: dw_settings = settings["data_warehouse"] schema_settings = settings.get("sources", []) + dw_settings.get("transformations", []) @@ -178,49 +182,29 @@ def __init__(self, settings): self._admin_access = dw_settings["admin_access"] self._etl_access = dw_settings["etl_access"] self._check_access_to_cluster() - root = DataWarehouseUser(dw_settings["owner"]) - # Users are in the order from the config - other_users = [ - DataWarehouseUser(user) for user in dw_settings.get("users", []) if user["name"] != "default" - ] - # Note that the "owner," which is our super-user of sorts, comes first. - self.users = [root] + other_users - schema_owner_map = {u.schema: u.name for u in self.users if u.schema} + self.users = self._parse_users(dw_settings) + schema_owner_map = {user.schema: user.name for user in self.users if user.schema} # Schemas (upstream sources followed by transformations, keeps order of settings file) - self.schemas = [ - DataWarehouseSchema( - dict(info, owner=schema_owner_map.get(info["name"], root.name)), self._etl_access - ) - for info in schema_settings - if not info.get("external", False) - ] + self.schemas = self._parse_schemas( + filter(lambda info: not info.get("external"), schema_settings), schema_owner_map + ) self._schema_lookup = {schema.name: schema for schema in self.schemas} # External schemas are kept separate (for example, we don't back them up). - self.external_schemas = [ - DataWarehouseSchema( - dict(info, owner=schema_owner_map.get(info["name"], root.name)), self._etl_access - ) - for info in schema_settings - if info.get("external", False) - ] - - # Schemas may grant access to groups that have no bootstrapped users, so create all - # mentioned user groups. - other_groups = {u.group for u in other_users} | { - g for schema in self.schemas for g in schema.reader_groups - } + self.external_schemas = self._parse_schemas( + filter(lambda info: info.get("external", False), schema_settings), schema_owner_map + ) - # Groups are in sorted order after the root group - self.groups = [root.group] + sorted(other_groups) - try: - [self.default_group] = [ - user["group"] for user in dw_settings["users"] if user["name"] == "default" - ] - except ValueError: - raise ETLConfigError("Failed to find group of default user") + # Schemas may grant access to groups that have no bootstrapped users. + # So we "union" groups mentioned for users and groups mentioned for schemas. 
@@ -155,22 +159,22 @@ def dsn(self):
         return etl.db.parse_connection_string(etl.config.env.get(self._dsn_env_var))
 
     @property
-    def groups(self):
+    def groups(self) -> List[str]:
         return self.reader_groups + self.writer_groups
 
     @property
-    def backup_name(self):
+    def backup_name(self) -> str:
         return etl.names.as_backup_name(self.name)
 
     @property
-    def staging_name(self):
+    def staging_name(self) -> str:
         return etl.names.as_staging_name(self.name)
 
 
 class DataWarehouseConfig:
     """Pretty interface to create objects from the settings files."""
 
-    def __init__(self, settings):
+    def __init__(self, settings) -> None:
         dw_settings = settings["data_warehouse"]
         schema_settings = settings.get("sources", []) + dw_settings.get("transformations", [])
 
@@ -178,49 +182,29 @@ def __init__(self, settings):
         self._admin_access = dw_settings["admin_access"]
         self._etl_access = dw_settings["etl_access"]
         self._check_access_to_cluster()
-        root = DataWarehouseUser(dw_settings["owner"])
-        # Users are in the order from the config
-        other_users = [
-            DataWarehouseUser(user) for user in dw_settings.get("users", []) if user["name"] != "default"
-        ]
-        # Note that the "owner," which is our super-user of sorts, comes first.
-        self.users = [root] + other_users
-        schema_owner_map = {u.schema: u.name for u in self.users if u.schema}
+        self.users = self._parse_users(dw_settings)
+        schema_owner_map = {user.schema: user.name for user in self.users if user.schema}
 
         # Schemas (upstream sources followed by transformations, keeps order of settings file)
-        self.schemas = [
-            DataWarehouseSchema(
-                dict(info, owner=schema_owner_map.get(info["name"], root.name)), self._etl_access
-            )
-            for info in schema_settings
-            if not info.get("external", False)
-        ]
+        self.schemas = self._parse_schemas(
+            filter(lambda info: not info.get("external"), schema_settings), schema_owner_map
+        )
         self._schema_lookup = {schema.name: schema for schema in self.schemas}
 
         # External schemas are kept separate (for example, we don't back them up).
-        self.external_schemas = [
-            DataWarehouseSchema(
-                dict(info, owner=schema_owner_map.get(info["name"], root.name)), self._etl_access
-            )
-            for info in schema_settings
-            if info.get("external", False)
-        ]
-
-        # Schemas may grant access to groups that have no bootstrapped users, so create all
-        # mentioned user groups.
-        other_groups = {u.group for u in other_users} | {
-            g for schema in self.schemas for g in schema.reader_groups
-        }
+        self.external_schemas = self._parse_schemas(
+            filter(lambda info: info.get("external", False), schema_settings), schema_owner_map
+        )
 
-        # Groups are in sorted order after the root group
-        self.groups = [root.group] + sorted(other_groups)
-        try:
-            [self.default_group] = [
-                user["group"] for user in dw_settings["users"] if user["name"] == "default"
-            ]
-        except ValueError:
-            raise ETLConfigError("Failed to find group of default user")
+        # Schemas may grant access to groups that have no bootstrapped users.
+        # So we "union" groups mentioned for users and groups mentioned for schemas.
+        self.groups = sorted(
+            {group for user in self.users for group in user.groups}.union(
+                {group for schema in self.schemas for group in schema.groups}
+            )
+        )
+        self.default_group = self._parse_default_group(dw_settings)
 
         # Relation glob patterns indicating unacceptable load failures; matches everything if unset
         required_patterns = dw_settings.get("required_for_success", [])
         self.required_in_full_load_selector = etl.names.TableSelector(required_patterns)
@@ -228,6 +212,41 @@
         # Map of SQL types to be able to automatically insert "expressions" into table design files.
         self.type_maps = settings["type_maps"]
 
+    def _parse_default_group(self, dw_settings) -> str:
+        """Return default group based on a user called "default"."""
+        try:
+            [default_user] = [user for user in dw_settings["users"] if user["name"] == "default"]
+        except ValueError:
+            raise ETLConfigError("failed to find user 'default'")
+        try:
+            return default_user["group"]
+        except KeyError:
+            raise ETLConfigError("failed to find 'group' for user 'default'")
+
+    def _parse_schemas(self, partial_settings, schema_owner_map) -> List[DataWarehouseSchema]:
+        # Any schema that is not explicitly claimed belongs to the owner.
+        return [
+            DataWarehouseSchema(
+                dict(info, owner=schema_owner_map.get(info["name"], self.owner.name)), self._etl_access
+            )
+            for info in partial_settings
+        ]
+
+    def _parse_users(self, dw_settings) -> List[DataWarehouseUser]:
+        """
+        Return list of users (with the owner as the first user).
+
+        Users are in the order from the config (but skip the owner and the pseudo-user "default").
+        """
+        owner = DataWarehouseUser(dw_settings["owner"])
+        other_users = [
+            DataWarehouseUser(user)
+            for user in dw_settings["users"]
+            if user["name"] not in (owner.name, "default")
+        ]
+        # Note that the "owner," which is our super-user of sorts, must always come first.
+        return [owner] + other_users
+
     def _check_access_to_cluster(self):
         """
         Make sure that ETL user and admin may connect and connect to different databases.
diff --git a/python/etl/config/settings.schema b/python/etl/config/settings.schema
index 9a18a2849..9fed1af08 100644
--- a/python/etl/config/settings.schema
+++ b/python/etl/config/settings.schema
@@ -17,15 +17,17 @@
             "minItems": 1
         },
         "user_info": {
-            "type": "object",
+            "additionalProperties": false,
+            "required": [ "name" ],
+            "not": { "required": [ "group", "groups" ] },
             "properties": {
                 "name": { "$ref": "#/$defs/identifier" },
                 "description": { "type": "string" },
                 "group": { "$ref": "#/$defs/identifier" },
+                "groups": { "$ref": "#/$defs/identifier_list" },
                 "schema": { "$ref": "#/$defs/identifier" }
             },
-            "required": [ "name", "group" ],
-            "additionalProperties": false
+            "type": "object"
         },
         "glob_pattern_list": {
             "type": "array",
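The reworked `user_info` schema accepts `group` or `groups` but rejects an entry that sets both; that is what the `not`/`required` combination enforces (an object satisfying `required: [group, groups]` has both keys, and `not` inverts that). A quick check with the `jsonschema` package; the `$ref` targets are replaced by plain types here to keep the example self-contained:

```python
import jsonschema

user_info = {
    "type": "object",
    "additionalProperties": False,
    "required": ["name"],
    "not": {"required": ["group", "groups"]},
    "properties": {
        "name": {"type": "string"},
        "description": {"type": "string"},
        "group": {"type": "string"},
        "groups": {"type": "array", "items": {"type": "string"}},
        "schema": {"type": "string"},
    },
}

jsonschema.validate({"name": "etl", "group": "etl_rw"}, user_info)       # ok
jsonschema.validate({"name": "ana", "groups": ["g1", "g2"]}, user_info)  # ok
try:
    jsonschema.validate({"name": "bad", "group": "g1", "groups": ["g2"]}, user_info)
except jsonschema.ValidationError as exc:
    print("rejected:", exc.message)
```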
diff --git a/python/etl/data_warehouse.py b/python/etl/data_warehouse.py
index 4b6ef3053..c5cd7094d 100755
--- a/python/etl/data_warehouse.py
+++ b/python/etl/data_warehouse.py
@@ -13,6 +13,7 @@
 """
 
 import logging
+from collections import defaultdict
 from contextlib import closing
 from typing import Iterable, Sequence
 
@@ -22,7 +23,7 @@
 import etl.config.dw
 import etl.db
 import etl.dialect.redshift
-from etl.config.dw import DataWarehouseSchema
+from etl.config.dw import DataWarehouseSchema, DataWarehouseUser
 from etl.errors import ETLConfigError, ETLRuntimeError
 from etl.text import join_with_single_quotes
 
@@ -49,6 +50,8 @@ def create_schemas(schemas: Iterable[DataWarehouseSchema], use_staging=False, dry_run=False) -> None:
     """
     Create schemas and grant access.
 
     It's ok if any of the schemas already exist, in which case the owner and privileges are updated.
+
+    This is a callback for a command.
     """
     dsn_etl = etl.config.get_dw_config().dsn_etl
     with closing(etl.db.connection(dsn_etl, autocommit=True, readonly=dry_run)) as conn:
@@ -56,7 +59,9 @@ def create_schemas(schemas: Iterable[DataWarehouseSchema], use_staging=False, dry_run=False) -> None:
             create_schema_and_grant_access(conn, schema, use_staging=use_staging, dry_run=dry_run)
 
 
-def create_external_schema_and_grant_access(conn, schema, dry_run=False) -> None:
+def create_external_schema_and_grant_access(
+    conn: Connection, schema: DataWarehouseSchema, dry_run=False
+) -> None:
     # TODO(tom): How do we make the ETL the owner of this schema?
     if not schema.database or not schema.iam_role:
         logger.warning("External schema '%s' is missing database name and IAM role", schema.name)
@@ -75,7 +80,9 @@ def create_external_schema_and_grant_access(conn, schema, dry_run=False) -> None:
         etl.db.grant_usage(conn, schema.name, schema.reader_groups)
 
 
-def create_schema_and_grant_access(conn, schema, owner=None, use_staging=False, dry_run=False) -> None:
+def create_schema_and_grant_access(
+    conn: Connection, schema: DataWarehouseSchema, owner=None, use_staging=False, dry_run=False
+) -> None:
     group_names = join_with_single_quotes(schema.groups)
     name = schema.staging_name if use_staging else schema.name
     if dry_run:
@@ -177,14 +184,19 @@ def restore_schemas(schemas: Iterable[DataWarehouseSchema], dry_run=False) -> None:
     """
     For the schemas that we need or want, rename the backups and restore access.
 
-    This is the inverse of backup_schemas.
-    Useful if bad data is in standard schemas
+    This is the inverse of backup_schemas. Useful if bad data is in standard schemas.
+
+    This is a callback for a command.
     """
     _promote_schemas(schemas, "backup", dry_run=dry_run)
 
 
 def publish_schemas(schemas: Sequence[DataWarehouseSchema], dry_run=False) -> None:
-    """Backup current occupants of standard position and put staging schemas there."""
+    """
+    Backup current occupants of standard position and put staging schemas there.
+
+    This is a callback for a command.
+    """
     backup_schemas(schemas, dry_run=dry_run)
     _promote_schemas(schemas, "staging", dry_run=dry_run)
 
@@ -207,11 +219,14 @@ def revoke_schema_permissions(conn: Connection, schema: DataWarehouseSchema) -> None:
 
 
 def create_groups(dry_run=False) -> None:
-    """Create all groups from the data warehouse configuration or just those passed in."""
+    """
+    Create all groups from the data warehouse configuration.
+
+    This is a callback for a command.
+    """
     config = etl.config.get_dw_config()
-    groups = sorted(frozenset(group for schema in config.schemas for group in schema.groups))
     with closing(etl.db.connection(config.dsn_admin_on_etl_db, readonly=dry_run)) as conn:
-        _create_groups(conn, groups, dry_run=dry_run)
+        _create_groups(conn, config.groups, dry_run=dry_run)
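`create_groups` now relies on `config.groups`, the union computed in `dw.py` above, instead of re-deriving the groups from schemas alone. The difference matters for groups that appear only on users. In isolation, with made-up sample data:

```python
schema_groups = [["analyst"], ["analyst", "etl_rw"]]
user_groups = [["etl_rw"], ["ops"]]  # "ops" is granted to no schema (yet)

old = sorted(frozenset(group for groups in schema_groups for group in groups))
new = sorted(
    {group for groups in user_groups for group in groups}.union(
        group for groups in schema_groups for group in groups
    )
)
print(old)  # ['analyst', 'etl_rw'] -- would never create "ops"
print(new)  # ['analyst', 'etl_rw', 'ops']
```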
 
 
@@ -219,6 +234,7 @@ def _create_groups(conn: Connection, groups: Iterable[str], dry_run=False) -> None:
     with conn:
         for group in groups:
             if etl.db.group_exists(conn, group):
+                logger.info("Skipping group '%s' which already exists", group)
                 continue
             if dry_run:
                 logger.info("Dry-run: Skipping creating group '%s'", group)
@@ -227,50 +243,65 @@ def _create_groups(conn: Connection, groups: Iterable[str], dry_run=False) -> None:
             etl.db.create_group(conn, group)
 
 
-def _create_or_update_user(conn: Connection, user, only_update=False, dry_run=False):
+def _create_user(conn: Connection, user: DataWarehouseUser, dry_run=False):
     """
-    Create user in its group, or add user to its group.
+    Create user (or skip if user already exists).
 
     The connection may point to 'dev' database since users are tied to the cluster, not a database.
     """
     with conn:
-        if only_update or etl.db.user_exists(conn, user.name):
-            if dry_run:
-                logger.info("Dry-run: Skipping adding user '%s' to group '%s'", user.name, user.group)
-                logger.info("Dry-run: Skipping updating password for user '%s'", user.name)
-            else:
-                logger.info("Adding user '%s' to group '%s'", user.name, user.group)
-                etl.db.alter_group_add_user(conn, user.group, user.name)
-                logger.info("Updating password for user '%s'", user.name)
-                etl.db.alter_password(conn, user.name, ignore_missing_password=True)
-        else:
-            if dry_run:
-                logger.info("Dry-run: Skipping creating user '%s' in group '%s'", user.name, user.group)
-            else:
-                logger.info("Creating user '%s' in group '%s'", user.name, user.group)
-                etl.db.create_user(conn, user.name, user.group)
+        if etl.db.user_exists(conn, user.name):
+            logger.info("User '%s' already exists", user.name)
+            return
+
+        if dry_run:
+            logger.info("Dry-run: Skipping creating user '%s'", user.name)
+            return
+        logger.info("Creating user '%s'", user.name)
+        etl.db.create_user(conn, user.name)
+
+
+def _update_user_groups(conn: Connection, user: DataWarehouseUser, dry_run=False):
+    """
+    Add user to the groups per configuration.
+
+    Note that we currently do not remove users from groups, so changes made outside
+    the configuration are not overridden.
+    """
+    with conn:
+        if dry_run:
+            logger.info(
+                "Dry-run: Skipping adding user '%s' to group(s): %s",
+                user.name,
+                join_with_single_quotes(user.groups),
+            )
+            return
+        logger.info("Adding user '%s' to group(s): %s", user.name, join_with_single_quotes(user.groups))
+        for group in user.groups:
+            etl.db.alter_group_add_user(conn, group, user.name)
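`etl.db.alter_group_add_user` is called once per group; its body is not part of this diff, but on Redshift it presumably boils down to a single `ALTER GROUP` statement per call (the SQL below is an assumption, not code from the repository):

```python
def alter_group_add_user_sql(group: str, user: str) -> str:
    # Redshift's ALTER GROUP takes one action at a time, hence one statement per group.
    return f'ALTER GROUP "{group}" ADD USER "{user}"'

for group in ["analyst", "etl_rw"]:
    print(alter_group_add_user_sql(group, "jdoe"))
```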
 
 
-def _create_schema_for_user(conn, user, etl_group, dry_run=False):
+
+def _create_schema_for_user(conn: Connection, user: DataWarehouseUser, etl_group: str, dry_run=False):
+    groups = user.groups + [etl_group]
     user_schema = etl.config.dw.DataWarehouseSchema(
-        {"name": user.schema, "owner": user.name, "readers": [user.group, etl_group]}
+        {"name": user.schema, "owner": user.name, "readers": groups}
     )
     create_schema_and_grant_access(conn, user_schema, owner=user.name, dry_run=dry_run)
 
 
-def _update_search_path(conn, user, dry_run=False):
+def _update_search_path(conn: Connection, user: DataWarehouseUser, dry_run=False) -> None:
     """Non-system users have their schema in the search path, others get nothing (only "public")."""
     search_path = ["public"]
     if user.schema == user.name:
         search_path[:0] = ["'$user'"]  # needs to be quoted per documentation
     if dry_run:
         logger.info("Dry-run: Skipping setting search path for user '%s' to: %s", user.name, search_path)
-    else:
-        logger.info("Setting search path for user '%s' to: %s", user.name, search_path)
-        etl.db.alter_search_path(conn, user.name, search_path)
+        return
+    logger.info("Setting search path for user '%s' to: %s", user.name, search_path)
+    etl.db.alter_search_path(conn, user.name, search_path)
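The search-path rule from `_update_search_path` above, isolated into a stand-alone function:

```python
from typing import List, Optional

def search_path_for(user_name: str, user_schema: Optional[str]) -> List[str]:
    search_path = ["public"]
    if user_schema == user_name:
        # '$user' resolves to the session user, so the personal schema wins.
        search_path[:0] = ["'$user'"]
    return search_path

print(search_path_for("jdoe", "jdoe"))  # ["'$user'", 'public']
print(search_path_for("etl_ro", None))  # ['public']
```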
 
 
-def initial_setup(with_user_creation=False, force=False, dry_run=False):
+def initial_setup(with_user_creation=False, force=False, dry_run=False) -> None:
     """
     Place named data warehouse database into initial state.
 
@@ -278,6 +309,8 @@ def initial_setup(with_user_creation=False, force=False, dry_run=False):
     You have to set `force` to true if the name of the database doesn't start with 'validation'.
 
     Optionally use `with_user_creation` flag to create users and groups.
+
+    This is a callback for a command.
     """
     config = etl.config.get_dw_config()
     try:
@@ -294,96 +327,148 @@ def initial_setup(with_user_creation=False, force=False, dry_run=False):
             "Refused to initialize non-validation database '%s' without the --force option"
             % database_name
         )
 
-    # Create all defined users which includes the ETL user needed before next step (so that
-    # database is owned by ETL). Also create all groups referenced in the configuration.
+    # Create the ETL user so that it can own the database.
     if with_user_creation:
-        groups = sorted(frozenset(group for schema in config.schemas for group in schema.groups))
         with closing(etl.db.connection(config.dsn_admin, readonly=dry_run)) as conn:
-            _create_groups(conn, groups, dry_run=dry_run)
-            for user in config.users:
-                _create_or_update_user(conn, user, dry_run=dry_run)
+            _create_groups(conn, config.owner.groups, dry_run=dry_run)
+            _create_user(conn, config.owner, dry_run=dry_run)
+            _update_user_groups(conn, config.owner, dry_run=dry_run)
 
-    owner_name = config.owner.name
     if dry_run:
         logger.info(
             "Dry-run: Skipping drop and create of database '%s' with owner '%s'",
             database_name,
-            owner_name,
+            config.owner.name,
         )
     else:
         with closing(etl.db.connection(config.dsn_admin, autocommit=True)) as conn:
-            logger.info("Dropping and creating database '%s' with owner '%s'", database_name, owner_name)
-            etl.db.drop_and_create_database(conn, database_name, owner_name)
+            logger.info(
+                "Dropping and creating database '%s' with owner '%s'", database_name, config.owner.name
+            )
+            etl.db.drop_and_create_database(conn, database_name, config.owner.name)
 
-    with closing(
-        etl.db.connection(config.dsn_admin_on_etl_db, autocommit=True, readonly=dry_run)
-    ) as conn:
+    with closing(etl.db.connection(config.dsn_admin_on_etl_db, readonly=dry_run)) as conn:
         if dry_run:
             logger.info("Dry-run: Skipping dropping of PUBLIC schema in '%s'", database_name)
         else:
             logger.info("Dropping PUBLIC schema in '%s'", database_name)
-            etl.db.drop_schema(conn, "PUBLIC")
+            with conn:
+                etl.db.drop_schema(conn, "PUBLIC")
+        if with_user_creation:
+            non_owner_groups = [group for group in config.groups if group != config.owner.groups[0]]
+            _create_groups(conn, non_owner_groups, dry_run=dry_run)
         for user in config.users:
+            if user.name != config.owner.name:
+                _create_user(conn, user, dry_run=dry_run)
+                if user.groups:
+                    _update_user_groups(conn, user, dry_run=dry_run)
             if user.schema:
-                _create_schema_for_user(conn, user, config.groups[0], dry_run=dry_run)
+                _create_schema_for_user(conn, user, config.owner.groups[0], dry_run=dry_run)
             _update_search_path(conn, user, dry_run=dry_run)
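`initial_setup` then rebuilds the database owned by the ETL user. The `etl.db` helpers it calls are not shown in this diff; a hypothetical sketch of the statements they would issue (names and SQL are assumptions for illustration):

```python
from typing import List

def drop_and_create_database_sql(database: str, owner: str) -> List[str]:
    # Assumed SQL behind etl.db.drop_and_create_database; not code from the repository.
    return [
        f'DROP DATABASE IF EXISTS "{database}"',
        f'CREATE DATABASE "{database}" WITH OWNER "{owner}"',
    ]

print(drop_and_create_database_sql("validation_dev", "etl"))
```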
 
 
-def create_or_update_user(
-    user_name, group_name=None, add_user_schema=False, only_update=False, dry_run=False
-):
+def _create_or_update_user(
+    conn: Connection, user: DataWarehouseUser, add_user_schema=False, dry_run=False
+) -> None:
     """
     Add new user to cluster or update existing user.
 
-    Either pick a group or accept the default group (from settings).
-    If the group does not yet exist, then we create the user's group here.
-
+    If the user's groups do not yet exist, then we create them here.
     If so advised, creates a schema for the user, making sure that the ETL user keeps read access
     via its group. So this assumes that the connection string points to the ETL database, not 'dev'.
+    If the user has a schema configured, then that is always created.
+    """
+    config = etl.config.get_dw_config()
+
+    _create_groups(conn, user.groups, dry_run=dry_run)
+    _create_user(conn, user, dry_run=dry_run)
+    if user.groups:
+        _update_user_groups(conn, user, dry_run=dry_run)
+    if not add_user_schema:
+        return
+
+    # This allows adding a schema with the same name as the user without configuring it.
+    schema_user = DataWarehouseUser(
+        {
+            "name": user.name,
+            "groups": user.groups,
+            "schema": user.name if not user.schema else user.schema,
+        }
+    )
+    with conn:
+        _create_schema_for_user(conn, schema_user, config.owner.groups[0], dry_run=dry_run)
+        _update_search_path(conn, schema_user, dry_run=dry_run)
+
+
+def create_users(users: Sequence[str], dry_run=False) -> None:
+    """
+    Create all users that are defined in configuration files.
+
+    This is a callback for a command.
+    """
+    config = etl.config.get_dw_config()
+    known_names = [user.name for user in config.users]
+    missing_names = frozenset(users).difference(known_names)
+    if not users:
+        # None selected means create all.
+        selected = config.users
+    elif not missing_names:
+        selected = [user for user in config.users if user.name in users]
+    else:
+        raise ETLConfigError(f"users: {join_with_single_quotes(missing_names)} are not configured")
+
+    with closing(etl.db.connection(config.dsn_admin_on_etl_db, readonly=dry_run)) as conn:
+        for user in selected:
+            _create_or_update_user(conn, user, dry_run=dry_run)
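The name-selection rule in `create_users` above, in isolation (all names made up): no names means every configured user, unknown names fail fast, and the configuration order is preserved either way.

```python
configured = ["etl", "ana", "ops"]  # order comes from the settings file
requested = ["ops", "ana"]

missing = frozenset(requested).difference(configured)
if not requested:
    selected = configured  # nothing requested -> create all
elif not missing:
    selected = [name for name in configured if name in requested]
else:
    raise ValueError(f"users not configured: {sorted(missing)}")

print(selected)  # ['ana', 'ops'] -- configuration order, not request order
```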
 
 
+def update_user(user_name: str, add_user_schema=False, dry_run=False) -> None:
+    """
+    Update a current user and optionally add a schema for their use.
+
+    This is a callback for a command.
     """
     config = etl.config.get_dw_config()
-    # Find user in the list of pre-defined users or create new user instance with default settings
+    # Find user in the list of pre-defined users.
     for user in config.users:
         if user.name == user_name:
             break
     else:
-        info = {"name": user_name, "group": group_name or config.default_group}
-        if add_user_schema:
-            info["schema"] = user_name
-        user = etl.config.dw.DataWarehouseUser(info)
-
-    if user.name == "default":
-        raise ValueError("illegal user name '%s'" % user.name)
-    if user.group not in config.groups and user.group != config.default_group:
-        raise ValueError("specified group ('%s') not present in DataWarehouseConfig" % user.group)
+        raise ETLConfigError(f"user '{user_name}' has not been configured")
 
     with closing(etl.db.connection(config.dsn_admin_on_etl_db, readonly=dry_run)) as conn:
-        _create_groups(conn, [user.group], dry_run=dry_run)
-        _create_or_update_user(conn, user, only_update=only_update, dry_run=dry_run)
-
-        with conn:
-            if add_user_schema:
-                _create_schema_for_user(conn, user, config.groups[0], dry_run=dry_run)
-            elif user.schema is not None:
-                logger.warning(
-                    "User '%s' has schema '%s' configured but adding that was not requested",
-                    user.name,
-                    user.schema,
-                )
-            _update_search_path(conn, user, dry_run=dry_run)
-
-
-def create_new_user(new_user, group=None, add_user_schema=False, dry_run=False):
-    create_or_update_user(
-        new_user, group, add_user_schema=add_user_schema, only_update=False, dry_run=dry_run
-    )
+        _create_or_update_user(conn, user, add_user_schema=add_user_schema, dry_run=dry_run)
+
+    if dry_run:
+        logger.info("Dry-run: Skipping updating password for user '%s'", user.name)
+        return
+    logger.info("Updating password for user '%s'", user.name)
+    with closing(
+        etl.db.connection(config.dsn_admin_on_etl_db, autocommit=True, readonly=dry_run)
+    ) as conn:
+        etl.db.alter_password(conn, user.name, ignore_missing_password=True)
 
-def update_user(old_user, group=None, add_user_schema=False, dry_run=False):
-    create_or_update_user(
-        old_user, group, add_user_schema=add_user_schema, only_update=True, dry_run=dry_run
-    )
+
+def list_users(transpose=False) -> None:
+    """
+    List all users and their groups (or list all groups and their users if transposed).
+
+    This is a callback for a command.
+    """
+    config = etl.config.get_dw_config()
+    if transpose:
+        header = ["group", "users"]
+        groups = defaultdict(list)
+        for user in config.users:
+            for group in user.groups:
+                groups[group].append(user.name)
+        rows = [[group, ", ".join(sorted(groups[group]))] for group in sorted(groups)]
+    else:
+        header = ["user", "groups"]
+        rows = [[user.name, ", ".join(user.groups)] for user in config.users]
+    print(etl.text.format_lines(rows, header_row=header))
 
 
 def list_open_transactions(cx):
@@ -436,7 +521,11 @@ def terminate_sessions_with_transaction_locks(cx, dry_run=False) -> None:
 
 
 def terminate_sessions(dry_run=False) -> None:
-    """Terminate sessions that currently hold locks on (user or system) tables."""
+    """
+    Terminate sessions that currently hold locks on (user or system) tables.
+
+    This is a callback for a command.
+    """
     dsn_admin = etl.config.get_dw_config().dsn_admin_on_etl_db
     with closing(etl.db.connection(dsn_admin, autocommit=True)) as conn:
         etl.db.execute(conn, "SET query_group TO 'superuser'")
diff --git a/python/etl/db.py b/python/etl/db.py
index bfdc8678e..b5b936096 100755
--- a/python/etl/db.py
+++ b/python/etl/db.py
@@ -420,7 +420,7 @@ def _get_encrypted_password(cx, user) -> Optional[str]:
     return "md5" + md5.hexdigest()
 
 
-def create_user(cx, user, group):
+def create_user(cx, user):
     password = _get_encrypted_password(cx, user)
     if password is None:
         logger.warning("Missing entry in PGPASSFILE file for '%s'", user)
@@ -428,7 +428,7 @@ def create_user(cx, user, group):
         raise ETLRuntimeError(
             f"password missing from PGPASSFILE for user '{user}'"
         )  # lgtm[py/clear-text-logging-sensitive-data]
-    execute(cx, """CREATE USER "{}" IN GROUP "{}" PASSWORD %s""".format(user, group), (password,))
+    execute(cx, """CREATE USER "{}" PASSWORD %s""".format(user), (password,))
 
 
 def alter_password(cx, user, ignore_missing_password=False):
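For reference on the `PASSWORD %s` value: only the tail of `_get_encrypted_password` is visible above, but the `"md5"` prefix suggests PostgreSQL's classic scheme (which Redshift also accepts) of hashing the clear-text password concatenated with the user name. A sketch under that assumption:

```python
import hashlib

def md5_password(user: str, password: str) -> str:
    # "md5" + md5(password || username), per the PostgreSQL password format.
    return "md5" + hashlib.md5((password + user).encode("utf-8")).hexdigest()

# CREATE USER "jdoe" PASSWORD 'md5...' later accepts the clear-text password at login.
print(md5_password("jdoe", "Correct-Horse-9"))
```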