Skip to content

Commit 5315e2c

Browse files
Feat: Add dlt_refresh command to update models from a dlt pipeline (#3340)
1 parent 606527a commit 5315e2c

File tree

7 files changed

+188
-5
lines changed

7 files changed

+188
-5
lines changed

docs/integrations/dlt.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,28 @@ This will create the configuration file and directories, which are found in all
2828

2929
SQLMesh will also automatically generate models to ingest data from the pipeline incrementally. Incremental loading is ideal for large datasets where recomputing entire tables is resource-intensive. In this case utilizing the [`INCREMENTAL_BY_TIME_RANGE` model kind](../concepts/models/model_kinds.md#incremental_by_time_range). However, these model definitions can be customized to meet your specific project needs.
3030

31+
### Generating models on demand
32+
33+
To update the models in your SQLMesh project on demand, use the `dlt_refresh` command. This allows you to either specify individual tables to generate incremental models from or update all models at once.
34+
35+
- **Generate all missing tables**:
36+
37+
```bash
38+
$ sqlmesh dlt_refresh <pipeline-name>
39+
```
40+
41+
- **Generate all missing tables and overwrite existing ones** (use with `--force` or `-f`):
42+
43+
```bash
44+
$ sqlmesh dlt_refresh <pipeline-name> --force
45+
```
46+
47+
- **Generate specific dlt tables** (using `--table` or `-t`):
48+
49+
```bash
50+
$ sqlmesh dlt_refresh <pipeline-name> --table <dlt-table>
51+
```
52+
3153
#### Configuration
3254

3355
SQLMesh will retrieve the data warehouse connection credentials from your dlt project to configure the `config.yaml` file. This configuration can be modified or customized as needed. For more details, refer to the [configuration guide](../guides/configuration.md).

docs/reference/cli.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,18 @@ Options:
121121
--help Show this message and exit.
122122
```
123123

124+
## dlt_refresh
125+
126+
```
127+
Usage: dlt_refresh PIPELINE [OPTIONS]
128+
129+
Attaches to a DLT pipeline with the option to update specific or all models of the SQLMesh project.
130+
131+
Options:
132+
-t, --table TEXT The DLT tables to generate SQLMesh models from. When none specified, all new missing tables will be generated.
133+
-f, --force If set it will overwrite existing models with the new generated models from the DLT tables.
134+
```
135+
124136
## diff
125137

126138
```

docs/reference/notebook.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,17 @@ options:
230230
--file FILE, -f FILE An optional file path to write the HTML output to.
231231
```
232232

233+
#### dlt_refresh
234+
```
235+
%dlt_refresh PIPELINE [--table] TABLE [--force]
236+
237+
Attaches to a DLT pipeline with the option to update specific or all models of the SQLMesh project.
238+
239+
options:
240+
--table TABLE, -t TABLE The DLT tables to generate SQLMesh models from. When none specified, all new missing tables will be generated.
241+
--force, -f If set it will overwrite existing models with the new generated models from the DLT tables.
242+
```
243+
233244
#### fetchdf
234245
```
235246
%%fetchdf [df_var]

sqlmesh/cli/main.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -916,3 +916,39 @@ def clean(obj: Context) -> None:
916916
def table_name(obj: Context, model_name: str, dev: bool) -> None:
917917
"""Prints the name of the physical table for the given model."""
918918
print(obj.table_name(model_name, dev))
919+
920+
921+
@cli.command("dlt_refresh")
922+
@click.argument("pipeline", required=True)
923+
@click.option(
924+
"-t",
925+
"--table",
926+
type=str,
927+
multiple=True,
928+
help="The specific dlt tables to refresh in the SQLMesh models.",
929+
)
930+
@click.option(
931+
"-f",
932+
"--force",
933+
is_flag=True,
934+
default=False,
935+
help="If set, existing models are overwritten with the new DLT tables.",
936+
)
937+
@click.pass_context
938+
@error_handler
939+
@cli_analytics
940+
def dlt_refresh(
941+
ctx: click.Context,
942+
pipeline: str,
943+
force: bool,
944+
table: t.List[str] = [],
945+
) -> None:
946+
"""Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project."""
947+
from sqlmesh.integrations.dlt import generate_dlt_models
948+
949+
sqlmesh_models = generate_dlt_models(ctx.obj, pipeline, list(table or []), force)
950+
if sqlmesh_models:
951+
model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models])
952+
ctx.obj.console.log_success(f"Updatde SQLMesh project with models:\n{model_names}")
953+
else:
954+
ctx.obj.console.log_success("All SQLMesh models are up to date.")

