diff --git a/docs/bigquery/legacy_proto_types.rst b/docs/bigquery/legacy_proto_types.rst index bc1e93715..36e9984b9 100644 --- a/docs/bigquery/legacy_proto_types.rst +++ b/docs/bigquery/legacy_proto_types.rst @@ -3,7 +3,7 @@ Legacy proto-based Types for Google Cloud Bigquery v2 API .. warning:: These types are provided for backward compatibility only, and are not maintained - anymore. They might also differ from the types uspported on the backend. It is + anymore. They might also differ from the types supported on the backend. It is therefore strongly advised to migrate to the types found in :doc:`standard_sql`. Also see the :doc:`3.0.0 Migration Guide<../UPGRADING>` for more information. diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index c021771ee..050672531 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -21,13 +21,14 @@ import logging import queue import warnings -from typing import Any, Union +from typing import Any, Union, Optional, Callable, Generator, List from google.cloud.bigquery import _pyarrow_helpers from google.cloud.bigquery import _versions_helpers from google.cloud.bigquery import schema + try: import pandas # type: ignore @@ -75,7 +76,7 @@ def _to_wkb(v): _to_wkb = _to_wkb() try: - from google.cloud.bigquery_storage import ArrowSerializationOptions + from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions except ImportError: _ARROW_COMPRESSION_SUPPORT = False else: @@ -821,18 +822,54 @@ def _nowait(futures): def _download_table_bqstorage( - project_id, - table, - bqstorage_client, - preserve_order=False, - selected_fields=None, - page_to_item=None, - max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, -): - """Use (faster, but billable) BQ Storage API to construct DataFrame.""" + project_id: str, + table: Any, + bqstorage_client: Any, + preserve_order: bool = False, + selected_fields: Optional[List[Any]] = None, + page_to_item: Optional[Callable] = None, + max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT, + max_stream_count: Optional[int] = None, +) -> Generator[Any, None, None]: + """Downloads a BigQuery table using the BigQuery Storage API. + + This method uses the faster, but potentially more expensive, BigQuery + Storage API to download a table as a Pandas DataFrame. It supports + parallel downloads and optional data transformations. + + Args: + project_id (str): The ID of the Google Cloud project containing + the table. + table (Any): The BigQuery table to download. + bqstorage_client (Any): An + authenticated BigQuery Storage API client. + preserve_order (bool, optional): Whether to preserve the order + of the rows as they are read from BigQuery. If True, this limits + the number of streams to one and overrides `max_stream_count`. + Defaults to False. + selected_fields (Optional[List[SchemaField]]): + A list of BigQuery schema fields to select for download. If None, + all fields are downloaded. Defaults to None. + page_to_item (Optional[Callable]): An optional callable + function that takes a page of data from the BigQuery Storage API + and converts it into the item this generator yields (for example, + a pandas DataFrame). Defaults to None. + max_stream_count (Optional[int]): The maximum number of + concurrent streams to use for downloading data. If `preserve_order` + is True, the requested streams are limited to 1 regardless of the + `max_stream_count` value. If 0 or None, then the number of + requested streams will be unbounded. Defaults to None. + + Yields: + pandas.DataFrame: Pandas DataFrames, one for each chunk of data + downloaded from BigQuery.
+ + Raises: + ValueError: If attempting to read from a specific partition or snapshot. + + Note: + This method requires the `google-cloud-bigquery-storage` library + to be installed. + """ - # Passing a BQ Storage client in implies that the BigQuery Storage library - # is available and can be imported. from google.cloud import bigquery_storage if "$" in table.table_id: @@ -842,10 +879,11 @@ def _download_table_bqstorage( if "@" in table.table_id: raise ValueError("Reading from a specific snapshot is not currently supported.") - requested_streams = 1 if preserve_order else 0 + requested_streams = determine_requested_streams(preserve_order, max_stream_count) - requested_session = bigquery_storage.types.ReadSession( - table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW + requested_session = bigquery_storage.types.stream.ReadSession( + table=table.to_bqstorage(), + data_format=bigquery_storage.types.stream.DataFormat.ARROW, ) if selected_fields is not None: for field in selected_fields: @@ -853,7 +891,8 @@ def _download_table_bqstorage( if _ARROW_COMPRESSION_SUPPORT: requested_session.read_options.arrow_serialization_options.buffer_compression = ( - ArrowSerializationOptions.CompressionCodec.LZ4_FRAME + # CompressionCodec(1) -> LZ4_FRAME + ArrowSerializationOptions.CompressionCodec(1) ) session = bqstorage_client.create_read_session( @@ -889,7 +928,7 @@ def _download_table_bqstorage( elif max_queue_size is None: max_queue_size = 0 # unbounded - worker_queue = queue.Queue(maxsize=max_queue_size) + worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size) with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool: try: @@ -915,7 +954,7 @@ def _download_table_bqstorage( # we want to block on the queue's get method, instead. This # prevents the queue from filling up, because the main thread # has smaller gaps in time between calls to the queue's get - # method. For a detailed explaination, see: + # method. For a detailed explanation, see: # https://friendliness.dev/2019/06/18/python-nowait/ done, not_done = _nowait(not_done) for future in done: @@ -954,6 +993,7 @@ def download_arrow_bqstorage( preserve_order=False, selected_fields=None, max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, + max_stream_count=None, ): return _download_table_bqstorage( project_id, @@ -963,6 +1003,7 @@ def download_arrow_bqstorage( selected_fields=selected_fields, page_to_item=_bqstorage_page_to_arrow, max_queue_size=max_queue_size, + max_stream_count=max_stream_count, ) @@ -975,6 +1016,7 @@ def download_dataframe_bqstorage( preserve_order=False, selected_fields=None, max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, + max_stream_count=None, ): page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) return _download_table_bqstorage( @@ -985,6 +1027,7 @@ def download_dataframe_bqstorage( selected_fields=selected_fields, page_to_item=page_to_item, max_queue_size=max_queue_size, + max_stream_count=max_stream_count, ) @@ -1029,3 +1072,40 @@ def verify_pandas_imports(): raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception if db_dtypes is None: raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception + + +def determine_requested_streams( + preserve_order: bool, + max_stream_count: Union[int, None], +) -> int: + """Determines the value of requested_streams based on the values of + `preserve_order` and `max_stream_count`. + + Args: + preserve_order (bool): Whether to preserve the order of streams. 
If True, + this limits the number of streams to one. `preserve_order` takes + precedence over `max_stream_count`. + max_stream_count (Union[int, None]): The maximum number of streams + allowed. Must be a non-negative number or None, where None indicates + the value is unset. NOTE: if `preserve_order` is also set, it takes + precedence over `max_stream_count`; to ensure that `max_stream_count` + is used, set `preserve_order` to False. + + Returns: + (int) The appropriate value for requested_streams. + """ + + if preserve_order: + # If preserve_order is set, it takes precedence. + # Limit the requested streams to 1, to ensure that order + # is preserved. + return 1 + + elif max_stream_count is not None: + # If preserve_order is not set, only then do we consider max_stream_count + if max_stream_count <= -1: + raise ValueError("max_stream_count must be non-negative OR None") + return max_stream_count + + # Default to zero requested streams (unbounded). + return 0 diff --git a/samples/desktopapp/requirements.txt b/samples/desktopapp/requirements.txt index dafb60b2a..383829d7d 100644 --- a/samples/desktopapp/requirements.txt +++ b/samples/desktopapp/requirements.txt @@ -1,2 +1,2 @@ -google-cloud-bigquery==3.25.0 +google-cloud-bigquery==3.26.0 google-auth-oauthlib==1.2.1 diff --git a/samples/geography/requirements.txt b/samples/geography/requirements.txt index cc0f3ad17..1089dc195 100644 --- a/samples/geography/requirements.txt +++ b/samples/geography/requirements.txt @@ -9,14 +9,14 @@ cligj==0.7.2 dataclasses==0.8; python_version < '3.7' db-dtypes==1.3.0 Fiona===1.9.6; python_version == '3.7' -Fiona==1.10.0; python_version >= '3.8' +Fiona==1.10.1; python_version >= '3.8' geojson==3.1.0 geopandas===0.10.2; python_version == '3.7' geopandas===0.13.2; python_version == '3.8' geopandas==1.0.1; python_version >= '3.9' -google-api-core==2.19.2 -google-auth==2.34.0 -google-cloud-bigquery==3.25.0 +google-api-core==2.20.0 +google-auth==2.35.0 +google-cloud-bigquery==3.26.0 google-cloud-bigquery-storage==2.26.0 google-cloud-core==2.4.1 google-crc32c===1.5.0; python_version < '3.9' @@ -32,7 +32,7 @@ packaging===24.0; python_version == '3.7' packaging==24.1; python_version >= '3.8' pandas===1.3.5; python_version == '3.7' pandas===2.0.3; python_version == '3.8' -pandas==2.2.2; python_version >= '3.9' +pandas==2.2.3; python_version >= '3.9' proto-plus==1.24.0 pyarrow==12.0.1; python_version == '3.7' pyarrow==17.0.0; python_version >= '3.8' @@ -56,4 +56,4 @@ typing-extensions===4.7.1; python_version == '3.7' typing-extensions==4.12.2; python_version >= '3.8' typing-inspect==0.9.0 urllib3===1.26.18; python_version == '3.7' -urllib3==2.2.2; python_version >= '3.8' +urllib3==2.2.3; python_version >= '3.8' diff --git a/samples/magics/requirements.txt b/samples/magics/requirements.txt index 4652fcdf2..6386fb6d2 100644 --- a/samples/magics/requirements.txt +++ b/samples/magics/requirements.txt @@ -1,6 +1,6 @@ -bigquery_magics==0.3.0 +bigquery_magics==0.4.0 db-dtypes==1.3.0 -google.cloud.bigquery==3.25.0 +google.cloud.bigquery==3.26.0 google-cloud-bigquery-storage==2.26.0 ipython===7.31.1; python_version == '3.7' ipython===8.0.1; python_version == '3.8' diff --git a/samples/notebooks/requirements.txt b/samples/notebooks/requirements.txt index c4b75f3db..7463e1afc 100644 --- a/samples/notebooks/requirements.txt +++ b/samples/notebooks/requirements.txt @@ -1,6 +1,6 @@ -bigquery-magics==0.3.0 +bigquery-magics==0.4.0 db-dtypes==1.3.0 -google-cloud-bigquery==3.25.0 +google-cloud-bigquery==3.26.0
google-cloud-bigquery-storage==2.26.0 ipython===7.31.1; python_version == '3.7' ipython===8.0.1; python_version == '3.8' diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt index 9e181d963..65ce0be9f 100644 --- a/samples/snippets/requirements.txt +++ b/samples/snippets/requirements.txt @@ -1,2 +1,2 @@ # samples/snippets should be runnable with no "extras" -google-cloud-bigquery==3.25.0 +google-cloud-bigquery==3.26.0 diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index 203cc1d1c..3a5fddacc 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -18,6 +18,7 @@ import functools import operator import queue +from typing import Union from unittest import mock import warnings @@ -46,6 +47,7 @@ from google.cloud.bigquery import _pyarrow_helpers from google.cloud.bigquery import _versions_helpers from google.cloud.bigquery import schema +from google.cloud.bigquery._pandas_helpers import determine_requested_streams pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import() @@ -2053,3 +2055,32 @@ def test_verify_pandas_imports_no_db_dtypes(module_under_test, monkeypatch): monkeypatch.setattr(module_under_test, "db_dtypes", None) with pytest.raises(ValueError, match="Please install the 'db-dtypes' package"): module_under_test.verify_pandas_imports() + + +@pytest.mark.parametrize( + "preserve_order, max_stream_count, expected_requested_streams", + [ + # If preserve_order is set/True, it takes precedence: + (True, 10, 1), # use 1 + (True, None, 1), # use 1 + # If preserve_order is not set check max_stream_count: + (False, 10, 10), # max_stream_count (X) takes precedence + (False, None, 0), # Unbounded (0) when both are unset + ], +) +def test_determine_requested_streams( + preserve_order: bool, + max_stream_count: Union[int, None], + expected_requested_streams: int, +): + """Tests various combinations of preserve_order and max_stream_count.""" + actual_requested_streams = determine_requested_streams( + preserve_order, max_stream_count + ) + assert actual_requested_streams == expected_requested_streams + + +def test_determine_requested_streams_invalid_max_stream_count(): + """Tests that a ValueError is raised if max_stream_count is negative.""" + with pytest.raises(ValueError): + determine_requested_streams(preserve_order=False, max_stream_count=-1)
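
For reviewers, the precedence rules encoded by the new `determine_requested_streams` helper are easiest to see in isolation. The short sketch below is not part of the change itself; it simply calls the helper added in this diff (imported from the path used by the unit test above) and mirrors the parametrized cases:

    from google.cloud.bigquery._pandas_helpers import determine_requested_streams

    # preserve_order wins: only one stream is requested so that row order is
    # kept, regardless of any max_stream_count the caller supplied.
    assert determine_requested_streams(preserve_order=True, max_stream_count=10) == 1
    assert determine_requested_streams(preserve_order=True, max_stream_count=None) == 1

    # Without preserve_order, max_stream_count is passed through unchanged,
    # and None falls back to 0, i.e. an unbounded number of requested streams.
    assert determine_requested_streams(preserve_order=False, max_stream_count=10) == 10
    assert determine_requested_streams(preserve_order=False, max_stream_count=None) == 0

    # Negative values are rejected.
    try:
        determine_requested_streams(preserve_order=False, max_stream_count=-1)
    except ValueError:
        pass  # raised as expected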