Skip to content

Commit

Permalink
fix(pg-cdc): fix role-based permission for rds (#20347) (#20384)
Browse files Browse the repository at this point in the history
Co-authored-by: Kexiang Wang <[email protected]>
  • Loading branch information
github-actions[bot] and KeXiangWang authored Feb 6, 2025
1 parent 7e07888 commit 45e317e
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class DbzConnectorConfig {
public static final String PG_PUB_CREATE = "publication.create.enable";
public static final String PG_SCHEMA_NAME = "schema.name";
public static final String PG_SSL_ROOT_CERT = "ssl.root.cert";
public static final String PG_TEST_ONLY_FORCE_RDS = "test.only.force.rds";

/* Sql Server configs */
public static final String SQL_SERVER_SCHEMA_NAME = "schema.name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ public PostgresValidator(
var password = userProps.get(DbzConnectorConfig.PASSWORD);
this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password);

this.isAwsRds = dbHost.contains(AWS_RDS_HOST);
this.isAwsRds =
dbHost.contains(AWS_RDS_HOST)
|| userProps
.getOrDefault(DbzConnectorConfig.PG_TEST_ONLY_FORCE_RDS, "false")
.equalsIgnoreCase("true");
this.dbName = dbName;
this.user = user;
this.schemaName = userProps.get(DbzConnectorConfig.PG_SCHEMA_NAME);
Expand All @@ -86,8 +90,8 @@ public PostgresValidator(
@Override
public void validateDbConfig() {
try {
if (pgVersion > 16) {
throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16.");
if (pgVersion >= 17) {
throw ValidatorUtils.failedPrecondition("Postgres version should be less than 17.");
}

try (var stmt = jdbcConnection.createStatement()) {
Expand Down Expand Up @@ -254,24 +258,18 @@ private void validatePrivileges() throws SQLException {
boolean isSuperUser = false;
if (this.isAwsRds) {
// check privileges for aws rds postgres
boolean hasReplicationRole;
boolean hasReplicationRole = false;
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.rds.role.check"))) {
stmt.setString(1, this.user);
stmt.setString(2, this.user);
var res = stmt.executeQuery();
var hashSet = new HashSet<String>();
while (res.next()) {
// check rds_superuser role or rds_replication role is granted
var memberof = res.getArray("memberof");
if (memberof != null) {
var members = (String[]) memberof.getArray();
hashSet.addAll(Arrays.asList(members));
}
LOG.info("rds memberof: {}", hashSet);
isSuperUser = res.getBoolean(1);
hasReplicationRole = res.getBoolean(2);
}
isSuperUser = hashSet.contains("rds_superuser");
hasReplicationRole = hashSet.contains("rds_replication");
}

if (!isSuperUser && !hasReplicationRole) {
Expand Down Expand Up @@ -320,9 +318,8 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException {
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.table_read_privilege.check"))) {
stmt.setString(1, this.schemaName);
stmt.setString(2, this.tableName);
stmt.setString(3, this.user);
stmt.setString(1, this.user);
stmt.setString(2, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName));
var res = stmt.executeQuery();
while (res.next()) {
if (!res.getBoolean(1)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ postgres.slot.check=SELECT slot_name FROM pg_replication_slots WHERE slot_name =
postgres.slot_limit.check=SELECT CASE WHEN (SELECT count(*) FROM pg_replication_slots) = (SELECT setting FROM pg_settings WHERE name='max_replication_slots')::int THEN 'true' ELSE 'false' END AS result;
postgres.role.check=SELECT rolreplication OR rolsuper FROM pg_roles WHERE rolname = ?
postgres.superuser.check=SELECT rolsuper FROM pg_roles WHERE rolname = ?
postgres.rds.role.check=SELECT pg_has_role(?, 'rds_superuser', 'member') as is_rds_superuser, pg_has_role(?, 'rds_replication', 'member') as is_rds_replication;
postgres.database_privilege.check=SELECT has_database_privilege(?, ?, 'create') FROM pg_roles WHERE rolname = ?
postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schema.role_table_grants WHERE table_schema = ? AND table_name = ? AND grantee = ? and privilege_type = 'SELECT'
postgres.table_read_privilege.check=SELECT has_table_privilege(?, ?, 'SELECT')
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames'
postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
Expand Down Expand Up @@ -52,4 +53,3 @@ sqlserver.has.perms=SELECT HAS_PERMS_BY_NAME('cdc.' + ct.capture_instance + '_CT
sqlserver.sql.agent.enabled=SELECT sys.fn_cdc_get_max_lsn()
sqlserver.case.sensitive=WITH collations AS (SELECT name, CASE WHEN description like '%case-insensitive%' THEN 0 WHEN description like '%case-sensitive%' THEN 1 END isCaseSensitive FROM sys.fn_helpcollations()) SELECT * FROM collations WHERE name = CONVERT(varchar, DATABASEPROPERTYEX( ? ,'collation'));
citus.distributed_table=select citus_table_type from citus_tables where table_name=?::regclass
postgres.rds.role.check=SELECT r.rolname, r.rolsuper, r.rolinherit, r.rolcreaterole, r.rolcreatedb, r.rolcanlogin, r.rolconnlimit, r.rolvaliduntil, ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid) as memberof , r.rolreplication , r.rolbypassrls FROM pg_catalog.pg_roles r WHERE r.rolname = ?
Loading

0 comments on commit 45e317e

Please sign in to comment.