Skip to content

Commit 9106f83

Browse files
feat: add grants for BigQuery (#5444)
Co-authored-by: eakmanrq <6326532+eakmanrq@users.noreply.github.com>
1 parent 0440ada commit 9106f83

File tree

6 files changed

+366
-46
lines changed

6 files changed

+366
-46
lines changed

.circleci/continue_config.yml

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ jobs:
148148
command: ./.circleci/test_migration.sh sushi "--gateway duckdb_persistent"
149149
- run:
150150
name: Run the migration test - sushi_dbt
151-
command: ./.circleci/test_migration.sh sushi_dbt "--config migration_test_config"
151+
command: ./.circleci/test_migration.sh sushi_dbt "--config migration_test_config"
152152

153153
ui_style:
154154
docker:
@@ -300,23 +300,23 @@ workflows:
300300
name: cloud_engine_<< matrix.engine >>
301301
context:
302302
- sqlmesh_cloud_database_integration
303-
requires:
304-
- engine_tests_docker
303+
# requires:
304+
# - engine_tests_docker
305305
matrix:
306306
parameters:
307307
engine:
308-
- snowflake
309-
- databricks
310-
- redshift
308+
# - snowflake
309+
# - databricks
310+
# - redshift
311311
- bigquery
312-
- clickhouse-cloud
313-
- athena
314-
- fabric
315-
- gcp-postgres
316-
filters:
317-
branches:
318-
only:
319-
- main
312+
# - clickhouse-cloud
313+
# - athena
314+
# - fabric
315+
# - gcp-postgres
316+
# filters:
317+
# branches:
318+
# only:
319+
# - main
320320
- ui_style
321321
- ui_test
322322
- vscode_test

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 105 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from sqlmesh.core.dialect import to_schema
1111
from sqlmesh.core.engine_adapter.mixins import (
1212
ClusteredByMixin,
13+
GrantsFromInfoSchemaMixin,
1314
RowDiffMixin,
1415
TableAlterClusterByOperation,
1516
)
@@ -39,7 +40,7 @@
3940
from google.cloud.bigquery.table import Table as BigQueryTable
4041

4142
from sqlmesh.core._typing import SchemaName, SessionProperties, TableName
42-
from sqlmesh.core.engine_adapter._typing import BigframeSession, DF, Query
43+
from sqlmesh.core.engine_adapter._typing import BigframeSession, DCL, DF, GrantsConfig, Query
4344
from sqlmesh.core.engine_adapter.base import QueryOrDF
4445

4546

@@ -54,7 +55,7 @@
5455

5556

5657
@set_catalog()
57-
class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin):
58+
class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin, GrantsFromInfoSchemaMixin):
5859
"""
5960
BigQuery Engine Adapter using the `google-cloud-bigquery` library's DB API.
6061
"""
@@ -64,6 +65,11 @@ class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin):
6465
SUPPORTS_TRANSACTIONS = False
6566
SUPPORTS_MATERIALIZED_VIEWS = True
6667
SUPPORTS_CLONING = True
68+
SUPPORTS_GRANTS = True
69+
CURRENT_USER_OR_ROLE_EXPRESSION: exp.Expression = exp.func("session_user")
70+
SUPPORTS_MULTIPLE_GRANT_PRINCIPALS = True
71+
USE_CATALOG_IN_GRANTS = True
72+
GRANT_INFORMATION_SCHEMA_TABLE_NAME = "OBJECT_PRIVILEGES"
6773
MAX_TABLE_COMMENT_LENGTH = 1024
6874
MAX_COLUMN_COMMENT_LENGTH = 1024
6975
SUPPORTS_QUERY_EXECUTION_TRACKING = True
@@ -1297,6 +1303,103 @@ def _session_id(self) -> t.Any:
12971303
def _session_id(self, value: t.Any) -> None:
12981304
self._connection_pool.set_attribute("session_id", value)
12991305

