Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection pools #189

Open
wants to merge 15 commits into
base: development/v.0.0.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,40 @@
cmake_minimum_required(VERSION 3.10)

if(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE RelWithDebInfo)
endif(NOT CMAKE_CONFIGURATION_TYPES AND NOT CMAKE_BUILD_TYPE)

if(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo)
message("==> The configuration is ${CMAKE_BUILD_TYPE}. Debug info will be extracted into separate files.")

function (add_executable _name)
_add_executable(${ARGV})

if (TARGET ${_name})
add_custom_command(TARGET ${_name} POST_BUILD
COMMAND echo "$<TARGET_FILE_NAME:${_name}>: extracting debug info"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> objcopy --only-keep-debug "$<TARGET_FILE_NAME:${_name}>" "$<TARGET_FILE_NAME:${_name}>.debug"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> strip --strip-debug --strip-unneeded "$<TARGET_FILE_NAME:${_name}>"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> objcopy --add-gnu-debuglink="$<TARGET_FILE_NAME:${_name}>.debug" "$<TARGET_FILE_NAME:${_name}>"
)
endif()
endfunction()

function (add_library _name _type)
_add_library(${ARGV})

if (TARGET ${_name} AND ${_type} STREQUAL SHARED)
add_custom_command(TARGET ${_name} POST_BUILD
COMMAND echo "$<TARGET_FILE_NAME:${_name}>: extracting debug info"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> objcopy --only-keep-debug "$<TARGET_FILE_NAME:${_name}>" "$<TARGET_FILE_NAME:${_name}>.debug"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> strip --strip-debug --strip-unneeded "$<TARGET_FILE_NAME:${_name}>"
COMMAND ${CMAKE_COMMAND} -E chdir $<TARGET_FILE_DIR:${_name}> objcopy --add-gnu-debuglink="$<TARGET_FILE_NAME:${_name}>.debug" "$<TARGET_FILE_NAME:${_name}>"
)
endif()
endfunction()

endif(CMAKE_BUILD_TYPE MATCHES RelWithDebInfo)

project(graft_server)

option(OPT_BUILD_TESTS "Build tests." OFF)
Expand Down Expand Up @@ -155,6 +190,7 @@ add_library(graft STATIC
${PROJECT_SOURCE_DIR}/src/lib/graft/mongoosex.cpp
${PROJECT_SOURCE_DIR}/src/lib/graft/router.cpp
${PROJECT_SOURCE_DIR}/src/lib/graft/task.cpp
${PROJECT_SOURCE_DIR}/src/lib/graft/upstream_manager.cpp
${PROJECT_SOURCE_DIR}/modules/mongoose/mongoose.c
${PROJECT_SOURCE_DIR}/src/supernode/server.cpp
${PROJECT_SOURCE_DIR}/src/supernode/supernode.cpp
Expand Down Expand Up @@ -182,7 +218,7 @@ set_target_properties(graft PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/BUILD/libgraft
)

target_compile_definitions(graft PRIVATE MG_ENABLE_COAP=1 -DMONERO_DEFAULT_LOG_CATEGORY="supernode")
target_compile_definitions(graft PRIVATE MG_ENABLE_SYNC_RESOLVER MG_ENABLE_COAP -DMONERO_DEFAULT_LOG_CATEGORY="supernode")
if(ENABLE_SYSLOG)
target_compile_definitions(graft PRIVATE -DELPP_SYSLOG)
endif()
Expand Down Expand Up @@ -273,7 +309,7 @@ set_target_properties(supernode_common PROPERTIES
ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/BUILD/supernode_common
)

target_compile_definitions(supernode_common PRIVATE MG_ENABLE_COAP=1 -DMONERO_DEFAULT_LOG_CATEGORY="supernode")
target_compile_definitions(supernode_common PRIVATE -DMONERO_DEFAULT_LOG_CATEGORY="supernode")
if(ENABLE_SYSLOG)
target_compile_definitions(supernode_common PRIVATE -DELPP_SYSLOG)
endif()
Expand Down Expand Up @@ -369,7 +405,6 @@ if (OPT_BUILD_TESTS)
)


target_compile_definitions(supernode_test PRIVATE MG_ENABLE_COAP=1)
add_dependencies(supernode_test graft supernode_common googletest)
set_target_properties(supernode_test PROPERTIES LINK_FLAGS "-Wl,-E")
if(ENABLE_SYSLOG)
Expand Down
91 changes: 86 additions & 5 deletions data/config.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[cryptonode]
;;rpc-address can be one of [upstream] values, like rpc-address=$default
rpc-address=127.0.0.1:28681
p2p-address=127.0.0.1:18980

