From d9ed8502bcb2b712af304f5b6d60fec28941ae62 Mon Sep 17 00:00:00 2001 From: Max Fierro Date: Wed, 13 Nov 2024 02:10:35 -0800 Subject: [PATCH] defined main idioms for db api; missing schema serialization --- src/database/mod.rs | 35 +---- src/database/vector/mod.rs | 26 +--- src/database/volatile/mod.rs | 151 ++++++++++++++++-- src/database/volatile/resource/manager.rs | 13 +- src/database/volatile/resource/mod.rs | 129 +++++++++++++++- src/database/volatile/transaction/manager.rs | 2 +- src/database/volatile/transaction/mod.rs | 153 +++++++++++-------- src/game/util.rs | 8 +- src/solver/algorithm/strong/acyclic.rs | 47 ++---- 9 files changed, 387 insertions(+), 177 deletions(-) diff --git a/src/database/mod.rs b/src/database/mod.rs index fe0bfd1..851792e 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -102,10 +102,9 @@ pub trait KVStore { /// requires custom handling of what happens when the database is closed; if it /// has data on memory, then it should persist dirty data to ensure consistency /// via [`Drop`]. Database file structure is implementation-specific. -pub trait Persistent +pub trait Persistent where - Self: Tabular + Drop, - T: Table, + Self: Drop, { /// Interprets the contents of a directory at `path` to be the contents of /// a persistent database. Fails if the contents of `path` are unexpected. @@ -119,36 +118,6 @@ where /// already bound to another path, or if `path` is non-empty, or under any /// I/O failure. fn bind(&self, path: &Path) -> Result<()>; - - /// Evict the contents of `table` to disk in a batch operation, potentially - /// leaving cache space for other table's usage. Calling this on all tables - /// in a database should be equivalent to dropping the database reference. - fn flush(&self, table: &mut T) -> Result<()>; -} - -/// Allows for grouping data into [`Table`] implementations, which contain many -/// fixed-length records that share attributes under a single [`Schema`]. This -/// allows consumers of this implementation to have simultaneous references to -/// different mutable tables. -pub trait Tabular -where - T: Table, -{ - /// Creates a new table with `schema`. Returns a unique key that can be used - /// to later acquire the table. - fn insert_table(&self, schema: Schema) -> Result; - - /// Obtains a mutable reference to the [`Table`] with `id`. Fails if no such - /// table exists in the underlying database, or under any I/O failure. - fn get_table_mut(&self, key: SequenceKey) -> Result<&mut T>; - - /// Obtains an immutable reference to the [`Table`] with `id`. Fails if no - /// such table exists in the underlying database, or under any I/O failure. - fn get_table(&self, key: SequenceKey) -> Result<&T>; - - /// Forgets about the association of `id` to any existing table, doing - /// nothing if there is no such table. Fails under any I/O failure. - fn remove_table(&self, table: &mut T) -> Result<()>; } /* TABLE INTERFACE */ diff --git a/src/database/vector/mod.rs b/src/database/vector/mod.rs index db8ecea..0b29ac8 100644 --- a/src/database/vector/mod.rs +++ b/src/database/vector/mod.rs @@ -18,7 +18,7 @@ use std::path::Path; use crate::{ database::model::{Key, SequenceKey, Value}, - database::{self, KVStore, Persistent, Record, Schema, Tabular}, + database::{self, KVStore, Persistent, Record, Schema}, }; /* DEFINITIONS */ @@ -29,7 +29,7 @@ pub struct Table {} /* IMPLEMENTATIONS */ -impl Persistent for Database { +impl Persistent for Database { fn from(path: &Path) -> Result where Self: Sized, @@ -40,10 +40,6 @@ impl Persistent
for Database { fn bind(&self, path: &Path) -> Result<()> { todo!() } - - fn flush(&self, table: &mut Table) -> Result<()> { - todo!() - } } impl Drop for Database { @@ -52,24 +48,6 @@ impl Drop for Database { } } -impl Tabular
for Database { - fn insert_table(&self, schema: Schema) -> Result { - todo!() - } - - fn get_table_mut(&self, key: SequenceKey) -> Result<&mut Table> { - todo!() - } - - fn get_table(&self, key: SequenceKey) -> Result<&Table> { - todo!() - } - - fn remove_table(&self, table: &mut Table) -> Result<()> { - todo!() - } -} - impl database::Table for Table { fn schema(&self) -> &Schema { todo!() diff --git a/src/database/volatile/mod.rs b/src/database/volatile/mod.rs index 16dcd3e..29d9ccf 100644 --- a/src/database/volatile/mod.rs +++ b/src/database/volatile/mod.rs @@ -3,20 +3,26 @@ //! This module provides a trivial database implementation backed by a volatile //! in-memory data structure arrangement. +use anyhow::bail; use anyhow::Result; +use std::marker::PhantomData; use std::sync::Arc; use crate::database::model::SequenceKey; use crate::database::util::KeySequencer; +use crate::database::volatile::resource::Request; use crate::database::volatile::resource::ResourceManager; +use crate::database::volatile::transaction::Transaction; use crate::database::volatile::transaction::TransactionManager; +use crate::database::KVStore; /* RE-EXPORTS */ -pub use resource::Request; pub use resource::Resource; -pub use transaction::Transaction; +pub use transaction::WorkingSet; + +use super::Schema; /* MODULES */ @@ -31,9 +37,22 @@ type ResourceID = SequenceKey; pub struct Database { transaction_manager: Arc, resource_manager: Arc, + directory: Option, sequencer: Arc, } +pub struct TransactionBuilder +where + F: FnOnce(WorkingSet) -> Result, +{ + transaction_manager: Arc, + resource_manager: Arc, + write_requests: Vec, + read_requests: Vec, + function: Option, + _out: PhantomData, +} + #[derive(Default)] struct Sequencer { transaction: KeySequencer, @@ -53,7 +72,8 @@ impl Sequencer { } impl Database { - pub fn new() -> Self { + pub fn new() -> Result { + let directory = None; let sequencer = Arc::new(Sequencer::default()); let resource_manager = ResourceManager::new(sequencer.clone()); let transaction_manager = TransactionManager::new( @@ -61,28 +81,139 @@ impl Database { sequencer.clone(), ); - Self { + let mut db = Self { transaction_manager, resource_manager, + directory, sequencer, + }; + + let directory = db + .start_transaction(Request::empty())? + .create_resource(Self::directory_schema())?; + + db.directory = Some(directory); + Ok(db) + } + + pub fn build_transaction(&self) -> TransactionBuilder + where + F: FnOnce(WorkingSet) -> Result, + { + TransactionBuilder { + transaction_manager: self.transaction_manager.clone(), + resource_manager: self.resource_manager.clone(), + write_requests: Vec::new(), + read_requests: Vec::new(), + function: None, + _out: PhantomData, } } - pub fn create_transaction( + pub fn start_transaction( &self, request: Request, ) -> Result> { - let transaction = self + let txn = self .resource_manager .initialize_transaction( request, self.transaction_manager.clone(), )?; - { - self.transaction_manager - .add_transaction(transaction.clone()); - Ok(transaction) + self.transaction_manager + .add_transaction(txn.clone())?; + + Ok(txn) + } + + pub fn create_resource(&self, schema: Schema) -> Result { + let directory = self + .directory + .expect("Database directory table found uninitialized."); + + let txn = self.start_transaction(Request { + write: vec![self.directory.unwrap()], + read: vec![], + })?; + + let id = txn.create_resource(schema)?; + let mut directory = txn.write(directory)?; + todo!() + // directory.insert(id.into(), schema.into()); + // Ok(id) + } + + pub fn drop_resource(&self, id: ResourceID) -> Result<()> { + let directory = self + .directory + .expect("Database directory table found uninitialized."); + + let txn = self.start_transaction(Request { + write: vec![self.directory.unwrap()], + read: vec![], + })?; + + txn.drop_resource(id)?; + let mut directory = txn.write(directory)?; + todo!() + // directory.remove(id.into()); + // Ok(id) + } + + /* PRIVATE */ + + fn directory_schema() -> Schema { + todo!() + } +} + +impl TransactionBuilder +where + F: FnOnce(WorkingSet) -> Result, +{ + pub fn writing(mut self, id: ResourceID) -> Self { + self.read_requests.push(id); + self + } + + pub fn reading(mut self, id: ResourceID) -> Self { + self.write_requests.push(id); + self + } + + pub fn action(mut self, function: F) -> Self { + self.function = Some(function); + self + } + + pub fn execute(self) -> Result { + if self.read_requests.is_empty() && self.write_requests.is_empty() { + bail!("No resource acquisition requests provided.") } + + let function = if let Some(func) = self.function { + func + } else { + bail!("No actionable provided to transaction builder.") + }; + + let request = Request { + write: self.write_requests, + read: self.read_requests, + }; + + let txn = self + .resource_manager + .initialize_transaction( + request, + self.transaction_manager.clone(), + )?; + + self.transaction_manager + .add_transaction(txn.clone())?; + + let working_set = txn.resources()?; + function(working_set) } } diff --git a/src/database/volatile/resource/manager.rs b/src/database/volatile/resource/manager.rs index 20a579f..6b16f98 100644 --- a/src/database/volatile/resource/manager.rs +++ b/src/database/volatile/resource/manager.rs @@ -36,7 +36,7 @@ pub struct Request { } #[derive(Default)] -struct AccessControl { +pub struct AccessControl { pool: HashMap>>, owners: HashMap, reading: HashMap, @@ -46,6 +46,12 @@ struct AccessControl { /* IMPLEMENTATION */ +impl Request { + pub fn empty() -> Self { + Self::default() + } +} + impl AccessControl { fn conflict(&self, request: &Request) -> bool { request @@ -111,8 +117,8 @@ impl ResourceManager { request: Request, manager: Arc, ) -> Result> { + let mut resources = self.lock()?; loop { - let mut resources = self.lock()?; if request .write .iter() @@ -132,7 +138,8 @@ impl ResourceManager { return Ok(transaction); } - self.signal + resources = self + .signal .wait(resources) .map_err(|_| anyhow!("Resource access lock poisoned."))?; } diff --git a/src/database/volatile/resource/mod.rs b/src/database/volatile/resource/mod.rs index 48f786d..67d6ff2 100644 --- a/src/database/volatile/resource/mod.rs +++ b/src/database/volatile/resource/mod.rs @@ -2,9 +2,23 @@ //! //! TODO +use anyhow::Result; +use bitvec::order::Msb0; +use bitvec::slice::BitSlice; +use bitvec::vec::BitVec; + +use std::collections::HashMap; +use std::hash::{DefaultHasher, Hash, Hasher}; use std::sync::{Arc, Weak}; +use crate::database::model::Key; +use crate::database::model::SequenceKey; +use crate::database::model::Value; use crate::database::volatile::ResourceID; +use crate::database::KVStore; +use crate::database::Record; +use crate::database::Schema; +use crate::database::Table; /* RE-EXPORTS */ @@ -19,21 +33,130 @@ mod manager; pub struct Resource { manager: Weak, + schema: Schema, + data: Content, id: ResourceID, } +struct Content { + indices: HashMap, + current: usize, + storage: BitVec, + size: u64, +} + /* IMPLEMENTATION */ impl Resource { pub(in crate::database::volatile) fn new( manager: Arc, + schema: Schema, id: ResourceID, ) -> Self { - let manager = Arc::downgrade(&manager); - Self { manager, id } + Self { + manager: Arc::downgrade(&manager), + schema, + data: Content { + indices: HashMap::new(), + storage: BitVec::new(), + current: 0, + size: 0, + }, + id, + } } - pub fn id(&self) -> ResourceID { + pub(in crate::database::volatile) fn id(&self) -> ResourceID { self.id } + + /* UTILITIES */ + + fn hash_key(&self, key: &Key) -> u64 { + let mut h = DefaultHasher::new(); + key.hash(&mut h); + h.finish() + } + + fn data_slice( + &self, + from: usize, + to: usize, + ) -> Option<&BitSlice> { + if to > self.data.storage.len() { + None + } else { + Some(&self.data.storage[from..to]) + } + } + + fn index_removed(&self, index: usize) -> bool { + *self + .data + .storage + .get(index + self.schema.size()) + .expect("Out-of-bounds removal check on storage array.") + } + + fn mark_removed(&mut self, index: usize) { + self.data + .storage + .set(index + self.schema.size(), true); + } +} + +impl KVStore for Resource { + fn insert(&mut self, key: &Key, record: &R) -> Result<()> { + let key_hash = self.hash_key(key); + self.data + .indices + .insert(key_hash, self.data.current); + + self.data + .storage + .extend(record.raw()); + + self.data.storage.push(false); + self.data.current += self.schema.size() + 1; + self.data.size += 1; + Ok(()) + } + + fn get(&self, key: &Key) -> Option<&Value> { + let key_hash = self.hash_key(key); + if let Some(&location) = self.data.indices.get(&key_hash) { + if self.index_removed(location) { + return None; + } + self.data_slice(location, location + self.schema.size()) + } else { + None + } + } + + fn remove(&mut self, key: &Key) { + let key_hash = self.hash_key(key); + if let Some(&location) = self.data.indices.get(&key_hash) { + self.mark_removed(location); + self.data.size -= 1; + } + } +} + +impl Table for Resource { + fn schema(&self) -> &Schema { + &self.schema + } + + fn count(&self) -> u64 { + self.data.size + } + + fn bytes(&self) -> u64 { + (self.data.storage.len() as u64) / 8 + } + + fn id(&self) -> SequenceKey { + self.id() + } } diff --git a/src/database/volatile/transaction/manager.rs b/src/database/volatile/transaction/manager.rs index e9239c7..a8cd97a 100644 --- a/src/database/volatile/transaction/manager.rs +++ b/src/database/volatile/transaction/manager.rs @@ -6,7 +6,7 @@ use anyhow::anyhow; use anyhow::Result; use std::collections::HashMap; -use std::sync::{Arc, RwLock, Weak}; +use std::sync::{Arc, RwLock}; use crate::database::volatile::transaction::Transaction; use crate::database::volatile::ResourceManager; diff --git a/src/database/volatile/transaction/mod.rs b/src/database/volatile/transaction/mod.rs index fa3e5c9..a87ff57 100644 --- a/src/database/volatile/transaction/mod.rs +++ b/src/database/volatile/transaction/mod.rs @@ -5,15 +5,14 @@ use anyhow::anyhow; use anyhow::Result; +use std::collections::HashMap; use std::sync::RwLockReadGuard; -use std::{ - collections::HashMap, - sync::{Arc, RwLock, RwLockWriteGuard, Weak}, -}; +use std::sync::{Arc, RwLock, RwLockWriteGuard, Weak}; use crate::database::volatile::resource::Resource; use crate::database::volatile::Request; use crate::database::volatile::{ResourceID, TransactionID}; +use crate::database::Schema; /* RE-EXPORTS */ @@ -25,12 +24,6 @@ mod manager; /* DEFINITIONS */ -#[derive(Default)] -pub struct ResourceHandles { - write: HashMap, - read: HashMap, -} - pub struct Transaction { manager: Weak, handles: ResourceHandles, @@ -38,49 +31,46 @@ pub struct Transaction { id: TransactionID, } -pub struct WriteLock(Arc>); -pub struct ReadLock(Arc>); - -/* IMPLEMENTATION */ - -impl WriteLock { - fn new(lock: Arc>) -> Self { - Self(lock) - } +#[derive(Default)] +pub struct ResourceHandles { + write: HashMap>>, + read: HashMap>>, +} - fn lock(&self) -> Result> { - self.0 - .write() - .map_err(|_| anyhow!("Resource lock poisoned.")) - } +pub struct WorkingSet<'a> { + write: HashMap>, + read: HashMap>, } -impl ReadLock { - fn new(lock: Arc>) -> Self { - Self(lock) +/* IMPLEMENTATION */ + +impl WorkingSet<'_> { + pub fn get_reading(&mut self, id: ResourceID) -> RwLockReadGuard { + self.read.remove(&id).unwrap() } - fn lock(&self) -> Result> { - self.0 - .read() - .map_err(|_| anyhow!("Resource lock poisoned.")) + pub fn get_writing( + &mut self, + id: ResourceID, + ) -> RwLockWriteGuard { + self.write.remove(&id).unwrap() } } impl ResourceHandles { pub fn add_reading(&mut self, id: ResourceID, lock: Arc>) { - self.read - .insert(id, ReadLock::new(lock)); + self.read.insert(id, lock); } pub fn add_writing(&mut self, id: ResourceID, lock: Arc>) { - self.write - .insert(id, WriteLock::new(lock)); + self.write.insert(id, lock); } - fn get_reading(&self, resource: ResourceID) -> Result<&ReadLock> { - if let Some(resource) = self.read.get(&resource) { - Ok(resource) + fn read(&self, resource: ResourceID) -> Result> { + if let Some(ref resource) = self.read.get(&resource) { + Ok(resource + .read() + .map_err(|_| anyhow!("Read on poisoned resource lock."))?) } else { Err(anyhow!( "Attempted read on unacquired resource {}.", @@ -89,9 +79,14 @@ impl ResourceHandles { } } - fn get_writing(&self, resource: ResourceID) -> Result<&WriteLock> { - if let Some(resource) = self.write.get(&resource) { - Ok(resource) + fn write( + &self, + resource: ResourceID, + ) -> Result> { + if let Some(ref resource) = self.write.get(&resource) { + Ok(resource + .write() + .map_err(|_| anyhow!("Write on poisoned resource lock."))?) } else { Err(anyhow!( "Attempted write on unacquired resource {}.", @@ -99,9 +94,51 @@ impl ResourceHandles { )) } } + + pub fn lock_all(&self) -> Result { + let read = self + .read + .iter() + .map(|(&id, l)| { + l.read() + .map(|l| (id, l)) + .map_err(|_| anyhow!("Read on poisoned resource lock.")) + }) + .collect::>>()?; + + let write = self + .write + .iter() + .map(|(&id, l)| { + l.write() + .map(|l| (id, l)) + .map_err(|_| anyhow!("Write on poisoned resource lock.")) + }) + .collect::>>()?; + + Ok(WorkingSet { write, read }) + } } impl Transaction { + pub fn read(&self, id: ResourceID) -> Result> { + self.handles.read(id) + } + + pub fn write(&self, id: ResourceID) -> Result> { + self.handles.write(id) + } + + pub fn resources(&self) -> Result { + self.handles.lock_all() + } + + pub fn id(&self) -> TransactionID { + self.id + } + + /* PROTECTED */ + pub(in crate::database::volatile) fn new( manager: Arc, handles: ResourceHandles, @@ -119,11 +156,14 @@ impl Transaction { Arc::new(transaction) } - pub fn create_resource(&self) -> Result { + pub(in crate::database::volatile) fn create_resource( + &self, + schema: Schema, + ) -> Result { if let Some(manager) = self.manager.upgrade() { let id = manager.sequencer.next_resource()?; let resource_manager = manager.resource_manager.clone(); - let resource = Resource::new(resource_manager.clone(), id); + let resource = Resource::new(resource_manager.clone(), schema, id); resource_manager.add_resource(resource)?; Ok(id) } else { @@ -131,7 +171,10 @@ impl Transaction { } } - pub fn drop_resource(&self, id: ResourceID) -> Result<()> { + pub(in crate::database::volatile) fn drop_resource( + &self, + id: ResourceID, + ) -> Result<()> { if let Some(manager) = self.manager.upgrade() { manager .resource_manager @@ -142,28 +185,6 @@ impl Transaction { Err(anyhow!("Transaction manager was dropped.")) } } - - pub fn reading(&self, id: ResourceID, func: F) -> Result - where - F: FnOnce(&Resource) -> Result, - { - let resource = self.handles.get_reading(id)?; - let guard = resource.lock()?; - func(&guard) - } - - pub fn writing(&self, id: ResourceID, func: F) -> Result - where - F: FnOnce(&mut Resource) -> Result, - { - let resource = self.handles.get_writing(id)?; - let mut guard = resource.lock()?; - func(&mut guard) - } - - pub fn id(&self) -> TransactionID { - self.id - } } impl Drop for Transaction { diff --git a/src/game/util.rs b/src/game/util.rs index f268ef1..aed27d9 100644 --- a/src/game/util.rs +++ b/src/game/util.rs @@ -3,11 +3,11 @@ //! This module provides some common utilities used in the implementation of //! more than a single game. -use std::fmt::Display; -use std::hash::{DefaultHasher, Hasher}; - use anyhow::bail; -use anyhow::Result; +use anyhow::{Context, Result}; + +use std::fmt::Display; +use std::hash::{DefaultHasher, Hash, Hasher}; use crate::database::model::SequenceKey; use crate::game::model::State; diff --git a/src/solver/algorithm/strong/acyclic.rs b/src/solver/algorithm/strong/acyclic.rs index fedcb1a..9a258f8 100644 --- a/src/solver/algorithm/strong/acyclic.rs +++ b/src/solver/algorithm/strong/acyclic.rs @@ -27,42 +27,24 @@ where + Sequential + Identify, { - let db = volatile_database(game, mode) - .context("Failed to initialize volatile database.")?; - - let table = db - .select_table(game.id()) - .context("Failed to select solution set database table.")?; - - backward_induction(table, game) - .context("Failed solving algorithm execution.")?; - - Ok(()) -} - -/* DATABASE INITIALIZATION */ - -/// Initializes a volatile database, creating a table schema according to the -/// solver record layout, initializing a table with that schema, and switching -/// to that table before returning the database handle. -fn volatile_database( - game: &G, - mode: IOMode, -) -> Result -where - G: Sequential + Identify, -{ - let id = game.id(); - let db = volatile::Database::initialize(); - let schema = RecordType::MUR(N) .try_into() .context("Failed to create table schema for solver records.")?; - db.create_table(id, schema) + let db = volatile::Database::new()?; + let solution = db + .create_resource(schema) .context("Failed to create database table for solution set.")?; - Ok(db) + db.build_transaction() + .writing(solution) + .action(|mut working_set| { + let mut t = working_set.get_writing(solution); + backward_induction(&mut (*t), game) + .context("Solving algorithm encountered an error.") + }) + .execute() + .context("Solving algorithm transaction failed.") } /* SOLVING ALGORITHMS */ @@ -81,7 +63,6 @@ where { let mut stack = Vec::new(); stack.push(game.start()); - while let Some(curr) = stack.pop() { let children = game.prograde(curr); let mut buf = RecordBuffer::new(game.players()) @@ -89,7 +70,6 @@ where if db.get(&curr).is_none() { db.insert(&curr, &buf)?; - if game.end(curr) { buf = RecordBuffer::new(game.players()) .context("Failed to create record for end state.")?; @@ -113,13 +93,14 @@ where let mut optimal = buf; let mut max_val = IUtility::MIN; let mut min_rem = Remoteness::MAX; - for state in children { let buf = RecordBuffer::from(db.get(&state).unwrap()) .context("Failed to create record for middle state.")?; + let val = buf .get_utility(game.turn(state)) .context("Failed to get utility from record.")?; + let rem = buf.get_remoteness(); if val > max_val || (val == max_val && rem < min_rem) { max_val = val;