Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graftlets periodic task support. #232

Open
wants to merge 2 commits into
base: feature/some-improvements
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions graftlets/TestGraftlet.cpp
Original file line number Diff line number Diff line change
@@ -56,6 +56,25 @@ class TestGraftlet: public IGraftlet
return graft::Status::Ok;
}

static std::string value;
static int count;

std::string resetPeriodic(const std::string& val)
{
std::string res;
res.swap(value);
count = 0;
value = val;
return res;
}

graft::Status testPeriodic(const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)
{
bool stop = value.empty();
value = "count " + std::to_string(++count);
return (stop)? graft::Status::Stop : graft::Status::Ok;
}

virtual void initOnce(const graft::CommonOpts& opts) override
{
// REGISTER_ACTION(TestGraftlet, testUndefined);
@@ -66,6 +85,10 @@ class TestGraftlet: public IGraftlet

REGISTER_ENDPOINT("/URI/test/{id:[0-9]+}", METHOD_GET | METHOD_POST, TestGraftlet, testHandler);
REGISTER_ENDPOINT("/URI/test1/{id:[0-9]+}", METHOD_GET | METHOD_POST, TestGraftlet, testHandler1);

REGISTER_ACTION(TestGraftlet, resetPeriodic);
//Type, method, int interval_ms, int initial_interval_ms, double random_factor
REGISTER_PERIODIC(TestGraftlet, testPeriodic, 100, 100, 0);
}
};

@@ -75,6 +98,9 @@ GRAFTLET_EXPORTS_END

GRAFTLET_PLUGIN_DEFAULT_CHECK_FW_VERSION(GRAFTLET_MKVER(0,3))

std::string TestGraftlet::value;
int TestGraftlet::count = 0;

namespace
{

34 changes: 34 additions & 0 deletions include/lib/graft/GraftletLoader.h
Original file line number Diff line number Diff line change
@@ -97,6 +97,14 @@ class GraftletHandlerT
struct helperSign<Sign> h(this);
return h.invoke(cls_method, std::forward<Args>(args)...);
}

BaseT* getClass(const ClsName& cls)
{
auto it = m_cls2any.find(cls);
if(it == m_cls2any.end()) throw std::runtime_error("Cannot find graftlet class name:" + cls);
std::shared_ptr<BaseT> concreteGraftlet = std::any_cast<std::shared_ptr<BaseT>>(it->second);
return concreteGraftlet.get();
}
};

class GraftletLoader
@@ -126,6 +134,11 @@ class GraftletLoader
return getEndpointsT<IGraftlet>();
}

typename IGraftlet::PeriodicVec getPeriodics()
{
return getPeriodicsT<IGraftlet>();
}

class DependencyGraph;
friend class GraftletLoader::DependencyGraph;
private:
@@ -158,6 +171,27 @@ class GraftletLoader
}
}

template <class BaseT>
typename BaseT::PeriodicVec getPeriodicsT()
{
prepareAllEndpoints<BaseT>();

typename BaseT::PeriodicVec res;
for(auto& it0 : m_name2gls)
{
if(it0.first.second != std::type_index(typeid(BaseT))) continue;
std::map<ClsName, std::any>& map = it0.second;
for(auto& it1 : map)
{
//TODO: remove shared_ptr, it does not hold something now
std::shared_ptr<BaseT> concreteGraftlet = std::any_cast<std::shared_ptr<BaseT>>(it1.second);
typename BaseT::PeriodicVec vec = concreteGraftlet->getPeriodics();
res.insert(res.end(), vec.begin(), vec.end());
}
}
return res;
}

