
Implement batch_update #2100

Merged Jan 30, 2025 · 39 commits
f583136
Fix compilation errors
vasil-pashov Dec 12, 2024
1242013
Add async_update_impl and update_impl functions
vasil-pashov Dec 13, 2024
05b489f
Async version of rewrite_partial_segment
vasil-pashov Dec 17, 2024
6c0d5c2
Make reading of keys parallel in update
vasil-pashov Dec 17, 2024
bc55784
WIP async_update
vasil-pashov Dec 18, 2024
6443c08
Async update compiling and tests passing
vasil-pashov Dec 19, 2024
646cc1d
Change chaining structure
vasil-pashov Dec 19, 2024
229d597
Use thenValueInline in async_get_index_reader
vasil-pashov Dec 19, 2024
7043683
Use const ref for is_timeseries_index
vasil-pashov Dec 19, 2024
d87193e
Use then value inline for filtering existing slices
vasil-pashov Dec 19, 2024
3a9cec2
Replace variable with fn call
vasil-pashov Dec 19, 2024
27cca12
Merge branch 'master' into vasil.pashov/batch_read
vasil-pashov Dec 19, 2024
083f4be
Fix off-by-one error in computing index intersection with the update …
vasil-pashov Dec 20, 2024
c459c73
Fix warning in release due to debug macro
vasil-pashov Dec 20, 2024
05e7965
Fix compilation errors
vasil-pashov Dec 20, 2024
7a703a1
Address review comments
vasil-pashov Jan 2, 2025
f62916d
Address review comments
vasil-pashov Jan 2, 2025
41fbd72
Perater batch_update interface
vasil-pashov Jan 2, 2025
3b15ed7
Return the modified metadata vector from _generate_batch_vectors_for_…
vasil-pashov Jan 3, 2025
f3760c6
Add update payload. Properly propagate python data to the C++ layer
vasil-pashov Jan 3, 2025
7a43745
Initial batch_update implementation with one happy-path test passing
vasil-pashov Jan 3, 2025
54dbe69
Merge branch 'master' into vasil.pashov/batch_update_2
vasil-pashov Jan 21, 2025
b1f0409
Use const reference instead of const pointer in get_keys_affected_by_…
vasil-pashov Jan 21, 2025
5091653
Finish unit tests for batch update
vasil-pashov Jan 22, 2025
3361ea3
Add docs
vasil-pashov Jan 22, 2025
33cb0e3
Merge branch 'master' into vasil.pashov/batch_update_2
vasil-pashov Jan 22, 2025
f3f319b
Remove batch_update from Library V1 non reg tests. There is no batch_…
vasil-pashov Jan 22, 2025
797dcad
Fix copy/paste naming error
vasil-pashov Jan 22, 2025
d7dbecf
Remove comma in named tuple
vasil-pashov Jan 22, 2025
1c51f12
Properly handle categoricals
vasil-pashov Jan 23, 2025
837299e
Improve exception message for missing symbol on update. Mention the u…
vasil-pashov Jan 23, 2025
034849d
Refactor the processing of all batch_* results
vasil-pashov Jan 24, 2025
0366f2f
Do not inherit from NamedTuple for UpdatePayload
vasil-pashov Jan 24, 2025
7f33b60
Fix read batch metadata
vasil-pashov Jan 24, 2025
3001495
Address review comments
vasil-pashov Jan 24, 2025
c27ec26
Add tests for metadata and date_range with batch update
vasil-pashov Jan 27, 2025
6d4e88c
Address review comments
vasil-pashov Jan 27, 2025
f396395
Fix batch_read_descriptor_internal
vasil-pashov Jan 27, 2025
0e0a49a
Change the structure of flags
vasil-pashov Jan 29, 2025
1 change: 0 additions & 1 deletion cpp/arcticdb/stream/test/stream_test_common.hpp
@@ -21,7 +21,6 @@

#include <folly/gen/Base.h>
#include <folly/futures/Future.h>

#include <filesystem>
#include <span>
#include <string>
264 changes: 140 additions & 124 deletions cpp/arcticdb/version/local_versioned_engine.cpp

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion cpp/arcticdb/version/local_versioned_engine.hpp
@@ -22,7 +22,6 @@
#include <arcticdb/entity/descriptor_item.hpp>
#include <arcticdb/entity/data_error.hpp>

