Skip to content

Commit 46f2dd7

Browse files
perf: Improve repr performance (#918)
perf: Improve repr performance (#918)

* perf: Improve repr performance
* extract gbq metadata from nodes to common struct
* clarify fast head
* fix physical_schema to be bq client types
* add classmethod annotation to GbqTable struct factory method
* add classmethod annotation to GbqTable struct factory method

---------

Co-authored-by: Tim Sweña (Swast) <[email protected]>
1 parent 7472a11 commit 46f2dd7

File tree

7 files changed

+238
-105
lines changed

7 files changed

+238
-105
lines changed

bigframes/core/__init__.py

+3-25
Original file line numberDiff line numberDiff line change
@@ -70,23 +70,7 @@ def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
7070
iobytes.getvalue(),
7171
data_schema=schema,
7272
session=session,
73-
)
74-
return cls(node)
75-
76-
@classmethod
77-
def from_cached(
78-
cls,
79-
original: ArrayValue,
80-
table: google.cloud.bigquery.Table,
81-
ordering: orderings.TotalOrdering,
82-
):
83-
node = nodes.CachedTableNode(
84-
original_node=original.node,
85-
project_id=table.reference.project,
86-
dataset_id=table.reference.dataset_id,
87-
table_id=table.reference.table_id,
88-
physical_schema=tuple(table.schema),
89-
ordering=ordering,
73+
n_rows=arrow_table.num_rows,
9074
)
9175
return cls(node)
9276

@@ -110,10 +94,7 @@ def from_table(
11094
bigframes.exceptions.PreviewWarning,
11195
)
11296
node = nodes.ReadTableNode(
113-
project_id=table.reference.project,
114-
dataset_id=table.reference.dataset_id,
115-
table_id=table.reference.table_id,
116-
physical_schema=tuple(table.schema),
97+
table=nodes.GbqTable.from_table(table),
11798
total_order_cols=(offsets_col,) if offsets_col else tuple(primary_key),
11899
order_col_is_sequential=(offsets_col is not None),
119100
columns=schema,
@@ -154,10 +135,7 @@ def as_cached(
154135
"""
155136
node = nodes.CachedTableNode(
156137
original_node=self.node,
157-
project_id=cache_table.reference.project,
158-
dataset_id=cache_table.reference.dataset_id,
159-
table_id=cache_table.reference.table_id,
160-
physical_schema=tuple(cache_table.schema),
138+
table=nodes.GbqTable.from_table(cache_table),
161139
ordering=ordering,
162140
)
163141
return ArrayValue(node)

bigframes/core/blocks.py

+10-16
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ def index(self) -> BlockIndexProperties:
200200
@functools.cached_property
201201
def shape(self) -> typing.Tuple[int, int]:
202202
"""Returns dimensions as (length, width) tuple."""
203+
203204
row_count_expr = self.expr.row_count()
204205

205206
# Support in-memory engines for hermetic unit tests.
@@ -210,8 +211,7 @@ def shape(self) -> typing.Tuple[int, int]:
210211
except Exception:
211212
pass
212213

213-
iter, _ = self.session._execute(row_count_expr, ordered=False)
214-
row_count = next(iter)[0]
214+
row_count = self.session._executor.get_row_count(self.expr)
215215
return (row_count, len(self.value_columns))
216216

217217
@property
@@ -560,7 +560,7 @@ def to_pandas(
560560
def try_peek(
561561
self, n: int = 20, force: bool = False
562562
) -> typing.Optional[pd.DataFrame]:
563-
if force or tree_properties.peekable(self.expr.node):
563+
if force or tree_properties.can_fast_peek(self.expr.node):
564564
iterator, _ = self.session._peek(self.expr, n)
565565
df = self._to_dataframe(iterator)
566566
self._copy_index_to_pandas(df)
@@ -1587,19 +1587,13 @@ def retrieve_repr_request_results(
15871587
15881588
Returns a tuple of the dataframe and the overall number of rows of the query.
15891589
"""
1590-
# TODO(swast): Select a subset of columns if max_columns is less than the
1591-
# number of columns in the schema.
1592-
count = self.shape[0]
1593-
if count > max_results:
1594-
head_block = self.slice(0, max_results)
1595-
else:
1596-
head_block = self
1597-
computed_df, query_job = head_block.to_pandas()
1598-
formatted_df = computed_df.set_axis(self.column_labels, axis=1)
1599-
# we reset the axis and substitute the bf index name(s) for the default
1600-
if len(self.index.names) > 0:
1601-
formatted_df.index.names = self.index.names # type: ignore
1602-
return formatted_df, count, query_job
1590+
1591+
results, query_job = self.session._executor.head(self.expr, max_results)
1592+
count = self.session._executor.get_row_count(self.expr)
1593+
1594+
computed_df = self._to_dataframe(results)
1595+
self._copy_index_to_pandas(computed_df)
1596+
return computed_df, count, query_job
16031597

16041598
def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
16051599
result_id = guid.generate_guid()

bigframes/core/compile/compiler.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -103,14 +103,16 @@ def compile_readlocal(self, node: nodes.ReadLocalNode, ordered: bool = True):
103103

104104
@_compile_node.register
105105
def compile_cached_table(self, node: nodes.CachedTableNode, ordered: bool = True):
106-
full_table_name = f"{node.project_id}.{node.dataset_id}.{node.table_id}"
106+
full_table_name = (
107+
f"{node.table.project_id}.{node.table.dataset_id}.{node.table.table_id}"
108+
)
107109
used_columns = (
108110
*node.schema.names,
109111
*node.hidden_columns,
110112
)
111113
# Physical schema might include unused columns, unsupported datatypes like JSON
112114
physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis(
113-
list(i for i in node.physical_schema if i.name in used_columns)
115+
list(i for i in node.table.physical_schema if i.name in used_columns)
114116
)
115117
ibis_table = ibis.table(physical_schema, full_table_name)
116118
if ordered:
@@ -156,14 +158,16 @@ def compile_readtable(self, node: nodes.ReadTableNode, ordered: bool = True):
156158
def read_table_as_unordered_ibis(
157159
self, node: nodes.ReadTableNode
158160
) -> ibis.expr.types.Table:
159-
full_table_name = f"{node.project_id}.{node.dataset_id}.{node.table_id}"
161+
full_table_name = (
162+
f"{node.table.project_id}.{node.table.dataset_id}.{node.table.table_id}"
163+
)
160164
used_columns = (
161165
*node.schema.names,
162166
*[i for i in node.total_order_cols if i not in node.schema.names],
163167
)
164168
# Physical schema might include unused columns, unsupported datatypes like JSON
165169
physical_schema = ibis.backends.bigquery.BigQuerySchema.to_ibis(
166-
list(i for i in node.physical_schema if i.name in used_columns)
170+
list(i for i in node.table.physical_schema if i.name in used_columns)
167171
)
168172
if node.at_time is not None or node.sql_predicate is not None:
169173
import bigframes.session._io.bigquery

bigframes/core/nodes.py

+81-39
Original file line numberDiff line numberDiff line change
@@ -312,18 +312,36 @@ def transform_children(
312312

313313
# Input Nodex
314314
@dataclass(frozen=True)
315-
class ReadLocalNode(BigFrameNode):
315+
class LeafNode(BigFrameNode):
316+
@property
317+
def roots(self) -> typing.Set[BigFrameNode]:
318+
return {self}
319+
320+
@property
321+
def supports_fast_head(self) -> bool:
322+
return False
323+
324+
def transform_children(
325+
self, t: Callable[[BigFrameNode], BigFrameNode]
326+
) -> BigFrameNode:
327+
return self
328+
329+
@property
330+
def row_count(self) -> typing.Optional[int]:
331+
"""How many rows are in the data source. None means unknown."""
332+
return None
333+
334+
335+
@dataclass(frozen=True)
336+
class ReadLocalNode(LeafNode):
316337
feather_bytes: bytes
317338
data_schema: schemata.ArraySchema
339+
n_rows: int
318340
session: typing.Optional[bigframes.session.Session] = None
319341

320342
def __hash__(self):
321343
return self._node_hash
322344

323-
@property
324-
def roots(self) -> typing.Set[BigFrameNode]:
325-
return {self}
326-
327345
@functools.cached_property
328346
def schema(self) -> schemata.ArraySchema:
329347
return self.data_schema
@@ -333,6 +351,10 @@ def variables_introduced(self) -> int:
333351
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
334352
return len(self.schema.items) + 1
335353

354+
@property
355+
def supports_fast_head(self) -> bool:
356+
return True
357+
336358
@property
337359
def order_ambiguous(self) -> bool:
338360
return False
@@ -341,20 +363,38 @@ def order_ambiguous(self) -> bool:
341363
def explicitly_ordered(self) -> bool:
342364
return True
343365

344-
def transform_children(
345-
self, t: Callable[[BigFrameNode], BigFrameNode]
346-
) -> BigFrameNode:
347-
return self
366+
@property
367+
def row_count(self) -> typing.Optional[int]:
368+
return self.n_rows
348369

349370

350-
## Put ordering in here or just add order_by node above?
351371
@dataclass(frozen=True)
352-
class ReadTableNode(BigFrameNode):
372+
class GbqTable:
353373
project_id: str = field()
354374
dataset_id: str = field()
355375
table_id: str = field()
356-
357376
physical_schema: Tuple[bq.SchemaField, ...] = field()
377+
n_rows: int = field()
378+
cluster_cols: typing.Optional[Tuple[str, ...]]
379+
380+
@staticmethod
381+
def from_table(table: bq.Table) -> GbqTable:
382+
return GbqTable(
383+
project_id=table.project,
384+
dataset_id=table.dataset_id,
385+
table_id=table.table_id,
386+
physical_schema=tuple(table.schema),
387+
n_rows=table.num_rows,
388+
cluster_cols=None
389+
if table.clustering_fields is None
390+
else tuple(table.clustering_fields),
391+
)
392+
393+
394+
## Put ordering in here or just add order_by node above?
395+
@dataclass(frozen=True)
396+
class ReadTableNode(LeafNode):
397+
table: GbqTable
358398
# Subset of physical schema columns, with chosen BQ types
359399
columns: schemata.ArraySchema = field()
360400

@@ -370,10 +410,10 @@ class ReadTableNode(BigFrameNode):
370410

371411
def __post_init__(self):
372412
# enforce invariants
373-
physical_names = set(map(lambda i: i.name, self.physical_schema))
413+
physical_names = set(map(lambda i: i.name, self.table.physical_schema))
374414
if not set(self.columns.names).issubset(physical_names):
375415
raise ValueError(
376-
f"Requested schema {self.columns} cannot be derived from table schemal {self.physical_schema}"
416+
f"Requested schema {self.columns} cannot be derived from table schemal {self.table.physical_schema}"
377417
)
378418
if self.order_col_is_sequential and len(self.total_order_cols) != 1:
379419
raise ValueError("Sequential primary key must have only one component")
@@ -385,10 +425,6 @@ def session(self):
385425
def __hash__(self):
386426
return self._node_hash
387427

388-
@property
389-
def roots(self) -> typing.Set[BigFrameNode]:
390-
return {self}
391-
392428
@property
393429
def schema(self) -> schemata.ArraySchema:
394430
return self.columns
@@ -398,6 +434,13 @@ def relation_ops_created(self) -> int:
398434
# Assume worst case, where readgbq actually has baked in analytic operation to generate index
399435
return 3
400436

437+
@property
438+
def supports_fast_head(self) -> bool:
439+
# Fast head is only supported when row offsets are available.
440+
# In the future, ORDER BY+LIMIT optimizations may allow fast head when
441+
# clustered and/or partitioned on ordering key
442+
return self.order_col_is_sequential
443+
401444
@property
402445
def order_ambiguous(self) -> bool:
403446
return len(self.total_order_cols) == 0
@@ -410,37 +453,34 @@ def explicitly_ordered(self) -> bool:
410453
def variables_introduced(self) -> int:
411454
return len(self.schema.items) + 1
412455

413-
def transform_children(
414-
self, t: Callable[[BigFrameNode], BigFrameNode]
415-
) -> BigFrameNode:
416-
return self
456+
@property
457+
def row_count(self) -> typing.Optional[int]:
458+
if self.sql_predicate is None:
459+
return self.table.n_rows
460+
return None
417461

418462

419463
# This node shouldn't be used in the "original" expression tree, only used as replacement for original during planning
420464
@dataclass(frozen=True)
421-
class CachedTableNode(BigFrameNode):
465+
class CachedTableNode(LeafNode):
422466
# The original BFET subtree that was cached
423467
# note: this isn't a "child" node.
424468
original_node: BigFrameNode = field()
425469
# reference to cached materialization of original_node
426-
project_id: str = field()
427-
dataset_id: str = field()
428-
table_id: str = field()
429-
physical_schema: Tuple[bq.SchemaField, ...] = field()
430-
470+
table: GbqTable
431471
ordering: typing.Optional[orderings.RowOrdering] = field()
432472

433473
def __post_init__(self):
434474
# enforce invariants
435-
physical_names = set(map(lambda i: i.name, self.physical_schema))
475+
physical_names = set(map(lambda i: i.name, self.table.physical_schema))
436476
logical_names = self.original_node.schema.names
437477
if not set(logical_names).issubset(physical_names):
438478
raise ValueError(
439-
f"Requested schema {logical_names} cannot be derived from table schema {self.physical_schema}"
479+
f"Requested schema {logical_names} cannot be derived from table schema {self.table.physical_schema}"
440480
)
441481
if not set(self.hidden_columns).issubset(physical_names):
442482
raise ValueError(
443-
f"Requested hidden columns {self.hidden_columns} cannot be derived from table schema {self.physical_schema}"
483+
f"Requested hidden columns {self.hidden_columns} cannot be derived from table schema {self.table.physical_schema}"
444484
)
445485

446486
@property
@@ -450,10 +490,6 @@ def session(self):
450490
def __hash__(self):
451491
return self._node_hash
452492

453-
@property
454-
def roots(self) -> typing.Set[BigFrameNode]:
455-
return {self}
456-
457493
@property
458494
def schema(self) -> schemata.ArraySchema:
459495
return self.original_node.schema
@@ -473,6 +509,13 @@ def hidden_columns(self) -> typing.Tuple[str, ...]:
473509
if col not in self.schema.names
474510
)
475511

512+
@property
513+
def supports_fast_head(self) -> bool:
514+
# Fast head is only supported when row offsets are available.
515+
# In the future, ORDER BY+LIMIT optimizations may allow fast head when
516+
# clustered and/or partitioned on ordering key
517+
return (self.ordering is None) or self.ordering.is_sequential
518+
476519
@property
477520
def order_ambiguous(self) -> bool:
478521
return not isinstance(self.ordering, orderings.TotalOrdering)
@@ -483,10 +526,9 @@ def explicitly_ordered(self) -> bool:
483526
self.ordering.all_ordering_columns
484527
) > 0
485528

486-
def transform_children(
487-
self, t: Callable[[BigFrameNode], BigFrameNode]
488-
) -> BigFrameNode:
489-
return self
529+
@property
530+
def row_count(self) -> typing.Optional[int]:
531+
return self.table.n_rows
490532

491533

492534
# Unary nodes

0 commit comments

Comments (0)