]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/options/options_helper.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / options / options_helper.cc
index 02139a62b5d93886e1470700d1390d4cddffea28..59b01e6fb18dbfd6ddb0cbd9b45ed01cf2de485b 100644 (file)
@@ -7,6 +7,7 @@
 #include <cassert>
 #include <cctype>
 #include <cstdlib>
+#include <set>
 #include <unordered_set>
 #include <vector>
 
 #include "util/string_util.h"
 
 namespace ROCKSDB_NAMESPACE {
+ConfigOptions::ConfigOptions()
+#ifndef ROCKSDB_LITE
+    : registry(ObjectRegistry::NewInstance())
+#endif
+{
+  env = Env::Default();
+}
+
+ConfigOptions::ConfigOptions(const DBOptions& db_opts) : env(db_opts.env) {
+#ifndef ROCKSDB_LITE
+  registry = ObjectRegistry::NewInstance();
+#endif
+}
+
 Status ValidateOptions(const DBOptions& db_opts,
                        const ColumnFamilyOptions& cf_opts) {
   Status s;
@@ -51,8 +66,12 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
       immutable_db_options.create_missing_column_families;
   options.error_if_exists = immutable_db_options.error_if_exists;
   options.paranoid_checks = immutable_db_options.paranoid_checks;
+  options.flush_verify_memtable_count =
+      immutable_db_options.flush_verify_memtable_count;
   options.track_and_verify_wals_in_manifest =
       immutable_db_options.track_and_verify_wals_in_manifest;
+  options.verify_sst_unique_id_in_manifest =
+      immutable_db_options.verify_sst_unique_id_in_manifest;
   options.env = immutable_db_options.env;
   options.rate_limiter = immutable_db_options.rate_limiter;
   options.sst_file_manager = immutable_db_options.sst_file_manager;
@@ -70,8 +89,6 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
   options.delete_obsolete_files_period_micros =
       mutable_db_options.delete_obsolete_files_period_micros;
   options.max_background_jobs = mutable_db_options.max_background_jobs;
-  options.base_background_compactions =
-      mutable_db_options.base_background_compactions;
   options.max_background_compactions =
       mutable_db_options.max_background_compactions;
   options.bytes_per_sync = mutable_db_options.bytes_per_sync;
@@ -86,8 +103,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
   options.max_manifest_file_size = immutable_db_options.max_manifest_file_size;
   options.table_cache_numshardbits =
       immutable_db_options.table_cache_numshardbits;
-  options.WAL_ttl_seconds = immutable_db_options.wal_ttl_seconds;
-  options.WAL_size_limit_MB = immutable_db_options.wal_size_limit_mb;
+  options.WAL_ttl_seconds = immutable_db_options.WAL_ttl_seconds;
+  options.WAL_size_limit_MB = immutable_db_options.WAL_size_limit_MB;
   options.manifest_preallocation_size =
       immutable_db_options.manifest_preallocation_size;
   options.allow_mmap_reads = immutable_db_options.allow_mmap_reads;
@@ -108,8 +125,6 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
   options.write_buffer_manager = immutable_db_options.write_buffer_manager;
   options.access_hint_on_compaction_start =
       immutable_db_options.access_hint_on_compaction_start;
-  options.new_table_reader_for_compaction_inputs =
-      immutable_db_options.new_table_reader_for_compaction_inputs;
   options.compaction_readahead_size =
       mutable_db_options.compaction_readahead_size;
   options.random_access_max_buffer_size =
@@ -149,12 +164,10 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
       immutable_db_options.avoid_flush_during_recovery;
   options.avoid_flush_during_shutdown =
       mutable_db_options.avoid_flush_during_shutdown;
-  options.allow_ingest_behind =
-      immutable_db_options.allow_ingest_behind;
-  options.preserve_deletes =
-      immutable_db_options.preserve_deletes;
+  options.allow_ingest_behind = immutable_db_options.allow_ingest_behind;
   options.two_write_queues = immutable_db_options.two_write_queues;
   options.manual_wal_flush = immutable_db_options.manual_wal_flush;
+  options.wal_compression = immutable_db_options.wal_compression;
   options.atomic_flush = immutable_db_options.atomic_flush;
   options.avoid_unnecessary_blocking_io =
       immutable_db_options.avoid_unnecessary_blocking_io;
@@ -168,6 +181,11 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
       immutable_db_options.bgerror_resume_retry_interval;
   options.db_host_id = immutable_db_options.db_host_id;
   options.allow_data_in_errors = immutable_db_options.allow_data_in_errors;
+  options.checksum_handoff_file_types =
+      immutable_db_options.checksum_handoff_file_types;
+  options.lowest_used_cache_tier = immutable_db_options.lowest_used_cache_tier;
+  options.enforce_single_del_contracts =
+      immutable_db_options.enforce_single_del_contracts;
   return options;
 }
 
@@ -175,85 +193,134 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
     const ColumnFamilyOptions& options,
     const MutableCFOptions& mutable_cf_options) {
   ColumnFamilyOptions cf_opts(options);
+  UpdateColumnFamilyOptions(mutable_cf_options, &cf_opts);
+  // TODO(yhchiang): find some way to handle the following derived options
+  // * max_file_size
+  return cf_opts;
+}
 
