Commit 4550012: second version
chenzl25 committed Jan 24, 2025
1 parent cd77184 commit 4550012
Showing 2 changed files with 111 additions and 55 deletions.
43 changes: 43 additions & 0 deletions e2e_test/iceberg/test_case/iceberg_engine.slt
@@ -113,3 +113,46 @@ select count(_row_id) from t_without_pk;

statement ok
DROP TABLE t_without_pk

# test iceberg connection in iceberg engine
statement ok
create secret my_secret with (
backend = 'meta'
) as 'hummockadmin';

statement ok
create connection my_conn
with (
type = 'iceberg',
warehouse.path = 's3://hummock001/iceberg_connection',
s3.access.key = secret my_secret,
s3.secret.key = secret my_secret,
s3.endpoint = 'http://127.0.0.1:9301',
s3.region = 'us-west-2',
);

statement ok
set iceberg_engine_connection = 'public.my_conn';

statement ok
create table t1(i int) with(commit_checkpoint_interval = 1) engine = iceberg;

statement ok
insert into t1 values(1);

statement ok
FLUSH;

query ?
select * from t1;
----
1

statement ok
DROP TABLE t1;

statement ok
DROP CONNECTION my_conn;

statement ok
DROP SECRET my_secret;
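
A companion negative case, sketched here rather than taken from the commit's test file: given the connection-type check added in create_table.rs below, a connection of any non-iceberg type should be rejected at CREATE TABLE time. The connection name is hypothetical.

# hypothetical: assumes public.some_kafka_conn exists with a non-iceberg type
statement ok
set iceberg_engine_connection = 'public.some_kafka_conn';

statement error Only iceberg connection could be used in iceberg engine
create table t2(i int) with(commit_checkpoint_interval = 1) engine = iceberg;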
123 changes: 68 additions & 55 deletions src/frontend/src/handler/create_table.rs
@@ -54,13 +54,13 @@ use risingwave_pb::plan_common::{
AdditionalColumn, ColumnDescVersion, DefaultColumnDesc, GeneratedColumnDesc,
};
use risingwave_pb::secret::secret_ref::PbRefAsType;
-use risingwave_pb::secret::{secret_ref, PbSecretRef};
+use risingwave_pb::secret::PbSecretRef;
use risingwave_pb::stream_plan::StreamFragmentGraph;
use risingwave_sqlparser::ast::{
-CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, CreateSink, CreateSinkStatement,
-CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format, FormatEncodeOptions,
-Ident, ObjectName, OnConflict, SecretRefAsType, SecretRefValue, SourceWatermark, Statement,
-TableConstraint, WebhookSourceInfo, WithProperties,
+CdcTableInfo, ColumnDef, ColumnOption, CompatibleFormatEncode, ConnectionRefValue, CreateSink,
+CreateSinkStatement, CreateSourceStatement, DataType as AstDataType, ExplainOptions, Format,
+FormatEncodeOptions, Ident, ObjectName, OnConflict, SecretRefAsType,
+SourceWatermark, Statement, TableConstraint, WebhookSourceInfo, WithProperties,
};
use risingwave_sqlparser::parser::{IncludeOption, Parser};
use thiserror_ext::AsReport;
@@ -1481,7 +1481,7 @@ pub async fn create_iceberg_engine_table(

let iceberg_engine_connection: String = session.config().iceberg_engine_connection();

-let mut secret_refs = BTreeMap::new();
+let mut connection_ref = BTreeMap::new();

let (s3_region, s3_endpoint, s3_ak, s3_sk, warehouse_path) = if iceberg_engine_connection
.is_empty()
@@ -1495,14 +1495,14 @@
};
let s3_bucket = s3.strip_prefix("hummock+s3://").unwrap().to_owned();
(
-s3_region,
+Some(s3_region),
None,
None,
None,
-format!(
+Some(format!(
"s3://{}/{}/iceberg/{}",
s3_bucket, data_directory, iceberg_catalog_name
-),
+)),
)
}
minio if minio.starts_with("hummock+minio://") => {
@@ -1520,14 +1520,14 @@
};
let (address, s3_bucket) = rest.split_once('/').unwrap();
(
"us-east-1".to_owned(),
Some("us-east-1".to_owned()),
Some(format!("{}{}", endpoint_prefix, address)),
Some(access_key_id.to_owned()),
Some(secret_access_key.to_owned()),
-format!(
+Some(format!(
"s3://{}/{}/iceberg/{}",
s3_bucket, data_directory, iceberg_catalog_name
-),
+)),
)
}
_ => {
@@ -1544,53 +1544,58 @@
session.get_connection_by_name(Some(parts[0].to_owned()), &parts[1])?;
if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
if params.connection_type == ConnectionType::Iceberg as i32 {
println!("options: {:?}", &params.properties);
println!("secret_refs: {:?}", &params.secret_refs);
let catalog_reader = session.env().catalog_reader().read_guard();
for (k, v) in params.secret_refs.iter() {
let secret = catalog_reader.get_secret_by_id(&rw_db_name, v.secret_id)?;
if secret.database_id != table.database_id {
return Err(RwError::from(ErrorCode::InvalidParameterValue(
"Secrets must be in the same database as the iceberg table".to_owned(),
)));
}
let schema_name = session
.env()
.catalog_reader()
.read_guard()
.get_schema_by_id(&secret.database_id, &secret.schema_id)?
.name();
secret_refs.insert(
k.to_owned(),
SecretRefValue {
secret_name: ObjectName::from(vec![
Ident::from(schema_name.as_str()),
Ident::from(secret.name.as_str()),
]),
ref_as: match secret_ref::RefAsType::try_from(v.ref_as).unwrap() {
secret_ref::RefAsType::Text => SecretRefAsType::Text,
secret_ref::RefAsType::File => SecretRefAsType::File,
secret_ref::RefAsType::Unspecified => unreachable!(),
},
},
);
}
let s3_region = params
let _s3_region = params
.properties
.get("s3.region")
.ok_or_else(|| anyhow!("`s3.region` must be set in iceberg engine connection"))?
.to_owned();
-let s3_endpoint = params.properties.get("s3.endpoint").map(|s| s.to_owned());
-let s3_ak = params.properties.get("s3.access.key").map(|s| s.to_owned());
-let s3_sk = params.properties.get("s3.secret.key").map(|s| s.to_owned());
-let warehouse_path = params
+let _s3_endpoint = params.properties.get("s3.endpoint").map(|s| s.to_owned());
+let _warehouse_path = params
.properties
.get("warehouse.path")
.map(|s| s.to_owned())
.ok_or_else(|| {
anyhow!("`warehouse.path` must be set in iceberg engine connection")
})?;
-(s3_region, s3_endpoint, s3_ak, s3_sk, warehouse_path)
+
+let allowed_properties = [
+"s3.region",
+"s3.endpoint",
+"s3.access.key",
+"s3.secret.key",
+"warehouse.path",
+"s3.path.style.access",
+];
+for (k, _) in params.properties.iter() {
+if !allowed_properties.contains(&k.as_str()) {
+return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
+"`{}` is not allowed in iceberg engine connection",
+k
+))));
+}
+}
+
+let allowed_secrets = ["s3.access.key", "s3.secret.key"];
+for (k, _) in params.secret_refs.iter() {
+if !allowed_secrets.contains(&k.as_str()) {
+return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
+"secret `{}` is not allowed in iceberg engine connection",
+k
+))));
+}
+}
+
+connection_ref.insert(
+"connection".to_owned(),
+ConnectionRefValue {
+connection_name: ObjectName::from(vec![
+Ident::from(parts[0]),
+Ident::from(parts[1]),
+]),
+},
+);
+
+(None, None, None, None, None)
} else {
return Err(RwError::from(ErrorCode::InvalidParameterValue(
"Only iceberg connection could be used in iceberg engine".to_owned(),
@@ -1735,7 +1740,9 @@ pub async fn create_iceberg_engine_table(
with.insert("primary_key".to_owned(), pks.join(","));
with.insert("type".to_owned(), "upsert".to_owned());
with.insert("catalog.type".to_owned(), "jdbc".to_owned());
with.insert("warehouse.path".to_owned(), warehouse_path.clone());
if let Some(warehouse_path) = warehouse_path.clone() {
with.insert("warehouse.path".to_owned(), warehouse_path.clone());
}
if let Some(s3_endpoint) = s3_endpoint.clone() {
with.insert("s3.endpoint".to_owned(), s3_endpoint);
}
@@ -1745,7 +1752,9 @@ pub async fn create_iceberg_engine_table(
if let Some(s3_sk) = s3_sk.clone() {
with.insert("s3.secret.key".to_owned(), s3_sk.clone());
}
with.insert("s3.region".to_owned(), s3_region.clone());
if let Some(s3_region) = s3_region.clone() {
with.insert("s3.region".to_owned(), s3_region.clone());
}
with.insert("catalog.uri".to_owned(), catalog_uri.clone());
with.insert("catalog.jdbc.user".to_owned(), meta_store_user.clone());
with.insert(
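
Each optional setting now repeats the same if let Some(...) insert pattern, and the same block recurs below for the source options. A small helper along these lines (not part of this commit; the name is illustrative) could fold the repetition:

use std::collections::BTreeMap;

// Hypothetical helper: copy a setting into the WITH map only when present.
fn insert_opt(with: &mut BTreeMap<String, String>, key: &str, value: Option<&String>) {
    if let Some(v) = value {
        with.insert(key.to_owned(), v.clone());
    }
}

// Usage, mirroring the diff:
// insert_opt(&mut with, "warehouse.path", warehouse_path.as_ref());
// insert_opt(&mut with, "s3.region", s3_region.as_ref());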
@@ -1788,7 +1797,7 @@ pub async fn create_iceberg_engine_table(
with.insert("create_table_if_not_exists".to_owned(), "true".to_owned());
with.insert("enable_config_load".to_owned(), "true".to_owned());
sink_handler_args.with_options =
-WithOptions::new(with, secret_refs.clone(), Default::default());
+WithOptions::new(with, Default::default(), connection_ref.clone());

let mut source_name = table_name.clone();
*source_name.0.last_mut().unwrap() = Ident::from(
@@ -1811,7 +1820,9 @@
let mut with = BTreeMap::new();
with.insert("connector".to_owned(), "iceberg".to_owned());
with.insert("catalog.type".to_owned(), "jdbc".to_owned());
with.insert("warehouse.path".to_owned(), warehouse_path.clone());
if let Some(warehouse_path) = warehouse_path.clone() {
with.insert("warehouse.path".to_owned(), warehouse_path.clone());
}
if let Some(s3_endpoint) = s3_endpoint {
with.insert("s3.endpoint".to_owned(), s3_endpoint.clone());
}
@@ -1821,7 +1832,9 @@
if let Some(s3_sk) = s3_sk.clone() {
with.insert("s3.secret.key".to_owned(), s3_sk.clone());
}
with.insert("s3.region".to_owned(), s3_region.clone());
if let Some(s3_region) = s3_region.clone() {
with.insert("s3.region".to_owned(), s3_region.clone());
}
with.insert("catalog.uri".to_owned(), catalog_uri.clone());
with.insert("catalog.jdbc.user".to_owned(), meta_store_user.clone());
with.insert(
@@ -1832,7 +1845,7 @@
with.insert("database.name".to_owned(), iceberg_database_name.clone());
with.insert("table.name".to_owned(), iceberg_table_name.clone());
with.insert("enable_config_load".to_owned(), "true".to_owned());
-source_handler_args.with_options = WithOptions::new(with, secret_refs, Default::default());
+source_handler_args.with_options = WithOptions::new(with, Default::default(), connection_ref);

// before we create the table, ensure the JVM is initialized as we use jdbc catalog right now.
// If JVM isn't initialized successfully, current not atomic ddl will result in a partially created iceberg engine table.
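
Note the argument swap at both WithOptions::new call sites: the secret-ref position is now Default::default() and the connection map rides in the third slot. The ordering below is an inference from these two calls, not a documented signature:

// Inferred from the call sites above (not verified against WithOptions itself):
// WithOptions::new(
//     inner: BTreeMap<String, String>,                      // plain WITH options
//     secret_ref: BTreeMap<String, SecretRefValue>,          // now empty here
//     connection_ref: BTreeMap<String, ConnectionRefValue>,  // the iceberg connection
// )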
