|
| 1 | +from __future__ import annotations |
| 2 | +import typing as t |
| 3 | +from rich.progress import Progress |
| 4 | +from pathlib import Path |
| 5 | + |
| 6 | +if t.TYPE_CHECKING: |
| 7 | + # important to gate these to be able to defer importing sqlmesh until we need to |
| 8 | + from sqlmesh.core.context import Context |
| 9 | + from sqlmesh.dbt.project import Project |
| 10 | + from sqlmesh_dbt.console import DbtCliConsole |
| 11 | + |
| 12 | + |
class DbtOperations:
    """Implements dbt CLI verbs (``list``, ``run``, ...) on top of a loaded SQLMesh context.

    Instances are produced by :func:`create`, which builds the SQLMesh ``Context``
    and extracts the dbt ``Project`` from its loader.
    """

    def __init__(self, sqlmesh_context: Context, dbt_project: Project):
        # Context drives all SQLMesh operations; Project exposes the raw dbt
        # project information needed to invoke SQLMesh correctly.
        self.context = sqlmesh_context
        self.project = dbt_project

    def list_(self) -> None:
        """Print the name of every model loaded into the SQLMesh context."""
        for model in self.context.models.values():
            self.console.print(model.name)

    def run(self, select: t.Optional[str] = None, full_refresh: bool = False) -> None:
        """Execute a dbt-style run as an auto-applied, forward-only SQLMesh plan.

        Args:
            select: Optional dbt selector string; either comma-separated or
                space-separated model names.
            full_refresh: Currently unused; see TODO below.
        """
        # A dbt run both updates data and changes schemas and has no way of rolling back
        # so it more closely maps to a SQLMesh forward-only plan
        # TODO: if --full-refresh specified, mark incrementals as breaking instead of forward_only?

        # TODO: we need to either convert DBT selector syntax to SQLMesh selector syntax
        # or make the model selection engine configurable
        select_models = None
        if select:
            # dbt accepts comma- or space-separated selectors; strip whitespace so
            # "a, b" and "a,b" select the same models, and drop empty tokens
            delimiter = "," if "," in select else " "
            select_models = [token.strip() for token in select.split(delimiter) if token.strip()]

        self.context.plan(
            select_models=select_models,
            forward_only=True,
            no_auto_categorization=True,  # everything is breaking / forward-only
            effective_from=self.context.config.model_defaults.start,
            run=True,
            auto_apply=True,
        )

    @property
    def console(self) -> DbtCliConsole:
        """Return the active console, asserting it is the dbt CLI flavor set up by :func:`create`."""
        console = self.context.console
        # deferred import so merely importing this module does not pull in sqlmesh
        from sqlmesh_dbt.console import DbtCliConsole

        if not isinstance(console, DbtCliConsole):
            raise ValueError(f"Expecting dbt cli console, got: {console}")

        return console
| 53 | + |
| 54 | + |
def create(
    project_dir: t.Optional[Path] = None, profiles_dir: t.Optional[Path] = None, debug: bool = False
) -> DbtOperations:
    """Bootstrap SQLMesh for a dbt project and return a ready-to-use DbtOperations.

    Ordering in this function matters: the console must be set before the Context
    is created, and the default start date must be injected into dbt_project.yml
    before sqlmesh_config() reads it.

    Args:
        project_dir: Root of the dbt project; defaults to the current working directory
            downstream (see _inject_default_start_date and sqlmesh_config).
        profiles_dir: NOTE(review): accepted but currently unused in this body — confirm
            whether it should be forwarded to the loader.
        debug: When True, forces debug-level logging.

    Returns:
        A DbtOperations wrapping the loaded SQLMesh Context and its dbt Project.
    """
    with Progress(transient=True) as progress:
        # Indeterminate progress bar before SQLMesh import to provide feedback to the user that something is indeed happening
        load_task_id = progress.add_task("Loading engine", total=None)

        # Deferred imports: importing sqlmesh is slow, so it happens under the progress bar.
        from sqlmesh import configure_logging
        from sqlmesh.core.context import Context
        from sqlmesh.dbt.loader import sqlmesh_config, DbtLoader
        from sqlmesh.core.console import set_console
        from sqlmesh_dbt.console import DbtCliConsole
        from sqlmesh.utils.errors import SQLMeshError

        configure_logging(force_debug=debug)
        # Must happen before Context() so all subsequent output goes through the dbt CLI console.
        set_console(DbtCliConsole())

        progress.update(load_task_id, description="Loading project", total=None)

        # inject default start date if one is not specified to prevent the user from having to do anything
        _inject_default_start_date(project_dir)

        config = sqlmesh_config(
            project_root=project_dir,
            # do we want to use a local duckdb for state?
            # warehouse state has a bunch of overhead to initialize, is slow for ongoing operations and will create tables that perhaps the user was not expecting
            # on the other hand, local state is not portable
            state_connection=None,
        )

        sqlmesh_context = Context(
            config=config,
            load=True,
        )

        # this helps things which want a default project-level start date, like the "effective from date" for forward-only plans
        if not sqlmesh_context.config.model_defaults.start:
            # Fall back to the earliest per-model start; None when no model declares one.
            min_start_date = min(
                (
                    model.start
                    for model in sqlmesh_context.models.values()
                    if model.start is not None
                ),
                default=None,
            )
            sqlmesh_context.config.model_defaults.start = min_start_date

        # NOTE(review): reaches into private Context internals; assumes the first
        # loader is the dbt loader — confirm this holds if loaders are ever reordered.
        dbt_loader = sqlmesh_context._loaders[0]
        if not isinstance(dbt_loader, DbtLoader):
            raise SQLMeshError(f"Unexpected loader type: {type(dbt_loader)}")

        # so that DbtOperations can query information from the DBT project files in order to invoke SQLMesh correctly
        dbt_project = dbt_loader._projects[0]

        return DbtOperations(sqlmesh_context, dbt_project)
| 110 | + |
| 111 | + |
def _inject_default_start_date(project_dir: t.Optional[Path] = None) -> None:
    """
    SQLMesh needs a start date as the starting point for calculating intervals on incremental models.

    Rather than forcing the user to update their config manually or having a default that is not
    saved between runs, we inject it automatically into the dbt_project.yml file.

    Args:
        project_dir: Directory containing dbt_project.yml; defaults to the current working directory.
    """
    # Deferred imports to avoid paying sqlmesh import cost at module import time.
    from sqlmesh.dbt.project import PROJECT_FILENAME, load_yaml
    from sqlmesh.utils.yaml import dump
    from sqlmesh.utils.date import yesterday_ds

    project_yaml_path = (project_dir or Path.cwd()) / PROJECT_FILENAME
    if not project_yaml_path.exists():
        return

    loaded_project_file = load_yaml(project_yaml_path)
    models_config = loaded_project_file.get("models") if loaded_project_file else None
    # Guard against a missing, empty, or non-mapping `models:` section — an empty
    # section loads as None and would make the membership checks below raise TypeError.
    if not isinstance(models_config, dict):
        return

    start_date_keys = ("start", "+start")
    if all(k not in models_config for k in start_date_keys):
        # Default to yesterday so the first run has exactly one day of history to backfill.
        models_config["+start"] = yesterday_ds()
        # todo: this may format the file differently, is that acceptable?
        with project_yaml_path.open("w") as f:
            dump(loaded_project_file, f)