Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event ingestion #4923

Merged
merged 5 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions server/migrations/versions/2025-01-29-1400_add_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Add Event

Revision ID: 8cbe62c5645b
Revises: d25d25af882a
Create Date: 2025-01-29 14:00:06.232915

"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# Polar Custom Imports

# revision identifiers, used by Alembic.
revision = "8cbe62c5645b"
down_revision = "d25d25af882a"
branch_labels: tuple[str] | None = None
depends_on: tuple[str] | None = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"events",
sa.Column("id", sa.Uuid(), nullable=False),
sa.Column("timestamp", sa.TIMESTAMP(timezone=True), nullable=False),
sa.Column("name", sa.String(length=128), nullable=False),
sa.Column("source", sa.String(), nullable=False),
sa.Column("customer_id", sa.Uuid(), nullable=True),
sa.Column("external_customer_id", sa.String(), nullable=True),
sa.Column("organization_id", sa.Uuid(), nullable=False),
sa.Column(
"user_metadata", postgresql.JSONB(astext_type=sa.Text()), nullable=False
),
sa.ForeignKeyConstraint(
["customer_id"], ["customers.id"], name=op.f("events_customer_id_fkey")
),
sa.ForeignKeyConstraint(
["organization_id"],
["organizations.id"],
name=op.f("events_organization_id_fkey"),
ondelete="cascade",
),
sa.PrimaryKeyConstraint("id", name=op.f("events_pkey")),
)
op.create_index(
op.f("ix_events_customer_id"), "events", ["customer_id"], unique=False
)
op.create_index(
op.f("ix_events_external_customer_id"),
"events",
["external_customer_id"],
unique=False,
)
op.create_index(op.f("ix_events_name"), "events", ["name"], unique=False)
op.create_index(op.f("ix_events_source"), "events", ["source"], unique=False)
op.create_index(op.f("ix_events_timestamp"), "events", ["timestamp"], unique=False)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_events_timestamp"), table_name="events")
op.drop_index(op.f("ix_events_source"), table_name="events")
op.drop_index(op.f("ix_events_name"), table_name="events")
op.drop_index(op.f("ix_events_external_customer_id"), table_name="events")
op.drop_index(op.f("ix_events_customer_id"), table_name="events")
op.drop_table("events")
# ### end Alembic commands ###
3 changes: 3 additions & 0 deletions server/polar/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from polar.discount.endpoints import router as discount_router
from polar.email_update.endpoints import router as email_update_router
from polar.embed.endpoints import router as embed_router
from polar.event.endpoints import router as event_router
from polar.eventstream.endpoints import router as stream_router
from polar.external_organization.endpoints import router as external_organization_router
from polar.file.endpoints import router as files_router
Expand Down Expand Up @@ -144,3 +145,5 @@
router.include_router(customer_session_router)
# /integrations/plain
router.include_router(plain_router)
# /events
router.include_router(event_router)
3 changes: 3 additions & 0 deletions server/polar/auth/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class Scope(StrEnum):
benefits_read = "benefits:read"
benefits_write = "benefits:write"

events_read = "events:read"
events_write = "events:write"

files_read = "files:read"
files_write = "files:write"

Expand Down
Empty file added server/polar/event/__init__.py
Empty file.
24 changes: 24 additions & 0 deletions server/polar/event/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from typing import Annotated

from fastapi import Depends

from polar.auth.dependencies import Authenticator
from polar.auth.models import AuthSubject, User
from polar.auth.scope import Scope
from polar.models.organization import Organization

_EventRead = Authenticator(
required_scopes={
Scope.web_default,
Scope.events_read,
Scope.events_write,
},
allowed_subjects={User, Organization},
)
EventRead = Annotated[AuthSubject[User | Organization], Depends(_EventRead)]

_EventWrite = Authenticator(
required_scopes={Scope.web_default, Scope.events_write},
allowed_subjects={User, Organization},
)
EventWrite = Annotated[AuthSubject[User | Organization], Depends(_EventWrite)]
114 changes: 114 additions & 0 deletions server/polar/event/endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from fastapi import Depends, Query
from pydantic import AwareDatetime

from polar.customer.schemas import CustomerID
from polar.exceptions import ResourceNotFound
from polar.kit.metadata import MetadataQuery, get_metadata_query_openapi_schema
from polar.kit.pagination import ListResource, PaginationParamsQuery
from polar.kit.schemas import MultipleQueryFilter
from polar.models import Event
from polar.models.event import EventSource
from polar.openapi import APITag
from polar.organization.schemas import OrganizationID
from polar.postgres import AsyncSession, get_db_session
from polar.routing import APIRouter

from . import auth, sorting
from .schemas import Event as EventSchema
from .schemas import EventID, EventsIngest, EventsIngestResponse
from .service import event as event_service

router = APIRouter(
prefix="/events",
tags=[
"events",
APITag.private,
# APITag.documented, APITag.featured, # Make it private for now, not ready for the show yet
],
)


EventNotFound = {"description": "Event not found.", "model": ResourceNotFound.schema()}


