]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/table/sst_file_writer.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / table / sst_file_writer.cc
index 0b9fa6e0e92147e1744316178fb80dfb7d150912..e0c4c31896b091fbac32b9012126e0b7b2a12f19 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 
 #include "rocksdb/sst_file_writer.h"
 
@@ -26,41 +26,143 @@ const size_t kFadviseTrigger = 1024 * 1024; // 1MB
 
 struct SstFileWriter::Rep {
   Rep(const EnvOptions& _env_options, const Options& options,
-      const Comparator* _user_comparator, ColumnFamilyHandle* _cfh,
-      bool _invalidate_page_cache)
+      Env::IOPriority _io_priority, const Comparator* _user_comparator,
+      ColumnFamilyHandle* _cfh, bool _invalidate_page_cache, bool _skip_filters)
       : env_options(_env_options),
         ioptions(options),
         mutable_cf_options(options),
+        io_priority(_io_priority),
         internal_comparator(_user_comparator),
         cfh(_cfh),
         invalidate_page_cache(_invalidate_page_cache),
-        last_fadvise_size(0) {}
+        last_fadvise_size(0),
+        skip_filters(_skip_filters) {}
 
   std::unique_ptr<WritableFileWriter> file_writer;
   std::unique_ptr<TableBuilder> builder;
   EnvOptions env_options;
   ImmutableCFOptions ioptions;
   MutableCFOptions mutable_cf_options;
+  Env::IOPriority io_priority;
   InternalKeyComparator internal_comparator;
   ExternalSstFileInfo file_info;
   InternalKey ikey;
   std::string column_family_name;
   ColumnFamilyHandle* cfh;
   // If true, We will give the OS a hint that this file pages is not needed
-  // everytime we write 1MB to the file
+  // every time we write 1MB to the file.
   bool invalidate_page_cache;
-  // the size of the file during the last time we called Fadvise to remove
+  // The size of the file during the last time we called Fadvise to remove
   // cached pages from page cache.
   uint64_t last_fadvise_size;
