
Commit

Merge branch 'main' into fix-bq-storage-client-deadlock
kien-truong authored Oct 10, 2024
2 parents a3486d7 + 7372ad6 commit 256b0f7
Showing 8 changed files with 143 additions and 32 deletions.
2 changes: 1 addition & 1 deletion docs/bigquery/legacy_proto_types.rst
@@ -3,7 +3,7 @@ Legacy proto-based Types for Google Cloud Bigquery v2 API

.. warning::
These types are provided for backward compatibility only, and are not maintained
anymore. They might also differ from the types uspported on the backend. It is
anymore. They might also differ from the types supported on the backend. It is
therefore strongly advised to migrate to the types found in :doc:`standard_sql`.

Also see the :doc:`3.0.0 Migration Guide<../UPGRADING>` for more information.
118 changes: 99 additions & 19 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -21,13 +21,14 @@
import logging
import queue
import warnings
from typing import Any, Union
from typing import Any, Union, Optional, Callable, Generator, List


from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import schema


try:
import pandas # type: ignore

@@ -75,7 +76,7 @@ def _to_wkb(v):
_to_wkb = _to_wkb()

try:
from google.cloud.bigquery_storage import ArrowSerializationOptions
from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions
except ImportError:
_ARROW_COMPRESSION_SUPPORT = False
else:
@@ -821,18 +822,54 @@ def _nowait(futures):


def _download_table_bqstorage(
project_id,
table,
bqstorage_client,
preserve_order=False,
selected_fields=None,
page_to_item=None,
max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
project_id: str,
table: Any,
bqstorage_client: Any,
preserve_order: bool = False,
selected_fields: Optional[List[Any]] = None,
page_to_item: Optional[Callable] = None,
max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
max_stream_count: Optional[int] = None,
) -> Generator[Any, None, None]:
"""Downloads a BigQuery table using the BigQuery Storage API.
This method uses the faster, but potentially more expensive, BigQuery
Storage API to download a table as a Pandas DataFrame. It supports
parallel downloads and optional data transformations.
Args:
project_id (str): The ID of the Google Cloud project containing
the table.
table (Any): The BigQuery table to download.
bqstorage_client (Any): An
authenticated BigQuery Storage API client.
preserve_order (bool, optional): Whether to preserve the order
of the rows as they are read from BigQuery. If True this limits
the number of streams to one and overrides `max_stream_count`.
Defaults to False.
selected_fields (Optional[List[SchemaField]]):
A list of BigQuery schema fields to select for download. If None,
all fields are downloaded. Defaults to None.
page_to_item (Optional[Callable]): An optional callable
function that takes a page of data from the BigQuery Storage API
max_stream_count (Optional[int]): The maximum number of
concurrent streams to use for downloading data. If `preserve_order`
is True, the requested streams are limited to 1 regardless of the
`max_stream_count` value. If 0 or None, then the number of
requested streams will be unbounded. Defaults to None.
Yields:
pandas.DataFrame: Pandas DataFrames, one for each chunk of data
downloaded from BigQuery.
Raises:
ValueError: If attempting to read from a specific partition or snapshot.
Note:
This method requires the `google-cloud-bigquery-storage` library
to be installed.
"""

# Passing a BQ Storage client in implies that the BigQuery Storage library
# is available and can be imported.
from google.cloud import bigquery_storage

if "$" in table.table_id:
Expand All @@ -842,18 +879,20 @@ def _download_table_bqstorage(
if "@" in table.table_id:
raise ValueError("Reading from a specific snapshot is not currently supported.")

requested_streams = 1 if preserve_order else 0
requested_streams = determine_requested_streams(preserve_order, max_stream_count)

requested_session = bigquery_storage.types.ReadSession(
table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW
requested_session = bigquery_storage.types.stream.ReadSession(
table=table.to_bqstorage(),
data_format=bigquery_storage.types.stream.DataFormat.ARROW,
)
if selected_fields is not None:
for field in selected_fields:
requested_session.read_options.selected_fields.append(field.name)

if _ARROW_COMPRESSION_SUPPORT:
requested_session.read_options.arrow_serialization_options.buffer_compression = (
ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
# CompressionCodec(1) -> LZ4_FRAME
ArrowSerializationOptions.CompressionCodec(1)
)

session = bqstorage_client.create_read_session(
@@ -889,7 +928,7 @@ def _download_table_bqstorage(
elif max_queue_size is None:
max_queue_size = 0 # unbounded

worker_queue = queue.Queue(maxsize=max_queue_size)
worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size)

with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
try:
Expand All @@ -915,7 +954,7 @@ def _download_table_bqstorage(
# we want to block on the queue's get method, instead. This
# prevents the queue from filling up, because the main thread
# has smaller gaps in time between calls to the queue's get
# method. For a detailed explaination, see:
# method. For a detailed explanation, see:
# https://friendliness.dev/2019/06/18/python-nowait/
done, not_done = _nowait(not_done)
for future in done:
@@ -954,6 +993,7 @@ def download_arrow_bqstorage(
preserve_order=False,
selected_fields=None,
max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
max_stream_count=None,
):
return _download_table_bqstorage(
project_id,
Expand All @@ -963,6 +1003,7 @@ def download_arrow_bqstorage(
selected_fields=selected_fields,
page_to_item=_bqstorage_page_to_arrow,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)


Expand All @@ -975,6 +1016,7 @@ def download_dataframe_bqstorage(
preserve_order=False,
selected_fields=None,
max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
max_stream_count=None,
):
page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
return _download_table_bqstorage(
Expand All @@ -985,6 +1027,7 @@ def download_dataframe_bqstorage(
selected_fields=selected_fields,
page_to_item=page_to_item,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)
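
A minimal usage sketch (illustrative only, not part of this commit) of how the new `max_stream_count` argument flows through these helpers. It assumes `project_id`, `table`, and `bqstorage_client` already exist, and note that these helpers are internal rather than public API:

from google.cloud.bigquery import _pandas_helpers

# Ask for at most 4 read streams; with preserve_order=False the value is
# forwarded to determine_requested_streams and used for the ReadSession request.
pages = _pandas_helpers.download_arrow_bqstorage(
    project_id,
    table,
    bqstorage_client,
    preserve_order=False,
    max_stream_count=4,
)
for item in pages:
    ...  # each yielded item corresponds to one page of results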


@@ -1029,3 +1072,40 @@ def verify_pandas_imports():
raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception
if db_dtypes is None:
raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception


def determine_requested_streams(
preserve_order: bool,
max_stream_count: Union[int, None],
) -> int:
"""Determines the value of requested_streams based on the values of
`preserve_order` and `max_stream_count`.
Args:
preserve_order (bool): Whether to preserve the order of streams. If True,
this limits the number of streams to one. `preserve_order` takes
precedence over `max_stream_count`.
max_stream_count (Union[int, None]): The maximum number of streams
allowed. Must be a non-negative number or None, where None indicates
the value is unset. NOTE: if `preserve_order` is also set, it takes
precedence over `max_stream_count`, thus to ensure that `max_stream_count`
is used, ensure that `preserve_order` is None.
Returns:
(int) The appropriate value for requested_streams.
"""

if preserve_order:
# If preserve order is set, it takes precedence.
# Limit the requested streams to 1, to ensure that order
# is preserved.
return 1

elif max_stream_count is not None:
# If preserve_order is not set, only then do we consider max_stream_count
if max_stream_count <= -1:
raise ValueError("max_stream_count must be non-negative OR None")
return max_stream_count

# Default to zero requested streams (unbounded).
return 0
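
For context, a short sketch (commentary, not part of the diff) of how `determine_requested_streams` resolves the stream count, using the import path exercised by the unit tests below:

from google.cloud.bigquery._pandas_helpers import determine_requested_streams

# preserve_order takes precedence: a single stream keeps row ordering.
assert determine_requested_streams(preserve_order=True, max_stream_count=10) == 1
# Otherwise max_stream_count is used as-is.
assert determine_requested_streams(preserve_order=False, max_stream_count=10) == 10
# Neither constraint set: 0 means "unbounded", letting the backend choose.
assert determine_requested_streams(preserve_order=False, max_stream_count=None) == 0
# A negative max_stream_count raises ValueError.
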
2 changes: 1 addition & 1 deletion samples/desktopapp/requirements.txt
@@ -1,2 +1,2 @@
google-cloud-bigquery==3.25.0
google-cloud-bigquery==3.26.0
google-auth-oauthlib==1.2.1
12 changes: 6 additions & 6 deletions samples/geography/requirements.txt
@@ -9,14 +9,14 @@ cligj==0.7.2
dataclasses==0.8; python_version < '3.7'
db-dtypes==1.3.0
Fiona===1.9.6; python_version == '3.7'
Fiona==1.10.0; python_version >= '3.8'
Fiona==1.10.1; python_version >= '3.8'
geojson==3.1.0
geopandas===0.10.2; python_version == '3.7'
geopandas===0.13.2; python_version == '3.8'
geopandas==1.0.1; python_version >= '3.9'
google-api-core==2.19.2
google-auth==2.34.0
google-cloud-bigquery==3.25.0
google-api-core==2.20.0
google-auth==2.35.0
google-cloud-bigquery==3.26.0
google-cloud-bigquery-storage==2.26.0
google-cloud-core==2.4.1
google-crc32c===1.5.0; python_version < '3.9'
Expand All @@ -32,7 +32,7 @@ packaging===24.0; python_version == '3.7'
packaging==24.1; python_version >= '3.8'
pandas===1.3.5; python_version == '3.7'
pandas===2.0.3; python_version == '3.8'
pandas==2.2.2; python_version >= '3.9'
pandas==2.2.3; python_version >= '3.9'
proto-plus==1.24.0
pyarrow==12.0.1; python_version == '3.7'
pyarrow==17.0.0; python_version >= '3.8'
Expand All @@ -56,4 +56,4 @@ typing-extensions===4.7.1; python_version == '3.7'
typing-extensions==4.12.2; python_version >= '3.8'
typing-inspect==0.9.0
urllib3===1.26.18; python_version == '3.7'
urllib3==2.2.2; python_version >= '3.8'
urllib3==2.2.3; python_version >= '3.8'
4 changes: 2 additions & 2 deletions samples/magics/requirements.txt
@@ -1,6 +1,6 @@
bigquery_magics==0.3.0
bigquery_magics==0.4.0
db-dtypes==1.3.0
google.cloud.bigquery==3.25.0
google.cloud.bigquery==3.26.0
google-cloud-bigquery-storage==2.26.0
ipython===7.31.1; python_version == '3.7'
ipython===8.0.1; python_version == '3.8'
4 changes: 2 additions & 2 deletions samples/notebooks/requirements.txt
@@ -1,6 +1,6 @@
bigquery-magics==0.3.0
bigquery-magics==0.4.0
db-dtypes==1.3.0
google-cloud-bigquery==3.25.0
google-cloud-bigquery==3.26.0
google-cloud-bigquery-storage==2.26.0
ipython===7.31.1; python_version == '3.7'
ipython===8.0.1; python_version == '3.8'
2 changes: 1 addition & 1 deletion samples/snippets/requirements.txt
@@ -1,2 +1,2 @@
# samples/snippets should be runnable with no "extras"
google-cloud-bigquery==3.25.0
google-cloud-bigquery==3.26.0
31 changes: 31 additions & 0 deletions tests/unit/test__pandas_helpers.py
@@ -18,6 +18,7 @@
import functools
import operator
import queue
from typing import Union
from unittest import mock
import warnings

@@ -46,6 +47,7 @@
from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import schema
from google.cloud.bigquery._pandas_helpers import determine_requested_streams

pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()

@@ -2053,3 +2055,32 @@ def test_verify_pandas_imports_no_db_dtypes(module_under_test, monkeypatch):
monkeypatch.setattr(module_under_test, "db_dtypes", None)
with pytest.raises(ValueError, match="Please install the 'db-dtypes' package"):
module_under_test.verify_pandas_imports()


@pytest.mark.parametrize(
"preserve_order, max_stream_count, expected_requested_streams",
[
# If preserve_order is set/True, it takes precedence:
(True, 10, 1), # use 1
(True, None, 1), # use 1
# If preserve_order is not set check max_stream_count:
(False, 10, 10), # max_stream_count (X) takes precedence
(False, None, 0), # Unbounded (0) when both are unset
],
)
def test_determine_requested_streams(
preserve_order: bool,
max_stream_count: Union[int, None],
expected_requested_streams: int,
):
"""Tests various combinations of preserve_order and max_stream_count."""
actual_requested_streams = determine_requested_streams(
preserve_order, max_stream_count
)
assert actual_requested_streams == expected_requested_streams


def test_determine_requested_streams_invalid_max_stream_count():
"""Tests that a ValueError is raised if max_stream_count is negative."""
with pytest.raises(ValueError):
determine_requested_streams(preserve_order=False, max_stream_count=-1)
