Skip to content

Commit

Permalink
defined main idioms for db api; missing schema serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
maxfierrog committed Nov 13, 2024
1 parent 35dc426 commit d9ed850
Show file tree
Hide file tree
Showing 9 changed files with 387 additions and 177 deletions.
35 changes: 2 additions & 33 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,9 @@ pub trait KVStore {
/// requires custom handling of what happens when the database is closed; if it
/// has data on memory, then it should persist dirty data to ensure consistency
/// via [`Drop`]. Database file structure is implementation-specific.
pub trait Persistent<T>
pub trait Persistent
where
Self: Tabular<T> + Drop,
T: Table,
Self: Drop,
{
/// Interprets the contents of a directory at `path` to be the contents of
/// a persistent database. Fails if the contents of `path` are unexpected.
Expand All @@ -119,36 +118,6 @@ where
/// already bound to another path, or if `path` is non-empty, or under any
/// I/O failure.
fn bind(&self, path: &Path) -> Result<()>;

/// Evict the contents of `table` to disk in a batch operation, potentially
/// leaving cache space for other table's usage. Calling this on all tables
/// in a database should be equivalent to dropping the database reference.
fn flush(&self, table: &mut T) -> Result<()>;
}

/// Allows for grouping data into [`Table`] implementations, which contain many
/// fixed-length records that share attributes under a single [`Schema`]. This
/// allows consumers of this implementation to have simultaneous references to
/// different mutable tables.
pub trait Tabular<T>
where
T: Table,
{
/// Creates a new table with `schema`. Returns a unique key that can be used
/// to later acquire the table.
fn insert_table(&self, schema: Schema) -> Result<SequenceKey>;

/// Obtains a mutable reference to the [`Table`] with `id`. Fails if no such
/// table exists in the underlying database, or under any I/O failure.
fn get_table_mut(&self, key: SequenceKey) -> Result<&mut T>;

/// Obtains an immutable reference to the [`Table`] with `id`. Fails if no
/// such table exists in the underlying database, or under any I/O failure.
fn get_table(&self, key: SequenceKey) -> Result<&T>;

/// Forgets about the association of `id` to any existing table, doing
/// nothing if there is no such table. Fails under any I/O failure.
fn remove_table(&self, table: &mut T) -> Result<()>;
}

/* TABLE INTERFACE */
Expand Down
26 changes: 2 additions & 24 deletions src/database/vector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::path::Path;

use crate::{
database::model::{Key, SequenceKey, Value},
database::{self, KVStore, Persistent, Record, Schema, Tabular},
database::{self, KVStore, Persistent, Record, Schema},
};

/* DEFINITIONS */
Expand All @@ -29,7 +29,7 @@ pub struct Table {}

/* IMPLEMENTATIONS */

impl Persistent<Table> for Database {
impl Persistent for Database {
fn from(path: &Path) -> Result<Self>
where
Self: Sized,
Expand All @@ -40,10 +40,6 @@ impl Persistent<Table> for Database {
fn bind(&self, path: &Path) -> Result<()> {
todo!()
}

fn flush(&self, table: &mut Table) -> Result<()> {
todo!()
}
}

impl Drop for Database {
Expand All @@ -52,24 +48,6 @@ impl Drop for Database {
}
}

impl Tabular<Table> for Database {
fn insert_table(&self, schema: Schema) -> Result<SequenceKey> {
todo!()
}

fn get_table_mut(&self, key: SequenceKey) -> Result<&mut Table> {
todo!()
}

fn get_table(&self, key: SequenceKey) -> Result<&Table> {
todo!()
}

fn remove_table(&self, table: &mut Table) -> Result<()> {
todo!()
}
}

impl database::Table for Table {
fn schema(&self) -> &Schema {
todo!()
Expand Down
151 changes: 141 additions & 10 deletions src/database/volatile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,26 @@
//! This module provides a trivial database implementation backed by a volatile
//! in-memory data structure arrangement.
use anyhow::bail;
use anyhow::Result;

use std::marker::PhantomData;
use std::sync::Arc;

use crate::database::model::SequenceKey;
use crate::database::util::KeySequencer;
use crate::database::volatile::resource::Request;
use crate::database::volatile::resource::ResourceManager;
use crate::database::volatile::transaction::Transaction;
use crate::database::volatile::transaction::TransactionManager;
use crate::database::KVStore;

/* RE-EXPORTS */

pub use resource::Request;
pub use resource::Resource;
pub use transaction::Transaction;
pub use transaction::WorkingSet;

use super::Schema;

/* MODULES */

Expand All @@ -31,9 +37,22 @@ type ResourceID = SequenceKey;
pub struct Database {
transaction_manager: Arc<TransactionManager>,
resource_manager: Arc<ResourceManager>,
directory: Option<ResourceID>,
sequencer: Arc<Sequencer>,
}

pub struct TransactionBuilder<F, O>
where
F: FnOnce(WorkingSet) -> Result<O>,
{
transaction_manager: Arc<TransactionManager>,
resource_manager: Arc<ResourceManager>,
write_requests: Vec<ResourceID>,
read_requests: Vec<ResourceID>,
function: Option<F>,
_out: PhantomData<O>,
}

#[derive(Default)]
struct Sequencer {
transaction: KeySequencer,
Expand All @@ -53,36 +72,148 @@ impl Sequencer {
}

impl Database {
pub fn new() -> Self {
pub fn new() -> Result<Self> {
let directory = None;
let sequencer = Arc::new(Sequencer::default());
let resource_manager = ResourceManager::new(sequencer.clone());
let transaction_manager = TransactionManager::new(
resource_manager.clone(),
sequencer.clone(),
);

Self {
let mut db = Self {
transaction_manager,
resource_manager,
directory,
sequencer,
};

let directory = db
.start_transaction(Request::empty())?
.create_resource(Self::directory_schema())?;

db.directory = Some(directory);
Ok(db)
}

pub fn build_transaction<F, O>(&self) -> TransactionBuilder<F, O>
where
F: FnOnce(WorkingSet) -> Result<O>,
{
TransactionBuilder {
transaction_manager: self.transaction_manager.clone(),
resource_manager: self.resource_manager.clone(),
write_requests: Vec::new(),
read_requests: Vec::new(),
function: None,
_out: PhantomData,
}
}

pub fn create_transaction(
pub fn start_transaction(
&self,
request: Request,
) -> Result<Arc<Transaction>> {
let transaction = self
let txn = self
.resource_manager
.initialize_transaction(
request,
self.transaction_manager.clone(),
)?;

{
self.transaction_manager
.add_transaction(transaction.clone());
Ok(transaction)
self.transaction_manager
.add_transaction(txn.clone())?;

Ok(txn)
}

pub fn create_resource(&self, schema: Schema) -> Result<ResourceID> {
let directory = self
.directory
.expect("Database directory table found uninitialized.");

let txn = self.start_transaction(Request {
write: vec![self.directory.unwrap()],
read: vec![],
})?;

let id = txn.create_resource(schema)?;
let mut directory = txn.write(directory)?;
todo!()
// directory.insert(id.into(), schema.into());
// Ok(id)
}

pub fn drop_resource(&self, id: ResourceID) -> Result<()> {
let directory = self
.directory
.expect("Database directory table found uninitialized.");

let txn = self.start_transaction(Request {
write: vec![self.directory.unwrap()],
read: vec![],
})?;

txn.drop_resource(id)?;
let mut directory = txn.write(directory)?;
todo!()
// directory.remove(id.into());
// Ok(id)
}

/* PRIVATE */

fn directory_schema() -> Schema {
todo!()
}
}

impl<F, O> TransactionBuilder<F, O>
where
F: FnOnce(WorkingSet) -> Result<O>,
{
pub fn writing(mut self, id: ResourceID) -> Self {
self.read_requests.push(id);
self
}

pub fn reading(mut self, id: ResourceID) -> Self {
self.write_requests.push(id);
self
}

pub fn action(mut self, function: F) -> Self {
self.function = Some(function);
self
}

pub fn execute(self) -> Result<O> {
if self.read_requests.is_empty() && self.write_requests.is_empty() {
bail!("No resource acquisition requests provided.")
}

let function = if let Some(func) = self.function {
func
} else {
bail!("No actionable provided to transaction builder.")
};

let request = Request {
write: self.write_requests,
read: self.read_requests,
};

let txn = self
.resource_manager
.initialize_transaction(
request,
self.transaction_manager.clone(),
)?;

self.transaction_manager
.add_transaction(txn.clone())?;

let working_set = txn.resources()?;
function(working_set)
}
}
13 changes: 10 additions & 3 deletions src/database/volatile/resource/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Request {
}

#[derive(Default)]
struct AccessControl {
pub struct AccessControl {
pool: HashMap<ResourceID, Arc<RwLock<Resource>>>,
owners: HashMap<ResourceID, TransactionID>,
reading: HashMap<ResourceID, u32>,
Expand All @@ -46,6 +46,12 @@ struct AccessControl {

/* IMPLEMENTATION */

impl Request {
pub fn empty() -> Self {
Self::default()
}
}

impl AccessControl {
fn conflict(&self, request: &Request) -> bool {
request
Expand Down Expand Up @@ -111,8 +117,8 @@ impl ResourceManager {
request: Request,
manager: Arc<TransactionManager>,
) -> Result<Arc<Transaction>> {
let mut resources = self.lock()?;
loop {
let mut resources = self.lock()?;
if request
.write
.iter()
Expand All @@ -132,7 +138,8 @@ impl ResourceManager {
return Ok(transaction);
}

self.signal
resources = self
.signal
.wait(resources)
.map_err(|_| anyhow!("Resource access lock poisoned."))?;
}
Expand Down
Loading

0 comments on commit d9ed850

Please sign in to comment.