Streaming Cases
- The 'search_stage' is now determined by the end of the insertion process rather than its start.
- Errors in the search sub-process no longer interrupt the test run; they only affect the metrics for that specific stage.
- Only errors from the insert sub-process halt the overall test.
- During concurrent search testing, database errors do not terminate the search test, and QPS now counts only successful requests (see the sketch after this list).
- Updated the front-end to allow customization of case_config via the UI.
- Implemented a new page ('/streaming') to display results for streaming cases.
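
As a rough illustration of the error handling described above (hypothetical worker function; the real logic lives in the runner modules), a search worker that tolerates database errors and computes QPS from successful requests only might look like:

```python
import time


def search_worker(client, queries: list, duration: float) -> float:
    """Issue searches for `duration` seconds; a failed request is recorded
    but never aborts the test, and QPS is computed from successes only."""
    success, failure = 0, 0
    start = time.perf_counter()
    i = 0
    while time.perf_counter() - start < duration:
        try:
            client.search(queries[i % len(queries)])
            success += 1
        except Exception:
            failure += 1  # only counted; the search test keeps running
        i += 1
    elapsed = time.perf_counter() - start
    return success / elapsed  # failed requests do not inflate QPS
```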

Signed-off-by: min.tian <[email protected]>
alwayslove2013 committed Jan 21, 2025
1 parent 491ef6b commit 6f51942
Showing 29 changed files with 1,246 additions and 343 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pull_request.yml
@@ -4,6 +4,7 @@ on:
pull_request:
branches:
- main
- vdbbench_*

jobs:
build:
7 changes: 7 additions & 0 deletions vectordb_bench/__init__.py
@@ -18,6 +18,11 @@ class config:
DEFAULT_DATASET_URL = env.str("DEFAULT_DATASET_URL", AWS_S3_URL)
DATASET_LOCAL_DIR = env.path("DATASET_LOCAL_DIR", "/tmp/vectordb_bench/dataset")
NUM_PER_BATCH = env.int("NUM_PER_BATCH", 100)
TIME_PER_BATCH = 1 # 1s. for streaming insertion.
MAX_INSERT_RETRY = 5
MAX_SEARCH_RETRY = 5

LOAD_MAX_TRY_COUNT = 10

DROP_OLD = env.bool("DROP_OLD", True)
USE_SHUFFLED_DATA = env.bool("USE_SHUFFLED_DATA", True)
@@ -66,6 +71,7 @@ class config:

CAPACITY_TIMEOUT_IN_SECONDS = 24 * 3600 # 24h
LOAD_TIMEOUT_DEFAULT = 24 * 3600 # 24h
LOAD_TIMEOUT_768D_100K = 24 * 3600 # 24h
LOAD_TIMEOUT_768D_1M = 24 * 3600 # 24h
LOAD_TIMEOUT_768D_10M = 240 * 3600 # 10d
LOAD_TIMEOUT_768D_100M = 2400 * 3600 # 100d
@@ -74,6 +80,7 @@
LOAD_TIMEOUT_1536D_5M = 240 * 3600 # 10d

OPTIMIZE_TIMEOUT_DEFAULT = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_100K = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_1M = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_10M = 240 * 3600 # 10d
OPTIMIZE_TIMEOUT_768D_100M = 2400 * 3600 # 100d
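
The constants added above (TIME_PER_BATCH, MAX_INSERT_RETRY) suggest that streaming insertion is paced in NUM_PER_BATCH-row chunks within one-second windows, with a bounded retry budget per batch. A minimal sketch under that assumption (the helper name and client API are hypothetical, not the actual runner):

```python
import time

NUM_PER_BATCH = 100     # rows per insert call (config.NUM_PER_BATCH)
TIME_PER_BATCH = 1      # length of each pacing window, in seconds
MAX_INSERT_RETRY = 5


def insert_at_rate(client, rows: list, insert_rate: int) -> None:
    """Insert `insert_rate` rows per second, sleeping out the unused part
    of each window; repeated insert failures abort the whole test."""
    for start in range(0, len(rows), insert_rate):
        window = rows[start : start + insert_rate]
        t0 = time.perf_counter()
        for b in range(0, len(window), NUM_PER_BATCH):
            batch = window[b : b + NUM_PER_BATCH]
            for attempt in range(MAX_INSERT_RETRY):
                try:
                    client.insert(batch)
                    break
                except Exception:
                    if attempt == MAX_INSERT_RETRY - 1:
                        raise  # unlike search errors, insert errors halt the run
        spare = TIME_PER_BATCH - (time.perf_counter() - t0)
        if spare > 0:
            time.sleep(spare)  # keep the average rate at insert_rate rows/s
```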
2 changes: 2 additions & 0 deletions vectordb_bench/backend/assembler.py
@@ -39,6 +39,7 @@ def assemble_all(
runners = [cls.assemble(run_id, task, source) for task in tasks]
load_runners = [r for r in runners if r.ca.label == CaseLabel.Load]
perf_runners = [r for r in runners if r.ca.label == CaseLabel.Performance]
streaming_runners = [r for r in runners if r.ca.label == CaseLabel.Streaming]

# group by db
db2runner = {}
@@ -58,6 +59,7 @@ def assemble_all(

all_runners = []
all_runners.extend(load_runners)
all_runners.extend(streaming_runners)
for v in db2runner.values():
all_runners.extend(v)

68 changes: 64 additions & 4 deletions vectordb_bench/backend/cases.py
@@ -1,3 +1,4 @@
import json
import logging
from enum import Enum, auto

@@ -8,7 +9,7 @@
CustomDatasetConfig,
)

from .dataset import CustomDataset, Dataset, DatasetManager
from .dataset import CustomDataset, Dataset, DatasetManager, DatasetWithSizeType

log = logging.getLogger(__name__)

@@ -47,6 +48,8 @@ class CaseType(Enum):
Custom = 100
PerformanceCustomDataset = 101

StreamingPerformanceCase = 200

def case_cls(self, custom_configs: dict | None = None) -> type["Case"]:
if custom_configs is None:
return type2case.get(self)()
@@ -68,6 +71,7 @@ def case_description(self, custom_configs: dict | None = None) -> str:
class CaseLabel(Enum):
Load = auto()
Performance = auto()
Streaming = auto()


class Case(BaseModel):
@@ -87,7 +91,7 @@ class Case(BaseModel):
description: str
dataset: DatasetManager

load_timeout: float | int
load_timeout: float | int | None = None
optimize_timeout: float | int | None = None

filter_rate: float | None = None
@@ -104,14 +108,14 @@ def filters(self) -> dict | None:
return None


class CapacityCase(Case, BaseModel):
class CapacityCase(Case):
label: CaseLabel = CaseLabel.Load
filter_rate: float | None = None
load_timeout: float | int = config.CAPACITY_TIMEOUT_IN_SECONDS
optimize_timeout: float | int | None = None


class PerformanceCase(Case, BaseModel):
class PerformanceCase(Case):
label: CaseLabel = CaseLabel.Performance
filter_rate: float | None = None
load_timeout: float | int = config.LOAD_TIMEOUT_DEFAULT
@@ -349,6 +353,61 @@ def __init__(
)


class StreamingPerformanceCase(Case):
case_id: CaseType = CaseType.StreamingPerformanceCase
label: CaseLabel = CaseLabel.Streaming
dataset_with_size_type: DatasetWithSizeType
insert_rate: int
search_stages: list[float]
concurrencies: list[int]
optimize_after_write: bool = True
read_dur_after_write: int = 30

def __init__(
self,
dataset_with_size_type: DatasetWithSizeType | str = DatasetWithSizeType.CohereSmall.value,
insert_rate: int = 500,
search_stages: list[float] | str = (0.5, 0.8),
concurrencies: list[int] | str = (5, 10),
**kwargs,
):
num_per_batch = config.NUM_PER_BATCH
if insert_rate % config.NUM_PER_BATCH != 0:
_insert_rate = max(
num_per_batch,
insert_rate // num_per_batch * num_per_batch,
)
log.warning(
f"[streaming_case init] insert_rate(={insert_rate}) should be "
f"divisible by NUM_PER_BATCH={num_per_batch}), reset to {_insert_rate}",
)
insert_rate = _insert_rate
if not isinstance(dataset_with_size_type, DatasetWithSizeType):
dataset_with_size_type = DatasetWithSizeType(dataset_with_size_type)
dataset = dataset_with_size_type.get_manager()
name = f"Streaming-Perf - {dataset_with_size_type.value}, {insert_rate} rows/s"
description = (
"This case tests the search performance of vector database while maintaining "
f"a fixed insertion speed. (dataset: {dataset_with_size_type.value})"
)

if isinstance(search_stages, str):
search_stages = json.loads(search_stages)
if isinstance(concurrencies, str):
concurrencies = json.loads(concurrencies)

super().__init__(
name=name,
description=description,
dataset=dataset,
dataset_with_size_type=dataset_with_size_type,
insert_rate=insert_rate,
search_stages=search_stages,
concurrencies=concurrencies,
**kwargs,
)


type2case = {
CaseType.CapacityDim960: CapacityDim960,
CaseType.CapacityDim128: CapacityDim128,
@@ -367,4 +426,5 @@ def __init__(
CaseType.Performance1536D5M99P: Performance1536D5M99P,
CaseType.Performance1536D50K: Performance1536D50K,
CaseType.PerformanceCustomDataset: PerformanceCustomDataset,
CaseType.StreamingPerformanceCase: StreamingPerformanceCase,
}
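
Given the constructor above, a streaming case can be built from plain Python values or from the JSON-encoded strings that a UI-driven case_config would pass; `json.loads` handles the string form. The values below are illustrative only:

```python
from vectordb_bench.backend.cases import StreamingPerformanceCase

# Python values.
case = StreamingPerformanceCase(
    dataset_with_size_type="Small Cohere (768dim, 100K)",
    insert_rate=500,             # rounded down to a multiple of NUM_PER_BATCH if needed
    search_stages=[0.5, 0.8],    # run searches once 50% / 80% of the data is inserted
    concurrencies=[5, 10],
)

# String-typed values, as they may arrive via case_config.
case_from_config = StreamingPerformanceCase(
    search_stages="[0.5, 0.8]",
    concurrencies="[5, 10]",
)
```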
2 changes: 1 addition & 1 deletion vectordb_bench/backend/clients/milvus/config.py
@@ -4,7 +4,7 @@


class MilvusConfig(DBConfig):
uri: SecretStr = "http://localhost:19530"
uri: SecretStr = "http://10.102.7.230:19530"
user: str | None = None
password: SecretStr | None = None

93 changes: 75 additions & 18 deletions vectordb_bench/backend/dataset.py
@@ -39,6 +39,15 @@ class BaseDataset(BaseModel):
with_gt: bool = False
_size_label: dict[int, SizeLabel] = PrivateAttr()
is_custom: bool = False
with_remote_resource: bool = True
with_scalar_labels: bool = False
train_id_field: str = "id"
train_vector_field: str = "emb"
test_file: str = "test.parquet"
test_id_field: str = "id"
test_vector_field: str = "emb"
gt_id_field: str = "id"
gt_neighbors_field: str = "neighbors_id"

@validator("size")
def verify_size(cls, v: int):
@@ -51,6 +60,10 @@ def verify_size(cls, v: int):
def label(self) -> str:
return self._size_label.get(self.size).label

@property
def full_name(self) -> str:
return f"{self.name.capitalize()} ({self.label.capitalize()})"

@property
def dir_name(self) -> str:
return f"{self.name}_{self.label}_{utils.numerize(self.size)}".lower()
@@ -59,11 +72,16 @@ def dir_name(self) -> str:
def file_count(self) -> int:
return self._size_label.get(self.size).file_count

@property
def train_files(self) -> list[str]:
return utils.compose_train_files(self.file_count, self.use_shuffled)


class CustomDataset(BaseDataset):
dir: str
file_num: int
is_custom: bool = True
with_remote_resource: bool = False

@validator("size")
def verify_size(cls, v: int):
@@ -109,7 +127,7 @@ class Cohere(BaseDataset):
dim: int = 768
metric_type: MetricType = MetricType.COSINE
use_shuffled: bool = config.USE_SHUFFLED_DATA
with_gt: bool = (True,)
with_gt: bool = True
_size_label: dict = {
100_000: SizeLabel(100_000, "SMALL", 1),
1_000_000: SizeLabel(1_000_000, "MEDIUM", 1),
@@ -146,7 +164,7 @@ class OpenAI(BaseDataset):
dim: int = 1536
metric_type: MetricType = MetricType.COSINE
use_shuffled: bool = config.USE_SHUFFLED_DATA
with_gt: bool = (True,)
with_gt: bool = True
_size_label: dict = {
50_000: SizeLabel(50_000, "SMALL", 1),
500_000: SizeLabel(500_000, "MEDIUM", 1),
@@ -166,8 +184,8 @@ class DatasetManager(BaseModel):
"""

data: BaseDataset
test_data: pd.DataFrame | None = None
gt_data: pd.DataFrame | None = None
test_data: list[list[float]] | None = None
gt_data: list[list[int]] | None = None
train_files: list[str] = []
reader: DatasetReader | None = None

@@ -191,7 +209,7 @@ def data_dir(self) -> pathlib.Path:
return pathlib.Path(
config.DATASET_LOCAL_DIR,
self.data.name.lower(),
self.data.dir_name.lower(),
self.data.dir_name,
)

def __iter__(self):
@@ -215,29 +233,24 @@ def prepare(
bool: whether the dataset is successfully prepared
"""
file_count, use_shuffled = self.data.file_count, self.data.use_shuffled

train_files = utils.compose_train_files(file_count, use_shuffled)
all_files = train_files

self.train_files = self.data.train_files
gt_file, test_file = None, None
if self.data.with_gt:
gt_file, test_file = utils.compose_gt_file(filters), "test.parquet"
all_files.extend([gt_file, test_file])
gt_file, test_file = utils.compose_gt_file(filters), self.data.test_file

if not self.data.is_custom:
if self.data.with_remote_resource:
download_files = [file for file in self.train_files]
download_files.extend([gt_file, test_file])
source.reader().read(
dataset=self.data.dir_name.lower(),
files=all_files,
files=download_files,
local_ds_root=self.data_dir,
)

if gt_file is not None and test_file is not None:
self.test_data = self._read_file(test_file)
self.gt_data = self._read_file(gt_file)
self.test_data = self._read_file(test_file)[self.data.test_vector_field].to_list()
self.gt_data = self._read_file(gt_file)[self.data.gt_neighbors_field].to_list()

prefix = "shuffle_train" if use_shuffled else "train"
self.train_files = sorted([f.name for f in self.data_dir.glob(f"{prefix}*.parquet")])
log.debug(f"{self.data.name}: available train files {self.train_files}")

return True
@@ -313,3 +326,47 @@ def get(self, size: int) -> BaseDataset:

def manager(self, size: int) -> DatasetManager:
return DatasetManager(data=self.get(size))


class DatasetWithSizeType(Enum):
CohereSmall = "Small Cohere (768dim, 100K)"
CohereMedium = "Medium Cohere (768dim, 1M)"
CohereLarge = "Large Cohere (768dim, 10M)"
OpenAISmall = "Small OpenAI (1536dim, 50K)"
OpenAIMedium = "Medium OpenAI (1536dim, 500K)"
OpenAILarge = "Large OpenAI (1536dim, 5M)"

def get_manager(self) -> DatasetManager:
if self not in DatasetWithSizeMap:
msg = f"wrong ScalarDatasetWithSizeType: {self.name}"
raise ValueError(msg)
return DatasetWithSizeMap.get(self)

def get_load_timeout(self) -> float:
if "small" in self.value.lower():
return config.LOAD_TIMEOUT_768D_100K
if "medium" in self.value.lower():
return config.LOAD_TIMEOUT_768D_1M
if "large" in self.value.lower():
return config.LOAD_TIMEOUT_768D_10M
msg = f"No load_timeout for {self.value}"
raise KeyError(msg)

def get_optimize_timeout(self) -> float:
if "small" in self.value.lower():
return config.OPTIMIZE_TIMEOUT_768D_100K
if "medium" in self.value.lower():
return config.OPTIMIZE_TIMEOUT_768D_1M
if "large" in self.value.lower():
return config.OPTIMIZE_TIMEOUT_768D_10M
return config.OPTIMIZE_TIMEOUT_DEFAULT


DatasetWithSizeMap = {
DatasetWithSizeType.CohereSmall: Dataset.COHERE.manager(100_000),
DatasetWithSizeType.CohereMedium: Dataset.COHERE.manager(1_000_000),
DatasetWithSizeType.CohereLarge: Dataset.COHERE.manager(10_000_000),
DatasetWithSizeType.OpenAISmall: Dataset.OPENAI.manager(50_000),
DatasetWithSizeType.OpenAIMedium: Dataset.OPENAI.manager(500_000),
DatasetWithSizeType.OpenAILarge: Dataset.OPENAI.manager(5_000_000),
}
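
DatasetWithSizeType ties a dataset/size choice to its manager and to the matching load/optimize timeouts; based on the methods above, usage is roughly:

```python
from vectordb_bench.backend.dataset import DatasetWithSizeType

ds = DatasetWithSizeType("Small Cohere (768dim, 100K)")   # or DatasetWithSizeType.CohereSmall
manager = ds.get_manager()                    # DatasetManager for Cohere 100K
load_timeout = ds.get_load_timeout()          # -> config.LOAD_TIMEOUT_768D_100K
optimize_timeout = ds.get_optimize_timeout()  # -> config.OPTIMIZE_TIMEOUT_768D_100K
```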
6 changes: 3 additions & 3 deletions vectordb_bench/backend/runner/__init__.py
@@ -1,10 +1,10 @@
from .mp_runner import (
MultiProcessingSearchRunner,
)
from .mp_runner import MultiProcessingSearchRunner
from .read_write_runner import ReadWriteRunner
from .serial_runner import SerialInsertRunner, SerialSearchRunner

__all__ = [
"MultiProcessingSearchRunner",
"ReadWriteRunner",
"SerialInsertRunner",
"SerialSearchRunner",
]
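
ReadWriteRunner's implementation is not shown in this diff; from the StreamingPerformanceCase fields and the commit description, its control flow is roughly the following sketch (all method names here are hypothetical, not the actual API):

```python
def run_streaming_case(runner, search_stages, concurrencies,
                       optimize_after_write=True, read_dur_after_write=30):
    """Rough control flow implied by StreamingPerformanceCase (hypothetical API)."""
    runner.start_insert()                    # background insertion at insert_rate rows/s
    for stage in search_stages:              # e.g. 0.5, 0.8 of the dataset
        runner.wait_until_inserted(stage)    # stage is reached when its insertion *finishes*
        for conc in concurrencies:
            runner.run_concurrent_search(conc)   # DB errors lower QPS but do not abort
    runner.wait_insert_done()                # an insert error here ends the whole test
    if optimize_after_write:
        runner.optimize()
    runner.run_concurrent_search_for(read_dur_after_write)  # read-only tail, default 30 s
```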