sqlmesh/integrations/dlt.py

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
from pydantic import ValidationError
55
from sqlglot import exp, parse_one
66
from sqlmesh.core.config.connection import parse_connection_config
7+
from sqlmesh.core.context import Context
78
from sqlmesh.utils.date import yesterday_ds
89

910

1011
def generate_dlt_models_and_settings(
11-
pipeline_name: str, dialect: str
12+
pipeline_name: str, dialect: str, tables: t.Optional[t.List[str]] = None
1213
) -> t.Tuple[t.Set[t.Tuple[str, str]], str, str]:
1314
"""This function attaches to a DLT pipeline and retrieves the connection configs and
1415
SQLMesh models based on the tables present in the pipeline's default schema.
@@ -42,8 +43,11 @@ def generate_dlt_models_and_settings(
4243
dlt_tables = {
4344
name: table
4445
for name, table in schema.tables.items()
45-
if (has_table_seen_data(table) and not name.startswith(schema._dlt_tables_prefix))
46-
or name == schema.loads_table_name
46+
if (
47+
(has_table_seen_data(table) and not name.startswith(schema._dlt_tables_prefix))
48+
or name == schema.loads_table_name
49+
)
50+
and (name in tables if tables else True)
4751
}
4852

4953
sqlmesh_models = set()
@@ -85,6 +89,27 @@ def generate_dlt_models_and_settings(
8589
return sqlmesh_models, format_config(configs, db_type), get_start_date(storage_ids)
8690

8791

92+
def generate_dlt_models(
93+
context: Context, pipeline_name: str, tables: t.List[str], force: bool
94+
) -> t.List[str]:
95+
from sqlmesh.cli.example_project import _create_models
96+
97+
sqlmesh_models, _, _ = generate_dlt_models_and_settings(
98+
pipeline_name=pipeline_name,
99+
dialect=context.config.dialect or "",
100+
tables=tables if tables else None,
101+
)
102+
103+
if not tables and not force:
104+
existing_models = [m.name for m in context.models.values()]
105+
sqlmesh_models = {model for model in sqlmesh_models if model[0] not in existing_models}
106+
107+
if sqlmesh_models:
108+
_create_models(models_path=context.path / "models", models=sqlmesh_models)
109+
return [model[0] for model in sqlmesh_models]
110+
return []
111+
112+
88113
def generate_incremental_model(
89114
model_name: str,
90115
model_def_columns: str,

sqlmesh/magics.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,42 @@ def table_name(self, context: Context, line: str) -> None:
690690
args = parse_argstring(self.table_name, line)
691691
context.console.log_status_update(context.table_name(args.model_name, args.dev))
692692

693+
@magic_arguments()
694+
@argument(
695+
"pipeline",
696+
nargs="?",
697+
type=str,
698+
help="The dlt pipeline to attach for this SQLMesh project.",
699+
)
700+
@argument(
701+
"--table",
702+
"-t",
703+
type=str,
704+
nargs="*",
705+
help="The specific dlt tables to refresh in the SQLMesh models.",
706+
)
707+
@argument(
708+
"--force",
709+
"-f",
710+
action="store_true",
711+
help="If set, existing models are overwritten with the new DLT tables.",
712+
)
713+
@line_magic
714+
@pass_sqlmesh_context
715+
def dlt_refresh(self, context: Context, line: str) -> None:
716+
"""Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project."""
717+
from sqlmesh.integrations.dlt import generate_dlt_models
718+
719+
args = parse_argstring(self.dlt_refresh, line)
720+
sqlmesh_models = generate_dlt_models(
721+
context, args.pipeline, list(args.table or []), args.force
722+
)
723+
if sqlmesh_models:
724+
model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models])
725+
context.console.log_success(f"Updated SQLMesh project with models:\n{model_names}")
726+
else:
727+
context.console.log_success("All SQLMesh models are up to date.")
728+
693729
@magic_arguments()
694730
@argument(
695731
"--read",

tests/cli/test_cli.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from sqlmesh.cli.example_project import ProjectTemplate, init_example_project
1111
from sqlmesh.cli.main import cli
1212
from sqlmesh.core.context import Context
13+
from sqlmesh.integrations.dlt import generate_dlt_models
1314
from sqlmesh.utils.date import yesterday_ds
1415

1516
FREEZE_TIME = "2023-01-01 00:00:00"
@@ -731,7 +732,11 @@ def test_plan_dlt(runner, tmp_path):
731732
TO_TIMESTAMP(CAST(_dlt_load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds
732733
"""
733734

