Skip to content

Commit

Permalink
ssx: create single_sharded<Service> template
Browse files Browse the repository at this point in the history
For services running on one shard but exposing invocation helper
to callers from other shards.
  • Loading branch information
bashtanov committed Jun 21, 2024
1 parent 03ccde6 commit 4e3f3c6
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 0 deletions.
167 changes: 167 additions & 0 deletions src/v/ssx/include/ssx/single_sharded.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright 2024 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "base/seastarx.h"

#include <seastar/core/future.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>

#include <limits>
#include <optional>
#include <utility>

namespace ssx {

/// Template helper to run a real service service on one core and an small dummy
/// on other cores.
///
/// \tparam Service a class to be instantiated on each core. Must expose
/// a \c stop() method that returns a \c future<>, to be called when
/// the service is stopped.
template<typename Service>
class maybe_service {
using underlying_t = std::optional<Service>;
underlying_t _underlying;

public:
maybe_service() = default;

template<class... Args>
explicit maybe_service(bool start, Args&&... args) {
if (start) {
_underlying.emplace(std::forward<Args>(args)...);
}
}

ss::future<> stop() {
return _underlying ? _underlying->stop() : ss::make_ready_future();
}

// the ones below are only safe to be called on the shards where the service
// has started

constexpr const Service* operator->() const noexcept {
return &*_underlying;
}
constexpr Service* operator->() noexcept { return &*_underlying; }

constexpr const Service& operator*() const& noexcept {
return *_underlying;
}
constexpr Service& operator*() & noexcept { return *_underlying; }
constexpr const Service&& operator*() const&& noexcept {
return &*_underlying;
}
constexpr Service&& operator*() && noexcept { return &*_underlying; }
};

/// Template helper to run a real service service on one core and an small dummy
/// on other cores.
///
/// \tparam Service a class to be instantiated on the core. Must expose
/// a \c stop() method that returns a \c future<>, to be called when
/// the service is stopped.
template<typename Service>
class single_sharded : ss::sharded<maybe_service<Service>> {
using base = ss::sharded<maybe_service<Service>>;
// init with most insane value to maximize the chances to blow up with
// segfault/asan if used; it shouldn't be used though
ss::shard_id _shard{std::numeric_limits<ss::shard_id>::max()};

public:
/// Constructs an empty \c single_sharded object. No instances of the
/// service are created.
single_sharded() noexcept = default;
single_sharded(const single_sharded& other) = delete;
single_sharded& operator=(const single_sharded& other) = delete;
/// Sharded object with T that inherits from peering_sharded_service
/// cannot be moved safely, so disable move operations.
single_sharded(single_sharded&& other) = delete;
single_sharded& operator=(single_sharded&& other) = delete;
/// Destroys a \c single_sharded object. Must not be in a started state.
~single_sharded() = default;

/// Starts \c Service by constructing an instance on the specified logical
/// core with a copy of \c args passed to the constructor.
///
/// \param shard Which shard to start on
/// \param args Arguments to be forwarded to \c Service constructor
/// \return a \ref seastar::future<> that becomes ready when the instance
/// has been constructed.
template<typename... Args>
ss::future<> start_on(ss::shard_id shard, Args&&... args) noexcept {
_shard = shard;
return base::start(
ss::sharded_parameter(
[shard]() { return ss::this_shard_id() == shard; }),
std::forward<Args>(args)...);
}

/// Stops the started instance and destroys it.
///
/// For the instance, if it has started, its \c stop() method is called,
/// and then it is destroyed.
using base::stop;

/// Invoke a callable on the instance of `Service`.
///
/// \param options the options to forward to the \ref smp::submit_to()
/// called behind the scenes.
/// \param func a callable with signature `Value (Service&, Args...)` or
/// `future<Value> (Service&, Args...)` (for some `Value` type), or
/// a pointer to a member function of Service
/// \param args parameters to the callable; will be copied or moved. To
/// pass by reference, use std::ref().
///
/// \return result of calling `func(instance)` on the instance
template<
typename Func,
typename... Args,
typename Ret
= ss::futurize_t<std::invoke_result_t<Func, Service&, Args...>>>
requires std::invocable<Func, Service&, Args&&...>
Ret invoke_on_instance(
ss::smp_submit_to_options options, Func&& func, Args&&... args) {
return base::invoke_on(
_shard,
options,
[func = std::forward<Func>(func)](
maybe_service<Service>& maybe_service, Args&&... args) {
func(*maybe_service, std::forward<Args>(args)...);
},
std::forward<Args>(args)...);
}

/// Invoke a callable on the instance of `Service`.
///
/// \param func a callable with signature `Value (Service&)` or
/// `future<Value> (Service&)` (for some `Value` type), or a
/// pointer to a member function of Service
/// \param args parameters to the callable
/// \return result of calling `func(instance)` on the instance
template<
typename Func,
typename... Args,
typename Ret
= ss::futurize_t<std::invoke_result_t<Func, Service&, Args&&...>>>
requires std::invocable<Func, Service&, Args&&...>
Ret invoke_on_instance(Func&& func, Args&&... args) {
return invoke_on_instance(
ss::smp_submit_to_options(),
std::forward<Func>(func),
std::forward<Args>(args)...);
}
};

} // namespace ssx
1 change: 1 addition & 0 deletions src/v/ssx/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rp_test(
SOURCES
abort_source_test.cc
sharded_ptr_test.cc
single_sharded.cc
LIBRARIES v::seastar_testing_main
ARGS "-- -c 2"
LABELS ssx
Expand Down
113 changes: 113 additions & 0 deletions src/v/ssx/tests/single_sharded.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#include "ssx/single_sharded.h"

#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shard_id.hh>
#include <seastar/core/sharded.hh>
#include <seastar/testing/thread_test_case.hh>

#include <boost/test/tools/old/interface.hpp>

#include <memory>

struct counter {
int started = 0;
int called_foo = 0;
int called_bar = 0;
int stopped = 0;
ss::future<> stop() { return ss::make_ready_future(); }
};

struct single_service {
counter& cntr;
single_service(counter& cntr, ss::sharded<counter>& cntrs, int a)
: cntr(cntr) {
BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local());
BOOST_REQUIRE_EQUAL(a, 1);
++cntr.started;
}
void foo(ss::sharded<counter>& cntrs, int a, int&& b, int& c) {
BOOST_REQUIRE_EQUAL(&cntr, &cntrs.local());
BOOST_REQUIRE_EQUAL(a, 1);
BOOST_REQUIRE_EQUAL(b, 2);
BOOST_REQUIRE_EQUAL(c, -3);
c = 3;
++cntr.called_foo;
}
void bar(std::vector<int>&& v, std::unique_ptr<int> uptr) {
BOOST_REQUIRE_EQUAL(v.size(), 2);
BOOST_REQUIRE_EQUAL(bool(uptr), true);
++cntr.called_bar;
}
ss::future<> stop() {
++cntr.stopped;
return ss::make_ready_future();
}
};

