Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion sqlmesh/core/renderer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@
from sqlmesh.core import constants as c
from sqlmesh.core import dialect as d
from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
from sqlmesh.utils.date import TimeLike, date_dict, make_inclusive, to_datetime
from sqlmesh.utils.date import (
TimeLike,
date_dict,
make_inclusive,
to_datetime,
make_ts_exclusive,
to_tstz,
)
from sqlmesh.utils.errors import (
ConfigError,
ParsetimeAdapterCallError,
Expand Down Expand Up @@ -214,6 +221,17 @@ def _resolve_table(table: str | exp.Table) -> str:
dialect=self._dialect, identify=True, comments=False
)

all_refs = list(
self._jinja_macro_registry.global_objs.get("sources", {}).values() # type: ignore
) + list(
self._jinja_macro_registry.global_objs.get("refs", {}).values() # type: ignore
)
for ref in all_refs:
if ref.event_time_filter:
ref.event_time_filter["start"] = render_kwargs["start_tstz"]
ref.event_time_filter["end"] = to_tstz(
make_ts_exclusive(render_kwargs["end_tstz"], dialect=self._dialect)
)
jinja_env = self._jinja_macro_registry.build_environment(**jinja_env_kwargs)

expressions = []
Expand Down
9 changes: 9 additions & 0 deletions sqlmesh/dbt/basemodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
)
from sqlmesh.dbt.relation import Policy, RelationType
from sqlmesh.dbt.test import TestConfig
from sqlmesh.dbt.util import DBT_VERSION
from sqlmesh.utils import AttributeDict
from sqlmesh.utils.errors import ConfigError
from sqlmesh.utils.pydantic import field_validator
Expand Down Expand Up @@ -130,6 +131,7 @@ class BaseModelConfig(GeneralConfig):
grants: t.Dict[str, t.List[str]] = {}
columns: t.Dict[str, ColumnConfig] = {}
quoting: t.Dict[str, t.Optional[bool]] = {}
event_time: t.Optional[str] = None

version: t.Optional[int] = None
latest_version: t.Optional[int] = None
Expand Down Expand Up @@ -222,13 +224,20 @@ def relation_info(self) -> AttributeDict[str, t.Any]:
else:
relation_type = RelationType.Table

extras = {}
if DBT_VERSION >= (1, 9, 0) and self.event_time:
extras["event_time_filter"] = {
"field_name": self.event_time,
}

return AttributeDict(
{
"database": self.database,
"schema": self.table_schema,
"identifier": self.table_name,
"type": relation_type.value,
"quote_policy": AttributeDict(self.quoting),
**extras,
}
)

Expand Down
7 changes: 7 additions & 0 deletions sqlmesh/dbt/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sqlmesh.dbt.column import ColumnConfig
from sqlmesh.dbt.common import GeneralConfig
from sqlmesh.dbt.relation import RelationType
from sqlmesh.dbt.util import DBT_VERSION
from sqlmesh.utils import AttributeDict
from sqlmesh.utils.errors import ConfigError

Expand Down Expand Up @@ -46,6 +47,7 @@ class SourceConfig(GeneralConfig):
external: t.Optional[t.Dict[str, t.Any]] = {}
source_meta: t.Optional[t.Dict[str, t.Any]] = {}
columns: t.Dict[str, ColumnConfig] = {}
event_time: t.Optional[str] = None

_canonical_name: t.Optional[str] = None

Expand Down Expand Up @@ -94,6 +96,11 @@ def relation_info(self) -> AttributeDict:
if external_location:
extras["external"] = external_location.replace("{name}", self.table_name)

if DBT_VERSION >= (1, 9, 0) and self.event_time:
extras["event_time_filter"] = {
"field_name": self.event_time,
}

