Skip to content

Commit

Permalink
TocStore cached handle cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Jan 23, 2025
1 parent c3d1558 commit 6668556
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 26 deletions.
43 changes: 22 additions & 21 deletions src/fdb5/toc/TocStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,24 +164,24 @@ void TocStore::remove(const eckit::URI& uri, std::ostream& logAlways, std::ostre
}
}

eckit::DataHandle *TocStore::getCachedHandle( const eckit::PathName &path ) const {
eckit::DataHandle* TocStore::getCachedHandle( const eckit::PathName &path ) const {
std::lock_guard<std::recursive_mutex> lock(handlesMutex_);
HandleStore::const_iterator j = handles_.find( path );
if ( j != handles_.end() )
return j->second;
return j->second.get();
else
return nullptr;
}

void TocStore::closeDataHandles() {
for ( HandleStore::iterator j = handles_.begin(); j != handles_.end(); ++j ) {
eckit::DataHandle *dh = j->second;
std::lock_guard<std::recursive_mutex> lock(handlesMutex_);
for (const auto& [p, dh] : handles_) {
dh->close();
delete dh;
}
handles_.clear();
}

eckit::DataHandle *TocStore::createFileHandle(const eckit::PathName &path) {
std::unique_ptr<eckit::DataHandle> TocStore::createFileHandle(const eckit::PathName &path) {

static size_t sizeBuffer = eckit::Resource<unsigned long>("fdbBufferSize", 64 * 1024 * 1024);

Expand All @@ -191,17 +191,17 @@ eckit::DataHandle *TocStore::createFileHandle(const eckit::PathName &path) {
<< " buffer size " << sizeBuffer
<< std::endl;

return new LustreFileHandle<FDBFileHandle>(path, sizeBuffer, stripeDataLustreSettings());
return std::unique_ptr<eckit::DataHandle>(new LustreFileHandle<FDBFileHandle>(path, sizeBuffer, stripeDataLustreSettings()));
}

LOG_DEBUG_LIB(LibFdb5) << "Creating FDBFileHandle to " << path
<< " with buffer of " << eckit::Bytes(sizeBuffer)
<< std::endl;

return new FDBFileHandle(path, sizeBuffer);
return std::unique_ptr<eckit::DataHandle>(new FDBFileHandle(path, sizeBuffer));
}

eckit::DataHandle *TocStore::createAsyncHandle(const eckit::PathName &path) {
std::unique_ptr<eckit::DataHandle> TocStore::createAsyncHandle(const eckit::PathName &path) {

static size_t nbBuffers = eckit::Resource<unsigned long>("fdbNbAsyncBuffers", 4);
static size_t sizeBuffer = eckit::Resource<unsigned long>("fdbSizeAsyncBuffer", 64 * 1024 * 1024);
Expand All @@ -213,17 +213,17 @@ eckit::DataHandle *TocStore::createAsyncHandle(const eckit::PathName &path) {
<< " buffer each with " << eckit::Bytes(sizeBuffer)
<< std::endl;

return new LustreFileHandle<eckit::AIOHandle>(path, nbBuffers, sizeBuffer, stripeDataLustreSettings());
return std::unique_ptr<eckit::DataHandle>(new LustreFileHandle<eckit::AIOHandle>(path, nbBuffers, sizeBuffer, stripeDataLustreSettings()));
}

return new eckit::AIOHandle(path, nbBuffers, sizeBuffer);
return std::unique_ptr<eckit::DataHandle>(new eckit::AIOHandle(path, nbBuffers, sizeBuffer));
}

eckit::DataHandle *TocStore::createDataHandle(const eckit::PathName &path) {
std::unique_ptr<eckit::DataHandle> TocStore::createDataHandle(const eckit::PathName &path) {

static bool fdbWriteToNull = eckit::Resource<bool>("fdbWriteToNull;$FDB_WRITE_TO_NULL", false);
if(fdbWriteToNull)
return new eckit::EmptyHandle();
return std::unique_ptr<eckit::DataHandle>(new eckit::EmptyHandle());

static bool fdbAsyncWrite = eckit::Resource<bool>("fdbAsyncWrite;$FDB_ASYNC_WRITE", false);
if(fdbAsyncWrite)
Expand All @@ -233,12 +233,14 @@ eckit::DataHandle *TocStore::createDataHandle(const eckit::PathName &path) {
}

eckit::DataHandle& TocStore::getDataHandle( const eckit::PathName &path ) {
eckit::DataHandle *dh = getCachedHandle(path);
std::lock_guard<std::recursive_mutex> lock(handlesMutex_);
eckit::DataHandle* dh = getCachedHandle(path);
if ( !dh ) {
dh = createDataHandle(path);
ASSERT(dh);
handles_[path] = dh;
dh->openForAppend(0);
auto dataHandle = createDataHandle(path);
ASSERT(dataHandle);
dataHandle->openForAppend(0);
dh = dataHandle.get();
handles_[path] = std::move(dataHandle);
}
return *dh;
}
Expand All @@ -264,9 +266,8 @@ eckit::PathName TocStore::getDataPath(const Key& key) const {
}

void TocStore::flushDataHandles() {

for (HandleStore::iterator j = handles_.begin(); j != handles_.end(); ++j) {
eckit::DataHandle *dh = j->second;
std::lock_guard<std::recursive_mutex> lock(handlesMutex_);
for (const auto& [p, dh] : handles_) {
dh->flush();
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/fdb5/toc/TocStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ class TocStore : public Store, public TocCommon {

void remove(const eckit::URI& uri, std::ostream& logAlways, std::ostream& logVerbose, bool doit) const override;

eckit::DataHandle *getCachedHandle( const eckit::PathName &path ) const;
eckit::DataHandle* getCachedHandle( const eckit::PathName &path ) const;
void closeDataHandles();
eckit::DataHandle *createFileHandle(const eckit::PathName &path);
eckit::DataHandle *createAsyncHandle(const eckit::PathName &path);
eckit::DataHandle *createDataHandle(const eckit::PathName &path);
std::unique_ptr<eckit::DataHandle> createFileHandle(const eckit::PathName &path);
std::unique_ptr<eckit::DataHandle> createAsyncHandle(const eckit::PathName &path);
std::unique_ptr<eckit::DataHandle> createDataHandle(const eckit::PathName &path);
eckit::DataHandle& getDataHandle( const eckit::PathName &path );
eckit::PathName generateDataPath(const Key& key) const;
eckit::PathName getDataPath(const Key& key) const;
Expand All @@ -84,11 +84,12 @@ class TocStore : public Store, public TocCommon {

private: // types

typedef std::map< std::string, eckit::DataHandle * > HandleStore;
typedef std::map< std::string, std::unique_ptr<eckit::DataHandle>> HandleStore;
typedef std::map< Key, std::string > PathStore;

private: // members

mutable std::recursive_mutex handlesMutex_;
HandleStore handles_; ///< stores the DataHandles being used by the Session

mutable PathStore dataPaths_;
Expand Down

0 comments on commit 6668556

Please sign in to comment.