Skip to content

Commit 93a8cb8

Browse files
committed
feat: Second batch of AsyncIO integration
* Polling future * Page iterator * With unit tests
1 parent d2e6ec6 commit 93a8cb8

9 files changed

+1580
-0
lines changed
+157
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
# Copyright 2020, Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""AsyncIO implementation of the abstract base Future class."""
16+
17+
import asyncio
18+
19+
from google.api_core import exceptions
20+
from google.api_core import retry
21+
from google.api_core import retry_async
22+
from google.api_core.future import base
23+
24+
25+
class _OperationNotComplete(Exception):
26+
"""Private exception used for polling via retry."""
27+
pass
28+
29+
30+
RETRY_PREDICATE = retry.if_exception_type(
31+
_OperationNotComplete,
32+
exceptions.TooManyRequests,
33+
exceptions.InternalServerError,
34+
exceptions.BadGateway,
35+
)
36+
DEFAULT_RETRY = retry_async.AsyncRetry(predicate=RETRY_PREDICATE)
37+
38+
39+
class AsyncFuture(base.Future):
40+
"""A Future that polls peer service to self-update.
41+
42+
The :meth:`done` method should be implemented by subclasses. The polling
43+
behavior will repeatedly call ``done`` until it returns True.
44+
45+
.. note: Privacy here is intended to prevent the final class from
46+
overexposing, not to prevent subclasses from accessing methods.
47+
48+
Args:
49+
retry (google.api_core.retry.Retry): The retry configuration used
50+
when polling. This can be used to control how often :meth:`done`
51+
is polled. Regardless of the retry's ``deadline``, it will be
52+
overridden by the ``timeout`` argument to :meth:`result`.
53+
"""
54+
55+
def __init__(self, retry=DEFAULT_RETRY):
56+
super().__init__()
57+
self._retry = retry
58+
self._future = asyncio.get_event_loop().create_future()
59+
self._background_task = None
60+
61+
async def done(self, retry=DEFAULT_RETRY):
62+
"""Checks to see if the operation is complete.
63+
64+
Args:
65+
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
66+
67+
Returns:
68+
bool: True if the operation is complete, False otherwise.
69+
"""
70+
# pylint: disable=redundant-returns-doc, missing-raises-doc
71+
raise NotImplementedError()
72+
73+
async def _done_or_raise(self):
74+
"""Check if the future is done and raise if it's not."""
75+
result = await self.done()
76+
if not result:
77+
raise _OperationNotComplete()
78+
79+
async def running(self):
80+
"""True if the operation is currently running."""
81+
result = await self.done()
82+
return not result
83+
84+
async def _blocking_poll(self, timeout=None):
85+
"""Poll and await for the Future to be resolved.
86+
87+
Args:
88+
timeout (int):
89+
How long (in seconds) to wait for the operation to complete.
90+
If None, wait indefinitely.
91+
"""
92+
if self._future.done():
93+
return
94+
95+
retry_ = self._retry.with_deadline(timeout)
96+
97+
try:
98+
await retry_(self._done_or_raise)()
99+
except exceptions.RetryError:
100+
raise asyncio.TimeoutError(
101+
"Operation did not complete within the designated " "timeout."
102+
)
103+
104+
async def result(self, timeout=None):
105+
"""Get the result of the operation.
106+
107+
Args:
108+
timeout (int):
109+
How long (in seconds) to wait for the operation to complete.
110+
If None, wait indefinitely.
111+
112+
Returns:
113+
google.protobuf.Message: The Operation's result.
114+
115+
Raises:
116+
google.api_core.GoogleAPICallError: If the operation errors or if
117+
the timeout is reached before the operation completes.
118+
"""
119+
await self._blocking_poll(timeout=timeout)
120+
return self._future.result()
121+
122+
async def exception(self, timeout=None):
123+
"""Get the exception from the operation.
124+
125+
Args:
126+
timeout (int): How long to wait for the operation to complete.
127+
If None, wait indefinitely.
128+
129+
Returns:
130+
Optional[google.api_core.GoogleAPICallError]: The operation's
131+
error.
132+
"""
133+
await self._blocking_poll(timeout=timeout)
134+
return self._future.exception()
135+
136+
def add_done_callback(self, fn):
137+
"""Add a callback to be executed when the operation is complete.
138+
139+
If the operation is completed, the callback will be scheduled onto the
140+
event loop. Otherwise, the callback will be stored and invoked when the
141+
future is done.
142+
143+
Args:
144+
fn (Callable[Future]): The callback to execute when the operation
145+
is complete.
146+
"""
147+
if self._background_task is None:
148+
self._background_task = asyncio.get_event_loop().create_task(self._blocking_poll())
149+
self._future.add_done_callback(fn)
150+
151+
def set_result(self, result):
152+
"""Set the Future's result."""
153+
self._future.set_result(result)
154+
155+
def set_exception(self, exception):
156+
"""Set the Future's exception."""
157+
self._future.set_exception(exception)

