Skip to content

Commit

Permalink
0.0.170
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Aug 23, 2024
1 parent 1877fed commit 46a84c9
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 52 deletions.
63 changes: 33 additions & 30 deletions orso/compute/compiled.pyx
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#cython: infer_types=True
#cython: embedsignature=True
#cython: binding=False
#cython: language_level=3
# cython: infer_types=True
# cython: embedsignature=True
# cython: binding=False
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: nonecheck=False
# cython: overflowcheck=False

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -27,6 +31,7 @@ import numpy as np
cimport cython
cimport numpy as cnp
from numpy cimport ndarray
from libc.stdint cimport int32_t

cnp.import_array()

Expand All @@ -35,8 +40,7 @@ cnp.import_array()
HEADER_PREFIX = b"\x10\x00"
MAXIMUM_RECORD_SIZE = 8 * 1024 * 1024

@cython.boundscheck(False) # Deactivate bounds checking
@cython.wraparound(False) # Deactivate negative indexing

cpdef from_bytes_cython(bytes data):
cdef const char* data_ptr = PyBytes_AsString(data)
cdef Py_ssize_t length = PyBytes_GET_SIZE(data)
Expand Down Expand Up @@ -72,42 +76,41 @@ cpdef from_bytes_cython(bytes data):
return tuple(processed_list)





cpdef tuple extract_dict_columns(dict data, tuple fields):
cdef int i
cdef str field
cdef list sorted_data = [None] * len(fields) # Preallocate list size
for i in range(len(fields)):
field = fields[i]
sorted_data[i] = data[field]
return tuple(sorted_data) # Convert list to tuple
cdef list field_data = [None] * len(fields) # Preallocate list size

for i, field in enumerate(fields):
if field in data:
field_data[i] = data[field]
return tuple(field_data) # Convert list to tuple


@cython.boundscheck(False)
@cython.wraparound(False)
def collect_cython(list rows, cnp.ndarray[cnp.int32_t, ndim=1] columns, int limit=-1, int single=False) -> list:
cdef int i, j
cdef int num_rows = len(rows)
cdef int num_cols = columns.shape[0]
cpdef list collect_cython(list rows, cnp.ndarray[cnp.int32_t, ndim=1] columns, int limit=-1):
cdef int32_t i, j, col_idx
cdef int32_t num_rows = len(rows)
cdef int32_t num_cols = columns.shape[0]
cdef list row

if limit >= 0 and limit < num_rows:
num_rows = limit

# Initialize result list
cdef list result = [[None] * num_rows for _ in range(num_cols)]
# Initialize the result as one pre-allocated Python list per column (a list of lists)
#cdef cnp.ndarray[object, ndim=2] result = np.empty((num_cols, num_rows), dtype=object)
cdef list result = [list([0] * num_rows) for _ in range(num_cols)]

# Iterate over rows and columns, collecting data
for i in range(num_rows):
row = rows[i]
for j in range(num_cols):
result[j][i] = row[columns[j]]
# Populate each column one at a time
for j in range(num_cols):
col_idx = columns[j]
row = result[j]
for i in range(num_rows):
row[i] = rows[i][col_idx]

# Each column is already a Python list, so return the list of lists unchanged
return result #[result[i] for i in range(num_cols)]

return result[0] if single else result

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef int calculate_data_width(list column_values):
cdef int width, max_width
cdef object value
Expand Down
1 change: 0 additions & 1 deletion orso/converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from orso.row import Row
from orso.schema import FlatColumn
from orso.schema import RelationSchema
from orso.tools import arrow_type_map