Expand Down Expand Up @@ -44,13 +45,93 @@ requests-per-sec=100 ;; maximal amount of requests per second in the window, 0 t
ban-ip-sec=300 ;; time duration in seconds to ban particular IP, 0 to ban forever

[upstream]
;;An entry in [upstream] section is a template with the following
;;format:
;; <upstream-name>=<uri>[,<timeout-seconds>[,{<max-connections>|
;; keepAlive[:<max-connections-per-ip:port>]}]] [;;<comment>]
;;
;;uri - Upstream URI in the form of <protocol>://<host>:<port>/<end-point>.
;;Any part of the URI can be overwritten by the requester.
;;
;;timeout-seconds - inactivity timeout in seconds
;;
;;max-connections - maximum number of connections, associated with
;;the template.
;;
;;keepAlive:<max-connections-per-host:port> - the parameter indicates
;;the associated connections are kept opened after receiving a response.
;;In this case <max-connections-per-host:port> is maximum number of
;;connections, associated with the template, for each host:port pair.
;;
;;Note, if keepAlive is enabled, the connections may stay openned for
;;unlimited time. Therefore parameters <max-connections> and
;;<max-connections-per-ip:port> are mutuly exclusive.
;;
;;Examples:
;;1.
;;mywallet=http://123.456.0.1:38694/something
;;
;;In the handler we can set
;;
;;output.uri = "$mywallet",
;;output.path = "get/your/number",
;;
;;and return Forward. The framework will try to make a connection using
;;resulting URI http://123.456.0.1:38694/get/your/number. The end point
;;/something is effective only if output.path is unset.
;;
;;2.
;;mywallet=http://0.0.0.0:0/answer
;;
;;In the handler:
;;
;;output.uri = "$mywallet",
;;output.host = "123.456.0.1",
;;output.port = 38694
;;
;;The resulting URI will be http://123.456.0.1:38694/answer.
;;
;;3.
;;mywallet=http://0.0.0.0:0/answer, 2, 10
;;
;;In the handler:
;;
;;output.uri = "$mywallet".
;;
;;The handler can set different values for output.host and output.port,
;;but the total number of associated connections cannot exceed 10.
;;Inactivity timeout is set to 2 seconds, thus inactive connections
;;will be closed after 2 seconds.
;;
;;4.
;;mywallet=http://0.0.0.0:0/answer, 2, keepAlive
;;
;;In the handler:
;;
;;output.uri = "$mywallet".
;;output.host = ...
;;output.port = ...
;;
;;Connections to the same host:port will be reused. There are no
;;restrictions for the number of connections.
;;
;;5.
;;mywallet=http://0.0.0.0:80/answer, 2, keepAlive:10
;;
;;In the handler:
;;
;;output.uri = "$mywallet".
;;output.host = "127.0.0.3"
;;
;;Connections to the host will be reused, but the number of connections
;;will not exeed 10.
;;If the handler sets output.host = "127.0.0.4", maximum number of
;;connections for 127.0.0.4:80 is also 10. Note, that the meaning of 10
;;is different from maximum number for non-keepAlive connections.
;;
default=127.0.0.1:28681
blah=https://127.0.0.1:8080
walletnode=http://127.0.0.1:28694
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format is very confusing and not self-explanatory, especially "keepAlive" - why just don't have it as last optional parameter? e.g.:
address[, timeout[, max_connections, [keep_alive]]]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, suggesting following format:

<name>=<uri>[,<timeout-in-seconds>[,<max-active-connections-per-upstream>],max-active-connections-per-address,]]keep-alive]]]

because connections limit and keep alive are unrelated entities and right now keepAlive flag implicitly switches limit's scope (upstream limit vs address (aka unique host:port record))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comma-separated values with variable number of arguments are horrible and non-descriptive. How about:

<name>=<uri>
<name>-timeout=seconds
<name>-max-conns-upstream=count
<name>-max-conns-address=count
<name>-keep-alive=seconds

