Skip to content

Commit

Permalink
tls: Added tls_session_logger
Browse files Browse the repository at this point in the history
A class that wraps around a typical logger that appends useful
information about the connection when logging.

Added use to the OpenSSL implementation.

Signed-off-by: Michael Boquard <[email protected]>
  • Loading branch information
michael-redpanda committed Dec 6, 2024
1 parent 7c4344e commit ba70bc2
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 2 deletions.
79 changes: 77 additions & 2 deletions src/net/ossl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,10 @@ class session : public enable_shared_from_this<session>, public session_impl {

session(session_type t, shared_ptr<tls::certificate_credentials> creds,
std::unique_ptr<net::connected_socket_impl> sock, tls_options options = {})
: _sock(std::move(sock))
: _logger(this)
, _sock(std::move(sock))
, _local_address(fmt::format("{}", _sock->local_address()))
, _remote_address(fmt::format("{}", _sock->remote_address()))
, _creds(creds->_impl)
, _in(_sock->source())
, _out(_sock->sink())
Expand Down Expand Up @@ -815,11 +818,17 @@ class session : public enable_shared_from_this<session>, public session_impl {
assert(_output_pending.available());
}

const char * get_type_string() const {
return _type == session_type::CLIENT ? "Client": "Server";
}

// This function waits for the _output_pending future to resolve
// If an error occurs, it is saved off into _error and returned
future<> wait_for_output() {
_logger.trace("wait_for_output");
return std::exchange(_output_pending, make_ready_future())
.handle_exception([this](auto ep) {
_logger.debug("wait_for_output error: {}", ep);
_error = ep;
return make_exception_future(ep);
});
Expand Down Expand Up @@ -891,7 +900,9 @@ class session : public enable_shared_from_this<session>, public session_impl {
// will attempt to send the provided packet. If a renegotiation is needed
// any unprocessed part of the packet is returned.
future<net::packet> do_put(net::packet p) {
_logger.trace("do_put");
if (!connected()) {
_logger.debug("do_put: not connected");
return make_ready_future<net::packet>(std::move(p));
}
assert(_output_pending.available());
Expand All @@ -910,8 +921,10 @@ class session : public enable_shared_from_this<session>, public session_impl {
size_t bytes_written = 0;
auto write_rc = SSL_write_ex(
_ssl.get(), frag_view.data(), frag_view.size(), &bytes_written);
_logger.trace("do_put: SSL_write_ex: {}", write_rc);
if (write_rc != 1) {
const auto ssl_err = SSL_get_error(_ssl.get(), write_rc);
_logger.trace("do_put: SSL_get_error: {}", ssl_err);
if (ssl_err == SSL_ERROR_WANT_WRITE) {
return wait_for_output().then([] {
return stop_iteration::no;
Expand All @@ -922,6 +935,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
}
return handle_do_put_ssl_err(ssl_err);
} else {
_logger.trace("do_put: bytes_written: {}", bytes_written);
frag_view.remove_prefix(bytes_written);
p.trim_front(bytes_written);
return wait_for_output().then([] {
Expand All @@ -930,7 +944,8 @@ class session : public enable_shared_from_this<session>, public session_impl {
}
});
}
).then([&p] {
).then([this, &p] {
_logger.trace("do_put: returning packet of size: {}", p.len());
return std::move(p);
});
}
Expand All @@ -940,6 +955,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
// Used to push unencrypted data through OpenSSL, which will
// encrypt it and then place it into the output bio.
future<> put(net::packet p) override {
_logger.trace("put");
constexpr size_t openssl_max_record_size = 16 * 1024;
if (_error) {
return make_exception_future(_error);
Expand All @@ -949,6 +965,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
std::system_error(EPIPE, std::system_category()));
}
if (!connected()) {
_logger.trace("put: not connected, performing handshake");
return handshake().then(
[this, p = std::move(p)]() mutable { return put(std::move(p)); });
}
Expand All @@ -966,8 +983,10 @@ class session : public enable_shared_from_this<session>, public session_impl {
return do_put(std::move(p));
}).then([this](net::packet p) {
if (eof() || p.len() == 0) {
_logger.trace("put: eof: {}, p.len(): {}", eof(), p.len());
return make_ready_future();
} else {
_logger.trace("put: not completed packet sending, re-doing handshake");
return handshake().then([this, p = std::move(p)]() mutable {
return put(std::move(p));
});
Expand All @@ -979,7 +998,9 @@ class session : public enable_shared_from_this<session>, public session_impl {
// This function will walk through the handshake with a remote peer
// If EOF is encountered, ENOTCONN is thrown
future<> do_handshake() {
_logger.trace("do_handshake");
if (eof()) {
_logger.trace("do_handshake: eof encountered");
// if we have experienced and eof, set the error and return
// GnuTLS will probably return GNUTLS_E_PREMATURE_TERMINATION
// from gnutls_handshake in this situation.
Expand All @@ -989,15 +1010,18 @@ class session : public enable_shared_from_this<session>, public session_impl {
"EOF encountered during handshake"));
return make_exception_future(_error);
} else if (connected()) {
_logger.trace("do_handshake: already connected");
return make_ready_future<>();
}
return do_until(
[this] { return connected() || eof(); },
[this] {
try {
auto n = SSL_do_handshake(_ssl.get());
_logger.trace("do_handshake: SSL_do_handshake: {}", n);
if (n <= 0) {
auto ssl_error = SSL_get_error(_ssl.get(), n);
_logger.trace("do_handshake: SSL_get_error: {}", ssl_error);
switch(ssl_error) {
case SSL_ERROR_NONE:
// probably shouldn't have gotten here
Expand All @@ -1020,6 +1044,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
case SSL_ERROR_SSL:
{
auto ec = ERR_GET_REASON(ERR_peek_error());
_logger.debug("do_handshake: ERR_GET_REASON: {}", ec);
switch (ec) {
case SSL_R_UNEXPECTED_EOF_WHILE_READING:
// well in this situation, the remote end closed
Expand Down Expand Up @@ -1062,17 +1087,21 @@ class session : public enable_shared_from_this<session>, public session_impl {
// This function will attempt to pull data off of the _in stream
// if there isn't already data needing to be processed first.
future<> wait_for_input() {
_logger.trace("wait_for_input");
// If we already have data, then it needs to be processed
if (!_input.empty()) {
_logger.trace("wait_for_input: input not empty");
return make_ready_future();
}
return _in.get()
.then([this](buf_type buf) {
// Set EOF if it's empty
_logger.debug("wait_for_input: buffer {}empty", buf.empty() ? "is ": "");
_eof |= buf.empty();
_input = std::move(buf);
})
.handle_exception([this](auto ep) {
_logger.debug("wait_for_input: exception: {}", ep);
_error = ep;
return make_exception_future(ep);
});
Expand All @@ -1083,12 +1112,14 @@ class session : public enable_shared_from_this<session>, public session_impl {
// SSL session using SSL_read. If ther eis no data, then
// we will call perform_pull and wait for data to arrive.
future<buf_type> do_get() {
_logger.trace("do_get");
// Data is available to be pulled of the SSL session if there is pending
// data on the SSL session or there is data in the in_bio() which SSL reads
// from
auto data_to_pull = (BIO_ctrl_pending(in_bio()) + SSL_pending(_ssl.get())) > 0;
auto f = make_ready_future<>();
if (!data_to_pull) {
_logger.trace("do_get: no data to pull, waiting for input");
// If nothing is in the SSL buffers then we may have to wait for
// data to come in
f = wait_for_input();
Expand All @@ -1098,12 +1129,16 @@ class session : public enable_shared_from_this<session>, public session_impl {
return make_ready_future<buf_type>();
}
auto avail = BIO_ctrl_pending(in_bio()) + SSL_pending(_ssl.get());
_logger.trace("do_get: available: {}", avail);
buf_type buf(avail);
size_t bytes_read = 0;
auto read_result = SSL_read_ex(
_ssl.get(), buf.get_write(), avail, &bytes_read);
_logger.trace("do_get: SSL_read_ex: {}", read_result);
_logger.trace("do_get: SSL_read_ex bytes_ready: {}", bytes_read);
if (read_result != 1) {
const auto ssl_err = SSL_get_error(_ssl.get(), read_result);
_logger.trace("do_get: SSL_get_error: {}", ssl_err);
switch (ssl_err) {
case SSL_ERROR_ZERO_RETURN:
// Remote end has closed
Expand Down Expand Up @@ -1164,37 +1199,45 @@ class session : public enable_shared_from_this<session>, public session_impl {

// Called by user applications to pull data off of the TLS session
future<buf_type> get() override {
_logger.trace("get");
if (_error) {
return make_exception_future<buf_type>(_error);
}
if (_shutdown || eof()) {
return make_ready_future<buf_type>(buf_type());
}
if (!connected()) {
_logger.trace("get: not connected, performing handshake");
return handshake().then(std::bind(&session::get, this));
}
return with_semaphore(_in_sem, 1, std::bind(&session::do_get, this))
.then([this](buf_type buf) {
if (buf.empty() && !eof()) {
_logger.trace("get: buffer empty and not eof, performing handshake");
return handshake().then(std::bind(&session::get, this));
}
_logger.trace("get: returning buffer of size {}", buf.size());
return make_ready_future<buf_type>(std::move(buf));
});
}

// Performs shutdown
future<> do_shutdown() {
_logger.trace("do_shutdown");
if (_error || !connected()) {
_logger.trace("do_shutdown: error exists or not connected");
return make_ready_future();
}

auto res = SSL_shutdown(_ssl.get());
_logger.trace("do_shutdown: SSL_shutdown: {}", res);
if (res == 1) {
return wait_for_output();
} else if (res == 0) {
return yield().then([this] { return do_shutdown(); });
} else {
auto ssl_err = SSL_get_error(_ssl.get(), res);
_logger.trace("do_shutdown: SSL_get_error: {}", ssl_err);
switch (ssl_err) {
case SSL_ERROR_NONE:
// this is weird, yield and try again
Expand Down Expand Up @@ -1242,12 +1285,15 @@ class session : public enable_shared_from_this<session>, public session_impl {
}

void verify() {
_logger.trace("verify");
if (!_creds->_enable_certificate_verification) {
_logger.debug("verify: certificate verification disabled, skipping");
return;
}
// A success return code (0) does not signify if a cert was presented or not, that
// must be explicitly queried via SSL_get_peer_certificate
auto res = SSL_get_verify_result(_ssl.get());
_logger.trace("verify: SSL_get_verify_result: {}", res);
if (res != X509_V_OK) {
auto stat_str(X509_verify_cert_error_string(res));
auto dn = extract_dn_information();
Expand All @@ -1264,6 +1310,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
}
throw verification_error(stat_str);
} else if (SSL_get0_peer_certificate(_ssl.get()) == nullptr) {
_logger.trace("verify: No peer certificate");
// If a peer certificate was not presented,
// SSL_get_verify_result will return X509_V_OK:
// https://www.openssl.org/docs/man3.0/man3/SSL_get_verify_result.html
Expand Down Expand Up @@ -1294,6 +1341,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
// This function waits for eof() to occur on the input stream
// Unless wait_for_eof_on_shutdown is false
future<> wait_for_eof() {
_logger.trace("wait_for_eof");
if (!_options.wait_for_eof_on_shutdown) {
// Seastar option to allow users to just bypass EOF waiting
return make_ready_future();
Expand All @@ -1305,12 +1353,15 @@ class session : public enable_shared_from_this<session>, public session_impl {
return do_until(
[this] { return eof(); },
[this] { return do_get().discard_result(); });
}).finally([this] {
_logger.trace("wait_for_eof: complete");
});
}

// This function is called to kick off the handshake. It will obtain
// locks on the _in_sem and _out_sem semaphores and start the handshake.
future<> handshake() {
_logger.trace("handshake");
if (_creds->need_load_system_trust()) {
if (!SSL_CTX_set_default_verify_paths(_ctx.get())) {
throw make_ossl_error(
Expand All @@ -1332,6 +1383,7 @@ class session : public enable_shared_from_this<session>, public session_impl {
}

future<> shutdown() {
_logger.trace("shutdown");
// first, make sure any pending write is done.
// bye handshake is a flush operation, but this
// allows us to not pay extra attention to output state
Expand All @@ -1351,8 +1403,10 @@ class session : public enable_shared_from_this<session>, public session_impl {
}

void close() noexcept override {
_logger.trace("close");
// only do once.
if (!std::exchange(_shutdown, true)) {
_logger.trace("close: performing shutdown");
// running in background. try to bye-handshake us nicely, but after 10s we forcefully close.
engine().run_in_background(with_timeout(
timer<>::clock::now() + std::chrono::seconds(10), shutdown())
Expand Down Expand Up @@ -1465,6 +1519,14 @@ class session : public enable_shared_from_this<session>, public session_impl {
});
}

const sstring& local_address() const noexcept {
return _local_address;
}

const sstring& remote_address() const noexcept {
return _remote_address;
}

private:
std::vector<subject_alt_name> do_get_alt_name_information(const x509_ptr &peer_cert,
const std::unordered_set<subject_alt_name_type> &types) const {
Expand Down Expand Up @@ -1711,7 +1773,10 @@ class session : public enable_shared_from_this<session>, public session_impl {
BIO* out_bio() { return SSL_get_wbio(_ssl.get()); }

private:
tls_session_logger<session> _logger;
std::unique_ptr<net::connected_socket_impl> _sock;
sstring _local_address;
sstring _remote_address;
shared_ptr<tls::certificate_credentials::impl> _creds;
data_source _in;
data_sink _out;
Expand Down Expand Up @@ -1855,9 +1920,11 @@ int bio_create(BIO*) {
/// and set the retry write flag.
int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written) {
auto session = unwrap_bio_ptr(b);
session->_logger.trace("bio_write_ex: dlen {}", dlen);
BIO_clear_retry_flags(b);

if (!session->_output_pending.available()) {
session->_logger.trace("bio_write_ex: nothing pending in output");
BIO_set_retry_write(b);
return 0;
}
Expand All @@ -1870,9 +1937,11 @@ int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written) {
msg.append(std::string_view(data, dlen));
n = msg.size();
session->_output_pending = session->_out.put(std::move(msg).release());
session->_logger.trace("bio_write_ex: Appended {} bytes to output pending", n);
}

if (session->_output_pending.failed()) {
session->_logger.debug("bio_write_ex: output pending has error");
std::rethrow_exception(session->_output_pending.get_exception());
}

Expand All @@ -1882,9 +1951,11 @@ int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written) {

return 1;
} catch(const std::system_error & e) {
session->_logger.debug("bio_write_ex: system error occurred: {}", e.what());
ERR_raise_data(ERR_LIB_SYS, e.code().value(), e.what());
session->_output_pending = make_exception_future<>(std::current_exception());
} catch(...) {
session->_logger.debug("bio_write_ex: unknown error occurred");
ERR_raise(ERR_LIB_SYS, EIO);
session->_output_pending = make_exception_future<>(std::current_exception());
}
Expand All @@ -1900,13 +1971,16 @@ int bio_write_ex(BIO* b, const char * data, size_t dlen, size_t * written) {
/// the _input buffer and return it to the caller.
int bio_read_ex(BIO* b, char * data, size_t dlen, size_t *readbytes) {
auto session = unwrap_bio_ptr(b);
session->_logger.trace("bio_read_ex: dlen: {}", dlen);
BIO_clear_retry_flags(b);
if (session->eof()) {
session->_logger.trace("bio_read_ex: eof");
BIO_set_flags(b, BIO_FLAGS_IN_EOF);
return 0;
}

if (session->_input.empty()) {
session->_logger.trace("bio_read_ex: input empty");
BIO_set_retry_read(b);
return 0;
}
Expand All @@ -1918,6 +1992,7 @@ int bio_read_ex(BIO* b, char * data, size_t dlen, size_t *readbytes) {
*readbytes = n;
}

session->_logger.trace("bio_read_ex: read {} bytes from input", n);
return 1;
}

Expand Down
2 changes: 2 additions & 0 deletions src/net/tls-impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ module seastar;

namespace seastar {

logger tls::tls_log("seastar-tls");

// Helper
struct file_info {
sstring filename;
Expand Down
Loading

0 comments on commit ba70bc2

Please sign in to comment.