ELE-3771: have a single insert rows method that supports both limits #765

Merged
6 changes: 4 additions & 2 deletions .github/workflows/test-all-warehouses.yml
@@ -37,8 +37,8 @@ jobs:
matrix:
dbt-version:
${{ inputs.dbt-version && fromJSON(format('["{0}"]', inputs.dbt-version)) ||
! contains(github.event_name, 'pull_request') && fromJSON('["1.3.0", "latest_pre"]') ||
fromJSON('["latest_pre"]') }}
! contains(github.event_name, 'pull_request') && fromJSON('["1.3.0", "latest_official"]') ||
fromJSON('["latest_official"]') }}
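For readers less familiar with Actions expression chaining, the `dbt-version` selection above can be paraphrased in Python (a sketch of the `&&`/`||` short-circuit semantics; the function and parameter names are illustrative, not part of the workflow):

```python
def resolve_dbt_versions(input_version, event_name):
    """Mirror the matrix expression: an explicit input wins; otherwise
    non-PR events test 1.3.0 plus the latest official release, and PRs
    test only the latest official release."""
    if input_version:
        return [input_version]
    if "pull_request" not in event_name:
        return ["1.3.0", "latest_official"]
    return ["latest_official"]
```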
@haritamar (Collaborator, author) commented:
Some pre versions are broken again, and I want tests to work.
I added a latest_pre run only on postgres, so if there are cross-db things that break we'll still notice. Wdyt about this middle ground?
A reviewer replied:
Isn't query_max_size different for different warehousing solutions?

@haritamar (Collaborator, author) replied on Nov 5, 2024:
Hi @Niteeshkanungo - my comment above is specifically about testing on dbt pre-releases.
After the change above we will still run on all warehouses with the latest official dbt version, but test the latest dbt pre-release (currently 1.9.0b3) only on postgres.

warehouse-type:
[
postgres,
@@ -54,6 +54,8 @@ jobs:
# If we're not running on a specific dbt version, then always add postgres on 1.3.0
- dbt-version: "${{ inputs.dbt-version || '1.3.0' }}"
warehouse-type: postgres
- dbt-version: "${{ inputs.dbt-version || 'latest_pre' }}"
warehouse-type: postgres
exclude:
- dbt-version: "1.3.0"
warehouse-type: athena
4 changes: 3 additions & 1 deletion integration_tests/tests/test_dbt_artifacts/test_artifacts.py
@@ -140,7 +140,9 @@ def test_timings(dbt_project: DbtProject):
dbt_project.dbt_runner.vars["disable_dbt_artifacts_autoupload"] = False
dbt_project.dbt_runner.vars["disable_run_results"] = False
dbt_project.dbt_runner.run(select=TEST_MODEL)
results = dbt_project.run_query('select * from {{ ref("dbt_run_results") }}')
results = dbt_project.run_query(
"""select * from {{ ref("dbt_run_results") }} where name='%s'""" % TEST_MODEL
)

assert len(results) == 1
assert results[0]["execute_started_at"]
1 change: 0 additions & 1 deletion macros/edr/system/system_utils/get_config_var.sql
@@ -46,7 +46,6 @@
'long_string_size': 65535,
'collect_model_sql': true,
'query_max_size': 1000000,
'insert_rows_method': 'max_query_size',
'upload_artifacts_method': 'diff',
'project_name': none,
'elementary_full_refresh': false,
23 changes: 0 additions & 23 deletions macros/utils/list_utils/split_list_to_chunks.sql

This file was deleted.

57 changes: 14 additions & 43 deletions macros/utils/table_operations/insert_rows.sql
@@ -27,30 +27,15 @@
{% endif %}

{{ elementary.file_log('Inserting {} rows to table {}'.format(rows | length, table_relation)) }}
{% set insert_rows_method = elementary.get_config_var('insert_rows_method') %}
{% if insert_rows_method == 'max_query_size' %}
{% set insert_rows_queries = elementary.get_insert_rows_queries(table_relation, columns, rows, on_query_exceed=on_query_exceed) %}
{% set queries_len = insert_rows_queries | length %}
{% for insert_query in insert_rows_queries %}
{% do elementary.file_log("[{}/{}] Running insert query.".format(loop.index, queries_len)) %}
{% do elementary.begin_duration_measure_context('run_insert_rows_query') %}
{% do elementary.run_query(insert_query) %}
{% do elementary.end_duration_measure_context('run_insert_rows_query') %}
{% endfor %}
{% elif insert_rows_method == 'chunk' %}
{% set rows_chunks = elementary.split_list_to_chunks(rows, chunk_size) %}
{% for rows_chunk in rows_chunks %}
{% do elementary.begin_duration_measure_context('get_chunk_insert_query') %}
{% set insert_rows_query = elementary.get_chunk_insert_query(table_relation, columns, rows_chunk) %}
{% do elementary.end_duration_measure_context('get_chunk_insert_query') %}

{% do elementary.begin_duration_measure_context('run_chunk_insert_query') %}
{% do elementary.run_query(insert_rows_query) %}
{% do elementary.end_duration_measure_context('run_chunk_insert_query') %}
{% endfor %}
{% else %}
{% do exceptions.raise_compiler_error("Specified invalid value for 'insert_rows_method' var.") %}
{% endif %}

{% set insert_rows_queries = elementary.get_insert_rows_queries(table_relation, columns, rows, chunk_size=chunk_size, on_query_exceed=on_query_exceed) %}
{% set queries_len = insert_rows_queries | length %}
{% for insert_query in insert_rows_queries %}
{% do elementary.file_log("[{}/{}] Running insert query.".format(loop.index, queries_len)) %}
{% do elementary.begin_duration_measure_context('run_insert_rows_query') %}
{% do elementary.run_query(insert_query) %}
{% do elementary.end_duration_measure_context('run_insert_rows_query') %}
{% endfor %}

{% if should_commit %}
{% do elementary.begin_duration_measure_context('commit') %}
Expand All @@ -65,7 +50,7 @@
{{ return(elementary.default__insert_rows(table_relation, rows, false, chunk_size, on_query_exceed)) }}
{% endmacro %}

{% macro get_insert_rows_queries(table_relation, columns, rows, query_max_size=none, on_query_exceed=none) -%}
{% macro get_insert_rows_queries(table_relation, columns, rows, query_max_size=none, chunk_size=5000, on_query_exceed=none) -%}
{% do elementary.begin_duration_measure_context('get_insert_rows_queries') %}

{% if not query_max_size %}
@@ -83,11 +68,12 @@
{% do elementary.end_duration_measure_context('base_query_calc') %}

{% set current_query = namespace(data=base_insert_query) %}
{% set current_chunk_size = namespace(data=0) %}
{% for row in rows %}
{% set row_sql = elementary.render_row_to_sql(row, columns) %}
{% set query_with_row = current_query.data + ("," if not loop.first else "") + row_sql %}

{% if query_with_row | length > query_max_size %}
{% if query_with_row | length > query_max_size or current_chunk_size.data >= chunk_size %}
{% set new_insert_query = base_insert_query + row_sql %}

{# Check if row is too large to fit into an insert query. #}
@@ -111,9 +97,11 @@
{% do insert_queries.append(current_query.data) %}
{% endif %}
{% set current_query.data = new_insert_query %}
{% set current_chunk_size.data = 1 %}

{% else %}
{% set current_query.data = query_with_row %}
{% set current_chunk_size.data = current_chunk_size.data + 1 %}
{% endif %}
{% if loop.last %}
{% do insert_queries.append(current_query.data) %}
@@ -145,23 +133,6 @@
{% do return(row_sql) %}
{% endmacro %}

{% macro get_chunk_insert_query(table_relation, columns, rows) -%}
{% set insert_rows_query %}
insert into {{ table_relation }}
({%- for column in columns -%}
{{- column.name -}} {{- "," if not loop.last else "" -}}
{%- endfor -%}) values
{% for row in rows -%}
({%- for column in columns -%}
{%- set column_value = elementary.insensitive_get_dict_value(row, column.name, none) -%}
{{ elementary.render_value(column_value) }}
{{- "," if not loop.last else "" -}}
{%- endfor -%}) {{- "," if not loop.last else "" -}}
{%- endfor -%}
{% endset %}
{{ return(insert_rows_query) }}
{%- endmacro %}

{% macro escape_special_chars(string_value) %}
{{ return(adapter.dispatch('escape_special_chars', 'elementary')(string_value)) }}
{% endmacro %}
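The unified macro above caps each generated insert statement by two limits at once: total query size (`query_max_size`) and row count (`chunk_size`). A minimal Python sketch of that batching loop (hypothetical helper names; the macro's `on_query_exceed` handling is reduced here to skipping the oversized row):

```python
def build_insert_queries(base_query, row_sqls, query_max_size, chunk_size=5000):
    """Batch rendered row tuples into insert statements, starting a new
    statement whenever adding a row would exceed query_max_size OR the
    current statement already holds chunk_size rows."""
    queries = []
    current = base_query          # e.g. "insert into t (a, b) values "
    rows_in_current = 0
    for row_sql in row_sqls:
        candidate = current + ("," if rows_in_current else "") + row_sql
        if len(candidate) > query_max_size or rows_in_current >= chunk_size:
            if len(base_query + row_sql) > query_max_size:
                # A single row is too large to ever fit; the macro invokes
                # its on_query_exceed callback here -- we just skip the row.
                continue
            if rows_in_current:
                queries.append(current)
            current = base_query + row_sql
            rows_in_current = 1
        else:
            current = candidate
            rows_in_current += 1
    if rows_in_current:
        queries.append(current)
    return queries
```

Because both limits are checked in one place, the separate `chunk` code path (and `split_list_to_chunks.sql`) becomes unnecessary, which is why this PR deletes them.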