;format <name>=<uri>[,<max-active-connections>[,<keep-alive>[,<timeout-in-seconds>]]] [;; comment]
; where <keep-alive> - {true | false | 0 | 1} , false by default
; example:
;wallet2=http://127.0.0.1:28694, 10, true, 2.55 ;; example
;walletnode=http://127.0.0.1:28694,cntMax,true/false/1/0 always_open,timeout

[graftlets]
;;dirs parameter, a list of directories to search graftlets separated by colons. If a directory is set relative it will be interpreted both relative to the current directory and relative to the executable location. By default, 'graftlets' directory will be used relative to the executable location.
Expand Down
14 changes: 12 additions & 2 deletions include/lib/graft/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,19 @@ class UpstreamSender : public SelfHolder<UpstreamSender>
: m_bt(bt), m_onDone(onDone), m_keepAlive(true), m_connectioId(connectionId), m_upstream(upstream), m_timeout(timeout)
{ }

UpstreamSender(const BaseTaskPtr& bt, std::string error, std::function<void(UpstreamSender& uss)> onDoneError) : m_bt(bt)
{
setError(Status::Error, error);
onDoneError(*this);
releaseItself();
}

BaseTaskPtr& getTask() { return m_bt; }

void send(TaskManager& manager, const std::string& uri);
void send(mg_mgr* mgr, int http_callback_port, const std::string& uri);
Status getStatus() const { return m_status; }
const std::string& getError() const { return m_error; }
size_t getRequestSize() const { return m_requestSize; }

void ev_handler(mg_connection* upstream, int ev, void *ev_data);
private:
Expand All @@ -94,14 +102,15 @@ class UpstreamSender : public SelfHolder<UpstreamSender>
mg_connection* m_upstream = nullptr;
Status m_status = Status::None;
std::string m_error;
size_t m_requestSize = 0;
};

class ConnectionBase;

class Looper final : public TaskManager
{
public:
Looper(const ConfigOpts& copts, ConnectionBase& connectionBase);
Looper(const ConfigOpts& copts, UpstreamManager& upstreamManager, ConnectionBase& connectionBase);
virtual ~Looper();

void serve();
Expand Down Expand Up @@ -193,6 +202,7 @@ class ConnectionBase final
std::unique_ptr<SysInfoCounter> m_sysInfo;
std::atomic_bool m_looperReady{false};
std::unique_ptr<Looper> m_looper;
std::unique_ptr<UpstreamManager> m_upstreamManager;
std::map<ConnectionManager::Proto, std::unique_ptr<ConnectionManager>> m_conManagers;
};

Expand Down
10 changes: 6 additions & 4 deletions include/lib/graft/inout.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,16 @@ namespace graft
* Set uri, proto, host, port, path members if you need.
* The function forms real URI substituting absent parts according to Config.ini.
* It is public to be accessed from tests and other classes.
* \param default_uri - this parameter always comes from [cryptonode]rpc-address of Config.ini.
* \return
* \param default_uri - one of uri from lines in [upstream] of Config.ini.
* \param ip_port - output of resulting "ip:port".
* \param result_uri - output of resulting uri.
* \param resolve_cache - a cache with resolved host to ip map.
* \return true on success or false otherwise
*/
std::string makeUri(const std::string& default_uri) const;
bool makeUri(const std::string& default_uri, std::string& ip_port, std::string& result_uri, std::unordered_map<std::string,std::string>& resolve_cache) const;

std::string port;
std::string path;
static std::unordered_map<std::string, std::tuple<std::string,int,bool,double>> uri_substitutions;
};

class InHttp final : public InOutHttpBase
Expand Down
3 changes: 3 additions & 0 deletions include/lib/graft/serveropts.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <string>
#include <vector>
#include <unordered_map>
#include <cassert>

