Skip to content

Commit 528d9dd

Browse files
authored
feat(spanner): support multiplexed sessions for ReadWriteStmtBasedTransaction (#11852)
This PR adds support for multiplexed sessions in ReadWriteStmtBasedTransaction
1 parent ea00b17 commit 528d9dd

File tree

2 files changed

+186
-4
lines changed

2 files changed

+186
-4
lines changed

spanner/transaction.go

+26-4
Original file line numberDiff line numberDiff line change
@@ -1910,16 +1910,20 @@ func NewReadWriteStmtBasedTransaction(ctx context.Context, c *Client) (*ReadWrit
19101910
// NewReadWriteStmtBasedTransactionWithOptions is a configurable version of
19111911
// NewReadWriteStmtBasedTransaction.
19121912
func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, options TransactionOptions) (*ReadWriteStmtBasedTransaction, error) {
1913-
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, c, options, nil)
1913+
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, c, options, nil, nil)
19141914
}
19151915

1916-
func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *Client, options TransactionOptions, sh *sessionHandle) (*ReadWriteStmtBasedTransaction, error) {
1916+
func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *Client, options TransactionOptions, sh *sessionHandle, previousTransactionID transactionID) (*ReadWriteStmtBasedTransaction, error) {
19171917
var (
19181918
err error
19191919
t *ReadWriteStmtBasedTransaction
19201920
)
19211921
if sh == nil {
1922-
sh, err = c.idleSessions.take(ctx)
1922+
if c.idleSessions.isMultiplexedSessionForRWEnabled() {
1923+
sh, err = c.idleSessions.takeMultiplexed(ctx)
1924+
} else {
1925+
sh, err = c.idleSessions.take(ctx)
1926+
}
19231927
if err != nil {
19241928
// If session retrieval fails, just fail the transaction.
19251929
return nil, err
@@ -1931,6 +1935,12 @@ func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *C
19311935
},
19321936
client: c,
19331937
}
1938+
if previousTransactionID != nil {
1939+
// The previousTx field is updated with the most recent transaction ID. This is needed for multiplexed sessions
1940+
// to increase the priority of the new transaction during retry attempt.
1941+
// This assignment is ignored for regular sessions.
1942+
t.previousTx = previousTransactionID
1943+
}
19341944
t.txReadOnly.sp = c.idleSessions
19351945
t.txReadOnly.sh = sh
19361946
t.txReadOnly.txReadEnv = t
@@ -1958,6 +1968,9 @@ func newReadWriteStmtBasedTransactionWithSessionHandle(ctx context.Context, c *C
19581968
}
19591969
return nil, err
19601970
}
1971+
if isUnimplementedErrorForMultiplexedRW(err) {
1972+
c.idleSessions.disableMultiplexedSessionForRW()
1973+
}
19611974
return t, err
19621975
}
19631976

@@ -2003,8 +2016,17 @@ func (t *ReadWriteStmtBasedTransaction) ResetForRetry(ctx context.Context) (*Rea
20032016
if t.state != txAborted {
20042017
return nil, fmt.Errorf("ResetForRetry should only be called on an active transaction that was aborted by Spanner")
20052018
}
2019+
2020+
var previousTransactionID transactionID
2021+
if t.tx != nil {
2022+
// Track the current transactionId that is ABORTED.
2023+
previousTransactionID = t.tx
2024+
} else {
2025+
// In case the current transactionId is nil, then look at the previousTx.
2026+
previousTransactionID = t.previousTx
2027+
}
20062028
// Create a new transaction that re-uses the current session if it is available.
2007-
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, t.client, t.options, t.sh)
2029+
return newReadWriteStmtBasedTransactionWithSessionHandle(ctx, t.client, t.options, t.sh, previousTransactionID)
20082030
}
20092031

20102032
// writeOnlyTransaction provides the most efficient way of doing write-only

spanner/transaction_test.go

+160
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package spanner
1818

