Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote launch #79

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ NATS_SERVER_URL=nats://nats1:4222
NATS_SERVER_URL_IN_CONTAINER=nats://host.containers.internal:4222

# Generate this using ./nats-conf/generate-auth-nkey.sh
NKEYS_SEED_FILE=/path/to/app_user.nk
NKEYS_SEED_FILE=nats-conf/out_nkey/app_user.nk


SENTRY_DSN=
Expand All @@ -58,5 +58,5 @@ DOCKER_IMAGE_OPERATOR=operator
DOCKER_IMAGE_CALLOUT=callout

# For operators
GITHUB_USERNAME=
GITHUB_PASSWORD=
GITHUB_USERNAME=changeme
GITHUB_TOKEN=changeme
13 changes: 11 additions & 2 deletions .ruff.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
target-version = "py310"
exclude = ["backend/app/interactem/app/alembic/**", "backend/agent/thirdparty/**", "conftest.py"]
exclude = [
"backend/app/interactem/app/alembic/**",
"backend/agent/thirdparty/**",
"conftest.py",
"operators/**",
"tests/**"
]

[lint]
exclude = ["**/__init__.py"]
Expand All @@ -21,4 +27,7 @@ ignore = [

[lint.pyupgrade]
# Preserve types, even if a file imports `from __future__ import annotations`.
keep-runtime-typing = true
keep-runtime-typing = true

[lint.isort]
known-first-party = ["interactem"]
6 changes: 1 addition & 5 deletions backend/agent/interactem/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
create_agent_parameter_consumer,
)
from interactem.core.pipeline import Pipeline
from interactem.core.util import create_task_with_ref

from .config import cfg

Expand Down Expand Up @@ -686,8 +687,3 @@ async def handle_name_conflict(client: PodmanClient, container_name: str) -> Non
logger.info(f"Conflicting container {conflicting_container.id} removed. ")


def create_task_with_ref(task_refs: set[asyncio.Task], coro: Coroutine) -> asyncio.Task:
task = asyncio.create_task(coro)
task_refs.add(task)
task.add_done_callback(task_refs.discard) # Clean up after completion
return task
2 changes: 1 addition & 1 deletion backend/agent/interactem/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class Settings(BaseSettings):
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
LOCAL: bool = True
LOCAL: bool = False
DOCKER_COMPATIBILITY_MODE: bool = False
PODMAN_SERVICE_URI: str | None = None
NATS_SERVER_URL: NatsDsn = Field(default="nats://localhost:4222")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

from interactem.agent.agent import Agent

if __name__ == "__main__":

async def main():
agent = Agent()
try:
asyncio.run(agent.run())
await agent.run()
except KeyboardInterrupt:
pass
finally:
print("Application terminated.")


def entrypoint():
asyncio.run(main())
3 changes: 2 additions & 1 deletion backend/agent/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 14 additions & 4 deletions backend/agent/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
[tool.poetry]
[project]
name = "interactem-agent"
version = "0.1.0"
description = ""
authors = ["Sam Welborn <[email protected]>"]
dynamic = [ "version", "dependencies"]
description = "Agent for interactem"
readme = "README.md"
authors = [
{name = "Sam Welborn", email = "[email protected]"},
{name = "Chris Harris", email = "[email protected]"}
]
requires-python = ">=3.10"

[project.scripts]
interactem-agent = "interactem.agent.entrypoint:entrypoint"

[tool.poetry]
version = "0.1.0"
packages = [{ include = "interactem" }]

[tool.poetry.dependencies]
Expand Down
2 changes: 2 additions & 0 deletions backend/app/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# TODO: this is deprecated. We should use the core base image...
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.10

WORKDIR /app/
Expand All @@ -15,6 +16,7 @@ COPY ./app/pyproject.toml ./app/poetry.lock* /app/
ARG INSTALL_DEV=false

COPY ./core/ /core/
COPY ./sfapi_models/ /sfapi_models/

RUN bash -c "if [ $INSTALL_DEV == 'true' ] ; then poetry install --no-root ; else poetry install --no-root --only main ; fi"

Expand Down
3 changes: 2 additions & 1 deletion backend/app/interactem/app/api/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from fastapi import APIRouter

from interactem.app.api.routes import login, operators, pipelines, users, utils
from interactem.app.api.routes import agents, login, operators, pipelines, users, utils

