Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion sqlmesh/core/model/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1586,6 +1586,7 @@ def render_seed(self) -> t.Iterator[QueryOrDF]:
string_columns = []

columns_to_types = self.columns_to_types_ or {}
column_names_to_check = set(columns_to_types)
for name, tpe in columns_to_types.items():
if tpe.this in (exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
date_columns.append(name)
Expand All @@ -1605,6 +1606,14 @@ def render_seed(self) -> t.Iterator[QueryOrDF]:
rename_dict[normalized_name] = column
if rename_dict:
df.rename(columns=rename_dict, inplace=True)
# These names have already been checked
column_names_to_check -= set(rename_dict)

missing_columns = column_names_to_check - set(df.columns)
if missing_columns:
raise_config_error(
f"Seed model '{self.name}' has missing columns: {missing_columns}", self._path
)

# convert all date/time types to native pandas timestamp
for column in [*date_columns, *datetime_columns]:
Expand All @@ -1614,7 +1623,15 @@ def render_seed(self) -> t.Iterator[QueryOrDF]:

# extract datetime.date from pandas timestamp for DATE columns
for column in date_columns:
df[column] = df[column].dt.date
try:
df[column] = df[column].dt.date
except Exception as ex:
logger.error(
"Failed to convert column '%s' to date in seed model '%s': %s",
column,
self.name,
ex,
)

for column in bool_columns:
df[column] = df[column].apply(lambda i: str_to_bool(str(i)))
Expand Down
7 changes: 6 additions & 1 deletion sqlmesh/dbt/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from sqlmesh.core.config.common import VirtualEnvironmentMode
from sqlmesh.core.model import Model, SeedKind, create_seed_model
from sqlmesh.core.model.seed import CsvSettings
from sqlmesh.dbt.basemodel import BaseModelConfig
from sqlmesh.dbt.column import ColumnConfig

Expand Down Expand Up @@ -80,12 +81,16 @@ def to_sqlmesh(

kwargs["columns"] = new_columns

# dbt treats single whitespace as a null value
csv_settings = CsvSettings(na_values=[" "], keep_default_na=True)

return create_seed_model(
self.canonical_name(context),
SeedKind(path=seed_path),
SeedKind(path=seed_path, csv_settings=csv_settings),
dialect=self.dialect(context),
audit_definitions=audit_definitions,
virtual_environment_mode=virtual_environment_mode,
start=self.start or context.sqlmesh_config.model_defaults.start,
**kwargs,
)

Expand Down
55 changes: 55 additions & 0 deletions tests/core/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9997,6 +9997,61 @@ def test_seed_coerce_datetime(tmp_path):
assert df["bad_datetime"].iloc[0] == "9999-12-31 23:59:59"


def test_seed_invalid_date_column(tmp_path):
    """Rendering a seed with out-of-range DATE values must not raise.

    Pandas timestamps cannot represent years like 9999 or 1000; the date
    conversion is expected to degrade gracefully and leave the raw strings
    in place rather than fail the render.
    """
    csv_path = (tmp_path / "model.csv").absolute()
    csv_path.write_text(
        "bad_date\n9999-12-31\n2025-01-01\n1000-01-01", encoding="utf-8"
    )

    model_definition = f"""
        MODEL (
            name db.seed,
            kind SEED (
              path '{str(csv_path)}',
            ),
            columns (
              bad_date date,
            ),
        );
    """
    parsed = d.parse(model_definition)
    seed_model = load_sql_based_model(
        parsed, path=Path("./examples/sushi/models/test_model.sql")
    )

    # The conversion to date should not raise an error
    rendered = next(seed_model.render(context=None))
    assert rendered["bad_date"].to_list() == ["9999-12-31", "2025-01-01", "1000-01-01"]


def test_seed_missing_columns(tmp_path):
    """A seed declaring a column absent from the CSV must raise ConfigError.

    The model declares ``missing_column`` in its schema while the CSV only
    provides ``key`` and ``value``; rendering should fail loudly instead of
    silently producing a partial frame.
    """
    csv_path = (tmp_path / "model.csv").absolute()
    csv_path.write_text("key,value\n1,2\n3,4", encoding="utf-8")

    parsed = d.parse(
        f"""
        MODEL (
            name db.seed,
            kind SEED (
              path '{str(csv_path)}',
            ),
            columns (
              key int,
              value int,
              missing_column int,
            ),
        );
    """
    )
    seed_model = load_sql_based_model(
        parsed, path=Path("./examples/sushi/models/test_model.sql")
    )

    # Rendering should surface the schema/CSV mismatch as a ConfigError.
    with pytest.raises(
        ConfigError, match="Seed model 'db.seed' has missing columns: {'missing_column'}.*"
    ):
        next(seed_model.render(context=None))


def test_missing_column_data_in_columns_key():
expressions = d.parse(
"""
Expand Down
27 changes: 27 additions & 0 deletions tests/dbt/test_transformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,33 @@ def test_seed_column_inference(tmp_path):
}


def test_seed_single_whitespace_is_na(tmp_path):
    """dbt seeds must treat a single-space cell as a null value.

    Writes a CSV where each column has one space-only cell, converts the
    dbt SeedConfig to a SQLMesh model, and checks that column types are
    still inferred as int and the space cells render as None.
    """
    seed_csv = tmp_path / "seed.csv"
    seed_csv.write_text("col_a, col_b\n ,1\n2, \n", encoding="utf-8")

    seed_config = SeedConfig(
        name="test_model",
        package="foo",
        path=Path(seed_csv),
    )

    dbt_context = DbtContext()
    dbt_context.project_name = "foo"
    dbt_context.target = DuckDbConfig(name="target", schema="test")

    converted = seed_config.to_sqlmesh(dbt_context)
    # Whitespace-only cells must not block integer type inference.
    assert converted.columns_to_types == {
        "col_a": exp.DataType.build("int"),
        "col_b": exp.DataType.build("int"),
    }

    frame = next(converted.render_seed())
    # The single-space cells come back as nulls, the rest as numbers.
    assert frame["col_a"].to_list() == [None, 2]
    assert frame["col_b"].to_list() == [1, None]


def test_seed_partial_column_inference(tmp_path):
seed_csv = tmp_path / "seed.csv"
with open(seed_csv, "w", encoding="utf-8") as fd:
Expand Down