Skip to content

Commit 2b103b6

Browse files
authored
fix: consume part of StreamingResponseIterator to support failure while under a retry context (#10206)
1 parent 14f1f34 commit 2b103b6

File tree

2 files changed

+54
-5
lines changed

2 files changed

+54
-5
lines changed

google/api_core/grpc_helpers.py

+18
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,19 @@ class _StreamingResponseIterator(grpc.Call):
6565
def __init__(self, wrapped):
6666
self._wrapped = wrapped
6767

68+
# This iterator is used in a retry context, and returned outside after init.
69+
# gRPC will not throw an exception until the stream is consumed, so we need
70+
# to retrieve the first result, in order to fail, in order to trigger a retry.
71+
try:
72+
self._stored_first_result = six.next(self._wrapped)
73+
except TypeError:
74+
# It is possible the wrapped method isn't an iterable (a grpc.Call
75+
# for instance). If this happens don't store the first result.
76+
pass
77+
except StopIteration:
78+
# ignore stop iteration at this time. This should be handled outside of retry.
79+
pass
80+
6881
def __iter__(self):
6982
"""This iterator is also an iterable that returns itself."""
7083
return self
@@ -76,8 +89,13 @@ def next(self):
7689
protobuf.Message: A single response from the stream.
7790
"""
7891
try:
92+
if hasattr(self, "_stored_first_result"):
93+
result = self._stored_first_result
94+
del self._stored_first_result
95+
return result
7996
return six.next(self._wrapped)
8097
except grpc.RpcError as exc:
98+
# If the stream has already returned data, we cannot recover here.
8199
six.raise_from(exceptions.from_grpc_error(exc), exc)
82100

83101
# Alias needed for Python 2/3 support.

tests/unit/test_grpc_helpers.py

+36-5
Original file line numberDiff line numberDiff line change
@@ -129,24 +129,55 @@ def test_wrap_stream_errors_invocation():
129129
assert exc_info.value.response == grpc_error
130130

131131

132+
def test_wrap_stream_empty_iterator():
133+
expected_responses = []
134+
callable_ = mock.Mock(spec=["__call__"], return_value=iter(expected_responses))
135+
136+
wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
137+
138+
got_iterator = wrapped_callable()
139+
140+
responses = list(got_iterator)
141+
142+
callable_.assert_called_once_with()
143+
assert responses == expected_responses
144+
145+
132146
class RpcResponseIteratorImpl(object):
133-
def __init__(self, exception):
134-
self._exception = exception
147+
def __init__(self, iterable):
148+
self._iterable = iter(iterable)
135149

136150
def next(self):
137-
raise self._exception
151+
next_item = next(self._iterable)
152+
if isinstance(next_item, RpcErrorImpl):
153+
raise next_item
154+
return next_item
138155

139156
__next__ = next
140157

141158

142-
def test_wrap_stream_errors_iterator():
159+
def test_wrap_stream_errors_iterator_initialization():
143160
grpc_error = RpcErrorImpl(grpc.StatusCode.UNAVAILABLE)
144-
response_iter = RpcResponseIteratorImpl(grpc_error)
161+
response_iter = RpcResponseIteratorImpl([grpc_error])
145162
callable_ = mock.Mock(spec=["__call__"], return_value=response_iter)
146163

147164
wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
148165

166+
with pytest.raises(exceptions.ServiceUnavailable) as exc_info:
167+
wrapped_callable(1, 2, three="four")
168+
169+
callable_.assert_called_once_with(1, 2, three="four")
170+
assert exc_info.value.response == grpc_error
171+
172+
173+
def test_wrap_stream_errors_during_iteration():
174+
grpc_error = RpcErrorImpl(grpc.StatusCode.UNAVAILABLE)
175+
response_iter = RpcResponseIteratorImpl([1, grpc_error])
176+
callable_ = mock.Mock(spec=["__call__"], return_value=response_iter)
177+
178+
wrapped_callable = grpc_helpers._wrap_stream_errors(callable_)
149179
got_iterator = wrapped_callable(1, 2, three="four")
180+
next(got_iterator)
150181

151182
with pytest.raises(exceptions.ServiceUnavailable) as exc_info:
152183
next(got_iterator)

0 commit comments

Comments
 (0)