Skip to content

Commit 1978b16

Browse files
committed
Revert plan-related triggers
1 parent 2af2100 commit 1978b16

File tree

6 files changed

+5
-127
lines changed

6 files changed

+5
-127
lines changed

sqlmesh/core/console.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3830,10 +3830,6 @@ def update_snapshot_evaluation_progress(
38303830
message += f" | auto_restatement_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.auto_restatement_triggers)}"
38313831
if snapshot_evaluation_triggers.select_snapshot_triggers:
38323832
message += f" | select_snapshot_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.select_snapshot_triggers)}"
3833-
if snapshot_evaluation_triggers.directly_modified_triggers:
3834-
message += f" | directly_modified_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.directly_modified_triggers)}"
3835-
if snapshot_evaluation_triggers.restatement_triggers:
3836-
message += f" | restatement_triggers={','.join(trigger.name for trigger in snapshot_evaluation_triggers.restatement_triggers)}"
38373833

38383834
if audit_only:
38393835
message = f"Audited {snapshot.name} duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"

sqlmesh/core/plan/builder.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ def build(self) -> Plan:
289289
else DeployabilityIndex.all_deployable()
290290
)
291291

292-
restatements, restatement_triggers = self._build_restatements(
292+
restatements = self._build_restatements(
293293
dag,
294294
earliest_interval_start(self._context_diff.snapshots.values(), self.execution_time),
295295
)
@@ -326,7 +326,6 @@ def build(self) -> Plan:
326326
indirectly_modified=indirectly_modified,
327327
deployability_index=deployability_index,
328328
restatements=restatements,
329-
restatement_triggers=restatement_triggers,
330329
start_override_per_model=self._start_override_per_model,
331330
end_override_per_model=end_override_per_model,
332331
selected_models_to_backfill=self._backfill_models,
@@ -348,14 +347,14 @@ def _build_dag(self) -> DAG[SnapshotId]:
348347

349348
def _build_restatements(
350349
self, dag: DAG[SnapshotId], earliest_interval_start: TimeLike
351-
) -> t.Tuple[t.Dict[SnapshotId, Interval], t.Dict[SnapshotId, t.List[SnapshotId]]]:
350+
) -> t.Dict[SnapshotId, Interval]:
352351
restate_models = self._restate_models
353352
if restate_models == set():
354353
# This is a warning but we print this as error since the Console is lacking API for warnings.
355354
self._console.log_error(
356355
"Provided restated models do not match any models. No models will be included in plan."
357356
)
358-
return {}, {}
357+
return {}
359358

