Skip to content

Commit

Permalink
fix(sink): construct java.sql.time/date object with a long type (#13651)
Browse files Browse the repository at this point in the history
  • Loading branch information
xuefengze authored Dec 4, 2023
1 parent 666e154 commit d7d56bd
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion e2e_test/sink/elasticsearch/elasticsearch_sink.result
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"doc","_id":"2","_score":1.0,"_source":{"v1":2,"v2":2,"v3":"2-2"}},{"_index":"test","_type":"doc","_id":"5","_score":1.0,"_source":{"v1":5,"v2":2,"v3":"5-2"}},{"_index":"test","_type":"doc","_id":"13","_score":1.0,"_source":{"v1":13,"v2":2,"v3":"13-2"}},{"_index":"test","_type":"doc","_id":"3","_score":1.0,"_source":{"v1":3,"v2":2,"v3":"3-2"}},{"_index":"test","_type":"doc","_id":"8","_score":1.0,"_source":{"v1":8,"v2":2,"v3":"8-2"}},{"_index":"test","_type":"doc","_id":"1","_score":1.0,"_source":{"v1":1,"v2":50,"v3":"1-50"}}]}}
{"took":6,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":6,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"test","_type":"doc","_id":"3","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.123","tz":"1970-01-01T00:00:00.123Z","v1":3,"v2":2,"v3":"3-2","ts":"1970-01-01 00:00:00.123"}},{"_index":"test","_type":"doc","_id":"5","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":5,"v2":2,"v3":"5-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"doc","_id":"8","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":8,"v2":2,"v3":"8-2","ts":"1970-01-01 00:00:00.000"}},{"_index":"test","_type":"doc","_id":"13","_score":1.0,"_source":{"d":"1970-01-01","t":"20:00:00.123","tz":"1970-01-01T20:00:00.123Z","v1":13,"v2":2,"v3":"13-2","ts":"1970-01-01 20:00:00.123"}},{"_index":"test","_type":"doc","_id":"1","_score":1.0,"_source":{"d":"2000-01-01","t":"00:00:00.123","tz":"2000-01-01T00:00:00.123Z","v1":1,"v2":50,"v3":"1-50","ts":"2000-01-01 00:00:00.123"}},{"_index":"test","_type":"doc","_id":"2","_score":1.0,"_source":{"d":"1970-01-01","t":"00:00:00.000","tz":"1970-01-01T00:00:00.000Z","v1":2,"v2":2,"v3":"2-2","ts":"1970-01-01 00:00:00.000"}}]}}
22 changes: 18 additions & 4 deletions e2e_test/sink/elasticsearch/elasticsearch_sink.slt
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
statement ok
CREATE TABLE t7 (v1 int primary key, v2 bigint, v3 varchar);
CREATE TABLE t7 (
v1 int primary key,
v2 bigint,
v3 varchar,
d date,
t time,
ts timestamp,
tz timestamptz
);

