Skip to content

Commit

Permalink
views
Browse files Browse the repository at this point in the history
  • Loading branch information
joocer committed Jul 30, 2024
1 parent 1ff9c0e commit d8c64af
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 28 deletions.
2 changes: 2 additions & 0 deletions tarchia/api/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .relation_management import router as relation_router
from .search import router as search_router
from .table_management import router as table_router
from .view_management import router as view_router

v1_router = APIRouter(prefix="/v1")
v1_router.include_router(branch_router, tags=["Branch Management"])
Expand All @@ -18,3 +19,4 @@
v1_router.include_router(search_router, tags=["Search"])
v1_router.include_router(relation_router, tags=["Relation Management"])
v1_router.include_router(table_router, tags=["Table Management"])
v1_router.include_router(view_router, tags=["View Management"])
13 changes: 10 additions & 3 deletions tarchia/api/v1/table_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ async def list_tables(
"last_updated_ms",
"steward",
"metadata",
"created_at",
"disposition",
}
}
current_commit_sha = table.get("current_commit_sha")
Expand Down Expand Up @@ -97,14 +99,19 @@ async def create_table(
base_url = get_base_url(request=request)
timestamp = int(time.time_ns() / 1e6)

# can we find the owner?
owner_entry = identify_owner(name=owner)

catalog_entry = catalog_provider.get_view(owner=owner, view=table_definition.name)
if catalog_entry:
# return a 409
raise AlreadyExistsError(entity=table_definition.name)
# check if we have a table with that name already
catalog_entry = catalog_provider.get_table(owner=owner, table=table_definition.name)
if catalog_entry:
# return a 409
raise AlreadyExistsError(entity=table_definition.name)

# can we find the owner?
owner_entry = identify_owner(name=owner)
table_id = generate_uuid()

commit_root = build_root(COMMITS_ROOT, owner=owner, table_id=table_id)
Expand Down Expand Up @@ -167,7 +174,7 @@ async def create_table(
{
"event": "TABLE_CREATED",
"table": f"{owner}.{table_definition.name}",
"url": f"{base_url}/v1/tables/{owner}/{table_definition}",
"url": f"{base_url}/v1/tables/{owner}/{table_definition.name}",
},
)

Expand Down
116 changes: 110 additions & 6 deletions tarchia/api/v1/view_management.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,146 @@
import time

from fastapi import APIRouter
from fastapi import Path
from fastapi import Request
from fastapi.responses import ORJSONResponse

from tarchia.interfaces.catalog import catalog_factory
from tarchia.models import CreateViewRequest
from tarchia.models import ViewCatalogEntry
from tarchia.utils import get_base_url
from tarchia.utils.constants import IDENTIFIER_REG_EX

router = APIRouter()
catalog_provider = catalog_factory()


@router.get("/views/{owner}", response_class=ORJSONResponse)
async def list_views(
request: Request,
owner: str = Path(description="The owner of the view.", pattern=IDENTIFIER_REG_EX),
):
raise NotImplementedError("Not Implemented")
base_url = get_base_url(request=request)

view_list = []

views = catalog_provider.list_views(owner)
for view in views:
# filter down the items we return
view = {
k: v
for k, v in view.items()
if k
in {
"view_id",
"name",
"description",
"statement",
"owner",
"last_updated_ms",
"metadata",
"created_at",
}
}
view_name = view.get("name")

view["view_url"] = f"{base_url}/v1/views/{owner}/{view_name}"
view_list.append(view)
return view_list


@router.post("/views/{owner}", response_class=ORJSONResponse)
async def create_view(
request: Request,
view_definition: CreateViewRequest,
owner: str = Path(description="The owner of the view.", pattern=IDENTIFIER_REG_EX),
):
raise NotImplementedError("Not Implemented")
from tarchia.exceptions import AlreadyExistsError
from tarchia.utils import generate_uuid
from tarchia.utils.catalogs import identify_owner

# can we find the owner?
owner_entry = identify_owner(name=owner)

base_url = get_base_url(request=request)
timestamp = int(time.time_ns() / 1e6)

# check if we have a table with that name already
table_exists = catalog_provider.get_table(owner=owner, table=view_definition.name)
if table_exists:
# return a 409
raise AlreadyExistsError(entity=view_definition.name)

catalog_entry = catalog_provider.get_view(owner=owner, view=view_definition.name)
if catalog_entry:
# return a 409
raise AlreadyExistsError(entity=view_definition.name)

view_id = generate_uuid()

# We create tables without any data
new_view = ViewCatalogEntry(
name=view_definition.name,
statement=view_definition.statement,
owner=owner,
view_id=view_id,
format_version=1,
description=view_definition.description,
metadata=view_definition.metadata,
last_updated_ms=timestamp,
)
# Save the table to the Catalog - do this last
catalog_provider.update_view(table_id=new_view.table_id, entry=new_view)

# trigger webhooks - this should be async so we don't wait for the outcome
owner_entry.trigger_event(
owner_entry.EventTypes.VIEW_CREATED,
{
"event": "VIEW_CREATED",
"view": f"{owner}.{view_definition.name}",
"url": f"{base_url}/v1/views/{owner}/{view_definition.name}",
},
)

return {
"message": "Table Created",
"table": f"{owner}.{table_definition.name}",
}


@router.get("/views/{owner}/{view}", response_class=ORJSONResponse)
async def get_view(
request: Request,
owner: str = Path(description="The owner of the view.", pattern=IDENTIFIER_REG_EX),
view: str = Path(description="The view.", pattern=IDENTIFIER_REG_EX),
):
raise NotImplementedError("Not Implemented")
from tarchia.utils.catalogs import identify_view

catalog_entry = identify_view(owner, view)

return catalog_entry.as_dict()


@router.delete("/views/{owner}/{view}", response_class=ORJSONResponse)
async def delete_view(
request: Request,
owner: str = Path(description="The owner of the view.", pattern=IDENTIFIER_REG_EX),
view: str = Path(description="The view.", pattern=IDENTIFIER_REG_EX),
):
raise NotImplementedError("Not Implemented")
from tarchia.utils.catalogs import identify_owner
from tarchia.utils.catalogs import identify_view

owner_entry = identify_owner(name=owner)
catalog_entry = identify_view(owner=owner, view=view)

view_id = catalog_entry.view_id
catalog_provider.delete_view(view_id)

# trigger webhooks - this should be async so we don't wait for the outcome
owner_entry.trigger_event(
owner_entry.EventTypes.VIEW_DELETED,
{"event": "VIEW_DELETED", "view": f"{owner}.{view}"},
)

return {
"message": "View Deleted",
"view": f"{owner}.{view}",
}
17 changes: 9 additions & 8 deletions tarchia/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ def __init__(self, owner: str, table: str):
super().__init__(message)


class ViewNotFoundError(NotFoundError): # pragma: no cover
def __init__(self, owner: str, view: str):
self.owner = owner
self.view = view

message = f"View with reference {owner}.{view} could not be found."
super().__init__(message)


class OwnerNotFoundError(NotFoundError): # pragma: no cover
def __init__(self, owner: str):
self.owner = owner
Expand All @@ -90,14 +99,6 @@ class UnableToReadBlobError(Exception):
"""Can't find a blob when trying to add to manifest"""


class AmbiguousTableError(Exception): # pragma: no cover
def __init__(self, table: str):
self.table = table

message = f"Multiple tables with reference {table} were found."
super().__init__(message)


class AlreadyExistsError(Exception): # pragma: no cover
def __init__(self, entity: str):
self.entity = entity
Expand Down
40 changes: 36 additions & 4 deletions tarchia/interfaces/catalog/dev_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
development, prototyping and for regression testing.
"""

from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from tarchia.exceptions import AmbiguousTableError
from tarchia.interfaces.catalog.provider_base import CatalogProvider
from tarchia.models import OwnerEntry
from tarchia.models import TableCatalogEntry
Expand Down Expand Up @@ -42,8 +43,6 @@ def get_table(self, owner: str, table: str) -> Optional[dict]:
"""

result = self.store.find("tables", {"owner": owner, "name": table})
if len(result) > 1:
raise AmbiguousTableError(owner=owner, table=table)
return result[0] if result else None

def update_table(self, table_id: str, entry: TableCatalogEntry) -> None:
Expand All @@ -57,7 +56,7 @@ def update_table(self, table_id: str, entry: TableCatalogEntry) -> None:

self.store.upsert("tables", entry.as_dict(), {"table_id": table_id})

def list_tables(self, owner: str) -> List[dict]:
def list_tables(self, owner: str) -> List[Dict[str, Any]]:
"""
List all tables in the catalog along with their basic metadata.
Expand Down Expand Up @@ -85,3 +84,36 @@ def update_owner(self, entry: OwnerEntry) -> None:

def delete_owner(self, owner_id: str) -> None:
self.store.delete("owners", {"owner_id": owner_id})

def list_views(self, owner: str) -> List[Dict[str, Any]]:
"""
List all views in the catalog along with their basic metadata.
Returns:
List[Dict[str, Any]]: A list of dictionaries, each representing the metadata of a table.
"""
result = self.store.find("views", {"owner": owner})
return result

def get_view(self, owner: str, view: str) -> Optional[Dict[str, Any]]:
"""
Retrieve metadata for a specified view.
Parameters:
table_id (str): The identifier of the table.
Returns:
Dict[str, Any]: A dictionary containing the metadata of the table.
"""

result = self.store.find("views", {"owner": owner, "name": view})
return result[0] if result else None

def delete_view(self, view_id: str) -> None:
"""
Delete metadata for a specified table.
Parameters:
table_id (str): The identifier of the table to be deleted.
"""
self.store.delete("views", {"view_id": view_id})
Loading

0 comments on commit d8c64af

Please sign in to comment.