import quincy beta 17.1.0
diff --git a/ceph/src/rocksdb/db/db_impl/db_impl_write.cc b/ceph/src/rocksdb/db/db_impl/db_impl_write.cc
index 8f6f685e481538b562f894a2a5a5ee21eb560699..1cab2b6c05068ba886f887bf19efc08f69e2f941 100644
@@ -6,14 +6,15 @@
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
-#include "db/db_impl/db_impl.h"
-
 #include <cinttypes>
+
+#include "db/db_impl/db_impl.h"
 #include "db/error_handler.h"
 #include "db/event_helpers.h"
 #include "monitoring/perf_context_imp.h"
 #include "options/options_helper.h"
 #include "test_util/sync_point.h"
+#include "util/cast_util.h"
 
 namespace ROCKSDB_NAMESPACE {
 // Convenience methods
@@ -24,7 +25,7 @@ Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
 
 Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
                      const Slice& key, const Slice& val) {
-  auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+  auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
   if (!cfh->cfd()->ioptions()->merge_operator) {
     return Status::NotSupported("Provide a merge_operator when opening DB");
   } else {
@@ -76,7 +77,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   if (tracer_) {
     InstrumentedMutexLock lock(&trace_mutex_);
     if (tracer_) {
-      tracer_->Write(my_batch);
+      // TODO: maybe handle the tracing status?
+      tracer_->Write(my_batch).PermitUncheckedError();
     }
   }
   if (write_options.sync && write_options.disableWAL) {
@@ -100,11 +102,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
          disable_memtable);
 
-  Status status;
   if (write_options.low_pri) {
-    status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
-    if (!status.ok()) {
-      return status;
+    Status s = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
+    if (!s.ok()) {
+      return s;
     }
   }
 
@@ -124,13 +125,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
                                      ? batch_cnt
                                      // every key is a sub-batch consuming a seq
                                      : WriteBatchInternal::Count(my_batch);
-    uint64_t seq;
+    uint64_t seq = 0;
     // Use a write thread to i) optimize for WAL write, ii) publish last
     // sequence in increasing order, iii) call pre_release_callback serially
-    status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback,
-                              log_used, log_ref, &seq, sub_batch_cnt,
-                              pre_release_callback, kDoAssignOrder,
-                              kDoPublishLastSeq, disable_memtable);
+    Status status = WriteImplWALOnly(
+        &write_thread_, write_options, my_batch, callback, log_used, log_ref,
+        &seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder,
+        kDoPublishLastSeq, disable_memtable);
     TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
     if (!status.ok()) {
       return status;
@@ -162,6 +163,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
 
   write_thread_.JoinBatchGroup(&w);
+  Status status;
   if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
     // we are a non-leader in a parallel group
 
@@ -202,6 +204,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       *seq_used = w.sequence;
     }
     // write is complete and leader has updated sequence
+    // Should we handle it?
+    status.PermitUncheckedError();
     return w.FinalStatus();
   }
   // else we are the leader of the write batch group
@@ -250,6 +254,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   last_batch_group_size_ =
       write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
 
+  IOStatus io_s;
   if (status.ok()) {
     // Rules for when we can update the memtable concurrently
     // 1. supported by memtable
@@ -322,21 +327,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     if (!two_write_queues_) {
       if (status.ok() && !write_options.disableWAL) {
         PERF_TIMER_GUARD(write_wal_time);
-        status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
-                            need_log_dir_sync, last_sequence + 1);
+        io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
+                          need_log_dir_sync, last_sequence + 1);
       }
     } else {
       if (status.ok() && !write_options.disableWAL) {
         PERF_TIMER_GUARD(write_wal_time);
         // LastAllocatedSequence is increased inside WriteToWAL under
         // wal_write_mutex_ to ensure ordered events in WAL
-        status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
-                                      seq_inc);
+        io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
+                                    seq_inc);
       } else {
         // Otherwise we inc seq number for memtable writes
         last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
       }
     }
+    status = io_s;
     assert(last_sequence != kMaxSequenceNumber);
     const SequenceNumber current_sequence = last_sequence + 1;
     last_sequence += seq_inc;
