Skip to content

Commit

Permalink
0.0.172
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Sep 8, 2024
1 parent b66968e commit 06543b4
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 29 deletions.
62 changes: 53 additions & 9 deletions orso/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,51 @@
from orso.schema import RelationSchema


class _RowsIterator:
"""
Iterator class for processing PyArrow tables lazily.
Parameters:
tables: Iterable of PyArrow tables to process.
row_factory: Factory method to create Row instances.
batch_size: Number of rows to process at a time.
max_size: Maximum number of rows to return.
"""

def __init__(
self, tables: typing.Iterable, row_factory: typing.Callable, batch_size: int, max_size: int
):
self.tables = itertools.chain(tables)
self.row_factory = row_factory
self.batch_size = batch_size
self.max_size = max_size
self.rows_processed = 0
self.current_table = None
self.current_rows = iter([])

def __iter__(self):
return self

def __next__(self):
if self.rows_processed >= self.max_size:
raise StopIteration

try:
row = next(self.current_rows)
self.rows_processed += 1
return row
except StopIteration:
# Fetch the next table and process it
self.current_table = next(self.tables, None)
if self.current_table is None:
raise StopIteration

self.current_rows = iter(
process_table(self.current_table, self.row_factory, self.batch_size)
)
return self.__next__()


def to_arrow(dataset, size=None):
try:
import pyarrow
Expand Down Expand Up @@ -65,17 +110,16 @@ def from_arrow(tables, size=None):
columns=[FlatColumn.from_arrow(field) for field in arrow_schema],
)
row_factory = Row.create_class(orso_schema, tuples_only=True)
rows: typing.List[Row] = []

for table in itertools.chain([first_table], tables):
rows.extend(process_table(table, row_factory, BATCH_SIZE))
if len(rows) > size:
break
# Create a bespoke lazy iterator instance
rows_iterator = _RowsIterator(
tables=itertools.chain([first_table], tables),
row_factory=row_factory,
batch_size=BATCH_SIZE,
max_size=size,
)

# Limit the number of rows to 'size'
if isinstance(size, int):
rows = itertools.islice(rows, size)
return rows, orso_schema
return rows_iterator, orso_schema


def to_pandas(dataset, size=None):
Expand Down
69 changes: 50 additions & 19 deletions orso/display.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import datetime
import decimal
from collections import deque
from itertools import islice
from typing import Union

from orso.compute.compiled import calculate_data_width
Expand Down Expand Up @@ -162,8 +164,10 @@ def ascii_table(

import numpy

if len(table) == 0:
return "No data in table"
from orso import DataFrame

lazy_length = 0
is_lazy = not isinstance(table._rows, list)

# get the width of the display
if isinstance(display_width, bool):
Expand All @@ -175,14 +179,26 @@ def ascii_table(
display_width = shutil.get_terminal_size((80, 20))[0]
# Extract head data
if limit > 0 and not top_and_tail:
t = table.slice(length=limit)
elif limit > 0 and top_and_tail and table.rowcount > ((2 * limit) + 1):
t = table.head(size=limit) + table.tail(size=limit)
if is_lazy:
t = DataFrame(rows=[row for row in islice(table._rows, limit)], schema=table.schema)
else:
t = table.slice(length=limit)
elif limit > 0 and top_and_tail:
if not is_lazy and table.rowcount > ((2 * limit) + 1):
t = table.head(size=limit) + table.tail(size=limit)
else:
head = list(islice(table._rows, limit))
tail_collector = deque(maxlen=limit)
for lazy_length, entry in enumerate(table._rows):
tail_collector.append(entry)
tail = list(tail_collector)
lazy_length += len(head)
t = DataFrame(rows=head + tail, schema=table.schema)
else:
t = table

# width of index column
index_width = len(str(len(table))) + 2
index_width = len(str(lazy_length + 1)) + 2 if is_lazy else len(str(len(table))) + 2

def numpy_type_mapper(value):
if isinstance(value, numpy.ndarray):
Expand Down Expand Up @@ -351,20 +367,35 @@ def _inner():
+ " │"
)
yield ("╞" + ("═" * index_width) + "╪═" + "═╪═".join("═" * cw for cw in col_width) + "═╡")
for i, row in enumerate(t):
if top_and_tail and (len(t) + 1) < len(table):
if is_lazy:
offset = 1
for i, row in enumerate(t):
if i == limit:
yield "\001PUNCm...\001OFFm"
if i >= limit:
i += len(table) - (limit * 2)
formatted = [type_formatter(v, w, t) for v, w, t in zip(row, col_width, col_types)]
yield (
"│\001TYPEm"
+ str(i + 1).rjust(index_width - 1)
+ "\001OFFm │ "
+ " │ ".join(formatted)
+ " │"
)
yield "..."
offset += 1 + (lazy_length - 2 * limit)
formatted = [type_formatter(v, w, t) for v, w, t in zip(row, col_width, col_types)]
yield (
"│\001TYPEm"
+ str(i + offset).rjust(index_width - 1)
+ "\001OFFm │ "
+ " │ ".join(formatted)
+ " │"
)
else:
for i, row in enumerate(t):
if top_and_tail and (len(t) + 1) <= limit:
if i == limit:
yield "\001PUNCm...\001OFFm"
if i >= limit:
i += len(table) - (limit * 2)
formatted = [type_formatter(v, w, t) for v, w, t in zip(row, col_width, col_types)]
yield (
"│\001TYPEm"
+ str(i + 1).rjust(index_width - 1)
+ "\001OFFm │ "
+ " │ ".join(formatted)
+ " │"
)
yield ("└" + ("─" * index_width) + "┴─" + "─┴─".join("─" * cw for cw in col_width) + "─┘")

return "\n".join(
Expand Down
2 changes: 1 addition & 1 deletion orso/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__: str = "0.0.171"
__version__: str = "0.0.172"
__author__: str = "@joocer"

0 comments on commit 06543b4

Please sign in to comment.