Skip to content

Commit

Permalink
speed test
Browse files Browse the repository at this point in the history
  • Loading branch information
Boris Tyshkevich committed May 15, 2024
1 parent ac8c149 commit 07dbb6c
Showing 1 changed file with 173 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ description: VersionedCollapsingMergeTree

When you have an incoming event stream with duplicates, updates, and deletes, building a consistent row state inside the Clickhouse table is a big challenge.

The UPDATE/DELETE approach in the OLTP world won’t help with OLAP databases tuned to handle big batches. UPDATE/DELETE operations in Clickhouse are executed as “mutations,” rewriting a lot of data and being relatively slow. You can’t run such operations very often, as for OLTP databases. But the UPSERT operation (insert and replace) runs quickly with the ReplacingMergeTree Engine. It’s even set as the default mode for INSERT without any special keyword. We can emulate UPDATE (or even DELETE) with the UPSERT operation.
The UPDATE/DELETE approach in the OLTP world won’t help with OLAP databases tuned to handle big batches. UPDATE/DELETE operations in Clickhouse are executed as “mutations,” rewriting a lot of data and being relatively slow. You can’t run such operations very often, as for OLTP databases. But the UPSERT operation (insert and replace) runs fast with the ReplacingMergeTree Engine. It’s even set as the default mode for INSERT without any special keyword. We can emulate UPDATE (or even DELETE) with the UPSERT operation.

