diff --git a/src/promql/src/extension_plan/series_divide.rs b/src/promql/src/extension_plan/series_divide.rs index fbdaa7dec3bb..b5fc923c00ac 100644 --- a/src/promql/src/extension_plan/series_divide.rs +++ b/src/promql/src/extension_plan/series_divide.rs @@ -258,7 +258,10 @@ impl Stream for SeriesDivideStream { let timer = std::time::Instant::now(); loop { if !self.buffer.is_empty() { - let cut_at = self.find_first_diff_row(); + let cut_at = match self.find_first_diff_row() { + Ok(cut_at) => cut_at, + Err(e) => return Poll::Ready(Some(Err(e))), + }; if let Some((batch_index, row_index)) = cut_at { // slice out the first time series and return it. let half_batch_of_first_series = @@ -322,10 +325,10 @@ impl SeriesDivideStream { /// Return the position to cut buffer. /// None implies the current buffer only contains one time series. - fn find_first_diff_row(&mut self) -> Option<(usize, usize)> { + fn find_first_diff_row(&mut self) -> DataFusionResult<Option<(usize, usize)>> { // fast path: no tag columns means all data belongs to the same series. 
if self.tag_indices.is_empty() { - return None; + return Ok(None); } let mut resumed_batch_index = self.inspect_start; @@ -341,18 +344,26 @@ impl SeriesDivideStream { for index in &self.tag_indices { let current_array = batch.column(*index); let last_array = last_batch.column(*index); - let current_value = current_array + let current_string_array = current_array .as_any() .downcast_ref::<StringArray>() - .unwrap() - .value(0); - let last_value = last_array + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Failed to downcast tag column to StringArray".to_string(), + ) + })?; + let last_string_array = last_array .as_any() .downcast_ref::<StringArray>() - .unwrap() - .value(last_row); + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Failed to downcast tag column to StringArray".to_string(), + ) + })?; + let current_value = current_string_array.value(0); + let last_value = last_string_array.value(last_row); if current_value != last_value { - return Some((resumed_batch_index, 0)); + return Ok(Some((resumed_batch_index, 0))); } } } @@ -360,7 +371,15 @@ impl SeriesDivideStream { // check column by column for index in &self.tag_indices { let array = batch.column(*index); - let string_array = array.as_any().downcast_ref::<StringArray>().unwrap(); + let string_array = + array + .as_any() + .downcast_ref::<StringArray>() + .ok_or_else(|| { + datafusion::error::DataFusionError::Internal( + "Failed to downcast tag column to StringArray".to_string(), + ) + })?; // the first row number that not equal to the next row. 
let mut same_until = 0; while same_until < num_rows - 1 { @@ -376,12 +395,12 @@ impl SeriesDivideStream { // all rows are the same, inspect next batch resumed_batch_index += 1; } else { - return Some((resumed_batch_index, result_index)); + return Ok(Some((resumed_batch_index, result_index))); } } self.inspect_start = resumed_batch_index; - None + Ok(None) } } diff --git a/src/query/src/promql/planner.rs b/src/query/src/promql/planner.rs index 69b4aef6d5a6..8fa344295ab3 100644 --- a/src/query/src/promql/planner.rs +++ b/src/query/src/promql/planner.rs @@ -63,6 +63,9 @@ use promql_parser::parser::{ VectorMatchCardinality, VectorSelector, }; use snafu::{ensure, OptionExt, ResultExt}; +use store_api::metric_engine_consts::{ + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, +}; use table::table::adapter::DfTableProviderAdapter; use crate::promql::error::{ @@ -1146,6 +1149,10 @@ impl PromPlanner { .table_info() .meta .row_key_column_names() + .filter(|col| { + // remove metric engine's internal columns + col != &DATA_SCHEMA_TABLE_ID_COLUMN_NAME && col != &DATA_SCHEMA_TSID_COLUMN_NAME + }) .cloned() .collect(); self.ctx.tag_columns = tags; diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index adbf49010bf0..d6f205ee9e62 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -45,7 +45,7 @@ use serde_json::Value; use session::context::{QueryContext, QueryContextRef}; use snafu::{Location, OptionExt, ResultExt}; use store_api::metric_engine_consts::{ - DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, + DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY, }; pub use super::result::prometheus_resp::PrometheusJsonResponse; @@ -941,16 +941,30 @@ pub async fn label_values_query( .start_timer(); if label_name == METRIC_NAME_LABEL { - let mut table_names = match handler - .catalog_manager() - .table_names(&catalog, &schema, 
Some(&query_ctx)) - .await - { - Ok(table_names) => table_names, - Err(e) => { - return PrometheusJsonResponse::error(e.status_code(), e.output_msg()); + let catalog_manager = handler.catalog_manager(); + let mut tables_stream = catalog_manager.tables(&catalog, &schema, Some(&query_ctx)); + let mut table_names = Vec::new(); + while let Some(table) = tables_stream.next().await { + // filter out physical tables + match table { + Ok(table) => { + if table + .table_info() + .meta + .options + .extra_options + .contains_key(PHYSICAL_TABLE_METADATA_KEY) + { + continue; + } + + table_names.push(table.table_info().name.clone()); + } + Err(e) => { + return PrometheusJsonResponse::error(e.status_code(), e.output_msg()); + } } - }; + } table_names.sort_unstable(); return PrometheusJsonResponse::success(PrometheusResponse::LabelValues(table_names)); } else if label_name == FIELD_NAME_LABEL { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 714f19a972ca..186d84d1b75f 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -659,6 +659,18 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert!(prom_resp.error_type.is_none()); // query `__name__` without match[] + // create a physical table and a logical table + let res = client + .get("/v1/sql?sql=create table physical_table (`ts` timestamp time index, message string) with ('physical_metric_table' = 'true');") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await); + let res = client + .get("/v1/sql?sql=create table logic_table (`ts` timestamp time index, message string) with ('on_physical_table' = 'physical_table');") + .send() + .await; + assert_eq!(res.status(), StatusCode::OK, "{:?}", res.text().await); + // query `__name__` let res = client .get("/v1/prometheus/api/v1/label/__name__/values") .send() @@ -668,6 +680,15 @@ pub async fn test_prom_http_api(store_type: StorageType) { assert_eq!(prom_resp.status, 
"success"); assert!(prom_resp.error.is_none()); assert!(prom_resp.error_type.is_none()); + assert_eq!( + prom_resp.data, + PrometheusResponse::Labels(vec![ + "demo".to_string(), + "demo_metrics".to_string(), + "logic_table".to_string(), + "numbers".to_string() + ]) + ); // buildinfo let res = client