google/api_core/gapic_v1/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@
2323

2424
if sys.version_info >= (3, 6):
2525
from google.api_core.gapic_v1 import config_async # noqa: F401
26+
from google.api_core.gapic_v1 import method_async # noqa: F401
2627
__all__.append("config_async")
28+
__all__.append("method_async")

google/api_core/operation_async.py

+217
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
# Copyright 2016 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Futures for long-running operations returned from Google Cloud APIs.
16+
17+
These futures can be used to synchronously wait for the result of a
18+
long-running operation using :meth:`Operation.result`:
19+
20+
21+
.. code-block:: python
22+
23+
operation = my_api_client.long_running_method()
24+
result = operation.result()
25+
26+
Or asynchronously using callbacks and :meth:`Operation.add_done_callback`:
27+
28+
.. code-block:: python
29+
30+
operation = my_api_client.long_running_method()
31+
32+
def my_callback(future):
33+
result = future.result()
34+
35+
operation.add_done_callback(my_callback)
36+
37+
"""
38+
39+
import functools
40+
import threading
41+
42+
from google.api_core import exceptions
43+
from google.api_core import protobuf_helpers
44+
from google.api_core.future import async_future
45+
from google.longrunning import operations_pb2
46+
from google.rpc import code_pb2
47+
48+
49+
class AsyncOperation(async_future.AsyncFuture):
50+
"""A Future for interacting with a Google API Long-Running Operation.
51+
52+
Args:
53+
operation (google.longrunning.operations_pb2.Operation): The
54+
initial operation.
55+
refresh (Callable[[], ~.api_core.operation.Operation]): A callable that
56+
returns the latest state of the operation.
57+
cancel (Callable[[], None]): A callable that tries to cancel
58+
the operation.
59+
result_type (func:`type`): The protobuf type for the operation's
60+
result.
61+
metadata_type (func:`type`): The protobuf type for the operation's
62+
metadata.
63+
retry (google.api_core.retry.Retry): The retry configuration used
64+
when polling. This can be used to control how often :meth:`done`
65+
is polled. Regardless of the retry's ``deadline``, it will be
66+
overridden by the ``timeout`` argument to :meth:`result`.
67+
"""
68+
69+
def __init__(
70+
self,
71+
operation,
72+
refresh,
73+
cancel,
74+
result_type,
75+
metadata_type=None,
76+
retry=async_future.DEFAULT_RETRY,
77+
):
78+
super().__init__(retry=retry)
79+
self._operation = operation
80+
self._refresh = refresh
81+
self._cancel = cancel
82+
self._result_type = result_type
83+
self._metadata_type = metadata_type
84+
self._completion_lock = threading.Lock()
85+
# Invoke this in case the operation came back already complete.
86+
self._set_result_from_operation()
87+
88+
@property
89+
def operation(self):
90+
"""google.longrunning.Operation: The current long-running operation."""
91+
return self._operation
92+
93+
@property
94+
def metadata(self):
95+
"""google.protobuf.Message: the current operation metadata."""
96+
if not self._operation.HasField("metadata"):
97+
return None
98+
99+
return protobuf_helpers.from_any_pb(
100+
self._metadata_type, self._operation.metadata
101+
)
102+
103+
@classmethod
104+
def deserialize(cls, payload):
105+
"""Deserialize a ``google.longrunning.Operation`` protocol buffer.
106+
107+
Args:
108+
payload (bytes): A serialized operation protocol buffer.
109+
110+
Returns:
111+
~.operations_pb2.Operation: An Operation protobuf object.
112+
"""
113+
return operations_pb2.Operation.FromString(payload)
114+
115+
def _set_result_from_operation(self):
116+
"""Set the result or exception from the operation if it is complete."""
117+
# This must be done in a lock to prevent the async_future thread
118+
# and main thread from both executing the completion logic
119+
# at the same time.
120+
with self._completion_lock:
121+
# If the operation isn't complete or if the result has already been
122+
# set, do not call set_result/set_exception again.
123+
# Note: self._result_set is set to True in set_result and
124+
# set_exception, in case those methods are invoked directly.
125+
if not self._operation.done or self._future.done():
126+
return
127+
128+
if self._operation.HasField("response"):
129+
response = protobuf_helpers.from_any_pb(
130+
self._result_type, self._operation.response
131+
)
132+
self.set_result(response)
133+
elif self._operation.HasField("error"):
134+
exception = exceptions.GoogleAPICallError(
135+
self._operation.error.message,
136+
errors=(self._operation.error,),
137+
response=self._operation,
138+
)
139+
self.set_exception(exception)
140+
else:
141+
exception = exceptions.GoogleAPICallError(
142+
"Unexpected state: Long-running operation had neither "
143+
"response nor error set."
144+
)
145+
self.set_exception(exception)
146+
147+
async def _refresh_and_update(self, retry=async_future.DEFAULT_RETRY):
148+
"""Refresh the operation and update the result if needed.
149+
150+
Args:
151+
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
152+
"""
153+
# If the currently cached operation is done, no need to make another
154+
# RPC as it will not change once done.
155+
if not self._operation.done:
156+
self._operation = await self._refresh(retry=retry)
157+
self._set_result_from_operation()
158+
159+
async def done(self, retry=async_future.DEFAULT_RETRY):
160+
"""Checks to see if the operation is complete.
161+
162+
Args:
163+
retry (google.api_core.retry.Retry): (Optional) How to retry the RPC.
164+
165+
Returns:
166+
bool: True if the operation is complete, False otherwise.
167+
"""
168+
await self._refresh_and_update(retry)
169+
return self._operation.done
170+
171+
async def cancel(self):
172+
"""Attempt to cancel the operation.
173+
174+
Returns:
175+
bool: True if the cancel RPC was made, False if the operation is
176+
already complete.
177+
"""
178+
result = await self.done()
179+
if result:
180+
return False
181+
else:
182+
await self._cancel()
183+
return True
184+
185+
async def cancelled(self):
186+
"""True if the operation was cancelled."""
187+
await self._refresh_and_update()
188+
return (
189+
self._operation.HasField("error")
190+
and self._operation.error.code == code_pb2.CANCELLED
191+
)
192+
193+
194+
def from_gapic(operation, operations_client, result_type, **kwargs):
195+
"""Create an operation future from a gapic client.
196+
197+
This interacts with the long-running operations `service`_ (specific
198+
to a given API) via a gapic client.
199+
200+
.. _service: https://github.com/googleapis/googleapis/blob/\
201+
050400df0fdb16f63b63e9dee53819044bffc857/\
202+
google/longrunning/operations.proto#L38
203+
204+
Args:
205+
operation (google.longrunning.operations_pb2.Operation): The operation.
206+
operations_client (google.api_core.operations_v1.OperationsClient):
207+
The operations client.
208+
result_type (:func:`type`): The protobuf result type.
209+
kwargs: Keyword args passed into the :class:`Operation` constructor.
210+
211+
Returns:
212+
~.api_core.operation.Operation: The operation future to track the given
213+
operation.
214+
"""
215+
refresh = functools.partial(operations_client.get_operation, operation.name)
216+
cancel = functools.partial(operations_client.cancel_operation, operation.name)
217+
return AsyncOperation(operation, refresh, cancel, result_type, **kwargs)

0 commit comments

Comments
 (0)