git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/compaction/compaction_service_job.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_service_job.cc
1 // Copyright (c) Meta Platforms, Inc. and affiliates.
2 //
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
6 //
7 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
8 // Use of this source code is governed by a BSD-style license that can be
9 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10
11 #include "db/compaction/compaction_job.h"
12 #include "db/compaction/compaction_state.h"
13 #include "logging/logging.h"
14 #include "monitoring/iostats_context_imp.h"
15 #include "monitoring/thread_status_util.h"
16 #include "options/options_helper.h"
17 #include "rocksdb/utilities/options_type.h"
18
19 #ifndef ROCKSDB_LITE
20 namespace ROCKSDB_NAMESPACE {
21 class SubcompactionState;
22
23 CompactionServiceJobStatus
24 CompactionJob::ProcessKeyValueCompactionWithCompactionService(
25 SubcompactionState* sub_compact) {
26 assert(sub_compact);
27 assert(sub_compact->compaction);
28 assert(db_options_.compaction_service);
29
30 const Compaction* compaction = sub_compact->compaction;
31 CompactionServiceInput compaction_input;
32 compaction_input.output_level = compaction->output_level();
33 compaction_input.db_id = db_id_;
34
35 const std::vector<CompactionInputFiles>& inputs =
36 *(compact_->compaction->inputs());
37 for (const auto& files_per_level : inputs) {
38 for (const auto& file : files_per_level.files) {
39 compaction_input.input_files.emplace_back(
40 MakeTableFileName(file->fd.GetNumber()));
41 }
42 }
43 compaction_input.column_family.name =
44 compaction->column_family_data()->GetName();
45 compaction_input.column_family.options =
46 compaction->column_family_data()->GetLatestCFOptions();
47 compaction_input.db_options =
48 BuildDBOptions(db_options_, mutable_db_options_copy_);
49 compaction_input.snapshots = existing_snapshots_;
50 compaction_input.has_begin = sub_compact->start.has_value();
51 compaction_input.begin =
52 compaction_input.has_begin ? sub_compact->start->ToString() : "";
53 compaction_input.has_end = sub_compact->end.has_value();
54 compaction_input.end =
55 compaction_input.has_end ? sub_compact->end->ToString() : "";
56
57 std::string compaction_input_binary;
58 Status s = compaction_input.Write(&compaction_input_binary);
59 if (!s.ok()) {
60 sub_compact->status = s;
61 return CompactionServiceJobStatus::kFailure;
62 }
63
64 std::ostringstream input_files_oss;
65 bool is_first_one = true;
66 for (const auto& file : compaction_input.input_files) {
67 input_files_oss << (is_first_one ? "" : ", ") << file;
68 is_first_one = false;
69 }
70
71 ROCKS_LOG_INFO(
72 db_options_.info_log,
73 "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
74 compaction_input.column_family.name.c_str(), job_id_,
75 compaction_input.output_level, input_files_oss.str().c_str());
76 CompactionServiceJobInfo info(dbname_, db_id_, db_session_id_,
77 GetCompactionId(sub_compact), thread_pri_);
78 CompactionServiceJobStatus compaction_status =
79 db_options_.compaction_service->StartV2(info, compaction_input_binary);
80 switch (compaction_status) {
81 case CompactionServiceJobStatus::kSuccess:
82 break;
83 case CompactionServiceJobStatus::kFailure:
84 sub_compact->status = Status::Incomplete(
85 "CompactionService failed to start compaction job.");
86 ROCKS_LOG_WARN(db_options_.info_log,
87 "[%s] [JOB %d] Remote compaction failed to start.",
88 compaction_input.column_family.name.c_str(), job_id_);
89 return compaction_status;
90 case CompactionServiceJobStatus::kUseLocal:
91 ROCKS_LOG_INFO(
92 db_options_.info_log,
93 "[%s] [JOB %d] Remote compaction fallback to local by API Start.",
94 compaction_input.column_family.name.c_str(), job_id_);
95 return compaction_status;
96 default:
97 assert(false); // unknown status
98 break;
99 }
100
101 ROCKS_LOG_INFO(db_options_.info_log,
102 "[%s] [JOB %d] Waiting for remote compaction...",
103 compaction_input.column_family.name.c_str(), job_id_);
104 std::string compaction_result_binary;
105 compaction_status = db_options_.compaction_service->WaitForCompleteV2(
106 info, &compaction_result_binary);
107
108 if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
109 ROCKS_LOG_INFO(db_options_.info_log,
110 "[%s] [JOB %d] Remote compaction fallback to local by API "
111 "WaitForComplete.",
112 compaction_input.column_family.name.c_str(), job_id_);
113 return compaction_status;
114 }
115
116 CompactionServiceResult compaction_result;
117 s = CompactionServiceResult::Read(compaction_result_binary,
118 &compaction_result);
119
120 if (compaction_status == CompactionServiceJobStatus::kFailure) {
121 if (s.ok()) {
122 if (compaction_result.status.ok()) {
123 sub_compact->status = Status::Incomplete(
124 "CompactionService failed to run the compaction job (even though "
125 "the internal status is okay).");
126 } else {
127 // set the current sub compaction status with the status returned from
128 // remote
129 sub_compact->status = compaction_result.status;
130 }
131 } else {
132 sub_compact->status = Status::Incomplete(
133 "CompactionService failed to run the compaction job (and no valid "
134 "result is returned).");
135 compaction_result.status.PermitUncheckedError();
136 }
137 ROCKS_LOG_WARN(db_options_.info_log,
138 "[%s] [JOB %d] Remote compaction failed.",
139 compaction_input.column_family.name.c_str(), job_id_);
140 return compaction_status;
141 }
142
143 if (!s.ok()) {
144 sub_compact->status = s;
145 compaction_result.status.PermitUncheckedError();
146 return CompactionServiceJobStatus::kFailure;
147 }
148 sub_compact->status = compaction_result.status;
149
150 std::ostringstream output_files_oss;
151 is_first_one = true;
152 for (const auto& file : compaction_result.output_files) {
153 output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
154 is_first_one = false;
155 }
156
157 ROCKS_LOG_INFO(db_options_.info_log,
158 "[%s] [JOB %d] Receive remote compaction result, output path: "
159 "%s, files: %s",
160 compaction_input.column_family.name.c_str(), job_id_,
161 compaction_result.output_path.c_str(),
162 output_files_oss.str().c_str());
163
164 if (!s.ok()) {
165 sub_compact->status = s;
166 return CompactionServiceJobStatus::kFailure;
167 }
168
169 for (const auto& file : compaction_result.output_files) {
170 uint64_t file_num = versions_->NewFileNumber();
171 auto src_file = compaction_result.output_path + "/" + file.file_name;
172 auto tgt_file = TableFileName(compaction->immutable_options()->cf_paths,
173 file_num, compaction->output_path_id());
174 s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
175 if (!s.ok()) {
176 sub_compact->status = s;
177 return CompactionServiceJobStatus::kFailure;
178 }
179
180 FileMetaData meta;
181 uint64_t file_size;
182 s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
183 if (!s.ok()) {
184 sub_compact->status = s;
185 return CompactionServiceJobStatus::kFailure;
186 }
187 meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
188 file.smallest_seqno, file.largest_seqno);
189 meta.smallest.DecodeFrom(file.smallest_internal_key);
190 meta.largest.DecodeFrom(file.largest_internal_key);
191 meta.oldest_ancester_time = file.oldest_ancester_time;
192 meta.file_creation_time = file.file_creation_time;
193 meta.marked_for_compaction = file.marked_for_compaction;
194 meta.unique_id = file.unique_id;
195
196 auto cfd = compaction->column_family_data();
197 sub_compact->Current().AddOutput(std::move(meta),
198 cfd->internal_comparator(), false, false,
199 true, file.paranoid_hash);
200 }
201 sub_compact->compaction_job_stats = compaction_result.stats;
202 sub_compact->Current().SetNumOutputRecords(
203 compaction_result.num_output_records);
204 sub_compact->Current().SetTotalBytes(compaction_result.total_bytes);
205 RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
206 RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
207 compaction_result.bytes_written);
208 return CompactionServiceJobStatus::kSuccess;
209 }
210
211 std::string CompactionServiceCompactionJob::GetTableFileName(
212 uint64_t file_number) {
213 return MakeTableFileName(output_path_, file_number);
214 }
215
216 void CompactionServiceCompactionJob::RecordCompactionIOStats() {
217 compaction_result_->bytes_read += IOSTATS(bytes_read);
218 compaction_result_->bytes_written += IOSTATS(bytes_written);
219 CompactionJob::RecordCompactionIOStats();
220 }
221
222 CompactionServiceCompactionJob::CompactionServiceCompactionJob(
223 int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
224 const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
225 VersionSet* versions, const std::atomic<bool>* shutting_down,
226 LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
227 InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
228 std::vector<SequenceNumber> existing_snapshots,
229 std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
230 const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
231 const std::atomic<bool>& manual_compaction_canceled,
232 const std::string& db_id, const std::string& db_session_id,
233 std::string output_path,
234 const CompactionServiceInput& compaction_service_input,
235 CompactionServiceResult* compaction_service_result)
236 : CompactionJob(
237 job_id, compaction, db_options, mutable_db_options, file_options,
238 versions, shutting_down, log_buffer, nullptr, output_directory,
239 nullptr, stats, db_mutex, db_error_handler,
240 std::move(existing_snapshots), kMaxSequenceNumber, nullptr, nullptr,
241 std::move(table_cache), event_logger,
242 compaction->mutable_cf_options()->paranoid_file_checks,
243 compaction->mutable_cf_options()->report_bg_io_stats, dbname,
244 &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
245 manual_compaction_canceled, db_id, db_session_id,
246 compaction->column_family_data()->GetFullHistoryTsLow()),
247 output_path_(std::move(output_path)),
248 compaction_input_(compaction_service_input),
249 compaction_result_(compaction_service_result) {}
250
251 Status CompactionServiceCompactionJob::Run() {
252 AutoThreadOperationStageUpdater stage_updater(
253 ThreadStatus::STAGE_COMPACTION_RUN);
254
255 auto* c = compact_->compaction;
256 assert(c->column_family_data() != nullptr);
257 assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
258 compact_->compaction->level()) > 0);
259
260 write_hint_ =
261 c->column_family_data()->CalculateSSTWriteHint(c->output_level());
262 bottommost_level_ = c->bottommost_level();
263
264 Slice begin = compaction_input_.begin;
265 Slice end = compaction_input_.end;
266 compact_->sub_compact_states.emplace_back(
267 c,
268 compaction_input_.has_begin ? std::optional<Slice>(begin)
269 : std::optional<Slice>(),
270 compaction_input_.has_end ? std::optional<Slice>(end)
271 : std::optional<Slice>(),
272 /*sub_job_id*/ 0);
273
274 log_buffer_->FlushBufferToLog();
275 LogCompaction();
276 const uint64_t start_micros = db_options_.clock->NowMicros();
277 // Pick the only sub-compaction we should have
278 assert(compact_->sub_compact_states.size() == 1);
279 SubcompactionState* sub_compact = compact_->sub_compact_states.data();
280
281 ProcessKeyValueCompaction(sub_compact);
282
283 compaction_stats_.stats.micros =
284 db_options_.clock->NowMicros() - start_micros;
285 compaction_stats_.stats.cpu_micros =
286 sub_compact->compaction_job_stats.cpu_micros;
287
288 RecordTimeToHistogram(stats_, COMPACTION_TIME,
289 compaction_stats_.stats.micros);
290 RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
291 compaction_stats_.stats.cpu_micros);
292
293 Status status = sub_compact->status;
294 IOStatus io_s = sub_compact->io_status;
295
296 if (io_status_.ok()) {
297 io_status_ = io_s;
298 }
299
300 if (status.ok()) {
301 constexpr IODebugContext* dbg = nullptr;
302
303 if (output_directory_) {
304 io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg,
305 DirFsyncOptions());
306 }
307 }
308 if (io_status_.ok()) {
309 io_status_ = io_s;
310 }
311 if (status.ok()) {
312 status = io_s;
313 }
314 if (status.ok()) {
315 // TODO: Add verify_table()
316 }
317
318 // Finish up all book-keeping to unify the subcompaction results
319 compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
320 UpdateCompactionStats();
321 RecordCompactionIOStats();
322
323 LogFlush(db_options_.info_log);
324 compact_->status = status;
325 compact_->status.PermitUncheckedError();
326
327 // Build compaction result
328 compaction_result_->output_level = compact_->compaction->output_level();
329 compaction_result_->output_path = output_path_;
330 for (const auto& output_file : sub_compact->GetOutputs()) {
331 auto& meta = output_file.meta;
332 compaction_result_->output_files.emplace_back(
333 MakeTableFileName(meta.fd.GetNumber()), meta.fd.smallest_seqno,
334 meta.fd.largest_seqno, meta.smallest.Encode().ToString(),
335 meta.largest.Encode().ToString(), meta.oldest_ancester_time,
336 meta.file_creation_time, output_file.validator.GetHash(),
337 meta.marked_for_compaction, meta.unique_id);
338 }
339 InternalStats::CompactionStatsFull compaction_stats;
340 sub_compact->AggregateCompactionStats(compaction_stats);
341 compaction_result_->num_output_records =
342 compaction_stats.stats.num_output_records;
343 compaction_result_->total_bytes = compaction_stats.TotalBytesWritten();
344
345 return status;
346 }
347
348 void CompactionServiceCompactionJob::CleanupCompaction() {
349 CompactionJob::CleanupCompaction();
350 }
351
// Version tag of the internal binary format used for the serialized input and
// result data. It is written as a fixed 32-bit prefix in front of the payload.
enum BinaryFormatVersion : uint32_t {
  // Payload uses a string format similar to the Option string format.
  kOptionsString = 1,
};
356
357 static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
358 {"name",
359 {offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString,
360 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
361 {"options",
362 {offsetof(struct ColumnFamilyDescriptor, options),
363 OptionType::kConfigurable, OptionVerificationType::kNormal,
364 OptionTypeFlags::kNone,
365 [](const ConfigOptions& opts, const std::string& /*name*/,
366 const std::string& value, void* addr) {
367 auto cf_options = static_cast<ColumnFamilyOptions*>(addr);
368 return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
369 value, cf_options);
370 },
371 [](const ConfigOptions& opts, const std::string& /*name*/,
372 const void* addr, std::string* value) {
373 const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);
374 std::string result;
375 auto status =
376 GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
377 *value = "{" + result + "}";
378 return status;
379 },
380 [](const ConfigOptions& opts, const std::string& name, const void* addr1,
381 const void* addr2, std::string* mismatch) {
382 const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);
383 const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);
384 auto this_conf = CFOptionsAsConfigurable(*this_one);
385 auto that_conf = CFOptionsAsConfigurable(*that_one);
386 std::string mismatch_opt;
387 bool result =
388 this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
389 if (!result) {
390 *mismatch = name + "." + mismatch_opt;
391 }
392 return result;
393 }}},
394 };
395
396 static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
397 {"column_family",
398 OptionTypeInfo::Struct(
399 "column_family", &cfd_type_info,
400 offsetof(struct CompactionServiceInput, column_family),
401 OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
402 {"db_options",
403 {offsetof(struct CompactionServiceInput, db_options),
404 OptionType::kConfigurable, OptionVerificationType::kNormal,
405 OptionTypeFlags::kNone,
406 [](const ConfigOptions& opts, const std::string& /*name*/,
407 const std::string& value, void* addr) {
408 auto options = static_cast<DBOptions*>(addr);
409 return GetDBOptionsFromString(opts, DBOptions(), value, options);
410 },
411 [](const ConfigOptions& opts, const std::string& /*name*/,
412 const void* addr, std::string* value) {
413 const auto options = static_cast<const DBOptions*>(addr);
414 std::string result;
415 auto status = GetStringFromDBOptions(opts, *options, &result);
416 *value = "{" + result + "}";
417 return status;
418 },
419 [](const ConfigOptions& opts, const std::string& name, const void* addr1,
420 const void* addr2, std::string* mismatch) {
421 const auto this_one = static_cast<const DBOptions*>(addr1);
422 const auto that_one = static_cast<const DBOptions*>(addr2);
423 auto this_conf = DBOptionsAsConfigurable(*this_one);
424 auto that_conf = DBOptionsAsConfigurable(*that_one);
425 std::string mismatch_opt;
426 bool result =
427 this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
428 if (!result) {
429 *mismatch = name + "." + mismatch_opt;
430 }
431 return result;
432 }}},
433 {"snapshots", OptionTypeInfo::Vector<uint64_t>(
434 offsetof(struct CompactionServiceInput, snapshots),
435 OptionVerificationType::kNormal, OptionTypeFlags::kNone,
436 {0, OptionType::kUInt64T})},
437 {"input_files", OptionTypeInfo::Vector<std::string>(
438 offsetof(struct CompactionServiceInput, input_files),
439 OptionVerificationType::kNormal, OptionTypeFlags::kNone,
440 {0, OptionType::kEncodedString})},
441 {"output_level",
442 {offsetof(struct CompactionServiceInput, output_level), OptionType::kInt,
443 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
444 {"db_id",
445 {offsetof(struct CompactionServiceInput, db_id),
446 OptionType::kEncodedString}},
447 {"has_begin",
448 {offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean,
449 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
450 {"begin",
451 {offsetof(struct CompactionServiceInput, begin),
452 OptionType::kEncodedString, OptionVerificationType::kNormal,
453 OptionTypeFlags::kNone}},
454 {"has_end",
455 {offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean,
456 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
457 {"end",
458 {offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
459 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
460 };
461
462 static std::unordered_map<std::string, OptionTypeInfo>
463 cs_output_file_type_info = {
464 {"file_name",
465 {offsetof(struct CompactionServiceOutputFile, file_name),
466 OptionType::kEncodedString, OptionVerificationType::kNormal,
467 OptionTypeFlags::kNone}},
468 {"smallest_seqno",
469 {offsetof(struct CompactionServiceOutputFile, smallest_seqno),
470 OptionType::kUInt64T, OptionVerificationType::kNormal,
471 OptionTypeFlags::kNone}},
472 {"largest_seqno",
473 {offsetof(struct CompactionServiceOutputFile, largest_seqno),
474 OptionType::kUInt64T, OptionVerificationType::kNormal,
475 OptionTypeFlags::kNone}},
476 {"smallest_internal_key",
477 {offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
478 OptionType::kEncodedString, OptionVerificationType::kNormal,
479 OptionTypeFlags::kNone}},
480 {"largest_internal_key",
481 {offsetof(struct CompactionServiceOutputFile, largest_internal_key),
482 OptionType::kEncodedString, OptionVerificationType::kNormal,
483 OptionTypeFlags::kNone}},
484 {"oldest_ancester_time",
485 {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
486 OptionType::kUInt64T, OptionVerificationType::kNormal,
487 OptionTypeFlags::kNone}},
488 {"file_creation_time",
489 {offsetof(struct CompactionServiceOutputFile, file_creation_time),
490 OptionType::kUInt64T, OptionVerificationType::kNormal,
491 OptionTypeFlags::kNone}},
492 {"paranoid_hash",
493 {offsetof(struct CompactionServiceOutputFile, paranoid_hash),
494 OptionType::kUInt64T, OptionVerificationType::kNormal,
495 OptionTypeFlags::kNone}},
496 {"marked_for_compaction",
497 {offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
498 OptionType::kBoolean, OptionVerificationType::kNormal,
499 OptionTypeFlags::kNone}},
500 {"unique_id",
501 OptionTypeInfo::Array<uint64_t, 2>(
502 offsetof(struct CompactionServiceOutputFile, unique_id),
503 OptionVerificationType::kNormal, OptionTypeFlags::kNone,
504 {0, OptionType::kUInt64T})},
505 };
506
507 static std::unordered_map<std::string, OptionTypeInfo>
508 compaction_job_stats_type_info = {
509 {"elapsed_micros",
510 {offsetof(struct CompactionJobStats, elapsed_micros),
511 OptionType::kUInt64T, OptionVerificationType::kNormal,
512 OptionTypeFlags::kNone}},
513 {"cpu_micros",
514 {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
515 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
516 {"num_input_records",
517 {offsetof(struct CompactionJobStats, num_input_records),
518 OptionType::kUInt64T, OptionVerificationType::kNormal,
519 OptionTypeFlags::kNone}},
520 {"num_blobs_read",
521 {offsetof(struct CompactionJobStats, num_blobs_read),
522 OptionType::kUInt64T, OptionVerificationType::kNormal,
523 OptionTypeFlags::kNone}},
524 {"num_input_files",
525 {offsetof(struct CompactionJobStats, num_input_files),
526 OptionType::kSizeT, OptionVerificationType::kNormal,
527 OptionTypeFlags::kNone}},
528 {"num_input_files_at_output_level",
529 {offsetof(struct CompactionJobStats, num_input_files_at_output_level),
530 OptionType::kSizeT, OptionVerificationType::kNormal,
531 OptionTypeFlags::kNone}},
532 {"num_output_records",
533 {offsetof(struct CompactionJobStats, num_output_records),
534 OptionType::kUInt64T, OptionVerificationType::kNormal,
535 OptionTypeFlags::kNone}},
536 {"num_output_files",
537 {offsetof(struct CompactionJobStats, num_output_files),
538 OptionType::kSizeT, OptionVerificationType::kNormal,
539 OptionTypeFlags::kNone}},
540 {"num_output_files_blob",
541 {offsetof(struct CompactionJobStats, num_output_files_blob),
542 OptionType::kSizeT, OptionVerificationType::kNormal,
543 OptionTypeFlags::kNone}},
544 {"is_full_compaction",
545 {offsetof(struct CompactionJobStats, is_full_compaction),
546 OptionType::kBoolean, OptionVerificationType::kNormal,
547 OptionTypeFlags::kNone}},
548 {"is_manual_compaction",
549 {offsetof(struct CompactionJobStats, is_manual_compaction),
550 OptionType::kBoolean, OptionVerificationType::kNormal,
551 OptionTypeFlags::kNone}},
552 {"total_input_bytes",
553 {offsetof(struct CompactionJobStats, total_input_bytes),
554 OptionType::kUInt64T, OptionVerificationType::kNormal,
555 OptionTypeFlags::kNone}},
556 {"total_blob_bytes_read",
557 {offsetof(struct CompactionJobStats, total_blob_bytes_read),
558 OptionType::kUInt64T, OptionVerificationType::kNormal,
559 OptionTypeFlags::kNone}},
560 {"total_output_bytes",
561 {offsetof(struct CompactionJobStats, total_output_bytes),
562 OptionType::kUInt64T, OptionVerificationType::kNormal,
563 OptionTypeFlags::kNone}},
564 {"total_output_bytes_blob",
565 {offsetof(struct CompactionJobStats, total_output_bytes_blob),
566 OptionType::kUInt64T, OptionVerificationType::kNormal,
567 OptionTypeFlags::kNone}},
568 {"num_records_replaced",
569 {offsetof(struct CompactionJobStats, num_records_replaced),
570 OptionType::kUInt64T, OptionVerificationType::kNormal,
571 OptionTypeFlags::kNone}},
572 {"total_input_raw_key_bytes",
573 {offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
574 OptionType::kUInt64T, OptionVerificationType::kNormal,
575 OptionTypeFlags::kNone}},
576 {"total_input_raw_value_bytes",
577 {offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
578 OptionType::kUInt64T, OptionVerificationType::kNormal,
579 OptionTypeFlags::kNone}},
580 {"num_input_deletion_records",
581 {offsetof(struct CompactionJobStats, num_input_deletion_records),
582 OptionType::kUInt64T, OptionVerificationType::kNormal,
583 OptionTypeFlags::kNone}},
584 {"num_expired_deletion_records",
585 {offsetof(struct CompactionJobStats, num_expired_deletion_records),
586 OptionType::kUInt64T, OptionVerificationType::kNormal,
587 OptionTypeFlags::kNone}},
588 {"num_corrupt_keys",
589 {offsetof(struct CompactionJobStats, num_corrupt_keys),
590 OptionType::kUInt64T, OptionVerificationType::kNormal,
591 OptionTypeFlags::kNone}},
592 {"file_write_nanos",
593 {offsetof(struct CompactionJobStats, file_write_nanos),
594 OptionType::kUInt64T, OptionVerificationType::kNormal,
595 OptionTypeFlags::kNone}},
596 {"file_range_sync_nanos",
597 {offsetof(struct CompactionJobStats, file_range_sync_nanos),
598 OptionType::kUInt64T, OptionVerificationType::kNormal,
599 OptionTypeFlags::kNone}},
600 {"file_fsync_nanos",
601 {offsetof(struct CompactionJobStats, file_fsync_nanos),
602 OptionType::kUInt64T, OptionVerificationType::kNormal,
603 OptionTypeFlags::kNone}},
604 {"file_prepare_write_nanos",
605 {offsetof(struct CompactionJobStats, file_prepare_write_nanos),
606 OptionType::kUInt64T, OptionVerificationType::kNormal,
607 OptionTypeFlags::kNone}},
608 {"smallest_output_key_prefix",
609 {offsetof(struct CompactionJobStats, smallest_output_key_prefix),
610 OptionType::kEncodedString, OptionVerificationType::kNormal,
611 OptionTypeFlags::kNone}},
612 {"largest_output_key_prefix",
613 {offsetof(struct CompactionJobStats, largest_output_key_prefix),
614 OptionType::kEncodedString, OptionVerificationType::kNormal,
615 OptionTypeFlags::kNone}},
616 {"num_single_del_fallthru",
617 {offsetof(struct CompactionJobStats, num_single_del_fallthru),
618 OptionType::kUInt64T, OptionVerificationType::kNormal,
619 OptionTypeFlags::kNone}},
620 {"num_single_del_mismatch",
621 {offsetof(struct CompactionJobStats, num_single_del_mismatch),
622 OptionType::kUInt64T, OptionVerificationType::kNormal,
623 OptionTypeFlags::kNone}},
624 };
625
626 namespace {
627 // this is a helper struct to serialize and deserialize class Status, because
628 // Status's members are not public.
629 struct StatusSerializationAdapter {
630 uint8_t code;
631 uint8_t subcode;
632 uint8_t severity;
633 std::string message;
634
635 StatusSerializationAdapter() = default;
636 explicit StatusSerializationAdapter(const Status& s) {
637 code = s.code();
638 subcode = s.subcode();
639 severity = s.severity();
640 auto msg = s.getState();
641 message = msg ? msg : "";
642 }
643
644 Status GetStatus() const {
645 return Status{static_cast<Status::Code>(code),
646 static_cast<Status::SubCode>(subcode),
647 static_cast<Status::Severity>(severity), message};
648 }
649 };
650 } // namespace
651
652 static std::unordered_map<std::string, OptionTypeInfo>
653 status_adapter_type_info = {
654 {"code",
655 {offsetof(struct StatusSerializationAdapter, code),
656 OptionType::kUInt8T, OptionVerificationType::kNormal,
657 OptionTypeFlags::kNone}},
658 {"subcode",
659 {offsetof(struct StatusSerializationAdapter, subcode),
660 OptionType::kUInt8T, OptionVerificationType::kNormal,
661 OptionTypeFlags::kNone}},
662 {"severity",
663 {offsetof(struct StatusSerializationAdapter, severity),
664 OptionType::kUInt8T, OptionVerificationType::kNormal,
665 OptionTypeFlags::kNone}},
666 {"message",
667 {offsetof(struct StatusSerializationAdapter, message),
668 OptionType::kEncodedString, OptionVerificationType::kNormal,
669 OptionTypeFlags::kNone}},
670 };
671
672 static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
673 {"status",
674 {offsetof(struct CompactionServiceResult, status),
675 OptionType::kCustomizable, OptionVerificationType::kNormal,
676 OptionTypeFlags::kNone,
677 [](const ConfigOptions& opts, const std::string& /*name*/,
678 const std::string& value, void* addr) {
679 auto status_obj = static_cast<Status*>(addr);
680 StatusSerializationAdapter adapter;
681 Status s = OptionTypeInfo::ParseType(
682 opts, value, status_adapter_type_info, &adapter);
683 *status_obj = adapter.GetStatus();
684 return s;
685 },
686 [](const ConfigOptions& opts, const std::string& /*name*/,
687 const void* addr, std::string* value) {
688 const auto status_obj = static_cast<const Status*>(addr);
689 StatusSerializationAdapter adapter(*status_obj);
690 std::string result;
691 Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
692 &adapter, &result);
693 *value = "{" + result + "}";
694 return s;
695 },
696 [](const ConfigOptions& opts, const std::string& /*name*/,
697 const void* addr1, const void* addr2, std::string* mismatch) {
698 const auto status1 = static_cast<const Status*>(addr1);
699 const auto status2 = static_cast<const Status*>(addr2);
700
701 StatusSerializationAdapter adatper1(*status1);
702 StatusSerializationAdapter adapter2(*status2);
703 return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
704 &adatper1, &adapter2, mismatch);
705 }}},
706 {"output_files",
707 OptionTypeInfo::Vector<CompactionServiceOutputFile>(
708 offsetof(struct CompactionServiceResult, output_files),
709 OptionVerificationType::kNormal, OptionTypeFlags::kNone,
710 OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
711 OptionVerificationType::kNormal,
712 OptionTypeFlags::kNone))},
713 {"output_level",
714 {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
715 OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
716 {"output_path",
717 {offsetof(struct CompactionServiceResult, output_path),
718 OptionType::kEncodedString, OptionVerificationType::kNormal,
719 OptionTypeFlags::kNone}},
720 {"num_output_records",
721 {offsetof(struct CompactionServiceResult, num_output_records),
722 OptionType::kUInt64T, OptionVerificationType::kNormal,
723 OptionTypeFlags::kNone}},
724 {"total_bytes",
725 {offsetof(struct CompactionServiceResult, total_bytes),
726 OptionType::kUInt64T, OptionVerificationType::kNormal,
727 OptionTypeFlags::kNone}},
728 {"bytes_read",
729 {offsetof(struct CompactionServiceResult, bytes_read),
730 OptionType::kUInt64T, OptionVerificationType::kNormal,
731 OptionTypeFlags::kNone}},
732 {"bytes_written",
733 {offsetof(struct CompactionServiceResult, bytes_written),
734 OptionType::kUInt64T, OptionVerificationType::kNormal,
735 OptionTypeFlags::kNone}},
736 {"stats", OptionTypeInfo::Struct(
737 "stats", &compaction_job_stats_type_info,
738 offsetof(struct CompactionServiceResult, stats),
739 OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
740 };
741
742 Status CompactionServiceInput::Read(const std::string& data_str,
743 CompactionServiceInput* obj) {
744 if (data_str.size() <= sizeof(BinaryFormatVersion)) {
745 return Status::InvalidArgument("Invalid CompactionServiceInput string");
746 }
747 auto format_version = DecodeFixed32(data_str.data());
748 if (format_version == kOptionsString) {
749 ConfigOptions cf;
750 cf.invoke_prepare_options = false;
751 cf.ignore_unknown_options = true;
752 return OptionTypeInfo::ParseType(
753 cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
754 obj);
755 } else {
756 return Status::NotSupported(
757 "Compaction Service Input data version not supported: " +
758 std::to_string(format_version));
759 }
760 }
761
762 Status CompactionServiceInput::Write(std::string* output) {
763 char buf[sizeof(BinaryFormatVersion)];
764 EncodeFixed32(buf, kOptionsString);
765 output->append(buf, sizeof(BinaryFormatVersion));
766 ConfigOptions cf;
767 cf.invoke_prepare_options = false;
768 return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
769 }
770
771 Status CompactionServiceResult::Read(const std::string& data_str,
772 CompactionServiceResult* obj) {
773 if (data_str.size() <= sizeof(BinaryFormatVersion)) {
774 return Status::InvalidArgument("Invalid CompactionServiceResult string");
775 }
776 auto format_version = DecodeFixed32(data_str.data());
777 if (format_version == kOptionsString) {
778 ConfigOptions cf;
779 cf.invoke_prepare_options = false;
780 cf.ignore_unknown_options = true;
781 return OptionTypeInfo::ParseType(
782 cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
783 obj);
784 } else {
785 return Status::NotSupported(
786 "Compaction Service Result data version not supported: " +
787 std::to_string(format_version));
788 }
789 }
790
791 Status CompactionServiceResult::Write(std::string* output) {
792 char buf[sizeof(BinaryFormatVersion)];
793 EncodeFixed32(buf, kOptionsString);
794 output->append(buf, sizeof(BinaryFormatVersion));
795 ConfigOptions cf;
796 cf.invoke_prepare_options = false;
797 return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
798 }
799
800 #ifndef NDEBUG
801 bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
802 std::string mismatch;
803 return TEST_Equals(other, &mismatch);
804 }
805
806 bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
807 std::string* mismatch) {
808 ConfigOptions cf;
809 cf.invoke_prepare_options = false;
810 return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
811 mismatch);
812 }
813
814 bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
815 std::string mismatch;
816 return TEST_Equals(other, &mismatch);
817 }
818
819 bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
820 std::string* mismatch) {
821 ConfigOptions cf;
822 cf.invoke_prepare_options = false;
823 return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
824 mismatch);
825 }
826 #endif // NDEBUG
827 } // namespace ROCKSDB_NAMESPACE
828
829 #endif // !ROCKSDB_LITE