Skip to content

Commit

Permalink
split the upload method into two helper function
Browse files Browse the repository at this point in the history
  • Loading branch information
MAfarrag committed Dec 15, 2024
1 parent 257c5c8 commit 7e7992a
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 25 deletions.
153 changes: 130 additions & 23 deletions src/unicloud/google_cloud/gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
class GCS(CloudStorageFactory):
"""GCS Cloud Storage."""

def __init__(self, project_id: str, service_key: Optional[str] = None):
def __init__(self, project_id: str, service_key_path: Optional[str] = None):
"""Initialize the GCS client.
Parameters
----------
project_id: [str]
The Google Cloud project name.
service_key: str, optional, default=None
service_key_path: str, optional, default=None
The path to your service key file.
Raises
Expand All @@ -36,7 +36,7 @@ def __init__(self, project_id: str, service_key: Optional[str] = None):
- You can provide a path to your service key file. The file should be a JSON file with the service account
credentials. You can provide the path to the file as the `service_key` argument.
>>> gcs = GCS("my-project-id", service_key="path/to/your/service-account.json") # doctest: +SKIP
>>> gcs = GCS("my-project-id", service_key_path="path/to/your/service-account.json") # doctest: +SKIP
>>> print(gcs) # doctest: +SKIP
<BlankLine>
project_id: my-project-id,
Expand All @@ -57,12 +57,12 @@ def __init__(self, project_id: str, service_key: Optional[str] = None):
with the content of your service key file encoded using the `unicloud.secret_manager.encode` function.
"""
self._project_id = project_id
if service_key is not None and not Path(service_key).exists():
if service_key_path is not None and not Path(service_key_path).exists():
raise FileNotFoundError(
f"The service key file {service_key} does not exist"
f"The service key file {service_key_path} does not exist"
)

self.service_key = service_key
self.service_key = service_key_path
self._client = self.create_client()

@property
Expand Down Expand Up @@ -383,7 +383,12 @@ def file_exists(self, file_name: str) -> bool:
blob = self.bucket.get_blob(file_name)
return False if blob is None else True

