import 14.2.4 nautilus point release
diff --git a/ceph/src/rocksdb/db/db_impl_write.cc b/ceph/src/rocksdb/db/db_impl_write.cc
index 29b54bfd1e2dec8fe9074ad35fbcd71c16ff61a4..21a9378d21ffdf5a2257457bdf1c8af53c178495 100644
@@ -146,17 +146,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
 
     if (write_thread_.CompleteParallelMemTableWriter(&w)) {
       // we're responsible for exit batch group
-      for (auto* writer : *(w.write_group)) {
-        if (!writer->CallbackFailed() && writer->pre_release_callback) {
-          assert(writer->sequence != kMaxSequenceNumber);
-          Status ws = writer->pre_release_callback->Callback(writer->sequence,
-                                                             disable_memtable);
-          if (!ws.ok()) {
-            status = ws;
-            break;
-          }
-        }
-      }
       // TODO(myabandeh): propagate status to write_group
       auto last_sequence = w.write_group->last_sequence;
       versions_->SetLastSequence(last_sequence);
@@ -279,7 +268,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
                         concurrent_update);
       RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
     }
-    MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
+    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
 
     if (write_options.disableWAL) {
       has_unpersisted_data_.store(true, std::memory_order_relaxed);
@@ -309,6 +298,35 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     const SequenceNumber current_sequence = last_sequence + 1;
     last_sequence += seq_inc;
 
+    // PreReleaseCallback is called after WAL write and before memtable write
+    if (status.ok()) {
+      SequenceNumber next_sequence = current_sequence;
+      // Note: the logic for advancing seq here must be consistent with the
+      // logic in WriteBatchInternal::InsertInto(write_group...) as well as
+      // with WriteBatchInternal::InsertInto(write_batch...) that is called on
+      // the merged batch during recovery from the WAL.
+      for (auto* writer : write_group) {
+        if (writer->CallbackFailed()) {
+          continue;
+        }
+        writer->sequence = next_sequence;
+        if (writer->pre_release_callback) {
+          Status ws = writer->pre_release_callback->Callback(
+              writer->sequence, disable_memtable, writer->log_used);
+          if (!ws.ok()) {
+            status = ws;
+            break;
+          }
+        }
+        if (seq_per_batch_) {
+          assert(writer->batch_cnt);
+          next_sequence += writer->batch_cnt;
+        } else if (writer->ShouldWriteToMemtable()) {
+          next_sequence += WriteBatchInternal::Count(writer->batch);
+        }
+      }
+    }
+
     if (status.ok()) {
       PERF_TIMER_GUARD(write_memtable_time);
 
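With this change the PreReleaseCallback hooks run after the WAL append and before the memtable insert, and each callback now also receives the writer's log number (writer->log_used). A minimal sketch of a callback written against that widened signature, assuming the three-parameter Callback() in db/pre_release_callback.h that these call sites exercise (the RecordSeqCallback class itself is hypothetical):

    #include <atomic>

    #include "db/pre_release_callback.h"
    #include "rocksdb/status.h"
    #include "rocksdb/types.h"

    namespace rocksdb {

    // Hypothetical example: remembers the highest sequence number handed to
    // the callback. It runs on the write thread after the WAL write and
    // before the memtable write, so it fires even for writers that skip the
    // memtable.
    class RecordSeqCallback : public PreReleaseCallback {
     public:
      Status Callback(SequenceNumber seq, bool is_mem_disabled,
                      uint64_t log_number) override {
        (void)is_mem_disabled;
        (void)log_number;
        last_seq_.store(seq, std::memory_order_relaxed);
        return Status::OK();
      }

      SequenceNumber LastSeq() const {
        return last_seq_.load(std::memory_order_relaxed);
      }

     private:
      std::atomic<SequenceNumber> last_seq_{0};
    };

    }  // namespace rocksdb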
@@ -320,26 +338,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
             0 /*recovery_log_number*/, this, parallel, seq_per_batch_,
             batch_per_txn_);
       } else {
-        SequenceNumber next_sequence = current_sequence;
-        // Note: the logic for advancing seq here must be consistent with the
-        // logic in WriteBatchInternal::InsertInto(write_group...) as well as
-        // with WriteBatchInternal::InsertInto(write_batch...) that is called on
-        // the merged batch during recovery from the WAL.
-        for (auto* writer : write_group) {
-          if (writer->CallbackFailed()) {
-            continue;
-          }
-          writer->sequence = next_sequence;
-          if (seq_per_batch_) {
-            assert(writer->batch_cnt);
-            next_sequence += writer->batch_cnt;
-          } else if (writer->ShouldWriteToMemtable()) {
-            next_sequence += WriteBatchInternal::Count(writer->batch);
-          }
-        }
         write_group.last_sequence = last_sequence;
-        write_group.running.store(static_cast<uint32_t>(write_group.size),
-                                  std::memory_order_relaxed);
         write_thread_.LaunchParallelMemTableWriters(&write_group);
         in_parallel_group = true;
 
@@ -390,17 +389,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   }
   if (should_exit_batch_group) {
     if (status.ok()) {
-      for (auto* writer : write_group) {
-        if (!writer->CallbackFailed() && writer->pre_release_callback) {
-          assert(writer->sequence != kMaxSequenceNumber);
-          Status ws = writer->pre_release_callback->Callback(writer->sequence,
-                                                             disable_memtable);
-          if (!ws.ok()) {
-            status = ws;
-            break;
-          }
-        }
-      }
+      // Note: if we are to resume after non-OK statuses we need to revisit how
+      // we react to non-OK statuses here.
       versions_->SetLastSequence(last_sequence);
     }
     MemTableInsertStatusCheck(w.status);
@@ -473,11 +463,11 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
     RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
     stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
     RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
-    MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
+    RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
 
     PERF_TIMER_STOP(write_pre_and_post_process_time);
 
-    if (w.ShouldWriteToWAL()) {
+    if (w.status.ok() && !write_options.disableWAL) {
       PERF_TIMER_GUARD(write_wal_time);
       stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
       RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
@@ -506,7 +496,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
   WriteThread::WriteGroup memtable_write_group;
   if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
     PERF_TIMER_GUARD(write_memtable_time);
-    assert(w.status.ok());
+    assert(w.ShouldWriteToMemtable());
     write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
     if (memtable_write_group.size > 1 &&
         immutable_db_options_.allow_concurrent_memtable_write) {
@@ -604,7 +594,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
                       concurrent_update);
     RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
   }
-  MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
+  RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);
 
   PERF_TIMER_STOP(write_pre_and_post_process_time);
 
@@ -659,8 +649,8 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
       if (!writer->CallbackFailed() && writer->pre_release_callback) {
         assert(writer->sequence != kMaxSequenceNumber);
         const bool DISABLE_MEMTABLE = true;
-        Status ws = writer->pre_release_callback->Callback(writer->sequence,
-                                                           DISABLE_MEMTABLE);
+        Status ws = writer->pre_release_callback->Callback(
+            writer->sequence, DISABLE_MEMTABLE, writer->log_used);
         if (!ws.ok()) {
           status = ws;
           break;
@@ -878,7 +868,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
   status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
-      cached_recoverable_state_empty_ = false;
+    cached_recoverable_state_empty_ = false;
   }
 
   if (status.ok() && need_log_sync) {
@@ -954,7 +944,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
   status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
-      cached_recoverable_state_empty_ = false;
+    cached_recoverable_state_empty_ = false;
   }
   log_write_mutex_.Unlock();
 
@@ -1003,8 +993,9 @@ Status DBImpl::WriteRecoverableState() {
       const bool DISABLE_MEMTABLE = true;
       for (uint64_t sub_batch_seq = seq + 1;
            sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) {
+        uint64_t const no_log_num = 0;
         status = recoverable_state_pre_release_callback_->Callback(
-            sub_batch_seq, !DISABLE_MEMTABLE);
+            sub_batch_seq, !DISABLE_MEMTABLE, no_log_num);
       }
     }
     if (status.ok()) {
@@ -1016,6 +1007,28 @@ Status DBImpl::WriteRecoverableState() {
   return Status::OK();
 }
 
+void DBImpl::SelectColumnFamiliesForAtomicFlush(
+    autovector<ColumnFamilyData*>* cfds) {
+  for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
+    if (cfd->IsDropped()) {
+      continue;
+    }
+    if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
+        !cached_recoverable_state_empty_.load()) {
+      cfds->push_back(cfd);
+    }
+  }
+}
+
+// Assign sequence number for atomic flush.
+void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
+  assert(immutable_db_options_.atomic_flush);
+  auto seq = versions_->LastSequence();
+  for (auto cfd : cfds) {
+    cfd->imm()->AssignAtomicFlushSeq(seq);
+  }
+}
+
 Status DBImpl::SwitchWAL(WriteContext* write_context) {
   mutex_.AssertHeld();
   assert(write_context != nullptr);
@@ -1028,22 +1041,22 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
   auto oldest_alive_log = alive_log_files_.begin()->number;
   bool flush_wont_release_oldest_log = false;
   if (allow_2pc()) {
-    auto oldest_log_with_uncommited_prep =
+    auto oldest_log_with_uncommitted_prep =
         logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();
 
-    assert(oldest_log_with_uncommited_prep == 0 ||
-           oldest_log_with_uncommited_prep >= oldest_alive_log);
-    if (oldest_log_with_uncommited_prep > 0 &&
-        oldest_log_with_uncommited_prep == oldest_alive_log) {
+    assert(oldest_log_with_uncommitted_prep == 0 ||
+           oldest_log_with_uncommitted_prep >= oldest_alive_log);
+    if (oldest_log_with_uncommitted_prep > 0 &&
+        oldest_log_with_uncommitted_prep == oldest_alive_log) {
       if (unable_to_release_oldest_log_) {
         // we already attempted to flush all column families dependent on
-        // the oldest alive log but the log still contained uncommited
+        // the oldest alive log but the log still contained uncommitted
         // transactions so there is still nothing that we can do.
         return status;
       } else {
         ROCKS_LOG_WARN(
             immutable_db_options_.info_log,
-            "Unable to release oldest log due to uncommited transaction");
+            "Unable to release oldest log due to uncommitted transaction");
         unable_to_release_oldest_log_ = true;
         flush_wont_release_oldest_log = true;
       }
@@ -1052,33 +1065,49 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
   if (!flush_wont_release_oldest_log) {
     // we only mark this log as getting flushed if we have successfully
     // flushed all data in this log. If this log contains outstanding prepared
-    // transactions then we cannot flush this log until those transactions are commited.
+    // transactions then we cannot flush this log until those transactions are
+    // committed.
     unable_to_release_oldest_log_ = false;
     alive_log_files_.begin()->getting_flushed = true;
   }
 
-  ROCKS_LOG_INFO(immutable_db_options_.info_log,
-                 "Flushing all column families with data in WAL number %" PRIu64
-                 ". Total log size is %" PRIu64
-                 " while max_total_wal_size is %" PRIu64,
-                 oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
+  ROCKS_LOG_INFO(
+      immutable_db_options_.info_log,
+      "Flushing all column families with data in WAL number %" PRIu64
+      ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64,
+      oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
   // no need to refcount because drop is happening in write thread, so can't
   // happen while we're in the write thread
-  FlushRequest flush_req;
-  for (auto cfd : *versions_->GetColumnFamilySet()) {
-    if (cfd->IsDropped()) {
-      continue;
-    }
-    if (cfd->OldestLogToKeep() <= oldest_alive_log) {
-      status = SwitchMemtable(cfd, write_context);
-      if (!status.ok()) {
-        break;
+  autovector<ColumnFamilyData*> cfds;
+  if (immutable_db_options_.atomic_flush) {
+    SelectColumnFamiliesForAtomicFlush(&cfds);
+  } else {
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      if (cfd->IsDropped()) {
+        continue;
+      }
+      if (cfd->OldestLogToKeep() <= oldest_alive_log) {
+        cfds.push_back(cfd);
       }
-      flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
-      cfd->imm()->FlushRequested();
+    }
+  }
+  for (const auto cfd : cfds) {
+    cfd->Ref();
+    status = SwitchMemtable(cfd, write_context);
+    cfd->Unref();
+    if (!status.ok()) {
+      break;
     }
   }
   if (status.ok()) {
+    if (immutable_db_options_.atomic_flush) {
+      AssignAtomicFlushSeq(cfds);
+    }
+    for (auto cfd : cfds) {
+      cfd->imm()->FlushRequested();
+    }
+    FlushRequest flush_req;
+    GenerateFlushRequest(cfds, &flush_req);
     SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
     MaybeScheduleFlushOrCompaction();
   }
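SwitchWAL (like HandleWriteBufferFull and ScheduleFlushes below) now builds the FlushRequest through GenerateFlushRequest() instead of the per-column-family emplace_back it removes above. GenerateFlushRequest() is not shown in this file; judging from the code it replaces, a sketch of the expected behaviour, with parameter names assumed, would be:

    // Sketch, not the actual implementation: pair each selected column family
    // with the ID of its newest immutable memtable, mirroring the removed
    //   flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
    void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
                                      FlushRequest* req) {
      req->reserve(cfds.size());
      for (const auto cfd : cfds) {
        if (cfd == nullptr) {
          // ScheduleFlushes() may null out entries whose last reference was
          // dropped, so tolerate gaps.
          continue;
        }
        req->emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
      }
    }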
@@ -1098,46 +1127,57 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
   ROCKS_LOG_INFO(
       immutable_db_options_.info_log,
       "Flushing column family with largest mem table size. Write buffer is "
-      "using %" PRIu64 " bytes out of a total of %" PRIu64 ".",
+      "using %" ROCKSDB_PRIszt " bytes out of a total of %" ROCKSDB_PRIszt ".",
       write_buffer_manager_->memory_usage(),
       write_buffer_manager_->buffer_size());
   // no need to refcount because drop is happening in write thread, so can't
   // happen while we're in the write thread
-  ColumnFamilyData* cfd_picked = nullptr;
-  SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
+  autovector<ColumnFamilyData*> cfds;
+  if (immutable_db_options_.atomic_flush) {
+    SelectColumnFamiliesForAtomicFlush(&cfds);
+  } else {
+    ColumnFamilyData* cfd_picked = nullptr;
+    SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
 
-  for (auto cfd : *versions_->GetColumnFamilySet()) {
-    if (cfd->IsDropped()) {
-      continue;
-    }
-    if (!cfd->mem()->IsEmpty()) {
-      // We only consider active mem table, hoping immutable memtable is
-      // already in the process of flushing.
-      uint64_t seq = cfd->mem()->GetCreationSeq();
-      if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
-        cfd_picked = cfd;
-        seq_num_for_cf_picked = seq;
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      if (cfd->IsDropped()) {
+        continue;
+      }
+      if (!cfd->mem()->IsEmpty()) {
+        // We only consider active mem table, hoping immutable memtable is
+        // already in the process of flushing.
+        uint64_t seq = cfd->mem()->GetCreationSeq();
+        if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) {
+          cfd_picked = cfd;
+          seq_num_for_cf_picked = seq;
+        }
       }
     }
+    if (cfd_picked != nullptr) {
+      cfds.push_back(cfd_picked);
+    }
   }
 
-  autovector<ColumnFamilyData*> cfds;
-  if (cfd_picked != nullptr) {
-    cfds.push_back(cfd_picked);
-  }
-  FlushRequest flush_req;
   for (const auto cfd : cfds) {
+    if (cfd->mem()->IsEmpty()) {
+      continue;
+    }
     cfd->Ref();
     status = SwitchMemtable(cfd, write_context);
     cfd->Unref();
     if (!status.ok()) {
       break;
     }
-    uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
-    cfd->imm()->FlushRequested();
-    flush_req.emplace_back(cfd, flush_memtable_id);
   }
   if (status.ok()) {
+    if (immutable_db_options_.atomic_flush) {
+      AssignAtomicFlushSeq(cfds);
+    }
+    for (const auto cfd : cfds) {
+      cfd->imm()->FlushRequested();
+    }
+    FlushRequest flush_req;
+    GenerateFlushRequest(cfds, &flush_req);
     SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
     MaybeScheduleFlushOrCompaction();
   }
@@ -1260,25 +1300,38 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
 }
 
 Status DBImpl::ScheduleFlushes(WriteContext* context) {
-  ColumnFamilyData* cfd;
-  FlushRequest flush_req;
+  autovector<ColumnFamilyData*> cfds;
+  if (immutable_db_options_.atomic_flush) {
+    SelectColumnFamiliesForAtomicFlush(&cfds);
+    for (auto cfd : cfds) {
+      cfd->Ref();
+    }
+    flush_scheduler_.Clear();
+  } else {
+    ColumnFamilyData* tmp_cfd;
+    while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
+      cfds.push_back(tmp_cfd);
+    }
+  }
   Status status;
-  while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
-    status = SwitchMemtable(cfd, context);
-    bool should_schedule = true;
+  for (auto& cfd : cfds) {
+    if (!cfd->mem()->IsEmpty()) {
+      status = SwitchMemtable(cfd, context);
+    }
     if (cfd->Unref()) {
       delete cfd;
-      should_schedule = false;
+      cfd = nullptr;
     }
     if (!status.ok()) {
       break;
     }
-    if (should_schedule) {
-      uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
-      flush_req.emplace_back(cfd, flush_memtable_id);
-    }
   }
   if (status.ok()) {
+    if (immutable_db_options_.atomic_flush) {
+      AssignAtomicFlushSeq(cfds);
+    }
+    FlushRequest flush_req;
+    GenerateFlushRequest(cfds, &flush_req);
     SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
     MaybeScheduleFlushOrCompaction();
   }
@@ -1312,7 +1365,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
     nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
   }
 
-  unique_ptr<WritableFile> lfile;
+  std::unique_ptr<WritableFile> lfile;
   log::Writer* new_log = nullptr;
   MemTable* new_mem = nullptr;
 
@@ -1368,7 +1421,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   DBOptions db_options =
       BuildDBOptions(immutable_db_options_, mutable_db_options_);
   const auto preallocate_block_size =
-    GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
+      GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size);
   auto write_hint = CalculateWALWriteHint();
   mutex_.Unlock();
   {
@@ -1396,8 +1449,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
         // of calling GetWalPreallocateBlockSize()
         lfile->SetPreallocationBlockSize(preallocate_block_size);
         lfile->SetWriteLifeTimeHint(write_hint);
-        unique_ptr<WritableFileWriter> file_writer(
-            new WritableFileWriter(std::move(lfile), log_fname, opt_env_opt));
+        std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+            std::move(lfile), log_fname, opt_env_opt, env_, nullptr /* stats */,
+            immutable_db_options_.listeners));
         new_log = new log::Writer(
             std::move(file_writer), new_log_number,
             immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_);
@@ -1409,13 +1463,6 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
       new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq);
       context->superversion_context.NewSuperVersion();
     }
-
-#ifndef ROCKSDB_LITE
-    // PLEASE NOTE: We assume that there are no failable operations
-    // after lock is acquired below since we are already notifying
-    // client about mem table becoming immutable.
-    NotifyOnMemTableSealed(cfd, memtable_info);
-#endif //ROCKSDB_LITE
   }
   ROCKS_LOG_INFO(immutable_db_options_.info_log,
                  "[%s] New memtable created with log file: #%" PRIu64
@@ -1424,10 +1471,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   mutex_.Lock();
   if (s.ok() && creating_new_log) {
     log_write_mutex_.Lock();
-    logfile_number_ = new_log_number;
     assert(new_log != nullptr);
-    log_empty_ = true;
-    log_dir_synced_ = false;
     if (!logs_.empty()) {
       // Alway flush the buffer of the last log before switching to a new one
       log::Writer* cur_log_writer = logs_.back().writer;
@@ -1435,21 +1479,41 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
       if (!s.ok()) {
         ROCKS_LOG_WARN(immutable_db_options_.info_log,
                        "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
-                       "  WAL file -- %s\n",
+                       "  WAL file\n",
                        cfd->GetName().c_str(), cur_log_writer->get_log_number(),
                        new_log_number);
       }
     }
-    logs_.emplace_back(logfile_number_, new_log);
-    alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
+    if (s.ok()) {
+      logfile_number_ = new_log_number;
+      log_empty_ = true;
+      log_dir_synced_ = false;
+      logs_.emplace_back(logfile_number_, new_log);
+      alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
+    }
     log_write_mutex_.Unlock();
   }
 
   if (!s.ok()) {
     // how do we fail if we're not creating new log?
     assert(creating_new_log);
-    assert(!new_mem);
-    assert(!new_log);
+    if (new_mem) {
+      delete new_mem;
+    }
+    if (new_log) {
+      delete new_log;
+    }
+    SuperVersion* new_superversion =
+        context->superversion_context.new_superversion.release();
+    if (new_superversion != nullptr) {
+      delete new_superversion;
+    }
+    // We may have lost data from the WritableFileBuffer in-memory buffer for
+    // the current log, so treat it as a fatal error and set bg_error
+    error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
+    // Read back bg_error in order to get the right severity
+    s = error_handler_.GetBGError();
+
     if (two_write_queues_) {
       nonmem_write_thread_.ExitUnbatched(&nonmem_w);
     }
@@ -1476,6 +1540,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   cfd->SetMemtable(new_mem);
   InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
                                      mutable_cf_options);
+#ifndef ROCKSDB_LITE
+  mutex_.Unlock();
+  // Notify client that memtable is sealed, now that we have successfully
+  // installed a new memtable
+  NotifyOnMemTableSealed(cfd, memtable_info);
+  mutex_.Lock();
+#endif  // ROCKSDB_LITE
   if (two_write_queues_) {
     nonmem_write_thread_.ExitUnbatched(&nonmem_w);
   }
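The OnMemTableSealed notification now fires only after the new memtable has been installed, and with the DB mutex released rather than inside the critical section. A hypothetical listener that consumes it (the EventListener hook and MemTableInfo fields come from include/rocksdb/listener.h; the logging class is illustrative only):

    #include <cstdio>

    #include "rocksdb/listener.h"

    namespace rocksdb {

    // Illustrative only: logs each sealed memtable. Because the notification
    // is now issued outside the DB mutex, a listener will not block other
    // threads waiting on that mutex, though it still runs on the write thread
    // and should stay cheap.
    class SealLogger : public EventListener {
     public:
      void OnMemTableSealed(const MemTableInfo& info) override {
        std::fprintf(stderr, "memtable sealed: cf=%s entries=%llu\n",
                     info.cf_name.c_str(),
                     static_cast<unsigned long long>(info.num_entries));
      }
    };

    }  // namespace rocksdb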
@@ -1484,13 +1555,13 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
 
 size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const {
   mutex_.AssertHeld();
-  size_t bsize = static_cast<size_t>(
-    write_buffer_size / 10 + write_buffer_size);
+  size_t bsize =
+      static_cast<size_t>(write_buffer_size / 10 + write_buffer_size);
   // Some users might set very high write_buffer_size and rely on
   // max_total_wal_size or other parameters to control the WAL size.
   if (mutable_db_options_.max_total_wal_size > 0) {
-    bsize = std::min<size_t>(bsize, static_cast<size_t>(
-      mutable_db_options_.max_total_wal_size));
+    bsize = std::min<size_t>(
+        bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size));
   }
   if (immutable_db_options_.db_write_buffer_size > 0) {
     bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size);
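As a worked example of the preallocation size computed above (assuming the stock 64 MiB write_buffer_size): bsize starts at 64 MiB + 64 MiB / 10 ≈ 70.4 MiB, and is then clamped down to max_total_wal_size and to db_write_buffer_size whenever either is set to a smaller non-zero value.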