There are a lot of [blog posts](https://altinity.com/blog/clickhouse-replacingmergetree-explained-the-good-the-bad-and-the-ugly) on how to use ReplacingMergeTree to handle mutated data streams. But there are several problems:
There are a lot of [blog posts](https://altinity.com/blog/clickhouse-replacingmergetree-explained-the-good-the-bad-and-the-ugly) on how to use ReplacingMergeTree Engine to handle mutated data streams. A properly designed table schema with ReplacingMergeTree Engine is a good instrument for building the DWH Dimensions table. But when maintaining metrics in Fact tables, there are several problems:

- you can’t use another important Clickhouse feature - online aggregation of incoming data by Materialized Views or Projections on top of the ReplacingMT table, because duplicates and updates will not be deduplicated by the engine during inserts, and calculated aggregates (like sum or count) will be incorrect. For significant amounts of data, it’s become critical because aggregating raw data during report queries will take too much time.
- it’s not possible to use a valuable Clickhouse feature - online aggregation of incoming data by Materialized Views or Projections on top of the ReplacingMT table, because duplicates and updates will not be deduplicated by the engine during inserts, and calculated aggregates (like sum or count) will be incorrect. For significant amounts of data, it’s become critical because aggregating raw data during report queries will take too much time.
- unfinished support for DELETEs. While in the newest versions of Clickhouse, it’s possible to add the is_deleted to ReplacingMergeTree parameters, the necessity of manually filtering out deleted rows after FINAL processing makes that feature less useful.
- Mutated data should be localized to the same partition. If the “replacing” row is saved to a partition different from the previous one, the report query will be much slower or produce unexpected results.

```sql
-- multiple partitions problem
CREATE TABLE RMT
(
`key` Int64,
Expand Down Expand Up @@ -49,13 +50,11 @@ When dealing with such complicated data streams, it needs to be solved 3 tasks s
- process updates and deletes
- calculate correct aggregates

The collapsing algorithm of VersionedCollapsingMergeTree as it is described in the [documentation](https://clickhouse.com/docs/en/operations/settings/settings#max-insert-threads) :
It’s essential to understand how the collapsing algorithm of VersionedCollapsingMergeTree works. Quote from the [documentation](https://clickhouse.com/docs/en/operations/settings/settings#max-insert-threads) :

> When ClickHouse merges data parts, it deletes each pair of rows that have the same primary key and version and different Sign. The order of rows does not matter.
>
It’s quite essential to understand how it works.

The version column should increase over time. You may use a natural timestamp for that. Random-generated IDs are not suitable for the version column.

### Replace data in another partition
Expand Down Expand Up @@ -92,7 +91,9 @@ With VersionedCollapsingMergeTree, we can use more partition strategies, even wi

There are several ways to remove duplicates from the event stream. The most effective feature is block deduplication, which occurs when Clickhouse drops incoming blocks with the same checksum (or tag). However, this requires building a smart ingestor capable of saving positions in a transactional manner.

However, another method is possible, which is verifying whether a particular row already exists in the destination table to avoid redundant insertions. However, ensuring accuracy and consistency in results requires executing this process on a single thread within one cluster node. This method is particularly suitable for less active event streams, such as those with up to 100,000 events per second. To boost performance, incoming streams should be segmented into several partitions (or 'shards') based on the table/event's Primary Key, with each partition processed on a single thread.
However, another method is possible: verifying whether a particular row already exists in the destination table to avoid redundant insertions. Together with block deduplication, that method also avoids using ReplacingMergeTree and FINAL during query time.

Ensuring accuracy and consistency in results requires executing this process on a single thread within one cluster node. This method is particularly suitable for less active event streams, such as those with up to 100,000 events per second. To boost performance, incoming streams should be segmented into several partitions (or 'shards') based on the table/event's Primary Key, with each partition processed on a single thread.

An example of row deduplication:

Expand Down Expand Up @@ -164,7 +165,7 @@ ORDER BY id
create table Stage engine=Null as Example3 ;

create materialized view Example3Transform to Example3 as
with __new as ( SELECT * FROM Stage order by _version,sign desc desc limit 1 by id ),
with __new as ( SELECT * FROM Stage order by _version desc, sign desc limit 1 by id ),
__old AS ( SELECT *, arrayJoin([-1,1]) AS _sign from
( select * FROM Example3 final
PREWHERE id IN (SELECT id FROM __new)
Expand Down Expand Up @@ -208,6 +209,152 @@ Important additions:
- filter to skip out-of-order events by checking the version
- DELETE event processing (inside last WHERE)

### Speed Test

```sql
set allow_experimental_analyzer=0;
create table Example3
(
id Int32,
Department String,
metric1 UInt32,
metric2 Float32,
_version UInt64,
sign Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, _version)
ORDER BY id
partition by (id % 20)
settings index_granularity=4096
;

set do_not_merge_across_partitions_select_final=1;

-- make 100M table
INSERT INTO Example3
SELECT
number AS id,
['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
rand() % 1000 AS metric1,
(rand() % 10000) / 100.0 AS metric2,
0 AS _version,
1 AS sign
FROM numbers(1E8);

create function timeSpent as () ->
date_diff('millisecond',(select ts from t1),now64(3));

-- measure plain INSERT time for 1M batch
create temporary table t1 (ts DateTime64(3)) as select now64(3);
INSERT INTO Example3
SELECT
number AS id,
['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
rand() % 1000 AS metric1,
(rand() % 10000) / 100.0 AS metric2,
1 AS _version,
1 AS sign
FROM numbers(1E6);
select '---',timeSpent(),'INSERT';

--create table Stage engine=MergeTree order by id as Example3 ;
create table Stage engine=Null as Example3 ;

create materialized view Example3Transform to Example3 as
with __new as ( SELECT * FROM Stage order by _version desc,sign desc limit 1 by id ),
__old AS ( SELECT *, arrayJoin([-1,1]) AS _sign from
( select * FROM Example3 final
PREWHERE id IN (SELECT id FROM __new)
where sign = 1
)
)
select id,
if(__old._sign = -1, __old.Department, __new.Department) AS
Department,
if(__old._sign = -1, __old.metric1, __new.metric1) AS metric1,
if(__old._sign = -1, __old.metric2, __new.metric2) AS metric2,
if(__old._sign = -1, __old._version, __new._version) AS _version,
if(__old._sign = -1, -1, 1) AS sign
from __new left join __old using id
where if(__new.sign=-1,
__old._sign = -1, -- insert only delete row if it's found in old data
__new._version > __old._version -- skip duplicates for updates
);

-- calculate UPSERT time for 1M batch
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
INSERT INTO Stage
SELECT
(rand() % 1E6)*100 AS id,
--number AS id,
['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
rand() % 1000 AS metric1,
(rand() % 10000) / 100.0 AS metric2,
2 AS _version,
1 AS sign
FROM numbers(1E6);

select '---',timeSpent(),'UPSERT';

-- FINAL query
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
select Department, count(), sum(metric1) from Example3 FINAL
group by Department order by Department
format Null
;
select '---',timeSpent(),'FINAL';

-- GROUP BY query
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
select Department, sum(sign), sum(sign*metric1) from Example3
group by Department order by Department
format Null
;
select '---',timeSpent(),'GROUP BY';

optimize table Example3 final;
-- FINAL query
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
select Department, count(), sum(metric1) from Example3 FINAL
group by Department order by Department
format Null
;
select '---',timeSpent(),'FINAL OPTIMIZED';

-- GROUP BY query
drop table t1;
create temporary table t1 (ts DateTime64(3)) as select now64(3);
select Department, sum(sign), sum(sign*metric1) from Example3
group by Department order by Department
format Null
;
select '---',timeSpent(),'GROUP BY OPTIMIZED';
```

You can use fiddle or clickhouse-local to run such a test:

```bash
cat test.sql | clickhouse-local -nm
```

Results:

```sql
--- 252 INSERT
--- 1710 UPSERT
--- 763 FINAL
--- 311 GROUP BY
--- 314 FINAL OPTIMIZED
--- 295 GROUP BY OPTIMIZED
```

UPSERT is six times slower than direct INSERT because it requires looking up the destination table. That is the price. It is better to use idempotent inserts with an exactly-once delivery guarantee. However, it’s not always possible.

The FINAL speed is quite good, especially if we split the table by 20 partitions, use `do_not_merge_across_partitions_select_final` setting, and keep most of the table’s partitions optimized (1 part per partition).

### Adding projections

Let's add an aggregating projection with a more useful `updated_at` timestamp instead of an abstract `_version`.
Expand Down Expand Up @@ -269,7 +416,9 @@ select 'step3',* from Example4 final;
select 'proj3',dim1, sum(Smetric1) from Example4 group by dim1;
```

### DELETEs processing
Keep in mind that building projections requires more resources. Insert time will be longer.

### DELETEs inaccuracy

The typical CDC event for DWH systems besides INSERT is UPSERT—a new row replaces the old one (with suitable aggregate corrections). But DELETE events are also supported (ones with column sign=-1). The Materialized View described above will correctly process the DELETE event by inserting only 1 row with sign=-1 if a row with a particular ID already exists in the table. In such cases, VersionedCollapsingMergeTree will wipe both rows (with sign=1 & -1) during merge or final operations.

Expand Down Expand Up @@ -334,7 +483,7 @@ select 'step2',* from Example5 final ;

### Complex Primary Key

In previous examples, I used a simple, compact column with Int64 type for the primary key. When possible, it's better to go this route. [SnowFlakeId](https://www.notion.so/4a5c621b1e224c96b44210da5ce9c601?pvs=21) is the best option and can be easily created during INSERT from DateTime and the hash of one or several important columns. However, sometimes a more complex primary key is needed, for instance, when storing data for multiple tenants (Customers, partners, etc.) in the same table. This is not a problem for the suggested technique - use all the necessary columns in all filters and JOIN operations.
I used a simple, compact column with Int64 type for the primary key in previous examples. It's better to go this route with monotonically growing IDs like autoincrement ID or [SnowFlakeId](https://www.notion.so/4a5c621b1e224c96b44210da5ce9c601?pvs=21) (based on timestamp). However, in some cases, a more complex primary key is needed. For instance, when storing data for multiple tenants (Customers, partners, etc.) in the same table. This is not a problem for the suggested technique - use all the necessary columns in all filters and JOIN operations as Tuple.

```sql
create table Example6
Expand Down Expand Up @@ -375,17 +524,22 @@ The suggested approach works well when inserting data in a single thread on a si

But inserting different shards with a sharding key derived from ID works fine. Every shard will operate with its own non-intersecting set of IDs, and don’t interfere with each other.

The same approach can be implemented when inserting several threads into the same replica node. We need to split the incoming stream into several ones by sharding the keys derived from ID. The ingesting app should do some sort of shard calculation like `cityHash64(id) % 2 = 0` before sending data to internal buffers that will be flushed to INSERTs.

For big installations with high traffic and many shards and replicas, the ingesting app should split the data stream into a considerably large number of “virtual shards” (or partitions in Kafka terminology) and then map the “virtual shards” to the threads doing inserts to “physical shards.”
The same approach can be implemented when inserting several threads into the same replica node. For big installations with high traffic and many shards and replicas, the ingesting app can split the data stream into a considerably large number of “virtual shards” (or partitions in Kafka terminology) and then map the “virtual shards” to the threads doing inserts to “physical shards.”

We could even build such a pipeline with Clickhouse building blocks:
The incoming stream could be split into several ones by using an expression like `cityHash64(id) % 50 = 0` as a sharding key. The ingesting app should calculate the shard number before sending data to internal buffers that will be flushed to INSERTs.

```sql
Kafka ->
Kafka Engine -> MV ->
Distributed -> Null Table ->
Deduplicate MV -> Destination Table
-- emulate insert into distributed table
INSERT INTO function remote('localhos{t,t,t}',default,Stage,id)
SELECT
(rand() % 1E6)*100 AS id,
--number AS id,
['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
rand() % 1000 AS metric1,
(rand() % 10000) / 100.0 AS metric2,
2 AS _version,
1 AS sign
FROM numbers(1000)
settings prefer_localhost_replica=0;
```

You need to set `prefer_localhost_replica=0` so that even for a local replica, inserts go through the distributed queue, and the distributed table must rely on an `all-replicated` cluster definition. Be careful, if one replica fails, the data starts buffering in the distributed table store. The specially designed ingesting app is a better approach.

0 comments on commit 07dbb6c

Please sign in to comment.