Skip to content

Commit

Permalink
rename the variable name (OpenAtomFoundation#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
panlei-coder authored Oct 11, 2023
1 parent 264c3e1 commit 26121e0
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 31 deletions.
Empty file modified clear.sh
100644 → 100755
Empty file.
40 changes: 20 additions & 20 deletions src/io_thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ bool IOThreadPool::SetWorkerNum(size_t num) {
return false;
}

if (!loops_.empty()) {
ERROR("can only called once, not empty loops size: {}", loops_.size());
if (!worker_loops_.empty()) {
ERROR("can only called once, not empty loops size: {}", worker_loops_.size());
return false;
}

Expand All @@ -62,14 +62,14 @@ bool IOThreadPool::SetWorkerNum(size_t num) {
}

worker_num_.store(num);
workers_.reserve(num);
loops_.reserve(num);
worker_threads_.reserve(num);
worker_loops_.reserve(num);

return true;
}

bool IOThreadPool::Init(const char* ip, int port, NewTcpConnectionCallback cb) {
auto f = std::bind(&IOThreadPool::Next, this);
auto f = std::bind(&IOThreadPool::ChooseNextWorkerEventLoop, this);

base_.Init();
printf("base loop %s %p, g_baseLoop %p\n", base_.GetName().c_str(), &base_, base_.Self());
Expand All @@ -89,10 +89,10 @@ void IOThreadPool::Run(int ac, char* av[]) {
StartWorkers();
base_.Run();

for (auto& w : workers_) {
for (auto& w : worker_threads_) {
w.join();
}
workers_.clear();
worker_threads_.clear();

INFO("Process stopped, goodbye...");
}
Expand All @@ -101,8 +101,8 @@ void IOThreadPool::Exit() {
state_ = State::kStopped;

BaseLoop()->Stop();
for (size_t index = 0; index < loops_.size(); ++index) {
EventLoop* loop = loops_[index].get();
for (size_t index = 0; index < worker_loops_.size(); ++index) {
EventLoop* loop = worker_loops_[index].get();
loop->Stop();
}
}
Expand All @@ -111,12 +111,12 @@ bool IOThreadPool::IsExit() const { return state_ == State::kStopped; }

EventLoop* IOThreadPool::BaseLoop() { return &base_; }

EventLoop* IOThreadPool::Next() {
if (loops_.empty()) {
EventLoop* IOThreadPool::ChooseNextWorkerEventLoop() {
if (worker_loops_.empty()) {
return BaseLoop();
}

auto& loop = loops_[current_loop_++ % loops_.size()];
auto& loop = worker_loops_[current_worker_loop_++ % worker_loops_.size()];
return loop.get();
}

Expand All @@ -125,23 +125,23 @@ void IOThreadPool::StartWorkers() {
assert(state_ == State::kNone);

size_t index = 1;
while (loops_.size() < worker_num_) {
while (worker_loops_.size() < worker_num_) {
std::unique_ptr<EventLoop> loop(new EventLoop);
if (!name_.empty()) {
loop->SetName(name_ + "_" + std::to_string(index++));
printf("loop %p, name %s\n", loop.get(), loop->GetName().c_str());
}
loops_.push_back(std::move(loop));
worker_loops_.push_back(std::move(loop));
}

for (index = 0; index < loops_.size(); ++index) {
EventLoop* loop = loops_[index].get();
for (index = 0; index < worker_loops_.size(); ++index) {
EventLoop* loop = worker_loops_[index].get();
std::thread t([loop]() {
loop->Init();
loop->Run();
});
printf("thread %lu, thread loop %p, loop name %s \n", index, loop, loop->GetName().c_str());
workers_.push_back(std::move(t));
worker_threads_.push_back(std::move(t));
}

state_ = State::kStarted;
Expand All @@ -152,14 +152,14 @@ void IOThreadPool::SetName(const std::string& name) { name_ = name; }
IOThreadPool::IOThreadPool() : state_(State::kNone) { InitSignal(); }

bool IOThreadPool::Listen(const char* ip, int port, NewTcpConnectionCallback ccb) {
auto f = std::bind(&IOThreadPool::Next, this);
auto f = std::bind(&IOThreadPool::ChooseNextWorkerEventLoop, this);
auto loop = BaseLoop();
return loop->Execute([loop, ip, port, ccb, f]() { return loop->Listen(ip, port, std::move(ccb), f); }).get();
}

void IOThreadPool::Connect(const char* ip, int port, NewTcpConnectionCallback ccb, TcpConnectionFailCallback fcb, EventLoop* loop) {
if (!loop) {
loop = Next();
loop = ChooseNextWorkerEventLoop();
}

std::string ipstr(ip);
Expand All @@ -186,7 +186,7 @@ std::shared_ptr<HttpClient> IOThreadPool::ConnectHTTP(const char* ip, int port,
auto fcb = [client](EventLoop*, const char* ip, int port) { client->OnConnectFail(ip, port); };

if (!loop) {
loop = Next();
loop = ChooseNextWorkerEventLoop();
}
client->SetLoop(loop);
Connect(ip, port, std::move(ncb), std::move(fcb), loop);
Expand Down
8 changes: 4 additions & 4 deletions src/io_thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class IOThreadPool {
EventLoop* BaseLoop();

// choose a loop
EventLoop* Next();
EventLoop* ChooseNextWorkerEventLoop();

// set worker threads, each thread has a EventLoop object
bool SetWorkerNum(size_t n);
Expand Down Expand Up @@ -69,9 +69,9 @@ class IOThreadPool {
EventLoop base_;

std::atomic<size_t> worker_num_{0};
std::vector<std::thread> workers_;
std::vector<std::unique_ptr<EventLoop>> loops_;
mutable std::atomic<size_t> current_loop_{0};
std::vector<std::thread> worker_threads_;
std::vector<std::unique_ptr<EventLoop>> worker_loops_;
mutable std::atomic<size_t> current_worker_loop_{0};

enum class State {
kNone,
Expand Down
2 changes: 1 addition & 1 deletion src/net/tcp_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ void TcpListener::OnNewConnection(struct evconnlistener*, evutil_socket_t fd, st

// make new conn
auto loop = acceptor->SelectEventLoop();
// IOThreadPool::Instance().Next();
// IOThreadPool::Instance().ChooseNextWorkerEventLoop();
auto on_create = acceptor->on_new_conn_; // cpp11 doesn't support lambda capture initializers
auto create_conn = [loop, on_create, fd, ipstr, port]() {
auto conn(std::make_shared<TcpConnection>(loop));
Expand Down
2 changes: 1 addition & 1 deletion src/pikiwidb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ void PikiwiDB::OnNewConnection(pikiwidb::TcpConnection* obj) {
obj->SetMessageCallback(msg_cb);
obj->SetOnDisconnect([](pikiwidb::TcpConnection* obj) { INFO("disconnect from {}", obj->GetPeerIp()); });
obj->SetNodelay(true);
obj->SetEventLoopSelector([]() { return pikiwidb::IOThreadPool::Instance().Next(); });
obj->SetEventLoopSelector([]() { return pikiwidb::IOThreadPool::Instance().ChooseNextWorkerEventLoop(); });
}

bool PikiwiDB::Init() {
Expand Down
8 changes: 4 additions & 4 deletions src/std/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void ThreadPool::SetMaxIdleThread(unsigned int m) {
}

void ThreadPool::JoinAll() {
decltype(workers_) tmp;
decltype(worker_threads_) tmp;

{
std::unique_lock<std::mutex> guard(mutex_);
Expand All @@ -35,8 +35,8 @@ void ThreadPool::JoinAll() {
shutdown_ = true;
cond_.notify_all();

tmp.swap(workers_);
workers_.clear();
tmp.swap(worker_threads_);
worker_threads_.clear();
}

for (auto& t : tmp) {
Expand All @@ -52,7 +52,7 @@ void ThreadPool::JoinAll() {

void ThreadPool::_CreateWorker() {
std::thread t([this]() { this->_WorkerRoutine(); });
workers_.push_back(std::move(t));
worker_threads_.push_back(std::move(t));
}

void ThreadPool::_WorkerRoutine() {
Expand Down
2 changes: 1 addition & 1 deletion src/std/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class ThreadPool final {
std::atomic<unsigned> pendingStopSignal_;

static thread_local bool working_;
std::deque<std::thread> workers_;
std::deque<std::thread> worker_threads_;

std::mutex mutex_;
std::condition_variable cond_;
Expand Down

0 comments on commit 26121e0

Please sign in to comment.