1 // Copyright (c) Meta Platforms, Inc. and affiliates.
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
7 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
8 // Use of this source code is governed by a BSD-style license that can be
9 // found in the LICENSE file. See the AUTHORS file for names of contributors.
11 #include "db/compaction/compaction_job.h"
12 #include "db/compaction/compaction_state.h"
13 #include "logging/logging.h"
14 #include "monitoring/iostats_context_imp.h"
15 #include "monitoring/thread_status_util.h"
16 #include "options/options_helper.h"
17 #include "rocksdb/utilities/options_type.h"
20 namespace ROCKSDB_NAMESPACE
{
21 class SubcompactionState
;
23 CompactionServiceJobStatus
24 CompactionJob::ProcessKeyValueCompactionWithCompactionService(
25 SubcompactionState
* sub_compact
) {
27 assert(sub_compact
->compaction
);
28 assert(db_options_
.compaction_service
);
30 const Compaction
* compaction
= sub_compact
->compaction
;
31 CompactionServiceInput compaction_input
;
32 compaction_input
.output_level
= compaction
->output_level();
33 compaction_input
.db_id
= db_id_
;
35 const std::vector
<CompactionInputFiles
>& inputs
=
36 *(compact_
->compaction
->inputs());
37 for (const auto& files_per_level
: inputs
) {
38 for (const auto& file
: files_per_level
.files
) {
39 compaction_input
.input_files
.emplace_back(
40 MakeTableFileName(file
->fd
.GetNumber()));
43 compaction_input
.column_family
.name
=
44 compaction
->column_family_data()->GetName();
45 compaction_input
.column_family
.options
=
46 compaction
->column_family_data()->GetLatestCFOptions();
47 compaction_input
.db_options
=
48 BuildDBOptions(db_options_
, mutable_db_options_copy_
);
49 compaction_input
.snapshots
= existing_snapshots_
;
50 compaction_input
.has_begin
= sub_compact
->start
.has_value();
51 compaction_input
.begin
=
52 compaction_input
.has_begin
? sub_compact
->start
->ToString() : "";
53 compaction_input
.has_end
= sub_compact
->end
.has_value();
54 compaction_input
.end
=
55 compaction_input
.has_end
? sub_compact
->end
->ToString() : "";
57 std::string compaction_input_binary
;
58 Status s
= compaction_input
.Write(&compaction_input_binary
);
60 sub_compact
->status
= s
;
61 return CompactionServiceJobStatus::kFailure
;
64 std::ostringstream input_files_oss
;
65 bool is_first_one
= true;
66 for (const auto& file
: compaction_input
.input_files
) {
67 input_files_oss
<< (is_first_one
? "" : ", ") << file
;
73 "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
74 compaction_input
.column_family
.name
.c_str(), job_id_
,
75 compaction_input
.output_level
, input_files_oss
.str().c_str());
76 CompactionServiceJobInfo
info(dbname_
, db_id_
, db_session_id_
,
77 GetCompactionId(sub_compact
), thread_pri_
);
78 CompactionServiceJobStatus compaction_status
=
79 db_options_
.compaction_service
->StartV2(info
, compaction_input_binary
);
80 switch (compaction_status
) {
81 case CompactionServiceJobStatus::kSuccess
:
83 case CompactionServiceJobStatus::kFailure
:
84 sub_compact
->status
= Status::Incomplete(
85 "CompactionService failed to start compaction job.");
86 ROCKS_LOG_WARN(db_options_
.info_log
,
87 "[%s] [JOB %d] Remote compaction failed to start.",
88 compaction_input
.column_family
.name
.c_str(), job_id_
);
89 return compaction_status
;
90 case CompactionServiceJobStatus::kUseLocal
:
93 "[%s] [JOB %d] Remote compaction fallback to local by API Start.",
94 compaction_input
.column_family
.name
.c_str(), job_id_
);
95 return compaction_status
;
97 assert(false); // unknown status
101 ROCKS_LOG_INFO(db_options_
.info_log
,
102 "[%s] [JOB %d] Waiting for remote compaction...",
103 compaction_input
.column_family
.name
.c_str(), job_id_
);
104 std::string compaction_result_binary
;
105 compaction_status
= db_options_
.compaction_service
->WaitForCompleteV2(
106 info
, &compaction_result_binary
);
108 if (compaction_status
== CompactionServiceJobStatus::kUseLocal
) {
109 ROCKS_LOG_INFO(db_options_
.info_log
,
110 "[%s] [JOB %d] Remote compaction fallback to local by API "
112 compaction_input
.column_family
.name
.c_str(), job_id_
);
113 return compaction_status
;
116 CompactionServiceResult compaction_result
;
117 s
= CompactionServiceResult::Read(compaction_result_binary
,
120 if (compaction_status
== CompactionServiceJobStatus::kFailure
) {
122 if (compaction_result
.status
.ok()) {
123 sub_compact
->status
= Status::Incomplete(
124 "CompactionService failed to run the compaction job (even though "
125 "the internal status is okay).");
127 // set the current sub compaction status with the status returned from
129 sub_compact
->status
= compaction_result
.status
;
132 sub_compact
->status
= Status::Incomplete(
133 "CompactionService failed to run the compaction job (and no valid "
134 "result is returned).");
135 compaction_result
.status
.PermitUncheckedError();
137 ROCKS_LOG_WARN(db_options_
.info_log
,
138 "[%s] [JOB %d] Remote compaction failed.",
139 compaction_input
.column_family
.name
.c_str(), job_id_
);
140 return compaction_status
;
144 sub_compact
->status
= s
;
145 compaction_result
.status
.PermitUncheckedError();
146 return CompactionServiceJobStatus::kFailure
;
148 sub_compact
->status
= compaction_result
.status
;
150 std::ostringstream output_files_oss
;
152 for (const auto& file
: compaction_result
.output_files
) {
153 output_files_oss
<< (is_first_one
? "" : ", ") << file
.file_name
;
154 is_first_one
= false;
157 ROCKS_LOG_INFO(db_options_
.info_log
,
158 "[%s] [JOB %d] Receive remote compaction result, output path: "
160 compaction_input
.column_family
.name
.c_str(), job_id_
,
161 compaction_result
.output_path
.c_str(),
162 output_files_oss
.str().c_str());
165 sub_compact
->status
= s
;
166 return CompactionServiceJobStatus::kFailure
;
169 for (const auto& file
: compaction_result
.output_files
) {
170 uint64_t file_num
= versions_
->NewFileNumber();
171 auto src_file
= compaction_result
.output_path
+ "/" + file
.file_name
;
172 auto tgt_file
= TableFileName(compaction
->immutable_options()->cf_paths
,
173 file_num
, compaction
->output_path_id());
174 s
= fs_
->RenameFile(src_file
, tgt_file
, IOOptions(), nullptr);
176 sub_compact
->status
= s
;
177 return CompactionServiceJobStatus::kFailure
;
182 s
= fs_
->GetFileSize(tgt_file
, IOOptions(), &file_size
, nullptr);
184 sub_compact
->status
= s
;
185 return CompactionServiceJobStatus::kFailure
;
187 meta
.fd
= FileDescriptor(file_num
, compaction
->output_path_id(), file_size
,
188 file
.smallest_seqno
, file
.largest_seqno
);
189 meta
.smallest
.DecodeFrom(file
.smallest_internal_key
);
190 meta
.largest
.DecodeFrom(file
.largest_internal_key
);
191 meta
.oldest_ancester_time
= file
.oldest_ancester_time
;
192 meta
.file_creation_time
= file
.file_creation_time
;
193 meta
.marked_for_compaction
= file
.marked_for_compaction
;
194 meta
.unique_id
= file
.unique_id
;
196 auto cfd
= compaction
->column_family_data();
197 sub_compact
->Current().AddOutput(std::move(meta
),
198 cfd
->internal_comparator(), false, false,
199 true, file
.paranoid_hash
);
201 sub_compact
->compaction_job_stats
= compaction_result
.stats
;
202 sub_compact
->Current().SetNumOutputRecords(
203 compaction_result
.num_output_records
);
204 sub_compact
->Current().SetTotalBytes(compaction_result
.total_bytes
);
205 RecordTick(stats_
, REMOTE_COMPACT_READ_BYTES
, compaction_result
.bytes_read
);
206 RecordTick(stats_
, REMOTE_COMPACT_WRITE_BYTES
,
207 compaction_result
.bytes_written
);
208 return CompactionServiceJobStatus::kSuccess
;
211 std::string
CompactionServiceCompactionJob::GetTableFileName(
212 uint64_t file_number
) {
213 return MakeTableFileName(output_path_
, file_number
);
216 void CompactionServiceCompactionJob::RecordCompactionIOStats() {
217 compaction_result_
->bytes_read
+= IOSTATS(bytes_read
);
218 compaction_result_
->bytes_written
+= IOSTATS(bytes_written
);
219 CompactionJob::RecordCompactionIOStats();
222 CompactionServiceCompactionJob::CompactionServiceCompactionJob(
223 int job_id
, Compaction
* compaction
, const ImmutableDBOptions
& db_options
,
224 const MutableDBOptions
& mutable_db_options
, const FileOptions
& file_options
,
225 VersionSet
* versions
, const std::atomic
<bool>* shutting_down
,
226 LogBuffer
* log_buffer
, FSDirectory
* output_directory
, Statistics
* stats
,
227 InstrumentedMutex
* db_mutex
, ErrorHandler
* db_error_handler
,
228 std::vector
<SequenceNumber
> existing_snapshots
,
229 std::shared_ptr
<Cache
> table_cache
, EventLogger
* event_logger
,
230 const std::string
& dbname
, const std::shared_ptr
<IOTracer
>& io_tracer
,
231 const std::atomic
<bool>& manual_compaction_canceled
,
232 const std::string
& db_id
, const std::string
& db_session_id
,
233 std::string output_path
,
234 const CompactionServiceInput
& compaction_service_input
,
235 CompactionServiceResult
* compaction_service_result
)
237 job_id
, compaction
, db_options
, mutable_db_options
, file_options
,
238 versions
, shutting_down
, log_buffer
, nullptr, output_directory
,
239 nullptr, stats
, db_mutex
, db_error_handler
,
240 std::move(existing_snapshots
), kMaxSequenceNumber
, nullptr, nullptr,
241 std::move(table_cache
), event_logger
,
242 compaction
->mutable_cf_options()->paranoid_file_checks
,
243 compaction
->mutable_cf_options()->report_bg_io_stats
, dbname
,
244 &(compaction_service_result
->stats
), Env::Priority::USER
, io_tracer
,
245 manual_compaction_canceled
, db_id
, db_session_id
,
246 compaction
->column_family_data()->GetFullHistoryTsLow()),
247 output_path_(std::move(output_path
)),
248 compaction_input_(compaction_service_input
),
249 compaction_result_(compaction_service_result
) {}
251 Status
CompactionServiceCompactionJob::Run() {
252 AutoThreadOperationStageUpdater
stage_updater(
253 ThreadStatus::STAGE_COMPACTION_RUN
);
255 auto* c
= compact_
->compaction
;
256 assert(c
->column_family_data() != nullptr);
257 assert(c
->column_family_data()->current()->storage_info()->NumLevelFiles(
258 compact_
->compaction
->level()) > 0);
261 c
->column_family_data()->CalculateSSTWriteHint(c
->output_level());
262 bottommost_level_
= c
->bottommost_level();
264 Slice begin
= compaction_input_
.begin
;
265 Slice end
= compaction_input_
.end
;
266 compact_
->sub_compact_states
.emplace_back(
268 compaction_input_
.has_begin
? std::optional
<Slice
>(begin
)
269 : std::optional
<Slice
>(),
270 compaction_input_
.has_end
? std::optional
<Slice
>(end
)
271 : std::optional
<Slice
>(),
274 log_buffer_
->FlushBufferToLog();
276 const uint64_t start_micros
= db_options_
.clock
->NowMicros();
277 // Pick the only sub-compaction we should have
278 assert(compact_
->sub_compact_states
.size() == 1);
279 SubcompactionState
* sub_compact
= compact_
->sub_compact_states
.data();
281 ProcessKeyValueCompaction(sub_compact
);
283 compaction_stats_
.stats
.micros
=
284 db_options_
.clock
->NowMicros() - start_micros
;
285 compaction_stats_
.stats
.cpu_micros
=
286 sub_compact
->compaction_job_stats
.cpu_micros
;
288 RecordTimeToHistogram(stats_
, COMPACTION_TIME
,
289 compaction_stats_
.stats
.micros
);
290 RecordTimeToHistogram(stats_
, COMPACTION_CPU_TIME
,
291 compaction_stats_
.stats
.cpu_micros
);
293 Status status
= sub_compact
->status
;
294 IOStatus io_s
= sub_compact
->io_status
;
296 if (io_status_
.ok()) {
301 constexpr IODebugContext
* dbg
= nullptr;
303 if (output_directory_
) {
304 io_s
= output_directory_
->FsyncWithDirOptions(IOOptions(), dbg
,
308 if (io_status_
.ok()) {
315 // TODO: Add verify_table()
318 // Finish up all book-keeping to unify the subcompaction results
319 compact_
->AggregateCompactionStats(compaction_stats_
, *compaction_job_stats_
);
320 UpdateCompactionStats();
321 RecordCompactionIOStats();
323 LogFlush(db_options_
.info_log
);
324 compact_
->status
= status
;
325 compact_
->status
.PermitUncheckedError();
327 // Build compaction result
328 compaction_result_
->output_level
= compact_
->compaction
->output_level();
329 compaction_result_
->output_path
= output_path_
;
330 for (const auto& output_file
: sub_compact
->GetOutputs()) {
331 auto& meta
= output_file
.meta
;
332 compaction_result_
->output_files
.emplace_back(
333 MakeTableFileName(meta
.fd
.GetNumber()), meta
.fd
.smallest_seqno
,
334 meta
.fd
.largest_seqno
, meta
.smallest
.Encode().ToString(),
335 meta
.largest
.Encode().ToString(), meta
.oldest_ancester_time
,
336 meta
.file_creation_time
, output_file
.validator
.GetHash(),
337 meta
.marked_for_compaction
, meta
.unique_id
);
339 InternalStats::CompactionStatsFull compaction_stats
;
340 sub_compact
->AggregateCompactionStats(compaction_stats
);
341 compaction_result_
->num_output_records
=
342 compaction_stats
.stats
.num_output_records
;
343 compaction_result_
->total_bytes
= compaction_stats
.TotalBytesWritten();
348 void CompactionServiceCompactionJob::CleanupCompaction() {
349 CompactionJob::CleanupCompaction();
352 // Internal binary format for the input and result data
353 enum BinaryFormatVersion
: uint32_t {
354 kOptionsString
= 1, // Use string format similar to Option string format
357 static std::unordered_map
<std::string
, OptionTypeInfo
> cfd_type_info
= {
359 {offsetof(struct ColumnFamilyDescriptor
, name
), OptionType::kEncodedString
,
360 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
}},
362 {offsetof(struct ColumnFamilyDescriptor
, options
),
363 OptionType::kConfigurable
, OptionVerificationType::kNormal
,
364 OptionTypeFlags::kNone
,
365 [](const ConfigOptions
& opts
, const std::string
& /*name*/,
366 const std::string
& value
, void* addr
) {
367 auto cf_options
= static_cast<ColumnFamilyOptions
*>(addr
);
368 return GetColumnFamilyOptionsFromString(opts
, ColumnFamilyOptions(),
371 [](const ConfigOptions
& opts
, const std::string
& /*name*/,
372 const void* addr
, std::string
* value
) {
373 const auto cf_options
= static_cast<const ColumnFamilyOptions
*>(addr
);
376 GetStringFromColumnFamilyOptions(opts
, *cf_options
, &result
);
377 *value
= "{" + result
+ "}";
380 [](const ConfigOptions
& opts
, const std::string
& name
, const void* addr1
,
381 const void* addr2
, std::string
* mismatch
) {
382 const auto this_one
= static_cast<const ColumnFamilyOptions
*>(addr1
);
383 const auto that_one
= static_cast<const ColumnFamilyOptions
*>(addr2
);
384 auto this_conf
= CFOptionsAsConfigurable(*this_one
);
385 auto that_conf
= CFOptionsAsConfigurable(*that_one
);
386 std::string mismatch_opt
;
388 this_conf
->AreEquivalent(opts
, that_conf
.get(), &mismatch_opt
);
390 *mismatch
= name
+ "." + mismatch_opt
;
396 static std::unordered_map
<std::string
, OptionTypeInfo
> cs_input_type_info
= {
398 OptionTypeInfo::Struct(
399 "column_family", &cfd_type_info
,
400 offsetof(struct CompactionServiceInput
, column_family
),
401 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
)},
403 {offsetof(struct CompactionServiceInput
, db_options
),
404 OptionType::kConfigurable
, OptionVerificationType::kNormal
,
405 OptionTypeFlags::kNone
,
406 [](const ConfigOptions
& opts
, const std::string
& /*name*/,
407 const std::string
& value
, void* addr
) {
408 auto options
= static_cast<DBOptions
*>(addr
);
409 return GetDBOptionsFromString(opts
, DBOptions(), value
, options
);
411 [](const ConfigOptions
& opts
, const std::string
& /*name*/,
412 const void* addr
, std::string
* value
) {
413 const auto options
= static_cast<const DBOptions
*>(addr
);
415 auto status
= GetStringFromDBOptions(opts
, *options
, &result
);
416 *value
= "{" + result
+ "}";
419 [](const ConfigOptions
& opts
, const std::string
& name
, const void* addr1
,
420 const void* addr2
, std::string
* mismatch
) {
421 const auto this_one
= static_cast<const DBOptions
*>(addr1
);
422 const auto that_one
= static_cast<const DBOptions
*>(addr2
);
423 auto this_conf
= DBOptionsAsConfigurable(*this_one
);
424 auto that_conf
= DBOptionsAsConfigurable(*that_one
);
425 std::string mismatch_opt
;
427 this_conf
->AreEquivalent(opts
, that_conf
.get(), &mismatch_opt
);
429 *mismatch
= name
+ "." + mismatch_opt
;
433 {"snapshots", OptionTypeInfo::Vector
<uint64_t>(
434 offsetof(struct CompactionServiceInput
, snapshots
),
435 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
,
436 {0, OptionType::kUInt64T
})},
437 {"input_files", OptionTypeInfo::Vector
<std::string
>(
438 offsetof(struct CompactionServiceInput
, input_files
),
439 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
,
440 {0, OptionType::kEncodedString
})},
442 {offsetof(struct CompactionServiceInput
, output_level
), OptionType::kInt
,
443 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
}},
445 {offsetof(struct CompactionServiceInput
, db_id
),
446 OptionType::kEncodedString
}},
448 {offsetof(struct CompactionServiceInput
, has_begin
), OptionType::kBoolean
,
449 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
}},
451 {offsetof(struct CompactionServiceInput
, begin
),
452 OptionType::kEncodedString
, OptionVerificationType::kNormal
,
453 OptionTypeFlags::kNone
}},
455 {offsetof(struct CompactionServiceInput
, has_end
), OptionType::kBoolean
,
456 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
}},
458 {offsetof(struct CompactionServiceInput
, end
), OptionType::kEncodedString
,
459 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
}},
462 static std::unordered_map
<std::string
, OptionTypeInfo
>
463 cs_output_file_type_info
= {
465 {offsetof(struct CompactionServiceOutputFile
, file_name
),
466 OptionType::kEncodedString
, OptionVerificationType::kNormal
,
467 OptionTypeFlags::kNone
}},
469 {offsetof(struct CompactionServiceOutputFile
, smallest_seqno
),
470 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
471 OptionTypeFlags::kNone
}},
473 {offsetof(struct CompactionServiceOutputFile
, largest_seqno
),
474 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
475 OptionTypeFlags::kNone
}},
476 {"smallest_internal_key",
477 {offsetof(struct CompactionServiceOutputFile
, smallest_internal_key
),
478 OptionType::kEncodedString
, OptionVerificationType::kNormal
,
479 OptionTypeFlags::kNone
}},
480 {"largest_internal_key",
481 {offsetof(struct CompactionServiceOutputFile
, largest_internal_key
),
482 OptionType::kEncodedString
, OptionVerificationType::kNormal
,
483 OptionTypeFlags::kNone
}},
484 {"oldest_ancester_time",
485 {offsetof(struct CompactionServiceOutputFile
, oldest_ancester_time
),
486 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
487 OptionTypeFlags::kNone
}},
488 {"file_creation_time",
489 {offsetof(struct CompactionServiceOutputFile
, file_creation_time
),
490 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
491 OptionTypeFlags::kNone
}},
493 {offsetof(struct CompactionServiceOutputFile
, paranoid_hash
),
494 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
495 OptionTypeFlags::kNone
}},
496 {"marked_for_compaction",
497 {offsetof(struct CompactionServiceOutputFile
, marked_for_compaction
),
498 OptionType::kBoolean
, OptionVerificationType::kNormal
,
499 OptionTypeFlags::kNone
}},
501 OptionTypeInfo::Array
<uint64_t, 2>(
502 offsetof(struct CompactionServiceOutputFile
, unique_id
),
503 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
,
504 {0, OptionType::kUInt64T
})},
507 static std::unordered_map
<std::string
, OptionTypeInfo
>
508 compaction_job_stats_type_info
= {
510 {offsetof(struct CompactionJobStats
, elapsed_micros
),
511 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
512 OptionTypeFlags::kNone
}},
514 {offsetof(struct CompactionJobStats
, cpu_micros
), OptionType::kUInt64T
,
515 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
}},
516 {"num_input_records",
517 {offsetof(struct CompactionJobStats
, num_input_records
),
518 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
519 OptionTypeFlags::kNone
}},
521 {offsetof(struct CompactionJobStats
, num_blobs_read
),
522 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
523 OptionTypeFlags::kNone
}},
525 {offsetof(struct CompactionJobStats
, num_input_files
),
526 OptionType::kSizeT
, OptionVerificationType::kNormal
,
527 OptionTypeFlags::kNone
}},
528 {"num_input_files_at_output_level",
529 {offsetof(struct CompactionJobStats
, num_input_files_at_output_level
),
530 OptionType::kSizeT
, OptionVerificationType::kNormal
,
531 OptionTypeFlags::kNone
}},
532 {"num_output_records",
533 {offsetof(struct CompactionJobStats
, num_output_records
),
534 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
535 OptionTypeFlags::kNone
}},
537 {offsetof(struct CompactionJobStats
, num_output_files
),
538 OptionType::kSizeT
, OptionVerificationType::kNormal
,
539 OptionTypeFlags::kNone
}},
540 {"num_output_files_blob",
541 {offsetof(struct CompactionJobStats
, num_output_files_blob
),
542 OptionType::kSizeT
, OptionVerificationType::kNormal
,
543 OptionTypeFlags::kNone
}},
544 {"is_full_compaction",
545 {offsetof(struct CompactionJobStats
, is_full_compaction
),
546 OptionType::kBoolean
, OptionVerificationType::kNormal
,
547 OptionTypeFlags::kNone
}},
548 {"is_manual_compaction",
549 {offsetof(struct CompactionJobStats
, is_manual_compaction
),
550 OptionType::kBoolean
, OptionVerificationType::kNormal
,
551 OptionTypeFlags::kNone
}},
552 {"total_input_bytes",
553 {offsetof(struct CompactionJobStats
, total_input_bytes
),
554 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
555 OptionTypeFlags::kNone
}},
556 {"total_blob_bytes_read",
557 {offsetof(struct CompactionJobStats
, total_blob_bytes_read
),
558 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
559 OptionTypeFlags::kNone
}},
560 {"total_output_bytes",
561 {offsetof(struct CompactionJobStats
, total_output_bytes
),
562 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
563 OptionTypeFlags::kNone
}},
564 {"total_output_bytes_blob",
565 {offsetof(struct CompactionJobStats
, total_output_bytes_blob
),
566 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
567 OptionTypeFlags::kNone
}},
568 {"num_records_replaced",
569 {offsetof(struct CompactionJobStats
, num_records_replaced
),
570 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
571 OptionTypeFlags::kNone
}},
572 {"total_input_raw_key_bytes",
573 {offsetof(struct CompactionJobStats
, total_input_raw_key_bytes
),
574 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
575 OptionTypeFlags::kNone
}},
576 {"total_input_raw_value_bytes",
577 {offsetof(struct CompactionJobStats
, total_input_raw_value_bytes
),
578 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
579 OptionTypeFlags::kNone
}},
580 {"num_input_deletion_records",
581 {offsetof(struct CompactionJobStats
, num_input_deletion_records
),
582 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
583 OptionTypeFlags::kNone
}},
584 {"num_expired_deletion_records",
585 {offsetof(struct CompactionJobStats
, num_expired_deletion_records
),
586 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
587 OptionTypeFlags::kNone
}},
589 {offsetof(struct CompactionJobStats
, num_corrupt_keys
),
590 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
591 OptionTypeFlags::kNone
}},
593 {offsetof(struct CompactionJobStats
, file_write_nanos
),
594 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
595 OptionTypeFlags::kNone
}},
596 {"file_range_sync_nanos",
597 {offsetof(struct CompactionJobStats
, file_range_sync_nanos
),
598 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
599 OptionTypeFlags::kNone
}},
601 {offsetof(struct CompactionJobStats
, file_fsync_nanos
),
602 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
603 OptionTypeFlags::kNone
}},
604 {"file_prepare_write_nanos",
605 {offsetof(struct CompactionJobStats
, file_prepare_write_nanos
),
606 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
607 OptionTypeFlags::kNone
}},
608 {"smallest_output_key_prefix",
609 {offsetof(struct CompactionJobStats
, smallest_output_key_prefix
),
610 OptionType::kEncodedString
, OptionVerificationType::kNormal
,
611 OptionTypeFlags::kNone
}},
612 {"largest_output_key_prefix",
613 {offsetof(struct CompactionJobStats
, largest_output_key_prefix
),
614 OptionType::kEncodedString
, OptionVerificationType::kNormal
,
615 OptionTypeFlags::kNone
}},
616 {"num_single_del_fallthru",
617 {offsetof(struct CompactionJobStats
, num_single_del_fallthru
),
618 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
619 OptionTypeFlags::kNone
}},
620 {"num_single_del_mismatch",
621 {offsetof(struct CompactionJobStats
, num_single_del_mismatch
),
622 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
623 OptionTypeFlags::kNone
}},
627 // this is a helper struct to serialize and deserialize class Status, because
628 // Status's members are not public.
629 struct StatusSerializationAdapter
{
635 StatusSerializationAdapter() = default;
636 explicit StatusSerializationAdapter(const Status
& s
) {
638 subcode
= s
.subcode();
639 severity
= s
.severity();
640 auto msg
= s
.getState();
641 message
= msg
? msg
: "";
644 Status
GetStatus() const {
645 return Status
{static_cast<Status::Code
>(code
),
646 static_cast<Status::SubCode
>(subcode
),
647 static_cast<Status::Severity
>(severity
), message
};
652 static std::unordered_map
<std::string
, OptionTypeInfo
>
653 status_adapter_type_info
= {
655 {offsetof(struct StatusSerializationAdapter
, code
),
656 OptionType::kUInt8T
, OptionVerificationType::kNormal
,
657 OptionTypeFlags::kNone
}},
659 {offsetof(struct StatusSerializationAdapter
, subcode
),
660 OptionType::kUInt8T
, OptionVerificationType::kNormal
,
661 OptionTypeFlags::kNone
}},
663 {offsetof(struct StatusSerializationAdapter
, severity
),
664 OptionType::kUInt8T
, OptionVerificationType::kNormal
,
665 OptionTypeFlags::kNone
}},
667 {offsetof(struct StatusSerializationAdapter
, message
),
668 OptionType::kEncodedString
, OptionVerificationType::kNormal
,
669 OptionTypeFlags::kNone
}},
672 static std::unordered_map
<std::string
, OptionTypeInfo
> cs_result_type_info
= {
674 {offsetof(struct CompactionServiceResult
, status
),
675 OptionType::kCustomizable
, OptionVerificationType::kNormal
,
676 OptionTypeFlags::kNone
,
677 [](const ConfigOptions
& opts
, const std::string
& /*name*/,
678 const std::string
& value
, void* addr
) {
679 auto status_obj
= static_cast<Status
*>(addr
);
680 StatusSerializationAdapter adapter
;
681 Status s
= OptionTypeInfo::ParseType(
682 opts
, value
, status_adapter_type_info
, &adapter
);
683 *status_obj
= adapter
.GetStatus();
686 [](const ConfigOptions
& opts
, const std::string
& /*name*/,
687 const void* addr
, std::string
* value
) {
688 const auto status_obj
= static_cast<const Status
*>(addr
);
689 StatusSerializationAdapter
adapter(*status_obj
);
691 Status s
= OptionTypeInfo::SerializeType(opts
, status_adapter_type_info
,
693 *value
= "{" + result
+ "}";
696 [](const ConfigOptions
& opts
, const std::string
& /*name*/,
697 const void* addr1
, const void* addr2
, std::string
* mismatch
) {
698 const auto status1
= static_cast<const Status
*>(addr1
);
699 const auto status2
= static_cast<const Status
*>(addr2
);
701 StatusSerializationAdapter
adatper1(*status1
);
702 StatusSerializationAdapter
adapter2(*status2
);
703 return OptionTypeInfo::TypesAreEqual(opts
, status_adapter_type_info
,
704 &adatper1
, &adapter2
, mismatch
);
707 OptionTypeInfo::Vector
<CompactionServiceOutputFile
>(
708 offsetof(struct CompactionServiceResult
, output_files
),
709 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
,
710 OptionTypeInfo::Struct("output_files", &cs_output_file_type_info
, 0,
711 OptionVerificationType::kNormal
,
712 OptionTypeFlags::kNone
))},
714 {offsetof(struct CompactionServiceResult
, output_level
), OptionType::kInt
,
715 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
}},
717 {offsetof(struct CompactionServiceResult
, output_path
),
718 OptionType::kEncodedString
, OptionVerificationType::kNormal
,
719 OptionTypeFlags::kNone
}},
720 {"num_output_records",
721 {offsetof(struct CompactionServiceResult
, num_output_records
),
722 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
723 OptionTypeFlags::kNone
}},
725 {offsetof(struct CompactionServiceResult
, total_bytes
),
726 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
727 OptionTypeFlags::kNone
}},
729 {offsetof(struct CompactionServiceResult
, bytes_read
),
730 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
731 OptionTypeFlags::kNone
}},
733 {offsetof(struct CompactionServiceResult
, bytes_written
),
734 OptionType::kUInt64T
, OptionVerificationType::kNormal
,
735 OptionTypeFlags::kNone
}},
736 {"stats", OptionTypeInfo::Struct(
737 "stats", &compaction_job_stats_type_info
,
738 offsetof(struct CompactionServiceResult
, stats
),
739 OptionVerificationType::kNormal
, OptionTypeFlags::kNone
)},
742 Status
CompactionServiceInput::Read(const std::string
& data_str
,
743 CompactionServiceInput
* obj
) {
744 if (data_str
.size() <= sizeof(BinaryFormatVersion
)) {
745 return Status::InvalidArgument("Invalid CompactionServiceInput string");
747 auto format_version
= DecodeFixed32(data_str
.data());
748 if (format_version
== kOptionsString
) {
750 cf
.invoke_prepare_options
= false;
751 cf
.ignore_unknown_options
= true;
752 return OptionTypeInfo::ParseType(
753 cf
, data_str
.substr(sizeof(BinaryFormatVersion
)), cs_input_type_info
,
756 return Status::NotSupported(
757 "Compaction Service Input data version not supported: " +
758 std::to_string(format_version
));
762 Status
CompactionServiceInput::Write(std::string
* output
) {
763 char buf
[sizeof(BinaryFormatVersion
)];
764 EncodeFixed32(buf
, kOptionsString
);
765 output
->append(buf
, sizeof(BinaryFormatVersion
));
767 cf
.invoke_prepare_options
= false;
768 return OptionTypeInfo::SerializeType(cf
, cs_input_type_info
, this, output
);
771 Status
CompactionServiceResult::Read(const std::string
& data_str
,
772 CompactionServiceResult
* obj
) {
773 if (data_str
.size() <= sizeof(BinaryFormatVersion
)) {
774 return Status::InvalidArgument("Invalid CompactionServiceResult string");
776 auto format_version
= DecodeFixed32(data_str
.data());
777 if (format_version
== kOptionsString
) {
779 cf
.invoke_prepare_options
= false;
780 cf
.ignore_unknown_options
= true;
781 return OptionTypeInfo::ParseType(
782 cf
, data_str
.substr(sizeof(BinaryFormatVersion
)), cs_result_type_info
,
785 return Status::NotSupported(
786 "Compaction Service Result data version not supported: " +
787 std::to_string(format_version
));
791 Status
CompactionServiceResult::Write(std::string
* output
) {
792 char buf
[sizeof(BinaryFormatVersion
)];
793 EncodeFixed32(buf
, kOptionsString
);
794 output
->append(buf
, sizeof(BinaryFormatVersion
));
796 cf
.invoke_prepare_options
= false;
797 return OptionTypeInfo::SerializeType(cf
, cs_result_type_info
, this, output
);
801 bool CompactionServiceResult::TEST_Equals(CompactionServiceResult
* other
) {
802 std::string mismatch
;
803 return TEST_Equals(other
, &mismatch
);
806 bool CompactionServiceResult::TEST_Equals(CompactionServiceResult
* other
,
807 std::string
* mismatch
) {
809 cf
.invoke_prepare_options
= false;
810 return OptionTypeInfo::TypesAreEqual(cf
, cs_result_type_info
, this, other
,
814 bool CompactionServiceInput::TEST_Equals(CompactionServiceInput
* other
) {
815 std::string mismatch
;
816 return TEST_Equals(other
, &mismatch
);
819 bool CompactionServiceInput::TEST_Equals(CompactionServiceInput
* other
,
820 std::string
* mismatch
) {
822 cf
.invoke_prepare_options
= false;
823 return OptionTypeInfo::TypesAreEqual(cf
, cs_input_type_info
, this, other
,
827 } // namespace ROCKSDB_NAMESPACE
829 #endif // !ROCKSDB_LITE