// Source: ceph/src/rocksdb/examples/compact_files_example.cc
// (blob 01e7d94d5072f6a87b458d0adea3dc587ee358be)
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // An example code demonstrating how to use CompactFiles, EventListener,
7 // and GetColumnFamilyMetaData APIs to implement custom compaction algorithm.
8
#include <cassert>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
14
// Pull the RocksDB API into scope. Fine for a standalone example;
// avoid using-directives at namespace scope in headers/production code.
using namespace ROCKSDB_NAMESPACE;

// Filesystem path where the example database is created
// (platform-dependent temporary directory).
#if defined(OS_WIN)
std::string kDBPath = "C:\\Windows\\TEMP\\rocksdb_compact_files_example";
#else
std::string kDBPath = "/tmp/rocksdb_compact_files_example";
#endif
22
23 struct CompactionTask;
24
25 // This is an example interface of external-compaction algorithm.
26 // Compaction algorithm can be implemented outside the core-RocksDB
27 // code by using the pluggable compaction APIs that RocksDb provides.
28 class Compactor : public EventListener {
29 public:
30 // Picks and returns a compaction task given the specified DB
31 // and column family. It is the caller's responsibility to
32 // destroy the returned CompactionTask. Returns "nullptr"
33 // if it cannot find a proper compaction task.
34 virtual CompactionTask* PickCompaction(
35 DB* db, const std::string& cf_name) = 0;
36
37 // Schedule and run the specified compaction task in background.
38 virtual void ScheduleCompaction(CompactionTask *task) = 0;
39 };
40
41 // Example structure that describes a compaction task.
42 struct CompactionTask {
43 CompactionTask(
44 DB* _db, Compactor* _compactor,
45 const std::string& _column_family_name,
46 const std::vector<std::string>& _input_file_names,
47 const int _output_level,
48 const CompactionOptions& _compact_options,
49 bool _retry_on_fail)
50 : db(_db),
51 compactor(_compactor),
52 column_family_name(_column_family_name),
53 input_file_names(_input_file_names),
54 output_level(_output_level),
55 compact_options(_compact_options),
56 retry_on_fail(_retry_on_fail) {}
57 DB* db;
58 Compactor* compactor;
59 const std::string& column_family_name;
60 std::vector<std::string> input_file_names;
61 int output_level;
62 CompactionOptions compact_options;
63 bool retry_on_fail;
64 };
65
66 // A simple compaction algorithm that always compacts everything
67 // to the highest level whenever possible.
68 class FullCompactor : public Compactor {
69 public:
70 explicit FullCompactor(const Options options) : options_(options) {
71 compact_options_.compression = options_.compression;
72 compact_options_.output_file_size_limit =
73 options_.target_file_size_base;
74 }
75
76 // When flush happens, it determines whether to trigger compaction. If
77 // triggered_writes_stop is true, it will also set the retry flag of
78 // compaction-task to true.
79 void OnFlushCompleted(
80 DB* db, const FlushJobInfo& info) override {
81 CompactionTask* task = PickCompaction(db, info.cf_name);
82 if (task != nullptr) {
83 if (info.triggered_writes_stop) {
84 task->retry_on_fail = true;
85 }
86 // Schedule compaction in a different thread.
87 ScheduleCompaction(task);
88 }
89 }
90
91 // Always pick a compaction which includes all files whenever possible.
92 CompactionTask* PickCompaction(
93 DB* db, const std::string& cf_name) override {
94 ColumnFamilyMetaData cf_meta;
95 db->GetColumnFamilyMetaData(&cf_meta);
96
97 std::vector<std::string> input_file_names;
98 for (auto level : cf_meta.levels) {
99 for (auto file : level.files) {
100 if (file.being_compacted) {
101 return nullptr;
102 }
103 input_file_names.push_back(file.name);
104 }
105 }
106 return new CompactionTask(
107 db, this, cf_name, input_file_names,
108 options_.num_levels - 1, compact_options_, false);
109 }
110
111 // Schedule the specified compaction task in background.
112 void ScheduleCompaction(CompactionTask* task) override {
113 options_.env->Schedule(&FullCompactor::CompactFiles, task);
114 }
115
116 static void CompactFiles(void* arg) {
117 std::unique_ptr<CompactionTask> task(
118 reinterpret_cast<CompactionTask*>(arg));
119 assert(task);
120 assert(task->db);
121 Status s = task->db->CompactFiles(
122 task->compact_options,
123 task->input_file_names,
124 task->output_level);
125 printf("CompactFiles() finished with status %s\n", s.ToString().c_str());
126 if (!s.ok() && !s.IsIOError() && task->retry_on_fail) {
127 // If a compaction task with its retry_on_fail=true failed,
128 // try to schedule another compaction in case the reason
129 // is not an IO error.
130 CompactionTask* new_task = task->compactor->PickCompaction(
131 task->db, task->column_family_name);
132 task->compactor->ScheduleCompaction(new_task);
133 }
134 }
135
136 private:
137 Options options_;
138 CompactionOptions compact_options_;
139 };
140
141 int main() {
142 Options options;
143 options.create_if_missing = true;
144 // Disable RocksDB background compaction.
145 options.compaction_style = kCompactionStyleNone;
146 // Small slowdown and stop trigger for experimental purpose.
147 options.level0_slowdown_writes_trigger = 3;
148 options.level0_stop_writes_trigger = 5;
149 options.IncreaseParallelism(5);
150 options.listeners.emplace_back(new FullCompactor(options));
151
152 DB* db = nullptr;
153 DestroyDB(kDBPath, options);
154 Status s = DB::Open(options, kDBPath, &db);
155 assert(s.ok());
156 assert(db);
157
158 // if background compaction is not working, write will stall
159 // because of options.level0_stop_writes_trigger
160 for (int i = 1000; i < 99999; ++i) {
161 db->Put(WriteOptions(), std::to_string(i),
162 std::string(500, 'a' + (i % 26)));
163 }
164
165 // verify the values are still there
166 std::string value;
167 for (int i = 1000; i < 99999; ++i) {
168 db->Get(ReadOptions(), std::to_string(i),
169 &value);
170 assert(value == std::string(500, 'a' + (i % 26)));
171 }
172
173 // close the db.
174 delete db;
175
176 return 0;
177 }