def upload(self, local_path: Union[str, Path], bucket_path: Union[str, Path]):
def upload(
self,
local_path: Union[str, Path],
bucket_path: Union[str, Path],
overwrite: bool = False,
):
"""Upload a file to GCS.
Uploads a file or an entire directory to a Google Cloud Storage bucket.
Expand All @@ -401,6 +406,8 @@ def upload(self, local_path: Union[str, Path], bucket_path: Union[str, Path]):
The destination path in the GCS bucket where the file(s) will be uploaded.
- For a single file, provide the full path (e.g., "bucket/folder/file.txt").
- For a directory upload, provide the base path (e.g., "bucket/folder/").
overwrite : bool, optional
If True, overwrite existing files. Default is False.
Raises
------
Expand Down Expand Up @@ -433,28 +440,128 @@ def upload(self, local_path: Union[str, Path], bucket_path: Union[str, Path]):
raise FileNotFoundError(f"The local path {local_path} does not exist.")

if local_path.is_file():
# Upload a single file
blob = self.bucket.blob(bucket_path)
blob.upload_from_filename(str(local_path))
print(f"File {local_path} uploaded to {bucket_path}.")
self._upload_file(local_path, bucket_path, overwrite)
elif local_path.is_dir():
# Upload all files in the directory
for file in local_path.rglob("*"):
if file.is_file():
# Preserve directory structure in the bucket
relative_path = file.relative_to(local_path)
bucket_file_path = (
f"{bucket_path.rstrip('/')}/{relative_path.as_posix()}"
)
blob = self.bucket.blob(bucket_file_path)
blob.upload_from_filename(str(file))
print(f"File {file} uploaded to {bucket_file_path}.")
self._upload_directory(local_path, bucket_path, overwrite)
else:
raise ValueError(
f"The local path {local_path} is neither a file nor a directory."
)

def download(self, file_name, local_path):
def _upload_file(
self, local_path: Path, bucket_path: str, overwrite: bool = False
) -> None:
"""
Upload a single file to GCS with overwrite handling.
Parameters
----------
local_path : Path
The local file to upload.
bucket_path : str
The destination path in the GCS bucket.
overwrite : bool
If True, overwrites the file if it already exists in the bucket. If False, raises
a `ValueError` if the destination file already exists.
Raises
------
ValueError
If the file exists in the bucket and `overwrite` is False.
FileNotFoundError
If the `local_path` does not exist or is not a file.
Examples
--------
>>> Bucket_ID = "test-bucket"
>>> PROJECT_ID = "py-project-id"
>>> gcs = GCS(PROJECT_ID) # doctest: +SKIP
>>> my_bucket = gcs.get_bucket(Bucket_ID) # doctest: +SKIP
Upload a single file, allowing overwrites:
>>> my_bucket._upload_file(
... Path("local/file.txt"),
... "bucket/folder/file.txt",
... overwrite=True
... ) # doctest: +SKIP
Upload a single file without overwrites:
>>> try:
... my_bucket._upload_file(
... Path("local/file.txt"),
... "bucket/folder/file.txt",
... overwrite=False
... ) # doctest: +SKIP
... except ValueError as e:
... print(e)
"The file 'bucket/folder/file.txt' already exists in the bucket and overwrite is set to False."
"""
blob = self.bucket.blob(bucket_path)

if not overwrite and blob.exists():
raise ValueError(
f"The file '{bucket_path}' already exists in the bucket and overwrite is set to False."
)

blob.upload_from_filename(str(local_path))
print(f"File '{local_path}' uploaded to '{bucket_path}'.")

def _upload_directory(
self, local_path: Path, bucket_path: str, overwrite: bool = False
):
"""
Upload an entire directory, including subdirectories, to GCS.
Parameters
----------
local_path : Path
The path to the local directory to upload.
bucket_path : str
The base destination path in the GCS bucket where the directory contents will be uploaded.
Directory structure is preserved relative to `local_path`.
overwrite : bool
If True, overwrites existing files in the bucket. If False, raises a `ValueError`
for any existing files.
Raises
------
FileNotFoundError
If the `local_path` does not exist or is not a directory.
ValueError
If any destination file exists in the bucket and `overwrite` is False.
Examples
--------
>>> Bucket_ID = "test-bucket"
>>> PROJECT_ID = "py-project-id"
>>> gcs = GCS(PROJECT_ID) # doctest: +SKIP
>>> my_bucket = gcs.get_bucket(Bucket_ID) # doctest: +SKIP
Upload a directory with overwrites allowed:
>>> my_bucket._upload_directory(
... Path("local/directory/"),
... "bucket/folder/",
... overwrite=True
... ) # doctest: +SKIP
Upload a directory without overwrites:
>>> try:
... my_bucket._upload_directory(
... Path("local/directory/"),
... "bucket/folder/",
... overwrite=False
... ) # doctest: +SKIP
... except ValueError as e:
... print(e)
"The file 'bucket/folder/subdir/file.txt' already exists in the bucket and overwrite is set to False."
"""
for file in local_path.rglob("*"):
if file.is_file():
relative_path = file.relative_to(local_path)
bucket_file_path = (
f"{bucket_path.rstrip('/')}/{relative_path.as_posix()}"
)
self._upload_file(file, bucket_file_path, overwrite)

def download(self, file_name: str, local_path: str):
"""Download a file from GCS.
Downloads a file from a Google Cloud Storage bucket to a local directory or path.
Expand Down
4 changes: 2 additions & 2 deletions tests/google_cloud/test_gcs_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ def test_upload_single_file(self):
with patch("pathlib.Path.exists", return_value=True), patch(
"pathlib.Path.is_file", return_value=True
):
gcs_bucket.upload(local_file, bucket_path)
gcs_bucket.upload(local_file, bucket_path, overwrite=True)

mock_bucket.blob.assert_called_once_with(bucket_path)
mock_blob.upload_from_filename.assert_called_once_with(str(local_file))
Expand Down Expand Up @@ -344,7 +344,7 @@ def test_upload_directory_with_subdirectories(self):

# Mock individual file checks and uploads
with patch("pathlib.Path.is_file", side_effect=[False, True, True, True]):
gcs_bucket.upload(local_directory, bucket_path)
gcs_bucket.upload(local_directory, bucket_path, overwrite=True)

for file in files:
relative_path = file.relative_to(local_directory)
Expand Down

0 comments on commit 7e7992a

Please sign in to comment.