api_router = APIRouter()
api_router.include_router(login.router, tags=["login"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(utils.router, prefix="/utils", tags=["utils"])
api_router.include_router(pipelines.router, prefix="/pipelines", tags=["pipelines"])
api_router.include_router(operators.router, prefix="/operators", tags=["operators"])
api_router.include_router(agents.router, prefix="/agents", tags=["agents"])
17 changes: 17 additions & 0 deletions backend/app/interactem/app/api/routes/agents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

from fastapi import APIRouter

from interactem.app.api.deps import CurrentUser
from interactem.app.events.producer import publish_sfapi_submit_event
from interactem.core.logger import get_logger
from interactem.sfapi_models import AgentCreateEvent

logger = get_logger()
router = APIRouter()

@router.post("/launch")
async def launch_agent(current_user: CurrentUser, agent_req: AgentCreateEvent) -> None:
"""
Launch an agent remotely.
"""
await publish_sfapi_submit_event(agent_req)
17 changes: 16 additions & 1 deletion backend/app/interactem/app/api/routes/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def read_pipeline(session: SessionDep, current_user: CurrentUser, id: uuid.UUID)
@router.post("/", response_model=PipelinePublic)
def create_pipeline(
*, session: SessionDep, current_user: CurrentUser, pipeline_in: PipelineCreate
) -> Any:
) -> PipelinePublic:
"""
Create new pipeline.
"""
Expand All @@ -78,9 +78,24 @@ def create_pipeline(
session.add(pipeline)
session.commit()
session.refresh(pipeline)
pipeline = PipelinePublic.model_validate(pipeline)
return pipeline


@router.post("/run", response_model=PipelinePublic)
async def create_and_run_pipeline(
*, session: SessionDep, current_user: CurrentUser, pipeline_in: PipelineCreate
) -> PipelinePublic:
"""
Create new pipeline and run it.
"""
pipeline = create_pipeline(
session=session, current_user=current_user, pipeline_in=pipeline_in
)

return await run_pipeline(session, current_user, pipeline.id)


@router.delete("/{id}")
def delete_pipeline(
session: SessionDep, current_user: CurrentUser, id: uuid.UUID
Expand Down
84 changes: 77 additions & 7 deletions backend/app/interactem/app/events/producer.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,59 @@
import asyncio
import logging

from fastapi import HTTPException
from nats.aio.msg import Msg as NatsMessage
from nats.errors import NoRespondersError as NatsNoRespondersError
from nats.errors import TimeoutError as NatsTimeoutError
from nats.js.api import StreamInfo
from nats.js.errors import APIError, NoStreamResponseError
from pydantic import BaseModel
from sqlmodel import SQLModel

from interactem.core.constants import STREAM_PIPELINES, SUBJECT_PIPELINES_RUN
from interactem.core.constants import (
SFAPI_GROUP_NAME,
SFAPI_STATUS_ENDPOINT,
STREAM_PIPELINES,
STREAM_SFAPI,
SUBJECT_PIPELINES_RUN,
SUBJECT_SFAPI_JOBS_SUBMIT,
)
from interactem.core.events.pipelines import PipelineRunEvent
from interactem.core.nats import create_or_update_stream, nc
from interactem.core.nats.config import PIPELINES_STREAM_CONFIG
from interactem.core.nats.config import PIPELINES_STREAM_CONFIG, SFAPI_STREAM_CONFIG
from interactem.sfapi_models import AgentCreateEvent, StatusRequest

from ..core.config import settings

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

nats_tasks: list[asyncio.Task] = []

NATS_REQ_TIMEOUT_DEFAULT = 5
NATS_REQ_TIMEOUT_SFAPI = 10


async def start():
global nats_client
global nats_jetstream
logger.info(f"Connecting to NATS server: {settings.NATS_SERVER_URL}")
nats_client = await nc([str(settings.NATS_SERVER_URL)], "api")
nats_jetstream = nats_client.jetstream()
info = await create_or_update_stream(PIPELINES_STREAM_CONFIG, nats_jetstream)
logger.info(f"Stream information: {info}")
stream_infos: list[StreamInfo] = []
stream_infos.append(
await create_or_update_stream(PIPELINES_STREAM_CONFIG, nats_jetstream)
)
stream_infos.append(
await create_or_update_stream(SFAPI_STREAM_CONFIG, nats_jetstream)
)
logger.info(f"Streams information:\n {stream_infos}")


async def stop():
if nats_client:
await nats_client.close()


async def publish_jetstream_event(
stream: str,
subject: str,
Expand All @@ -45,9 +70,54 @@ async def publish_jetstream_event(
stream=stream,
headers=None,
)
except: # noqa
logger.exception(f"Exception send on subject: {subject}")
except NoStreamResponseError as e:
raise HTTPException(
status_code=503,
detail=f"Failed to publish event on stream: {stream}. \nNats error: {e}.",
)
except APIError as e:
if not e.code:
raise HTTPException(status_code=500, detail=str(e))
raise HTTPException(status_code=e.code, detail=str(e))


async def nats_req_rep(
subject: str,
payload: BaseModel | SQLModel,
timeout: int = NATS_REQ_TIMEOUT_DEFAULT,
) -> NatsMessage:
try:
rep: NatsMessage = await nats_client.request(
subject=subject,
payload=payload.model_dump_json().encode(),
headers=None,
timeout=timeout,
)
except NatsNoRespondersError as e:
raise HTTPException(
status_code=503,
detail=f"No responders found for subject: {subject}. Nats error: {e}.",
)
except NatsTimeoutError as e:
logger.exception(f"Timeout for subject: {subject}")
raise HTTPException(
status_code=504,
detail=f"Timeout for subject: {subject}. Nats error: {e}.",
)

return rep


async def publish_pipeline_run_event(event: PipelineRunEvent) -> None:
await publish_jetstream_event(STREAM_PIPELINES, SUBJECT_PIPELINES_RUN, event)


async def request_machine_status(payload: StatusRequest) -> NatsMessage:
return await nats_req_rep(
f"{SFAPI_GROUP_NAME}.{SFAPI_STATUS_ENDPOINT}",
payload,
timeout=NATS_REQ_TIMEOUT_SFAPI, # longer timeout for sfapi calls
)

async def publish_sfapi_submit_event(event: AgentCreateEvent) -> None:
await publish_jetstream_event(STREAM_SFAPI, SUBJECT_SFAPI_JOBS_SUBMIT, event)
Loading