
I get the error 100072 (22000): NULL result in a non-nullable column when I use my own custom naming convention #2220

Open
burnbay opened this issue Jan 15, 2025 · 3 comments
Labels: bug (Something isn't working), question (Further information is requested), support (This issue is monitored by Solution Engineer)


burnbay commented Jan 15, 2025

dlt version

1.5.0

Describe the problem

I'm trying to load a SQL Server database table to Snowflake. The table has a column with the Norwegian character "ø" in its name; I'd like to map that character to "oe" using a custom naming convention. The column name is AMLSISTUTFØRTDATO.

If I use dlt's sql_ci_v1 naming convention, the pipeline runs as expected, but the column is named AMLSISTUTF_RTDATO. So I created this naming convention:

from dlt.common.normalizers.naming.sql_cs_v1 import NamingConvention as SqlCsV1NamingConvention

class NamingConvention(SqlCsV1NamingConvention):
    """A variant of sql_cs_v1 which replaces Scandinavian characters."""

    def normalize_identifier(self, identifier: str) -> str:
        # map Scandinavian characters to ASCII digraphs before base normalization
        replace_map = {'æ': 'ae', 'ø': 'oe', 'å': 'aa', 'ö': 'oe', 'ä': 'ae'}
        new_identifier = ''.join(replace_map.get(c, c) for c in identifier)
        return super().normalize_identifier(new_identifier)
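
A quick way to sanity-check the replacement in isolation (a sketch; the exact final casing depends on what the sql_cs_v1 base convention does after the character mapping):

nc = NamingConvention()
# ø is mapped to oe before the base convention normalizes the identifier
print(nc.normalize_identifier("AmlSistUtførtDato"))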

I configure the pipeline to use it, and the column gets renamed to AMLSISTUTFOERTDATO just as expected. The downside is that the pipeline doesn't run: it stops in the load stage with the error "100072 (22000): NULL result in a non-nullable column". The same thing happens if I try to use the direct or the duck_case naming convention.
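
For reference, a minimal sketch of wiring such a convention in (pipeline and dataset names are placeholders; the naming_convention and replace_strategy factory arguments are the same ones used in the maintainer's test further down):

import dlt

# NamingConvention is the subclass defined above
snowflake_ = dlt.destinations.snowflake(
    naming_convention=NamingConvention,
    replace_strategy="staging-optimized",
)

pipeline = dlt.pipeline(
    "aml_pipeline",              # placeholder pipeline name
    destination=snowflake_,
    dataset_name="aml_dataset",  # placeholder dataset name
)

The convention can also be referenced by module path via the naming setting under [schema] in config.toml.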

Another thing I noticed is that no _STAGING schema or table is created. I'm using the replace write disposition with the staging-optimized replace strategy. When I use the sql_ci_v1 naming convention, the _STAGING schema and table are created.

The only non-nullable column in the table is the ID column, and all rows have data.

Expected behavior

I expect that the table is created with the correct column names and that the pipeline runs normally.

Steps to reproduce

Use the code above for the naming convention and create this table in a database (preferably SQL Server):

CREATE TABLE [dbo].[AMLPerFornyelse]
(
    [Id] [INT] IDENTITY(1, 1) NOT NULL
   ,[KundeID] [NUMERIC](11, 0) NULL
   ,[AmlSistUtførtDato] [DATE] NULL
   ,CONSTRAINT [PK_AMLResporring] PRIMARY KEY CLUSTERED ([Id] ASC) ON [PRIMARY]
) ON [PRIMARY];

Load the table into Snowflake with the replace write disposition and the staging-optimized replace strategy.
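
A sketch of that load step, assuming dlt's sql_database core source and a placeholder connection string:

import dlt
from dlt.sources.sql_database import sql_database

# placeholder credentials/connection string; adjust driver and host to your setup
source = sql_database(
    "mssql+pyodbc://user:password@host/database?driver=ODBC+Driver+18+for+SQL+Server"
).with_resources("AMLPerFornyelse")

pipeline = dlt.pipeline(
    "aml_pipeline",
    destination=dlt.destinations.snowflake(
        naming_convention=NamingConvention,  # the custom convention above
        replace_strategy="staging-optimized",
    ),
    dataset_name="aml_dataset",
)

pipeline.run(source, write_disposition="replace")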

Operating system

Linux

Runtime environment

Docker, Docker Compose

Python version

3.11

dlt data source

SQL Server

dlt destination

Snowflake

Other deployment details

No response

Additional information

No response

rudolfix (Collaborator) commented

@burnbay thanks for this report! We are investigating it.

@rudolfix rudolfix self-assigned this Jan 27, 2025
@rudolfix rudolfix added the bug Something isn't working label Jan 27, 2025
@rudolfix rudolfix moved this from Todo to In Progress in dlt core library Jan 27, 2025
rudolfix (Collaborator) commented

@burnbay what happens if you use the plain sql_cs_v1 naming convention? And what happens if you derive your convention from sql_ci_v1 instead of sql_cs_v1? Both should work, btw. cs will create case-sensitive identifiers in Snowflake, but that is supported.
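
For concreteness, the ci-derived variant would look like this (a sketch; the sql_ci_v1 module path is assumed to parallel the sql_cs_v1 one used above):

from dlt.common.normalizers.naming.sql_ci_v1 import NamingConvention as SqlCiV1NamingConvention

class NamingConvention(SqlCiV1NamingConvention):
    """A variant of sql_ci_v1 which replaces Scandinavian characters."""

    def normalize_identifier(self, identifier: str) -> str:
        # same character mapping as before, on top of the case-insensitive base
        replace_map = {'æ': 'ae', 'ø': 'oe', 'å': 'aa', 'ö': 'oe', 'ä': 'ae'}
        new_identifier = ''.join(replace_map.get(c, c) for c in identifier)
        return super().normalize_identifier(new_identifier)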

@VioletM VioletM added the support This issue is monitored by Solution Engineer label Jan 29, 2025
rudolfix (Collaborator) commented

@burnbay I ran the following test, deriving from both the ci and cs conventions,

import dlt
import pendulum

from dlt.common.normalizers.naming.sql_cs_v1 import NamingConvention as SqlCsV1NamingConvention

class ScandinavianNamingConvention(SqlCsV1NamingConvention):
    """A variant of sql_cs_v1 which replaces Scandinavian characters."""

    def normalize_identifier(self, identifier: str) -> str:
        replace_map = {'æ': 'ae', 'ø': 'oe', 'å': 'aa', 'ö': 'oe', 'ä': 'ae'}
        new_identifier = ''.join(replace_map.get(c, c) for c in identifier)
        return super().normalize_identifier(new_identifier)


def test_char_replacement_naming_convention() -> None:
    snowflake_ = dlt.destinations.snowflake(
        naming_convention=ScandinavianNamingConvention, replace_strategy="staging-optimized"
    )

    pipeline = dlt.pipeline("test_char_replacement_naming_convention", destination=snowflake_)

    data = [{"AmlSistUtførtDato": pendulum.now().date()}]

    pipeline.run(data, table_name="AMLPerFornyelseø", write_disposition="replace", loader_file_format="parquet")
    pipeline.run(data, table_name="AMLPerFornyelseø", write_disposition="replace", loader_file_format="parquet")
    print(pipeline.dataset()["AMLPerFornyelseoe"].df())

and it runs without any problems. Here's a dump of the SQL being executed; it creates the staging dataset and uses CLONE for the optimized copy:

SELECT 1
    FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE schema_name = %s
SELECT "version", "engine_version", "inserted_at", "schema_name", "version_hash", "schema" FROM "test_char_replacement_naming_convention_dataset"."_dlt_version" WHERE "version_hash" = %s;

SELECT 1
    FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE schema_name = %s

SELECT 1
    FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE schema_name = %s
SELECT "version", "engine_version", "inserted_at", "schema_name", "version_hash", "schema" FROM "test_char_replacement_naming_convention_dataset_staging"."_dlt_version" WHERE "version_hash" = %s;

SELECT 1
    FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE schema_name = %s
TRUNCATE TABLE "test_char_replacement_naming_convention_dataset_staging"."AMLPerFornyelseoe";
PUT file:///home/rudolfix/src/dlt/_storage/.dlt/pipelines/test_char_replacement_naming_convention/load/normalized/1738158754.239918/started_jobs/AMLPerFornyelseoe.88fe295277.0.parquet @"test_char_replacement_naming_convention_dataset_staging"."%AMLPerFornyelseoe"/"1738158754.239918" OVERWRITE = TRUE, AUTO_COMPRESS = FALSE
COPY INTO "test_char_replacement_naming_convention_dataset_staging"."AMLPerFornyelseoe"
            FROM @"test_char_replacement_naming_convention_dataset_staging"."%AMLPerFornyelseoe"/"1738158754.239918"/AMLPerFornyelseoe.88fe295277.0.parquet


            FILE_FORMAT = (TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE)
            MATCH_BY_COLUMN_NAME='CASE_SENSITIVE'


DROP TABLE IF EXISTS "test_char_replacement_naming_convention_dataset"."AMLPerFornyelseoe";
CREATE TABLE "test_char_replacement_naming_convention_dataset"."AMLPerFornyelseoe" CLONE "test_char_replacement_naming_convention_dataset_staging"."AMLPerFornyelseoe";
INSERT INTO "test_char_replacement_naming_convention_dataset"."_dlt_loads"("load_id", "schema_name", "status", "inserted_at", "schema_version_hash") VALUES(%s, %s, %s, %s, %s);

SELECT 1
    FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE schema_name = %s
SELECT "version", "engine_version", "inserted_at", "schema_name", "version_hash", "schema" FROM "test_char_replacement_naming_convention_dataset"."_dlt_version" WHERE "version_hash" = %s;

SELECT 1
    FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE schema_name = %s

SELECT 1
    FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE schema_name = %s
SELECT "version", "engine_version", "inserted_at", "schema_name", "version_hash", "schema" FROM "test_char_replacement_naming_convention_dataset_staging"."_dlt_version" WHERE "version_hash" = %s;

SELECT 1
    FROM INFORMATION_SCHEMA.SCHEMATA
    WHERE schema_name = %s
TRUNCATE TABLE "test_char_replacement_naming_convention_dataset_staging"."AMLPerFornyelseoe";
PUT file:///home/rudolfix/src/dlt/_storage/.dlt/pipelines/test_char_replacement_naming_convention/load/normalized/1738158759.9290993/started_jobs/AMLPerFornyelseoe.ae872dbafd.0.parquet @"test_char_replacement_naming_convention_dataset_staging"."%AMLPerFornyelseoe"/"1738158759.9290993" OVERWRITE = TRUE, AUTO_COMPRESS = FALSE
COPY INTO "test_char_replacement_naming_convention_dataset_staging"."AMLPerFornyelseoe"
            FROM @"test_char_replacement_naming_convention_dataset_staging"."%AMLPerFornyelseoe"/"1738158759.9290993"/AMLPerFornyelseoe.ae872dbafd.0.parquet


            FILE_FORMAT = (TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE)
            MATCH_BY_COLUMN_NAME='CASE_SENSITIVE'


DROP TABLE IF EXISTS "test_char_replacement_naming_convention_dataset"."AMLPerFornyelseoe";
CREATE TABLE "test_char_replacement_naming_convention_dataset"."AMLPerFornyelseoe" CLONE "test_char_replacement_naming_convention_dataset_staging"."AMLPerFornyelseoe";
INSERT INTO "test_char_replacement_naming_convention_dataset"."_dlt_loads"("load_id", "schema_name", "status", "inserted_at", "schema_version_hash") VALUES(%s, %s, %s, %s, %s);
SELECT
  *
FROM "test_char_replacement_naming_convention_dataset"."AMLPerFornyelseoe"

Note that I'm running my code against fresh datasets (both staging and destination). Maybe this is the problem in your case.

  1. Please make sure that the datasets do not already exist, i.e. use some random dataset_name when loading.
  2. The cs naming convention assumes that names are case sensitive. You won't get any *_STAGING dataset; you'll get a *_staging dataset, and to find it you need to quote the identifier. Otherwise Snowflake assumes it is case insensitive and looks for the upper-cased name (see the sketch after this list).
    (yeah, case sensitivity in Snowflake is IMO a hack and it is better not to use it :) )
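
For illustration, a minimal sketch of the quoting behavior using dlt's sql_client (dataset and table names are the ones from the test above):

with pipeline.sql_client() as client:
    # Unquoted identifiers are upper-cased by Snowflake, so a query like
    #   SELECT * FROM test_char_replacement_naming_convention_dataset_staging.AMLPerFornyelseoe
    # looks for the upper-cased names and misses the case-sensitive dataset.
    # Quoting preserves the exact case and finds the staging table:
    rows = client.execute_sql(
        'SELECT * FROM "test_char_replacement_naming_convention_dataset_staging"."AMLPerFornyelseoe"'
    )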

@rudolfix rudolfix added the question Further information is requested label Jan 29, 2025