Skip to content

Commit

Permalink
fix ci
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed Jan 23, 2025
1 parent 6feab21 commit 9a0d1af
Show file tree
Hide file tree
Showing 14 changed files with 119 additions and 32 deletions.
4 changes: 2 additions & 2 deletions e2e_test/batch/types/timestamp_ns.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ select * from t1 where v2 >= '2012-01-01 01:01:01.123456';
6 2013-01-01 01:01:01.123456789

query T rowsort
select v1, cast(v2 as date) as date_v2, cast(v2 as timestamp with time zone) as timestamptz_v2 from t1;
select v1, cast(v2 as date) as date_v2, cast(v2 as timestampnano) as timestamptz_v2 from t1;
----
1 2013-01-01 2013-01-01 01:01:01.123456+00:00
2 2012-01-01 2012-01-01 01:01:01.123456+00:00
Expand Down Expand Up @@ -102,7 +102,7 @@ select v1, to_char(v2, 'YYYY-MM-DD HH24:MI:SS.NS') as formatted_v2 from t1;
6 2013-01-01 01:01:01.123456789

query T rowsort
select generate_series('2013-01-01 01:01:01.123456789'::timestamp,'2013-01-01 01:01:05.123456790'::timestamp, '1 s');
select generate_series('2013-01-01 01:01:01.123456789'::timestampnano,'2013-01-01 01:01:05.123456790'::timestampnano, '1 s');
----
2013-01-01 01:01:01.123456789
2013-01-01 01:01:02.123456789
Expand Down
1 change: 1 addition & 0 deletions proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ message ExprNode {
MAKE_DATE = 113;
MAKE_TIME = 114;
MAKE_TIMESTAMP = 115;
MAKE_TIMESTAMPNANO = 116;
// From f64 to timestamp.
// e.g. `select to_timestamp(1672044740.0)`
SEC_TO_TIMESTAMPTZ = 104;
Expand Down
13 changes: 13 additions & 0 deletions src/common/src/types/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,19 @@ macro_rules! impl_timestamp {
impl_timestamp!(Timestamp);
impl_timestamp!(TimestampNano);

impl From<Timestamp> for TimestampNano {
fn from(ts: Timestamp) -> Self {
TimestampNano::new(ts.0)
}
}

impl From<TimestampNano> for Timestamp {
fn from(ts: TimestampNano) -> Self {
let ts = ts.truncate_micros();
Timestamp::new(ts.0)
}
}

impl TimestampNano {
pub fn from_protobuf(cur: &mut Cursor<&[u8]>) -> ArrayResult<TimestampNano> {
let secs = cur
Expand Down
1 change: 1 addition & 0 deletions src/expr/impl/src/aggregate/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ fn min<T: Ord>(state: T, input: T) -> T {
#[aggregate("max(time) -> auto", state = "ref")]
#[aggregate("max(interval) -> auto", state = "ref")]
#[aggregate("max(timestamp) -> auto", state = "ref")]
#[aggregate("max(timestampnano) -> auto", state = "ref")]
#[aggregate("max(timestamptz) -> auto", state = "ref")]
#[aggregate("max(varchar) -> auto", state = "ref")]
#[aggregate("max(bytea) -> auto", state = "ref")]
Expand Down
2 changes: 2 additions & 0 deletions src/expr/impl/src/scalar/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ where
#[function("cast(timestampnano) -> date")]
#[function("cast(timestamp) -> time")]
#[function("cast(timestampnano) -> time")]
#[function("cast(timestampnano) -> timestamp")]
#[function("cast(timestamp) -> timestampnano")]
#[function("cast(interval) -> time")]
#[function("cast(varchar) -> varchar")]
#[function("cast(int256) -> float8")]
Expand Down
1 change: 1 addition & 0 deletions src/expr/impl/src/scalar/cmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ where
#[function("greater_than_or_equal(time, time) -> boolean")]
#[function("greater_than_or_equal(interval, interval) -> boolean")]
#[function("greater_than_or_equal(timestamp, timestamp) -> boolean")]
#[function("greater_than_or_equal(timestampnano, timestampnano) -> boolean")]
#[function("greater_than_or_equal(timestamptz, timestamptz) -> boolean")]
#[function("greater_than_or_equal(date, timestamp) -> boolean")]
#[function("greater_than_or_equal(timestamp, date) -> boolean")]
Expand Down
37 changes: 30 additions & 7 deletions src/expr/impl/src/scalar/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
use std::str::FromStr;

use chrono::{Datelike, NaiveTime, Timelike};
use risingwave_common::types::{Date, Decimal, Interval, Time, Timestamp, Timestamptz, F64};
use risingwave_common::types::{
Date, Decimal, Interval, Time, Timestamp, TimestampNano, Timestamptz, F64,
};
use risingwave_expr::{function, ExprError, Result};

use self::Unit::*;
Expand Down Expand Up @@ -93,24 +95,45 @@ fn extract_from_time(time: Time, unit: &Unit) -> Decimal {
fn extract_from_timestamp(timestamp: Timestamp, unit: &Unit) -> Decimal {
match unit {
Epoch => {
if let Some(nanos) = timestamp.0.and_utc().timestamp_nanos_opt() {
let micros = timestamp.0.and_utc().timestamp_micros();
Decimal::from_i128_with_scale(micros as i128, 6)
}
Julian => {
let epoch =
Decimal::from_i128_with_scale(timestamp.0.and_utc().timestamp_micros() as i128, 6);
epoch / (24 * 60 * 60).into() + 2_440_588.into()
}
_ if unit.is_date_unit() => extract_from_datelike(timestamp.0.date(), *unit),
_ if unit.is_time_unit() => extract_from_timelike(timestamp.0.time(), *unit),
u => unreachable!("invalid unit {:?} for timestamp", u),
}
}

#[function(
"extract(varchar, timestampnano) -> decimal",
prebuild = "Unit::from_str($0)?.ensure_timestamp()?"
)]
fn extract_from_timestamp_nano(timestamp_nano: TimestampNano, unit: &Unit) -> Decimal {
match unit {
Epoch => {
if let Some(nanos) = timestamp_nano.0.and_utc().timestamp_nanos_opt() {
Decimal::from_i128_with_scale(nanos as i128, 9)
} else {
let micros = timestamp.0.and_utc().timestamp_micros();
let micros = timestamp_nano.0.and_utc().timestamp_micros();
Decimal::from_i128_with_scale(micros as i128, 6)
}
}
Julian => {
let epoch = if let Some(nanos) = timestamp.0.and_utc().timestamp_nanos_opt() {
let epoch = if let Some(nanos) = timestamp_nano.0.and_utc().timestamp_nanos_opt() {
Decimal::from_i128_with_scale(nanos as i128, 9)
} else {
let epoch = timestamp.0.and_utc().timestamp_micros();
let epoch = timestamp_nano.0.and_utc().timestamp_micros();
Decimal::from_i128_with_scale(epoch as i128, 6)
};
epoch / (24 * 60 * 60).into() + 2_440_588.into()
}
_ if unit.is_date_unit() => extract_from_datelike(timestamp.0.date(), *unit),
_ if unit.is_time_unit() => extract_from_timelike(timestamp.0.time(), *unit),
_ if unit.is_date_unit() => extract_from_datelike(timestamp_nano.0.date(), *unit),
_ if unit.is_time_unit() => extract_from_timelike(timestamp_nano.0.time(), *unit),
u => unreachable!("invalid unit {:?} for timestamp", u),
}
}
Expand Down
42 changes: 37 additions & 5 deletions src/expr/impl/src/scalar/make_time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use risingwave_common::types::{Date, FloatExt, Time, Timestamp, Timestamptz, F64};
use risingwave_common::types::{Date, FloatExt, Time, Timestamp, TimestampNano, Timestamptz, F64};
use risingwave_expr::expr_context::TIME_ZONE;
use risingwave_expr::{capture_context, function, ExprError, Result};

Expand All @@ -35,7 +35,7 @@ pub fn make_naive_date(mut year: i32, month: i32, day: i32) -> Result<NaiveDate>
})
}

fn make_naive_time(hour: i32, min: i32, sec: F64) -> Result<NaiveTime> {
fn make_naive_time_ns(hour: i32, min: i32, sec: F64) -> Result<NaiveTime> {
if !sec.is_finite() || sec.0.is_sign_negative() {
return Err(ExprError::InvalidParam {
name: "sec",
Expand All @@ -52,6 +52,23 @@ fn make_naive_time(hour: i32, min: i32, sec: F64) -> Result<NaiveTime> {
)
}

fn make_naive_time_ms(hour: i32, min: i32, sec: F64) -> Result<NaiveTime> {
if !sec.is_finite() || sec.0.is_sign_negative() {
return Err(ExprError::InvalidParam {
name: "sec",
reason: format!("invalid sec: {}", sec).into(),
});
}
let sec_u32 = sec.0.trunc() as u32;
let microsecond_u32 = ((sec.0 - sec.0.trunc()) * 1_000_000.0).round_ties_even() as u32;
NaiveTime::from_hms_micro_opt(hour as u32, min as u32, sec_u32, microsecond_u32).ok_or_else(
|| ExprError::InvalidParam {
name: "hour, min, sec",
reason: format!("invalid time: {}:{}:{}", hour, min, sec).into(),
},
)
}

// year int, month int, day int
#[function("make_date(int4, int4, int4) -> date")]
pub fn make_date(year: i32, month: i32, day: i32) -> Result<Date> {
Expand All @@ -61,7 +78,7 @@ pub fn make_date(year: i32, month: i32, day: i32) -> Result<Date> {
// hour int, min int, sec double precision
#[function("make_time(int4, int4, float8) -> time")]
pub fn make_time(hour: i32, min: i32, sec: F64) -> Result<Time> {
Ok(Time(make_naive_time(hour, min, sec)?))
Ok(Time(make_naive_time_ns(hour, min, sec)?))
}

// year int, month int, day int, hour int, min int, sec double precision
Expand All @@ -76,7 +93,22 @@ pub fn make_timestamp(
) -> Result<Timestamp> {
Ok(Timestamp(NaiveDateTime::new(
make_naive_date(year, month, day)?,
make_naive_time(hour, min, sec)?,
make_naive_time_ms(hour, min, sec)?,
)))
}

#[function("make_timestampnano(int4, int4, int4, int4, int4, float8) -> timestampnano")]
pub fn make_timestamp_nano(
year: i32,
month: i32,
day: i32,
hour: i32,
min: i32,
sec: F64,
) -> Result<TimestampNano> {
Ok(TimestampNano(NaiveDateTime::new(
make_naive_date(year, month, day)?,
make_naive_time_ns(hour, min, sec)?,
)))
}

Expand Down Expand Up @@ -119,7 +151,7 @@ fn make_timestamptz_impl(
) -> Result<Timestamptz> {
let naive_date_time = NaiveDateTime::new(
make_naive_date(year, month, day)?,
make_naive_time(hour, min, sec)?,
make_naive_time_ms(hour, min, sec)?,
);
timestamp_at_time_zone(Timestamp(naive_date_time), time_zone)
}
Expand Down
11 changes: 10 additions & 1 deletion src/expr/impl/src/scalar/to_char.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::LazyLock;
use aho_corasick::{AhoCorasick, AhoCorasickBuilder};
use chrono::format::{Item, StrftimeItems};
use chrono::{Datelike, NaiveDate};
use risingwave_common::types::{Interval, Timestamp, Timestamptz};
use risingwave_common::types::{Interval, Timestamp, TimestampNano, Timestamptz};
use risingwave_expr::{function, ExprError, Result};

use super::timestamptz::time_zone_err;
Expand Down Expand Up @@ -194,6 +194,15 @@ fn timestamp_to_char(data: Timestamp, pattern: &ChronoPattern, writer: &mut impl
write!(writer, "{}", format).unwrap();
}

#[function(
"to_char(timestampnano, varchar) -> varchar",
prebuild = "ChronoPattern::compile($1)"
)]
fn timestamp_nano_to_char(data: TimestampNano, pattern: &ChronoPattern, writer: &mut impl Write) {
let format = data.0.format_with_items(pattern.borrow_dependent().iter());
write!(writer, "{}", format).unwrap();
}

#[function("to_char(timestamptz, varchar) -> varchar", rewritten)]
fn _timestamptz_to_char() {}

Expand Down
1 change: 1 addition & 0 deletions src/expr/impl/src/table_function/generate_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ where
#[function("generate_series(int4, int4, int4) -> setof int4")]
#[function("generate_series(int8, int8, int8) -> setof int8")]
#[function("generate_series(timestamp, timestamp, interval) -> setof timestamp")]
#[function("generate_series(timestampnano, timestampnano, interval) -> setof timestampnano")]
fn generate_series_step<T, S>(start: T, stop: T, step: S) -> Result<impl Iterator<Item = T>>
where
T: CheckedAdd<S, Output = T> + PartialOrd + Copy,
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/expr/function/builtin_scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ impl Binder {
("make_date", raw_call(ExprType::MakeDate)),
("make_time", raw_call(ExprType::MakeTime)),
("make_timestamp", raw_call(ExprType::MakeTimestamp)),
("make_timestampnano", raw_call(ExprType::MakeTimestampnano)),
("make_timestamptz", raw_call(ExprType::MakeTimestamptz)),
("timezone", rewrite(ExprType::AtTimeZone, |mut inputs|{
if inputs.len() == 2 {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/expr/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl ExprVisitor for ImpureAnalyzer {
| Type::MakeDate
| Type::MakeTime
| Type::MakeTimestamp
| Type::MakeTimestampnano
| Type::CharToTimestamptz
| Type::CharToDate
| Type::CastWithTimeZone
Expand Down
35 changes: 18 additions & 17 deletions src/frontend/src/expr/type_inference/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,23 +306,24 @@ pub static CAST_TABLE: LazyLock<CastTable> = LazyLock::new(|| {
use DataTypeName::*;
const CAST_TABLE: &[(&str, DataTypeName)] = &[
// 123456789ABCDEF
(". e a ", Boolean), // 0
(" .iiiiii a ", Int16), // 1
("ea.iiiii a ", Int32), // 2
(" aa.iiii ae", Int64), // 3
(" aaa.ii a ", Decimal), // 4
(" aaaa.i a ", Float32), // 5
(" aaaaa. a ", Float64), // 6
(" e. a ", Int256), // 7
(" .ii a ", Date), // 8
(" a.ia a ", Timestamp), // 9
(" aa.a a ", Timestamptz), // A
(" .i a ", Time), // B
(" a. a ", Interval), // C
("eeeeeee . a ", Jsonb), // D
(" .a ", Bytea), // E
("eeeeeeeeeeeeeee. ", Varchar), // F
(" e .", Serial),
(". e a ", Boolean), // 0
(" .iiiiii a ", Int16), // 1
("ea.iiiii a ", Int32), // 2
(" aa.iiii ae ", Int64), // 3
(" aaa.ii a ", Decimal), // 4
(" aaaa.i a ", Float32), // 5
(" aaaaa. a ", Float64), // 6
(" e. a ", Int256), // 7
(" .ii a i", Date), // 8
(" a.ia a i", Timestamp), // 9
(" aa.a a a", Timestamptz), // A
(" .i a ", Time), // B
(" a. a ", Interval), // C
("eeeeeee . a ", Jsonb), // D
(" .a ", Bytea), // E
("eeeeeeeeeeeeeee. e", Varchar), // F
(" e . ", Serial), // G
(" aaia a .", TimestampNano), // H
];
let mut map = BTreeMap::new();
for (row, source) in CAST_TABLE {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/optimizer/plan_expr_visitor/strong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ impl Strong {
| ExprType::TumbleStart
| ExprType::MakeDate
| ExprType::MakeTime
| ExprType::MakeTimestampnano
| ExprType::MakeTimestamp
| ExprType::SecToTimestamptz
| ExprType::AtTimeZone
Expand Down

0 comments on commit 9a0d1af

Please sign in to comment.