#include <sstream>
namespace arcticdb::version_store {

/**
@@ -277,6 +276,13 @@ class LocalVersionedEngine : public VersionedEngine {
bool upsert,
bool throw_on_error);

std::vector<std::variant<VersionedItem, DataError>> batch_update_internal(
const std::vector<StreamId>& stream_ids,
std::vector<std::shared_ptr<InputTensorFrame>>&& frames,
const std::vector<UpdateQuery>& update_queries,
bool prune_previous_versions,
bool upsert);

std::vector<ReadVersionOutput> batch_read_keys(const std::vector<AtomKey> &keys);

std::vector<std::variant<ReadVersionOutput, DataError>> batch_read_internal(
7 changes: 5 additions & 2 deletions cpp/arcticdb/version/python_bindings.cpp
@@ -22,7 +22,6 @@
#include <arcticdb/python/adapt_read_dataframe.hpp>
#include <arcticdb/version/schema_checks.hpp>
#include <arcticdb/util/pybind_mutex.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>


namespace arcticdb::version_store {
@@ -300,7 +299,8 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
.def_property_readonly("error_code", &DataError::error_code)
.def_property_readonly("error_category", &DataError::error_category)
.def_property_readonly("exception_string", &DataError::exception_string)
.def("__str__", &DataError::to_string);
.def("__str__", &DataError::to_string)
.def("__repr__", &DataError::to_string);

// TODO: add repr.
py::class_<VersionedItem>(version, "VersionedItem")
@@ -784,6 +784,9 @@ void register_bindings(py::module &version, py::exception<arcticdb::ArcticExcept
.def("batch_append",
&PythonVersionStore::batch_append,
py::call_guard<SingleThreadMutexHolder>(), "Batch append to a list of symbols")
.def("batch_update",
&PythonVersionStore::batch_update,
py::call_guard<SingleThreadMutexHolder>(), "Batch update a list of symbols")
.def("batch_restore_version",
[&](PythonVersionStore& v, const std::vector<StreamId>& ids, const std::vector<VersionQuery>& version_queries){
auto results = v.batch_restore_version(ids, version_queries);
57 changes: 32 additions & 25 deletions cpp/arcticdb/version/version_core.cpp
@@ -190,7 +190,7 @@ std::vector<SliceAndKey> filter_existing_slices(std::vector<std::optional<SliceA
using IntersectingSegments = std::tuple<std::vector<SliceAndKey>, std::vector<SliceAndKey>>;

[[nodiscard]] folly::Future<IntersectingSegments> async_intersecting_segments(
const std::vector<SliceAndKey>& affected_keys,
std::shared_ptr<std::vector<SliceAndKey>> affected_keys,
const IndexRange& front_range,
const IndexRange& back_range,
VersionId version_id,
@@ -205,7 +205,7 @@ using IntersectingSegments = std::tuple<std::vector<SliceAndKey>, std::vector<Sl
std::vector<folly::Future<std::optional<SliceAndKey>>> maybe_intersect_before_fut;
std::vector<folly::Future<std::optional<SliceAndKey>>> maybe_intersect_after_fut;

for (const auto& affected_slice_and_key : affected_keys) {
for (const auto& affected_slice_and_key : *affected_keys) {
const auto& affected_range = affected_slice_and_key.key().index_range();
if (intersects(affected_range, front_range) && !overlaps(affected_range, front_range) &&
is_before(affected_range, front_range)) {
@@ -267,7 +267,7 @@ VersionedItem delete_range_impl(
std::end(affected_keys),
std::back_inserter(unaffected_keys));

auto [intersect_before, intersect_after] = async_intersecting_segments(affected_keys, index_range, index_range, update_info.next_version_id_, store).get();
auto [intersect_before, intersect_after] = async_intersecting_segments(std::make_shared<std::vector<SliceAndKey>>(affected_keys), index_range, index_range, update_info.next_version_id_, store).get();

auto orig_filter_range = std::holds_alternative<std::monostate>(query.row_filter) ? get_query_index_range(index, index_range) : query.row_filter;

@@ -314,7 +314,12 @@ struct UpdateRanges {
IndexRange original_index_range;
};

static UpdateRanges compute_update_ranges(const FilterRange& row_filter, const InputTensorFrame& update_frame, std::span<SliceAndKey> update_slice_and_keys) {

static UpdateRanges compute_update_ranges(
const FilterRange& row_filter,
const InputTensorFrame& update_frame,
std::span<SliceAndKey> update_slice_and_keys
) {
return util::variant_match(row_filter,
[&](std::monostate) -> UpdateRanges {
util::check(std::holds_alternative<TimeseriesIndex>(update_frame.index), "Update with row count index is not permitted");
@@ -356,16 +361,18 @@ static void check_can_update(
}

static std::shared_ptr<std::vector<SliceAndKey>> get_keys_affected_by_update(
const index::IndexSegmentReader& index_segment_reader,
const InputTensorFrame& frame,
const UpdateQuery& query,
bool dynamic_schema) {
const index::IndexSegmentReader& index_segment_reader,
const InputTensorFrame& frame,
const UpdateQuery& query,
bool dynamic_schema
) {
std::vector<FilterQuery<index::IndexSegmentReader>> queries = build_update_query_filters<index::IndexSegmentReader>(
query.row_filter,
frame.index,
frame.index_range,
dynamic_schema,
index_segment_reader.bucketize_dynamic());
query.row_filter,
frame.index,
frame.index_range,
dynamic_schema,
index_segment_reader.bucketize_dynamic()
);
return std::make_shared<std::vector<SliceAndKey>>(filter_index(index_segment_reader, combine_filter_functions(queries)));
}

@@ -383,11 +390,12 @@ static std::vector<SliceAndKey> get_keys_not_affected_by_update(
}

static std::pair<std::vector<SliceAndKey>, size_t> get_slice_and_keys_for_update(
const UpdateRanges& update_ranges,
std::span<const SliceAndKey> unaffected_keys,
std::span<const SliceAndKey> affected_keys,
IntersectingSegments&& segments_intersecting_with_update_range,
std::vector<SliceAndKey>&& new_slice_and_keys) {
const UpdateRanges& update_ranges,
std::span<const SliceAndKey> unaffected_keys,
std::span<const SliceAndKey> affected_keys,
const IntersectingSegments& segments_intersecting_with_update_range,
std::vector<SliceAndKey>&& new_slice_and_keys
) {
const size_t new_keys_size = new_slice_and_keys.size();
size_t row_count = 0;
const std::array<std::vector<SliceAndKey>, 5> groups{
@@ -409,15 +417,15 @@ folly::Future<AtomKey> async_update_impl(
const UpdateInfo& update_info,
const UpdateQuery& query,
const std::shared_ptr<InputTensorFrame>& frame,
const WriteOptions& options,
WriteOptions&& options,
bool dynamic_schema,
bool empty_types) {
return index::async_get_index_reader(*(update_info.previous_index_key_), store).thenValue([
store,
update_info,
query,
frame,
options=options,
options=std::move(options),
dynamic_schema,
empty_types
](index::IndexSegmentReader&& index_segment_reader) {
@@ -441,16 +449,15 @@ folly::Future<AtomKey> async_update_impl(
"The sum of affected keys and unaffected keys must be equal to the total number of keys {} + {} != {}",
affected_keys->size(), unaffected_keys.size(), index_segment_reader.size());
const UpdateRanges update_ranges = compute_update_ranges(query.row_filter, *frame, new_slice_and_keys);

return async_intersecting_segments(
*affected_keys,
affected_keys,
update_ranges.front,
update_ranges.back,
update_info.next_version_id_,
store).thenValue([new_slice_and_keys=std::move(new_slice_and_keys),
update_ranges=update_ranges,
unaffected_keys=std::move(unaffected_keys),
affected_keys=affected_keys,
affected_keys=std::move(affected_keys),
index_segment_reader=std::move(index_segment_reader),
frame,
dynamic_schema,
@@ -465,7 +472,7 @@ folly::Future<AtomKey> async_update_impl(
auto tsd = index::get_merged_tsd(row_count, dynamic_schema, index_segment_reader.tsd(), frame);
return index::write_index(
index_type_from_descriptor(tsd.as_stream_descriptor()),
tsd,
std::move(tsd),
std::move(flattened_slice_and_keys),
IndexPartialKey{frame->desc.id(), update_info.next_version_id_},
store
@@ -483,7 +490,7 @@ VersionedItem update_impl(
WriteOptions&& options,
bool dynamic_schema,
bool empty_types) {
auto version_key = async_update_impl(store, update_info, query, frame, options, dynamic_schema, empty_types).get();
auto version_key = async_update_impl(store, update_info, query, frame, std::move(options), dynamic_schema, empty_types).get();
auto versioned_item = VersionedItem(to_atom(std::move(version_key)));
ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", frame->desc.id(), update_info.next_version_id_);
return versioned_item;
10 changes: 9 additions & 1 deletion cpp/arcticdb/version/version_core.hpp
@@ -13,7 +13,6 @@
#include <arcticdb/pipeline/write_options.hpp>
#include <arcticdb/async/task_scheduler.hpp>
#include <arcticdb/stream/incompletes.hpp>
#include <arcticdb/pipeline/read_pipeline.hpp>
#include <arcticdb/pipeline/pipeline_context.hpp>
#include <arcticdb/pipeline/read_options.hpp>
#include <arcticdb/entity/atom_key.hpp>
@@ -98,6 +97,15 @@ VersionedItem update_impl(
bool dynamic_schema,
bool empty_types);

folly::Future<AtomKey> async_update_impl(
const std::shared_ptr<Store>& store,
const UpdateInfo& update_info,
const UpdateQuery& query,
const std::shared_ptr<InputTensorFrame>& frame,
WriteOptions&& options,
bool dynamic_schema,
bool empty_types);

VersionedItem delete_range_impl(
const std::shared_ptr<Store>& store,
const StreamId& stream_id,
13 changes: 13 additions & 0 deletions cpp/arcticdb/version/version_store_api.cpp
@@ -787,6 +787,19 @@ std::vector<std::variant<ReadResult, DataError>> PythonVersionStore::batch_read(
return res;
}

std::vector<std::variant<VersionedItem, DataError>> PythonVersionStore::batch_update(
const std::vector<StreamId>& stream_ids,
const std::vector<py::tuple>& items,
const std::vector<py::object>& norms,
const std::vector<py::object>& user_metas,
const std::vector<UpdateQuery>& update_queries,
bool prune_previous_versions,
bool upsert
) {
auto frames = create_input_tensor_frames(stream_ids, items, norms, user_metas, cfg().write_options().empty_types());
return batch_update_internal(stream_ids, std::move(frames), update_queries, prune_previous_versions, upsert);
}

void PythonVersionStore::delete_snapshot(const SnapshotId& snap_name) {
ARCTICDB_RUNTIME_DEBUG(log::version(), "Command: delete_snapshot");
auto opt_snapshot = get_snapshot(store(), snap_name);
19 changes: 9 additions & 10 deletions cpp/arcticdb/version/version_store_api.hpp
@@ -10,31 +10,21 @@
#include <arcticdb/entity/data_error.hpp>
#include <arcticdb/entity/types.hpp>
#include <arcticdb/stream/index.hpp>
#include <arcticdb/util/timeouts.hpp>
#include <arcticdb/util/variant.hpp>
#include <pybind11/pybind11.h>
#include <arcticdb/python/python_utils.hpp>
#include <arcticdb/async/async_store.hpp>
#include <arcticdb/version/version_map.hpp>
#include <arcticdb/version/snapshot.hpp>
#include <arcticdb/version/symbol_list.hpp>
#include <arcticdb/entity/protobufs.hpp>
#include <arcticdb/pipeline/column_stats.hpp>
#include <arcticdb/pipeline/write_options.hpp>
#include <arcticdb/entity/versioned_item.hpp>
#include <arcticdb/pipeline/query.hpp>
#include <arcticdb/pipeline/slicing.hpp>
#include <arcticdb/pipeline/read_pipeline.hpp>
#include <arcticdb/pipeline/query.hpp>
#include <arcticdb/pipeline/read_options.hpp>
#include <arcticdb/stream/incompletes.hpp>
#include <arcticdb/version/version_core.hpp>
#include <arcticdb/version/local_versioned_engine.hpp>
#include <arcticdb/entity/read_result.hpp>
#include <arcticdb/version/version_log.hpp>

#include <type_traits>
#include <iostream>

namespace arcticdb::version_store {

@@ -299,6 +289,15 @@ class PythonVersionStore : public LocalVersionedEngine {
const ReadOptions& read_options,
std::any& handler_data);

std::vector<std::variant<VersionedItem, DataError>> batch_update(
const std::vector<StreamId>& stream_ids,
const std::vector<py::tuple>& items,
const std::vector<py::object>& norms,
const std::vector<py::object>& user_metas,
const std::vector<UpdateQuery>& update_queries,
bool prune_previous_versions,
bool upsert);

std::vector<std::variant<std::pair<VersionedItem, py::object>, DataError>> batch_read_metadata(
const std::vector<StreamId>& stream_ids,
const std::vector<VersionQuery>& version_queries,