16
16
17
17
import datetime
18
18
import queue
19
+ import time
19
20
20
21
from google .cloud .exceptions import NotFound
21
22
from google .cloud .spanner_v1 import BatchCreateSessionsRequest
24
25
_metadata_with_prefix ,
25
26
_metadata_with_leader_aware_routing ,
26
27
)
28
+ from google .cloud .spanner_v1 ._opentelemetry_tracing import (
29
+ add_span_event ,
30
+ get_current_span ,
31
+ )
27
32
from warnings import warn
28
33
29
34
_NOW = datetime .datetime .utcnow # unit tests may replace
@@ -196,20 +201,50 @@ def bind(self, database):
196
201
when needed.
197
202
"""
198
203
self ._database = database
204
+ requested_session_count = self .size - self ._sessions .qsize ()
205
+ span = get_current_span ()
206
+ span_event_attributes = {"kind" : type (self ).__name__ }
207
+
208
+ if requested_session_count <= 0 :
209
+ add_span_event (
210
+ span ,
211
+ f"Invalid session pool size({ requested_session_count } ) <= 0" ,
212
+ span_event_attributes ,
213
+ )
214
+ return
215
+
199
216
api = database .spanner_api
200
217
metadata = _metadata_with_prefix (database .name )
201
218
if database ._route_to_leader_enabled :
202
219
metadata .append (
203
220
_metadata_with_leader_aware_routing (database ._route_to_leader_enabled )
204
221
)
205
222
self ._database_role = self ._database_role or self ._database .database_role
223
+ if requested_session_count > 0 :
224
+ add_span_event (
225
+ span ,
226
+ f"Requesting { requested_session_count } sessions" ,
227
+ span_event_attributes ,
228
+ )
229
+
230
+ if self ._sessions .full ():
231
+ add_span_event (span , "Session pool is already full" , span_event_attributes )
232
+ return
233
+
206
234
request = BatchCreateSessionsRequest (
207
235
database = database .name ,
208
- session_count = self . size - self . _sessions . qsize () ,
236
+ session_count = requested_session_count ,
209
237
session_template = Session (creator_role = self .database_role ),
210
238
)
211
239
240
+ returned_session_count = 0
212
241
while not self ._sessions .full ():
242
+ request .session_count = requested_session_count - self ._sessions .qsize ()
243
+ add_span_event (
244
+ span ,
245
+ f"Creating { request .session_count } sessions" ,
246
+ span_event_attributes ,
247
+ )
213
248
resp = api .batch_create_sessions (
214
249
request = request ,
215
250
metadata = metadata ,
@@ -218,6 +253,13 @@ def bind(self, database):
218
253
session = self ._new_session ()
219
254
session ._session_id = session_pb .name .split ("/" )[- 1 ]
220
255
self ._sessions .put (session )
256
+ returned_session_count += 1
257
+
258
+ add_span_event (
259
+ span ,
260
+ f"Requested for { requested_session_count } sessions, returned { returned_session_count } " ,
261
+ span_event_attributes ,
262
+ )
221
263
222
264
def get (self , timeout = None ):
223
265
"""Check a session out from the pool.
@@ -233,12 +275,43 @@ def get(self, timeout=None):
233
275
if timeout is None :
234
276
timeout = self .default_timeout
235
277
236
- session = self ._sessions .get (block = True , timeout = timeout )
237
- age = _NOW () - session .last_use_time
278
+ start_time = time .time ()
279
+ current_span = get_current_span ()
280
+ span_event_attributes = {"kind" : type (self ).__name__ }
281
+ add_span_event (current_span , "Acquiring session" , span_event_attributes )
238
282
239
- if age >= self ._max_age and not session .exists ():
240
- session = self ._database .session ()
241
- session .create ()
283
+ session = None
284
+ try :
285
+ add_span_event (
286
+ current_span ,
287
+ "Waiting for a session to become available" ,
288
+ span_event_attributes ,
289
+ )
290
+
291
+ session = self ._sessions .get (block = True , timeout = timeout )
292
+ age = _NOW () - session .last_use_time
293
+
294
+ if age >= self ._max_age and not session .exists ():
295
+ if not session .exists ():
296
+ add_span_event (
297
+ current_span ,
298
+ "Session is not valid, recreating it" ,
299
+ span_event_attributes ,
300
+ )
301
+ session = self ._database .session ()
302
+ session .create ()
303
+ # Replacing with the updated session.id.
304
+ span_event_attributes ["session.id" ] = session ._session_id
305
+
306
+ span_event_attributes ["session.id" ] = session ._session_id
307
+ span_event_attributes ["time.elapsed" ] = time .time () - start_time
308
+ add_span_event (current_span , "Acquired session" , span_event_attributes )
309
+
310
+ except queue .Empty as e :
311
+ add_span_event (
312
+ current_span , "No sessions available in the pool" , span_event_attributes
313
+ )
314
+ raise e
242
315
243
316
return session
244
317
@@ -312,13 +385,32 @@ def get(self):
312
385
:returns: an existing session from the pool, or a newly-created
313
386
session.
314
387
"""
388
+ current_span = get_current_span ()
389
+ span_event_attributes = {"kind" : type (self ).__name__ }
390
+ add_span_event (current_span , "Acquiring session" , span_event_attributes )
391
+
315
392
try :
393
+ add_span_event (
394
+ current_span ,
395
+ "Waiting for a session to become available" ,
396
+ span_event_attributes ,
397
+ )
316
398
session = self ._sessions .get_nowait ()
317
399
except queue .Empty :
400
+ add_span_event (
401
+ current_span ,
402
+ "No sessions available in pool. Creating session" ,
403
+ span_event_attributes ,
404
+ )
318
405
session = self ._new_session ()
319
406
session .create ()
320
407
else :
321
408
if not session .exists ():
409
+ add_span_event (
410
+ current_span ,
411
+ "Session is not valid, recreating it" ,
412
+ span_event_attributes ,
413
+ )
322
414
session = self ._new_session ()
323
415
session .create ()
324
416
return session
@@ -427,6 +519,38 @@ def bind(self, database):
427
519
session_template = Session (creator_role = self .database_role ),
428
520
)
429
521
522
+ span_event_attributes = {"kind" : type (self ).__name__ }
523
+ current_span = get_current_span ()
524
+ requested_session_count = request .session_count
525
+ if requested_session_count <= 0 :
526
+ add_span_event (
527
+ current_span ,
528
+ f"Invalid session pool size({ requested_session_count } ) <= 0" ,
529
+ span_event_attributes ,
530
+ )
531
+ return
532
+
533
+ add_span_event (
534
+ current_span ,
535
+ f"Requesting { requested_session_count } sessions" ,
536
+ span_event_attributes ,
537
+ )
538
+
539
+ if created_session_count >= self .size :
540
+ add_span_event (
541
+ current_span ,
542
+ "Created no new sessions as sessionPool is full" ,
543
+ span_event_attributes ,
544
+ )
545
+ return
546
+
547
+ add_span_event (
548
+ current_span ,
549
+ f"Creating { request .session_count } sessions" ,
550
+ span_event_attributes ,
551
+ )
552
+
553
+ returned_session_count = 0
430
554
while created_session_count < self .size :
431
555
resp = api .batch_create_sessions (
432
556
request = request ,
@@ -436,8 +560,16 @@ def bind(self, database):
436
560
session = self ._new_session ()
437
561
session ._session_id = session_pb .name .split ("/" )[- 1 ]
438
562
self .put (session )
563
+ returned_session_count += 1
564
+
439
565
created_session_count += len (resp .session )
440
566
567
+ add_span_event (
568
+ current_span ,
569
+ f"Requested for { requested_session_count } sessions, return { returned_session_count } " ,
570
+ span_event_attributes ,
571
+ )
572
+
441
573
def get (self , timeout = None ):
442
574
"""Check a session out from the pool.
443
575
@@ -452,7 +584,26 @@ def get(self, timeout=None):
452
584
if timeout is None :
453
585
timeout = self .default_timeout
454
586
455
- ping_after , session = self ._sessions .get (block = True , timeout = timeout )
587
+ start_time = time .time ()
588
+ span_event_attributes = {"kind" : type (self ).__name__ }
589
+ current_span = get_current_span ()
590
+ add_span_event (
591
+ current_span ,
592
+ "Waiting for a session to become available" ,
593
+ span_event_attributes ,
594
+ )
595
+
596
+ ping_after = None
597
+ session = None
598
+ try :
599
+ ping_after , session = self ._sessions .get (block = True , timeout = timeout )
600
+ except queue .Empty as e :
601
+ add_span_event (
602
+ current_span ,
603
+ "No sessions available in the pool within the specified timeout" ,
604
+ span_event_attributes ,
605
+ )
606
+ raise e
456
607
457
608
if _NOW () > ping_after :
458
609
# Using session.exists() guarantees the returned session exists.
@@ -462,6 +613,14 @@ def get(self, timeout=None):
462
613
session = self ._new_session ()
463
614
session .create ()
464
615
616
+ span_event_attributes .update (
617
+ {
618
+ "time.elapsed" : time .time () - start_time ,
619
+ "session.id" : session ._session_id ,
620
+ "kind" : "pinging_pool" ,
621
+ }
622
+ )
623
+ add_span_event (current_span , "Acquired session" , span_event_attributes )
465
624
return session
466
625
467
626
def put (self , session ):
0 commit comments