1306+
def _get_current_schema(self) -> str:
1307+
raise NotImplementedError("BigQuery does not support current schema")
1308+
1309+
def _get_bq_dataset_location(self, project: str, dataset: str) -> str:
1310+
return self._db_call(self.client.get_dataset, dataset_ref=f"{project}.{dataset}").location
1311+
1312+
def _get_grant_expression(self, table: exp.Table) -> exp.Expression:
1313+
if not table.db:
1314+
raise ValueError(
1315+
f"Table {table.sql(dialect=self.dialect)} does not have a schema (dataset)"
1316+
)
1317+
project = table.catalog or self.get_current_catalog()
1318+
if not project:
1319+
raise ValueError(
1320+
f"Table {table.sql(dialect=self.dialect)} does not have a catalog (project)"
1321+
)
1322+
1323+
dataset = table.db
1324+
table_name = table.name
1325+
location = self._get_bq_dataset_location(project, dataset)
1326+
1327+
# https://cloud.google.com/bigquery/docs/information-schema-object-privileges
1328+
# OBJECT_PRIVILEGES is a project-level INFORMATION_SCHEMA view with regional qualifier
1329+
object_privileges_table = exp.to_table(
1330+
f"`{project}`.`region-{location}`.INFORMATION_SCHEMA.{self.GRANT_INFORMATION_SCHEMA_TABLE_NAME}",
1331+
dialect=self.dialect,
1332+
)
1333+
return (
1334+
exp.select("privilege_type", "grantee")
1335+
.from_(object_privileges_table)
1336+
.where(
1337+
exp.and_(
1338+
exp.column("object_schema").eq(exp.Literal.string(dataset)),
1339+
exp.column("object_name").eq(exp.Literal.string(table_name)),
1340+
# Filter out current_user
1341+
# BigQuery grantees format: "user:email" or "group:name"
1342+
exp.func("split", exp.column("grantee"), exp.Literal.string(":"))[
1343+
exp.func("OFFSET", exp.Literal.number("1"))
1344+
].neq(self.CURRENT_USER_OR_ROLE_EXPRESSION),
1345+
)
1346+
)
1347+
)
1348+
1349+
@staticmethod
1350+
def _grant_object_kind(table_type: DataObjectType) -> str:
1351+
if table_type == DataObjectType.VIEW:
1352+
return "VIEW"
1353+
return "TABLE"
1354+
1355+
def _dcl_grants_config_expr(
1356+
self,
1357+
dcl_cmd: t.Type[DCL],
1358+
table: exp.Table,
1359+
grant_config: GrantsConfig,
1360+
table_type: DataObjectType = DataObjectType.TABLE,
1361+
) -> t.List[exp.Expression]:
1362+
expressions: t.List[exp.Expression] = []
1363+
if not grant_config:
1364+
return expressions
1365+
1366+
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-control-language
1367+
1368+
def normalize_principal(p: str) -> str:
1369+
if ":" not in p:
1370+
raise ValueError(f"Principal '{p}' missing a prefix label")
1371+
1372+
# allUsers and allAuthenticatedUsers are special groups that are case-sensitive and must start with "specialGroup:"
1373+
if p.endswith("allUsers") or p.endswith("allAuthenticatedUsers"):
1374+
if not p.startswith("specialGroup:"):
1375+
raise ValueError(
1376+
f"Special group principal '{p}' must start with 'specialGroup:' prefix label"
1377+
)
1378+
return p
1379+
1380+
label, principal = p.split(":", 1)
1381+
# always lowercase principals
1382+
return f"{label}:{principal.lower()}"
1383+
1384+
object_kind = self._grant_object_kind(table_type)
1385+
for privilege, principals in grant_config.items():
1386+
if not principals:
1387+
continue
1388+
1389+
normalized_principals = [exp.Literal.string(normalize_principal(p)) for p in principals]
1390+
args: t.Dict[str, t.Any] = {
1391+
"privileges": [exp.GrantPrivilege(this=exp.to_identifier(privilege, quoted=True))],
1392+
"securable": table.copy(),
1393+
"principals": normalized_principals,
1394+
}
1395+
1396+
if object_kind:
1397+
args["kind"] = exp.Var(this=object_kind)
1398+
1399+
expressions.append(dcl_cmd(**args)) # type: ignore[arg-type]
1400+
1401+
return expressions
1402+
13001403

