Skip to content

Commit

Permalink
Flush async log before exit
Browse files Browse the repository at this point in the history
  • Loading branch information
chenBright committed Jan 1, 2024
1 parent 6d5439d commit be401a4
Showing 1 changed file with 76 additions and 69 deletions.
145 changes: 76 additions & 69 deletions src/butil/logging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,11 @@ DEFINE_bool(log_func_name, false, "Log function name in each log");
DEFINE_bool(async_log, false, "Use async log");

DEFINE_int32(max_async_log_queue_size, 100000, "Max async log size. "
"If current log count of async log > max_async_log_size, Use sync log to protect process.");
"If current log count of async log > max_async_log_size, "
"Use sync log to protect process.");

DEFINE_int32(sleep_to_flush_async_log_s, 0,
"If the value > 0, sleep before atexit to flush async log");

namespace {

Expand Down Expand Up @@ -444,28 +448,36 @@ struct BAIDU_CACHELINE_ALIGNMENT LogRequest {

LogRequest* const LogRequest::UNCONNECTED = (LogRequest*)(intptr_t)-1;

class AsyncLog : public butil::SimpleThread {
class AsyncLogger : public butil::SimpleThread {
public:
static AsyncLog* GetInstance();
static AsyncLogger* GetInstance();

void Log(const std::string& log);
void Log(std::string&& log);
void StopAndJoin();

private:
friend struct DefaultSingletonTraits<AsyncLog>;
friend struct DefaultSingletonTraits<AsyncLogger>;

static LogRequest _stop_req;

AsyncLog();
~AsyncLog() override;
AsyncLogger();
~AsyncLogger() override;

static void AtExit() {
GetInstance()->StopAndJoin();
if (FLAGS_sleep_to_flush_async_log_s > 0) {
::sleep(FLAGS_sleep_to_flush_async_log_s);
}
}

void LogImpl(LogRequest* log_req);

void Run() override;

void LogTask();
void LogTask(LogRequest* req);

bool IsLogComplete(LogRequest* old_head,
bool singular_node,
LogRequest** new_tail);
bool IsLogComplete(LogRequest* old_head);

void DoLog(LogRequest* req);
void DoLog(const std::string& log);
Expand All @@ -475,39 +487,40 @@ friend struct DefaultSingletonTraits<AsyncLog>;
butil::ConditionVariable _cond;
LogRequest* _current_log_request;
butil::atomic<int32_t> _log_request_count;
bool _stop;
butil::atomic<bool> _stop;
};

AsyncLog* AsyncLog::GetInstance() {
return Singleton<AsyncLog,
LeakySingletonTraits<AsyncLog>>::get();
AsyncLogger* AsyncLogger::GetInstance() {
return Singleton<AsyncLogger,
LeakySingletonTraits<AsyncLogger>>::get();
}

AsyncLog::AsyncLog()
AsyncLogger::AsyncLogger()
: butil::SimpleThread("async_log_thread")
, _log_head(NULL)
, _cond(&_mutex)
, _current_log_request(NULL)
, _stop(false) {
Start();
// We need to stop async logger and
// flush all async log before exit.
atexit(AtExit);
}

AsyncLog::~AsyncLog() {
{
BAIDU_SCOPED_LOCK(_mutex);
_stop = true;
_cond.Signal();
}
Join();
AsyncLogger::~AsyncLogger() {
StopAndJoin();
}

void AsyncLog::Log(const std::string& log) {
void AsyncLogger::Log(const std::string& log) {
if (log.empty()) {
return;
}

if (_log_request_count.fetch_add(1, butil::memory_order_relaxed) >
FLAGS_max_async_log_queue_size) {
bool is_full = FLAGS_max_async_log_queue_size > 0 &&
_log_request_count.fetch_add(1, butil::memory_order_relaxed) >
FLAGS_max_async_log_queue_size;
if (is_full || _stop) {
// Async logger is full or stopped, fallback to sync log.
DoLog(log);
return;
}
Expand All @@ -522,13 +535,16 @@ void AsyncLog::Log(const std::string& log) {
LogImpl(log_req);
}

void AsyncLog::Log(std::string&& log) {
void AsyncLogger::Log(std::string&& log) {
if (log.empty()) {
return;
}

if (_log_request_count.fetch_add(1, butil::memory_order_relaxed) >
FLAGS_max_async_log_queue_size) {
bool is_full = FLAGS_max_async_log_queue_size > 0 &&
_log_request_count.fetch_add(1, butil::memory_order_relaxed) >
FLAGS_max_async_log_queue_size;
if (is_full || _stop) {
// Async logger is full or stopped, fallback to sync log.
DoLog(log);
return;
}
Expand All @@ -543,7 +559,7 @@ void AsyncLog::Log(std::string&& log) {
LogImpl(log_req);
}

void AsyncLog::LogImpl(LogRequest* log_req) {
void AsyncLogger::LogImpl(LogRequest* log_req) {
log_req->next = LogRequest::UNCONNECTED;
LogRequest* const prev_head =
_log_head.exchange(log_req, butil::memory_order_release);
Expand All @@ -555,89 +571,83 @@ void AsyncLog::LogImpl(LogRequest* log_req) {
log_req->next = NULL;

BAIDU_SCOPED_LOCK(_mutex);
_current_log_request = log_req;
_cond.Signal();
if (_stop) {
LogTask(log_req);
} else {
_current_log_request = log_req;
_cond.Signal();
}
}

void AsyncLog::Run() {
void AsyncLogger::StopAndJoin() {
if (!_stop) {
BAIDU_SCOPED_LOCK(_mutex);
_stop = true;
_cond.Signal();
}
if (!HasBeenJoined()) {
Join();
}
}

void AsyncLogger::Run() {
while (true) {
BAIDU_SCOPED_LOCK(_mutex);
while (!_stop && !_current_log_request) {
_cond.Wait();
}
if (_stop) {
if (_stop && !_current_log_request) {
break;
}

LogTask();
LogTask(_current_log_request);
_current_log_request = NULL;
}
}

void AsyncLog::LogTask() {
LogRequest* req = _current_log_request;
LogRequest* cur_tail = NULL;
void AsyncLogger::LogTask(LogRequest* req) {
do {
// req was written, skip it.
// req was logged, skip it.
if (req->next != NULL && req->data.empty()) {
LogRequest* const saved_req = req;
req = req->next;
butil::return_object(saved_req);
}

// Log all req to file.
// Log all requests to file.
while (req->next != NULL) {
LogRequest* const saved_req = req;
req = req->next;
if (!saved_req->data.empty()) {
DoLog(saved_req);
saved_req->data.clear();
}
// Release WriteRequest until last request.
// Release LogRequests until last request.
butil::return_object(saved_req);
}
if (!req->data.empty()) {
DoLog(req);
req->data.clear();
}

if (NULL == cur_tail) {
for (cur_tail = req; cur_tail->next != NULL;
cur_tail = cur_tail->next);
}
// Return when there's no more WriteRequests and req is completely
// written.
if (IsLogComplete(cur_tail, (req == cur_tail), &cur_tail)) {
if (cur_tail != req) {
fprintf(stderr, "cur_tail should equal to req\n");
}
// Return when there's no more LogRequests.
if (IsLogComplete(req)) {
butil::return_object(req);
return;
}
} while (true);
}

bool AsyncLog::IsLogComplete(LogRequest* old_head,
bool singular_node,
LogRequest** new_tail) {
bool AsyncLogger::IsLogComplete(LogRequest* old_head) {
if (old_head->next) {
fprintf(stderr, "old_head->next should be NULL\n");
}
LogRequest* new_head = old_head;
LogRequest* desired = NULL;
bool return_when_no_more = true;
if (!singular_node) {
desired = old_head;
// Write is obviously not complete if old_head is not fully written.
return_when_no_more = false;
}
if (_log_head.compare_exchange_strong(
new_head, desired, butil::memory_order_acquire)) {
// No one added new requests.
if (new_tail) {
*new_tail = old_head;
}
return return_when_no_more;
return true;
}
if (new_head == old_head) {
fprintf(stderr, "new_head should not be equal to old_head\n");
Expand All @@ -664,17 +674,14 @@ bool AsyncLog::IsLogComplete(LogRequest* old_head,

// Link old list with new list.
old_head->next = tail;
if (new_tail) {
*new_tail = new_head;
}
return false;
}

void AsyncLog::DoLog(LogRequest* req) {
void AsyncLogger::DoLog(LogRequest* req) {
DoLog(req->data);
}

void AsyncLog::DoLog(const std::string& log) {
void AsyncLogger::DoLog(const std::string& log) {
Log2File(log);
_log_request_count.fetch_sub(1);
}
Expand Down Expand Up @@ -1236,7 +1243,7 @@ class DefaultLogSink : public LogSink {
// write to log file
if ((logging_destination & LOG_TO_FILE) != 0) {
if (FLAGS_async_log) {
AsyncLog::GetInstance()->Log(std::move(log));
AsyncLogger::GetInstance()->Log(std::move(log));
} else {
Log2File(log);
}
Expand Down

0 comments on commit be401a4

Please sign in to comment.