]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/builder.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / db / builder.cc
diff --git a/ceph/src/rocksdb/db/builder.cc b/ceph/src/rocksdb/db/builder.cc
new file mode 100644 (file)
index 0000000..65ffd44
--- /dev/null
@@ -0,0 +1,226 @@
+//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under the BSD-style license found in the
+//  LICENSE file in the root directory of this source tree. An additional grant
+//  of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/builder.h"
+
+#include <algorithm>
+#include <deque>
+#include <vector>
+
+#include "db/compaction_iterator.h"
+#include "db/dbformat.h"
+#include "db/event_helpers.h"
+#include "db/internal_stats.h"
+#include "db/merge_helper.h"
+#include "db/table_cache.h"
+#include "db/version_edit.h"
+#include "monitoring/iostats_context_imp.h"
+#include "monitoring/thread_status_util.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "rocksdb/table.h"
+#include "table/block_based_table_builder.h"
+#include "table/internal_iterator.h"
+#include "util/file_reader_writer.h"
+#include "util/filename.h"
+#include "util/stop_watch.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+class TableFactory;
+
+TableBuilder* NewTableBuilder(
+    const ImmutableCFOptions& ioptions,
+    const InternalKeyComparator& internal_comparator,
+    const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
+        int_tbl_prop_collector_factories,
+    uint32_t column_family_id, const std::string& column_family_name,
+    WritableFileWriter* file, const CompressionType compression_type,
+    const CompressionOptions& compression_opts,
+    int level,
+    const std::string* compression_dict, const bool skip_filters) {
+  assert((column_family_id ==
+          TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
+         column_family_name.empty());
+  return ioptions.table_factory->NewTableBuilder(
+      TableBuilderOptions(ioptions, internal_comparator,
+                          int_tbl_prop_collector_factories, compression_type,
+                          compression_opts, compression_dict, skip_filters,
+                          column_family_name, level),
+      column_family_id, file);
+}
+
+Status BuildTable(
+    const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
+    const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
+    TableCache* table_cache, InternalIterator* iter,
+    std::unique_ptr<InternalIterator> range_del_iter, FileMetaData* meta,
+    const InternalKeyComparator& internal_comparator,
+    const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
+        int_tbl_prop_collector_factories,
+    uint32_t column_family_id, const std::string& column_family_name,
+    std::vector<SequenceNumber> snapshots,
+    SequenceNumber earliest_write_conflict_snapshot,
+    const CompressionType compression,
+    const CompressionOptions& compression_opts, bool paranoid_file_checks,
+    InternalStats* internal_stats, TableFileCreationReason reason,
+    EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
+    TableProperties* table_properties, int level) {
+  assert((column_family_id ==
+          TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
+         column_family_name.empty());
+  // Reports the IOStats for flush for every following bytes.
+  const size_t kReportFlushIOStatsEvery = 1048576;
+  Status s;
+  meta->fd.file_size = 0;
+  iter->SeekToFirst();
+  std::unique_ptr<RangeDelAggregator> range_del_agg(
+      new RangeDelAggregator(internal_comparator, snapshots));
+  s = range_del_agg->AddTombstones(std::move(range_del_iter));
+  if (!s.ok()) {
+    // may be non-ok if a range tombstone key is unparsable
+    return s;
+  }
+
+  std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
+                                    meta->fd.GetPathId());
+#ifndef ROCKSDB_LITE
+  EventHelpers::NotifyTableFileCreationStarted(
+      ioptions.listeners, dbname, column_family_name, fname, job_id, reason);
+#endif  // !ROCKSDB_LITE
+  TableProperties tp;
+
+  if (iter->Valid() || range_del_agg->ShouldAddTombstones()) {
+    TableBuilder* builder;
+    unique_ptr<WritableFileWriter> file_writer;
+    {
+      unique_ptr<WritableFile> file;
+#ifndef NDEBUG
+      bool use_direct_writes = env_options.use_direct_writes;
+      TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
+#endif  // !NDEBUG
+      s = NewWritableFile(env, fname, &file, env_options);
+      if (!s.ok()) {
+        EventHelpers::LogAndNotifyTableFileCreationFinished(
+            event_logger, ioptions.listeners, dbname, column_family_name, fname,
+            job_id, meta->fd, tp, reason, s);
+        return s;
+      }
+      file->SetIOPriority(io_priority);
+
+      file_writer.reset(new WritableFileWriter(std::move(file), env_options,
+                                               ioptions.statistics));
+
+      builder = NewTableBuilder(
+          ioptions, internal_comparator, int_tbl_prop_collector_factories,
+          column_family_id, column_family_name, file_writer.get(), compression,
+          compression_opts, level);
+    }
+
+    MergeHelper merge(env, internal_comparator.user_comparator(),
+                      ioptions.merge_operator, nullptr, ioptions.info_log,
+                      true /* internal key corruption is not ok */,
+                      snapshots.empty() ? 0 : snapshots.back());
+
+    CompactionIterator c_iter(
+        iter, internal_comparator.user_comparator(), &merge, kMaxSequenceNumber,
+        &snapshots, earliest_write_conflict_snapshot, env,
+        true /* internal key corruption is not ok */, range_del_agg.get());
+    c_iter.SeekToFirst();
+    for (; c_iter.Valid(); c_iter.Next()) {
+      const Slice& key = c_iter.key();
+      const Slice& value = c_iter.value();
+      builder->Add(key, value);
+      meta->UpdateBoundaries(key, c_iter.ikey().sequence);
+
+      // TODO(noetzli): Update stats after flush, too.
+      if (io_priority == Env::IO_HIGH &&
+          IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
+        ThreadStatusUtil::SetThreadOperationProperty(
+            ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
+      }
+    }
+    // nullptr for table_{min,max} so all range tombstones will be flushed
+    range_del_agg->AddToBuilder(builder, nullptr /* lower_bound */,
+                                nullptr /* upper_bound */, meta);
+
+    // Finish and check for builder errors
+    bool empty = builder->NumEntries() == 0;
+    s = c_iter.status();
+    if (!s.ok() || empty) {
+      builder->Abandon();
+    } else {
+      s = builder->Finish();
+    }
+
+    if (s.ok() && !empty) {
+      uint64_t file_size = builder->FileSize();
+      meta->fd.file_size = file_size;
+      meta->marked_for_compaction = builder->NeedCompact();
+      assert(meta->fd.GetFileSize() > 0);
+      tp = builder->GetTableProperties();
+      if (table_properties) {
+        *table_properties = tp;
+      }
+    }
+    delete builder;
+
+    // Finish and check for file errors
+    if (s.ok() && !empty) {
+      StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
+      file_writer->Sync(ioptions.use_fsync);
+    }
+    if (s.ok() && !empty) {
+      s = file_writer->Close();
+    }
+
+    if (s.ok() && !empty) {
+      // Verify that the table is usable
+      // We set for_compaction to false and don't OptimizeForCompactionTableRead
+      // here because this is a special case after we finish the table building
+      // No matter whether use_direct_io_for_flush_and_compaction is true,
+      // we will regrad this verification as user reads since the goal is
+      // to cache it here for further user reads
+      std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
+          ReadOptions(), env_options, internal_comparator, meta->fd,
+          nullptr /* range_del_agg */, nullptr,
+          (internal_stats == nullptr) ? nullptr
+                                      : internal_stats->GetFileReadHist(0),
+          false /* for_compaction */, nullptr /* arena */,
+          false /* skip_filter */, level));
+      s = it->status();
+      if (s.ok() && paranoid_file_checks) {
+        for (it->SeekToFirst(); it->Valid(); it->Next()) {
+        }
+        s = it->status();
+      }
+    }
+  }
+
+  // Check for input iterator errors
+  if (!iter->status().ok()) {
+    s = iter->status();
+  }
+
+  if (!s.ok() || meta->fd.GetFileSize() == 0) {
+    env->DeleteFile(fname);
+  }
+
+  // Output to event logger and fire events.
+  EventHelpers::LogAndNotifyTableFileCreationFinished(
+      event_logger, ioptions.listeners, dbname, column_family_name, fname,
+      job_id, meta->fd, tp, reason, s);
+
+  return s;
+}
+
+}  // namespace rocksdb