+void UpdateColumnFamilyOptions(const MutableCFOptions& moptions,
+                               ColumnFamilyOptions* cf_opts) {
   // Memtable related options
-  cf_opts.write_buffer_size = mutable_cf_options.write_buffer_size;
-  cf_opts.max_write_buffer_number = mutable_cf_options.max_write_buffer_number;
-  cf_opts.arena_block_size = mutable_cf_options.arena_block_size;
-  cf_opts.memtable_prefix_bloom_size_ratio =
-      mutable_cf_options.memtable_prefix_bloom_size_ratio;
-  cf_opts.memtable_whole_key_filtering =
-      mutable_cf_options.memtable_whole_key_filtering;
-  cf_opts.memtable_huge_page_size = mutable_cf_options.memtable_huge_page_size;
-  cf_opts.max_successive_merges = mutable_cf_options.max_successive_merges;
-  cf_opts.inplace_update_num_locks =
-      mutable_cf_options.inplace_update_num_locks;
-  cf_opts.prefix_extractor = mutable_cf_options.prefix_extractor;
+  cf_opts->write_buffer_size = moptions.write_buffer_size;
+  cf_opts->max_write_buffer_number = moptions.max_write_buffer_number;
+  cf_opts->arena_block_size = moptions.arena_block_size;
+  cf_opts->memtable_prefix_bloom_size_ratio =
+      moptions.memtable_prefix_bloom_size_ratio;
+  cf_opts->memtable_whole_key_filtering = moptions.memtable_whole_key_filtering;
+  cf_opts->memtable_huge_page_size = moptions.memtable_huge_page_size;
+  cf_opts->max_successive_merges = moptions.max_successive_merges;
+  cf_opts->inplace_update_num_locks = moptions.inplace_update_num_locks;
+  cf_opts->prefix_extractor = moptions.prefix_extractor;
+  cf_opts->experimental_mempurge_threshold =
+      moptions.experimental_mempurge_threshold;
+  cf_opts->memtable_protection_bytes_per_key =
+      moptions.memtable_protection_bytes_per_key;
 
   // Compaction related options
-  cf_opts.disable_auto_compactions =
-      mutable_cf_options.disable_auto_compactions;
-  cf_opts.soft_pending_compaction_bytes_limit =
-      mutable_cf_options.soft_pending_compaction_bytes_limit;
-  cf_opts.hard_pending_compaction_bytes_limit =
-      mutable_cf_options.hard_pending_compaction_bytes_limit;
-  cf_opts.level0_file_num_compaction_trigger =
-      mutable_cf_options.level0_file_num_compaction_trigger;
-  cf_opts.level0_slowdown_writes_trigger =
-      mutable_cf_options.level0_slowdown_writes_trigger;
-  cf_opts.level0_stop_writes_trigger =
-      mutable_cf_options.level0_stop_writes_trigger;
-  cf_opts.max_compaction_bytes = mutable_cf_options.max_compaction_bytes;
-  cf_opts.target_file_size_base = mutable_cf_options.target_file_size_base;
-  cf_opts.target_file_size_multiplier =
-      mutable_cf_options.target_file_size_multiplier;
-  cf_opts.max_bytes_for_level_base =
-      mutable_cf_options.max_bytes_for_level_base;
-  cf_opts.max_bytes_for_level_multiplier =
-      mutable_cf_options.max_bytes_for_level_multiplier;
-  cf_opts.ttl = mutable_cf_options.ttl;
-  cf_opts.periodic_compaction_seconds =
-      mutable_cf_options.periodic_compaction_seconds;
-
-  cf_opts.max_bytes_for_level_multiplier_additional.clear();
-  for (auto value :
-       mutable_cf_options.max_bytes_for_level_multiplier_additional) {
-    cf_opts.max_bytes_for_level_multiplier_additional.emplace_back(value);
+  cf_opts->disable_auto_compactions = moptions.disable_auto_compactions;
+  cf_opts->soft_pending_compaction_bytes_limit =
+      moptions.soft_pending_compaction_bytes_limit;
+  cf_opts->hard_pending_compaction_bytes_limit =
+      moptions.hard_pending_compaction_bytes_limit;
+  cf_opts->level0_file_num_compaction_trigger =
+      moptions.level0_file_num_compaction_trigger;
+  cf_opts->level0_slowdown_writes_trigger =
+      moptions.level0_slowdown_writes_trigger;
+  cf_opts->level0_stop_writes_trigger = moptions.level0_stop_writes_trigger;
+  cf_opts->max_compaction_bytes = moptions.max_compaction_bytes;
+  cf_opts->ignore_max_compaction_bytes_for_input =
+      moptions.ignore_max_compaction_bytes_for_input;
+  cf_opts->target_file_size_base = moptions.target_file_size_base;
+  cf_opts->target_file_size_multiplier = moptions.target_file_size_multiplier;
+  cf_opts->max_bytes_for_level_base = moptions.max_bytes_for_level_base;
+  cf_opts->max_bytes_for_level_multiplier =
+      moptions.max_bytes_for_level_multiplier;
+  cf_opts->ttl = moptions.ttl;
+  cf_opts->periodic_compaction_seconds = moptions.periodic_compaction_seconds;
+
+  cf_opts->max_bytes_for_level_multiplier_additional.clear();
+  for (auto value : moptions.max_bytes_for_level_multiplier_additional) {
+    cf_opts->max_bytes_for_level_multiplier_additional.emplace_back(value);
   }
 
-  cf_opts.compaction_options_fifo = mutable_cf_options.compaction_options_fifo;
-  cf_opts.compaction_options_universal =
-      mutable_cf_options.compaction_options_universal;
+  cf_opts->compaction_options_fifo = moptions.compaction_options_fifo;
+  cf_opts->compaction_options_universal = moptions.compaction_options_universal;
 
   // Blob file related options
-  cf_opts.enable_blob_files = mutable_cf_options.enable_blob_files;
-  cf_opts.min_blob_size = mutable_cf_options.min_blob_size;
-  cf_opts.blob_file_size = mutable_cf_options.blob_file_size;
-  cf_opts.blob_compression_type = mutable_cf_options.blob_compression_type;
-  cf_opts.enable_blob_garbage_collection =
-      mutable_cf_options.enable_blob_garbage_collection;
-  cf_opts.blob_garbage_collection_age_cutoff =
-      mutable_cf_options.blob_garbage_collection_age_cutoff;
+  cf_opts->enable_blob_files = moptions.enable_blob_files;
+  cf_opts->min_blob_size = moptions.min_blob_size;
+  cf_opts->blob_file_size = moptions.blob_file_size;
+  cf_opts->blob_compression_type = moptions.blob_compression_type;
+  cf_opts->enable_blob_garbage_collection =
+      moptions.enable_blob_garbage_collection;
+  cf_opts->blob_garbage_collection_age_cutoff =
+      moptions.blob_garbage_collection_age_cutoff;
+  cf_opts->blob_garbage_collection_force_threshold =
+      moptions.blob_garbage_collection_force_threshold;
+  cf_opts->blob_compaction_readahead_size =
+      moptions.blob_compaction_readahead_size;
+  cf_opts->blob_file_starting_level = moptions.blob_file_starting_level;
+  cf_opts->prepopulate_blob_cache = moptions.prepopulate_blob_cache;
 
   // Misc options
-  cf_opts.max_sequential_skip_in_iterations =
-      mutable_cf_options.max_sequential_skip_in_iterations;
-  cf_opts.check_flush_compaction_key_order =
-      mutable_cf_options.check_flush_compaction_key_order;
-  cf_opts.paranoid_file_checks = mutable_cf_options.paranoid_file_checks;
-  cf_opts.report_bg_io_stats = mutable_cf_options.report_bg_io_stats;
-  cf_opts.compression = mutable_cf_options.compression;
-  cf_opts.compression_opts = mutable_cf_options.compression_opts;
-  cf_opts.bottommost_compression = mutable_cf_options.bottommost_compression;
-  cf_opts.bottommost_compression_opts =
-      mutable_cf_options.bottommost_compression_opts;
-  cf_opts.sample_for_compression = mutable_cf_options.sample_for_compression;
-
-  cf_opts.table_factory = options.table_factory;
+  cf_opts->max_sequential_skip_in_iterations =
+      moptions.max_sequential_skip_in_iterations;
+  cf_opts->check_flush_compaction_key_order =
+      moptions.check_flush_compaction_key_order;
+  cf_opts->paranoid_file_checks = moptions.paranoid_file_checks;
+  cf_opts->report_bg_io_stats = moptions.report_bg_io_stats;
+  cf_opts->compression = moptions.compression;
+  cf_opts->compression_opts = moptions.compression_opts;
+  cf_opts->bottommost_compression = moptions.bottommost_compression;
+  cf_opts->bottommost_compression_opts = moptions.bottommost_compression_opts;
+  cf_opts->sample_for_compression = moptions.sample_for_compression;
+  cf_opts->compression_per_level = moptions.compression_per_level;
+  cf_opts->last_level_temperature = moptions.last_level_temperature;
+  cf_opts->bottommost_temperature = moptions.last_level_temperature;
+}
+
+void UpdateColumnFamilyOptions(const ImmutableCFOptions& ioptions,
+                               ColumnFamilyOptions* cf_opts) {
+  cf_opts->compaction_style = ioptions.compaction_style;
+  cf_opts->compaction_pri = ioptions.compaction_pri;
+  cf_opts->comparator = ioptions.user_comparator;
+  cf_opts->merge_operator = ioptions.merge_operator;
+  cf_opts->compaction_filter = ioptions.compaction_filter;
+  cf_opts->compaction_filter_factory = ioptions.compaction_filter_factory;
+  cf_opts->min_write_buffer_number_to_merge =
+      ioptions.min_write_buffer_number_to_merge;
+  cf_opts->max_write_buffer_number_to_maintain =
+      ioptions.max_write_buffer_number_to_maintain;
+  cf_opts->max_write_buffer_size_to_maintain =
+      ioptions.max_write_buffer_size_to_maintain;
+  cf_opts->inplace_update_support = ioptions.inplace_update_support;
+  cf_opts->inplace_callback = ioptions.inplace_callback;
+  cf_opts->memtable_factory = ioptions.memtable_factory;
+  cf_opts->table_factory = ioptions.table_factory;
+  cf_opts->table_properties_collector_factories =
+      ioptions.table_properties_collector_factories;
+  cf_opts->bloom_locality = ioptions.bloom_locality;
+  cf_opts->level_compaction_dynamic_level_bytes =
+      ioptions.level_compaction_dynamic_level_bytes;
+  cf_opts->level_compaction_dynamic_file_size =
+      ioptions.level_compaction_dynamic_file_size;
+  cf_opts->num_levels = ioptions.num_levels;
+  cf_opts->optimize_filters_for_hits = ioptions.optimize_filters_for_hits;
+  cf_opts->force_consistency_checks = ioptions.force_consistency_checks;
+  cf_opts->memtable_insert_with_hint_prefix_extractor =
+      ioptions.memtable_insert_with_hint_prefix_extractor;
+  cf_opts->cf_paths = ioptions.cf_paths;
+  cf_opts->compaction_thread_limiter = ioptions.compaction_thread_limiter;
+  cf_opts->sst_partitioner_factory = ioptions.sst_partitioner_factory;
+  cf_opts->blob_cache = ioptions.blob_cache;
+  cf_opts->preclude_last_level_data_seconds =
+      ioptions.preclude_last_level_data_seconds;
+  cf_opts->preserve_internal_time_seconds =
+      ioptions.preserve_internal_time_seconds;
+
   // TODO(yhchiang): find some way to handle the following derived options
   // * max_file_size
-
-  return cf_opts;
 }
 
 std::map<CompactionStyle, std::string>
@@ -267,18 +334,26 @@ std::map<CompactionPri, std::string> OptionsHelper::compaction_pri_to_string = {
     {kByCompensatedSize, "kByCompensatedSize"},
     {kOldestLargestSeqFirst, "kOldestLargestSeqFirst"},
     {kOldestSmallestSeqFirst, "kOldestSmallestSeqFirst"},
-    {kMinOverlappingRatio, "kMinOverlappingRatio"}};
+    {kMinOverlappingRatio, "kMinOverlappingRatio"},
+    {kRoundRobin, "kRoundRobin"}};
 
 std::map<CompactionStopStyle, std::string>
     OptionsHelper::compaction_stop_style_to_string = {
         {kCompactionStopStyleSimilarSize, "kCompactionStopStyleSimilarSize"},
         {kCompactionStopStyleTotalSize, "kCompactionStopStyleTotalSize"}};
 