namespace graft {
Expand Down Expand Up @@ -30,6 +31,8 @@ struct ConfigOpts
std::vector<std::string> graftlet_dirs;
int lru_timeout_ms;
IPFilterOpts ipfilter;
std::unordered_map<std::string, std::tuple<std::string,int,bool,double>> uri_substitutions;
std::string default_uri_substitution_name;

void check_asserts() const
{
Expand Down
5 changes: 3 additions & 2 deletions include/lib/graft/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ namespace graft
{
extern std::string client_addr(mg_connection* client);
extern std::string client_host(mg_connection* client);
extern unsigned int port_from_uri(const std::string& uri);

class UpstreamSender;
class TaskManager;
Expand Down Expand Up @@ -192,7 +193,7 @@ class UpstreamManager;
class TaskManager : private HandlerAPI
{
public:
TaskManager(const ConfigOpts& copts, SysInfoCounter& sysInfoCounter);
TaskManager(const ConfigOpts& copts, UpstreamManager& upstreamManager, SysInfoCounter& sysInfoCounter);
virtual ~TaskManager();
TaskManager(const TaskManager&) = delete;
TaskManager& operator = (const TaskManager&) = delete;
Expand Down Expand Up @@ -259,6 +260,7 @@ class TaskManager : private HandlerAPI

static inline size_t next_pow2(size_t val);

UpstreamManager& m_upstreamManager;
SysInfoCounter& m_sysInfoCounter;
GlobalContextMap m_gcm;

Expand All @@ -277,7 +279,6 @@ class TaskManager : private HandlerAPI
std::deque<BaseTaskPtr> m_readyToResume;
std::priority_queue<std::pair<std::chrono::time_point<std::chrono::steady_clock>,Context::uuid_t>> m_expireTaskQueue;
std::unique_ptr<ExpiringList> m_futurePostponeUuids;
std::unique_ptr<UpstreamManager> m_upstreamManager;

using PromiseItem = UpstreamTask::PromiseItem;
using PromiseQueue = tp::MPMCBoundedQueue<PromiseItem>;
Expand Down
95 changes: 95 additions & 0 deletions include/lib/graft/upstream_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#pragma once
#include "lib/graft/connection.h"


namespace graft
{

class UpstreamManager
{
public:
using OnDoneCallback = std::function<void(UpstreamSender& uss)>;
UpstreamManager() = default;

void init(const ConfigOpts& copts, mg_mgr* mgr, int http_callback_port, OnDoneCallback onDoneCallback);

bool busy() const
{
return (m_cntUpstreamSender != m_cntUpstreamSenderDone);
}

void send(BaseTaskPtr& bt);
protected:
//testGetUri for test only
const std::string testGetUri(const Output& output);
private:
class ConnItem
{
public:
using ConnectionId = uint64_t;
using IpPort = std::string;
using Uri = std::string;

struct Bunch
{
int m_connCnt = 0;
std::map<mg_connection*, ConnectionId> m_idleConnections;
std::map<ConnectionId, mg_connection*> m_activeConnections;
UpstreamStub m_upstreamStub;
};

ConnItem() = default;
ConnItem(int uriId, const std::string& uri, int maxConnections, bool keepAlive, double timeout)
: m_uriId(uriId)
, m_uri(uri)
, m_maxConnections(maxConnections)
, m_keepAlive(keepAlive)
, m_timeout(timeout)
{
}
~ConnItem()
{
for(auto& it : m_bunches)
{
auto& b = it.second;
assert(b.m_idleConnections.empty());
assert(b.m_activeConnections.empty());
}
}

std::pair<ConnectionId, mg_connection*> getConnection(const IpPort& ip_port);
void releaseActive(ConnectionId connectionId, const IpPort& ip_port, mg_connection* client);
void onCloseIdle(const IpPort& ip_port, mg_connection* client);
Bunch& getBunch(const IpPort& ip_port, bool createIfNotExists = false);

int m_uriId;
std::string m_uri;
double m_timeout;
int m_maxConnections;
std::deque< std::tuple<BaseTaskPtr,IpPort,Uri> > m_taskQueue;
bool m_keepAlive = false;
std::map<IpPort, Bunch> m_bunches;
private:
ConnectionId m_newId = 0;
};

void onDone(UpstreamSender& uss, ConnItem* connItem, const std::string& ip_port, ConnItem::ConnectionId connectionId, mg_connection* client);
void createUpstreamSender(ConnItem* connItem, const std::string& ip_port, BaseTaskPtr bt, const std::string& uri);
ConnItem* findConnItem(const Output& output, std::string& ip_port, std::string& result_uri);
const std::string& getUri(ConnItem* connItem, const std::string& inputUri);

using Uri2ConnItem = std::map<std::string, ConnItem>;

OnDoneCallback m_onDoneCallback;
uint64_t m_cntUpstreamSender = 0;
uint64_t m_cntUpstreamSenderDone = 0;
ConnItem m_default;
Uri2ConnItem m_conn2item;
mg_mgr* m_mgr = nullptr;
int m_http_callback_port;
protected:
std::unordered_map<std::string,std::string> m_resolveCache;
};

}//namespace graft

Loading