Skip to content

Commit

Permalink
#303 Refactor round storage method to return actions
Browse files Browse the repository at this point in the history
+ try_lock_chunk
  • Loading branch information
kellpossible committed Aug 30, 2021
1 parent c93c1b6 commit 01ee29f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 25 deletions.
4 changes: 3 additions & 1 deletion phase1-coordinator/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,9 @@ impl Coordinator {

// Attempt to acquire the chunk lock for participant.
trace!("Preparing to lock chunk {}", chunk_id);
let locked_locators = round.try_lock_chunk(&self.environment, &mut self.storage, chunk_id, &participant)?;
let (locked_locators, actions) =
round.try_lock_chunk(&self.environment, &self.storage, chunk_id, &participant)?;
self.storage.perform_actions(actions)?;
trace!("Participant {} locked chunk {}", participant, chunk_id);

// Add the updated round to storage.
Expand Down
45 changes: 26 additions & 19 deletions phase1-coordinator/src/objects/round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
ContributionLocator,
ContributionSignatureLocator,
Disk,
InitializeAction,
Locator,
LocatorPath,
Object,
Expand Down Expand Up @@ -551,10 +552,10 @@ impl Round {
pub(crate) fn try_lock_chunk(
&mut self,
environment: &Environment,
storage: &mut Disk,
storage: &Disk,
chunk_id: u64,
participant: &Participant,
) -> Result<LockedLocators, CoordinatorError> {
) -> Result<(LockedLocators, Vec<StorageAction>), CoordinatorError> {
debug!("{} is attempting to lock chunk {}", participant, chunk_id);

// Check that the participant is holding less than the chunk lock limit.
Expand Down Expand Up @@ -711,38 +712,44 @@ impl Round {
self.chunk_mut(chunk_id)?
.acquire_lock(participant.clone(), expected_num_contributions)?;

let mut actions: Vec<StorageAction> = Vec::new();

// Initialize the next contribution locator.
match participant {
Participant::Contributor(_) => {
// Initialize the unverified response file.
storage.initialize(
Locator::ContributionFile(locked_locators.next_contribution.clone()),
Object::contribution_file_size(environment, chunk_id, false),
)?;
actions.push(StorageAction::Initialize(InitializeAction {
locator: Locator::ContributionFile(locked_locators.next_contribution.clone()),
object_size: Object::contribution_file_size(environment, chunk_id, false),
}));

// Initialize the contribution file signature.
storage.initialize(
Locator::ContributionFileSignature(locked_locators.next_contribution_file_signature.clone()),
Object::contribution_file_signature_size(false),
)?;
actions.push(StorageAction::Initialize(InitializeAction {
locator: Locator::ContributionFileSignature(
locked_locators.next_contribution_file_signature.clone(),
),
object_size: Object::contribution_file_signature_size(false),
}));
}
Participant::Verifier(_) => {
// Initialize the next challenge file.
storage.initialize(
Locator::ContributionFile(locked_locators.next_contribution.clone()),
Object::contribution_file_size(environment, chunk_id, true),
)?;
actions.push(StorageAction::Initialize(InitializeAction {
locator: Locator::ContributionFile(locked_locators.next_contribution.clone()),
object_size: Object::contribution_file_size(environment, chunk_id, true),
}));

// Initialize the contribution file signature.
storage.initialize(
Locator::ContributionFileSignature(locked_locators.next_contribution_file_signature.clone()),
Object::contribution_file_signature_size(true),
)?;
actions.push(StorageAction::Initialize(InitializeAction {
locator: Locator::ContributionFileSignature(
locked_locators.next_contribution_file_signature.clone(),
),
object_size: Object::contribution_file_signature_size(true),
}));
}
};

debug!("{} locked chunk {}", participant, chunk_id);
Ok(locked_locators)
Ok((locked_locators, actions))
}

///
Expand Down
3 changes: 3 additions & 0 deletions phase1-coordinator/src/storage/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,9 @@ impl Disk {
Ok(())
}
}
StorageAction::Initialize(initialize_action) => {
self.initialize(initialize_action.locator, initialize_action.object_size)
}
}
}

Expand Down
18 changes: 13 additions & 5 deletions phase1-coordinator/src/storage/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl From<Locator> for LocatorOrPath {
}
}

/// An action to remove an item from [Storage].
/// An action to remove an item from [Disk].
#[derive(Clone, PartialEq, Debug)]
pub struct RemoveAction {
locator_or_path: LocatorOrPath,
Expand All @@ -301,13 +301,13 @@ impl RemoveAction {
}
}

/// Obtain the location of the item to be removed from [Storage]
/// Obtain the location of the item to be removed from [Disk]
/// as a [LocatorOrPath].
pub fn locator_or_path(&self) -> &LocatorOrPath {
&self.locator_or_path
}

/// Obtain the location of the item to be removed from [Storage]
/// Obtain the location of the item to be removed from [Disk]
/// as a [Locator].
pub fn try_into_locator(self, storage: &Disk) -> Result<Locator, CoordinatorError> {
self.locator_or_path.try_into_locator(storage)
Expand All @@ -318,13 +318,19 @@ impl RemoveAction {
}
}

/// An action to update an item in [Storage].
/// An action to update an item in [Disk].
pub struct UpdateAction {
pub locator: Locator,
pub object: Object,
}

/// An action taken to mutate [Storage], which can be processed by
/// An action to initialize an item in [Disk].
pub struct InitializeAction {
pub locator: Locator,
pub object_size: u64,
}

/// An action taken to mutate [Disk], which can be processed by
/// [Storage::process()].
#[non_exhaustive]
pub enum StorageAction {
Expand All @@ -335,6 +341,8 @@ pub enum StorageAction {
RemoveIfExists(RemoveAction),
/// Update an item in storage.
Update(UpdateAction),
/// Initialize an item in storage.
Initialize(InitializeAction),
}

pub trait StorageLocator {
Expand Down

0 comments on commit 01ee29f

Please sign in to comment.