1919
import (
20+
"bytes"
2021
"context"
2122
"errors"
2223
"fmt"
@@ -1199,6 +1200,165 @@ func TestReadWriteStmtBasedTransactionWithOptions(t *testing.T) {
11991200
}
12001201
}
12011202

1203+
// Verify that requests in a ReadWriteStmtBasedTransaction uses multiplexed sessions when enabled
1204+
func TestReadWriteStmtBasedTransaction_UsesMultiplexedSession(t *testing.T) {
1205+
t.Parallel()
1206+
ctx := context.Background()
1207+
cfg := SessionPoolConfig{
1208+
// Avoid regular session creation
1209+
MinOpened: 0,
1210+
MaxOpened: 0,
1211+
// Enabled multiplexed sessions
1212+
enableMultiplexSession: true,
1213+
enableMultiplexedSessionForRW: true,
1214+
}
1215+
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1216+
SessionPoolConfig: cfg,
1217+
})
1218+
defer teardown()
1219+
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
1220+
SimulatedExecutionTime{
1221+
Errors: []error{status.Errorf(codes.Aborted, "Transaction aborted")},
1222+
})
1223+
1224+
txn, err := NewReadWriteStmtBasedTransaction(ctx, client)
1225+
if err != nil {
1226+
t.Fatalf("got an error: %v", err)
1227+
}
1228+
_, err = txn.Commit(ctx)
1229+
if status.Code(err) != codes.Aborted || !strings.Contains(err.Error(), "Transaction aborted") {
1230+
t.Fatalf("got an incorrect error: %v", err)
1231+
}
1232+
1233+
requests := drainRequestsFromServer(server.TestSpanner)
1234+
if err := compareRequests([]interface{}{
1235+
&sppb.CreateSessionRequest{},
1236+
&sppb.BeginTransactionRequest{},
1237+
&sppb.CommitRequest{}}, requests); err != nil {
1238+
t.Fatal(err)
1239+
}
1240+
for _, req := range requests {
1241+
if c, ok := req.(*sppb.CommitRequest); ok {
1242+
if !strings.Contains(c.GetSession(), "multiplexed") {
1243+
t.Errorf("Expected session to be multiplexed")
1244+
}
1245+
}
1246+
if b, ok := req.(*sppb.BeginTransactionRequest); ok {
1247+
if !strings.Contains(b.GetSession(), "multiplexed") {
1248+
t.Errorf("Expected session to be multiplexed")
1249+
}
1250+
}
1251+
}
1252+
}
1253+
1254+
// Verify that in ReadWriteStmtBasedTransaction when a transaction using a multiplexed session fails with ABORTED then during retry the previousTransactionID is passed
1255+
func TestReadWriteStmtBasedTransaction_UsesPreviousTransactionIDForMultiplexedSession_OnAbort(t *testing.T) {
1256+
t.Parallel()
1257+
ctx := context.Background()
1258+
cfg := SessionPoolConfig{
1259+
// Avoid regular session creation
1260+
MinOpened: 0,
1261+
MaxOpened: 0,
1262+
// Enabled multiplexed sessions
1263+
enableMultiplexSession: true,
1264+
enableMultiplexedSessionForRW: true,
1265+
}
1266+
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1267+
SessionPoolConfig: cfg,
1268+
})
1269+
defer teardown()
1270+
server.TestSpanner.PutExecutionTime(MethodCommitTransaction,
1271+
SimulatedExecutionTime{
1272+
Errors: []error{status.Errorf(codes.Aborted, "Transaction aborted")},
1273+
})
1274+
1275+
txn, err := NewReadWriteStmtBasedTransaction(ctx, client)
1276+
if err != nil {
1277+
t.Fatalf("got an error: %v", err)
1278+
}
1279+
_, err = txn.Commit(ctx)
1280+
if status.Code(err) != codes.Aborted || !strings.Contains(err.Error(), "Transaction aborted") {
1281+
t.Fatalf("got an incorrect error: %v", err)
1282+
}
1283+
// Fetch the transaction ID of the first attempt
1284+
previousTransactionID := txn.tx
1285+
txn, err = txn.ResetForRetry(ctx)
1286+
if err != nil {
1287+
t.Fatal(err)
1288+
}
1289+
_, err = txn.Commit(ctx)
1290+
if err != nil {
1291+
t.Fatalf("expect commit to succeed but failed with error: %v", err)
1292+
}
1293+
requests := drainRequestsFromServer(server.TestSpanner)
1294+
beginTransactionRequestCount := 0
1295+
for _, req := range requests {
1296+
if b, ok := req.(*sppb.BeginTransactionRequest); ok {
1297+
beginTransactionRequestCount++
1298+
// The first BeginTransactionRequest will not have any previousTransactionID.
1299+
// Check if the second BeginTransactionRequest sets the previousTransactionID to correct value.
1300+
if beginTransactionRequestCount == 2 {
1301+
if !strings.Contains(b.GetSession(), "multiplexed") {
1302+
t.Errorf("Expected session to be multiplexed")
1303+
}
1304+
opts := b.Options.GetReadWrite()
1305+
if opts == nil {
1306+
t.Fatal("missing ReadWrite options")
1307+
}
1308+
if !bytes.Equal(opts.MultiplexedSessionPreviousTransactionId, previousTransactionID) {
1309+
t.Errorf("BeginTransactionRequest during retry: got prev txID %v, want %v",
1310+
opts.MultiplexedSessionPreviousTransactionId, previousTransactionID)
1311+
}
1312+
}
1313+
}
1314+
}
1315+
}
1316+
1317+
// Verify that in ReadWriteStmtBasedTransaction, commit request has precommit token set when using multiplexed sessions
1318+
func TestReadWriteStmtBasedTransaction_SetsPrecommitToken(t *testing.T) {
1319+
t.Parallel()
1320+
ctx := context.Background()
1321+
cfg := SessionPoolConfig{
1322+
// Avoid regular session creation
1323+
MinOpened: 0,
1324+
MaxOpened: 0,
1325+
// Enabled multiplexed sessions
1326+
enableMultiplexSession: true,
1327+
enableMultiplexedSessionForRW: true,
1328+
}
1329+
server, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{
1330+
SessionPoolConfig: cfg,
1331+
})
1332+
defer teardown()
1333+
1334+
txn, err := NewReadWriteStmtBasedTransaction(ctx, client)
1335+
if err != nil {
1336+
t.Fatalf("got an error: %v", err)
1337+
}
1338+
c, err := txn.Update(ctx, Statement{SQL: UpdateBarSetFoo})
1339+
if err != nil {
1340+
t.Fatal(err)
1341+
}
1342+
if g, w := c, int64(UpdateBarSetFooRowCount); g != w {
1343+
t.Fatalf("update count mismatch\n Got: %v\nWant: %v", g, w)
1344+
}
1345+
_, err = txn.Commit(ctx)
1346+
if err != nil {
1347+
t.Fatal(err)
1348+
}
1349+
requests := drainRequestsFromServer(server.TestSpanner)
1350+
for _, req := range requests {
1351+
if commitReq, ok := req.(*sppb.CommitRequest); ok {
1352+
if commitReq.GetPrecommitToken() == nil || !strings.Contains(string(commitReq.GetPrecommitToken().PrecommitToken), "ResultSetPrecommitToken") {
1353+
t.Errorf("Expected precommit token 'ResultSetPrecommitToken', got %v", commitReq.GetPrecommitToken())
1354+
}
1355+
if !strings.Contains(commitReq.GetSession(), "multiplexed") {
1356+
t.Errorf("Expected session to be multiplexed")
1357+
}
1358+
}
1359+
}
1360+
}
1361+
12021362
func TestBatchDML_StatementBased_WithMultipleDML(t *testing.T) {
12031363
t.Parallel()
12041364
ctx := context.Background()

0 commit comments

Comments
 (0)