1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
// Example code demonstrating how to use the CompactFiles, EventListener,
// and GetColumnFamilyMetaData APIs to implement a custom compaction algorithm.
11 #include "rocksdb/db.h"
12 #include "rocksdb/env.h"
13 #include "rocksdb/options.h"
15 using namespace ROCKSDB_NAMESPACE
;
// Directory holding the example database. main() calls DestroyDB() on this
// path before opening it, so it must point somewhere disposable.
// Exactly one definition is compiled: the Windows temp directory on Windows
// builds (RocksDB's OS_WIN or the compiler-predefined _WIN32), /tmp elsewhere.
#if defined(OS_WIN) || defined(_WIN32)
std::string kDBPath = "C:\\Windows\\TEMP\\rocksdb_compact_files_example";
#else
std::string kDBPath = "/tmp/rocksdb_compact_files_example";
#endif
23 struct CompactionTask
;
25 // This is an example interface of external-compaction algorithm.
26 // Compaction algorithm can be implemented outside the core-RocksDB
27 // code by using the pluggable compaction APIs that RocksDb provides.
28 class Compactor
: public EventListener
{
30 // Picks and returns a compaction task given the specified DB
31 // and column family. It is the caller's responsibility to
32 // destroy the returned CompactionTask. Returns "nullptr"
33 // if it cannot find a proper compaction task.
34 virtual CompactionTask
* PickCompaction(
35 DB
* db
, const std::string
& cf_name
) = 0;
37 // Schedule and run the specified compaction task in background.
38 virtual void ScheduleCompaction(CompactionTask
*task
) = 0;
41 // Example structure that describes a compaction task.
42 struct CompactionTask
{
44 DB
* _db
, Compactor
* _compactor
,
45 const std::string
& _column_family_name
,
46 const std::vector
<std::string
>& _input_file_names
,
47 const int _output_level
,
48 const CompactionOptions
& _compact_options
,
51 compactor(_compactor
),
52 column_family_name(_column_family_name
),
53 input_file_names(_input_file_names
),
54 output_level(_output_level
),
55 compact_options(_compact_options
),
56 retry_on_fail(_retry_on_fail
) {}
59 const std::string
& column_family_name
;
60 std::vector
<std::string
> input_file_names
;
62 CompactionOptions compact_options
;
66 // A simple compaction algorithm that always compacts everything
67 // to the highest level whenever possible.
68 class FullCompactor
: public Compactor
{
70 explicit FullCompactor(const Options options
) : options_(options
) {
71 compact_options_
.compression
= options_
.compression
;
72 compact_options_
.output_file_size_limit
=
73 options_
.target_file_size_base
;
76 // When flush happens, it determines whether to trigger compaction. If
77 // triggered_writes_stop is true, it will also set the retry flag of
78 // compaction-task to true.
79 void OnFlushCompleted(
80 DB
* db
, const FlushJobInfo
& info
) override
{
81 CompactionTask
* task
= PickCompaction(db
, info
.cf_name
);
82 if (task
!= nullptr) {
83 if (info
.triggered_writes_stop
) {
84 task
->retry_on_fail
= true;
86 // Schedule compaction in a different thread.
87 ScheduleCompaction(task
);
91 // Always pick a compaction which includes all files whenever possible.
92 CompactionTask
* PickCompaction(
93 DB
* db
, const std::string
& cf_name
) override
{
94 ColumnFamilyMetaData cf_meta
;
95 db
->GetColumnFamilyMetaData(&cf_meta
);
97 std::vector
<std::string
> input_file_names
;
98 for (auto level
: cf_meta
.levels
) {
99 for (auto file
: level
.files
) {
100 if (file
.being_compacted
) {
103 input_file_names
.push_back(file
.name
);
106 return new CompactionTask(
107 db
, this, cf_name
, input_file_names
,
108 options_
.num_levels
- 1, compact_options_
, false);
111 // Schedule the specified compaction task in background.
112 void ScheduleCompaction(CompactionTask
* task
) override
{
113 options_
.env
->Schedule(&FullCompactor::CompactFiles
, task
);
116 static void CompactFiles(void* arg
) {
117 std::unique_ptr
<CompactionTask
> task(
118 reinterpret_cast<CompactionTask
*>(arg
));
121 Status s
= task
->db
->CompactFiles(
122 task
->compact_options
,
123 task
->input_file_names
,
125 printf("CompactFiles() finished with status %s\n", s
.ToString().c_str());
126 if (!s
.ok() && !s
.IsIOError() && task
->retry_on_fail
) {
127 // If a compaction task with its retry_on_fail=true failed,
128 // try to schedule another compaction in case the reason
129 // is not an IO error.
130 CompactionTask
* new_task
= task
->compactor
->PickCompaction(
131 task
->db
, task
->column_family_name
);
132 task
->compactor
->ScheduleCompaction(new_task
);
138 CompactionOptions compact_options_
;
143 options
.create_if_missing
= true;
144 // Disable RocksDB background compaction.
145 options
.compaction_style
= kCompactionStyleNone
;
146 // Small slowdown and stop trigger for experimental purpose.
147 options
.level0_slowdown_writes_trigger
= 3;
148 options
.level0_stop_writes_trigger
= 5;
149 options
.IncreaseParallelism(5);
150 options
.listeners
.emplace_back(new FullCompactor(options
));
153 DestroyDB(kDBPath
, options
);
154 Status s
= DB::Open(options
, kDBPath
, &db
);
158 // if background compaction is not working, write will stall
159 // because of options.level0_stop_writes_trigger
160 for (int i
= 1000; i
< 99999; ++i
) {
161 db
->Put(WriteOptions(), std::to_string(i
),
162 std::string(500, 'a' + (i
% 26)));
165 // verify the values are still there
167 for (int i
= 1000; i
< 99999; ++i
) {
168 db
->Get(ReadOptions(), std::to_string(i
),
170 assert(value
== std::string(500, 'a' + (i
% 26)));