template <class BaseT>
typename BaseT::EndpointsVec getEndpointsT()
{
68 changes: 60 additions & 8 deletions include/lib/graft/IGraftlet.h
Original file line number Diff line number Diff line change
@@ -20,14 +20,26 @@
#define REGISTER_ENDPOINT(Endpoint, Methods, T, f) \
register_endpoint_memf(#f, this, &T::f, Endpoint, Methods)

#define REGISTER_PERIODIC(T, f, interval_ms, initial_interval_ms, random_factor) \
register_periodic_memf("#"#f, this, &T::f, interval_ms, initial_interval_ms, random_factor)

class IGraftlet
{
public:
struct Periodic
{
graft::Router::Handler handler;
int interval_ms;
int initial_interval_ms;
double random_factor;
};

using ClsName = std::string;
using FuncName = std::string;
using EndpointPath = std::string;
using Methods = int;
using EndpointsVec = std::vector< std::tuple<EndpointPath, Methods, graft::Router::Handler> >;
using PeriodicVec = std::vector< Periodic >;

IGraftlet() = delete;
virtual ~IGraftlet() = default;
@@ -64,6 +76,24 @@ class IGraftlet
return res;
}

PeriodicVec getPeriodics()
{
PeriodicVec res;
std::type_index ti = std::type_index(typeid(Periodic));
for(auto& it : m_map)
{
TypeIndex2any& ti2any = it.second;
auto it1 = ti2any.find(ti);
if(it1 == ti2any.end()) continue;

std::any& any = std::get<0>(it1->second);
Periodic periodic = std::any_cast<Periodic>(any);

res.emplace_back(std::move(periodic));
}
return res;
}

template <typename Res, typename...Ts, typename = Res(Ts...), typename...Args>
Res invoke(const FuncName& name, Args&&...args)
{
@@ -83,23 +113,29 @@ class IGraftlet
return callable(std::forward<Args>(args)...);
}

//It can be used to register any callable object like a function, to register member function use register_handler_memf
template<typename Res, typename...Ts, typename Callable = Res (Ts...)>
void register_handler(const FuncName& name, Callable callable, const EndpointPath& endpoint = EndpointPath(), Methods methods = 0)
template<typename Obj>
void register_obj(const FuncName& name, Obj&& obj, const EndpointPath& endpoint = EndpointPath(), Methods methods = 0)
{
std::type_index ti = std::type_index(typeid(Callable));
std::type_index ti = std::type_index(typeid(Obj));
TypeIndex2any& ti2any = m_map[name];
std::any any = std::make_any<Callable>(callable);
assert(any.type().hash_code() == typeid(callable).hash_code());
std::any any = std::make_any<Obj>(std::forward<Obj>(obj));
assert(any.type().hash_code() == typeid(Obj).hash_code());

std::ostringstream oss;
if(!endpoint.empty())
{
oss << " '" << endpoint << "' " << graft::Router::methodsToString(methods);
}
LOG_PRINT_L2("register_handler " << name << oss.str() << " of " << typeid(callable).name());
LOG_PRINT_L2("register_obj " << name << oss.str() << " of " << typeid(Obj).name());
auto res = ti2any.emplace(ti, std::make_tuple(std::move(any), endpoint, methods) );
if(!res.second) throw std::runtime_error("function " + name + " with typeid " + ti.name() + " already registered");
if(!res.second) throw std::runtime_error("object " + name + " with typeid " + ti.name() + " already registered");
}

//It can be used to register any callable object like a function, to register member function use register_handler_memf
template<typename Res, typename...Ts, typename Callable = Res (Ts...)>
void register_handler(const FuncName& name, Callable callable, const EndpointPath& endpoint = EndpointPath(), Methods methods = 0)
{
register_obj(name, callable, endpoint, methods);
}

template<typename Obj, typename Res, typename...Ts>
@@ -129,6 +165,22 @@ class IGraftlet
};
register_endpoint(name, fun, endpoint, methods);
}

template<typename Obj>
void register_periodic_memf(const FuncName& name, Obj* p
, graft::Status (Obj::*f)(const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)
, int interval_ms, int initial_interval_ms, double random_factor )
{
std::function<graft::Status (Obj*,const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)> memf = f;
graft::Router::Handler fun =
[p,memf](const graft::Router::vars_t& vars, const graft::Input& input, graft::Context& ctx, graft::Output& output)->graft::Status
{
return memf(p,vars,input,ctx,output);
};
Periodic periodic { fun, interval_ms, initial_interval_ms, random_factor };
register_obj(name, std::move(periodic));
}

protected:
IGraftlet(const ClsName& name = ClsName() ) : m_clsName(name) { }
virtual void initOnce(const graft::CommonOpts& opts) = 0;
1 change: 1 addition & 0 deletions include/supernode/server.h
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@ class GraftServer
void addGlobalCtxCleaner();
void initGraftlets();
void initGraftletRouters();
void initGraftletPeriodics();

ConfigOpts& getCopts();

15 changes: 15 additions & 0 deletions src/supernode/server.cpp
Original file line number Diff line number Diff line change
@@ -109,6 +109,19 @@ void GraftServer::initGraftletRouters()
}
}

void GraftServer::initGraftletPeriodics()
{
assert(m_graftletLoader);
IGraftlet::PeriodicVec periodics = m_graftletLoader->getPeriodics();
for(IGraftlet::Periodic& p : periodics)
{
getLooper().addPeriodicTask(p.handler, std::chrono::milliseconds( p.interval_ms ),
std::chrono::milliseconds( p.initial_interval_ms ),
p.random_factor);
}
}


void GraftServer::initGlobalContext()
{
// TODO: why context intialized second time here?
@@ -176,6 +189,8 @@ bool GraftServer::init(int argc, const char** argv, ConfigOpts& configOpts)

m_connectionBase->bindConnectionManagers();

initGraftletPeriodics();

return true;
}

21 changes: 21 additions & 0 deletions test/graftlets_test.cpp
Original file line number Diff line number Diff line change
@@ -362,3 +362,24 @@ TEST_F(GraftServerTest, graftlets)

stop_and_wait_for();
}

TEST_F(GraftServerTest, graftletsPeriodic)
{
m_copts.graftlet_dirs.emplace_back("graftlets");
m_copts.timer_poll_interval_ms = 50;

graft::CommonOpts opts;
graftlet::GraftletLoader loader(opts);
loader.findGraftletsInDirectory("./graftlets", "so");

graftlet::GraftletHandler plugin = loader.buildAndResolveGraftlet("myGraftlet");
plugin.invoke<std::string (const std::string& val)>("testGL.resetPeriodic", " ");

run();

std::this_thread::sleep_for(std::chrono::milliseconds(1050));
std::string res = plugin.invoke<std::string (const std::string& val)>("testGL.resetPeriodic", "");
EXPECT_EQ(res, "count " + std::to_string(10));

stop_and_wait_for();
}