Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Connection monitor service #31

Open
wants to merge 1 commit into
base: mariadb-4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Authors from Codership Oy:
* Daniele Sciascia <[email protected]>, Codership Oy
* Philip Stoev <[email protected]>, Codership Oy
* Mario Karuza <[email protected]>, Codership Oy
* Jan Lindström <[email protected]>, Codership Oy
[Codership employees, add name and email/username above this line, but leave this line intact]

Other contributors:
Expand Down
2 changes: 2 additions & 0 deletions galera/src/galera-sym.map
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
wsrep_init_config_service_v1;
wsrep_deinit_config_service_v1;
wsrep_node_isolation_mode_set_v1;
wsrep_init_connection_monitor_service_v1;
wsrep_deinit_connection_monitor_service_v1;

local: *;
};
13 changes: 12 additions & 1 deletion galera/src/wsrep_provider.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2010-2021 Codership Oy <[email protected]>
// Copyright (C) 2010-2024 Codership Oy <[email protected]>
//

#include "wsrep_api.h"
Expand All @@ -23,6 +23,7 @@
#include "gu_event_service.hpp"
#include "wsrep_config_service.h"
#include "wsrep_node_isolation.h"
#include "wsrep_connection_monitor_service.h"

#include <cassert>

Expand Down Expand Up @@ -1941,4 +1942,14 @@ wsrep_node_isolation_mode_set_v1(enum wsrep_node_isolation_mode mode)
return WSREP_NODE_ISOLATION_SUCCESS;
}

extern "C"
int wsrep_init_connection_monitor_service_v1(wsrep_connection_monitor_service_v1_t *connection_monitor_service)
{
return gu::init_connection_monitor_service_v1(connection_monitor_service);
}

extern "C" void wsrep_deinit_connection_monitor_service_v1()
{
gu::deinit_connection_monitor_service_v1();
}

101 changes: 100 additions & 1 deletion galerautils/src/gu_asio.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (C) 2014-2020 Codership Oy <[email protected]>
// Copyright (C) 2014-2024 Codership Oy <[email protected]>
//

#include "gu_config.hpp"
Expand Down Expand Up @@ -53,6 +53,8 @@ static wsrep_tls_service_v1_t* gu_tls_service(0);

static wsrep_allowlist_service_v1_t* gu_allowlist_service(0);

static wsrep_connection_monitor_service_v1_t* gu_connection_monitor_service(0);

//
// AsioIpAddress wrapper
//
Expand Down Expand Up @@ -955,3 +957,100 @@ void gu::deinit_allowlist_service_v1()
std::atomic<enum wsrep_node_isolation_mode> gu::gu_asio_node_isolation_mode{
WSREP_NODE_ISOLATION_NOT_ISOLATED
};

//
// ConnectionMonitor
//

static std::mutex gu_connection_monitor_service_init_mutex;
static size_t gu_connection_monitor_service_usage;

int gu::init_connection_monitor_service_v1(wsrep_connection_monitor_service_v1_t* connection_monitor)
{
log_info << "init_connection_monitor_service_v1";
std::lock_guard<std::mutex> lock(gu_connection_monitor_service_init_mutex);
++gu_connection_monitor_service_usage;
if (gu_connection_monitor_service)
{
assert(gu_connection_monitor_service == connection_monitor);
return 0;
}
gu_connection_monitor_service = connection_monitor;
return 0;
}

void gu::deinit_connection_monitor_service_v1()
{
log_info << "deinit_connection_monitor_service_v1";
std::lock_guard<std::mutex> lock(gu_connection_monitor_service_init_mutex);
assert(gu_connection_monitor_service_usage > 0);
--gu_connection_monitor_service_usage;
if (gu_connection_monitor_service_usage == 0)
gu_connection_monitor_service = 0;
}

void gu::connection_monitor_connect(wsrep_connection_key_t id,
const std::string& scheme,
const std::string& local_addr,
const std::string& remote_uuid,
const std::string& remote_addr)
{
if (gu_connection_monitor_service == nullptr)
{
return; // No action
}

// tcp://127.0.0.1:19006 --> 127.0.0.1:19006
std::string ra = remote_addr.substr(6, remote_addr.length());
std::string la = local_addr.substr(6, local_addr.length());

wsrep_buf_t const remote = {remote_uuid.c_str(), remote_uuid.length() };
wsrep_buf_t const lscheme = {scheme.c_str(), scheme.length() };
wsrep_buf_t const raddr = {ra.c_str(), ra.length() };
wsrep_buf_t const laddr = {la.c_str(), la.length() };

gu_connection_monitor_service->connection_monitor_connect_cb(
gu_connection_monitor_service->context,
id,
&lscheme,
&laddr,
&remote,
&raddr);
}

