Skip to content

Commit

Permalink
Improved fcntl locking in TocHandler to cover all cases of write-read…
Browse files Browse the repository at this point in the history
… contention on the shared TOC file.
  • Loading branch information
nicolau-manubens committed Dec 2, 2024
1 parent 3adc301 commit e2891e9
Showing 1 changed file with 52 additions and 22 deletions.
74 changes: 52 additions & 22 deletions src/fdb5/toc/TocHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,18 @@ void TocHandler::openForAppend() {
}
#endif
SYSCALL2((fd_ = ::open( tocPath_.localPath(), iomode, (mode_t)0777 )), tocPath_);

if (!isSubToc_) {

struct flock lock;
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
::fcntl(fd_, F_SETLKW, &lock); /// @note: updates file attributes, although only on Linux and Solaris
/// @note: this lock will be automatically released on close()

}
}

void TocHandler::openForRead() const {
Expand Down Expand Up @@ -293,13 +305,17 @@ void TocHandler::openForRead() const {
FileDescHandle toc(fd_, true); // closes the file descriptor
AutoClose closer1(toc);

struct flock lock;
lock.l_type = F_RDLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
::fcntl(fd_, F_SETLKW, &lock); /// @note: updates file attributes, although only on Linux and Solaris
/// @note: will be released when AutoClose'd
if (!isSubToc_) {

struct flock lock;
lock.l_type = F_RDLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
::fcntl(fd_, F_SETLKW, &lock); /// @note: updates file attributes, although only on Linux and Solaris
/// @note: will be released when AutoClose'd

}

fd_ = -1;

Expand All @@ -309,7 +325,8 @@ void TocHandler::openForRead() const {
long buffersize = 4*1024*1024;
toc.copyTo(*cachedToc_, buffersize, tocSize, tocReadStats_);
cachedToc_->openForRead();
} else {

} else if (!isSubToc_) {

struct flock lock;
lock.l_type = F_RDLCK;
Expand All @@ -318,7 +335,6 @@ void TocHandler::openForRead() const {
lock.l_len = 0;
::fcntl(fd_, F_SETLKW, &lock); /// @note: updates file attributes, although only on Linux and Solaris
/// @note: this lock will be automatically released on close()
/// @todo: is this OK? Maybe somewhere the TocHandle is kept open for read e.g. for the lifetime of the process

}
}
Expand Down Expand Up @@ -791,7 +807,25 @@ void TocHandler::writeInitRecord(const Key& key) {

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used())); // allocate (large) TocRecord on heap not stack (MARS-779)

/// @note: lock for read if this TocHandler is not for a subtoc and the contents were not cached
struct flock lock;
bool lock_acquired = false;
if (!isSubToc_ && !cachedToc_) {
lock.l_type = F_RDLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
::fcntl(fd_, F_SETLKW, &lock); /// @note: updates file attributes, although only on Linux and Solaris
lock_acquired = true;
}

size_t len = readNext(*r);

if (lock_acquired) {
lock.l_type = F_UNLCK;
::fcntl(fd_, F_SETLKW, &lock);
}

if (len == 0) {

LOG_DEBUG_LIB(LibFdb5) << "Initializing FDB TOC in " << tocPath_ << std::endl;
Expand Down Expand Up @@ -829,6 +863,15 @@ void TocHandler::writeInitRecord(const Key& key) {
eckit::MemoryStream s(&r2->payload_[0], r2->maxPayloadSize);
s << key;
s << isSubToc_;

if (!isSubToc_) {
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
::fcntl(fd_, F_SETLKW, &lock); /// @note: updates file attributes, although only on Linux and Solaris
}

append(*r2, s.position());
dbUID_ = r2->header_.uid_;

Expand Down Expand Up @@ -869,13 +912,6 @@ void TocHandler::writeSubTocRecord(const TocHandler& subToc) {
NFSTocHandlerCloser closer(*this); // only closes the fd, which also releases the lock
/// @todo: explore using FileDescHandle and AutoClose instead of NFSTocHandlerCloser

struct flock lock;
lock.l_type = F_WRLCK;
lock.l_start = 0;
lock.l_whence = SEEK_SET;
lock.l_len = 0;
::fcntl(fd_, F_SETLKW, &lock); /// @note: updates file attributes, although only on Linux and Solaris

std::unique_ptr<TocRecord> r(new TocRecord(serialisationVersion_.used(), TocRecord::TOC_SUB_TOC)); // allocate (large) TocRecord on heap not stack (MARS-779)

eckit::MemoryStream s(&r->payload_[0], r->maxPayloadSize);
Expand All @@ -891,10 +927,6 @@ void TocHandler::writeSubTocRecord(const TocHandler& subToc) {

eckit::fdatasync(fd_); // flush toc file content and some metadata

/// @note: releasing the lock is unnecessary as the fd will be closed and release the lock automatically
// lock.l_type = F_UNLCK;
// ::fcntl(fd_, F_SETLKW, &lock);

tocPath_.syncParentDirectory(); // flush DB directory file handle cache

LOG_DEBUG_LIB(LibFdb5) << "Write TOC_SUB_TOC " << path << std::endl;
Expand Down Expand Up @@ -954,8 +986,6 @@ void TocHandler::writeIndexRecord(const Index& index) {

// Otherwise, we actually do the writing!

// TODO: if !useSubToc_, get a lock here

openForAppend();
TocHandlerCloser closer(*this);

Expand Down

0 comments on commit e2891e9

Please sign in to comment.