13011404
class _ErrorCounter:
13021405
"""

sqlmesh/core/snapshot/evaluator.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,19 +1723,29 @@ def _apply_grants(
17231723
return
17241724

17251725
model_grants_target_layer = model.grants_target_layer
1726+
deployable_vde_dev_only = (
1727+
is_snapshot_deployable and model.virtual_environment_mode.is_dev_only
1728+
)
1729+
1730+
# table_type is always a VIEW in the virtual layer unless model is deployable and VDE is dev_only
1731+
# in which case we fall back to the model's model_grants_table_type
1732+
if target_layer == GrantsTargetLayer.VIRTUAL and not deployable_vde_dev_only:
1733+
model_grants_table_type = DataObjectType.VIEW
1734+
else:
1735+
model_grants_table_type = model.grants_table_type
17261736

17271737
if (
17281738
model_grants_target_layer.is_all
17291739
or model_grants_target_layer == target_layer
17301740
# Always apply grants in production when VDE is dev_only regardless of target_layer
17311741
# since only physical tables are created in production
1732-
or (is_snapshot_deployable and model.virtual_environment_mode.is_dev_only)
1742+
or deployable_vde_dev_only
17331743
):
17341744
logger.info(f"Applying grants for model {model.name} to table {table_name}")
17351745
self.adapter.sync_grants_config(
17361746
exp.to_table(table_name, dialect=self.adapter.dialect),
17371747
grants_config,
1738-
model.grants_table_type,
1748+
model_grants_table_type,
17391749
)
17401750
else:
17411751
logger.debug(

tests/core/engine_adapter/integration/__init__.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,21 @@ def _get_create_user_or_role(
758758
# Creating an account-level group in Databricks requires making REST API calls so we are going to
759759
# use a pre-created group instead. We assume the suffix on the name is the unique id
760760
return "_".join(username.split("_")[:-1]), None
761+
if self.dialect == "bigquery":
762+
# BigQuery uses IAM service accounts that need to be pre-created
763+
# Pre-created GCP service accounts:
764+
# - sqlmesh-test-admin@{project-id}.iam.gserviceaccount.com
765+
# - sqlmesh-test-analyst@{project-id}.iam.gserviceaccount.com
766+
# - sqlmesh-test-etl-user@{project-id}.iam.gserviceaccount.com
767+
# - sqlmesh-test-reader@{project-id}.iam.gserviceaccount.com
768+
# - sqlmesh-test-user@{project-id}.iam.gserviceaccount.com
769+
# - sqlmesh-test-writer@{project-id}.iam.gserviceaccount.com
770+
role_name = (
771+
username.replace(f"_{self.test_id}", "").replace("test_", "").replace("_", "-")
772+
)
773+
project_id = self.engine_adapter.get_current_catalog()
774+
service_account = f"sqlmesh-test-{role_name}@{project_id}.iam.gserviceaccount.com"
775+
return f"serviceAccount:{service_account}", None
761776
raise ValueError(f"User creation not supported for dialect: {self.dialect}")
762777

763778
def _create_user_or_role(self, username: str, password: t.Optional[str] = None) -> str:
@@ -791,20 +806,29 @@ def create_users_or_roles(self, *role_names: str) -> t.Iterator[t.Dict[str, str]
791806
for user_name in created_users:
792807
self._cleanup_user_or_role(user_name)
793808

809+
def get_select_privilege(self) -> str:
810+
if self.dialect == "bigquery":
811+
return "roles/bigquery.dataViewer"
812+
return "SELECT"
813+
794814
def get_insert_privilege(self) -> str:
795815
if self.dialect == "databricks":
796816
# This would really be "MODIFY" but for the purposes of having this be unique from UPDATE
797817
# we return "MANAGE" instead
798818
return "MANAGE"
819+
if self.dialect == "bigquery":
820+
return "roles/bigquery.dataEditor"
799821
return "INSERT"
800822

801823
def get_update_privilege(self) -> str:
802824
if self.dialect == "databricks":
803825
return "MODIFY"
826+
if self.dialect == "bigquery":
827+
return "roles/bigquery.dataOwner"
804828
return "UPDATE"
805829

806830
def _cleanup_user_or_role(self, user_name: str) -> None:
807-
"""Helper function to clean up a PostgreSQL user and all their dependencies."""
831+
"""Helper function to clean up a user/role and all their dependencies."""
808832
try:
809833
if self.dialect in ["postgres", "redshift"]:
810834
self.engine_adapter.execute(f"""
@@ -816,7 +840,8 @@ def _cleanup_user_or_role(self, user_name: str) -> None:
816840
self.engine_adapter.execute(f'DROP USER IF EXISTS "{user_name}"')
817841
elif self.dialect == "snowflake":
818842
self.engine_adapter.execute(f"DROP ROLE IF EXISTS {user_name}")
819-
elif self.dialect == "databricks":
843+
elif self.dialect in ["databricks", "bigquery"]:
844+
# For Databricks and BigQuery, we use pre-created accounts that should not be deleted
820845
pass
821846
except Exception:
822847
pass

0 commit comments

Comments
 (0)