Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: use buffered fwrite to write binlog instead of Mmap #2778

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "pstd/include/pstd_status.h"
#include "pstd/include/noncopyable.h"
#include "include/pika_define.h"
#include "net/src/dispatch_thread.h"

std::string NewFileName(const std::string& name, uint32_t current);

Expand Down Expand Up @@ -76,6 +77,7 @@ class Binlog : public pstd::noncopyable {
}

void Close();
void FlushBufferedFile();

private:
pstd::Status Put(const char* item, int len);
Expand Down Expand Up @@ -108,6 +110,8 @@ class Binlog : public pstd::noncopyable {
std::string filename_;

std::atomic<bool> binlog_io_error_;

net::TimerTaskThread timer_task_thread_;
};

#endif
1 change: 1 addition & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
class PikaServer;
/* Global Const */
constexpr int MAX_DB_NUM = 8;
constexpr int FWRITE_USER_SPACE_BUF_SIZE = 512LL << 10;//512KB

/* Port shift */
const int kPortShiftRSync = 1000;
Expand Down
32 changes: 23 additions & 9 deletions src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Binlog::Binlog(std::string binlog_path, const int file_size)
LOG(INFO) << "Binlog: Manifest file not exist, we create a new one.";

profile = NewFileName(filename_, pro_num_);
s = pstd::NewWritableFile(profile, queue_);
s = pstd::NewBufferedWritableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE);
if (!s.ok()) {
LOG(FATAL) << "Binlog: new " << filename_ << " " << s.ToString();
}
Expand Down Expand Up @@ -112,23 +112,37 @@ Binlog::Binlog(std::string binlog_path, const int file_size)

profile = NewFileName(filename_, pro_num_);
DLOG(INFO) << "Binlog: open profile " << profile;
s = pstd::AppendWritableFile(profile, queue_, version_->pro_offset_);
s = pstd::BufferedAppendableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE, version_->pro_offset_);
if (!s.ok()) {
LOG(FATAL) << "Binlog: Open file " << profile << " error " << s.ToString();
}

uint64_t filesize = queue_->Filesize();
DLOG(INFO) << "Binlog: filesize is " << filesize;
}

InitLogFile();

timer_task_thread_.AddTimerTask("flush_binlog_task", 500, true,
[this] { this->FlushBufferedFile(); });
timer_task_thread_.StartThread();
}

Binlog::~Binlog() {
std::lock_guard l(mutex_);
timer_task_thread_.StopThread();
Close();
}

void Binlog::FlushBufferedFile() {
std::lock_guard l(mutex_);
if (!opened_.load()) {
return;
}
if (queue_) {
queue_->Flush();
}
}