statement ok
CREATE SINK s7 AS select t7.v1 as v1, t7.v2 as v2, t7.v3 as v3 from t7 WITH (
CREATE SINK s7 from t7 WITH (
connector = 'elasticsearch',
index = 'test',
url = 'http://elasticsearch:9200',
Expand All @@ -11,13 +19,19 @@ CREATE SINK s7 AS select t7.v1 as v1, t7.v2 as v2, t7.v3 as v3 from t7 WITH (
);

statement ok
INSERT INTO t7 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2');
INSERT INTO t7 VALUES
(1, 2, '1-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
(2, 2, '2-2', '1970-01-01', '00:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
(3, 2, '3-2', '1970-01-01', '00:00:00.123456', '1970-01-01 00:00:00.123456', '1970-01-01 00:00:00.123456Z'),
(5, 2, '5-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
(8, 2, '8-2', '1970-01-01', '20:00:00', '1970-01-01 00:00:00', '1970-01-01 00:00:00Z'),
(13, 2, '13-2', '1970-01-01', '20:00:00.123456', '1970-01-01 20:00:00.123456', '1970-01-01 20:00:00.123456Z');

statement ok
FLUSH;

statement ok
INSERT INTO t7 VALUES (1, 50, '1-50');
INSERT INTO t7 VALUES (1, 50, '1-50', '2000-01-01', '00:00:00.123456', '2000-01-01 00:00:00.123456', '2000-01-01 00:00:00.123456Z');

statement ok
FLUSH;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
import io.grpc.Status;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -64,6 +65,11 @@ public class EsSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(EsSink.class);
private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s";

private static final TimeZone UTCTimeZone = TimeZone.getTimeZone("UTC");
private final SimpleDateFormat tDfm;
private final SimpleDateFormat tsDfm;
private final SimpleDateFormat tstzDfm;

private final EsSinkConfig config;
private final BulkProcessor bulkProcessor;
private final RestHighLevelClient client;
Expand Down Expand Up @@ -105,6 +111,10 @@ public EsSink(EsSinkConfig config, TableSchema tableSchema) {
for (String primaryKey : tableSchema.getPrimaryKeys()) {
primaryKeyIndexes.add(tableSchema.getColumnIndex(primaryKey));
}

tDfm = createSimpleDateFormat("HH:mm:ss.SSS", UTCTimeZone);
tsDfm = createSimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", UTCTimeZone);
tstzDfm = createSimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", UTCTimeZone);
}

private static RestClientBuilder configureRestClientBuilder(
Expand Down Expand Up @@ -206,15 +216,23 @@ private Map<String, Object> buildDoc(SinkRow row)
var type = columnDescs.get(i).getDataType().getTypeName();
Object col = row.get(i);
switch (type) {
case DATE:
case TIME:
case TIMESTAMP:
case TIMESTAMPTZ:
// The Elasticsearch client does not natively support java.sql.Timestamp/Time/Date,
// so Date/Time/Timestamp values are converted into strings, as suggested in
// https://github.com/elastic/elasticsearch/issues/31377#issuecomment-398102292
case DATE:
col = col.toString();
break;
// java.sql.Time/Timestamp values are constructed from a milliseconds time value.
// Formatting them uses the system time zone by default, so the formatters are
// explicitly configured with the UTC time zone.
case TIME:
col = tDfm.format(col);
break;
case TIMESTAMP:
col = tsDfm.format(col);
break;
case TIMESTAMPTZ:
col = tstzDfm.format(col);
break;
case JSONB:
ObjectMapper mapper = new ObjectMapper();
JsonNode jsonNode = mapper.readTree((String) col);
Expand Down Expand Up @@ -351,4 +369,10 @@ public void drop() {
public RestHighLevelClient getClient() {
return client;
}

/**
 * Creates a {@link SimpleDateFormat} for {@code pattern} that formats values in {@code
 * timeZone} instead of the JVM default time zone.
 *
 * <p>The helper reads no instance state, so it is declared {@code static}; {@code final} is
 * redundant on a private method and has been dropped.
 *
 * <p>NOTE(review): the JDK documents SimpleDateFormat as not synchronized; confirm the returned
 * formatter is not shared across threads.
 *
 * @param pattern date/time pattern understood by {@link SimpleDateFormat}
 * @param timeZone time zone applied to the formatter
 * @return a new formatter configured with the given pattern and time zone
 */
private static SimpleDateFormat createSimpleDateFormat(String pattern, TimeZone timeZone) {
    SimpleDateFormat formatter = new SimpleDateFormat(pattern);
    // Without this, formatting would use the system default time zone.
    formatter.setTimeZone(timeZone);
    return formatter;
}
}
1 change: 1 addition & 0 deletions src/jni_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ normal = ["workspace-hack"]
anyhow = "1"
bytes = "1"
cfg-or-panic = "0.2"
chrono = { version = "0.4", default-features = false }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
itertools = "0.12"
jni = "0.21.1"
Expand Down
92 changes: 28 additions & 64 deletions src/jni_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ use std::sync::{LazyLock, OnceLock};
use anyhow::anyhow;
use bytes::Bytes;
use cfg_or_panic::cfg_or_panic;
use chrono::NaiveDateTime;
use jni::objects::{
AutoElements, GlobalRef, JByteArray, JClass, JMethodID, JObject, JStaticMethodID, JString,
JValueGen, JValueOwned, ReleaseMode,
AutoElements, GlobalRef, JByteArray, JClass, JMethodID, JObject, JString, ReleaseMode,
};
use jni::signature::ReturnType;
use jni::sys::{
jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort, jsize, jvalue, JNI_FALSE, JNI_TRUE,
};
Expand Down Expand Up @@ -234,8 +233,8 @@ struct JavaClassMethodCache {
big_decimal_ctor: OnceLock<(GlobalRef, JMethodID)>,
timestamp_ctor: OnceLock<(GlobalRef, JMethodID)>,

date_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
time_ctor: OnceLock<(GlobalRef, JStaticMethodID)>,
date_ctor: OnceLock<(GlobalRef, JMethodID)>,
time_ctor: OnceLock<(GlobalRef, JMethodID)>,
}

// TODO: may only return a RowRef
Expand Down Expand Up @@ -675,40 +674,22 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDateValue
idx: jint,
) -> JObject<'a> {
execute_and_catch(env, move |env: &mut EnvParam<'_>| {
let value = pointer
.as_ref()
.datum_at(idx as usize)
.unwrap()
.into_date()
.0
.to_string();
// Constructs a Date object using a milliseconds time value.
let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_date();
let datetime = value.0.and_hms_opt(0, 0, 0).unwrap();
let millis = datetime.timestamp_millis();

let string_value = env.new_string(value)?;
let (class_ref, constructor) =
let (date_class_ref, constructor) =
pointer.as_ref().class_cache.date_ctor.get_or_try_init(|| {
let cls = env.find_class("java/sql/Date")?;
let init_method = env.get_static_method_id(
&cls,
"valueOf",
"(Ljava/lang/String;)Ljava/sql/Date;",
)?;
let cls = env.find_class(gen_class_name!(java.sql.Date))?;
let init_method =
env.get_method_id(&cls, "<init>", gen_jni_sig!(void Date(long)))?;
Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
})?;
unsafe {
let JValueOwned::Object(date_obj) = env.call_static_method_unchecked(
<&JClass<'_>>::from(class_ref.as_obj()),
*constructor,
ReturnType::Object,
&[jvalue {
l: string_value.into_raw(),
}],
)?
else {
return Err(BindingError::from(jni::errors::Error::MethodNotFound {
name: "valueOf".to_string(),
sig: "(Ljava/lang/String;)Ljava/sql/Date;".into(),
}));
};
let date_class = <&JClass<'_>>::from(date_class_ref.as_obj());
let date_obj =
env.new_object_unchecked(date_class, *constructor, &[jvalue { j: millis }])?;
Ok(date_obj)
}
})
Expand All @@ -721,41 +702,24 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimeValue
idx: jint,
) -> JObject<'a> {
execute_and_catch(env, move |env: &mut EnvParam<'_>| {
let value = pointer
.as_ref()
.datum_at(idx as usize)
.unwrap()
.into_time()
.0
.to_string();
// Constructs a Time object using a milliseconds time value.
let value = pointer.as_ref().datum_at(idx as usize).unwrap().into_time();
let epoch_date = NaiveDateTime::UNIX_EPOCH.date();
let datetime = epoch_date.and_time(value.0);
let millis = datetime.timestamp_millis();

let string_value = env.new_string(value)?;
let (class_ref, constructor) =
let (time_class_ref, constructor) =
pointer.as_ref().class_cache.time_ctor.get_or_try_init(|| {
let cls = env.find_class("java/sql/Time")?;
let init_method = env.get_static_method_id(
&cls,
"valueOf",
"(Ljava/lang/String;)Ljava/sql/Time;",
)?;
let cls = env.find_class(gen_class_name!(java.sql.Time))?;
let init_method =
env.get_method_id(&cls, "<init>", gen_jni_sig!(void Time(long)))?;
Ok::<_, jni::errors::Error>((env.new_global_ref(cls)?, init_method))
})?;
unsafe {
let class = <&JClass<'_>>::from(class_ref.as_obj());
match env.call_static_method_unchecked(
class,
*constructor,
ReturnType::Object,
&[jvalue {
l: string_value.into_raw(),
}],
)? {
JValueGen::Object(obj) => Ok(obj),
_ => Err(BindingError::from(jni::errors::Error::MethodNotFound {
name: "valueOf".to_string(),
sig: "(Ljava/lang/String;)Ljava/sql/Time;".into(),
})),
}
let time_class = <&JClass<'_>>::from(time_class_ref.as_obj());
let time_obj =
env.new_object_unchecked(time_class, *constructor, &[jvalue { j: millis }])?;
Ok(time_obj)
}
})
}
Expand Down

0 comments on commit d7d56bd

Please sign in to comment.