From 3a98f391067dca298a3a84a19d42867958d41f85 Mon Sep 17 00:00:00 2001
From: Vijayan Balasubramanian
Date: Sun, 17 Dec 2023 16:56:59 -0800
Subject: [PATCH] Add vector search param source

Added a new param source that partitions the vector data set and its
neighbors. The partitions are passed to the runner, which performs the
search and compares the response with the neighbors to calculate recall.
This param source extends SearchParamSource to inherit search's other
query parameters, and adds the additional parameters that are required
for the vector-search operation type.

Signed-off-by: Vijayan Balasubramanian
---
 osbenchmark/workload/params.py | 186 ++++++++++++++++++++++++-
 tests/workload/params_test.py  | 244 ++++++++++++++++++++++++++++++++-
 2 files changed, 426 insertions(+), 4 deletions(-)

diff --git a/osbenchmark/workload/params.py b/osbenchmark/workload/params.py
index 5cf0bd59a..b7f89cc19 100644
--- a/osbenchmark/workload/params.py
+++ b/osbenchmark/workload/params.py
@@ -23,20 +23,22 @@
 # under the License.
 
 import collections
+import copy
 import inspect
 import logging
 import math
 import numbers
 import operator
 import random
-from abc import ABC
-
 import time
+from abc import ABC, abstractmethod
 from enum import Enum
 
 from osbenchmark import exceptions
-from osbenchmark.workload import workload
 from osbenchmark.utils import io
+from osbenchmark.utils.dataset import DataSet, get_data_set, Context
+from osbenchmark.utils.parse import parse_string_parameter, parse_int_parameter
+from osbenchmark.workload import workload
 
 __PARAM_SOURCES_BY_OP = {}
 __PARAM_SOURCES_BY_NAME = {}
@@ -797,6 +799,183 @@ def params(self):
         parsed_params.update(self._client_params())
         return parsed_params
 
+
+class VectorSearchParamSource(SearchParamSource):
+    def __init__(self, workload, params, **kwargs):
+        super().__init__(workload, params, **kwargs)
+        self.delegate_param_source = VectorSearchPartitionPartitionParamSource(params, self.query_params)
+
+    def partition(self, partition_index, total_partitions):
+        return self.delegate_param_source.partition(partition_index, total_partitions)
+
+
+class VectorDataSetPartitionParamSource(ABC):
+    """ Abstract class that can read vectors from a data set and partition the
+    vectors across multiple clients.
+
+    Attributes:
+        field_name: Name of the field to generate the query for
+        data_set_format: Format the data set is serialized with (bigann or hdf5)
+        data_set_path: Path to the data set
+        context: Context the data set will be used in
+        data_set: Structure containing metadata about the data and the ability to read it
+        num_vectors: Number of vectors to use from the data set
+        total: Number of vectors for the partition
+        current: Current vector offset in the data set
+        infinite: Property of the param source signalling that it can be exhausted
+        percent_completed: Progress indicator for how exhausted the data set is
+        offset: Offset into the data set to start at. Relevant when there are
+                multiple partitions
+    """
+
+    def __init__(self, params, context: Context):
+        self.field_name: str = parse_string_parameter("field", params)
+
+        self.context = context
+        self.data_set_format = parse_string_parameter("data_set_format", params)
+        self.data_set_path = parse_string_parameter("data_set_path", params)
+        self.data_set: DataSet = get_data_set(
+            self.data_set_format, self.data_set_path, self.context)
+
+        num_vectors: int = parse_int_parameter(
+            "num_vectors", params, self.data_set.size())
+        # if value is -1 or greater than dataset size, use dataset size as num_vectors
+        self.num_vectors = self.data_set.size() if (
+            num_vectors < 0 or num_vectors > self.data_set.size()) else num_vectors
+        self.total = self.num_vectors
+        self.current = 0
+        self.infinite = False
+        self.percent_completed = 0
+        self.offset = 0
+
+    def _is_last_partition(self, partition_index, total_partitions):
+        return partition_index == total_partitions - 1
+
+    def partition(self, partition_index, total_partitions):
+        """
+        Splits up the parameters source so that multiple clients can read data
+        from it.
+        Args:
+            partition_index: index of one particular partition
+            total_partitions: total number of partitions data set is split into
+
+        Returns:
+            The parameter source for this particular partition
+        """
+        partition_x = copy.copy(self)
+
+        num_vectors = int(self.num_vectors / total_partitions)
+
+        # if the data set is not divided equally, add the remaining vectors to the last partition
+        if self.num_vectors % total_partitions != 0 and self._is_last_partition(partition_index, total_partitions):
+            num_vectors += self.num_vectors - (num_vectors * total_partitions)
+
+        partition_x.num_vectors = num_vectors
+        partition_x.offset = partition_index * int(self.num_vectors / total_partitions)  # offset uses the even partition size
+        # We need to create a new instance of the data set for each client
+        partition_x.data_set = get_data_set(
+            self.data_set_format,
+            self.data_set_path,
+            self.context
+        )
+        partition_x.data_set.seek(partition_x.offset)
+        partition_x.current = partition_x.offset
+        return partition_x
+
+    @abstractmethod
+    def params(self):
+        """
+        Returns: A single parameter from this source
+        """
+
+
+class VectorSearchPartitionPartitionParamSource(VectorDataSetPartitionParamSource):
+    """ Parameter source for k-NN. Queries are created from the data set
+    provided.
+
+    Attributes:
+        k: The number of results to return for the search
+        repetitions: Number of times to re-run the query data set from the beginning
+    """
+    PARAMS_NAME_K = "k"
+    PARAMS_NAME_REPETITIONS = "repetitions"
+    PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT = "neighbors_data_set_format"
+    PARAMS_NAME_NEIGHBORS_DATA_SET_PATH = "neighbors_data_set_path"
+    PARAMS_NAME_OPERATION_TYPE = "operation-type"
+    PARAMS_VALUE_VECTOR_SEARCH = "vector-search"
+    PARAMS_NAME_ID_FIELD_NAME = "id_field_name"
+
+    def __init__(self, params, query_params):
+        super().__init__(params, Context.QUERY)
+        self.k = parse_int_parameter(self.PARAMS_NAME_K, params)
+        self.repetitions = parse_int_parameter(self.PARAMS_NAME_REPETITIONS, params, 1)
+        self.current_rep = 1
+        self.neighbors_data_set_format = parse_string_parameter(
+            self.PARAMS_NAME_NEIGHBORS_DATA_SET_FORMAT, params, self.data_set_format)
+        self.neighbors_data_set_path = parse_string_parameter(
+            self.PARAMS_NAME_NEIGHBORS_DATA_SET_PATH, params, self.data_set_path)
+        self.neighbors_data_set: DataSet = get_data_set(
+            self.neighbors_data_set_format, self.neighbors_data_set_path, Context.NEIGHBORS)
+        operation_type = parse_string_parameter(self.PARAMS_NAME_OPERATION_TYPE, params,
+                                                self.PARAMS_VALUE_VECTOR_SEARCH)
+        self.query_params = query_params
+        self.query_params.update({
+            self.PARAMS_NAME_K: self.k,
+            self.PARAMS_NAME_OPERATION_TYPE: operation_type,
+            self.PARAMS_NAME_ID_FIELD_NAME: params.get(self.PARAMS_NAME_ID_FIELD_NAME),
+        })
+
+    def params(self):
+        """
+        Returns: A query parameter with a vector from a data set
+        """
+        is_dataset_exhausted = self.current >= self.num_vectors + self.offset
+
+        if is_dataset_exhausted and self.current_rep < self.repetitions:
+            self.data_set.seek(self.offset)
+            self.current = self.offset
+            self.current_rep += 1
+        elif is_dataset_exhausted:
+            raise StopIteration
+        vector = self.data_set.read(1)[0]
+        neighbor = self.neighbors_data_set.read(1)[0]
+        true_neighbors = list(map(str, neighbor[:self.k]))
+        self.query_params.update({
+            "neighbors": true_neighbors,
+            "request-params": {
+                "_source": "false",
+                # disallow partial results so that recall is computed against complete responses
+                "allow_partial_search_results": "false"
+            }
+        })
+        self.query_params.update({
+            "body": self._build_vector_search_query_body(self.field_name, vector)})
+        self.current += 1
+        self.percent_completed = self.current / self.total
+        return self.query_params
+
+    def _build_vector_search_query_body(self, field_name: str, vector) -> dict:
+        """Builds a k-NN request that can be used to execute an approximate nearest
+        neighbor search against a k-NN plugin index
+        Args:
+            field_name: name of field to search
+            vector: vector used for query
+        Returns:
+            A dictionary containing the body used for search query
+        """
+        return {
+            "size": self.k,
+            "query": {
+                "knn": {
+                    field_name: {
+                        "vector": vector,
+                        "k": self.k
+                    }
+                }
+            }
+        }
+
+
 def get_target(workload, params):
     if len(workload.indices) == 1:
         default_target = workload.indices[0].name
@@ -1215,6 +1394,7 @@ def read_bulk(self):
 
 register_param_source_for_operation(workload.OperationType.Bulk, BulkIndexParamSource)
 register_param_source_for_operation(workload.OperationType.Search, SearchParamSource)
+register_param_source_for_operation(workload.OperationType.VectorSearch, VectorSearchParamSource)
 register_param_source_for_operation(workload.OperationType.CreateIndex, CreateIndexParamSource)
 register_param_source_for_operation(workload.OperationType.DeleteIndex, DeleteIndexParamSource)
 register_param_source_for_operation(workload.OperationType.CreateDataStream, CreateDataStreamParamSource)
diff --git a/tests/workload/params_test.py b/tests/workload/params_test.py
index 6505f8648..0c4c705ea 100644
--- a/tests/workload/params_test.py
+++ b/tests/workload/params_test.py
@@ -24,11 +24,20 @@
 # pylint: disable=protected-access
 
 import random
+import shutil
+import tempfile
 from unittest import TestCase
 
+import numpy as np
+
 from osbenchmark import exceptions
-from osbenchmark.workload import params, workload
 from osbenchmark.utils import io
+from osbenchmark.utils.dataset import Context, HDF5DataSet
+from osbenchmark.utils.parse import ConfigurationError
+from osbenchmark.workload import params, workload
+from osbenchmark.workload.params import VectorDataSetPartitionParamSource, VectorSearchPartitionPartitionParamSource
+from tests.utils.dataset_helper import create_data_set
+from tests.utils.dataset_test import DEFAULT_NUM_VECTORS
 
 
 class StaticBulkReader:
@@ -2476,3 +2485,236 @@ def test_force_merge_all_params(self):
         self.assertEqual(30, p["request-timeout"])
         self.assertEqual(1, p["max-num-segments"])
         self.assertEqual("polling", p["mode"])
+
+
+class VectorSearchParamSourceTests(TestCase):
+    DEFAULT_INDEX_NAME = "test-index"
+    DEFAULT_FIELD_NAME = "test-field"
+    DEFAULT_CONTEXT = Context.INDEX
+    DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME
+    DEFAULT_NUM_VECTORS = 10
+    DEFAULT_DIMENSION = 10
+    DEFAULT_RANDOM_STRING_LENGTH = 8
+
+    def setUp(self) -> None:
+        self.data_set_dir = tempfile.mkdtemp()
+
+        # Create a data set we know to be valid for convenience
+        self.valid_data_set_path = create_data_set(
+            self.DEFAULT_NUM_VECTORS,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            self.DEFAULT_CONTEXT,
+            self.data_set_dir
+        )
+
+    def tearDown(self):
+        shutil.rmtree(self.data_set_dir)
+
+    def test_missing_params(self):
+        empty_params = dict()
+        self.assertRaises(
+            ConfigurationError,
+            lambda: self.TestVectorsFromDataSetParamSource(
+                empty_params, VectorSearchParamSourceTests.DEFAULT_CONTEXT)
+        )
+
+    def test_invalid_data_set_format(self):
+        invalid_data_set_format = "invalid-data-set-format"
+
+        test_param_source_params = {
+            "index": VectorSearchParamSourceTests.DEFAULT_INDEX_NAME,
+            "field": VectorSearchParamSourceTests.DEFAULT_FIELD_NAME,
+            "data_set_format": invalid_data_set_format,
+            "data_set_path": self.valid_data_set_path,
+        }
+        self.assertRaises(
+            ConfigurationError,
+            lambda: self.TestVectorsFromDataSetParamSource(
+                test_param_source_params,
+                self.DEFAULT_CONTEXT
+            )
+        )
+
+    def test_invalid_data_set_path(self):
+        invalid_data_set_path = "invalid-data-set-path"
+        test_param_source_params = {
+            "index": self.DEFAULT_INDEX_NAME,
+            "field": self.DEFAULT_FIELD_NAME,
+            "data_set_format": HDF5DataSet.FORMAT_NAME,
+            "data_set_path": invalid_data_set_path,
+        }
+        self.assertRaises(
+            FileNotFoundError,
+            lambda: self.TestVectorsFromDataSetParamSource(
+                test_param_source_params,
+                self.DEFAULT_CONTEXT
+            )
+        )
+
+    def test_partition_hdf5(self):
+        num_vectors = 100
+
+        hdf5_data_set_path = create_data_set(
+            num_vectors,
+            self.DEFAULT_DIMENSION,
+            HDF5DataSet.FORMAT_NAME,
+            self.DEFAULT_CONTEXT,
+            self.data_set_dir
+        )
+
+        test_param_source_params = {
+            "index": self.DEFAULT_INDEX_NAME,
+            "field": self.DEFAULT_FIELD_NAME,
+            "data_set_format": HDF5DataSet.FORMAT_NAME,
+            "data_set_path": hdf5_data_set_path,
+        }
+        test_param_source = self.TestVectorsFromDataSetParamSource(
+            test_param_source_params,
+            self.DEFAULT_CONTEXT
+        )
+
+        num_partitions = 10
+        vectors_per_partition = test_param_source.num_vectors // num_partitions
+
+        self._test_partition(
+            test_param_source,
+            num_partitions,
+            vectors_per_partition
+        )
+
+    def test_partition_bigann(self):
+        num_vectors = 100
+        float_extension = "fbin"
+
+        bigann_data_set_path = create_data_set(
+            num_vectors,
+            self.DEFAULT_DIMENSION,
+            float_extension,
+            self.DEFAULT_CONTEXT,
+            self.data_set_dir
+        )
+
+        test_param_source_params = {
+            "index": self.DEFAULT_INDEX_NAME,
+            "field": self.DEFAULT_FIELD_NAME,
+            "data_set_format": "bigann",
+            "data_set_path": bigann_data_set_path,
+        }
+        test_param_source = self.TestVectorsFromDataSetParamSource(
+            test_param_source_params,
+            self.DEFAULT_CONTEXT
+        )
+
+        num_partitions = 10
+        vecs_per_partition = test_param_source.num_vectors // num_partitions
+
+        self._test_partition(
+            test_param_source,
+            num_partitions,
+            vecs_per_partition
+        )
+
+    def _test_partition(
+            self,
+            test_param_source: VectorDataSetPartitionParamSource,
+            num_partitions: int,
+            vec_per_partition: int
+    ):
+        for i in range(num_partitions):
+            test_param_source_i = test_param_source.partition(i, num_partitions)
+            self.assertEqual(test_param_source_i.num_vectors, vec_per_partition)
+            self.assertEqual(test_param_source_i.offset, i * vec_per_partition)
+
+    class TestVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
+        """
+        Minimal concrete implementation of the abstract VectorDataSetPartitionParamSource
+        so that we can test its concrete methods.
+        """
+
+        def params(self):
+            pass
+
+
+class VectorSearchPartitionPartitionParamSourceTestCase(TestCase):
+
+    DEFAULT_INDEX_NAME = "test-partition-index"
+    DEFAULT_FIELD_NAME = "test-vector-field"
+    DEFAULT_CONTEXT = Context.INDEX
+    DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME
+    DEFAULT_NUM_VECTORS = 10
+    DEFAULT_DIMENSION = 10
+    DEFAULT_RANDOM_STRING_LENGTH = 8
+
+    def setUp(self) -> None:
+        self.data_set_dir = tempfile.mkdtemp()
+
+    def tearDown(self):
+        shutil.rmtree(self.data_set_dir)
+
+    def test_params(self):
+        # Create query and ground-truth neighbor data sets
+        k = 12
+        data_set_path = create_data_set(
+            self.DEFAULT_NUM_VECTORS,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.QUERY,
+            self.data_set_dir
+        )
+        neighbors_data_set_path = create_data_set(
+            self.DEFAULT_NUM_VECTORS,
+            self.DEFAULT_DIMENSION,
+            self.DEFAULT_TYPE,
+            Context.NEIGHBORS,
+            self.data_set_dir
+        )
+
+        # Create a vector search param source with the relevant params
+        test_param_source_params = {
+            "field": self.DEFAULT_FIELD_NAME,
+            "data_set_format": self.DEFAULT_TYPE,
+            "data_set_path": data_set_path,
+            "neighbors_data_set_path": neighbors_data_set_path,
+            "k": k,
+        }
+        query_param_source = VectorSearchPartitionPartitionParamSource(
+            test_param_source_params, {"index": self.DEFAULT_INDEX_NAME}
+        )
+
+        # Check the generated query parameters for each vector in the data set
+        for _ in range(DEFAULT_NUM_VECTORS):
+            self._check_params(
+                query_param_source.params(),
+                self.DEFAULT_FIELD_NAME,
+                self.DEFAULT_DIMENSION,
+                k
+            )
+
+        # Assert that the next call raises StopIteration once the data set is exhausted
+        with self.assertRaises(StopIteration):
+            query_param_source.params()
+
+    def _check_params(
+            self,
+            params: dict,
+            expected_field: str,
+            expected_dimension: int,
+            expected_k: int
+    ):
+        body = params.get("body")
+        self.assertIsInstance(body, dict)
+        query = body.get("query")
+        self.assertIsInstance(query, dict)
+        query_knn = query.get("knn")
+        self.assertIsInstance(query_knn, dict)
+        field = query_knn.get(expected_field)
+        self.assertIsInstance(field, dict)
+        vector = field.get("vector")
+        self.assertIsInstance(vector, np.ndarray)
+        self.assertEqual(len(list(vector)), expected_dimension)
+        k = field.get("k")
+        self.assertEqual(k, expected_k)
+        neighbor = params.get("neighbors")
+        self.assertIsInstance(neighbor, list)
+        self.assertEqual(len(neighbor), expected_dimension)  # k exceeds the stored neighbors per vector, so the list is truncated to the data set dimension
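
A minimal sketch of how the new param source can be driven, mirroring the unit test above. The data set paths, field name, and index name below are illustrative placeholders; the constructor, partition(), and params() calls match the classes added in this patch:

    from osbenchmark.workload.params import VectorSearchPartitionPartitionParamSource

    # placeholder inputs: any hdf5/bigann query and neighbor data sets prepared for the workload go here
    source_params = {
        "field": "target_field",
        "data_set_format": "hdf5",
        "data_set_path": "/path/to/queries.hdf5",
        "neighbors_data_set_path": "/path/to/neighbors.hdf5",
        "k": 10,
    }
    param_source = VectorSearchPartitionPartitionParamSource(source_params, {"index": "target_index"})

    # each benchmark client works on its own partition of the query vectors
    client_source = param_source.partition(partition_index=0, total_partitions=1)

    # each call yields the knn search body plus the ground-truth neighbors used for recall
    query = client_source.params()
    print(query["body"], query["neighbors"])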