Skip to content

Commit

Permalink
Fix for error in bufferpool
Browse files Browse the repository at this point in the history
  • Loading branch information
Kai-Uwe Sattler committed Dec 10, 2023
1 parent eedd9c8 commit fd703fa
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 40 deletions.
40 changes: 38 additions & 2 deletions src/bp_file/bufferpool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
* along with Poseidon. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iostream>
#include "defs.hpp"
#include "bufferpool.hpp"
#include "spdlog/spdlog.h"

bufferpool::bufferpool(std::size_t bsize) : bsize_(bsize), slots_(bsize_), p_reads_(0), l_reads_(0) {
spdlog::debug("bufferpool()");
spdlog::info("creating bufferpool with {} pages", bsize);
buffer_ = new page[bsize_];
slots_.set(); // set everything to 1 == unused
}
Expand All @@ -31,6 +32,32 @@ bufferpool::~bufferpool() {
delete [] buffer_;
}

void bufferpool::pin_page(paged_file::page_id pid) {
std::unique_lock lock(mutex_);
auto iter = ptable_.find(pid);
if (iter != ptable_.end()) {
iter->second.pinned_ = true;
// if (((pid & 0xF000000000000000) >> 60) == NODE_FILE_ID && (pid & 0xFFFFFFFFFFFFFFF) == 921)
// spdlog::info("pin page 921");
}
else
spdlog::info("cannot pin page {}", pid);

}

void bufferpool::unpin_page(paged_file::page_id pid) {
std::unique_lock lock(mutex_);
auto iter = ptable_.find(pid);
if (iter != ptable_.end()) {
iter->second.pinned_ = false;
// if (((pid & 0xF000000000000000) >> 60) == NODE_FILE_ID && (pid & 0xFFFFFFFFFFFFFFF) == 921)
// spdlog::info("unpin page 921");
}
else
spdlog::info("cannot unpin page {}", pid);

}

void bufferpool::register_file(uint8_t file_id, paged_file_ptr pf) {
assert(file_id < MAX_PFILES);
files_[file_id] = pf;
Expand Down Expand Up @@ -85,7 +112,7 @@ page *bufferpool::fetch_page(paged_file::page_id pid) {
// load page from file
auto p = load_page_from_file(pid);
// ... add it to the hashtable
ptable_.emplace(pid, buf_slot{ p.first, false, p.second });
ptable_.emplace(pid, buf_slot{ p.first, false, false, p.second});
// ... and to the LRU list
lru_list_.push_back(pid);
return p.first;
Expand Down Expand Up @@ -171,18 +198,27 @@ void bufferpool::purge() {
memset(buffer_, 0, sizeof(page) * bsize_);
}

bool bufferpool::has_page(paged_file::page_id pid) {
return ptable_.find(pid) != ptable_.end();
}

bool bufferpool::evict_page() {
spdlog::debug("\t---- evict_page...");
std::unique_lock lock(mutex_);
for (auto it1 = lru_list_.begin(); it1 != lru_list_.end(); it1++) {
auto pid = *it1;
auto it2 = ptable_.find(pid);
if (it2 != ptable_.end()) {
if (it2->second.pinned_)
continue;
if (it2->second.dirty_) {
spdlog::info("bufferpool::evict dirty page {}", pid & 0xFFFFFFFFFFFFFFF);
// TODO: write WAL log record for UNDO
write_page_to_file(pid, it2->second.p_);
}
//if (((pid & 0xF000000000000000) >> 60) == NODE_FILE_ID && (pid & 0xFFFFFFFFFFFFFFF) == 921)
// spdlog::info("bufferpool::evict page {} of file {} - pinned: {}",
// pid & 0xFFFFFFFFFFFFFFF, (pid & 0xF000000000000000) >> 60, it2->second.pinned_);
slots_.set(it2->second.pos_);
memset(it2->second.p_, 0, sizeof(PAGE_SIZE));
ptable_.erase(pid);
Expand Down
8 changes: 7 additions & 1 deletion src/bp_file/bufferpool.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2022 DBIS Group - TU Ilmenau, All Rights Reserved.
* Copyright (C) 2019-2023 DBIS Group - TU Ilmenau, All Rights Reserved.
*
* This file is part of the Poseidon package.
*
Expand Down Expand Up @@ -114,6 +114,11 @@ class bufferpool {
*/
double hit_ratio() const;

void pin_page(paged_file::page_id pid);
void unpin_page(paged_file::page_id pid);

bool has_page(paged_file::page_id pid);

private:
void dump();

Expand All @@ -127,6 +132,7 @@ class bufferpool {
struct buf_slot {
page *p_; // a pointer to a page in buffer_
bool dirty_; // a flag indicating that the page is marked as dirty
bool pinned_; // a flag indicating that the page is pinned in the bufferpool
std::size_t pos_; // position of the slot in buffer_
};

Expand Down
9 changes: 7 additions & 2 deletions src/cmds/pcli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ std::string check_config_files(const std::string& fname) {

int main(int argc, char* argv[]) {
std::string db_name, pool_path, query_file, import_path, dot_file, qmode_str, format = "ldbc";
std::size_t bp_size = 0;
std::vector<std::string> import_files;
bool start_shell = false;
query_proc::mode qmode = query_proc::Interpret;
Expand All @@ -441,6 +442,7 @@ int main(int argc, char* argv[]) {
("verbose,v", bool_switch()->default_value(false), "Verbose - show debug output")
("db,d", value<std::string>(&db_name)->required(), "Database name (required)")
("pool,p", value<std::string>(&pool_path)->required(), "Path to the PMem/file pool")
("buffersize,b", value<std::size_t>(&bp_size), "Size of the bufferpool (in pages)")
("output,o", value<std::string>(&dot_file), "Dump the graph to the given file (in DOT format)")
("strict", bool_switch()->default_value(true), "Strict mode - assumes that all columns contain values of the same type")
("delimiter", value<char>(&delim_character)->default_value('|'), "Character delimiter")
Expand Down Expand Up @@ -480,6 +482,9 @@ int main(int argc, char* argv[]) {
if (vm.count("pool"))
pool_path = vm["pool"].as<std::string>();

if (vm.count("buffersize"))
bp_size = vm["buffersize"].as<std::size_t>();

if (vm.count("delimiter"))
delim_character = vm["delimiter"].as<char>();

Expand Down Expand Up @@ -528,11 +533,11 @@ int main(int argc, char* argv[]) {
if (access(pool_path.c_str(), F_OK) != 0) {
spdlog::info("create poolset {}", pool_path);
pool = graph_pool::create(pool_path);
graph = pool->create_graph(db_name);
graph = pool->create_graph(db_name, bp_size);
} else {
spdlog::info("open poolset {}", pool_path);
pool = graph_pool::open(pool_path, true);
graph = pool->open_graph(db_name);
graph = pool->open_graph(db_name, bp_size);
}

if (!import_files.empty()) {
Expand Down
10 changes: 5 additions & 5 deletions src/pool/graph_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ graph_pool::graph_pool() {

graph_pool::~graph_pool() {}

graph_db_ptr graph_pool::create_graph(const std::string& name) {
auto gptr = p_make_ptr<graph_db>(name, path_);
graph_db_ptr graph_pool::create_graph(const std::string& name, std::size_t bpool_size) {
auto gptr = p_make_ptr<graph_db>(name, path_, bpool_size);
graphs_.insert({ name, gptr});
return gptr;
}

graph_db_ptr graph_pool::open_graph(const std::string& name) {
graph_db_ptr graph_pool::open_graph(const std::string& name, std::size_t bpool_size) {
// TODO: check whether graph directory exists
std::filesystem::path path_obj(path_);
path_obj /=name;
Expand All @@ -71,7 +71,7 @@ graph_db_ptr graph_pool::open_graph(const std::string& name) {
spdlog::info("FATAL: graph '{}' doesn't exist in pool '{}'.", name, path_);
throw unknown_db();
}
auto gptr = p_make_ptr<graph_db>(name, path_);
auto gptr = p_make_ptr<graph_db>(name, path_, bpool_size);
graphs_.insert({ name, gptr});
return gptr;
}
Expand All @@ -91,4 +91,4 @@ void graph_pool::close() {
gp.second->purge_bufferpool();
gp.second->close_files();
}
}
}
4 changes: 2 additions & 2 deletions src/pool/graph_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ class graph_pool {
/**
* Create a new graph with the given name.
*/
graph_db_ptr create_graph(const std::string& name);
graph_db_ptr create_graph(const std::string& name, std::size_t bpool_size = DEFAULT_BUFFER_SIZE);

/**
* Open an existing graph with the given name. If no graph
* exists with this name an exception is raised.
*/
graph_db_ptr open_graph(const std::string& name);
graph_db_ptr open_graph(const std::string& name, std::size_t bpool_size = DEFAULT_BUFFER_SIZE);

void drop_graph(const std::string& name);

Expand Down
4 changes: 2 additions & 2 deletions src/py/py_poseidon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ PYBIND11_MODULE(poseidon, m) {
"Creates a new graph pool of the given size.");

py::class_<graph_pool>(m, "GraphPool")
.def("open_graph", &graph_pool::open_graph, py::arg("name"), "Opens the graph with the given name.")
.def("create_graph", &graph_pool::create_graph, py::arg("name"), "Creates a new graph with the given name.")
.def("open_graph", &graph_pool::open_graph, py::arg("name"), py::arg("buffersize"), "Opens the graph with the given name.")
.def("create_graph", &graph_pool::create_graph, py::arg("name"), py::arg("buffersize"), "Creates a new graph with the given name.")
.def("drop_graph", &graph_pool::drop_graph, py::arg("name"), "Deletes the given graph.")
.def("close", &graph_pool::close, "Closes the graph pool.");

Expand Down
29 changes: 16 additions & 13 deletions src/query/plan_op/algorithms.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include <limits>
#include "qop_builtins.hpp"

qr_tuple num_links(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list& args) {
std::optional<qr_tuple> num_links(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list& args) {
int in_links = 0, out_links = 0;
auto n = qv_get_node(v.back());

Expand All @@ -29,13 +29,13 @@ qr_tuple num_links(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list&

qr_tuple res(2);
res[0] = qv_(in_links); res[1] = qv_(out_links);
return res;
return std::make_optional<qr_tuple>(res);
}
#if 0

// GroupBy([$3.authorid:uint64], [count($0.tweetid:uint64)], Expand(OUT, 'Author', ForeachRelationship(FROM, 'AUTHOR', Algorithm([OldestTweet, TUPLE], Limit(10, NodeScan('Website'))))))
// Expand(OUT, 'Author', ForeachRelationship(FROM, 'AUTHOR', Algorithm([OldestTweet, TUPLE], Limit(10, NodeScan('Website')))))
// Algorithm([OldestTweet, TUPLE], Limit(10, NodeScan('Website')))
qr_tuple oldest_tweet(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list& args) {
std::optional<qr_tuple> oldest_tweet(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list& args) {
auto n = qv_get_node(v.back());
auto tweet_label = ctx.gdb_->get_code("Tweet");
auto creation_label = ctx.gdb_->get_code("created_at");
Expand All @@ -47,24 +47,27 @@ qr_tuple oldest_tweet(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_lis
if (tweet.node_label == tweet_label) {
auto pval = ctx.gdb_->get_property_value(tweet, creation_label);
auto date_str = std::string(ctx.gdb_->get_string(pval.template get<dcode_t>()));
std::cout << "Tweet - created at: " << date_str << std::endl;
auto dval = boost::posix_time::from_iso_extended_string(date_str);
std::cout << "Tweet - created at: " << dval << std::endl;
//std::cout << "Tweet - created at: " << date_str << std::endl;
//auto dval = boost::posix_time::from_iso_extended_string(date_str);
//std::cout << "Tweet - created at: " << dval << std::endl;
if (date_str < oldest_date) {
oldest_date = date_str;
oldest_tweet = &tweet;
}
}
});
qr_tuple res(1);
res[0] = oldest_tweet;
return res;
if (oldest_tweet != nullptr) {
qr_tuple res(1);
res[0] = oldest_tweet;
return std::make_optional<qr_tuple>(res);
}
else {
return std::make_optional<qr_tuple>();
}
}
#endif

std::map<std::string, algorithm_op::tuple_algorithm_func> algorithm_op::tuple_algorithms_ = {
#if 0
{ std::string("OldestTweet"), oldest_tweet },
#endif
{ std::string("NumLinks"), num_links }
};

Expand Down
15 changes: 11 additions & 4 deletions src/query/plan_op/qop_algorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,15 @@ void algorithm_op::process(query_ctx &ctx, const qr_tuple &v) {
PROF_PRE;
if (call_mode_ == m_tuple) {
auto res = tuple_func_ptr_(ctx, v, args_);
auto v2 = concat(v, res);
consume_(ctx, v2);
PROF_POST(1);
if (res) {
auto v2 = concat(v, res.value());
consume_(ctx, v2);
PROF_POST(1);
}
else {
PROF_POST(0);
return;
}
}
else {
input_.append(v);
Expand All @@ -66,7 +72,8 @@ void algorithm_op::finish(query_ctx &ctx) {
PROF_PRE0;
if (call_mode_ == m_set) {
auto res = set_func_ptr_(ctx, input_, args_);
consume_(ctx, res);
if (res)
consume_(ctx, res.value());
}
finish_(ctx);
PROF_POST(call_mode_ == m_set ? 1 : 0);
Expand Down
4 changes: 2 additions & 2 deletions src/query/plan_op/qop_algorithm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ struct algorithm_op : public qop, public std::enable_shared_from_this<algorithm_
using param_list = std::vector<std::any>; // parameter list for invoking the algorithm

// function pointer type to a algorithm called in tuple mode
using tuple_algorithm_func = std::function<qr_tuple(query_ctx&, const qr_tuple&, param_list&)>;
using tuple_algorithm_func = std::function<std::optional<qr_tuple>(query_ctx&, const qr_tuple&, param_list&)>;
// function pointer type to a algorithm called in set mode
using set_algorithm_func = std::function<qr_tuple(query_ctx&, result_set&, param_list&)>;
using set_algorithm_func = std::function<std::optional<qr_tuple>(query_ctx&, result_set&, param_list&)>;

static void register_algorithm(const std::string& name, tuple_algorithm_func fptr) {
tuple_algorithms_.insert({ name, fptr });
Expand Down
3 changes: 2 additions & 1 deletion src/storage/graph_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ void graph_db::prepare_files(const std::string &pool_path, const std::string &pf
dict_ = p_make_ptr<dict>(bpool_, prefix);
}

graph_db::graph_db(const std::string &db_name, const std::string& pool_path) : database_name_(db_name) {
graph_db::graph_db(const std::string &db_name, const std::string& pool_path, std::size_t bpool_size) : database_name_(db_name),
bpool_(bpool_size == 0 ? DEFAULT_BUFFER_SIZE : bpool_size) {
pool_path_ = pool_path;
prepare_files(pool_path, db_name);
nodes_ = p_make_ptr<node_list<buffered_vec> >(bpool_, NODE_FILE_ID);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/graph_db.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class graph_db {
/**
* Constructor for a new empty graph database.
*/
graph_db(const std::string &db_name = "", const std::string &pool_path = "");
graph_db(const std::string &db_name = "", const std::string &pool_path = "", std::size_t bpool_size = DEFAULT_BUFFER_SIZE);

/**
* Destructor.
Expand Down
Loading

0 comments on commit fd703fa

Please sign in to comment.