2626
2727
2828from sqlmesh import CustomMaterialization
29+ import sqlmesh
2930from sqlmesh .cli .project_init import init_example_project
3031from sqlmesh .core import constants as c
3132from sqlmesh .core import dialect as d
@@ -1784,26 +1785,97 @@ def test_snapshot_triggers(init_and_plan_context: t.Callable, mocker: MockerFixt
17841785 context , plan = init_and_plan_context ("examples/sushi" )
17851786 context .apply (plan )
17861787
1788+ # modify 3 models
1789+ # - 2 breaking changes for testing plan directly modified triggers
1790+ # - 1 adding an auto-restatement for subsequent `run` test
1791+ marketing = context .get_model ("sushi.marketing" )
1792+ marketing_kwargs = {
1793+ ** marketing .dict (),
1794+ "query" : d .parse_one (
1795+ f"{ marketing .query .sql (dialect = 'duckdb' )} ORDER BY customer_id" , dialect = "duckdb"
1796+ ),
1797+ }
1798+ context .upsert_model (SqlModel .parse_obj (marketing_kwargs ))
1799+
1800+ customers = context .get_model ("sushi.customers" )
1801+ customers_kwargs = {
1802+ ** customers .dict (),
1803+ "query" : d .parse_one (
1804+ f"{ customers .query .sql (dialect = 'duckdb' )} ORDER BY customer_id" , dialect = "duckdb"
1805+ ),
1806+ }
1807+ context .upsert_model (SqlModel .parse_obj (customers_kwargs ))
1808+
17871809 # add auto restatement to orders
1788- model = context .get_model ("sushi.orders" )
1789- kind = {
1790- ** model .kind .dict (),
1810+ orders = context .get_model ("sushi.orders" )
1811+ orders_kind = {
1812+ ** orders .kind .dict (),
17911813 "auto_restatement_cron" : "@hourly" ,
17921814 }
1793- kwargs = {
1794- ** model .dict (),
1795- "kind" : kind ,
1815+ orders_kwargs = {
1816+ ** orders .dict (),
1817+ "kind" : orders_kind ,
17961818 }
1797- context .upsert_model (PythonModel .parse_obj (kwargs ))
1798- plan = context .plan_builder (skip_tests = True ).build ()
1799- context .apply (plan )
1819+ context .upsert_model (PythonModel .parse_obj (orders_kwargs ))
18001820
1801- # Mock run_merged_intervals to capture triggers arg
1802- scheduler = context .scheduler ()
1803- run_merged_intervals_mock = mocker .patch .object (
1804- scheduler , "run_merged_intervals" , return_value = ([], [])
1821+ spy = mocker .spy (sqlmesh .core .scheduler .Scheduler , "run_merged_intervals" )
1822+
1823+ context .plan (auto_apply = True , no_prompts = True , categorizer_config = CategorizerConfig .all_full ())
1824+
1825+ # PLAN: directly modified triggers
1826+ actual_triggers = spy .call_args .kwargs ["snapshot_evaluation_triggers" ]
1827+ actual_triggers_name = {
1828+ k .name : sorted ([s .name for s in v .directly_modified_triggers ])
1829+ for k , v in actual_triggers .items ()
1830+ if v .directly_modified_triggers
1831+ }
1832+ marketing_name = '"memory"."sushi"."marketing"'
1833+ customers_name = '"memory"."sushi"."customers"'
1834+ marketing_customers_names = sorted ([marketing_name , customers_name ])
1835+ children_names = [
1836+ f'"memory"."sushi"."{ model } "'
1837+ for model in {
1838+ "waiter_as_customer_by_day" ,
1839+ "active_customers" ,
1840+ "count_customers_active" ,
1841+ "count_customers_inactive" ,
1842+ }
1843+ ]
1844+ assert actual_triggers_name == {
1845+ marketing_name : [marketing_name ],
1846+ customers_name : [customers_name ],
1847+ ** {k : marketing_customers_names for k in children_names },
1848+ }
1849+
1850+ # PLAN: restatement triggers
1851+ spy .reset_mock ()
1852+ context .plan (
1853+ restate_models = [
1854+ '"memory"."sushi"."marketing"' ,
1855+ '"memory"."sushi"."order_items"' ,
1856+ '"memory"."sushi"."waiter_revenue_by_day"' ,
1857+ ],
1858+ auto_apply = True ,
1859+ no_prompts = True ,
18051860 )
18061861
1862+ order_items_name = '"memory"."sushi"."order_items"'
1863+ waiter_revenue_by_day_name = '"memory"."sushi"."waiter_revenue_by_day"'
1864+ actual_triggers = spy .call_args .kwargs ["snapshot_evaluation_triggers" ]
1865+ actual_triggers_name = {
1866+ k .name : sorted ([s .name for s in v .restatement_triggers ])
1867+ for k , v in actual_triggers .items ()
1868+ if v .restatement_triggers
1869+ }
1870+ assert actual_triggers_name == {
1871+ waiter_revenue_by_day_name : [waiter_revenue_by_day_name , order_items_name ],
1872+ order_items_name : [order_items_name ],
1873+ '"memory"."sushi"."top_waiters"' : [waiter_revenue_by_day_name ],
1874+ '"memory"."sushi"."customer_revenue_by_day"' : [order_items_name ],
1875+ '"memory"."sushi"."customer_revenue_lifetime"' : [order_items_name ],
1876+ }
1877+
1878+ # RUN: select and auto-restatement triggers
18071879 # User selects top_waiters and waiter_revenue_by_day, others added as auto-upstream
18081880 selected_models = {"top_waiters" , "waiter_revenue_by_day" }
18091881 selected_models_auto_upstream = {"order_items" , "orders" , "items" }
@@ -1814,6 +1886,11 @@ def test_snapshot_triggers(init_and_plan_context: t.Callable, mocker: MockerFixt
18141886 f'"memory"."sushi"."{ model } "' for model in selected_models
18151887 }
18161888
1889+ scheduler = context .scheduler ()
1890+ run_merged_intervals_mock = mocker .patch .object (
1891+ scheduler , "run_merged_intervals" , return_value = ([], [])
1892+ )
1893+
18171894 with time_machine .travel ("2023-01-09 00:00:01 UTC" ):
18181895 scheduler .run (
18191896 environment = c .PROD ,
0 commit comments