+  bool skip_filters;
+  // Appends one point entry of kind `value_type` (Put, Merge or Delete) for
+  // `user_key`, always with sequence number 0. Returns InvalidArgument if no
+  // file is open, the key is out of order, or the type is unsupported.
+  Status Add(const Slice& user_key, const Slice& value,
+             const ValueType value_type) {
+    if (!builder) {
+      return Status::InvalidArgument("File is not opened");
+    }
+    if (file_info.num_entries == 0) {
+      file_info.smallest_key.assign(user_key.data(), user_key.size());
+    } else {
+      if (internal_comparator.user_comparator()->Compare(
+              user_key, file_info.largest_key) <= 0) {
+        // Keys must be strictly increasing; an equal key is rejected as well.
+        return Status::InvalidArgument("Keys must be added in order");
+      }
+    }
+
+    // TODO(tec) : For external SST files we could omit the seqno and type.
+    switch (value_type) {
+      case ValueType::kTypeValue:
+        ikey.Set(user_key, 0 /* Sequence Number */,
+                 ValueType::kTypeValue /* Put */);
+        break;
+      case ValueType::kTypeMerge:
+        ikey.Set(user_key, 0 /* Sequence Number */,
+                 ValueType::kTypeMerge /* Merge */);
+        break;
+      case ValueType::kTypeDeletion:
+        ikey.Set(user_key, 0 /* Sequence Number */,
+                 ValueType::kTypeDeletion /* Delete */);
+        break;
+      default:
+        return Status::InvalidArgument("Value type is not supported");
+    }
+    builder->Add(ikey.Encode(), value);
+
+    // Update file info: this key is now the largest one written.
+    file_info.num_entries++;
+    file_info.largest_key.assign(user_key.data(), user_key.size());
+    file_info.file_size = builder->FileSize();
+    InvalidatePageCache(false /* closing */);
+    return Status::OK();
+  }
+
+  // Adds a range tombstone built from `begin_key` and `end_key` (sequence
+  // number 0) and widens the tracked range-deletion bounds to include it.
+  // NOTE(review): nothing here checks the order of `begin_key` vs `end_key`.
+  Status DeleteRange(const Slice& begin_key, const Slice& end_key) {
+    if (!builder) {
+      return Status::InvalidArgument("File is not opened");
+    }
+    RangeTombstone tombstone(begin_key, end_key, 0 /* Sequence Number */);
+    if (file_info.num_range_del_entries == 0) {
+      file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
+                                              tombstone.start_key_.size());
+      file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
+                                             tombstone.end_key_.size());
+    } else {
+      if (internal_comparator.user_comparator()->Compare(
+              tombstone.start_key_, file_info.smallest_range_del_key) < 0) {
+        file_info.smallest_range_del_key.assign(tombstone.start_key_.data(),
+                                                tombstone.start_key_.size());
+      }
+      if (internal_comparator.user_comparator()->Compare(
+              tombstone.end_key_, file_info.largest_range_del_key) > 0) {
+        file_info.largest_range_del_key.assign(tombstone.end_key_.data(),
+                                               tombstone.end_key_.size());
+      }
+    }
+
+    auto ikey_and_end_key = tombstone.Serialize();
+    builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second);
+
+    // Update file info; range deletions are counted apart from num_entries.
+    file_info.num_range_del_entries++;
+    file_info.file_size = builder->FileSize();
+    InvalidatePageCache(false /* closing */);
+    return Status::OK();
+  }
+
+  // Page-cache drop hint: fires past kFadviseTrigger new bytes or on close.
+  void InvalidatePageCache(bool closing) {
+    if (invalidate_page_cache == false) {
+      // Fadvise disabled
+      return;
+    }
+    uint64_t bytes_since_last_fadvise =
+      builder->FileSize() - last_fadvise_size;
+    if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
+      TEST_SYNC_POINT_CALLBACK("SstFileWriter::Rep::InvalidatePageCache",
+                               &(bytes_since_last_fadvise));
+      // Tell the OS that we don't need this file in page cache
+      file_writer->InvalidateCache(0, 0);
+      last_fadvise_size = builder->FileSize();
+    }
+  }
 };
 
 SstFileWriter::SstFileWriter(const EnvOptions& env_options,
                              const Options& options,
                              const Comparator* user_comparator,
                              ColumnFamilyHandle* column_family,
-                             bool invalidate_page_cache)
-    : rep_(new Rep(env_options, options, user_comparator, column_family,
-                   invalidate_page_cache)) {
+                             bool invalidate_page_cache,
+                             Env::IOPriority io_priority, bool skip_filters)
+    : rep_(new Rep(env_options, options, io_priority, user_comparator,
+                   column_family, invalidate_page_cache, skip_filters)) {
   rep_->file_info.file_size = 0;
 }
 
@@ -70,12 +172,10 @@ SstFileWriter::~SstFileWriter() {
     // abandon the builder.
     rep_->builder->Abandon();
   }
-
-  delete rep_;
 }
 
 Status SstFileWriter::Open(const std::string& file_path) {
-  Rep* r = rep_;
+  Rep* r = rep_.get();
   Status s;
   std::unique_ptr<WritableFile> sst_file;
   s = r->ioptions.env->NewWritableFile(file_path, &sst_file, r->env_options);
@@ -83,14 +183,24 @@ Status SstFileWriter::Open(const std::string& file_path) {
     return s;
   }
 
+  sst_file->SetIOPriority(r->io_priority);
+
   CompressionType compression_type;
+  CompressionOptions compression_opts;
   if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
     compression_type = r->ioptions.bottommost_compression;
+    if (r->ioptions.bottommost_compression_opts.enabled) {
+      compression_opts = r->ioptions.bottommost_compression_opts;
+    } else {
+      compression_opts = r->ioptions.compression_opts;
+    }
   } else if (!r->ioptions.compression_per_level.empty()) {
     // Use the compression of the last level if we have per level compression
     compression_type = *(r->ioptions.compression_per_level.rbegin());
+    compression_opts = r->ioptions.compression_opts;
   } else {
     compression_type = r->mutable_cf_options.compression;
+    compression_opts = r->ioptions.compression_opts;
   }
 
   std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
@@ -123,63 +233,52 @@ Status SstFileWriter::Open(const std::string& file_path) {
   }
 
   TableBuilderOptions table_builder_options(
-      r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories,
-      compression_type, r->ioptions.compression_opts,
-      nullptr /* compression_dict */, false /* skip_filters */,
-      r->column_family_name, unknown_level);
+      r->ioptions, r->mutable_cf_options, r->internal_comparator,
+      &int_tbl_prop_collector_factories, compression_type, compression_opts,
+      nullptr /* compression_dict */, r->skip_filters, r->column_family_name,
+      unknown_level);
   r->file_writer.reset(
-      new WritableFileWriter(std::move(sst_file), r->env_options));
+      new WritableFileWriter(std::move(sst_file), file_path, r->env_options));
 
   // TODO(tec) : If table_factory is using compressed block cache, we will
   // be adding the external sst file blocks into it, which is wasteful.
   r->builder.reset(r->ioptions.table_factory->NewTableBuilder(
       table_builder_options, cf_id, r->file_writer.get()));
 
