From 39d6a18151220811b34a4c321a3edce46606bde2 Mon Sep 17 00:00:00 2001 From: Damien Maresma Date: Tue, 25 Feb 2025 18:46:37 -0500 Subject: [PATCH 01/21] initial add az_read_file feat. --- CHANGELOG.md | 5 ++ .../fastjsonschema/az/az_read_files.py | 37 +++++++++++ .../fastjsonschema/check_jsonschema.py | 62 ++++++++++++++++--- pyproject.toml | 6 +- 4 files changed, 99 insertions(+), 11 deletions(-) create mode 100644 datacontract/engines/fastjsonschema/az/az_read_files.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c66add6ec..2328d7dd0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Azure Storage Account json Check +- Azure and S3 filepath token `{year}, {month}, {day}, {date}, {day-1}, {quarter}` + +### Added + - `datacontract test --output-format junit --output TEST-datacontract.xml` Export CLI test results to a file, in a standard format (e.g. JUnit) to improve CI/CD experience (#650) diff --git a/datacontract/engines/fastjsonschema/az/az_read_files.py b/datacontract/engines/fastjsonschema/az/az_read_files.py new file mode 100644 index 000000000..f86957d5b --- /dev/null +++ b/datacontract/engines/fastjsonschema/az/az_read_files.py @@ -0,0 +1,37 @@ +import logging +import os + +from datacontract.model.exceptions import DataContractException + + +def yield_az_files(az_endpoint_url, az_location): + fs = az_fs(az_endpoint_url) + files = fs.glob(az_location) + for file in files: + with fs.open(file) as f: + logging.info(f"Downloading file {file}") + yield f.read() + +def az_fs(az_endpoint_url): + try: + import adlfs + except ImportError as e: + raise DataContractException( + type="schema", + result="failed", + name="az extra missing", + reason="Install the extra datacontract-cli\[azure] to use az", + engine="datacontract", + original_exception=e, + ) + + az_client_id = os.getenv("DATACONTRACT_AZURE_CLIENT_ID") + az_client_secret = os.getenv("DATACONTRACT_AZURE_CLIENT_SECRET") + az_tenant_id = os.getenv("DATACONTRACT_AZURE_TENANT_ID") + return adlfs.AzureBlobFileSystem( + az_endpoint_url=az_endpoint_url, + client_id=az_client_id, + client_secret=az_client_secret, + tenant_id=az_tenant_id, + anon=az_client_id is None, + ) diff --git a/datacontract/engines/fastjsonschema/check_jsonschema.py b/datacontract/engines/fastjsonschema/check_jsonschema.py index 5ea79caad..b69802b74 100644 --- a/datacontract/engines/fastjsonschema/check_jsonschema.py +++ b/datacontract/engines/fastjsonschema/check_jsonschema.py @@ -2,12 +2,14 @@ import logging import os import threading +from datetime import datetime from typing import List, Optional import fastjsonschema from fastjsonschema import JsonSchemaValueException from datacontract.engines.fastjsonschema.s3.s3_read_files import yield_s3_files +from datacontract.engines.fastjsonschema.az.az_read_files import yield_az_files from datacontract.export.jsonschema_converter import to_jsonschema from datacontract.model.data_contract_specification import DataContractSpecification, Server from datacontract.model.exceptions import DataContractException @@ -212,6 +214,45 @@ def process_s3_file(run, server, schema, model_name, validate): # Handle all errors from schema validation. 
process_exceptions(run, exceptions) +def process_azure_file(run, server, schema, model_name, validate): + az_endpoint_url = server.endpointUrl + az_location = server.location + date = datetime.today() + + if "{model}" in az_location: + az_location = az_location.format(model=model_name) + if "{year}" in az_location: + az_location = az_location.format(year=date.strftime('%Y')) + if "{month}" in az_location: + az_location = az_location.format(month=date.strftime('%m')) + if "{day}" in az_location: + az_location = az_location.format(day=date.strftime('%d')) + if "{date}" in az_location: + az_location = az_location.format(date=date.strftime('%Y-%m-%d')) + json_stream = None + + for file_content in yield_az_files(az_endpoint_url, az_location): + if server.delimiter == "new_line": + json_stream = read_json_lines_content(file_content) + elif server.delimiter == "array": + json_stream = read_json_array_content(file_content) + else: + json_stream = read_json_file_content(file_content) + + if json_stream is None: + raise DataContractException( + type="schema", + name="Check that JSON has valid schema", + result="warning", + reason=f"Cannot find any file in {s3_location}", + engine="datacontract", + ) + + # Validate the JSON stream and collect exceptions. + exceptions = validate_json_stream(schema, model_name, validate, json_stream) + + # Handle all errors from schema validation. + process_exceptions(run, exceptions) def check_jsonschema(run: Run, data_contract: DataContractSpecification, server: Server): run.log_info("Running engine jsonschema") @@ -262,16 +303,17 @@ def check_jsonschema(run: Run, data_contract: DataContractSpecification, server: ) ) elif server.type == "azure": - run.checks.append( - Check( - type="schema", - name="Check that JSON has valid schema", - model=model_name, - result=ResultEnum.info, - reason="JSON Schema check skipped for azure, as azure is currently not supported", - engine="jsonschema", - ) - ) + process_azure_file(run, server, schema, model_name, validate) + #run.checks.append( + # Check( + # type="schema", + # name="Check that JSON has valid schema", + # model=model_name, + # result=ResultEnum.info, + # reason="JSON Schema check skipped for azure, as azure is currently not supported", + # engine="jsonschema", + # ) + #) else: run.checks.append( Check( diff --git a/pyproject.toml b/pyproject.toml index 10d4cf649..5cf27de56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,10 @@ s3 = [ "aiobotocore>=2.17.0,<2.20.0", ] +azure = [ + "adlfs>=2024.12.0", +] + snowflake = [ "snowflake-connector-python[pandas]>=3.6,<3.14", "soda-core-snowflake>=3.3.20,<3.5.0" @@ -111,7 +115,7 @@ api = [ ] all = [ - "datacontract-cli[kafka,bigquery,csv,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml,iceberg,parquet,rdf,api]" + "datacontract-cli[kafka,bigquery,csv,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml,iceberg,parquet,rdf,api,azure]" ] dev = [ From f1e14187501494f0f354c2974ca1cd0d848c50cb Mon Sep 17 00:00:00 2001 From: Damien Maresma Date: Wed, 26 Feb 2025 13:50:25 -0500 Subject: [PATCH 02/21] initial add az_read_file feat. 
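
A minimal usage sketch of the new Azure reader, assuming a service principal and an example storage account and container path (the account name, path, and credential values below are placeholders, not values from this change; the {model}/{year}/{month}/{day}/{date} tokens in server.location are intended to be resolved by process_azure_file before yield_az_files is called):

    import os

    from datacontract.engines.fastjsonschema.az.az_read_files import yield_az_files

    # Service-principal credentials picked up by az_fs(); anonymous access is
    # used when DATACONTRACT_AZURE_CLIENT_ID is not set.
    os.environ["DATACONTRACT_AZURE_TENANT_ID"] = "<tenant-id>"          # placeholder
    os.environ["DATACONTRACT_AZURE_CLIENT_ID"] = "<client-id>"          # placeholder
    os.environ["DATACONTRACT_AZURE_CLIENT_SECRET"] = "<client-secret>"  # placeholder

    # Glob over a path in the given storage account and stream each matching
    # blob's raw bytes, exactly as process_azure_file consumes them.
    for raw in yield_az_files("mystorageaccount", "raw/orders/2025/02/*.json"):
        print(f"downloaded {len(raw)} bytes")
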
--- CHANGELOG.md | 5 ++ .../fastjsonschema/az/az_read_files.py | 37 +++++++++++ .../fastjsonschema/check_jsonschema.py | 62 ++++++++++++++++--- datacontract/imports/odcs_v3_importer.py | 4 +- .../model/data_contract_specification.py | 3 + pyproject.toml | 6 +- 6 files changed, 105 insertions(+), 12 deletions(-) create mode 100644 datacontract/engines/fastjsonschema/az/az_read_files.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c66add6ec..dc08d9ea2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Azure Storage Account json Check +- Azure token `{year}, {month}, {day}, {date}, {day-1}, {quarter}` + +### Added + - `datacontract test --output-format junit --output TEST-datacontract.xml` Export CLI test results to a file, in a standard format (e.g. JUnit) to improve CI/CD experience (#650) diff --git a/datacontract/engines/fastjsonschema/az/az_read_files.py b/datacontract/engines/fastjsonschema/az/az_read_files.py new file mode 100644 index 000000000..b3fa057a2 --- /dev/null +++ b/datacontract/engines/fastjsonschema/az/az_read_files.py @@ -0,0 +1,37 @@ +import logging +import os + +from datacontract.model.exceptions import DataContractException + + +def yield_az_files(az_storageAccount, az_location): + fs = az_fs(az_storageAccount) + files = fs.glob(az_location) + for file in files: + with fs.open(file) as f: + logging.info(f"Downloading file {file}") + yield f.read() + +def az_fs(az_storageAccount): + try: + import adlfs + except ImportError as e: + raise DataContractException( + type="schema", + result="failed", + name="az extra missing", + reason="Install the extra datacontract-cli\\[azure] to use az", + engine="datacontract", + original_exception=e, + ) + + az_client_id = os.getenv("DATACONTRACT_AZURE_CLIENT_ID") + az_client_secret = os.getenv("DATACONTRACT_AZURE_CLIENT_SECRET") + az_tenant_id = os.getenv("DATACONTRACT_AZURE_TENANT_ID") + return adlfs.AzureBlobFileSystem( + account_name=az_storageAccount, + client_id=az_client_id, + client_secret=az_client_secret, + tenant_id=az_tenant_id, + anon=az_client_id is None, + ) diff --git a/datacontract/engines/fastjsonschema/check_jsonschema.py b/datacontract/engines/fastjsonschema/check_jsonschema.py index 5ea79caad..f4eb100ca 100644 --- a/datacontract/engines/fastjsonschema/check_jsonschema.py +++ b/datacontract/engines/fastjsonschema/check_jsonschema.py @@ -2,12 +2,14 @@ import logging import os import threading +from datetime import datetime from typing import List, Optional import fastjsonschema from fastjsonschema import JsonSchemaValueException from datacontract.engines.fastjsonschema.s3.s3_read_files import yield_s3_files +from datacontract.engines.fastjsonschema.az.az_read_files import yield_az_files from datacontract.export.jsonschema_converter import to_jsonschema from datacontract.model.data_contract_specification import DataContractSpecification, Server from datacontract.model.exceptions import DataContractException @@ -212,6 +214,55 @@ def process_s3_file(run, server, schema, model_name, validate): # Handle all errors from schema validation. 
process_exceptions(run, exceptions) +def process_azure_file(run, server, schema, model_name, validate): + + if server.storageAccount is None: + raise DataContractException( + type="schema", + name="Check that JSON has valid schema", + result="warning", + reason=f"Cannot retrieve storageAccount in Server config", + engine="datacontract", + ) + + az_storageAccount = server.storageAccount + az_location = server.location + date = datetime.today() + + if "{model}" in az_location: + az_location = az_location.format(model=model_name) + if "{year}" in az_location: + az_location = az_location.format(year=date.strftime('%Y')) + if "{month}" in az_location: + az_location = az_location.format(month=date.strftime('%m')) + if "{day}" in az_location: + az_location = az_location.format(day=date.strftime('%d')) + if "{date}" in az_location: + az_location = az_location.format(date=date.strftime('%Y-%m-%d')) + json_stream = None + + for file_content in yield_az_files(az_storageAccount, az_location): + if server.delimiter == "new_line": + json_stream = read_json_lines_content(file_content) + elif server.delimiter == "array": + json_stream = read_json_array_content(file_content) + else: + json_stream = read_json_file_content(file_content) + + if json_stream is None: + raise DataContractException( + type="schema", + name="Check that JSON has valid schema", + result="warning", + reason=f"Cannot find any file in {az_location}", + engine="datacontract", + ) + + # Validate the JSON stream and collect exceptions. + exceptions = validate_json_stream(schema, model_name, validate, json_stream) + + # Handle all errors from schema validation. + process_exceptions(run, exceptions) def check_jsonschema(run: Run, data_contract: DataContractSpecification, server: Server): run.log_info("Running engine jsonschema") @@ -262,16 +313,7 @@ def check_jsonschema(run: Run, data_contract: DataContractSpecification, server: ) ) elif server.type == "azure": - run.checks.append( - Check( - type="schema", - name="Check that JSON has valid schema", - model=model_name, - result=ResultEnum.info, - reason="JSON Schema check skipped for azure, as azure is currently not supported", - engine="jsonschema", - ) - ) + process_azure_file(run, server, schema, model_name, validate) else: run.checks.append( Check( diff --git a/datacontract/imports/odcs_v3_importer.py b/datacontract/imports/odcs_v3_importer.py index 56e9998c7..f1c2f4b9b 100644 --- a/datacontract/imports/odcs_v3_importer.py +++ b/datacontract/imports/odcs_v3_importer.py @@ -98,6 +98,7 @@ def import_servers(odcs_contract: Dict[str, Any]) -> Dict[str, Server] | None: continue server = Server() + server.name = server_name server.type = odcs_server.get("type") server.description = odcs_server.get("description") server.environment = odcs_server.get("environment") @@ -121,8 +122,9 @@ def import_servers(odcs_contract: Dict[str, Any]) -> Dict[str, Server] | None: server.outputPortId = odcs_server.get("outputPortId") server.driver = odcs_server.get("driver") server.roles = odcs_server.get("roles") + server.storageAccount = odcs_server.get("storageAccount") - servers[server_name] = server + servers[server.name] = server return servers diff --git a/datacontract/model/data_contract_specification.py b/datacontract/model/data_contract_specification.py index dcfdd94ec..9fbc88f47 100644 --- a/datacontract/model/data_contract_specification.py +++ b/datacontract/model/data_contract_specification.py @@ -28,6 +28,8 @@ "record", "struct", "null", + "geography", + "geometry", ] @@ -50,6 +52,7 @@ class 
ServerRole(pyd.BaseModel): class Server(pyd.BaseModel): + name: str | None = None type: str | None = None description: str | None = None environment: str | None = None diff --git a/pyproject.toml b/pyproject.toml index 10d4cf649..5cf27de56 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,10 @@ s3 = [ "aiobotocore>=2.17.0,<2.20.0", ] +azure = [ + "adlfs>=2024.12.0", +] + snowflake = [ "snowflake-connector-python[pandas]>=3.6,<3.14", "soda-core-snowflake>=3.3.20,<3.5.0" @@ -111,7 +115,7 @@ api = [ ] all = [ - "datacontract-cli[kafka,bigquery,csv,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml,iceberg,parquet,rdf,api]" + "datacontract-cli[kafka,bigquery,csv,snowflake,postgres,databricks,sqlserver,s3,trino,dbt,dbml,iceberg,parquet,rdf,api,azure]" ] dev = [ From 1ff252faae4a00850062ff8fadd4c7fc04bfe268 Mon Sep 17 00:00:00 2001 From: Damien Maresma Date: Mon, 24 Mar 2025 10:55:36 -0400 Subject: [PATCH 03/21] add sql ddl for view, implement lineage tables --- datacontract/export/sql_converter.py | 66 +++++++++++++------ datacontract/imports/odcs_v3_importer.py | 7 +- .../model/data_contract_specification.py | 1 + 3 files changed, 54 insertions(+), 20 deletions(-) diff --git a/datacontract/export/sql_converter.py b/datacontract/export/sql_converter.py index 2aabe111d..8b6e02913 100644 --- a/datacontract/export/sql_converter.py +++ b/datacontract/export/sql_converter.py @@ -100,31 +100,59 @@ def to_sql_ddl( def _to_sql_table(model_name, model, server_type="snowflake"): - if server_type == "databricks": + result = "init" + if server_type in ("databricks","snowflake") and model.type.lower() == "table": # Databricks recommends to use the CREATE OR REPLACE statement for unity managed tables # https://docs.databricks.com/en/sql/language-manual/sql-ref-syntax-ddl-create-table-using.html + # the same for Snowflake + # https://docs.snowflake.com/en/sql-reference/sql/create-table result = f"CREATE OR REPLACE TABLE {model_name} (\n" - else: + elif model.type.lower() == "table": result = f"CREATE TABLE {model_name} (\n" + elif server_type == "snowflake" and model.type.lower() == "view": + # https://docs.snowflake.com/en/sql-reference/sql/create-view + result = f"CREATE OR ALTER VIEW {model_name} (\n" + fields = len(model.fields) current_field_index = 1 - for field_name, field in iter(model.fields.items()): - type = convert_to_sql_type(field, server_type) - result += f" {field_name} {type}" - if field.required: - result += " not null" - if field.primaryKey or field.primary: - result += " primary key" - if server_type == "databricks" and field.description is not None: - result += f' COMMENT "{_escape(field.description)}"' - if current_field_index < fields: - result += "," - result += "\n" - current_field_index += 1 - result += ")" - if server_type == "databricks" and model.description is not None: - result += f' COMMENT "{_escape(model.description)}"' - result += ";\n" + if model.type.lower() == "table": + for field_name, field in iter(model.fields.items()): + type = convert_to_sql_type(field, server_type) + result += f" {field_name} {type}" + if field.required: + result += " not null" + if (field.primaryKey or field.primary) and field.c: + result += " primary key" + if server_type in ("snowflake","databricks") and field.description is not None: + result += f' COMMENT "{_escape(field.description)}"' + if current_field_index < fields: + result += "," + result += "\n" + current_field_index += 1 + result += ")" + elif model.type.lower() == "view": + field_list = '' + lineage_list = set() + 
for field_name, field in iter(model.fields.items()): + type = convert_to_sql_type(field, server_type) + result += f" {field_name}" + field_list += f" \n\t\t{field_name}," + if server_type in ("databricks","snowflake") and field.description is not None: + result += f' COMMENT "{_escape(field.description)}"' + if current_field_index < fields: + result += "," + result += "\n" + current_field_index += 1 + if field.lineages: + lineage_list = lineage_list.union(set(field.lineages)) + + result += f")" + if server_type in ( "snowflake","databricks") and model.description is not None: + result += f'\nCOMMENT "{_escape(model.description)}"\nAS \n\tSELECT {field_list} \n\tFROM {''.join(lineage_list)}' + else: + result += f')\nAS \n\tSELECT {field_list} \n\tFROM {''.join(lineage_list)}' + + result += ";\n\n" return result diff --git a/datacontract/imports/odcs_v3_importer.py b/datacontract/imports/odcs_v3_importer.py index f1c2f4b9b..4ec26eb7e 100644 --- a/datacontract/imports/odcs_v3_importer.py +++ b/datacontract/imports/odcs_v3_importer.py @@ -192,7 +192,8 @@ def import_models(odcs_contract: Dict[str, Any]) -> Dict[str, Model]: schema_physical_name = odcs_schema.get("physicalName") schema_description = odcs_schema.get("description") if odcs_schema.get("description") is not None else "" model_name = schema_physical_name if schema_physical_name is not None else schema_name - model = Model(description=" ".join(schema_description.splitlines()), type="table") + type = odcs_schema.get("physicalType") if odcs_schema.get("physicalType") is not None else "table" + model = Model(description=" ".join(schema_description.splitlines()), type=type) model.fields = import_fields( odcs_schema.get("properties"), custom_type_mappings, server_type=get_server_type(odcs_contract) ) @@ -259,9 +260,11 @@ def import_fields( for odcs_property in odcs_properties: mapped_type = map_type(odcs_property.get("logicalType"), custom_type_mappings) + if mapped_type is not None: property_name = odcs_property["name"] description = odcs_property.get("description") if odcs_property.get("description") is not None else None + field = Field( description=" ".join(description.splitlines()) if description is not None else None, type=mapped_type, @@ -278,8 +281,10 @@ def import_fields( tags=odcs_property.get("tags") if odcs_property.get("tags") is not None else None, quality=odcs_property.get("quality") if odcs_property.get("quality") is not None else [], config=import_field_config(odcs_property, server_type), + lineages=odcs_property.get("transformSourceObjects") if odcs_property.get("transformSourceObjects") is not None else None, ) result[property_name] = field + else: logger.info( f"Can't map {odcs_property.get('column')} to the Datacontract Mapping types, as there is no equivalent or special mapping. 
Consider introducing a customProperty 'dc_mapping_{odcs_property.get('logicalName')}' that defines your expected type as the 'value'" diff --git a/datacontract/model/data_contract_specification.py b/datacontract/model/data_contract_specification.py index 9fbc88f47..9a6778d54 100644 --- a/datacontract/model/data_contract_specification.py +++ b/datacontract/model/data_contract_specification.py @@ -184,6 +184,7 @@ class Field(pyd.BaseModel): examples: List[Any] | None = None quality: List[Quality] | None = [] config: Dict[str, Any] | None = None + lineages: List[Any] | None = None model_config = pyd.ConfigDict( extra="allow", From d8c31c7c7f5c08216c71ba3e7a9a622ab9738c57 Mon Sep 17 00:00:00 2001 From: Damien Maresma Date: Tue, 25 Mar 2025 15:57:50 -0400 Subject: [PATCH 04/21] bring back simple-ddl-parser --- datacontract/imports/sql_importer.py | 182 ++++++++------------------- pyproject.toml | 2 +- 2 files changed, 51 insertions(+), 133 deletions(-) diff --git a/datacontract/imports/sql_importer.py b/datacontract/imports/sql_importer.py index c51e4272c..6a4af41cf 100644 --- a/datacontract/imports/sql_importer.py +++ b/datacontract/imports/sql_importer.py @@ -1,8 +1,7 @@ import logging import os -import sqlglot -from sqlglot.dialects.dialect import Dialects +from simple_ddl_parser import parse_from_file from datacontract.imports.importer import Importer from datacontract.model.data_contract_specification import DataContractSpecification, Field, Model, Server @@ -20,12 +19,11 @@ def import_source( def import_sql( data_contract_specification: DataContractSpecification, format: str, source: str, import_args: dict = None ) -> DataContractSpecification: - sql = read_file(source) dialect = to_dialect(import_args) try: - parsed = sqlglot.parse_one(sql=sql, read=dialect) + ddl = parse_from_file(source, group_by_type=True, encoding = "cp1252", output_mode = dialect ) except Exception as e: logging.error(f"Error parsing SQL: {str(e)}") raise DataContractException( @@ -36,104 +34,72 @@ def import_sql( result=ResultEnum.error, ) - server_type: str | None = to_server_type(source, dialect) + server_type: str | None = dialect if server_type is not None: data_contract_specification.servers[server_type] = Server(type=server_type) - tables = parsed.find_all(sqlglot.expressions.Table) + tables = ddl["tables"] for table in tables: if data_contract_specification.models is None: data_contract_specification.models = {} - table_name = table.this.name + table_name = table["table_name"] fields = {} - for column in parsed.find_all(sqlglot.exp.ColumnDef): - if column.parent.this.name != table_name: - continue - + for column in table["columns"]: field = Field() - col_name = column.this.name - col_type = to_col_type(column, dialect) - field.type = map_type_from_sql(col_type) - col_description = get_description(column) - field.description = col_description - field.maxLength = get_max_length(column) - precision, scale = get_precision_scale(column) - field.precision = precision - field.scale = scale - field.primaryKey = get_primary_key(column) - field.required = column.find(sqlglot.exp.NotNullColumnConstraint) is not None or None - physical_type_key = to_physical_type_key(dialect) - field.config = { - physical_type_key: col_type, - } - - fields[col_name] = field - + field.type = map_type_from_sql(map_type_from_sql(column["type"])) + if not column["nullable"]: + field.required = True + if column["unique"]: + field.unique = True + + if column["size"] is not None and column["size"] and not isinstance(column["size"], tuple): + 
field.maxLength = column["size"] + elif isinstance(column["size"], tuple): + field.precision = column["size"][0] + field.scale = column["size"][1] + + field.description = column["comment"][1:-1] if column.get("comment") else None + field.required = column["nullable"] + if column.get("with_tag"): + field.tags = ", ".join(column["with_tag"]) + if column.get("with_masking_policy"): + field.classification = ", ".join(column["with_masking_policy"]) + if column.get("generated"): + field.examples = str(column["generated"]) + field.unique = column["unique"] + + fields[column["name"]] = field + + if table.get("constraints"): + if table["constraints"].get("primary_key"): + for primary_key in table["constraints"]["primary_key"]["columns"]: + if primary_key in fields: + fields[primary_key].unique = True + fields[primary_key].required = True + fields[primary_key].primaryKey = True + + table_description = table["comment"][1:-1] if table.get("comment") else None + table_tags = table["with_tag"][1:-1] if table.get("with_tag") else None + data_contract_specification.models[table_name] = Model( type="table", + description=table_description, + tags=table_tags, fields=fields, ) return data_contract_specification - -def get_primary_key(column) -> bool | None: - if column.find(sqlglot.exp.PrimaryKeyColumnConstraint) is not None: - return True - if column.find(sqlglot.exp.PrimaryKey) is not None: - return True - return None - - -def to_dialect(import_args: dict) -> Dialects | None: +def to_dialect(import_args: dict) -> str | None: if import_args is None: return None if "dialect" not in import_args: return None dialect = import_args.get("dialect") - if dialect is None: - return None - if dialect == "sqlserver": - return Dialects.TSQL - if dialect.upper() in Dialects.__members__: - return Dialects[dialect.upper()] - if dialect == "sqlserver": - return Dialects.TSQL - return None - - -def to_physical_type_key(dialect: Dialects | None) -> str: - dialect_map = { - Dialects.TSQL: "sqlserverType", - Dialects.POSTGRES: "postgresType", - Dialects.BIGQUERY: "bigqueryType", - Dialects.SNOWFLAKE: "snowflakeType", - Dialects.REDSHIFT: "redshiftType", - Dialects.ORACLE: "oracleType", - Dialects.MYSQL: "mysqlType", - Dialects.DATABRICKS: "databricksType", - } - return dialect_map.get(dialect, "physicalType") - - -def to_server_type(source, dialect: Dialects | None) -> str | None: - if dialect is None: - return None - dialect_map = { - Dialects.TSQL: "sqlserver", - Dialects.POSTGRES: "postgres", - Dialects.BIGQUERY: "bigquery", - Dialects.SNOWFLAKE: "snowflake", - Dialects.REDSHIFT: "redshift", - Dialects.ORACLE: "oracle", - Dialects.MYSQL: "mysql", - Dialects.DATABRICKS: "databricks", - } - return dialect_map.get(dialect, None) - + return dialect def to_col_type(column, dialect): col_type_kind = column.args["kind"] @@ -142,62 +108,12 @@ def to_col_type(column, dialect): return col_type_kind.sql(dialect) - def to_col_type_normalized(column): col_type = column.args["kind"].this.name if col_type is None: return None return col_type.lower() - -def get_description(column: sqlglot.expressions.ColumnDef) -> str | None: - if column.comments is None: - return None - return " ".join(comment.strip() for comment in column.comments) - - -def get_max_length(column: sqlglot.expressions.ColumnDef) -> int | None: - col_type = to_col_type_normalized(column) - if col_type is None: - return None - if col_type not in ["varchar", "char", "nvarchar", "nchar"]: - return None - col_params = 
list(column.args["kind"].find_all(sqlglot.expressions.DataTypeParam)) - max_length_str = None - if len(col_params) == 0: - return None - if len(col_params) == 1: - max_length_str = col_params[0].name - if len(col_params) == 2: - max_length_str = col_params[1].name - if max_length_str is not None: - return int(max_length_str) if max_length_str.isdigit() else None - - -def get_precision_scale(column): - col_type = to_col_type_normalized(column) - if col_type is None: - return None, None - if col_type not in ["decimal", "numeric", "float", "number"]: - return None, None - col_params = list(column.args["kind"].find_all(sqlglot.expressions.DataTypeParam)) - if len(col_params) == 0: - return None, None - if len(col_params) == 1: - if not col_params[0].name.isdigit(): - return None, None - precision = int(col_params[0].name) - scale = 0 - return precision, scale - if len(col_params) == 2: - if not col_params[0].name.isdigit() or not col_params[1].name.isdigit(): - return None, None - precision = int(col_params[0].name) - scale = int(col_params[1].name) - return precision, scale - return None, None - - def map_type_from_sql(sql_type: str): if sql_type is None: return None @@ -218,14 +134,16 @@ def map_type_from_sql(sql_type: str): return "string" elif sql_type_normed.startswith("ntext"): return "string" + elif sql_type_normed.startswith("number"): + return "decimal" elif sql_type_normed.startswith("int"): - return "int" + return "decimal" elif sql_type_normed.startswith("bigint"): return "long" elif sql_type_normed.startswith("tinyint"): - return "int" + return "decimal" elif sql_type_normed.startswith("smallint"): - return "int" + return "decimal" elif sql_type_normed.startswith("float"): return "float" elif sql_type_normed.startswith("decimal"): diff --git a/pyproject.toml b/pyproject.toml index 9d13e3f84..d6f33d455 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "numpy>=1.26.4,<2.0.0", # transitive dependency, needs to be <2.0.0 https://github.com/datacontract/datacontract-cli/issues/575 "python-multipart==0.0.20", "rich>=13.7,<13.10", - "sqlglot>=26.6.0,<27.0.0", + "simple-ddl-parser==1.7.1", "duckdb>=1.0.0,<2.0.0", "soda-core-duckdb>=3.3.20,<3.5.0", # remove setuptools when https://github.com/sodadata/soda-core/issues/2091 is resolved From 101a88f5c54ae7a80bb1edb29ecd8bd1bc5bf5f3 Mon Sep 17 00:00:00 2001 From: Damien Maresma Date: Mon, 31 Mar 2025 09:19:35 -0400 Subject: [PATCH 05/21] improve csv test with duckdb 1.2.1 (force upgrade) to support utf-8 load --- .../soda/connections/duckdb_connection.py | 102 ++++++++++++------ .../model/data_contract_specification.py | 9 ++ pyproject.toml | 18 ++-- 3 files changed, 85 insertions(+), 44 deletions(-) diff --git a/datacontract/engines/soda/connections/duckdb_connection.py b/datacontract/engines/soda/connections/duckdb_connection.py index f05fce2f6..9a05cd4ae 100644 --- a/datacontract/engines/soda/connections/duckdb_connection.py +++ b/datacontract/engines/soda/connections/duckdb_connection.py @@ -1,13 +1,11 @@ import os from typing import Any - import duckdb from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type from datacontract.model.data_contract_specification import DataContractSpecification, Server from datacontract.model.run import Run - def get_duckdb_connection( data_contract: DataContractSpecification, server: Server, @@ -33,40 +31,75 @@ def get_duckdb_connection( setup_azure_connection(con, server) for model_name, model in data_contract.models.items(): model_path = path - if 
"{model}" in model_path: - model_path = model_path.format(model=model_name) - run.log_info(f"Creating table {model_name} for {model_path}") - - if server.format == "json": - json_format = "auto" - if server.delimiter == "new_line": - json_format = "newline_delimited" - elif server.delimiter == "array": - json_format = "array" - con.sql(f""" - CREATE VIEW "{model_name}" AS SELECT * FROM read_json_auto('{model_path}', format='{json_format}', hive_partitioning=1); - """) - elif server.format == "parquet": - con.sql(f""" - CREATE VIEW "{model_name}" AS SELECT * FROM read_parquet('{model_path}', hive_partitioning=1); - """) - elif server.format == "csv": - columns = to_csv_types(model) - run.log_info("Using columns: " + str(columns)) - if columns is None: - con.sql( - f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1);""" - ) - else: - con.sql( - f"""CREATE VIEW "{model_name}" AS SELECT * FROM read_csv('{model_path}', hive_partitioning=1, columns={columns});""" - ) - elif server.format == "delta": - con.sql("update extensions;") # Make sure we have the latest delta extension - con.sql(f"""CREATE VIEW "{model_name}" AS SELECT * FROM delta_scan('{model_path}');""") + try: + if "{model}" in model_path: + model_path = model_path.format(model=model_name) + run.log_info(f"Creating table {model_name} for {model_path}") + view_ddl= "" + if server.format == "json": + json_format = "auto" + if server.delimiter == "new_line": + json_format = "newline_delimited" + elif server.delimiter == "array": + json_format = "array" + view_ddl=f""" + CREATE VIEW "{model_name}" AS SELECT * FROM read_json_auto('{model_path}', format='{json_format}', hive_partitioning=1); + """ + elif server.format == "parquet": + view_ddl=f""" + CREATE VIEW "{model_name}" AS SELECT * FROM read_parquet('{model_path}', hive_partitioning=1); + """ + elif server.format == "csv": + columns = to_csv_types(model) + run.log_info("Using columns: " + str(columns)) + # Start with the required parameter. + params = ["hive_partitioning=1"] + + # Define a mapping for CSV parameters: server attribute -> read_csv parameter name. + param_mapping = { + "delimiter": "delim", # Map server.delimiter to 'delim' + "header": "header", + "escape": "escape", + "allVarchar": "all_varchar", + "allowQuotedNulls": "allow_quoted_nulls", + "dateformat": "dateformat", + "decimalSeparator": "decimal_separator", + "newLine": "new_line", + "timestampformat": "timestampformat", + "quote": "quote", + + } + for server_attr, read_csv_param in param_mapping.items(): + value = getattr(server, server_attr, None) + if value is not None: + # Wrap string values in quotes. + if isinstance(value, str): + params.append(f"{read_csv_param}='{value}'") + else: + params.append(f"{read_csv_param}={value}") + + # Add columns if they exist. + if columns is not None: + params.append(f"columns={columns}") + + # Build the parameter string. + params_str = ", ".join(params) + + # Create the view with the assembled parameters. 
+ view_ddl = f""" + CREATE VIEW "{model_name}" AS + SELECT * FROM read_csv('{model_path}', {params_str}); + """ + elif server.format == "delta": + con.sql("update extensions;") # Make sure we have the latest delta extension + view_ddl=f"""CREATE VIEW "{model_name}" AS SELECT * FROM delta_scan('{model_path}');""" + + con.sql(view_ddl) + except Exception as inst: + print(inst) + continue return con - def to_csv_types(model) -> dict[Any, str | None] | None: if model is None: return None @@ -76,7 +109,6 @@ def to_csv_types(model) -> dict[Any, str | None] | None: columns[field_name] = convert_to_duckdb_csv_type(field) return columns - def setup_s3_connection(con, server): s3_region = os.getenv("DATACONTRACT_S3_REGION") s3_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID") diff --git a/datacontract/model/data_contract_specification.py b/datacontract/model/data_contract_specification.py index 9a6778d54..49a4dafcc 100644 --- a/datacontract/model/data_contract_specification.py +++ b/datacontract/model/data_contract_specification.py @@ -61,6 +61,15 @@ class Server(pyd.BaseModel): dataset: str | None = None path: str | None = None delimiter: str | None = None + header: bool | None = None + escape: str | None = None + allVarchar: bool | None = None + allowQuotedNulls: bool | None = None + dateformat: str | None = None + decimalSeparator: str | None = None + newLine: str | None = None + timestampformat: str | None = None + quote: str | None = None endpointUrl: str | None = None location: str | None = None account: str | None = None diff --git a/pyproject.toml b/pyproject.toml index d6f33d455..c20bd8aab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ dependencies = [ "rich>=13.7,<13.10", "simple-ddl-parser==1.7.1", "duckdb>=1.0.0,<2.0.0", - "soda-core-duckdb>=3.3.20,<3.5.0", + "soda-core-duckdb>=3.3.20,<3.5.3", # remove setuptools when https://github.com/sodadata/soda-core/issues/2091 is resolved "setuptools>=60", "python-dotenv~=1.0.0", @@ -43,7 +43,7 @@ avro = [ ] bigquery = [ - "soda-core-bigquery>=3.3.20,<3.5.0", + "soda-core-bigquery>=3.3.20,<3.5.3", ] csv = [ @@ -52,8 +52,8 @@ csv = [ ] databricks = [ - "soda-core-spark-df>=3.3.20,<3.5.0", - "soda-core-spark[databricks]>=3.3.20,<3.5.0", + "soda-core-spark-df>=3.3.20,<3.5.3", + "soda-core-spark[databricks]>=3.3.20,<3.5.3", "databricks-sql-connector>=3.7.0,<3.8.0", "databricks-sdk<0.45.0", ] @@ -64,11 +64,11 @@ iceberg = [ kafka = [ "datacontract-cli[avro]", - "soda-core-spark-df>=3.3.20,<3.5.0" + "soda-core-spark-df>=3.3.20,<3.5.3" ] postgres = [ - "soda-core-postgres>=3.3.20,<3.5.0" + "soda-core-postgres>=3.3.20,<3.5.3" ] s3 = [ @@ -82,15 +82,15 @@ azure = [ snowflake = [ "snowflake-connector-python[pandas]>=3.6,<3.14", - "soda-core-snowflake>=3.3.20,<3.5.0" + "soda-core-snowflake>=3.3.20,<3.5.3" ] sqlserver = [ - "soda-core-sqlserver>=3.3.20,<3.5.0" + "soda-core-sqlserver>=3.3.20,<3.5.3" ] trino = [ - "soda-core-trino>=3.3.20,<3.5.0" + "soda-core-trino>=3.3.20,<3.5.3" ] dbt = [ From 7bcf6d6b6bf810a94d4772eec1ba102079e74bc5 Mon Sep 17 00:00:00 2001 From: Damien Maresma Date: Mon, 31 Mar 2025 09:36:40 -0400 Subject: [PATCH 06/21] fix emojy encoding issue on windows when export html format and catalog --- datacontract/catalog/catalog.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datacontract/catalog/catalog.py b/datacontract/catalog/catalog.py index 99de87dd3..306df38a2 100644 --- a/datacontract/catalog/catalog.py +++ b/datacontract/catalog/catalog.py @@ -19,7 +19,7 @@ def 
create_data_contract_html(contracts, file: Path, path: Path, schema: str): file_without_suffix = file.with_suffix(".html") html_filepath = path / file_without_suffix html_filepath.parent.mkdir(parents=True, exist_ok=True) - with open(html_filepath, "w") as f: + with open(html_filepath, "w", encoding='utf-8') as f: f.write(html) contracts.append( DataContractView( @@ -42,7 +42,7 @@ class DataContractView: def create_index_html(contracts, path): index_filepath = path / "index.html" - with open(index_filepath, "w") as f: + with open(index_filepath, "w", encoding='utf-8') as f: # Load templates from templates folder package_loader = PackageLoader("datacontract", "templates") env = Environment( From fd19cebfc4214620cc7b49e9cf51b809ab6f13a2 Mon Sep 17 00:00:00 2001 From: Damien Maresma Date: Tue, 1 Apr 2025 18:53:34 -0400 Subject: [PATCH 07/21] improve json validation --- datacontract/engines/data_contract_test.py | 6 +-- .../fastjsonschema/az/az_read_files.py | 39 ++++++++++++++++-- .../fastjsonschema/check_jsonschema.py | 40 ++++++++++--------- .../fastjsonschema/s3/s3_read_files.py | 37 +++++++++++++++-- .../soda/connections/duckdb_connection.py | 13 +++++- datacontract/export/csv_type_converter.py | 2 + 6 files changed, 106 insertions(+), 31 deletions(-) diff --git a/datacontract/engines/data_contract_test.py b/datacontract/engines/data_contract_test.py index ae3b84b1f..d051d34db 100644 --- a/datacontract/engines/data_contract_test.py +++ b/datacontract/engines/data_contract_test.py @@ -43,9 +43,9 @@ def execute_data_contract_test( run.checks.extend(create_checks(data_contract_specification, server)) # TODO check server is supported type for nicer error messages - # TODO check server credentials are complete for nicer error messages - if server.format == "json" and server.type != "kafka": - check_jsonschema(run, data_contract_specification, server) + if server.format == "json" and server.type in ("azure", "s3"): + check_jsonschema(run, data_contract_specification, server) + # with soda check_soda_execute(run, data_contract_specification, server, spark, duckdb_connection) diff --git a/datacontract/engines/fastjsonschema/az/az_read_files.py b/datacontract/engines/fastjsonschema/az/az_read_files.py index b3fa057a2..ff04d53ad 100644 --- a/datacontract/engines/fastjsonschema/az/az_read_files.py +++ b/datacontract/engines/fastjsonschema/az/az_read_files.py @@ -1,24 +1,25 @@ -import logging import os from datacontract.model.exceptions import DataContractException +from datacontract.model.run import Run, ResultEnum -def yield_az_files(az_storageAccount, az_location): +def yield_az_files(run: Run, az_storageAccount, az_location): fs = az_fs(az_storageAccount) files = fs.glob(az_location) for file in files: with fs.open(file) as f: - logging.info(f"Downloading file {file}") + run.log_info(f"Downloading file {file}") yield f.read() + def az_fs(az_storageAccount): try: import adlfs except ImportError as e: raise DataContractException( type="schema", - result="failed", + result=ResultEnum.failed, name="az extra missing", reason="Install the extra datacontract-cli\\[azure] to use az", engine="datacontract", @@ -26,8 +27,38 @@ def az_fs(az_storageAccount): ) az_client_id = os.getenv("DATACONTRACT_AZURE_CLIENT_ID") + if az_client_id is None: + raise DataContractException( + type="schema", + result=ResultEnum.failed, + name="az env. 
variable DATACONTRACT_AZURE_CLIENT_ID missing", + reason="configure export DATACONTRACT_AZURE_CLIENT_ID=*** ", + engine="datacontract", + original_exception=e, + ) + az_client_secret = os.getenv("DATACONTRACT_AZURE_CLIENT_SECRET") + if az_client_secret is None: + raise DataContractException( + type="schema", + result=ResultEnum.failed, + name="az env. variable DATACONTRACT_AZURE_CLIENT_SECRET missing", + reason="configure export DATACONTRACT_AZURE_CLIENT_SECRET=*** ", + engine="datacontract", + original_exception=e, + ) + az_tenant_id = os.getenv("DATACONTRACT_AZURE_TENANT_ID") + if az_tenant_id is None: + raise DataContractException( + type="schema", + result=ResultEnum.failed, + name="az env. variable DATACONTRACT_AZURE_TENANT_ID missing", + reason="configure export DATACONTRACT_AZURE_TENANT_ID=*** ", + engine="datacontract", + original_exception=e, + ) + return adlfs.AzureBlobFileSystem( account_name=az_storageAccount, client_id=az_client_id, diff --git a/datacontract/engines/fastjsonschema/check_jsonschema.py b/datacontract/engines/fastjsonschema/check_jsonschema.py index f4eb100ca..6bdf9e6e8 100644 --- a/datacontract/engines/fastjsonschema/check_jsonschema.py +++ b/datacontract/engines/fastjsonschema/check_jsonschema.py @@ -87,15 +87,15 @@ def process_exceptions(run, exceptions: List[DataContractException]): def validate_json_stream( - schema: dict, model_name: str, validate: callable, json_stream: list[dict] + run: Run, schema: dict, model_name: str, validate: callable, json_stream: list[dict] ) -> List[DataContractException]: - logging.info(f"Validating JSON stream for model: '{model_name}'.") + run.log_info(f"Validating JSON stream for model: '{model_name}'.") exceptions: List[DataContractException] = [] for json_obj in json_stream: try: validate(json_obj) except JsonSchemaValueException as e: - logging.warning(f"Validation failed for JSON object with type: '{model_name}'.") + run.log_warn(f"Validation failed for JSON object with type: '{model_name}'.") primary_key_value = get_primary_key_value(schema, model_name, json_obj) exceptions.append( DataContractException( @@ -109,7 +109,7 @@ def validate_json_stream( ) ) if not exceptions: - logging.info(f"All JSON objects in the stream passed validation for model: '{model_name}'.") + run.log_info(f"All JSON objects in the stream passed validation for model: '{model_name}'.") return exceptions @@ -153,7 +153,7 @@ def process_json_file(run, schema, model_name, validate, file, delimiter): json_stream = read_json_file(file) # Validate the JSON stream and collect exceptions. - exceptions = validate_json_stream(schema, model_name, validate, json_stream) + exceptions = validate_json_stream(run, schema, model_name, validate, json_stream) # Handle all errors from schema validation. 
process_exceptions(run, exceptions) @@ -167,7 +167,7 @@ def process_local_file(run, server, schema, model_name, validate): if os.path.isdir(path): return process_directory(run, path, server, model_name, validate) else: - logging.info(f"Processing file {path}") + run.log_info(f"Processing file {path}") with open(path, "r") as file: process_json_file(run, schema, model_name, validate, file, server.delimiter) @@ -191,7 +191,7 @@ def process_s3_file(run, server, schema, model_name, validate): s3_location = s3_location.format(model=model_name) json_stream = None - for file_content in yield_s3_files(s3_endpoint_url, s3_location): + for file_content in yield_s3_files(run, s3_endpoint_url, s3_location): if server.delimiter == "new_line": json_stream = read_json_lines_content(file_content) elif server.delimiter == "array": @@ -209,7 +209,7 @@ def process_s3_file(run, server, schema, model_name, validate): ) # Validate the JSON stream and collect exceptions. - exceptions = validate_json_stream(schema, model_name, validate, json_stream) + exceptions = validate_json_stream(run, schema, model_name, validate, json_stream) # Handle all errors from schema validation. process_exceptions(run, exceptions) @@ -230,18 +230,20 @@ def process_azure_file(run, server, schema, model_name, validate): date = datetime.today() if "{model}" in az_location: - az_location = az_location.format(model=model_name) - if "{year}" in az_location: - az_location = az_location.format(year=date.strftime('%Y')) - if "{month}" in az_location: - az_location = az_location.format(month=date.strftime('%m')) - if "{day}" in az_location: - az_location = az_location.format(day=date.strftime('%d')) - if "{date}" in az_location: - az_location = az_location.format(date=date.strftime('%Y-%m-%d')) + date = datetime.today() + month_to_quarter = { 1: "Q1", 2: "Q1", 3: "Q1", 4: "Q2", 5: "Q2", 6: "Q2", + 7: "Q3", 8: "Q3", 9: "Q3",10: "Q4", 11: "Q4", 12: "Q4" } + + az_location = az_location.format(model=model_name, + year=date.strftime('%Y'), + month=date.strftime('%m'), + day=date.strftime('%d'), + date=date.strftime('%Y-%m-%d'), + quarter=month_to_quarter.get(date.month)) + json_stream = None - for file_content in yield_az_files(az_storageAccount, az_location): + for file_content in yield_az_files(run, az_storageAccount, az_location): if server.delimiter == "new_line": json_stream = read_json_lines_content(file_content) elif server.delimiter == "array": @@ -259,7 +261,7 @@ def process_azure_file(run, server, schema, model_name, validate): ) # Validate the JSON stream and collect exceptions. - exceptions = validate_json_stream(schema, model_name, validate, json_stream) + exceptions = validate_json_stream(run, schema, model_name, validate, json_stream) # Handle all errors from schema validation. 
process_exceptions(run, exceptions) diff --git a/datacontract/engines/fastjsonschema/s3/s3_read_files.py b/datacontract/engines/fastjsonschema/s3/s3_read_files.py index 87447f2ed..1dc44d333 100644 --- a/datacontract/engines/fastjsonschema/s3/s3_read_files.py +++ b/datacontract/engines/fastjsonschema/s3/s3_read_files.py @@ -1,16 +1,15 @@ -import logging import os from datacontract.model.exceptions import DataContractException -from datacontract.model.run import ResultEnum +from datacontract.model.run import Run, ResultEnum -def yield_s3_files(s3_endpoint_url, s3_location): +def yield_s3_files(run: Run, s3_endpoint_url, s3_location): fs = s3_fs(s3_endpoint_url) files = fs.glob(s3_location) for file in files: with fs.open(file) as f: - logging.info(f"Downloading file {file}") + run.log_info(f"Downloading file {file}") yield f.read() @@ -28,8 +27,38 @@ def s3_fs(s3_endpoint_url): ) aws_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID") + if aws_access_key_id is None: + raise DataContractException( + type="schema", + result=ResultEnum.failed, + name="s3 env. variable DATACONTRACT_S3_ACCESS_KEY_ID missing", + reason="configure export DATACONTRACT_S3_ACCESS_KEY_ID=*** ", + engine="datacontract", + original_exception=e, + ) + aws_secret_access_key = os.getenv("DATACONTRACT_S3_SECRET_ACCESS_KEY") + if aws_secret_access_key is None: + raise DataContractException( + type="schema", + result=ResultEnum.failed, + name="s3 env. variable DATACONTRACT_S3_SECRET_ACCESS_KEY missing", + reason="configure export DATACONTRACT_S3_SECRET_ACCESS_KEY=*** ", + engine="datacontract", + original_exception=e, + ) + aws_session_token = os.getenv("DATACONTRACT_S3_SESSION_TOKEN") + if aws_session_token is None: + raise DataContractException( + type="schema", + result=ResultEnum.failed, + name="s3 env. 
variable DATACONTRACT_S3_SESSION_TOKEN missing", + reason="configure export DATACONTRACT_S3_SESSION_TOKEN=*** ", + engine="datacontract", + original_exception=e, + ) + return s3fs.S3FileSystem( key=aws_access_key_id, secret=aws_secret_access_key, diff --git a/datacontract/engines/soda/connections/duckdb_connection.py b/datacontract/engines/soda/connections/duckdb_connection.py index 9a05cd4ae..8dfe5ae60 100644 --- a/datacontract/engines/soda/connections/duckdb_connection.py +++ b/datacontract/engines/soda/connections/duckdb_connection.py @@ -5,6 +5,7 @@ from datacontract.export.csv_type_converter import convert_to_duckdb_csv_type from datacontract.model.data_contract_specification import DataContractSpecification, Server from datacontract.model.run import Run +from datetime import datetime def get_duckdb_connection( data_contract: DataContractSpecification, @@ -33,7 +34,16 @@ def get_duckdb_connection( model_path = path try: if "{model}" in model_path: - model_path = model_path.format(model=model_name) + date = datetime.today() + month_to_quarter = { 1: "Q1", 2: "Q1", 3: "Q1", 4: "Q2", 5: "Q2", 6: "Q2", + 7: "Q3", 8: "Q3", 9: "Q3",10: "Q4", 11: "Q4", 12: "Q4" } + + model_path = model_path.format(model=model_name, + year=date.strftime('%Y'), + month=date.strftime('%m'), + day=date.strftime('%d'), + date=date.strftime('%Y-%m-%d'), + quarter=month_to_quarter.get(date.month)) run.log_info(f"Creating table {model_name} for {model_path}") view_ddl= "" if server.format == "json": @@ -94,6 +104,7 @@ def get_duckdb_connection( con.sql("update extensions;") # Make sure we have the latest delta extension view_ddl=f"""CREATE VIEW "{model_name}" AS SELECT * FROM delta_scan('{model_path}');""" + run.log_info("Active view ddl: " +view_ddl) con.sql(view_ddl) except Exception as inst: print(inst) diff --git a/datacontract/export/csv_type_converter.py b/datacontract/export/csv_type_converter.py index 79dfe1668..9c5c3948e 100644 --- a/datacontract/export/csv_type_converter.py +++ b/datacontract/export/csv_type_converter.py @@ -33,4 +33,6 @@ def convert_to_duckdb_csv_type(field) -> None | str: return "VARCHAR" if type.lower() in ["null"]: return "SQLNULL" + if type.lower() in ["json"]: + return "JSON" return "VARCHAR" From 1c419cba78b6161cac74e43c96fd21d17b90934f Mon Sep 17 00:00:00 2001 From: Damien Maresma Date: Wed, 2 Apr 2025 21:02:26 -0400 Subject: [PATCH 08/21] add mermaid entity relation diagram rendering in catalog --- datacontract/imports/odcs_v3_importer.py | 1 + .../model/data_contract_specification.py | 20 ++++++++++++ datacontract/templates/datacontract.html | 23 ++++++++++++++ datacontract/templates/index.html | 1 + .../templates/partials/erdiagram.html | 31 +++++++++++++++++++ 5 files changed, 76 insertions(+) create mode 100644 datacontract/templates/partials/erdiagram.html diff --git a/datacontract/imports/odcs_v3_importer.py b/datacontract/imports/odcs_v3_importer.py index f937a7ca4..47667ef11 100644 --- a/datacontract/imports/odcs_v3_importer.py +++ b/datacontract/imports/odcs_v3_importer.py @@ -282,6 +282,7 @@ def import_fields( quality=odcs_property.get("quality") if odcs_property.get("quality") is not None else [], config=import_field_config(odcs_property, server_type), lineages=odcs_property.get("transformSourceObjects") if odcs_property.get("transformSourceObjects") is not None else None, + references=odcs_property.get("references") if odcs_property.get("references") is not None else None ) result[property_name] = field diff --git a/datacontract/model/data_contract_specification.py 
b/datacontract/model/data_contract_specification.py index 49a4dafcc..9aafbae24 100644 --- a/datacontract/model/data_contract_specification.py +++ b/datacontract/model/data_contract_specification.py @@ -338,3 +338,23 @@ def to_yaml(self): sort_keys=False, allow_unicode=True, ) + + def diagram(self) -> str | None: + mmd_entity = "" + mmd_references = [] + try: + for model_name, model in self.models.items(): + entity_block="" + for field_name, field in model.fields.items(): + entity_block += f"\t{ field_name.replace("#","Nb").replace(" ","_").replace("/","by")}{'🔑' if field.primaryKey or (field.unique and field.required) else ''}{'⌘' if field.references else''} {field.type}\n" + if field.references: + mmd_references.append(f'"📑{field.references.split(".")[0] if "." in field.references else ""}"' + "}o--{ ||"+f'"📑{model_name}"') + mmd_entity+= f'\t"📑{model_name}"'+'{\n' + entity_block + '}\n' + + if mmd_entity == "": + return None + else: + return f"{mmd_entity}\n" + except Exception as e: + print(f"error : {e}, {self}") + return None diff --git a/datacontract/templates/datacontract.html b/datacontract/templates/datacontract.html index 4bd126906..937bda54e 100644 --- a/datacontract/templates/datacontract.html +++ b/datacontract/templates/datacontract.html @@ -4,10 +4,24 @@ Data Contract + {# #} + @@ -73,6 +87,15 @@

+ {% if datacontract.diagram() %} +
+

Diagram

+

Entity relationship diagram

+
+
+ {{ render_partial('partials/erdiagram.html', datacontract = datacontract) }} +
+ {% endif %}
{{ render_partial('partials/datacontract_information.html', datacontract = datacontract) }}
diff --git a/datacontract/templates/index.html b/datacontract/templates/index.html index 8c15c221d..0c22b2563 100644 --- a/datacontract/templates/index.html +++ b/datacontract/templates/index.html @@ -4,6 +4,7 @@ Data Contract + {# #}