diff --git a/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result b/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result new file mode 100644 index 0000000000000..880e5229d3ce5 --- /dev/null +++ b/mysql-test/suite/innodb/r/avoid_deadlock_with_blocked.result @@ -0,0 +1,129 @@ +connect stop_purge,localhost,root; +START TRANSACTION WITH CONSISTENT SNAPSHOT; +connect con1,localhost,root,,; +connect con2,localhost,root,,; +connect con3,localhost,root,,; +connection default; +CREATE TABLE t1 (id INT PRIMARY KEY) ENGINE=InnoDB STATS_PERSISTENT=0; +INSERT INTO t1 (id) VALUES (1); +connection con1; +BEGIN; +SELECT * FROM t1 LOCK IN SHARE MODE; +id +1 +connection con2; +BEGIN; +SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait'; +SELECT * FROM t1 FOR UPDATE; +connection con1; +SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait'; +SELECT * FROM t1 FOR UPDATE; +id +1 +COMMIT; +connection con2; +id +1 +COMMIT; +connection con1; +BEGIN; +SELECT * FROM t1 WHERE id=1 FOR UPDATE; +id +1 +connection con2; +BEGIN; +SET DEBUG_SYNC = 'lock_wait_start SIGNAL con2_will_wait'; +SELECT * FROM t1 LOCK IN SHARE MODE; +connection con1; +SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait'; +INSERT INTO t1 VALUES (0); +ROLLBACK; +connection con2; +ERROR 40001: Deadlock found when trying to get lock; try restarting transaction +COMMIT; +connection con1; +BEGIN; +SELECT * FROM t1 LOCK IN SHARE MODE; +id +1 +connection con2; +BEGIN; +SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE; +id +1 +connection default; +connection con3; +SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait'; +SELECT * FROM t1 FOR UPDATE; +connection con1; +SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait'; +SET DEBUG_SYNC = 'lock_wait_start SIGNAL con1_will_wait'; +INSERT INTO t1 VALUES (0); +connection con2; +SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait'; +COMMIT; +connection con1; +ROLLBACK; +connection con3; +ERROR 40001: Deadlock found when trying to get lock; try restarting transaction 
+connection con1; +BEGIN; +SELECT * FROM t1 LOCK IN SHARE MODE; +id +1 +connection con2; +BEGIN; +SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE; +id +1 +connection default; +connection con3; +SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait'; +SELECT * FROM t1 FOR UPDATE; +connection con1; +SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait'; +SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con1_will_wait'; +SELECT * FROM t1 WHERE id=1 FOR UPDATE; +connection con2; +SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait'; +COMMIT; +connection con1; +id +1 +COMMIT; +connection con3; +id +1 +COMMIT; +connection con1; +BEGIN; +SELECT * FROM t1 LOCK IN SHARE MODE; +id +1 +connection con2; +SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait'; +SELECT * FROM t1 FOR UPDATE; +connection con3; +SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait'; +SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait'; +SELECT * FROM t1 FOR UPDATE; +connection con1; +SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait'; +SELECT * FROM t1 WHERE id=1 FOR UPDATE; +id +1 +COMMIT; +connection con2; +id +1 +COMMIT; +connection con3; +id +1 +COMMIT; +connection default; +disconnect con1; +disconnect con2; +disconnect con3; +disconnect stop_purge; +DROP TABLE t1; diff --git a/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test b/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test new file mode 100644 index 0000000000000..00f7ece9de5f1 --- /dev/null +++ b/mysql-test/suite/innodb/t/avoid_deadlock_with_blocked.test @@ -0,0 +1,194 @@ +--source include/have_innodb.inc +--source include/have_debug_sync.inc +--source include/count_sessions.inc + +--disable_query_log +call mtr.add_suppression("InnoDB: Transaction was aborted due to "); +--enable_query_log + +connect stop_purge,localhost,root; +START TRANSACTION WITH CONSISTENT SNAPSHOT; + +--connect (con1,localhost,root,,) +--connect (con2,localhost,root,,) +--connect (con3,localhost,root,,) + +--connection default +CREATE 
TABLE t1 (id INT PRIMARY KEY) ENGINE=InnoDB STATS_PERSISTENT=0; +INSERT INTO t1 (id) VALUES (1); +# Simplest scenario: +# , +# , , +# Before MDEV-34877: +# , , +# After MDEV-34877: +# , , +# Expected: instead of deadlocking, the con1's request should ingore con2's + +--connection con1 + BEGIN; + SELECT * FROM t1 LOCK IN SHARE MODE; + +--connection con2 + BEGIN; + SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait'; + --send SELECT * FROM t1 FOR UPDATE + +--connection con1 + SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait'; + SELECT * FROM t1 FOR UPDATE; + COMMIT; + +--connection con2 + --reap + COMMIT; + +# A variant of the above scenario: +# , +# , , +# , , +# Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them +--connection con1 + BEGIN; + SELECT * FROM t1 WHERE id=1 FOR UPDATE; + +--connection con2 + BEGIN; + SET DEBUG_SYNC = 'lock_wait_start SIGNAL con2_will_wait'; + --send SELECT * FROM t1 LOCK IN SHARE MODE + +--connection con1 + SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait'; + INSERT INTO t1 VALUES (0); + ROLLBACK; + +--connection con2 + --error ER_LOCK_DEADLOCK + --reap + COMMIT; + +# More complicated scenario: +# , +# , , +# , , +# , , , +# , , +# Expected: a deadlock, as INSERT INTENTION should not overtake locks on gap, to not slice them + +--connection con1 + BEGIN; + SELECT * FROM t1 LOCK IN SHARE MODE; + +--connection con2 + BEGIN; + SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE; + +--connection default + +--connection con3 + SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait'; + --send SELECT * FROM t1 FOR UPDATE + +--connection con1 + SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait'; + SET DEBUG_SYNC = 'lock_wait_start SIGNAL con1_will_wait'; + --send INSERT INTO t1 VALUES (0) + +--connection con2 + SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait'; + COMMIT; + +--connection con1 + --reap + ROLLBACK; + + +--connection con3 + --error ER_LOCK_DEADLOCK + --reap + +# More complicated 
scenario. +# , +# , , +# , , +# , , , +# Before MDEV-34877: +# , , +# After MDEV-34877: +# , , + + +--connection con1 + BEGIN; + SELECT * FROM t1 LOCK IN SHARE MODE; + +--connection con2 + BEGIN; + SELECT * FROM t1 WHERE id=1 LOCK IN SHARE MODE; + +--connection default + +--connection con3 + SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait'; + --send SELECT * FROM t1 FOR UPDATE + +--connection con1 + SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait'; + SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con1_will_wait'; + --send SELECT * FROM t1 WHERE id=1 FOR UPDATE + +--connection con2 + SET DEBUG_SYNC = 'now WAIT_FOR con1_will_wait'; + COMMIT; + +--connection con1 + --reap + COMMIT; + +--connection con3 + --reap + COMMIT; + +# A secenario, where con1 has to bypass two transactions: +# +# +# +# Before MDEV-34877: +# +# After MDEV-34877: +# +--connection con1 + BEGIN; + SELECT * FROM t1 LOCK IN SHARE MODE; + +--connection con2 + SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con2_will_wait'; + --send SELECT * FROM t1 FOR UPDATE + +--connection con3 + SET DEBUG_SYNC = 'now WAIT_FOR con2_will_wait'; + SET DEBUG_SYNC = 'lock_wait_before_suspend SIGNAL con3_will_wait'; + --send SELECT * FROM t1 FOR UPDATE + +--connection con1 + SET DEBUG_SYNC = 'now WAIT_FOR con3_will_wait'; + SELECT * FROM t1 WHERE id=1 FOR UPDATE; + COMMIT; + +--connection con2 + --reap + COMMIT; + +--connection con3 + --reap + COMMIT; + +--connection default +--disconnect con1 +--disconnect con2 +--disconnect con3 +--disconnect stop_purge + +DROP TABLE t1; + +--source include/wait_until_count_sessions.inc diff --git a/storage/innobase/include/hash0hash.h b/storage/innobase/include/hash0hash.h index 4d45c0bf7727c..c85340245932d 100644 --- a/storage/innobase/include/hash0hash.h +++ b/storage/innobase/include/hash0hash.h @@ -111,6 +111,38 @@ struct hash_cell_t { remove(search(next, [&element](const T *p){return p==&element;}), next); } + + /** Delete an element. 
+ @tparam T type of the element + @param remove the being-removed element + @param next the next-element pointer in T */ + template + void remove(const T &remove, T *T::*next) + { + T *prev; + for (prev= static_cast(node); prev && prev->*next != &remove; + prev= prev->*next); + ut_a(prev); + prev->*next= remove.*next; + } + + /** Insert an element after another. + @tparam T type of the element + @param after the element after which to insert + @param insert the being-inserted element + @param next the next-element pointer in T */ + template void insert_after(T &after, T &insert, T *T::*next) + { +#ifdef UNIV_DEBUG + for (const T *c= static_cast(node); c; c= c->*next) + if (c == &after) + goto found; + ut_error; + found: +#endif + insert.*next= after.*next; + after.*next= &insert; + } }; /** Hash table with singly-linked overflow lists */ diff --git a/storage/innobase/include/lock0lock.h b/storage/innobase/include/lock0lock.h index c996e5f8227c4..b1baf3b6d0974 100644 --- a/storage/innobase/include/lock0lock.h +++ b/storage/innobase/include/lock0lock.h @@ -52,6 +52,18 @@ namespace Deadlock enum report { REPORT_OFF, REPORT_BASIC, REPORT_FULL }; } +/** Conflicting lock info */ +struct conflicting_lock_info { + /** Conflicting lock */ + const lock_t *conflicting; + /** If some lock was bypassed, points to the lock after which bypassing + lock must be inserted into linked list of locks for the certain cell of + record locks hash table. */ + lock_t *insert_after; + /** Bypassed lock */ + ut_d(const lock_t *bypassed;) +}; + /*********************************************************************//** Gets the heap_no of the smallest user record on a page. @return heap_no of smallest user record, or PAGE_HEAP_NO_SUPREMUM */ @@ -1151,25 +1163,6 @@ struct TMTrxGuard #endif }; -/*********************************************************************//** -Creates a new record lock and inserts it to the lock queue. Does NOT check -for deadlocks or lock compatibility! 
-@return created lock */ -UNIV_INLINE -lock_t* -lock_rec_create( -/*============*/ - lock_t* c_lock, /*!< conflicting lock */ - unsigned type_mode,/*!< in: lock mode and wait flag */ - const buf_block_t* block, /*!< in: buffer block containing - the record */ - ulint heap_no,/*!< in: heap number of the record */ - dict_index_t* index, /*!< in: index of record */ - trx_t* trx, /*!< in,out: transaction */ - bool caller_owns_trx_mutex); - /*!< in: true if caller owns - trx mutex */ - /** Remove a record lock request, waiting or granted, on a discarded page @param in_lock lock object @param cell hash table cell containing in_lock */ @@ -1177,7 +1170,7 @@ void lock_rec_discard(lock_t *in_lock, hash_cell_t &cell) noexcept; /** Create a new record lock and inserts it to the lock queue, without checking for deadlocks or conflicts. -@param[in] c_lock conflicting lock, or NULL +@param c_lock_info conflicting lock info @param[in] type_mode lock mode and wait flag @param[in] page_id index page number @param[in] page R-tree index page, or NULL @@ -1188,7 +1181,7 @@ without checking for deadlocks or conflicts. @return created lock */ lock_t* lock_rec_create_low( - lock_t* c_lock, + const conflicting_lock_info &c_lock_info, unsigned type_mode, const page_id_t page_id, const page_t* page, @@ -1199,7 +1192,7 @@ lock_rec_create_low( /** Enqueue a waiting request for a lock which cannot be granted immediately. Check for deadlocks. -@param[in] c_lock conflicting lock +@param c_lock_info conflicting lock info @param[in] type_mode the requested lock mode (LOCK_S or LOCK_X) possibly ORed with LOCK_GAP or LOCK_REC_NOT_GAP, ORed with @@ -1217,7 +1210,7 @@ Check for deadlocks. 
@retval DB_DEADLOCK if this transaction was chosen as the victim */ dberr_t lock_rec_enqueue_waiting( - lock_t* c_lock, + const conflicting_lock_info &c_lock_info, unsigned type_mode, const page_id_t id, const page_t* page, diff --git a/storage/innobase/include/lock0lock.inl b/storage/innobase/include/lock0lock.inl index 1b9255ffb3e0a..37db4062e8c21 100644 --- a/storage/innobase/include/lock0lock.inl +++ b/storage/innobase/include/lock0lock.inl @@ -51,28 +51,3 @@ lock_get_min_heap_no( FALSE))); } } - -/*********************************************************************//** -Creates a new record lock and inserts it to the lock queue. Does NOT check -for deadlocks or lock compatibility! -@return created lock */ -UNIV_INLINE -lock_t* -lock_rec_create( -/*============*/ - lock_t* c_lock, /*!< conflicting lock */ - unsigned type_mode,/*!< in: lock mode and wait flag */ - const buf_block_t* block, /*!< in: buffer block containing - the record */ - ulint heap_no,/*!< in: heap number of the record */ - dict_index_t* index, /*!< in: index of record */ - trx_t* trx, /*!< in,out: transaction */ - bool caller_owns_trx_mutex) - /*!< in: TRUE if caller owns - trx mutex */ -{ - return lock_rec_create_low( - c_lock, - type_mode, block->page.id(), block->page.frame, heap_no, - index, trx, caller_owns_trx_mutex); -} diff --git a/storage/innobase/include/lock0priv.h b/storage/innobase/include/lock0priv.h index e8a4cdd52407c..14f0a6e0903b7 100644 --- a/storage/innobase/include/lock0priv.h +++ b/storage/innobase/include/lock0priv.h @@ -497,14 +497,11 @@ inline byte lock_rec_reset_nth_bit(lock_t* lock, ulint i) return(bit); } -/*********************************************************************//** -Gets the first or next record lock on a page. +/** Gets the first or next record lock on a page. 
+@param lock a record lock @return next lock, NULL if none exists */ UNIV_INLINE -lock_t* -lock_rec_get_next_on_page( -/*======================*/ - lock_t* lock); /*!< in: a record lock */ +lock_t *lock_rec_get_next_on_page(const lock_t *lock); /*********************************************************************//** Gets the next explicit lock request on a record. diff --git a/storage/innobase/include/lock0priv.inl b/storage/innobase/include/lock0priv.inl index 3c8ec01367b8d..27f12bc552d71 100644 --- a/storage/innobase/include/lock0priv.inl +++ b/storage/innobase/include/lock0priv.inl @@ -101,14 +101,11 @@ lock_rec_set_nth_bit( lock->trx->lock.set_nth_bit_calls++; } -/*********************************************************************//** -Gets the first or next record lock on a page. +/** Gets the first or next record lock on a page. +@param lock a record lock @return next lock, NULL if none exists */ UNIV_INLINE -lock_t* -lock_rec_get_next_on_page( -/*======================*/ - lock_t* lock) /*!< in: a record lock */ +lock_t *lock_rec_get_next_on_page(const lock_t *lock) { return const_cast(lock_rec_get_next_on_page_const(lock)); } @@ -167,14 +164,11 @@ lock_rec_get_nth_bit( return(1 & *b >> (i % 8)); } -/*********************************************************************//** -Gets the first or next record lock on a page. +/** Gets the first or next record lock on a page. 
+@param lock a record lock @return next lock, NULL if none exists */ UNIV_INLINE -const lock_t* -lock_rec_get_next_on_page_const( -/*============================*/ - const lock_t* lock) /*!< in: a record lock */ +const lock_t *lock_rec_get_next_on_page_const(const lock_t *lock) { ut_ad(!lock->is_table()); diff --git a/storage/innobase/include/lock0types.h b/storage/innobase/include/lock0types.h index 0d00b4b360d2d..8e4ceeee3a60c 100644 --- a/storage/innobase/include/lock0types.h +++ b/storage/innobase/include/lock0types.h @@ -237,6 +237,26 @@ struct ib_lock_t return (type_mode & (LOCK_MODE_MASK | LOCK_GAP)) == LOCK_X; } + static inline bool is_rec_granted_X_not_ii_gap(unsigned type_mode) + { + return (type_mode & (LOCK_INSERT_INTENTION | LOCK_GAP | + LOCK_MODE_MASK)) == LOCK_X; + } + + bool is_rec_granted_X_not_ii_gap() const { + return is_rec_granted_X_not_ii_gap(type_mode); + } + + /** Checks if a lock suits for bypassing. + @param blocking_trx transaction for which the lock is checked + @param has_s_lock_or_stronger if the transaction already holds not gap + and not insert intention S-lock or + stronger for the same heap_no as the + current lock + @return true if lock suits, false otherwise */ + inline bool can_be_bypassed(const trx_t *blocking_trx, + bool has_s_lock_or_stronger) const; + /** Print the lock object into the given output stream. @param[in,out] out the output stream @return the given output stream. 
*/ diff --git a/storage/innobase/lock/lock0lock.cc b/storage/innobase/lock/lock0lock.cc index 1795852ec94d8..1006ea85d1077 100644 --- a/storage/innobase/lock/lock0lock.cc +++ b/storage/innobase/lock/lock0lock.cc @@ -56,6 +56,9 @@ Created 5/7/1996 Heikki Tuuri #include #endif /* WITH_WSREP */ +extern const conflicting_lock_info null_c_lock_info{nullptr, nullptr, + ut_d(nullptr)}; + /** The value of innodb_deadlock_detect */ my_bool innodb_deadlock_detect; /** The value of innodb_deadlock_report */ @@ -984,13 +987,14 @@ lock_rec_get_prev( ut_ad(!in_lock->is_table()); const page_id_t id{in_lock->un_member.rec_lock.page_id}; hash_cell_t *cell= lock_sys.hash_get(in_lock->type_mode).cell_get(id.fold()); + lock_t *prev_lock= nullptr; for (lock_t *lock= lock_sys_t::get_first(*cell, id); lock != in_lock; lock= lock_rec_get_next_on_page(lock)) if (lock_rec_get_nth_bit(lock, heap_no)) - return lock; + prev_lock= lock; - return nullptr; + return prev_lock; } /*============= FUNCTIONS FOR ANALYZING RECORD LOCK QUEUE ================*/ @@ -1159,12 +1163,23 @@ static void wsrep_handle_lock_conflict(trx_t *trx) lock= lock_rec_get_next(heap_no, lock); do { + /* TODO: Conflicting locks can be only before the waiting lock, + consider the following optimization: + if (lock == wait_lock) + break; */ /* This is similar case as above except here we have record-locks instead of table locks. See details from comment above. */ if (lock->trx->mysql_thd && wsrep_will_BF_abort(lock, trx)) { + /* There can't be bypassed locks because: + 1. The transaction can't be blocked by lock to bypass because + lock_rec_other_has_conflicting() does not treat such lock as + conflicting. + 2. The lock is placed before bypassed lock in + lock_rec_create_low(). 
+ TODO: add debug check here */ victims.emplace(lock->trx); } } while ((lock= lock_rec_get_next(heap_no, lock))); @@ -1200,8 +1215,21 @@ static void wsrep_handle_lock_conflict(trx_t *trx) } #endif /* WITH_WSREP */ -/*********************************************************************//** -Checks if some other transaction has a conflicting explicit lock request +inline bool lock_t::can_be_bypassed(const trx_t *blocking_trx, + bool has_s_lock_or_stronger) const +{ + /* There is no need to lock lock_sys.wait_mutex to check + trx->lock.wait_trx here because the current function is executed under + the cell latch, and trx->lock.wait_trx transaction can change wait_trx + field only under the cell latch, wait_trx trx_t object can not be + deinitialized before releasing all its locks, and during releasing the + locks the cell latch will also be requested. So while the cell latch + is held, lock->trx->lock.wait_trx can't be changed. */ + return has_s_lock_or_stronger && is_waiting() && + trx->lock.wait_trx == blocking_trx && is_rec_granted_X_not_ii_gap(); +} + +/** Checks if some other transaction has a conflicting explicit lock request in the queue, so that we have to wait. 
@param[in] mode LOCK_S or LOCK_X, possibly ORed to LOCK_GAP or LOC_REC_NOT_GAP, LOCK_INSERT_INTENTION @@ -1209,23 +1237,49 @@ LOCK_INSERT_INTENTION @param[in] id page identifier @param[in] heap_no heap number of the record @param[in] trx our transaction -@return conflicting lock and the flag which indicated if conflicting locks -which wait for the current transaction were ignored */ -static lock_t *lock_rec_other_has_conflicting(unsigned mode, - const hash_cell_t &cell, - const page_id_t id, - ulint heap_no, const trx_t *trx) -{ - bool is_supremum = (heap_no == PAGE_HEAP_NO_SUPREMUM); +@return conflicting lock, lock after which new lock should be inserted +in lock queue in the case when the conflicting lock must be bypassed and +bypassed lock */ +static conflicting_lock_info +lock_rec_other_has_conflicting(unsigned mode, const hash_cell_t &cell, + const page_id_t id, ulint heap_no, + const trx_t *trx) +{ + bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM); + bool bypass_mode= !is_supremum && lock_t::is_rec_granted_X_not_ii_gap(mode); + bool has_s_lock_or_stronger= false; + const lock_t *insert_after= nullptr; + ut_d(const lock_t *bypassed= nullptr;) + const lock_t *prev_lock= nullptr; - for (lock_t* lock = lock_sys_t::get_first(cell, id, heap_no); - lock; lock = lock_rec_get_next(heap_no, lock)) { - if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) { - return(lock); - } - } + for (lock_t *lock= lock_sys_t::get_first(cell, id, heap_no); lock; + lock= lock_rec_get_next(heap_no, lock)) + { + if (bypass_mode && lock->trx == trx && !lock->is_gap() && + !lock->is_waiting() && !lock->is_insert_intention() && + lock_mode_stronger_or_eq(lock->mode(), LOCK_S)) + { + has_s_lock_or_stronger= true; + } + else if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) + { + if (!bypass_mode || !lock->can_be_bypassed(trx, has_s_lock_or_stronger)) + return {lock, nullptr, ut_d(nullptr)}; + /* Store the first lock to bypass to invoke + lock_rec_find_similar_on_page() only for 
the locks which precede all + bypassed locks. */ + ut_d(if (!bypassed) + bypassed= lock;) + /* There can be several locks to bypass, insert bypassing lock just + before the first bypassed lock. */ + if (!insert_after) + insert_after= prev_lock; + continue; + } + prev_lock= lock; + } - return(NULL); + return {nullptr, const_cast(insert_after), ut_d(bypassed)}; } /*********************************************************************//** @@ -1294,6 +1348,70 @@ lock_number_of_tables_locked( /*============== RECORD LOCK CREATION AND QUEUE MANAGEMENT =============*/ +#ifdef UNIV_DEBUG +/** Validates the correctness of locks bypassing in lock queue on a single +record, i.e. there must not be the following sequence: + (trx1 S) (trx2 X) (trx3 X) (trx1 X) +If bypassing works correctly, where must be the following sequence instead of +the above: + (trx1 S) (trx1 X) (trx2 X) (trx3 X) +Note the above locks are record or next-key locks. +If wrong sequence is found, the function will crash with failed assertion. +@param checked_lock the lock up to which the queue to check +@param heap_no heap_no of the queue to check */ +static void lock_rec_queue_validate_bypass(const lock_t *checked_lock, + ulint heap_no) +{ + /* "do_lock_reverse_page_reorganize" causes lock queue reversing during page + reorganizing, what causes validation failure. Skip the validation for such + case. 
*/ + DBUG_EXECUTE_IF("do_lock_reverse_page_reorganize", return;); + if (!checked_lock || checked_lock->is_waiting()) + return; + page_id_t page_id= checked_lock->un_member.rec_lock.page_id; + hash_cell_t *cell= lock_sys.rec_hash.cell_get(page_id.fold()); + auto mode = checked_lock->type_mode; + const trx_t *trx = checked_lock->trx; + bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM); + if (is_supremum || !lock_t::is_rec_granted_X_not_ii_gap(mode)) + return; + const lock_t *has_s_lock_or_stronger= nullptr; + const lock_t *bypassed= nullptr; + + for (lock_t *lock= lock_sys_t::get_first(*cell, page_id, heap_no); lock; + lock= lock_rec_get_next(heap_no, lock)) + { + if (lock->trx == trx && !lock->is_gap() && + !lock->is_waiting() && !lock->is_insert_intention() && + lock_mode_stronger_or_eq(lock->mode(), LOCK_S)) + { + ut_ad(!bypassed || lock!=checked_lock); + has_s_lock_or_stronger= lock; + continue; + } + if (lock_rec_has_to_wait(trx, mode, lock, is_supremum)) + { + if (!lock->can_be_bypassed(trx, has_s_lock_or_stronger)) + return; + bypassed = lock; + } + ut_ad(lock != checked_lock || !bypassed); + if (lock == checked_lock) + return; + } +} + +/** Validates the correctness of locks bypassing in lock queue for each set bit +in the lock bitmap. If wrong sequence is found, the function will crash with +failed assertion. +@param lock the lock which bitmap to be checked */ +static void lock_rec_queue_validate_bypass(const lock_t *lock) { + for (ulint i= 0; i < lock_rec_get_n_bits(lock); ++i) + if (lock_rec_get_nth_bit(lock, i)) + lock_rec_queue_validate_bypass(lock, i); +} +#endif + /** Reset the wait status of a lock. 
@param[in,out] lock lock that was possibly being waited for */ static void lock_reset_lock_and_trx_wait(lock_t *lock) @@ -1308,6 +1426,10 @@ static void lock_reset_lock_and_trx_wait(lock_t *lock) trx->lock.wait_lock= nullptr; trx->lock.wait_trx= nullptr; lock->type_mode&= ~LOCK_WAIT; +#ifdef UNIV_DEBUG + if (!lock->is_table()) + lock_rec_queue_validate_bypass(lock); +#endif } #ifdef UNIV_DEBUG @@ -1325,7 +1447,7 @@ static void check_trx_state(const trx_t *trx) /** Create a new record lock and inserts it to the lock queue, without checking for deadlocks or conflicts. -@param[in] c_lock conflicting lock +@param[in] c_lock_info conflicting lock info @param[in] type_mode lock mode and wait flag @param[in] page_id index page number @param[in] page R-tree index page, or NULL @@ -1336,7 +1458,7 @@ without checking for deadlocks or conflicts. @return created lock */ lock_t* lock_rec_create_low( - lock_t* c_lock, + const conflicting_lock_info &c_lock_info, unsigned type_mode, const page_id_t page_id, const page_t* page, @@ -1354,6 +1476,8 @@ lock_rec_create_low( ut_ad(!(type_mode & LOCK_TABLE)); ut_ad(trx->state != TRX_STATE_NOT_STARTED); ut_ad(!trx->is_autocommit_non_locking()); + ut_ad(!c_lock_info.insert_after || !(type_mode & LOCK_WAIT)); + ut_ad(!c_lock_info.bypassed || c_lock_info.insert_after); /* If rec is the supremum record, then we reset the gap and LOCK_REC_NOT_GAP bits, as all locks on the supremum are @@ -1424,23 +1548,30 @@ lock_rec_create_low( } else { /* Predicate lock always on INFIMUM (0) */ lock->un_member.rec_lock.n_bits = 8; - } + } lock_rec_bitmap_reset(lock); lock_rec_set_nth_bit(lock, heap_no); index->table->n_rec_locks++; ut_ad(index->table->get_ref_count() || !index->table->can_be_evicted); const auto lock_hash = &lock_sys.hash_get(type_mode); - lock_hash->cell_get(page_id.fold())->append(*lock, &lock_t::hash); + hash_cell_t& cell = *lock_hash->cell_get(page_id.fold()); + if (UNIV_LIKELY(!c_lock_info.insert_after)) + cell.append(*lock, 
&lock_t::hash); + else + cell.insert_after(*c_lock_info.insert_after, *lock, + &lock_t::hash); if (type_mode & LOCK_WAIT) { if (trx->lock.wait_trx) { - ut_ad(!c_lock || trx->lock.wait_trx == c_lock->trx); + ut_ad(!c_lock_info.conflicting + || trx->lock.wait_trx + == c_lock_info.conflicting->trx); ut_ad(trx->lock.wait_lock); ut_ad((*trx->lock.wait_lock).trx == trx); } else { - ut_ad(c_lock); - trx->lock.wait_trx = c_lock->trx; + ut_ad(c_lock_info.conflicting); + trx->lock.wait_trx = c_lock_info.conflicting->trx; ut_ad(!trx->lock.wait_lock); } trx->lock.wait_lock = lock; @@ -1451,12 +1582,13 @@ lock_rec_create_low( } MONITOR_INC(MONITOR_RECLOCK_CREATED); MONITOR_INC(MONITOR_NUM_RECLOCK); - + ut_d(lock_rec_queue_validate_bypass(lock, heap_no)); return lock; } /** Enqueue a waiting request for a lock which cannot be granted immediately. Check for deadlocks. +@param c_lock_info conflicting lock info @param[in] type_mode the requested lock mode (LOCK_S or LOCK_X) possibly ORed with LOCK_GAP or LOCK_REC_NOT_GAP, ORed with @@ -1474,7 +1606,7 @@ Check for deadlocks. @retval DB_DEADLOCK if this transaction was chosen as the victim */ dberr_t lock_rec_enqueue_waiting( - lock_t* c_lock, + const conflicting_lock_info &c_lock_info, unsigned type_mode, const page_id_t id, const page_t* page, @@ -1507,7 +1639,7 @@ lock_rec_enqueue_waiting( /* Enqueue the lock request that will wait to be granted, note that we already own the trx mutex. */ lock_t* lock = lock_rec_create_low( - c_lock, + c_lock_info, type_mode | LOCK_WAIT, id, page, heap_no, index, trx, true); if (prdt && type_mode & LOCK_PREDICATE) { @@ -1525,18 +1657,20 @@ lock_rec_enqueue_waiting( return DB_LOCK_WAIT; } -/*********************************************************************//** -Looks for a suitable type record lock struct by the same trx on the same page. -This can be used to save space when a new record lock should be set on a page: -no new struct is needed, if a suitable old is found. 
+/** Looks for a suitable type record lock struct by the same trx on the same +page. This can be used to save space when a new record lock should be set on a +page: no new struct is needed, if a suitable old is found. +@param type_mode lock type_mode field +@param heap_no heap number of the record +@param lock lock_sys.get_first() +@param last_lock the lock up to which to find +@param trx the transaction which lock we are looking for @return lock or NULL */ -static inline -lock_t* -lock_rec_find_similar_on_page( - ulint type_mode, /*!< in: lock type_mode field */ - ulint heap_no, /*!< in: heap number of the record */ - lock_t* lock, /*!< in: lock_sys.get_first() */ - const trx_t* trx) /*!< in: transaction */ +static inline lock_t *lock_rec_find_similar_on_page(ulint type_mode, + ulint heap_no, + const lock_t *lock, + const lock_t *last_lock, + const trx_t *trx) { lock_sys.rec_hash.assert_locked(lock->un_member.rec_lock.page_id); DBUG_EXECUTE_IF("innodb_skip_lock_bitmap", { @@ -1546,14 +1680,14 @@ lock_rec_find_similar_on_page( }); for (/* No op */; - lock != NULL; + lock != last_lock; lock = lock_rec_get_next_on_page(lock)) { if (lock->trx == trx && lock->type_mode == type_mode && lock_rec_get_n_bits(lock) > heap_no) { - return(lock); + return const_cast(lock); } } @@ -1576,7 +1710,8 @@ which does NOT check for deadlocks or lock compatibility! 
@param[in,out] trx transaction @param[in] caller_owns_trx_mutex TRUE if caller owns the transaction mutex */ TRANSACTIONAL_TARGET -static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell, +static void lock_rec_add_to_queue(const conflicting_lock_info &c_lock_info, + unsigned type_mode, const hash_cell_t &cell, const page_id_t id, const page_t *page, ulint heap_no, dict_index_t *index, trx_t *trx, bool caller_owns_trx_mutex) @@ -1623,8 +1758,8 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell, all locks on the supremum are automatically of the gap type, and we try to avoid unnecessary memory consumption of a new record lock struct for a gap type lock */ - - if (heap_no == PAGE_HEAP_NO_SUPREMUM) { + bool is_supremum= heap_no == PAGE_HEAP_NO_SUPREMUM; + if (is_supremum) { ut_ad(!(type_mode & LOCK_REC_NOT_GAP)); /* There should never be LOCK_REC_NOT_GAP on a supremum @@ -1636,21 +1771,41 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell, if (type_mode & LOCK_WAIT) { goto create; } else if (lock_t *first_lock = lock_sys_t::get_first(cell, id)) { + bool bypass_mode= !is_supremum + && lock_t::is_rec_granted_X_not_ii_gap(type_mode); + bool has_s_lock_or_stronger= false; for (lock_t* lock = first_lock;;) { - if (lock->is_waiting() - && lock_rec_get_nth_bit(lock, heap_no)) { - goto create; + if (!lock_rec_get_nth_bit(lock, heap_no)) + goto cont; + if (bypass_mode && lock->trx == trx && !lock->is_gap() + && !lock->is_waiting() + && !lock->is_insert_intention() + && lock_mode_stronger_or_eq(lock->mode(), LOCK_S)) + { + has_s_lock_or_stronger= true; } + /* There can be several locks suited for bypassing, + skip them all */ + else if (lock->is_waiting() && + (!bypass_mode || !lock->can_be_bypassed(trx, + has_s_lock_or_stronger))) + goto create; +cont: if (!(lock = lock_rec_get_next_on_page(lock))) { break; } } + const lock_t *bypassed = c_lock_info.insert_after ? 
+ lock_rec_get_next(heap_no, c_lock_info.insert_after) : + nullptr; + ut_ad(bypassed == c_lock_info.bypassed); /* Look for a similar record lock on the same page: if one is found and there are no waiting lock requests, we can just set the bit */ if (lock_t* lock = lock_rec_find_similar_on_page( - type_mode, heap_no, first_lock, trx)) { + type_mode, heap_no, first_lock, + bypassed, trx)) { trx_t* lock_trx = lock->trx; if (caller_owns_trx_mutex) { trx->mutex_unlock(); @@ -1663,6 +1818,7 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell, if (caller_owns_trx_mutex) { trx->mutex_lock(); } + ut_d(lock_rec_queue_validate_bypass(lock)); return; } } @@ -1672,7 +1828,7 @@ static void lock_rec_add_to_queue(unsigned type_mode, const hash_cell_t &cell, because we should be moving an existing waiting lock request. */ ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx); - lock_rec_create_low(nullptr, + lock_rec_create_low(c_lock_info, type_mode, id, page, heap_no, index, trx, caller_owns_trx_mutex); } @@ -1713,12 +1869,13 @@ static void lock_reuse_for_next_key_lock(const lock_t *held_lock, that GAP Locks do not conflict with anything. Therefore a GAP Lock could be granted to us right now if we've requested: */ mode|= LOCK_GAP; - ut_ad(nullptr == - lock_rec_other_has_conflicting(mode, cell, id, heap_no, trx)); + ut_ad(nullptr == lock_rec_other_has_conflicting(mode, cell, id, heap_no, trx) + .conflicting); /* It might be the case we already have one, so we first check that. 
*/ if (lock_rec_has_expl(mode, cell, id, heap_no, trx) == nullptr) - lock_rec_add_to_queue(mode, cell, id, page, heap_no, index, trx, true); + lock_rec_add_to_queue(null_c_lock_info, mode, cell, id, page, heap_no, + index, trx, true); } @@ -1806,21 +1963,26 @@ lock_rec_lock( /* Do nothing if the trx already has a strong enough lock on rec */ if (!held_lock) { - if (lock_t *c_lock= lock_rec_other_has_conflicting(mode, g.cell(), id, - heap_no, trx)) + conflicting_lock_info c_lock_info= + lock_rec_other_has_conflicting(mode, g.cell(), id, heap_no, trx); + if (c_lock_info.conflicting) /* If another transaction has a non-gap conflicting request in the queue, as this transaction does not have a lock strong enough already granted on the record, we have to wait. */ - err= lock_rec_enqueue_waiting(c_lock, mode, id, block->page.frame, - heap_no, index, thr, nullptr); - else if (!impl) + err= lock_rec_enqueue_waiting(c_lock_info, mode, id, + block->page.frame, heap_no, index, thr, + nullptr); + /* If some lock was bypassed, we need to create explicit lock to avoid + conflicting lock search on every try to convert implicit to explicit + lock. */ + else if (!impl || c_lock_info.insert_after) { /* Set the requested lock on the record. */ - lock_rec_add_to_queue(mode, g.cell(), id, block->page.frame, heap_no, - index, trx, true); + lock_rec_add_to_queue(c_lock_info, mode, g.cell(), id, + block->page.frame, heap_no, index, trx, true); err= DB_SUCCESS_LOCKED_REC; } } @@ -1853,46 +2015,74 @@ lock_rec_lock( /* Simplified and faster path for the most common cases */ if (!impl) - lock_rec_create_low(nullptr, mode, id, block->page.frame, heap_no, index, - trx, false); + lock_rec_create_low(null_c_lock_info, mode, id, block->page.frame, heap_no, + index, trx, false); return DB_SUCCESS_LOCKED_REC; } -/*********************************************************************//** -Checks if a waiting record lock request still has to wait in a queue. 
+/** Checks if a waiting record lock request still has to wait in a queue. +@param cell record locks hash table cell for waiting lock +@param wait_lock waiting lock @return lock that is causing the wait */ -static -const lock_t* +static conflicting_lock_info lock_rec_has_to_wait_in_queue(const hash_cell_t &cell, const lock_t *wait_lock) { - const lock_t* lock; - ulint heap_no; - ulint bit_mask; - ulint bit_offset; - - ut_ad(wait_lock->is_waiting()); - ut_ad(!wait_lock->is_table()); - - heap_no = lock_rec_find_set_bit(wait_lock); - - bit_offset = heap_no / 8; - bit_mask = static_cast<ulint>(1) << (heap_no % 8); - - for (lock = lock_sys_t::get_first( - cell, wait_lock->un_member.rec_lock.page_id); - lock != wait_lock; - lock = lock_rec_get_next_on_page_const(lock)) { - const byte* p = (const byte*) &lock[1]; - - if (heap_no < lock_rec_get_n_bits(lock) - && (p[bit_offset] & bit_mask) - && lock_has_to_wait(wait_lock, lock)) { - return(lock); - } - } + const lock_t *lock; + ulint heap_no; + ulint bit_mask; + ulint bit_offset; - return(NULL); + ut_ad(wait_lock->is_waiting()); + ut_ad(!wait_lock->is_table()); + + heap_no= lock_rec_find_set_bit(wait_lock); + bool is_supremum= (heap_no == PAGE_HEAP_NO_SUPREMUM); + bool bypass_mode= + !is_supremum && wait_lock->is_rec_granted_X_not_ii_gap(); + bool has_s_lock_or_stronger= false; + const lock_t *insert_after= nullptr; + ut_d(const lock_t *bypassed= nullptr); + + bit_offset= heap_no / 8; + bit_mask= static_cast<ulint>(1) << (heap_no % 8); + + const trx_t *trx= wait_lock->trx; + const lock_t *prev_lock= nullptr; + /* We can't use lock_sys_t::get_first(cell, id, heap_no) here as in + lock_rec_other_has_conflicting() because we iterate locks only till + wait_lock */ + for (lock= + lock_sys_t::get_first(cell, wait_lock->un_member.rec_lock.page_id); + lock != wait_lock; lock= lock_rec_get_next_on_page_const(lock)) + { + const byte *p= (const byte *) &lock[1]; + if (heap_no >= lock_rec_get_n_bits(lock) || !(p[bit_offset] & bit_mask)) + continue;
+ if (bypass_mode && lock->trx == trx && !lock->is_gap() && + !lock->is_waiting() && !lock->is_insert_intention() && + lock_mode_stronger_or_eq(lock->mode(), LOCK_S)) + { + has_s_lock_or_stronger= true; + } + else if (lock_has_to_wait(wait_lock, lock)) + { + if (!bypass_mode || !lock->can_be_bypassed(trx, has_s_lock_or_stronger)) + return {lock, nullptr, ut_d(nullptr)}; + /* Store the first lock to bypass to invoke + lock_rec_find_similar_on_page() only for the locks which precede all + bypassed locks. */ + ut_d(if (!bypassed) + bypassed= lock;) + /* There can be several locks to bypass, insert bypassing lock just + before the first bypassed lock. */ + if (!insert_after) + insert_after= prev_lock; + continue; + } + prev_lock= lock; + } + return {nullptr, const_cast<lock_t*>(insert_after), ut_d(bypassed)}; } /** Note that a record lock wait started */ @@ -2376,10 +2566,14 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex) the first X lock that is waiting or has been granted. */ for (lock_t* lock = lock_sys_t::get_first(cell, page_id); - lock != NULL; - lock = lock_rec_get_next_on_page(lock)) { - + lock != NULL;) { + /* Store pointer to the next element, because if some lock is + bypassed, the pointer to the next lock in the current lock + object will be changed, as the current lock will change + its position in lock queue.
*/ + lock_t *next= lock_rec_get_next_on_page(lock); if (!lock->is_waiting()) { + lock= next; continue; } @@ -2390,10 +2584,10 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex) ut_ad(lock->trx->lock.wait_trx); ut_ad(lock->trx->lock.wait_lock); - - if (const lock_t* c = lock_rec_has_to_wait_in_queue( - cell, lock)) { - trx_t* c_trx = c->trx; + conflicting_lock_info c_lock_info= + lock_rec_has_to_wait_in_queue(cell, lock); + if (c_lock_info.conflicting) { + trx_t* c_trx = c_lock_info.conflicting->trx; lock->trx->lock.wait_trx = c_trx; if (c_trx->lock.wait_trx && innodb_deadlock_detect @@ -2401,10 +2595,17 @@ static void lock_rec_dequeue_from_page(lock_t *in_lock, bool owns_wait_mutex) Deadlock::to_be_checked = true; } } else { + if (UNIV_UNLIKELY(c_lock_info.insert_after != nullptr)) + { + cell.remove(*lock, &lock_t::hash); + cell.insert_after(*c_lock_info.insert_after, + *lock, &lock_t::hash); + } /* Grant the lock */ ut_ad(lock->trx != in_lock->trx); lock_grant(lock); } + lock= next; } if (acquired) { @@ -2551,9 +2752,9 @@ lock_rec_inherit_to_gap(hash_cell_t &heir_cell, const page_id_t heir, ((!from_split || !lock->is_record_not_gap()) && lock->mode() != (lock_trx->duplicates ? 
LOCK_S : LOCK_X)))) { - lock_rec_add_to_queue(LOCK_GAP | lock->mode(), heir_cell, heir, - heir_page, heir_heap_no, lock->index, lock_trx, - false); + lock_rec_add_to_queue(null_c_lock_info, LOCK_GAP | lock->mode(), + heir_cell, heir, heir_page, heir_heap_no, + lock->index, lock_trx, false); } } } @@ -2583,7 +2784,7 @@ lock_rec_inherit_to_gap_if_gap_lock( !lock->is_insert_intention() && (heap_no == PAGE_HEAP_NO_SUPREMUM || !lock->is_record_not_gap()) && !lock_table_has(lock->trx, lock->index->table, LOCK_X)) - lock_rec_add_to_queue(LOCK_GAP | lock->mode(), + lock_rec_add_to_queue(null_c_lock_info, LOCK_GAP | lock->mode(), g.cell(), id, block->page.frame, heir_heap_no, lock->index, lock->trx, false); } @@ -2629,15 +2830,18 @@ lock_rec_move( /* Note that we FIRST reset the bit, and then set the lock: the function works also if donator_id == receiver_id */ - lock_rec_add_to_queue(type_mode, receiver_cell, - receiver_id, receiver.page.frame, - receiver_heap_no, + lock_rec_add_to_queue(null_c_lock_info, type_mode, + receiver_cell, receiver_id, + receiver.page.frame, receiver_heap_no, lock->index, lock_trx, true); lock_trx->mutex_unlock(); } ut_ad(!lock_sys_t::get_first(donator_cell, donator_id, donator_heap_no)); + ut_d(lock_rec_queue_validate_bypass(lock_sys_t::get_first(receiver_cell, + receiver_id, receiver_heap_no), + receiver_heap_no)); } /** Move all the granted locks to the front of the given lock list. @@ -2796,8 +3000,9 @@ lock_move_reorganize_page( /* NOTE that the old lock bitmap could be too small for the new heap number! 
*/ - lock_rec_add_to_queue(lock->type_mode, cell, id, block->page.frame, - new_heap_no, lock->index, lock_trx, true); + lock_rec_add_to_queue(null_c_lock_info, lock->type_mode, cell, id, + block->page.frame, new_heap_no, lock->index, + lock_trx, true); } lock_trx->mutex_unlock(); @@ -2939,9 +3144,9 @@ lock_move_rec_list_end( lock->type_mode&= ~LOCK_WAIT; } - lock_rec_add_to_queue(type_mode, g.cell2(), new_id, - new_page, - rec2_heap_no, lock->index, lock_trx, true); + lock_rec_add_to_queue(null_c_lock_info, type_mode, g.cell2(), new_id, + new_page, rec2_heap_no, lock->index, lock_trx, + true); } lock_trx->mutex_unlock(); @@ -3062,7 +3267,7 @@ lock_move_rec_list_start( lock->type_mode&= ~LOCK_WAIT; } - lock_rec_add_to_queue(type_mode, g.cell2(), new_id, + lock_rec_add_to_queue(null_c_lock_info, type_mode, g.cell2(), new_id, new_block->page.frame, rec2_heap_no, lock->index, lock_trx, true); } @@ -3156,7 +3361,7 @@ lock_rtr_move_rec_list( lock->type_mode&= ~LOCK_WAIT; } - lock_rec_add_to_queue(type_mode, g.cell2(), new_id, + lock_rec_add_to_queue(null_c_lock_info, type_mode, g.cell2(), new_id, new_block->page.frame, rec2_heap_no, lock->index, lock_trx, true); @@ -4253,24 +4458,38 @@ static void lock_rec_rebuild_waiting_queue( { lock_sys.assert_locked(cell); - for (lock_t *lock= first_lock; lock != NULL; - lock= lock_rec_get_next(heap_no, lock)) + for (lock_t *lock= first_lock; lock != NULL;) { - if (!lock->is_waiting()) + /* Store pointer to the next element, because if some lock is + bypassed, the pointer to the next lock in the current lock + object will be changed, as the current lock will change + its position in lock queue. 
*/ + lock_t *next= lock_rec_get_next(heap_no, lock); + if (!lock->is_waiting()) { + lock= next; continue; + } mysql_mutex_lock(&lock_sys.wait_mutex); ut_ad(lock->trx->lock.wait_trx); ut_ad(lock->trx->lock.wait_lock); - if (const lock_t *c= lock_rec_has_to_wait_in_queue(cell, lock)) - lock->trx->lock.wait_trx= c->trx; + conflicting_lock_info c_lock_info= + lock_rec_has_to_wait_in_queue(cell, lock); + if (c_lock_info.conflicting) + lock->trx->lock.wait_trx= c_lock_info.conflicting->trx; else { + if (c_lock_info.insert_after) + { + cell.remove(*lock, &lock_t::hash); + cell.insert_after(*c_lock_info.insert_after, *lock, &lock_t::hash); + } /* Grant the lock */ ut_ad(trx != lock->trx); lock_grant(lock); } mysql_mutex_unlock(&lock_sys.wait_mutex); + lock= next; } } @@ -5432,7 +5651,8 @@ lock_rec_queue_validate( ut_ad(trx_state_eq(lock->trx, TRX_STATE_COMMITTED_IN_MEMORY) || !lock->is_waiting() - || lock_rec_has_to_wait_in_queue(cell, lock)); + || lock_rec_has_to_wait_in_queue(cell, lock). + conflicting); lock->trx->mutex_unlock(); } @@ -5524,7 +5744,8 @@ lock_rec_queue_validate( if (lock->is_waiting()) { ut_a(lock->is_gap() - || lock_rec_has_to_wait_in_queue(cell, lock)); + || lock_rec_has_to_wait_in_queue(cell, lock). + conflicting); } else if (!lock->is_gap()) { const lock_mode mode = lock->mode() == LOCK_S ? LOCK_X : LOCK_S; @@ -5830,13 +6051,16 @@ lock_rec_insert_check_and_lock( on the successor, which produced an unnecessary deadlock. */ const unsigned type_mode= LOCK_X | LOCK_GAP | LOCK_INSERT_INTENTION; - if (lock_t *c_lock= lock_rec_other_has_conflicting(type_mode, - g.cell(), id, - heap_no, trx)) + conflicting_lock_info c_lock_info= lock_rec_other_has_conflicting( + type_mode, g.cell(), id, heap_no, trx); + /* Insert intention locks must not bypass any other lock. 
*/ + ut_ad(!c_lock_info.insert_after && !c_lock_info.bypassed); + if (c_lock_info.conflicting) { trx->mutex_lock(); - err= lock_rec_enqueue_waiting(c_lock, type_mode, id, block->page.frame, - heap_no, index, thr, nullptr); + err= lock_rec_enqueue_waiting(c_lock_info, type_mode, id, + block->page.frame, heap_no, index, thr, + nullptr); trx->mutex_unlock(); } } @@ -5905,8 +6129,9 @@ static trx_t *lock_rec_convert_impl_to_expl_for_trx(trx_t *trx, if (!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY) && !lock_rec_has_expl(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, heap_no, trx)) - lock_rec_add_to_queue(LOCK_X | LOCK_REC_NOT_GAP, g.cell(), id, - page_align(rec), heap_no, index, trx, true); + lock_rec_add_to_queue(null_c_lock_info, LOCK_X | LOCK_REC_NOT_GAP, + g.cell(), id, page_align(rec), heap_no, index, + trx, true); } trx->release_reference(); diff --git a/storage/innobase/lock/lock0prdt.cc b/storage/innobase/lock/lock0prdt.cc index 3ea05ddb74198..4b99d43e48e5b 100644 --- a/storage/innobase/lock/lock0prdt.cc +++ b/storage/innobase/lock/lock0prdt.cc @@ -32,6 +32,8 @@ Created 9/7/2013 Jimmy Yang #include "dict0mem.h" #include "que0que.h" +extern const conflicting_lock_info null_c_lock_info; + /*********************************************************************//** Get a minimum bounding box from a Predicate @return the minimum bounding box */ @@ -470,8 +472,9 @@ lock_prdt_add_to_queue( because we should be moving an existing waiting lock request. 
*/ ut_ad(!(type_mode & LOCK_WAIT) || trx->lock.wait_trx); - lock_t* lock = lock_rec_create(nullptr, - type_mode, block, PRDT_HEAPNO, index, + lock_t* lock = lock_rec_create_low(null_c_lock_info, + type_mode, block->page.id(), + block->page.frame, PRDT_HEAPNO, index, trx, caller_owns_trx_mutex); if (lock->type_mode & LOCK_PREDICATE) { @@ -533,8 +536,9 @@ lock_prdt_insert_check_and_lock( trx->mutex_lock(); /* Allocate MBR on the lock heap */ lock_init_prdt_from_mbr(prdt, mbr, 0, trx->lock.lock_heap); - err= lock_rec_enqueue_waiting(c_lock, mode, id, block->page.frame, - PRDT_HEAPNO, index, thr, prdt); + err= lock_rec_enqueue_waiting({c_lock, nullptr, ut_d(nullptr)}, mode, id, + block->page.frame, PRDT_HEAPNO, index, + thr, prdt); trx->mutex_unlock(); } } @@ -734,10 +738,10 @@ lock_prdt_lock( lock_t* lock = lock_sys_t::get_first(g.cell(), id); if (lock == NULL) { - lock = lock_rec_create( - NULL, - prdt_mode, block, PRDT_HEAPNO, - index, trx, FALSE); + lock = lock_rec_create_low( + null_c_lock_info, + prdt_mode, block->page.id(), block->page.frame, + PRDT_HEAPNO, index, trx, FALSE); status = LOCK_REC_SUCCESS_CREATED; } else { @@ -759,7 +763,8 @@ lock_prdt_lock( prdt_mode, g.cell(), id, prdt, trx)) { err = lock_rec_enqueue_waiting( - wait_for, prdt_mode, id, + {wait_for, nullptr, ut_d(nullptr)}, + prdt_mode, id, block->page.frame, PRDT_HEAPNO, index, thr, prdt); } else { @@ -826,8 +831,7 @@ lock_place_prdt_page_lock( } if (lock == NULL) { - lock = lock_rec_create_low( - NULL, + lock = lock_rec_create_low(null_c_lock_info, mode, page_id, NULL, PRDT_HEAPNO, index, trx, FALSE);