@@ -447,3 +447,135 @@ def test_load_deprecated_incremental_time_column(
447447 "Using `time_column` on a model with incremental_strategy 'delete+insert' has been deprecated. Please use `incremental_by_time_range` instead in model 'main.incremental_time_range'."
448448 in caplog .text
449449 )
450+
451+
@pytest.mark.slow
def test_load_microbatch_with_ref(
    tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project
) -> None:
    """Microbatch models should get event-time filters on both sources and refs.

    Builds a project with a source that declares ``event_time`` plus two
    microbatch models (the second refs the first), then asserts that rendering
    each model injects the expected time-range predicate for the given
    start/end interval.
    """
    yaml = YAML()
    project_dir, model_dir = create_empty_project()
    # Source declares `event_time: ds_source` so microbatch reads of it are
    # automatically wrapped in a time-range filter.
    source_schema = {
        "version": 2,
        "sources": [
            {
                "name": "my_source",
                "tables": [{"name": "my_table", "config": {"event_time": "ds_source"}}],
            }
        ],
    }
    source_schema_file = model_dir / "source_schema.yml"
    with open(source_schema_file, "w", encoding="utf-8") as f:
        yaml.dump(source_schema, f)

    # First microbatch model reads directly from the source.
    microbatch_contents = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-01',
        batch_size='day'
    )
}}

SELECT cola, ds_source as ds FROM {{ source('my_source', 'my_table') }}
"""
    microbatch_model_file = model_dir / "microbatch.sql"
    with open(microbatch_model_file, "w", encoding="utf-8") as f:
        f.write(microbatch_contents)

    # Second microbatch model refs the first; the ref should also be filtered
    # on the upstream model's event_time column.
    microbatch_two_contents = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-05',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ ref('microbatch') }}
"""
    microbatch_two_model_file = model_dir / "microbatch_two.sql"
    with open(microbatch_two_model_file, "w", encoding="utf-8") as f:
        f.write(microbatch_two_contents)

    microbatch_snapshot_fqn = '"local"."main"."microbatch"'
    microbatch_two_snapshot_fqn = '"local"."main"."microbatch_two"'
    context = Context(paths=project_dir)
    # Rendering the source-reading model wraps the source in a subquery
    # filtered to [start, end + 1 batch) on ds_source.
    assert (
        context.render(microbatch_snapshot_fqn, start="2025-01-01", end="2025-01-10").sql()
        == 'SELECT "cola" AS "cola", "ds_source" AS "ds" FROM (SELECT * FROM "local"."my_source"."my_table" AS "my_table" WHERE "ds_source" >= \'2025-01-01 00:00:00+00:00\' AND "ds_source" < \'2025-01-11 00:00:00+00:00\') AS "_q_0"'
    )
    # Rendering the downstream model filters the ref'd model on its ds column.
    assert (
        context.render(microbatch_two_snapshot_fqn, start="2025-01-01", end="2025-01-10").sql()
        == 'SELECT "_q_0"."cola" AS "cola", "_q_0"."ds" AS "ds" FROM (SELECT "microbatch"."cola" AS "cola", "microbatch"."ds" AS "ds" FROM "local"."main"."microbatch" AS "microbatch" WHERE "microbatch"."ds" < \'2025-01-11 00:00:00+00:00\' AND "microbatch"."ds" >= \'2025-01-01 00:00:00+00:00\') AS "_q_0"'
    )
516+
517+
@pytest.mark.slow
def test_load_microbatch_with_ref_no_filter(
    tmp_path: Path, caplog, dbt_dummy_postgres_config: PostgresConfig, create_empty_project
) -> None:
    """Calling ``.render()`` on a source/ref opts out of microbatch filtering.

    Same project shape as ``test_load_microbatch_with_ref``, but both models
    use ``{{ source(...).render() }}`` / ``{{ ref(...).render() }}``, so the
    rendered SQL must contain no event-time predicates.
    """
    yaml = YAML()
    project_dir, model_dir = create_empty_project()
    # Source still declares an event_time; the .render() call below is what
    # should suppress the automatic filter.
    source_schema = {
        "version": 2,
        "sources": [
            {
                "name": "my_source",
                "tables": [{"name": "my_table", "config": {"event_time": "ds"}}],
            }
        ],
    }
    source_schema_file = model_dir / "source_schema.yml"
    with open(source_schema_file, "w", encoding="utf-8") as f:
        yaml.dump(source_schema, f)

    # First microbatch model reads the source via .render(), bypassing the filter.
    microbatch_contents = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-01',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ source('my_source', 'my_table').render() }}
"""
    microbatch_model_file = model_dir / "microbatch.sql"
    with open(microbatch_model_file, "w", encoding="utf-8") as f:
        f.write(microbatch_contents)

    # Second model refs the first via .render(), likewise unfiltered.
    microbatch_two_contents = """
{{
    config(
        materialized='incremental',
        incremental_strategy='microbatch',
        event_time='ds',
        begin='2020-01-01',
        batch_size='day'
    )
}}

SELECT cola, ds FROM {{ ref('microbatch').render() }}
"""
    microbatch_two_model_file = model_dir / "microbatch_two.sql"
    with open(microbatch_two_model_file, "w", encoding="utf-8") as f:
        f.write(microbatch_two_contents)

    microbatch_snapshot_fqn = '"local"."main"."microbatch"'
    microbatch_two_snapshot_fqn = '"local"."main"."microbatch_two"'
    context = Context(paths=project_dir)
    # No WHERE clause should appear in either rendered query.
    assert (
        context.render(microbatch_snapshot_fqn, start="2025-01-01", end="2025-01-10").sql()
        == 'SELECT "cola" AS "cola", "ds" AS "ds" FROM "local"."my_source"."my_table" AS "my_table"'
    )
    assert (
        context.render(microbatch_two_snapshot_fqn, start="2025-01-01", end="2025-01-10").sql()
        == 'SELECT "microbatch"."cola" AS "cola", "microbatch"."ds" AS "ds" FROM "local"."main"."microbatch" AS "microbatch"'
    )
0 commit comments