Skip to content

Commit 6d36a11

Browse files
authored
Feat(dbt_cli): Set proper plan flags and also allow --empty and --environment (#5226)
1 parent 59d44eb commit 6d36a11

File tree

6 files changed

+293
-20
lines changed

6 files changed

+293
-20
lines changed

sqlmesh/cli/project_init.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,13 @@ def _gen_config(
121121
# https://sqlmesh.readthedocs.io/en/stable/guides/configuration/#virtual-data-environment-modes
122122
virtual_environment_mode: {VirtualEnvironmentMode.DEV_ONLY.lower()}
123123
124+
# --- Plan Defaults ---
125+
# https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#plan
126+
plan:
127+
# For Virtual Data Environments, this ensures that any changes are always considered against prod,
128+
# rather than the previous state of that environment
129+
always_recreate_environment: True
130+
124131
# --- Model Defaults ---
125132
# https://sqlmesh.readthedocs.io/en/stable/reference/model_configuration/#model-defaults
126133
model_defaults:

sqlmesh/core/context.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1518,8 +1518,11 @@ def plan_builder(
15181518
include_unmodified = self.config.plan.include_unmodified
15191519

15201520
if skip_backfill and not no_gaps and not is_dev:
1521-
raise ConfigError(
1522-
"When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
1521+
# note: we deliberately don't mention the --no-gaps flag in case the plan came from the sqlmesh_dbt command
1522+
# todo: perhaps we could have better error messages if we check sys.argv[0] for which cli is running?
1523+
self.console.log_warning(
1524+
"Skipping the backfill stage for production can lead to unexpected results, such as tables being empty or incremental data with non-contiguous time ranges being made available.\n"
1525+
"If you are doing this deliberately to create an empty version of a table to test a change, please consider using Virtual Data Environments instead."
15231526
)
15241527

15251528
if not skip_linter:

sqlmesh_dbt/cli.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,24 @@ def dbt(
9292
"--full-refresh",
9393
help="If specified, dbt will drop incremental models and fully-recalculate the incremental table from the model definition.",
9494
)
95+
@click.option(
96+
"--env",
97+
"--environment",
98+
help="Run against a specific Virtual Data Environment (VDE) instead of the main environment",
99+
)
100+
@click.option(
101+
"--empty/--no-empty", default=False, help="If specified, limit input refs and sources"
102+
)
95103
@vars_option
96104
@click.pass_context
97-
def run(ctx: click.Context, vars: t.Optional[t.Dict[str, t.Any]], **kwargs: t.Any) -> None:
105+
def run(
106+
ctx: click.Context,
107+
vars: t.Optional[t.Dict[str, t.Any]],
108+
env: t.Optional[str] = None,
109+
**kwargs: t.Any,
110+
) -> None:
98111
"""Compile SQL and execute against the current target database."""
99-
_get_dbt_operations(ctx, vars).run(**kwargs)
112+
_get_dbt_operations(ctx, vars).run(environment=env, **kwargs)
100113

101114

102115
@dbt.command(name="list")

sqlmesh_dbt/operations.py

Lines changed: 98 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from sqlmesh.dbt.project import Project
1212
from sqlmesh_dbt.console import DbtCliConsole
1313
from sqlmesh.core.model import Model
14+
from sqlmesh.core.plan import Plan
1415

1516
logger = logging.getLogger(__name__)
1617

@@ -35,21 +36,20 @@ def list_(
3536

3637
def run(
3738
self,
39+
environment: t.Optional[str] = None,
3840
select: t.Optional[t.List[str]] = None,
3941
exclude: t.Optional[t.List[str]] = None,
4042
full_refresh: bool = False,
41-
) -> None:
42-
select_models = None
43-
44-
if sqlmesh_selector := selectors.to_sqlmesh(select or [], exclude or []):
45-
select_models = [sqlmesh_selector]
46-
47-
self.context.plan(
48-
select_models=select_models,
49-
run=True,
50-
no_diff=True,
51-
no_prompts=True,
52-
auto_apply=True,
43+
empty: bool = False,
44+
) -> Plan:
45+
return self.context.plan(
46+
**self._plan_options(
47+
environment=environment,
48+
select=select,
49+
exclude=exclude,
50+
full_refresh=full_refresh,
51+
empty=empty,
52+
)
5353
)
5454

5555
def _selected_models(
@@ -71,6 +71,86 @@ def _selected_models(
7171

7272
return selected_models
7373

74+
def _plan_options(
    self,
    environment: t.Optional[str] = None,
    select: t.Optional[t.List[str]] = None,
    exclude: t.Optional[t.List[str]] = None,
    empty: bool = False,
    full_refresh: bool = False,
) -> t.Dict[str, t.Any]:
    """Translate dbt-style `run` flags into keyword arguments for `context.plan()`.

    Args:
        environment: Name of the target environment. A falsy value or the prod
            constant is treated as prod; any other value is treated as dev.
        select: dbt `--select` expressions, if any.
        exclude: dbt `--exclude` expressions, if any.
        empty: dbt `--empty`. Sets `skip_backfill` and implies `full_refresh`.
        full_refresh: dbt `--full-refresh`. Accepted but not acted on yet (see the TODO below).

    Returns:
        The keyword arguments to forward to the plan call.
    """
    import sqlmesh.core.constants as c

    # convert --select and --exclude to a selector expression for the SQLMesh selector engine
    selector_expression = selectors.to_sqlmesh(select or [], exclude or [])
    select_models = [selector_expression] if selector_expression else None

    targets_dev = bool(environment) and environment != c.PROD

    plan_kwargs: t.Dict[str, t.Any] = {
        "environment": environment,
        "select_models": select_models,
        # don't output a diff of model changes
        "no_diff": True,
        # don't throw up any prompts like "set the effective date" - use defaults
        "no_prompts": True,
        # start doing work immediately (since no_diff is set, there isn't really anything for the user to say yes/no to)
        "auto_apply": True,
    }

    if not targets_dev or select_models:
        # prod plans should "catch up" before applying the changes so that after the command finishes prod is the latest it can be
        # dev plans *with* selectors should do the same as the user is saying "specifically update these models to the latest"
        # dev plans *without* selectors should just have the defaults of never exceeding prod as the user is saying
        # "just create this env" without focusing on any specific models

        # always catch the data up to latest rather than only operating on what has been loaded before
        plan_kwargs["run"] = True
        # don't take cron schedules into account when deciding what models to run, do everything even if it just ran
        plan_kwargs["ignore_cron"] = True

    if targets_dev:
        # don't create views for all of prod in the dev environment
        plan_kwargs["include_unmodified"] = False

        # always plan from scratch against prod. note that this is coupled with the `always_recreate_environment=True`
        # setting in the default config file. the result is that rather than planning against the previous state of an
        # existing dev environment, the full scope of changes vs prod are always shown
        plan_kwargs["create_from"] = c.PROD

        # Always enable dev previews for incremental / forward-only models.
        # Due to how DBT does incrementals (INCREMENTAL_UNMANAGED on the SQLMesh engine), this will result in the full
        # model being refreshed with the entire dataset, which can potentially be very large. If this is undesirable,
        # users have two options:
        #  - work around this using jinja to conditionally add extra filters to the WHERE clause or a LIMIT to the model query
        #  - upgrade to SQLMesh's incremental models, where we have variables for the start/end date and inject leak
        #    guards to limit the amount of data backfilled
        #
        # Note: enable_preview=True is *different* behaviour to the `sqlmesh` CLI, which uses enable_preview=None.
        # This means the `sqlmesh` CLI will only enable dev previews for dbt projects if the target adapter supports
        # cloning, whereas we enable it unconditionally here
        plan_kwargs["enable_preview"] = True

    if empty:
        # `dbt --empty` adds LIMIT 0 to the queries, resulting in empty tables. In addition, it happily clobbers
        # existing tables regardless of if they are populated.
        # This *partially* lines up with --skip-backfill in SQLMesh, which indicates to not populate tables if they
        # happened to be created/updated as part of this plan.
        # However, if a table already exists and has data in it, there is no change so SQLMesh will not recreate the
        # table and thus it will not be cleared.
        # So in order to fully replicate dbt's --empty, we also need --full-refresh semantics in order to replace
        # existing tables
        plan_kwargs["skip_backfill"] = True
        full_refresh = True

    if full_refresh:
        # TODO: handling this requires some updates in the engine to enable restatements+changes in the same plan
        # without affecting prod if the plan targets dev
        pass

    return plan_kwargs
153+
74154
@property
75155
def console(self) -> DbtCliConsole:
76156
console = self.context.console
@@ -103,6 +183,12 @@ def create(
103183
from sqlmesh_dbt.console import DbtCliConsole
104184
from sqlmesh.utils.errors import SQLMeshError
105185

186+
# clear any existing handlers set up by click/rich as defaults so that once SQLMesh logging config is applied,
187+
# we don't get duplicate messages logged from things like console.log_warning()
188+
root_logger = logging.getLogger()
189+
while root_logger.hasHandlers():
190+
root_logger.removeHandler(root_logger.handlers[0])
191+
106192
configure_logging(force_debug=debug)
107193
set_console(DbtCliConsole())
108194

tests/cli/test_cli.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,7 @@ def test_plan_skip_backfill(runner, tmp_path, flag):
260260
# plan for `prod` errors if `--skip-backfill` is passed without --no-gaps
261261
result = runner.invoke(cli, ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", flag])
262262
assert result.exit_code == 1
263-
assert (
264-
"Error: When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
265-
in result.output
266-
)
263+
assert "Skipping the backfill stage for production can lead to unexpected" in result.output
267264

268265
# plan executes virtual update without executing model batches
269266
# Input: `y` to perform virtual update

tests/dbt/cli/test_operations.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,36 @@
1+
import typing as t
12
from pathlib import Path
23
import pytest
34
from sqlmesh_dbt.operations import create
45
from sqlmesh.utils import yaml
56
from sqlmesh.utils.errors import SQLMeshError
67
import time_machine
8+
from sqlmesh.core.console import NoopConsole
9+
from sqlmesh.core.plan import PlanBuilder
10+
from sqlmesh.core.config.common import VirtualEnvironmentMode
711

812
pytestmark = pytest.mark.slow
913

1014

15+
class PlanCapturingConsole(NoopConsole):
    """Console stand-in that records the arguments passed to `plan()` for later inspection."""

    def plan(
        self,
        plan_builder: PlanBuilder,
        auto_apply: bool,
        default_catalog: t.Optional[str],
        no_diff: bool = False,
        no_prompts: bool = False,
    ) -> None:
        """Store every argument on the instance; nothing else is done with the plan."""
        # the normal console starts applying the plan at this point; here we only capture the
        # parameters so the tests can check that they were set correctly
        self.plan_builder, self.auto_apply, self.default_catalog = (
            plan_builder,
            auto_apply,
            default_catalog,
        )
        self.no_diff, self.no_prompts = no_diff, no_prompts
32+
33+
1134
def test_create_sets_and_persists_default_start_date(jaffle_shop_duckdb: Path):
1235
with time_machine.travel("2020-01-02 00:00:00 UTC"):
1336
from sqlmesh.utils.date import yesterday_ds, to_ds
@@ -71,6 +94,18 @@ def test_create_can_specify_profile_and_target(jaffle_shop_duckdb: Path):
7194
assert dbt_project.context.target_name == "dev"
7295

7396

97+
def test_default_options(jaffle_shop_duckdb: Path):
    """Check the config values exposed by a freshly created operations object."""
    ops = create()

    cfg = ops.context.config
    project = ops.project

    # plan and environment-mode settings
    assert cfg.plan.always_recreate_environment is True
    assert cfg.virtual_environment_mode == VirtualEnvironmentMode.DEV_ONLY

    # model defaults: a start value is set and the dialect matches the dbt target's dialect
    model_defaults = cfg.model_defaults
    assert model_defaults.start is not None
    assert cfg.model_defaults.dialect == project.context.target.dialect
107+
108+
74109
def test_create_can_set_project_variables(jaffle_shop_duckdb: Path):
75110
(jaffle_shop_duckdb / "models" / "test_model.sql").write_text("""
76111
select '{{ var('foo') }}' as a
@@ -83,3 +118,135 @@ def test_create_can_set_project_variables(jaffle_shop_duckdb: Path):
83118
query = test_model.render_query()
84119
assert query is not None
85120
assert query.sql() == "SELECT 'bar' AS \"a\""
121+
122+
123+
def test_run_option_mapping(jaffle_shop_duckdb: Path):
    """Check how `run()` arguments map onto plan settings when no environment is given (prod)."""
    operations = create(project_dir=jaffle_shop_duckdb)
    console = PlanCapturingConsole()
    operations.context.console = console

    def plan_for_prod(expected_skip_backfill: bool = False, **run_kwargs: t.Any):
        # run with the given options and check the expectations shared by every prod scenario
        prod_plan = operations.run(**run_kwargs)
        assert prod_plan.environment.name == "prod"
        assert console.no_prompts is True
        assert console.no_diff is True
        assert console.auto_apply is True
        assert prod_plan.end_bounded is False
        assert prod_plan.ignore_cron is True
        assert prod_plan.skip_backfill is expected_skip_backfill
        return prod_plan

    def snapshot_names(some_plan):
        return {snapshot.name for snapshot in some_plan.snapshots}

    # no selectors: everything in the project is part of the plan
    plan = plan_for_prod()
    assert plan.selected_models_to_backfill is None
    assert snapshot_names(plan) == set(operations.context.snapshots)

    # --select only
    plan = plan_for_prod(select=["main.stg_orders+"])
    assert plan.selected_models_to_backfill == {
        '"jaffle_shop"."main"."customers"',
        '"jaffle_shop"."main"."orders"',
        '"jaffle_shop"."main"."stg_orders"',
    }
    assert snapshot_names(plan) == plan.selected_models_to_backfill

    # --select combined with --exclude
    plan = plan_for_prod(select=["main.stg_orders+"], exclude=["main.customers"])
    assert plan.selected_models_to_backfill == {
        '"jaffle_shop"."main"."orders"',
        '"jaffle_shop"."main"."stg_orders"',
    }
    assert snapshot_names(plan) == plan.selected_models_to_backfill

    # --exclude only
    plan = plan_for_prod(exclude=["main.customers"])
    assert plan.selected_models_to_backfill == set(operations.context.snapshots) - {
        '"jaffle_shop"."main"."customers"'
    }
    assert snapshot_names(plan) == plan.selected_models_to_backfill

    # --empty
    plan = plan_for_prod(expected_skip_backfill=True, empty=True)
    assert plan.selected_models_to_backfill is None
    assert snapshot_names(plan) == set(operations.context.snapshots)
191+
192+
193+
def test_run_option_mapping_dev(jaffle_shop_duckdb: Path):
    """Check how `run()` arguments map onto plan settings when targeting the `dev` environment."""
    # create prod so that dev has something to compare against
    create(project_dir=jaffle_shop_duckdb).run()

    new_model_path = jaffle_shop_duckdb / "models" / "new_model.sql"
    new_model_path.write_text("select 1")

    operations = create(project_dir=jaffle_shop_duckdb)

    console = PlanCapturingConsole()
    operations.context.console = console

    def plan_for_dev(**run_kwargs: t.Any):
        # run against dev and check the expectations shared by every dev scenario
        dev_plan = operations.run(environment="dev", **run_kwargs)
        assert dev_plan.environment.name == "dev"
        assert console.no_prompts is True
        assert console.no_diff is True
        assert console.auto_apply is True
        assert dev_plan.include_unmodified is False
        assert dev_plan.context_diff.create_from == "prod"
        assert dev_plan.context_diff.is_new_environment is True
        assert console.plan_builder._enable_preview is True
        return dev_plan

    only_new_model = {'"jaffle_shop"."main"."new_model"'}

    # no selectors
    plan = plan_for_dev()
    assert plan.end_bounded is True
    assert plan.ignore_cron is False
    assert plan.skip_backfill is False
    assert plan.selected_models_to_backfill == only_new_model

    # --empty
    plan = plan_for_dev(empty=True)
    assert plan.end_bounded is True
    assert plan.ignore_cron is False
    assert plan.skip_backfill is True
    assert plan.selected_models_to_backfill == only_new_model

    # --select
    plan = plan_for_dev(select=["main.stg_orders+"])
    # dev plans with --select have run=True, ignore_cron=True set
    # as opposed to dev plans that don't have a specific selector
    assert plan.end_bounded is False
    assert plan.ignore_cron is True
    assert plan.skip_backfill is False
    # note: the new model in the dev environment is ignored in favour of the explicitly selected ones
    assert plan.selected_models_to_backfill == {
        '"jaffle_shop"."main"."customers"',
        '"jaffle_shop"."main"."orders"',
        '"jaffle_shop"."main"."stg_orders"',
    }

0 commit comments

Comments
 (0)