diff --git a/sqlmesh/core/model/definition.py b/sqlmesh/core/model/definition.py
index caec1c86fe..8e71b3aa02 100644
--- a/sqlmesh/core/model/definition.py
+++ b/sqlmesh/core/model/definition.py
@@ -1586,6 +1586,7 @@ def render_seed(self) -> t.Iterator[QueryOrDF]:
         string_columns = []
 
         columns_to_types = self.columns_to_types_ or {}
+        column_names_to_check = set(columns_to_types)
         for name, tpe in columns_to_types.items():
             if tpe.this in (exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
                 date_columns.append(name)
@@ -1605,6 +1606,14 @@ def render_seed(self) -> t.Iterator[QueryOrDF]:
                         rename_dict[normalized_name] = column
             if rename_dict:
                 df.rename(columns=rename_dict, inplace=True)
+                # df.rename maps keys -> values, so the values are the names now present in df
+                column_names_to_check -= set(rename_dict.values())
+
+            missing_columns = column_names_to_check - set(df.columns)
+            if missing_columns:
+                raise_config_error(
+                    f"Seed model '{self.name}' has missing columns: {missing_columns}", self._path
+                )
 
             # convert all date/time types to native pandas timestamp
             for column in [*date_columns, *datetime_columns]:
@@ -1614,7 +1623,17 @@ def render_seed(self) -> t.Iterator[QueryOrDF]:
 
             # extract datetime.date from pandas timestamp for DATE columns
             for column in date_columns:
-                df[column] = df[column].dt.date
+                try:
+                    df[column] = df[column].dt.date
+                except Exception as ex:
+                    # Best effort: log and keep the column unconverted instead of
+                    # failing the whole seed render.
+                    logger.error(
+                        "Failed to convert column '%s' to date in seed model '%s': %s",
+                        column,
+                        self.name,
+                        ex,
+                    )
 
             for column in bool_columns:
                 df[column] = df[column].apply(lambda i: str_to_bool(str(i)))
diff --git a/sqlmesh/dbt/seed.py b/sqlmesh/dbt/seed.py
index 10e98cf93c..a84e39e653 100644
--- a/sqlmesh/dbt/seed.py
+++ b/sqlmesh/dbt/seed.py
@@ -17,6 +17,7 @@
 
 from sqlmesh.core.config.common import VirtualEnvironmentMode
 from sqlmesh.core.model import Model, SeedKind, create_seed_model
+from sqlmesh.core.model.seed import CsvSettings
 from sqlmesh.dbt.basemodel import BaseModelConfig
 from sqlmesh.dbt.column import ColumnConfig
 
@@ -80,12 +81,16 @@ def to_sqlmesh(
 
             kwargs["columns"] = new_columns
 
+        # dbt treats single whitespace as a null value
+        csv_settings = CsvSettings(na_values=[" "], keep_default_na=True)
+
         return create_seed_model(
             self.canonical_name(context),
-            SeedKind(path=seed_path),
+            SeedKind(path=seed_path, csv_settings=csv_settings),
             dialect=self.dialect(context),
             audit_definitions=audit_definitions,
             virtual_environment_mode=virtual_environment_mode,
+            start=self.start or context.sqlmesh_config.model_defaults.start,
             **kwargs,
         )
 
diff --git a/tests/core/test_model.py b/tests/core/test_model.py
index cffcc52a4e..eecc3977e7 100644
--- a/tests/core/test_model.py
+++ b/tests/core/test_model.py
@@ -9997,6 +9997,61 @@ def test_seed_coerce_datetime(tmp_path):
     assert df["bad_datetime"].iloc[0] == "9999-12-31 23:59:59"
 
 
+def test_seed_invalid_date_column(tmp_path):
+    model_csv_path = (tmp_path / "model.csv").absolute()
+
+    with open(model_csv_path, "w", encoding="utf-8") as fd:
+        fd.write("bad_date\n9999-12-31\n2025-01-01\n1000-01-01")
+
+    expressions = d.parse(
+        f"""
+        MODEL (
+            name db.seed,
+            kind SEED (
+              path '{str(model_csv_path)}',
+            ),
+            columns (
+              bad_date date,
+            ),
+        );
+    """
+    )
+
+    model = load_sql_based_model(expressions, path=Path("./examples/sushi/models/test_model.sql"))
+    df = next(model.render(context=None))
+    # The conversion to date should not raise an error
+    assert df["bad_date"].to_list() == ["9999-12-31", "2025-01-01", "1000-01-01"]
+
+
+def test_seed_missing_columns(tmp_path):
+    model_csv_path = (tmp_path / "model.csv").absolute()
+
+    with open(model_csv_path, "w", encoding="utf-8") as fd:
+        fd.write("key,value\n1,2\n3,4")
+
+    expressions = d.parse(
+        f"""
+        MODEL (
+            name db.seed,
+            kind SEED (
+              path '{str(model_csv_path)}',
+            ),
+            columns (
+              key int,
+              value int,
+              missing_column int,
+            ),
+        );
+    """
+    )
+
+    model = load_sql_based_model(expressions, path=Path("./examples/sushi/models/test_model.sql"))
+    with pytest.raises(
+        ConfigError, match="Seed model 'db.seed' has missing columns: {'missing_column'}.*"
+    ):
+        next(model.render(context=None))
+
+
 def test_missing_column_data_in_columns_key():
     expressions = d.parse(
         """
diff --git a/tests/dbt/test_transformation.py b/tests/dbt/test_transformation.py
index 1bcc3081f7..b8c1021ba7 100644
--- a/tests/dbt/test_transformation.py
+++ b/tests/dbt/test_transformation.py
@@ -689,6 +689,33 @@ def test_seed_column_inference(tmp_path):
     }
 
 
+def test_seed_single_whitespace_is_na(tmp_path):
+    seed_csv = tmp_path / "seed.csv"
+    with open(seed_csv, "w", encoding="utf-8") as fd:
+        fd.write("col_a, col_b\n")
+        fd.write(" ,1\n")
+        fd.write("2, \n")
+
+    seed = SeedConfig(
+        name="test_model",
+        package="foo",
+        path=Path(seed_csv),
+    )
+
+    context = DbtContext()
+    context.project_name = "foo"
+    context.target = DuckDbConfig(name="target", schema="test")
+    sqlmesh_seed = seed.to_sqlmesh(context)
+    assert sqlmesh_seed.columns_to_types == {
+        "col_a": exp.DataType.build("int"),
+        "col_b": exp.DataType.build("int"),
+    }
+
+    df = next(sqlmesh_seed.render_seed())
+    assert df["col_a"].to_list() == [None, 2]
+    assert df["col_b"].to_list() == [1, None]
+
+
 def test_seed_partial_column_inference(tmp_path):
     seed_csv = tmp_path / "seed.csv"
     with open(seed_csv, "w", encoding="utf-8") as fd: