diff --git a/src/bp_file/bufferpool.cpp b/src/bp_file/bufferpool.cpp index e2f33c5..72f7677 100644 --- a/src/bp_file/bufferpool.cpp +++ b/src/bp_file/bufferpool.cpp @@ -17,11 +17,12 @@ * along with Poseidon. If not, see . */ #include +#include "defs.hpp" #include "bufferpool.hpp" #include "spdlog/spdlog.h" bufferpool::bufferpool(std::size_t bsize) : bsize_(bsize), slots_(bsize_), p_reads_(0), l_reads_(0) { - spdlog::debug("bufferpool()"); + spdlog::info("creating bufferpool with {} pages", bsize); buffer_ = new page[bsize_]; slots_.set(); // set everything to 1 == unused } @@ -31,6 +32,32 @@ bufferpool::~bufferpool() { delete [] buffer_; } +void bufferpool::pin_page(paged_file::page_id pid) { + std::unique_lock lock(mutex_); + auto iter = ptable_.find(pid); + if (iter != ptable_.end()) { + iter->second.pinned_ = true; + // if (((pid & 0xF000000000000000) >> 60) == NODE_FILE_ID && (pid & 0xFFFFFFFFFFFFFFF) == 921) + // spdlog::info("pin page 921"); + } + else + spdlog::info("cannot pin page {}", pid); + +} + +void bufferpool::unpin_page(paged_file::page_id pid) { + std::unique_lock lock(mutex_); + auto iter = ptable_.find(pid); + if (iter != ptable_.end()) { + iter->second.pinned_ = false; + // if (((pid & 0xF000000000000000) >> 60) == NODE_FILE_ID && (pid & 0xFFFFFFFFFFFFFFF) == 921) + // spdlog::info("unpin page 921"); + } + else + spdlog::info("cannot unpin page {}", pid); + +} + void bufferpool::register_file(uint8_t file_id, paged_file_ptr pf) { assert(file_id < MAX_PFILES); files_[file_id] = pf; @@ -85,7 +112,7 @@ page *bufferpool::fetch_page(paged_file::page_id pid) { // load page from file auto p = load_page_from_file(pid); // ... add it to the hashtable - ptable_.emplace(pid, buf_slot{ p.first, false, p.second }); + ptable_.emplace(pid, buf_slot{ p.first, false, false, p.second}); // ... and to the LRU list lru_list_.push_back(pid); return p.first; @@ -171,6 +198,10 @@ void bufferpool::purge() { memset(buffer_, 0, sizeof(page) * bsize_); } +bool bufferpool::has_page(paged_file::page_id pid) { + return ptable_.find(pid) != ptable_.end(); +} + bool bufferpool::evict_page() { spdlog::debug("\t---- evict_page..."); std::unique_lock lock(mutex_); @@ -178,11 +209,16 @@ bool bufferpool::evict_page() { auto pid = *it1; auto it2 = ptable_.find(pid); if (it2 != ptable_.end()) { + if (it2->second.pinned_) + continue; if (it2->second.dirty_) { spdlog::info("bufferpool::evict dirty page {}", pid & 0xFFFFFFFFFFFFFFF); // TODO: write WAL log record for UNDO write_page_to_file(pid, it2->second.p_); } + //if (((pid & 0xF000000000000000) >> 60) == NODE_FILE_ID && (pid & 0xFFFFFFFFFFFFFFF) == 921) + // spdlog::info("bufferpool::evict page {} of file {} - pinned: {}", + // pid & 0xFFFFFFFFFFFFFFF, (pid & 0xF000000000000000) >> 60, it2->second.pinned_); slots_.set(it2->second.pos_); memset(it2->second.p_, 0, sizeof(PAGE_SIZE)); ptable_.erase(pid); diff --git a/src/bp_file/bufferpool.hpp b/src/bp_file/bufferpool.hpp index 536aba2..988391e 100644 --- a/src/bp_file/bufferpool.hpp +++ b/src/bp_file/bufferpool.hpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2022 DBIS Group - TU Ilmenau, All Rights Reserved. + * Copyright (C) 2019-2023 DBIS Group - TU Ilmenau, All Rights Reserved. * * This file is part of the Poseidon package. * @@ -114,6 +114,11 @@ class bufferpool { */ double hit_ratio() const; + void pin_page(paged_file::page_id pid); + void unpin_page(paged_file::page_id pid); + + bool has_page(paged_file::page_id pid); + private: void dump(); @@ -127,6 +132,7 @@ class bufferpool { struct buf_slot { page *p_; // a pointer to a page in buffer_ bool dirty_; // a flag indicating that the page is marked as dirty + bool pinned_; // a flag indicating that the page is pinned in the bufferpool std::size_t pos_; // position of the slot in buffer_ }; diff --git a/src/cmds/pcli.cpp b/src/cmds/pcli.cpp index 78ce8b0..8222b5d 100644 --- a/src/cmds/pcli.cpp +++ b/src/cmds/pcli.cpp @@ -423,6 +423,7 @@ std::string check_config_files(const std::string& fname) { int main(int argc, char* argv[]) { std::string db_name, pool_path, query_file, import_path, dot_file, qmode_str, format = "ldbc"; + std::size_t bp_size = 0; std::vector import_files; bool start_shell = false; query_proc::mode qmode = query_proc::Interpret; @@ -441,6 +442,7 @@ int main(int argc, char* argv[]) { ("verbose,v", bool_switch()->default_value(false), "Verbose - show debug output") ("db,d", value(&db_name)->required(), "Database name (required)") ("pool,p", value(&pool_path)->required(), "Path to the PMem/file pool") + ("buffersize,b", value(&bp_size), "Size of the bufferpool (in pages)") ("output,o", value(&dot_file), "Dump the graph to the given file (in DOT format)") ("strict", bool_switch()->default_value(true), "Strict mode - assumes that all columns contain values of the same type") ("delimiter", value(&delim_character)->default_value('|'), "Character delimiter") @@ -480,6 +482,9 @@ int main(int argc, char* argv[]) { if (vm.count("pool")) pool_path = vm["pool"].as(); + if (vm.count("buffersize")) + bp_size = vm["buffersize"].as(); + if (vm.count("delimiter")) delim_character = vm["delimiter"].as(); @@ -528,11 +533,11 @@ int main(int argc, char* argv[]) { if (access(pool_path.c_str(), F_OK) != 0) { spdlog::info("create poolset {}", pool_path); pool = graph_pool::create(pool_path); - graph = pool->create_graph(db_name); + graph = pool->create_graph(db_name, bp_size); } else { spdlog::info("open poolset {}", pool_path); pool = graph_pool::open(pool_path, true); - graph = pool->open_graph(db_name); + graph = pool->open_graph(db_name, bp_size); } if (!import_files.empty()) { diff --git a/src/pool/graph_pool.cpp b/src/pool/graph_pool.cpp index edb6587..40ffac4 100644 --- a/src/pool/graph_pool.cpp +++ b/src/pool/graph_pool.cpp @@ -56,13 +56,13 @@ graph_pool::graph_pool() { graph_pool::~graph_pool() {} -graph_db_ptr graph_pool::create_graph(const std::string& name) { - auto gptr = p_make_ptr(name, path_); +graph_db_ptr graph_pool::create_graph(const std::string& name, std::size_t bpool_size) { + auto gptr = p_make_ptr(name, path_, bpool_size); graphs_.insert({ name, gptr}); return gptr; } -graph_db_ptr graph_pool::open_graph(const std::string& name) { +graph_db_ptr graph_pool::open_graph(const std::string& name, std::size_t bpool_size) { // TODO: check whether graph directory exists std::filesystem::path path_obj(path_); path_obj /=name; @@ -71,7 +71,7 @@ graph_db_ptr graph_pool::open_graph(const std::string& name) { spdlog::info("FATAL: graph '{}' doesn't exist in pool '{}'.", name, path_); throw unknown_db(); } - auto gptr = p_make_ptr(name, path_); + auto gptr = p_make_ptr(name, path_, bpool_size); graphs_.insert({ name, gptr}); return gptr; } @@ -91,4 +91,4 @@ void graph_pool::close() { gp.second->purge_bufferpool(); gp.second->close_files(); } -} \ No newline at end of file +} diff --git a/src/pool/graph_pool.hpp b/src/pool/graph_pool.hpp index e04f16c..813201a 100644 --- a/src/pool/graph_pool.hpp +++ b/src/pool/graph_pool.hpp @@ -60,13 +60,13 @@ class graph_pool { /** * Create a new graph with the given name. */ - graph_db_ptr create_graph(const std::string& name); + graph_db_ptr create_graph(const std::string& name, std::size_t bpool_size = DEFAULT_BUFFER_SIZE); /** * Open an existing graph with the given name. If no graph * exists with this name an exception is raised. */ - graph_db_ptr open_graph(const std::string& name); + graph_db_ptr open_graph(const std::string& name, std::size_t bpool_size = DEFAULT_BUFFER_SIZE); void drop_graph(const std::string& name); diff --git a/src/py/py_poseidon.cpp b/src/py/py_poseidon.cpp index 2199a70..b45afa7 100644 --- a/src/py/py_poseidon.cpp +++ b/src/py/py_poseidon.cpp @@ -49,8 +49,8 @@ PYBIND11_MODULE(poseidon, m) { "Creates a new graph pool of the given size."); py::class_(m, "GraphPool") - .def("open_graph", &graph_pool::open_graph, py::arg("name"), "Opens the graph with the given name.") - .def("create_graph", &graph_pool::create_graph, py::arg("name"), "Creates a new graph with the given name.") + .def("open_graph", &graph_pool::open_graph, py::arg("name"), py::arg("buffersize"), "Opens the graph with the given name.") + .def("create_graph", &graph_pool::create_graph, py::arg("name"), py::arg("buffersize"), "Creates a new graph with the given name.") .def("drop_graph", &graph_pool::drop_graph, py::arg("name"), "Deletes the given graph.") .def("close", &graph_pool::close, "Closes the graph pool."); diff --git a/src/query/plan_op/algorithms.cpp b/src/query/plan_op/algorithms.cpp index f89e20f..83325ea 100644 --- a/src/query/plan_op/algorithms.cpp +++ b/src/query/plan_op/algorithms.cpp @@ -20,7 +20,7 @@ #include #include "qop_builtins.hpp" -qr_tuple num_links(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list& args) { +std::optional num_links(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list& args) { int in_links = 0, out_links = 0; auto n = qv_get_node(v.back()); @@ -29,13 +29,13 @@ qr_tuple num_links(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list& qr_tuple res(2); res[0] = qv_(in_links); res[1] = qv_(out_links); - return res; + return std::make_optional(res); } -#if 0 + // GroupBy([$3.authorid:uint64], [count($0.tweetid:uint64)], Expand(OUT, 'Author', ForeachRelationship(FROM, 'AUTHOR', Algorithm([OldestTweet, TUPLE], Limit(10, NodeScan('Website')))))) // Expand(OUT, 'Author', ForeachRelationship(FROM, 'AUTHOR', Algorithm([OldestTweet, TUPLE], Limit(10, NodeScan('Website'))))) // Algorithm([OldestTweet, TUPLE], Limit(10, NodeScan('Website'))) -qr_tuple oldest_tweet(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list& args) { +std::optional oldest_tweet(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_list& args) { auto n = qv_get_node(v.back()); auto tweet_label = ctx.gdb_->get_code("Tweet"); auto creation_label = ctx.gdb_->get_code("created_at"); @@ -47,24 +47,27 @@ qr_tuple oldest_tweet(query_ctx& ctx, const qr_tuple& v, algorithm_op::param_lis if (tweet.node_label == tweet_label) { auto pval = ctx.gdb_->get_property_value(tweet, creation_label); auto date_str = std::string(ctx.gdb_->get_string(pval.template get())); - std::cout << "Tweet - created at: " << date_str << std::endl; - auto dval = boost::posix_time::from_iso_extended_string(date_str); - std::cout << "Tweet - created at: " << dval << std::endl; + //std::cout << "Tweet - created at: " << date_str << std::endl; + //auto dval = boost::posix_time::from_iso_extended_string(date_str); + //std::cout << "Tweet - created at: " << dval << std::endl; if (date_str < oldest_date) { oldest_date = date_str; oldest_tweet = &tweet; } } }); - qr_tuple res(1); - res[0] = oldest_tweet; - return res; + if (oldest_tweet != nullptr) { + qr_tuple res(1); + res[0] = oldest_tweet; + return std::make_optional(res); + } + else { + return std::make_optional(); + } } -#endif + std::map algorithm_op::tuple_algorithms_ = { -#if 0 { std::string("OldestTweet"), oldest_tweet }, -#endif { std::string("NumLinks"), num_links } }; diff --git a/src/query/plan_op/qop_algorithm.cpp b/src/query/plan_op/qop_algorithm.cpp index 8639b02..21103c6 100644 --- a/src/query/plan_op/qop_algorithm.cpp +++ b/src/query/plan_op/qop_algorithm.cpp @@ -52,9 +52,15 @@ void algorithm_op::process(query_ctx &ctx, const qr_tuple &v) { PROF_PRE; if (call_mode_ == m_tuple) { auto res = tuple_func_ptr_(ctx, v, args_); - auto v2 = concat(v, res); - consume_(ctx, v2); - PROF_POST(1); + if (res) { + auto v2 = concat(v, res.value()); + consume_(ctx, v2); + PROF_POST(1); + } + else { + PROF_POST(0); + return; + } } else { input_.append(v); @@ -66,7 +72,8 @@ void algorithm_op::finish(query_ctx &ctx) { PROF_PRE0; if (call_mode_ == m_set) { auto res = set_func_ptr_(ctx, input_, args_); - consume_(ctx, res); + if (res) + consume_(ctx, res.value()); } finish_(ctx); PROF_POST(call_mode_ == m_set ? 1 : 0); diff --git a/src/query/plan_op/qop_algorithm.hpp b/src/query/plan_op/qop_algorithm.hpp index 26a157b..6d4ebe8 100644 --- a/src/query/plan_op/qop_algorithm.hpp +++ b/src/query/plan_op/qop_algorithm.hpp @@ -35,9 +35,9 @@ struct algorithm_op : public qop, public std::enable_shared_from_this; // parameter list for invoking the algorithm // function pointer type to a algorithm called in tuple mode - using tuple_algorithm_func = std::function; + using tuple_algorithm_func = std::function(query_ctx&, const qr_tuple&, param_list&)>; // function pointer type to a algorithm called in set mode - using set_algorithm_func = std::function; + using set_algorithm_func = std::function(query_ctx&, result_set&, param_list&)>; static void register_algorithm(const std::string& name, tuple_algorithm_func fptr) { tuple_algorithms_.insert({ name, fptr }); diff --git a/src/storage/graph_db.cpp b/src/storage/graph_db.cpp index 873f739..f8b4f03 100644 --- a/src/storage/graph_db.cpp +++ b/src/storage/graph_db.cpp @@ -67,7 +67,8 @@ void graph_db::prepare_files(const std::string &pool_path, const std::string &pf dict_ = p_make_ptr(bpool_, prefix); } -graph_db::graph_db(const std::string &db_name, const std::string& pool_path) : database_name_(db_name) { +graph_db::graph_db(const std::string &db_name, const std::string& pool_path, std::size_t bpool_size) : database_name_(db_name), + bpool_(bpool_size == 0 ? DEFAULT_BUFFER_SIZE : bpool_size) { pool_path_ = pool_path; prepare_files(pool_path, db_name); nodes_ = p_make_ptr >(bpool_, NODE_FILE_ID); diff --git a/src/storage/graph_db.hpp b/src/storage/graph_db.hpp index dbc791a..27cdb4b 100644 --- a/src/storage/graph_db.hpp +++ b/src/storage/graph_db.hpp @@ -63,7 +63,7 @@ class graph_db { /** * Constructor for a new empty graph database. */ - graph_db(const std::string &db_name = "", const std::string &pool_path = ""); + graph_db(const std::string &db_name = "", const std::string &pool_path = "", std::size_t bpool_size = DEFAULT_BUFFER_SIZE); /** * Destructor. diff --git a/src/vec/buffered_vec.hpp b/src/vec/buffered_vec.hpp index 8627493..40c1bfd 100644 --- a/src/vec/buffered_vec.hpp +++ b/src/vec/buffered_vec.hpp @@ -4,7 +4,7 @@ */ /* - * Copyright (C) 2019-2022 DBIS Group - TU Ilmenau, All Rights Reserved. + * Copyright (C) 2019-2023 DBIS Group - TU Ilmenau, All Rights Reserved. * * This file is part of the Poseidon package. * @@ -167,8 +167,11 @@ class buffered_vec { iter(buffered_vec& bv, paged_file::page_id pid, paged_file::page_id num, offset_t p = 0) : bvec_(bv), curr_pid_(pid), npages_(num), cptr_(nullptr), pos_(p) { if (pid == 0) - return; + return; cptr_ = bvec_.load_chunk(pid); + if (cptr_ != nullptr) + bvec_.pin(pid); + // make sure the element at pos_ isn't deleted if (cptr_ != nullptr) { while (pos_ < num_entries) { @@ -184,6 +187,8 @@ class buffered_vec { } } + ~iter() { if (cptr_ != nullptr) bvec_.unpin(curr_pid_); } + bool operator!=(const iter &other) const { return cptr_ != other.cptr_ || pos_ != other.pos_; } @@ -196,7 +201,10 @@ class buffered_vec { iter &operator++() { do { if (++pos_ == num_entries) { + bvec_.unpin(curr_pid_); cptr_ = bvec_.load_chunk(++curr_pid_); + if (cptr_ != nullptr) + bvec_.pin(curr_pid_); pos_ = 0; } // make sure, cptr_[pos_] is valid @@ -217,8 +225,11 @@ class buffered_vec { : bvec_(v), range_(first, last), current_pid_(first), cptr_(nullptr), pos_(pos) { cptr_ = bvec_.load_chunk(current_pid_); + if (cptr_ != nullptr) + bvec_.pin(current_pid_); } + ~range_iter() { /*if (cptr_ != nullptr) bvec_.unpin(current_pid_);*/ } operator bool() const { return current_pid_ <= range_.second && cptr_; } T &operator*() const { return cptr_->data_[pos_]; } @@ -226,7 +237,10 @@ class buffered_vec { range_iter &operator++() { do { if (++pos_ == num_entries) { + bvec_.unpin(current_pid_); cptr_ = bvec_.load_chunk(++current_pid_); + if (cptr_ != nullptr) + bvec_.pin(current_pid_); pos_ = 0; } // make sure, cptr_[pos_] is valid @@ -304,6 +318,14 @@ class buffered_vec { return new range_iter(*this, first_chunk + 1, last_chunk + 1, start_pos); } + void pin(paged_file::page_id pid) { + bpool_.pin_page(pid | file_mask_); + } + + void unpin(paged_file::page_id pid) { + bpool_.unpin_page(pid | file_mask_); + } + /** * Store the given record at position idx (note: move semantics) and mark this * slot as used. diff --git a/test/test_pyposeidon.py b/test/test_pyposeidon.py index 968d65b..e42e558 100644 --- a/test/test_pyposeidon.py +++ b/test/test_pyposeidon.py @@ -27,7 +27,7 @@ def teardown_function(): def test_create_pool(): p = poseidon.create_pool(path, 1024 * 1024 * 80) assert p != None - g = p.create_graph("my_py_graph1") + g = p.create_graph("my_py_graph1", 1000) assert g != None p.drop_graph("my_py_graph1") p.close() @@ -37,13 +37,13 @@ def test_create_pool(): # p = poseidon.create_pool(path, 1024 * 1024 * 80) # assert p != None # with pytest.raises(RuntimeError): -# g = p.open_graph("my_py_graph2") +# g = p.open_graph("my_py_graph2", 1000) # p.close() def test_create_node(): p = poseidon.create_pool(path, 1024 * 1024 * 80) assert p != None - g = p.create_graph("my_py_graph3") + g = p.create_graph("my_py_graph3", 1000) assert g != None g.begin() a1 = g.create_node("Actor", { "name": "John David Washington"})