@@ -411,12 +417,20 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   PERF_TIMER_START(write_pre_and_post_process_time);
 
   if (!w.CallbackFailed()) {
-    WriteStatusCheck(status);
+    if (!io_s.ok()) {
+      IOStatusCheck(io_s);
+    } else {
+      WriteStatusCheck(status);
+    }
   }
 
   if (need_log_sync) {
     mutex_.Lock();
-    MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
+    if (status.ok()) {
+      status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
+    } else {
+      MarkLogsNotSynced(logfile_number_);
+    }
     mutex_.Unlock();
     // Requesting sync with two_write_queues_ is expected to be very rare. We
     // hence provide a simple implementation that is not necessarily efficient.
@@ -463,6 +477,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
   WriteThread::Writer w(write_options, my_batch, callback, log_ref,
                         disable_memtable);
   write_thread_.JoinBatchGroup(&w);
+  TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup");
   if (w.state == WriteThread::STATE_GROUP_LEADER) {
     WriteThread::WriteGroup wal_write_group;
     if (w.callback && !w.callback->AllowWriteBatching()) {
@@ -515,6 +530,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
 
     PERF_TIMER_STOP(write_pre_and_post_process_time);
 
+    IOStatus io_s;
     if (w.status.ok() && !write_options.disableWAL) {
       PERF_TIMER_GUARD(write_wal_time);
       stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
@@ -524,17 +540,26 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
                           wal_write_group.size - 1);
         RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
       }
-      w.status = WriteToWAL(wal_write_group, log_writer, log_used,
-                            need_log_sync, need_log_dir_sync, current_sequence);
+      io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
+                        need_log_dir_sync, current_sequence);
+      w.status = io_s;
     }
 
     if (!w.CallbackFailed()) {
-      WriteStatusCheck(w.status);
+      if (!io_s.ok()) {
+        IOStatusCheck(io_s);
+      } else {
+        WriteStatusCheck(w.status);
+      }
     }
 
     if (need_log_sync) {
       mutex_.Lock();
-      MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
+      if (w.status.ok()) {
+        w.status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
+      } else {
+        MarkLogsNotSynced(logfile_number_);
+      }
       mutex_.Unlock();
     }
 
@@ -610,8 +635,6 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
         0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
         seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/,
         write_options.memtable_insert_hint_per_batch);
-
-    WriteStatusCheck(w.status);
     if (write_options.disableWAL) {
       has_unpersisted_data_.store(true, std::memory_order_relaxed);
     }
@@ -626,6 +649,7 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
     std::lock_guard<std::mutex> lck(switch_mutex_);
     switch_cv_.notify_all();
   }