@router.get(
"/",
summary="List Events",
response_model=ListResource[EventSchema],
openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
)
async def list(
auth_subject: auth.EventRead,
pagination: PaginationParamsQuery,
sorting: sorting.ListSorting,
metadata: MetadataQuery,
before: AwareDatetime | None = Query(
None, description="Filter events before this timestamp."
),
after: AwareDatetime | None = Query(
None, description="Filter events after this timestamp."
),
organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
None, title="OrganizationID Filter", description="Filter by organization ID."
),
customer_id: MultipleQueryFilter[CustomerID] | None = Query(
None, title="CustomerID Filter", description="Filter by customer ID."
),
external_customer_id: MultipleQueryFilter[str] | None = Query(
None,
title="ExternalCustomerID Filter",
description="Filter by external customer ID.",
),
source: MultipleQueryFilter[EventSource] | None = Query(
None, title="Source Filter", description="Filter by event source."
),
session: AsyncSession = Depends(get_db_session),
) -> ListResource[EventSchema]:
"""List events."""
results, count = await event_service.list(
session,
auth_subject,
before=before,
after=after,
organization_id=organization_id,
customer_id=customer_id,
external_customer_id=external_customer_id,
source=source,
metadata=metadata,
pagination=pagination,
sorting=sorting,
)

return ListResource.from_paginated_results(
[EventSchema.model_validate(result) for result in results], count, pagination
)


@router.get(
"/{id}",
summary="Get Event",
response_model=EventSchema,
responses={404: EventNotFound},
)
async def get(
id: EventID,
auth_subject: auth.EventRead,
session: AsyncSession = Depends(get_db_session),
) -> Event:
"""Get an event by ID."""
event = await event_service.get(session, auth_subject, id)

if event is None:
raise ResourceNotFound()

return event


@router.post("/ingest", summary="Ingest Events")
async def ingest(
ingest: EventsIngest,
auth_subject: auth.EventWrite,
session: AsyncSession = Depends(get_db_session),
) -> EventsIngestResponse:
"""Ingest batch of events."""
return await event_service.ingest(session, auth_subject, ingest)
45 changes: 45 additions & 0 deletions server/polar/event/repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from collections.abc import Sequence
from typing import Any
from uuid import UUID

from sqlalchemy import Select, insert, select

from polar.auth.models import AuthSubject, Organization, User, is_organization, is_user
from polar.kit.repository import RepositoryBase, RepositoryIDMixin
from polar.models import Event, UserOrganization


class EventRepository(RepositoryBase[Event], RepositoryIDMixin[Event, UUID]):
model = Event

async def get_all_by_organization(self, organization_id: UUID) -> Sequence[Event]:
statement = self.get_base_statement().where(
Event.organization_id == organization_id
)
return await self.get_all(statement)

async def insert_batch(self, events: Sequence[dict[str, Any]]) -> None:
statement = insert(Event)
await self.session.execute(statement, events)

def get_readable_statement(
self, auth_subject: AuthSubject[User | Organization]
) -> Select[tuple[Event]]:
statement = self.get_base_statement()

if is_user(auth_subject):
user = auth_subject.subject
statement = statement.where(
Event.organization_id.in_(
select(UserOrganization.organization_id).where(
UserOrganization.user_id == user.id,
UserOrganization.deleted_at.is_(None),
)
)
)
elif is_organization(auth_subject):
statement = statement.where(
Event.organization_id == auth_subject.subject.id
)

return statement
94 changes: 94 additions & 0 deletions server/polar/event/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from datetime import UTC, datetime
from typing import Annotated

from fastapi import Path
from pydantic import UUID4, AfterValidator, AwareDatetime, Field

from polar.kit.metadata import MetadataInputMixin, MetadataOutputMixin
from polar.kit.schemas import IDSchema, Schema
from polar.models.event import EventSource
from polar.organization.schemas import OrganizationID


def default_timestamp_factory() -> datetime:
return datetime.now(UTC)


def is_past_timestamp(timestamp: datetime) -> datetime:
# Convert to UTC
timestamp = timestamp.astimezone(UTC)
if timestamp > datetime.now(UTC):
raise ValueError("Timestamp must be in the past.")
return timestamp


class EventCreateBase(Schema, MetadataInputMixin):
timestamp: Annotated[
AwareDatetime,
AfterValidator(is_past_timestamp),
] = Field(
default_factory=default_timestamp_factory,
description="The timestamp of the event.",
)
name: str = Field(..., description="The name of the event.")
organization_id: OrganizationID | None = Field(
default=None,
description=(
"The ID of the organization owning the event. "
"**Required unless you use an organization token.**"
),
)


class EventCreateCustomer(EventCreateBase):
customer_id: UUID4 = Field(
description=(
"ID of the customer in your Polar organization "
"associated with the event."
)
)


class EventCreateExternalCustomer(EventCreateBase):
external_customer_id: str = Field(
description="ID of the customer in your system associated with the event."
)


EventCreate = EventCreateCustomer | EventCreateExternalCustomer


class EventsIngest(Schema):
events: list[EventCreate] = Field(description="List of events to ingest.")


class EventsIngestResponse(Schema):
inserted: int = Field(description="Number of events inserted.")


class Event(IDSchema, MetadataOutputMixin):
timestamp: datetime = Field(description="The timestamp of the event.")
name: str = Field(..., description="The name of the event.")
source: EventSource = Field(
...,
description=(
"The source of the event. "
"`system` events are created by Polar. "
"`user` events are the one you create through our ingestion API."
),
)
organization_id: OrganizationID = Field(
description="The ID of the organization owning the event."
)
customer_id: UUID4 | None = Field(
description=(
"ID of the customer in your Polar organization "
"associated with the event."
)
)
external_customer_id: str | None = Field(
description="ID of the customer in your system associated with the event."
)


EventID = Annotated[UUID4, Path(description="The event ID.")]
Loading