]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/lock/range/range_tree/lib/locktree/lock_request.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / lock / range / range_tree / lib / locktree / lock_request.cc
1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
3 #ifndef ROCKSDB_LITE
4 #ifndef OS_WIN
5 #ident "$Id$"
6 /*======
7 This file is part of PerconaFT.
8
9
10 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
11
12 PerconaFT is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License, version 2,
14 as published by the Free Software Foundation.
15
16 PerconaFT is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
23
24 ----------------------------------------
25
26 PerconaFT is free software: you can redistribute it and/or modify
27 it under the terms of the GNU Affero General Public License, version 3,
28 as published by the Free Software Foundation.
29
30 PerconaFT is distributed in the hope that it will be useful,
31 but WITHOUT ANY WARRANTY; without even the implied warranty of
32 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
33 GNU Affero General Public License for more details.
34
35 You should have received a copy of the GNU Affero General Public License
36 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
37
38 ----------------------------------------
39
40 Licensed under the Apache License, Version 2.0 (the "License");
41 you may not use this file except in compliance with the License.
42 You may obtain a copy of the License at
43
44 http://www.apache.org/licenses/LICENSE-2.0
45
46 Unless required by applicable law or agreed to in writing, software
47 distributed under the License is distributed on an "AS IS" BASIS,
48 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49 See the License for the specific language governing permissions and
50 limitations under the License.
51 ======= */
52
53 #ident \
54 "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
55
56 #include "lock_request.h"
57
58 #include "../portability/toku_race_tools.h"
59 #include "../portability/txn_subst.h"
60 #include "../util/dbt.h"
61 #include "locktree.h"
62
63 namespace toku {
64
// initialize a lock request's internals. Must be called once before set();
// pairs with destroy(). Leaves the request in state::UNINITIALIZED.
void lock_request::create(toku_external_mutex_factory_t mutex_factory) {
  // No transaction or conflict is associated yet.
  m_txnid = TXNID_NONE;
  m_conflicting_txnid = TXNID_NONE;
  m_start_time = 0;
  // Keys are borrowed pointers; owned copies live in the *_copy DBTs and
  // are only populated by copy_keys() when the request goes pending.
  m_left_key = nullptr;
  m_right_key = nullptr;
  toku_init_dbt(&m_left_key_copy);
  toku_init_dbt(&m_right_key_copy);

  m_type = type::UNKNOWN;
  m_lt = nullptr;

  m_complete_r = 0;
  m_state = state::UNINITIALIZED;
  m_info = nullptr;

  // psergey-todo: this condition is for interruptible wait
  // note: moved to here from lock_request::create:
  toku_external_cond_init(mutex_factory, &m_wait_cond);

  // Test-only hooks; left unset outside of tests.
  m_start_test_callback = nullptr;
  m_start_before_pending_test_callback = nullptr;
  m_retry_test_callback = nullptr;
  // NOTE(review): m_deadlock_cb, m_big_txn and m_extra are not initialized
  // here -- presumably covered by in-class initializers or set(); confirm
  // against the header so deadlock_exists() never reads an indeterminate
  // callback pointer.
}
90
// destroy a lock request. Releases the owned key copies and the wait
// condvar. The request must not be PENDING (it would still be reachable
// from the locktree's pending set) and must not be destroyed twice.
void lock_request::destroy(void) {
  invariant(m_state != state::PENDING);
  invariant(m_state != state::DESTROYED);
  m_state = state::DESTROYED;
  // Free any key copies made by copy_keys(); no-op if none were taken.
  toku_destroy_dbt(&m_left_key_copy);
  toku_destroy_dbt(&m_right_key_copy);
  toku_external_cond_destroy(&m_wait_cond);
}
100
// set the lock request parameters. this API allows a lock request to be reused.
// lt may be null, in which case m_info is null as well. The key pointers are
// borrowed from the caller; start() makes owned copies if the request goes
// pending.
void lock_request::set(locktree *lt, TXNID txnid, const DBT *left_key,
                       const DBT *right_key, lock_request::type lock_type,
                       bool big_txn, void *extra) {
  // Reuse is only legal once the previous acquisition has resolved.
  invariant(m_state != state::PENDING);
  m_lt = lt;

  m_txnid = txnid;
  m_left_key = left_key;
  m_right_key = right_key;
  // Drop key copies left over from a previous use of this request.
  toku_destroy_dbt(&m_left_key_copy);
  toku_destroy_dbt(&m_right_key_copy);
  m_type = lock_type;
  m_state = state::INITIALIZED;
  m_info = lt ? lt->get_lock_request_info() : nullptr;
  m_big_txn = big_txn;
  m_extra = extra;
}
119
120 // get rid of any stored left and right key copies and
121 // replace them with copies of the given left and right key
122 void lock_request::copy_keys() {
123 if (!toku_dbt_is_infinite(m_left_key)) {
124 toku_clone_dbt(&m_left_key_copy, *m_left_key);
125 m_left_key = &m_left_key_copy;
126 }
127 if (!toku_dbt_is_infinite(m_right_key)) {
128 toku_clone_dbt(&m_right_key_copy, *m_right_key);
129 m_right_key = &m_right_key_copy;
130 }
131 }
132
133 // what are the conflicts for this pending lock request?
134 void lock_request::get_conflicts(txnid_set *conflicts) {
135 invariant(m_state == state::PENDING);
136 const bool is_write_request = m_type == type::WRITE;
137 m_lt->get_conflicts(is_write_request, m_txnid, m_left_key, m_right_key,
138 conflicts);
139 }
140
// build a wait-for-graph for this lock request and the given conflict set
// for each transaction B that blocks A's lock request
//     if B is blocked then
//         add (A,T) to the WFG and if B is new, fill in the WFG from B
// Caller holds m_info->mutex (see start()) so the pending set is stable
// while we walk it.
void lock_request::build_wait_graph(wfg *wait_graph,
                                    const txnid_set &conflicts) {
  uint32_t num_conflicts = conflicts.size();
  for (uint32_t i = 0; i < num_conflicts; i++) {
    TXNID conflicting_txnid = conflicts.get(i);
    // Only transactions that themselves have a pending request can extend
    // the graph; a conflicting txn that holds its locks without waiting
    // is a leaf and contributes no edge.
    lock_request *conflicting_request = find_lock_request(conflicting_txnid);
    invariant(conflicting_txnid != m_txnid);
    invariant(conflicting_request != this);
    if (conflicting_request) {
      bool already_exists = wait_graph->node_exists(conflicting_txnid);
      wait_graph->add_edge(m_txnid, conflicting_txnid);
      if (!already_exists) {
        // recursively build the wait for graph rooted at the conflicting
        // request, given its set of lock conflicts.
        // NOTE(review): recursion depth is bounded by the number of
        // distinct blocked transactions (node_exists guards re-visits),
        // but a long wait chain still consumes proportional stack --
        // confirm this is acceptable for the deployment.
        txnid_set other_conflicts;
        other_conflicts.create();
        conflicting_request->get_conflicts(&other_conflicts);
        conflicting_request->build_wait_graph(wait_graph, other_conflicts);
        other_conflicts.destroy();
      }
    }
  }
}
168
// returns: true if the current set of lock requests contains
// a deadlock, false otherwise.
// Builds the wait-for graph rooted at this request's conflicts and looks
// for a cycle passing through m_txnid. Called with m_info->mutex held
// (see start()).
bool lock_request::deadlock_exists(const txnid_set &conflicts) {
  wfg wait_graph;
  wait_graph.create();

  build_wait_graph(&wait_graph, conflicts);

  // If a deadlock callback is installed, report each request on the
  // detected cycle to it.
  std::function<void(TXNID)> reporter;
  if (m_deadlock_cb) {
    reporter = [this](TXNID a) {
      // The txn may have resolved since the graph was built; skip if its
      // request is no longer pending.
      lock_request *req = find_lock_request(a);
      if (req) {
        m_deadlock_cb(req->m_txnid, (req->m_type == lock_request::WRITE),
                      req->m_left_key, req->m_right_key);
      }
    };
  }

  bool deadlock = wait_graph.cycle_exists_from_txnid(m_txnid, reporter);
  wait_graph.destroy();
  return deadlock;
}
192
// try to acquire a lock described by this lock request.
// Returns 0 if the lock was granted immediately, DB_LOCK_NOTGRANTED if the
// request was queued as pending (caller should then wait()), or
// DB_LOCK_DEADLOCK if queueing it would create a deadlock.
int lock_request::start(void) {
  int r;

  txnid_set conflicts;
  conflicts.create();
  if (m_type == type::WRITE) {
    r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts,
                                 m_big_txn);
  } else {
    invariant(m_type == type::READ);
    r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts,
                                m_big_txn);
  }

  // if the lock is not granted, save it to the set of lock requests
  // and check for a deadlock. if there is one, complete it as failed
  if (r == DB_LOCK_NOTGRANTED) {
    // Take ownership of the key memory: the caller's DBTs may not outlive
    // the wait.
    copy_keys();
    m_state = state::PENDING;
    // Milliseconds; reported via get_start_time().
    m_start_time = toku_current_time_microsec() / 1000;
    // Remember one blocker for diagnostics (get_conflicting_txnid()).
    m_conflicting_txnid = conflicts.get(0);
    if (m_start_before_pending_test_callback)
      m_start_before_pending_test_callback();
    // Both the pending-set insert and the deadlock check must happen
    // atomically under info->mutex.
    toku_external_mutex_lock(&m_info->mutex);
    insert_into_lock_requests();
    if (deadlock_exists(conflicts)) {
      remove_from_lock_requests();
      r = DB_LOCK_DEADLOCK;
    }
    toku_external_mutex_unlock(&m_info->mutex);
    if (m_start_test_callback) m_start_test_callback();  // test callback
  }

  // Granted (0) or deadlocked: resolve now. NOTGRANTED stays pending and
  // is completed later by retry()/wait().
  if (r != DB_LOCK_NOTGRANTED) {
    complete(r);
  }

  conflicts.destroy();
  return r;
}
234
// sleep on the lock request until it becomes resolved or the wait time has
// elapsed.
// Convenience overload with no kill polling; the lock_wait_callback and
// callback_arg parameters of the full overload take their declared
// defaults. NOTE(review): relies on default arguments in the header
// declaration for the trailing two parameters -- confirm.
int lock_request::wait(uint64_t wait_time_ms) {
  return wait(wait_time_ms, 0, nullptr);
}
240
// Sleep on the lock request until it is resolved (granted by a retry,
// killed, or deadlocked elsewhere) or wait_time_ms elapses.
// killed_time_ms > 0 makes the sleep wake periodically so killed_callback
// can be polled; killed_time_ms == 0 polls only on wakeups/timeouts.
// Returns the request's completion code (m_complete_r).
int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms,
                       int (*killed_callback)(void),
                       void (*lock_wait_callback)(void *, lock_wait_infos *),
                       void *callback_arg) {
  // All absolute times below are in microseconds.
  uint64_t t_now = toku_current_time_microsec();
  uint64_t t_start = t_now;
  uint64_t t_end = t_start + wait_time_ms * 1000;

  toku_external_mutex_lock(&m_info->mutex);

  // check again, this time locking out other retry calls
  if (m_state == state::PENDING) {
    lock_wait_infos conflicts_collector;
    retry(&conflicts_collector);
    // Still blocked after the retry: report who we are waiting on.
    if (m_state == state::PENDING) {
      report_waits(&conflicts_collector, lock_wait_callback, callback_arg);
    }
  }

  while (m_state == state::PENDING) {
    // check if this thread is killed
    if (killed_callback && killed_callback()) {
      remove_from_lock_requests();
      complete(DB_LOCK_NOTGRANTED);
      // complete() set m_state to COMPLETE, so the loop condition ends
      // the loop without sleeping again.
      continue;
    }

    // compute the time until we should wait
    uint64_t t_wait;
    if (killed_time_ms == 0) {
      t_wait = t_end;
    } else {
      // Wake early so killed_callback can be polled, but never sleep
      // past the overall deadline.
      t_wait = t_now + killed_time_ms * 1000;
      if (t_wait > t_end) t_wait = t_end;
    }

    int r = toku_external_cond_timedwait(&m_wait_cond, &m_info->mutex,
                                         (int64_t)(t_wait - t_now));
    invariant(r == 0 || r == ETIMEDOUT);

    t_now = toku_current_time_microsec();
    if (m_state == state::PENDING && (t_now >= t_end)) {
      m_info->counters.timeout_count += 1;

      // if we're still pending and we timed out, then remove our
      // request from the set of lock requests and fail.
      remove_from_lock_requests();

      // complete sets m_state to COMPLETE, breaking us out of the loop
      complete(DB_LOCK_NOTGRANTED);
    }
  }

  // Account wait statistics; "long" waits are those of at least 1 second.
  uint64_t t_real_end = toku_current_time_microsec();
  uint64_t duration = t_real_end - t_start;
  m_info->counters.wait_count += 1;
  m_info->counters.wait_time += duration;
  if (duration >= 1000000) {
    m_info->counters.long_wait_count += 1;
    m_info->counters.long_wait_time += duration;
  }
  toku_external_mutex_unlock(&m_info->mutex);

  invariant(m_state == state::COMPLETE);
  return m_complete_r;
}
307
// complete this lock request with the given return value.
// Storing COMPLETE into m_state is what terminates wait()'s loop; the
// result is stored first so it is set by the time the state flips.
void lock_request::complete(int complete_r) {
  m_complete_r = complete_r;
  m_state = state::COMPLETE;
}
313
// Simple accessors for the request's current parameters.
const DBT *lock_request::get_left_key(void) const { return m_left_key; }