+std::map<Temperature, std::string> OptionsHelper::temperature_to_string = {
+    {Temperature::kUnknown, "kUnknown"},
+    {Temperature::kHot, "kHot"},
+    {Temperature::kWarm, "kWarm"},
+    {Temperature::kCold, "kCold"}};
+
 std::unordered_map<std::string, ChecksumType>
     OptionsHelper::checksum_type_string_map = {{"kNoChecksum", kNoChecksum},
                                                {"kCRC32c", kCRC32c},
                                                {"kxxHash", kxxHash},
-                                               {"kxxHash64", kxxHash64}};
+                                               {"kxxHash64", kxxHash64},
+                                               {"kXXH3", kXXH3}};
 
 std::unordered_map<std::string, CompressionType>
     OptionsHelper::compression_type_string_map = {
@@ -294,288 +369,198 @@ std::unordered_map<std::string, CompressionType>
         {"kDisableCompressionOption", kDisableCompressionOption}};
 
 std::vector<CompressionType> GetSupportedCompressions() {
-  std::vector<CompressionType> supported_compressions;
+  // std::set internally to deduplicate potential name aliases
+  std::set<CompressionType> supported_compressions;
   for (const auto& comp_to_name : OptionsHelper::compression_type_string_map) {
     CompressionType t = comp_to_name.second;
     if (t != kDisableCompressionOption && CompressionTypeSupported(t)) {
-      supported_compressions.push_back(t);
+      supported_compressions.insert(t);
     }
   }
-  return supported_compressions;
+  return std::vector<CompressionType>(supported_compressions.begin(),
+                                      supported_compressions.end());
 }
 
 std::vector<CompressionType> GetSupportedDictCompressions() {
-  std::vector<CompressionType> dict_compression_types;
+  std::set<CompressionType> dict_compression_types;
   for (const auto& comp_to_name : OptionsHelper::compression_type_string_map) {
     CompressionType t = comp_to_name.second;
     if (t != kDisableCompressionOption && DictCompressionTypeSupported(t)) {
-      dict_compression_types.push_back(t);
+      dict_compression_types.insert(t);
     }
   }
-  return dict_compression_types;
-}
-
-#ifndef ROCKSDB_LITE
-bool ParseSliceTransformHelper(
-    const std::string& kFixedPrefixName, const std::string& kCappedPrefixName,
-    const std::string& value,
-    std::shared_ptr<const SliceTransform>* slice_transform) {
-  const char* no_op_name = "rocksdb.Noop";
-  size_t no_op_length = strlen(no_op_name);
-  auto& pe_value = value;
-  if (pe_value.size() > kFixedPrefixName.size() &&
-      pe_value.compare(0, kFixedPrefixName.size(), kFixedPrefixName) == 0) {
-    int prefix_length = ParseInt(trim(value.substr(kFixedPrefixName.size())));
-    slice_transform->reset(NewFixedPrefixTransform(prefix_length));
-  } else if (pe_value.size() > kCappedPrefixName.size() &&
-             pe_value.compare(0, kCappedPrefixName.size(), kCappedPrefixName) ==
-                 0) {
-    int prefix_length =
-        ParseInt(trim(pe_value.substr(kCappedPrefixName.size())));
-    slice_transform->reset(NewCappedPrefixTransform(prefix_length));
-  } else if (pe_value.size() == no_op_length &&
-             pe_value.compare(0, no_op_length, no_op_name) == 0) {
-    const SliceTransform* no_op_transform = NewNoopTransform();
-    slice_transform->reset(no_op_transform);
-  } else if (value == kNullptrString) {
-    slice_transform->reset();
-  } else {
-    return false;
-  }
-
-  return true;
+  return std::vector<CompressionType>(dict_compression_types.begin(),
+                                      dict_compression_types.end());
 }
 
