
Unexpected write_disposition behavior when moving data from S3 to Databricks #2252

krudeen-spectra opened this issue Jan 30, 2025

dlt version

1.5.0

Describe the problem

When write_disposition is set as a hint on the source, as shown below, it is ignored (the load defaults to append):

s3_orders = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/orders",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"), write_disposition="replace")

When write_disposition is instead set at the pipeline level (passed to pipeline.run), it is applied, but incremental loading is ignored:

    pipeline = dlt.pipeline(
        import_schema_path="schemas/import",
        export_schema_path="schemas/export",
        pipeline_name="s3_to_databricks",
        destination="databricks",
        staging=dlt.destinations.filesystem("s3://oc-test-data-load-tool-dbx/oc_data_staging"),
        dataset_name="oc_data_bronze",
        dev_mode=False,
        progress="tqdm"
    )
    orders_load_info = pipeline.run((s3_orders | read_jsonl()).with_name('orders'), write_disposition="replace")
    logging.info(orders_load_info)

Expected behavior

I expect the script to perform a destructive replace when write_disposition="replace" is set at the source level, and I expect dlt to apply incremental loading correctly when write_disposition is set at the pipeline level. In other words, the behavior should not depend on whether write_disposition is set at the pipeline level or at the source level.
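For illustration, here is a condensed sketch (assuming the same imports and pipeline object as in the full scripts below) of the two configurations I expect to behave identically:

s3_orders = filesystem(
    bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/orders",
    file_glob="*.jsonl"
).apply_hints(incremental=dlt.sources.incremental("modification_date"), write_disposition="replace")

# 1) write_disposition as a resource hint -- currently appends instead of replacing
pipeline.run((s3_orders | read_jsonl()).with_name('orders'))

# 2) write_disposition passed to pipeline.run -- currently replaces, but ignores the incremental cursor
pipeline.run((s3_orders | read_jsonl()).with_name('orders'), write_disposition="replace")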

Steps to reproduce

Here is my full code with write_disposition set as part of a hint. (Note that I am not running this as a Lambda function; I am running it on my local machine.)

import dlt
from dlt.sources.filesystem import filesystem, read_jsonl
import logging
from datetime import datetime

def lambda_handler(event={}, context=None):
    s3_orders = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/orders",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"), write_disposition="append")

    s3_order_line_items = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/order_line_items",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"), write_disposition="append")

    s3_promotions = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/promotions",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"), write_disposition="replace")

    s3_products = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/products",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"), write_disposition="replace")

    s3_customers = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/customers",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"), write_disposition="replace")

    pipeline = dlt.pipeline(
        import_schema_path="schemas/import",
        export_schema_path="schemas/export",
        pipeline_name="s3_to_databricks",
        destination="databricks",
        staging=dlt.destinations.filesystem("s3://oc-test-data-load-tool-dbx/oc_data_staging"),
        dataset_name="oc_data_bronze",
        dev_mode=False,
        progress="tqdm"
    )

    promotions_load_info = pipeline.run((s3_promotions | read_jsonl()).with_name('promotions'))
    logging.info(promotions_load_info)
    products_load_info = pipeline.run((s3_products | read_jsonl()).with_name('products'))
    logging.info(products_load_info)
    orders_load_info = pipeline.run((s3_orders | read_jsonl()).with_name('orders'))
    logging.info(orders_load_info)
    order_line_items_load_info = pipeline.run((s3_order_line_items | read_jsonl()).with_name('order_line_items'))
    logging.info(order_line_items_load_info)
    customers_load_info = pipeline.run((s3_customers | read_jsonl()).with_name('customers'))
    logging.info(customers_load_info)

if __name__=="__main__":
    lambda_handler()

The easiest way to reproduce this is to put a simple test JSON file into an S3 bucket, run the pipeline, add another file to the bucket, and run the pipeline again (with write_disposition set to replace). You will see two _dlt_load_id values in the data:

SELECT _dlt_load_id, COUNT(*) FROM oc_data_bronze.promotions
GROUP BY _dlt_load_id;

To see the other error, move write_disposition to the pipeline level as shown below (a couple of resources have write_disposition set to append, but those can be ignored):

import dlt
from dlt.sources.filesystem import filesystem, read_jsonl
import logging
from datetime import datetime

def lambda_handler(event={}, context=None):
    s3_orders = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/orders",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"))

    s3_order_line_items = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/order_line_items",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"))

    s3_promotions = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/promotions",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"))

    s3_products = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/products",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"))

    s3_customers = filesystem(
      bucket_url="s3://oc-test-data-load-tool-dbx/oc_data_raw/customers",
      file_glob="*.jsonl"
    ).apply_hints(incremental=dlt.sources.incremental("modification_date"))

    pipeline = dlt.pipeline(
        import_schema_path="schemas/import",
        export_schema_path="schemas/export",
        pipeline_name="s3_to_databricks",
        destination="databricks",
        staging=dlt.destinations.filesystem("s3://oc-test-data-load-tool-dbx/oc_data_staging"),
        dataset_name="oc_data_bronze",
        dev_mode=False,
        progress="tqdm"
    )

    promotions_load_info = pipeline.run((s3_promotions | read_jsonl()).with_name('promotions'), write_disposition="replace")
    logging.info(promotions_load_info)
    products_load_info = pipeline.run((s3_products | read_jsonl()).with_name('products'), write_disposition="replace")
    logging.info(products_load_info)
    orders_load_info = pipeline.run((s3_orders | read_jsonl()).with_name('orders'), write_disposition="append")
    logging.info(orders_load_info)
    order_line_items_load_info = pipeline.run((s3_order_line_items | read_jsonl()).with_name('order_line_items'), write_disposition="append")
    logging.info(order_line_items_load_info)
    customers_load_info = pipeline.run((s3_customers | read_jsonl()).with_name('customers'), write_disposition="replace")
    logging.info(customers_load_info)

if __name__=="__main__":
    lambda_handler()

Run the pipeline with one file in the source S3 bucket, then add another file and run it again. The target table in Databricks will contain data from both S3 files but only a single _dlt_load_id, implying that a destructive replace was performed and that both files were read, when only the newer one should have been. You can further confirm this in the logs for the second run: start_value is set to None, when it should be the date of your first load.

{"written_at":"2025-01-30T17:01:21.635Z","written_ts":1738256481635656000,"component_name":"s3_to_databricks","process":12821,"msg":"Bind incremental on filesystem_products with initial_value: None, start_value: None, end_value: None","type":"log","logger":"dlt","thread":"MainThread","level":"INFO","module":"init","line_no":483,"version":{"dlt_version":"1.5.0","pipeline_name":"s3_to_databricks"}}

Operating system

macOS

Runtime environment

Local

Python version

3.11

dlt data source

S3 bucket

dlt destination

No response

Other deployment details

Destination is Databricks

Additional information

No response