return AttributeDict(
{
"database": self.database,
Expand Down
7 changes: 7 additions & 0 deletions sqlmesh/utils/date.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,13 @@ def make_exclusive(time: TimeLike) -> datetime:
return dt


def make_ts_exclusive(time: TimeLike, dialect: DialectType) -> datetime:
    """Convert an inclusive end timestamp into an exclusive upper bound.

    For most dialects the inclusive end (e.g. ``... 23:59:59.999999``) is bumped
    by one microsecond so it can be used in a ``< end`` filter. T-SQL is handled
    at nanosecond resolution on a UTC pandas Timestamp instead.

    Args:
        time: The inclusive end timestamp to convert.
        dialect: The target SQL dialect driving the precision used.

    Returns:
        The exclusive end as a ``datetime`` (a ``pd.Timestamp``, which is a
        ``datetime`` subclass, on the T-SQL path).
    """
    ts = to_datetime(time)
    if dialect == "tsql":
        # pandas is deliberately not imported at module level (see
        # to_utc_timestamp below, which also imports it locally); without this
        # local import, `pd` would be an unresolved name on this branch.
        import pandas as pd

        # NOTE(review): this branch *subtracts* 1ns from the UTC timestamp while
        # the generic branch *adds* 1us — presumably intentional for T-SQL's
        # sub-microsecond precision; confirm against the renderer's usage.
        return to_utc_timestamp(ts) - pd.Timedelta(1, unit="ns")
    return ts + timedelta(microseconds=1)


def to_utc_timestamp(time: datetime) -> pd.Timestamp:
import pandas as pd

Expand Down
4 changes: 3 additions & 1 deletion tests/core/test_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,9 @@ def test_fingerprint_jinja_macros_global_objs(model: Model, global_obj_key: str)
)
fingerprint = fingerprint_from_node(model, nodes={})
model = model.copy()
model.jinja_macros.global_objs[global_obj_key] = AttributeDict({"test": "test"})
model.jinja_macros.global_objs[global_obj_key] = AttributeDict(
{"test": AttributeDict({"test": "test"})}
)
updated_fingerprint = fingerprint_from_node(model, nodes={})
assert updated_fingerprint.data_hash != fingerprint.data_hash
assert updated_fingerprint.metadata_hash == fingerprint.metadata_hash
Expand Down
132 changes: 132 additions & 0 deletions tests/dbt/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,135 @@ def test_load_deprecated_incremental_time_column(
"Using `time_column` on a model with incremental_strategy 'delete+insert' has been deprecated. Please use `incremental_by_time_range` instead in model 'main.incremental_time_range'."
in caplog.text
)


@pytest.mark.slow
def test_load_microbatch_with_ref(
    tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project
) -> None:
    """Microbatch models reading from a source / an upstream microbatch model via
    ref() should get event_time range filters injected when rendered for a window.
    """
    yaml = YAML()
    project_dir, model_dir = create_empty_project()

    # The source table declares `event_time` so downstream microbatch models
    # can be filtered on it.
    schema_spec = {
        "version": 2,
        "sources": [
            {
                "name": "my_source",
                "tables": [{"name": "my_table", "config": {"event_time": "ds_source"}}],
            }
        ],
    }
    with (model_dir / "source_schema.yml").open("w", encoding="utf-8") as handle:
        yaml.dump(schema_spec, handle)

    upstream_sql = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-01',
        batch_size='day'
    )
}}

SELECT cola, ds_source as ds FROM {{ source('my_source', 'my_table') }}
"""
    (model_dir / "microbatch.sql").write_text(upstream_sql, encoding="utf-8")

    downstream_sql = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-05',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ ref('microbatch') }}
"""
    (model_dir / "microbatch_two.sql").write_text(downstream_sql, encoding="utf-8")

    ctx = Context(paths=project_dir)

    # The source reference is wrapped in a subquery filtered on its event_time
    # column, with an inclusive start and exclusive end (end date + 1 day).
    rendered_upstream = ctx.render(
        '"local"."main"."microbatch"', start="2025-01-01", end="2025-01-10"
    ).sql()
    assert (
        rendered_upstream
        == 'SELECT "cola" AS "cola", "ds_source" AS "ds" FROM (SELECT * FROM "local"."my_source"."my_table" AS "my_table" WHERE "ds_source" >= \'2025-01-01 00:00:00+00:00\' AND "ds_source" < \'2025-01-11 00:00:00+00:00\') AS "_q_0"'
    )

    # The ref() to the upstream microbatch model is filtered the same way on
    # that model's own event_time column.
    rendered_downstream = ctx.render(
        '"local"."main"."microbatch_two"', start="2025-01-01", end="2025-01-10"
    ).sql()
    assert (
        rendered_downstream
        == 'SELECT "_q_0"."cola" AS "cola", "_q_0"."ds" AS "ds" FROM (SELECT "microbatch"."cola" AS "cola", "microbatch"."ds" AS "ds" FROM "local"."main"."microbatch" AS "microbatch" WHERE "microbatch"."ds" < \'2025-01-11 00:00:00+00:00\' AND "microbatch"."ds" >= \'2025-01-01 00:00:00+00:00\') AS "_q_0"'
    )


@pytest.mark.slow
def test_load_microbatch_with_ref_no_filter(
    tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project
) -> None:
    """Calling .render() on a source()/ref() opts out of the microbatch
    event_time filter: the rendered SQL must contain no WHERE clause.
    """
    yaml = YAML()
    project_dir, model_dir = create_empty_project()

    # Source still declares `event_time`; the filter is skipped only because
    # the models call .render() on the relation below.
    schema_spec = {
        "version": 2,
        "sources": [
            {
                "name": "my_source",
                "tables": [{"name": "my_table", "config": {"event_time": "ds"}}],
            }
        ],
    }
    with (model_dir / "source_schema.yml").open("w", encoding="utf-8") as handle:
        yaml.dump(schema_spec, handle)

    upstream_sql = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-01',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ source('my_source', 'my_table').render() }}
"""
    (model_dir / "microbatch.sql").write_text(upstream_sql, encoding="utf-8")

    downstream_sql = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-01',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ ref('microbatch').render() }}
"""
    (model_dir / "microbatch_two.sql").write_text(downstream_sql, encoding="utf-8")

    ctx = Context(paths=project_dir)

    # No event_time filter is injected for either model despite rendering a
    # concrete start/end window.
    rendered_upstream = ctx.render(
        '"local"."main"."microbatch"', start="2025-01-01", end="2025-01-10"
    ).sql()
    assert (
        rendered_upstream
        == 'SELECT "cola" AS "cola", "ds" AS "ds" FROM "local"."my_source"."my_table" AS "my_table"'
    )

    rendered_downstream = ctx.render(
        '"local"."main"."microbatch_two"', start="2025-01-01", end="2025-01-10"
    ).sql()
    assert (
        rendered_downstream
        == 'SELECT "microbatch"."cola" AS "cola", "microbatch"."ds" AS "ds" FROM "local"."main"."microbatch" AS "microbatch"'
    )