diff --git a/e2e_test/ddl/show.slt b/e2e_test/ddl/show.slt index 89fd174f02b4e..56e4868843a6a 100644 --- a/e2e_test/ddl/show.slt +++ b/e2e_test/ddl/show.slt @@ -164,30 +164,54 @@ drop view v3; statement ok drop view v1; +statement ok +CREATE SINK sink3 AS select * from t3 WITH ( + connector = 'blackhole' +); + +query TTTT +describe sink3; +---- +v1 integer false NULL +v2 integer false NULL +v3 integer false NULL +t3._row_id serial true NULL +distribution key t3._row_id NULL NULL +table description sink3 NULL NULL + +query TTTT +show columns from sink3; +---- +v1 integer false NULL +v2 integer false NULL +v3 integer false NULL +t3._row_id serial true NULL + +statement ok +drop sink sink3; + statement ok drop table t3; -# todo: re-enable it when we support these commands on view -# https://github.com/risingwavelabs/risingwave/issues/11234 -#query TT -#describe pg_matviews; -#---- -#schemaname varchar -#matviewname varchar -#matviewowner integer -#definition varchar -#matviewid integer -#matviewtimezone varchar -#matviewgraph varchar -#primary key schemaname, matviewname -# -#query TT -#show columns from pg_catalog.pg_matviews; -#---- -#schemaname varchar -#matviewname varchar -#matviewowner integer -#definition varchar -#matviewid integer -#matviewtimezone varchar -#matviewgraph varchar +query TTTT +describe pg_matviews; +---- +schemaname character varying false NULL +matviewname character varying false NULL +matviewowner integer false NULL +definition character varying false NULL +matviewid integer false NULL +matviewtimezone character varying false NULL +matviewgraph character varying false NULL +table description pg_matviews NULL NULL + +query TTTT +show columns from pg_catalog.pg_matviews; +---- +schemaname character varying false NULL +matviewname character varying false NULL +matviewowner integer false NULL +definition character varying false NULL +matviewid integer false NULL +matviewtimezone character varying false NULL +matviewgraph character varying false NULL diff --git a/src/frontend/src/binder/for_system.rs b/src/frontend/src/binder/for_system.rs new file mode 100644 index 0000000000000..d0db50e0f0712 --- /dev/null +++ b/src/frontend/src/binder/for_system.rs @@ -0,0 +1,68 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use risingwave_common::error::Result; +use risingwave_connector::sink::catalog::SinkCatalog; +use risingwave_sqlparser::ast::ObjectName; + +use crate::binder::BindFor; +use crate::catalog::root_catalog::SchemaPath; +use crate::catalog::view_catalog::ViewCatalog; +use crate::Binder; + +pub struct BoundSink { + pub sink_catalog: Arc, +} + +pub struct BoundView { + pub view_catalog: Arc, +} + +impl Binder { + pub fn bind_sink_by_name(&self, name: ObjectName) -> Result { + matches!(self.bind_for, BindFor::System); + let (schema_name, sink_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; + + let search_path = SchemaPath::new( + schema_name.as_deref(), + &self.search_path, + &self.auth_context.user_name, + ); + let (sink_catalog, _) = + self.catalog + .get_sink_by_name(&self.db_name, search_path, &sink_name)?; + Ok(BoundSink { + sink_catalog: sink_catalog.clone(), + }) + } + + pub fn bind_view_by_name(&self, name: ObjectName) -> Result { + matches!(self.bind_for, BindFor::System); + let (schema_name, view_name) = Self::resolve_schema_qualified_name(&self.db_name, name)?; + + let search_path = SchemaPath::new( + schema_name.as_deref(), + &self.search_path, + &self.auth_context.user_name, + ); + let (view_catalog, _) = + self.catalog + .get_view_by_name(&self.db_name, search_path, &view_name)?; + Ok(BoundView { + view_catalog: view_catalog.clone(), + }) + } +} diff --git a/src/frontend/src/binder/mod.rs b/src/frontend/src/binder/mod.rs index 00c82a8981da4..1435fb7d8cd65 100644 --- a/src/frontend/src/binder/mod.rs +++ b/src/frontend/src/binder/mod.rs @@ -28,6 +28,7 @@ mod bind_param; mod create; mod delete; mod expr; +mod for_system; mod insert; mod query; mod relation; @@ -41,6 +42,7 @@ mod values; pub use bind_context::{BindContext, Clause, LateralBindContext}; pub use delete::BoundDelete; pub use expr::{bind_data_type, bind_struct_field}; +pub use for_system::*; pub use insert::BoundInsert; use pgwire::pg_server::{Session, SessionId}; pub use query::BoundQuery; diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs index dd8cc72ed9bf5..bb00eb89b7d8a 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_columns.rs @@ -64,6 +64,29 @@ impl SysCatalogReaderImpl { }) }); + let sink_rows = schema + .iter_sink() + .flat_map(|sink| { + sink.full_columns() + .iter() + .enumerate() + .map(|(index, column)| { + OwnedRow::new(vec![ + Some(ScalarImpl::Int32(sink.id.sink_id as i32)), + Some(ScalarImpl::Utf8(column.name().into())), + Some(ScalarImpl::Int32(index as i32 + 1)), + Some(ScalarImpl::Bool(column.is_hidden)), + Some(ScalarImpl::Bool(sink.downstream_pk.contains(&index))), + Some(ScalarImpl::Bool(sink.distribution_key.contains(&index))), + Some(ScalarImpl::Utf8(column.data_type().to_string().into())), + Some(ScalarImpl::Int32(column.data_type().to_oid())), + Some(ScalarImpl::Int16(column.data_type().type_len())), + Some(ScalarImpl::Utf8(column.data_type().pg_name().into())), + ]) + }) + }) + .chain(view_rows); + let rows = schema .iter_system_tables() .flat_map(|table| { @@ -86,7 +109,7 @@ impl SysCatalogReaderImpl { ]) }) }) - .chain(view_rows); + .chain(sink_rows); schema .iter_valid_table() diff --git a/src/frontend/src/handler/describe.rs b/src/frontend/src/handler/describe.rs index cef7af9dbd324..aea8bf6c3251e 100644 --- a/src/frontend/src/handler/describe.rs +++ b/src/frontend/src/handler/describe.rs @@ -18,6 +18,7 @@ use itertools::Itertools; use pgwire::pg_field_descriptor::PgFieldDescriptor; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::types::Row; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc}; use risingwave_common::error::Result; use risingwave_common::types::DataType; use risingwave_sqlparser::ast::{display_comma_separated, ObjectName}; @@ -31,77 +32,130 @@ use crate::handler::HandlerArgs; pub fn handle_describe(handler_args: HandlerArgs, table_name: ObjectName) -> Result { let session = handler_args.session; let mut binder = Binder::new_for_system(&session); - let relation = binder.bind_relation_by_name(table_name.clone(), None, false)?; - // For Source, it doesn't have table catalog so use get source to get column descs. + let not_found_err = + CatalogError::NotFound("table, source, sink or view", table_name.to_string()); // Vec, Vec, Vec, Vec>, String, Option - let (columns, pk_columns, dist_columns, indices, relname, description) = match relation { - Relation::Source(s) => { - let pk_column_catalogs = s - .catalog - .pk_col_ids - .iter() - .map(|&column_id| { - s.catalog - .columns + let (columns, pk_columns, dist_columns, indices, relname, description) = + if let Ok(relation) = binder.bind_relation_by_name(table_name.clone(), None, false) { + match relation { + Relation::Source(s) => { + let pk_column_catalogs = s + .catalog + .pk_col_ids .iter() - .filter(|x| x.column_id() == column_id) - .map(|x| x.column_desc.clone()) - .exactly_one() - .unwrap() - }) - .collect_vec(); - ( - s.catalog.columns, - pk_column_catalogs, - vec![], - vec![], - s.catalog.name, - None, // Description - ) - } - Relation::BaseTable(t) => { - let pk_column_catalogs = t - .table_catalog - .pk() - .iter() - .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone()) - .collect_vec(); - let dist_columns = t - .table_catalog - .distribution_key() - .iter() - .map(|idx| t.table_catalog.columns[*idx].column_desc.clone()) - .collect_vec(); - ( - t.table_catalog.columns, - pk_column_catalogs, - dist_columns, - t.table_indexes, - t.table_catalog.name, - t.table_catalog.description, - ) - } - Relation::SystemTable(t) => { - let pk_column_catalogs = t - .sys_table_catalog - .pk - .iter() - .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone()) - .collect_vec(); - ( - t.sys_table_catalog.columns.clone(), - pk_column_catalogs, - vec![], - vec![], - t.sys_table_catalog.name.clone(), - None, // Description - ) - } - _ => { - return Err(CatalogError::NotFound("table or source", table_name.to_string()).into()); - } - }; + .map(|&column_id| { + s.catalog + .columns + .iter() + .filter(|x| x.column_id() == column_id) + .map(|x| x.column_desc.clone()) + .exactly_one() + .unwrap() + }) + .collect_vec(); + ( + s.catalog.columns, + pk_column_catalogs, + vec![], + vec![], + s.catalog.name, + None, // Description + ) + } + Relation::BaseTable(t) => { + let pk_column_catalogs = t + .table_catalog + .pk() + .iter() + .map(|x| t.table_catalog.columns[x.column_index].column_desc.clone()) + .collect_vec(); + let dist_columns = t + .table_catalog + .distribution_key() + .iter() + .map(|idx| t.table_catalog.columns[*idx].column_desc.clone()) + .collect_vec(); + ( + t.table_catalog.columns, + pk_column_catalogs, + dist_columns, + t.table_indexes, + t.table_catalog.name, + t.table_catalog.description, + ) + } + Relation::SystemTable(t) => { + let pk_column_catalogs = t + .sys_table_catalog + .pk + .iter() + .map(|idx| t.sys_table_catalog.columns[*idx].column_desc.clone()) + .collect_vec(); + ( + t.sys_table_catalog.columns.clone(), + pk_column_catalogs, + vec![], + vec![], + t.sys_table_catalog.name.clone(), + None, // Description + ) + } + Relation::Share(_) => { + if let Ok(view) = binder.bind_view_by_name(table_name.clone()) { + let columns = view + .view_catalog + .columns + .iter() + .enumerate() + .map(|(idx, field)| ColumnCatalog { + column_desc: ColumnDesc::from_field_with_column_id(field, idx as _), + is_hidden: false, + }) + .collect(); + ( + columns, + vec![], + vec![], + vec![], + view.view_catalog.name.clone(), + None, + ) + } else { + return Err(not_found_err.into()); + } + } + _ => { + return Err(not_found_err.into()); + } + } + } else { + if let Ok(sink) = binder.bind_sink_by_name(table_name.clone()) { + let columns = sink.sink_catalog.full_columns().to_vec(); + let pk_columns = sink + .sink_catalog + .downstream_pk_indices() + .into_iter() + .map(|idx| columns[idx].column_desc.clone()) + .collect_vec(); + let dist_columns = sink + .sink_catalog + .distribution_key + .iter() + .map(|idx| columns[*idx].column_desc.clone()) + .collect_vec(); + ( + columns, + pk_columns, + dist_columns, + vec![], + sink.sink_catalog.name.clone(), + None, + ) + } else { + return Err(not_found_err.into()); + } + }; // Convert all column descs to rows let mut rows = col_descs_to_rows(columns); diff --git a/src/frontend/src/handler/show.rs b/src/frontend/src/handler/show.rs index 555162d42ed64..3c6016a6a610e 100644 --- a/src/frontend/src/handler/show.rs +++ b/src/frontend/src/handler/show.rs @@ -20,7 +20,7 @@ use pgwire::pg_protocol::truncated_fmt; use pgwire::pg_response::{PgResponse, StatementType}; use pgwire::pg_server::Session; use pgwire::types::Row; -use risingwave_common::catalog::{ColumnCatalog, DEFAULT_SCHEMA_NAME}; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, DEFAULT_SCHEMA_NAME}; use risingwave_common::error::{ErrorCode, Result}; use risingwave_common::types::DataType; use risingwave_common::util::addr::HostAddr; @@ -58,6 +58,34 @@ pub fn get_columns_from_table( Ok(column_catalogs) } +pub fn get_columns_from_sink( + session: &SessionImpl, + sink_name: ObjectName, +) -> Result> { + let binder = Binder::new_for_system(session); + let sink = binder.bind_sink_by_name(sink_name.clone())?; + Ok(sink.sink_catalog.full_columns().to_vec()) +} + +pub fn get_columns_from_view( + session: &SessionImpl, + view_name: ObjectName, +) -> Result> { + let binder = Binder::new_for_system(session); + let view = binder.bind_view_by_name(view_name.clone())?; + + Ok(view + .view_catalog + .columns + .iter() + .enumerate() + .map(|(idx, field)| ColumnCatalog { + column_desc: ColumnDesc::from_field_with_column_id(field, idx as _), + is_hidden: false, + }) + .collect()) +} + pub fn get_indexes_from_table( session: &SessionImpl, table_name: ObjectName, @@ -142,7 +170,17 @@ pub async fn handle_show_object( .map(|t| t.name.clone()) .collect(), ShowObject::Columns { table } => { - let columns = get_columns_from_table(&session, table)?; + let Ok(columns) = get_columns_from_table(&session, table.clone()) + .or(get_columns_from_sink(&session, table.clone())) + .or(get_columns_from_view(&session, table.clone())) + else { + return Err(CatalogError::NotFound( + "table, source, sink or view", + table.to_string(), + ) + .into()); + }; + let rows = col_descs_to_rows(columns); return Ok(PgResponse::builder(StatementType::SHOW_COMMAND)