@@ -671,52 +671,53 @@ def test_grants_plan_full_refresh_model_via_replace(
671671 assert dev_grants == {"SELECT" : [roles ["reader" ]["username" ]]}
672672
673673
674- def test_grants_plan_incremental_model_first_insert (
674+ def test_grants_plan_incremental_model (
675675 engine_adapter : PostgresEngineAdapter , ctx : TestContext , tmp_path : Path
676676):
677- with create_users (engine_adapter , "reader" ) as roles :
678- # Create an incremental model with grants
677+ with create_users (engine_adapter , "reader" , "writer" ) as roles :
679678 (tmp_path / "models" ).mkdir (exist_ok = True )
680- (tmp_path / "models" / "incremental_model.sql" ).write_text (
681- f"""
682- MODEL (
683- name test_schema.incremental_model,
684- kind INCREMENTAL_BY_TIME_RANGE (
685- time_column ts
686- ),
687- grants (
688- 'SELECT' = ['{ roles ["reader" ]["username" ]} ']
689- ),
690- grants_target_layer 'all'
691- );
692679
693- SELECT 1 as id, @start_ds::timestamp as ts, 'data' as value
694- """
695- )
680+ model_name = "incr_model"
681+ model_definition = f"""
682+ MODEL (
683+ name test_schema.{ model_name } ,
684+ kind INCREMENTAL_BY_TIME_RANGE (
685+ time_column ts
686+ ),
687+ grants (
688+ 'SELECT' = ['{ roles ["reader" ]["username" ]} '],
689+ 'INSERT' = ['{ roles ["writer" ]["username" ]} ']
690+ ),
691+ grants_target_layer 'all'
692+ );
693+ SELECT 1 as id, @start_ds::timestamp as ts, 'data' as value
694+ """
695+
696+ (tmp_path / "models" / f"{ model_name } .sql" ).write_text (model_definition )
696697
697698 context = ctx .create_context (path = tmp_path )
698699
699- # First run - this will create the table via _replace_query_for_model
700+ # First plan
700701 plan_result = context .plan (
701702 "dev" , start = "2020-01-01" , end = "2020-01-01" , auto_apply = True , no_prompts = True
702703 )
703-
704704 assert len (plan_result .new_snapshots ) == 1
705+
705706 snapshot = plan_result .new_snapshots [0 ]
706707 table_name = snapshot .table_name ()
707708
708- # Physical table
709709 physical_grants = engine_adapter ._get_current_grants_config (
710710 exp .to_table (table_name , dialect = engine_adapter .dialect )
711711 )
712- assert physical_grants == {"SELECT" : [roles ["reader" ]["username" ]]}
712+ assert physical_grants .get ("SELECT" , []) == [roles ["reader" ]["username" ]]
713+ assert physical_grants .get ("INSERT" , []) == [roles ["writer" ]["username" ]]
713714
714- # Virtual view
715- dev_view_name = "test_schema__dev.incremental_model"
715+ view_name = f"test_schema__dev.{ model_name } "
716716 view_grants = engine_adapter ._get_current_grants_config (
717- exp .to_table (dev_view_name , dialect = engine_adapter .dialect )
717+ exp .to_table (view_name , dialect = engine_adapter .dialect )
718718 )
719- assert view_grants == physical_grants
719+ assert view_grants .get ("SELECT" , []) == [roles ["reader" ]["username" ]]
720+ assert view_grants .get ("INSERT" , []) == [roles ["writer" ]["username" ]]
720721
721722
722723def test_grants_plan_clone_environment (
@@ -1191,3 +1192,194 @@ def test_grants_target_layer_plan_env_with_vde_dev_only(
11911192 exp .to_table (physical_table_name , dialect = engine_adapter .dialect )
11921193 )
11931194 assert roles ["grantee" ]["username" ] in physical_grants .get ("SELECT" , [])
1195+
1196+
1197+ @pytest .mark .parametrize (
1198+ "model_kind" ,
1199+ [
1200+ "SCD_TYPE_2" ,
1201+ "SCD_TYPE_2_BY_TIME" ,
1202+ ],
1203+ )
1204+ def test_grants_plan_scd_type_2_models (
1205+ engine_adapter : PostgresEngineAdapter ,
1206+ ctx : TestContext ,
1207+ tmp_path : Path ,
1208+ model_kind : str ,
1209+ ):
1210+ with create_users (engine_adapter , "reader" , "writer" , "analyst" ) as roles :
1211+ (tmp_path / "models" ).mkdir (exist_ok = True )
1212+ model_name = "scd_model"
1213+
1214+ kind_config = f"{ model_kind } (unique_key [id])"
1215+ model_definition = f"""
1216+ MODEL (
1217+ name test_schema.{ model_name } ,
1218+ kind { kind_config } ,
1219+ grants (
1220+ 'SELECT' = ['{ roles ["reader" ]["username" ]} '],
1221+ 'INSERT' = ['{ roles ["writer" ]["username" ]} ']
1222+ ),
1223+ grants_target_layer 'all'
1224+ );
1225+ SELECT 1 as id, 'initial_data' as name, CURRENT_TIMESTAMP as updated_at
1226+ """
1227+ (tmp_path / "models" / f"{ model_name } .sql" ).write_text (model_definition )
1228+
1229+ context = ctx .create_context (path = tmp_path )
1230+ plan_result = context .plan (
1231+ "dev" , start = "2023-01-01" , end = "2023-01-01" , auto_apply = True , no_prompts = True
1232+ )
1233+ assert len (plan_result .new_snapshots ) == 1
1234+
1235+ current_snapshot = plan_result .new_snapshots [0 ]
1236+ fingerprint_version = current_snapshot .fingerprint .to_version ()
1237+ physical_table_name = (
1238+ f"sqlmesh__test_schema.test_schema__{ model_name } __{ fingerprint_version } __dev"
1239+ )
1240+ physical_grants = engine_adapter ._get_current_grants_config (
1241+ exp .to_table (physical_table_name , dialect = engine_adapter .dialect )
1242+ )
1243+ assert physical_grants .get ("SELECT" , []) == [roles ["reader" ]["username" ]]
1244+ assert physical_grants .get ("INSERT" , []) == [roles ["writer" ]["username" ]]
1245+
1246+ view_name = f"test_schema__dev.{ model_name } "
1247+ view_grants = engine_adapter ._get_current_grants_config (
1248+ exp .to_table (view_name , dialect = engine_adapter .dialect )
1249+ )
1250+ assert view_grants .get ("SELECT" , []) == [roles ["reader" ]["username" ]]
1251+ assert view_grants .get ("INSERT" , []) == [roles ["writer" ]["username" ]]
1252+
1253+ # Data change
1254+ updated_model_definition = f"""
1255+ MODEL (
1256+ name test_schema.{ model_name } ,
1257+ kind { kind_config } ,
1258+ grants (
1259+ 'SELECT' = ['{ roles ["reader" ]["username" ]} '],
1260+ 'INSERT' = ['{ roles ["writer" ]["username" ]} ']
1261+ ),
1262+ grants_target_layer 'all'
1263+ );
1264+ SELECT 1 as id, 'updated_data' as name, CURRENT_TIMESTAMP as updated_at
1265+ """
1266+ (tmp_path / "models" / f"{ model_name } .sql" ).write_text (updated_model_definition )
1267+
1268+ context .load ()
1269+ context .plan ("dev" , start = "2023-01-02" , end = "2023-01-02" , auto_apply = True , no_prompts = True )
1270+
1271+ snapshot = context .get_snapshot (f"test_schema.{ model_name } " )
1272+ assert snapshot
1273+ fingerprint = snapshot .fingerprint .to_version ()
1274+ table_name = f"sqlmesh__test_schema.test_schema__{ model_name } __{ fingerprint } __dev"
1275+ data_change_grants = engine_adapter ._get_current_grants_config (
1276+ exp .to_table (table_name , dialect = engine_adapter .dialect )
1277+ )
1278+ assert data_change_grants .get ("SELECT" , []) == [roles ["reader" ]["username" ]]
1279+ assert data_change_grants .get ("INSERT" , []) == [roles ["writer" ]["username" ]]
1280+
1281+ # Data + grants changes
1282+ grant_change_model_definition = f"""
1283+ MODEL (
1284+ name test_schema.{ model_name } ,
1285+ kind { kind_config } ,
1286+ grants (
1287+ 'SELECT' = ['{ roles ["reader" ]["username" ]} ', '{ roles ["analyst" ]["username" ]} '],
1288+ 'INSERT' = ['{ roles ["writer" ]["username" ]} '],
1289+ 'UPDATE' = ['{ roles ["analyst" ]["username" ]} ']
1290+ ),
1291+ grants_target_layer 'all'
1292+ );
1293+ SELECT 1 as id, 'grant_changed_data' as name, CURRENT_TIMESTAMP as updated_at
1294+ """
1295+ (tmp_path / "models" / f"{ model_name } .sql" ).write_text (grant_change_model_definition )
1296+
1297+ context .load ()
1298+ context .plan ("dev" , start = "2023-01-03" , end = "2023-01-03" , auto_apply = True , no_prompts = True )
1299+
1300+ snapshot = context .get_snapshot (f"test_schema.{ model_name } " )
1301+ assert snapshot
1302+ fingerprint = snapshot .fingerprint .to_version ()
1303+ table_name = f"sqlmesh__test_schema.test_schema__{ model_name } __{ fingerprint } __dev"
1304+ final_grants = engine_adapter ._get_current_grants_config (
1305+ exp .to_table (table_name , dialect = engine_adapter .dialect )
1306+ )
1307+ expected_select_users = {roles ["reader" ]["username" ], roles ["analyst" ]["username" ]}
1308+ assert set (final_grants .get ("SELECT" , [])) == expected_select_users
1309+ assert final_grants .get ("INSERT" , []) == [roles ["writer" ]["username" ]]
1310+ assert final_grants .get ("UPDATE" , []) == [roles ["analyst" ]["username" ]]
1311+
1312+ final_view_grants = engine_adapter ._get_current_grants_config (
1313+ exp .to_table (view_name , dialect = engine_adapter .dialect )
1314+ )
1315+ assert set (final_view_grants .get ("SELECT" , [])) == expected_select_users
1316+ assert final_view_grants .get ("INSERT" , []) == [roles ["writer" ]["username" ]]
1317+ assert final_view_grants .get ("UPDATE" , []) == [roles ["analyst" ]["username" ]]
1318+
1319+
1320+ @pytest .mark .parametrize (
1321+ "model_kind" ,
1322+ [
1323+ "SCD_TYPE_2" ,
1324+ "SCD_TYPE_2_BY_TIME" ,
1325+ ],
1326+ )
1327+ def test_grants_plan_scd_type_2_with_vde_dev_only (
1328+ engine_adapter : PostgresEngineAdapter ,
1329+ ctx : TestContext ,
1330+ tmp_path : Path ,
1331+ model_kind : str ,
1332+ ):
1333+ with create_users (engine_adapter , "etl_user" , "analyst" ) as roles :
1334+ (tmp_path / "models" ).mkdir (exist_ok = True )
1335+ model_name = "vde_scd_model"
1336+
1337+ model_def = f"""
1338+ MODEL (
1339+ name test_schema.{ model_name } ,
1340+ kind { model_kind } (unique_key [customer_id]),
1341+ grants (
1342+ 'SELECT' = ['{ roles ["analyst" ]["username" ]} '],
1343+ 'INSERT' = ['{ roles ["etl_user" ]["username" ]} ']
1344+ ),
1345+ grants_target_layer 'all'
1346+ );
1347+ SELECT
1348+ 1 as customer_id,
1349+ 'active' as status,
1350+ CURRENT_TIMESTAMP as updated_at
1351+ """
1352+ (tmp_path / "models" / f"{ model_name } .sql" ).write_text (model_def )
1353+
1354+ context = ctx .create_context (path = tmp_path , config_mutator = _vde_dev_only_config )
1355+
1356+ # Prod
1357+ context .plan ("prod" , auto_apply = True , no_prompts = True )
1358+ prod_table = f"test_schema.{ model_name } "
1359+ prod_grants = engine_adapter ._get_current_grants_config (
1360+ exp .to_table (prod_table , dialect = engine_adapter .dialect )
1361+ )
1362+ assert roles ["analyst" ]["username" ] in prod_grants .get ("SELECT" , [])
1363+ assert roles ["etl_user" ]["username" ] in prod_grants .get ("INSERT" , [])
1364+
1365+ # Dev
1366+ context .plan ("dev" , auto_apply = True , no_prompts = True , include_unmodified = True )
1367+ dev_view = f"test_schema__dev.{ model_name } "
1368+ dev_grants = engine_adapter ._get_current_grants_config (
1369+ exp .to_table (dev_view , dialect = engine_adapter .dialect )
1370+ )
1371+ assert roles ["analyst" ]["username" ] in dev_grants .get ("SELECT" , [])
1372+ assert roles ["etl_user" ]["username" ] in dev_grants .get ("INSERT" , [])
1373+
1374+ snapshot = context .get_snapshot (f"test_schema.{ model_name } " )
1375+ assert snapshot
1376+ fingerprint_version = snapshot .fingerprint .to_version ()
1377+ dev_physical_table_name = (
1378+ f"sqlmesh__test_schema.test_schema__{ model_name } __{ fingerprint_version } __dev"
1379+ )
1380+
1381+ dev_physical_grants = engine_adapter ._get_current_grants_config (
1382+ exp .to_table (dev_physical_table_name , dialect = engine_adapter .dialect )
1383+ )
1384+ assert roles ["analyst" ]["username" ] in dev_physical_grants .get ("SELECT" , [])
1385+ assert roles ["etl_user" ]["username" ] in dev_physical_grants .get ("INSERT" , [])
0 commit comments