Skip to content

Commit

Permalink
fix: use impersonated service accounts for BigQuery Data Transfer Service (#2073)
Browse files Browse the repository at this point in the history

* Previously we just used the standard IAM role from Dagster, which
  fails when creating the transfer job.
* This was never triggered in development because we use a superadmin
  role there.
* In production, we need to impersonate a service account.
  • Loading branch information
ryscheng authored Sep 5, 2024
1 parent 37d0cf3 commit 47dae82
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
16 changes: 12 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ dagster-duckdb = "^0.24.0"
dagster-duckdb-polars = "^0.24.0"
google-cloud-bigquery-storage = "^2.25.0"
dagster-sqlmesh = "0.2.0.dev2"
google-auth = "^2.34.0"


[tool.poetry.scripts]
Expand Down
7 changes: 2 additions & 5 deletions warehouse/oso_dagster/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def get_project_id():
dbt_profiles_dir = get_profiles_dir()
dbt_target_base_dir = os.getenv("DAGSTER_DBT_TARGET_BASE_DIR") or ""
PRODUCTION_DBT_TARGET = "production"
impersonate_service_account = os.getenv("DAGSTER_DBT_IMPERSONATE_SERVICE_ACCOUNT", "")
main_dbt_manifests = load_dbt_manifests(
dbt_target_base_dir,
main_dbt_project_dir,
Expand All @@ -75,11 +76,7 @@ def get_project_id():
("base_playground", "oso_base_playground"),
("playground", "oso_playground"),
],
BQTargetConfigTemplate(
impersonate_service_account=os.getenv(
"DAGSTER_DBT_IMPERSONATE_SERVICE_ACCOUNT", ""
)
),
BQTargetConfigTemplate(impersonate_service_account=impersonate_service_account),
parse_projects=os.getenv("DAGSTER_DBT_PARSE_PROJECT_ON_LOAD", "0") == "1",
)
verbose_logs = os.getenv("DAGSTER_VERBOSE_LOGS", "false").lower() in ["true", "1"]
Expand Down
17 changes: 16 additions & 1 deletion warehouse/oso_dagster/resources/bq_dts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,14 @@
resource,
)
from dagster_gcp.bigquery.utils import setup_gcp_creds
from google import auth
from google.auth import impersonated_credentials
from google.cloud.bigquery_datatransfer import DataTransferServiceClient
from pydantic import Field
from ..constants import impersonate_service_account

# OAuth 2.0 scopes requested for the impersonated credentials. These must be
# OAuth scope URLs; an IAM role name such as "bigquery.admin" is not a scope.
IMPERSONATE_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
# Value passed as `lifetime` when building the impersonated credentials
# (google-auth interprets it as seconds).
IMPERSONATE_LIFETIME = 300

"""
Note: This code is predominantly copied from the BigQueryResource
Expand Down Expand Up @@ -66,7 +72,16 @@ def get_client(self) -> Iterator[DataTransferServiceClient]:
yield DataTransferServiceClient()

else:
yield DataTransferServiceClient()
# By default, create an impersonated credential for a service account.
# This is necessary to create BigQuery DataTransfer jobs
credentials = auth.default()
target_credentials = impersonated_credentials.Credentials(
source_credentials=credentials,
target_principal=impersonate_service_account,
delegates=[],
target_scopes=IMPERSONATE_SCOPES,
lifetime=IMPERSONATE_LIFETIME)
yield DataTransferServiceClient(credentials=target_credentials)

def get_object_to_set_on_execution_context(self) -> Any:
with self.get_client() as client:
Expand Down

0 comments on commit 47dae82

Please sign in to comment.