diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 782efebe75a00..bc092c655fcf8 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -55,6 +55,7 @@ public class DbzConnectorConfig { public static final String PG_PUB_CREATE = "publication.create.enable"; public static final String PG_SCHEMA_NAME = "schema.name"; public static final String PG_SSL_ROOT_CERT = "ssl.root.cert"; + public static final String PG_TEST_ONLY_FORCE_RDS = "test.only.force.rds"; /* Sql Server configs */ public static final String SQL_SERVER_SCHEMA_NAME = "schema.name"; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index cc8cf93c66902..4043e402e3566 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -65,7 +65,11 @@ public PostgresValidator( var password = userProps.get(DbzConnectorConfig.PASSWORD); this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password); - this.isAwsRds = dbHost.contains(AWS_RDS_HOST); + this.isAwsRds = + dbHost.contains(AWS_RDS_HOST) + || userProps + .getOrDefault(DbzConnectorConfig.PG_TEST_ONLY_FORCE_RDS, "false") + .equalsIgnoreCase("true"); this.dbName = dbName; this.user = user; this.schemaName = userProps.get(DbzConnectorConfig.PG_SCHEMA_NAME); @@ -86,8 +90,8 @@ public PostgresValidator( @Override public void validateDbConfig() { try { - if (pgVersion > 16) { - throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16."); + if (pgVersion >= 17) { + throw ValidatorUtils.failedPrecondition("Postgres version should be less than 17."); } try (var stmt = jdbcConnection.createStatement()) { @@ -254,24 +258,18 @@ private void validatePrivileges() throws SQLException { boolean isSuperUser = false; if (this.isAwsRds) { // check privileges for aws rds postgres - boolean hasReplicationRole; + boolean hasReplicationRole = false; try (var stmt = jdbcConnection.prepareStatement( ValidatorUtils.getSql("postgres.rds.role.check"))) { stmt.setString(1, this.user); + stmt.setString(2, this.user); var res = stmt.executeQuery(); - var hashSet = new HashSet(); while (res.next()) { // check rds_superuser role or rds_replication role is granted - var memberof = res.getArray("memberof"); - if (memberof != null) { - var members = (String[]) memberof.getArray(); - hashSet.addAll(Arrays.asList(members)); - } - LOG.info("rds memberof: {}", hashSet); + isSuperUser = res.getBoolean(1); + hasReplicationRole = res.getBoolean(2); } - isSuperUser = hashSet.contains("rds_superuser"); - hasReplicationRole = hashSet.contains("rds_replication"); } if (!isSuperUser && !hasReplicationRole) { @@ -320,9 +318,8 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException { try (var stmt = jdbcConnection.prepareStatement( ValidatorUtils.getSql("postgres.table_read_privilege.check"))) { - stmt.setString(1, this.schemaName); - stmt.setString(2, this.tableName); - stmt.setString(3, this.user); + stmt.setString(1, this.user); + stmt.setString(2, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); var res = stmt.executeQuery(); while (res.next()) { if (!res.getBoolean(1)) { diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 57091adac4eee..ee7c8facb4353 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -13,8 +13,9 @@ postgres.slot.check=SELECT slot_name FROM pg_replication_slots WHERE slot_name = postgres.slot_limit.check=SELECT CASE WHEN (SELECT count(*) FROM pg_replication_slots) = (SELECT setting FROM pg_settings WHERE name='max_replication_slots')::int THEN 'true' ELSE 'false' END AS result; postgres.role.check=SELECT rolreplication OR rolsuper FROM pg_roles WHERE rolname = ? postgres.superuser.check=SELECT rolsuper FROM pg_roles WHERE rolname = ? +postgres.rds.role.check=SELECT pg_has_role(?, 'rds_superuser', 'member') as is_rds_superuser, pg_has_role(?, 'rds_replication', 'member') as is_rds_replication; postgres.database_privilege.check=SELECT has_database_privilege(?, ?, 'create') FROM pg_roles WHERE rolname = ? -postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schema.role_table_grants WHERE table_schema = ? AND table_name = ? AND grantee = ? and privilege_type = 'SELECT' +postgres.table_read_privilege.check=SELECT has_table_privilege(?, ?, 'SELECT') postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ? postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames' postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? @@ -52,4 +53,3 @@ sqlserver.has.perms=SELECT HAS_PERMS_BY_NAME('cdc.' + ct.capture_instance + '_CT sqlserver.sql.agent.enabled=SELECT sys.fn_cdc_get_max_lsn() sqlserver.case.sensitive=WITH collations AS (SELECT name, CASE WHEN description like '%case-insensitive%' THEN 0 WHEN description like '%case-sensitive%' THEN 1 END isCaseSensitive FROM sys.fn_helpcollations()) SELECT * FROM collations WHERE name = CONVERT(varchar, DATABASEPROPERTYEX( ? ,'collation')); citus.distributed_table=select citus_table_type from citus_tables where table_name=?::regclass -postgres.rds.role.check=SELECT r.rolname, r.rolsuper, r.rolinherit, r.rolcreaterole, r.rolcreatedb, r.rolcanlogin, r.rolconnlimit, r.rolvaliduntil, ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid) as memberof , r.rolreplication , r.rolbypassrls FROM pg_catalog.pg_roles r WHERE r.rolname = ? diff --git a/java/connector-node/risingwave-source-cdc-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java b/java/connector-node/risingwave-source-cdc-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java index 801a89107b67f..4d0dad56ac697 100644 --- a/java/connector-node/risingwave-source-cdc-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java +++ b/java/connector-node/risingwave-source-cdc-test/src/test/java/com/risingwave/connector/source/PostgresSourceTest.java @@ -248,6 +248,378 @@ public void testPermissionCheck() throws SQLException { } } + @Test + public void testUserPermissionCheck() throws SQLException { + Connection connPg = SourceTestClient.connect(pgDataSource); + // use rds_replication and rds_superuser to simulate the RDS env + String query = "CREATE ROLE rds_replication"; + SourceTestClient.performQuery(connPg, query); + query = "CREATE ROLE rds_superuser"; + SourceTestClient.performQuery(connPg, query); + // user Postgres creates a superuser debezium + query = "CREATE USER debezium"; + SourceTestClient.performQuery(connPg, query); + query = "ALTER USER debezium WITH PASSWORD '" + pg.getPassword() + "'"; + SourceTestClient.performQuery(connPg, query); + query = + "CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))"; + SourceTestClient.performQuery(connPg, query); + + // user debezium connects to Postgres + DataSource dbzDataSource = + SourceTestClient.getDataSource( + pg.getJdbcUrl(), "debezium", pg.getPassword(), pg.getDriverClassName()); + Connection connDbz = SourceTestClient.connect(dbzDataSource); + + ConnectorServiceProto.TableSchema tableSchema = + ConnectorServiceProto.TableSchema.newBuilder() + .addColumns( + PlanCommon.ColumnDesc.newBuilder() + .setName("o_key") + .setColumnType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT64) + .build()) + .build()) + .addColumns( + PlanCommon.ColumnDesc.newBuilder() + .setName("o_val") + .setColumnType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT32) + .build()) + .build()) + .addPkIndices(0) + .build(); + + try { + var resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + assertEquals( + "INVALID_ARGUMENT: Postgres user must be superuser or replication role to start walsender.", + resp.getError().getErrorMessage()); + query = "ALTER USER debezium REPLICATION"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + assertEquals( + "INVALID_ARGUMENT: Postgres user must have select privilege on table 'public.orders'", + resp.getError().getErrorMessage()); + query = "GRANT SELECT ON orders TO debezium"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + assertEquals( + "INVALID_ARGUMENT: Postgres user must have create privilege on database 'test'", + resp.getError().getErrorMessage()); + + query = "GRANT CREATE ON DATABASE test TO debezium"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + + assertEquals( + "INVALID_ARGUMENT: Postgres user must be the owner of table 'orders' to create/alter publication", + resp.getError().getErrorMessage()); + + query = "ALTER TABLE orders OWNER TO debezium"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + + assertEquals("", resp.getError().getErrorMessage()); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals( + "INVALID_ARGUMENT: Postgres user must be superuser or replication role to start walsender.", + resp.getError().getErrorMessage()); + query = "GRANT rds_replication TO debezium"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals("", resp.getError().getErrorMessage()); + + query = "REVOKE rds_replication FROM dbz_group"; + SourceTestClient.performQuery(connPg, query); + query = "GRANT rds_superuser TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals("", resp.getError().getErrorMessage()); + + } catch (Exception e) { + Assert.fail("validate rpc fail: " + e.getMessage()); + } finally { + // cleanup + query = testClient.sqlStmts.getProperty("tpch.drop.orders"); + SourceTestClient.performQuery(connPg, query); + query = "DROP OWNED BY debezium"; + SourceTestClient.performQuery(connPg, query); + query = "DROP ROLE rds_replication"; + SourceTestClient.performQuery(connPg, query); + query = "DROP ROLE rds_superuser"; + SourceTestClient.performQuery(connPg, query); + query = "DROP USER debezium"; + SourceTestClient.performQuery(connPg, query); + connDbz.close(); + connPg.close(); + } + } + + // Note: Group is the older version of Role. Normally, only roles should be used in production. + @Test + public void testGroupPermissionCheck() throws SQLException { + // use rds_replication and rds_superuser to simulate the RDS env + Connection connPg = SourceTestClient.connect(pgDataSource); + String query = "CREATE ROLE rds_replication"; + SourceTestClient.performQuery(connPg, query); + query = "CREATE ROLE rds_superuser"; + SourceTestClient.performQuery(connPg, query); + // user Postgres creates a superuser debezium + query = "CREATE USER debezium"; + SourceTestClient.performQuery(connPg, query); + query = "ALTER USER debezium REPLICATION"; + SourceTestClient.performQuery(connPg, query); + query = "ALTER USER debezium WITH PASSWORD '" + pg.getPassword() + "'"; + SourceTestClient.performQuery(connPg, query); + query = "CREATE GROUP dbz_group WITH USER debezium"; + SourceTestClient.performQuery(connPg, query); + query = + "CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))"; + SourceTestClient.performQuery(connPg, query); + + // user debezium connects to Postgres + DataSource dbzDataSource = + SourceTestClient.getDataSource( + pg.getJdbcUrl(), "debezium", pg.getPassword(), pg.getDriverClassName()); + Connection connDbz = SourceTestClient.connect(dbzDataSource); + + ConnectorServiceProto.TableSchema tableSchema = + ConnectorServiceProto.TableSchema.newBuilder() + .addColumns( + PlanCommon.ColumnDesc.newBuilder() + .setName("o_key") + .setColumnType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT64) + .build()) + .build()) + .addColumns( + PlanCommon.ColumnDesc.newBuilder() + .setName("o_val") + .setColumnType( + Data.DataType.newBuilder() + .setTypeName(Data.DataType.TypeName.INT32) + .build()) + .build()) + .addPkIndices(0) + .build(); + + try { + var resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + assertEquals( + "INVALID_ARGUMENT: Postgres user must have select privilege on table 'public.orders'", + resp.getError().getErrorMessage()); + query = "GRANT SELECT ON orders TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + assertEquals( + "INVALID_ARGUMENT: Postgres user must have create privilege on database 'test'", + resp.getError().getErrorMessage()); + + query = "GRANT CREATE ON DATABASE test TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + + assertEquals( + "INVALID_ARGUMENT: Postgres user must be the owner of table 'orders' to create/alter publication", + resp.getError().getErrorMessage()); + + query = "ALTER TABLE orders OWNER TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + + assertEquals("", resp.getError().getErrorMessage()); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals( + "INVALID_ARGUMENT: Postgres user must be superuser or replication role to start walsender.", + resp.getError().getErrorMessage()); + query = "GRANT rds_replication TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals("", resp.getError().getErrorMessage()); + + query = "REVOKE rds_replication FROM dbz_group"; + SourceTestClient.performQuery(connPg, query); + query = "GRANT rds_superuser TO dbz_group"; + SourceTestClient.performQuery(connPg, query); + resp = + testClient.validateSource( + pg.getJdbcUrl(), + pg.getHost(), + "debezium", + pg.getPassword(), + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders", + true); + assertEquals("", resp.getError().getErrorMessage()); + + } catch (Exception e) { + Assert.fail("validate rpc fail: " + e.getMessage()); + } finally { + // cleanup + query = testClient.sqlStmts.getProperty("tpch.drop.orders"); + SourceTestClient.performQuery(connPg, query); + query = "DROP OWNED BY dbz_group"; + SourceTestClient.performQuery(connPg, query); + query = "DROP GROUP dbz_group"; + SourceTestClient.performQuery(connPg, query); + query = "DROP ROLE rds_replication"; + SourceTestClient.performQuery(connPg, query); + query = "DROP ROLE rds_superuser"; + SourceTestClient.performQuery(connPg, query); + query = "DROP USER debezium"; + SourceTestClient.performQuery(connPg, query); + connDbz.close(); + } + } + // generates test cases for the risingwave debezium parser @Ignore @Test diff --git a/java/connector-node/risingwave-source-cdc-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java b/java/connector-node/risingwave-source-cdc-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java index 443a69bc6aad0..789a1b462047b 100644 --- a/java/connector-node/risingwave-source-cdc-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java +++ b/java/connector-node/risingwave-source-cdc-test/src/test/java/com/risingwave/connector/source/SourceTestClient.java @@ -102,6 +102,28 @@ protected ConnectorServiceProto.ValidateSourceResponse validateSource( ConnectorServiceProto.TableSchema tableSchema, String databaseName, String tableName) { + return validateSource( + jdbcUrl, + host, + username, + password, + sourceType, + tableSchema, + databaseName, + tableName, + false); + } + + protected ConnectorServiceProto.ValidateSourceResponse validateSource( + String jdbcUrl, + String host, + String username, + String password, + ConnectorServiceProto.SourceType sourceType, + ConnectorServiceProto.TableSchema tableSchema, + String databaseName, + String tableName, + boolean forceRds) { String port = String.valueOf(URI.create(jdbcUrl.substring(5)).getPort()); ConnectorServiceProto.ValidateSourceRequest req = ConnectorServiceProto.ValidateSourceRequest.newBuilder() @@ -119,6 +141,8 @@ protected ConnectorServiceProto.ValidateSourceResponse validateSource( .putProperties("server.id", "1") // mysql only .putProperties("publication.name", "rw_publication") // pg only .putProperties("publication.create.enable", "true") // pg only + .putProperties( + "test.only.force.rds", forceRds ? "true" : "false") // pg only .build(); return blockingStub.validateSource(req); }