734-
with open(tmp_path / "models/incremental_sushi_types.sql") as file:
735+
dlt_sushi_types_model_path = tmp_path / "models/incremental_sushi_types.sql"
736+
dlt_loads_model_path = tmp_path / "models/incremental__dlt_loads.sql"
737+
dlt_waiters_model_path = tmp_path / "models/incremental_waiters.sql"
738+
739+
with open(dlt_sushi_types_model_path) as file:
735740
incremental_model = file.read()
736741

737742
expected_dlt_loads_model = """MODEL (
@@ -762,11 +767,14 @@ def test_plan_dlt(runner, tmp_path):
762767
TO_TIMESTAMP(CAST(load_id AS DOUBLE)) BETWEEN @start_ds AND @end_ds
763768
"""
764769

765-
with open(tmp_path / "models/incremental__dlt_loads.sql") as file:
770+
with open(dlt_loads_model_path) as file:
766771
dlt_loads_model = file.read()
767772

768773
# Validate generated config and models
769774
assert config == expected_config
775+
assert dlt_loads_model_path.exists()
776+
assert dlt_sushi_types_model_path.exists()
777+
assert dlt_waiters_model_path.exists()
770778
assert dlt_loads_model == expected_dlt_loads_model
771779
assert incremental_model == expected_incremental_model
772780

@@ -777,4 +785,37 @@ def test_plan_dlt(runner, tmp_path):
777785

778786
assert result.exit_code == 0
779787
assert_backfill_success(result)
788+
789+
# Remove and update with missing model
790+
remove(dlt_waiters_model_path)
791+
assert not dlt_waiters_model_path.exists()
792+
793+
# Update with force = False will generate only the missing model
794+
context = Context(paths=tmp_path)
795+
assert generate_dlt_models(context, "sushi", [], False) == [
796+
"sushi_dataset_sqlmesh.incremental_waiters"
797+
]
798+
assert dlt_waiters_model_path.exists()
799+
800+
# Remove all models
801+
remove(dlt_waiters_model_path)
802+
remove(dlt_loads_model_path)
803+
remove(dlt_sushi_types_model_path)
804+
805+
# Update to generate a specific model: sushi_types
806+
assert generate_dlt_models(context, "sushi", ["sushi_types"], False) == [
807+
"sushi_dataset_sqlmesh.incremental_sushi_types"
808+
]
809+
810+
# Only the sushi_types should be generated now
811+
assert not dlt_waiters_model_path.exists()
812+
assert not dlt_loads_model_path.exists()
813+
assert dlt_sushi_types_model_path.exists()
814+
815+
# Update with force = True will generate all models and overwrite existing ones
816+
generate_dlt_models(context, "sushi", [], True)
817+
assert dlt_loads_model_path.exists()
818+
assert dlt_sushi_types_model_path.exists()
819+
assert dlt_waiters_model_path.exists()
820+
780821
remove(dataset_path)

0 commit comments

Comments
 (0)