void gu::connection_monitor_disconnect(wsrep_connection_key_t id)
{
if (gu_connection_monitor_service == nullptr)
{
return; // No action
}

gu_connection_monitor_service->connection_monitor_disconnect_cb(
gu_connection_monitor_service->context,
id);
}

void gu::connection_monitor_ssl_info(wsrep_connection_key_t id,
const std::string& chipher,
const std::string& issuer,
const std::string& subject,
const std::string& version)
{
if (gu_connection_monitor_service == nullptr)
{
return; // No action
}

wsrep_buf_t const ch = {chipher.c_str(), chipher.length() };
wsrep_buf_t const iss = {issuer.c_str(), issuer.length() };
wsrep_buf_t const sub = {subject.c_str(), subject.length() };
wsrep_buf_t const vers = {version.c_str(), version.length() };

gu_connection_monitor_service->connection_monitor_ssl_info_cb(
gu_connection_monitor_service->context,
id,
&ch,
&iss,
&sub,
&vers);
}
18 changes: 18 additions & 0 deletions galerautils/src/gu_asio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "wsrep_tls_service.h"
#include "wsrep_allowlist_service.h"
#include "wsrep_node_isolation.h"
#include "wsrep_connection_monitor_service.h"

#include <netinet/tcp.h> // tcp_info

Expand Down Expand Up @@ -808,6 +809,23 @@ namespace gu
extern std::atomic<enum wsrep_node_isolation_mode>
gu_asio_node_isolation_mode;

/* Init/deinit global connection monitoring service hooks */
int init_connection_monitor_service_v1(wsrep_connection_monitor_service_v1_t*);
void deinit_connection_monitor_service_v1();
/* Connection monitor connect callback */
void connection_monitor_connect(wsrep_connection_key_t id,
const std::string& scheme,
const std::string& local_addr,
const std::string& remote_uuid,
const std::string& remote_addr);
/* Connection monitor disconnect callback */
void connection_monitor_disconnect(wsrep_connection_key_t id);
/* Connection monitor ssl info callback */
void connection_monitor_ssl_info(wsrep_connection_key_t id,
const std::string& chipher,
const std::string& issuer,
const std::string& subject,
const std::string& version);
}

#endif // GU_ASIO_HPP
27 changes: 26 additions & 1 deletion galerautils/src/gu_asio_stream_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ class AsioTcpStreamEngine : public gu::AsioStreamEngine
{
return gu::AsioErrorCode(last_error_, gu_asio_system_category);
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE {}

private:
void clear_error() { last_error_ = 0; }
int fd_;
Expand All @@ -99,7 +103,7 @@ class AsioTcpStreamEngine : public gu::AsioStreamEngine
#ifdef GALERA_HAVE_SSL

#include <openssl/ssl.h>

#include <openssl/x509.h>
#if OPENSSL_VERSION_NUMBER >= 0x1010100fL
#define HAVE_READ_EX
#define HAVE_WRITE_EX
Expand Down Expand Up @@ -187,6 +191,21 @@ class AsioSslStreamEngine : public gu::AsioStreamEngine
last_verify_error_);
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE
{
clear_error();
chipher = SSL_get_cipher(ssl_);
X509 *ssl_cert = SSL_get_peer_certificate(ssl_);
if (ssl_cert != nullptr)
{
subject = X509_NAME_oneline(X509_get_subject_name(ssl_cert), 0, 0);
issuer = X509_NAME_oneline(X509_get_issuer_name(ssl_cert), 0, 0);
X509_free(ssl_cert);
}
version = SSL_get_version(ssl_);
}

private:
void clear_error()
{
Expand Down Expand Up @@ -560,6 +579,9 @@ class AsioDynamicStreamEngine : public gu::AsioStreamEngine
return engine_->last_error();
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE {}

private:
bool socket_poll(long msec)
{
Expand Down Expand Up @@ -687,6 +709,9 @@ class AsioWsrepStreamEngine : public gu::AsioStreamEngine
return gu::AsioErrorCode(last_error_value_, last_error_category_,
&stream_);
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE {}
private:

enum op_status map_status(enum wsrep_tls_result status)
Expand Down
6 changes: 6 additions & 0 deletions galerautils/src/gu_asio_stream_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ namespace gu
AsioIoService&, const std::string& scheme, int fd,
bool non_blocking);

/**
* Fetch SSL/TLS information
*/
virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) = 0;

protected:
AsioStreamEngine() { }
};
Expand Down
42 changes: 42 additions & 0 deletions galerautils/src/gu_asio_stream_react.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ void gu::AsioStreamReact::close() try
{
GU_ASIO_DEBUG(debug_print() << "Socket not open on close");
}