const DBT *lock_request::get_right_key(void) const { return m_right_key; }

TXNID lock_request::get_txnid(void) const { return m_txnid; }

// Time (milliseconds) recorded in start() when the request went pending.
uint64_t lock_request::get_start_time(void) const { return m_start_time; }

// One transaction from the most recently observed conflict set (updated
// in start() and retry()).
TXNID lock_request::get_conflicting_txnid(void) const {
  return m_conflicting_txnid;
}
325
// Re-attempt acquisition of a pending lock request. Called under
// m_info->mutex (from wait() and retry_all_lock_requests_info()).
// Returns 0 if the lock was acquired -- the request is removed from the
// pending set, completed, and its waiter signalled -- otherwise the
// locktree's error, with the conflict set appended to *conflicts_collector
// for lock-wait reporting.
int lock_request::retry(lock_wait_infos *conflicts_collector) {
  invariant(m_state == state::PENDING);
  int r;
  txnid_set conflicts;
  conflicts.create();

  if (m_type == type::WRITE) {
    r = m_lt->acquire_write_lock(m_txnid, m_left_key, m_right_key, &conflicts,
                                 m_big_txn);
  } else {
    r = m_lt->acquire_read_lock(m_txnid, m_left_key, m_right_key, &conflicts,
                                m_big_txn);
  }

  // if the acquisition succeeded then remove ourselves from the
  // set of lock requests, complete, and signal the waiting thread.
  if (r == 0) {
    remove_from_lock_requests();
    complete(r);
    if (m_retry_test_callback) m_retry_test_callback();  // test callback
    toku_external_cond_broadcast(&m_wait_cond);
  } else {
    // Still blocked: refresh the remembered blocker and record the full
    // conflict set for the wait report.
    m_conflicting_txnid = conflicts.get(0);
    add_conflicts_to_waits(&conflicts, conflicts_collector);
  }
  conflicts.destroy();

  return r;
}
355
// Retry all pending lock requests on the given locktree, coalescing
// concurrent callers: each caller takes a "retry generation"; one running
// pass satisfies every generation requested before it started, so callers
// whose generation is already covered do nothing (or wait for the running
// pass to finish).
void lock_request::retry_all_lock_requests(
    locktree *lt, void (*lock_wait_callback)(void *, lock_wait_infos *),
    void *callback_arg, void (*after_retry_all_test_callback)(void)) {
  lt_lock_request_info *info = lt->get_lock_request_info();

  // if there are no pending lock requests than there is nothing to do
  // the unlocked data race on pending_is_empty is OK since lock requests
  // are retried after added to the pending set.
  if (info->pending_is_empty) return;

  // get my retry generation (post increment of retry_want)
  unsigned long long my_retry_want = (info->retry_want += 1);

  toku_mutex_lock(&info->retry_mutex);

  // here is the group retry algorithm.
  // get the latest retry_want count and use it as the generation number of
  // this retry operation. if this retry generation is > the last retry
  // generation, then do the lock retries. otherwise, no lock retries
  // are needed.
  if ((my_retry_want - 1) == info->retry_done) {
    for (;;) {
      if (!info->running_retry) {
        info->running_retry = true;
        // Claim every generation requested so far; callers holding those
        // generations will find their work already done.
        info->retry_done = info->retry_want;
        // Drop retry_mutex while running the pass -- the pass takes
        // info->mutex internally.
        toku_mutex_unlock(&info->retry_mutex);
        retry_all_lock_requests_info(info, lock_wait_callback, callback_arg);
        if (after_retry_all_test_callback) after_retry_all_test_callback();
        toku_mutex_lock(&info->retry_mutex);
        info->running_retry = false;
        toku_cond_broadcast(&info->retry_cv);
        break;
      } else {
        // Another thread is mid-pass; wait for it so our generation is
        // not skipped.
        toku_cond_wait(&info->retry_cv, &info->retry_mutex);
      }
    }
  }
  toku_mutex_unlock(&info->retry_mutex);
}
395
// Run one retry pass over every request in the locktree's pending set,
// collecting lock-wait info and reporting it via the callback. Holds
// info->mutex for the entire pass.
void lock_request::retry_all_lock_requests_info(
    lt_lock_request_info *info,
    void (*lock_wait_callback)(void *, lock_wait_infos *), void *callback_arg) {
  toku_external_mutex_lock(&info->mutex);
  // retry all of the pending lock requests.
  lock_wait_infos conflicts_collector;
  for (uint32_t i = 0; i < info->pending_lock_requests.size();) {
    lock_request *request;
    int r = info->pending_lock_requests.fetch(i, &request);
    invariant_zero(r);

    // retry the lock request. if it didn't succeed,
    // move on to the next lock request. otherwise
    // the request is gone from the list so we may
    // read the i'th entry for the next one.
    r = request->retry(&conflicts_collector);
    if (r != 0) {
      i++;
    }
  }

  // call report_waits while holding the pending queue lock since
  // the waiter object is still valid while it's in the queue
  report_waits(&conflicts_collector, lock_wait_callback, callback_arg);

  // future threads should only retry lock requests if some still exist
  info->should_retry_lock_requests = info->pending_lock_requests.size() > 0;
  toku_external_mutex_unlock(&info->mutex);
}
425
426 void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
427 lock_wait_infos *wait_conflicts) {
428 wait_conflicts->push_back({m_lt, get_txnid(), m_extra, {}});
429 uint32_t num_conflicts = conflicts->size();
430 for (uint32_t i = 0; i < num_conflicts; i++) {
431 wait_conflicts->back().waitees.push_back(conflicts->get(i));
432 }
433 }
434
435 void lock_request::report_waits(lock_wait_infos *wait_conflicts,
436 void (*lock_wait_callback)(void *,
437 lock_wait_infos *),
438 void *callback_arg) {
439 if (lock_wait_callback) (*lock_wait_callback)(callback_arg, wait_conflicts);
440 }
441
442 void *lock_request::get_extra(void) const { return m_extra; }
443
// Cancel this pending request: remove it from the pending set, complete it
// with DB_LOCK_NOTGRANTED, and wake the thread sleeping in wait().
// Called with m_info->mutex held (see the locktree-wide overload below;
// remove_from_lock_requests requires the mutex).
void lock_request::kill_waiter(void) {
  remove_from_lock_requests();
  complete(DB_LOCK_NOTGRANTED);
  toku_external_cond_broadcast(&m_wait_cond);
}
449
450 void lock_request::kill_waiter(locktree *lt, void *extra) {
451 lt_lock_request_info *info = lt->get_lock_request_info();
452 toku_external_mutex_lock(&info->mutex);
453 for (uint32_t i = 0; i < info->pending_lock_requests.size(); i++) {
454 lock_request *request;
455 int r = info->pending_lock_requests.fetch(i, &request);
456 if (r == 0 && request->get_extra() == extra) {
457 request->kill_waiter();
458 break;
459 }
460 }
461 toku_external_mutex_unlock(&info->mutex);
462 }
463
464 // find another lock request by txnid. must hold the mutex.
465 lock_request *lock_request::find_lock_request(const TXNID &txnid) {
466 lock_request *request;
467 int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(
468 txnid, &request, nullptr);
469 if (r != 0) {
470 request = nullptr;
471 }
472 return request;
473 }
474
// insert this lock request into the locktree's set. must hold the mutex.
// The pending set is kept sorted by txnid; the DB_NOTFOUND invariant
// enforces that a transaction has at most one pending request here.
void lock_request::insert_into_lock_requests(void) {
  uint32_t idx;
  lock_request *request;
  int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(
      m_txnid, &request, &idx);
  invariant(r == DB_NOTFOUND);
  r = m_info->pending_lock_requests.insert_at(this, idx);
  invariant_zero(r);
  // Maintain the unlocked fast-path flag read by retry_all_lock_requests.
  m_info->pending_is_empty = false;
}
486
// remove this lock request from the locktree's set. must hold the mutex.
void lock_request::remove_from_lock_requests(void) {
  uint32_t idx;
  lock_request *request;
  int r = m_info->pending_lock_requests.find_zero<TXNID, find_by_txnid>(
      m_txnid, &request, &idx);
  invariant_zero(r);
  // The entry keyed by our txnid must be this very request.
  invariant(request == this);
  r = m_info->pending_lock_requests.delete_at(idx);
  invariant_zero(r);
  // Maintain the unlocked fast-path flag read by retry_all_lock_requests.
  if (m_info->pending_lock_requests.size() == 0)
    m_info->pending_is_empty = true;
}
500
501 int lock_request::find_by_txnid(lock_request *const &request,
502 const TXNID &txnid) {
503 TXNID request_txnid = request->m_txnid;
504 if (request_txnid < txnid) {
505 return -1;
506 } else if (request_txnid == txnid) {
507 return 0;
508 } else {
509 return 1;
510 }
511 }
512
// Test-only hooks. Each is invoked from the point in start()/retry()
// indicated below; all are null in production.

// Invoked in start() after the request has been queued and the deadlock
// check performed.
void lock_request::set_start_test_callback(void (*f)(void)) {
  m_start_test_callback = f;
}

// Invoked in start() after the state flips to PENDING but before the
// request is inserted into the pending set.
void lock_request::set_start_before_pending_test_callback(void (*f)(void)) {
  m_start_before_pending_test_callback = f;
}

// Invoked in retry() immediately after a successful lock acquisition.
void lock_request::set_retry_test_callback(void (*f)(void)) {
  m_retry_test_callback = f;
}
524
525 } /* namespace toku */
526 #endif // OS_WIN
527 #endif // ROCKSDB_LITE