From e45de70c8c7139cb04767fd86789e9bbcfd3a0a8 Mon Sep 17 00:00:00 2001 From: Alessio Cesaretti Date: Wed, 19 Feb 2025 17:17:49 +0100 Subject: [PATCH 1/5] Pass csv parameters during duckdb connection --- .../engines/soda/connections/duckdb.py | 47 +++++-- .../model/data_contract_specification.py | 9 ++ .../schemas/datacontract-1.1.0.schema.json | 133 ++++++++++++++++-- datacontract/schemas/odcs-3.0.1.schema.json | 132 +++++++++++++++-- 4 files changed, 289 insertions(+), 32 deletions(-) diff --git a/datacontract/engines/soda/connections/duckdb.py b/datacontract/engines/soda/connections/duckdb.py index ea645a090..473bd53fd 100644 --- a/datacontract/engines/soda/connections/duckdb.py +++ b/datacontract/engines/soda/connections/duckdb.py @@ -42,14 +42,45 @@ def get_duckdb_connection(data_contract, server, run: Run): elif server.format == "csv": columns = to_csv_types(model) run.log_info("Using columns: " + str(columns)) - if columns is None: - con.sql( - f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1);""" - ) - else: - con.sql( - f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns});""" - ) + + # Start with the required parameter. + params = ["hive_partitioning=1"] + + # Define a mapping for CSV parameters: server attribute -> read_csv parameter name. + param_mapping = { + "delimiter": "delim", # Map server.delimiter to 'delim' + "header": "header", + "escape": "escape", + "all_varchar": "all_varchar", + "allow_quoted_nulls": "allow_quoted_nulls", + "dateformat": "dateformat", + "decimal_separator": "decimal_separator", + "new_line": "new_line", + "timestampformat": "timestampformat", + "quote": "quote", + } + for server_attr, read_csv_param in param_mapping.items(): + value = getattr(server, server_attr, None) + if value is not None: + # Wrap string values in quotes. + if isinstance(value, str): + params.append(f"{read_csv_param}='{value}'") + else: + params.append(f"{read_csv_param}={value}") + + # Add columns if they exist. + if columns is not None: + params.append(f"columns={columns}") + + # Build the parameter string. + params_str = ", ".join(params) + + # Create the view with the assembled parameters. 
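+            # For example, for a hypothetical server with delimiter=';' and
+            # header=True, the assembled statement becomes (illustrative only):
+            #   CREATE VIEW "orders" AS
+            #   SELECT * FROM read_csv('data/orders.csv', hive_partitioning=1, delim=';', header=True);
+            # Note: values are interpolated verbatim into the SQL text, so a
+            # string value containing a single quote would need escaping first.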
+ con.sql(f""" + CREATE VIEW "{model_name}" AS + SELECT * FROM read_csv('{model_path}', {params_str}); + """) + elif server.format == "delta": con.sql("update extensions;") # Make sure we have the latest delta extension con.sql(f"""CREATE VIEW "{model_name}" AS SELECT * FROM delta_scan('{model_path}');""") diff --git a/datacontract/model/data_contract_specification.py b/datacontract/model/data_contract_specification.py index dcfdd94ec..7fb43fdf3 100644 --- a/datacontract/model/data_contract_specification.py +++ b/datacontract/model/data_contract_specification.py @@ -58,6 +58,15 @@ class Server(pyd.BaseModel): dataset: str | None = None path: str | None = None delimiter: str | None = None + header: bool | None = None + escape: str | None = None + all_varchar: bool | None = None + allow_quoted_nulls: bool | None = None + dateformat: str | None = None + decimal_separator: str | None = None + new_line: str | None = None + timestampformat: str | None = None + quote: str | None = None endpointUrl: str | None = None location: str | None = None account: str | None = None diff --git a/datacontract/schemas/datacontract-1.1.0.schema.json b/datacontract/schemas/datacontract-1.1.0.schema.json index 29b933e14..61276ca7b 100644 --- a/datacontract/schemas/datacontract-1.1.0.schema.json +++ b/datacontract/schemas/datacontract-1.1.0.schema.json @@ -1212,11 +1212,48 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." 
+ } } }, "required": [ @@ -1248,11 +1285,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ @@ -1336,11 +1409,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). 
Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ diff --git a/datacontract/schemas/odcs-3.0.1.schema.json b/datacontract/schemas/odcs-3.0.1.schema.json index e501aad32..8f899bdbf 100644 --- a/datacontract/schemas/odcs-3.0.1.schema.json +++ b/datacontract/schemas/odcs-3.0.1.schema.json @@ -776,11 +776,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ @@ -1381,11 +1417,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." 
+ }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ @@ -1417,11 +1489,47 @@ }, "delimiter": { "type": "string", - "enum": [ - "new_line", - "array" + "anyOf": [ + { "enum": ["new_line", "array"] }, + { "pattern": "^.$" } ], - "description": "Only for format = json. How multiple json documents are delimited within one file" + "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." 
} }, "required": [ From 9a9fcf52ede8009c85abd0bea7e063dc003cb54a Mon Sep 17 00:00:00 2001 From: Stefan McKinnon Edwards Date: Wed, 26 Feb 2025 15:33:39 +0100 Subject: [PATCH 2/5] fix: Typo in datacontract.schema --- .../schemas/datacontract-1.1.0.schema.json | 73 +++++++++---------- 1 file changed, 36 insertions(+), 37 deletions(-) diff --git a/datacontract/schemas/datacontract-1.1.0.schema.json b/datacontract/schemas/datacontract-1.1.0.schema.json index 61276ca7b..f2d9d9069 100644 --- a/datacontract/schemas/datacontract-1.1.0.schema.json +++ b/datacontract/schemas/datacontract-1.1.0.schema.json @@ -1217,43 +1217,42 @@ { "pattern": "^.$" } ], "description": "For JSON format, only 'new_line' or 'array' is allowed to indicate how multiple JSON documents are delimited. For CSV format, any single character can be used as the delimiter between columns. Only valid for CSV." - }, - "header": { - "type": "boolean", - "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." - }, - "escape": { - "type": "string", - "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." - }, - "all_varchar": { - "type": "boolean", - "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." - }, - "allow_quoted_nulls": { - "type": "boolean", - "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." - }, - "dateformat": { - "type": "string", - "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." - }, - "decimal_separator": { - "type": "string", - "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." - }, - "new_line": { - "type": "string", - "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). Only valid for CSV." - }, - "timestampformat": { - "type": "string", - "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." - }, - "quote": { - "type": "string", - "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." - } + }, + "header": { + "type": "boolean", + "description": "Indicates whether the first row in the CSV file should be treated as column headers. Only valid for CSV." + }, + "escape": { + "type": "string", + "description": "Specifies the escape character used in the CSV file to include special characters in fields. Only valid for CSV." + }, + "all_varchar": { + "type": "boolean", + "description": "If true, all CSV columns are read as VARCHAR (strings), bypassing type inference. Only valid for CSV." + }, + "allow_quoted_nulls": { + "type": "boolean", + "description": "If true, quoted 'NULL' values in the CSV are interpreted as SQL NULL rather than as the string 'NULL'. Only valid for CSV." + }, + "dateformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d') used to parse date values in the CSV. Only valid for CSV." + }, + "decimal_separator": { + "type": "string", + "description": "The character used as the decimal separator in numeric CSV values (e.g., '.' or ','). Only valid for CSV." + }, + "new_line": { + "type": "string", + "description": "The newline character(s) used in the CSV file (e.g., '\\n' or '\\r\\n'). 
Only valid for CSV." + }, + "timestampformat": { + "type": "string", + "description": "A format string (e.g., '%Y-%m-%d %H:%M:%S') used to parse timestamp values in the CSV. Only valid for CSV." + }, + "quote": { + "type": "string", + "description": "The character used for quoting fields in the CSV file (e.g., '\"'). Only valid for CSV." } }, "required": [ From 5c848be4aac6eded6dd6947698ecdcce527ab15e Mon Sep 17 00:00:00 2001 From: Stefan McKinnon Edwards Date: Thu, 27 Feb 2025 10:04:10 +0100 Subject: [PATCH 3/5] fix: mismatch between csv file and model Allows duckdb to load the csv file correctly and lets SodaCL check for field presence. This fix does not check for incorrect ordering of columns. --- .../engines/soda/connections/duckdb.py | 30 +++++ pyproject.toml | 1 + tests/fixtures/csv/data/datacontract.yaml | 2 +- tests/test_duckdb_csv.py | 115 ++++++++++++++++++ tests/test_test_local_csv.py | 26 ++++ 5 files changed, 173 insertions(+), 1 deletion(-) create mode 100644 tests/test_duckdb_csv.py create mode 100644 tests/test_test_local_csv.py diff --git a/datacontract/engines/soda/connections/duckdb.py b/datacontract/engines/soda/connections/duckdb.py index 473bd53fd..ee369d996 100644 --- a/datacontract/engines/soda/connections/duckdb.py +++ b/datacontract/engines/soda/connections/duckdb.py @@ -1,8 +1,11 @@ +import io import os import duckdb + from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type +from datacontract.export.sql_converter import _escape from datacontract.model.run import Run @@ -68,6 +71,15 @@ def get_duckdb_connection(data_contract, server, run: Run): else: params.append(f"{read_csv_param}={value}") + # Sniff out columns, if available: + has_header = getattr(server, "header", True) + if columns is not None and (has_header or has_header is None): + csv_columns = sniff_csv_header(model_path, server) + difference = set(csv_columns) - set(columns.keys()) + if len(difference) > 0: + run.log_warn(f"{model_path} contained unexpected fields: {', '.join(difference)}!") + columns = { k:columns.get(k, 'VARCHAR') for k in csv_columns } + # Add columns if they exist. if columns is not None: params.append(f"columns={columns}") @@ -97,6 +109,24 @@ def to_csv_types(model) -> dict: return columns +def sniff_csv_header(model_path, server): + # Define a mapping for CSV parameters: server attribute -> duckdb.read_csv parameter name. + # Note! The parameter names in the python calls (read_csv, read_csv_auto, and from_csv_auto) + # are different from those used in the SQL statements. + param_mapping = { + "delimiter": "delimiter", + "header": "header", + "escape": "escapechar", + "decimal_separator": "decimal", + "quote": "quotechar" + } + # Remainder params are left out, as we do not care about parsing datatype for just the header. 
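+    # Note: only the first physical line of the file is read below, so a
+    # header row containing quoted newlines would not be sniffed correctly.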
+ with open(model_path, 'rb') as model_file: + header_line = model_file.readline() + csv_params = { v: getattr(server, k) for (k,v) in param_mapping.items() if getattr(server, k, None) is not None } + # from_csv_auto + return duckdb.from_csv_auto(io.BytesIO(header_line), **csv_params).columns + def setup_s3_connection(con, server): s3_region = os.getenv("DATACONTRACT_S3_REGION") s3_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID") diff --git a/pyproject.toml b/pyproject.toml index d68e76174..7792eac74 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "rich>=13.7,<13.10", "sqlglot>=26.6.0,<27.0.0", "duckdb==1.1.2", + "fsspec", "soda-core-duckdb>=3.3.20,<3.5.0", # remove setuptools when https://github.com/sodadata/soda-core/issues/2091 is resolved "setuptools>=60", diff --git a/tests/fixtures/csv/data/datacontract.yaml b/tests/fixtures/csv/data/datacontract.yaml index b0aacc22a..7e25751b0 100644 --- a/tests/fixtures/csv/data/datacontract.yaml +++ b/tests/fixtures/csv/data/datacontract.yaml @@ -7,7 +7,7 @@ servers: production: type: local format: csv - path: ./tests/fixtures/csv/data/sample_data.csv + path: ./fixtures/csv/data/sample_data.csv delimiter: ',' models: sample_data: diff --git a/tests/test_duckdb_csv.py b/tests/test_duckdb_csv.py new file mode 100644 index 000000000..d4184f1ae --- /dev/null +++ b/tests/test_duckdb_csv.py @@ -0,0 +1,115 @@ +import pytest +import duckdb + +from datacontract.data_contract import DataContract +from datacontract.model.run import Run +from datacontract.lint import resolve +from datacontract.engines.soda.connections.duckdb import get_duckdb_connection + +def test_csv_all_fields_present(): + data_contract_str = """dataContractSpecification: 1.1.0 +id: my-data-contract-id +info: + title: My Data Contract + version: 0.0.1 +servers: + production: + type: local + format: csv + path: ./fixtures/csv/data/sample_data.csv + delimiter: ',' +models: + sample_data: + description: Csv file with encoding ascii + type: table + fields: + field_one: + type: string + field_two: + type: integer + field_three: + type: string + """ + data_contract = resolve.resolve_data_contract(data_contract_str = data_contract_str) + con = get_duckdb_connection(data_contract, data_contract.servers["production"], Run.create_run()) + assert con.table("sample_data").columns == ['field_one', 'field_two', 'field_three'] + assert con.table("sample_data").shape == (10,3) + +def test_csv_missing_field(): + data_contract_str = """dataContractSpecification: 1.1.0 +id: my-data-contract-id +info: + title: My Data Contract + version: 0.0.1 +servers: + production: + type: local + format: csv + path: ./fixtures/csv/data/sample_data.csv + delimiter: ',' +models: + sample_data: + description: Csv file with encoding ascii + type: table + fields: + field_one: + type: string + field_two: + type: integer + field_three: + type: string + missing_field: + type: string + """ + data_contract = resolve.resolve_data_contract(data_contract_str = data_contract_str) + # this is ok + con = get_duckdb_connection(data_contract, data_contract.servers["production"], Run.create_run()) + assert con.table("sample_data").columns == ['field_one', 'field_two', 'field_three'] + assert con.table("sample_data").shape == (10,3) + # now test + data_contract = DataContract(data_contract_str=data_contract_str) + run = data_contract.test() + checks = {k.field:k for k in run.checks if k.type == 'field_is_present'} + assert checks['field_one'].result == 'passed' + assert checks['field_two'].result 
== 'passed'
+    assert checks['field_three'].result == 'passed'
+    assert checks['missing_field'].result == 'failed'
+
+
+def test_local_csv_extra_field():
+    data_contract_str = """dataContractSpecification: 1.1.0
+id: my-data-contract-id
+info:
+  title: My Data Contract
+  version: 0.0.1
+servers:
+  production:
+    type: local
+    format: csv
+    path: ./fixtures/csv/data/sample_data.csv
+    delimiter: ','
+models:
+  sample_data:
+    description: Csv file with encoding ascii
+    type: table
+    fields:
+      field_three:
+        type: string
+      field_one:
+        type: string
+    """
+    data_contract = resolve.resolve_data_contract(data_contract_str = data_contract_str)
+    # this is somewhat ok
+    run = Run.create_run()
+    con = get_duckdb_connection(data_contract, data_contract.servers["production"], run)
+    assert con.table("sample_data").columns == ['field_one', 'field_two', 'field_three']
+    assert con.table("sample_data").shape == (10,3)
+    assert any([v.message == './fixtures/csv/data/sample_data.csv contained unexpected fields: field_two!' for v in run.logs if v.level == 'WARN'])
+
+    # now test
+    data_contract = DataContract(data_contract_str=data_contract_str)
+    run = data_contract.test()
+    checks = {k.field:k for k in run.checks if k.type == 'field_is_present'}
+    assert len(checks) == 2
+    assert checks['field_one'].result == 'passed'
+    assert checks['field_three'].result == 'passed'
diff --git a/tests/test_test_local_csv.py b/tests/test_test_local_csv.py
new file mode 100644
index 000000000..9eee5ee70
--- /dev/null
+++ b/tests/test_test_local_csv.py
@@ -0,0 +1,26 @@
+import pytest
+from typer.testing import CliRunner
+
+from datacontract.cli import app
+from datacontract.model.run import Run
+from datacontract.data_contract import DataContract
+from datacontract.lint import resolve
+from datacontract.engines.soda.connections.duckdb import get_duckdb_connection
+
+runner = CliRunner()
+
+#csv_file_path = "fixtures/csv/data/sample_data.csv"
+
+
+def test_cli():
+    result = runner.invoke(app, ["test", "./fixtures/csv/data/datacontract.yaml"])
+    assert result.exit_code == 0
+
+
+
+def test_local_csv():
+    data_contract = DataContract(data_contract_file="./fixtures/csv/data/datacontract.yaml")
+    run = data_contract.test()
+    print(run)
+    assert run.result == "passed"
+

From 46e874c566c9a10007fb312b37236837bda30fbb Mon Sep 17 00:00:00 2001
From: Stefan McKinnon Edwards
Date: Thu, 27 Feb 2025 10:26:47 +0100
Subject: [PATCH 4/5] fix: added checks to duckdb-csv loading

Removed logging warning.
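The mismatches are instead surfaced as warning-level schema checks on the
run, e.g. (illustrative, fields abbreviated):

    Check(category="schema", type="fields_are_same",
          name="Column order mismatch", result=ResultEnum.warning,
          reason="Order of columns in sample_data.csv does not match the model.",
          ...)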
--- .../engines/soda/connections/duckdb.py | 27 +++++++++++++++++-- tests/test_duckdb_csv.py | 7 +++-- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/datacontract/engines/soda/connections/duckdb.py b/datacontract/engines/soda/connections/duckdb.py index ee369d996..bc4635029 100644 --- a/datacontract/engines/soda/connections/duckdb.py +++ b/datacontract/engines/soda/connections/duckdb.py @@ -1,12 +1,13 @@ import io import os +import uuid import duckdb from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type from datacontract.export.sql_converter import _escape -from datacontract.model.run import Run +from datacontract.model.run import ResultEnum, Run, Check def get_duckdb_connection(data_contract, server, run: Run): @@ -76,8 +77,30 @@ def get_duckdb_connection(data_contract, server, run: Run): if columns is not None and (has_header or has_header is None): csv_columns = sniff_csv_header(model_path, server) difference = set(csv_columns) - set(columns.keys()) + same_order = list(columns.keys()) == csv_columns[:len(columns)] + if same_order == False: + run.checks.append(Check( + id=str(uuid.uuid4()), + model=model_name, + category="schema", + type="fields_are_same", + name="Column order mismatch", + result=ResultEnum.warning, + reason=f"Order of columns in {model_path} does not match the model.", + details=f"Expected: {'|'.join(columns.keys())}\nActual: {'|'.join(csv_columns)}", + engine="datacontract", + )) if len(difference) > 0: - run.log_warn(f"{model_path} contained unexpected fields: {', '.join(difference)}!") + run.checks.append(Check( + id=str(uuid.uuid4()), + model=model_name, + category="schema", + type="fields_are_same", + name="Dataset contained unexpected fields", + result=ResultEnum.warning, + reason=f"{model_path} contained unexpected fields: {', '.join(difference)}", + engine="datacontract", + )) columns = { k:columns.get(k, 'VARCHAR') for k in csv_columns } # Add columns if they exist. diff --git a/tests/test_duckdb_csv.py b/tests/test_duckdb_csv.py index d4184f1ae..697e64ddc 100644 --- a/tests/test_duckdb_csv.py +++ b/tests/test_duckdb_csv.py @@ -63,9 +63,11 @@ def test_csv_missing_field(): """ data_contract = resolve.resolve_data_contract(data_contract_str = data_contract_str) # this is ok - con = get_duckdb_connection(data_contract, data_contract.servers["production"], Run.create_run()) + run = Run.create_run() + con = get_duckdb_connection(data_contract, data_contract.servers["production"], run) assert con.table("sample_data").columns == ['field_one', 'field_two', 'field_three'] assert con.table("sample_data").shape == (10,3) + assert any([c.name == 'Column order mismatch' for c in run.checks if c.result == 'warning']) # now test data_contract = DataContract(data_contract_str=data_contract_str) run = data_contract.test() @@ -104,7 +106,8 @@ def test_local_csv_extra_field(): con = get_duckdb_connection(data_contract, data_contract.servers["production"], run) assert con.table("sample_data").columns == ['field_one', 'field_two', 'field_three'] assert con.table("sample_data").shape == (10,3) - assert any([v.message == './fixtures/csv/data/sample_data.csv contained unexpected fields: field_two!' 
for v in run.logs if v.level == 'WARN']) + assert any([c.name == 'Column order mismatch' for c in run.checks if c.result == 'warning']) + assert any([c.name == 'Dataset contained unexpected fields' for c in run.checks if c.result == 'warning']) # now test data_contract = DataContract(data_contract_str=data_contract_str) From a0b23622e9596995fd781a4e507c8dff42364fc8 Mon Sep 17 00:00:00 2001 From: Alessio Cesaretti Date: Fri, 28 Feb 2025 16:00:59 +0100 Subject: [PATCH 5/5] refactor server vars to use lowerCamelCase --- .../engines/soda/connections/duckdb.py | 75 ++++++++++--------- .../model/data_contract_specification.py | 8 +- 2 files changed, 43 insertions(+), 40 deletions(-) diff --git a/datacontract/engines/soda/connections/duckdb.py b/datacontract/engines/soda/connections/duckdb.py index bc4635029..1d00c50f5 100644 --- a/datacontract/engines/soda/connections/duckdb.py +++ b/datacontract/engines/soda/connections/duckdb.py @@ -4,10 +4,8 @@ import duckdb - from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type -from datacontract.export.sql_converter import _escape -from datacontract.model.run import ResultEnum, Run, Check +from datacontract.model.run import Check, ResultEnum, Run def get_duckdb_connection(data_contract, server, run: Run): @@ -55,11 +53,11 @@ def get_duckdb_connection(data_contract, server, run: Run): "delimiter": "delim", # Map server.delimiter to 'delim' "header": "header", "escape": "escape", - "all_varchar": "all_varchar", - "allow_quoted_nulls": "allow_quoted_nulls", + "allVarchar": "all_varchar", + "allowQuotedNulls": "allow_quoted_nulls", "dateformat": "dateformat", - "decimal_separator": "decimal_separator", - "new_line": "new_line", + "decimalSeparator": "decimal_separator", + "newLine": "new_line", "timestampformat": "timestampformat", "quote": "quote", } @@ -77,31 +75,35 @@ def get_duckdb_connection(data_contract, server, run: Run): if columns is not None and (has_header or has_header is None): csv_columns = sniff_csv_header(model_path, server) difference = set(csv_columns) - set(columns.keys()) - same_order = list(columns.keys()) == csv_columns[:len(columns)] - if same_order == False: - run.checks.append(Check( - id=str(uuid.uuid4()), - model=model_name, - category="schema", - type="fields_are_same", - name="Column order mismatch", - result=ResultEnum.warning, - reason=f"Order of columns in {model_path} does not match the model.", - details=f"Expected: {'|'.join(columns.keys())}\nActual: {'|'.join(csv_columns)}", - engine="datacontract", - )) + same_order = list(columns.keys()) == csv_columns[: len(columns)] + if not same_order: + run.checks.append( + Check( + id=str(uuid.uuid4()), + model=model_name, + category="schema", + type="fields_are_same", + name="Column order mismatch", + result=ResultEnum.warning, + reason=f"Order of columns in {model_path} does not match the model.", + details=f"Expected: {'|'.join(columns.keys())}\nActual: {'|'.join(csv_columns)}", + engine="datacontract", + ) + ) if len(difference) > 0: - run.checks.append(Check( - id=str(uuid.uuid4()), - model=model_name, - category="schema", - type="fields_are_same", - name="Dataset contained unexpected fields", - result=ResultEnum.warning, - reason=f"{model_path} contained unexpected fields: {', '.join(difference)}", - engine="datacontract", - )) - columns = { k:columns.get(k, 'VARCHAR') for k in csv_columns } + run.checks.append( + Check( + id=str(uuid.uuid4()), + model=model_name, + category="schema", + type="fields_are_same", + name="Dataset contained unexpected 
fields", + result=ResultEnum.warning, + reason=f"{model_path} contained unexpected fields: {', '.join(difference)}", + engine="datacontract", + ) + ) + columns = {k: columns.get(k, "VARCHAR") for k in csv_columns} # Add columns if they exist. if columns is not None: @@ -137,19 +139,20 @@ def sniff_csv_header(model_path, server): # Note! The parameter names in the python calls (read_csv, read_csv_auto, and from_csv_auto) # are different from those used in the SQL statements. param_mapping = { - "delimiter": "delimiter", + "delimiter": "delimiter", "header": "header", "escape": "escapechar", "decimal_separator": "decimal", - "quote": "quotechar" + "quote": "quotechar", } # Remainder params are left out, as we do not care about parsing datatype for just the header. - with open(model_path, 'rb') as model_file: + with open(model_path, "rb") as model_file: header_line = model_file.readline() - csv_params = { v: getattr(server, k) for (k,v) in param_mapping.items() if getattr(server, k, None) is not None } - # from_csv_auto + csv_params = {v: getattr(server, k) for (k, v) in param_mapping.items() if getattr(server, k, None) is not None} + # from_csv_auto return duckdb.from_csv_auto(io.BytesIO(header_line), **csv_params).columns + def setup_s3_connection(con, server): s3_region = os.getenv("DATACONTRACT_S3_REGION") s3_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID") diff --git a/datacontract/model/data_contract_specification.py b/datacontract/model/data_contract_specification.py index 7fb43fdf3..9983ef728 100644 --- a/datacontract/model/data_contract_specification.py +++ b/datacontract/model/data_contract_specification.py @@ -60,11 +60,11 @@ class Server(pyd.BaseModel): delimiter: str | None = None header: bool | None = None escape: str | None = None - all_varchar: bool | None = None - allow_quoted_nulls: bool | None = None + allVarchar: bool | None = None + allowQuotedNulls: bool | None = None dateformat: str | None = None - decimal_separator: str | None = None - new_line: str | None = None + decimalSeparator: str | None = None + newLine: str | None = None timestampformat: str | None = None quote: str | None = None endpointUrl: str | None = None