Skip to content

Commit

Permalink
Merge pull request #17 from MissouriMRDT/hotfix/non-blocking-sockets
Browse files Browse the repository at this point in the history
Non-blocking Sockets and Resource Management
  • Loading branch information
ClayJay3 authored Mar 8, 2024
2 parents 0fa005f + edc6045 commit 52140eb
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ SpacesInContainerLiterals: false
SpacesInParentheses: false
SpacesInSquareBrackets: false
InsertNewlineAtEOF: true
Standard: c++14
Standard: c++17
TabWidth: 4
UseTab: Never
DisableFormat: false
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
// C/C++ extension settings.
"C_Cpp.formatting": "clangFormat",
"C_Cpp.default.compilerPath": "/usr/bin/g++-10",
"C_Cpp.default.cppStandard": "c++20",
"C_Cpp.default.cppStandard": "c++17",
"C_Cpp.default.includePath": [
"/usr/local/include/quill/",
"/usr/local/include/quill/**",
Expand Down
3 changes: 3 additions & 0 deletions src/RoveComm/RoveCommConsts.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@ namespace rovecomm
#define ROVECOMM_PACKET_MAX_DATA_COUNT 65535
#define ROVECOMM_PACKET_HEADER_SIZE 6
#define ROVECOMM_VERSION 3

// Server constants.
const int ROVECOMM_THREAD_MAX_IPS = 120;
} // namespace rovecomm
#endif // ROVECOMM_CONSTS_H
152 changes: 95 additions & 57 deletions src/RoveComm/RoveCommTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
/// \cond
#include <arpa/inet.h>
#include <cstring>
#include <fcntl.h>
#include <functional>
#include <iostream>
#include <netinet/in.h>
Expand All @@ -32,6 +33,34 @@
******************************************************************************/
namespace rovecomm
{
/******************************************************************************
* @brief Construct a new RoveCommTCP::RoveCommTCP object.
*
*
* @author clayjay3 ([email protected])
* @date 2024-03-07
******************************************************************************/
RoveCommTCP::RoveCommTCP()
{
// Initialize member variables.
m_nTCPSocket = -1;
m_nCurrentTCPClientSocket = -1;

// Set an IPS cap in the backend RoveComm thread.
this->SetMainThreadIPSLimit(rovecomm::ROVECOMM_THREAD_MAX_IPS);
}

/******************************************************************************
* @brief Destroy the RoveCommTCP::RoveCommTCP object.
*
* @author Eli Byrd ([email protected])
* @date 2024-02-07
******************************************************************************/
RoveCommTCP::~RoveCommTCP()
{
CloseTCPSocket();
}

/******************************************************************************
* @brief Initializes a TCP socket and binds it to the specified IP address and
* port. And then starts the threaded continuous code in AutonomyThread.
Expand All @@ -55,6 +84,16 @@ namespace rovecomm
perror("Failed to create TCP socket");
return false;
}
else
{
// Attempt to set the socket to non-blocking mode.
if (fcntl(m_nTCPSocket, F_SETFL, fcntl(m_nTCPSocket, F_GETFL) | O_NONBLOCK) == -1)
{
// Handle and print error.
perror("Failed to set UDP socket to non-blocking mode.");
return false;
}
}

// Configure the server address
memset(&m_saTCPServerAddr, 0, sizeof(m_saTCPServerAddr));
Expand All @@ -63,7 +102,7 @@ namespace rovecomm
m_saTCPServerAddr.sin_port = htons(nPort);

// Bind the socket to the server address
if (bind(m_nTCPSocket, (struct sockaddr*) &m_saTCPServerAddr, sizeof(m_saTCPServerAddr)) == -1)
if (bind(m_nTCPSocket.load(), (struct sockaddr*) &m_saTCPServerAddr, sizeof(m_saTCPServerAddr)) == -1)
{
perror("Failed to bind TCP socket");
close(m_nTCPSocket);
Expand Down Expand Up @@ -139,6 +178,12 @@ namespace rovecomm

// Send the data
ssize_t siBytesSent = send(nClientSocket, &stData, sizeof(stData), 0);
// Check if any bytes were sent.
if (siBytesSent == -1)
{
// Handle and print error message.
perror("Failed to send data to TCP client socket");
}

// Close the client socket
close(nClientSocket);
Expand All @@ -165,6 +210,9 @@ namespace rovecomm
template<typename T>
void RoveCommTCP::AddTCPCallback(std::function<void(const RoveCommPacket<T>&)> fnCallback, const uint16_t& unCondition)
{
// Acquire a write lock to protect the callback vectors.
std::unique_lock<std::shared_mutex> lkCallbackLock(m_muCallbackMutex);

// Add the callback function to the vector of TCP callbacks for the specified data type
if constexpr (std::is_same_v<T, uint8_t>)
{
Expand Down Expand Up @@ -230,6 +278,9 @@ namespace rovecomm
template<typename T>
void RoveCommTCP::RemoveTCPCallback(std::function<void(const RoveCommPacket<T>&)> fnCallback)
{
// Acquire a write lock to protect the callback vectors.
std::unique_lock<std::shared_mutex> lkCallbackLock(m_muCallbackMutex);

// Remove the callback function from the appropriate vector based on the data type T
if constexpr (std::is_same_v<T, uint8_t>)
{
Expand Down Expand Up @@ -331,8 +382,12 @@ namespace rovecomm
void RoveCommTCP::ProcessPacket(const RoveCommData& stData,
const std::vector<std::tuple<std::function<void(const rovecomm::RoveCommPacket<T>&)>, uint16_t>>& vCallbacks)
{
// Create instance variables.
RoveCommPacket<T> stPacket = UnpackData<T>(stData);

// Acquire a read lock to protect the callback vectors.
std::shared_lock<std::shared_mutex> lkCallbackLock(m_muCallbackMutex);

// Invoke registered callbacks
for (const std::tuple<std::function<void(const rovecomm::RoveCommPacket<T>&)>, uint16_t>& tpCallbackInfo : vCallbacks)
{
Expand Down Expand Up @@ -363,58 +418,52 @@ namespace rovecomm
******************************************************************************/
void RoveCommTCP::ReceiveTCPPacketAndCallback()
{
// Accept a client connection
struct sockaddr_in saClientAddr;
socklen_t sklClientAddrLen = sizeof(saClientAddr);
int nClientSocket = accept(m_nTCPSocket, (struct sockaddr*) &saClientAddr, &sklClientAddrLen);
if (nClientSocket == -1)
{
perror("Failed to accept client connection");
return;
}

// Receive data from the client
RoveCommData stData;
ssize_t siBytesReceived = recv(nClientSocket, &stData, sizeof(stData), 0);
if (siBytesReceived < 0)
{
perror("Error receiving data from client");
close(nClientSocket);
return;
}
else if (siBytesReceived == 0)
if (m_nCurrentTCPClientSocket == -1)
{
// Connection closed by client
close(nClientSocket);
return;
// Accept a client connection
struct sockaddr_in m_saClientAddr;
socklen_t sklClientAddrLen = sizeof(m_saClientAddr);
m_nCurrentTCPClientSocket = accept(m_nTCPSocket, (struct sockaddr*) &m_saClientAddr, &sklClientAddrLen);
}

// Process the received packet and invoke the appropriate callback
if (siBytesReceived == sizeof(RoveCommData))
else
{
// Extract the data id from the received data
uint16_t unDataId = (static_cast<uint16_t>(stData.unBytes[1]) << 8) | static_cast<uint16_t>(stData.unBytes[2]);

// Determine the data type from the received data
manifest::DataTypes eDataType = manifest::Helpers::GetDataTypeFromId(unDataId);
// Receive data from the client
RoveCommData stData;
ssize_t siBytesReceived = recv(m_nCurrentTCPClientSocket, &stData, sizeof(stData), 0);

// Convert RoveCommData to appropriate RoveCommPacket based on data type
switch (eDataType)
// Process the received packet and invoke the appropriate callback
if (siBytesReceived == sizeof(RoveCommData))
{
case manifest::DataTypes::UINT8_T: ProcessPacket<uint8_t>(stData, tcp::vUInt8Callbacks); break;
case manifest::DataTypes::INT8_T: ProcessPacket<int8_t>(stData, tcp::vInt8Callbacks); break;
case manifest::DataTypes::UINT16_T: ProcessPacket<uint16_t>(stData, tcp::vUInt16Callbacks); break;
case manifest::DataTypes::INT16_T: ProcessPacket<int16_t>(stData, tcp::vInt16Callbacks); break;
case manifest::DataTypes::UINT32_T: ProcessPacket<uint32_t>(stData, tcp::vUInt32Callbacks); break;
case manifest::DataTypes::INT32_T: ProcessPacket<int32_t>(stData, tcp::vInt32Callbacks); break;
case manifest::DataTypes::FLOAT_T: ProcessPacket<float>(stData, tcp::vFloatCallbacks); break;
case manifest::DataTypes::DOUBLE_T: ProcessPacket<double>(stData, tcp::vDoubleCallbacks); break;
case manifest::DataTypes::CHAR: ProcessPacket<char>(stData, tcp::vCharCallbacks); break;
// Extract the data id from the received data
uint16_t unDataId = (static_cast<uint16_t>(stData.unBytes[1]) << 8) | static_cast<uint16_t>(stData.unBytes[2]);

// Determine the data type from the received data
manifest::DataTypes eDataType = manifest::Helpers::GetDataTypeFromId(unDataId);

// Convert RoveCommData to appropriate RoveCommPacket based on data type
switch (eDataType)
{
case manifest::DataTypes::UINT8_T: ProcessPacket<uint8_t>(stData, tcp::vUInt8Callbacks); break;
case manifest::DataTypes::INT8_T: ProcessPacket<int8_t>(stData, tcp::vInt8Callbacks); break;
case manifest::DataTypes::UINT16_T: ProcessPacket<uint16_t>(stData, tcp::vUInt16Callbacks); break;
case manifest::DataTypes::INT16_T: ProcessPacket<int16_t>(stData, tcp::vInt16Callbacks); break;
case manifest::DataTypes::UINT32_T: ProcessPacket<uint32_t>(stData, tcp::vUInt32Callbacks); break;
case manifest::DataTypes::INT32_T: ProcessPacket<int32_t>(stData, tcp::vInt32Callbacks); break;
case manifest::DataTypes::FLOAT_T: ProcessPacket<float>(stData, tcp::vFloatCallbacks); break;
case manifest::DataTypes::DOUBLE_T: ProcessPacket<double>(stData, tcp::vDoubleCallbacks); break;
case manifest::DataTypes::CHAR: ProcessPacket<char>(stData, tcp::vCharCallbacks); break;
}

// Close the client socket
close(m_nCurrentTCPClientSocket);
m_nCurrentTCPClientSocket = -1;
}
// Still waiting for data or connection return without error.
else if (siBytesReceived == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
{
return;
}
}

// Close the client socket
close(nClientSocket);
}

/******************************************************************************
Expand Down Expand Up @@ -465,17 +514,6 @@ namespace rovecomm
}
}

/******************************************************************************
* @brief Destroy the RoveCommTCP::RoveCommTCP object.
*
* @author Eli Byrd ([email protected])
* @date 2024-02-07
******************************************************************************/
RoveCommTCP::~RoveCommTCP()
{
CloseTCPSocket();
}

// Explicitly define template function types for TCP class
template ssize_t RoveCommTCP::SendTCPPacket<uint8_t>(const RoveCommPacket<uint8_t>&, const char*, int);
template void RoveCommTCP::AddTCPCallback<uint8_t>(std::function<void(const RoveCommPacket<uint8_t>&)>, const uint16_t&);
Expand Down
15 changes: 11 additions & 4 deletions src/RoveComm/RoveCommTCP.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

/// \cond
#include <arpa/inet.h>
#include <atomic>
#include <csignal>
#include <cstring>
#include <functional>
#include <iostream>
#include <netinet/in.h>
#include <shared_mutex>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>
Expand Down Expand Up @@ -53,8 +55,11 @@ namespace rovecomm
{
private:
// Private member variables
int m_nTCPSocket;
std::atomic_int m_nTCPSocket;
struct sockaddr_in m_saTCPServerAddr;
std::atomic_int m_nCurrentTCPClientSocket;
struct sockaddr_in m_saClientAddr;
std::shared_mutex m_muCallbackMutex;

// Packet processing functions
template<typename T>
Expand All @@ -67,7 +72,9 @@ namespace rovecomm

public:
// Constructor
RoveCommTCP() : m_nTCPSocket(-1) {}
RoveCommTCP();
// Destructor
~RoveCommTCP();

// Initialization
bool InitTCPSocket(const char* cIPAddress, int nPort);
Expand All @@ -86,8 +93,8 @@ namespace rovecomm
// Deinitialization
void CloseTCPSocket();

// Destructor
~RoveCommTCP();
// Selectively make inherited method public so we can get RoveCommNode FPS.
using AutonomyThread::GetIPS;

// NOTE: These functions are for testing purposes only and should not be used in production code!
template<typename T>
Expand Down
Loading

0 comments on commit 52140eb

Please sign in to comment.