void Binlog::Close() {
if (!opened_.load()) {
return;
Expand Down Expand Up @@ -210,7 +224,7 @@ Status Binlog::Put(const char* item, int len) {
if (filesize > file_size_) {
std::unique_ptr<pstd::WritableFile> queue;
std::string profile = NewFileName(filename_, pro_num_ + 1);
s = pstd::NewWritableFile(profile, queue);
s = pstd::NewBufferedWritableFile(profile, queue, FWRITE_USER_SPACE_BUF_SIZE);
if (!s.ok()) {
LOG(ERROR) << "Binlog: new " << filename_ << " " << s.ToString();
return s;
Expand Down Expand Up @@ -263,9 +277,9 @@ Status Binlog::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int*
s = queue_->Append(pstd::Slice(buf, kHeaderSize));
if (s.ok()) {
s = queue_->Append(pstd::Slice(ptr, n));
if (s.ok()) {
s = queue_->Flush();
}
// if (s.ok()) {
// s = queue_->Flush();
// }
}
block_offset_ += static_cast<int32_t>(kHeaderSize + n);

Expand Down Expand Up @@ -387,7 +401,7 @@ Status Binlog::SetProducerStatus(uint32_t pro_num, uint64_t pro_offset, uint32_t
pstd::DeleteFile(profile);
}

pstd::NewWritableFile(profile, queue_);
pstd::NewBufferedWritableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE);
Binlog::AppendPadding(queue_.get(), &pro_offset);

pro_num_ = pro_num;
Expand Down Expand Up @@ -426,7 +440,7 @@ Status Binlog::Truncate(uint32_t pro_num, uint64_t pro_offset, uint64_t index) {
version_->StableSave();
}

Status s = pstd::AppendWritableFile(profile, queue_, version_->pro_offset_);
Status s = pstd::BufferedAppendableFile(profile, queue_, FWRITE_USER_SPACE_BUF_SIZE, version_->pro_offset_);
if (!s.ok()) {
return s;
}
Expand Down
8 changes: 7 additions & 1 deletion src/pstd/include/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ Status NewSequentialFile(const std::string& fname, std::unique_ptr<SequentialFil

Status NewWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result);

Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes);

Status BufferedAppendableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes, int64_t offset);

Status NewRWFile(const std::string& fname, std::unique_ptr<RWFile>& result);

Status AppendSequentialFile(const std::string& fname, SequentialFile** result);
Expand Down Expand Up @@ -100,7 +106,7 @@ class SequentialFile {
// virtual Status Read(size_t n, char *&result, char *scratch) = 0;
virtual Status Read(size_t n, Slice* result, char* scratch) = 0;
virtual Status Skip(uint64_t n) = 0;
// virtual Status Close() = 0;
// virtual Status Close() = 0;F
virtual char* ReadLine(char* buf, int n) = 0;
};

Expand Down
183 changes: 179 additions & 4 deletions src/pstd/src/env.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
#include "pstd/include/env.h"

#include <dirent.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <cassert>

#include <cstdio>
#include <fstream>
#include <sstream>
#include <utility>
#include <thread>
Expand Down Expand Up @@ -472,6 +468,102 @@ class PosixMmapFile : public WritableFile {
uint64_t Filesize() override { return write_len_ + file_offset_ + (dst_ - base_); }
};

class BufferedWritableFile : public WritableFile {
private:
std::string filename_;
FILE* file_;
int32_t user_space_buf_size_;
uint64_t curr_file_size_;
char* buffer_;

public:
BufferedWritableFile() = delete;
BufferedWritableFile(const BufferedWritableFile&) = delete;
BufferedWritableFile& operator=(const BufferedWritableFile&) = delete;

// caller must ensure passing-in 'file' is not nullptr
BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
: filename_(std::move(file_name)),
file_(file),
user_space_buf_size_(user_space_buf_size),
curr_file_size_(curr_file_size),
buffer_(buf) {
assert(file_ && "file_ can not be nullptr");
}
Comment on lines +471 to +492
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure proper buffer management in the constructor.

The constructor takes ownership of the buffer but does not handle the case where setvbuf fails after buffer allocation.

-  BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
+  BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
    : filename_(std::move(file_name)),
      file_(file),
      user_space_buf_size_(user_space_buf_size),
      curr_file_size_(curr_file_size),
      buffer_(buf) {
    assert(file_ && "file_ can not be nullptr");
    if (user_space_buf_size_ > 0 && setvbuf(file_, buffer_, _IOFBF, user_space_buf_size_) != 0) {
      free(buffer_);
      buffer_ = nullptr;
      user_space_buf_size_ = 0;
      // Handle error appropriately, e.g., throw an exception or set an error status
    }
  }
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class BufferedWritableFile : public WritableFile {
private:
std::string filename_;
FILE* file_;
int32_t user_space_buf_size_;
uint64_t curr_file_size_;
char* buffer_;
public:
BufferedWritableFile() = delete;
BufferedWritableFile(const BufferedWritableFile&) = delete;
BufferedWritableFile& operator=(const BufferedWritableFile&) = delete;
// caller must ensure passing-in 'file' is not nullptr
BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
: filename_(std::move(file_name)),
file_(file),
user_space_buf_size_(user_space_buf_size),
curr_file_size_(curr_file_size),
buffer_(buf) {
assert(file_ && "file_ can not be nullptr");
}
class BufferedWritableFile : public WritableFile {
private:
std::string filename_;
FILE* file_;
int32_t user_space_buf_size_;
uint64_t curr_file_size_;
char* buffer_;
public:
BufferedWritableFile() = delete;
BufferedWritableFile(const BufferedWritableFile&) = delete;
BufferedWritableFile& operator=(const BufferedWritableFile&) = delete;
// caller must ensure passing-in 'file' is not nullptr
BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
: filename_(std::move(file_name)),
file_(file),
user_space_buf_size_(user_space_buf_size),
curr_file_size_(curr_file_size),
buffer_(buf) {
assert(file_ && "file_ can not be nullptr");
if (user_space_buf_size_ > 0 && setvbuf(file_, buffer_, _IOFBF, user_space_buf_size_) != 0) {
free(buffer_);
buffer_ = nullptr;
user_space_buf_size_ = 0;
// Handle error appropriately, e.g., throw an exception or set an error status
}
}


~BufferedWritableFile() override {
if (file_) {
BufferedWritableFile::Close();
}
if(buffer_) {
free(buffer_);
}
}

int32_t GetUserSpaceBufSize() const { return user_space_buf_size_; }

Status Append(const Slice& data) override {
if (!file_) {
return IOError("fwrite target: " + filename_ + " is not opened", errno);
}
const char* src = data.data();
size_t left = data.size();
int32_t max_retries = 4;
int retry_count = 0;

while (left > 0) {
size_t written = fwrite(src, sizeof(char), left, file_);
if (written == 0) {
if (ferror(file_)) {
int err_num = errno;
clearerr(file_);
return IOError("fwrite error with " + filename_, err_num);
}
if (errno == ENOSPC || ++retry_count > max_retries) {
return IOError(filename_, errno);
}
}
src += written;
left -= written;
curr_file_size_ += written;
retry_count = 0;
}

return Status::OK();
}
Comment on lines +505 to +533
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle retry logic and error conditions in Append method.

The Append method includes retry logic but does not handle the case where fwrite fails due to reasons other than ENOSPC. Consider adding a retry limit and handling other error scenarios.

  Status Append(const Slice& data) override {
    if (!file_) {
      return IOError("fwrite target: " + filename_ + " is not opened", errno);
    }
    const char* src = data.data();
    size_t left = data.size();
    int32_t max_retries = 4;
    int retry_count = 0;

    while (left > 0) {
      size_t written = fwrite(src, sizeof(char), left, file_);
      if (written == 0) {
        if (ferror(file_)) {
          int err_num = errno;
          clearerr(file_);
          return IOError("fwrite error with " + filename_, err_num);
        }
        if (errno == ENOSPC || ++retry_count > max_retries) {
          return IOError(filename_, errno);
        }
      }
      src += written;
      left -= written;
      curr_file_size_ += written;
      retry_count = 0;
    }

    return Status::OK();
  }

Committable suggestion was skipped due to low confidence.


Status Close() override {
if (fclose(file_) != 0) {
return IOError("fclose failed: " + filename_, errno);
}
file_ = nullptr;
return Status::OK();
}

Status Flush() override {
if (fflush(file_) != 0) {
return IOError("fflush failed: " + filename_, errno);
}
return Status::OK();
}

Status Sync() override {
auto s = BufferedWritableFile::Flush();
if (!s.ok()) {
return s;
}
int32_t file_fd = fileno(file_);
if (fsync(file_fd) != 0) {
return IOError("fsync failed: " + filename_, errno);
}
return Status::OK();
}

Status Trim(uint64_t target) override { return Status::OK(); }

uint64_t Filesize() override { return curr_file_size_; }
};

RWFile::~RWFile() = default;

class MmapRWFile : public RWFile {
Expand Down Expand Up @@ -648,6 +740,89 @@ Status NewWritableFile(const std::string& fname, std::unique_ptr<WritableFile>&
return s;
}

Status BufferedAppendableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes, int64_t offset) {
const int fd = open(fname.c_str(), O_RDWR | O_CLOEXEC, 0644);
if (fd < 0) {
return IOError(fname, errno);
}

FILE* file = fdopen(fd, "r+");
if (file == nullptr) {
close(fd);
return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
}

if (user_space_buf_size_bytes < 0) {
fclose(file);
return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedAppendableFile");
}
char* buf = nullptr;
int32_t r = 0;
if (user_space_buf_size_bytes != 0) {
buf = (char*)malloc(user_space_buf_size_bytes);
if (!buf) {
fclose(file);
return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
}
r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
} else {
r = setvbuf(file, nullptr, _IONBF, 0);
}
if (r != 0) {
fclose(file);
return IOError("Failed to set user space buffer for " + fname, errno);
}

// Move the file pointer to the specified offset
if (fseek(file, offset, SEEK_SET) != 0) {
fclose(file);
return IOError("Failed to seek to the specified offset in " + fname, errno);
}

result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, offset, buf);
return Status::OK();
}

Status NewBufferedWritableFile(const std::string& fname, std::unique_ptr<WritableFile>& result,
int32_t user_space_buf_size_bytes) {
const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC, 0644);
if (fd < 0) {
return IOError(fname, errno);
}

FILE* file = fdopen(fd, "w+");
if (file == nullptr) {
close(fd);
return IOError("Error converting file descriptor to FILE* when NewBufferedWritableFile for file:" + fname, errno);
}

if (user_space_buf_size_bytes < 0) {
fclose(file);
return Status::Error("user_space_buf_size_bytes must not be negative when NewBufferedWritableFile");
}
char* buf = nullptr;
int32_t r = 0;
if (user_space_buf_size_bytes != 0) {
buf = (char*)malloc(user_space_buf_size_bytes);
if (!buf) {
fclose(file);
return Status::Error("Failed to allocate buffer when BufferedAppendableFile");
}
r = setvbuf(file, buf, _IOFBF, user_space_buf_size_bytes);
} else {
r = setvbuf(file, nullptr, _IONBF, 0);
}
if (r != 0) {
fclose(file);
return IOError("Failed to set user space buffer for " + fname, errno);
}

// the file was trancated if it was existing for syscall open use flag "O_TRUNC",
result = std::make_unique<BufferedWritableFile>(fname, file, user_space_buf_size_bytes, 0, buf);
return Status::OK();
}

Status NewRWFile(const std::string& fname, std::unique_ptr<RWFile>& result) {
Status s;
const int fd = open(fname.c_str(), O_CREAT | O_RDWR | O_CLOEXEC, 0644);
Expand Down
Loading