+  WriteStatusCheck(w.status);
 
   if (!w.FinalStatus().ok()) {
     return w.FinalStatus();
@@ -676,7 +700,7 @@ Status DBImpl::WriteImplWALOnly(
       InstrumentedMutexLock l(&mutex_);
       bool need_log_sync = false;
       status = PreprocessWrite(write_options, &need_log_sync, &write_context);
-      WriteStatusCheck(status);
+      WriteStatusCheckOnLocked(status);
     }
     if (!status.ok()) {
       WriteThread::WriteGroup write_group;
@@ -740,9 +764,10 @@ Status DBImpl::WriteImplWALOnly(
     }
     seq_inc = total_batch_cnt;
   }
+  IOStatus io_s;
   if (!write_options.disableWAL) {
-    status =
-        ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
+    io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
+    status = io_s;
   } else {
     // Otherwise we inc seq number to do solely the seq allocation
     last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
@@ -777,7 +802,11 @@ Status DBImpl::WriteImplWALOnly(
   PERF_TIMER_START(write_pre_and_post_process_time);
 
   if (!w.CallbackFailed()) {
-    WriteStatusCheck(status);
+    if (!io_s.ok()) {
+      IOStatusCheck(io_s);
+    } else {
+      WriteStatusCheck(status);
+    }
   }
   if (status.ok()) {
     size_t index = 0;
@@ -812,13 +841,44 @@ Status DBImpl::WriteImplWALOnly(
   return status;
 }
 
+void DBImpl::WriteStatusCheckOnLocked(const Status& status) {
+  // Is setting bg_error_ enough here?  This will at least stop
+  // compaction and fail any further writes.
+  // Caller must hold mutex_.
+  assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
+  mutex_.AssertHeld();
+  if (immutable_db_options_.paranoid_checks && !status.ok() &&
+      !status.IsBusy() && !status.IsIncomplete()) {
+    // Maybe change the return status to void?
+    error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback)
+        .PermitUncheckedError();
+  }
+}
+
 void DBImpl::WriteStatusCheck(const Status& status) {
   // Is setting bg_error_ enough here?  This will at least stop
   // compaction and fail any further writes.
+  assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok());
   if (immutable_db_options_.paranoid_checks && !status.ok() &&
       !status.IsBusy() && !status.IsIncomplete()) {
     mutex_.Lock();
-    error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
+    // Maybe change the return status to void?
+    error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback)
+        .PermitUncheckedError();
+    mutex_.Unlock();
+  }
+}
+
+void DBImpl::IOStatusCheck(const IOStatus& io_status) {
+  // Is setting bg_error_ enough here?  This will at least stop
+  // compaction and fail any further writes.
+  if ((immutable_db_options_.paranoid_checks && !io_status.ok() &&
+       !io_status.IsBusy() && !io_status.IsIncomplete()) ||
+      io_status.IsIOFenced()) {
+    mutex_.Lock();
+    // Maybe change the return status to void?
+    error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback)
+        .PermitUncheckedError();
     mutex_.Unlock();
   }
 }
@@ -832,7 +892,9 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) {
   if (!status.ok()) {
     mutex_.Lock();
     assert(!error_handler_.IsBGWorkStopped());
-    error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
+    // Maybe change the return status to void?
+    error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable)
+        .PermitUncheckedError();
     mutex_.Unlock();
   }
 }
@@ -946,8 +1008,10 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
     merged_batch = tmp_batch;
     for (auto writer : write_group) {
       if (!writer->CallbackFailed()) {
-        WriteBatchInternal::Append(merged_batch, writer->batch,
-                                   /*WAL_only*/ true);
+        Status s = WriteBatchInternal::Append(merged_batch, writer->batch,
+                                              /*WAL_only*/ true);
+        // Always returns Status::OK.
+        assert(s.ok());
         if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) {
           // We only need to cache the last of such write batch
           *to_be_cached_state = writer->batch;
@@ -961,9 +1025,9 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
 
 // When two_write_queues_ is disabled, this function is called from the only
 // write thread. Otherwise this must be called holding log_write_mutex_.
-Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
-                          log::Writer* log_writer, uint64_t* log_used,
-                          uint64_t* log_size) {
+IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
+                            log::Writer* log_writer, uint64_t* log_used,
+                            uint64_t* log_size) {
   assert(log_size != nullptr);
   Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
   *log_size = log_entry.size();
@@ -978,7 +1042,8 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
   if (UNLIKELY(needs_locking)) {
     log_write_mutex_.Lock();
   }
-  Status status = log_writer->AddRecord(log_entry);
+  IOStatus io_s = log_writer->AddRecord(log_entry);
+
   if (UNLIKELY(needs_locking)) {
     log_write_mutex_.Unlock();
   }
@@ -990,15 +1055,14 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
   // since alive_log_files_ might be modified concurrently
   alive_log_files_.back().AddSize(log_entry.size());
   log_empty_ = false;
-  return status;
+  return io_s;
 }
 
-Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
-                          log::Writer* log_writer, uint64_t* log_used,
-                          bool need_log_sync, bool need_log_dir_sync,
-                          SequenceNumber sequence) {
-  Status status;
-
+IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
+                            log::Writer* log_writer, uint64_t* log_used,
+                            bool need_log_sync, bool need_log_dir_sync,
+                            SequenceNumber sequence) {
+  IOStatus io_s;
   assert(!write_group.leader->disable_wal);
   // Same holds for all in the batch group
   size_t write_with_wal = 0;
@@ -1016,13 +1080,13 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
   WriteBatchInternal::SetSequence(merged_batch, sequence);
 
   uint64_t log_size;
-  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
+  io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
     cached_recoverable_state_empty_ = false;
   }
 
