1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
// Example code demonstrating how to use the CompactFiles, EventListener,
// and GetColumnFamilyMetaData APIs to implement a custom compaction algorithm.
11 #include "rocksdb/db.h"
12 #include "rocksdb/env.h"
13 #include "rocksdb/options.h"
15 using namespace rocksdb
;
16 std::string kDBPath
= "/tmp/rocksdb_compact_files_example";
17 struct CompactionTask
;
19 // This is an example interface of external-compaction algorithm.
20 // Compaction algorithm can be implemented outside the core-RocksDB
21 // code by using the pluggable compaction APIs that RocksDb provides.
22 class Compactor
: public EventListener
{
24 // Picks and returns a compaction task given the specified DB
25 // and column family. It is the caller's responsibility to
26 // destroy the returned CompactionTask. Returns "nullptr"
27 // if it cannot find a proper compaction task.
28 virtual CompactionTask
* PickCompaction(
29 DB
* db
, const std::string
& cf_name
) = 0;
31 // Schedule and run the specified compaction task in background.
32 virtual void ScheduleCompaction(CompactionTask
*task
) = 0;
35 // Example structure that describes a compaction task.
36 struct CompactionTask
{
38 DB
* _db
, Compactor
* _compactor
,
39 const std::string
& _column_family_name
,
40 const std::vector
<std::string
>& _input_file_names
,
41 const int _output_level
,
42 const CompactionOptions
& _compact_options
,
45 compactor(_compactor
),
46 column_family_name(_column_family_name
),
47 input_file_names(_input_file_names
),
48 output_level(_output_level
),
49 compact_options(_compact_options
),
50 retry_on_fail(_retry_on_fail
) {}
53 const std::string
& column_family_name
;
54 std::vector
<std::string
> input_file_names
;
56 CompactionOptions compact_options
;
60 // A simple compaction algorithm that always compacts everything
61 // to the highest level whenever possible.
62 class FullCompactor
: public Compactor
{
64 explicit FullCompactor(const Options options
) : options_(options
) {
65 compact_options_
.compression
= options_
.compression
;
66 compact_options_
.output_file_size_limit
=
67 options_
.target_file_size_base
;
70 // When flush happens, it determines whether to trigger compaction. If
71 // triggered_writes_stop is true, it will also set the retry flag of
72 // compaction-task to true.
73 void OnFlushCompleted(
74 DB
* db
, const FlushJobInfo
& info
) override
{
75 CompactionTask
* task
= PickCompaction(db
, info
.cf_name
);
76 if (task
!= nullptr) {
77 if (info
.triggered_writes_stop
) {
78 task
->retry_on_fail
= true;
80 // Schedule compaction in a different thread.
81 ScheduleCompaction(task
);
85 // Always pick a compaction which includes all files whenever possible.
86 CompactionTask
* PickCompaction(
87 DB
* db
, const std::string
& cf_name
) override
{
88 ColumnFamilyMetaData cf_meta
;
89 db
->GetColumnFamilyMetaData(&cf_meta
);
91 std::vector
<std::string
> input_file_names
;
92 for (auto level
: cf_meta
.levels
) {
93 for (auto file
: level
.files
) {
94 if (file
.being_compacted
) {
97 input_file_names
.push_back(file
.name
);
100 return new CompactionTask(
101 db
, this, cf_name
, input_file_names
,
102 options_
.num_levels
- 1, compact_options_
, false);
105 // Schedule the specified compaction task in background.
106 void ScheduleCompaction(CompactionTask
* task
) override
{
107 options_
.env
->Schedule(&FullCompactor::CompactFiles
, task
);
110 static void CompactFiles(void* arg
) {
111 std::unique_ptr
<CompactionTask
> task(
112 reinterpret_cast<CompactionTask
*>(arg
));
115 Status s
= task
->db
->CompactFiles(
116 task
->compact_options
,
117 task
->input_file_names
,
119 printf("CompactFiles() finished with status %s\n", s
.ToString().c_str());
120 if (!s
.ok() && !s
.IsIOError() && task
->retry_on_fail
) {
121 // If a compaction task with its retry_on_fail=true failed,
122 // try to schedule another compaction in case the reason
123 // is not an IO error.
124 CompactionTask
* new_task
= task
->compactor
->PickCompaction(
125 task
->db
, task
->column_family_name
);
126 task
->compactor
->ScheduleCompaction(new_task
);
132 CompactionOptions compact_options_
;
137 options
.create_if_missing
= true;
138 // Disable RocksDB background compaction.
139 options
.compaction_style
= kCompactionStyleNone
;
140 // Small slowdown and stop trigger for experimental purpose.
141 options
.level0_slowdown_writes_trigger
= 3;
142 options
.level0_stop_writes_trigger
= 5;
143 options
.IncreaseParallelism(5);
144 options
.listeners
.emplace_back(new FullCompactor(options
));
147 DestroyDB(kDBPath
, options
);
148 Status s
= DB::Open(options
, kDBPath
, &db
);
152 // if background compaction is not working, write will stall
153 // because of options.level0_stop_writes_trigger
154 for (int i
= 1000; i
< 99999; ++i
) {
155 db
->Put(WriteOptions(), std::to_string(i
),
156 std::string(500, 'a' + (i
% 26)));
159 // verify the values are still there
161 for (int i
= 1000; i
< 99999; ++i
) {
162 db
->Get(ReadOptions(), std::to_string(i
),
164 assert(value
== std::string(500, 'a' + (i
% 26)));