-bool ParseSliceTransform(
-    const std::string& value,
-    std::shared_ptr<const SliceTransform>* slice_transform) {
-  // While we normally don't convert the string representation of a
-  // pointer-typed option into its instance, here we do so for backward
-  // compatibility as we allow this action in SetOption().
-
-  // TODO(yhchiang): A possible better place for these serialization /
-  // deserialization is inside the class definition of pointer-typed
-  // option itself, but this requires a bigger change of public API.
-  bool result =
-      ParseSliceTransformHelper("fixed:", "capped:", value, slice_transform);
-  if (result) {
-    return result;
-  }
-  result = ParseSliceTransformHelper(
-      "rocksdb.FixedPrefix.", "rocksdb.CappedPrefix.", value, slice_transform);
-  if (result) {
-    return result;
+std::vector<ChecksumType> GetSupportedChecksums() {
+  std::set<ChecksumType> checksum_types;
+  for (const auto& e : OptionsHelper::checksum_type_string_map) {
+    checksum_types.insert(e.second);
   }
-  // TODO(yhchiang): we can further support other default
-  //                 SliceTransforms here.
-  return false;
+  return std::vector<ChecksumType>(checksum_types.begin(),
+                                   checksum_types.end());
 }
 
-static bool ParseOptionHelper(char* opt_address, const OptionType& opt_type,
+#ifndef ROCKSDB_LITE
+static bool ParseOptionHelper(void* opt_address, const OptionType& opt_type,
                               const std::string& value) {
   switch (opt_type) {
     case OptionType::kBoolean:
-      *reinterpret_cast<bool*>(opt_address) = ParseBoolean("", value);
+      *static_cast<bool*>(opt_address) = ParseBoolean("", value);
       break;
     case OptionType::kInt:
-      *reinterpret_cast<int*>(opt_address) = ParseInt(value);
+      *static_cast<int*>(opt_address) = ParseInt(value);
       break;
     case OptionType::kInt32T:
-      *reinterpret_cast<int32_t*>(opt_address) = ParseInt32(value);
+      *static_cast<int32_t*>(opt_address) = ParseInt32(value);
       break;
     case OptionType::kInt64T:
-      PutUnaligned(reinterpret_cast<int64_t*>(opt_address), ParseInt64(value));
+      PutUnaligned(static_cast<int64_t*>(opt_address), ParseInt64(value));
       break;
     case OptionType::kUInt:
-      *reinterpret_cast<unsigned int*>(opt_address) = ParseUint32(value);
+      *static_cast<unsigned int*>(opt_address) = ParseUint32(value);
+      break;
+    case OptionType::kUInt8T:
+      *static_cast<uint8_t*>(opt_address) = ParseUint8(value);
       break;
     case OptionType::kUInt32T:
-      *reinterpret_cast<uint32_t*>(opt_address) = ParseUint32(value);
+      *static_cast<uint32_t*>(opt_address) = ParseUint32(value);
       break;
     case OptionType::kUInt64T:
-      PutUnaligned(reinterpret_cast<uint64_t*>(opt_address), ParseUint64(value));
+      PutUnaligned(static_cast<uint64_t*>(opt_address), ParseUint64(value));
       break;
     case OptionType::kSizeT:
-      PutUnaligned(reinterpret_cast<size_t*>(opt_address), ParseSizeT(value));
+      PutUnaligned(static_cast<size_t*>(opt_address), ParseSizeT(value));
       break;
     case OptionType::kString:
-      *reinterpret_cast<std::string*>(opt_address) = value;
+      *static_cast<std::string*>(opt_address) = value;
       break;
     case OptionType::kDouble:
-      *reinterpret_cast<double*>(opt_address) = ParseDouble(value);
+      *static_cast<double*>(opt_address) = ParseDouble(value);
       break;
     case OptionType::kCompactionStyle:
       return ParseEnum<CompactionStyle>(
           compaction_style_string_map, value,
-          reinterpret_cast<CompactionStyle*>(opt_address));
+          static_cast<CompactionStyle*>(opt_address));
     case OptionType::kCompactionPri:
-      return ParseEnum<CompactionPri>(
-          compaction_pri_string_map, value,
-          reinterpret_cast<CompactionPri*>(opt_address));
+      return ParseEnum<CompactionPri>(compaction_pri_string_map, value,
+                                      static_cast<CompactionPri*>(opt_address));
     case OptionType::kCompressionType:
       return ParseEnum<CompressionType>(
           compression_type_string_map, value,
-          reinterpret_cast<CompressionType*>(opt_address));
-    case OptionType::kSliceTransform:
-      return ParseSliceTransform(
-          value, reinterpret_cast<std::shared_ptr<const SliceTransform>*>(
-                     opt_address));
+          static_cast<CompressionType*>(opt_address));
     case OptionType::kChecksumType:
-      return ParseEnum<ChecksumType>(
-          checksum_type_string_map, value,
-          reinterpret_cast<ChecksumType*>(opt_address));
+      return ParseEnum<ChecksumType>(checksum_type_string_map, value,
+                                     static_cast<ChecksumType*>(opt_address));
     case OptionType::kEncodingType:
-      return ParseEnum<EncodingType>(
-          encoding_type_string_map, value,
-          reinterpret_cast<EncodingType*>(opt_address));
+      return ParseEnum<EncodingType>(encoding_type_string_map, value,
+                                     static_cast<EncodingType*>(opt_address));
     case OptionType::kCompactionStopStyle:
       return ParseEnum<CompactionStopStyle>(
           compaction_stop_style_string_map, value,
-          reinterpret_cast<CompactionStopStyle*>(opt_address));
+          static_cast<CompactionStopStyle*>(opt_address));
+    case OptionType::kEncodedString: {
+      std::string* output_addr = static_cast<std::string*>(opt_address);
+      (Slice(value)).DecodeHex(output_addr);
+      break;
+    }
+    case OptionType::kTemperature: {
+      return ParseEnum<Temperature>(temperature_string_map, value,
+                                    static_cast<Temperature*>(opt_address));
+    }
     default:
       return false;
   }
   return true;
 }
 
