Skip to content

Commit 956c9b6

Browse files
committed
feat: Second batch of AsyncIO integration
* Polling future * Page iterator * With unit tests & docs
1 parent d2e6ec6 commit 956c9b6

10 files changed

+1352
-1
lines changed

docs/futures.rst

+5-1
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,8 @@ Futures
77

88
.. automodule:: google.api_core.future.polling
99
:members:
10-
:show-inheritance:
10+
:show-inheritance:
11+
12+
.. automodule:: google.api_core.future.async_future
13+
:members:
14+
:show-inheritance:

docs/operation.rst

+7
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,10 @@ Long-Running Operations
44
.. automodule:: google.api_core.operation
55
:members:
66
:show-inheritance:
7+
8+
Long-Running Operations in AsyncIO
9+
--------------
10+
11+
.. automodule:: google.api_core.operation_async
12+
:members:
13+
:show-inheritance:

docs/page_iterator.rst

+7
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,10 @@ Page Iterators
44
.. automodule:: google.api_core.page_iterator
55
:members:
66
:show-inheritance:
7+
8+
Page Iterators in AsyncIO
9+
-------------------------
10+
11+
.. automodule:: google.api_core.page_iterator_async
12+
:members:
13+
:show-inheritance:
+157
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
# Copyright 2020, Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""AsyncIO implementation of the abstract base Future class."""
16+
17+
import asyncio
18+
19+
from google.api_core import exceptions
20+
from google.api_core import retry
21+
from google.api_core import retry_async
22+
from google.api_core.future import base
23+
24+
25+
class _OperationNotComplete(Exception):
26+
"""Private exception used for polling via retry."""
27+
pass
28+
29+
30+
RETRY_PREDICATE = retry.if_exception_type(
31+
_OperationNotComplete,
32+
exceptions.TooManyRequests,
33+
exceptions.InternalServerError,
34+
exceptions.BadGateway,
35+
)
36+
DEFAULT_RETRY = retry_async.AsyncRetry(predicate=RETRY_PREDICATE)
37+
38+
39+
class AsyncFuture(base.Future):
40+
"""A Future that polls peer service to self-update.
41+
42+
The :meth:`done` method should be implemented by subclasses. The polling
43+
behavior will repeatedly call ``done`` until it returns True.
44+
45+
.. note: Privacy here is intended to prevent the final class from
46+
overexposing, not to prevent subclasses from accessing methods.
47+
48+
Args:
49+
retry (google.api_core.retry.Retry): The retry configuration used
50+
when polling. This can be used to control how often :meth:`done`
51+
is polled. Regardless of the retry's ``deadline``, it will be
52+
overridden by the ``timeout`` argument to :meth:`result`.
53+
"""
54+
55+
def __init__(self, retry=DEFAULT_RETRY):
56+
super().__init__()
57+
self._retry = retry
58+
self._future = asyncio.get_event_loop().create_future()
59+
self._background_task = None
60+
61+
async def done(self, retry=DEFAULT_RETRY):
62+
"""Checks to see if the operation is complete.
63+
64+
Args:
65+
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
66+
67+
Returns:
68+
bool: True if the operation is complete, False otherwise.
69+
"""
70+
# pylint: disable=redundant-returns-doc, missing-raises-doc
71+
raise NotImplementedError()
72+
73+
async def _done_or_raise(self):
74+
"""Check if the future is done and raise if it's not."""
75+
result = await self.done()
76+
if not result:
77+
raise _OperationNotComplete()
78+
79+
async def running(self):
80+
"""True if the operation is currently running."""
81+
result = await self.done()
82+
return not result
83+
84+
async def _blocking_poll(self, timeout=None):
85+
"""Poll and await for the Future to be resolved.
86+
87+
Args:
88+
timeout (int):
89+
How long (in seconds) to wait for the operation to complete.
90+
If None, wait indefinitely.
91+
"""
92+
if self._future.done():
93+
return
94+
95+
retry_ = self._retry.with_deadline(timeout)
96+
97+
try:
98+
await retry_(self._done_or_raise)()
99+
except exceptions.RetryError:
100+
raise asyncio.TimeoutError(
101+
"Operation did not complete within the designated " "timeout."
102+
)
103+
104+
async def result(self, timeout=None):
105+
"""Get the result of the operation.
106+
107+
Args:
108+
timeout (int):
109+
How long (in seconds) to wait for the operation to complete.
110+
If None, wait indefinitely.
111+
112+
Returns:
113+
google.protobuf.Message: The Operation's result.
114+
115+
Raises:
116+
google.api_core.GoogleAPICallError: If the operation errors or if
117+
the timeout is reached before the operation completes.
118+
"""
119+
await self._blocking_poll(timeout=timeout)
120+
return self._future.result()
121+
122+
async def exception(self, timeout=None):
123+
"""Get the exception from the operation.
124+
125+
Args:
126+
timeout (int): How long to wait for the operation to complete.
127+
If None, wait indefinitely.
128+
129+
Returns:
130+
Optional[google.api_core.GoogleAPICallError]: The operation's
131+
error.
132+
"""
133+
await self._blocking_poll(timeout=timeout)
134+
return self._future.exception()
135+
136+
def add_done_callback(self, fn):
137+
"""Add a callback to be executed when the operation is complete.
138+
139+
If the operation is completed, the callback will be scheduled onto the
140+
event loop. Otherwise, the callback will be stored and invoked when the
141+
future is done.
142+
143+
Args:
144+
fn (Callable[Future]): The callback to execute when the operation
145+
is complete.
146+
"""
147+
if self._background_task is None:
148+
self._background_task = asyncio.get_event_loop().create_task(self._blocking_poll())
149+
self._future.add_done_callback(fn)
150+
151+
def set_result(self, result):
152+
"""Set the Future's result."""
153+
self._future.set_result(result)
154+
155+
def set_exception(self, exception):
156+
"""Set the Future's exception."""
157+
self._future.set_exception(exception)

0 commit comments

Comments
 (0)