diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b025ebe --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +*.o +tags +TcpClientMain +TcpSrvMain diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..4f7cf11 --- /dev/null +++ b/Makefile @@ -0,0 +1,29 @@ +INC_SVR_COMM = -I. -I/usr/local/include +LIB_SVR_COMM = -L/usr/local/lib -levent -lpthread + +INC_ALL=$(INC_SVR_COMM) +LIB_ALL=$(LIB_SVR_COMM) + +BINARY = TcpSrvMain TcpClientMain + +all:$(BINARY) + + +.SUFFIXES: .o .cpp +CXX = g++ +CC = gcc + +CXXFLAGS= -g -Wall + +%.o:%.cpp + $(CXX) $(CFLAGS) -c $^ $(INC_ALL) + +TcpSrvMain:t_main.o t_tcpsrv.o t_socket.o t_proto.o t_eventpipe.o t_eventconn.o t_eventbase.o t_worktask.o t_eventlisten.o t_thread.o + $(CXX) $(CXXFLAGS) -o $@ $^ $(LIB_ALL) +TcpClientMain:t_client.o t_tcpsrv.o t_socket.o t_proto.o t_eventpipe.o t_eventconn.o t_eventbase.o t_worktask.o t_eventlisten.o t_thread.o + $(CXX) $(CXXFLAGS) -o $@ $^ $(LIB_ALL) +clean: + rm -f *.o *~ $(BINARY) + +strip: + strip $(BINARY) diff --git a/t_client.cpp b/t_client.cpp new file mode 100644 index 0000000..c49492f --- /dev/null +++ b/t_client.cpp @@ -0,0 +1,217 @@ +#include "t_client.hpp" + +namespace T_CLIENT +{ + TestOneClient::TestOneClient(const std::string& sRemoteIp, int iRemotePort, int iSendTimes) + :m_sRemoteIp(sRemoteIp), m_iRemotePort(iRemotePort), m_iTestNums(iSendTimes), m_iFd(-1), + m_pCodec(NULL) + { + } + + TestOneClient::~TestOneClient() + { + if (m_iFd > 0) + { + close(m_iFd); + m_iFd = -1; + } + /// + if (m_pCodec) + { + delete m_pCodec; + m_pCodec = NULL; + } + } + + int TestOneClient::Init() + { + m_iFd = socket( AF_INET, SOCK_STREAM , 0 ); + if (m_iFd < 0) + { + std::cout << "sockect() call fail" << std::endl; + return -1; + } + + m_stDestAddr.sin_family = AF_INET; + m_stDestAddr.sin_addr.s_addr = inet_addr(const_cast (m_sRemoteIp.c_str())); + m_stDestAddr.sin_port=htons(m_iRemotePort); + int iRet = ::connect( m_iFd,(struct sockaddr *)&m_stDestAddr, 
sizeof(m_stDestAddr) ); + if (iRet < 0) + { + perror("connect remote tcp srv fail"); + return -1; + } + m_pCodec = new BusiCodec(1); + + return 0; + } + // + int TestOneClient::main() + { + for (int i = 0; i < m_iTestNums; ++i) + { + char buf[1024]; + memset(buf, 0, sizeof(buf)); + snprintf(buf, sizeof(buf), "%d'client is test!", i); + int iSendLen = 0; + char sSendBuf[1024] = {0}; + if ( RET_RECV_OK != m_pCodec->EnCode(buf, strlen(buf)+1, sSendBuf, &iSendLen) ) + { + std::cout << "encode client data fail" << std::endl; + break; + } + + std::string sBuf; + sBuf.assign(buf, strlen(buf)+1); + int iRet = 0; + while ( iSendLen > 0 ) + { + iRet = ::send(m_iFd, sSendBuf, iSendLen, 0); + if (iRet < 0) + { + std::cout << "send client data fail" << std::endl; + break; + } + iSendLen -= iRet; + } + std::cout << "send client data done, next to recv response, send data: " << sBuf << std::endl; + + char rBuf[1024]; + char rBodybuf[1024]; + ::memset(rBuf, 0, sizeof(rBuf)); + iRet = 0; + int iRLen = 0; + int iDecodeMsgLen = 0; + + do { + iRet = ::recv(m_iFd, rBuf + iRet, sizeof(rBuf) - iRet, 0); + if (iRet < 0) + { + std::cout << "recv data from srv fail" << std::endl; + break; + } + else if (iRet == 0) + { + break; + } + + iRLen += iRet; + enum CODERET retDecod = m_pCodec->DeCode(rBuf, iRLen, rBodybuf, &iDecodeMsgLen ); + if (retDecod == RET_RECV_OK) + { + break; + } + else + { + + } + } while(1); + + std::string sRBuf; + sRBuf.assign(rBodybuf, iDecodeMsgLen - sizeof(4)); + std::cout << "recv reponse: " << sRBuf << std::endl; + } + this->Stop(); + return 0; + } + + /*********************************************************************** + * + * + **********************************************************************/ + TestClient::TestClient(int argc, char **argv) :m_bOk(false), m_iClientNums(0),m_iReqNums(0) + { + if (ParseCmd(argc, argv) == false) + { + Usage(argv[0]); + return ; + } + m_bOk = true; + } + + TestClient::~TestClient() + { + } + + bool TestClient::ParseCmd(int 
argc, char **argv) + { + if (argc <5) + { + std::cout << "client run param less than 5" << std::endl; + return false; + } + + if (strlen(argv[1]) == 0) + { + std::cout << "remote ip is empty" << std::endl; + return false; + } + m_sRemoteIp.assign(argv[1]); + + int iPort = ::atoi(argv[2]); + if (iPort <= 0) + { + std::cout << "remote port less than 0" << std::endl; + return false; + } + m_iRemotePort = iPort; + + m_iClientNums = ::atoi(argv[3]); + if (m_iClientNums <= 0) + { + std::cout << "client nums less than 0 " << std::endl; + return false; + } + + m_iReqNums = ::atoi(argv[4]); + if (m_iReqNums <= 0) + { + std::cout << "req nums for each client less than 0 " << std::endl; + return false; + } + return true; + } + + void TestClient::Usage(const char *pBinName) + { + std::cout <<"Usage: " << pBinName << " remoteIp " << " remotePort " << + " clientnums " << " reqnum_in_one_client" < vTestClientNode; + for (int i = 0; i < m_iClientNums; ++i) + { + TestOneClient* pNode = new TestOneClient(m_sRemoteIp, m_iRemotePort, m_iReqNums); + vTestClientNode.push_back(pNode); + } + + for (std::vector::iterator it = vTestClientNode.begin(); it != vTestClientNode.end(); ++it) + { + (*it)->Start(); + } + std::cout << "all client thread start " << std::endl; + + for (std::vector::iterator it = vTestClientNode.begin(); it != vTestClientNode.end(); ) + { + (*it)->JoinWork(); + delete (*it); + it = vTestClientNode.erase(it); + } + return 0; + } + +} + + +////////////////////////////////////////////// +///////////////////////////////////////////// +int main(int argc, char **argv) +{ + T_CLIENT::TestClient client(argc, argv); + client.main(); + + return 0; +} diff --git a/t_client.hpp b/t_client.hpp new file mode 100644 index 0000000..fbe78bb --- /dev/null +++ b/t_client.hpp @@ -0,0 +1,82 @@ +/** + * @file: t_client.hpp + * @brief: + * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-19 + */ + +#ifndef _T_CLIENT_HPP_ +#define _T_CLIENT_HPP_ + +#ifdef __cplusplus + extern "C" 
{ +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef __cplusplus + } +#endif + +#include +#include +#include +#include "t_thread.hpp" +#include "t_proto.hpp" + +using namespace T_TCP; +using namespace std; + + +namespace T_CLIENT +{ + class TestOneClient: public PthreadBase + { + public: + TestOneClient(const std::string& sRemoteIp, int iRemotePort, int iSendTimes); + virtual ~TestOneClient(); + protected: + int main(); + virtual int Init(); + private: + std::string m_sRemoteIp; + int m_iRemotePort; + int m_iTestNums; + int m_iFd; + struct sockaddr_in m_stDestAddr; + BusiCodec* m_pCodec; + }; + + ////////////////////////// + class TestClient + { + public: + TestClient(int argc, char **argv); + virtual ~TestClient(); + int main(); + + private: + void Usage(const char *pBinName); + bool ParseCmd(int argc, char **argv); + + private: + bool m_bOk; + int m_iClientNums; + std::string m_sRemoteIp; + int m_iRemotePort; + int m_iReqNums; + }; +} + +#endif + diff --git a/t_eventbase.cpp b/t_eventbase.cpp new file mode 100644 index 0000000..2c7807d --- /dev/null +++ b/t_eventbase.cpp @@ -0,0 +1,19 @@ +#include "t_eventbase.hpp" +#include +#include +namespace T_TCP +{ + ConnBase::ConnBase(int iFd):m_iFd(iFd) + { + } + + // + ConnBase::~ConnBase() + { + event_del(&m_stREvent); + event_del(&m_stWEvent); + std::cout << "close this conn, fd: " << m_iFd << std::endl; + ::close(m_iFd); + } + // +} diff --git a/t_eventbase.hpp b/t_eventbase.hpp new file mode 100644 index 0000000..dc3fc5f --- /dev/null +++ b/t_eventbase.hpp @@ -0,0 +1,38 @@ +/** + * @file: t_eventbase.hpp + * @brief: + * @author: wusheng Hu + * @version: + * @date: 2018-04-23 + */ + +#ifndef _t_eventbase_hpp_ +#define _t_eventbase_hpp_ + +#include +#include +#include + +#include +#include + +namespace T_TCP +{ + class ConnBase + { + public: + ConnBase(int iFd); + virtual ~ConnBase(); + // + virtual bool AddEvent(struct event_base *pEvBase, int 
iEvent, void *arg) = 0; + virtual bool DelEvent(struct event_base *pEvBase, int iEvent, void *arg) = 0; + // + protected: + int m_iFd; + // + struct event m_stREvent; + struct event m_stWEvent; + }; +} + +#endif diff --git a/t_eventconn.cpp b/t_eventconn.cpp new file mode 100644 index 0000000..0c36b79 --- /dev/null +++ b/t_eventconn.cpp @@ -0,0 +1,179 @@ +#include +#include "t_eventconn.hpp" +#include "t_worktask.hpp" +#include "t_proto.hpp" +#include + +#include + +namespace T_TCP +{ + const int AcceptConn::CNT_STATIC_BUF_MAX_LEN; + // + AcceptConn::AcceptConn(int ifd, void* pData):ConnBase(ifd), m_pData((WorkerTask*)pData), m_pCodec(NULL), + m_pRecvBuf(NULL), m_iCurrRecvOffSet(0), + m_pSendBuf(NULL), m_iCurrSendOffSet(0) + { + m_pRecvBuf = new char[AcceptConn::CNT_STATIC_BUF_MAX_LEN]; + m_pSendBuf = new char[AcceptConn::CNT_STATIC_BUF_MAX_LEN]; + m_pCodec = new BusiCodec(1); + } + + AcceptConn::~AcceptConn() + { + delete [] m_pRecvBuf; + delete [] m_pSendBuf; + delete m_pCodec; + m_iCurrRecvOffSet = 0; + m_iCurrSendOffSet = 0; + } + + bool AcceptConn::AddEvent(struct event_base *pEvBase, int iEvent, void *arg) + { + if (pEvBase == NULL) + { + return false; + } + + if (EV_READ & iEvent) + { + event_assign(&m_stREvent, pEvBase, m_iFd, iEvent, ReadCallBack, this); + event_add(&m_stREvent, NULL); + } + else + { + event_assign(&m_stWEvent, pEvBase, m_iFd, iEvent, WriteCallBack, this); + event_add(&m_stWEvent, NULL); + } + return true; + } + + void AcceptConn::ReadCallBack(int iFd, short sEvent, void *pData) + { + if (iFd <= 0) + { + return ; + } + + AcceptConn* pConn = (AcceptConn*)pData; + if (pConn == NULL) + { + return ; + } + pConn->DoRead(); + } + // + bool AcceptConn::DoRead() + { + std::cout << "on fd: " << m_iFd << " , begin to recv data" << std::endl; + int iRet = ::recv(m_iFd, m_pRecvBuf + m_iCurrRecvOffSet, CNT_STATIC_BUF_MAX_LEN - m_iCurrRecvOffSet, 0); + if (iRet < 0) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + std::cout << "recv err from 
client " << std::endl; + m_iCurrRecvOffSet = 0; + m_iCurrSendOffSet = 0; + m_pData->DeleteAcceptConn(m_iFd); + return false; + } + return true; + } + + if (iRet == 0) + { + std::cout << "remote client closet socket fd: " << m_iFd << std::endl; + //clear recv buf data; + m_iCurrRecvOffSet = 0; + m_iCurrSendOffSet = 0; + m_pData->DeleteAcceptConn(m_iFd); + return true; + } + // + m_iCurrRecvOffSet += iRet; + + char msgBuf[1024]; + memset(msgBuf, 0, sizeof(msgBuf)); + int iMsgLen = 0; + enum CODERET ret = m_pCodec->DeCode(m_pRecvBuf, m_iCurrRecvOffSet, msgBuf, &iMsgLen); + if (ret == RET_RECV_NOT_COMPLETE) + { + return true; + } + if (ret == RET_RECV_OK) + { + std::string printMsg; + printMsg.assign(msgBuf, iMsgLen - sizeof(int)); + std::cout << "recv msg data: " << printMsg << std::endl; + ::memmove(m_pRecvBuf, m_pRecvBuf + iMsgLen, m_iCurrRecvOffSet - iMsgLen ); + m_iCurrRecvOffSet -= iMsgLen; + + //next give response to client, then close. + struct event_base *pEvBase = event_get_base(&m_stREvent); + char sSendBuf[1024]; + memset(sSendBuf,0,sizeof(sSendBuf)); + snprintf(sSendBuf,sizeof(sSendBuf), "client: %s, srv recv: this is response", printMsg.c_str()); + + ret = m_pCodec->EnCode(sSendBuf, strlen(sSendBuf) + 1, m_pSendBuf, &m_iCurrSendOffSet); + if (ret != RET_RECV_OK) + { + std::cout << "encode send buf data fail" << std::endl; + return false; + } + AddEvent(pEvBase, EV_WRITE, this); + } + // + return true; + } + + void AcceptConn::WriteCallBack(int iFd, short sEvent, void *pData) + { + if (iFd <= 0) + { + return ; + } + AcceptConn* pConn = (AcceptConn*)pData; + if (pConn == NULL) + { + return ; + } + pConn->DoWrite(); + } + + bool AcceptConn::DoWrite() + { + if (m_iCurrSendOffSet > 0 && m_pSendBuf) + { + int iRet = ::write(m_iFd, m_pSendBuf, m_iCurrSendOffSet); + if (iRet < 0) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + std::cout << "write data fail in fd: " << m_iFd << std::endl; + m_pData->DeleteAcceptConn(m_iFd); + return false; + } + return 
true; + } + else + { + ::memmove(m_pSendBuf, m_pSendBuf + iRet, m_iCurrSendOffSet - iRet); + m_iCurrSendOffSet -= iRet; + } + + if (m_iCurrSendOffSet > 0) + { + struct event_base *pEvBase = event_get_base(&m_stWEvent); + std::cout << "this is other data buf need to send, so add write event again." << std::endl; + AddEvent(pEvBase, EV_WRITE, this); + } + return true; + } + return false; + } + + bool AcceptConn::DelEvent(struct event_base *pEvBase, int iEvent, void *arg) + { + return true; + } +} diff --git a/t_eventconn.hpp b/t_eventconn.hpp new file mode 100644 index 0000000..096e682 --- /dev/null +++ b/t_eventconn.hpp @@ -0,0 +1,43 @@ +/** + * @file: t_eventconn.hpp + * @brief: + * @author: wusheng Hu + * @version: v0x00001 + * @date: 2018-04-24 + */ + +#ifndef _t_eventconn_hpp_ +#define _t_eventconn_hpp_ + +#include "t_eventbase.hpp" + +namespace T_TCP +{ + class BusiCodec; + class WorkerTask; + class AcceptConn: public ConnBase + { + public: + AcceptConn(int ifd, void *pData); + virtual ~AcceptConn(); + virtual bool AddEvent(struct event_base *pEvBase, int iEvent, void *arg); + virtual bool DelEvent(struct event_base *pEvBase, int iEvent, void *arg); + private: + static void ReadCallBack(int iFd, short sEvent, void *pData); + static void WriteCallBack(int iFd, short sEvent, void *pData); + + bool DoRead(); + bool DoWrite(); + private: + WorkerTask* m_pData; + BusiCodec* m_pCodec; + private: + char* m_pRecvBuf; //接收缓冲区 + int m_iCurrRecvOffSet; + // + char* m_pSendBuf; //发送缓冲区 + int m_iCurrSendOffSet; + static const int CNT_STATIC_BUF_MAX_LEN = 1024; + }; +} +#endif diff --git a/t_eventlisten.cpp b/t_eventlisten.cpp new file mode 100644 index 0000000..2b8fb85 --- /dev/null +++ b/t_eventlisten.cpp @@ -0,0 +1,77 @@ +#include "t_eventlisten.hpp" +#include "t_tcpsrv.hpp" + +namespace T_TCP +{ + ListenConn::ListenConn(int ifd, void* pData):ConnBase(ifd), m_pData((TcpSrv*)pData) + { + } + ListenConn::~ListenConn() + { + } + + bool ListenConn::AddEvent(struct event_base 
*pEvBase, int iEvent, void *arg) + { + if (pEvBase == NULL) + { + return false; + } + + if (EV_READ & iEvent) + { + if ( 0 != event_assign(&m_stREvent, pEvBase, m_iFd, iEvent, ReadCallBack, this)) + { + std::cout << "event_assign() fail for read in listenconn" << std::endl; + return false; + } + if ( 0 != event_add(&m_stREvent, NULL) ) + { + std::cout << "event_add() fail for read in listenconn" << std::endl; + return false; + } + std::cout << "add event succ in listenconn (2)" << std::endl; + return true; + } + else if (iEvent & EV_WRITE) + { + event_assign(&m_stWEvent, pEvBase, m_iFd, iEvent, WriteCallBack, this); + event_add(&m_stWEvent, NULL); + return true; + } + else + { + + } + + return false; + } + + void ListenConn::WriteCallBack(int iFd, short sEvent, void *pData) + { + //listen conn need not to write event. + } + + void ListenConn::ReadCallBack(int iFd, short sEvent, void *pData) + { + ListenConn* pConn = (ListenConn*) pData; + if (pConn == NULL) + { + return; + } + std::cout << "listen conn recv read event, client new conn" << std::endl; + pConn->DoRead(); + } + + bool ListenConn::DoRead() + { + TcpSrv* tcpSrv = (TcpSrv*) m_pData; + return tcpSrv->Accept(); + } + + bool ListenConn::DelEvent(struct event_base *pEvBase, int iEvent, void *arg) + { + std::cout << "call default op in DelEvent" << std::endl; + return true; + } + +} diff --git a/t_eventlisten.hpp b/t_eventlisten.hpp new file mode 100644 index 0000000..7faa5e2 --- /dev/null +++ b/t_eventlisten.hpp @@ -0,0 +1,34 @@ +/** + * @file: t_event.hpp + * @brief: + * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-23 + */ +#ifndef _t_event_hpp_ +#define _t_event_hpp_ + +#include "t_eventbase.hpp" + +namespace T_TCP +{ + class TcpSrv; + class ListenConn: public ConnBase + { + public: + ListenConn(int ifd, void* pData); + virtual ~ListenConn(); + virtual bool AddEvent(struct event_base *pEvBase, int iEvent, void *arg); + //TODO: + virtual bool DelEvent(struct event_base *pEvBase, int iEvent, 
void *arg); + private: + static void ReadCallBack(int iFd, short sEvent, void *pData); + static void WriteCallBack(int iFd, short sEvent, void *pData); + bool DoRead(); + TcpSrv* m_pData; + }; +} + +#endif + + diff --git a/t_eventpipe.cpp b/t_eventpipe.cpp new file mode 100644 index 0000000..a9069d1 --- /dev/null +++ b/t_eventpipe.cpp @@ -0,0 +1,69 @@ +#include "t_eventpipe.hpp" +#include "t_worktask.hpp" + +namespace T_TCP +{ + PipeConn::PipeConn(int ifd, void* pData): ConnBase(ifd), m_pData(pData) + { + } + // + PipeConn::~PipeConn() + { + } + + bool PipeConn::AddEvent(struct event_base *pEvBase, int iEvent, void *arg) + { + if (pEvBase == NULL) + { + return false; + } + if (EV_READ & iEvent) + { + event_assign(&m_stREvent, pEvBase, m_iFd, iEvent, ReadCallBack, this); + event_add(&m_stREvent, NULL); + } + else if (iEvent & EV_WRITE) + { + event_assign(&m_stWEvent, pEvBase, m_iFd, iEvent, WriteCallBack, this); + event_add(&m_stWEvent, NULL); + } + else + { + + } + return true; + } + + void PipeConn::WriteCallBack(int iFd, short sEvent, void *pData) + { + } + + void PipeConn::ReadCallBack(int iFd, short sEvent, void *pData) + { + char buf[1]; + if (::read(iFd, buf, 1) != 1) + { + std::cout << "read buf from pipe fail" << std::endl; + return ; + } + PipeConn* pConn = (PipeConn*) pData; + if (pConn == NULL) + { + std::cout << "input param is empty" << std::endl; + return; + } + pConn->DoRead(buf, sizeof(buf)); + } + + void PipeConn::DoRead(char *pData, int iLen) + { + WorkerTask* pTask = (WorkerTask*)m_pData; + pTask->NotifyRecvConn(pData, iLen); + } + + //TODO: + bool PipeConn::DelEvent(struct event_base *pEvBase, int iEvent, void *arg) + { + return true; + } +} diff --git a/t_eventpipe.hpp b/t_eventpipe.hpp new file mode 100644 index 0000000..2bb1416 --- /dev/null +++ b/t_eventpipe.hpp @@ -0,0 +1,32 @@ +/** + * @file: t_eventpipe.hpp + * @brief: + * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-23 + */ + +#ifndef _t_eventpipe_hpp_ +#define 
_t_eventpipe_hpp_ + +#include "t_eventbase.hpp" + +namespace T_TCP +{ + class PipeConn: public ConnBase + { + public: + PipeConn(int ifd, void* pData); + virtual ~PipeConn(); + bool AddEvent(struct event_base *pEvBase, int iEvent, void *arg); + //TODO: + bool DelEvent(struct event_base *pEvBase, int iEvent, void *arg); + private: + static void ReadCallBack(int iFd, short sEvent, void *pData); + static void WriteCallBack(int iFd, short sEvent, void *pData); + void DoRead(char *pData, int iLen); + void *m_pData; + }; +} +#endif + diff --git a/t_main.cpp b/t_main.cpp new file mode 100644 index 0000000..7e2c6a8 --- /dev/null +++ b/t_main.cpp @@ -0,0 +1,35 @@ +#include +#include + +#include "t_tcpsrv.hpp" + +using namespace std; +using namespace T_TCP; + +int main(int argc, char **argv) +{ + if (argc < 4) + { + std::cout << argv[0] << " ip port thread_num" << std::endl; + return 0; + } + string sIp(argv[1]); + int iPort = ::atoi(argv[2]); + int iRunThreadNums = ::atoi(argv[3]); + if (iRunThreadNums <= 0) + { + iRunThreadNums = 3; + } + + TcpSrv tcpSrv(sIp, iPort, iRunThreadNums); + if (tcpSrv.Run() == false) + { + std::cout << "run tcp routine fail" << std::endl; + } + else + { + std::cout << "tcp main routine exit " << std::endl; + } + + return 0; +} diff --git a/t_pools.hpp b/t_pools.hpp new file mode 100644 index 0000000..19966a0 --- /dev/null +++ b/t_pools.hpp @@ -0,0 +1,160 @@ +/** + * @file: t_pools.hpp + * @brief: 线程池容器实现文件. + * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-22 + */ + +#ifndef __T_THREAD_POOLS_HPP__ +#define __T_THREAD_POOLS_HPP__ + +#include +#include +#include + +namespace T_TCP +{ + + template + class PthreadPools + { + public: + PthreadPools(const int iPoolSize); + virtual ~PthreadPools(); + + /** + * @brief: StartAllThreads + * 启动线程池中所有线程.线程池 使用方调用. + * @return + */ + bool StartAllThreads(); + + T* GetIndexThread(const int iIndex); + /** + * @brief: AllocateThread + * 从线程池中分配一个可用,已经被启动的线程. + * @return 返回线程的地址. 
+ */ + T* AllocateThread(); + private: + + /** + * @brief: WaitThreadRegiste + * 等待线程池所有线程都创建完成. + */ + void WaitThreadRegiste(); + + /** + * @brief: DeleteThreads + * 等待线程池中所有线程都退出。 + * 并删除线程对象资源 + */ + void DeleteThreads(); + pthread_mutex_t m_InitLock; + pthread_cond_t m_InitCond; + int m_iReadyThreadNums; + + std::vector m_vPoolList; + int m_iPoolSize; + + pthread_mutex_t m_AllocLock; + int m_iCurThreadIndex; + }; + + //////////////////////////////////////////// + //////////////////////////////////////////// + + ///implement for pthreadpools + template + PthreadPools::PthreadPools(const int iPoolSize): m_iReadyThreadNums(0), m_iPoolSize (iPoolSize), + m_iCurThreadIndex(0) + { + pthread_mutex_init(&m_InitLock, NULL); + pthread_cond_init(&m_InitCond, NULL); + + pthread_mutex_init(&m_AllocLock, NULL); + } + + template + PthreadPools::~PthreadPools () + { + DeleteThreads(); + } + + template + bool PthreadPools::StartAllThreads() + { + if (m_iPoolSize <= 0) + { + return false; + } + + for (int i = 0; i < m_iPoolSize; ++i) + { + T* pThread = new T(&m_InitLock, &m_InitCond, &m_iReadyThreadNums); + m_vPoolList.push_back(pThread); + } + + for (int i = 0; i < m_iPoolSize; ++i) + { + m_vPoolList.at(i)->Start(); + } + + WaitThreadRegiste(); + std::cout << "all threads start done ..." 
<< std::endl; + return true; + } + + template + void PthreadPools::WaitThreadRegiste() + { + pthread_mutex_lock(&m_InitLock); + while( m_iReadyThreadNums < m_iPoolSize) + { + pthread_cond_wait(&m_InitCond, &m_InitLock); + } + pthread_mutex_unlock(&m_InitLock); + } + + template + void PthreadPools::DeleteThreads() + { + if (m_iPoolSize <=0) + { + return ; + } + + for (int i = 0; i < m_iPoolSize; ++i) + { + m_vPoolList.at(i)->JoinWork(); + delete m_vPoolList[i]; + } + m_vPoolList.clear(); + } + + template + T* PthreadPools::GetIndexThread(const int iIndex) + { + if (iIndex + 1 > m_iPoolSize) + { + return NULL; + } + if (iIndex < 0) + { + return NULL; + } + return m_vPoolList.at(iIndex);; + } + + template + T* PthreadPools::AllocateThread() + { + int iIndex = (m_iCurThreadIndex++) % m_iPoolSize; + + return m_vPoolList.at(iIndex); + } + +} + +#endif diff --git a/t_proto.cpp b/t_proto.cpp new file mode 100644 index 0000000..e61b428 --- /dev/null +++ b/t_proto.cpp @@ -0,0 +1,54 @@ +#include "t_proto.hpp" +#include +#include + +namespace T_TCP +{ + + // proto format: | 4 type head | payload | + BusiCodec::BusiCodec(int iType) :m_iCodeType(iType) + { + } + + BusiCodec::~BusiCodec() + { + } + + enum CODERET BusiCodec::DeCode(const char* pDstBuf, int iSrcLen, char* pSrcPayLoad, int *piPayLoadLen) + { + if (iSrcLen < sizeof(int)) + { + *piPayLoadLen = 0; + return RET_RECV_NOT_COMPLETE; + } + + int iHeadBufData = ::ntohl( *((int*)pDstBuf) ); + if (iSrcLen >= iHeadBufData + sizeof(int)) + { + ::memcpy(pSrcPayLoad, pDstBuf + sizeof(int), iHeadBufData); + *piPayLoadLen = iHeadBufData + sizeof(int); + return RET_RECV_OK; + } + return RET_RECV_NOT_COMPLETE; + } + + enum CODERET BusiCodec::EnCode(const char* pSrcBuf, int iSrcLen, char *pDstMsg, int* piDstMsgLen) + { + if (pSrcBuf == NULL || pDstMsg == NULL || piDstMsgLen == NULL) + { + return RET_RECV_ERR; + } + char *pPreCodeAddr = pDstMsg + (*piDstMsgLen); + + char buf[1024]; + ::memset(buf, 0, sizeof(buf)); + //set head content; 
+ *((int*)buf) = ::htonl(iSrcLen); + //set body content + memcpy(buf + sizeof(int), pSrcBuf, iSrcLen); + //set conn send buf content. + memcpy(pPreCodeAddr, buf, iSrcLen + sizeof(int)); + (*piDstMsgLen) += iSrcLen + sizeof(int); + return RET_RECV_OK; + } +} diff --git a/t_proto.hpp b/t_proto.hpp new file mode 100644 index 0000000..5748f1f --- /dev/null +++ b/t_proto.hpp @@ -0,0 +1,46 @@ +/** + * @file: t_proto.hpp + * @brief: + * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-24 + */ + +#ifndef _t_proto_hpp_ +#define _t_proto_hpp_ + +namespace T_TCP +{ + // proto format: | 4 type head | payload | + enum CODERET + { + RET_RECV_OK = 0, + RET_RECV_NOT_COMPLETE = 1, + RET_RECV_ERR = 2, + }; + class BusiCodec + { + public: + BusiCodec(int iType); + virtual ~BusiCodec(); + enum CODERET DeCode(const char* pDstBuf, int iSrcLen, char* pSrcPayLoad, int *piPayLoadLen); + + /** + * @brief: EnCode + * 将 长度为iSrcLen的pSrcBuf编码到 空间 pDstMsg, 最后总的编码空间长度存放 + * 在 piDstMsgLen. + * @param pSrcBuf + * @param iSrcLen + * @param pDstMsg + * @param piDstMsgLen + * 既是入参,也是出参,入参作为编码空间的初始偏移量,出参是编码完后,最 + * 终的编码空间的偏移量 + * @return + */ + enum CODERET EnCode(const char* pSrcBuf, int iSrcLen, char *pDstMsg, int* piDstMsgLen); + private: + int m_iCodeType; + }; +} +#endif + diff --git a/t_queue.hpp b/t_queue.hpp new file mode 100644 index 0000000..3efe5ae --- /dev/null +++ b/t_queue.hpp @@ -0,0 +1,100 @@ +/** + * @file: t_queue.hpp + * @brief: 队列的总元素个数(空闲队列和正在工作) 是一直增大. 
+ * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-23 + */ +#ifndef _t_queue_hpp_ +#define _t_queue_hpp_ + +#include +#include +#include + +namespace T_TCP +{ + template + class QueueSafe + { + public: + QueueSafe(int iInitCapacity = 0); + virtual ~QueueSafe(); + + bool PushFront(T* pItem); + T* PopBack(); + + private: + std::queue m_Queue; + pthread_mutex_t m_mutxQueue; + int m_iCurSize; + }; + + //// + template + QueueSafe::QueueSafe(int iInitCapacity): m_iCurSize(iInitCapacity) + { + pthread_mutex_init(&m_mutxQueue, NULL); + + pthread_mutex_lock(&m_mutxQueue); + for (int i = 0; i < m_iCurSize; ++i) + { + T* pNewItem = new T(); + m_Queue.push(pNewItem); + } + pthread_mutex_unlock(&m_mutxQueue); + } + + template + QueueSafe::~QueueSafe() + { + pthread_mutex_lock(&m_mutxQueue); + while (m_iCurSize > 0) + { + T* pItem = m_Queue.front(); + delete pItem; + m_iCurSize--; + m_Queue.pop(); + } + pthread_mutex_unlock(&m_mutxQueue); + } + + template + bool QueueSafe::PushFront(T* pItem) + { + pthread_mutex_lock(&m_mutxQueue); + m_Queue.push(pItem); + m_iCurSize++; + pthread_mutex_unlock(&m_mutxQueue); + return true; + } + + template + T* QueueSafe::PopBack() + { + pthread_mutex_lock(&m_mutxQueue); + if (m_iCurSize == 0) + { + T* pItem = new T(); + pthread_mutex_unlock(&m_mutxQueue); + return pItem; + } + + T* pItem = m_Queue.front(); + m_Queue.pop(); + m_iCurSize--; + pthread_mutex_unlock(&m_mutxQueue); + return pItem; + } + + + /** define item of queue ****/ + struct QueueItem + { + QueueItem():iConnFd(-1) { } + int iConnFd; + }; +} + +#endif + diff --git a/t_socket.cpp b/t_socket.cpp new file mode 100644 index 0000000..8205cf3 --- /dev/null +++ b/t_socket.cpp @@ -0,0 +1,213 @@ +#include "t_socket.hpp" + +namespace T_TCP +{ + Sock::Sock(const std::string& sIp, int iPort): m_sSrvIp(sIp), m_iSrvPort(iPort), m_iFd(-1), m_err(-1) + { + CreateSock(); + } + + Sock::~Sock() + { + if (m_iFd > 0) + { + close(m_iFd); + m_iFd = -1; + } + } + + int Sock::CreateSock() + 
{ + m_iFd = ::socket( AF_INET, SOCK_STREAM, 0); + if (m_iFd <= 0) + { + SetErr(SOCK_ERR); + std::cout << "create local socket fd fail" << std::endl; + return -1; + } + + int iOptFlags; + if ((iOptFlags = ::fcntl(m_iFd,F_GETFL,0)) < 0 + || ::fcntl(m_iFd, F_SETFL, iOptFlags | O_NONBLOCK) < 0) + { + perror("setting O_NONBLOCK"); + SetErr(SOCK_ERR); + ::close(m_iFd); + m_iFd = -1; + return -2; + } + return m_iFd; + } + + bool Sock::SetSockOpt() + { + if (m_iFd <= 0) + { + return false; + } + // + int iOptFlag = 1; + int iRet = setsockopt(m_iFd, SOL_SOCKET, SO_REUSEADDR, (void*)& iOptFlag, sizeof(iOptFlag)); + if (iRet < 0) + { + SetErr(SOCK_ERR); + perror("setsockopt SO_REUSEADDR fail"); + return false; + } + + iRet = ::setsockopt(m_iFd, SOL_SOCKET, SO_KEEPALIVE,(void*)&iOptFlag,sizeof(iOptFlag)); + if (iRet < 0) + { + SetErr(SOCK_ERR); + perror("setsockopt SO_KEEPALIVE fail"); + return false; + } + + iRet = ::setsockopt(m_iFd, IPPROTO_TCP, TCP_NODELAY, (void*)&iOptFlag,sizeof(iOptFlag)); + if (iRet < 0) + { + SetErr(SOCK_ERR); + perror("setsockopt TCP_NODELAY fail"); + return false; + } + return true; + } + + int Sock::GerErr() + { + return m_err; + } + // + void Sock::SetErr(int err) + { + m_err = err; + } + // + int Sock::GetSockFd() + { + return m_iFd; + } + // + int Sock::Send(const char* pSendBuf, int iLen) + { + if (pSendBuf == NULL || iLen < 0 || m_iFd <= 0) + { + return -1; + } + int iRet = ::send(m_iFd, pSendBuf, iLen, 0); + if (iRet < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + SetErr(SOCK_AGAIN); + return -1; + } + + SetErr(SOCK_ERR); + perror("send err"); + return -1; + } + return iRet; + } + + int Sock::Recv(char* pRcvBuf, int iMaxLen) + { + if (pRcvBuf == NULL || m_iFd <= 0) + { + return -1; + } + int iRet = ::recv(m_iFd, pRcvBuf, iMaxLen, 0); + if (iRet < 0) + { + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + SetErr(SOCK_AGAIN); + return -1; + } + SetErr(SOCK_ERR); + perror("recv fail"); + return -2; + } + return iRet; + } + + bool 
Sock::Listen() + { + if (m_iFd <= 0) + { + return false; + } + + SetSockOpt(); + struct sockaddr_in addrListen; + memset(&addrListen, 0, sizeof(struct sockaddr_in)); + + addrListen.sin_family = AF_INET; + addrListen.sin_addr.s_addr = inet_addr(m_sSrvIp.c_str()); + addrListen.sin_port = ::htons(m_iSrvPort); + int addrLen = sizeof(addrListen); + + int iRet = ::bind(m_iFd, (struct sockaddr *)&addrListen, sizeof(addrListen)); + if (iRet < 0) + { + SetErr(SOCK_ERR); + perror("bind socket fail"); + return false; + } + iRet = ::listen(m_iFd, 1024); + if (iRet < 0) + { + SetErr(SOCK_ERR); + perror("listen socket fail"); + return false; + } + return true; + } + // + bool Sock::Connect() + { + if (m_iFd <= 0) + { + return false; + } + + struct sockaddr_in addrRemote; + memset(&addrRemote, 0, sizeof(addrRemote)); + addrRemote.sin_family = AF_INET; + addrRemote.sin_port = ::htons(m_iSrvPort); + inet_pton(AF_INET, m_sSrvIp.c_str(), &addrRemote.sin_addr); + + int iRet = ::connect(m_iFd, (struct sockaddr*)&addrRemote, sizeof(addrRemote)); + if (iRet < 0) + { + if (errno == EINPROGRESS) + { + std::cout << "connect is connecting....“" << std::endl; + return true; + } + perror("connect err"); + SetErr(SOCK_ERR); + return false; + } + return true; + } + + int Sock::SockAccept(struct sockaddr* pClientAddr) + { + if (pClientAddr == NULL) + { + return -1; + } + + socklen_t addrLen = sizeof(*pClientAddr); + int iRet = ::accept(m_iFd, pClientAddr, &addrLen); + if (iRet < 0) + { + SetErr(SOCK_ERR); + perror("accept fail"); + return iRet; + } + return iRet; + } + // +} diff --git a/t_socket.hpp b/t_socket.hpp new file mode 100644 index 0000000..3f75f17 --- /dev/null +++ b/t_socket.hpp @@ -0,0 +1,57 @@ +/** + * @file: t_socket.hpp + * @brief: + * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-23 + */ + +#ifndef _t_socket_hpp_ +#define _t_socket_hpp_ +// +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +// +#include +#include 
+// +namespace T_TCP +{ + #define SOCK_OK (0) + #define SOCK_AGAIN (0) + #define SOCK_ERR errno + + class Sock + { + public: + Sock(const std::string &sIp, int iPort); + virtual ~Sock(); + int GetSockFd(); + bool Listen(); + bool Connect(); + + int SockAccept(struct sockaddr* pClientAddr); + + int Send(const char* pSendBuf, int iLen); + int Recv(char* pRcvBuf, int iMaxLen); + int GerErr(); + private: + bool SetSockOpt(); + int CreateSock(); + void SetErr(int err); + private: + std::string m_sSrvIp; + int m_iSrvPort; + int m_iFd; + int m_err; + }; +} +#endif diff --git a/t_tcpsrv.cpp b/t_tcpsrv.cpp new file mode 100644 index 0000000..d826e77 --- /dev/null +++ b/t_tcpsrv.cpp @@ -0,0 +1,161 @@ +#include "t_tcpsrv.hpp" +#include "t_eventlisten.hpp" + +namespace T_TCP +{ + TcpSrv::TcpSrv(const std::string& sIp, int iPort, int threadNums): m_bInit(false), p_mListenSock(NULL), + m_sLocalIp(sIp), m_iLocalPort(iPort), m_pEventBase(NULL), m_pThreadPool(NULL), + m_iThreadPoolNums(threadNums), p_mListenConn(NULL) + + { + m_bInit = Init(); + } + + // + TcpSrv::~TcpSrv() + { + m_bInit = false; + if (p_mListenSock) + { + delete p_mListenSock; + p_mListenSock = NULL; + } + + if (m_pThreadPool) + { + delete m_pThreadPool; + m_pThreadPool = NULL; + } + + if (p_mListenConn) + { + delete p_mListenConn; + p_mListenConn = NULL; + } + + if (m_pAcceptConnListFree) + { + delete m_pAcceptConnListFree; + m_pAcceptConnListFree = NULL; + } + // + if (m_pEventBase) + { + event_base_free(m_pEventBase); + m_pEventBase = NULL; + } + } + + // + bool TcpSrv::Init() + { + if (p_mListenSock == NULL) + { + p_mListenSock = new Sock(m_sLocalIp, m_iLocalPort); + } + + bool bRet = p_mListenSock->Listen(); + if (bRet == false) + { + std::cout << "Listen sock fail, err no: " << p_mListenSock->GerErr() << std::endl; + return false; + } + + struct event_config *ev_config = event_config_new(); + event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK); + m_pEventBase = event_base_new_with_config(ev_config); + 
event_config_free(ev_config); + std::cout << "tcp srv event base init, addr: " << m_pEventBase << std::endl; + + m_pAcceptConnListFree = new QueueSafe(100); + std::cout << "create 100 len free-queue for accept_conn_item" << std::endl; + // + std::cout << "tcp srv init succ" << std::endl; + return true; + } + + bool TcpSrv::RegisteAcceptConn() + { + p_mListenConn= new ListenConn(p_mListenSock->GetSockFd(), (void*)this); + if (p_mListenConn == NULL) + { + return false; + } + + bool bRet = p_mListenConn->AddEvent(m_pEventBase, EV_READ|EV_PERSIST, this); + if (false == bRet) + { + delete p_mListenConn; + p_mListenConn = NULL; + std::cout << "add read event in listen conn fail(1)" << std::endl; + return false; + } + std::cout << "add read event in listen conn succ" << std::endl; + return true; + } + + bool TcpSrv::Run() + { + if (m_bInit == false) + { + return false; + } + if (RegisteAcceptConn() == false) + { + std::cout << "register accept conn to event base fail" << std::endl; + return false; + } + + //init thread pools + m_pThreadPool = new PthreadPools(m_iThreadPoolNums); + if ( false == m_pThreadPool->StartAllThreads() ) + { + std::cout << "start threads nums fail" << std::endl; + return false; + } + + // + int iRet = event_base_loop(m_pEventBase, 0); + if (iRet != 0) + { + std::cout << "event loop exit. " << std::endl; + return false; + } + return true; + } + + bool TcpSrv::Accept() + { + struct sockaddr_in clientAddr; + int iAcceptFd = p_mListenSock->SockAccept((struct sockaddr*)&clientAddr); + if (iAcceptFd < 0) + { + std::cout << "accept new connect fail" << std::endl; + return false; + } + std::cout << "accept new client conn, new fd: " << iAcceptFd << std::endl; + //从线程池中分配一个事件线程,用于独立接收报文和发送报文. + //so,需要有个线程池. 
+ WorkerTask* pFreeTaskWork = m_pThreadPool->AllocateThread(); + if (pFreeTaskWork == NULL) + { + std::cout << " get free work task from thread pools fail" << std::endl; + return false; + } + + // + QueueItem* pItem = m_pAcceptConnListFree->PopBack(); + pItem->iConnFd = iAcceptFd; + m_AcceptConnListWorking.PushFront(pItem); + + //拼接一个新连接数据队列,并向发送任务的pipe上发送新请求命令字. + if (false == pFreeTaskWork->NotifySendConn(&m_AcceptConnListWorking, m_pAcceptConnListFree)) + { + std::cout << "send notify new conn cmd fail " << std::endl; + return false; + } + std::cout << "send new conn cmd by pipe, new fd: " << iAcceptFd << std::endl; + return true; + } + +} diff --git a/t_tcpsrv.hpp b/t_tcpsrv.hpp new file mode 100644 index 0000000..84bbcf6 --- /dev/null +++ b/t_tcpsrv.hpp @@ -0,0 +1,52 @@ +/** + * @file: t_tcpsrv.hpp + * @brief: + * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-23 + */ + +#ifndef _t_tcpsrv_hpp_ +#define _t_tcpsrv_hpp_ + +#include +#include "t_socket.hpp" +#include "t_worktask.hpp" +#include "t_queue.hpp" +#include "t_pools.hpp" +#include +#include +#include + +namespace T_TCP +{ + class ListenConn; + class TcpSrv + { + public: + TcpSrv(const std::string& sIp, int iPort,int ithreadNums); + virtual ~TcpSrv(); + bool Run(); + + bool Accept(); + private: + bool RegisteAcceptConn(); + bool Init(); + private: + bool m_bInit; + Sock* p_mListenSock; + + std::string m_sLocalIp; + int m_iLocalPort; + struct event_base* m_pEventBase; + //thread pools + PthreadPools* m_pThreadPool; + int m_iThreadPoolNums; + QueueSafe m_AcceptConnListWorking; + QueueSafe *m_pAcceptConnListFree; + ListenConn* p_mListenConn; //in future, it can been allocated by conn_pools + }; +} +#endif + + diff --git a/t_thread.cpp b/t_thread.cpp new file mode 100644 index 0000000..b4ee360 --- /dev/null +++ b/t_thread.cpp @@ -0,0 +1,70 @@ +#include "t_thread.hpp" + +namespace T_TCP +{ + PthreadBase::PthreadBase() :m_iPid(-1), m_bRun(false), + m_pInitLock(NULL), m_pInitCond(NULL), + 
m_pReadyThreadNums(NULL) + { + } + + PthreadBase::PthreadBase(pthread_mutex_t* pInitLock, pthread_cond_t* pInitCond, int* pInitNums) + :m_iPid(-1), m_bRun(false),m_pInitLock(pInitLock), m_pInitCond(pInitCond), + m_pReadyThreadNums(pInitNums) + { + } + + PthreadBase::~PthreadBase() + { + } + void PthreadBase::Start() + { + int iRet = pthread_create(&m_iPid, NULL, Entry, this); + if (iRet != 0) + { + return ; + } + } + + void PthreadBase::Stop() + { + m_bRun = false; + } + + void* PthreadBase::Entry(void* pData) + { + PthreadBase* pThis = (PthreadBase*)pData; + if (pThis == NULL) + { + return NULL; + } + + if (pThis->Init() < 0) + { + return NULL; + } + + pThis->SetRun(); + while(pThis->IsRun()) + { + pThis->main(); + } + } + + void PthreadBase::JoinWork() + { + pthread_join(m_iPid, NULL); + } + + void PthreadBase::RegistePthreadToPool() + { + if (m_pInitLock == NULL || m_pInitCond == NULL || m_pReadyThreadNums == NULL) + { + return ; + } + pthread_mutex_lock(m_pInitLock); + ++(*m_pReadyThreadNums); + pthread_cond_signal(m_pInitCond); + pthread_mutex_unlock(m_pInitLock); + } +} diff --git a/t_thread.hpp b/t_thread.hpp new file mode 100644 index 0000000..7cd5d18 --- /dev/null +++ b/t_thread.hpp @@ -0,0 +1,89 @@ +/** + * @file: t_threads.hpp + * @brief: + * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-18 + */ +#ifndef _T_THREADS_HPP_ +#define _T_THREADS_HPP_ + +#ifdef __cplusplus + extern "C" { +#endif + +#include + +#ifdef __cplusplus + } +#endif + +#include + +namespace T_TCP +{ + + +/** +* @brief: +* 线程类的抽象,该类对外支持: 启动线程,停止线程,join线程,获取当前线程id等能力 +* 如果要使用线程完成具体工作,只需继承该类即可,实现方法main(), Init(). 
+*/ +class PthreadBase +{ + public: + PthreadBase(); + PthreadBase(pthread_mutex_t* pInitLock, pthread_cond_t* pInitCond, int* pInitNums); + virtual ~PthreadBase(); + void Start(); + void JoinWork(); + pthread_t GetThreadId() + { + return m_iPid; + } + protected: + void Stop(); + private: + static void* Entry(void* pData); + void SetRun() + { + m_bRun = true; + } + bool IsRun() + { + return m_bRun; + } + protected: + + /** + * @brief: main + * 线程实现的具体内容,该接口被子类实现.子类不需要手动调用. + * @return 0:succ, ~0:error + */ + virtual int main() = 0; + + /** + * @brief: Init + * 工作线程初始化,用于业务线程的一些初始化需求。如果线程子类需要初始化数据 + * 时,只需实现该接口就能完成数据的初始化工作。子类不需手动调用 + * @return 0:succ, ~0:error + */ + virtual int Init() = 0; + /** + * @brief: RegistePthreadToPool() + * 该接口为线程池场景设计. 调用该接口表明该线程任务初始化在此已经完成。 + * 该接口不需子类实现,也不直接被外部类调用,被派生类的main()手动调用. + */ + void RegistePthreadToPool(); + + pthread_t m_iPid; + bool m_bRun; + // add pthread pool init data struction, 经构造函数参数传入. + pthread_mutex_t* m_pInitLock; + pthread_cond_t* m_pInitCond; + int* m_pReadyThreadNums; +}; + +/////// +} +#endif diff --git a/t_worktask.cpp b/t_worktask.cpp new file mode 100644 index 0000000..a4bccf2 --- /dev/null +++ b/t_worktask.cpp @@ -0,0 +1,167 @@ +#include "t_worktask.hpp" +#include +#include "t_eventpipe.hpp" +#include "t_eventconn.hpp" + +namespace T_TCP +{ + WorkerTask::WorkerTask(): m_iNotifyRecvFd(-1), m_iNofitySendFd(-1), m_PthreadEventBase(NULL) + { + } + + WorkerTask:: WorkerTask(pthread_mutex_t* pInitLock, pthread_cond_t* pInitCond, int* pInitNums) + :PthreadBase(pInitLock, pInitCond, pInitNums), + m_iNotifyRecvFd(-1),m_iNofitySendFd(-1), m_PthreadEventBase(NULL), m_pConnItemList(NULL), + m_pConnItemListFree(NULL) + { + } + + WorkerTask::~WorkerTask() + { + event_base_free(m_PthreadEventBase); + m_PthreadEventBase = NULL; + if (m_iNotifyRecvFd > 0) + { + ::close(m_iNotifyRecvFd); m_iNotifyRecvFd = -1; + } + + if (m_iNofitySendFd > 0) + { + ::close(m_iNofitySendFd); m_iNofitySendFd = -1; + } + + for 
(std::map::iterator it = m_mpAcceptConn.begin(); it != m_mpAcceptConn.end(); )
        {
            // Destroy the connection object, close its fd, then advance the
            // iterator past the erased element (post-increment erase idiom).
            if (it->second)
            {
                delete it->second;
            }
            if (it->first > 0)
            {
                ::close(it->first);
            }
            m_mpAcceptConn.erase(it++);
        }
    }

    // Per-thread setup, run by PthreadBase::Entry(): build this worker's
    // private libevent base (NOLOCK — only this thread touches it) and the
    // notify pipe the acceptor thread uses to hand over new connections.
    // @return 0 on success, -1 on failure.
    int WorkerTask::Init()
    {
        struct event_config *ev_config = event_config_new();
        event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
        m_PthreadEventBase = event_base_new_with_config(ev_config);
        event_config_free(ev_config);
        std::cout << "event base init, addr: " << m_PthreadEventBase << std::endl;
        if (m_PthreadEventBase == NULL)
        {
            m_bRun = false;
            return -1;
        }

        int fds[2];
        if (::pipe(fds))
        {
            perror("can't not notify pipe");
            return -1;
        }
        //
        m_iNotifyRecvFd = fds[0];   // read end: watched by this worker's base
        m_iNofitySendFd = fds[1];   // write end: written by NotifySendConn()

        //need to delete
        // NOTE(review): pNotifyConn is never stored or freed — the object
        // leaks at shutdown (only the fd itself is closed in the destructor).
        // Keeping a member pointer and deleting it in ~WorkerTask() would fix
        // this; left unchanged here because it needs a header change.
        PipeConn* pNotifyConn = new PipeConn(m_iNotifyRecvFd, this);
        AddRConn(pNotifyConn);
        std::cout << "create pipe, notify send cmd fd: " << m_iNofitySendFd << ", notify recv cmd fd: "
            << m_iNotifyRecvFd << std::endl;
        return 0;
    }

    // Register a connection for persistent read events on this worker's base.
    void WorkerTask::AddRConn(ConnBase* pEvent)
    {
        pEvent->AddEvent(m_PthreadEventBase, EV_READ | EV_PERSIST, this);
    }

    // Thread body: report readiness to the pool, then block inside the
    // libevent dispatch loop until the base runs out of events or is broken.
    int WorkerTask::main()
    {
        RegistePthreadToPool();
        //
        event_base_dispatch(m_PthreadEventBase);
        std::cout << "thread exit " << std::endl;
        m_bRun = false;
        return 0;
    }

    // Parse and handle a command byte received from the notify pipe.
    // Command 'c' == a new client connection is waiting in m_pConnItemList.
    // @param pBuf  command buffer; @param iLen  its length.
    bool WorkerTask::NotifyRecvConn(char *pBuf, int iLen)
    {
        if (pBuf == NULL)
        {
            return false;
        }
        if (iLen < 1)
        {
            return false;
        }
        if (m_pConnItemList == NULL || m_pConnItemListFree == NULL)
        {
            return false;
        }

        switch(pBuf[0])
        {
            case 'c':
            {
                // Take one pending new-connection request off the queue.
+ QueueItem* pItem = m_pConnItemList->PopBack(); + int iFd = 0; + if (pItem != NULL) + { + iFd = pItem->iConnFd; + //发起一个连接, 需要从连接池中分配一个空闲连接 + AcceptConn* pConn = new AcceptConn(iFd, this); + AddRConn(pConn); + std::cout << "pipe recv new conn cmd, new fd: " << iFd << std::endl; + + m_mpAcceptConn.insert(std::pair(iFd, pConn)); //when close iFd, delete pConn + m_pConnItemListFree->PushFront(pItem); + } + } + break; + + default: + break; + } + return true; + } + + void WorkerTask::DeleteAcceptConn(int fd) + { + if (fd <=0) + { + return ; + } + // + std::map::iterator it = m_mpAcceptConn.find(fd); + if (it != m_mpAcceptConn.end()) + { + delete it->second; + m_mpAcceptConn.erase(it); + } + } + + bool WorkerTask::NotifySendConn(QueueSafe* pItemList, QueueSafe* pItemListFree) + { + if (pItemList == NULL) + { + return false; + } + char buf[1]; + buf [0] = 'c'; + if (::write(m_iNofitySendFd, buf, 1 ) != 1) + { + perror("write cmd c on notifysendcmd pipe fail"); + return false; + } + m_pConnItemList = pItemList; + m_pConnItemListFree = pItemListFree; + return true; + } +} diff --git a/t_worktask.hpp b/t_worktask.hpp new file mode 100644 index 0000000..c2436bd --- /dev/null +++ b/t_worktask.hpp @@ -0,0 +1,88 @@ +/** + * @file: t_worktask.hpp + * @brief: + * @author: wusheng Hu + * @version: v0x0001 + * @date: 2018-04-23 + */ + +#ifndef _t_worktask_hpp_ +#define _t_worktask_hpp_ + +#include "t_queue.hpp" +#include "t_thread.hpp" +#include "t_eventbase.hpp" +#include + +#include +#include +#include + +#include +#include +#include + +namespace T_TCP +{ + class WorkerTask: public PthreadBase + { + public: + WorkerTask(); + WorkerTask(pthread_mutex_t* pInitLock, pthread_cond_t* pInitCond, int* pInitNums); + virtual ~WorkerTask(); + + /** + * @brief: AddRConn + * 把读数据的连接对象注册到主线程上.该连接对象已经提供读写事件到达时的回 + * 调接口。主线程只负责调度 + * @param pEvent + */ + void AddRConn(ConnBase* pEvent); + + /** + * @brief: NotifyRecvConn + * 从管道上接收命令字后,对命令字的解析和处理. 
用于PipeConn对象接收命令事件
             * 响应的处理函数中。
             * @param pBuf
             * buffer holding the raw command byte(s)
             * @param iLen
             * length of the command buffer
             *
             * @return
             */
            bool NotifyRecvConn(char *pBuf, int iLen);

            /**
             * @brief: NotifySendConn
             * Write a command byte onto this worker's send pipe and record
             * the command item queues. Intended to be called by [other]
             * non-worker threads (the acceptor).
             * @param pItemList
             * queue of pending new-connection command items.
             *
             * @return
             */
            bool NotifySendConn(QueueSafe* pItemList, QueueSafe* pItemListFree);

            /**
             * @brief: DeleteAcceptConn
             * Remove the established connection object owned by this worker
             * thread and close its socket.
             * @param fd
             */
            void DeleteAcceptConn(int fd);
        protected:
            virtual int main();
            virtual int Init();

        private:
            int m_iNotifyRecvFd;    // read end of the notify pipe (watched by this worker)
            int m_iNofitySendFd;    // write end of the notify pipe (written by the acceptor)

            struct event_base *m_PthreadEventBase;  // this worker's private libevent base
            QueueSafe* m_pConnItemList;       // pending new-connection items (shared with acceptor)
            QueueSafe* m_pConnItemListFree;   // recycled items handed back to the acceptor
            //
            std::map m_mpAcceptConn; //need not lock: accessed only by this worker thread
    };
}

#endif