fix: Unable to run vebbench and cli #447

Merged · 1 commit · Jan 13, 2025
1 change: 1 addition & 0 deletions pyproject.toml
@@ -133,6 +133,7 @@ lint.ignore = [
     "RUF017",
     "C416",
     "PLW0603",
+    "COM812",
 ]

 # Allow autofix for all enabled rules (when `--fix`) is provided.
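Why ignore COM812: it is Ruff's missing-trailing-comma rule, and once a stray comma separates the parts of a parenthesized multi-line string, its auto-fix adds the trailing comma too, locking in a tuple where a single string was intended. That accidental-tuple pattern is the bug class this PR removes from the client and runner code. A minimal sketch of the gotcha (illustrative strings, not from the repo):

```python
# Implicit concatenation: adjacent literals join into ONE string.
msg = (
    "Stored vectors: 100, "
    "Collection status: green"
)
assert isinstance(msg, str)

# With commas between and after the parts (what repeated lint auto-fixes
# can leave behind), the same parentheses build a 2-tuple instead.
msg = (
    "Stored vectors: 100, ",
    "Collection status: green",
)
assert isinstance(msg, tuple)  # log.info(msg) would print the tuple's repr
```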
14 changes: 12 additions & 2 deletions vectordb_bench/backend/clients/__init__.py
@@ -42,7 +42,7 @@ class DB(Enum):
     AliyunOpenSearch = "AliyunOpenSearch"

     @property
-    def init_cls(self) -> type[VectorDB]:  # noqa: PLR0911, PLR0912
+    def init_cls(self) -> type[VectorDB]:  # noqa: PLR0911, PLR0912, C901
         """Import while in use"""
         if self == DB.Milvus:
             from .milvus.milvus import Milvus
@@ -129,11 +129,16 @@ def init_cls(self) -> type[VectorDB]:  # noqa: PLR0911, PLR0912

             return AliyunOpenSearch

+        if self == DB.Test:
+            from .test.test import Test
+
+            return Test
+
         msg = f"Unknown DB: {self.name}"
         raise ValueError(msg)

     @property
-    def config_cls(self) -> type[DBConfig]:  # noqa: PLR0911, PLR0912
+    def config_cls(self) -> type[DBConfig]:  # noqa: PLR0911, PLR0912, C901
         """Import while in use"""
         if self == DB.Milvus:
             from .milvus.config import MilvusConfig
@@ -220,6 +225,11 @@ def config_cls(self) -> type[DBConfig]:  # noqa: PLR0911, PLR0912

             return AliyunOpenSearchConfig

+        if self == DB.Test:
+            from .test.config import TestConfig
+
+            return TestConfig
+
         msg = f"Unknown DB: {self.name}"
         raise ValueError(msg)
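These were the missing branches behind the "unable to run" error: selecting the built-in Test client made both properties fall through to `raise ValueError("Unknown DB: Test")`. A minimal sketch of the lazy-import dispatch pattern used here (stand-in class names, not the repo's):

```python
from enum import Enum


class TestClient:  # hypothetical stand-in for vectordb_bench's Test client
    """Pretend this lives in its own module and is imported on demand."""


class DB(Enum):
    Milvus = "Milvus"
    Test = "Test"

    @property
    def init_cls(self) -> type:
        """Import while in use: each branch imports its client lazily, so
        one client's missing optional dependency only hurts that client."""
        if self == DB.Test:
            # Real code: `from .test.test import Test` -- the branch this PR adds.
            return TestClient
        msg = f"Unknown DB: {self.name}"
        raise ValueError(msg)


print(DB.Test.init_cls)  # <class '__main__.TestClient'>
try:
    DB.Milvus.init_cls  # no branch in this sketch -> the pre-PR Test failure mode
except ValueError as e:
    print(e)  # Unknown DB: Milvus
```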
4 changes: 2 additions & 2 deletions vectordb_bench/backend/clients/memorydb/cli.py
@@ -43,8 +43,8 @@ class MemoryDBTypedDict(TypedDict):
             show_default=True,
             default=False,
             help=(
-                "Cluster Mode Disabled (CMD), use this flag when testing locally on a single node instance.",
-                " In production, MemoryDB only supports cluster mode (CME)",
+                "Cluster Mode Disabled (CMD), use this flag when testing locally on a single node instance."
+                " In production, MemoryDB only supports cluster mode (CME)"
             ),
         ),
     ]
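After the fix, the two literals concatenate into a single help string rather than a tuple. For reference, a self-contained sketch of a click option declared this way (hypothetical command name; only the help wiring mirrors the code above):

```python
import click


@click.command()
@click.option(
    "--cmd",
    is_flag=True,
    show_default=True,
    default=False,
    help=(
        "Cluster Mode Disabled (CMD), use this flag when testing locally on a single node instance."
        " In production, MemoryDB only supports cluster mode (CME)"  # no comma: still one str
    ),
)
def bench(cmd: bool) -> None:
    click.echo(f"cluster mode disabled: {cmd}")


if __name__ == "__main__":
    bench()  # try: python sketch.py --help
```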
9 changes: 2 additions & 7 deletions vectordb_bench/backend/clients/pgvecto_rs/pgvecto_rs.py
@@ -200,10 +200,7 @@ def _create_index(self):
             self.cursor.execute(index_create_sql)
             self.conn.commit()
         except Exception as e:
-            log.warning(
-                f"Failed to create pgvecto.rs index {self._index_name} \
-                at table {self.table_name} error: {e}",
-            )
+            log.warning(f"Failed to create pgvecto.rs index {self._index_name} at table {self.table_name} error: {e}")
             raise e from None

     def _create_table(self, dim: int):
@@ -258,9 +255,7 @@ def insert_embeddings(

             return len(metadata), None
         except Exception as e:
-            log.warning(
-                f"Failed to insert data into pgvecto.rs table ({self.table_name}), error: {e}",
-            )
+            log.warning(f"Failed to insert data into pgvecto.rs table ({self.table_name}), error: {e}")
             return 0, e

     def search_embedding(
4 changes: 1 addition & 3 deletions vectordb_bench/backend/clients/pgvector/pgvector.py
@@ -415,9 +415,7 @@ def insert_embeddings(

             return len(metadata), None
         except Exception as e:
-            log.warning(
-                f"Failed to insert data into pgvector table ({self.table_name}), error: {e}",
-            )
+            log.warning(f"Failed to insert data into pgvector table ({self.table_name}), error: {e}")
             return 0, e

     def search_embedding(
@@ -255,9 +255,7 @@ def insert_embeddings(

             return len(metadata), None
         except Exception as e:
-            log.warning(
-                f"Failed to insert data into pgvector table ({self.table_name}), error: {e}",
-            )
+            log.warning(f"Failed to insert data into pgvector table ({self.table_name}), error: {e}")
             return 0, e

     def search_embedding(
4 changes: 2 additions & 2 deletions vectordb_bench/backend/clients/qdrant_cloud/qdrant_cloud.py
@@ -76,8 +76,8 @@ def optimize(self):
                 continue
             if info.status == CollectionStatus.GREEN:
                 msg = (
-                    f"Stored vectors: {info.vectors_count}, Indexed vectors: {info.indexed_vectors_count}, ",
-                    f"Collection status: {info.indexed_vectors_count}",
+                    f"Stored vectors: {info.vectors_count}, Indexed vectors: {info.indexed_vectors_count}, "
+                    f"Collection status: {info.indexed_vectors_count}"
                 )
                 log.info(msg)
                 return
2 changes: 1 addition & 1 deletion vectordb_bench/backend/clients/test/cli.py
@@ -17,7 +17,7 @@ class TestTypedDict(CommonTypedDict): ...
 @click_parameter_decorators_from_typed_dict(TestTypedDict)
 def Test(**parameters: Unpack[TestTypedDict]):
     run(
-        db=DB.NewClient,
+        db=DB.Test,
         db_config=TestConfig(db_label=parameters["db_label"]),
         db_case_config=TestIndexConfig(),
         **parameters,
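DB.NewClient is not a member of the DB enum, so invoking the Test command raised AttributeError before run() could start; DB.Test is the member registered above. A minimal reproduction of the failure mode (hypothetical enum and run(), not the repo's):

```python
from enum import Enum


class DB(Enum):
    Milvus = "Milvus"
    Test = "Test"


def run(db: DB) -> None:
    print(f"running benchmark against {db.value}")


run(db=DB.Test)  # OK: prints "running benchmark against Test"

try:
    run(db=DB.NewClient)  # enum member doesn't exist
except AttributeError as e:
    print(e)  # the pre-fix crash, raised before run() is even called
```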
16 changes: 4 additions & 12 deletions vectordb_bench/backend/data_source.py
@@ -63,9 +63,7 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool:
         # check size equal
         remote_size, local_size = info.content_length, local.stat().st_size
         if remote_size != local_size:
-            log.info(
-                f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]",
-            )
+            log.info(f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]")
             return False

         return True
@@ -89,9 +87,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path):
             local_file = local_ds_root.joinpath(file)

             if (not local_file.exists()) or (not self.validate_file(remote_file, local_file)):
-                log.info(
-                    f"local file: {local_file} not match with remote: {remote_file}; add to downloading list",
-                )
+                log.info(f"local file: {local_file} not match with remote: {remote_file}; add to downloading list")
                 downloads.append((remote_file, local_file))

         if len(downloads) == 0:
@@ -135,9 +131,7 @@ def read(self, dataset: str, files: list[str], local_ds_root: pathlib.Path):
             local_file = local_ds_root.joinpath(file)

             if (not local_file.exists()) or (not self.validate_file(remote_file, local_file)):
-                log.info(
-                    f"local file: {local_file} not match with remote: {remote_file}; add to downloading list",
-                )
+                log.info(f"local file: {local_file} not match with remote: {remote_file}; add to downloading list")
                 downloads.append(remote_file)

         if len(downloads) == 0:
@@ -157,9 +151,7 @@ def validate_file(self, remote: pathlib.Path, local: pathlib.Path) -> bool:
         # check size equal
         remote_size, local_size = info.get("size"), local.stat().st_size
         if remote_size != local_size:
-            log.info(
-                f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]",
-            )
+            log.info(f"local file: {local} size[{local_size}] not match with remote size[{remote_size}]")
             return False

         return True
50 changes: 16 additions & 34 deletions vectordb_bench/backend/runner/mp_runner.py
@@ -79,24 +79,22 @@ def search(

                 if count % 500 == 0:
                     log.debug(
-                        f"({mp.current_process().name:16}) ",
-                        f"search_count: {count}, latest_latency={time.perf_counter()-s}",
+                        f"({mp.current_process().name:16}) "
+                        f"search_count: {count}, latest_latency={time.perf_counter()-s}"
                     )

         total_dur = round(time.perf_counter() - start_time, 4)
         log.info(
             f"{mp.current_process().name:16} search {self.duration}s: "
-            f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}",
+            f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}"
         )

         return (count, total_dur, latencies)

     @staticmethod
     def get_mp_context():
         mp_start_method = "spawn"
-        log.debug(
-            f"MultiProcessingSearchRunner get multiprocessing start method: {mp_start_method}",
-        )
+        log.debug(f"MultiProcessingSearchRunner get multiprocessing start method: {mp_start_method}")
         return mp.get_context(mp_start_method)

     def _run_all_concurrencies_mem_efficient(self):
@@ -113,9 +111,7 @@ def _run_all_concurrencies_mem_efficient(self):
                 mp_context=self.get_mp_context(),
                 max_workers=conc,
             ) as executor:
-                log.info(
-                    f"Start search {self.duration}s in concurrency {conc}, filters: {self.filters}",
-                )
+                log.info(f"Start search {self.duration}s in concurrency {conc}, filters: {self.filters}")
                 future_iter = [executor.submit(self.search, self.test_data, q, cond) for i in range(conc)]
                 # Sync all processes
                 while q.qsize() < conc:
@@ -124,9 +120,7 @@ def _run_all_concurrencies_mem_efficient(self):

                 with cond:
                     cond.notify_all()
-                    log.info(
-                        f"Syncing all process and start concurrency search, concurrency={conc}",
-                    )
+                    log.info(f"Syncing all process and start concurrency search, concurrency={conc}")

                 start = time.perf_counter()
                 all_count = sum([r.result()[0] for r in future_iter])
@@ -140,18 +134,14 @@ def _run_all_concurrencies_mem_efficient(self):
                 conc_qps_list.append(qps)
                 conc_latency_p99_list.append(latency_p99)
                 conc_latency_avg_list.append(latency_avg)
-                log.info(
-                    f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}",
-                )
+                log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")

                 if qps > max_qps:
                     max_qps = qps
-                    log.info(
-                        f"Update largest qps with concurrency {conc}: current max_qps={max_qps}",
-                    )
+                    log.info(f"Update largest qps with concurrency {conc}: current max_qps={max_qps}")
         except Exception as e:
             log.warning(
-                f"Fail to search all concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}",
+                f"Fail to search, concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}"
             )
             traceback.print_exc()
@@ -193,9 +183,7 @@ def _run_by_dur(self, duration: int) -> float:
                 mp_context=self.get_mp_context(),
                 max_workers=conc,
             ) as executor:
-                log.info(
-                    f"Start search_by_dur {duration}s in concurrency {conc}, filters: {self.filters}",
-                )
+                log.info(f"Start search_by_dur {duration}s in concurrency {conc}, filters: {self.filters}")
                 future_iter = [
                     executor.submit(self.search_by_dur, duration, self.test_data, q, cond) for i in range(conc)
                 ]
@@ -206,24 +194,18 @@ def _run_by_dur(self, duration: int) -> float:

                 with cond:
                     cond.notify_all()
-                    log.info(
-                        f"Syncing all process and start concurrency search, concurrency={conc}",
-                    )
+                    log.info(f"Syncing all process and start concurrency search, concurrency={conc}")

                 start = time.perf_counter()
                 all_count = sum([r.result() for r in future_iter])
                 cost = time.perf_counter() - start

                 qps = round(all_count / cost, 4)
-                log.info(
-                    f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}",
-                )
+                log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")

                 if qps > max_qps:
                     max_qps = qps
-                    log.info(
-                        f"Update largest qps with concurrency {conc}: current max_qps={max_qps}",
-                    )
+                    log.info(f"Update largest qps with concurrency {conc}: current max_qps={max_qps}")
         except Exception as e:
             log.warning(
                 f"Fail to search all concurrencies: {self.concurrencies}, max_qps before failure={max_qps}, reason={e}",
@@ -275,14 +257,14 @@ def search_by_dur(

                 if count % 500 == 0:
                     log.debug(
-                        f"({mp.current_process().name:16}) search_count: {count}, ",
-                        f"latest_latency={time.perf_counter()-s}",
+                        f"({mp.current_process().name:16}) search_count: {count}, "
+                        f"latest_latency={time.perf_counter()-s}"
                     )

         total_dur = round(time.perf_counter() - start_time, 4)
         log.debug(
             f"{mp.current_process().name:16} search {self.duration}s: "
-            f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}",
+            f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}"
         )

         return count
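The runner fixes above change call sites that passed two f-strings to log.debug/log.info. Python's logging module treats everything after the first argument as %-format args; with no placeholders in the message, formatting fails at emit time ("not all arguments converted during string formatting") and the record is reported as a logging error instead of the intended line. A standalone reproduction:

```python
import logging

logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger("demo")

# Pre-fix pattern: the second f-string becomes a %-format argument.
# Emitting this prints "--- Logging error ---" with a TypeError, because
# logging evaluates "search_count: 500, " % ("latest_latency=0.0123",).
log.debug("search_count: 500, ", "latest_latency=0.0123")

# Post-fix pattern: adjacent literals concatenate into one message string.
log.debug(
    "search_count: 500, "
    "latest_latency=0.0123"
)
```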
8 changes: 4 additions & 4 deletions vectordb_bench/backend/runner/rate_runner.py
@@ -73,14 +73,14 @@ def submit_by_rate() -> bool:

                 if len(not_done) > 0:
                     log.warning(
-                        f"Failed to finish all tasks in 1s, [{len(not_done)}/{len(executing_futures)}] ",
-                        f"tasks are not done, waited={wait_interval:.2f}, trying to wait in the next round",
+                        f"Failed to finish all tasks in 1s, [{len(not_done)}/{len(executing_futures)}] "
+                        f"tasks are not done, waited={wait_interval:.2f}, trying to wait in the next round"
                     )
                     executing_futures = list(not_done)
                 else:
                     log.debug(
-                        f"Finished {len(executing_futures)} insert-{config.NUM_PER_BATCH} ",
-                        f"task in 1s, wait_interval={wait_interval:.2f}",
+                        f"Finished {len(executing_futures)} insert-{config.NUM_PER_BATCH} "
+                        f"task in 1s, wait_interval={wait_interval:.2f}"
                     )
                     executing_futures = []
             except Exception as e:
24 changes: 10 additions & 14 deletions vectordb_bench/backend/runner/read_write_runner.py
@@ -45,8 +45,8 @@ def __init__(
         self.read_dur_after_write = read_dur_after_write

         log.info(
-            f"Init runner, concurencys={concurrencies}, search_stage={search_stage}, ",
-            f"stage_search_dur={read_dur_after_write}",
+            f"Init runner, concurencys={concurrencies}, search_stage={search_stage}, "
+            f"stage_search_dur={read_dur_after_write}"
        )

         test_emb = np.stack(dataset.test_data["emb"])
@@ -88,12 +88,10 @@ def run_search(self):
         res, ssearch_dur = self.serial_search_runner.run()
         recall, ndcg, p99_latency = res
         log.info(
-            f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, p99={p99_latency}, ",
+            f"Search after write - Serial search - recall={recall}, ndcg={ndcg}, p99={p99_latency}, "
             f"dur={ssearch_dur:.4f}",
         )
-        log.info(
-            f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}",
-        )
+        log.info(f"Search after wirte - Conc search start, dur for each conc={self.read_dur_after_write}")
         max_qps = self.run_by_dur(self.read_dur_after_write)
         log.info(f"Search after wirte - Conc search finished, max_qps={max_qps}")
@@ -157,18 +155,16 @@ def wait_next_target(start: int, target_batch: int) -> bool:

         got = wait_next_target(start_batch, target_batch)
         if got is False:
-            log.warning(
-                f"Abnormal exit, target_batch={target_batch}, start_batch={start_batch}",
-            )
+            log.warning(f"Abnormal exit, target_batch={target_batch}, start_batch={start_batch}")
             return None

         log.info(f"Insert {perc}% done, total batch={total_batch}")
         log.info(f"[{target_batch}/{total_batch}] Serial search - {perc}% start")
         res, ssearch_dur = self.serial_search_runner.run()
         recall, ndcg, p99_latency = res
         log.info(
-            f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, ",
-            f"ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}",
+            f"[{target_batch}/{total_batch}] Serial search - {perc}% done, recall={recall}, "
+            f"ndcg={ndcg}, p99={p99_latency}, dur={ssearch_dur:.4f}"
         )

         # Search duration for non-last search stage is carefully calculated.
@@ -183,8 +179,8 @@ def wait_next_target(start: int, target_batch: int) -> bool:
             each_conc_search_dur = csearch_dur / len(self.concurrencies)
             if each_conc_search_dur < 30:
                 warning_msg = (
-                    f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short, ",
-                    f"total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}.",
+                    f"Results might be inaccurate, duration[{csearch_dur:.4f}] left for conc-search is too short, "
+                    f"total available dur={total_dur_between_stages}, serial_search_cost={ssearch_dur}."
                 )
                 log.warning(warning_msg)
@@ -193,7 +189,7 @@ def wait_next_target(start: int, target_batch: int) -> bool:
             each_conc_search_dur = 60

             log.info(
-                f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}",
+                f"[{target_batch}/{total_batch}] Concurrent search - {perc}% start, dur={each_conc_search_dur:.4f}"
             )
             max_qps = self.run_by_dur(each_conc_search_dur)
             result.append((perc, max_qps, recall, ndcg, p99_latency))