From e016da14a07dd64df194e7b5602ce8bde751bf41 Mon Sep 17 00:00:00 2001 From: dark0dave Date: Mon, 22 Jan 2024 16:10:46 +0000 Subject: [PATCH] fix(expiration): Add expiration back Signed-off-by: dark0dave --- docs/backends/gcloud.rst | 10 +++++++++ storages/backends/gcloud.py | 42 +++++++++++++++---------------------- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/docs/backends/gcloud.rst b/docs/backends/gcloud.rst index 962298eb..0c18f27e 100644 --- a/docs/backends/gcloud.rst +++ b/docs/backends/gcloud.rst @@ -61,6 +61,7 @@ For development use cases, or other instances outside Google infrastructure: Alternatively, you can use the setting ``credentials`` or ``GS_CREDENTIALS`` as described below. +It is also now possible to use workload identity by providing the service account via ``GS_SA_SIGNING_EMAIL``. Settings ~~~~~~~~ @@ -219,3 +220,12 @@ Settings It supports `timedelta`, `datetime`, or `integer` seconds since epoch time. Note: The maximum value for this option is 7 days (604800 seconds) in version `v4` (See this `Github issue `_) + +``sa_email`` or ``GS_SA_SIGNING_EMAIL`` + + default: ``''`` + + This is the signing email if it is not fetched from the credentials. Or if you wish to sign the signed urls with a different service_account. + + As above please note that, Default Google Compute Engine (GCE) Service accounts are + `unable to sign urls `_. diff --git a/storages/backends/gcloud.py b/storages/backends/gcloud.py index 77b3e144..135e7f6b 100644 --- a/storages/backends/gcloud.py +++ b/storages/backends/gcloud.py @@ -1,5 +1,4 @@ import gzip -import logging import io import mimetypes from datetime import timedelta @@ -37,7 +36,6 @@ CONTENT_ENCODING = "content_encoding" CONTENT_TYPE = "content_type" -_LOGGER = logging.getLogger(__name__) class GoogleCloudFile(CompressedFileMixin, File): @@ -151,20 +149,18 @@ def get_default_settings(self): @property def client(self): - credentials, project_id = auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform']) - credentials.refresh(requests.Request()) - if not hasattr(credentials, "service_account_email"): - credentials.service_account_email = self.sa_email - _LOGGER.debug(f"Signing email: {credentials.service_account_email}") - self.credentials = credentials if self._client is None: - self._client = Client(project=project_id, credentials=self.credentials) - return self._client + project_id, credentials = self.project_id, self.credentials + if project_id is None and credentials is None: + credentials, project_id = auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform']) + if not hasattr(credentials, "service_account_email"): + credentials.service_account_email = self.sa_email + self._client = Client(project=project_id, credentials=credentials) @property def bucket(self): if self._bucket is None: - self._bucket = self.client.bucket(self.bucket_name) + self._bucket = self._client.bucket(self.bucket_name) return self._bucket def _normalize_name(self, name): @@ -246,7 +242,7 @@ def delete(self, name): def exists(self, name): if not name: # root element aka the bucket try: - self.client.get_bucket(self.bucket) + self._client.get_bucket(self.bucket) return True except NotFound: return False @@ -329,20 +325,16 @@ def url(self, name, parameters=None): storage_base_url=self.custom_endpoint, quoted_name=_quote(name, safe=b"/~"), ) - elif not self.custom_endpoint: - return blob.generate_signed_url(**self.signed_url_extra()) else: - return blob.generate_signed_url( - api_access_endpoint=self.custom_endpoint, - **self.signed_url_extra() - ) - - def signed_url_extra(self): - return { - "service_account_email": self.credentials.service_account_email, - "access_token": self.credentials.token, - "credentials": self.credentials, - } + params = { + "service_account_email": self.credentials.service_account_email, + "access_token": self.credentials.token, + "credentials": self.credentials, + "expiration": self.expiration, + } + if self.custom_endpoint: + params["api_access_endpoint"] = self.custom_endpoint + return blob.generate_signed_url(**params) def get_available_name(self, name, max_length=None): name = clean_name(name)