-  if (status.ok() && need_log_sync) {
+  if (io_s.ok() && need_log_sync) {
     StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
     // It's safe to access logs_ with unlocked mutex_ here because:
     //  - we've set getting_synced=true for all logs,
@@ -1032,23 +1096,24 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
     //  - as long as other threads don't modify it, it's safe to read
     //    from std::deque from multiple threads concurrently.
     for (auto& log : logs_) {
-      status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
-      if (!status.ok()) {
+      io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);
+      if (!io_s.ok()) {
         break;
       }
     }
-    if (status.ok() && need_log_dir_sync) {
+
+    if (io_s.ok() && need_log_dir_sync) {
       // We only sync WAL directory the first time WAL syncing is
       // requested, so that in case users never turn on WAL sync,
       // we can avoid the disk I/O in the write code path.
-      status = directories_.GetWalDir()->Fsync();
+      io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
     }
   }
 
   if (merged_batch == &tmp_batch_) {
     tmp_batch_.Clear();
   }
-  if (status.ok()) {
+  if (io_s.ok()) {
     auto stats = default_cf_internal_stats_;
     if (need_log_sync) {
       stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
@@ -1059,14 +1124,13 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
     stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
     RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
   }
-  return status;
+  return io_s;
 }
 
-Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
-                                    uint64_t* log_used,
-                                    SequenceNumber* last_sequence,
-                                    size_t seq_inc) {
-  Status status;
+IOStatus DBImpl::ConcurrentWriteToWAL(
+    const WriteThread::WriteGroup& write_group, uint64_t* log_used,
+    SequenceNumber* last_sequence, size_t seq_inc) {
+  IOStatus io_s;
 
   assert(!write_group.leader->disable_wal);
   // Same holds for all in the batch group
@@ -1092,14 +1156,14 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
 
   log::Writer* log_writer = logs_.back().writer;
   uint64_t log_size;
-  status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
+  io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
   if (to_be_cached_state) {
     cached_recoverable_state_ = *to_be_cached_state;
     cached_recoverable_state_empty_ = false;
   }
   log_write_mutex_.Unlock();
 
-  if (status.ok()) {
+  if (io_s.ok()) {
     const bool concurrent = true;
     auto stats = default_cf_internal_stats_;
     stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
@@ -1109,7 +1173,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
                       concurrent);
     RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
   }
-  return status;
+  return io_s;
 }
 
 Status DBImpl::WriteRecoverableState() {
@@ -1517,11 +1581,14 @@ Status DBImpl::TrimMemtableHistory(WriteContext* context) {
   }
   for (auto& cfd : cfds) {
     autovector<MemTable*> to_delete;
-    cfd->imm()->TrimHistory(&to_delete, cfd->mem()->ApproximateMemoryUsage());
+    bool trimmed = cfd->imm()->TrimHistory(
+        &to_delete, cfd->mem()->ApproximateMemoryUsage());
     if (!to_delete.empty()) {
       for (auto m : to_delete) {
         delete m;
       }
+    }
+    if (trimmed) {
       context->superversion_context.NewSuperVersion();
       assert(context->superversion_context.new_superversion.get() != nullptr);
       cfd->InstallSuperVersion(&context->superversion_context, &mutex_);
@@ -1609,6 +1676,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   std::unique_ptr<WritableFile> lfile;
   log::Writer* new_log = nullptr;
   MemTable* new_mem = nullptr;
+  IOStatus io_s;
 
   // Recoverable state is persisted in WAL. After memtable switch, WAL might
   // be deleted, so we write the state to memtable to be persisted as well.
@@ -1654,8 +1722,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   if (creating_new_log) {
     // TODO: Write buffer size passed in should be max of all CF's instead
     // of mutable_cf_options.write_buffer_size.
-    s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
-                  &new_log);
+    io_s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,
+                     &new_log);
+    if (s.ok()) {
+      s = io_s;
+    }
   }
   if (s.ok()) {
     SequenceNumber seq = versions_->LastSequence();
@@ -1681,7 +1752,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
     if (!logs_.empty()) {
       // Always flush the buffer of the last log before switching to a new one
       log::Writer* cur_log_writer = logs_.back().writer;
-      s = cur_log_writer->WriteBuffer();
+      io_s = cur_log_writer->WriteBuffer();
+      if (s.ok()) {
+        s = io_s;
+      }
       if (!s.ok()) {
         ROCKS_LOG_WARN(immutable_db_options_.info_log,
                        "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64
@@ -1716,7 +1790,16 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
     }
     // We may have lost data from the WritableFileBuffer in-memory buffer for
     // the current log, so treat it as a fatal error and set bg_error
-    error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable);
+    // Should handle return error?
+    if (!io_s.ok()) {
+      // Should handle return error?
+      error_handler_.SetBGError(io_s, BackgroundErrorReason::kMemTable)
+          .PermitUncheckedError();
+    } else {
+      // Should handle return error?
+      error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable)
+          .PermitUncheckedError();
+    }
     // Read back bg_error in order to get the right severity
     s = error_handler_.GetBGError();
     return s;
@@ -1749,6 +1832,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   NotifyOnMemTableSealed(cfd, memtable_info);
   mutex_.Lock();
 #endif  // ROCKSDB_LITE
+  // It is possible that we got here without checking the value of io_s, but
+  // that is okay.  If we did, it most likely means that s was already an error.
+  // In any case, ignore any unchecked error for io_s here.
+  io_s.PermitUncheckedError();
   return s;
 }
 
@@ -1792,6 +1879,8 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
   const Slice* ts = opt.timestamp;
   assert(nullptr != ts);
   size_t ts_sz = ts->size();
+  assert(column_family->GetComparator());
+  assert(ts_sz == column_family->GetComparator()->timestamp_size());
   WriteBatch batch(key.size() + ts_sz + value.size() + 24, /*max_bytes=*/0,
                    ts_sz);
   Status s = batch.Put(column_family, key, value);
@@ -1807,15 +1896,40 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
 
 Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
                   const Slice& key) {
-  WriteBatch batch;
-  batch.Delete(column_family, key);
+  if (nullptr == opt.timestamp) {
+    WriteBatch batch;
+    Status s = batch.Delete(column_family, key);
+    if (!s.ok()) {
+      return s;
+    }
+    return Write(opt, &batch);
+  }
+  const Slice* ts = opt.timestamp;
+  assert(ts != nullptr);
+  const size_t ts_sz = ts->size();
+  constexpr size_t kKeyAndValueLenSize = 11;
+  constexpr size_t kWriteBatchOverhead =
+      WriteBatchInternal::kHeader + sizeof(ValueType) + kKeyAndValueLenSize;
+  WriteBatch batch(key.size() + ts_sz + kWriteBatchOverhead, /*max_bytes=*/0,
+                   ts_sz);
+  Status s = batch.Delete(column_family, key);
+  if (!s.ok()) {
+    return s;
+  }
+  s = batch.AssignTimestamp(*ts);
+  if (!s.ok()) {
+    return s;
+  }
   return Write(opt, &batch);
 }
 
 Status DB::SingleDelete(const WriteOptions& opt,
                         ColumnFamilyHandle* column_family, const Slice& key) {
   WriteBatch batch;
-  batch.SingleDelete(column_family, key);
+  Status s = batch.SingleDelete(column_family, key);
+  if (!s.ok()) {
+    return s;
+  }
   return Write(opt, &batch);
 }
 
@@ -1823,7 +1937,10 @@ Status DB::DeleteRange(const WriteOptions& opt,
                        ColumnFamilyHandle* column_family,
                        const Slice& begin_key, const Slice& end_key) {
   WriteBatch batch;
-  batch.DeleteRange(column_family, begin_key, end_key);
+  Status s = batch.DeleteRange(column_family, begin_key, end_key);
+  if (!s.ok()) {
+    return s;
+  }
   return Write(opt, &batch);
 }