Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support es sink partial update #20048

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions integration_tests/elasticsearch-sink/create_sink.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ CREATE SINK bhv_es7_sink
FROM
bhv_mv WITH (
connector = 'elasticsearch',
type = 'upsert',
index = 'test',
url = 'http://elasticsearch7:9200',
username = 'elastic',
Expand All @@ -14,6 +15,7 @@ CREATE SINK bhv_es8_sink
FROM
bhv_mv WITH (
connector = 'elasticsearch',
type = 'upsert',
index = 'test',
url = 'http://elasticsearch8:9200',
username = 'elastic',
Expand All @@ -24,6 +26,7 @@ CREATE SINK es7_types_sink
FROM
es_types WITH (
connector = 'elasticsearch',
type = 'upsert',
index = 'test_types',
primary_key = 'types_id',
url = 'http://elasticsearch7:9200',
Expand All @@ -35,6 +38,7 @@ CREATE SINK es8_types_sink
FROM
es_types WITH (
connector = 'elasticsearch',
type = 'upsert',
index = 'test_types',
primary_key = 'types_id',
url = 'http://elasticsearch8:9200',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
// limitations under the License.

use anyhow::anyhow;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::array::{Op, RowRef, StreamChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::row::Row;
use risingwave_common::util::iter_util::ZipEqDebug;
use serde_json::{Map, Value};

use super::super::encoder::template::TemplateEncoder;
Expand Down Expand Up @@ -118,6 +119,7 @@ impl ElasticSearchOpenSearchFormatter {
chunk: StreamChunk,
is_append_only: bool,
) -> Result<Vec<BuildBulkPara>> {
let mut update_delete_row: Option<(String, RowRef<'_>)> = None;
let mut result_vec = Vec::with_capacity(chunk.capacity());
for (op, rows) in chunk.rows() {
let index = if let Some(index_column) = self.index_column {
Expand Down Expand Up @@ -151,7 +153,30 @@ impl ElasticSearchOpenSearchFormatter {
match op {
Op::Insert | Op::UpdateInsert => {
let key = self.key_encoder.encode(rows)?;
let value = self.value_encoder.encode(rows)?;
let mut modified_col_indices = Vec::new();
if let Some((delete_key, delete_row)) = update_delete_row.take()
&& delete_key == key
{
delete_row
.iter()
.enumerate()
.zip_eq_debug(rows.iter())
.for_each(|((index, delete_column), insert_column)| {
if let Some(insert_column) = insert_column
&& let Some(delete_column) = delete_column
&& insert_column == delete_column
{
// Both values are non-NULL and equal: the column is unchanged, so its index is not recorded as modified.
} else {
modified_col_indices.push(index);
}
});
} else {
modified_col_indices = (0..self.value_encoder.schema().len()).collect();
}
let value = self
.value_encoder
.encode_cols(rows, modified_col_indices.into_iter())?;
result_vec.push(BuildBulkPara {
index: index.to_owned(),
key,
Expand Down Expand Up @@ -182,7 +207,8 @@ impl ElasticSearchOpenSearchFormatter {
"`UpdateDelete` operation is not supported in `append_only` mode"
)));
} else {
continue;
let key = self.key_encoder.encode(rows)?;
update_delete_row = Some((key, rows));
}
}
}
Expand Down
Loading