1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
7 This file is part of PerconaFT.
10 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
12 PerconaFT is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License, version 2,
14 as published by the Free Software Foundation.
16 PerconaFT is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
24 ----------------------------------------
26 PerconaFT is free software: you can redistribute it and/or modify
27 it under the terms of the GNU Affero General Public License, version 3,
28 as published by the Free Software Foundation.
30 PerconaFT is distributed in the hope that it will be useful,
31 but WITHOUT ANY WARRANTY; without even the implied warranty of
32 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
33 GNU Affero General Public License for more details.
35 You should have received a copy of the GNU Affero General Public License
36 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
38 ----------------------------------------
40 Licensed under the Apache License, Version 2.0 (the "License");
41 you may not use this file except in compliance with the License.
42 You may obtain a copy of the License at
44 http://www.apache.org/licenses/LICENSE-2.0
46 Unless required by applicable law or agreed to in writing, software
47 distributed under the License is distributed on an "AS IS" BASIS,
48 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49 See the License for the specific language governing permissions and
50 limitations under the License.
54 "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
56 #include "lock_request.h"
58 #include "../portability/toku_race_tools.h"
59 #include "../portability/txn_subst.h"
60 #include "../util/dbt.h"
65 // initialize a lock request's internals
// Puts a freshly allocated lock_request into the UNINITIALIZED state: sets
// m_conflicting_txnid to TXNID_NONE and m_right_key to nullptr, initializes
// both key-copy DBTs, sets m_type to UNKNOWN and m_state to UNINITIALIZED,
// initializes m_wait_cond through mutex_factory, and clears the three
// test-callback pointers.
// NOTE(review): this copy is damaged -- the embedded line numbers skip 67,
// 69-70, 74, 76-78, 80-81, 85 and 89. Some of those may have been blank, but
// the closing brace is certainly missing, and fields used elsewhere (e.g.
// m_txnid, m_lt, m_left_key) are not initialized in what is visible here.
// Restore from upstream before building.
66 void lock_request::create(toku_external_mutex_factory_t mutex_factory
) {
68 m_conflicting_txnid
= TXNID_NONE
;
71 m_right_key
= nullptr;
72 toku_init_dbt(&m_left_key_copy
);
73 toku_init_dbt(&m_right_key_copy
);
75 m_type
= type::UNKNOWN
;
79 m_state
= state::UNINITIALIZED
;
82 // psergey-todo: this condition is for interruptible wait
83 // note: moved to here from lock_request::create:
84 toku_external_cond_init(mutex_factory
, &m_wait_cond
);
86 m_start_test_callback
= nullptr;
87 m_start_before_pending_test_callback
= nullptr;
88 m_retry_test_callback
= nullptr;
91 // destroy a lock request.
92 void lock_request::destroy(void) {
93 invariant(m_state
!= state::PENDING
);
94 invariant(m_state
!= state::DESTROYED
);
95 m_state
= state::DESTROYED
;
96 toku_destroy_dbt(&m_left_key_copy
);
97 toku_destroy_dbt(&m_right_key_copy
);
98 toku_external_cond_destroy(&m_wait_cond
);
101 // set the lock request parameters. this API allows a lock request to be reused.
// (Re)configures this request for a new lock attempt, so one lock_request
// object can be reused. Visible here: asserts the request is not PENDING,
// stores the caller's key pointers as-is (no copies yet), destroys any key
// clones left over from a previous use, moves the state to INITIALIZED, and
// caches lt's lock_request_info (nullptr when lt is null).
// NOTE(review): the embedded numbering skips 106-108, 113 and 116-118; the
// assignments of lt, txnid, lock_type, big_txn and extra (and the closing
// brace) are presumably among the missing lines -- confirm against upstream.
102 void lock_request::set(locktree
*lt
, TXNID txnid
, const DBT
*left_key
,
103 const DBT
*right_key
, lock_request::type lock_type
,
104 bool big_txn
, void *extra
) {
105 invariant(m_state
!= state::PENDING
);
109 m_left_key
= left_key
;
110 m_right_key
= right_key
;
111 toku_destroy_dbt(&m_left_key_copy
);
112 toku_destroy_dbt(&m_right_key_copy
);
114 m_state
= state::INITIALIZED
;
115 m_info
= lt
? lt
->get_lock_request_info() : nullptr;
120 // get rid of any stored left and right key copies and
121 // replace them with copies of the given left and right key
122 void lock_request::copy_keys() {
123 if (!toku_dbt_is_infinite(m_left_key
)) {
124 toku_clone_dbt(&m_left_key_copy
, *m_left_key
);
125 m_left_key
= &m_left_key_copy
;
127 if (!toku_dbt_is_infinite(m_right_key
)) {
128 toku_clone_dbt(&m_right_key_copy
, *m_right_key
);
129 m_right_key
= &m_right_key_copy
;
133 // what are the conflicts for this pending lock request?
// Asks the locktree which transactions currently conflict with this
// request's key range, given whether it is a WRITE request. Only valid while
// the request is PENDING (enforced by the invariant).
// NOTE(review): the locktree call is truncated in this copy (numbering jumps
// from 137 to 141). Its final argument -- presumably `conflicts` -- and the
// closing brace are missing.
134 void lock_request::get_conflicts(txnid_set
*conflicts
) {
135 invariant(m_state
== state::PENDING
);
136 const bool is_write_request
= m_type
== type::WRITE
;
137 m_lt
->get_conflicts(is_write_request
, m_txnid
, m_left_key
, m_right_key
,
141 // build a wait-for-graph for this lock request and the given conflict set
142 // for each transaction B that blocks A's lock request
143 // if B is blocked then
144 // add (A,T) to the WFG and if B is new, fill in the WFG from B
145 void lock_request::build_wait_graph(wfg
*wait_graph
,
146 const txnid_set
&conflicts
) {
147 uint32_t num_conflicts
= conflicts
.size();
148 for (uint32_t i
= 0; i
< num_conflicts
; i
++) {
149 TXNID conflicting_txnid
= conflicts
.get(i
);
150 lock_request
*conflicting_request
= find_lock_request(conflicting_txnid
);
151 invariant(conflicting_txnid
!= m_txnid
);
152 invariant(conflicting_request
!= this);
153 if (conflicting_request
) {
154 bool already_exists
= wait_graph
->node_exists(conflicting_txnid
);
155 wait_graph
->add_edge(m_txnid
, conflicting_txnid
);
156 if (!already_exists
) {
157 // recursively build the wait for graph rooted at the conflicting
158 // request, given its set of lock conflicts.
159 txnid_set other_conflicts
;
160 other_conflicts
.create();
161 conflicting_request
->get_conflicts(&other_conflicts
);
162 conflicting_request
->build_wait_graph(wait_graph
, other_conflicts
);
163 other_conflicts
.destroy();
169 // returns: true if the current set of lock requests contains
170 // a deadlock, false otherwise.
// Builds a wait-for graph from this request's conflicts and asks it whether
// a cycle exists from m_txnid. The reporter lambda handed to
// cycle_exists_from_txnid looks up a txnid's pending request and forwards
// its txnid, whether it is a WRITE request, and its key range to
// m_deadlock_cb.
// NOTE(review): lines 172-174, 176, 178, 181, 184-187 and 190-191 are absent
// from this copy. The declaration/creation of `wait_graph`, any guard around
// the reporter assignment, and the return statement are presumably among
// them. As shown, `req` is dereferenced with no null check -- confirm that
// the missing line 181 supplies one.
171 bool lock_request::deadlock_exists(const txnid_set
&conflicts
) {
175 build_wait_graph(&wait_graph
, conflicts
);
177 std::function
<void(TXNID
)> reporter
;
179 reporter
= [this](TXNID a
) {
180 lock_request
*req
= find_lock_request(a
);
182 m_deadlock_cb(req
->m_txnid
, (req
->m_type
== lock_request::WRITE
),
183 req
->m_left_key
, req
->m_right_key
);
188 bool deadlock
= wait_graph
.cycle_exists_from_txnid(m_txnid
, reporter
);
189 wait_graph
.destroy();
193 // try to acquire a lock described by this lock request.
// Tries to acquire the lock on the locktree: a write lock when m_type is
// WRITE, otherwise a read lock. If the lock is not granted:
//   - the request becomes PENDING;
//   - its start time (toku_current_time_microsec() / 1000) and the first
//     conflicting txnid are recorded;
//   - it is inserted into the pending set under m_info->mutex;
//   - if a deadlock is then detected, it is removed again and r becomes
//     DB_LOCK_DEADLOCK.
// NOTE(review): lines 195-198, 201-202, 205-207, 211, 222, 225-226 and
// 228-233 are absent from this copy. These presumably include the `r` and
// `conflicts` declarations, the remaining lock-call arguments, branch braces
// and the function tail (completion and return). conflicts.get(0) is read
// with no visible size check.
194 int lock_request::start(void) {
199 if (m_type
== type::WRITE
) {
200 r
= m_lt
->acquire_write_lock(m_txnid
, m_left_key
, m_right_key
, &conflicts
,
203 invariant(m_type
== type::READ
);
204 r
= m_lt
->acquire_read_lock(m_txnid
, m_left_key
, m_right_key
, &conflicts
,
208 // if the lock is not granted, save it to the set of lock requests
209 // and check for a deadlock. if there is one, complete it as failed
210 if (r
== DB_LOCK_NOTGRANTED
) {
212 m_state
= state::PENDING
;
213 m_start_time
= toku_current_time_microsec() / 1000;
214 m_conflicting_txnid
= conflicts
.get(0);
215 if (m_start_before_pending_test_callback
)
216 m_start_before_pending_test_callback();
217 toku_external_mutex_lock(&m_info
->mutex
);
218 insert_into_lock_requests();
219 if (deadlock_exists(conflicts
)) {
220 remove_from_lock_requests();
221 r
= DB_LOCK_DEADLOCK
;
223 toku_external_mutex_unlock(&m_info
->mutex
);
224 if (m_start_test_callback
) m_start_test_callback(); // test callback
227 if (r
!= DB_LOCK_NOTGRANTED
) {
235 // sleep on the lock request until it becomes resolved or the wait time has
237 int lock_request::wait(uint64_t wait_time_ms
) {
238 return wait(wait_time_ms
, 0, nullptr);
// Blocks until this request leaves PENDING or wait_time_ms elapses.
// Holding m_info->mutex, it proceeds as follows:
//   - It first retries the lock once. If still pending, it reports the
//     collected waits through lock_wait_callback.
//   - It then sleeps on m_wait_cond. When killed_time_ms is non-zero, each
//     sleep is capped at that interval (and at the deadline), so
//     killed_callback is polled at the top of every loop iteration.
//   - A kill, or reaching the deadline while still PENDING, removes the
//     request from the pending set and completes it with DB_LOCK_NOTGRANTED.
// Wait statistics go into m_info->counters. Waits of >= 1,000,000 us are
// also counted as long waits.
// NOTE(review): lines 248, 250, 257-259, 265-267, 269, 271-272, 275-276,
// 280, 284, 288, 291-293, 301, 303 and 305-306 are absent from this copy.
// These presumably include the `t_wait` declaration, the body of the
// killed_time_ms == 0 branch, several closing braces and the return
// statement.
241 int lock_request::wait(uint64_t wait_time_ms
, uint64_t killed_time_ms
,
242 int (*killed_callback
)(void),
243 void (*lock_wait_callback
)(void *, lock_wait_infos
*),
244 void *callback_arg
) {
245 uint64_t t_now
= toku_current_time_microsec();
246 uint64_t t_start
= t_now
;
247 uint64_t t_end
= t_start
+ wait_time_ms
* 1000;
249 toku_external_mutex_lock(&m_info
->mutex
);
251 // check again, this time locking out other retry calls
252 if (m_state
== state::PENDING
) {
253 lock_wait_infos conflicts_collector
;
254 retry(&conflicts_collector
);
255 if (m_state
== state::PENDING
) {
256 report_waits(&conflicts_collector
, lock_wait_callback
, callback_arg
);
260 while (m_state
== state::PENDING
) {
261 // check if this thread is killed
262 if (killed_callback
&& killed_callback()) {
263 remove_from_lock_requests();
264 complete(DB_LOCK_NOTGRANTED
);
268 // compute the time until we should wait
270 if (killed_time_ms
== 0) {
273 t_wait
= t_now
+ killed_time_ms
* 1000;
274 if (t_wait
> t_end
) t_wait
= t_end
;
277 int r
= toku_external_cond_timedwait(&m_wait_cond
, &m_info
->mutex
,
278 (int64_t)(t_wait
- t_now
));
279 invariant(r
== 0 || r
== ETIMEDOUT
);
281 t_now
= toku_current_time_microsec();
282 if (m_state
== state::PENDING
&& (t_now
>= t_end
)) {
283 m_info
->counters
.timeout_count
+= 1;
285 // if we're still pending and we timed out, then remove our
286 // request from the set of lock requests and fail.
287 remove_from_lock_requests();
289 // complete sets m_state to COMPLETE, breaking us out of the loop
290 complete(DB_LOCK_NOTGRANTED
);
294 uint64_t t_real_end
= toku_current_time_microsec();
295 uint64_t duration
= t_real_end
- t_start
;
296 m_info
->counters
.wait_count
+= 1;
297 m_info
->counters
.wait_time
+= duration
;
298 if (duration
>= 1000000) {
299 m_info
->counters
.long_wait_count
+= 1;
300 m_info
->counters
.long_wait_time
+= duration
;
302 toku_external_mutex_unlock(&m_info
->mutex
);
304 invariant(m_state
== state::COMPLETE
);
308 // complete this lock request with the given return value
309 void lock_request::complete(int complete_r
) {
310 m_complete_r
= complete_r
;
311 m_state
= state::COMPLETE
;
314 const DBT
*lock_request::get_left_key(void) const { return m_left_key
; }
316 const DBT
*lock_request::get_right_key(void) const { return m_right_key
; }
318 TXNID
lock_request::get_txnid(void) const { return m_txnid
; }
320 uint64_t lock_request::get_start_time(void) const { return m_start_time
; }
322 TXNID
lock_request::get_conflicting_txnid(void) const {
323 return m_conflicting_txnid
;
// Re-attempts the lock for a request that is PENDING (enforced by the
// invariant), using a write or read lock per m_type.
//   - On success, it leaves the pending set, runs the retry test callback
//     and broadcasts m_wait_cond to wake the waiter.
//   - On failure, it refreshes m_conflicting_txnid and appends the current
//     conflicts to conflicts_collector.
// NOTE(review): lines 328-331, 334-335, 337-339, 342, 344, 347 and 350-354
// are absent from this copy. These presumably include the local
// declarations, the success test, the complete(r) call mentioned in the
// comment below, the else branch and the return.
326 int lock_request::retry(lock_wait_infos
*conflicts_collector
) {
327 invariant(m_state
== state::PENDING
);
332 if (m_type
== type::WRITE
) {
333 r
= m_lt
->acquire_write_lock(m_txnid
, m_left_key
, m_right_key
, &conflicts
,
336 r
= m_lt
->acquire_read_lock(m_txnid
, m_left_key
, m_right_key
, &conflicts
,
340 // if the acquisition succeeded then remove ourselves from the
341 // set of lock requests, complete, and signal the waiting thread.
343 remove_from_lock_requests();
345 if (m_retry_test_callback
) m_retry_test_callback(); // test callback
346 toku_external_cond_broadcast(&m_wait_cond
);
348 m_conflicting_txnid
= conflicts
.get(0);
349 add_conflicts_to_waits(&conflicts
, conflicts_collector
);
// Retries every pending lock request of lt. Concurrent callers are grouped
// into "retry generations" tracked by info->retry_want / info->retry_done
// under info->retry_mutex.
//   - A caller whose generation immediately follows retry_done, and that
//     finds no retry running, sets running_retry. It then drops retry_mutex
//     while running retry_all_lock_requests_info(), clears running_retry and
//     broadcasts retry_cv.
//   - A caller that finds a retry already running presumably waits on
//     retry_cv.
// NOTE(review): lines 360, 365, 368, 370, 375, 377, 387-388 and 390-394 are
// absent from this copy. The loop/else structure joining the running_retry
// test to the toku_cond_wait call is therefore not visible here.
356 void lock_request::retry_all_lock_requests(
357 locktree
*lt
, void (*lock_wait_callback
)(void *, lock_wait_infos
*),
358 void *callback_arg
, void (*after_retry_all_test_callback
)(void)) {
359 lt_lock_request_info
*info
= lt
->get_lock_request_info();
361 // if there are no pending lock requests then there is nothing to do.
362 // the unlocked data race on pending_is_empty is OK since lock requests
363 // are retried after added to the pending set.
364 if (info
->pending_is_empty
) return;
366 // get my retry generation (the value of retry_want after this increment)
367 unsigned long long my_retry_want
= (info
->retry_want
+= 1);
369 toku_mutex_lock(&info
->retry_mutex
);
371 // here is the group retry algorithm.
372 // get the latest retry_want count and use it as the generation number of
373 // this retry operation. if this retry generation is > the last retry
374 // generation, then do the lock retries. otherwise, no lock retries
// are needed.
376 if ((my_retry_want
- 1) == info
->retry_done
) {
378 if (!info
->running_retry
) {
379 info
->running_retry
= true;
380 info
->retry_done
= info
->retry_want
;
381 toku_mutex_unlock(&info
->retry_mutex
);
382 retry_all_lock_requests_info(info
, lock_wait_callback
, callback_arg
);
383 if (after_retry_all_test_callback
) after_retry_all_test_callback();
384 toku_mutex_lock(&info
->retry_mutex
);
385 info
->running_retry
= false;
386 toku_cond_broadcast(&info
->retry_cv
);
389 toku_cond_wait(&info
->retry_cv
, &info
->retry_mutex
);
393 toku_mutex_unlock(&info
->retry_mutex
);
// Holding info->mutex, this function:
//   - retries each request in the pending set once, collecting the
//     conflicts of those that remain blocked;
//   - reports the collected conflicts through lock_wait_callback;
//   - sets should_retry_lock_requests to whether any requests are still
//     pending.
// NOTE(review): lines 405-406, 412-416, 420 and 424 are absent from this
// copy. The for-loop below has no increment clause, so the code that
// advances `i` when retry() fails (and any check of fetch()'s result) must
// be among the missing lines.
396 void lock_request::retry_all_lock_requests_info(
397 lt_lock_request_info
*info
,
398 void (*lock_wait_callback
)(void *, lock_wait_infos
*), void *callback_arg
) {
399 toku_external_mutex_lock(&info
->mutex
);
400 // retry all of the pending lock requests.
401 lock_wait_infos conflicts_collector
;
402 for (uint32_t i
= 0; i
< info
->pending_lock_requests
.size();) {
403 lock_request
*request
;
404 int r
= info
->pending_lock_requests
.fetch(i
, &request
);
407 // retry the lock request. if it didn't succeed,
408 // move on to the next lock request. otherwise
409 // the request is gone from the list so we may
410 // read the i'th entry for the next one.
411 r
= request
->retry(&conflicts_collector
);
417 // call report_waits while holding the pending queue lock since
418 // the waiter object is still valid while it's in the queue
419 report_waits(&conflicts_collector
, lock_wait_callback
, callback_arg
);
421 // future threads should only retry lock requests if some still exist
422 info
->should_retry_lock_requests
= info
->pending_lock_requests
.size() > 0;
423 toku_external_mutex_unlock(&info
->mutex
);
426 void lock_request::add_conflicts_to_waits(txnid_set
*conflicts
,
427 lock_wait_infos
*wait_conflicts
) {
428 wait_conflicts
->push_back({m_lt
, get_txnid(), m_extra
, {}});
429 uint32_t num_conflicts
= conflicts
->size();
430 for (uint32_t i
= 0; i
< num_conflicts
; i
++) {
431 wait_conflicts
->back().waitees
.push_back(conflicts
->get(i
));
// Hands the collected wait information to lock_wait_callback (with
// callback_arg) when a callback was supplied; otherwise it does nothing.
// NOTE(review): line 437 is missing from this copy. It holds the rest of
// lock_wait_callback's parameter list, presumably `lock_wait_infos *),` as
// in retry_all_lock_requests_info. The closing brace (line 440) is also
// missing.
435 void lock_request::report_waits(lock_wait_infos
*wait_conflicts
,
436 void (*lock_wait_callback
)(void *,
438 void *callback_arg
) {
439 if (lock_wait_callback
) (*lock_wait_callback
)(callback_arg
, wait_conflicts
);
442 void *lock_request::get_extra(void) const { return m_extra
; }
444 void lock_request::kill_waiter(void) {
445 remove_from_lock_requests();
446 complete(DB_LOCK_NOTGRANTED
);
447 toku_external_cond_broadcast(&m_wait_cond
);
// Holding lt's lock_request_info mutex, scans the pending set and kills the
// request whose get_extra() equals `extra` (via the no-argument
// kill_waiter()).
// NOTE(review): lines 458-460 and 462 are absent from this copy. The
// no-argument kill_waiter() removes the request from pending_lock_requests,
// the same set this loop indexes with i++. The missing lines therefore
// presumably contain a `break`; without one, the element after the killed
// request would be skipped. Confirm against upstream.
450 void lock_request::kill_waiter(locktree
*lt
, void *extra
) {
451 lt_lock_request_info
*info
= lt
->get_lock_request_info();
452 toku_external_mutex_lock(&info
->mutex
);
453 for (uint32_t i
= 0; i
< info
->pending_lock_requests
.size(); i
++) {
454 lock_request
*request
;
455 int r
= info
->pending_lock_requests
.fetch(i
, &request
);
456 if (r
== 0 && request
->get_extra() == extra
) {
457 request
->kill_waiter();
461 toku_external_mutex_unlock(&info
->mutex
);
464 // find another lock request by txnid. must hold the mutex.
// Looks up the pending request for `txnid` in m_info->pending_lock_requests,
// using find_by_txnid as the comparator. The caller must hold m_info->mutex.
// NOTE(review): lines 469-473 are absent from this copy. They presumably
// hold the handling of a non-zero `r` and the return statement.
// build_wait_graph treats a null result as "no pending request".
465 lock_request
*lock_request::find_lock_request(const TXNID
&txnid
) {
466 lock_request
*request
;
467 int r
= m_info
->pending_lock_requests
.find_zero
<TXNID
, find_by_txnid
>(
468 txnid
, &request
, nullptr);
475 // insert this lock request into the locktree's set. must hold the mutex.
// Inserts this request into m_info->pending_lock_requests at the index that
// find_zero reports for m_txnid (presumably keeping the set ordered by
// txnid). It asserts no entry for m_txnid already exists, then clears
// pending_is_empty. The caller must hold m_info->mutex.
// NOTE(review): line 477 (presumably the `idx` declaration), line 483 and
// line 485 are absent from this copy. In what is visible, insert_at's result
// is not checked.
476 void lock_request::insert_into_lock_requests(void) {
478 lock_request
*request
;
479 int r
= m_info
->pending_lock_requests
.find_zero
<TXNID
, find_by_txnid
>(
480 m_txnid
, &request
, &idx
);
481 invariant(r
== DB_NOTFOUND
);
482 r
= m_info
->pending_lock_requests
.insert_at(this, idx
);
484 m_info
->pending_is_empty
= false;
487 // remove this lock request from the locktree's set. must hold the mutex.
// Removes this request from m_info->pending_lock_requests, asserting that
// the entry found for m_txnid is this object. It sets pending_is_empty once
// the set becomes empty. The caller must hold m_info->mutex.
// NOTE(review): line 489 (presumably the `idx` declaration) and lines 493,
// 496 and 499 are absent from this copy. The visible code does not check
// `r` after find_zero or delete_at.
488 void lock_request::remove_from_lock_requests(void) {
490 lock_request
*request
;
491 int r
= m_info
->pending_lock_requests
.find_zero
<TXNID
, find_by_txnid
>(
492 m_txnid
, &request
, &idx
);
494 invariant(request
== this);
495 r
= m_info
->pending_lock_requests
.delete_at(idx
);
497 if (m_info
->pending_lock_requests
.size() == 0)
498 m_info
->pending_is_empty
= true;
// Comparator used with pending_lock_requests.find_zero: compares a stored
// request's txnid against `txnid`.
// NOTE(review): lines 505 and 507-511 are absent from this copy. They
// contain every return statement and the final branch. The usual
// negative / zero / positive contract is presumed, not visible here.
501 int lock_request::find_by_txnid(lock_request
*const &request
,
502 const TXNID
&txnid
) {
503 TXNID request_txnid
= request
->m_txnid
;
504 if (request_txnid
< txnid
) {
506 } else if (request_txnid
== txnid
) {
513 void lock_request::set_start_test_callback(void (*f
)(void)) {
514 m_start_test_callback
= f
;
517 void lock_request::set_start_before_pending_test_callback(void (*f
)(void)) {
518 m_start_before_pending_test_callback
= f
;
521 void lock_request::set_retry_test_callback(void (*f
)(void)) {
522 m_retry_test_callback
= f
;
525 } /* namespace toku */
527 #endif // ROCKSDB_LITE