diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml new file mode 100644 index 000000000..ba1db45c8 --- /dev/null +++ b/.github/workflows/docker.yml @@ -0,0 +1,31 @@ +name: Release Nightly + +on: + workflow_dispatch: + pull_request: + +permissions: + actions: write + +jobs: + trigger: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Login to Docker Hub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERIO_USERNAME }} + password: ${{ secrets.DOCKERIO_TOKEN }} + - name: Push binary release to Docker Registry + uses: docker/build-push-action@v4 + with: + context: . + push: true + file: .Dockerfile + tags: ${{ secrets.DOCKERIO_USERNAME }}/vectordbbench:latest diff --git a/Dockerfile b/Dockerfile index 5c57ceb03..a5b9b1ec2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,5 +14,7 @@ COPY --from=builder-image /usr/local/lib/python3.11/site-packages /usr/local/lib WORKDIR /opt/code COPY . . ENV PYTHONPATH /opt/code +RUN sed -i 's/deb.debian.org/mirrors.ustc.edu.cn/g' /etc/apt/sources.list +RUN apt-get update && apt-get install libpq5 -y ENTRYPOINT ["python3", "-m", "vectordb_bench"] diff --git a/install/requirements_py3.11.txt b/install/requirements_py3.11.txt index c55601934..2c3d4f0c6 100644 --- a/install/requirements_py3.11.txt +++ b/install/requirements_py3.11.txt @@ -1,13 +1,6 @@ grpcio==1.53.0 grpcio-tools==1.53.0 -qdrant-client -pinecone-client -weaviate-client -elasticsearch -pgvector -sqlalchemy -redis -chromadb +pgvecto.rs @ git+https://github.com/cutecutecat/pgvecto.rs-py@django-types pytz streamlit-autorefresh streamlit>=1.23.0 @@ -21,3 +14,4 @@ environs pydantic=3.1.12" ] redis = [ "redis" ] chromadb = [ "chromadb" ] zilliz_cloud = [] diff --git a/vectordb_bench/__init__.py b/vectordb_bench/__init__.py index f7664502f..1f2ab64a3 100644 --- a/vectordb_bench/__init__.py +++ b/vectordb_bench/__init__.py @@ -46,14 +46,14 @@ class config: LOAD_TIMEOUT_1536D_500K = 2.5 * 3600 # 2.5h LOAD_TIMEOUT_1536D_5M = 25 * 3600 # 25h - OPTIMIZE_TIMEOUT_DEFAULT = 30 * 60 # 30min - OPTIMIZE_TIMEOUT_768D_1M = 30 * 60 # 30min - OPTIMIZE_TIMEOUT_768D_10M = 5 * 3600 # 5h - OPTIMIZE_TIMEOUT_768D_100M = 50 * 3600 # 50h + OPTIMIZE_TIMEOUT_DEFAULT = LOAD_TIMEOUT_DEFAULT + OPTIMIZE_TIMEOUT_768D_1M = LOAD_TIMEOUT_768D_1M + OPTIMIZE_TIMEOUT_768D_10M = LOAD_TIMEOUT_768D_10M + OPTIMIZE_TIMEOUT_768D_100M = LOAD_TIMEOUT_768D_100M - OPTIMIZE_TIMEOUT_1536D_500K = 15 * 60 # 15min - OPTIMIZE_TIMEOUT_1536D_5M = 2.5 * 3600 # 2.5h + OPTIMIZE_TIMEOUT_1536D_500K = LOAD_TIMEOUT_1536D_500K + OPTIMIZE_TIMEOUT_1536D_5M = LOAD_TIMEOUT_1536D_5M def display(self) -> str: tmp = [ i for i in inspect.getmembers(self) diff --git a/vectordb_bench/backend/cases.py b/vectordb_bench/backend/cases.py index 6c43bb910..38773b2de 100644 --- a/vectordb_bench/backend/cases.py +++ b/vectordb_bench/backend/cases.py @@ -1,4 +1,3 @@ -import typing import logging from enum import Enum, auto from typing import Type diff --git a/vectordb_bench/backend/clients/api.py b/vectordb_bench/backend/clients/api.py index 0024bf600..68ac9c6d8 100644 --- a/vectordb_bench/backend/clients/api.py +++ b/vectordb_bench/backend/clients/api.py @@ -1,6 +1,5 @@ from abc import ABC, abstractmethod from enum import Enum -from typing import Any, Type from contextlib import contextmanager from pydantic import BaseModel, validator, SecretStr diff --git a/vectordb_bench/backend/clients/milvus/milvus.py b/vectordb_bench/backend/clients/milvus/milvus.py index 4590265ae..11fd1c59e 100644 --- a/vectordb_bench/backend/clients/milvus/milvus.py +++ b/vectordb_bench/backend/clients/milvus/milvus.py @@ -8,7 +8,7 @@ from pymilvus import Collection, utility from pymilvus import CollectionSchema, DataType, FieldSchema, MilvusException -from ..api import VectorDB, IndexType +from ..api import VectorDB from .config import MilvusIndexConfig @@ -129,7 +129,7 @@ def wait_index(): log.warning(f"{self.name} compact error: {e}") if hasattr(e, 'code'): if e.code().name == 'PERMISSION_DENIED': - log.warning(f"Skip compact due to permission denied.") + log.warning("Skip compact due to permission denied.") pass else: raise e @@ -165,7 +165,7 @@ def optimize(self): def need_normalize_cosine(self) -> bool: """Wheather this database need to normalize dataset to support COSINE""" if self.case_config.is_gpu_index: - log.info(f"current gpu_index only supports IP / L2, cosine dataset need normalize.") + log.info("current gpu_index only supports IP / L2, cosine dataset need normalize.") return True return False diff --git a/vectordb_bench/backend/clients/pgvecto_rs/cli.py b/vectordb_bench/backend/clients/pgvecto_rs/cli.py new file mode 100644 index 000000000..10dbff556 --- /dev/null +++ b/vectordb_bench/backend/clients/pgvecto_rs/cli.py @@ -0,0 +1,154 @@ +from typing import Annotated, Optional, Unpack + +import click +import os +from pydantic import SecretStr + +from ....cli.cli import ( + CommonTypedDict, + HNSWFlavor1, + IVFFlatTypedDict, + cli, + click_parameter_decorators_from_typed_dict, + run, +) +from vectordb_bench.backend.clients import DB + + +class PgVectoRSTypedDict(CommonTypedDict): + user_name: Annotated[ + str, click.option("--user-name", type=str, help="Db username", required=True) + ] + password: Annotated[ + str, + click.option( + "--password", + type=str, + help="Postgres database password", + default=lambda: os.environ.get("POSTGRES_PASSWORD", ""), + show_default="$POSTGRES_PASSWORD", + ), + ] + + host: Annotated[ + str, click.option("--host", type=str, help="Db host", required=True) + ] + db_name: Annotated[ + str, click.option("--db-name", type=str, help="Db name", required=True) + ] + max_parallel_workers: Annotated[ + Optional[int], + click.option( + "--max-parallel-workers", + type=int, + help="Sets the maximum number of parallel processes per maintenance operation (index creation)", + required=False, + ), + ] + quantization_type: Annotated[ + str, + click.option( + "--quantization-type", + type=click.Choice(["trivial", "scalar", "product"]), + help="quantization type for vectors", + required=False, + ), + ] + quantization_ratio: Annotated[ + str, + click.option( + "--quantization-ratio", + type=click.Choice(["x4", "x8", "x16", "x32", "x64"]), + help="quantization ratio(for product quantization)", + required=False, + ), + ] + + +class PgVectoRSFlatTypedDict(PgVectoRSTypedDict, IVFFlatTypedDict): ... + + +@cli.command() +@click_parameter_decorators_from_typed_dict(PgVectoRSFlatTypedDict) +def PgVectoRSFlat( + **parameters: Unpack[PgVectoRSFlatTypedDict], +): + from .config import PgVectoRSConfig, PgVectoRSFLATConfig + + run( + db=DB.PgVectoRS, + db_config=PgVectoRSConfig( + db_label=parameters["db_label"], + user_name=SecretStr(parameters["user_name"]), + password=SecretStr(parameters["password"]), + host=parameters["host"], + db_name=parameters["db_name"], + ), + db_case_config=PgVectoRSFLATConfig( + max_parallel_workers=parameters["max_parallel_workers"], + quantization_type=parameters["quantization_type"], + quantization_ratio=parameters["quantization_ratio"], + ), + **parameters, + ) + + +class PgVectoRSIVFFlatTypedDict(PgVectoRSTypedDict, IVFFlatTypedDict): ... + + +@cli.command() +@click_parameter_decorators_from_typed_dict(PgVectoRSIVFFlatTypedDict) +def PgVectoRSIVFFlat( + **parameters: Unpack[PgVectoRSIVFFlatTypedDict], +): + from .config import PgVectoRSConfig, PgVectoRSIVFFlatConfig + + run( + db=DB.PgVectoRS, + db_config=PgVectoRSConfig( + db_label=parameters["db_label"], + user_name=SecretStr(parameters["user_name"]), + password=SecretStr(parameters["password"]), + host=parameters["host"], + db_name=parameters["db_name"], + ), + db_case_config=PgVectoRSIVFFlatConfig( + max_parallel_workers=parameters["max_parallel_workers"], + quantization_type=parameters["quantization_type"], + quantization_ratio=parameters["quantization_ratio"], + probes=parameters["probes"], + lists=parameters["lists"], + ), + **parameters, + ) + + +class PgVectoRSHNSWTypedDict(PgVectoRSTypedDict, HNSWFlavor1): ... + + +@cli.command() +@click_parameter_decorators_from_typed_dict(PgVectoRSHNSWTypedDict) +def PgVectoRSHNSW( + **parameters: Unpack[PgVectoRSHNSWTypedDict], +): + from .config import PgVectoRSConfig, PgVectoRSHNSWConfig + + run( + db=DB.PgVectoRS, + db_config=PgVectoRSConfig( + db_label=parameters["db_label"], + user_name=SecretStr(parameters["user_name"]), + password=SecretStr(parameters["password"]), + host=parameters["host"], + db_name=parameters["db_name"], + ), + db_case_config=PgVectoRSHNSWConfig( + max_parallel_workers=parameters["max_parallel_workers"], + quantization_type=parameters["quantization_type"], + quantization_ratio=parameters["quantization_ratio"], + m=parameters["m"], + ef_construction=parameters["ef_construction"], + ef_search=parameters["ef_search"], + ), + **parameters, + ) diff --git a/vectordb_bench/backend/clients/pgvecto_rs/config.py b/vectordb_bench/backend/clients/pgvecto_rs/config.py index 73c41c239..08ed7e082 100644 --- a/vectordb_bench/backend/clients/pgvecto_rs/config.py +++ b/vectordb_bench/backend/clients/pgvecto_rs/config.py @@ -1,10 +1,26 @@ -from typing import Literal +from abc import abstractmethod +from typing import TypedDict + from pydantic import BaseModel, SecretStr -from ..api import DBConfig, DBCaseConfig, MetricType, IndexType +from pgvecto_rs.types import IndexOption, Ivf, Hnsw, Flat, Quantization +from pgvecto_rs.types.index import QuantizationType, QuantizationRatio + +from ..api import DBConfig, DBCaseConfig, IndexType, MetricType POSTGRE_URL_PLACEHOLDER = "postgresql://%s:%s@%s/%s" +class PgVectorRSConfigDict(TypedDict): + """These keys will be directly used as kwargs in psycopg connection string, + so the names must match exactly psycopg API""" + + user: str + password: str + host: str + port: int + dbname: str + + class PgVectoRSConfig(DBConfig): user_name: SecretStr = "postgres" password: SecretStr @@ -20,11 +36,18 @@ def to_dict(self) -> dict: "port": self.port, "dbname": self.db_name, "user": user_str, - "password": pwd_str + "password": pwd_str, } + class PgVectoRSIndexConfig(BaseModel, DBCaseConfig): metric_type: MetricType | None = None + create_index_before_load: bool = False + create_index_after_load: bool = True + + max_parallel_workers: int | None = None + quantization_type: QuantizationType | None = None + quantization_ratio: QuantizationRatio | None = None def parse_metric(self) -> str: if self.metric_type == MetricType.L2: @@ -40,88 +63,100 @@ def parse_metric_fun_op(self) -> str: return "<#>" return "<=>" -class PgVectoRSQuantConfig(PgVectoRSIndexConfig): - quantizationType: Literal["trivial", "scalar", "product"] - quantizationRatio: None | Literal["x4", "x8", "x16", "x32", "x64"] - - def parse_quantization(self) -> str: - if self.quantizationType == "trivial": - return "quantization = { trivial = { } }" - elif self.quantizationType == "scalar": - return "quantization = { scalar = { } }" - else: - return f'quantization = {{ product = {{ ratio = "{self.quantizationRatio}" }} }}' - + def search_param(self) -> dict: + return { + "metric_fun_op": self.parse_metric_fun_op(), + } -class HNSWConfig(PgVectoRSQuantConfig): - M: int - efConstruction: int - index: IndexType = IndexType.HNSW + @abstractmethod + def index_param(self) -> dict[str, str]: ... - def index_param(self) -> dict: - options = f""" -[indexing.hnsw] -m = {self.M} -ef_construction = {self.efConstruction} -{self.parse_quantization()} -""" - return {"options": options, "metric": self.parse_metric()} + @abstractmethod + def session_param(self) -> dict[str, str | int]: ... - def search_param(self) -> dict: - return {"metrics_op": self.parse_metric_fun_op()} +class PgVectoRSHNSWConfig(PgVectoRSIndexConfig): + index: IndexType = IndexType.HNSW + m: int | None = None + ef_search: int | None + ef_construction: int | None = None -class IVFFlatConfig(PgVectoRSQuantConfig): - nlist: int - nprobe: int | None = None + def index_param(self) -> dict[str, str]: + if self.quantization_type is None: + quantization = None + else: + quantization = Quantization( + typ=self.quantization_type, ratio=self.quantization_ratio + ) + + option = IndexOption( + index=Hnsw( + m=self.m, + ef_construction=self.ef_construction, + quantization=quantization, + ), + threads=self.max_parallel_workers, + ) + return {"options": option.dumps(), "metric": self.parse_metric()} + + def session_param(self) -> dict[str, str | int]: + session_parameters = {} + if self.ef_search is not None: + session_parameters["vectors.hnsw_ef_search"] = str(self.ef_search) + return session_parameters + + +class PgVectoRSIVFFlatConfig(PgVectoRSIndexConfig): index: IndexType = IndexType.IVFFlat + probes: int | None + lists: int | None + + def index_param(self) -> dict[str, str]: + if self.quantization_type is None: + quantization = None + else: + quantization = Quantization( + typ=self.quantization_type, ratio=self.quantization_ratio + ) - def index_param(self) -> dict: - options = f""" -[indexing.ivf] -nlist = {self.nlist} -nsample = {self.nprobe if self.nprobe else 10} -{self.parse_quantization()} -""" - return {"options": options, "metric": self.parse_metric()} + option = IndexOption( + index=Ivf(nlist=self.lists, quantization=quantization), + threads=self.max_parallel_workers, + ) + return {"options": option.dumps(), "metric": self.parse_metric()} - def search_param(self) -> dict: - return {"metrics_op": self.parse_metric_fun_op()} - -class IVFFlatSQ8Config(PgVectoRSIndexConfig): - nlist: int - nprobe: int | None = None - index: IndexType = IndexType.IVFSQ8 - - def index_param(self) -> dict: - options = f""" -[indexing.ivf] -nlist = {self.nlist} -nsample = {self.nprobe if self.nprobe else 10} -quantization = {{ scalar = {{ }} }} -""" - return {"options": options, "metric": self.parse_metric()} + def session_param(self) -> dict[str, str | int]: + session_parameters = {} + if self.probes is not None: + session_parameters["vectors.ivf_nprobe"] = str(self.probes) + return session_parameters - def search_param(self) -> dict: - return {"metrics_op": self.parse_metric_fun_op()} -class FLATConfig(PgVectoRSQuantConfig): +class PgVectoRSFLATConfig(PgVectoRSIndexConfig): index: IndexType = IndexType.Flat - def index_param(self) -> dict: - options = f""" -[indexing.flat] -{self.parse_quantization()} -""" - return {"options": options, "metric": self.parse_metric()} + def index_param(self) -> dict[str, str]: + if self.quantization_type is None: + quantization = None + else: + quantization = Quantization( + typ=self.quantization_type, ratio=self.quantization_ratio + ) - def search_param(self) -> dict: - return {"metrics_op": self.parse_metric_fun_op()} + option = IndexOption( + index=Flat( + quantization=quantization, + ), + threads=self.max_parallel_workers, + ) + return {"options": option.dumps(), "metric": self.parse_metric()} + + def session_param(self) -> dict[str, str | int]: + return {} _pgvecto_rs_case_config = { - IndexType.HNSW: HNSWConfig, - IndexType.IVFFlat: IVFFlatConfig, - IndexType.IVFSQ8: IVFFlatSQ8Config, - IndexType.Flat: FLATConfig, + IndexType.HNSW: PgVectoRSHNSWConfig, + IndexType.IVFFlat: PgVectoRSIVFFlatConfig, + IndexType.Flat: PgVectoRSFLATConfig, } diff --git a/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py b/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py index 22caa43e6..57ca6f388 100644 --- a/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py +++ b/vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py @@ -1,73 +1,138 @@ """Wrapper around the Pgvecto.rs vector database over VectorDB""" -import io import logging +import pprint from contextlib import contextmanager -from typing import Any -import pandas as pd -import psycopg2 -import psycopg2.extras +from typing import Any, Generator, Optional, Tuple -from ..api import VectorDB, DBCaseConfig +import numpy as np +import psycopg +from psycopg import Connection, Cursor, sql +from pgvecto_rs.psycopg import register_vector + +from ..api import VectorDB +from .config import PgVectoRSConfig, PgVectoRSIndexConfig log = logging.getLogger(__name__) + class PgVectoRS(VectorDB): - """Use SQLAlchemy instructions""" + """Use psycopg instructions""" + + conn: psycopg.Connection[Any] | None = None + cursor: psycopg.Cursor[Any] | None = None + _unfiltered_search: sql.Composed + _filtered_search: sql.Composed def __init__( self, dim: int, - db_config: dict, - db_case_config: DBCaseConfig, - collection_name: str = "PgVectorCollection", + db_config: PgVectoRSConfig, + db_case_config: PgVectoRSIndexConfig, + collection_name: str = "PgVectoRSCollection", drop_old: bool = False, **kwargs, ): + + self.name = "PgVectorRS" self.db_config = db_config self.case_config = db_case_config self.table_name = collection_name self.dim = dim - self._index_name = "pqvector_index" + self._index_name = "pgvectors_index" self._primary_field = "id" self._vector_field = "embedding" # construct basic units - self.conn = psycopg2.connect(**self.db_config) - self.conn.autocommit = False - self.cursor = self.conn.cursor() + self.conn, self.cursor = self._create_connection(**self.db_config) # create vector extension self.cursor.execute("CREATE EXTENSION IF NOT EXISTS vectors") self.conn.commit() + register_vector(self.conn) + + log.info(f"{self.name} config values: {self.db_config}\n{self.case_config}") + if not any( + ( + self.case_config.create_index_before_load, + self.case_config.create_index_after_load, + ) + ): + err = f"{self.name} config must create an index using create_index_before_load or create_index_after_load" + log.error(err) + raise RuntimeError( + f"{err}\n{pprint.pformat(self.db_config)}\n{pprint.pformat(self.case_config)}" + ) + if drop_old: log.info(f"Pgvecto.rs client drop table : {self.table_name}") self._drop_index() self._drop_table() self._create_table(dim) - self._create_index() + if self.case_config.create_index_before_load: + self._create_index() self.cursor.close() self.conn.close() self.cursor = None self.conn = None + @staticmethod + def _create_connection(**kwargs) -> Tuple[Connection, Cursor]: + conn = psycopg.connect(**kwargs) + conn.autocommit = False + cursor = conn.cursor() + + assert conn is not None, "Connection is not initialized" + assert cursor is not None, "Cursor is not initialized" + + return conn, cursor + @contextmanager - def init(self) -> None: + def init(self) -> Generator[None, None, None]: """ Examples: >>> with self.init(): >>> self.insert_embeddings() >>> self.search_embedding() """ - self.conn = psycopg2.connect(**self.db_config) - self.conn.autocommit = False - self.cursor = self.conn.cursor() - self.cursor.execute('SET search_path = "$user", public, vectors') + + self.conn, self.cursor = self._create_connection(**self.db_config) + + # index configuration may have commands defined that we should set during each client session + session_options = self.case_config.session_param() + + for key, val in session_options.items(): + command = sql.SQL("SET {setting_name} " + "= {val};").format( + setting_name=sql.Identifier(key), + val=val, + ) + log.debug(command.as_string(self.cursor)) + self.cursor.execute(command) self.conn.commit() + self._filtered_search = sql.Composed( + [ + sql.SQL( + "SELECT id FROM public.{table_name} WHERE id >= %s ORDER BY embedding " + ).format(table_name=sql.Identifier(self.table_name)), + sql.SQL(self.case_config.search_param()["metric_fun_op"]), + sql.SQL(" %s::vector LIMIT %s::int"), + ] + ) + + self._unfiltered_search = sql.Composed( + [ + sql.SQL( + "SELECT id FROM public.{table_name} ORDER BY embedding " + ).format(table_name=sql.Identifier(self.table_name)), + sql.SQL(self.case_config.search_param()["metric_fun_op"]), + sql.SQL(" %s::vector LIMIT %s::int"), + ] + ) + try: yield finally: @@ -79,42 +144,65 @@ def init(self) -> None: def _drop_table(self): assert self.conn is not None, "Connection is not initialized" assert self.cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client drop table : {self.table_name}") - self.cursor.execute(f'DROP TABLE IF EXISTS public."{self.table_name}"') + self.cursor.execute( + sql.SQL("DROP TABLE IF EXISTS public.{table_name}").format( + table_name=sql.Identifier(self.table_name) + ) + ) self.conn.commit() def ready_to_load(self): pass def optimize(self): - pass + self._post_insert() - def ready_to_search(self): - pass + def _post_insert(self): + log.info(f"{self.name} post insert before optimize") + if self.case_config.create_index_after_load: + self._drop_index() + self._create_index() def _drop_index(self): assert self.conn is not None, "Connection is not initialized" assert self.cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client drop index : {self._index_name}") - self.cursor.execute(f'DROP INDEX IF EXISTS "{self._index_name}"') + drop_index_sql = sql.SQL("DROP INDEX IF EXISTS {index_name}").format( + index_name=sql.Identifier(self._index_name) + ) + log.debug(drop_index_sql.as_string(self.cursor)) + self.cursor.execute(drop_index_sql) self.conn.commit() def _create_index(self): assert self.conn is not None, "Connection is not initialized" assert self.cursor is not None, "Cursor is not initialized" + log.info(f"{self.name} client create index : {self._index_name}") index_param = self.case_config.index_param() + index_create_sql = sql.SQL( + """ + CREATE INDEX IF NOT EXISTS {index_name} ON public.{table_name} + USING vectors (embedding {embedding_metric}) WITH (options = {index_options}) + """ + ).format( + index_name=sql.Identifier(self._index_name), + table_name=sql.Identifier(self.table_name), + embedding_metric=sql.Identifier(index_param["metric"]), + index_options=index_param["options"], + ) try: - # create table - self.cursor.execute( - f'CREATE INDEX IF NOT EXISTS {self._index_name} ON public."{self.table_name}" \ - USING vectors (embedding {index_param["metric"]}) WITH (options = $${index_param["options"]}$$);' - ) + log.debug(index_create_sql.as_string(self.cursor)) + self.cursor.execute(index_create_sql) self.conn.commit() except Exception as e: log.warning( - f"Failed to create pgvecto.rs table: {self.table_name} error: {e}" + f"Failed to create pgvecto.rs index {self._index_name} \ + at table {self.table_name} error: {e}" ) raise e from None @@ -122,12 +210,18 @@ def _create_table(self, dim: int): assert self.conn is not None, "Connection is not initialized" assert self.cursor is not None, "Cursor is not initialized" + table_create_sql = sql.SQL( + """ + CREATE TABLE IF NOT EXISTS public.{table_name} + (id BIGINT PRIMARY KEY, embedding vector({dim})) + """ + ).format( + table_name=sql.Identifier(self.table_name), + dim=dim, + ) try: # create table - self.cursor.execute( - f'CREATE TABLE IF NOT EXISTS public."{self.table_name}" \ - (id Integer PRIMARY KEY, embedding vector({dim}));' - ) + self.cursor.execute(table_create_sql) self.conn.commit() except Exception as e: log.warning( @@ -140,7 +234,7 @@ def insert_embeddings( embeddings: list[list[float]], metadata: list[int], **kwargs: Any, - ) -> (int, Exception): + ) -> Tuple[int, Optional[Exception]]: assert self.conn is not None, "Connection is not initialized" assert self.cursor is not None, "Cursor is not initialized" @@ -148,19 +242,27 @@ def insert_embeddings( assert self.cursor is not None, "Cursor is not initialized" try: - items = { - "id": metadata, - "embedding": embeddings - } - df = pd.DataFrame(items) - csv_buffer = io.StringIO() - df.to_csv(csv_buffer, index=False, header=False) - csv_buffer.seek(0) - self.cursor.copy_expert(f"COPY public.\"{self.table_name}\" FROM STDIN WITH (FORMAT CSV)", csv_buffer) + metadata_arr = np.array(metadata) + embeddings_arr = np.array(embeddings) + + with self.cursor.copy( + sql.SQL("COPY public.{table_name} FROM STDIN (FORMAT BINARY)").format( + table_name=sql.Identifier(self.table_name) + ) + ) as copy: + copy.set_types(["bigint", "vector"]) + for i, row in enumerate(metadata_arr): + copy.write_row((row, embeddings_arr[i])) self.conn.commit() + + if kwargs.get("last_batch"): + self._post_insert() + return len(metadata), None except Exception as e: - log.warning(f"Failed to insert data into pgvecto.rs table ({self.table_name}), error: {e}") + log.warning( + f"Failed to insert data into pgvecto.rs table ({self.table_name}), error: {e}" + ) return 0, e def search_embedding( @@ -173,20 +275,18 @@ def search_embedding( assert self.conn is not None, "Connection is not initialized" assert self.cursor is not None, "Cursor is not initialized" - search_param = self.case_config.search_param() + q = np.asarray(query) if filters: + log.debug(self._filtered_search.as_string(self.cursor)) gt = filters.get("id") - self.cursor.execute( - f"SELECT id FROM (SELECT * FROM public.\"{self.table_name}\" ORDER BY embedding \ - {search_param['metrics_op']} '{query}' LIMIT {k}) AS X WHERE id > {gt} ;" + result = self.cursor.execute( + self._filtered_search, (gt, q, k), prepare=True, binary=True ) else: - self.cursor.execute( - f"SELECT id FROM public.\"{self.table_name}\" ORDER BY embedding \ - {search_param['metrics_op']} '{query}' LIMIT {k};" + log.debug(self._unfiltered_search.as_string(self.cursor)) + result = self.cursor.execute( + self._unfiltered_search, (q, k), prepare=True, binary=True ) - self.conn.commit() - result = self.cursor.fetchall() - return [int(i[0]) for i in result] + return [int(i[0]) for i in result.fetchall()] diff --git a/vectordb_bench/backend/clients/pgvector/cli.py b/vectordb_bench/backend/clients/pgvector/cli.py index 31b268231..f1301bb7f 100644 --- a/vectordb_bench/backend/clients/pgvector/cli.py +++ b/vectordb_bench/backend/clients/pgvector/cli.py @@ -1,4 +1,4 @@ -from typing import Annotated, Optional, TypedDict, Unpack +from typing import Annotated, Optional, Unpack import click import os diff --git a/vectordb_bench/backend/clients/redis/redis.py b/vectordb_bench/backend/clients/redis/redis.py index 8acf669d2..6ba766ce2 100644 --- a/vectordb_bench/backend/clients/redis/redis.py +++ b/vectordb_bench/backend/clients/redis/redis.py @@ -1,8 +1,7 @@ import logging from contextlib import contextmanager -from typing import Any, Type -from ..api import VectorDB, DBConfig, DBCaseConfig, EmptyDBCaseConfig, IndexType -from .config import RedisConfig +from typing import Any +from ..api import VectorDB, DBCaseConfig, IndexType import redis from redis.commands.search.field import TagField, VectorField, NumericField from redis.commands.search.indexDefinition import IndexDefinition, IndexType diff --git a/vectordb_bench/backend/clients/test/config.py b/vectordb_bench/backend/clients/test/config.py index 01a77e000..351d7bcac 100644 --- a/vectordb_bench/backend/clients/test/config.py +++ b/vectordb_bench/backend/clients/test/config.py @@ -1,6 +1,6 @@ -from pydantic import BaseModel, SecretStr +from pydantic import BaseModel -from ..api import DBCaseConfig, DBConfig, IndexType, MetricType +from ..api import DBCaseConfig, DBConfig, MetricType class TestConfig(DBConfig): diff --git a/vectordb_bench/cli/vectordbbench.py b/vectordb_bench/cli/vectordbbench.py index 396909cd5..46fe63237 100644 --- a/vectordb_bench/cli/vectordbbench.py +++ b/vectordb_bench/cli/vectordbbench.py @@ -1,4 +1,5 @@ from ..backend.clients.pgvector.cli import PgVectorHNSW +from ..backend.clients.pgvecto_rs.cli import PgVectoRSHNSW, PgVectoRSIVFFlat from ..backend.clients.redis.cli import Redis from ..backend.clients.test.cli import Test from ..backend.clients.weaviate_cloud.cli import Weaviate @@ -9,6 +10,8 @@ from .cli import cli cli.add_command(PgVectorHNSW) +cli.add_command(PgVectoRSHNSW) +cli.add_command(PgVectoRSIVFFlat) cli.add_command(Redis) cli.add_command(Weaviate) cli.add_command(Test) diff --git a/vectordb_bench/frontend/components/run_test/dbSelector.py b/vectordb_bench/frontend/components/run_test/dbSelector.py index ccf0168c6..e20ee5059 100644 --- a/vectordb_bench/frontend/components/run_test/dbSelector.py +++ b/vectordb_bench/frontend/components/run_test/dbSelector.py @@ -22,7 +22,7 @@ def dbSelector(st): dbIsActived[db] = column.checkbox(db.name) try: column.image(DB_TO_ICON.get(db, "")) - except MediaFileStorageError as e: + except MediaFileStorageError: column.warning(f"{db.name} image not available") pass activedDbList = [db for db in DB_LIST if dbIsActived[db]] diff --git a/vectordb_bench/frontend/components/tables/data.py b/vectordb_bench/frontend/components/tables/data.py index 96134c7ff..df2b9fc55 100644 --- a/vectordb_bench/frontend/components/tables/data.py +++ b/vectordb_bench/frontend/components/tables/data.py @@ -1,5 +1,4 @@ from dataclasses import asdict -from vectordb_bench.backend.cases import CaseType from vectordb_bench.interface import benchMarkRunner from vectordb_bench.models import CaseResult, ResultLabel import pandas as pd diff --git a/vectordb_bench/frontend/config/dbCaseConfigs.py b/vectordb_bench/frontend/config/dbCaseConfigs.py index ce8a3a4ae..7f9b97f25 100644 --- a/vectordb_bench/frontend/config/dbCaseConfigs.py +++ b/vectordb_bench/frontend/config/dbCaseConfigs.py @@ -190,8 +190,21 @@ class CaseConfigInput(BaseModel): }, ) +CaseConfigParamInput_IndexType_PgVectoRS = CaseConfigInput( + label=CaseConfigParamType.IndexType, + inputHelp="Select Index Type", + inputType=InputType.Option, + inputConfig={ + "options": [ + IndexType.HNSW.value, + IndexType.IVFFlat.value, + IndexType.Flat.value, + ], + }, +) + CaseConfigParamInput_M = CaseConfigInput( - label=CaseConfigParamType.M, + label=CaseConfigParamType.m, inputType=InputType.Number, inputConfig={ "min": 4, @@ -272,14 +285,26 @@ class CaseConfigInput(BaseModel): CaseConfigParamInput_EFConstruction_PgVectoRS = CaseConfigInput( - label=CaseConfigParamType.EFConstruction, + label=CaseConfigParamType.ef_construction, inputType=InputType.Number, inputConfig={ - "min": 8, - "max": 512, - "value": 360, + "min": 10, + "max": 2000, + "value": 300, }, - isDisplayed=lambda config: config[CaseConfigParamType.IndexType] + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) + == IndexType.HNSW.value, +) + +CaseConfigParamInput_EFSearch_PgVectoRS = CaseConfigInput( + label=CaseConfigParamType.ef_search, + inputType=InputType.Number, + inputConfig={ + "min": 1, + "max": 65535, + "value": 100, + }, + isDisplayed=lambda config: config.get(CaseConfigParamType.IndexType, None) == IndexType.HNSW.value, ) @@ -598,6 +623,7 @@ class CaseConfigInput(BaseModel): == IndexType.HNSW.value, ) + CaseConfigParamInput_QuantizationType_PgVectoRS = CaseConfigInput( label=CaseConfigParamType.quantizationType, inputType=InputType.Option, @@ -626,6 +652,18 @@ class CaseConfigInput(BaseModel): ], ) +CaseConfigParamInput_max_parallel_workers_PgVectorRS = CaseConfigInput( + label=CaseConfigParamType.max_parallel_workers, + displayLabel="Max parallel workers", + inputHelp="Recommended value: (cpu cores - 1). This will set the parameters: [optimizing.optimizing_threads]", + inputType=InputType.Number, + inputConfig={ + "min": 0, + "max": 1024, + "value": 16, + }, +) + CaseConfigParamInput_ZillizLevel = CaseConfigInput( label=CaseConfigParamType.level, inputType=InputType.Number, @@ -707,22 +745,25 @@ class CaseConfigInput(BaseModel): ] PgVectoRSLoadingConfig = [ - CaseConfigParamInput_IndexType, + CaseConfigParamInput_IndexType_PgVectoRS, CaseConfigParamInput_M, CaseConfigParamInput_EFConstruction_PgVectoRS, CaseConfigParamInput_Nlist, CaseConfigParamInput_QuantizationType_PgVectoRS, CaseConfigParamInput_QuantizationRatio_PgVectoRS, + CaseConfigParamInput_max_parallel_workers_PgVectorRS, ] PgVectoRSPerformanceConfig = [ - CaseConfigParamInput_IndexType, + CaseConfigParamInput_IndexType_PgVectoRS, CaseConfigParamInput_M, CaseConfigParamInput_EFConstruction_PgVectoRS, + CaseConfigParamInput_EFSearch_PgVectoRS, CaseConfigParamInput_Nlist, CaseConfigParamInput_Nprobe, CaseConfigParamInput_QuantizationType_PgVectoRS, CaseConfigParamInput_QuantizationRatio_PgVectoRS, + CaseConfigParamInput_max_parallel_workers_PgVectorRS, ] ZillizCloudPerformanceConfig = [ diff --git a/vectordb_bench/models.py b/vectordb_bench/models.py index 56034796e..73845cf5d 100644 --- a/vectordb_bench/models.py +++ b/vectordb_bench/models.py @@ -2,7 +2,7 @@ import pathlib from datetime import date from enum import Enum, StrEnum, auto -from typing import List, Self, Sequence, Set +from typing import List, Self import ujson @@ -10,7 +10,6 @@ DB, DBConfig, DBCaseConfig, - IndexType, ) from .backend.cases import CaseType from .base import BaseModel @@ -46,8 +45,8 @@ class CaseConfigParamType(Enum): numCandidates = "num_candidates" lists = "lists" probes = "probes" - quantizationType = "quantizationType" - quantizationRatio = "quantizationRatio" + quantizationType = "quantization_type" + quantizationRatio = "quantization_ratio" m = "m" nbits = "nbits" intermediate_graph_degree = "intermediate_graph_degree"