]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / utilities / transactions / pessimistic_transaction_db.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/transactions/pessimistic_transaction_db.h"
9
10 #include <cinttypes>
11 #include <string>
12 #include <unordered_set>
13 #include <vector>
14
15 #include "db/db_impl/db_impl.h"
16 #include "rocksdb/db.h"
17 #include "rocksdb/options.h"
18 #include "rocksdb/utilities/transaction_db.h"
19 #include "test_util/sync_point.h"
20 #include "util/cast_util.h"
21 #include "util/mutexlock.h"
22 #include "utilities/transactions/pessimistic_transaction.h"
23 #include "utilities/transactions/transaction_db_mutex_impl.h"
24 #include "utilities/transactions/write_prepared_txn_db.h"
25 #include "utilities/transactions/write_unprepared_txn_db.h"
26
27 namespace ROCKSDB_NAMESPACE {
28
29 PessimisticTransactionDB::PessimisticTransactionDB(
30 DB* db, const TransactionDBOptions& txn_db_options)
31 : TransactionDB(db),
32 db_impl_(static_cast_with_check<DBImpl, DB>(db)),
33 txn_db_options_(txn_db_options),
34 lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
35 txn_db_options_.max_num_deadlocks,
36 txn_db_options_.custom_mutex_factory
37 ? txn_db_options_.custom_mutex_factory
38 : std::shared_ptr<TransactionDBMutexFactory>(
39 new TransactionDBMutexFactoryImpl())) {
40 assert(db_impl_ != nullptr);
41 info_log_ = db_impl_->GetDBOptions().info_log;
42 }
43
44 // Support initiliazing PessimisticTransactionDB from a stackable db
45 //
46 // PessimisticTransactionDB
47 // ^ ^
48 // | |
49 // | +
50 // | StackableDB
51 // | ^
52 // | |
53 // + +
54 // DBImpl
55 // ^
56 // |(inherit)
57 // +
58 // DB
59 //
60 PessimisticTransactionDB::PessimisticTransactionDB(
61 StackableDB* db, const TransactionDBOptions& txn_db_options)
62 : TransactionDB(db),
63 db_impl_(static_cast_with_check<DBImpl, DB>(db->GetRootDB())),
64 txn_db_options_(txn_db_options),
65 lock_mgr_(this, txn_db_options_.num_stripes, txn_db_options.max_num_locks,
66 txn_db_options_.max_num_deadlocks,
67 txn_db_options_.custom_mutex_factory
68 ? txn_db_options_.custom_mutex_factory
69 : std::shared_ptr<TransactionDBMutexFactory>(
70 new TransactionDBMutexFactoryImpl())) {
71 assert(db_impl_ != nullptr);
72 }
73
74 PessimisticTransactionDB::~PessimisticTransactionDB() {
75 while (!transactions_.empty()) {
76 delete transactions_.begin()->second;
77 // TODO(myabandeh): this seems to be an unsafe approach as it is not quite
78 // clear whether delete would also remove the entry from transactions_.
79 }
80 }
81
82 Status PessimisticTransactionDB::VerifyCFOptions(const ColumnFamilyOptions&) {
83 return Status::OK();
84 }
85
86 Status PessimisticTransactionDB::Initialize(
87 const std::vector<size_t>& compaction_enabled_cf_indices,
88 const std::vector<ColumnFamilyHandle*>& handles) {
89 for (auto cf_ptr : handles) {
90 AddColumnFamily(cf_ptr);
91 }
92 // Verify cf options
93 for (auto handle : handles) {
94 ColumnFamilyDescriptor cfd;
95 Status s = handle->GetDescriptor(&cfd);
96 if (!s.ok()) {
97 return s;
98 }
99 s = VerifyCFOptions(cfd.options);
100 if (!s.ok()) {
101 return s;
102 }
103 }
104
105 // Re-enable compaction for the column families that initially had
106 // compaction enabled.
107 std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
108 compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
109 for (auto index : compaction_enabled_cf_indices) {
110 compaction_enabled_cf_handles.push_back(handles[index]);
111 }
112
113 Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
114
115 // create 'real' transactions from recovered shell transactions
116 auto dbimpl = static_cast_with_check<DBImpl, DB>(GetRootDB());
117 assert(dbimpl != nullptr);
118 auto rtrxs = dbimpl->recovered_transactions();
119
120 for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
121 auto recovered_trx = it->second;
122 assert(recovered_trx);
123 assert(recovered_trx->batches_.size() == 1);
124 const auto& seq = recovered_trx->batches_.begin()->first;
125 const auto& batch_info = recovered_trx->batches_.begin()->second;
126 assert(batch_info.log_number_);
127 assert(recovered_trx->name_.length());
128
129 WriteOptions w_options;
130 w_options.sync = true;
131 TransactionOptions t_options;
132 // This would help avoiding deadlock for keys that although exist in the WAL
133 // did not go through concurrency control. This includes the merge that
134 // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
135 // there is a conflict between the keys of two transactions that must be
136 // avoided, it is already avoided by the application, MyRocks, before the
137 // restart (ii) application, MyRocks, guarntees to rollback/commit the
138 // recovered transactions before new transactions start.
139 t_options.skip_concurrency_control = true;
140
141 Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
142 assert(real_trx);
143 real_trx->SetLogNumber(batch_info.log_number_);
144 assert(seq != kMaxSequenceNumber);
145 if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
146 real_trx->SetId(seq);
147 }
148
149 s = real_trx->SetName(recovered_trx->name_);
150 if (!s.ok()) {
151 break;
152 }
153
154 s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
155 // WriteCommitted set this to to disable this check that is specific to
156 // WritePrepared txns
157 assert(batch_info.batch_cnt_ == 0 ||
158 real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
159 real_trx->SetState(Transaction::PREPARED);
160 if (!s.ok()) {
161 break;
162 }
163 }
164 if (s.ok()) {
165 dbimpl->DeleteAllRecoveredTransactions();
166 }
167 return s;
168 }
169
170 Transaction* WriteCommittedTxnDB::BeginTransaction(
171 const WriteOptions& write_options, const TransactionOptions& txn_options,
172 Transaction* old_txn) {
173 if (old_txn != nullptr) {
174 ReinitializeTransaction(old_txn, write_options, txn_options);
175 return old_txn;
176 } else {
177 return new WriteCommittedTxn(this, write_options, txn_options);
178 }
179 }
180
181 TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
182 const TransactionDBOptions& txn_db_options) {
183 TransactionDBOptions validated = txn_db_options;
184
185 if (txn_db_options.num_stripes == 0) {
186 validated.num_stripes = 1;
187 }
188
189 return validated;
190 }
191
192 Status TransactionDB::Open(const Options& options,
193 const TransactionDBOptions& txn_db_options,
194 const std::string& dbname, TransactionDB** dbptr) {
195 DBOptions db_options(options);
196 ColumnFamilyOptions cf_options(options);
197 std::vector<ColumnFamilyDescriptor> column_families;
198 column_families.push_back(
199 ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
200 std::vector<ColumnFamilyHandle*> handles;
201 Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
202 column_families, &handles, dbptr);
203 if (s.ok()) {
204 assert(handles.size() == 1);
205 // i can delete the handle since DBImpl is always holding a reference to
206 // default column family
207 delete handles[0];
208 }
209
210 return s;
211 }
212
213 Status TransactionDB::Open(
214 const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
215 const std::string& dbname,
216 const std::vector<ColumnFamilyDescriptor>& column_families,
217 std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
218 Status s;
219 DB* db = nullptr;
220 if (txn_db_options.write_policy == WRITE_COMMITTED &&
221 db_options.unordered_write) {
222 return Status::NotSupported(
223 "WRITE_COMMITTED is incompatible with unordered_writes");
224 }
225 if (txn_db_options.write_policy == WRITE_UNPREPARED &&
226 db_options.unordered_write) {
227 // TODO(lth): support it
228 return Status::NotSupported(
229 "WRITE_UNPREPARED is currently incompatible with unordered_writes");
230 }
231 if (txn_db_options.write_policy == WRITE_PREPARED &&
232 db_options.unordered_write && !db_options.two_write_queues) {
233 return Status::NotSupported(
234 "WRITE_PREPARED is incompatible with unordered_writes if "
235 "two_write_queues is not enabled.");
236 }
237
238 std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
239 std::vector<size_t> compaction_enabled_cf_indices;
240 DBOptions db_options_2pc = db_options;
241 PrepareWrap(&db_options_2pc, &column_families_copy,
242 &compaction_enabled_cf_indices);
243 const bool use_seq_per_batch =
244 txn_db_options.write_policy == WRITE_PREPARED ||
245 txn_db_options.write_policy == WRITE_UNPREPARED;
246 const bool use_batch_per_txn =
247 txn_db_options.write_policy == WRITE_COMMITTED ||
248 txn_db_options.write_policy == WRITE_PREPARED;
249 s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
250 use_seq_per_batch, use_batch_per_txn);
251 if (s.ok()) {
252 ROCKS_LOG_WARN(db->GetDBOptions().info_log,
253 "Transaction write_policy is %" PRId32,
254 static_cast<int>(txn_db_options.write_policy));
255 s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
256 dbptr);
257 }
258 if (!s.ok()) {
259 // just in case it was not deleted (and not set to nullptr).
260 delete db;
261 }
262 return s;
263 }
264
265 void TransactionDB::PrepareWrap(
266 DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
267 std::vector<size_t>* compaction_enabled_cf_indices) {
268 compaction_enabled_cf_indices->clear();
269
270 // Enable MemTable History if not already enabled
271 for (size_t i = 0; i < column_families->size(); i++) {
272 ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
273
274 if (cf_options->max_write_buffer_size_to_maintain == 0 &&
275 cf_options->max_write_buffer_number_to_maintain == 0) {
276 // Setting to -1 will set the History size to
277 // max_write_buffer_number * write_buffer_size.
278 cf_options->max_write_buffer_size_to_maintain = -1;
279 }
280 if (!cf_options->disable_auto_compactions) {
281 // Disable compactions momentarily to prevent race with DB::Open
282 cf_options->disable_auto_compactions = true;
283 compaction_enabled_cf_indices->push_back(i);
284 }
285 }
286 db_options->allow_2pc = true;
287 }
288
289 Status TransactionDB::WrapDB(
290 // make sure this db is already opened with memtable history enabled,
291 // auto compaction distabled and 2 phase commit enabled
292 DB* db, const TransactionDBOptions& txn_db_options,
293 const std::vector<size_t>& compaction_enabled_cf_indices,
294 const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
295 assert(db != nullptr);
296 assert(dbptr != nullptr);
297 *dbptr = nullptr;
298 std::unique_ptr<PessimisticTransactionDB> txn_db;
299 switch (txn_db_options.write_policy) {
300 case WRITE_UNPREPARED:
301 txn_db.reset(new WriteUnpreparedTxnDB(
302 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
303 break;
304 case WRITE_PREPARED:
305 txn_db.reset(new WritePreparedTxnDB(
306 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
307 break;
308 case WRITE_COMMITTED:
309 default:
310 txn_db.reset(new WriteCommittedTxnDB(
311 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
312 }
313 txn_db->UpdateCFComparatorMap(handles);
314 Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
315 // In case of a failure at this point, db is deleted via the txn_db destructor
316 // and set to nullptr.
317 if (s.ok()) {
318 *dbptr = txn_db.release();
319 }
320 return s;
321 }
322
323 Status TransactionDB::WrapStackableDB(
324 // make sure this stackable_db is already opened with memtable history
325 // enabled, auto compaction distabled and 2 phase commit enabled
326 StackableDB* db, const TransactionDBOptions& txn_db_options,
327 const std::vector<size_t>& compaction_enabled_cf_indices,
328 const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
329 assert(db != nullptr);
330 assert(dbptr != nullptr);
331 *dbptr = nullptr;
332 std::unique_ptr<PessimisticTransactionDB> txn_db;
333
334 switch (txn_db_options.write_policy) {
335 case WRITE_UNPREPARED:
336 txn_db.reset(new WriteUnpreparedTxnDB(
337 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
338 break;
339 case WRITE_PREPARED:
340 txn_db.reset(new WritePreparedTxnDB(
341 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
342 break;
343 case WRITE_COMMITTED:
344 default:
345 txn_db.reset(new WriteCommittedTxnDB(
346 db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
347 }
348 txn_db->UpdateCFComparatorMap(handles);
349 Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
350 // In case of a failure at this point, db is deleted via the txn_db destructor
351 // and set to nullptr.
352 if (s.ok()) {
353 *dbptr = txn_db.release();
354 }
355 return s;
356 }
357
358 // Let TransactionLockMgr know that this column family exists so it can
359 // allocate a LockMap for it.
360 void PessimisticTransactionDB::AddColumnFamily(
361 const ColumnFamilyHandle* handle) {
362 lock_mgr_.AddColumnFamily(handle->GetID());
363 }
364
365 Status PessimisticTransactionDB::CreateColumnFamily(
366 const ColumnFamilyOptions& options, const std::string& column_family_name,
367 ColumnFamilyHandle** handle) {
368 InstrumentedMutexLock l(&column_family_mutex_);
369 Status s = VerifyCFOptions(options);
370 if (!s.ok()) {
371 return s;
372 }
373
374 s = db_->CreateColumnFamily(options, column_family_name, handle);
375 if (s.ok()) {
376 lock_mgr_.AddColumnFamily((*handle)->GetID());
377 UpdateCFComparatorMap(*handle);
378 }
379
380 return s;
381 }
382
383 // Let TransactionLockMgr know that it can deallocate the LockMap for this
384 // column family.
385 Status PessimisticTransactionDB::DropColumnFamily(
386 ColumnFamilyHandle* column_family) {
387 InstrumentedMutexLock l(&column_family_mutex_);
388
389 Status s = db_->DropColumnFamily(column_family);
390 if (s.ok()) {
391 lock_mgr_.RemoveColumnFamily(column_family->GetID());
392 }
393
394 return s;
395 }
396
397 Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
398 uint32_t cfh_id,
399 const std::string& key,
400 bool exclusive) {
401 return lock_mgr_.TryLock(txn, cfh_id, key, GetEnv(), exclusive);
402 }
403
404 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
405 const TransactionKeyMap* keys) {
406 lock_mgr_.UnLock(txn, keys, GetEnv());
407 }
408
409 void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
410 uint32_t cfh_id, const std::string& key) {
411 lock_mgr_.UnLock(txn, cfh_id, key, GetEnv());
412 }
413
414 // Used when wrapping DB write operations in a transaction
415 Transaction* PessimisticTransactionDB::BeginInternalTransaction(
416 const WriteOptions& options) {
417 TransactionOptions txn_options;
418 Transaction* txn = BeginTransaction(options, txn_options, nullptr);
419
420 // Use default timeout for non-transactional writes
421 txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
422 return txn;
423 }
424
425 // All user Put, Merge, Delete, and Write requests must be intercepted to make
426 // sure that they lock all keys that they are writing to avoid causing conflicts
427 // with any concurrent transactions. The easiest way to do this is to wrap all
428 // write operations in a transaction.
429 //
430 // Put(), Merge(), and Delete() only lock a single key per call. Write() will
431 // sort its keys before locking them. This guarantees that TransactionDB write
432 // methods cannot deadlock with each other (but still could deadlock with a
433 // Transaction).
434 Status PessimisticTransactionDB::Put(const WriteOptions& options,
435 ColumnFamilyHandle* column_family,
436 const Slice& key, const Slice& val) {
437 Status s;
438
439 Transaction* txn = BeginInternalTransaction(options);
440 txn->DisableIndexing();
441
442 // Since the client didn't create a transaction, they don't care about
443 // conflict checking for this write. So we just need to do PutUntracked().
444 s = txn->PutUntracked(column_family, key, val);
445
446 if (s.ok()) {
447 s = txn->Commit();
448 }
449
450 delete txn;
451
452 return s;
453 }
454
455 Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
456 ColumnFamilyHandle* column_family,
457 const Slice& key) {
458 Status s;
459
460 Transaction* txn = BeginInternalTransaction(wopts);
461 txn->DisableIndexing();
462
463 // Since the client didn't create a transaction, they don't care about
464 // conflict checking for this write. So we just need to do
465 // DeleteUntracked().
466 s = txn->DeleteUntracked(column_family, key);
467
468 if (s.ok()) {
469 s = txn->Commit();
470 }
471
472 delete txn;
473
474 return s;
475 }
476
477 Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
478 ColumnFamilyHandle* column_family,
479 const Slice& key) {
480 Status s;
481
482 Transaction* txn = BeginInternalTransaction(wopts);
483 txn->DisableIndexing();
484
485 // Since the client didn't create a transaction, they don't care about
486 // conflict checking for this write. So we just need to do
487 // SingleDeleteUntracked().
488 s = txn->SingleDeleteUntracked(column_family, key);
489
490 if (s.ok()) {
491 s = txn->Commit();
492 }
493
494 delete txn;
495
496 return s;
497 }
498
499 Status PessimisticTransactionDB::Merge(const WriteOptions& options,
500 ColumnFamilyHandle* column_family,
501 const Slice& key, const Slice& value) {
502 Status s;
503
504 Transaction* txn = BeginInternalTransaction(options);
505 txn->DisableIndexing();
506
507 // Since the client didn't create a transaction, they don't care about
508 // conflict checking for this write. So we just need to do
509 // MergeUntracked().
510 s = txn->MergeUntracked(column_family, key, value);
511
512 if (s.ok()) {
513 s = txn->Commit();
514 }
515
516 delete txn;
517
518 return s;
519 }
520
521 Status PessimisticTransactionDB::Write(const WriteOptions& opts,
522 WriteBatch* updates) {
523 return WriteWithConcurrencyControl(opts, updates);
524 }
525
526 Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
527 WriteBatch* updates) {
528 if (txn_db_options_.skip_concurrency_control) {
529 return db_impl_->Write(opts, updates);
530 } else {
531 return WriteWithConcurrencyControl(opts, updates);
532 }
533 }
534
535 Status WriteCommittedTxnDB::Write(
536 const WriteOptions& opts,
537 const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
538 if (optimizations.skip_concurrency_control) {
539 return db_impl_->Write(opts, updates);
540 } else {
541 return WriteWithConcurrencyControl(opts, updates);
542 }
543 }
544
545 void PessimisticTransactionDB::InsertExpirableTransaction(
546 TransactionID tx_id, PessimisticTransaction* tx) {
547 assert(tx->GetExpirationTime() > 0);
548 std::lock_guard<std::mutex> lock(map_mutex_);
549 expirable_transactions_map_.insert({tx_id, tx});
550 }
551
552 void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
553 std::lock_guard<std::mutex> lock(map_mutex_);
554 expirable_transactions_map_.erase(tx_id);
555 }
556
557 bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
558 TransactionID tx_id) {
559 std::lock_guard<std::mutex> lock(map_mutex_);
560
561 auto tx_it = expirable_transactions_map_.find(tx_id);
562 if (tx_it == expirable_transactions_map_.end()) {
563 return true;
564 }
565 PessimisticTransaction& tx = *(tx_it->second);
566 return tx.TryStealingLocks();
567 }
568
569 void PessimisticTransactionDB::ReinitializeTransaction(
570 Transaction* txn, const WriteOptions& write_options,
571 const TransactionOptions& txn_options) {
572 auto txn_impl =
573 static_cast_with_check<PessimisticTransaction, Transaction>(txn);
574
575 txn_impl->Reinitialize(this, write_options, txn_options);
576 }
577
578 Transaction* PessimisticTransactionDB::GetTransactionByName(
579 const TransactionName& name) {
580 std::lock_guard<std::mutex> lock(name_map_mutex_);
581 auto it = transactions_.find(name);
582 if (it == transactions_.end()) {
583 return nullptr;
584 } else {
585 return it->second;
586 }
587 }
588
589 void PessimisticTransactionDB::GetAllPreparedTransactions(
590 std::vector<Transaction*>* transv) {
591 assert(transv);
592 transv->clear();
593 std::lock_guard<std::mutex> lock(name_map_mutex_);
594 for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
595 if (it->second->GetState() == Transaction::PREPARED) {
596 transv->push_back(it->second);
597 }
598 }
599 }
600
601 TransactionLockMgr::LockStatusData
602 PessimisticTransactionDB::GetLockStatusData() {
603 return lock_mgr_.GetLockStatusData();
604 }
605
606 std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
607 return lock_mgr_.GetDeadlockInfoBuffer();
608 }
609
610 void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
611 lock_mgr_.Resize(target_size);
612 }
613
614 void PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
615 assert(txn);
616 assert(txn->GetName().length() > 0);
617 assert(GetTransactionByName(txn->GetName()) == nullptr);
618 assert(txn->GetState() == Transaction::STARTED);
619 std::lock_guard<std::mutex> lock(name_map_mutex_);
620 transactions_[txn->GetName()] = txn;
621 }
622
623 void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
624 assert(txn);
625 std::lock_guard<std::mutex> lock(name_map_mutex_);
626 auto it = transactions_.find(txn->GetName());
627 assert(it != transactions_.end());
628 transactions_.erase(it);
629 }
630
631 } // namespace ROCKSDB_NAMESPACE
632 #endif // ROCKSDB_LITE