360359
restatements: t.Dict[SnapshotId, Interval] = {}
361360
forward_only_preview_needed = self._forward_only_preview_needed
@@ -379,7 +378,7 @@ def _build_restatements(
379378
is_preview = True
380379

381380
if not restate_models:
382-
return {}, {}
381+
return {}
383382

384383
start = self._start or earliest_interval_start
385384
end = self._end or now()
@@ -389,7 +388,6 @@ def _build_restatements(
389388
if model_fqn not in self._model_fqn_to_snapshot:
390389
raise PlanError(f"Cannot restate model '{model_fqn}'. Model does not exist.")
391390

392-
restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
393391
# Get restatement intervals for all restated snapshots and make sure that if an incremental snapshot expands it's
394392
# restatement range that it's downstream dependencies all expand their restatement ranges as well.
395393
for s_id in dag:
@@ -425,13 +423,6 @@ def _build_restatements(
425423
logger.info("Skipping restatement for model '%s'", snapshot.name)
426424
continue
427425

428-
if snapshot.name in restate_models:
429-
restatement_triggers[s_id] = [s_id]
430-
if restating_parents:
431-
restatement_triggers[s_id] = restatement_triggers.get(s_id, []) + [
432-
s.snapshot_id for s in restating_parents
433-
]
434-
435426
possible_intervals = {
436427
restatements[p.snapshot_id] for p in restating_parents if p.is_incremental
437428
}
@@ -460,7 +451,7 @@ def _build_restatements(
460451

461452
restatements[s_id] = (snapshot_start, snapshot_end)
462453

463-
return restatements, restatement_triggers
454+
return restatements
464455

465456
def _build_directly_and_indirectly_modified(
466457
self, dag: DAG[SnapshotId]

sqlmesh/core/plan/definition.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ class Plan(PydanticModel, frozen=True):
5757

5858
deployability_index: DeployabilityIndex
5959
restatements: t.Dict[SnapshotId, Interval]
60-
restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
6160
start_override_per_model: t.Optional[t.Dict[str, datetime]]
6261
end_override_per_model: t.Optional[t.Dict[str, datetime]]
6362

@@ -255,7 +254,6 @@ def to_evaluatable(self) -> EvaluatablePlan:
255254
skip_backfill=self.skip_backfill,
256255
empty_backfill=self.empty_backfill,
257256
restatements={s.name: i for s, i in self.restatements.items()},
258-
restatement_triggers=self.restatement_triggers,
259257
is_dev=self.is_dev,
260258
allow_destructive_models=self.allow_destructive_models,
261259
forward_only=self.forward_only,
@@ -297,7 +295,6 @@ class EvaluatablePlan(PydanticModel):
297295
skip_backfill: bool
298296
empty_backfill: bool
299297
restatements: t.Dict[str, Interval]
300-
restatement_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
301298
is_dev: bool
302299
allow_destructive_models: t.Set[str]
303300
forward_only: bool

sqlmesh/core/plan/evaluator.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
SnapshotCreationFailedError,
3838
SnapshotNameVersion,
3939
)
40-
from sqlmesh.core.snapshot.definition import SnapshotEvaluationTriggers
4140
from sqlmesh.utils import to_snake_case
4241
from sqlmesh.core.state_sync import StateSync
4342
from sqlmesh.utils import CorrelationId
@@ -235,27 +234,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
235234
self.console.log_success("SKIP: No model batches to execute")
236235
return
237236

238-
directly_modified_triggers: t.Dict[SnapshotId, t.List[SnapshotId]] = {}
239-
for parent, children in plan.indirectly_modified_snapshots.items():
240-
parent_id = stage.all_snapshots[parent].snapshot_id
241-
directly_modified_triggers[parent_id] = directly_modified_triggers.get(
242-
parent_id, []
243-
) + [parent_id]
244-
for child in children:
245-
directly_modified_triggers[child] = directly_modified_triggers.get(child, []) + [
246-
parent_id
247-
]
248-
directly_modified_triggers = {
249-
k: list(dict.fromkeys(v)) for k, v in directly_modified_triggers.items()
250-
}
251-
snapshot_evaluation_triggers = {
252-
s_id: SnapshotEvaluationTriggers(
253-
directly_modified_triggers=directly_modified_triggers.get(s_id, []),
254-
restatement_triggers=plan.restatement_triggers.get(s_id, []),
255-
)
256-
for s_id in [s.snapshot_id for s in stage.all_snapshots.values()]
257-
}
258-
259237
scheduler = self.create_scheduler(stage.all_snapshots.values(), self.snapshot_evaluator)
260238
# Convert model name restatements to snapshot ID restatements
261239
restatements_by_snapshot_id = {
@@ -271,7 +249,6 @@ def visit_backfill_stage(self, stage: stages.BackfillStage, plan: EvaluatablePla
271249
start=plan.start,
272250
end=plan.end,
273251
restatements=restatements_by_snapshot_id,
274-
snapshot_evaluation_triggers=snapshot_evaluation_triggers,
275252
)
276253
if errors:
277254
raise PlanError("Plan application failed.")

sqlmesh/core/snapshot/definition.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,6 @@ class SnapshotEvaluationTriggers(PydanticModel):
331331
cron_ready: t.Optional[bool] = None
332332
auto_restatement_triggers: t.List[SnapshotId] = []
333333
select_snapshot_triggers: t.List[SnapshotId] = []
334-
directly_modified_triggers: t.List[SnapshotId] = []
335-
restatement_triggers: t.List[SnapshotId] = []
336334

337335

338336
class SnapshotInfoMixin(ModelKindMixin):

tests/core/test_integration.py

Lines changed: 0 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727

2828
from sqlmesh import CustomMaterialization
29-
import sqlmesh
3029
from sqlmesh.cli.project_init import init_example_project
3130
from sqlmesh.core import constants as c
3231
from sqlmesh.core import dialect as d
@@ -1806,27 +1805,6 @@ def test_snapshot_triggers(init_and_plan_context: t.Callable, mocker: MockerFixt
18061805
context, plan = init_and_plan_context("examples/sushi")
18071806
context.apply(plan)
18081807

1809-
# modify 3 models
1810-
# - 2 breaking changes for testing plan directly modified triggers
1811-
# - 1 adding an auto-restatement for subsequent `run` test
1812-
marketing = context.get_model("sushi.marketing")
1813-
marketing_kwargs = {
1814-
**marketing.dict(),
1815-
"query": d.parse_one(
1816-
f"{marketing.query.sql(dialect='duckdb')} ORDER BY customer_id", dialect="duckdb"
1817-
),
1818-
}
1819-
context.upsert_model(SqlModel.parse_obj(marketing_kwargs))
1820-
1821-
customers = context.get_model("sushi.customers")
1822-
customers_kwargs = {
1823-
**customers.dict(),
1824-
"query": d.parse_one(
1825-
f"{customers.query.sql(dialect='duckdb')} ORDER BY customer_id", dialect="duckdb"
1826-
),
1827-
}
1828-
context.upsert_model(SqlModel.parse_obj(customers_kwargs))
1829-
18301808
# add auto restatement to orders
18311809
orders = context.get_model("sushi.orders")
18321810
orders_kind = {
@@ -1839,67 +1817,8 @@ def test_snapshot_triggers(init_and_plan_context: t.Callable, mocker: MockerFixt
18391817
}
18401818
context.upsert_model(PythonModel.parse_obj(orders_kwargs))
18411819

1842-
spy = mocker.spy(sqlmesh.core.scheduler.Scheduler, "run_merged_intervals")
1843-
18441820
context.plan(auto_apply=True, no_prompts=True, categorizer_config=CategorizerConfig.all_full())
18451821

1846-
# PLAN: directly modified triggers
1847-
actual_triggers = spy.call_args.kwargs["snapshot_evaluation_triggers"]
1848-
actual_triggers_name = {
1849-
k.name: sorted([s.name for s in v.directly_modified_triggers])
1850-
for k, v in actual_triggers.items()
1851-
if v.directly_modified_triggers
1852-
}
1853-
marketing_name = '"memory"."sushi"."marketing"'
1854-
customers_name = '"memory"."sushi"."customers"'
1855-
marketing_customers_names = sorted([marketing_name, customers_name])
1856-
children_names = [
1857-
f'"memory"."sushi"."{model}"'
1858-
for model in {
1859-
"waiter_as_customer_by_day",
1860-
"active_customers",
1861-
"count_customers_active",
1862-
"count_customers_inactive",
1863-
}
1864-
]
1865-
assert actual_triggers_name == {
1866-
marketing_name: [marketing_name],
1867-
customers_name: [customers_name],
1868-
**{k: marketing_customers_names for k in children_names},
1869-
}
1870-
1871-
# PLAN: restatement triggers
1872-
spy.reset_mock()
1873-
context.plan(
1874-
restate_models=[
1875-
'"memory"."sushi"."marketing"',
1876-
'"memory"."sushi"."order_items"',
1877-
'"memory"."sushi"."waiter_revenue_by_day"',
1878-
],
1879-
auto_apply=True,
1880-
no_prompts=True,
1881-
)
1882-
1883-
order_items_name = '"memory"."sushi"."order_items"'
1884-
waiter_revenue_by_day_name = '"memory"."sushi"."waiter_revenue_by_day"'
1885-
actual_triggers = spy.call_args.kwargs["snapshot_evaluation_triggers"]
1886-
actual_triggers_name = {
1887-
k.name: sorted([s.name for s in v.restatement_triggers])
1888-
for k, v in actual_triggers.items()
1889-
if v.restatement_triggers
1890-
}
1891-
1892-
assert sorted(actual_triggers_name[waiter_revenue_by_day_name]) == sorted(
1893-
[waiter_revenue_by_day_name, order_items_name]
1894-
)
1895-
assert actual_triggers_name[order_items_name] == [order_items_name]
1896-
assert actual_triggers_name['"memory"."sushi"."top_waiters"'] == [waiter_revenue_by_day_name]
1897-
assert actual_triggers_name['"memory"."sushi"."customer_revenue_by_day"'] == [order_items_name]
1898-
assert actual_triggers_name['"memory"."sushi"."customer_revenue_lifetime"'] == [
1899-
order_items_name
1900-
]
1901-
1902-
# RUN: select and auto-restatement triggers
19031822
# User selects top_waiters and waiter_revenue_by_day, others added as auto-upstream
19041823
selected_models = {"top_waiters", "waiter_revenue_by_day"}
19051824
selected_models_auto_upstream = {"order_items", "orders", "items"}

0 commit comments

Comments
 (0)