+  r->file_info = ExternalSstFileInfo();
   r->file_info.file_path = file_path;
-  r->file_info.file_size = 0;
-  r->file_info.num_entries = 0;
-  r->file_info.sequence_number = 0;
   r->file_info.version = 2;
   return s;
 }
 
 Status SstFileWriter::Add(const Slice& user_key, const Slice& value) {
-  Rep* r = rep_;
-  if (!r->builder) {
-    return Status::InvalidArgument("File is not opened");
-  }
-
-  if (r->file_info.num_entries == 0) {
-    r->file_info.smallest_key.assign(user_key.data(), user_key.size());
-  } else {
-    if (r->internal_comparator.user_comparator()->Compare(
-            user_key, r->file_info.largest_key) <= 0) {
-      // Make sure that keys are added in order
-      return Status::InvalidArgument("Keys must be added in order");
-    }
-  }
+  return rep_->Add(user_key, value, ValueType::kTypeValue);
+}
 
-  // TODO(tec) : For external SST files we could omit the seqno and type.
-  r->ikey.Set(user_key, 0 /* Sequence Number */,
-              ValueType::kTypeValue /* Put */);
-  r->builder->Add(r->ikey.Encode(), value);
+Status SstFileWriter::Put(const Slice& user_key, const Slice& value) {
+  return rep_->Add(user_key, value, ValueType::kTypeValue);
+}
 
-  // update file info
-  r->file_info.num_entries++;
-  r->file_info.largest_key.assign(user_key.data(), user_key.size());
-  r->file_info.file_size = r->builder->FileSize();
+Status SstFileWriter::Merge(const Slice& user_key, const Slice& value) {
+  return rep_->Add(user_key, value, ValueType::kTypeMerge);
+}
 
-  InvalidatePageCache(false /* closing */);
+Status SstFileWriter::Delete(const Slice& user_key) {
+  return rep_->Add(user_key, Slice(), ValueType::kTypeDeletion);
+}
 
-  return Status::OK();
+Status SstFileWriter::DeleteRange(const Slice& begin_key,
+                                  const Slice& end_key) {
+  return rep_->DeleteRange(begin_key, end_key);
 }
 
 Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
-  Rep* r = rep_;
+  Rep* r = rep_.get();
   if (!r->builder) {
     return Status::InvalidArgument("File is not opened");
   }
-  if (r->file_info.num_entries == 0) {
+  if (r->file_info.num_entries == 0 &&
+      r->file_info.num_range_del_entries == 0) {
     return Status::InvalidArgument("Cannot create sst file with no entries");
   }
 
@@ -188,7 +287,7 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
 
   if (s.ok()) {
     s = r->file_writer->Sync(r->ioptions.use_fsync);
-    InvalidatePageCache(true /* closing */);
+    r->InvalidatePageCache(true /* closing */);
     if (s.ok()) {
       s = r->file_writer->Close();
     }
@@ -205,24 +304,6 @@ Status SstFileWriter::Finish(ExternalSstFileInfo* file_info) {
   return s;
 }
 
-void SstFileWriter::InvalidatePageCache(bool closing) {
-  Rep* r = rep_;
-  if (r->invalidate_page_cache == false) {
-    // Fadvise disabled
-    return;
-  }
-
-  uint64_t bytes_since_last_fadvise =
-      r->builder->FileSize() - r->last_fadvise_size;
-  if (bytes_since_last_fadvise > kFadviseTrigger || closing) {
-    TEST_SYNC_POINT_CALLBACK("SstFileWriter::InvalidatePageCache",
-                             &(bytes_since_last_fadvise));
-    // Tell the OS that we dont need this file in page cache
-    r->file_writer->InvalidateCache(0, 0);
-    r->last_fadvise_size = r->builder->FileSize();
-  }
-}
-
 uint64_t SstFileWriter::FileSize() {
   return rep_->file_info.file_size;
 }