Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SlotLockManager::lock_all_slots #156

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 36 additions & 13 deletions src/llfs/slot_lock_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ SlotLockManager::~SlotLockManager() noexcept
{
this->halt();

auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

BATT_CHECK(locked->lock_heap_.empty()) << this->debug_info_locked(locked);
}
Expand Down Expand Up @@ -53,7 +53,7 @@ slot_offset_type SlotLockManager::get_lower_bound() const
//
slot_offset_type SlotLockManager::get_upper_bound() const
{
auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};
return locked->upper_bound_;
}

Expand All @@ -68,7 +68,7 @@ StatusOr<slot_offset_type> SlotLockManager::await_lower_bound(slot_offset_type m
//
void SlotLockManager::update_upper_bound(slot_offset_type offset)
{
auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

this->update_upper_bound_locked(locked, /*new_upper_bound=*/offset);
}
Expand All @@ -77,16 +77,39 @@ void SlotLockManager::update_upper_bound(slot_offset_type offset)
//
StatusOr<SlotReadLock> SlotLockManager::lock_slots(const SlotRange& range, const char* holder)
{
(void)holder;
batt::ScopedLock<State> locked{this->state_};

auto locked = this->state_.lock();

if (range.lower_bound < this->lower_bound_.get_value()) {
if (slot_less_than(range.lower_bound, this->lower_bound_.get_value())) {
return Status{
batt::StatusCode::kOutOfRange}; // TODO [tastolfi 2021-10-20] "the requested value
// extends below the current locked slot range"
}

return this->lock_slots_nocheck(locked, range, holder);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
StatusOr<SlotReadLock> SlotLockManager::lock_all_slots(const char* holder)
{
batt::ScopedLock<State> locked{this->state_};

const llfs::slot_offset_type lower_bound = this->lower_bound_.get_value();

return this->lock_slots_nocheck(locked,
SlotRange{
gabrielbornstein marked this conversation as resolved.
Show resolved Hide resolved
.lower_bound = lower_bound,
.upper_bound = lower_bound,
},
holder);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
StatusOr<SlotReadLock> SlotLockManager::lock_slots_nocheck(batt::ScopedLock<State>& locked,
const SlotRange& range,
const char* holder)
{
const usize size_before = locked->lock_heap_.size();

SlotLockHeap::handle_type handle =
Expand All @@ -104,7 +127,7 @@ StatusOr<SlotReadLock> SlotLockManager::lock_slots(const SlotRange& range, const
//
void SlotLockManager::unlock_slots(SlotReadLock* read_lock)
{
auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

const usize size_before = locked->lock_heap_.size();
BATT_CHECK_GT(size_before, 0u);
Expand All @@ -128,7 +151,7 @@ StatusOr<SlotReadLock> SlotLockManager::update_lock(SlotReadLock old_lock,
BATT_CHECK_GE(new_range.lower_bound, old_lock.slot_range().lower_bound)
<< "The locked lower bound must increase monotonically" << BATT_INSPECT(holder);

auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

const usize size_before = locked->lock_heap_.size();

Expand All @@ -147,15 +170,15 @@ StatusOr<SlotReadLock> SlotLockManager::update_lock(SlotReadLock old_lock,
//
std::function<void(std::ostream&)> SlotLockManager::debug_info()
{
auto locked = this->state_.lock();
batt::ScopedLock<State> locked{this->state_};

return this->debug_info_locked(locked);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
std::function<void(std::ostream&)> SlotLockManager::debug_info_locked(
batt::Mutex<State>::Lock& locked)
batt::ScopedLock<State>& locked)
{
Optional<SlotLockRecord> top_copy;
if (!locked->lock_heap_.empty()) {
Expand Down Expand Up @@ -187,7 +210,7 @@ std::function<void(std::ostream&)> SlotLockManager::debug_info_locked(

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
void SlotLockManager::update_lower_bound_locked(batt::Mutex<State>::Lock& locked)
void SlotLockManager::update_lower_bound_locked(batt::ScopedLock<State>& locked)
{
if (!locked->lock_heap_.empty()) {
const slot_offset_type trim_pos = get_slot_offset(locked->lock_heap_.top());
Expand All @@ -204,7 +227,7 @@ void SlotLockManager::update_lower_bound_locked(batt::Mutex<State>::Lock& locked

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
void SlotLockManager::update_upper_bound_locked(batt::Mutex<State>::Lock& locked,
void SlotLockManager::update_upper_bound_locked(batt::ScopedLock<State>& locked,
slot_offset_type new_upper_bound)
{
locked->upper_bound_ = slot_max(new_upper_bound, locked->upper_bound_);
Expand Down
105 changes: 60 additions & 45 deletions src/llfs/slot_lock_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,70 +26,80 @@ class SlotLockManager : public SlotReadLock::Sponsor

~SlotLockManager() noexcept;

// Returns true if `halt()` has been called.
//
/** \brief Returns true if `halt()` has been called.
*/
bool is_closed() const;

// Closes all watch objects owned by the lock manager.
//
/** \brief Closes all watch objects owned by the lock manager.
*/
void halt();

// Return the current locked range lower bound.
//
// This value is guaranteed to increase monotonically. A consequence of this invariant is that
// any attempt to lock a slot range that extends below the current locked range will fail.
//
/** \brief Return the current locked range lower bound.
*
* This value is guaranteed to increase monotonically. A consequence of this invariant is that
* any attempt to lock a slot range that extends below the current locked range will fail.
*/
slot_offset_type get_lower_bound() const;

// Return the current locked range upper bound.
//
/** \brief Return the current locked range upper bound.
*/
slot_offset_type get_upper_bound() const;

// Convenience wrapper: SlotRange{this->get_lower_bound(), this->get_upper_bound()}.
//
/** \brief Convenience wrapper: SlotRange{this->get_lower_bound(), this->get_upper_bound()}.
*/
SlotRange get_locked_range() const
{
return SlotRange{this->get_lower_bound(), this->get_upper_bound()};
}

// Blocks the current task until the locked lower bound is at least `min_offset`. This may happen
// either due to updating the upper bound or because a lock is released (see below for details).
//
/** \brief Blocks the current task until the locked lower bound is at least `min_offset`. This
* may happen either due to updating the upper bound or because a lock is released (see below for
* details).
*/
StatusOr<slot_offset_type> await_lower_bound(slot_offset_type min_offset);

// Updates the locked upper bound to be the greater of `offset` and its current value.
//
// If there are no active locks, this has the side-effect of also updating the locked lower bound
// (to maintain the invariant that the locked range is empty when no locks are held).
//
// The upper bound is guranteed to increase monotonically.
//
/** \brief Updates the locked upper bound to be the greater of `offset` and its current value.
*
* If there are no active locks, this has the side-effect of also updating the locked lower bound
* (to maintain the invariant that the locked range is empty when no locks are held).
*
* The upper bound is guranteed to increase monotonically.
*/
void update_upper_bound(slot_offset_type offset);

// Acquire a lock on the given slot range, if it does not extend below the current locked range.
//
// If `range.lower_bound` is lower than the current locked lower bound, the operation fails and an
// error Status is returned.
//
// If `range.upper_bound` is greater than the current locked upper bound, then the locked upper
// bound is set to `range.upper_bound`.
//
// When the last moved copy of the returned SlotReadLock is destroyed, the lock is released and
// the locked interval is shrunk.
//
/** \brief Acquire a lock on the given slot range, if it does not extend below the current locked
* range.
*
* If `range.lower_bound` is lower than the current locked lower bound, the operation fails and an
* error Status is returned.
*
* If `range.upper_bound` is greater than the current locked upper bound, then the locked upper
* bound is set to `range.upper_bound`.
*
* When the last moved copy of the returned SlotReadLock is destroyed, the lock is released and
* the locked interval is shrunk.
*/
StatusOr<SlotReadLock> lock_slots(const SlotRange& range, const char* holder);

// Efficiently updates an existing lock by changing the range to a greater one.
//
/** \brief Atomically reads the slot lower bound and acquires/returns a lock at that
* offset.
*
* This function creates a lock whose lower and upper bounds are equal; thus neither bound (in the
* manager) is modified by acquiring and this lock.
*/
StatusOr<SlotReadLock> lock_all_slots(const char* holder);

/** \brief Efficiently updates an existing lock by changing the range to a greater one.
*/
StatusOr<SlotReadLock> update_lock(SlotReadLock old_lock, const SlotRange& new_range,
const char* holder);

// For debugging
//
/** \brief For debugging.
*/
std::function<void(std::ostream&)> debug_info();

// Clone a lock.
//
/** \brief Clone a lock.
*/
SlotReadLock clone_lock(const SlotReadLock* lock) override;

private:
Expand All @@ -105,16 +115,21 @@ class SlotLockManager : public SlotReadLock::Sponsor
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
};

std::function<void(std::ostream&)> debug_info_locked(batt::Mutex<State>::Lock& locked);
/** \brief Implements the core logic of acquiring a SlotReadLock. Does not check to make sure the
* range is valid; the caller must make sure it is or behavior is undefined!
*/
StatusOr<SlotReadLock> lock_slots_nocheck(batt::ScopedLock<State>& locked, const SlotRange& range,
const char* holder);

std::function<void(std::ostream&)> debug_info_locked(batt::ScopedLock<State>& locked);

void unlock_slots(SlotReadLock*) override;

void update_lower_bound_locked(batt::Mutex<State>::Lock& locked);
void update_lower_bound_locked(batt::ScopedLock<State>& locked);

void update_upper_bound_locked(batt::Mutex<State>::Lock& locked,
slot_offset_type new_upper_bound);
void update_upper_bound_locked(batt::ScopedLock<State>& locked, slot_offset_type new_upper_bound);

batt::Mutex<State> state_;
mutable batt::Mutex<State> state_;
batt::Watch<slot_offset_type> lower_bound_{0};
};

Expand Down