Skip to content

Commit ab31078

Browse files
authored
fix: retry UNAVAILABLE errors for streaming RPCs (#1278)
UNAVAILABLE errors that occurred during the initial attempt of a streaming RPC (StreamingRead / ExecuteStreamingSql) would not be retried. Fixes #1150
1 parent 6352dd2 commit ab31078

File tree

3 files changed

+52
-4
lines changed

3 files changed

+52
-4
lines changed

google/cloud/spanner_v1/snapshot.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,18 @@ def _restart_on_unavailable(
8686
)
8787

8888
request.transaction = transaction_selector
89+
iterator = None
8990

90-
with trace_call(
91-
trace_name, session, attributes, observability_options=observability_options
92-
):
93-
iterator = method(request=request)
9491
while True:
9592
try:
93+
if iterator is None:
94+
with trace_call(
95+
trace_name,
96+
session,
97+
attributes,
98+
observability_options=observability_options,
99+
):
100+
iterator = method(request=request)
96101
for item in iterator:
97102
item_buffer.append(item)
98103
# Setting the transaction id because the transaction begin was inlined for first rpc.

tests/mockserver_tests/mock_server_test_base.py

+21
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,27 @@ def aborted_status() -> _Status:
5757
return status
5858

5959

60+
# Creates an UNAVAILABLE status with the smallest possible retry delay.
61+
def unavailable_status() -> _Status:
62+
error = status_pb2.Status(
63+
code=code_pb2.UNAVAILABLE,
64+
message="Service unavailable.",
65+
)
66+
retry_info = RetryInfo(retry_delay=Duration(seconds=0, nanos=1))
67+
status = _Status(
68+
code=code_to_grpc_status_code(error.code),
69+
details=error.message,
70+
trailing_metadata=(
71+
("grpc-status-details-bin", error.SerializeToString()),
72+
(
73+
"google.rpc.retryinfo-bin",
74+
retry_info.SerializeToString(),
75+
),
76+
),
77+
)
78+
return status
79+
80+
6081
def add_error(method: str, error: status_pb2.Status):
6182
MockServerTestBase.spanner_service.mock_spanner.add_error(method, error)
6283

tests/mockserver_tests/test_basics.py

+22
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,14 @@
2121
BeginTransactionRequest,
2222
TransactionOptions,
2323
)
24+
from google.cloud.spanner_v1.testing.mock_spanner import SpannerServicer
2425

2526
from tests.mockserver_tests.mock_server_test_base import (
2627
MockServerTestBase,
2728
add_select1_result,
2829
add_update_count,
30+
add_error,
31+
unavailable_status,
2932
)
3033

3134

@@ -85,3 +88,22 @@ def test_dbapi_partitioned_dml(self):
8588
self.assertEqual(
8689
TransactionOptions(dict(partitioned_dml={})), begin_request.options
8790
)
91+
92+
def test_execute_streaming_sql_unavailable(self):
93+
add_select1_result()
94+
# Add an UNAVAILABLE error that is returned the first time the
95+
# ExecuteStreamingSql RPC is called.
96+
add_error(SpannerServicer.ExecuteStreamingSql.__name__, unavailable_status())
97+
with self.database.snapshot() as snapshot:
98+
results = snapshot.execute_sql("select 1")
99+
result_list = []
100+
for row in results:
101+
result_list.append(row)
102+
self.assertEqual(1, row[0])
103+
self.assertEqual(1, len(result_list))
104+
requests = self.spanner_service.requests
105+
self.assertEqual(3, len(requests), msg=requests)
106+
self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest))
107+
# The ExecuteStreamingSql call should be retried.
108+
self.assertTrue(isinstance(requests[1], ExecuteSqlRequest))
109+
self.assertTrue(isinstance(requests[2], ExecuteSqlRequest))

0 commit comments

Comments
 (0)