Skip to content

Commit 0887eb4

Browse files
aakashanandg, olavloite, gcf-owl-bot[bot], lszinv, rahul2393
authored
fix: update retry strategy for mutation calls to handle aborted transactions (#1279)
* fix: update retry strategy for mutation calls to handle aborted transactions * test: add mock server test for aborted batch * chore(python): Update the python version in docs presubmit to use 3.10 (#1281) Source-Link: googleapis/synthtool@de3def6 Post-Processor: gcr.io/cloud-devrel-public-resources/owlbot-python:latest@sha256:a1c5112b81d645f5bbc4d4bbc99d7dcb5089a52216c0e3fb1203a0eeabadd7d5 Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> * fix: Refactoring existing retry logic for aborted transactions and clean up redundant code * fix: fixed linting errors * feat: support GRAPH and pipe syntax in dbapi (#1285) Recognize GRAPH and pipe syntax queries as valid queries in dbapi. * chore: Add Custom OpenTelemetry Exporter for Service Metrics (#1273) * chore: Add Custom OpenTelemetry Exporter for Service Metrics * Updated copyright dates to 2025 --------- Co-authored-by: rahul2393 <[email protected]> * fix: removing retry logic for RST_STREAM errors from _retry_on_aborted_exception handler --------- Co-authored-by: Knut Olav Løite <[email protected]> Co-authored-by: gcf-owl-bot[bot] <78513119+gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: Lester Szeto <[email protected]> Co-authored-by: rahul2393 <[email protected]>
1 parent 04a11a6 commit 0887eb4

File tree

12 files changed

+247
-72
lines changed

12 files changed

+247
-72
lines changed

.gitignore

+4
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,7 @@ system_tests/local_test_setup
6262
# Make sure a generated file isn't accidentally committed.
6363
pylintrc
6464
pylintrc.test
65+
66+
67+
# Ignore coverage files
68+
.coverage*

google/cloud/spanner_dbapi/transaction_helper.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from google.cloud.spanner_dbapi.batch_dml_executor import BatchMode
2222
from google.cloud.spanner_dbapi.exceptions import RetryAborted
23-
from google.cloud.spanner_v1.session import _get_retry_delay
23+
from google.cloud.spanner_v1._helpers import _get_retry_delay
2424

2525
if TYPE_CHECKING:
2626
from google.cloud.spanner_dbapi import Connection, Cursor

google/cloud/spanner_v1/_helpers.py

+75
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,15 @@
2727
from google.protobuf.internal.enum_type_wrapper import EnumTypeWrapper
2828

2929
from google.api_core import datetime_helpers
30+
from google.api_core.exceptions import Aborted
3031
from google.cloud._helpers import _date_from_iso8601_date
3132
from google.cloud.spanner_v1 import TypeCode
3233
from google.cloud.spanner_v1 import ExecuteSqlRequest
3334
from google.cloud.spanner_v1 import JsonObject
3435
from google.cloud.spanner_v1.request_id_header import with_request_id
36+
from google.rpc.error_details_pb2 import RetryInfo
37+
38+
import random
3539

3640
# Validation error messages
3741
NUMERIC_MAX_SCALE_ERR_MSG = (
@@ -460,6 +464,23 @@ def _metadata_with_prefix(prefix, **kw):
460464
return [("google-cloud-resource-prefix", prefix)]
461465

462466

467+
def _retry_on_aborted_exception(
468+
func,
469+
deadline,
470+
):
471+
"""
472+
Handles retry logic for Aborted exceptions, considering the deadline.
473+
"""
474+
attempts = 0
475+
while True:
476+
try:
477+
attempts += 1
478+
return func()
479+
except Aborted as exc:
480+
_delay_until_retry(exc, deadline=deadline, attempts=attempts)
481+
continue
482+
483+
463484
def _retry(
464485
func,
465486
retry_count=5,
@@ -529,6 +550,60 @@ def _metadata_with_leader_aware_routing(value, **kw):
529550
return ("x-goog-spanner-route-to-leader", str(value).lower())
530551

531552

553+
def _delay_until_retry(exc, deadline, attempts):
554+
"""Helper for :meth:`Session.run_in_transaction` and :func:`_retry_on_aborted_exception`.
555+
556+
Detect retryable abort, and impose server-supplied delay.
557+
558+
:type exc: :class:`google.api_core.exceptions.Aborted`
559+
:param exc: exception for aborted transaction
560+
561+
:type deadline: float
562+
:param deadline: maximum timestamp to continue retrying the transaction.
563+
564+
:type attempts: int
565+
:param attempts: number of call retries
566+
"""
567+
568+
cause = exc.errors[0]
569+
now = time.time()
570+
if now >= deadline:
571+
raise
572+
573+
delay = _get_retry_delay(cause, attempts)
574+
if delay is not None:
575+
if now + delay > deadline:
576+
raise
577+
578+
time.sleep(delay)
579+
580+
581+
def _get_retry_delay(cause, attempts):
582+
"""Helper for :func:`_delay_until_retry`.
583+
584+
:type cause: :class:`grpc.Call`
585+
:param cause: underlying error of the aborted transaction; its trailing metadata, if present, is checked for a ``RetryInfo`` delay
586+
587+
:rtype: float
588+
:returns: seconds to wait before retrying the transaction.
589+
590+
:type attempts: int
591+
:param attempts: number of call retries
592+
"""
593+
if hasattr(cause, "trailing_metadata"):
594+
metadata = dict(cause.trailing_metadata())
595+
else:
596+
metadata = {}
597+
retry_info_pb = metadata.get("google.rpc.retryinfo-bin")
598+
if retry_info_pb is not None:
599+
retry_info = RetryInfo()
600+
retry_info.ParseFromString(retry_info_pb)
601+
nanos = retry_info.retry_delay.nanos
602+
return retry_info.retry_delay.seconds + nanos / 1.0e9
603+
604+
return 2**attempts + random.random()
605+
606+
532607
class AtomicCounter:
533608
def __init__(self, start_value=0):
534609
self.__lock = threading.Lock()

google/cloud/spanner_v1/batch.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@
2929
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
3030
from google.cloud.spanner_v1 import RequestOptions
3131
from google.cloud.spanner_v1._helpers import _retry
32+
from google.cloud.spanner_v1._helpers import _retry_on_aborted_exception
3233
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
3334
from google.api_core.exceptions import InternalServerError
35+
import time
36+
37+
DEFAULT_RETRY_TIMEOUT_SECS = 30
3438

3539

3640
class _BatchBase(_SessionWrapper):
@@ -162,6 +166,7 @@ def commit(
162166
request_options=None,
163167
max_commit_delay=None,
164168
exclude_txn_from_change_streams=False,
169+
**kwargs,
165170
):
166171
"""Commit mutations to the database.
167172
@@ -227,9 +232,12 @@ def commit(
227232
request=request,
228233
metadata=metadata,
229234
)
230-
response = _retry(
235+
deadline = time.time() + kwargs.get(
236+
"timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS
237+
)
238+
response = _retry_on_aborted_exception(
231239
method,
232-
allowed_exceptions={InternalServerError: _check_rst_stream_error},
240+
deadline=deadline,
233241
)
234242
self.committed = response.commit_timestamp
235243
self.commit_stats = response.commit_stats
@@ -348,7 +356,9 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
348356
)
349357
response = _retry(
350358
method,
351-
allowed_exceptions={InternalServerError: _check_rst_stream_error},
359+
allowed_exceptions={
360+
InternalServerError: _check_rst_stream_error,
361+
},
352362
)
353363
self.committed = True
354364
return response

google/cloud/spanner_v1/database.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,7 @@ def batch(
775775
request_options=None,
776776
max_commit_delay=None,
777777
exclude_txn_from_change_streams=False,
778+
**kw,
778779
):
779780
"""Return an object which wraps a batch.
780781
@@ -805,7 +806,11 @@ def batch(
805806
:returns: new wrapper
806807
"""
807808
return BatchCheckout(
808-
self, request_options, max_commit_delay, exclude_txn_from_change_streams
809+
self,
810+
request_options,
811+
max_commit_delay,
812+
exclude_txn_from_change_streams,
813+
**kw,
809814
)
810815

811816
def mutation_groups(self):
@@ -1166,6 +1171,7 @@ def __init__(
11661171
request_options=None,
11671172
max_commit_delay=None,
11681173
exclude_txn_from_change_streams=False,
1174+
**kw,
11691175
):
11701176
self._database = database
11711177
self._session = self._batch = None
@@ -1177,6 +1183,7 @@ def __init__(
11771183
self._request_options = request_options
11781184
self._max_commit_delay = max_commit_delay
11791185
self._exclude_txn_from_change_streams = exclude_txn_from_change_streams
1186+
self._kw = kw
11801187

11811188
def __enter__(self):
11821189
"""Begin ``with`` block."""
@@ -1197,6 +1204,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
11971204
request_options=self._request_options,
11981205
max_commit_delay=self._max_commit_delay,
11991206
exclude_txn_from_change_streams=self._exclude_txn_from_change_streams,
1207+
**self._kw,
12001208
)
12011209
finally:
12021210
if self._database.log_commit_stats and self._batch.commit_stats:

google/cloud/spanner_v1/session.py

+2-56
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
"""Wrapper for Cloud Spanner Session objects."""
1616

1717
from functools import total_ordering
18-
import random
1918
import time
2019
from datetime import datetime
2120

2221
from google.api_core.exceptions import Aborted
2322
from google.api_core.exceptions import GoogleAPICallError
2423
from google.api_core.exceptions import NotFound
2524
from google.api_core.gapic_v1 import method
26-
from google.rpc.error_details_pb2 import RetryInfo
25+
from google.cloud.spanner_v1._helpers import _delay_until_retry
26+
from google.cloud.spanner_v1._helpers import _get_retry_delay
2727

2828
from google.cloud.spanner_v1 import ExecuteSqlRequest
2929
from google.cloud.spanner_v1 import CreateSessionRequest
@@ -554,57 +554,3 @@ def run_in_transaction(self, func, *args, **kw):
554554
extra={"commit_stats": txn.commit_stats},
555555
)
556556
return return_value
557-
558-
559-
# Rational: this function factors out complex shared deadline / retry
560-
# handling from two `except:` clauses.
561-
def _delay_until_retry(exc, deadline, attempts):
562-
"""Helper for :meth:`Session.run_in_transaction`.
563-
564-
Detect retryable abort, and impose server-supplied delay.
565-
566-
:type exc: :class:`google.api_core.exceptions.Aborted`
567-
:param exc: exception for aborted transaction
568-
569-
:type deadline: float
570-
:param deadline: maximum timestamp to continue retrying the transaction.
571-
572-
:type attempts: int
573-
:param attempts: number of call retries
574-
"""
575-
cause = exc.errors[0]
576-
577-
now = time.time()
578-
579-
if now >= deadline:
580-
raise
581-
582-
delay = _get_retry_delay(cause, attempts)
583-
if delay is not None:
584-
if now + delay > deadline:
585-
raise
586-
587-
time.sleep(delay)
588-
589-
590-
def _get_retry_delay(cause, attempts):
591-
"""Helper for :func:`_delay_until_retry`.
592-
593-
:type exc: :class:`grpc.Call`
594-
:param exc: exception for aborted transaction
595-
596-
:rtype: float
597-
:returns: seconds to wait before retrying the transaction.
598-
599-
:type attempts: int
600-
:param attempts: number of call retries
601-
"""
602-
metadata = dict(cause.trailing_metadata())
603-
retry_info_pb = metadata.get("google.rpc.retryinfo-bin")
604-
if retry_info_pb is not None:
605-
retry_info = RetryInfo()
606-
retry_info.ParseFromString(retry_info_pb)
607-
nanos = retry_info.retry_delay.nanos
608-
return retry_info.retry_delay.seconds + nanos / 1.0e9
609-
610-
return 2**attempts + random.random()

google/cloud/spanner_v1/testing/mock_spanner.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,19 @@ def __create_transaction(
213213
def Commit(self, request, context):
214214
self._requests.append(request)
215215
self.mock_spanner.pop_error(context)
216-
tx = self.transactions[request.transaction_id]
217-
if tx is None:
218-
raise ValueError(f"Transaction not found: {request.transaction_id}")
219-
del self.transactions[request.transaction_id]
216+
if not request.transaction_id == b"":
217+
tx = self.transactions[request.transaction_id]
218+
if tx is None:
219+
raise ValueError(f"Transaction not found: {request.transaction_id}")
220+
tx_id = request.transaction_id
221+
elif not request.single_use_transaction == TransactionOptions():
222+
tx = self.__create_transaction(
223+
request.session, request.single_use_transaction
224+
)
225+
tx_id = tx.id
226+
else:
227+
raise ValueError("Unsupported transaction type")
228+
del self.transactions[tx_id]
220229
return commit.CommitResponse()
221230

222231
def Rollback(self, request, context):

tests/mockserver_tests/test_aborted_transaction.py

+24
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,30 @@ def test_run_in_transaction_batch_dml_aborted(self):
9595
self.assertTrue(isinstance(requests[2], ExecuteBatchDmlRequest))
9696
self.assertTrue(isinstance(requests[3], CommitRequest))
9797

98+
def test_batch_commit_aborted(self):
99+
# Add an Aborted error for the Commit method on the mock server.
100+
add_error(SpannerServicer.Commit.__name__, aborted_status())
101+
with self.database.batch() as batch:
102+
batch.insert(
103+
table="Singers",
104+
columns=("SingerId", "FirstName", "LastName"),
105+
values=[
106+
(1, "Marc", "Richards"),
107+
(2, "Catalina", "Smith"),
108+
(3, "Alice", "Trentor"),
109+
(4, "Lea", "Martin"),
110+
(5, "David", "Lomond"),
111+
],
112+
)
113+
114+
# Verify that the transaction was retried.
115+
requests = self.spanner_service.requests
116+
self.assertEqual(3, len(requests), msg=requests)
117+
self.assertTrue(isinstance(requests[0], BatchCreateSessionsRequest))
118+
self.assertTrue(isinstance(requests[1], CommitRequest))
119+
# The transaction is aborted and retried.
120+
self.assertTrue(isinstance(requests[2], CommitRequest))
121+
98122

99123
def _insert_mutations(transaction: Transaction):
100124
transaction.insert("my_table", ["col1", "col2"], ["value1", "value2"])

0 commit comments

Comments
 (0)