
Commit ba9b2f8

plamut and tseaver authored
feat: add BigQuery storage client support to DB API (#36)
* feat: add BigQueryStorageClient support to DB API
* Use BigQuery Storage client in Cursor if available
* Skip BQ storage unit tests in Python 3.8
* Add system tests for Cursor w/ BQ storage client
* Add test for Connection ctor w/o BQ storage client
* Refactor exception handling in Cursor._try_fetch()
* Add explicit check against None
* Remove redundant word in a comment in cursor.py

Co-authored-by: Tres Seaver <[email protected]>
1 parent 645f0fd · commit ba9b2f8

File tree: 8 files changed (+487 -10)

google/cloud/bigquery/dbapi/_helpers.py (+20)

@@ -24,6 +24,7 @@
 import six
 
 from google.cloud import bigquery
+from google.cloud.bigquery import table
 from google.cloud.bigquery.dbapi import exceptions
 
 
@@ -218,3 +219,22 @@ def array_like(value):
     return isinstance(value, collections_abc.Sequence) and not isinstance(
         value, (six.text_type, six.binary_type, bytearray)
     )
+
+
+def to_bq_table_rows(rows_iterable):
+    """Convert table rows to BigQuery table Row instances.
+
+    Args:
+        rows_iterable (Iterable[Mapping]):
+            An iterable of row data items to convert to ``Row`` instances.
+
+    Returns:
+        Iterable[google.cloud.bigquery.table.Row]
+    """
+
+    def to_table_row(row):
+        values = tuple(row.values())
+        keys_to_index = {key: i for i, key in enumerate(row.keys())}
+        return table.Row(values, keys_to_index)
+
+    return (to_table_row(row_data) for row_data in rows_iterable)
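
For illustration, a minimal sketch of this helper in use (the row data is hypothetical, mimicking the mappings yielded by the BigQuery Storage API's row iterator). A ``table.Row`` supports both key-based and positional access:

from google.cloud.bigquery.dbapi import _helpers

# Hypothetical row data; assumes insertion-ordered dicts (Python 3.7+),
# since positional access follows the key order of each mapping.
raw_rows = [
    {"name": "Ada", "score": 99},
    {"name": "Grace", "score": 97},
]

bq_rows = list(_helpers.to_bq_table_rows(raw_rows))
print(bq_rows[0]["name"])  # key-based access -> "Ada"
print(bq_rows[0][1])       # positional access -> 99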

google/cloud/bigquery/dbapi/connection.py (+33 -6)

@@ -23,10 +23,24 @@ class Connection(object):
 
     Args:
         client (google.cloud.bigquery.Client): A client used to connect to BigQuery.
+        bqstorage_client(\
+            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+        ):
+            [Beta] An alternative client that uses the faster BigQuery Storage
+            API to fetch rows from BigQuery. If both clients are given,
+            ``bqstorage_client`` is used first to fetch query results,
+            with a fallback on ``client``, if necessary.
+
+            .. note::
+                There is a known issue with the BigQuery Storage API with small
+                anonymous result sets, which results in such fallback.
+
+                https://github.com/googleapis/python-bigquery-storage/issues/2
     """
 
-    def __init__(self, client):
+    def __init__(self, client, bqstorage_client=None):
         self._client = client
+        self._bqstorage_client = bqstorage_client
 
     def close(self):
         """No-op."""
@@ -43,17 +57,30 @@ def cursor(self):
         return cursor.Cursor(self)
 
 
-def connect(client=None):
+def connect(client=None, bqstorage_client=None):
     """Construct a DB-API connection to Google BigQuery.
 
     Args:
-        client (google.cloud.bigquery.Client):
-            (Optional) A client used to connect to BigQuery. If not passed, a
-            client is created using default options inferred from the environment.
+        client (Optional[google.cloud.bigquery.Client]):
+            A client used to connect to BigQuery. If not passed, a client is
+            created using default options inferred from the environment.
+        bqstorage_client(\
+            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+        ):
+            [Beta] An alternative client that uses the faster BigQuery Storage
+            API to fetch rows from BigQuery. If both clients are given,
+            ``bqstorage_client`` is used first to fetch query results,
+            with a fallback on ``client``, if necessary.
+
+            .. note::
+                There is a known issue with the BigQuery Storage API with small
+                anonymous result sets, which results in such fallback.
+
+                https://github.com/googleapis/python-bigquery-storage/issues/2
 
     Returns:
         google.cloud.bigquery.dbapi.Connection: A new DB-API connection to BigQuery.
     """
     if client is None:
         client = bigquery.Client()
-    return Connection(client)
+    return Connection(client, bqstorage_client)
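
A usage sketch for the new ``connect()`` signature, mirroring the pattern used in the system tests added by this commit (credentials and project are assumed to come from the environment):

from google.cloud import bigquery
from google.cloud import bigquery_storage_v1beta1
from google.cloud.bigquery import dbapi

client = bigquery.Client()
bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
    credentials=client._credentials  # reuse the main client's credentials
)

# The cursor fetches results via the BigQuery Storage API when it can,
# and transparently falls back to tabledata.list when it cannot.
connection = dbapi.connect(client, bqstorage_client)
cursor = connection.cursor()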

google/cloud/bigquery/dbapi/cursor.py (+68)

@@ -21,13 +21,18 @@
 except ImportError:  # Python 2.7
     import collections as collections_abc
 
+import logging
+
 import six
 
 from google.cloud.bigquery import job
 from google.cloud.bigquery.dbapi import _helpers
 from google.cloud.bigquery.dbapi import exceptions
 import google.cloud.exceptions
 
+
+_LOGGER = logging.getLogger(__name__)
+
 # Per PEP 249: A 7-item sequence containing information describing one result
 # column. The first two items (name and type_code) are mandatory, the other
 # five are optional and are set to None if no meaningful values can be
@@ -212,13 +217,76 @@ def _try_fetch(self, size=None):
 
         if self._query_data is None:
             client = self.connection._client
+            bqstorage_client = self.connection._bqstorage_client
+
+            if bqstorage_client is not None:
+                try:
+                    rows_iterable = self._bqstorage_fetch(bqstorage_client)
+                    self._query_data = _helpers.to_bq_table_rows(rows_iterable)
+                    return
+                except google.api_core.exceptions.GoogleAPICallError as exc:
+                    # NOTE: Forbidden is a subclass of GoogleAPICallError
+                    if isinstance(exc, google.api_core.exceptions.Forbidden):
+                        # Don't hide errors such as insufficient permissions to create
+                        # a read session, or the API is not enabled. Both of those are
+                        # clearly problems if the developer has explicitly asked for
+                        # BigQuery Storage API support.
+                        raise
+
+                    # There is an issue with reading from small anonymous
+                    # query results tables. If such an error occurs, we silence
+                    # it in order to try again with the tabledata.list API.
+                    _LOGGER.debug(
+                        "Error fetching data with BigQuery Storage API, "
+                        "falling back to tabledata.list API."
+                    )
+
             rows_iter = client.list_rows(
                 self._query_job.destination,
                 selected_fields=self._query_job._query_results.schema,
                 page_size=self.arraysize,
             )
             self._query_data = iter(rows_iter)
 
+    def _bqstorage_fetch(self, bqstorage_client):
+        """Start fetching data with the BigQuery Storage API.
+
+        The method assumes that the data about the relevant query job already
+        exists internally.
+
+        Args:
+            bqstorage_client(\
+                google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
+            ):
+                A client that knows how to talk to the BigQuery Storage API.
+
+        Returns:
+            Iterable[Mapping]:
+                A sequence of rows, represented as dictionaries.
+        """
+        # NOTE: Given that a BQ storage client instance is passed in, it means
+        # that the bigquery_storage_v1beta1 library is available (no ImportError).
+        from google.cloud import bigquery_storage_v1beta1
+
+        table_reference = self._query_job.destination
+
+        read_session = bqstorage_client.create_read_session(
+            table_reference.to_bqstorage(),
+            "projects/{}".format(table_reference.project),
+            # a single stream only, as DB API is not well-suited for multithreading
+            requested_streams=1,
+        )
+
+        if not read_session.streams:
+            return iter([])  # empty table, nothing to read
+
+        read_position = bigquery_storage_v1beta1.types.StreamPosition(
+            stream=read_session.streams[0],
+        )
+        read_rows_stream = bqstorage_client.read_rows(read_position)
+        rows_iterable = read_rows_stream.rows(read_session)
+        return rows_iterable
+
     def fetchone(self):
         """Fetch a single row from the results of the last ``execute*()`` call.
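
To clarify the single-stream read pattern in ``_bqstorage_fetch()``, here is a minimal standalone sketch against the v1beta1 API; the project, dataset, and table names are hypothetical:

from google.cloud import bigquery_storage_v1beta1

client = bigquery_storage_v1beta1.BigQueryStorageClient()

# Hypothetical table to read from.
table_ref = bigquery_storage_v1beta1.types.TableReference(
    project_id="my-project",
    dataset_id="my_dataset",
    table_id="my_table",
)

session = client.create_read_session(
    table_ref,
    "projects/my-project",
    requested_streams=1,  # a single stream preserves row order
)

if not session.streams:
    rows = iter([])  # empty table, nothing to read
else:
    position = bigquery_storage_v1beta1.types.StreamPosition(
        stream=session.streams[0]
    )
    rows = client.read_rows(position).rows(session)

for row in rows:
    print(dict(row))  # each row behaves like a mapping of column name to value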

noxfile.py (+3 -2)

@@ -48,7 +48,7 @@ def default(session):
     # Since many tests are skipped due to missing dependencies, test
     # coverage is much lower in Python 3.8. Remove once we can test with
     # pyarrow.
-    coverage_fail_under = "--cov-fail-under=92"
+    coverage_fail_under = "--cov-fail-under=91"
     dev_install = ".[pandas,tqdm]"
 
     session.install("-e", dev_install)
@@ -70,7 +70,7 @@ def default(session):
         "--cov-report=",
         coverage_fail_under,
         os.path.join("tests", "unit"),
-        *session.posargs
+        *session.posargs,
     )
 
 
@@ -94,6 +94,7 @@ def system(session):
     # Install all test dependencies, then install local packages in place.
     session.install("mock", "pytest", "psutil")
     session.install("google-cloud-storage")
+    session.install("fastavro")
     session.install("-e", "test_utils")
     session.install("-e", ".[all]")

tests/system.py (+100)

@@ -36,6 +36,12 @@
     from google.cloud import bigquery_storage_v1beta1
 except ImportError:  # pragma: NO COVER
     bigquery_storage_v1beta1 = None
+
+try:
+    import fastavro  # to parse BQ storage client results
+except ImportError:  # pragma: NO COVER
+    fastavro = None
+
 try:
     import pandas
 except ImportError:  # pragma: NO COVER
@@ -1543,6 +1549,100 @@ def test_dbapi_fetchall(self):
         row_tuples = [r.values() for r in rows]
         self.assertEqual(row_tuples, [(1, 2), (3, 4), (5, 6)])
 
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_dbapi_fetch_w_bqstorage_client_small_result_set(self):
+        bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
+            credentials=Config.CLIENT._credentials
+        )
+        cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()
+
+        # Reading small result sets causes an issue with the BQ storage client,
+        # and the DB API should transparently fall back to the default client.
+        cursor.execute(
+            """
+            SELECT id, `by`, time_ts
+            FROM `bigquery-public-data.hacker_news.comments`
+            ORDER BY `id` ASC
+            LIMIT 10
+            """
+        )
+
+        result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]
+
+        field_name = operator.itemgetter(0)
+        fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]
+
+        expected_data = [
+            [
+                ("by", "sama"),
+                ("id", 15),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 17),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 22),
+                ("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
+            ],
+        ]
+        self.assertEqual(fetched_data, expected_data)
+
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    @unittest.skipIf(fastavro is None, "Requires `fastavro`")
+    def test_dbapi_fetch_w_bqstorage_client_large_result_set(self):
+        bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
+            credentials=Config.CLIENT._credentials
+        )
+        cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()
+
+        # Pick a large enough LIMIT value to assure that the fallback to the
+        # default client is not needed due to the result set being too small
+        # (a known issue that causes problems when reading such result sets
+        # with the BQ storage client).
+        cursor.execute(
+            """
+            SELECT id, `by`, time_ts
+            FROM `bigquery-public-data.hacker_news.comments`
+            ORDER BY `id` ASC
+            LIMIT 100000
+            """
+        )
+
+        result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]
+
+        field_name = operator.itemgetter(0)
+        fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]
+
+        # Since the DB API is not thread safe, only a single result stream should
+        # be requested by the BQ storage client, meaning that results should
+        # arrive in sorted order.
+        expected_data = [
+            [
+                ("by", "sama"),
+                ("id", 15),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 17),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 22),
+                ("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
+            ],
+        ]
+        self.assertEqual(fetched_data, expected_data)
+
     def _load_table_for_dml(self, rows, dataset_id, table_id):
         from google.cloud._testing import _NamedTemporaryFile
         from google.cloud.bigquery.job import CreateDisposition

tests/unit/test_dbapi__helpers.py (+34)

@@ -15,9 +15,11 @@
 import datetime
 import decimal
 import math
+import operator as op
 import unittest
 
 import google.cloud._helpers
+from google.cloud.bigquery import table
 from google.cloud.bigquery.dbapi import _helpers
 from google.cloud.bigquery.dbapi import exceptions
 
@@ -185,3 +187,35 @@ def test_to_query_parameters_w_list_dict_param(self):
     def test_to_query_parameters_none_argument(self):
         query_parameters = _helpers.to_query_parameters(None)
         self.assertEqual(query_parameters, [])
+
+
+class TestToBqTableRows(unittest.TestCase):
+    def test_empty_iterable(self):
+        rows_iterable = iter([])
+        result = _helpers.to_bq_table_rows(rows_iterable)
+        self.assertEqual(list(result), [])
+
+    def test_non_empty_iterable(self):
+        rows_iterable = [
+            dict(one=1.1, four=1.4, two=1.2, three=1.3),
+            dict(one=2.1, four=2.4, two=2.2, three=2.3),
+        ]
+
+        result = _helpers.to_bq_table_rows(rows_iterable)
+
+        rows = list(result)
+        self.assertEqual(len(rows), 2)
+
+        row_1, row_2 = rows
+        self.assertIsInstance(row_1, table.Row)
+        self.assertIsInstance(row_2, table.Row)
+
+        field_value = op.itemgetter(1)
+
+        items = sorted(row_1.items(), key=field_value)
+        expected_items = [("one", 1.1), ("two", 1.2), ("three", 1.3), ("four", 1.4)]
+        self.assertEqual(items, expected_items)
+
+        items = sorted(row_2.items(), key=field_value)
+        expected_items = [("one", 2.1), ("two", 2.2), ("three", 2.3), ("four", 2.4)]
+        self.assertEqual(items, expected_items)
