// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/db_impl.h"
#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "memtable/hash_linklist_rep.h"
#include "monitoring/statistics.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/block_based_table_factory.h"
#include "table/plain_table_factory.h"
#include "util/filename.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"

#ifndef ROCKSDB_LITE

namespace rocksdb {

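// Test fixture for the EventListener tests below; DBTestBase opens a scratch
// database under "/listener_test" for each test case.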
class EventListenerTest : public DBTestBase {
 public:
  EventListenerTest() : DBTestBase("/listener_test") {}

  const size_t k110KB = 110 << 10;
};

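// Minimal TablePropertiesCollector that tags every table file with the user
// property {"0" -> "1"}; the listeners below assert that this property shows
// up in the table properties they receive.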
struct TestPropertiesCollector : public rocksdb::TablePropertiesCollector {
  rocksdb::Status AddUserKey(const rocksdb::Slice& /*key*/,
                             const rocksdb::Slice& /*value*/,
                             rocksdb::EntryType /*type*/,
                             rocksdb::SequenceNumber /*seq*/,
                             uint64_t /*file_size*/) override {
    return Status::OK();
  }
  rocksdb::Status Finish(
      rocksdb::UserCollectedProperties* properties) override {
    properties->insert({"0", "1"});
    return Status::OK();
  }

  const char* Name() const override { return "TestTablePropertiesCollector"; }

  rocksdb::UserCollectedProperties GetReadableProperties() const override {
    rocksdb::UserCollectedProperties ret;
    ret["2"] = "3";
    return ret;
  }
};

class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
 public:
  TablePropertiesCollector* CreateTablePropertiesCollector(
      TablePropertiesCollectorFactory::Context /*context*/) override {
    return new TestPropertiesCollector;
  }
  const char* Name() const override { return "TestTablePropertiesCollector"; }
};

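// Listener that records every DB for which OnCompactionCompleted fires and
// sanity-checks the CompactionJobInfo it is handed: input/output files,
// thread id, and the custom table property installed by the collector above.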
class TestCompactionListener : public EventListener {
 public:
  void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
    std::lock_guard<std::mutex> lock(mutex_);
    compacted_dbs_.push_back(db);
    ASSERT_GT(ci.input_files.size(), 0U);
    ASSERT_GT(ci.output_files.size(), 0U);
    ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
    ASSERT_GT(ci.thread_id, 0U);

    for (auto fl : {ci.input_files, ci.output_files}) {
      for (auto fn : fl) {
        auto it = ci.table_properties.find(fn);
        ASSERT_NE(it, ci.table_properties.end());
        auto tp = it->second;
        ASSERT_TRUE(tp != nullptr);
        ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1");
      }
    }
  }

  std::vector<DB*> compacted_dbs_;
  std::mutex mutex_;
};

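// Flush and manually compact seven column families, then verify that the
// compaction listener observed exactly one completed compaction per family,
// all on this DB instance.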
TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
  const int kTestKeySize = 16;
  const int kTestValueSize = 984;
  const int kEntrySize = kTestKeySize + kTestValueSize;
  const int kEntriesPerBuffer = 100;
  const int kNumL0Files = 4;

  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
  options.compaction_style = kCompactionStyleLevel;
  options.target_file_size_base = options.write_buffer_size;
  options.max_bytes_for_level_base = options.target_file_size_base * 2;
  options.max_bytes_for_level_multiplier = 2;
  options.compression = kNoCompression;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  options.level0_file_num_compaction_trigger = kNumL0Files;
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());

  TestCompactionListener* listener = new TestCompactionListener();
  options.listeners.emplace_back(listener);
  std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  CreateAndReopenWithCF(cf_names, options);
  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
  for (int i = 1; i < 8; ++i) {
    ASSERT_OK(Flush(i));
    const Slice kRangeStart = "a";
    const Slice kRangeEnd = "z";
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
                                     &kRangeStart, &kRangeEnd));
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }

  ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
  for (size_t i = 0; i < cf_names.size(); ++i) {
    ASSERT_EQ(listener->compacted_dbs_[i], db_);
  }
}

// This simple Listener can only handle one flush at a time.
class TestFlushListener : public EventListener {
 public:
  explicit TestFlushListener(Env* env)
      : slowdown_count(0), stop_count(0), db_closed(), env_(env) {
    db_closed = false;
  }
  void OnTableFileCreated(const TableFileCreationInfo& info) override {
    // remember the info for later checking the FlushJobInfo.
    prev_fc_info_ = info;
    ASSERT_GT(info.db_name.size(), 0U);
    ASSERT_GT(info.cf_name.size(), 0U);
    ASSERT_GT(info.file_path.size(), 0U);
    ASSERT_GT(info.job_id, 0);
    ASSERT_GT(info.table_properties.data_size, 0U);
    ASSERT_GT(info.table_properties.raw_key_size, 0U);
    ASSERT_GT(info.table_properties.raw_value_size, 0U);
    ASSERT_GT(info.table_properties.num_data_blocks, 0U);
    ASSERT_GT(info.table_properties.num_entries, 0U);

#ifdef ROCKSDB_USING_THREAD_STATUS
    // Verify the id of the current thread that created this table
    // file matches the id of any active flush or compaction thread.
    uint64_t thread_id = env_->GetThreadID();
    std::vector<ThreadStatus> thread_list;
    ASSERT_OK(env_->GetThreadList(&thread_list));
    bool found_match = false;
    for (auto thread_status : thread_list) {
      if (thread_status.operation_type == ThreadStatus::OP_FLUSH ||
          thread_status.operation_type == ThreadStatus::OP_COMPACTION) {
        if (thread_id == thread_status.thread_id) {
          found_match = true;
          break;
        }
      }
    }
    ASSERT_TRUE(found_match);
#endif  // ROCKSDB_USING_THREAD_STATUS
  }

  void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
    flushed_dbs_.push_back(db);
    flushed_column_family_names_.push_back(info.cf_name);
    if (info.triggered_writes_slowdown) {
      slowdown_count++;
    }
    if (info.triggered_writes_stop) {
      stop_count++;
    }
    // verify whether the previously created file matches the flushed file.
    ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
    ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
    ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
    ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
    ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
    ASSERT_GT(info.thread_id, 0U);
    ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
              "1");
  }

  std::vector<std::string> flushed_column_family_names_;
  std::vector<DB*> flushed_dbs_;
  int slowdown_count;
  int stop_count;
  bool db_closing;
  std::atomic_bool db_closed;
  TableFileCreationInfo prev_fc_info_;

 protected:
  Env* env_;
};

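// Flush each of seven column families in turn and verify that
// OnFlushCompleted fired once per flush, in column family order.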
TEST_F(EventListenerTest, OnSingleDBFlushTest) {
  Options options;
  options.env = CurrentOptions().env;
  options.write_buffer_size = k110KB;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  TestFlushListener* listener = new TestFlushListener(options.env);
  options.listeners.emplace_back(listener);
  std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
  CreateAndReopenWithCF(cf_names, options);

  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
  for (int i = 1; i < 8; ++i) {
    ASSERT_OK(Flush(i));
    dbfull()->TEST_WaitForFlushMemTable();
    ASSERT_EQ(listener->flushed_dbs_.size(), i);
    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
  }

  // make sure callback functions are called in the right order
  for (size_t i = 0; i < cf_names.size(); ++i) {
    ASSERT_EQ(listener->flushed_dbs_[i], db_);
    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
  }
}

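// Same flush-ordering check as OnSingleDBFlushTest, driven by Flush() alone
// without the explicit TEST_WaitForFlushMemTable() between column families.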
TEST_F(EventListenerTest, MultiCF) {
  Options options;
  options.env = CurrentOptions().env;
  options.write_buffer_size = k110KB;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  TestFlushListener* listener = new TestFlushListener(options.env);
  options.listeners.emplace_back(listener);
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
  std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};
  CreateAndReopenWithCF(cf_names, options);

  ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
  ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
  ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
  ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
  ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
  ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
  ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
  for (int i = 1; i < 8; ++i) {
    ASSERT_OK(Flush(i));
    ASSERT_EQ(listener->flushed_dbs_.size(), i);
    ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
  }

  // make sure callback functions are called in the right order
  for (size_t i = 0; i < cf_names.size(); i++) {
    ASSERT_EQ(listener->flushed_dbs_[i], db_);
    ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
  }
}

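// Register ten listeners across five separate DBs and verify that each
// listener observed every (DB, column family) flush in the same order.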
TEST_F(EventListenerTest, MultiDBMultiListeners) {
  Options options;
  options.env = CurrentOptions().env;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());
  std::vector<TestFlushListener*> listeners;
  const int kNumDBs = 5;
  const int kNumListeners = 10;
  for (int i = 0; i < kNumListeners; ++i) {
    listeners.emplace_back(new TestFlushListener(options.env));
  }

  std::vector<std::string> cf_names = {
      "pikachu", "ilya", "muromec", "dobrynia",
      "nikitich", "alyosha", "popovich"};

  options.create_if_missing = true;
  for (int i = 0; i < kNumListeners; ++i) {
    options.listeners.emplace_back(listeners[i]);
  }
  DBOptions db_opts(options);
  ColumnFamilyOptions cf_opts(options);

  std::vector<DB*> dbs;
  std::vector<std::vector<ColumnFamilyHandle*>> vec_handles;

  for (int d = 0; d < kNumDBs; ++d) {
    ASSERT_OK(DestroyDB(dbname_ + ToString(d), options));
    DB* db;
    std::vector<ColumnFamilyHandle*> handles;
    ASSERT_OK(DB::Open(options, dbname_ + ToString(d), &db));
    for (size_t c = 0; c < cf_names.size(); ++c) {
      ColumnFamilyHandle* handle;
      db->CreateColumnFamily(cf_opts, cf_names[c], &handle);
      handles.push_back(handle);
    }

    vec_handles.push_back(std::move(handles));
    dbs.push_back(db);
  }

  for (int d = 0; d < kNumDBs; ++d) {
    for (size_t c = 0; c < cf_names.size(); ++c) {
      ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c],
                            cf_names[c], cf_names[c]));
    }
  }

  for (size_t c = 0; c < cf_names.size(); ++c) {
    for (int d = 0; d < kNumDBs; ++d) {
      ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
      reinterpret_cast<DBImpl*>(dbs[d])->TEST_WaitForFlushMemTable();
    }
  }

  for (auto* listener : listeners) {
    int pos = 0;
    for (size_t c = 0; c < cf_names.size(); ++c) {
      for (int d = 0; d < kNumDBs; ++d) {
        ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]);
        ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]);
        pos++;
      }
    }
  }

  for (auto handles : vec_handles) {
    for (auto h : handles) {
      delete h;
    }
    handles.clear();
  }
  vec_handles.clear();

  for (auto db : dbs) {
    delete db;
  }
}

TEST_F(EventListenerTest, DisableBGCompaction) {
  Options options;
  options.env = CurrentOptions().env;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  TestFlushListener* listener = new TestFlushListener(options.env);
  const int kCompactionTrigger = 1;
  const int kSlowdownTrigger = 5;
  const int kStopTrigger = 100;
  options.level0_file_num_compaction_trigger = kCompactionTrigger;
  options.level0_slowdown_writes_trigger = kSlowdownTrigger;
  options.level0_stop_writes_trigger = kStopTrigger;
  options.max_write_buffer_number = 10;
  options.listeners.emplace_back(listener);
  // BG compaction is disabled. The number of L0 files simply keeps
  // increasing in this test.
  options.compaction_style = kCompactionStyleNone;
  options.compression = kNoCompression;
  options.write_buffer_size = 100000;  // Small write buffer
  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());

  CreateAndReopenWithCF({"pikachu"}, options);
  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);

  // Keep writing and flushing until enough L0 files accumulate to trigger
  // write slowdowns.
  for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
       ++i) {
    Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
    FlushOptions fo;
    fo.allow_write_stall = true;
    db_->Flush(fo, handles_[1]);
    db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  }
  ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
}

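// Listener that records the CompactionReason of every completed compaction;
// the tests below check the expected reason for each compaction style.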
class TestCompactionReasonListener : public EventListener {
 public:
  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
    std::lock_guard<std::mutex> lock(mutex_);
    compaction_reasons_.push_back(ci.compaction_reason);
  }

  std::vector<CompactionReason> compaction_reasons_;
  std::mutex mutex_;
};

TEST_F(EventListenerTest, CompactionReasonLevel) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));

  TestCompactionReasonListener* listener = new TestCompactionReasonListener();
  options.listeners.emplace_back(listener);

  options.level0_file_num_compaction_trigger = 4;
  options.compaction_style = kCompactionStyleLevel;

  DestroyAndReopen(options);
  Random rnd(301);

  // Write 4 files in L0
  for (int i = 0; i < 4; i++) {
    GenerateNewRandomFile(&rnd);
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_EQ(listener->compaction_reasons_.size(), 1);
  ASSERT_EQ(listener->compaction_reasons_[0],
            CompactionReason::kLevelL0FilesNum);

  DestroyAndReopen(options);

  // Write 3 non-overlapping files in L0
  for (int k = 1; k <= 30; k++) {
    ASSERT_OK(Put(Key(k), Key(k)));
    if (k % 10 == 0) {
      Flush();
    }
  }

  // Do a trivial move from L0 -> L1
  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  options.max_bytes_for_level_base = 1;
  Close();
  listener->compaction_reasons_.clear();
  Reopen(options);

  dbfull()->TEST_WaitForCompact();
  ASSERT_GT(listener->compaction_reasons_.size(), 1);

  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kLevelMaxLevelSize);
  }

  options.disable_auto_compactions = true;
  Close();
  listener->compaction_reasons_.clear();
  Reopen(options);

  Put("key", "value");
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
  }
}

TEST_F(EventListenerTest, CompactionReasonUniversal) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));

  TestCompactionReasonListener* listener = new TestCompactionReasonListener();
  options.listeners.emplace_back(listener);

  options.compaction_style = kCompactionStyleUniversal;

  Random rnd(301);

  options.level0_file_num_compaction_trigger = 8;
  options.compaction_options_universal.max_size_amplification_percent = 100000;
  options.compaction_options_universal.size_ratio = 100000;
  DestroyAndReopen(options);
  listener->compaction_reasons_.clear();

  // Write 8 files in L0
  for (int i = 0; i < 8; i++) {
    GenerateNewRandomFile(&rnd);
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeRatio);
  }

  options.level0_file_num_compaction_trigger = 8;
  options.compaction_options_universal.max_size_amplification_percent = 1;
  options.compaction_options_universal.size_ratio = 100000;

  DestroyAndReopen(options);
  listener->compaction_reasons_.clear();

  // Write 8 files in L0
  for (int i = 0; i < 8; i++) {
    GenerateNewRandomFile(&rnd);
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeAmplification);
  }

  options.disable_auto_compactions = true;
  Close();
  listener->compaction_reasons_.clear();
  Reopen(options);

  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
  }
}

TEST_F(EventListenerTest, CompactionReasonFIFO) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));

  TestCompactionReasonListener* listener = new TestCompactionReasonListener();
  options.listeners.emplace_back(listener);

  options.level0_file_num_compaction_trigger = 4;
  options.compaction_style = kCompactionStyleFIFO;
  options.compaction_options_fifo.max_table_files_size = 1;

  DestroyAndReopen(options);
  Random rnd(301);

  // Write 4 files in L0
  for (int i = 0; i < 4; i++) {
    GenerateNewRandomFile(&rnd);
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kFIFOMaxSize);
  }
}

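// Listener that counts OnTableFileCreationStarted/OnTableFileCreated calls
// (and failures) separately for flushes and compactions. Its nested TestEnv
// wraps Env::Default() and can be told to fail creation of new .sst files.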
class TableFileCreationListener : public EventListener {
 public:
  class TestEnv : public EnvWrapper {
   public:
    TestEnv() : EnvWrapper(Env::Default()) {}

    void SetStatus(Status s) { status_ = s; }

    Status NewWritableFile(const std::string& fname,
                           std::unique_ptr<WritableFile>* result,
                           const EnvOptions& options) override {
      if (fname.size() > 4 && fname.substr(fname.size() - 4) == ".sst") {
        if (!status_.ok()) {
          return status_;
        }
      }
      return Env::Default()->NewWritableFile(fname, result, options);
    }

   private:
    Status status_;
  };

  TableFileCreationListener() {
    for (int i = 0; i < 2; i++) {
      started_[i] = finished_[i] = failure_[i] = 0;
    }
  }

  int Index(TableFileCreationReason reason) {
    int idx;
    switch (reason) {
      case TableFileCreationReason::kFlush:
        idx = 0;
        break;
      case TableFileCreationReason::kCompaction:
        idx = 1;
        break;
      default:
        idx = -1;
    }
    return idx;
  }

  void CheckAndResetCounters(int flush_started, int flush_finished,
                             int flush_failure, int compaction_started,
                             int compaction_finished, int compaction_failure) {
    ASSERT_EQ(started_[0], flush_started);
    ASSERT_EQ(finished_[0], flush_finished);
    ASSERT_EQ(failure_[0], flush_failure);
    ASSERT_EQ(started_[1], compaction_started);
    ASSERT_EQ(finished_[1], compaction_finished);
    ASSERT_EQ(failure_[1], compaction_failure);
    for (int i = 0; i < 2; i++) {
      started_[i] = finished_[i] = failure_[i] = 0;
    }
  }

  void OnTableFileCreationStarted(
      const TableFileCreationBriefInfo& info) override {
    int idx = Index(info.reason);
    if (idx >= 0) {
      started_[idx]++;
    }
    ASSERT_GT(info.db_name.size(), 0U);
    ASSERT_GT(info.cf_name.size(), 0U);
    ASSERT_GT(info.file_path.size(), 0U);
    ASSERT_GT(info.job_id, 0);
  }

  void OnTableFileCreated(const TableFileCreationInfo& info) override {
    int idx = Index(info.reason);
    if (idx >= 0) {
      finished_[idx]++;
    }
    ASSERT_GT(info.db_name.size(), 0U);
    ASSERT_GT(info.cf_name.size(), 0U);
    ASSERT_GT(info.file_path.size(), 0U);
    ASSERT_GT(info.job_id, 0);
    if (info.status.ok()) {
      ASSERT_GT(info.table_properties.data_size, 0U);
      ASSERT_GT(info.table_properties.raw_key_size, 0U);
      ASSERT_GT(info.table_properties.raw_value_size, 0U);
      ASSERT_GT(info.table_properties.num_data_blocks, 0U);
      ASSERT_GT(info.table_properties.num_entries, 0U);
    } else {
      if (idx >= 0) {
        failure_[idx]++;
      }
    }
  }

  TestEnv test_env;
  int started_[2];
  int finished_[2];
  int failure_[2];
};

TEST_F(EventListenerTest, TableFileCreationListenersTest) {
  auto listener = std::make_shared<TableFileCreationListener>();
  Options options;
  options.create_if_missing = true;
  options.listeners.push_back(listener);
  options.env = &listener->test_env;
  DestroyAndReopen(options);

  ASSERT_OK(Put("foo", "aaa"));
  ASSERT_OK(Put("bar", "bbb"));
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();
  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);

  ASSERT_OK(Put("foo", "aaa1"));
  ASSERT_OK(Put("bar", "bbb1"));
  listener->test_env.SetStatus(Status::NotSupported("not supported"));
  ASSERT_NOK(Flush());
  listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
  listener->test_env.SetStatus(Status::OK());

  Reopen(options);
  ASSERT_OK(Put("foo", "aaa2"));
  ASSERT_OK(Put("bar", "bbb2"));
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();
  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);

  const Slice kRangeStart = "a";
  const Slice kRangeEnd = "z";
  dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
  dbfull()->TEST_WaitForCompact();
  listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0);

  ASSERT_OK(Put("foo", "aaa3"));
  ASSERT_OK(Put("bar", "bbb3"));
  ASSERT_OK(Flush());
  listener->test_env.SetStatus(Status::NotSupported("not supported"));
  dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
  dbfull()->TEST_WaitForCompact();
  listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
}

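// Listener that remembers the first sequence number of each sealed memtable
// and checks it against the smallest sequence number reported when the
// corresponding flush completes.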
class MemTableSealedListener : public EventListener {
 private:
  SequenceNumber latest_seq_number_;

 public:
  MemTableSealedListener() {}
  void OnMemTableSealed(const MemTableInfo& info) override {
    latest_seq_number_ = info.first_seqno;
  }

  void OnFlushCompleted(DB* /*db*/,
                        const FlushJobInfo& flush_job_info) override {
    ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_);
  }
};

TEST_F(EventListenerTest, MemTableSealedListenerTest) {
  auto listener = std::make_shared<MemTableSealedListener>();
  Options options;
  options.create_if_missing = true;
  options.listeners.push_back(listener);
  DestroyAndReopen(options);

  for (unsigned int i = 0; i < 10; i++) {
    std::string tag = std::to_string(i);
    ASSERT_OK(Put("foo" + tag, "aaa"));
    ASSERT_OK(Put("bar" + tag, "bbb"));

    ASSERT_OK(Flush());
  }
}

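// Listener that verifies OnColumnFamilyHandleDeletionStarted reports the
// expected column family name for each handle ID and counts the callbacks.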
class ColumnFamilyHandleDeletionStartedListener : public EventListener {
 private:
  std::vector<std::string> cfs_;
  int counter;

 public:
  explicit ColumnFamilyHandleDeletionStartedListener(
      const std::vector<std::string>& cfs)
      : cfs_(cfs), counter(0) {
    cfs_.insert(cfs_.begin(), kDefaultColumnFamilyName);
  }
  void OnColumnFamilyHandleDeletionStarted(
      ColumnFamilyHandle* handle) override {
    ASSERT_EQ(cfs_[handle->GetID()], handle->GetName());
    counter++;
  }
  int getCounter() { return counter; }
};

TEST_F(EventListenerTest, ColumnFamilyHandleDeletionStartedListenerTest) {
  std::vector<std::string> cfs{"pikachu", "eevee", "Mewtwo"};
  auto listener =
      std::make_shared<ColumnFamilyHandleDeletionStartedListener>(cfs);
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.listeners.push_back(listener);
  CreateAndReopenWithCF(cfs, options);
  ASSERT_EQ(handles_.size(), 4);
  delete handles_[3];
  delete handles_[2];
  delete handles_[1];
  handles_.resize(1);
  ASSERT_EQ(listener->getCounter(), 3);
}

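// Listener that swallows the first background error (clearing it and
// restoring writability on the SpecialEnv) so the tests below can verify
// that a subsequent retry of the failed flush/compaction succeeds.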
class BackgroundErrorListener : public EventListener {
 private:
  SpecialEnv* env_;
  int counter_;

 public:
  BackgroundErrorListener(SpecialEnv* env) : env_(env), counter_(0) {}

  void OnBackgroundError(BackgroundErrorReason /*reason*/,
                         Status* bg_error) override {
    if (counter_ == 0) {
      // suppress the first error and disable write-dropping such that a retry
      // can succeed.
      *bg_error = Status::OK();
      env_->drop_writes_.store(false, std::memory_order_release);
      env_->no_slowdown_ = false;
    }
    ++counter_;
  }

  int counter() { return counter_; }
};

TEST_F(EventListenerTest, BackgroundErrorListenerFailedFlushTest) {
  auto listener = std::make_shared<BackgroundErrorListener>(env_);
  Options options;
  options.create_if_missing = true;
  options.env = env_;
  options.listeners.push_back(listener);
  options.memtable_factory.reset(new SpecialSkipListFactory(1));
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  // the usual TEST_WaitForFlushMemTable() doesn't work for failed flushes, so
  // forge a custom one for the failed flush case.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkFlush:done",
        "EventListenerTest:BackgroundErrorListenerFailedFlushTest:1"}});
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  env_->drop_writes_.store(true, std::memory_order_release);
  env_->no_slowdown_ = true;

  ASSERT_OK(Put("key0", "val"));
  ASSERT_OK(Put("key1", "val"));
  TEST_SYNC_POINT("EventListenerTest:BackgroundErrorListenerFailedFlushTest:1");
  ASSERT_EQ(1, listener->counter());
  ASSERT_OK(Put("key2", "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
}

TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
  auto listener = std::make_shared<BackgroundErrorListener>(env_);
  Options options;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  options.env = env_;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.push_back(listener);
  options.memtable_factory.reset(new SpecialSkipListFactory(2));
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  // third iteration triggers the second memtable's flush
  for (int i = 0; i < 3; ++i) {
    ASSERT_OK(Put("key0", "val"));
    if (i > 0) {
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    ASSERT_OK(Put("key1", "val"));
  }
  ASSERT_EQ(2, NumTableFilesAtLevel(0));

  env_->drop_writes_.store(true, std::memory_order_release);
  env_->no_slowdown_ = true;
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(1, listener->counter());

  // trigger flush so compaction is triggered again; this time it succeeds
  // The previous failed compaction may get retried automatically, so we may
  // be left with 0 or 1 files in level 1, depending on when the retry gets
  // scheduled
  ASSERT_OK(Put("key0", "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_LE(1, NumTableFilesAtLevel(0));
}

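// Listener that counts completed file reads and writes (and how many of them
// succeeded) and checks that every reported operation has a positive duration.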
class TestFileOperationListener : public EventListener {
 public:
  TestFileOperationListener() {
    file_reads_.store(0);
    file_reads_success_.store(0);
    file_writes_.store(0);
    file_writes_success_.store(0);
  }

  void OnFileReadFinish(const FileOperationInfo& info) override {
    ++file_reads_;
    if (info.status.ok()) {
      ++file_reads_success_;
    }
    ReportDuration(info);
  }

  void OnFileWriteFinish(const FileOperationInfo& info) override {
    ++file_writes_;
    if (info.status.ok()) {
      ++file_writes_success_;
    }
    ReportDuration(info);
  }

  bool ShouldBeNotifiedOnFileIO() override { return true; }

  std::atomic<size_t> file_reads_;
  std::atomic<size_t> file_reads_success_;
  std::atomic<size_t> file_writes_;
  std::atomic<size_t> file_writes_success_;

 private:
  void ReportDuration(const FileOperationInfo& info) const {
    auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(
        info.finish_timestamp - info.start_timestamp);
    ASSERT_GT(duration.count(), 0);
  }
};

TEST_F(EventListenerTest, OnFileOperationTest) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;

  TestFileOperationListener* listener = new TestFileOperationListener();
  options.listeners.emplace_back(listener);

  DestroyAndReopen(options);
  ASSERT_OK(Put("foo", "aaa"));
  dbfull()->Flush(FlushOptions());
  dbfull()->TEST_WaitForFlushMemTable();
  ASSERT_GE(listener->file_writes_.load(),
            listener->file_writes_success_.load());
  ASSERT_GT(listener->file_writes_.load(), 0);
  Close();

  Reopen(options);
  ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load());
  ASSERT_GT(listener->file_reads_.load(), 0);
}

}  // namespace rocksdb

#endif  // ROCKSDB_LITE

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}