diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 9cf8bf03..487f7c33 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -29,8 +29,16 @@ jobs:
       redis:
         image: redis/redis-stack:7.2.0-v13
         ports:
-          - 6333:6379
+          - 6338:6379
         options: --health-cmd "redis-cli ping" --health-interval 10s --health-timeout 5s --health-retries 5
+      qdrant:
+        image: qdrant/qdrant:v1.12.4
+        ports:
+          - 6333:6333
+          - 6334:6334
+        options: --health-cmd "bash -c ':> /dev/tcp/127.0.0.1/6333' || exit 1" --health-interval 10s --health-timeout 5s --health-retries 5
+        volumes:
+          - ./qdrant_storage:/qdrant/storage:z
     steps:
       - uses: actions/checkout@v4
       - name: Set up Python ${{ matrix.python-version }}
diff --git a/hypha/VERSION b/hypha/VERSION
index 62b918a4..637e2687 100644
--- a/hypha/VERSION
+++ b/hypha/VERSION
@@ -1,3 +1,3 @@
 {
-    "version": "0.20.39.post13"
+    "version": "0.20.39.post14"
 }
diff --git a/hypha/artifact.py b/hypha/artifact.py
index 5bd78f00..ec032dcb 100644
--- a/hypha/artifact.py
+++ b/hypha/artifact.py
@@ -917,6 +917,7 @@ async def create(
 
         if alias:
             alias = alias.strip()
+            assert "^" not in alias, "Alias cannot contain the '^' character."
             if "/" in alias:
                 ws, alias = alias.split("/")
                 if workspace and ws != workspace:
@@ -1068,7 +1069,7 @@ async def create(
 
             vectors_config = config.get("vectors_config", {})
             await self._vectordb_client.create_collection(
-                collection_name=f"{new_artifact.workspace}/{new_artifact.alias}",
+                collection_name=f"{new_artifact.workspace}^{new_artifact.alias}",
                 vectors_config=VectorParams(
                     size=vectors_config.get("size", 128),
                     distance=Distance(vectors_config.get("distance", "Cosine")),
@@ -1277,7 +1278,7 @@ async def read(
             artifact_data["config"] = artifact_data.get("config", {})
             artifact_data["config"]["vector_count"] = (
                 await self._vectordb_client.count(
-                    collection_name=f"{artifact.workspace}/{artifact.alias}"
+                    collection_name=f"{artifact.workspace}^{artifact.alias}"
                 )
             ).count
 
@@ -1433,7 +1434,7 @@ async def delete(
                 self._vectordb_client
             ), "The server is not configured to use a VectorDB client."
             await self._vectordb_client.delete_collection(
-                collection_name=f"{artifact.workspace}/{artifact.alias}"
+                collection_name=f"{artifact.workspace}^{artifact.alias}"
             )
 
         s3_config = self._get_s3_config(artifact, parent_artifact)
@@ -1522,7 +1523,7 @@ async def add_vectors(
                 p["id"] = p.get("id") or str(uuid.uuid4())
                 _points.append(PointStruct(**p))
             await self._vectordb_client.upsert(
-                collection_name=f"{artifact.workspace}/{artifact.alias}",
+                collection_name=f"{artifact.workspace}^{artifact.alias}",
                 points=_points,
             )
             # TODO: Update file_count
@@ -1536,14 +1537,13 @@ async def _embed_texts(self, config, texts):
         embedding_model = config.get("embedding_model")  # "text-embedding-3-small"
         assert (
             embedding_model
-        ), "Embedding model must be provided, e.g. 'fastembed', 'text-embedding-3-small' for openai or 'all-minilm' for ollama."
+        ), "Embedding model must be provided, e.g. 'fastembed:BAAI/bge-small-en-v1.5' or 'openai:text-embedding-3-small'."
         if embedding_model.startswith("fastembed"):
             from fastembed import TextEmbedding
 
-            if ":" in embedding_model:
-                model_name = embedding_model.split(":")[-1]
-            else:
-                model_name = "BAAI/bge-small-en-v1.5"
+            assert ":" in embedding_model, "Embedding model must be in the format 'fastembed:<model_name>'."
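+            # Everything after the "fastembed:" prefix is the model name.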
+            model_name = embedding_model.split(":")[-1]
             embedding_model = TextEmbedding(
                 model_name=model_name, cache_dir=self._cache_dir
             )
@@ -1551,14 +1551,20 @@ async def _embed_texts(self, config, texts):
             embeddings = list(
                 await loop.run_in_executor(None, embedding_model.embed, texts)
             )
-        else:
+        elif embedding_model.startswith("openai"):
             assert (
                 self._openai_client
             ), "The server is not configured to use an OpenAI client."
+            assert ":" in embedding_model, "Embedding model must be in the format 'openai:<model_name>'."
+            embedding_model = embedding_model.split(":")[-1]
             result = await self._openai_client.embeddings.create(
                 input=texts, model=embedding_model
             )
             embeddings = [data.embedding for data in result.data]
+        else:
+            raise ValueError(
+                f"Unsupported embedding model: {embedding_model}, supported models: 'fastembed:*', 'openai:*'"
+            )
         return embeddings
 
     async def add_documents(
@@ -1593,7 +1599,7 @@ async def add_documents(
             for embedding, doc in zip(embeddings, documents)
         ]
         await self._vectordb_client.upsert(
-            collection_name=f"{artifact.workspace}/{artifact.alias}",
+            collection_name=f"{artifact.workspace}^{artifact.alias}",
             points=points,
         )
         logger.info(f"Upserted documents to artifact with ID: {artifact_id}")
@@ -1632,7 +1638,7 @@ async def search_by_vector(
         if query_filter:
             query_filter = Filter.model_validate(query_filter)
         search_results = await self._vectordb_client.search(
-            collection_name=f"{artifact.workspace}/{artifact.alias}",
+            collection_name=f"{artifact.workspace}^{artifact.alias}",
             query_vector=query_vector,
             query_filter=query_filter,
             limit=limit,
@@ -1642,7 +1648,7 @@ async def search_by_vector(
         )
         if pagination:
             count = await self._vectordb_client.count(
-                collection_name=f"{artifact.workspace}/{artifact.alias}"
+                collection_name=f"{artifact.workspace}^{artifact.alias}"
             )
             return {
                 "total": count.count,
@@ -1684,7 +1690,7 @@ async def search_by_text(
         if query_filter:
             query_filter = Filter.model_validate(query_filter)
         search_results = await self._vectordb_client.search(
-            collection_name=f"{artifact.workspace}/{artifact.alias}",
+            collection_name=f"{artifact.workspace}^{artifact.alias}",
             query_vector=query_vector,
             query_filter=query_filter,
             limit=limit,
@@ -1694,7 +1700,7 @@ async def search_by_text(
         )
         if pagination:
             count = await self._vectordb_client.count(
-                collection_name=f"{artifact.workspace}/{artifact.alias}"
+                collection_name=f"{artifact.workspace}^{artifact.alias}"
             )
             return {
                 "total": count.count,
@@ -1728,7 +1734,7 @@ async def remove_vectors(
             self._vectordb_client
         ), "The server is not configured to use a VectorDB client."
         await self._vectordb_client.delete(
-            collection_name=f"{artifact.workspace}/{artifact.alias}",
+            collection_name=f"{artifact.workspace}^{artifact.alias}",
            points_selector=ids,
         )
         logger.info(f"Removed vectors from artifact with ID: {artifact_id}")
@@ -1757,7 +1763,8 @@ async def get_vector(
             self._vectordb_client
         ), "The server is not configured to use a VectorDB client."
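+        # Fetch the stored point, including its payload and vector.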
         points = await self._vectordb_client.retrieve(
-            collection_name=f"{artifact.workspace}/{artifact.alias}",
+            collection_name=f"{artifact.workspace}^{artifact.alias}",
             ids=[id],
             with_payload=True,
             with_vectors=True,
@@ -1797,7 +1804,7 @@ async def list_vectors(
         if query_filter:
             query_filter = Filter.model_validate(query_filter)
         points, _ = await self._vectordb_client.scroll(
-            collection_name=f"{artifact.workspace}/{artifact.alias}",
+            collection_name=f"{artifact.workspace}^{artifact.alias}",
             scroll_filter=query_filter,
             limit=limit,
             offset=offset,
diff --git a/hypha/core/store.py b/hypha/core/store.py
index 40453174..6c5344e7 100644
--- a/hypha/core/store.py
+++ b/hypha/core/store.py
@@ -12,6 +12,7 @@
 from hypha_rpc import RPC
 from hypha_rpc.utils.schema import schema_method
 from starlette.routing import Mount
+from pydantic.fields import Field
 
 from hypha import __version__
 from hypha.core import (
@@ -278,6 +279,25 @@ async def _run_startup_functions(self, startup_functions):
             # Stop the entire event loop if an error occurs
             asyncio.get_running_loop().stop()
 
+    async def housekeeping(self):
+        """Perform housekeeping tasks."""
+        # Delay the first run so the server finishes starting up.
+        logger.info("Starting housekeeping task in 2 minutes...")
+        await asyncio.sleep(120)
+        while True:
+            try:
+                logger.info("Running housekeeping task...")
+                async with self.get_workspace_interface(
+                    self._root_user, "ws-user-root", client_id="housekeeping"
+                ) as api:
+                    workspaces = await api.list_workspaces()
+                    for workspace in workspaces:
+                        await api.cleanup(workspace.id)
+            except Exception as e:
+                logger.exception(f"Error in housekeeping: {e}")
+            # Run once an hour, even if the previous attempt failed.
+            await asyncio.sleep(3600)
+
     async def upgrade(self):
         """Upgrade the store."""
         current_version = await self._redis.get("hypha_version")
@@ -503,6 +523,8 @@ async def init(self, reset_redis, startup_functions=None):
         logger.info("Server initialized with server id: %s", self._server_id)
         logger.info("Currently connected hypha servers: %s", servers)
 
+        self._housekeeping_task = asyncio.create_task(self.housekeeping())
+
     async def _register_root_services(self):
         """Register root services."""
         self._root_workspace_interface = await self.get_workspace_interface(
@@ -522,9 +544,31 @@ async def _register_root_services(self):
                 "list_servers": self.list_servers,
                 "kickout_client": self.kickout_client,
                 "list_workspaces": self.list_all_workspaces,
+                "list_vector_collections": self.list_vector_collections,
+                "delete_vector_collection": self.delete_vector_collection,
             }
         )
 
+    @schema_method
+    async def list_vector_collections(self):
+        """List all vector collections."""
+        if self._vectordb_client is None:
+            raise Exception("Vector database is not configured")
+        collections = await self._vectordb_client.get_collections()
+        return collections
+
+    @schema_method
+    async def delete_vector_collection(
+        self,
+        collection_name: str = Field(
+            ..., description="The name of the vector collection to delete."
+        ),
+    ):
+        """Delete a vector collection."""
+        if self._vectordb_client is None:
+            raise Exception("Vector database is not configured")
+        await self._vectordb_client.delete_collection(collection_name)
+
     @schema_method
     async def list_servers(self):
         """List all servers."""
diff --git a/tests/__init__.py b/tests/__init__.py
index fe7eff3b..edb0fa75 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -18,7 +18,7 @@
 MINIO_SERVER_URL_PUBLIC = f"http://localhost:{MINIO_PORT}"
 MINIO_ROOT_USER = "minio"
 MINIO_ROOT_PASSWORD = str(uuid.uuid4())
-REDIS_PORT = 6333
+REDIS_PORT = 6338
 
 POSTGRES_PORT = 5432
 POSTGRES_USER = "postgres"
@@ -28,6 +28,10 @@
 
 POSTGRES_URI = f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:{POSTGRES_PORT}/{POSTGRES_DB}"
 
+QDRANT_PORT = 6333
+QDRANT_URL = f"http://127.0.0.1:{QDRANT_PORT}"
+
+
 def find_item(items, key, value):
     """Find an item with key or attributes in an object list."""
     filtered = [
diff --git a/tests/conftest.py b/tests/conftest.py
index 8b85095d..7d24b65b 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -19,6 +19,7 @@
 from hypha.core.auth import generate_presigned_token, create_scope
 from hypha.minio import setup_minio_executables
 from redis import Redis
+from qdrant_client import QdrantClient
 
 from . import (
     MINIO_PORT,
@@ -38,6 +39,8 @@
     POSTGRES_PASSWORD,
     POSTGRES_DB,
     POSTGRES_URI,
+    QDRANT_PORT,
+    QDRANT_URL,
 )
 
 JWT_SECRET = str(uuid.uuid4())
@@ -171,13 +174,16 @@ def postgres_server():
                     "-p",
                     f"{POSTGRES_PORT}:5432",
                     "-d",
-                    "postgres",
+                    "postgres:12.21",
                 ]
             )
             time.sleep(2)  # Give the container time to initialize
         else:
            print("Using existing PostgreSQL container:", existing_container)
     else:
+        # Pull the PostgreSQL image
+        print("Pulling PostgreSQL Docker image...")
+        subprocess.run(["docker", "pull", "postgres:12.21"], check=True)
         # Start a new PostgreSQL container
         print("Starting a new PostgreSQL container")
         subprocess.Popen(
@@ -192,7 +198,7 @@ def postgres_server():
                 "-p",
                 f"{POSTGRES_PORT}:5432",
                 "-d",
-                "postgres",
+                "postgres:12.21",
             ]
         )
         time.sleep(2)
@@ -227,6 +233,8 @@ def redis_server():
         r.ping()
         yield
     except Exception:
+        # Pull the Redis image
+        subprocess.run(["docker", "pull", "redis/redis-stack:7.2.0-v13"], check=True)
         # user docker to start redis
         subprocess.Popen(
             [
@@ -254,8 +262,66 @@ def redis_server():
         time.sleep(1)
 
+
+@pytest_asyncio.fixture(name="qdrant_server", scope="session")
+def qdrant_server():
+    """Start a Qdrant server as test fixture and tear down after test."""
+    try:
+        # Check if Qdrant is already running
+        client = QdrantClient(QDRANT_URL)
+        client.info()
+        print(f"Qdrant is running, using vectordb at {QDRANT_URL}")
+        yield QDRANT_URL
+    except Exception:
+        # Pull the Qdrant image
+        print("Pulling Qdrant Docker image...")
+        subprocess.run(
+            ["docker", "pull", "qdrant/qdrant:v1.12.4-unprivileged"], check=True
+        )
+        # Start Qdrant using Docker
+        print("Qdrant is not running, starting a Qdrant server using Docker...")
+        dirpath = tempfile.mkdtemp()
+        subprocess.Popen(
+            [
+                "docker",
+                "run",
+                "-d",
+                "--name",
+                "qdrant",
+                "-p",
+                f"{QDRANT_PORT}:{QDRANT_PORT}",
+                "-p",
+                "6334:6334",
+                "-v",
+                f"{dirpath}:/qdrant/storage:z",
+                "qdrant/qdrant:v1.12.4-unprivileged",
+            ]
+        )
+
+        # Wait for Qdrant to be ready
+        timeout = 10
+        while timeout > 0:
+            try:
+                client = QdrantClient(QDRANT_URL)
+                client.info()
+                print(f"Qdrant is running at {QDRANT_URL}")
+                break
+            except Exception:
+                pass
+            timeout -= 0.1
+            time.sleep(0.1)
+
+        if timeout <= 0:
+            raise RuntimeError("Failed to start Qdrant server.")
+
+        yield QDRANT_URL
+
+        # Stop and remove the Docker container after the test session
+        subprocess.run(["docker", "stop", "qdrant"], check=True)
+        subprocess.run(["docker", "rm", "qdrant"], check=True)
+
 
 @pytest_asyncio.fixture(name="fastapi_server", scope="session")
-def fastapi_server_fixture(minio_server, postgres_server):
+def fastapi_server_fixture(minio_server, postgres_server, qdrant_server):
     """Start server as test fixture and tear down after test."""
     with subprocess.Popen(
         [
@@ -275,7 +341,7 @@ def fastapi_server_fixture(minio_server, postgres_server):
             "--enable-s3-proxy",
             f"--workspace-bucket=my-workspaces",
             "--s3-admin-type=minio",
-            "--vectordb-uri=:memory:",
+            f"--vectordb-uri={qdrant_server}",
             "--cache-dir=./bin/cache",
             f"--triton-servers=http://127.0.0.1:{TRITON_PORT}",
             "--static-mounts=/tests:./tests",
diff --git a/tests/test_artifact.py b/tests/test_artifact.py
index d89c3459..6f44bcb8 100644
--- a/tests/test_artifact.py
+++ b/tests/test_artifact.py
@@ -55,7 +55,7 @@ async def test_artifact_vector_collection(
             },
         },
         {
-            "vector": [np.random.rand(384)],
+            "vector": np.random.rand(384),
             "payload": {
                 "text": "Another document.",
                 "label": "doc2",
@@ -63,7 +63,7 @@ async def test_artifact_vector_collection(
             },
         },
         {
-            "vector": [np.random.rand(384)],
+            "vector": np.random.rand(384),
             "payload": {
                 "text": "Yet another document.",
                 "label": "doc3",
diff --git a/tests/test_server.py b/tests/test_server.py
index 15e45cdc..b93cde21 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -324,114 +324,6 @@ async def test_workspace_owners(
     await api.disconnect()
 
 
-async def test_service_search(fastapi_server_redis_1, test_user_token):
-    """Test service search with registered services."""
-    api = await connect_to_server(
-        {
-            "client_id": "my-app-99",
-            "server_url": SERVER_URL_REDIS_1,
-            "token": test_user_token,
-        }
-    )
-
-    # Register sample services with unique `docs` field
-    await api.register_service(
-        {
-            "id": "service-1",
-            "name": "example service one",
-            "type": "my-type",
-            "description": "This is the first test service.",
-            "app_id": "my-app",
-            "service_schema": {"example_key": "example_value"},
-            "config": {"setting": "value1"},
-            "docs": "This service handles data analysis workflows for genomics.",
-        }
-    )
-    await api.register_service(
-        {
-            "id": "service-2",
-            "name": "example service two",
-            "type": "another-type",
-            "description": "This is the second test service.",
-            "app_id": "another-app",
-            "service_schema": {"example_key": "another_value"},
-            "config": {"setting": "value2"},
-            "docs": "This service focuses on image processing for medical imaging.",
-        }
-    )
-
-    await api.register_service(
-        {
-            "id": "service-3",
-            "name": "example service three",
-            "type": "my-type",
-            "description": "This is the third test service.",
-            "app_id": "my-app",
-            "service_schema": {"example_key": "yet_another_value"},
-            "config": {"setting": "value3"},
-            "docs": "This service specializes in natural language processing and AI chatbots.",
-        }
-    )
-
-    # Test semantic search using `text_query`
-    text_query = "NLP"
-    services = await api.search_services(text_query=text_query, limit=3)
-    assert isinstance(services, list)
-    assert len(services) <= 3
-    # The top hit should be the service with "natural language processing" in the `docs` field
-    assert "natural language processing" in services[0]["docs"]
-    assert services[0]["score"] < services[1]["score"]
-
-    results = await api.search_services(text_query=text_query, limit=3, pagination=True)
-    assert results["total"] >= 1
-
-    embedding = np.ones(384).astype(np.float32)
-    await api.register_service(
-        {
-            "id": "service-88",
-            "name": "example service 88",
-            "type": "another-type",
-            "description": "This is the 88-th test service.",
-            "app_id": "another-app",
-            "service_schema": {"example_key": "another_value"},
-            "config": {"setting": "value2", "service_embedding": embedding},
-            "docs": "This service is used for performing alphafold calculations.",
-        }
-    )
-
-    # Test vector query with the exact embedding
-    services = await api.search_services(vector_query=embedding, limit=3)
-    assert isinstance(services, list)
-    assert len(services) <= 3
-    assert "service-88" in services[0]["id"]
-
-    # Test filter-based search with fuzzy matching on the `docs` field
-    filters = {"docs": "calculations*"}
-    services = await api.search_services(filters=filters, limit=3)
-    assert isinstance(services, list)
-    assert len(services) <= 3
-    assert "calculations" in services[0]["docs"]
-
-    # Test hybrid search (text query + filters)
-    filters = {"type": "my-type"}
-    text_query = "genomics workflows"
-    services = await api.search_services(
-        text_query=text_query, filters=filters, limit=3
-    )
-    assert isinstance(services, list)
-    assert all(service["type"] == "my-type" for service in services)
-    # The top hit should be the service with "genomics" in the `docs` field
-    assert "genomics" in services[0]["docs"].lower()
-
-    # Test hybrid search (embedding + filters)
-    filters = {"type": "my-type"}
-    services = await api.search_services(
-        vector_query=np.random.rand(384), filters=filters, limit=3
-    )
-    assert isinstance(services, list)
-    assert all(service["type"] == "my-type" for service in services)
-
-
 async def test_server_scalability(
     fastapi_server_redis_1, fastapi_server_redis_2, test_user_token
 ):
diff --git a/tests/test_service_search.py b/tests/test_service_search.py
new file mode 100644
index 00000000..f42f70ae
--- /dev/null
+++ b/tests/test_service_search.py
@@ -0,0 +1,120 @@
+"""Test service search."""
+import numpy as np
+
+import pytest
+from hypha_rpc import connect_to_server
+
+from . import (
+    SERVER_URL_REDIS_1,
+)
+
+# All test coroutines will be treated as marked.
+pytestmark = pytest.mark.asyncio + + +async def test_service_search(fastapi_server_redis_1, test_user_token): + """Test service search with registered services.""" + api = await connect_to_server( + { + "client_id": "my-app-99", + "server_url": SERVER_URL_REDIS_1, + "token": test_user_token, + } + ) + + # Register sample services with unique `docs` field + await api.register_service( + { + "id": "service-1", + "name": "example service one", + "type": "my-type", + "description": "This is the first test service.", + "app_id": "my-app", + "service_schema": {"example_key": "example_value"}, + "config": {"setting": "value1"}, + "docs": "This service handles data analysis workflows for genomics.", + } + ) + await api.register_service( + { + "id": "service-2", + "name": "example service two", + "type": "another-type", + "description": "This is the second test service.", + "app_id": "another-app", + "service_schema": {"example_key": "another_value"}, + "config": {"setting": "value2"}, + "docs": "This service focuses on image processing for medical imaging.", + } + ) + + await api.register_service( + { + "id": "service-3", + "name": "example service three", + "type": "my-type", + "description": "This is the third test service.", + "app_id": "my-app", + "service_schema": {"example_key": "yet_another_value"}, + "config": {"setting": "value3"}, + "docs": "This service specializes in natural language processing and AI chatbots.", + } + ) + + # Test semantic search using `text_query` + text_query = "NLP" + services = await api.search_services(text_query=text_query, limit=3) + assert isinstance(services, list) + assert len(services) <= 3 + # The top hit should be the service with "natural language processing" in the `docs` field + assert "natural language processing" in services[0]["docs"] + assert services[0]["score"] < services[1]["score"] + + results = await api.search_services(text_query=text_query, limit=3, pagination=True) + assert results["total"] >= 1 + + embedding = np.ones(384).astype(np.float32) + await api.register_service( + { + "id": "service-88", + "name": "example service 88", + "type": "another-type", + "description": "This is the 88-th test service.", + "app_id": "another-app", + "service_schema": {"example_key": "another_value"}, + "config": {"setting": "value2", "service_embedding": embedding}, + "docs": "This service is used for performing alphafold calculations.", + } + ) + + # Test vector query with the exact embedding + services = await api.search_services(vector_query=embedding, limit=3) + assert isinstance(services, list) + assert len(services) <= 3 + assert "service-88" in services[0]["id"] + + # Test filter-based search with fuzzy matching on the `docs` field + filters = {"docs": "calculations*"} + services = await api.search_services(filters=filters, limit=3) + assert isinstance(services, list) + assert len(services) <= 3 + assert "calculations" in services[0]["docs"] + + # Test hybrid search (text query + filters) + filters = {"type": "my-type"} + text_query = "genomics workflows" + services = await api.search_services( + text_query=text_query, filters=filters, limit=3 + ) + assert isinstance(services, list) + assert all(service["type"] == "my-type" for service in services) + # The top hit should be the service with "genomics" in the `docs` field + assert "genomics" in services[0]["docs"].lower() + + # Test hybrid search (embedding + filters) + filters = {"type": "my-type"} + services = await api.search_services( + vector_query=np.random.rand(384), filters=filters, limit=3 
+ ) + assert isinstance(services, list) + assert all(service["type"] == "my-type" for service in services)