Streaming Cases #454

Merged: 1 commit, Jan 21, 2025
1 change: 1 addition & 0 deletions .github/workflows/pull_request.yml
@@ -4,6 +4,7 @@ on:
pull_request:
branches:
- main
- vdbbench_*

jobs:
build:
7 changes: 7 additions & 0 deletions vectordb_bench/__init__.py
@@ -18,6 +18,11 @@ class config:
DEFAULT_DATASET_URL = env.str("DEFAULT_DATASET_URL", AWS_S3_URL)
DATASET_LOCAL_DIR = env.path("DATASET_LOCAL_DIR", "/tmp/vectordb_bench/dataset")
NUM_PER_BATCH = env.int("NUM_PER_BATCH", 100)
TIME_PER_BATCH = 1 # 1s. for streaming insertion.
MAX_INSERT_RETRY = 5
MAX_SEARCH_RETRY = 5

LOAD_MAX_TRY_COUNT = 10

DROP_OLD = env.bool("DROP_OLD", True)
USE_SHUFFLED_DATA = env.bool("USE_SHUFFLED_DATA", True)
@@ -66,6 +71,7 @@ class config:

CAPACITY_TIMEOUT_IN_SECONDS = 24 * 3600 # 24h
LOAD_TIMEOUT_DEFAULT = 24 * 3600 # 24h
LOAD_TIMEOUT_768D_100K = 24 * 3600 # 24h
LOAD_TIMEOUT_768D_1M = 24 * 3600 # 24h
LOAD_TIMEOUT_768D_10M = 240 * 3600 # 10d
LOAD_TIMEOUT_768D_100M = 2400 * 3600 # 100d
@@ -74,6 +80,7 @@ class config:
LOAD_TIMEOUT_1536D_5M = 240 * 3600 # 10d

OPTIMIZE_TIMEOUT_DEFAULT = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_100K = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_1M = 24 * 3600 # 24h
OPTIMIZE_TIMEOUT_768D_10M = 240 * 3600 # 10d
OPTIMIZE_TIMEOUT_768D_100M = 2400 * 3600 # 100d
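
The streaming constants added above pair with NUM_PER_BATCH to define a pacing budget: NUM_PER_BATCH rows per TIME_PER_BATCH seconds, with up to MAX_INSERT_RETRY attempts per batch. Below is a minimal sketch of the loop these constants imply; paced_insert and insert_fn are hypothetical names for illustration, not code from this PR.

import time

from vectordb_bench import config

def paced_insert(insert_fn, batches):
    # Hypothetical pacing loop: one NUM_PER_BATCH-sized batch per TIME_PER_BATCH
    # seconds, retrying a failed batch up to MAX_INSERT_RETRY times.
    for batch in batches:
        start = time.perf_counter()
        for attempt in range(config.MAX_INSERT_RETRY):
            try:
                insert_fn(batch)  # expected to write NUM_PER_BATCH rows
                break
            except Exception:
                if attempt == config.MAX_INSERT_RETRY - 1:
                    raise
        # sleep out the rest of the 1 s window to hold the target insert rate
        elapsed = time.perf_counter() - start
        time.sleep(max(0.0, config.TIME_PER_BATCH - elapsed))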
2 changes: 2 additions & 0 deletions vectordb_bench/backend/assembler.py
@@ -39,6 +39,7 @@ def assemble_all(
runners = [cls.assemble(run_id, task, source) for task in tasks]
load_runners = [r for r in runners if r.ca.label == CaseLabel.Load]
perf_runners = [r for r in runners if r.ca.label == CaseLabel.Performance]
streaming_runners = [r for r in runners if r.ca.label == CaseLabel.Streaming]

# group by db
db2runner = {}
@@ -58,6 +59,7 @@ def assemble_all(

all_runners = []
all_runners.extend(load_runners)
all_runners.extend(streaming_runners)
for v in db2runner.values():
all_runners.extend(v)

68 changes: 64 additions & 4 deletions vectordb_bench/backend/cases.py
@@ -1,3 +1,4 @@
import json
import logging
from enum import Enum, auto

@@ -8,7 +9,7 @@
CustomDatasetConfig,
)

from .dataset import CustomDataset, Dataset, DatasetManager
from .dataset import CustomDataset, Dataset, DatasetManager, DatasetWithSizeType

log = logging.getLogger(__name__)

@@ -47,6 +48,8 @@ class CaseType(Enum):
Custom = 100
PerformanceCustomDataset = 101

StreamingPerformanceCase = 200

def case_cls(self, custom_configs: dict | None = None) -> type["Case"]:
if custom_configs is None:
return type2case.get(self)()
@@ -68,6 +71,7 @@ def case_description(self, custom_configs: dict | None = None) -> str:
class CaseLabel(Enum):
Load = auto()
Performance = auto()
Streaming = auto()


class Case(BaseModel):
@@ -87,7 +91,7 @@ class Case(BaseModel):
description: str
dataset: DatasetManager

load_timeout: float | int
load_timeout: float | int | None = None
optimize_timeout: float | int | None = None

filter_rate: float | None = None
@@ -104,14 +108,14 @@ def filters(self) -> dict | None:
return None


class CapacityCase(Case, BaseModel):
class CapacityCase(Case):
label: CaseLabel = CaseLabel.Load
filter_rate: float | None = None
load_timeout: float | int = config.CAPACITY_TIMEOUT_IN_SECONDS
optimize_timeout: float | int | None = None


class PerformanceCase(Case, BaseModel):
class PerformanceCase(Case):
label: CaseLabel = CaseLabel.Performance
filter_rate: float | None = None
load_timeout: float | int = config.LOAD_TIMEOUT_DEFAULT
@@ -349,6 +353,61 @@ def __init__(
)


class StreamingPerformanceCase(Case):
case_id: CaseType = CaseType.StreamingPerformanceCase
label: CaseLabel = CaseLabel.Streaming
dataset_with_size_type: DatasetWithSizeType
insert_rate: int
search_stages: list[float]
concurrencies: list[int]
optimize_after_write: bool = True
read_dur_after_write: int = 30

def __init__(
self,
dataset_with_size_type: DatasetWithSizeType | str = DatasetWithSizeType.CohereSmall.value,
insert_rate: int = 500,
search_stages: list[float] | str = (0.5, 0.8),
concurrencies: list[int] | str = (5, 10),
**kwargs,
):
num_per_batch = config.NUM_PER_BATCH
if insert_rate % config.NUM_PER_BATCH != 0:
_insert_rate = max(
num_per_batch,
insert_rate // num_per_batch * num_per_batch,
)
log.warning(
f"[streaming_case init] insert_rate(={insert_rate}) should be "
f"divisible by NUM_PER_BATCH={num_per_batch}), reset to {_insert_rate}",
)
insert_rate = _insert_rate
if not isinstance(dataset_with_size_type, DatasetWithSizeType):
dataset_with_size_type = DatasetWithSizeType(dataset_with_size_type)
dataset = dataset_with_size_type.get_manager()
name = f"Streaming-Perf - {dataset_with_size_type.value}, {insert_rate} rows/s"
description = (
"This case tests the search performance of vector database while maintaining "
f"a fixed insertion speed. (dataset: {dataset_with_size_type.value})"
)

if isinstance(search_stages, str):
search_stages = json.loads(search_stages)
if isinstance(concurrencies, str):
concurrencies = json.loads(concurrencies)

super().__init__(
name=name,
description=description,
dataset=dataset,
dataset_with_size_type=dataset_with_size_type,
insert_rate=insert_rate,
search_stages=search_stages,
concurrencies=concurrencies,
**kwargs,
)


type2case = {
CaseType.CapacityDim960: CapacityDim960,
CaseType.CapacityDim128: CapacityDim128,
Expand All @@ -367,4 +426,5 @@ def __init__(
CaseType.Performance1536D5M99P: Performance1536D5M99P,
CaseType.Performance1536D50K: Performance1536D50K,
CaseType.PerformanceCustomDataset: PerformanceCustomDataset,
CaseType.StreamingPerformanceCase: StreamingPerformanceCase,
}
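
Based on the constructor shown above, here is a minimal usage sketch. The parameter values are illustrative only; string forms of search_stages and concurrencies are parsed with json.loads, and insert rates that are not a multiple of NUM_PER_BATCH are rounded down with a warning.

from vectordb_bench.backend.cases import StreamingPerformanceCase

# Illustrative values; not part of the PR.
case = StreamingPerformanceCase(
    dataset_with_size_type="Small Cohere (768dim, 100K)",
    insert_rate=1000,
    search_stages="[0.5, 0.8]",
    concurrencies="[5, 10]",
)
print(case.name)  # Streaming-Perf - Small Cohere (768dim, 100K), 1000 rows/s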
2 changes: 1 addition & 1 deletion vectordb_bench/backend/clients/milvus/config.py
@@ -4,7 +4,7 @@


class MilvusConfig(DBConfig):
uri: SecretStr = "http://localhost:19530"
uri: SecretStr = "http://10.102.7.230:19530"
user: str | None = None
password: SecretStr | None = None

93 changes: 75 additions & 18 deletions vectordb_bench/backend/dataset.py
@@ -39,6 +39,15 @@ class BaseDataset(BaseModel):
with_gt: bool = False
_size_label: dict[int, SizeLabel] = PrivateAttr()
is_custom: bool = False
with_remote_resource: bool = True
with_scalar_labels: bool = False
train_id_field: str = "id"
train_vector_field: str = "emb"
test_file: str = "test.parquet"
test_id_field: str = "id"
test_vector_field: str = "emb"
gt_id_field: str = "id"
gt_neighbors_field: str = "neighbors_id"

@validator("size")
def verify_size(cls, v: int):
@@ -51,6 +60,10 @@ def verify_size(cls, v: int):
def label(self) -> str:
return self._size_label.get(self.size).label

@property
def full_name(self) -> str:
return f"{self.name.capitalize()} ({self.label.capitalize()})"

@property
def dir_name(self) -> str:
return f"{self.name}_{self.label}_{utils.numerize(self.size)}".lower()
@@ -59,11 +72,16 @@ def dir_name(self) -> str:
def file_count(self) -> int:
return self._size_label.get(self.size).file_count

@property
def train_files(self) -> list[str]:
return utils.compose_train_files(self.file_count, self.use_shuffled)


class CustomDataset(BaseDataset):
dir: str
file_num: int
is_custom: bool = True
with_remote_resource: bool = False

@validator("size")
def verify_size(cls, v: int):
@@ -109,7 +127,7 @@ class Cohere(BaseDataset):
dim: int = 768
metric_type: MetricType = MetricType.COSINE
use_shuffled: bool = config.USE_SHUFFLED_DATA
with_gt: bool = (True,)
with_gt: bool = True
_size_label: dict = {
100_000: SizeLabel(100_000, "SMALL", 1),
1_000_000: SizeLabel(1_000_000, "MEDIUM", 1),
@@ -146,7 +164,7 @@ class OpenAI(BaseDataset):
dim: int = 1536
metric_type: MetricType = MetricType.COSINE
use_shuffled: bool = config.USE_SHUFFLED_DATA
with_gt: bool = (True,)
with_gt: bool = True
_size_label: dict = {
50_000: SizeLabel(50_000, "SMALL", 1),
500_000: SizeLabel(500_000, "MEDIUM", 1),
@@ -166,8 +184,8 @@ class DatasetManager(BaseModel):
"""

data: BaseDataset
test_data: pd.DataFrame | None = None
gt_data: pd.DataFrame | None = None
test_data: list[list[float]] | None = None
gt_data: list[list[int]] | None = None
train_files: list[str] = []
reader: DatasetReader | None = None

@@ -191,7 +209,7 @@ def data_dir(self) -> pathlib.Path:
return pathlib.Path(
config.DATASET_LOCAL_DIR,
self.data.name.lower(),
self.data.dir_name.lower(),
self.data.dir_name,
)

def __iter__(self):
@@ -215,29 +233,24 @@ def prepare(
bool: whether the dataset is successfully prepared

"""
file_count, use_shuffled = self.data.file_count, self.data.use_shuffled

train_files = utils.compose_train_files(file_count, use_shuffled)
all_files = train_files

self.train_files = self.data.train_files
gt_file, test_file = None, None
if self.data.with_gt:
gt_file, test_file = utils.compose_gt_file(filters), "test.parquet"
all_files.extend([gt_file, test_file])
gt_file, test_file = utils.compose_gt_file(filters), self.data.test_file

if not self.data.is_custom:
if self.data.with_remote_resource:
download_files = [file for file in self.train_files]
download_files.extend([gt_file, test_file])
source.reader().read(
dataset=self.data.dir_name.lower(),
files=all_files,
files=download_files,
local_ds_root=self.data_dir,
)

if gt_file is not None and test_file is not None:
self.test_data = self._read_file(test_file)
self.gt_data = self._read_file(gt_file)
self.test_data = self._read_file(test_file)[self.data.test_vector_field].to_list()
self.gt_data = self._read_file(gt_file)[self.data.gt_neighbors_field].to_list()

prefix = "shuffle_train" if use_shuffled else "train"
self.train_files = sorted([f.name for f in self.data_dir.glob(f"{prefix}*.parquet")])
log.debug(f"{self.data.name}: available train files {self.train_files}")

return True
@@ -313,3 +326,47 @@ def get(self, size: int) -> BaseDataset:

def manager(self, size: int) -> DatasetManager:
return DatasetManager(data=self.get(size))


class DatasetWithSizeType(Enum):
CohereSmall = "Small Cohere (768dim, 100K)"
CohereMedium = "Medium Cohere (768dim, 1M)"
CohereLarge = "Large Cohere (768dim, 10M)"
OpenAISmall = "Small OpenAI (1536dim, 50K)"
OpenAIMedium = "Medium OpenAI (1536dim, 500K)"
OpenAILarge = "Large OpenAI (1536dim, 5M)"

def get_manager(self) -> DatasetManager:
if self not in DatasetWithSizeMap:
msg = f"wrong ScalarDatasetWithSizeType: {self.name}"
raise ValueError(msg)
return DatasetWithSizeMap.get(self)

def get_load_timeout(self) -> float:
if "small" in self.value.lower():
return config.LOAD_TIMEOUT_768D_100K
if "medium" in self.value.lower():
return config.LOAD_TIMEOUT_768D_1M
if "large" in self.value.lower():
return config.LOAD_TIMEOUT_768D_10M
msg = f"No load_timeout for {self.value}"
raise KeyError(msg)

def get_optimize_timeout(self) -> float:
if "small" in self.value.lower():
return config.OPTIMIZE_TIMEOUT_768D_100K
if "medium" in self.value.lower():
return config.OPTIMIZE_TIMEOUT_768D_1M
if "large" in self.value.lower():
return config.OPTIMIZE_TIMEOUT_768D_10M
return config.OPTIMIZE_TIMEOUT_DEFAULT


DatasetWithSizeMap = {
DatasetWithSizeType.CohereSmall: Dataset.COHERE.manager(100_000),
DatasetWithSizeType.CohereMedium: Dataset.COHERE.manager(1_000_000),
DatasetWithSizeType.CohereLarge: Dataset.COHERE.manager(10_000_000),
DatasetWithSizeType.OpenAISmall: Dataset.OPENAI.manager(50_000),
DatasetWithSizeType.OpenAIMedium: Dataset.OPENAI.manager(500_000),
DatasetWithSizeType.OpenAILarge: Dataset.OPENAI.manager(5_000_000),
}
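
A short sketch of how the new enum resolves a display name to a dataset manager and to the timeout constants added in vectordb_bench/__init__.py (the values in the comments follow those constants):

from vectordb_bench.backend.dataset import DatasetWithSizeType

dws = DatasetWithSizeType("Medium Cohere (768dim, 1M)")
manager = dws.get_manager()         # DatasetManager for Dataset.COHERE at 1M rows
load_t = dws.get_load_timeout()     # LOAD_TIMEOUT_768D_1M (24 * 3600 seconds)
opt_t = dws.get_optimize_timeout()  # OPTIMIZE_TIMEOUT_768D_1M (24 * 3600 seconds)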
6 changes: 3 additions & 3 deletions vectordb_bench/backend/runner/__init__.py
@@ -1,10 +1,10 @@
from .mp_runner import (
MultiProcessingSearchRunner,
)
from .mp_runner import MultiProcessingSearchRunner
from .read_write_runner import ReadWriteRunner
from .serial_runner import SerialInsertRunner, SerialSearchRunner

__all__ = [
"MultiProcessingSearchRunner",
"ReadWriteRunner",
"SerialInsertRunner",
"SerialSearchRunner",
]