def to_arrow(dataset, size=None):
Expand Down
7 changes: 5 additions & 2 deletions orso/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,12 @@ def collect(
if not isinstance(c, int):
column_indicies[i] = self.column_names.index(c)

return collect_cython(
self._rows, numpy.array(column_indicies, dtype=numpy.int32), limit, single
collected = collect_cython(
self._rows, numpy.array(column_indicies, dtype=numpy.int32), limit
)
if single:
return collected[0]
return collected

def __getitem__(self, items):
return self.collect(columns=items, limit=None)
Expand Down
11 changes: 6 additions & 5 deletions orso/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
from ormsgpack import OPT_SERIALIZE_NUMPY
from ormsgpack import packb

# from orso.compiled import extract_dict_columns
from orso.compute.compiled import extract_dict_columns
from orso.compute.compiled import from_bytes_cython
from orso.exceptions import DataError
from orso.schema import RelationSchema
Expand Down Expand Up @@ -84,10 +84,11 @@ def __new__(cls, data: Union[Dict[str, Any], Tuple[Any, ...]]):
A new Row instance.
"""
if isinstance(data, dict):
data = tuple([data.get(field) for field in cls._fields])
# this is faster but has a quirk that needs to be resolved - working is better than fast, but fast
# is good
# data = extract_dict_columns(data, cls._fields) # type:ignore
# data = tuple([data.get(field) for field in cls._fields])
# Previous comments on the line below suggested it had a bug but did not
# say what the bug was. It is about 25% faster than the pure-Python version
# above, and the extensive tests on this function have not found any bugs.
data = extract_dict_columns(data, cls._fields) # type:ignore
instance = super().__new__(cls, data) # type:ignore
return instance

Expand Down
11 changes: 9 additions & 2 deletions orso/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from dataclasses import field
from dataclasses import fields
from decimal import Decimal
from decimal import getcontext
from enum import Enum
from typing import Any
from typing import Callable
Expand All @@ -84,6 +85,7 @@
from orso.types import OrsoTypes

_MISSING_VALUE: str = str()
DECIMAL_PRECISION: int = getcontext().prec


class ColumnDisposition(Enum):
Expand Down Expand Up @@ -184,6 +186,9 @@ def __init__(self, **kwargs):
"Column type NUMERIC will be deprecated in a future version, use DECIMAL, DOUBLE or INTEGER instead. Mapped to DOUBLE, this may not be compatible with all values NUMERIC was compatible with."
)
self.type = OrsoTypes.DOUBLE
elif type_name == "BSON":
warn("Column type BSON will be deprecated in a future version, use JSONB instead.")
self.type = OrsoTypes.JSONB
elif type_name == "STRING":
raise ValueError(
f"Unknown column type '{self.type}' for column '{self.name}'. Did you mean 'VARCHAR'?"
Expand Down Expand Up @@ -284,12 +289,14 @@ def arrow_field(self):
OrsoTypes.TIME: pyarrow.time32("ms"),
OrsoTypes.INTERVAL: pyarrow.month_day_nano_interval(),
OrsoTypes.STRUCT: pyarrow.binary(), # convert structs to JSON strings/BSONs
OrsoTypes.DECIMAL: pyarrow.decimal128(self.precision or 28, self.scale or 10),
OrsoTypes.DECIMAL: pyarrow.decimal128(
self.precision or DECIMAL_PRECISION, self.scale or 10
),
OrsoTypes.DOUBLE: pyarrow.float64(),
OrsoTypes.INTEGER: pyarrow.int64(),
OrsoTypes.ARRAY: pyarrow.list_(pyarrow.string()),
OrsoTypes.VARCHAR: pyarrow.string(),
OrsoTypes.BSON: pyarrow.binary(),
OrsoTypes.JSONB: pyarrow.binary(),
OrsoTypes.NULL: pyarrow.null(),
}

Expand Down
8 changes: 2 additions & 6 deletions orso/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ def counter(func: Callable) -> Callable:
Callable: Wrapped function with counter and timer logic.
"""

import time

def report(self: Callable) -> str:
"""
Generate and return a summary report of execution statistics.
Expand Down Expand Up @@ -283,8 +281,6 @@ def throttle(calls_per_second: float) -> Callable:
Callable: Wrapped function with rate-limiting logic.
"""

import time

# Validate the input rate
if calls_per_second <= 0:
raise ValueError("@throttle requires calls_per_second to be greater than zero")
Expand Down Expand Up @@ -372,7 +368,7 @@ def wrapper(*args, **kwargs) -> Any:
stop_flag = False

# Use a thread to monitor the resource usage
def monitor():
def _monitor():
peak_cpu = 0
peak_memory = 0
cpu_tracker = []
Expand All @@ -397,7 +393,7 @@ def monitor():
print(f"Peak CPU usage: {peak_cpu:.2f}%")
print(f"Peak memory usage: {peak_memory/1024/1024:.2f} MB")

monitor_thread = threading.Thread(target=monitor)
monitor_thread = threading.Thread(target=_monitor)
monitor_thread.start()

try:
Expand Down
15 changes: 10 additions & 5 deletions orso/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import decimal
from enum import Enum
from typing import Any
from typing import Type

from orso.tools import parse_iso

Expand All @@ -36,7 +37,7 @@ class OrsoTypes(str, Enum):
TIME = "TIME"
VARCHAR = "VARCHAR"
NULL = "NULL"
BSON = "BSON"
JSONB = "JSONB"
_MISSING_TYPE = 0

def is_numeric(self):
Expand All @@ -52,14 +53,18 @@ def is_large_object(self):
return self in (self.VARCHAR, self.BLOB)

def is_complex(self):
return self in (self.ARRAY, self.STRUCT, self.BSON, self.INTERVAL)
return self in (self.ARRAY, self.STRUCT, self.JSONB, self.INTERVAL)

def __str__(self):
return self.value

def parse(self, value: Any) -> Any:
return ORSO_TO_PYTHON_PARSER[self.value](value)

@property
def python_type(self) -> Type:
return ORSO_TO_PYTHON_MAP.get(self)


ORSO_TO_PYTHON_MAP: dict = {
OrsoTypes.BOOLEAN: bool,
Expand All @@ -74,12 +79,12 @@ def parse(self, value: Any) -> Any:
OrsoTypes.INTEGER: int,
OrsoTypes.ARRAY: list,
OrsoTypes.VARCHAR: str,
OrsoTypes.BSON: bytes,
OrsoTypes.JSONB: bytes,
OrsoTypes.NULL: None,
}

PYTHON_TO_ORSO_MAP: dict = {
value: key for key, value in ORSO_TO_PYTHON_MAP.items() if key != OrsoTypes.BSON
value: key for key, value in ORSO_TO_PYTHON_MAP.items() if key != OrsoTypes.JSONB
}
PYTHON_TO_ORSO_MAP.update({tuple: OrsoTypes.ARRAY, set: OrsoTypes.ARRAY}) # map other python types

Expand All @@ -96,6 +101,6 @@ def parse(self, value: Any) -> Any:
OrsoTypes.INTEGER: int,
OrsoTypes.ARRAY: list,
OrsoTypes.VARCHAR: str,
OrsoTypes.BSON: bytes,
OrsoTypes.JSONB: bytes,
OrsoTypes.NULL: lambda x: None,
}
2 changes: 1 addition & 1 deletion orso/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__: str = "0.0.169"
__version__: str = "0.0.170"
__author__: str = "@joocer"
Loading

0 comments on commit 46a84c9

Please sign in to comment.