Add SQL Adapter
preliminary start of sql adapter. to be continued ...

hashed table names. to be continued...

modified hashing and added a test for sqlite database. to be continued

try TILED_TEST_POSTGRESQL_URI usage

fix postgresql uri

Automatically set SQL driver if unset.

Do not require env var to be set.

Consistently use database URI with schema.

Refactor init_storage interface for SQL.

More adapters updated

More adapters updated

Parse uri earlier.

Use dataclass version of DataSource.

Begin to update SQLAdapter.

Fix import

Typesafe accessor for Storage

few changes

Basic write and append works

Do not preserve index.

changes in test_sql.py

latest changes

tried to fix the tests

removed prints

Remove vestigial comment.

Extract str path from sqlite URI

Use unique temp dir and clean it up.

some more fixing and addition of partitions

fixing docstrings

CLI works with SQL writing

Tests pass again

Add convenience method write_appendable_dataframe.

Fix typo

Fix path handling for Windows

The dataset_id concept is mostly implemented

Fix conditional

Support appendable tables with --temp catalog

Revert order swap (for now)
Seher Karakuzu authored and danielballan committed Jan 28, 2025
1 parent 9dc91de commit 796f001
Showing 31 changed files with 902 additions and 122 deletions.
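Taken together, these commits add a table-writing backend that stores tabular data in SQL databases via ADBC. Below is a minimal end-to-end sketch assembled from the test fixtures in this diff, not an official usage example; the SQLite URI is illustrative, and `init_storage` fills in parameters such as the hashed `table_name` and the `dataset_id`:

```python
import pyarrow as pa

from tiled.adapters.sql import SQLAdapter
from tiled.structures.core import StructureFamily
from tiled.structures.data_source import DataSource, Management, Storage
from tiled.structures.table import TableStructure

batch = pa.record_batch([pa.array([1, 2, 3])], names=["f0"])
structure = TableStructure.from_arrow_table(
    pa.Table.from_batches([batch]), npartitions=1
)

# Register storage for a new writable SQL-backed table.
data_source = SQLAdapter.init_storage(
    data_source=DataSource(
        management=Management.writable,
        mimetype="application/x-tiled-sql-table",
        structure_family=StructureFamily.table,
        structure=structure,
        assets=[],
    ),
    storage=Storage(filesystem=None, sql="sqlite:////tmp/example.sqlite"),
    path_parts=[],
)

adapter = SQLAdapter(
    "sqlite:////tmp/example.sqlite",
    data_source.structure,
    data_source.parameters["table_name"],
    data_source.parameters["dataset_id"],
)
adapter.write(batch)   # create / overwrite the table contents
adapter.append(batch)  # append further record batches
df = adapter.read()    # read everything back as a pandas DataFrame
```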
Empty file added main.py
Empty file.
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -55,6 +55,9 @@ tiled = "tiled.commandline.main:main"

 # This is the union of all optional dependencies.
 all = [
+    "adbc_driver_manager",
+    "adbc_driver_sqlite",
+    "adbc_driver_postgresql",
     "aiofiles",
     "aiosqlite",
     "alembic",
@@ -196,6 +199,9 @@ minimal-server = [
 ]
 # This is the "kitchen sink" fully-featured server dependency set.
 server = [
+    "adbc_driver_manager",
+    "adbc_driver_sqlite",
+    "adbc_driver_postgresql",
     "aiofiles",
     "aiosqlite",
     "alembic",
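The new `adbc_*` dependencies provide Arrow-native database drivers: `adbc_driver_manager` is the shared core, and the SQLite and PostgreSQL drivers back the two supported URI schemes (per the "Automatically set SQL driver if unset" commit, the driver is chosen from the URI). A short sketch of the underlying DBAPI layer, independent of tiled; the file path and table name are illustrative:

```python
import adbc_driver_sqlite.dbapi
import pyarrow as pa

batch = pa.record_batch([pa.array([1, 2, 3])], names=["f0"])

# ADBC exposes a PEP 249-style API with Arrow-native bulk ingestion.
with adbc_driver_sqlite.dbapi.connect("/tmp/example.db") as conn:
    with conn.cursor() as cursor:
        cursor.adbc_ingest("demo", batch, mode="create")  # bulk-load Arrow data
        cursor.execute("SELECT COUNT(*) FROM demo")
        print(cursor.fetchone())  # (3,)
    conn.commit()
```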
26 changes: 23 additions & 3 deletions tiled/_tests/adapters/test_arrow.py
@@ -4,6 +4,8 @@
 import pytest

 from tiled.adapters.arrow import ArrowAdapter
+from tiled.structures.core import StructureFamily
+from tiled.structures.data_source import DataSource, Management, Storage
 from tiled.structures.table import TableStructure

 names = ["f0", "f1", "f2"]
@@ -26,11 +28,29 @@


 @pytest.fixture
-def adapter() -> ArrowAdapter:
+def data_source_from_init_storage() -> DataSource[TableStructure]:
     table = pa.Table.from_arrays(data0, names)
     structure = TableStructure.from_arrow_table(table, npartitions=3)
-    assets = ArrowAdapter.init_storage(data_uri, structure=structure)
-    return ArrowAdapter([asset.data_uri for asset in assets], structure=structure)
+    data_source = DataSource(
+        management=Management.writable,
+        mimetype="application/vnd.apache.arrow.file",
+        structure_family=StructureFamily.table,
+        structure=structure,
+        assets=[],
+    )
+    storage = Storage(filesystem=data_uri, sql=None)
+    return ArrowAdapter.init_storage(
+        data_source=data_source, storage=storage, path_parts=[]
+    )
+
+
+@pytest.fixture
+def adapter(data_source_from_init_storage: DataSource[TableStructure]) -> ArrowAdapter:
+    data_source = data_source_from_init_storage
+    return ArrowAdapter(
+        [asset.data_uri for asset in data_source.assets],
+        data_source.structure,
+    )


def test_attributes(adapter: ArrowAdapter) -> None:
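The `Storage` container seen in these fixtures carries at most one URI per backend (`filesystem` or `sql`), and the "Typesafe accessor for Storage" commit adds a typed way to unpack it. A minimal sketch of that idea, assuming a frozen dataclass whose accessor raises instead of returning `None`; the method name here is illustrative, not tiled's actual API:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Storage:
    filesystem: Optional[str]
    sql: Optional[str]

    def get_sql(self) -> str:
        # Raising here lets callers receive a plain `str`,
        # satisfying type checkers without ad hoc None checks.
        if self.sql is None:
            raise RuntimeError("SQL storage is not configured.")
        return self.sql
```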
177 changes: 177 additions & 0 deletions tiled/_tests/adapters/test_sql.py
@@ -0,0 +1,177 @@
import os
import tempfile
from typing import Generator

import adbc_driver_sqlite
import pyarrow as pa
import pytest

from tiled.adapters.sql import SQLAdapter
from tiled.structures.core import StructureFamily
from tiled.structures.data_source import DataSource, Management, Storage
from tiled.structures.table import TableStructure

names = ["f0", "f1", "f2"]
data0 = [
    pa.array([1, 2, 3, 4, 5]),
    pa.array(["foo0", "bar0", "baz0", None, "goo0"]),
    pa.array([True, None, False, True, None]),
]
data1 = [
    pa.array([6, 7, 8, 9, 10, 11, 12]),
    pa.array(["foo1", "bar1", None, "baz1", "biz", None, "goo"]),
    pa.array([None, True, True, False, False, None, True]),
]
data2 = [pa.array([13, 14]), pa.array(["foo2", "baz2"]), pa.array([False, None])]

batch0 = pa.record_batch(data0, names=names)
batch1 = pa.record_batch(data1, names=names)
batch2 = pa.record_batch(data2, names=names)


@pytest.fixture
def data_source_from_init_storage() -> DataSource[TableStructure]:
    table = pa.Table.from_arrays(data0, names)
    structure = TableStructure.from_arrow_table(table, npartitions=1)
    data_source = DataSource(
        management=Management.writable,
        mimetype="application/x-tiled-sql-table",
        structure_family=StructureFamily.table,
        structure=structure,
        assets=[],
    )
    storage = Storage(filesystem=None, sql="sqlite:////tmp/test.sqlite")
    return SQLAdapter.init_storage(
        data_source=data_source, storage=storage, path_parts=[]
    )


@pytest.fixture
def adapter_sql(
    data_source_from_init_storage: DataSource[TableStructure],
) -> Generator[SQLAdapter, None, None]:
    with tempfile.TemporaryDirectory() as td:
        data_uri = f"sqlite:///{td}/test.db"
        data_source = data_source_from_init_storage
        yield SQLAdapter(
            data_uri,
            data_source.structure,
            data_source.parameters["table_name"],
            data_source.parameters["dataset_id"],
        )


def test_attributes(adapter_sql: SQLAdapter) -> None:
    assert adapter_sql.structure().columns == names
    assert adapter_sql.structure().npartitions == 1
    assert isinstance(adapter_sql.conn, adbc_driver_sqlite.dbapi.AdbcSqliteConnection)


def test_write_read_sql(adapter_sql: SQLAdapter) -> None:
    # test writing and reading it
    adapter_sql.write(batch0)
    result = adapter_sql.read()
    # the pandas dataframe gives the last column of the data as 0 and 1 since SQL does not save booleans,
    # so we explicitly convert the last column to boolean for testing purposes
    result["f2"] = result["f2"].astype("boolean")

    assert pa.Table.from_arrays(data0, names) == pa.Table.from_pandas(result)

    adapter_sql.write([batch0, batch1])
    result = adapter_sql.read()
    # the pandas dataframe gives the last column of the data as 0 and 1 since SQL does not save booleans,
    # so we explicitly convert the last column to boolean for testing purposes
    result["f2"] = result["f2"].astype("boolean")
    assert pa.Table.from_batches([batch0, batch1]) == pa.Table.from_pandas(result)

    adapter_sql.write([batch0, batch1, batch2])
    result = adapter_sql.read()
    # the pandas dataframe gives the last column of the data as 0 and 1 since SQL does not save booleans,
    # so we explicitly convert the last column to boolean for testing purposes
    result["f2"] = result["f2"].astype("boolean")
    assert pa.Table.from_batches([batch0, batch1, batch2]) == pa.Table.from_pandas(
        result
    )

    # test write, append, and read all
    adapter_sql.write([batch0, batch1, batch2])
    adapter_sql.append([batch2, batch0, batch1])
    adapter_sql.append([batch1, batch2, batch0])
    result = adapter_sql.read()
    # the pandas dataframe gives the last column of the data as 0 and 1 since SQL does not save booleans,
    # so we explicitly convert the last column to boolean for testing purposes
    result["f2"] = result["f2"].astype("boolean")

    assert pa.Table.from_batches(
        [batch0, batch1, batch2, batch2, batch0, batch1, batch1, batch2, batch0]
    ) == pa.Table.from_pandas(result)


@pytest.fixture
def postgres_uri() -> str:
    uri = os.getenv("TILED_TEST_POSTGRESQL_URI")
    if uri is not None:
        return uri
    pytest.skip("TILED_TEST_POSTGRESQL_URI is not set")
    return ""


@pytest.fixture
def adapter_psql(
    data_source_from_init_storage: DataSource[TableStructure], postgres_uri: str
) -> SQLAdapter:
    data_source = data_source_from_init_storage
    return SQLAdapter(
        postgres_uri,
        data_source.structure,
        data_source.parameters["table_name"],
        data_source.parameters["dataset_id"],
    )


def test_psql(postgres_uri: str, adapter_psql: SQLAdapter) -> None:
    assert adapter_psql.structure().columns == names
    assert adapter_psql.structure().npartitions == 1
    # assert isinstance(
    #     adapter_psql.conn, adbc_driver_postgresql.dbapi.AdbcSqliteConnection
    # )


def test_write_read_psql(adapter_psql: SQLAdapter) -> None:
    # test writing and reading it
    adapter_psql.write(batch0)
    result = adapter_psql.read()
    # the pandas dataframe gives the last column of the data as 0 and 1 since SQL does not save booleans,
    # so we explicitly convert the last column to boolean for testing purposes
    result["f2"] = result["f2"].astype("boolean")

    assert pa.Table.from_arrays(data0, names) == pa.Table.from_pandas(result)

    adapter_psql.write([batch0, batch1])
    result = adapter_psql.read()
    # the pandas dataframe gives the last column of the data as 0 and 1 since SQL does not save booleans,
    # so we explicitly convert the last column to boolean for testing purposes
    result["f2"] = result["f2"].astype("boolean")
    assert pa.Table.from_batches([batch0, batch1]) == pa.Table.from_pandas(result)

    adapter_psql.write([batch0, batch1, batch2])
    result = adapter_psql.read()
    # the pandas dataframe gives the last column of the data as 0 and 1 since SQL does not save booleans,
    # so we explicitly convert the last column to boolean for testing purposes
    result["f2"] = result["f2"].astype("boolean")
    assert pa.Table.from_batches([batch0, batch1, batch2]) == pa.Table.from_pandas(
        result
    )

    # test write, append, and read all
    adapter_psql.write([batch0, batch1, batch2])
    adapter_psql.append([batch2, batch0, batch1])
    adapter_psql.append([batch1, batch2, batch0])
    result = adapter_psql.read()
    # the pandas dataframe gives the last column of the data as 0 and 1 since SQL does not save booleans,
    # so we explicitly convert the last column to boolean for testing purposes
    result["f2"] = result["f2"].astype("boolean")

    assert pa.Table.from_batches(
        [batch0, batch1, batch2, batch2, batch0, batch1, batch1, batch2, batch0]
    ) == pa.Table.from_pandas(result)
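The repeated `astype("boolean")` calls in these tests work around boolean storage: SQLite has no native boolean type, so values round-trip as 0/1 integers with NULL for missing entries, and pandas' nullable `BooleanDtype` restores both the type and the missing values. A standalone sketch of the same conversion, assuming a recent pandas:

```python
import pandas as pd

# Booleans typically come back from SQLite as 0/1 integers; NULL becomes None.
raw = pd.Series([1, 0, None], dtype="object")
restored = raw.astype("boolean")  # pandas' nullable BooleanDtype

assert restored.dtype == "boolean"
assert restored.isna().tolist() == [False, False, True]  # NULL is preserved
```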
10 changes: 5 additions & 5 deletions tiled/_tests/test_access_control.py
@@ -130,7 +130,7 @@ def context(tmpdir_module):
         {"tree": f"{__name__}:tree_b", "path": "/b", "access_policy": None},
         {
             "tree": "tiled.catalog:in_memory",
-            "args": {"writable_storage": tmpdir_module / "c"},
+            "args": {"writable_storage": str(tmpdir_module / "c")},
             "path": "/c",
             "access_control": {
                 "access_policy": "tiled.access_policies:SimpleAccessPolicy",
@@ -145,7 +145,7 @@
         },
         {
             "tree": "tiled.catalog:in_memory",
-            "args": {"writable_storage": tmpdir_module / "d"},
+            "args": {"writable_storage": str(tmpdir_module / "d")},
             "path": "/d",
             "access_control": {
                 "access_policy": "tiled.access_policies:SimpleAccessPolicy",
@@ -162,7 +162,7 @@
         },
         {
             "tree": "tiled.catalog:in_memory",
-            "args": {"writable_storage": tmpdir_module / "e"},
+            "args": {"writable_storage": str(tmpdir_module / "e")},
             "path": "/e",
             "access_control": {
                 "access_policy": "tiled.access_policies:SimpleAccessPolicy",
@@ -185,7 +185,7 @@
         {"tree": ArrayAdapter.from_array(arr), "path": "/f"},
         {
             "tree": "tiled.catalog:in_memory",
-            "args": {"writable_storage": tmpdir_module / "g"},
+            "args": {"writable_storage": str(tmpdir_module / "g")},
             "path": "/g",
             "access_control": {
                 "access_policy": "tiled.access_policies:SimpleAccessPolicy",
@@ -200,7 +200,7 @@
         },
         {
             "tree": "tiled.catalog:in_memory",
-            "args": {"writable_storage": tmpdir_module / "h"},
+            "args": {"writable_storage": str(tmpdir_module / "h")},
             "path": "/h",
             "access_control": {
                 "access_policy": "tiled._tests.test_access_control:EntryBasedAccessPolicy",
2 changes: 1 addition & 1 deletion tiled/_tests/test_awkward.py
@@ -15,7 +15,7 @@

 @pytest.fixture
 def catalog(tmpdir):
-    catalog = in_memory(writable_storage=tmpdir)
+    catalog = in_memory(writable_storage=str(tmpdir))
     yield catalog
6 changes: 3 additions & 3 deletions tiled/_tests/test_catalog.py
@@ -268,7 +268,7 @@ async def test_write_dataframe_external_direct(a, tmpdir):
 async def test_write_array_internal_direct(a, tmpdir):
     arr = numpy.ones((5, 3))
     ad = ArrayAdapter.from_array(arr)
-    structure = asdict(ad.structure())
+    structure = ad.structure()
     await a.create_node(
         key="x",
         structure_family="array",
@@ -328,7 +328,7 @@ async def test_delete_tree(tmpdir):
     # Do not use client fixture here.
     # The Context must be opened inside the test or we run into
     # event loop crossing issues with the Postgres test.
-    tree = in_memory(writable_storage=tmpdir)
+    tree = in_memory(writable_storage=str(tmpdir))
     with Context.from_app(build_app(tree)) as context:
         client = from_context(context)

@@ -523,7 +523,7 @@ async def test_constraints_on_parameter_and_num(a, assets):
         DataSource(
             structure_family=arr_adapter.structure_family,
             mimetype="text/csv",
-            structure=asdict(arr_adapter.structure()),
+            structure=arr_adapter.structure(),
             parameters={},
             management=Management.external,
             assets=assets,