diff --git a/src/v/ssx/include/ssx/single_sharded.h b/src/v/ssx/include/ssx/single_sharded.h new file mode 100644 index 0000000000000..b0076fad09c96 --- /dev/null +++ b/src/v/ssx/include/ssx/single_sharded.h @@ -0,0 +1,167 @@ +/* + * Copyright 2024 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "base/seastarx.h" + +#include +#include +#include + +#include +#include +#include + +namespace ssx { + +/// Template helper to run a real service service on one core and an small dummy +/// on other cores. +/// +/// \tparam Service a class to be instantiated on each core. Must expose +/// a \c stop() method that returns a \c future<>, to be called when +/// the service is stopped. +template +class maybe_service { + using underlying_t = std::optional; + underlying_t _underlying; + +public: + maybe_service() = default; + + template + explicit maybe_service(bool start, Args&&... args) { + if (start) { + _underlying.emplace(std::forward(args)...); + } + } + + ss::future<> stop() { + return _underlying ? _underlying->stop() : ss::make_ready_future(); + } + + // the ones below are only safe to be called on the shards where the service + // has started + + constexpr const Service* operator->() const noexcept { + return &*_underlying; + } + constexpr Service* operator->() noexcept { return &*_underlying; } + + constexpr const Service& operator*() const& noexcept { + return *_underlying; + } + constexpr Service& operator*() & noexcept { return *_underlying; } + constexpr const Service&& operator*() const&& noexcept { + return &*_underlying; + } + constexpr Service&& operator*() && noexcept { return &*_underlying; } +}; + +/// Template helper to run a real service service on one core and an small dummy +/// on other cores. +/// +/// \tparam Service a class to be instantiated on the core. Must expose +/// a \c stop() method that returns a \c future<>, to be called when +/// the service is stopped. +template +class single_sharded : ss::sharded> { + using base = ss::sharded>; + // init with most insane value to maximize the chances to blow up with + // segfault/asan if used; it shouldn't be used though + ss::shard_id _shard{std::numeric_limits::max()}; + +public: + /// Constructs an empty \c single_sharded object. No instances of the + /// service are created. + single_sharded() noexcept = default; + single_sharded(const single_sharded& other) = delete; + single_sharded& operator=(const single_sharded& other) = delete; + /// Sharded object with T that inherits from peering_sharded_service + /// cannot be moved safely, so disable move operations. + single_sharded(single_sharded&& other) = delete; + single_sharded& operator=(single_sharded&& other) = delete; + /// Destroys a \c single_sharded object. Must not be in a started state. + ~single_sharded() = default; + + /// Starts \c Service by constructing an instance on the specified logical + /// core with a copy of \c args passed to the constructor. + /// + /// \param shard Which shard to start on + /// \param args Arguments to be forwarded to \c Service constructor + /// \return a \ref seastar::future<> that becomes ready when the instance + /// has been constructed. + template + ss::future<> start_on(ss::shard_id shard, Args&&... args) noexcept { + _shard = shard; + return base::start( + ss::sharded_parameter( + [shard]() { return ss::this_shard_id() == shard; }), + std::forward(args)...); + } + + /// Stops the started instance and destroys it. + /// + /// For the instance, if it has started, its \c stop() method is called, + /// and then it is destroyed. + using base::stop; + + /// Invoke a callable on the instance of `Service`. + /// + /// \param options the options to forward to the \ref smp::submit_to() + /// called behind the scenes. + /// \param func a callable with signature `Value (Service&, Args...)` or + /// `future (Service&, Args...)` (for some `Value` type), or + /// a pointer to a member function of Service + /// \param args parameters to the callable; will be copied or moved. To + /// pass by reference, use std::ref(). + /// + /// \return result of calling `func(instance)` on the instance + template< + typename Func, + typename... Args, + typename Ret + = ss::futurize_t>> + requires std::invocable + Ret invoke_on_instance( + ss::smp_submit_to_options options, Func&& func, Args&&... args) { + return base::invoke_on( + _shard, + options, + [func = std::forward(func)]( + maybe_service& maybe_service, Args&&... args) { + func(*maybe_service, std::forward(args)...); + }, + std::forward(args)...); + } + + /// Invoke a callable on the instance of `Service`. + /// + /// \param func a callable with signature `Value (Service&)` or + /// `future (Service&)` (for some `Value` type), or a + /// pointer to a member function of Service + /// \param args parameters to the callable + /// \return result of calling `func(instance)` on the instance + template< + typename Func, + typename... Args, + typename Ret + = ss::futurize_t>> + requires std::invocable + Ret invoke_on_instance(Func&& func, Args&&... args) { + return invoke_on_instance( + ss::smp_submit_to_options(), + std::forward(func), + std::forward(args)...); + } +}; + +} // namespace ssx diff --git a/src/v/ssx/tests/CMakeLists.txt b/src/v/ssx/tests/CMakeLists.txt index 668ef3b2dd1d9..f233f4e278cd2 100644 --- a/src/v/ssx/tests/CMakeLists.txt +++ b/src/v/ssx/tests/CMakeLists.txt @@ -32,6 +32,7 @@ rp_test( SOURCES abort_source_test.cc sharded_ptr_test.cc + single_sharded.cc LIBRARIES v::seastar_testing_main ARGS "-- -c 2" LABELS ssx diff --git a/src/v/ssx/tests/single_sharded.cc b/src/v/ssx/tests/single_sharded.cc new file mode 100644 index 0000000000000..57d8a96afd757 --- /dev/null +++ b/src/v/ssx/tests/single_sharded.cc @@ -0,0 +1,113 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "ssx/single_sharded.h" + +#include +#include +#include +#include +#include + +#include + +#include + +struct counter { + int started = 0; + int called_foo = 0; + int called_bar = 0; + int stopped = 0; + ss::future<> stop() { return ss::make_ready_future(); } +}; + +struct single_service { + counter& cntr; + single_service(counter& cntr, ss::sharded& cntrs, int a) + : cntr(cntr) { + BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local()); + BOOST_REQUIRE_EQUAL(a, 1); + ++cntr.started; + } + void foo(ss::sharded& cntrs, int a, int&& b, int& c) { + BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local()); + BOOST_REQUIRE_EQUAL(a, 1); + BOOST_REQUIRE_EQUAL(b, 2); + BOOST_REQUIRE_EQUAL(c, -3); + c = 3; + ++cntr.called_foo; + } + void bar(std::vector&& v, std::unique_ptr uptr) { + BOOST_REQUIRE_EQUAL(v.size(), 2); + BOOST_REQUIRE_EQUAL(bool(uptr), true); + ++cntr.called_bar; + } + ss::future<> stop() { + ++cntr.stopped; + return ss::make_ready_future(); + } +}; + +struct caller { + ssx::single_sharded& sngl; + ss::sharded& cntrs; + explicit caller( + ssx::single_sharded& sngl, ss::sharded& cntrs) + : sngl(sngl) + , cntrs(cntrs) {} + ss::future<> call_twice() { + co_await sngl.invoke_on_instance([this](single_service& sngl_inst) { + int c = -3; + sngl_inst.foo(cntrs, 1, 2, c); + BOOST_REQUIRE_EQUAL(c, 3); + }); + co_await sngl.invoke_on_instance( + [](single_service& sngl_inst, std::vector&& v) { + sngl_inst.bar(std::move(v), std::make_unique()); + }, + std::vector{0, 0}); + } + ss::future<> stop() { return ss::make_ready_future(); } +}; + +SEASTAR_THREAD_TEST_CASE(single_sharded) { + ss::shard_id the_shard = ss::smp::count - 1; + + ss::sharded counters; + ssx::single_sharded single; + ss::sharded callers; + + counters.start().get(); + single + .start_on( + the_shard, + ss::sharded_parameter( + [&counters]() { return std::ref(counters.local()); }), + std::ref(counters), + 1) + .get(); + callers.start(std::ref(single), std::ref(counters)).get(); + + callers.invoke_on_all([](caller& cllr) { return cllr.call_twice(); }).get(); + + callers.stop().get(); + single.stop().get(); + counters + .invoke_on_all([the_shard](counter cntr) { + bool on_the_shard = the_shard == ss::this_shard_id(); + BOOST_REQUIRE_EQUAL(cntr.started, on_the_shard ? 1 : 0); + BOOST_REQUIRE_EQUAL( + cntr.called_foo, on_the_shard ? ss::smp::count : 0); + BOOST_REQUIRE_EQUAL( + cntr.called_bar, on_the_shard ? ss::smp::count : 0); + BOOST_REQUIRE_EQUAL(cntr.stopped, on_the_shard ? 1 : 0); + }) + .get(); + counters.stop().get(); +}