gu::connection_monitor_disconnect((wsrep_connection_key_t)this);
socket_.close();
}
// Catch all the possible exceptions here, not only asio ones.
Expand Down Expand Up @@ -198,11 +200,32 @@ void gu::AsioStreamReact::connect(const gu::URI& uri) try
socket_.connect(resolve_result->endpoint());
connected_ = true;
prepare_engine(false);
assign_addresses();

auto result(engine_->client_handshake());
switch (result)
{
case AsioStreamEngine::success:
{
gu::connection_monitor_connect((wsrep_connection_key_t)this,
scheme_,
local_addr_,
"", // remote uuid
remote_addr_);
#ifdef GALERA_HAVE_SSL
if (engine_->scheme() == gu::scheme::ssl)
{
std::string chipher, issuer, subject, version;
engine_->get_SSL_info(chipher, issuer, subject, version);
gu::connection_monitor_ssl_info((wsrep_connection_key_t)this,
chipher,
issuer,
subject,
version);
}
#endif
return;
}
case AsioStreamEngine::want_read:
case AsioStreamEngine::want_write:
case AsioStreamEngine::eof:
Expand Down Expand Up @@ -389,6 +412,25 @@ void gu::AsioStreamReact::connect_handler(
set_socket_options(socket_);
prepare_engine(true);
assign_addresses();

gu::connection_monitor_connect((wsrep_connection_key_t)this,
scheme_,
local_addr_,
"", // remote uuid
remote_addr_);

#ifdef GALERA_HAVE_SSL
if (engine_->scheme() == gu::scheme::ssl)
{
std::string chipher, issuer, subject, version;
engine_->get_SSL_info(chipher, issuer, subject, version);
gu::connection_monitor_ssl_info((wsrep_connection_key_t)this,
chipher,
issuer,
subject,
version);
}
#endif
GU_ASIO_DEBUG(debug_print()
<< " AsioStreamReact::connect_handler: init handshake");
auto result(engine_->client_handshake());
Expand Down
3 changes: 3 additions & 0 deletions galerautils/tests/gu_asio_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ class MockStreamEngine : public gu::AsioStreamEngine
}
}

virtual void get_SSL_info(std::string &chipher, std::string &subject,
std::string &issuer, std::string &version) GALERA_OVERRIDE {}

enum op_status next_result;
int next_error;
size_t count_client_handshake_called;
Expand Down
2 changes: 1 addition & 1 deletion gcomm/src/asio_tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class gcomm::AsioTcpSocket :
virtual std::string local_addr() const GALERA_OVERRIDE;
virtual std::string remote_addr() const GALERA_OVERRIDE;
virtual State state() const GALERA_OVERRIDE { return state_; }
virtual SocketId id() const GALERA_OVERRIDE { return &socket_; }
virtual SocketId id() const GALERA_OVERRIDE { return socket_.get(); }
virtual SocketStats stats() const GALERA_OVERRIDE;
private:
// AsioSocketHandler interface
Expand Down
21 changes: 20 additions & 1 deletion gcomm/src/gmcast.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2009-2019 Codership Oy <[email protected]>
* Copyright (C) 2009-2024 Codership Oy <[email protected]>
*/

#include "gmcast.hpp"
Expand Down Expand Up @@ -584,6 +584,16 @@ void gcomm::GMCast::gmcast_connect(const std::string& remote_addr)
segment_,
group_name_);

std::ostringstream os;
os << peer->remote_uuid().full_str();
log_info << ":::JAN:::gcomm::GMCast::gmcast_connect";
gu::connection_monitor_connect((wsrep_connection_key_t)tp->id(),
get_scheme(pnet_, use_ssl_, dynamic_socket_),
peer->local_addr(),
os.str(),
remote_addr);


std::pair<ProtoMap::iterator, bool> ret =
proto_map_->insert(std::make_pair(tp->id(), peer));

Expand Down Expand Up @@ -676,6 +686,15 @@ void gcomm::GMCast::handle_established(Proto* est)
// UUID checks are handled during protocol handshake
assert(est->remote_uuid() != uuid());

std::ostringstream os;
os << est->remote_uuid().full_str();
log_info << ":::JAN::: gcomm::GMCast::handle_established";
gu::connection_monitor_connect((wsrep_connection_key_t)est->socket()->id(),
get_scheme(pnet_, use_ssl_, dynamic_socket_),
est->local_addr(),
os.str(),
est->remote_addr());

if (is_evicted(est->remote_uuid()))
{
log_warn << "Closing connection to evicted node " << est->remote_uuid();
Expand Down
Loading