Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e94e6ec
Merge pull request #5 from datacontract/main
dmaresma Jun 11, 2025
657b68d
init. snowflake sql ddl import to datacontract
dmaresma Jun 11, 2025
a224aba
apply ruff check and format
dmaresma Jun 11, 2025
327c21a
align import
dmaresma Jun 11, 2025
234c2fb
add dialect
dmaresma Jun 11, 2025
5d412fd
sqlglot ${} token bypass and waiting for NOORDER ORDER AUTOINCREMENT …
dmaresma Jun 13, 2025
76d53b8
fix regression on sql server side (no formal or declarative comments)
dmaresma Jun 13, 2025
020d879
type variant not allow in lint DataContract(data_contract_str=expect…
dmaresma Jun 14, 2025
e2ee1e8
remove simple-ddl-parser dependency
dmaresma Jun 14, 2025
ab60f5c
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Jun 29, 2025
dd2a399
fix error message
dmaresma Jul 10, 2025
6d2a8df
Merge branch 'feat/snowflake_ddl_sql_import' of https://github.com/dm…
dmaresma Jul 10, 2025
d3759c9
fix specification version in test
dmaresma Jul 10, 2025
d29d770
refactor get_model_form_parsed add table desc, table tag
dmaresma Jul 10, 2025
cff64f8
fix format issue
dmaresma Jul 10, 2025
c6bf517
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Jul 10, 2025
f569c9f
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Jul 11, 2025
1186bb3
add script token remover function
dmaresma Jul 27, 2025
eb718c5
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Jul 28, 2025
593358c
Merge branch 'main' into feat/snowflake_ddl_sql_import
dmaresma Aug 5, 2025
1b44135
add money datatype #751
dmaresma Aug 25, 2025
29f371e
ignore jinja
dmaresma Aug 27, 2025
9669bc1
Merge branch 'current_main' into feat/snowflake_ddl_sql_import
dmaresma Jan 26, 2026
e55b4af
odcs 3.1 introduce timestamp as logicalType, and fix typos
dmaresma Jan 26, 2026
8202145
fix dbml logicalType with 3.1 timestamp allow instead of date
dmaresma Jan 26, 2026
14ac615
logicalType: date to timestamp when physicalType is timestamp
dmaresma Jan 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions datacontract/imports/odcs_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def create_schema_object(
description: str = None,
business_name: str = None,
properties: List[SchemaProperty] = None,
tags: List[str] = None,
) -> SchemaObject:
"""Create a SchemaObject (equivalent to DCS Model)."""
schema = SchemaObject(
Expand All @@ -48,6 +49,8 @@ def create_schema_object(
schema.businessName = business_name
if properties:
schema.properties = properties
if tags:
schema.tags = tags
return schema


Expand Down Expand Up @@ -130,9 +133,7 @@ def create_property(

# Custom properties
if custom_properties:
prop.customProperties = [
CustomProperty(property=k, value=v) for k, v in custom_properties.items()
]
prop.customProperties = [CustomProperty(property=k, value=v) for k, v in custom_properties.items()]

return prop

Expand Down
131 changes: 87 additions & 44 deletions datacontract/imports/sql_importer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
import re

import sqlglot
from open_data_contract_standard.model import OpenDataContractStandard
Expand All @@ -17,22 +18,23 @@


class SqlImporter(Importer):
def import_source(
self, source: str, import_args: dict
) -> OpenDataContractStandard:
def import_source(self, source: str, import_args: dict) -> OpenDataContractStandard:
return import_sql(self.import_format, source, import_args)


def import_sql(
format: str, source: str, import_args: dict = None
) -> OpenDataContractStandard:
def import_sql(format: str, source: str, import_args: dict = None) -> OpenDataContractStandard:
sql = read_file(source)
dialect = to_dialect(import_args)
dialect = to_dialect(import_args) or None

parsed = None

try:
parsed = sqlglot.parse_one(sql=sql, read=dialect)

tables = parsed.find_all(sqlglot.expressions.Table)

except Exception as e:
logging.error(f"Error parsing SQL: {str(e)}")
logging.error(f"Error sqlglot SQL: {str(e)}")
raise DataContractException(
type="import",
name=f"Reading source from {source}",
Expand Down Expand Up @@ -67,6 +69,7 @@ def import_sql(
precision, scale = get_precision_scale(column)
is_primary_key = get_primary_key(column)
is_required = column.find(sqlglot.exp.NotNullColumnConstraint) is not None or None
tags = get_tags(column)

prop = create_property(
name=col_name,
Expand All @@ -79,23 +82,63 @@ def import_sql(
primary_key=is_primary_key,
primary_key_position=primary_key_position if is_primary_key else None,
required=is_required if is_required else None,
tags=tags,
)

if is_primary_key:
primary_key_position += 1

properties.append(prop)

table_comment_property = parsed.find(sqlglot.expressions.SchemaCommentProperty)

if table_comment_property:
table_description = table_comment_property.this.this

prop = parsed.find(sqlglot.expressions.Properties)
if prop:
tags = prop.find(sqlglot.expressions.Tags)
if tags:
tag_enum = tags.find(sqlglot.expressions.Property)
table_tags = [str(t) for t in tag_enum]

schema_obj = create_schema_object(
name=table_name,
physical_type="table",
description=table_description if table_comment_property else None,
tags=table_tags if tags else None,
properties=properties,
)
odcs.schema_.append(schema_obj)

return odcs


def map_physical_type(column, dialect) -> str | None:
autoincrement = ""
if column.get("autoincrement") and dialect == Dialects.SNOWFLAKE:
autoincrement = " AUTOINCREMENT" + " START " + str(column.get("start")) if column.get("start") else ""
autoincrement += " INCREMENT " + str(column.get("increment")) if column.get("increment") else ""
autoincrement += " NOORDER" if not column.get("increment_order") else ""
elif column.get("autoincrement"):
autoincrement = " IDENTITY"

if column.get("size") and isinstance(column.get("size"), tuple):
return (
column.get("type")
+ "("
+ str(column.get("size")[0])
+ ","
+ str(column.get("size")[1])
+ ")"
+ autoincrement
)
elif column.get("size"):
return column.get("type") + "(" + str(column.get("size")) + ")" + autoincrement
else:
return column.get("type") + autoincrement


def get_primary_key(column) -> bool | None:
if column.find(sqlglot.exp.PrimaryKeyColumnConstraint) is not None:
return True
Expand All @@ -116,25 +159,7 @@ def to_dialect(import_args: dict) -> Dialects | None:
return Dialects.TSQL
if dialect.upper() in Dialects.__members__:
return Dialects[dialect.upper()]
if dialect == "sqlserver":
return Dialects.TSQL
return None


def to_physical_type_key(dialect: Dialects | str | None) -> str:
dialect_map = {
Dialects.TSQL: "sqlserverType",
Dialects.POSTGRES: "postgresType",
Dialects.BIGQUERY: "bigqueryType",
Dialects.SNOWFLAKE: "snowflakeType",
Dialects.REDSHIFT: "redshiftType",
Dialects.ORACLE: "oracleType",
Dialects.MYSQL: "mysqlType",
Dialects.DATABRICKS: "databricksType",
}
if isinstance(dialect, str):
dialect = Dialects[dialect.upper()] if dialect.upper() in Dialects.__members__ else None
return dialect_map.get(dialect, "physicalType")
return "None"


def to_server_type(source, dialect: Dialects | None) -> str | None:
Expand Down Expand Up @@ -170,10 +195,23 @@ def to_col_type_normalized(column):

def get_description(column: sqlglot.expressions.ColumnDef) -> str | None:
    """Return the column's description, or None when no comment is present.

    Prefers an explicit COMMENT column constraint (e.g. Snowflake
    ``COMMENT 'text'``); returns None otherwise.
    """
    # NOTE(review): a column whose comment comes only from a COMMENT
    # constraint but has no lexer comments attached would return None here —
    # confirm sqlglot always populates `comments` in that case.
    if column.comments is None:
        return None
    description = column.find(sqlglot.expressions.CommentColumnConstraint)
    if description:
        return description.this.this
    else:
        return None
    # NOTE(review): unreachable — every path above has already returned.
    # Looks like leftover from a previous implementation that joined the
    # lexer comments; either delete it or fold it in as a fallback.
    return " ".join(comment.strip() for comment in column.comments)


def get_tags(column: sqlglot.expressions.ColumnDef) -> list[str] | None:
    """Extract column tags (e.g. Snowflake ``WITH TAG (...)``) from a column
    definition.

    Returns a list of stringified tag entries, or None when the column
    carries no tags.
    """
    tags = column.find(sqlglot.expressions.Tags)
    if tags:
        # NOTE(review): find() returns only the FIRST Property node; iterating
        # it presumably yields the individual tag entries — confirm against
        # sqlglot's Tags/Property AST that multi-tag columns are not truncated.
        tag_enum = tags.find(sqlglot.expressions.Property)
        return [str(t) for t in tag_enum]
    else:
        return None


def get_max_length(column: sqlglot.expressions.ColumnDef) -> int | None:
col_type = to_col_type_normalized(column)
if col_type is None:
Expand Down Expand Up @@ -237,30 +275,28 @@ def map_type_from_sql(sql_type: str) -> str | None:
return "string"
elif sql_type_normed.startswith("ntext"):
return "string"
elif sql_type_normed.startswith("int") and not sql_type_normed.startswith("interval"):
return "integer"
elif sql_type_normed.startswith("bigint"):
return "integer"
elif sql_type_normed.startswith("tinyint"):
elif sql_type_normed.endswith("integer"):
return "integer"
elif sql_type_normed.startswith("smallint"):
elif sql_type_normed.endswith("int"): # covers int, bigint, smallint, tinyint
return "integer"
elif sql_type_normed.startswith("float"):
elif sql_type_normed.startswith("float") or sql_type_normed.startswith("double") or sql_type_normed == "real":
return "number"
elif sql_type_normed.startswith("double"):
elif sql_type_normed.startswith("number"):
return "number"
elif sql_type_normed.startswith("numeric"):
return "number"
elif sql_type_normed.startswith("decimal"):
return "number"
elif sql_type_normed.startswith("numeric"):
elif sql_type_normed.startswith("money"):
return "number"
elif sql_type_normed.startswith("bool"):
return "boolean"
elif sql_type_normed.startswith("bit"):
return "boolean"
elif sql_type_normed.startswith("binary"):
return "array"
return "object"
elif sql_type_normed.startswith("varbinary"):
return "array"
return "object"
elif sql_type_normed.startswith("raw"):
return "array"
elif sql_type_normed == "blob" or sql_type_normed == "bfile":
Expand All @@ -270,12 +306,10 @@ def map_type_from_sql(sql_type: str) -> str | None:
elif sql_type_normed == "time":
return "string"
elif sql_type_normed.startswith("timestamp"):
return "date"
elif sql_type_normed == "datetime" or sql_type_normed == "datetime2":
return "date"
return "timestamp"
elif sql_type_normed == "smalldatetime":
return "date"
elif sql_type_normed == "datetimeoffset":
elif sql_type_normed.startswith("datetime"): # tsql datatime2
return "date"
elif sql_type_normed == "uniqueidentifier": # tsql
return "string"
Expand All @@ -291,6 +325,14 @@ def map_type_from_sql(sql_type: str) -> str | None:
return "object"


def remove_variable_tokens(sql_script: str) -> str:
    """Strip scripting-variable wrappers from a SQL script, keeping the bare
    variable name so the statement stays parseable by sqlglot.

    Handles three templating syntaxes:
      - ``$(name)``  : sqlcmd scripting variables (T-SQL)
      - ``${name}``  : Liquibase property substitution
      - ``{{name}}`` : dbt / Jinja expressions (whitespace inside the braces
        is tolerated, e.g. ``{{ name }}``)

    e.g. ``"${database_name}.PUBLIC.t"`` -> ``"database_name.PUBLIC.t"``.

    References:
      https://learn.microsoft.com/en-us/sql/tools/sqlcmd/sqlcmd-use-scripting-variables?view=sql-server-ver17#b-use-the-setvar-command-interactively
      https://docs.liquibase.com/concepts/changelogs/property-substitution.html
      https://docs.getdbt.com/guides/using-jinja?step=1
    """
    # Exactly one alternative matches per token, so concatenating the three
    # groups yields just the captured name (unmatched groups substitute as
    # empty strings in re.sub). The previous replacement "\1" silently
    # dropped the name for the ${...} and {{...}} forms.
    return re.sub(r"\$\((\w+)\)|\$\{(\w+)\}|\{\{\s*(\w+)\s*\}\}", r"\1\2\3", sql_script)


def read_file(path):
if not os.path.exists(path):
raise DataContractException(
Expand All @@ -302,4 +344,5 @@ def read_file(path):
)
with open(path, "r") as file:
file_content = file.read()
return file_content

return remove_variable_tokens(file_content)
2 changes: 1 addition & 1 deletion tests/fixtures/databricks-unity/import/datacontract.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ schema:
customProperties:
- property: databricksType
value: timestamp
logicalType: date
logicalType: timestamp
- name: is_active
physicalType: boolean
customProperties:
Expand Down
4 changes: 2 additions & 2 deletions tests/fixtures/dbml/import/datacontract.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ schema:
physicalType: timestamp
description: The business timestamp in UTC when the order was successfully registered
in the source system and the payment was successful.
logicalType: date
logicalType: timestamp
required: true
- name: order_total
physicalType: record
Expand All @@ -46,7 +46,7 @@ schema:
- name: processed_timestamp
physicalType: timestamp
description: The timestamp when the record was processed by the data platform.
logicalType: date
logicalType: timestamp
required: true
- name: line_items
physicalType: table
Expand Down
4 changes: 2 additions & 2 deletions tests/fixtures/dbml/import/datacontract_table_filtered.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ schema:
physicalType: timestamp
description: The business timestamp in UTC when the order was successfully registered
in the source system and the payment was successful.
logicalType: date
logicalType: timestamp
required: true
- name: order_total
physicalType: record
Expand All @@ -46,5 +46,5 @@ schema:
- name: processed_timestamp
physicalType: timestamp
description: The timestamp when the record was processed by the data platform.
logicalType: date
logicalType: timestamp
required: true
42 changes: 42 additions & 0 deletions tests/fixtures/snowflake/import/ddl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
CREATE TABLE IF NOT EXISTS ${database_name}.PUBLIC.my_table (
-- https://docs.snowflake.com/en/sql-reference/intro-summary-data-types
field_primary_key NUMBER(38,0) NOT NULL autoincrement start 1 increment 1 COMMENT 'Primary key',
field_not_null INT NOT NULL COMMENT 'Not null',
field_char CHAR(10) COMMENT 'Fixed-length string',
field_character CHARACTER(10) COMMENT 'Fixed-length string',
field_varchar VARCHAR(100) WITH TAG (SNOWFLAKE.CORE.PRIVACY_CATEGORY='IDENTIFIER', SNOWFLAKE.CORE.SEMANTIC_CATEGORY='NAME') COMMENT 'Variable-length string',

field_text TEXT COMMENT 'Large variable-length string',
field_string STRING COMMENT 'Large variable-length Unicode string',

field_tinyint TINYINT COMMENT 'Integer (0-255)',
field_smallint SMALLINT COMMENT 'Integer (-32,768 to 32,767)',
field_int INT COMMENT 'Integer (-2.1B to 2.1B)',
field_integer INTEGER COMMENT 'Integer full name(-2.1B to 2.1B)',
field_bigint BIGINT COMMENT 'Large integer (-9 quintillion to 9 quintillion)',

field_decimal DECIMAL(10, 2) COMMENT 'Fixed precision decimal',
field_numeric NUMERIC(10, 2) COMMENT 'Same as DECIMAL',

field_float FLOAT COMMENT 'Approximate floating-point',
field_float4 FLOAT4 COMMENT 'Approximate floating-point 4',
field_float8 FLOAT8 COMMENT 'Approximate floating-point 8',
field_real REAL COMMENT 'Smaller floating-point',

field_boulean BOOLEAN COMMENT 'Boolean-like (0 or 1)',

field_date DATE COMMENT 'Date only (YYYY-MM-DD)',
field_time TIME COMMENT 'Time only (HH:MM:SS)',
field_timestamp TIMESTAMP COMMENT 'More precise datetime',
field_timestamp_ltz TIMESTAMP_LTZ COMMENT 'More precise datetime with local time zone; time zone, if provided, isn`t stored.',
field_timestamp_ntz TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP() COMMENT 'More precise datetime with no time zone; time zone, if provided, isn`t stored.',
field_timestamp_tz TIMESTAMP_TZ COMMENT 'More precise datetime with time zone.',

field_binary BINARY(16) COMMENT 'Fixed-length binary',
field_varbinary VARBINARY(100) COMMENT 'Variable-length binary',

field_variant VARIANT COMMENT 'VARIANT data',
field_json OBJECT COMMENT 'JSON (Stored as text)',
UNIQUE(field_not_null),
PRIMARY KEY (field_primary_key)
) COMMENT = 'My Comment'
10 changes: 5 additions & 5 deletions tests/test_import_sql_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ def test_import_sql_oracle():
physicalType: DOUBLE PRECISION
description: 64-bit floating point number
- name: field_timestamp
logicalType: date
logicalType: timestamp
physicalType: TIMESTAMP
description: Timestamp with fractional second precision of 6, no timezones
- name: field_timestamp_tz
logicalType: date
logicalType: timestamp
physicalType: TIMESTAMP WITH TIME ZONE
description: Timestamp with fractional second precision of 6, with timezones (TZ)
- name: field_timestamp_ltz
logicalType: date
logicalType: timestamp
physicalType: TIMESTAMPLTZ
description: Timestamp with fractional second precision of 6, with local timezone (LTZ)
- name: field_interval_year
Expand Down Expand Up @@ -176,7 +176,7 @@ def test_import_sql_constraints():
physicalType: VARCHAR(30)
required: true
- name: create_date
logicalType: date
logicalType: timestamp
physicalType: TIMESTAMP
required: true
- name: changed_by
Expand All @@ -185,7 +185,7 @@ def test_import_sql_constraints():
maxLength: 30
physicalType: VARCHAR(30)
- name: change_date
logicalType: date
logicalType: timestamp
physicalType: TIMESTAMP
- name: name
logicalType: string
Expand Down
Loading
Loading