]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/examples/compact_files_example.cc
a0a9fa90aae3ad2425550073876eafbffad9c801
[ceph.git] / ceph / src / rocksdb / examples / compact_files_example.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // An example code demonstrating how to use CompactFiles, EventListener,
7 // and GetColumnFamilyMetaData APIs to implement custom compaction algorithm.
8
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
14
15 using namespace ROCKSDB_NAMESPACE;
16 std::string kDBPath = "/tmp/rocksdb_compact_files_example";
17 struct CompactionTask;
18
19 // This is an example interface of external-compaction algorithm.
20 // Compaction algorithm can be implemented outside the core-RocksDB
21 // code by using the pluggable compaction APIs that RocksDb provides.
22 class Compactor : public EventListener {
23 public:
24 // Picks and returns a compaction task given the specified DB
25 // and column family. It is the caller's responsibility to
26 // destroy the returned CompactionTask. Returns "nullptr"
27 // if it cannot find a proper compaction task.
28 virtual CompactionTask* PickCompaction(
29 DB* db, const std::string& cf_name) = 0;
30
31 // Schedule and run the specified compaction task in background.
32 virtual void ScheduleCompaction(CompactionTask *task) = 0;
33 };
34
35 // Example structure that describes a compaction task.
36 struct CompactionTask {
37 CompactionTask(
38 DB* _db, Compactor* _compactor,
39 const std::string& _column_family_name,
40 const std::vector<std::string>& _input_file_names,
41 const int _output_level,
42 const CompactionOptions& _compact_options,
43 bool _retry_on_fail)
44 : db(_db),
45 compactor(_compactor),
46 column_family_name(_column_family_name),
47 input_file_names(_input_file_names),
48 output_level(_output_level),
49 compact_options(_compact_options),
50 retry_on_fail(_retry_on_fail) {}
51 DB* db;
52 Compactor* compactor;
53 const std::string& column_family_name;
54 std::vector<std::string> input_file_names;
55 int output_level;
56 CompactionOptions compact_options;
57 bool retry_on_fail;
58 };
59
60 // A simple compaction algorithm that always compacts everything
61 // to the highest level whenever possible.
62 class FullCompactor : public Compactor {
63 public:
64 explicit FullCompactor(const Options options) : options_(options) {
65 compact_options_.compression = options_.compression;
66 compact_options_.output_file_size_limit =
67 options_.target_file_size_base;
68 }
69
70 // When flush happens, it determines whether to trigger compaction. If
71 // triggered_writes_stop is true, it will also set the retry flag of
72 // compaction-task to true.
73 void OnFlushCompleted(
74 DB* db, const FlushJobInfo& info) override {
75 CompactionTask* task = PickCompaction(db, info.cf_name);
76 if (task != nullptr) {
77 if (info.triggered_writes_stop) {
78 task->retry_on_fail = true;
79 }
80 // Schedule compaction in a different thread.
81 ScheduleCompaction(task);
82 }
83 }
84
85 // Always pick a compaction which includes all files whenever possible.
86 CompactionTask* PickCompaction(
87 DB* db, const std::string& cf_name) override {
88 ColumnFamilyMetaData cf_meta;
89 db->GetColumnFamilyMetaData(&cf_meta);
90
91 std::vector<std::string> input_file_names;
92 for (auto level : cf_meta.levels) {
93 for (auto file : level.files) {
94 if (file.being_compacted) {
95 return nullptr;
96 }
97 input_file_names.push_back(file.name);
98 }
99 }
100 return new CompactionTask(
101 db, this, cf_name, input_file_names,
102 options_.num_levels - 1, compact_options_, false);
103 }
104
105 // Schedule the specified compaction task in background.
106 void ScheduleCompaction(CompactionTask* task) override {
107 options_.env->Schedule(&FullCompactor::CompactFiles, task);
108 }
109
110 static void CompactFiles(void* arg) {
111 std::unique_ptr<CompactionTask> task(
112 reinterpret_cast<CompactionTask*>(arg));
113 assert(task);
114 assert(task->db);
115 Status s = task->db->CompactFiles(
116 task->compact_options,
117 task->input_file_names,
118 task->output_level);
119 printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
120 if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
121 // If a compaction task with its retry_on_fail=true failed,
122 // try to schedule another compaction in case the reason
123 // is not an IO error.
124 CompactionTask* new_task = task->compactor->PickCompaction(
125 task->db, task->column_family_name);
126 task->compactor->ScheduleCompaction(new_task);
127 }
128 }
129
130 private:
131 Options options_;
132 CompactionOptions compact_options_;
133 };
134
135 int main() {
136 Options options;
137 options.create_if_missing = true;
138 // Disable RocksDB background compaction.
139 options.compaction_style = kCompactionStyleNone;
140 // Small slowdown and stop trigger for experimental purpose.
141 options.level0_slowdown_writes_trigger = 3;
142 options.level0_stop_writes_trigger = 5;
143 options.IncreaseParallelism(5);
144 options.listeners.emplace_back(new FullCompactor(options));
145
146 DB* db = nullptr;
147 DestroyDB(kDBPath, options);
148 Status s = DB::Open(options, kDBPath, &db);
149 assert(s.ok());
150 assert(db);
151
152 // if background compaction is not working, write will stall
153 // because of options.level0_stop_writes_trigger
154 for (int i = 1000; i < 99999; ++i) {
155 db->Put(WriteOptions(), std::to_string(i),
156 std::string(500, 'a' + (i % 26)));
157 }
158
159 // verify the values are still there
160 std::string value;
161 for (int i = 1000; i < 99999; ++i) {
162 db->Get(ReadOptions(), std::to_string(i),
163 &value);
164 assert(value == std::string(500, 'a' + (i % 26)));
165 }
166
167 // close the db.
168 delete db;
169
170 return 0;
171 }