-bool SerializeSingleOptionHelper(const char* opt_address,
+bool SerializeSingleOptionHelper(const void* opt_address,
                                  const OptionType opt_type,
                                  std::string* value) {
-
   assert(value);
   switch (opt_type) {
     case OptionType::kBoolean:
-      *value = *(reinterpret_cast<const bool*>(opt_address)) ? "true" : "false";
+      *value = *(static_cast<const bool*>(opt_address)) ? "true" : "false";
       break;
     case OptionType::kInt:
-      *value = ToString(*(reinterpret_cast<const int*>(opt_address)));
+      *value = std::to_string(*(static_cast<const int*>(opt_address)));
       break;
     case OptionType::kInt32T:
-      *value = ToString(*(reinterpret_cast<const int32_t*>(opt_address)));
+      *value = std::to_string(*(static_cast<const int32_t*>(opt_address)));
       break;
     case OptionType::kInt64T:
       {
         int64_t v;
-        GetUnaligned(reinterpret_cast<const int64_t*>(opt_address), &v);
-        *value = ToString(v);
+        GetUnaligned(static_cast<const int64_t*>(opt_address), &v);
+        *value = std::to_string(v);
       }
       break;
     case OptionType::kUInt:
-      *value = ToString(*(reinterpret_cast<const unsigned int*>(opt_address)));
+      *value = std::to_string(*(static_cast<const unsigned int*>(opt_address)));
+      break;
+    case OptionType::kUInt8T:
+      *value = std::to_string(*(static_cast<const uint8_t*>(opt_address)));
       break;
     case OptionType::kUInt32T:
-      *value = ToString(*(reinterpret_cast<const uint32_t*>(opt_address)));
+      *value = std::to_string(*(static_cast<const uint32_t*>(opt_address)));
       break;
     case OptionType::kUInt64T:
       {
         uint64_t v;
-        GetUnaligned(reinterpret_cast<const uint64_t*>(opt_address), &v);
-        *value = ToString(v);
+        GetUnaligned(static_cast<const uint64_t*>(opt_address), &v);
+        *value = std::to_string(v);
       }
       break;
     case OptionType::kSizeT:
       {
         size_t v;
-        GetUnaligned(reinterpret_cast<const size_t*>(opt_address), &v);
-        *value = ToString(v);
+        GetUnaligned(static_cast<const size_t*>(opt_address), &v);
+        *value = std::to_string(v);
       }
       break;
     case OptionType::kDouble:
-      *value = ToString(*(reinterpret_cast<const double*>(opt_address)));
+      *value = std::to_string(*(static_cast<const double*>(opt_address)));
       break;
     case OptionType::kString:
-      *value = EscapeOptionString(
-          *(reinterpret_cast<const std::string*>(opt_address)));
+      *value =
+          EscapeOptionString(*(static_cast<const std::string*>(opt_address)));
       break;
     case OptionType::kCompactionStyle:
       return SerializeEnum<CompactionStyle>(
           compaction_style_string_map,
-          *(reinterpret_cast<const CompactionStyle*>(opt_address)), value);
+          *(static_cast<const CompactionStyle*>(opt_address)), value);
     case OptionType::kCompactionPri:
       return SerializeEnum<CompactionPri>(
           compaction_pri_string_map,
-          *(reinterpret_cast<const CompactionPri*>(opt_address)), value);
+          *(static_cast<const CompactionPri*>(opt_address)), value);
     case OptionType::kCompressionType:
       return SerializeEnum<CompressionType>(
           compression_type_string_map,
-          *(reinterpret_cast<const CompressionType*>(opt_address)), value);
-    case OptionType::kSliceTransform: {
-      const auto* slice_transform_ptr =
-          reinterpret_cast<const std::shared_ptr<const SliceTransform>*>(
-              opt_address);
-      *value = slice_transform_ptr->get() ? slice_transform_ptr->get()->Name()
-                                          : kNullptrString;
-      break;
-    }
-    case OptionType::kComparator: {
-      // it's a const pointer of const Comparator*
-      const auto* ptr = reinterpret_cast<const Comparator* const*>(opt_address);
-      // Since the user-specified comparator will be wrapped by
-      // InternalKeyComparator, we should persist the user-specified one
-      // instead of InternalKeyComparator.
-      if (*ptr == nullptr) {
-        *value = kNullptrString;
-      } else {
-        const Comparator* root_comp = (*ptr)->GetRootComparator();
-        if (root_comp == nullptr) {
-          root_comp = (*ptr);
-        }
-        *value = root_comp->Name();
-      }
-      break;
-    }
-    case OptionType::kCompactionFilter: {
-      // it's a const pointer of const CompactionFilter*
-      const auto* ptr =
-          reinterpret_cast<const CompactionFilter* const*>(opt_address);
-      *value = *ptr ? (*ptr)->Name() : kNullptrString;
+          *(static_cast<const CompressionType*>(opt_address)), value);
       break;
-    }
-    case OptionType::kCompactionFilterFactory: {
-      const auto* ptr =
-          reinterpret_cast<const std::shared_ptr<CompactionFilterFactory>*>(
-              opt_address);
-      *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
-      break;
-    }
-    case OptionType::kMemTableRepFactory: {
-      const auto* ptr =
-          reinterpret_cast<const std::shared_ptr<MemTableRepFactory>*>(
-              opt_address);
-      *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
-      break;
-    }
-    case OptionType::kMergeOperator: {
-      const auto* ptr =
-          reinterpret_cast<const std::shared_ptr<MergeOperator>*>(opt_address);
-      *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
-      break;
-    }
-    case OptionType::kFilterPolicy: {
-      const auto* ptr =
-          reinterpret_cast<const std::shared_ptr<FilterPolicy>*>(opt_address);
-      *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
-      break;
-    }
     case OptionType::kChecksumType:
       return SerializeEnum<ChecksumType>(
           checksum_type_string_map,
-          *reinterpret_cast<const ChecksumType*>(opt_address), value);
-    case OptionType::kFlushBlockPolicyFactory: {
-      const auto* ptr =
-          reinterpret_cast<const std::shared_ptr<FlushBlockPolicyFactory>*>(
-              opt_address);
-      *value = ptr->get() ? ptr->get()->Name() : kNullptrString;
-      break;
-    }
+          *static_cast<const ChecksumType*>(opt_address), value);
     case OptionType::kEncodingType:
       return SerializeEnum<EncodingType>(
           encoding_type_string_map,
-          *reinterpret_cast<const EncodingType*>(opt_address), value);
+          *static_cast<const EncodingType*>(opt_address), value);
     case OptionType::kCompactionStopStyle:
       return SerializeEnum<CompactionStopStyle>(
           compaction_stop_style_string_map,
-          *reinterpret_cast<const CompactionStopStyle*>(opt_address), value);
+          *static_cast<const CompactionStopStyle*>(opt_address), value);
+    case OptionType::kEncodedString: {
+      const auto* ptr = static_cast<const std::string*>(opt_address);
+      *value = (Slice(*ptr)).ToString(true);
+      break;
+    }
+    case OptionType::kTemperature: {
+      return SerializeEnum<Temperature>(
+          temperature_string_map, *static_cast<const Temperature*>(opt_address),
+          value);
+    }
     default:
       return false;
   }
@@ -594,32 +579,6 @@ Status ConfigureFromMap(
   return s;
 }
 
