diff --git a/README.md b/README.md index 29c57453..5a728824 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ you can simply use `../arthur-redshift-etl/` to find your way back to this ETL c Although the Redshift cluster can be administered using the AWS console and `psql`, some helper scripts will make setting up the cluster consistently much easier. -(See below for `initialize` and `create_user`.) +(See below for `initialize`, `create_groups`, and `create_users`.) Also, add the AWS IAM role that the database owner may assume within Redshift to your settings file so that Redshift has the needed permissions to access the @@ -166,7 +166,7 @@ Don't forget to run `terminate_emr_cluster.sh` when you're done. | ---- | ---- | | `initialize` | Create schemas, groups and users | | `create_groups` | Create groups that are mentioned in the configuration file | -| `create_user` | Create (or configure) users that are not mentioned in the configuration file | +| `create_users` | Create users that are mentioned in the configuration file | ```shell # The commands to setup the data warehouse users and groups or any database is by ADMIN (connected to `dev`) diff --git a/etc/arthur_completion.sh b/etc/arthur_completion.sh index 7c022995..82ad631d 100644 --- a/etc/arthur_completion.sh +++ b/etc/arthur_completion.sh @@ -22,7 +22,7 @@ _arthur_completion() create_groups create_index create_schemas - create_user + create_users delete_finished_pipelines design explain diff --git a/python/etl/commands.py b/python/etl/commands.py index a36e9837..d9b71121 100644 --- a/python/etl/commands.py +++ b/python/etl/commands.py @@ -347,7 +347,7 @@ def build_full_parser(prog_name): CreateGroupsCommand, CreateIndexCommand, CreateSchemasCommand, - CreateUserCommand, + CreateUsersCommand, DeleteFinishedPipelinesCommand, ExplainQueryCommand, ExtractToS3Command, @@ -681,7 +681,7 @@ def __init__(self): "Make sure that all groups mentioned in the configuration file actually exist." " (This allows to specify a group (as reader or writer) on a schema when that" " group does not appear with a user and thus may not have been previously" - " created using a 'create_user' call.)", + " created with 'create_users'.)", ) def add_arguments(self, parser): @@ -692,34 +692,35 @@ def callback(self, args): etl.data_warehouse.create_groups(dry_run=args.dry_run) -class CreateUserCommand(SubCommand): +class CreateUsersCommand(SubCommand): def __init__(self): super().__init__( - "create_user", - "add new user", - "Add new user and set group membership, optionally create a personal schema." + "create_users", + "add users to cluster", + "Add users to cluster and set group membership." " It is ok to re-initialize a user defined in a settings file." - " Note that you have to set a password for the user in your '~/.pgpass' file" - " before invoking this command. The password must be valid in Redshift," - " so must contain upper-case and lower-case characters as well as numbers.", + " This will add them to any new groups." + " NOTE we currently do not remove users from groups.", + # Old command name that we want to phase out: + aliases=["create_user"], ) def add_arguments(self, parser): add_standard_arguments(parser, ["dry-run"]) - parser.add_argument("-g", "--group", help="add user to specified group") + parser.add_argument("-g", "--group", help="DEPRECATED (specify group in configuration file)") parser.add_argument( - "-a", "--add-user-schema", help="add new schema, writable for the user", action="store_true" + "-a", "--add-user-schema", help="DEPRECATED (use 'update_user' instead)", action="store_true" ) - parser.add_argument("username", help="name for new user") + parser.add_argument("name", help="name of user", nargs="*") def callback(self, args): + if args.group: + logger.warning("Ignoring specified group, using configuration instead") + if args.add_user_schema: + logger.warning("Ignoring request to add user schema, use 'update_user' instead.") + with etl.db.log_error(): - etl.data_warehouse.create_new_user( - args.username, - group=args.group, - add_user_schema=args.add_user_schema, - dry_run=args.dry_run, - ) + etl.data_warehouse.create_users(args.name, dry_run=args.dry_run) class ListUsersCommand(SubCommand): @@ -731,7 +732,7 @@ def __init__(self): ) def add_arguments(self, parser): - parser.add_argument("-t", "--transpose", help="group list by user's groups", action="store_true") + parser.add_argument("-t", "--transpose", help="group list by users' groups", action="store_true") def callback(self, args): with etl.db.log_error(): @@ -742,8 +743,8 @@ class UpdateUserCommand(SubCommand): def __init__(self): super().__init__( "update_user", - "update user's group, password, and path", - "For an existing user, update group membership, password, and search path." + "update user's password, their schema and search path", + "For an existing user, update password, create a schema, and update the search path." " Note that you have to set a password for the user in your '~/.pgpass' file" " before invoking this command if you want to update the password. The password must" " be valid in Redshift, so must contain upper-case and lower-case characters as well" @@ -752,17 +753,19 @@ def __init__(self): def add_arguments(self, parser): add_standard_arguments(parser, ["dry-run"]) - parser.add_argument("-g", "--group", help="add user to specified group") + parser.add_argument("-g", "--group", help="DEPRECATED (specify group in configuration file)") parser.add_argument( "-a", "--add-user-schema", help="add new schema, writable for the user", action="store_true" ) parser.add_argument("name", help="name of user") def callback(self, args): + if args.group: + logger.warning("Ignoring specified group, using configuration instead") + with etl.db.log_error(): etl.data_warehouse.update_user( args.name, - group=args.group, add_user_schema=args.add_user_schema, dry_run=args.dry_run, ) diff --git a/python/etl/config/dw.py b/python/etl/config/dw.py index 0c5ccb29..e3a39186 100644 --- a/python/etl/config/dw.py +++ b/python/etl/config/dw.py @@ -11,7 +11,7 @@ class DataWarehouseUser: """ - Data warehouse users have always a name and group associated with them. + Data warehouse users have always a name and a list of groups associated with them. Users may have a schema "belong" to them which they then have write access to. This is useful for system users, mostly, since end users should treat the data warehouse as read-only. @@ -20,13 +20,13 @@ class DataWarehouseUser: def __init__(self, user_info) -> None: self.name = user_info["name"] self.comment = user_info.get("comment") - # TODO(tom): Support an array of group names using "#/$defs/identifier_list". if "group" in user_info: # Backward compatibility: support single group "group". - self.group = user_info["group"] + self.groups = [user_info["group"]] + elif "groups" in user_info: + self.groups = user_info["groups"] else: - # Forward compatibility: for now, just pick the only element from the list. - [self.group] = user_info["groups"] + self.groups = [] self.schema = user_info.get("schema") @@ -188,6 +188,7 @@ def __init__(self, settings) -> None: self._admin_access = dw_settings["admin_access"] self._etl_access = dw_settings["etl_access"] self._check_access_to_cluster() + self.users = self.parse_users(dw_settings) schema_owner_map = {user.schema: user.name for user in self.users if user.schema} @@ -205,7 +206,7 @@ def __init__(self, settings) -> None: # Schemas may grant access to groups that have no bootstrapped users. # So we "union" groups mentioned for users and groups mentioned for schemas. self.groups = sorted( - {user.group for user in self.users}.union( + {group for user in self.users for group in user.groups}.union( {group for schema in self.schemas for group in schema.groups} ) ) @@ -220,12 +221,13 @@ def __init__(self, settings) -> None: @staticmethod def parse_default_group(dw_settings: dict) -> str: - """Return default group for users based on a user called "default".""" + """Return default group based on a user called "default".""" try: [user_settings] = [user for user in dw_settings["users"] if user["name"] == "default"] except ValueError: raise ETLConfigError("failed to find user 'default'") - return DataWarehouseUser(user_settings).group + # TODO(tom): make sure there's exactly one group. + return DataWarehouseUser(user_settings).groups[0] def parse_schemas( self, partial_settings: Iterable[dict], schema_owner_map: dict diff --git a/python/etl/config/settings.schema b/python/etl/config/settings.schema index 9c44e0f7..e5d1e1f7 100644 --- a/python/etl/config/settings.schema +++ b/python/etl/config/settings.schema @@ -44,12 +44,7 @@ "comment": { "type": "string" }, "description": { "type": "string" }, "group": { "$ref": "#/$defs/identifier" }, - "groups": { - "items": { "$ref": "#/$defs/identifier" }, - "maxItems": 1, - "minItems": 1, - "type": "array" - }, + "groups": { "$ref": "#/$defs/identifier_list" }, "name": { "$ref": "#/$defs/identifier" }, "schema": { "$ref": "#/$defs/identifier" } }, diff --git a/python/etl/data_warehouse.py b/python/etl/data_warehouse.py index 41ad7c33..979abab6 100755 --- a/python/etl/data_warehouse.py +++ b/python/etl/data_warehouse.py @@ -81,7 +81,7 @@ def create_external_schema_and_grant_access( def create_schema_and_grant_access( - conn: Connection, schema: DataWarehouseSchema, owner: str = None, use_staging=False, dry_run=False + conn: Connection, schema: DataWarehouseSchema, owner=None, use_staging=False, dry_run=False ) -> None: group_names = join_with_single_quotes(schema.groups) name = schema.staging_name if use_staging else schema.name @@ -258,32 +258,46 @@ def _create_groups(conn: Connection, groups: Iterable[str], dry_run=False) -> No ) -def _create_or_update_user(conn: Connection, user: DataWarehouseUser, only_update=False, dry_run=False): +def _create_user(conn: Connection, user: DataWarehouseUser, dry_run=False): """ - Create user in its group, or add user to its group. + Create user (or skip if user already exists). The connection may point to 'dev' database since users are tied to the cluster, not a database. """ with conn: - if only_update or etl.db.user_exists(conn, user.name): - if dry_run: - logger.info("Dry-run: Skipping adding user '%s' to group '%s'", user.name, user.group) - logger.info("Dry-run: Skipping updating password for user '%s'", user.name) - else: - logger.info("Adding user '%s' to group '%s'", user.name, user.group) - etl.db.alter_group_add_user(conn, user.group, user.name) - logger.info("Updating password for user '%s'", user.name) - etl.db.alter_password(conn, user.name, ignore_missing_password=True) - else: - if dry_run: - logger.info("Dry-run: Skipping creating user '%s' in group '%s'", user.name, user.group) - else: - logger.info("Creating user '%s' in group '%s'", user.name, user.group) - etl.db.create_user(conn, user.name, user.group) + if etl.db.user_exists(conn, user.name): + logger.info("User '%s' already exists", user.name) + return + + if dry_run: + logger.info("Dry-run: Skipping creating user '%s'", user.name) + return + logger.info("Creating user '%s'", user.name) + etl.db.create_user(conn, user.name) + + +def _update_user_groups(conn: Connection, user: DataWarehouseUser, dry_run=False): + """ + Add user to the groups per configuration. + + NOTE That we currently do not remove users from groups, so we do not override changes. + """ + with conn: + if dry_run: + logger.info( + "Dry-run: Skipping adding user '%s' to group(s): %s", + user.name, + join_with_single_quotes(user.groups), + ) + return + + logger.info("Adding user '%s' to group(s): %s", user.name, join_with_single_quotes(user.groups)) + for group in user.groups: + etl.db.alter_group_add_user(conn, group, user.name) def _create_schema_for_user(conn: Connection, user: DataWarehouseUser, etl_group: str, dry_run=False): - groups = [user.group, etl_group] + groups = user.groups + [etl_group] user_schema = etl.config.dw.DataWarehouseSchema( {"name": user.schema, "owner": user.name, "readers": groups} ) @@ -333,14 +347,12 @@ def initial_setup(with_user_creation=False, force=False, dry_run=False) -> None: "Refused to initialize non-validation database '%s' without the --force option" % database_name ) - # Create all defined users which includes the ETL user needed before next step (so that - # database is owned by ETL). Also create all groups referenced in the configuration. + # Create the ETL user so that it can own the database. if with_user_creation: - groups = sorted(frozenset(group for schema in config.schemas for group in schema.groups)) with closing(etl.db.connection(config.dsn_admin, readonly=dry_run)) as conn: - _create_groups(conn, groups, dry_run=dry_run) - for user in config.users: - _create_or_update_user(conn, user, dry_run=dry_run) + _create_groups(conn, config.owner.groups, dry_run=dry_run) + _create_user(conn, config.owner, dry_run=dry_run) + _update_user_groups(conn, config.owner, dry_run=dry_run) if dry_run: logger.info( @@ -355,75 +367,108 @@ def initial_setup(with_user_creation=False, force=False, dry_run=False) -> None: ) etl.db.drop_and_create_database(conn, database_name, config.owner.name) - with closing( - etl.db.connection(config.dsn_admin_on_etl_db, autocommit=True, readonly=dry_run) - ) as conn: + with closing(etl.db.connection(config.dsn_admin_on_etl_db, readonly=dry_run)) as conn: if dry_run: logger.info("Dry-run: Skipping dropping of PUBLIC schema in '%s'", database_name) else: logger.info("Dropping PUBLIC schema in '%s'", database_name) - etl.db.drop_schema(conn, "PUBLIC") + with conn: + etl.db.drop_schema(conn, "PUBLIC") + if with_user_creation: + non_owner_groups = [group for group in config.groups if group not in config.owner.groups] + _create_groups(conn, non_owner_groups, dry_run=dry_run) for user in config.users: + if user.name != config.owner.name: + _create_user(conn, user, dry_run=dry_run) + if user.groups: + _update_user_groups(conn, user, dry_run=dry_run) if user.schema: - _create_schema_for_user(conn, user, config.groups[0], dry_run=dry_run) + _create_schema_for_user(conn, user, config.owner.groups[0], dry_run=dry_run) _update_search_path(conn, user, dry_run=dry_run) -def create_or_update_user( - user_name: str, group_name=None, add_user_schema=False, only_update=False, dry_run=False +def _create_or_update_user( + conn: Connection, user: DataWarehouseUser, add_user_schema=False, dry_run=False ) -> None: """ Add new user to cluster or update existing user. - Either pick a group or accept the default group (from settings). - If the group does not yet exist, then we create the user's group here. - + If their group does not yet exist, then we create the user's groups here. If so advised, creates a schema for the user, making sure that the ETL user keeps read access via its group. So this assumes that the connection string points to the ETL database, not 'dev'. + If the user has a schema configured, then that is always created. + """ + config = etl.config.get_dw_config() + + _create_groups(conn, user.groups, dry_run=dry_run) + _create_user(conn, user, dry_run=dry_run) + if user.groups: + _update_user_groups(conn, user, dry_run=dry_run) + if not add_user_schema: + return + + # This allows to add a schema with the same name as the user without configuring it. + schema_user = DataWarehouseUser( + { + "name": user.name, + "groups": user.groups, + "schema": user.name if not user.schema else user.schema, + } + ) + with conn: + _create_schema_for_user(conn, schema_user, config.owner.groups[0], dry_run=dry_run) + _update_search_path(conn, schema_user, dry_run=dry_run) + + +def create_users(users: Sequence[str], dry_run=False) -> None: + """ + Create all users that are defined in configuration files. + + This is a callback of a command. """ config = etl.config.get_dw_config() - # Find user in the list of pre-defined users or create new user instance with default settings + known_names = [user.name for user in config.users] + missing_names = frozenset(users).difference(known_names) + if not users: + # None selected means create all. + selected = config.users + elif not missing_names: + selected = [user for user in config.users if user.name in users] + else: + raise ETLConfigError(f"users: {join_with_single_quotes(missing_names)} are not configured") + + with closing(etl.db.connection(config.dsn_admin_on_etl_db, readonly=dry_run)) as conn: + for user in selected: + _create_or_update_user(conn, user, dry_run=dry_run) + + +def update_user(user_name: str, add_user_schema=False, dry_run=False) -> None: + """ + Update a current user and optionally add a schema for their use. + + This is a callback of a command. + """ + config = etl.config.get_dw_config() + # Find user in the list of pre-defined users. for user in config.users: if user.name == user_name: break else: - info = {"name": user_name, "group": group_name or config.default_group} - if add_user_schema: - info["schema"] = user_name - user = etl.config.dw.DataWarehouseUser(info) - - if user.name == "default": - raise ValueError("illegal user name '%s'" % user.name) - if user.group not in config.groups and user.group != config.default_group: - raise ValueError("specified group ('%s') not present in DataWarehouseConfig" % user.group) + raise ETLConfigError(f"user '{user_name}' has not been configured") with closing(etl.db.connection(config.dsn_admin_on_etl_db, readonly=dry_run)) as conn: - _create_groups(conn, [user.group], dry_run=dry_run) - _create_or_update_user(conn, user, only_update=only_update, dry_run=dry_run) - - with conn: - if add_user_schema: - _create_schema_for_user(conn, user, config.groups[0], dry_run=dry_run) - elif user.schema is not None: - logger.warning( - "User '%s' has schema '%s' configured but adding that was not requested", - user.name, - user.schema, - ) - _update_search_path(conn, user, dry_run=dry_run) - - -def create_new_user(new_user, group=None, add_user_schema=False, dry_run=False): - create_or_update_user( - new_user, group, add_user_schema=add_user_schema, only_update=False, dry_run=dry_run - ) + _create_or_update_user(conn, user, add_user_schema=add_user_schema, dry_run=dry_run) + if dry_run: + logger.info("Dry-run: Skipping updating password for user '%s'", user.name) + return -def update_user(old_user, group=None, add_user_schema=False, dry_run=False): - create_or_update_user( - old_user, group, add_user_schema=add_user_schema, only_update=True, dry_run=dry_run - ) + logger.info("Updating password for user '%s'", user.name) + with closing( + etl.db.connection(config.dsn_admin_on_etl_db, autocommit=True, readonly=dry_run) + ) as conn: + etl.db.alter_password(conn, user.name, ignore_missing_password=True) def list_users(transpose=False) -> None: @@ -437,11 +482,12 @@ def list_users(transpose=False) -> None: header = ["group", "users"] groups = defaultdict(list) for user in config.users: - groups[user.group].append(user.name) + for group in user.groups: + groups[group].append(user.name) rows = [[group, ", ".join(sorted(groups[group]))] for group in sorted(groups)] else: header = ["user", "groups", "comment"] - rows = [[user.name, user.group, user.comment or ""] for user in config.users] + rows = [[user.name, ", ".join(user.groups), user.comment or ""] for user in config.users] print(etl.text.format_lines(rows, header_row=header)) diff --git a/python/etl/db.py b/python/etl/db.py index 2e699c34..5c8adc36 100755 --- a/python/etl/db.py +++ b/python/etl/db.py @@ -420,7 +420,7 @@ def _get_encrypted_password(cx, user) -> Optional[str]: return "md5" + md5.hexdigest() -def create_user(cx: Connection, user: str, group: str) -> None: +def create_user(cx: Connection, user: str) -> None: password = _get_encrypted_password(cx, user) if password is None: logger.warning("Missing entry in PGPASSFILE file for '%s'", user) @@ -428,7 +428,7 @@ def create_user(cx: Connection, user: str, group: str) -> None: raise ETLRuntimeError( f"password missing from PGPASSFILE for user '{user}'" ) # lgtm[py/clear-text-logging-sensitive-data] - execute(cx, f"""CREATE USER "{user}" IN GROUP "{group}" PASSWORD %s""", (password,)) + execute(cx, f"""CREATE USER "{user}" PASSWORD %s""", (password,)) def alter_password(cx: Connection, user: str, ignore_missing_password=False) -> None: diff --git a/tests/config/test_dw.py b/tests/config/test_dw.py index f804fafe..4cfa5fe3 100644 --- a/tests/config/test_dw.py +++ b/tests/config/test_dw.py @@ -36,6 +36,6 @@ def test_parse_users(self): found = etl.config.dw.DataWarehouseConfig.parse_users(self.settings) self.assertEqual(len(found), 2) self.assertEqual(found[0].name, "etl") - self.assertEqual(found[0].group, "etl") + self.assertEqual(found[0].groups, ["etl"]) self.assertEqual(found[1].name, "a_user") - self.assertEqual(found[1].group, "a_group") + self.assertEqual(found[1].groups, ["a_group"])