Skip to content

Commit

Permalink
projection test
Browse files Browse the repository at this point in the history
  • Loading branch information
Boris Tyshkevich committed May 15, 2024
1 parent 2f4aa8d commit 1b87a37
Showing 1 changed file with 168 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ You can use fiddle or clickhouse-local to run such a test:
cat test.sql | clickhouse-local -nm
```

Results:
Results (Mac A2 Pro), milliseconds:

```sql
--- 252 INSERT
Expand All @@ -353,70 +353,194 @@ Results:

UPSERT is six times slower than direct INSERT because it requires looking up the destination table. That is the price. It is better to use idempotent inserts with an exactly-once delivery guarantee. However, it’s not always possible.

The FINAL speed is quite good, especially if we split the table by 20 partitions, use `do_not_merge_across_partitions_select_final` setting, and keep most of the table’s partitions optimized (1 part per partition).
The FINAL speed is quite good, especially if we split the table by 20 partitions, use `do_not_merge_across_partitions_select_final` setting, and keep most of the table’s partitions optimized (1 part per partition). But we can do it better.

### Adding projections

Let's add an aggregating projection with a more useful `updated_at` timestamp instead of an abstract `_version`.
Let's add an aggregating projection, and also add a more useful `updated_at` timestamp instead of an abstract `_version` and replace `String` for Department dimension by LowCardinality(String). Let’s look at the difference in time execution.

https://fiddle.clickhouse.com/3140d341-ccc5-4f57-8fbf-55dbf4883a21