struct caller {
ssx::single_sharded<single_service>& sngl;
ss::sharded<counter>& cntrs;
explicit caller(
ssx::single_sharded<single_service>& sngl, ss::sharded<counter>& cntrs)
: sngl(sngl)
, cntrs(cntrs) {}
ss::future<> call_twice() {
co_await sngl.invoke_on_instance([this](single_service& sngl_inst) {
int c = -3;
sngl_inst.foo(cntrs, 1, 2, c);
BOOST_REQUIRE_EQUAL(c, 3);
});
co_await sngl.invoke_on_instance(
[](single_service& sngl_inst, std::vector<int>&& v) {
sngl_inst.bar(std::move(v), std::make_unique<int>());
},
std::vector<int>{0, 0});
}
ss::future<> stop() { return ss::make_ready_future(); }
};

SEASTAR_THREAD_TEST_CASE(single_sharded) {
ss::shard_id the_shard = ss::smp::count - 1;

ss::sharded<counter> counters;
ssx::single_sharded<single_service> single;
ss::sharded<caller> callers;

counters.start().get();
single
.start_on(
the_shard,
ss::sharded_parameter(
[&counters]() { return std::ref(counters.local()); }),
std::ref(counters),
1)
.get();
callers.start(std::ref(single), std::ref(counters)).get();

callers.invoke_on_all([](caller& cllr) { return cllr.call_twice(); }).get();

callers.stop().get();
single.stop().get();
counters
.invoke_on_all([the_shard](counter cntr) {
bool on_the_shard = the_shard == ss::this_shard_id();
BOOST_REQUIRE_EQUAL(cntr.started, on_the_shard ? 1 : 0);
BOOST_REQUIRE_EQUAL(
cntr.called_foo, on_the_shard ? ss::smp::count : 0);
BOOST_REQUIRE_EQUAL(
cntr.called_bar, on_the_shard ? ss::smp::count : 0);
BOOST_REQUIRE_EQUAL(cntr.stopped, on_the_shard ? 1 : 0);
})
.get();
counters.stop().get();
}

0 comments on commit 4e3f3c6

Please sign in to comment.