1 /* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 // vim: ft=cpp:expandtab:ts=8:sw=2:softtabstop=2:
3 #ifndef ROCKSDB_LITE
4 #ifndef OS_WIN
5 #ident "$Id$"
6 /*======
7 This file is part of PerconaFT.
8
9
10 Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved.
11
12 PerconaFT is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License, version 2,
14 as published by the Free Software Foundation.
15
16 PerconaFT is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
23
24 ----------------------------------------
25
26 PerconaFT is free software: you can redistribute it and/or modify
27 it under the terms of the GNU Affero General Public License, version 3,
28 as published by the Free Software Foundation.
29
30 PerconaFT is distributed in the hope that it will be useful,
31 but WITHOUT ANY WARRANTY; without even the implied warranty of
32 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
33 GNU Affero General Public License for more details.
34
35 You should have received a copy of the GNU Affero General Public License
36 along with PerconaFT. If not, see <http://www.gnu.org/licenses/>.
37
38 ----------------------------------------
39
40 Licensed under the Apache License, Version 2.0 (the "License");
41 you may not use this file except in compliance with the License.
42 You may obtain a copy of the License at
43
44 http://www.apache.org/licenses/LICENSE-2.0
45
46 Unless required by applicable law or agreed to in writing, software
47 distributed under the License is distributed on an "AS IS" BASIS,
48 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
49 See the License for the specific language governing permissions and
50 limitations under the License.
51 ======= */
52
53 #ident \
54 "Copyright (c) 2006, 2015, Percona and/or its affiliates. All rights reserved."
55
56 #include "locktree.h"
57
58 #include <memory.h>
59
60 #include "../portability/toku_pthread.h"
61 #include "../portability/toku_time.h"
62 #include "../util/growable_array.h"
63 #include "range_buffer.h"
64
65 // including the concurrent_tree here expands the templates
66 // and "defines" the implementation, so we do it here in
67 // the locktree source file instead of the header.
68 #include "concurrent_tree.h"
69
70 namespace toku {
71 // A locktree represents the set of row locks owned by all transactions
72 // over an open dictionary. Read and write ranges are represented as
73 // a left and right key which are compared with the given descriptor
74 // and comparison fn.
75 //
76 // Each locktree has a reference count which it manages
77 // but does nothing based on the value of the reference count - it is
78 // up to the user of the locktree to destroy it when it sees fit.
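//
// For illustration (summarizing the code below): create() initializes the
// reference count to 1, add_reference() and release_reference() adjust it
// atomically, and destroy() asserts that the count has reached 0, so the
// expected usage is to call destroy() once release_reference() returns 0.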
79
80 void locktree::create(locktree_manager *mgr, DICTIONARY_ID dict_id,
81 const comparator &cmp,
82 toku_external_mutex_factory_t mutex_factory) {
83 m_mgr = mgr;
84 m_dict_id = dict_id;
85
86 m_cmp.create_from(cmp);
87 m_reference_count = 1;
88 m_userdata = nullptr;
89
90 XCALLOC(m_rangetree);
91 m_rangetree->create(&m_cmp);
92
93 m_sto_txnid = TXNID_NONE;
94 m_sto_buffer.create();
95 m_sto_score = STO_SCORE_THRESHOLD;
96 m_sto_end_early_count = 0;
97 m_sto_end_early_time = 0;
98
99 m_escalation_barrier = [](const DBT *, const DBT *, void *) -> bool {
100 return false;
101 };
102
103 m_lock_request_info.init(mutex_factory);
104 }
105
106 void locktree::set_escalation_barrier_func(
107 lt_escalation_barrier_check_func func, void *extra) {
108 m_escalation_barrier = func;
109 m_escalation_barrier_arg = extra;
110 }
111
112 void lt_lock_request_info::init(toku_external_mutex_factory_t mutex_factory) {
113 pending_lock_requests.create();
114 pending_is_empty = true;
115 toku_external_mutex_init(mutex_factory, &mutex);
116 retry_want = retry_done = 0;
117 ZERO_STRUCT(counters);
118 ZERO_STRUCT(retry_mutex);
119 toku_mutex_init(locktree_request_info_retry_mutex_key, &retry_mutex, nullptr);
120 toku_cond_init(locktree_request_info_retry_cv_key, &retry_cv, nullptr);
121 running_retry = false;
122
123 TOKU_VALGRIND_HG_DISABLE_CHECKING(&pending_is_empty,
124 sizeof(pending_is_empty));
125 TOKU_DRD_IGNORE_VAR(pending_is_empty);
126 }
127
128 void locktree::destroy(void) {
129 invariant(m_reference_count == 0);
130 invariant(m_lock_request_info.pending_lock_requests.size() == 0);
131 m_cmp.destroy();
132 m_rangetree->destroy();
133 toku_free(m_rangetree);
134 m_sto_buffer.destroy();
135 m_lock_request_info.destroy();
136 }
137
138 void lt_lock_request_info::destroy(void) {
139 pending_lock_requests.destroy();
140 toku_external_mutex_destroy(&mutex);
141 toku_mutex_destroy(&retry_mutex);
142 toku_cond_destroy(&retry_cv);
143 }
144
145 void locktree::add_reference(void) {
146 (void)toku_sync_add_and_fetch(&m_reference_count, 1);
147 }
148
149 uint32_t locktree::release_reference(void) {
150 return toku_sync_sub_and_fetch(&m_reference_count, 1);
151 }
152
153 uint32_t locktree::get_reference_count(void) { return m_reference_count; }
154
155 // a container for a range/txnid pair
156 struct row_lock {
157 keyrange range;
158 TXNID txnid;
159 bool is_shared;
160 TxnidVector *owners;
161 };
162
163 // iterate over a locked keyrange and copy out all of the data,
164 // storing each row lock into the given growable array. the
165 // caller does not own the ranges inside the returned row locks,
166 // so take care when using them as keys to remove locks from the tree.
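// (the locked_keyrange iteration interface calls fn() on the supplied
// functor once per overlapping lock; returning false from fn() stops the
// iteration early, which extract_first_n_row_locks() further below relies on)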
167 static void iterate_and_get_overlapping_row_locks(
168 const concurrent_tree::locked_keyrange *lkr,
169 GrowableArray<row_lock> *row_locks) {
170 struct copy_fn_obj {
171 GrowableArray<row_lock> *row_locks;
172 bool fn(const keyrange &range, TXNID txnid, bool is_shared,
173 TxnidVector *owners) {
174 row_lock lock = {.range = range,
175 .txnid = txnid,
176 .is_shared = is_shared,
177 .owners = owners};
178 row_locks->push(lock);
179 return true;
180 }
181 } copy_fn;
182 copy_fn.row_locks = row_locks;
183 lkr->iterate(&copy_fn);
184 }
185
186 // given a txnid and a set of overlapping row locks, determine
187 // which txnids are conflicting, and store them in the conflicts
188 // set, if given.
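// For example (illustrative values): if row_locks holds {[a,c]: txn 100} and
// {[d,f]: TXNID_SHARED, owners = {200, 300}} and txnid == 200, then 100 and
// 300 are added to the conflicts set and the function returns true.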
189 static bool determine_conflicting_txnids(
190 const GrowableArray<row_lock> &row_locks, const TXNID &txnid,
191 txnid_set *conflicts) {
192 bool conflicts_exist = false;
193 const size_t num_overlaps = row_locks.get_size();
194 for (size_t i = 0; i < num_overlaps; i++) {
195 const row_lock lock = row_locks.fetch_unchecked(i);
196 const TXNID other_txnid = lock.txnid;
197 if (other_txnid != txnid) {
198 if (conflicts) {
199 if (other_txnid == TXNID_SHARED) {
200 // Add all shared lock owners, except this transaction.
201 for (TXNID shared_id : *lock.owners) {
202 if (shared_id != txnid) conflicts->add(shared_id);
203 }
204 } else {
205 conflicts->add(other_txnid);
206 }
207 }
208 conflicts_exist = true;
209 }
210 }
211 return conflicts_exist;
212 }
213
214 // how much memory does a row lock take up in a concurrent tree?
215 static uint64_t row_lock_size_in_tree(const row_lock &lock) {
216 const uint64_t overhead = concurrent_tree::get_insertion_memory_overhead();
217 return lock.range.get_memory_size() + overhead;
218 }
219
220 // remove and destroy the given row lock from the locked keyrange,
221 // then notify the memory tracker of the newly freed lock.
222 static void remove_row_lock_from_tree(concurrent_tree::locked_keyrange *lkr,
223 const row_lock &lock, TXNID txnid,
224 locktree_manager *mgr) {
225 const uint64_t mem_released = row_lock_size_in_tree(lock);
226 lkr->remove(lock.range, txnid);
227 if (mgr != nullptr) {
228 mgr->note_mem_released(mem_released);
229 }
230 }
231
232 // insert a row lock into the locked keyrange, then notify
233 // the memory tracker of this newly acquired lock.
234 static void insert_row_lock_into_tree(concurrent_tree::locked_keyrange *lkr,
235 const row_lock &lock,
236 locktree_manager *mgr) {
237 uint64_t mem_used = row_lock_size_in_tree(lock);
238 lkr->insert(lock.range, lock.txnid, lock.is_shared);
239 if (mgr != nullptr) {
240 mgr->note_mem_used(mem_used);
241 }
242 }
243
244 void locktree::sto_begin(TXNID txnid) {
245 invariant(m_sto_txnid == TXNID_NONE);
246 invariant(m_sto_buffer.is_empty());
247 m_sto_txnid = txnid;
248 }
249
250 void locktree::sto_append(const DBT *left_key, const DBT *right_key,
251 bool is_write_request) {
252 uint64_t buffer_mem, delta;
253
254 // psergey: the below two lines do not make any sense
255 // (and it's the same in upstream TokuDB)
256 keyrange range;
257 range.create(left_key, right_key);
258
259 buffer_mem = m_sto_buffer.total_memory_size();
260 m_sto_buffer.append(left_key, right_key, is_write_request);
261 delta = m_sto_buffer.total_memory_size() - buffer_mem;
262 if (m_mgr != nullptr) {
263 m_mgr->note_mem_used(delta);
264 }
265 }
266
267 void locktree::sto_end(void) {
268 uint64_t mem_size = m_sto_buffer.total_memory_size();
269 if (m_mgr != nullptr) {
270 m_mgr->note_mem_released(mem_size);
271 }
272 m_sto_buffer.destroy();
273 m_sto_buffer.create();
274 m_sto_txnid = TXNID_NONE;
275 }
276
277 void locktree::sto_end_early_no_accounting(void *prepared_lkr) {
278 sto_migrate_buffer_ranges_to_tree(prepared_lkr);
279 sto_end();
280 toku_unsafe_set(m_sto_score, 0);
281 }
282
283 void locktree::sto_end_early(void *prepared_lkr) {
284 m_sto_end_early_count++;
285
286 tokutime_t t0 = toku_time_now();
287 sto_end_early_no_accounting(prepared_lkr);
288 tokutime_t t1 = toku_time_now();
289
290 m_sto_end_early_time += (t1 - t0);
291 }
292
293 void locktree::sto_migrate_buffer_ranges_to_tree(void *prepared_lkr) {
294 // There should be something to migrate, and nothing in the rangetree.
295 invariant(!m_sto_buffer.is_empty());
296 invariant(m_rangetree->is_empty());
297
298 concurrent_tree sto_rangetree;
299 concurrent_tree::locked_keyrange sto_lkr;
300 sto_rangetree.create(&m_cmp);
301
302   // insert all of the ranges from the single txnid buffer into a new rangetree
303 range_buffer::iterator iter(&m_sto_buffer);
304 range_buffer::iterator::record rec;
305 while (iter.current(&rec)) {
306 sto_lkr.prepare(&sto_rangetree);
307 int r = acquire_lock_consolidated(&sto_lkr, m_sto_txnid, rec.get_left_key(),
308 rec.get_right_key(),
309 rec.get_exclusive_flag(), nullptr);
310 invariant_zero(r);
311 sto_lkr.release();
312 iter.next();
313 }
314
315 // Iterate the newly created rangetree and insert each range into the
316 // locktree's rangetree, on behalf of the old single txnid.
317 struct migrate_fn_obj {
318 concurrent_tree::locked_keyrange *dst_lkr;
319 bool fn(const keyrange &range, TXNID txnid, bool is_shared,
320 TxnidVector *owners) {
321 // There can't be multiple owners in STO mode
322 invariant_zero(owners);
323 dst_lkr->insert(range, txnid, is_shared);
324 return true;
325 }
326 } migrate_fn;
327 migrate_fn.dst_lkr =
328 static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
329 sto_lkr.prepare(&sto_rangetree);
330 sto_lkr.iterate(&migrate_fn);
331 sto_lkr.remove_all();
332 sto_lkr.release();
333 sto_rangetree.destroy();
334 invariant(!m_rangetree->is_empty());
335 }
336
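// sto_try_acquire: roughly, the single-txnid optimization (STO) fast path.
// While the rangetree is empty and m_sto_score has reached
// STO_SCORE_THRESHOLD, lock requests from a single txnid are appended to
// m_sto_buffer instead of the rangetree. If a different txnid shows up, or
// the buffer grows past STO_BUFFER_MAX_SIZE ranges, the buffered locks are
// migrated into the rangetree and the optimization is switched off. Returns
// true if the request was handled by the STO buffer, false if the caller
// must take the regular acquire_lock_consolidated() path.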
337 bool locktree::sto_try_acquire(void *prepared_lkr, TXNID txnid,
338 const DBT *left_key, const DBT *right_key,
339 bool is_write_request) {
340 if (m_rangetree->is_empty() && m_sto_buffer.is_empty() &&
341 toku_unsafe_fetch(m_sto_score) >= STO_SCORE_THRESHOLD) {
342     // We can do the optimization because the rangetree is empty, and
343     // we know it's worth trying because the STO score is high enough.
344 sto_begin(txnid);
345 } else if (m_sto_txnid != TXNID_NONE) {
346 // We are currently doing the optimization. Check if we need to cancel
347 // it because a new txnid appeared, or if the current single txnid has
348 // taken too many locks already.
349 if (m_sto_txnid != txnid ||
350 m_sto_buffer.get_num_ranges() > STO_BUFFER_MAX_SIZE) {
351 sto_end_early(prepared_lkr);
352 }
353 }
354
355 // At this point the sto txnid is properly set. If it is valid, then
356 // this txnid can append its lock to the sto buffer successfully.
357 if (m_sto_txnid != TXNID_NONE) {
358 invariant(m_sto_txnid == txnid);
359 sto_append(left_key, right_key, is_write_request);
360 return true;
361 } else {
362 invariant(m_sto_buffer.is_empty());
363 return false;
364 }
365 }
366
367 /*
368 Do the same as iterate_and_get_overlapping_row_locks does, but also check for
369 this:
370   The set of overlapping row locks consists of just one read-only shared
371   lock with the same endpoints as specified (in that case, we can just add
372   ourselves to its owner list)
373
374 @return true - One compatible shared lock
375 false - Otherwise
376 */
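// For illustration: if the only overlapping lock is a shared lock held by,
// say, txn 7 on exactly [left_key, right_key], this returns true and the
// caller can simply add itself to that lock's owner list; if the endpoints
// differ or more than one lock overlaps, it returns false.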
377 static bool iterate_and_get_overlapping_row_locks2(
378 const concurrent_tree::locked_keyrange *lkr, const DBT *left_key,
379 const DBT *right_key, comparator *cmp, TXNID,
380 GrowableArray<row_lock> *row_locks) {
381 struct copy_fn_obj {
382 GrowableArray<row_lock> *row_locks;
383 bool first_call = true;
384 bool matching_lock_found = false;
385 const DBT *left_key, *right_key;
386 comparator *cmp;
387
388 bool fn(const keyrange &range, TXNID txnid, bool is_shared,
389 TxnidVector *owners) {
390 if (first_call) {
391 first_call = false;
392 if (is_shared && !(*cmp)(left_key, range.get_left_key()) &&
393 !(*cmp)(right_key, range.get_right_key())) {
394 matching_lock_found = true;
395 }
396 } else {
397       // if we see more than one overlapping lock, it doesn't matter
398       // whether the first one was an exact match.
399 matching_lock_found = false;
400 }
401 row_lock lock = {.range = range,
402 .txnid = txnid,
403 .is_shared = is_shared,
404 .owners = owners};
405 row_locks->push(lock);
406 return true;
407 }
408 } copy_fn;
409 copy_fn.row_locks = row_locks;
410 copy_fn.left_key = left_key;
411 copy_fn.right_key = right_key;
412 copy_fn.cmp = cmp;
413 lkr->iterate(&copy_fn);
414 return copy_fn.matching_lock_found;
415 }
416
417 // try to acquire a lock and consolidate it with existing locks if possible
418 // param: lkr, a prepared locked keyrange
419 // return: 0 on success, DB_LOCK_NOTGRANTED if conflicting locks exist.
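// For example (illustrative keys): if txn 42 already holds write locks on
// [1,2] and [4,5] and requests [2,4], and no other transaction holds an
// overlapping lock, the three ranges are consolidated into a single
// dominating write lock [1,5] before being inserted into the tree.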
420 int locktree::acquire_lock_consolidated(void *prepared_lkr, TXNID txnid,
421 const DBT *left_key,
422 const DBT *right_key,
423 bool is_write_request,
424 txnid_set *conflicts) {
425 int r = 0;
426 concurrent_tree::locked_keyrange *lkr;
427
428 keyrange requested_range;
429 requested_range.create(left_key, right_key);
430 lkr = static_cast<concurrent_tree::locked_keyrange *>(prepared_lkr);
431 lkr->acquire(requested_range);
432
433 // copy out the set of overlapping row locks.
434 GrowableArray<row_lock> overlapping_row_locks;
435 overlapping_row_locks.init();
436 bool matching_shared_lock_found = false;
437
438 if (is_write_request)
439 iterate_and_get_overlapping_row_locks(lkr, &overlapping_row_locks);
440 else {
441 matching_shared_lock_found = iterate_and_get_overlapping_row_locks2(
442 lkr, left_key, right_key, &m_cmp, txnid, &overlapping_row_locks);
443     // psergey-todo: what to do now? We have determined that there is just
444     // one shareable lock. We need to add ourselves to it as an owner, but
445     // the lock pointer cannot be kept?
446     // A: use find_node_with_overlapping_child(key_range, nullptr);
447     // then, add ourselves to the owner list.
448     // Don't forget to release the subtree after that.
449 }
450
451 if (matching_shared_lock_found) {
452     // there is just one non-conflicting matching shared lock.
453     // we are holding a lock on it (see the acquire() call above).
454     // we need to modify it to indicate there is another locker...
455 if (lkr->add_shared_owner(requested_range, txnid)) {
456       // For accounting, pretend the shared lock uses as much memory as a regular lock.
457 row_lock new_lock = {.range = requested_range,
458 .txnid = txnid,
459 .is_shared = false,
460 .owners = nullptr};
461 uint64_t mem_used = row_lock_size_in_tree(new_lock);
462 if (m_mgr) {
463 m_mgr->note_mem_used(mem_used);
464 }
465 }
466 requested_range.destroy();
467 overlapping_row_locks.deinit();
468 return 0;
469 }
470
471 size_t num_overlapping_row_locks = overlapping_row_locks.get_size();
472
473 // if any overlapping row locks conflict with this request, bail out.
474
475 bool conflicts_exist =
476 determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);
477 if (!conflicts_exist) {
478 // there are no conflicts, so all of the overlaps are for the requesting
479 // txnid. so, we must consolidate all existing overlapping ranges and the
480 // requested range into one dominating range. then we insert the dominating
481 // range.
482 bool all_shared = !is_write_request;
483 for (size_t i = 0; i < num_overlapping_row_locks; i++) {
484 row_lock overlapping_lock = overlapping_row_locks.fetch_unchecked(i);
485 invariant(overlapping_lock.txnid == txnid);
486 requested_range.extend(m_cmp, overlapping_lock.range);
487 remove_row_lock_from_tree(lkr, overlapping_lock, TXNID_ANY, m_mgr);
488 all_shared = all_shared && overlapping_lock.is_shared;
489 }
490
491 row_lock new_lock = {.range = requested_range,
492 .txnid = txnid,
493 .is_shared = all_shared,
494 .owners = nullptr};
495 insert_row_lock_into_tree(lkr, new_lock, m_mgr);
496 } else {
497 r = DB_LOCK_NOTGRANTED;
498 }
499
500 requested_range.destroy();
501 overlapping_row_locks.deinit();
502 return r;
503 }
504
505 // acquire a lock in the given key range, inclusive. if successful,
506 // return 0. otherwise, populate the conflicts txnid_set with the set of
507 // transactions that conflict with this request.
508 int locktree::acquire_lock(bool is_write_request, TXNID txnid,
509 const DBT *left_key, const DBT *right_key,
510 txnid_set *conflicts) {
511 int r = 0;
512
513   // shared (read) locks are supported now, so the old write-only
514   // invariant is disabled: invariant(is_write_request);
515
516 // acquire and prepare a locked keyrange over the requested range.
517   // prepare is a serialization point, so we take the opportunity to
518 // try the single txnid optimization first.
519 concurrent_tree::locked_keyrange lkr;
520 lkr.prepare(m_rangetree);
521
522 bool acquired =
523 sto_try_acquire(&lkr, txnid, left_key, right_key, is_write_request);
524 if (!acquired) {
525 r = acquire_lock_consolidated(&lkr, txnid, left_key, right_key,
526 is_write_request, conflicts);
527 }
528
529 lkr.release();
530 return r;
531 }
532
533 int locktree::try_acquire_lock(bool is_write_request, TXNID txnid,
534 const DBT *left_key, const DBT *right_key,
535 txnid_set *conflicts, bool big_txn) {
536 // All ranges in the locktree must have left endpoints <= right endpoints.
537 // Range comparisons rely on this fact, so we make a paranoid invariant here.
538 paranoid_invariant(m_cmp(left_key, right_key) <= 0);
539 int r = m_mgr == nullptr ? 0 : m_mgr->check_current_lock_constraints(big_txn);
540 if (r == 0) {
541 r = acquire_lock(is_write_request, txnid, left_key, right_key, conflicts);
542 }
543 return r;
544 }
545
546 // read locks are acquired as shared (non-exclusive) locks
547 int locktree::acquire_read_lock(TXNID txnid, const DBT *left_key,
548 const DBT *right_key, txnid_set *conflicts,
549 bool big_txn) {
550 return try_acquire_lock(false, txnid, left_key, right_key, conflicts,
551 big_txn);
552 }
553
554 int locktree::acquire_write_lock(TXNID txnid, const DBT *left_key,
555 const DBT *right_key, txnid_set *conflicts,
556 bool big_txn) {
557 return try_acquire_lock(true, txnid, left_key, right_key, conflicts, big_txn);
558 }
559
560 // typedef void (*dump_callback)(void *cdata, const DBT *left, const DBT *right,
561 //                                TXNID txnid, bool is_shared, TxnidVector *owners);
562 void locktree::dump_locks(void *cdata, dump_callback cb) {
563 concurrent_tree::locked_keyrange lkr;
564 keyrange range;
565 range.create(toku_dbt_negative_infinity(), toku_dbt_positive_infinity());
566
567 lkr.prepare(m_rangetree);
568 lkr.acquire(range);
569
570 TXNID sto_txn;
571 if ((sto_txn = toku_unsafe_fetch(m_sto_txnid)) != TXNID_NONE) {
572     // report each range in the single-txnid buffer through the callback
573 range_buffer::iterator iter(&m_sto_buffer);
574 range_buffer::iterator::record rec;
575 while (iter.current(&rec)) {
576 (*cb)(cdata, rec.get_left_key(), rec.get_right_key(), sto_txn,
577 !rec.get_exclusive_flag(), nullptr);
578 iter.next();
579 }
580 } else {
581 GrowableArray<row_lock> all_locks;
582 all_locks.init();
583 iterate_and_get_overlapping_row_locks(&lkr, &all_locks);
584
585 const size_t n_locks = all_locks.get_size();
586 for (size_t i = 0; i < n_locks; i++) {
587 const row_lock lock = all_locks.fetch_unchecked(i);
588 (*cb)(cdata, lock.range.get_left_key(), lock.range.get_right_key(),
589 lock.txnid, lock.is_shared, lock.owners);
590 }
591 all_locks.deinit();
592 }
593 lkr.release();
594 range.destroy();
595 }
596
597 void locktree::get_conflicts(bool is_write_request, TXNID txnid,
598 const DBT *left_key, const DBT *right_key,
599 txnid_set *conflicts) {
600   // conflicts are computed the same way for read and write requests here, so ignore this bit.
601 (void)is_write_request;
602
603   // prepare and acquire a locked keyrange over the range
604 keyrange range;
605 range.create(left_key, right_key);
606 concurrent_tree::locked_keyrange lkr;
607 lkr.prepare(m_rangetree);
608 lkr.acquire(range);
609
610 // copy out the set of overlapping row locks and determine the conflicts
611 GrowableArray<row_lock> overlapping_row_locks;
612 overlapping_row_locks.init();
613 iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
614
615 // we don't care if conflicts exist. we just want the conflicts set populated.
616 (void)determine_conflicting_txnids(overlapping_row_locks, txnid, conflicts);
617
618 lkr.release();
619 overlapping_row_locks.deinit();
620 range.destroy();
621 }
622
623 // Effect:
624 // For each range in the lock tree that overlaps the given range and has
625 // the given txnid, remove it.
626 // Rationale:
627 // In the common case, there is only the range [left_key, right_key] and
628 // it is associated with txnid, so this is a single tree delete.
629 //
630 // However, consolidation and escalation change the objects in the tree
631 // without telling the txn anything. In this case, the txn may own a
632 // large range lock that represents its ownership of many smaller range
633 // locks. For example, the txn may think it owns point locks on keys 1,
634 // 2, and 3, but due to escalation, only the object [1,3] exists in the
635 // tree.
636 //
637 // The first call for a small lock will remove the large range lock, and
638 // the rest of the calls should do nothing. After the first release,
639 // another thread can acquire one of the locks that the txn thinks it
640 // still owns. That's ok, because the txn doesn't want it anymore (it
641 // unlocks everything at once), but it may find a lock that it does not
642 // own.
643 //
644 // In our example, the txn unlocks key 1, which actually removes the
645 // whole lock [1,3]. Now, someone else can lock 2 before our txn gets
646 // around to unlocking 2, so we should not remove that lock.
647 void locktree::remove_overlapping_locks_for_txnid(TXNID txnid,
648 const DBT *left_key,
649 const DBT *right_key) {
650 keyrange release_range;
651 release_range.create(left_key, right_key);
652
653 // acquire and prepare a locked keyrange over the release range
654 concurrent_tree::locked_keyrange lkr;
655 lkr.prepare(m_rangetree);
656 lkr.acquire(release_range);
657
658 // copy out the set of overlapping row locks.
659 GrowableArray<row_lock> overlapping_row_locks;
660 overlapping_row_locks.init();
661 iterate_and_get_overlapping_row_locks(&lkr, &overlapping_row_locks);
662 size_t num_overlapping_row_locks = overlapping_row_locks.get_size();
663
664 for (size_t i = 0; i < num_overlapping_row_locks; i++) {
665 row_lock lock = overlapping_row_locks.fetch_unchecked(i);
666 // If this isn't our lock, that's ok, just don't remove it.
667 // See rationale above.
668 // psergey-todo: for shared locks, just remove ourselves from the
669 // owners.
670 if (lock.txnid == txnid || (lock.owners && lock.owners->contains(txnid))) {
671 remove_row_lock_from_tree(&lkr, lock, txnid, m_mgr);
672 }
673 }
674
675 lkr.release();
676 overlapping_row_locks.deinit();
677 release_range.destroy();
678 }
679
680 bool locktree::sto_txnid_is_valid_unsafe(void) const {
681 return toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE;
682 }
683
684 int locktree::sto_get_score_unsafe(void) const {
685 return toku_unsafe_fetch(m_sto_score);
686 }
687
688 bool locktree::sto_try_release(TXNID txnid) {
689 bool released = false;
690 if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
691 // check the bit again with a prepared locked keyrange,
692 // which protects the optimization bits and rangetree data
693 concurrent_tree::locked_keyrange lkr;
694 lkr.prepare(m_rangetree);
695 if (m_sto_txnid != TXNID_NONE) {
696 // this txnid better be the single txnid on this locktree,
697 // or else we are in big trouble (meaning the logic is broken)
698 invariant(m_sto_txnid == txnid);
699 invariant(m_rangetree->is_empty());
700 sto_end();
701 released = true;
702 }
703 lkr.release();
704 }
705 return released;
706 }
707
708 // release all of the locks for a txnid whose endpoints are pairs
709 // in the given range buffer.
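// 'all_trx_locks_hint' indicates the buffer is believed to hold every lock
// the transaction owns; in that case the single-txnid fast path
// (sto_try_release) may drop them all at once. Otherwise any active STO
// state is ended first and the ranges are removed from the rangetree one by
// one.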
710 void locktree::release_locks(TXNID txnid, const range_buffer *ranges,
711 bool all_trx_locks_hint) {
712 // try the single txn optimization. if it worked, then all of the
713 // locks are already released, otherwise we need to do it here.
714 bool released;
715 if (all_trx_locks_hint) {
716 // This will release all of the locks the transaction is holding
717 released = sto_try_release(txnid);
718 } else {
719 /*
720 psergey: we are asked to release *Some* of the locks the transaction
721 is holding.
722 We could try doing that without leaving the STO mode, but right now,
723 the easiest way is to exit the STO mode and let the non-STO code path
724 handle it.
725 */
726 if (toku_unsafe_fetch(m_sto_txnid) != TXNID_NONE) {
727 // check the bit again with a prepared locked keyrange,
728 // which protects the optimization bits and rangetree data
729 concurrent_tree::locked_keyrange lkr;
730 lkr.prepare(m_rangetree);
731 if (m_sto_txnid != TXNID_NONE) {
732 sto_end_early(&lkr);
733 }
734 lkr.release();
735 }
736 released = false;
737 }
738 if (!released) {
739 range_buffer::iterator iter(ranges);
740 range_buffer::iterator::record rec;
741 while (iter.current(&rec)) {
742 const DBT *left_key = rec.get_left_key();
743 const DBT *right_key = rec.get_right_key();
744 // All ranges in the locktree must have left endpoints <= right endpoints.
745 // Range comparisons rely on this fact, so we make a paranoid invariant
746 // here.
747 paranoid_invariant(m_cmp(left_key, right_key) <= 0);
748 remove_overlapping_locks_for_txnid(txnid, left_key, right_key);
749 iter.next();
750 }
751 // Increase the sto score slightly. Eventually it will hit
752 // the threshold and we'll try the optimization again. This
753 // is how a previously multithreaded system transitions into
754 // a single threaded system that benefits from the optimization.
755 if (toku_unsafe_fetch(m_sto_score) < STO_SCORE_THRESHOLD) {
756 toku_sync_fetch_and_add(&m_sto_score, 1);
757 }
758 }
759 }
760
761 // iterate over a locked keyrange and extract copies of the first N
762 // row locks, storing each one into the given array of size N,
763 // then removing each extracted lock from the locked keyrange.
764 static int extract_first_n_row_locks(concurrent_tree::locked_keyrange *lkr,
765 locktree_manager *mgr, row_lock *row_locks,
766 int num_to_extract) {
767 struct extract_fn_obj {
768 int num_extracted;
769 int num_to_extract;
770 row_lock *row_locks;
771 bool fn(const keyrange &range, TXNID txnid, bool is_shared,
772 TxnidVector *owners) {
773 if (num_extracted < num_to_extract) {
774 row_lock lock;
775 lock.range.create_copy(range);
776 lock.txnid = txnid;
777 lock.is_shared = is_shared;
778 // deep-copy the set of owners:
779 if (owners)
780 lock.owners = new TxnidVector(*owners);
781 else
782 lock.owners = nullptr;
783 row_locks[num_extracted++] = lock;
784 return true;
785 } else {
786 return false;
787 }
788 }
789 } extract_fn;
790
791 extract_fn.row_locks = row_locks;
792 extract_fn.num_to_extract = num_to_extract;
793 extract_fn.num_extracted = 0;
794 lkr->iterate(&extract_fn);
795
796 // now that the ranges have been copied out, complete
797 // the extraction by removing the ranges from the tree.
798 // use remove_row_lock_from_tree() so we properly track the
799 // amount of memory and number of locks freed.
800 int num_extracted = extract_fn.num_extracted;
801 invariant(num_extracted <= num_to_extract);
802 for (int i = 0; i < num_extracted; i++) {
803 remove_row_lock_from_tree(lkr, row_locks[i], TXNID_ANY, mgr);
804 }
805
806 return num_extracted;
807 }
808
809 // Store each newly escalated lock in a range buffer for appropriate txnid.
810 // We'll rebuild the locktree by iterating over these ranges, and then we
811 // can pass back each txnid/buffer pair individually through a callback
812 // to notify higher layers that locks have changed.
813 struct txnid_range_buffer {
814 TXNID txnid;
815 range_buffer buffer;
816
817 static int find_by_txnid(struct txnid_range_buffer *const &other_buffer,
818 const TXNID &txnid) {
819 if (txnid < other_buffer->txnid) {
820 return -1;
821 } else if (other_buffer->txnid == txnid) {
822 return 0;
823 } else {
824 return 1;
825 }
826 }
827 };
828
829 // escalate the locks in the locktree by merging adjacent
830 // locks that have the same txnid into one larger lock.
831 //
832 // if there's only one txnid in the locktree then this
833 // approach works well. if there are many txnids and each
834 // has locks in a random/alternating order, then this does
835 // not work so well.
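//
// For illustration: if txn A holds exclusive point locks on keys 1, 2 and 3
// while txn B holds key 5, the extracted batch comes out range-sorted as
// A:[1,1], A:[2,2], A:[3,3], B:[5,5]; the three adjacent A locks are merged
// into one dominating lock, so the rebuilt tree holds A:[1,3] and B:[5,5]
// (assuming no escalation barrier falls between keys 1 and 3).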
836 void locktree::escalate(lt_escalate_cb after_escalate_callback,
837 void *after_escalate_callback_extra) {
838 omt<struct txnid_range_buffer *, struct txnid_range_buffer *> range_buffers;
839 range_buffers.create();
840
841 // prepare and acquire a locked keyrange on the entire locktree
842 concurrent_tree::locked_keyrange lkr;
843 keyrange infinite_range = keyrange::get_infinite_range();
844 lkr.prepare(m_rangetree);
845 lkr.acquire(infinite_range);
846
847 // if we're in the single txnid optimization, simply call it off.
848 // if you have to run escalation, you probably don't care about
849 // the optimization anyway, and this makes things easier.
850 if (m_sto_txnid != TXNID_NONE) {
851 // We are already accounting for this escalation time and
852 // count, so don't do it for sto_end_early too.
853 sto_end_early_no_accounting(&lkr);
854 }
855
856 // extract and remove batches of row locks from the locktree
857 int num_extracted;
858 const int num_row_locks_per_batch = 128;
859 row_lock *XCALLOC_N(num_row_locks_per_batch, extracted_buf);
860
861   // we always remove the "first" n because each extraction removes n locks
862   // from the front of the tree, so this loops until the tree is empty.
863 while ((num_extracted = extract_first_n_row_locks(
864 &lkr, m_mgr, extracted_buf, num_row_locks_per_batch)) > 0) {
865 int current_index = 0;
866 while (current_index < num_extracted) {
867 // every batch of extracted locks is in range-sorted order. search
868 // through them and merge adjacent locks with the same txnid into
869 // one dominating lock and save it to a set of escalated locks.
870 //
871 // first, find the index of the next row lock that
872 // - belongs to a different txnid, or
873 // - belongs to several txnids, or
874 // - is a shared lock (we could potentially merge those but
875 // currently we don't), or
876 // - is across a lock escalation barrier.
877 int next_txnid_index = current_index + 1;
878
879 while (next_txnid_index < num_extracted &&
880 (extracted_buf[current_index].txnid ==
881 extracted_buf[next_txnid_index].txnid) &&
882 !extracted_buf[next_txnid_index].is_shared &&
883 !extracted_buf[next_txnid_index].owners &&
884 !m_escalation_barrier(
885 extracted_buf[current_index].range.get_right_key(),
886 extracted_buf[next_txnid_index].range.get_left_key(),
887 m_escalation_barrier_arg)) {
888 next_txnid_index++;
889 }
890
891 // Create an escalated range for the current txnid that dominates
892       // each range between the current index and the next txnid's index.
893 // const TXNID current_txnid = extracted_buf[current_index].txnid;
894 const DBT *escalated_left_key =
895 extracted_buf[current_index].range.get_left_key();
896 const DBT *escalated_right_key =
897 extracted_buf[next_txnid_index - 1].range.get_right_key();
898
899 // Try to find a range buffer for the current txnid. Create one if it
900       // doesn't exist. Then, append the new escalated range to the buffer. (If
901       // a lock is shared by multiple txnids, append it to each owner's buffer.)
902 TxnidVector *owners_ptr;
903 TxnidVector singleton_owner;
904 if (extracted_buf[current_index].owners)
905 owners_ptr = extracted_buf[current_index].owners;
906 else {
907 singleton_owner.insert(extracted_buf[current_index].txnid);
908 owners_ptr = &singleton_owner;
909 }
910
911 for (auto cur_txnid : *owners_ptr) {
912 uint32_t idx;
913 struct txnid_range_buffer *existing_range_buffer;
914 int r =
915 range_buffers.find_zero<TXNID, txnid_range_buffer::find_by_txnid>(
916 cur_txnid, &existing_range_buffer, &idx);
917 if (r == DB_NOTFOUND) {
918 struct txnid_range_buffer *XMALLOC(new_range_buffer);
919 new_range_buffer->txnid = cur_txnid;
920 new_range_buffer->buffer.create();
921 new_range_buffer->buffer.append(
922 escalated_left_key, escalated_right_key,
923 !extracted_buf[current_index].is_shared);
924 range_buffers.insert_at(new_range_buffer, idx);
925 } else {
926 invariant_zero(r);
927 invariant(existing_range_buffer->txnid == cur_txnid);
928 existing_range_buffer->buffer.append(
929 escalated_left_key, escalated_right_key,
930 !extracted_buf[current_index].is_shared);
931 }
932 }
933
934 current_index = next_txnid_index;
935 }
936
937 // destroy the ranges copied during the extraction
938 for (int i = 0; i < num_extracted; i++) {
939 delete extracted_buf[i].owners;
940 extracted_buf[i].range.destroy();
941 }
942 }
943 toku_free(extracted_buf);
944
945 // Rebuild the locktree from each range in each range buffer,
946 // then notify higher layers that the txnid's locks have changed.
947 //
948 // (shared locks: if a lock was initially shared between transactions TRX1,
949   // TRX2, etc., we will now re-acquire it acting on behalf of TRX1, TRX2,
950   // etc. in turn. This will succeed and an identical shared lock will be
951 // constructed)
952
953 invariant(m_rangetree->is_empty());
954 const uint32_t num_range_buffers = range_buffers.size();
955 for (uint32_t i = 0; i < num_range_buffers; i++) {
956 struct txnid_range_buffer *current_range_buffer;
957 int r = range_buffers.fetch(i, &current_range_buffer);
958 invariant_zero(r);
959 if (r == EINVAL) // Shouldn't happen, avoid compiler warning
960 continue;
961
962 const TXNID current_txnid = current_range_buffer->txnid;
963 range_buffer::iterator iter(&current_range_buffer->buffer);
964 range_buffer::iterator::record rec;
965 while (iter.current(&rec)) {
966 keyrange range;
967 range.create(rec.get_left_key(), rec.get_right_key());
968 row_lock lock = {.range = range,
969 .txnid = current_txnid,
970 .is_shared = !rec.get_exclusive_flag(),
971 .owners = nullptr};
972 insert_row_lock_into_tree(&lkr, lock, m_mgr);
973 iter.next();
974 }
975
976 // Notify higher layers that locks have changed for the current txnid
977 if (after_escalate_callback) {
978 after_escalate_callback(current_txnid, this, current_range_buffer->buffer,
979 after_escalate_callback_extra);
980 }
981 current_range_buffer->buffer.destroy();
982 }
983
984 while (range_buffers.size() > 0) {
985 struct txnid_range_buffer *buffer;
986 int r = range_buffers.fetch(0, &buffer);
987 invariant_zero(r);
988 r = range_buffers.delete_at(0);
989 invariant_zero(r);
990 toku_free(buffer);
991 }
992 range_buffers.destroy();
993
994 lkr.release();
995 }
996
997 void *locktree::get_userdata(void) const { return m_userdata; }
998
999 void locktree::set_userdata(void *userdata) { m_userdata = userdata; }
1000
1001 struct lt_lock_request_info *locktree::get_lock_request_info(void) {
1002 return &m_lock_request_info;
1003 }
1004
1005 void locktree::set_comparator(const comparator &cmp) { m_cmp.inherit(cmp); }
1006
1007 locktree_manager *locktree::get_manager(void) const { return m_mgr; }
1008
1009 int locktree::compare(const locktree *lt) const {
1010 if (m_dict_id.dictid < lt->m_dict_id.dictid) {
1011 return -1;
1012 } else if (m_dict_id.dictid == lt->m_dict_id.dictid) {
1013 return 0;
1014 } else {
1015 return 1;
1016 }
1017 }
1018
1019 DICTIONARY_ID locktree::get_dict_id() const { return m_dict_id; }
1020
1021 } /* namespace toku */
1022 #endif // OS_WIN
1023 #endif // ROCKSDB_LITE