```sql
create table Example4
set allow_experimental_analyzer=0;
create table Example4
(
id Int32,
metric1 UInt32,
Smetric1 alias metric1*sign,
metric2 UInt32,
dim1 LowCardinality(String),
updated_at DateTime64(3) default now64(3),
sign Int8 default 1,
-- incoming event stream is deduplicated so I can do stream aggregation
PROJECTION byDim1 (
select dim1, sum(metric1*sign) group by dim1
)
) engine = VersionedCollapsingMergeTree(sign, updated_at)
ORDER BY id
id Int32,
Department LowCardinality(String),
metric1 Int32,
metric2 Float32,
_version DateTime64(3) default now64(3),
sign Int8 default 1
) engine = VersionedCollapsingMergeTree(sign, _version)
ORDER BY id
partition by (id % 20)
settings index_granularity=4096
;

set do_not_merge_across_partitions_select_final=1;

-- make 100M table
INSERT INTO Example4
SELECT
number AS id,
['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
rand() % 1000 AS metric1,
(rand() % 10000) / 100.0 AS metric2,
0 AS _version,
1 AS sign
FROM numbers(1E8);

create temporary table timeMark (ts DateTime64(3));
create function timeSpent as () ->
date_diff('millisecond',(select max(ts) from timeMark),now64(3));

-- measure plain INSERT time for 1M batch
insert into timeMark select now64(3);
INSERT INTO Example4(id,Department,metric1,metric2)
SELECT
number AS id,
['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
rand() % 1000 AS metric1,
(rand() % 10000) / 100.0 AS metric2
FROM numbers(1E6);
select '---',timeSpent(),'INSERT';

--create table Stage engine=MergeTree order by id as Example4 ;
create table Stage engine=Null as Example4 ;

create materialized view Example4Transform to Example4 as
with __new as ( SELECT * FROM Stage order by sign desc, updated_at desc limit 1 by id ),
with __new as ( SELECT * FROM Stage order by _version desc,sign desc limit 1 by id ),
__old AS ( SELECT *, arrayJoin([-1,1]) AS _sign from
( select * FROM Example4 final
PREWHERE id IN (SELECT id FROM __new)
where sign = 1
)
)
( select * FROM Example4 final
PREWHERE id IN (SELECT id FROM __new)
where sign = 1
)
)
select id,
if(__old._sign = -1, __old.metric1, __new.metric1) AS metric1,
if(__old._sign = -1, __old.metric2, __new.metric2) AS metric2,
if(__old._sign = -1, __old.dim1, __new.dim1) AS dim1,
if(__old._sign = -1, __old.updated_at, __new.updated_at) AS updated_at,
if(__old._sign = -1, -1, 1) AS sign
if(__old._sign = -1, __old.Department, __new.Department) AS
Department,
if(__old._sign = -1, __old.metric1, __new.metric1) AS metric1,
if(__old._sign = -1, __old.metric2, __new.metric2) AS metric2,
if(__old._sign = -1, __old._version, __new._version) AS _version,
if(__old._sign = -1, -1, 1) AS sign
from __new left join __old using id
where if(__new.sign=-1,
__old._sign = -1, -- insert only delete row if it's found in old data
__new.updated_at > __old.updated_at -- skip duplicates for updates
);
__old._sign = -1, -- insert only delete row if it's found in old data
__new._version > __old._version -- skip duplicates for updates
);

-- original
insert into Stage(id,metric1,metric2,dim1) values (1,1,1,'d'), (2,2,2,'d');
select 'step1',* from Example4 ;
select 'proj1',dim1, sum(Smetric1) from Example4 group by dim1;
-- calculate UPSERT time for 1M batch
insert into timeMark select now64(3);
INSERT INTO Stage(id,Department,metric1,metric2)
SELECT
(rand() % 1E6)*100 AS id,
--number AS id,
['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
rand() % 1000 AS metric1,
(rand() % 10000) / 100.0 AS metric2
FROM numbers(1E6);

-- delete a row with id=2
insert into Stage(id,metric1,metric2,sign) values (2,2,2,-1);
select 'step2',* from Example4 final;
select 'proj2',dim1, sum(Smetric1) from Example4 group by dim1;
select '---',timeSpent(),'UPSERT';

-- FINAL query
insert into timeMark select now64(3);
select Department, count(), sum(metric1) from Example4 FINAL
group by Department order by Department
format Null
;
select '---',timeSpent(),'FINAL';

-- GROUP BY query
insert into timeMark select now64(3);
select Department, sum(sign), sum(sign*metric1) from Example4
group by Department order by Department
format Null
;
select '---',timeSpent(),'GROUP BY';

--select '--parts1',partition, count() from system.parts where active and table='Example4' group by partition;

insert into timeMark select now64(3);
optimize table Example4 final;
select '---',timeSpent(),'OPTIMIZE';

-- FINAL OPTIMIZED
insert into timeMark select now64(3);
select Department, count(), sum(metric1) from Example4 FINAL
group by Department order by Department
format Null
;
select '---',timeSpent(),'FINAL OPTIMIZED';

-- GROUP BY OPTIMIZED
insert into timeMark select now64(3);
select Department, sum(sign), sum(sign*metric1) from Example4
group by Department order by Department
format Null
;
select '---',timeSpent(),'GROUP BY OPTIMIZED';

-- UPSERT a little data to create more parts
INSERT INTO Stage(id,Department,metric1,metric2)
SELECT
number AS id,
['HR', 'Finance', 'Engineering', 'Sales', 'Marketing'][rand() % 5 + 1] AS Department,
rand() % 1000 AS metric1,
(rand() % 10000) / 100.0 AS metric2
FROM numbers(1000);

--select '--parts2',partition, count() from system.parts where active and table='Example4' group by partition;

-- GROUP BY SEMI-OPTIMIZED
insert into timeMark select now64(3);
select Department, sum(sign), sum(sign*metric1) from Example4
group by Department order by Department
format Null
;
select '---',timeSpent(),'GROUP BY SEMI-OPTIMIZED';

--alter table Example4 add column Smetric1 Int32 alias metric1*sign;
alter table Example4 add projection byDep (select Department, sum(sign), sum(sign*metric1) group by Department);

-- Materialize Projection
insert into timeMark select now64(3);
alter table Example4 materialize projection byDep settings mutations_sync=1;
select '---',timeSpent(),'Materialize Projection';

-- GROUP BY query Projected
insert into timeMark select now64(3);
select Department, sum(sign), sum(sign*metric1) from Example4
group by Department order by Department
settings force_optimize_projection=1
format Null
;
select '---',timeSpent(),'GROUP BY Projected';

-- replace a row with id=1. row with sign=-1 not needed, but can be in the insert blocks (will be skipped)
insert into Stage(id,metric1,metric2,dim1,sign) values (1,1,1,'',-1),(1,3,3,'d',1);
select 'step3',* from Example4 final;
select 'proj3',dim1, sum(Smetric1) from Example4 group by dim1;
```

Keep in mind that building projections requires more resources. Insert time will be longer.
Results (Mac A2 Pro), milliseconds:

```sql
--- 175 INSERT
--- 1613 UPSERT
--- 329 FINAL
--- 102 GROUP BY
--- 10498 OPTIMIZE
--- 103 FINAL OPTIMIZED
--- 90 GROUP BY OPTIMIZED
--- 94 GROUP BY SEMI-OPTIMIZED
--- 919 Materialize Projection
--- 5 GROUP BY Projected
```

Some thoughts:

- INSERT, UPSERT, and SELECT benefit from switching the Department column to LowCardinality. Fewer reads - faster queries.
- OPTIMIZE is VERY expensive
- FINAL is quite fast (especially for the OPTIMIZED table). You don’t need to OPTIMIZE the table till the 1 part for partition to remove FINAL from the query. Not having too many parts already gives you a performance boost.
- GROUP BY for that task is still faster
- projections building requires resources. Inserts to the table with Projections will be longer. Tune the insert timeouts.
- Query over projection is very fast (as it should be). However, it’s not always possible to aggregate data in such a simple way.

### DELETEs inaccuracy

Expand Down Expand Up @@ -542,4 +666,3 @@ SELECT
FROM numbers(1000)
settings prefer_localhost_replica=0;
```

0 comments on commit 1b87a37

Please sign in to comment.