-Status GetMutableOptionsFromStrings(
-    const MutableCFOptions& base_options,
-    const std::unordered_map<std::string, std::string>& options_map,
-    Logger* /*info_log*/, MutableCFOptions* new_options) {
-  assert(new_options);
-  *new_options = base_options;
-  ConfigOptions config_options;
-  const auto config = CFOptionsAsConfigurable(base_options);
-  return ConfigureFromMap<MutableCFOptions>(config_options, options_map,
-                                            MutableCFOptions::kName(),
-                                            config.get(), new_options);
-}
-
-Status GetMutableDBOptionsFromStrings(
-    const MutableDBOptions& base_options,
-    const std::unordered_map<std::string, std::string>& options_map,
-    MutableDBOptions* new_options) {
-  assert(new_options);
-  *new_options = base_options;
-  ConfigOptions config_options;
-
-  auto config = DBOptionsAsConfigurable(base_options);
-  return ConfigureFromMap<MutableDBOptions>(config_options, options_map,
-                                            MutableDBOptions::kName(),
-                                            config.get(), new_options);
-}
 
 Status StringToMap(const std::string& opts_str,
                    std::unordered_map<std::string, std::string>* opts_map) {
@@ -635,10 +594,13 @@ Status StringToMap(const std::string& opts_str,
   }
 
   while (pos < opts.size()) {
-    size_t eq_pos = opts.find('=', pos);
+    size_t eq_pos = opts.find_first_of("={};", pos);
     if (eq_pos == std::string::npos) {
       return Status::InvalidArgument("Mismatched key value pair, '=' expected");
+    } else if (opts[eq_pos] != '=') {
+      return Status::InvalidArgument("Unexpected char in key");
     }
+
     std::string key = trim(opts.substr(pos, eq_pos - pos));
     if (key.empty()) {
       return Status::InvalidArgument("Empty key found");
@@ -661,17 +623,11 @@ Status StringToMap(const std::string& opts_str,
   return Status::OK();
 }
 
-Status GetStringFromMutableDBOptions(const ConfigOptions& config_options,
-                                     const MutableDBOptions& mutable_opts,
-                                     std::string* opt_string) {
-  auto config = DBOptionsAsConfigurable(mutable_opts);
-  return config->GetOptionString(config_options, opt_string);
-}
 
 Status GetStringFromDBOptions(std::string* opt_string,
                               const DBOptions& db_options,
                               const std::string& delimiter) {
-  ConfigOptions config_options;
+  ConfigOptions config_options(db_options);
   config_options.delimiter = delimiter;
   return GetStringFromDBOptions(config_options, db_options, opt_string);
 }
@@ -685,14 +641,6 @@ Status GetStringFromDBOptions(const ConfigOptions& config_options,
   return config->GetOptionString(config_options, opt_string);
 }
 
-Status GetStringFromMutableCFOptions(const ConfigOptions& config_options,
-                                     const MutableCFOptions& mutable_opts,
-                                     std::string* opt_string) {
-  assert(opt_string);
-  opt_string->clear();
-  const auto config = CFOptionsAsConfigurable(mutable_opts);
-  return config->GetOptionString(config_options, opt_string);
-}
 
 Status GetStringFromColumnFamilyOptions(std::string* opt_string,
                                         const ColumnFamilyOptions& cf_options,
@@ -784,7 +732,7 @@ Status GetDBOptionsFromMap(
     const std::unordered_map<std::string, std::string>& opts_map,
     DBOptions* new_options, bool input_strings_escaped,
     bool ignore_unknown_options) {
-  ConfigOptions config_options;
+  ConfigOptions config_options(base_options);
   config_options.input_strings_escaped = input_strings_escaped;
   config_options.ignore_unknown_options = ignore_unknown_options;
   return GetDBOptionsFromMap(config_options, base_options, opts_map,
@@ -812,7 +760,7 @@ Status GetDBOptionsFromMap(
 Status GetDBOptionsFromString(const DBOptions& base_options,
                               const std::string& opts_str,
                               DBOptions* new_options) {
-  ConfigOptions config_options;
+  ConfigOptions config_options(base_options);
   config_options.input_strings_escaped = false;
   config_options.ignore_unknown_options = false;
 
@@ -836,7 +784,7 @@ Status GetDBOptionsFromString(const ConfigOptions& config_options,
 
 Status GetOptionsFromString(const Options& base_options,
                             const std::string& opts_str, Options* new_options) {
-  ConfigOptions config_options;
+  ConfigOptions config_options(base_options);
   config_options.input_strings_escaped = false;
   config_options.ignore_unknown_options = false;
 
@@ -851,6 +799,7 @@ Status GetOptionsFromString(const ConfigOptions& config_options,
   std::unordered_map<std::string, std::string> unused_opts;
   std::unordered_map<std::string, std::string> opts_map;
 
+  assert(new_options);
   *new_options = base_options;
   Status s = StringToMap(opts_str, &opts_map);
   if (!s.ok()) {
@@ -896,13 +845,26 @@ std::unordered_map<std::string, CompactionPri>
         {"kByCompensatedSize", kByCompensatedSize},
         {"kOldestLargestSeqFirst", kOldestLargestSeqFirst},
         {"kOldestSmallestSeqFirst", kOldestSmallestSeqFirst},
-        {"kMinOverlappingRatio", kMinOverlappingRatio}};
+        {"kMinOverlappingRatio", kMinOverlappingRatio},
+        {"kRoundRobin", kRoundRobin}};
 
 std::unordered_map<std::string, CompactionStopStyle>
     OptionsHelper::compaction_stop_style_string_map = {
         {"kCompactionStopStyleSimilarSize", kCompactionStopStyleSimilarSize},
         {"kCompactionStopStyleTotalSize", kCompactionStopStyleTotalSize}};
 
+std::unordered_map<std::string, Temperature>
+    OptionsHelper::temperature_string_map = {
+        {"kUnknown", Temperature::kUnknown},
+        {"kHot", Temperature::kHot},
+        {"kWarm", Temperature::kWarm},
+        {"kCold", Temperature::kCold}};
+
+std::unordered_map<std::string, PrepopulateBlobCache>
+    OptionsHelper::prepopulate_blob_cache_string_map = {
+        {"kDisable", PrepopulateBlobCache::kDisable},
+        {"kFlushOnly", PrepopulateBlobCache::kFlushOnly}};
+
 Status OptionTypeInfo::NextToken(const std::string& opts, char delimiter,
                                  size_t pos, size_t* end, std::string* token) {
   while (pos < opts.size() && isspace(opts[pos])) {
@@ -963,18 +925,18 @@ Status OptionTypeInfo::Parse(const ConfigOptions& config_options,
     return Status::OK();
   }
   try {
-    char* opt_addr = reinterpret_cast<char*>(opt_ptr) + offset_;
     const std::string& opt_value = config_options.input_strings_escaped
                                        ? UnescapeOptionString(value)
                                        : value;
 
-    if (opt_addr == nullptr) {
+    if (opt_ptr == nullptr) {
       return Status::NotFound("Could not find option", opt_name);
     } else if (parse_func_ != nullptr) {
       ConfigOptions copy = config_options;
       copy.invoke_prepare_options = false;
+      void* opt_addr = GetOffset(opt_ptr);
       return parse_func_(copy, opt_name, opt_value, opt_addr);
-    } else if (ParseOptionHelper(opt_addr, type_, opt_value)) {
+    } else if (ParseOptionHelper(GetOffset(opt_ptr), type_, opt_value)) {
       return Status::OK();
     } else if (IsConfigurable()) {
       // The option is <config>.<name>
@@ -1005,28 +967,56 @@ Status OptionTypeInfo::Parse(const ConfigOptions& config_options,
   }
 }
 
+Status OptionTypeInfo::ParseType(
+    const ConfigOptions& config_options, const std::string& opts_str,
+    const std::unordered_map<std::string, OptionTypeInfo>& type_map,
+    void* opt_addr, std::unordered_map<std::string, std::string>* unused) {
+  std::unordered_map<std::string, std::string> opts_map;
+  Status status = StringToMap(opts_str, &opts_map);
+  if (!status.ok()) {
+    return status;
+  } else {
+    return ParseType(config_options, opts_map, type_map, opt_addr, unused);
+  }
+}
+
+Status OptionTypeInfo::ParseType(
+    const ConfigOptions& config_options,
+    const std::unordered_map<std::string, std::string>& opts_map,
+    const std::unordered_map<std::string, OptionTypeInfo>& type_map,
+    void* opt_addr, std::unordered_map<std::string, std::string>* unused) {
+  for (const auto& opts_iter : opts_map) {
+    std::string opt_name;
+    const auto* opt_info = Find(opts_iter.first, type_map, &opt_name);
+    if (opt_info != nullptr) {
+      Status status =
+          opt_info->Parse(config_options, opt_name, opts_iter.second, opt_addr);
+      if (!status.ok()) {
+        return status;
+      }
+    } else if (unused != nullptr) {
+      (*unused)[opts_iter.first] = opts_iter.second;
+    } else if (!config_options.ignore_unknown_options) {
+      return Status::NotFound("Unrecognized option", opts_iter.first);
+    }
+  }
+  return Status::OK();
+}
+
 Status OptionTypeInfo::ParseStruct(
     const ConfigOptions& config_options, const std::string& struct_name,
     const std::unordered_map<std::string, OptionTypeInfo>* struct_map,
-    const std::string& opt_name, const std::string& opt_value, char* opt_addr) {
+    const std::string& opt_name, const std::string& opt_value, void* opt_addr) {
   assert(struct_map);
   Status status;
   if (opt_name == struct_name || EndsWith(opt_name, "." + struct_name)) {
     // This option represents the entire struct
-    std::unordered_map<std::string, std::string> opt_map;
-    status = StringToMap(opt_value, &opt_map);
-    for (const auto& map_iter : opt_map) {
-      if (!status.ok()) {
-        break;
-      }
-      const auto iter = struct_map->find(map_iter.first);
-      if (iter != struct_map->end()) {
-        status = iter->second.Parse(config_options, map_iter.first,
-                                    map_iter.second, opt_addr);
-      } else {
-        status = Status::InvalidArgument("Unrecognized option",
-                                         struct_name + "." + map_iter.first);
-      }
+    std::unordered_map<std::string, std::string> unused;
+    status =
+        ParseType(config_options, opt_value, *struct_map, opt_addr, &unused);
+    if (status.ok() && !unused.empty()) {
+      status = Status::InvalidArgument(
+          "Unrecognized option", struct_name + "." + unused.begin()->first);
     }
   } else if (StartsWith(opt_name, struct_name + ".")) {
     // This option represents a nested field in the struct (e.g, struct.field)
@@ -1058,26 +1048,47 @@ Status OptionTypeInfo::Serialize(const ConfigOptions& config_options,
                                  std::string* opt_value) const {
   // If the option is no longer used in rocksdb and marked as deprecated,
   // we skip it in the serialization.
-  const char* opt_addr = reinterpret_cast<const char*>(opt_ptr) + offset_;
-  if (opt_addr == nullptr || IsDeprecated()) {
+  if (opt_ptr == nullptr || IsDeprecated()) {
     return Status::OK();
   } else if (IsEnabled(OptionTypeFlags::kDontSerialize)) {
     return Status::NotSupported("Cannot serialize option: ", opt_name);
   } else if (serialize_func_ != nullptr) {
+    const void* opt_addr = GetOffset(opt_ptr);
     return serialize_func_(config_options, opt_name, opt_addr, opt_value);
-  } else if (SerializeSingleOptionHelper(opt_addr, type_, opt_value)) {
-    return Status::OK();
   } else if (IsCustomizable()) {
     const Customizable* custom = AsRawPointer<Customizable>(opt_ptr);
+    opt_value->clear();
     if (custom == nullptr) {
-      *opt_value = kNullptrString;
+      // We do not have a custom object to serialize.
+      // If the option is not mutable and we are doing only mutable options,
+      // we return an empty string (which will cause the option not to be
+      // printed). Otherwise, we return the "nullptr" string, which will result
+      // in "option=nullptr" being printed.
+      if (IsMutable() || !config_options.mutable_options_only) {
+        *opt_value = kNullptrString;
+      } else {
+        *opt_value = "";
+      }
     } else if (IsEnabled(OptionTypeFlags::kStringNameOnly) &&
                !config_options.IsDetailed()) {
-      *opt_value = custom->GetId();
+      if (!config_options.mutable_options_only || IsMutable()) {
+        *opt_value = custom->GetId();
+      }
     } else {
       ConfigOptions embedded = config_options;
       embedded.delimiter = ";";
-      *opt_value = custom->ToString(embedded);
+      // If this option is mutable, everything inside it should be considered
+      // mutable
+      if (IsMutable()) {
+        embedded.mutable_options_only = false;
+      }
+      std::string value = custom->ToString(embedded);
+      if (!embedded.mutable_options_only ||
+          value.find("=") != std::string::npos) {
+        *opt_value = value;
+      } else {
+        *opt_value = "";
+      }
     }
     return Status::OK();
   } else if (IsConfigurable()) {
@@ -1088,15 +1099,41 @@ Status OptionTypeInfo::Serialize(const ConfigOptions& config_options,
       *opt_value = config->ToString(embedded);
     }
     return Status::OK();
+  } else if (config_options.mutable_options_only && !IsMutable()) {
+    return Status::OK();
+  } else if (SerializeSingleOptionHelper(GetOffset(opt_ptr), type_,
+                                         opt_value)) {
+    return Status::OK();
   } else {
     return Status::InvalidArgument("Cannot serialize option: ", opt_name);
   }
 }
 
+Status OptionTypeInfo::SerializeType(
+    const ConfigOptions& config_options,
+    const std::unordered_map<std::string, OptionTypeInfo>& type_map,
+    const void* opt_addr, std::string* result) {
+  Status status;
+  for (const auto& iter : type_map) {
+    std::string single;
+    const auto& opt_info = iter.second;
+    if (opt_info.ShouldSerialize()) {
+      status =
+          opt_info.Serialize(config_options, iter.first, opt_addr, &single);
+      if (!status.ok()) {
+        return status;
+      } else {
+        result->append(iter.first + "=" + single + config_options.delimiter);
+      }
+    }
+  }
+  return status;
+}
+
 Status OptionTypeInfo::SerializeStruct(
     const ConfigOptions& config_options, const std::string& struct_name,
     const std::unordered_map<std::string, OptionTypeInfo>* struct_map,
-    const std::string& opt_name, const char* opt_addr, std::string* value) {
+    const std::string& opt_name, const void* opt_addr, std::string* value) {
   assert(struct_map);
   Status status;
   if (EndsWith(opt_name, struct_name)) {
@@ -1107,19 +1144,12 @@ Status OptionTypeInfo::SerializeStruct(
 
     // This option represents the entire struct
     std::string result;
-    for (const auto& iter : *struct_map) {
-      std::string single;
-      const auto& opt_info = iter.second;
-      if (opt_info.ShouldSerialize()) {
-        status = opt_info.Serialize(embedded, iter.first, opt_addr, &single);
-        if (!status.ok()) {
-          return status;
-        } else {
-          result.append(iter.first + "=" + single + embedded.delimiter);
-        }
-      }
+    status = SerializeType(embedded, *struct_map, opt_addr, &result);
+    if (!status.ok()) {
+      return status;
+    } else {
+      *value = "{" + result + "}";
     }
-    *value = "{" + result + "}";
   } else if (StartsWith(opt_name, struct_name + ".")) {
     // This option represents a nested field in the struct (e.g, struct.field)
     std::string elem_name;
@@ -1145,17 +1175,16 @@ Status OptionTypeInfo::SerializeStruct(
 }
 
 template <typename T>
-bool IsOptionEqual(const char* offset1, const char* offset2) {
-  return (*reinterpret_cast<const T*>(offset1) ==
-          *reinterpret_cast<const T*>(offset2));
+bool IsOptionEqual(const void* offset1, const void* offset2) {
+  return (*static_cast<const T*>(offset1) == *static_cast<const T*>(offset2));
 }
 
 static bool AreEqualDoubles(const double a, const double b) {
   return (fabs(a - b) < 0.00001);
 }
 
-static bool AreOptionsEqual(OptionType type, const char* this_offset,
-                            const char* that_offset) {
+static bool AreOptionsEqual(OptionType type, const void* this_offset,
+                            const void* that_offset) {
   switch (type) {
     case OptionType::kBoolean:
       return IsOptionEqual<bool>(this_offset, that_offset);
@@ -1167,29 +1196,31 @@ static bool AreOptionsEqual(OptionType type, const char* this_offset,
       return IsOptionEqual<int32_t>(this_offset, that_offset);
     case OptionType::kInt64T: {
       int64_t v1, v2;
-      GetUnaligned(reinterpret_cast<const int64_t*>(this_offset), &v1);
-      GetUnaligned(reinterpret_cast<const int64_t*>(that_offset), &v2);
+      GetUnaligned(static_cast<const int64_t*>(this_offset), &v1);
+      GetUnaligned(static_cast<const int64_t*>(that_offset), &v2);
       return (v1 == v2);
     }
+    case OptionType::kUInt8T:
+      return IsOptionEqual<uint8_t>(this_offset, that_offset);
     case OptionType::kUInt32T:
       return IsOptionEqual<uint32_t>(this_offset, that_offset);
     case OptionType::kUInt64T: {
       uint64_t v1, v2;
-      GetUnaligned(reinterpret_cast<const uint64_t*>(this_offset), &v1);
-      GetUnaligned(reinterpret_cast<const uint64_t*>(that_offset), &v2);
+      GetUnaligned(static_cast<const uint64_t*>(this_offset), &v1);
+      GetUnaligned(static_cast<const uint64_t*>(that_offset), &v2);
       return (v1 == v2);
     }
     case OptionType::kSizeT: {
       size_t v1, v2;
-      GetUnaligned(reinterpret_cast<const size_t*>(this_offset), &v1);
-      GetUnaligned(reinterpret_cast<const size_t*>(that_offset), &v2);
+      GetUnaligned(static_cast<const size_t*>(this_offset), &v1);
+      GetUnaligned(static_cast<const size_t*>(that_offset), &v2);
       return (v1 == v2);
     }
     case OptionType::kString:
       return IsOptionEqual<std::string>(this_offset, that_offset);
     case OptionType::kDouble:
-      return AreEqualDoubles(*reinterpret_cast<const double*>(this_offset),
-                             *reinterpret_cast<const double*>(that_offset));
+      return AreEqualDoubles(*static_cast<const double*>(this_offset),
+                             *static_cast<const double*>(that_offset));
     case OptionType::kCompactionStyle:
       return IsOptionEqual<CompactionStyle>(this_offset, that_offset);
     case OptionType::kCompactionStopStyle:
@@ -1202,6 +1233,10 @@ static bool AreOptionsEqual(OptionType type, const char* this_offset,
       return IsOptionEqual<ChecksumType>(this_offset, that_offset);
     case OptionType::kEncodingType:
       return IsOptionEqual<EncodingType>(this_offset, that_offset);
+    case OptionType::kEncodedString:
+      return IsOptionEqual<std::string>(this_offset, that_offset);
+    case OptionType::kTemperature:
+      return IsOptionEqual<Temperature>(this_offset, that_offset);
     default:
       return false;
   }  // End switch
@@ -1216,39 +1251,43 @@ bool OptionTypeInfo::AreEqual(const ConfigOptions& config_options,
   if (!config_options.IsCheckEnabled(level)) {
     return true;  // If the sanity level is not being checked, skip it
   }
-  const auto this_addr = reinterpret_cast<const char*>(this_ptr) + offset_;
-  const auto that_addr = reinterpret_cast<const char*>(that_ptr) + offset_;
-  if (this_addr == nullptr || that_addr == nullptr) {
-    if (this_addr == that_addr) {
+  if (this_ptr == nullptr || that_ptr == nullptr) {
+    if (this_ptr == that_ptr) {
       return true;
     }
   } else if (equals_func_ != nullptr) {
+    const void* this_addr = GetOffset(this_ptr);
+    const void* that_addr = GetOffset(that_ptr);
     if (equals_func_(config_options, opt_name, this_addr, that_addr,
                      mismatch)) {
       return true;
     }
-  } else if (AreOptionsEqual(type_, this_addr, that_addr)) {
-    return true;
-  } else if (IsConfigurable()) {
-    const auto* this_config = AsRawPointer<Configurable>(this_ptr);
-    const auto* that_config = AsRawPointer<Configurable>(that_ptr);
-    if (this_config == that_config) {
+  } else {
+    const void* this_addr = GetOffset(this_ptr);
+    const void* that_addr = GetOffset(that_ptr);
+    if (AreOptionsEqual(type_, this_addr, that_addr)) {
       return true;
-    } else if (this_config != nullptr && that_config != nullptr) {
-      std::string bad_name;
-      bool matches;
-      if (level < config_options.sanity_level) {
-        ConfigOptions copy = config_options;
-        copy.sanity_level = level;
-        matches = this_config->AreEquivalent(copy, that_config, &bad_name);
-      } else {
-        matches =
-            this_config->AreEquivalent(config_options, that_config, &bad_name);
-      }
-      if (!matches) {
-        *mismatch = opt_name + "." + bad_name;
+    } else if (IsConfigurable()) {
+      const auto* this_config = AsRawPointer<Configurable>(this_ptr);
+      const auto* that_config = AsRawPointer<Configurable>(that_ptr);
+      if (this_config == that_config) {
+        return true;
+      } else if (this_config != nullptr && that_config != nullptr) {
+        std::string bad_name;
+        bool matches;
+        if (level < config_options.sanity_level) {
+          ConfigOptions copy = config_options;
+          copy.sanity_level = level;
+          matches = this_config->AreEquivalent(copy, that_config, &bad_name);
+        } else {
+          matches = this_config->AreEquivalent(config_options, that_config,
+                                               &bad_name);
+        }
+        if (!matches) {
+          *mismatch = opt_name + "." + bad_name;
+        }
+        return matches;
       }
-      return matches;
     }
   }
   if (mismatch->empty()) {
@@ -1257,25 +1296,35 @@ bool OptionTypeInfo::AreEqual(const ConfigOptions& config_options,
   return false;
 }
 
+bool OptionTypeInfo::TypesAreEqual(
+    const ConfigOptions& config_options,
+    const std::unordered_map<std::string, OptionTypeInfo>& type_map,
+    const void* this_addr, const void* that_addr, std::string* mismatch) {
+  for (const auto& iter : type_map) {
+    const auto& opt_info = iter.second;
+    if (!opt_info.AreEqual(config_options, iter.first, this_addr, that_addr,
+                           mismatch)) {
+      return false;
+    }
+  }
+  return true;
+}
+
 bool OptionTypeInfo::StructsAreEqual(
     const ConfigOptions& config_options, const std::string& struct_name,
     const std::unordered_map<std::string, OptionTypeInfo>* struct_map,
-    const std::string& opt_name, const char* this_addr, const char* that_addr,
+    const std::string& opt_name, const void* this_addr, const void* that_addr,
     std::string* mismatch) {
   assert(struct_map);
   bool matches = true;
   std::string result;
   if (EndsWith(opt_name, struct_name)) {
     // This option represents the entire struct
-    for (const auto& iter : *struct_map) {
-      const auto& opt_info = iter.second;
-
-      matches = opt_info.AreEqual(config_options, iter.first, this_addr,
-                                  that_addr, &result);
-      if (!matches) {
-        *mismatch = struct_name + "." + result;
-        return false;
-      }
+    matches = TypesAreEqual(config_options, *struct_map, this_addr, that_addr,
+                            &result);
+    if (!matches) {
+      *mismatch = struct_name + "." + result;
+      return false;
     }
   } else if (StartsWith(opt_name, struct_name + ".")) {
     // This option represents a nested field in the struct (e.g, struct.field)
@@ -1362,6 +1411,44 @@ bool OptionTypeInfo::AreEqualByName(const ConfigOptions& config_options,
   return (this_value == that_value);
 }
 
+Status OptionTypeInfo::Prepare(const ConfigOptions& config_options,
+                               const std::string& name, void* opt_ptr) const {
+  if (ShouldPrepare()) {
+    if (prepare_func_ != nullptr) {
+      void* opt_addr = GetOffset(opt_ptr);
+      return prepare_func_(config_options, name, opt_addr);
+    } else if (IsConfigurable()) {
+      Configurable* config = AsRawPointer<Configurable>(opt_ptr);
+      if (config != nullptr) {
+        return config->PrepareOptions(config_options);
+      } else if (!CanBeNull()) {
+        return Status::NotFound("Missing configurable object", name);
+      }
+    }
+  }
+  return Status::OK();
+}
+
+Status OptionTypeInfo::Validate(const DBOptions& db_opts,
+                                const ColumnFamilyOptions& cf_opts,
+                                const std::string& name,
+                                const void* opt_ptr) const {
+  if (ShouldValidate()) {
+    if (validate_func_ != nullptr) {
+      const void* opt_addr = GetOffset(opt_ptr);
+      return validate_func_(db_opts, cf_opts, name, opt_addr);
+    } else if (IsConfigurable()) {
+      const Configurable* config = AsRawPointer<Configurable>(opt_ptr);
+      if (config != nullptr) {
+        return config->ValidateOptions(db_opts, cf_opts);
+      } else if (!CanBeNull()) {
+        return Status::NotFound("Missing configurable object", name);
+      }
+    }
+  }
+  return Status::OK();
+}
+
 const OptionTypeInfo* OptionTypeInfo::Find(
     const std::string& opt_name,
     const std::unordered_map<std::string, OptionTypeInfo>& opt_map,