diff --git a/.typos.toml b/.typos.toml
index 4b6af71b5ddf6..001a8f9d276ec 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -33,7 +33,7 @@ extend-exclude = [
"src/sqlparser/tests/testdata/",
"src/frontend/planner_test/tests/testdata",
"src/tests/sqlsmith/tests/freeze",
- "src/license/src/manager.rs",
+ "src/license/**/*.rs", # JWT license key
"Cargo.lock",
"**/Cargo.toml",
"**/go.mod",
diff --git a/Cargo.lock b/Cargo.lock
index 73178ca6ba387..6f18a4607ad36 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1904,7 +1904,7 @@ dependencies = [
"bitflags 2.8.0",
"cexpr",
"clang-sys",
- "itertools 0.10.5",
+ "itertools 0.12.1",
"lazy_static",
"lazycell",
"log",
@@ -1927,7 +1927,7 @@ dependencies = [
"bitflags 2.8.0",
"cexpr",
"clang-sys",
- "itertools 0.10.5",
+ "itertools 0.13.0",
"proc-macro2",
"quote",
"regex",
@@ -2724,9 +2724,9 @@ dependencies = [
[[package]]
name = "const-str"
-version = "0.5.6"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6"
+checksum = "671927c085eb5827d30b95df08f6c6a2301eafe2274c368bb2c16f42e03547eb"
[[package]]
name = "constant_time_eq"
@@ -5090,9 +5090,9 @@ dependencies = [
[[package]]
name = "fs-err"
-version = "3.0.0"
+version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8bb60e7409f34ef959985bc9d9c5ee8f5db24ee46ed9775850548021710f807f"
+checksum = "1f89bda4c2a21204059a977ed3bfe746677dfd137b83c339e702b0ac91d482aa"
dependencies = [
"autocfg",
]
@@ -9466,7 +9466,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
- "itertools 0.10.5",
+ "itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.87",
diff --git a/README.md b/README.md
index 96bc0f35c7ddf..139feeb6f0381 100644
--- a/README.md
+++ b/README.md
@@ -9,25 +9,18 @@
-### 🌊 Reimagine real-time data engineering.
+### 🌊 Ride the Wave of Real-Time Data.
-
- 📚
- Documentation 🚀
-
- Slack Community
-
+
+ Docs | Benchmarks | Demos
+
+
+
-RisingWave is a Postgres-compatible SQL database engineered to provide the simplest and most cost-efficient approach for processing, analyzing, and managing real-time event streaming data.
+RisingWave is the world's most advanced event stream processing platform, engineered to provide the simplest and most cost-efficient way to process, analyze, and manage real-time event streaming data. It provides both a Postgres-compatible [SQL interface](https://docs.risingwave.com/sql/overview) and a DataFrame-style [Python interface](https://docs.risingwave.com/python-sdk/intro).
-RisingWave can ingest millions of events per second, continuously join and analyze live data streams with historical tables, serve ad-hoc queries in real-time, and deliver fresh, consistent results wherever needed.
+RisingWave can ingest millions of events per second, continuously join and analyze live data streams with historical tables, serve ad-hoc queries at low latency, and deliver fresh, consistent results wherever needed.
-![RisingWave](./docs/dev/src/images/architecture_20240908.png)
+![RisingWave](./docs/dev/src/images/architecture_20250127.png)
## Try it out in 60 seconds
@@ -61,14 +54,28 @@ curl -L https://risingwave.com/sh | sh
To learn about other installation options, such as using a Docker image, see [Quick Start](https://docs.risingwave.com/docs/current/get-started/).
-## When is RisingWave the perfect fit?
-RisingWave is the ideal solution for:
+## What Is RisingWave Optimized For?
+RisingWave simplifies the development of real-time data pipelines and applications. It is purpose-built to:
+
+* **Ingestion**: Ingest millions of events per second from both streaming and batch data sources.
+* **Stream processing**: Perform real-time incremental data processing to join and analyze live data streams with historical tables.
+* **Serving**: Persist data and serve ad-hoc queries with single-digit millisecond latency.
+* **Delivery**: Deliver fresh, consistent results to data lakes (e.g., Apache Iceberg) or any destination.
+
+
+## Why Is RisingWave Special?
+RisingWave stands apart from traditional stream processing systems due to its:
+
+### PostgreSQL Compatibility
+* **Seamless integration**: Works with a wide range of tools in the PostgreSQL ecosystem.
+* **Expressive SQL**: Supports structured, semi-structured, and unstructured data using a rich, familiar SQL dialect.
+* **No manual state tuning**: Eliminates the need for complex state management configurations.
+
+### Decoupled Compute and Storage
+* **Optimized for complex queries**: Ensures high performance, especially for complex operations like joins and time windowing.
+* **Fast failure recovery**: Recovers from system crashes within seconds.
+* **Dynamic scaling**: Adjusts resources instantly to handle workload spikes.
-* Managing real-time data sources like Kafka streams, database CDC, and more.
-* Executing complex, on-the-fly queries, including joins, aggregations, and time windowing.
-* Interactively and concurrently exploring consistent, up-to-the-moment results.
-* Seamlessly delivering results to downstream systems.
-* Processing both streaming and batch data with a unified codebase.
## In what use cases does RisingWave excel?
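Since the section above leans on the PostgreSQL-compatibility claim ("works with a wide range of tools in the PostgreSQL ecosystem"), a minimal sketch of what that means in practice may help: any Postgres driver can talk to RisingWave. The snippet below uses psycopg2 against a hypothetical local single-node instance; host, port 4566, user `root`, and database `dev` are the usual local defaults, not something this PR defines.

```python
# Minimal sketch (not part of this PR): RisingWave speaks the Postgres wire
# protocol, so an ordinary Postgres driver such as psycopg2 can create tables,
# materialized views, and query them. Connection parameters are local defaults.
import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True

with conn.cursor() as cur:
    cur.execute("CREATE TABLE IF NOT EXISTS clicks (user_id INT, ts TIMESTAMPTZ)")
    cur.execute("""
        CREATE MATERIALIZED VIEW IF NOT EXISTS clicks_per_user AS
        SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id
    """)
    cur.execute("INSERT INTO clicks VALUES (1, NOW()), (1, NOW()), (2, NOW())")
    cur.execute("FLUSH")  # make the writes visible before reading, as the e2e tests do
    cur.execute("SELECT * FROM clicks_per_user ORDER BY user_id")
    print(cur.fetchall())  # e.g. [(1, 2), (2, 1)]

conn.close()
```

The explicit `FLUSH` mirrors the `SET RW_IMPLICIT_FLUSH TO true;` / `flush;` pattern used throughout the e2e tests in this PR.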
diff --git a/ci/workflows/main-cron.yml b/ci/workflows/main-cron.yml
index 8fcc035500398..f977c65a056ad 100644
--- a/ci/workflows/main-cron.yml
+++ b/ci/workflows/main-cron.yml
@@ -144,7 +144,7 @@ steps:
files: "*-junit.xml"
format: "junit"
- ./ci/plugins/upload-failure-logs
- timeout_in_minutes: 13
+ timeout_in_minutes: 18
retry: *auto-retry
- label: "end-to-end test (parallel, in-memory) (release)"
@@ -160,7 +160,7 @@ steps:
plugins:
- docker-compose#v5.5.0: *docker-compose
- ./ci/plugins/upload-failure-logs
- timeout_in_minutes: 15
+ timeout_in_minutes: 20
retry: *auto-retry
- group: "end-to-end connector test (release)"
@@ -342,7 +342,7 @@ steps:
- label: "end-to-end test (madsim)"
key: "e2e-test-deterministic"
- command: "TEST_NUM=32 timeout 130m ci/scripts/deterministic-e2e-test.sh"
+ command: "TEST_NUM=32 timeout 135m ci/scripts/deterministic-e2e-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation"
@@ -357,12 +357,12 @@ steps:
environment:
- GITHUB_TOKEN
- ./ci/plugins/upload-failure-logs
- timeout_in_minutes: 135
+ timeout_in_minutes: 140
retry: *auto-retry
- label: "end-to-end test (madsim, random vnode count)"
key: "e2e-test-deterministic-random-vnode-count"
- command: "TEST_NUM=32 RW_SIM_RANDOM_VNODE_COUNT=true timeout 130m ci/scripts/deterministic-e2e-test.sh"
+ command: "TEST_NUM=32 RW_SIM_RANDOM_VNODE_COUNT=true timeout 135m ci/scripts/deterministic-e2e-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-test-deterministic-simulation"
@@ -377,12 +377,12 @@ steps:
environment:
- GITHUB_TOKEN
- ./ci/plugins/upload-failure-logs
- timeout_in_minutes: 135
+ timeout_in_minutes: 140
retry: *auto-retry
- label: "recovery test (madsim)"
key: "recovery-test-deterministic"
- command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 70m ci/scripts/deterministic-recovery-test.sh"
+ command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.0 timeout 75m ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
@@ -392,13 +392,13 @@ steps:
- docker-compose#v5.5.0: *docker-compose
# Only upload zipped files, otherwise the logs is too much.
- ./ci/plugins/upload-failure-logs-zipped
- timeout_in_minutes: 75
+ timeout_in_minutes: 80
retry: *auto-retry
# Ddl statements will randomly run with background_ddl.
- label: "background_ddl, arrangement_backfill recovery test (madsim)"
key: "background-ddl-arrangement-backfill-recovery-test-deterministic"
- command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 70m ci/scripts/deterministic-recovery-test.sh"
+ command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 75m ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
@@ -408,7 +408,7 @@ steps:
- docker-compose#v5.5.0: *docker-compose
# Only upload zipped files, otherwise the logs is too much.
- ./ci/plugins/upload-failure-logs-zipped
- timeout_in_minutes: 70
+ timeout_in_minutes: 80
retry: *auto-retry
- label: "end-to-end iceberg sink test (release)"
diff --git a/ci/workflows/pull-request.yml b/ci/workflows/pull-request.yml
index 19fda2f2d99d5..1b2adcbac28b7 100644
--- a/ci/workflows/pull-request.yml
+++ b/ci/workflows/pull-request.yml
@@ -160,7 +160,7 @@ steps:
plugins:
- docker-compose#v5.5.0: *docker-compose
- ./ci/plugins/upload-failure-logs
- timeout_in_minutes: 25
+ timeout_in_minutes: 30
retry: *auto-retry
- label: "end-to-end test for opendal (parallel)"
@@ -562,7 +562,7 @@ steps:
environment:
- GITHUB_TOKEN
- ./ci/plugins/upload-failure-logs
- timeout_in_minutes: 30
+ timeout_in_minutes: 35
cancel_on_build_failing: true
retry: *auto-retry
@@ -583,7 +583,7 @@ steps:
# - test-collector#v1.0.0:
# files: "*-junit.xml"
# format: "junit"
- timeout_in_minutes: 35
+ timeout_in_minutes: 40
cancel_on_build_failing: true
retry: *auto-retry
@@ -748,7 +748,7 @@ steps:
run: ci-standard-env
propagate-environment: true
- ./ci/plugins/upload-failure-logs
- timeout_in_minutes: 32
+ timeout_in_minutes: 37
retry: *auto-retry
# FIXME(kwannoel): Let the github PR labeller label it, if sqlsmith source files has changes.
diff --git a/docs/dev/src/images/architecture_20250127.png b/docs/dev/src/images/architecture_20250127.png
new file mode 100644
index 0000000000000..703a38e83f356
Binary files /dev/null and b/docs/dev/src/images/architecture_20250127.png differ
diff --git a/e2e_test/batch/join/asof_join.slt b/e2e_test/batch/join/asof_join.slt
new file mode 100644
index 0000000000000..bf905b661e107
--- /dev/null
+++ b/e2e_test/batch/join/asof_join.slt
@@ -0,0 +1,43 @@
+statement ok
+SET RW_IMPLICIT_FLUSH TO true;
+
+statement ok
+create table t1 (v1 int, v2 int, v3 int primary key);
+
+statement ok
+create table t2 (v1 int, v2 int, v3 int primary key);
+
+statement ok
+insert into t1 values (1, 2, 3), (2, 3, 4), (1, 2, 9);
+
+statement ok
+insert into t2 values (1, NULL, 8), (1, 3, 4), (1, 2, 5), (1, 2, 6);
+
+# asof inner join
+query IIIIII
+SELECT t1.v1 t1_v1, t1.v2 t1_v2, t1.v3 t1_v3, t2.v1 t2_v1, t2.v2 t2_v2, t2.v3 t2_v3 FROM t1 ASOF JOIN t2 ON t1.v1 = t2.v1 and t1.v2 < t2.v2 order by t1.v1, t1.v3;
+----
+1 2 3 1 3 4
+1 2 9 1 3 4
+
+# asof left join
+query IIIIII
+SELECT t1.v1 t1_v1, t1.v2 t1_v2, t1.v3 t1_v3, t2.v1 t2_v1, t2.v2 t2_v2, t2.v3 t2_v3 FROM t1 ASOF LEFT JOIN t2 ON t1.v1 = t2.v1 and t1.v2 < t2.v2 order by t1.v1, t1.v3;
+----
+1 2 3 1 3 4
+1 2 9 1 3 4
+2 3 4 NULL NULL NULL
+
+# asof left join with the inequality reversed (no right rows qualify)
+query IIIIII
+SELECT t1.v1 t1_v1, t1.v2 t1_v2, t1.v3 t1_v3, t2.v1 t2_v1, t2.v2 t2_v2, t2.v3 t2_v3 FROM t1 ASOF LEFT JOIN t2 ON t1.v1 = t2.v1 and t1.v2 > t2.v2 order by t1.v1, t1.v3;
+----
+1 2 3 NULL NULL NULL
+1 2 9 NULL NULL NULL
+2 3 4 NULL NULL NULL
+
+statement ok
+drop table t1;
+
+statement ok
+drop table t2;
\ No newline at end of file
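A quick read of the expectations above: for `t1.v2 < t2.v2`, the only build-side row sharing key `v1 = 1` whose `v2` is strictly greater than the probe's `v2 = 2` is `(1, 3, 4)` (the `NULL` in `(1, NULL, 8)` never satisfies the inequality), so each of the left rows `(1, 2, 3)` and `(1, 2, 9)` pairs with it exactly once; `(2, 3, 4)` has no key match, so the inner join drops it while the left join pads it with `NULL`s. In the final query the inequality is flipped to `t1.v2 > t2.v2`, which no right-side row satisfies for any left row, so every output row is `NULL`-padded.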
diff --git a/e2e_test/ddl/alter_table_column_issue_17121.slt b/e2e_test/ddl/alter_table_column_issue_17121.slt
new file mode 100644
index 0000000000000..d96c030d0a20c
--- /dev/null
+++ b/e2e_test/ddl/alter_table_column_issue_17121.slt
@@ -0,0 +1,41 @@
+# https://github.com/risingwavelabs/risingwave/issues/17121
+
+statement ok
+create table t(v int);
+
+statement ok
+insert into t values (1);
+
+statement ok
+alter table t add column now1 timestamptz default now();
+
+# Epoch (and thus `now()`) will advance after `FLUSH`.
+statement ok
+flush;
+
+statement ok
+insert into t values (2);
+
+statement ok
+flush;
+
+query I
+select v from t order by now1;
+----
+1
+2
+
+# Add a new column again, causing the table to be replaced again.
+statement ok
+alter table t add column v2 varchar;
+
+# We show that the "snapshot value" of `now1` does not get refreshed upon the above `ALTER TABLE`.
+# Otherwise, the `now1` column of `v=1` would be greater than that of `v=2`.
+query I
+select v from t order by now1;
+----
+1
+2
+
+statement ok
+drop table t;
diff --git a/e2e_test/s3/file_sink.py b/e2e_test/s3/file_sink.py
index 6eca0e2b9194f..1979b7f6606ab 100644
--- a/e2e_test/s3/file_sink.py
+++ b/e2e_test/s3/file_sink.py
@@ -17,6 +17,12 @@
def gen_data(file_num, item_num_per_file):
assert item_num_per_file % 2 == 0, \
f'item_num_per_file should be even to ensure sum(mark) == 0: {item_num_per_file}'
+
+ struct_type = pa.struct([
+ ('field1', pa.int32()),
+ ('field2', pa.string())
+ ])
+
return [
[{
'id': file_id * item_num_per_file + item_id,
@@ -44,6 +50,7 @@ def gen_data(file_num, item_num_per_file):
'test_timestamptz_ms': pa.scalar(datetime.now().timestamp() * 1000, type=pa.timestamp('ms', tz='+00:00')),
'test_timestamptz_us': pa.scalar(datetime.now().timestamp() * 1000000, type=pa.timestamp('us', tz='+00:00')),
'test_timestamptz_ns': pa.scalar(datetime.now().timestamp() * 1000000000, type=pa.timestamp('ns', tz='+00:00')),
+ 'nested_struct': pa.scalar((item_id, f'struct_value_{item_id}'), type=struct_type),
} for item_id in range(item_num_per_file)]
for file_id in range(file_num)
]
@@ -65,7 +72,7 @@ def _table():
print("test table function file scan")
cur.execute(f'''
SELECT
- id,
+ id,
name,
sex,
mark,
@@ -89,7 +96,8 @@ def _table():
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
- test_timestamptz_ns
+ test_timestamptz_ns,
+ nested_struct
FROM file_scan(
'parquet',
's3',
@@ -104,7 +112,6 @@ def _table():
except ValueError as e:
print(f"cur.fetchone() got ValueError: {e}")
-
print("file scan test pass")
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE {_table()}(
@@ -132,8 +139,8 @@ def _table():
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
- test_timestamptz_ns timestamptz
-
+ test_timestamptz_ns timestamptz,
+ nested_struct STRUCT<"field1" int, "field2" varchar>
) WITH (
connector = 's3',
match_pattern = '*.parquet',
@@ -213,7 +220,8 @@ def _table():
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
- test_timestamptz_ns
+ test_timestamptz_ns,
+ nested_struct
from {_table()} WITH (
connector = 's3',
match_pattern = '*.parquet',
@@ -230,7 +238,7 @@ def _table():
print('Sink into s3 in parquet encode...')
# Execute a SELECT statement
cur.execute(f'''CREATE TABLE test_parquet_sink_table(
- id bigint primary key,\
+ id bigint primary key,
name TEXT,
sex bigint,
mark bigint,
@@ -254,7 +262,8 @@ def _table():
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
- test_timestamptz_ns timestamptz
+ test_timestamptz_ns timestamptz,
+ nested_struct STRUCT<"field1" int, "field2" varchar>,
) WITH (
connector = 's3',
match_pattern = 'test_parquet_sink/*.parquet',
@@ -263,8 +272,8 @@ def _table():
s3.credentials.access = 'hummockadmin',
s3.credentials.secret = 'hummockadmin',
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
+ refresh.interval.sec = 1,
) FORMAT PLAIN ENCODE PARQUET;''')
-
total_rows = file_num * item_num_per_file
MAX_RETRIES = 40
for retry_no in range(MAX_RETRIES):
@@ -305,7 +314,8 @@ def _table():
test_timestamptz_s,
test_timestamptz_ms,
test_timestamptz_us,
- test_timestamptz_ns
+ test_timestamptz_ns,
+ nested_struct
from {_table()} WITH (
connector = 'snowflake',
match_pattern = '*.parquet',
@@ -316,7 +326,8 @@ def _table():
s3.endpoint_url = 'http://hummock001.127.0.0.1:9301',
s3.path = 'test_json_sink/',
type = 'append-only',
- force_append_only='true'
+ force_append_only='true',
+ refresh.interval.sec = 1,
) FORMAT PLAIN ENCODE JSON(force_append_only='true');''')
print('Sink into s3 in json encode...')
@@ -346,7 +357,8 @@ def _table():
test_timestamptz_s timestamptz,
test_timestamptz_ms timestamptz,
test_timestamptz_us timestamptz,
- test_timestamptz_ns timestamptz
+ test_timestamptz_ns timestamptz,
+ nested_struct STRUCT<"field1" int, "field2" varchar>
) WITH (
connector = 's3',
match_pattern = 'test_json_sink/*.json',
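For readers unfamiliar with how the new `nested_struct` column reaches Parquet, here is a self-contained sketch of the same pyarrow pattern the test uses; the file name and values are illustrative and independent of the S3 setup above.

```python
# Standalone sketch of the pattern gen_data() uses: rows carrying a struct value
# are written to Parquet, which RisingWave then reads back through a column
# declared as nested_struct STRUCT<"field1" int, "field2" varchar>.
import pyarrow as pa
import pyarrow.parquet as pq

struct_type = pa.struct([
    ('field1', pa.int32()),
    ('field2', pa.string()),
])

ids = list(range(4))
structs = [{'field1': i, 'field2': f'struct_value_{i}'} for i in ids]

table = pa.table({
    'id': pa.array(ids, type=pa.int64()),
    'nested_struct': pa.array(structs, type=struct_type),
})

pq.write_table(table, 'nested_struct_demo.parquet')
print(pq.read_table('nested_struct_demo.parquet').to_pylist())
# [{'id': 0, 'nested_struct': {'field1': 0, 'field2': 'struct_value_0'}}, ...]
```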
diff --git a/e2e_test/source_inline/kafka/shared_source.slt.serial b/e2e_test/source_inline/kafka/shared_source.slt.serial
index 42d4cf86725ef..1559962f8b878 100644
--- a/e2e_test/source_inline/kafka/shared_source.slt.serial
+++ b/e2e_test/source_inline/kafka/shared_source.slt.serial
@@ -99,6 +99,10 @@ SET streaming_use_shared_source TO false;
statement ok
create materialized view mv_2 as select * from s0;
+statement ok
+SET streaming_use_shared_source TO true;
+
+
sleep 2s
query ?? rowsort
@@ -370,3 +374,32 @@ drop source s0 cascade;
statement ok
drop source s_before_produce cascade;
+
+# test: scan.startup.mode=latest should not be blocked when there's no data to backfill
+# https://github.com/risingwavelabs/risingwave/issues/20083#issuecomment-2609422824
+statement ok
+create source s_latest (v1 int, v2 varchar) with (
+ ${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
+ topic = 'shared_source',
+ scan.startup.mode = 'latest'
+) FORMAT PLAIN ENCODE JSON;
+
+# Note: batch kafka scan ignores scan.startup.mode
+query ? rowsort
+select count(*) from s_latest;
+----
+55
+
+statement ok
+create materialized view mv_latest as select * from s_latest;
+
+query ? rowsort
+select count(*) from mv_latest;
+----
+0
+
+statement ok
+drop source s_latest cascade;
+
+system ok
+rpk topic delete shared_source;
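The two `count(*)` expectations above hinge on a behavioral difference that is easy to miss: a batch scan of a Kafka source ignores `scan.startup.mode` and reads the whole topic, while a streaming job created with `'latest'` only consumes records produced after it starts. Below is a toy, in-memory model of that distinction; it involves no Kafka or RisingWave internals and is purely illustrative.

```python
# Toy model (no Kafka, purely illustrative) of why the batch query above returns
# 55 rows while the MV built with scan.startup.mode = 'latest' starts at 0 rows.
class Topic:
    def __init__(self):
        self.records = []            # append-only log, index == offset

    def produce(self, value):
        self.records.append(value)

    def end_offset(self):
        return len(self.records)

def batch_scan(topic):
    # Batch reads ignore the startup mode: always scan the whole topic.
    return list(topic.records)

class LatestMaterializedView:
    def __init__(self, topic):
        # 'latest' resolves to the current end offset at creation time.
        self.topic = topic
        self.start_offset = topic.end_offset()

    def rows(self):
        return self.topic.records[self.start_offset:]

topic = Topic()
for i in range(55):
    topic.produce(i)

print(len(batch_scan(topic)))        # 55, like `select count(*) from s_latest`
mv = LatestMaterializedView(topic)
print(len(mv.rows()))                # 0, like `select count(*) from mv_latest`
```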
diff --git a/e2e_test/streaming/bug_fixes/issue_20342.slt b/e2e_test/streaming/bug_fixes/issue_20342.slt
new file mode 100644
index 0000000000000..f36740eb3068a
--- /dev/null
+++ b/e2e_test/streaming/bug_fixes/issue_20342.slt
@@ -0,0 +1,9 @@
+statement ok
+set streaming_use_arrangement_backfill=false;
+
+include ./issue_20342.slt.part
+
+statement ok
+set streaming_use_arrangement_backfill=true;
+
+include ./issue_20342.slt.part
diff --git a/e2e_test/streaming/bug_fixes/issue_20342.slt.part b/e2e_test/streaming/bug_fixes/issue_20342.slt.part
new file mode 100644
index 0000000000000..5c93e0a0434c9
--- /dev/null
+++ b/e2e_test/streaming/bug_fixes/issue_20342.slt.part
@@ -0,0 +1,99 @@
+statement ok
+set streaming_parallelism=1;
+
+# pk: [v1]
+# stream key: [v1]
+statement ok
+create table t(v1 int primary key, v2 int);
+
+# pk: [v2, v1]
+# stream key: [v1]
+statement ok
+create materialized view m1 as select * from t order by v2;
+
+statement ok
+insert into t select x as v1, x as v2 from generate_series(1, 10000) t(x);
+
+statement ok
+flush;
+
+skipif madsim
+statement ok
+set backfill_rate_limit=1;
+
+skipif madsim
+statement ok
+set background_ddl=true;
+
+statement ok
+create materialized view m2 as select count(*) from m1;
+
+skipif madsim
+sleep 2s
+
+statement ok
+update t set v2 = 100000 where v1 = 1;
+
+statement ok
+flush;
+
+statement ok
+update t set v2 = 100001 where v1 = 2;
+
+statement ok
+flush;
+
+statement ok
+update t set v2 = 100002 where v1 = 3;
+
+statement ok
+flush;
+
+statement ok
+update t set v2 = 100003 where v1 = 4;
+
+statement ok
+flush;
+
+statement ok
+update t set v2 = 100004 where v1 = 5;
+
+statement ok
+flush;
+
+statement ok
+update t set v2 = 100005 where v1 = 6;
+
+statement ok
+flush;
+
+statement ok
+update t set v2 = 100006 where v1 = 7;
+
+statement ok
+flush;
+
+statement ok
+update t set v2 = 100007 where v1 = 8;
+
+statement ok
+flush;
+
+statement ok
+set backfill_rate_limit=default;
+
+statement ok
+set background_ddl=false;
+
+statement ok
+set streaming_use_arrangement_backfill=true;
+
+statement ok
+alter materialized view m2 set backfill_rate_limit=default;
+
+skipif madsim
+statement ok
+wait;
+
+statement ok
+drop table t cascade;
\ No newline at end of file
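This regression test deliberately interleaves a rate-limited backfill (`backfill_rate_limit = 1`, `background_ddl = true`) with upstream updates to keys that have and have not been scanned yet. The invariant being exercised can be summarized as: an upstream change is forwarded downstream directly only if its key is at or before the current backfill position; otherwise the ongoing snapshot scan will observe the new value. The sketch below is a conceptual model of that invariant, not RisingWave's backfill executor.

```python
# Conceptual model (not RisingWave's executor) of backfill racing with upstream
# updates: forward an update only for keys already behind the backfill position,
# and let the snapshot scan pick up changes to keys that lie ahead of it.
def backfill_with_updates(snapshot, updates_per_step, batch_size=2):
    """snapshot: dict pk -> value, kept in sync with upstream as updates arrive;
    updates_per_step: list of {pk: new_value} dicts applied between batches."""
    state = {}           # downstream materialized state being built
    current_pos = None   # highest pk that has been backfilled so far
    pks = sorted(snapshot)
    i = 0
    for updates in [{}] + updates_per_step:
        # 1. apply a round of upstream updates
        for pk, v in updates.items():
            snapshot[pk] = v
            if current_pos is not None and pk <= current_pos:
                state[pk] = v   # key already backfilled: forward the change
            # else: ignored here; the snapshot read below sees the new value
        # 2. backfill the next batch from the (now current) snapshot
        for pk in pks[i:i + batch_size]:
            state[pk] = snapshot[pk]
            current_pos = pk
        i += batch_size
    # drain any remaining snapshot batches once the updates stop
    for pk in pks[i:]:
        state[pk] = snapshot[pk]
    return state

upstream = {k: k for k in range(1, 7)}
downstream = backfill_with_updates(upstream, [{1: 100}, {5: 500}])
assert downstream == upstream        # downstream converges to the upstream state
print(downstream)
```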
diff --git a/proto/batch_plan.proto b/proto/batch_plan.proto
index 7ffdf94e3c30a..6e07ceae4d5d4 100644
--- a/proto/batch_plan.proto
+++ b/proto/batch_plan.proto
@@ -289,6 +289,7 @@ message HashJoinNode {
// Null safe means it treats `null = null` as true.
// Each key pair can be null safe independently. (left_key, right_key, null_safe)
repeated bool null_safe = 6;
+ optional plan_common.AsOfJoinDesc asof_desc = 7;
}
message SortMergeJoinNode {
diff --git a/proto/meta.proto b/proto/meta.proto
index 975798de90da9..d8351060d72a5 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -471,6 +471,7 @@ message MetaSnapshot {
reserved "parallel_unit_mappings";
GetSessionParamsResponse session_params = 20;
repeated catalog.Secret secrets = 23;
+ uint64 compute_node_total_cpu_count = 24;
repeated common.WorkerNode nodes = 10;
hummock.HummockVersion hummock_version = 12;
backup_service.MetaBackupManifestId meta_backup_manifest_id = 14;
@@ -540,6 +541,7 @@ message SubscribeResponse {
FragmentWorkerSlotMapping streaming_worker_slot_mapping = 27;
FragmentWorkerSlotMappings serving_worker_slot_mappings = 28;
catalog.Secret secret = 29;
+ uint64 compute_node_total_cpu_count = 30;
}
reserved 12;
reserved "parallel_unit_mapping";
diff --git a/proto/plan_common.proto b/proto/plan_common.proto
index b8b4989e78b8a..5fb77a2aa0659 100644
--- a/proto/plan_common.proto
+++ b/proto/plan_common.proto
@@ -21,9 +21,6 @@ enum ColumnDescVersion {
// Introduced in https://github.com/risingwavelabs/risingwave/pull/13707#discussion_r1429947537,
// in case DEFAULT_KEY_COLUMN_NAME changes
COLUMN_DESC_VERSION_PR_13707 = 1;
-
- // for test only
- COLUMN_DESC_VERSION_MAX = 2147483647;
}
message ColumnDesc {
diff --git a/proto/stream_plan.proto b/proto/stream_plan.proto
index 3963e3d03e4c3..9b886e3cef1b3 100644
--- a/proto/stream_plan.proto
+++ b/proto/stream_plan.proto
@@ -356,9 +356,6 @@ enum AggNodeVersion {
// https://github.com/risingwavelabs/risingwave/issues/13465#issuecomment-1821016508
AGG_NODE_VERSION_ISSUE_13465 = 2;
-
- // Used for test only.
- AGG_NODE_VERSION_MAX = 2147483647;
}
message SimpleAggNode {
diff --git a/src/batch/executors/benches/hash_join.rs b/src/batch/executors/benches/hash_join.rs
index 330fc299594d0..6d64461dd1c2d 100644
--- a/src/batch/executors/benches/hash_join.rs
+++ b/src/batch/executors/benches/hash_join.rs
@@ -76,6 +76,7 @@ fn create_hash_join_executor(
"HashJoinExecutor".into(),
CHUNK_SIZE,
None,
+ None,
BatchSpillMetrics::for_test(),
ShutdownToken::empty(),
MemoryContext::none(),
diff --git a/src/batch/executors/src/executor/join/hash_join.rs b/src/batch/executors/src/executor/join/hash_join.rs
index 9c31e945723f5..44518e5155496 100644
--- a/src/batch/executors/src/executor/join/hash_join.rs
+++ b/src/batch/executors/src/executor/join/hash_join.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use std::cmp::Ordering;
use std::iter;
use std::iter::empty;
use std::marker::PhantomData;
@@ -25,8 +26,8 @@ use risingwave_common::bitmap::{Bitmap, BitmapBuilder, FilterByBitmap};
use risingwave_common::catalog::Schema;
use risingwave_common::hash::{HashKey, HashKeyDispatcher, PrecomputedBuildHasher};
use risingwave_common::memory::{MemoryContext, MonitoredGlobalAlloc};
-use risingwave_common::row::{repeat_n, RowExt};
-use risingwave_common::types::{DataType, Datum};
+use risingwave_common::row::{repeat_n, Row, RowExt};
+use risingwave_common::types::{DataType, Datum, DefaultOrd};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common_estimate_size::EstimateSize;
@@ -35,7 +36,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::data::DataChunk as PbDataChunk;
use risingwave_pb::Message;
-use super::{ChunkedData, JoinType, RowId};
+use super::{AsOfDesc, AsOfInequalityType, ChunkedData, JoinType, RowId};
use crate::error::{BatchError, Result};
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
@@ -83,6 +84,8 @@ pub struct HashJoinExecutor<K> {
    null_matched: Vec<bool>,
    identity: String,
    chunk_size: usize,
+    /// Whether the join is an as-of join
+    asof_desc: Option<AsOfDesc>,

    spill_backend: Option<SpillBackend>,
    spill_metrics: Arc<BatchSpillMetrics>,
@@ -179,6 +182,7 @@ pub struct EquiJoinParams<K> {
    next_build_row_with_same_key: ChunkedData<Option<RowId>>,
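Conceptually, the new `asof_desc` only changes the emit step of the hash join: with `None` every key match is emitted as in a plain equi join, while with `Some(desc)` only the single closest match under the configured inequality survives (and left-outer rows are `NULL`-padded when nothing qualifies). A rough sketch of that step follows, with an assumed descriptor shape (probe/build column indices plus an inequality) rather than the crate's exact API.

```python
# Rough sketch (descriptor shape assumed, not the crate's API) of the extra
# selection an as-of hash join performs on top of a plain equi hash join:
# among build rows sharing the probe row's key, keep only the closest one that
# satisfies the inequality; with no descriptor, behave like a normal hash join.
def emit_matches(probe_row, key_matches, asof_desc=None):
    if asof_desc is None:
        return key_matches                       # plain equi join: all matches
    lhs = probe_row[asof_desc["left_idx"]]
    ok = [r for r in key_matches
          if r[asof_desc["right_idx"]] is not None
          and asof_desc["cmp"](lhs, r[asof_desc["right_idx"]])]
    if not ok:
        return []                                # inner: drop; left outer: pad with NULLs
    pick = asof_desc["pick"]                     # min for '<'/'<=', max for '>'/'>='
    return [pick(ok, key=lambda r: r[asof_desc["right_idx"]])]

# Same data as e2e_test/batch/join/asof_join.slt, with `t1.v2 < t2.v2`:
desc = {"left_idx": 1, "right_idx": 1, "cmp": lambda a, b: a < b, "pick": min}
print(emit_matches((1, 2, 3), [(1, None, 8), (1, 3, 4), (1, 2, 5), (1, 2, 6)], desc))
# -> [(1, 3, 4)], matching the expected output of the asof inner join test
```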