1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/blob/blob_index.h"
7 #include "db/db_impl/db_impl.h"
8 #include "db/db_test_util.h"
9 #include "db/dbformat.h"
10 #include "db/version_set.h"
11 #include "db/write_batch_internal.h"
12 #include "file/filename.h"
13 #include "monitoring/statistics.h"
14 #include "rocksdb/cache.h"
15 #include "rocksdb/compaction_filter.h"
16 #include "rocksdb/db.h"
17 #include "rocksdb/env.h"
18 #include "rocksdb/filter_policy.h"
19 #include "rocksdb/options.h"
20 #include "rocksdb/perf_context.h"
21 #include "rocksdb/slice.h"
22 #include "rocksdb/slice_transform.h"
23 #include "rocksdb/table.h"
24 #include "rocksdb/table_properties.h"
25 #include "test_util/sync_point.h"
26 #include "test_util/testharness.h"
27 #include "test_util/testutil.h"
28 #include "util/hash.h"
29 #include "util/mutexlock.h"
30 #include "util/rate_limiter.h"
31 #include "util/string_util.h"
32 #include "utilities/merge_operators.h"
33
34 #ifndef ROCKSDB_LITE
35
36 namespace ROCKSDB_NAMESPACE {
37
38 class EventListenerTest : public DBTestBase {
39 public:
40 EventListenerTest() : DBTestBase("listener_test", /*env_do_fsync=*/true) {}
41
42 static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
43 uint64_t size) {
44 std::string blob_index;
45 BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
46 kNoCompression);
47 return blob_index;
48 }
49
50 const size_t k110KB = 110 << 10;
51 };
52
53 struct TestPropertiesCollector
54 : public ROCKSDB_NAMESPACE::TablePropertiesCollector {
55 ROCKSDB_NAMESPACE::Status AddUserKey(
56 const ROCKSDB_NAMESPACE::Slice& /*key*/,
57 const ROCKSDB_NAMESPACE::Slice& /*value*/,
58 ROCKSDB_NAMESPACE::EntryType /*type*/,
59 ROCKSDB_NAMESPACE::SequenceNumber /*seq*/,
60 uint64_t /*file_size*/) override {
61 return Status::OK();
62 }
63 ROCKSDB_NAMESPACE::Status Finish(
64 ROCKSDB_NAMESPACE::UserCollectedProperties* properties) override {
65 properties->insert({"0", "1"});
66 return Status::OK();
67 }
68
69 const char* Name() const override { return "TestTablePropertiesCollector"; }
70
71 ROCKSDB_NAMESPACE::UserCollectedProperties GetReadableProperties()
72 const override {
73 ROCKSDB_NAMESPACE::UserCollectedProperties ret;
74 ret["2"] = "3";
75 return ret;
76 }
77 };
78
79 class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory {
80 public:
81 TablePropertiesCollector* CreateTablePropertiesCollector(
82 TablePropertiesCollectorFactory::Context /*context*/) override {
83 return new TestPropertiesCollector;
84 }
85 const char* Name() const override { return "TestTablePropertiesCollector"; }
86 };
87
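// Listener that, on every compaction completion, cross-checks the
// CompactionJobInfo (input/output files and levels, blob file numbers,
// thread ID, and user-collected table properties) against the DB's own
// file metadata.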
88 class TestCompactionListener : public EventListener {
89 public:
90 explicit TestCompactionListener(EventListenerTest* test) : test_(test) {}
91
92 void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
93 std::lock_guard<std::mutex> lock(mutex_);
94 compacted_dbs_.push_back(db);
95 ASSERT_GT(ci.input_files.size(), 0U);
96 ASSERT_EQ(ci.input_files.size(), ci.input_file_infos.size());
97
98 for (size_t i = 0; i < ci.input_file_infos.size(); ++i) {
99 ASSERT_EQ(ci.input_file_infos[i].level, ci.base_input_level);
100 ASSERT_EQ(ci.input_file_infos[i].file_number,
101 TableFileNameToNumber(ci.input_files[i]));
102 }
103
104 ASSERT_GT(ci.output_files.size(), 0U);
105 ASSERT_EQ(ci.output_files.size(), ci.output_file_infos.size());
106
107 ASSERT_TRUE(test_);
108 ASSERT_EQ(test_->db_, db);
109
110 std::vector<std::vector<FileMetaData>> files_by_level;
111 test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[ci.cf_id],
112 &files_by_level);
113 ASSERT_GT(files_by_level.size(), ci.output_level);
114
115 for (size_t i = 0; i < ci.output_file_infos.size(); ++i) {
116 ASSERT_EQ(ci.output_file_infos[i].level, ci.output_level);
117 ASSERT_EQ(ci.output_file_infos[i].file_number,
118 TableFileNameToNumber(ci.output_files[i]));
119
120 auto it = std::find_if(
121 files_by_level[ci.output_level].begin(),
122 files_by_level[ci.output_level].end(), [&](const FileMetaData& meta) {
123 return meta.fd.GetNumber() == ci.output_file_infos[i].file_number;
124 });
125 ASSERT_NE(it, files_by_level[ci.output_level].end());
126
127 ASSERT_EQ(ci.output_file_infos[i].oldest_blob_file_number,
128 it->oldest_blob_file_number);
129 }
130
131 ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id);
132 ASSERT_GT(ci.thread_id, 0U);
133
134 for (auto fl : {ci.input_files, ci.output_files}) {
135 for (auto fn : fl) {
136 auto it = ci.table_properties.find(fn);
137 ASSERT_NE(it, ci.table_properties.end());
138 auto tp = it->second;
139 ASSERT_TRUE(tp != nullptr);
140 ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1");
141 }
142 }
143 }
144
145 EventListenerTest* test_;
146 std::vector<DB*> compacted_dbs_;
147 std::mutex mutex_;
148 };
149
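// Flushes and manually compacts each of seven column families, then verifies
// that TestCompactionListener fired once per column family for this DB.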
150 TEST_F(EventListenerTest, OnSingleDBCompactionTest) {
151 const int kTestKeySize = 16;
152 const int kTestValueSize = 984;
153 const int kEntrySize = kTestKeySize + kTestValueSize;
154 const int kEntriesPerBuffer = 100;
155 const int kNumL0Files = 4;
156
157 Options options;
158 options.env = CurrentOptions().env;
159 options.create_if_missing = true;
160 options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
161 options.compaction_style = kCompactionStyleLevel;
162 options.target_file_size_base = options.write_buffer_size;
163 options.max_bytes_for_level_base = options.target_file_size_base * 2;
164 options.max_bytes_for_level_multiplier = 2;
165 options.compression = kNoCompression;
166 #ifdef ROCKSDB_USING_THREAD_STATUS
167 options.enable_thread_tracking = true;
168 #endif // ROCKSDB_USING_THREAD_STATUS
169 options.level0_file_num_compaction_trigger = kNumL0Files;
170 options.table_properties_collector_factories.push_back(
171 std::make_shared<TestPropertiesCollectorFactory>());
172
173 TestCompactionListener* listener = new TestCompactionListener(this);
174 options.listeners.emplace_back(listener);
175 std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
176 "dobrynia", "nikitich", "alyosha",
177 "popovich"};
178 CreateAndReopenWithCF(cf_names, options);
179 ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
180
181 WriteBatch batch;
182 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
183 BlobStr(123, 0, 1 << 10)));
184 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
185
186 ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
187 ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
188 ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
189 ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
190 ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
191 ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
192 for (int i = 1; i < 8; ++i) {
193 ASSERT_OK(Flush(i));
194 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
195 ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i],
196 nullptr, nullptr));
197 ASSERT_OK(dbfull()->TEST_WaitForCompact());
198 }
199
200 ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size());
201 for (size_t i = 0; i < cf_names.size(); ++i) {
202 ASSERT_EQ(listener->compacted_dbs_[i], db_);
203 }
204 }
205
206 // This simple Listener can only handle one flush at a time.
207 class TestFlushListener : public EventListener {
208 public:
209 TestFlushListener(Env* env, EventListenerTest* test)
210 : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
211 db_closed = false;
212 }
213
214 virtual ~TestFlushListener() {
215 prev_fc_info_.status.PermitUncheckedError(); // Ignore the status
216 }
217 void OnTableFileCreated(const TableFileCreationInfo& info) override {
218 // remember the info for later checking the FlushJobInfo.
219 prev_fc_info_ = info;
220 ASSERT_GT(info.db_name.size(), 0U);
221 ASSERT_GT(info.cf_name.size(), 0U);
222 ASSERT_GT(info.file_path.size(), 0U);
223 ASSERT_GT(info.job_id, 0);
224 ASSERT_GT(info.table_properties.data_size, 0U);
225 ASSERT_GT(info.table_properties.raw_key_size, 0U);
226 ASSERT_GT(info.table_properties.raw_value_size, 0U);
227 ASSERT_GT(info.table_properties.num_data_blocks, 0U);
228 ASSERT_GT(info.table_properties.num_entries, 0U);
229 ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
230 ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
231
232 #ifdef ROCKSDB_USING_THREAD_STATUS
233 // Verify the id of the current thread that created this table
234 // file matches the id of any active flush or compaction thread.
235 uint64_t thread_id = env_->GetThreadID();
236 std::vector<ThreadStatus> thread_list;
237 ASSERT_OK(env_->GetThreadList(&thread_list));
238 bool found_match = false;
239 for (auto thread_status : thread_list) {
240 if (thread_status.operation_type == ThreadStatus::OP_FLUSH ||
241 thread_status.operation_type == ThreadStatus::OP_COMPACTION) {
242 if (thread_id == thread_status.thread_id) {
243 found_match = true;
244 break;
245 }
246 }
247 }
248 ASSERT_TRUE(found_match);
249 #endif // ROCKSDB_USING_THREAD_STATUS
250 }
251
252 void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
253 flushed_dbs_.push_back(db);
254 flushed_column_family_names_.push_back(info.cf_name);
255 if (info.triggered_writes_slowdown) {
256 slowdown_count++;
257 }
258 if (info.triggered_writes_stop) {
259 stop_count++;
260 }
261 // verify whether the previously created file matches the flushed file.
262 ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
263 ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
264 ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
265 ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
266 ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);
267
268 // Note: the following chunk relies on the notification pertaining to the
269 // database pointed to by DBTestBase::db_, and is thus bypassed when
270 // that assumption does not hold (see the test case MultiDBMultiListeners
271 // below).
272 ASSERT_TRUE(test_);
273 if (db == test_->db_) {
274 std::vector<std::vector<FileMetaData>> files_by_level;
275 ASSERT_LT(info.cf_id, test_->handles_.size());
276 ASSERT_GE(info.cf_id, 0u);
277 ASSERT_NE(test_->handles_[info.cf_id], nullptr);
278 test_->dbfull()->TEST_GetFilesMetaData(test_->handles_[info.cf_id],
279 &files_by_level);
280
281 ASSERT_FALSE(files_by_level.empty());
282 auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
283 [&](const FileMetaData& meta) {
284 return meta.fd.GetNumber() == info.file_number;
285 });
286 ASSERT_NE(it, files_by_level[0].end());
287 ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
288 }
289
290 ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
291 ASSERT_GT(info.thread_id, 0U);
292 ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second,
293 "1");
294 }
295
296 std::vector<std::string> flushed_column_family_names_;
297 std::vector<DB*> flushed_dbs_;
298 int slowdown_count;
299 int stop_count;
300 bool db_closing;
301 std::atomic_bool db_closed;
302 TableFileCreationInfo prev_fc_info_;
303
304 protected:
305 Env* env_;
306 EventListenerTest* test_;
307 };
308
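// Flushes seven column families one at a time and verifies that
// TestFlushListener recorded each flush (DB pointer and column family name)
// in order.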
309 TEST_F(EventListenerTest, OnSingleDBFlushTest) {
310 Options options;
311 options.env = CurrentOptions().env;
312 options.write_buffer_size = k110KB;
313 #ifdef ROCKSDB_USING_THREAD_STATUS
314 options.enable_thread_tracking = true;
315 #endif // ROCKSDB_USING_THREAD_STATUS
316 TestFlushListener* listener = new TestFlushListener(options.env, this);
317 options.listeners.emplace_back(listener);
318 std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
319 "dobrynia", "nikitich", "alyosha",
320 "popovich"};
321 options.table_properties_collector_factories.push_back(
322 std::make_shared<TestPropertiesCollectorFactory>());
323 CreateAndReopenWithCF(cf_names, options);
324
325 ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
326
327 WriteBatch batch;
328 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 1, "ditto",
329 BlobStr(456, 0, 1 << 10)));
330 ASSERT_OK(dbfull()->Write(WriteOptions(), &batch));
331
332 ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
333 ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
334 ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
335 ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
336 ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
337 ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
338 for (int i = 1; i < 8; ++i) {
339 ASSERT_OK(Flush(i));
340 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
341 // Ensure background work is fully finished including listener callbacks
342 // before accessing listener state.
343 ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
344 ASSERT_EQ(listener->flushed_dbs_.size(), i);
345 ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
346 }
347
348 // make sure callback functions are called in the right order
349 for (size_t i = 0; i < cf_names.size(); ++i) {
350 ASSERT_EQ(listener->flushed_dbs_[i], db_);
351 ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
352 }
353 }
354
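// Same flow as OnSingleDBFlushTest, run once without and once with
// atomic_flush; a sync point guarantees OnFlushCompleted has finished before
// the listener state is inspected.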
355 TEST_F(EventListenerTest, MultiCF) {
356 Options options;
357 options.env = CurrentOptions().env;
358 options.write_buffer_size = k110KB;
359 #ifdef ROCKSDB_USING_THREAD_STATUS
360 options.enable_thread_tracking = true;
361 #endif // ROCKSDB_USING_THREAD_STATUS
362 for (auto atomic_flush : {false, true}) {
363 options.atomic_flush = atomic_flush;
364 options.create_if_missing = true;
365 DestroyAndReopen(options);
366 TestFlushListener* listener = new TestFlushListener(options.env, this);
367 options.listeners.emplace_back(listener);
368 options.table_properties_collector_factories.push_back(
369 std::make_shared<TestPropertiesCollectorFactory>());
370 std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
371 "dobrynia", "nikitich", "alyosha",
372 "popovich"};
373 CreateAndReopenWithCF(cf_names, options);
374
375 ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
376 ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
377 ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
378 ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
379 ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
380 ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
381 ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
382
383 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
384
385 for (int i = 1; i < 8; ++i) {
386 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
387 {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted",
388 "EventListenerTest.MultiCF:PreVerifyListener"}});
389 ASSERT_OK(Flush(i));
390 TEST_SYNC_POINT("EventListenerTest.MultiCF:PreVerifyListener");
391 ASSERT_EQ(listener->flushed_dbs_.size(), i);
392 ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
393 // make sure callback functions are called in the right order
394 if (i == 7) {
395 for (size_t j = 0; j < cf_names.size(); j++) {
396 ASSERT_EQ(listener->flushed_dbs_[j], db_);
397 ASSERT_EQ(listener->flushed_column_family_names_[j], cf_names[j]);
398 }
399 }
400 }
401
402 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
403 Close();
404 }
405 }
406
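// Registers the same ten listeners with five databases and verifies that each
// listener observed every flush, ordered by column family and then by DB.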
407 TEST_F(EventListenerTest, MultiDBMultiListeners) {
408 Options options;
409 options.env = CurrentOptions().env;
410 #ifdef ROCKSDB_USING_THREAD_STATUS
411 options.enable_thread_tracking = true;
412 #endif // ROCKSDB_USING_THREAD_STATUS
413 options.table_properties_collector_factories.push_back(
414 std::make_shared<TestPropertiesCollectorFactory>());
415 std::vector<TestFlushListener*> listeners;
416 const int kNumDBs = 5;
417 const int kNumListeners = 10;
418 for (int i = 0; i < kNumListeners; ++i) {
419 listeners.emplace_back(new TestFlushListener(options.env, this));
420 }
421
422 std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
423 "dobrynia", "nikitich", "alyosha",
424 "popovich"};
425
426 options.create_if_missing = true;
427 for (int i = 0; i < kNumListeners; ++i) {
428 options.listeners.emplace_back(listeners[i]);
429 }
430 DBOptions db_opts(options);
431 ColumnFamilyOptions cf_opts(options);
432
433 std::vector<DB*> dbs;
434 std::vector<std::vector<ColumnFamilyHandle*>> vec_handles;
435
436 for (int d = 0; d < kNumDBs; ++d) {
437 ASSERT_OK(DestroyDB(dbname_ + std::to_string(d), options));
438 DB* db;
439 std::vector<ColumnFamilyHandle*> handles;
440 ASSERT_OK(DB::Open(options, dbname_ + std::to_string(d), &db));
441 for (size_t c = 0; c < cf_names.size(); ++c) {
442 ColumnFamilyHandle* handle;
443 ASSERT_OK(db->CreateColumnFamily(cf_opts, cf_names[c], &handle));
444 handles.push_back(handle);
445 }
446
447 vec_handles.push_back(std::move(handles));
448 dbs.push_back(db);
449 }
450
451 for (int d = 0; d < kNumDBs; ++d) {
452 for (size_t c = 0; c < cf_names.size(); ++c) {
453 ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c], cf_names[c],
454 cf_names[c]));
455 }
456 }
457
458 for (size_t c = 0; c < cf_names.size(); ++c) {
459 for (int d = 0; d < kNumDBs; ++d) {
460 ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c]));
461 ASSERT_OK(
462 static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForFlushMemTable());
463 }
464 }
465
466 for (int d = 0; d < kNumDBs; ++d) {
467 // Ensure background work is fully finished including listener callbacks
468 // before accessing listener state.
469 ASSERT_OK(
470 static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForBackgroundWork());
471 }
472
473 for (auto* listener : listeners) {
474 int pos = 0;
475 for (size_t c = 0; c < cf_names.size(); ++c) {
476 for (int d = 0; d < kNumDBs; ++d) {
477 ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]);
478 ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]);
479 pos++;
480 }
481 }
482 }
483
484 for (auto handles : vec_handles) {
485 for (auto h : handles) {
486 delete h;
487 }
488 handles.clear();
489 }
490 vec_handles.clear();
491
492 for (auto db : dbs) {
493 delete db;
494 }
495 }
496
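// With compaction disabled, repeated flushes accumulate L0 files until write
// slowdowns kick in; the listener counts the triggered_writes_slowdown
// notifications delivered through FlushJobInfo.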
497 TEST_F(EventListenerTest, DisableBGCompaction) {
498 Options options;
499 options.env = CurrentOptions().env;
500 #ifdef ROCKSDB_USING_THREAD_STATUS
501 options.enable_thread_tracking = true;
502 #endif // ROCKSDB_USING_THREAD_STATUS
503 TestFlushListener* listener = new TestFlushListener(options.env, this);
504 const int kCompactionTrigger = 1;
505 const int kSlowdownTrigger = 5;
506 const int kStopTrigger = 100;
507 options.level0_file_num_compaction_trigger = kCompactionTrigger;
508 options.level0_slowdown_writes_trigger = kSlowdownTrigger;
509 options.level0_stop_writes_trigger = kStopTrigger;
510 options.max_write_buffer_number = 10;
511 options.listeners.emplace_back(listener);
512 // BG compaction is disabled. The number of L0 files will simply keep
513 // increasing in this test.
514 options.compaction_style = kCompactionStyleNone;
515 options.compression = kNoCompression;
516 options.write_buffer_size = 100000; // Small write buffer
517 options.table_properties_collector_factories.push_back(
518 std::make_shared<TestPropertiesCollectorFactory>());
519
520 CreateAndReopenWithCF({"pikachu"}, options);
521 ColumnFamilyMetaData cf_meta;
522 db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
523
524 // Keep writing and flushing until enough L0 files accumulate to trigger
525 // write slowdowns.
525 for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
526 ++i) {
527 ASSERT_OK(
528 Put(1, std::to_string(i), std::string(10000, 'x'), WriteOptions()));
529 FlushOptions fo;
530 fo.allow_write_stall = true;
531 ASSERT_OK(db_->Flush(fo, handles_[1]));
532 db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
533 }
534 // Ensure background work is fully finished including listener callbacks
535 // before accessing listener state.
536 ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
537 ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
538 }
539
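// Records the CompactionReason of every completed compaction so the tests
// below can assert what triggered it.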
540 class TestCompactionReasonListener : public EventListener {
541 public:
542 void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
543 std::lock_guard<std::mutex> lock(mutex_);
544 compaction_reasons_.push_back(ci.compaction_reason);
545 }
546
547 std::vector<CompactionReason> compaction_reasons_;
548 std::mutex mutex_;
549 };
550
551 TEST_F(EventListenerTest, CompactionReasonLevel) {
552 Options options;
553 options.env = CurrentOptions().env;
554 options.create_if_missing = true;
555 options.memtable_factory.reset(test::NewSpecialSkipListFactory(
556 DBTestBase::kNumKeysByGenerateNewRandomFile));
557
558 TestCompactionReasonListener* listener = new TestCompactionReasonListener();
559 options.listeners.emplace_back(listener);
560
561 options.level0_file_num_compaction_trigger = 4;
562 options.compaction_style = kCompactionStyleLevel;
563
564 DestroyAndReopen(options);
565 Random rnd(301);
566
567 // Write 4 files in L0
568 for (int i = 0; i < 4; i++) {
569 GenerateNewRandomFile(&rnd);
570 }
571 ASSERT_OK(dbfull()->TEST_WaitForCompact());
572
573 ASSERT_EQ(listener->compaction_reasons_.size(), 1);
574 ASSERT_EQ(listener->compaction_reasons_[0],
575 CompactionReason::kLevelL0FilesNum);
576
577 DestroyAndReopen(options);
578
579 // Write 3 non-overlapping files in L0
580 for (int k = 1; k <= 30; k++) {
581 ASSERT_OK(Put(Key(k), Key(k)));
582 if (k % 10 == 0) {
583 ASSERT_OK(Flush());
584 }
585 }
586
587 // Do a trivial move from L0 -> L1
588 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
589
590 options.max_bytes_for_level_base = 1;
591 Close();
592 listener->compaction_reasons_.clear();
593 Reopen(options);
594
595 ASSERT_OK(dbfull()->TEST_WaitForCompact());
596 ASSERT_GT(listener->compaction_reasons_.size(), 1);
597
598 for (auto compaction_reason : listener->compaction_reasons_) {
599 ASSERT_EQ(compaction_reason, CompactionReason::kLevelMaxLevelSize);
600 }
601
602 options.disable_auto_compactions = true;
603 Close();
604 listener->compaction_reasons_.clear();
605 Reopen(options);
606
607 ASSERT_OK(Put("key", "value"));
608 CompactRangeOptions cro;
609 cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
610 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
611 ASSERT_GT(listener->compaction_reasons_.size(), 0);
612 for (auto compaction_reason : listener->compaction_reasons_) {
613 ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
614 }
615 }
616
617 TEST_F(EventListenerTest, CompactionReasonUniversal) {
618 Options options;
619 options.env = CurrentOptions().env;
620 options.create_if_missing = true;
621 options.memtable_factory.reset(test::NewSpecialSkipListFactory(
622 DBTestBase::kNumKeysByGenerateNewRandomFile));
623
624 TestCompactionReasonListener* listener = new TestCompactionReasonListener();
625 options.listeners.emplace_back(listener);
626
627 options.compaction_style = kCompactionStyleUniversal;
628
629 Random rnd(301);
630
631 options.level0_file_num_compaction_trigger = 8;
632 options.compaction_options_universal.max_size_amplification_percent = 100000;
633 options.compaction_options_universal.size_ratio = 100000;
634 DestroyAndReopen(options);
635 listener->compaction_reasons_.clear();
636
637 // Write 8 files in L0
638 for (int i = 0; i < 8; i++) {
639 GenerateNewRandomFile(&rnd);
640 }
641 ASSERT_OK(dbfull()->TEST_WaitForCompact());
642
643 ASSERT_GT(listener->compaction_reasons_.size(), 0);
644 for (auto compaction_reason : listener->compaction_reasons_) {
645 ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeRatio);
646 }
647
648 options.level0_file_num_compaction_trigger = 8;
649 options.compaction_options_universal.max_size_amplification_percent = 1;
650 options.compaction_options_universal.size_ratio = 100000;
651
652 DestroyAndReopen(options);
653 listener->compaction_reasons_.clear();
654
655 // Write 8 files in L0
656 for (int i = 0; i < 8; i++) {
657 GenerateNewRandomFile(&rnd);
658 }
659 ASSERT_OK(dbfull()->TEST_WaitForCompact());
660
661 ASSERT_GT(listener->compaction_reasons_.size(), 0);
662 for (auto compaction_reason : listener->compaction_reasons_) {
663 ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeAmplification);
664 }
665
666 options.disable_auto_compactions = true;
667 Close();
668 listener->compaction_reasons_.clear();
669 Reopen(options);
670
671 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
672
673 ASSERT_GT(listener->compaction_reasons_.size(), 0);
674 for (auto compaction_reason : listener->compaction_reasons_) {
675 ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
676 }
677 }
678
679 TEST_F(EventListenerTest, CompactionReasonFIFO) {
680 Options options;
681 options.env = CurrentOptions().env;
682 options.create_if_missing = true;
683 options.memtable_factory.reset(test::NewSpecialSkipListFactory(
684 DBTestBase::kNumKeysByGenerateNewRandomFile));
685
686 TestCompactionReasonListener* listener = new TestCompactionReasonListener();
687 options.listeners.emplace_back(listener);
688
689 options.level0_file_num_compaction_trigger = 4;
690 options.compaction_style = kCompactionStyleFIFO;
691 options.compaction_options_fifo.max_table_files_size = 1;
692
693 DestroyAndReopen(options);
694 Random rnd(301);
695
696 // Write 4 files in L0
697 for (int i = 0; i < 4; i++) {
698 GenerateNewRandomFile(&rnd);
699 }
700 ASSERT_OK(dbfull()->TEST_WaitForCompact());
701
702 ASSERT_GT(listener->compaction_reasons_.size(), 0);
703 for (auto compaction_reason : listener->compaction_reasons_) {
704 ASSERT_EQ(compaction_reason, CompactionReason::kFIFOMaxSize);
705 }
706 }
707
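// Counts table file creation started/finished/failed events per reason
// (flush vs. compaction) and remembers the last failure status; the nested
// TestEnv can inject failures when new .sst files are created.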
708 class TableFileCreationListener : public EventListener {
709 public:
710 class TestEnv : public EnvWrapper {
711 public:
712 explicit TestEnv(Env* t) : EnvWrapper(t) {}
713 static const char* kClassName() { return "TestEnv"; }
714 const char* Name() const override { return kClassName(); }
715
716 void SetStatus(Status s) { status_ = s; }
717
718 Status NewWritableFile(const std::string& fname,
719 std::unique_ptr<WritableFile>* result,
720 const EnvOptions& options) override {
721 if (fname.size() > 4 && fname.substr(fname.size() - 4) == ".sst") {
722 if (!status_.ok()) {
723 return status_;
724 }
725 }
726 return target()->NewWritableFile(fname, result, options);
727 }
728
729 private:
730 Status status_;
731 };
732
733 TableFileCreationListener() {
734 for (int i = 0; i < 2; i++) {
735 started_[i] = finished_[i] = failure_[i] = 0;
736 }
737 }
738
739 int Index(TableFileCreationReason reason) {
740 int idx;
741 switch (reason) {
742 case TableFileCreationReason::kFlush:
743 idx = 0;
744 break;
745 case TableFileCreationReason::kCompaction:
746 idx = 1;
747 break;
748 default:
749 idx = -1;
750 }
751 return idx;
752 }
753
754 void CheckAndResetCounters(int flush_started, int flush_finished,
755 int flush_failure, int compaction_started,
756 int compaction_finished, int compaction_failure) {
757 ASSERT_EQ(started_[0], flush_started);
758 ASSERT_EQ(finished_[0], flush_finished);
759 ASSERT_EQ(failure_[0], flush_failure);
760 ASSERT_EQ(started_[1], compaction_started);
761 ASSERT_EQ(finished_[1], compaction_finished);
762 ASSERT_EQ(failure_[1], compaction_failure);
763 for (int i = 0; i < 2; i++) {
764 started_[i] = finished_[i] = failure_[i] = 0;
765 }
766 }
767
768 void OnTableFileCreationStarted(
769 const TableFileCreationBriefInfo& info) override {
770 int idx = Index(info.reason);
771 if (idx >= 0) {
772 started_[idx]++;
773 }
774 ASSERT_GT(info.db_name.size(), 0U);
775 ASSERT_GT(info.cf_name.size(), 0U);
776 ASSERT_GT(info.file_path.size(), 0U);
777 ASSERT_GT(info.job_id, 0);
778 }
779
780 void OnTableFileCreated(const TableFileCreationInfo& info) override {
781 int idx = Index(info.reason);
782 if (idx >= 0) {
783 finished_[idx]++;
784 }
785 ASSERT_GT(info.db_name.size(), 0U);
786 ASSERT_GT(info.cf_name.size(), 0U);
787 ASSERT_GT(info.file_path.size(), 0U);
788 ASSERT_GT(info.job_id, 0);
789 ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
790 ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
791 if (info.status.ok()) {
792 if (info.table_properties.num_range_deletions == 0U) {
793 ASSERT_GT(info.table_properties.data_size, 0U);
794 ASSERT_GT(info.table_properties.raw_key_size, 0U);
795 ASSERT_GT(info.table_properties.raw_value_size, 0U);
796 ASSERT_GT(info.table_properties.num_data_blocks, 0U);
797 ASSERT_GT(info.table_properties.num_entries, 0U);
798 }
799 } else {
800 if (idx >= 0) {
801 failure_[idx]++;
802 last_failure_ = info.status;
803 }
804 }
805 }
806
807 int started_[2];
808 int finished_[2];
809 int failure_[2];
810 Status last_failure_;
811 };
812
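// Covers successful and injected-failure table file creations for both
// flushes and compactions, plus the Aborted status reported when an empty
// output file is discarded.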
813 TEST_F(EventListenerTest, TableFileCreationListenersTest) {
814 auto listener = std::make_shared<TableFileCreationListener>();
815 Options options;
816 std::unique_ptr<TableFileCreationListener::TestEnv> test_env(
817 new TableFileCreationListener::TestEnv(CurrentOptions().env));
818 options.create_if_missing = true;
819 options.listeners.push_back(listener);
820 options.env = test_env.get();
821 DestroyAndReopen(options);
822
823 ASSERT_OK(Put("foo", "aaa"));
824 ASSERT_OK(Put("bar", "bbb"));
825 ASSERT_OK(Flush());
826 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
827 listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
828 ASSERT_OK(Put("foo", "aaa1"));
829 ASSERT_OK(Put("bar", "bbb1"));
830 test_env->SetStatus(Status::NotSupported("not supported"));
831 ASSERT_NOK(Flush());
832 listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
833 ASSERT_TRUE(listener->last_failure_.IsNotSupported());
834 test_env->SetStatus(Status::OK());
835
836 Reopen(options);
837 ASSERT_OK(Put("foo", "aaa2"));
838 ASSERT_OK(Put("bar", "bbb2"));
839 ASSERT_OK(Flush());
840 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
841 listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);
842
843 const Slice kRangeStart = "a";
844 const Slice kRangeEnd = "z";
845 ASSERT_OK(
846 dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
847 ASSERT_OK(dbfull()->TEST_WaitForCompact());
848 listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0);
849
850 ASSERT_OK(Put("foo", "aaa3"));
851 ASSERT_OK(Put("bar", "bbb3"));
852 ASSERT_OK(Flush());
853 test_env->SetStatus(Status::NotSupported("not supported"));
854 ASSERT_NOK(
855 dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd));
856 ASSERT_NOK(dbfull()->TEST_WaitForCompact());
857 listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
858 ASSERT_TRUE(listener->last_failure_.IsNotSupported());
859
860 // Reset
861 test_env->SetStatus(Status::OK());
862 DestroyAndReopen(options);
863
864 // Verify that an empty table file that is immediately deleted gives Aborted
865 // status to listener.
866 ASSERT_OK(Put("baz", "z"));
867 ASSERT_OK(SingleDelete("baz"));
868 ASSERT_OK(Flush());
869 listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
870 ASSERT_TRUE(listener->last_failure_.IsAborted());
871
872 // Also in compaction
873 ASSERT_OK(Put("baz", "z"));
874 ASSERT_OK(Flush());
875 ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
876 kRangeStart, kRangeEnd));
877 ASSERT_OK(Flush());
878 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
879 ASSERT_OK(dbfull()->TEST_WaitForCompact());
880 listener->CheckAndResetCounters(2, 2, 0, 1, 1, 1);
881 ASSERT_TRUE(listener->last_failure_.IsAborted());
882
883 Close(); // Avoid UAF on listener
884 }
885
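// Remembers the first sequence number of each sealed memtable and checks it
// against the smallest sequence number reported by the following
// OnFlushCompleted.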
886 class MemTableSealedListener : public EventListener {
887 private:
888 SequenceNumber latest_seq_number_;
889
890 public:
891 MemTableSealedListener() {}
892 void OnMemTableSealed(const MemTableInfo& info) override {
893 latest_seq_number_ = info.first_seqno;
894 }
895
896 void OnFlushCompleted(DB* /*db*/,
897 const FlushJobInfo& flush_job_info) override {
898 ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_);
899 }
900 };
901
902 TEST_F(EventListenerTest, MemTableSealedListenerTest) {
903 auto listener = std::make_shared<MemTableSealedListener>();
904 Options options;
905 options.env = CurrentOptions().env;
906 options.create_if_missing = true;
907 options.listeners.push_back(listener);
908 DestroyAndReopen(options);
909
910 for (unsigned int i = 0; i < 10; i++) {
911 std::string tag = std::to_string(i);
912 ASSERT_OK(Put("foo" + tag, "aaa"));
913 ASSERT_OK(Put("bar" + tag, "bbb"));
914
915 ASSERT_OK(Flush());
916 }
917 }
918
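// Counts OnColumnFamilyHandleDeletionStarted callbacks and verifies that each
// handle's ID and name match the column families created by the test.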
919 class ColumnFamilyHandleDeletionStartedListener : public EventListener {
920 private:
921 std::vector<std::string> cfs_;
922 int counter;
923
924 public:
925 explicit ColumnFamilyHandleDeletionStartedListener(
926 const std::vector<std::string>& cfs)
927 : cfs_(cfs), counter(0) {
928 cfs_.insert(cfs_.begin(), kDefaultColumnFamilyName);
929 }
930 void OnColumnFamilyHandleDeletionStarted(
931 ColumnFamilyHandle* handle) override {
932 ASSERT_EQ(cfs_[handle->GetID()], handle->GetName());
933 counter++;
934 }
935 int getCounter() { return counter; }
936 };
937
938 TEST_F(EventListenerTest, ColumnFamilyHandleDeletionStartedListenerTest) {
939 std::vector<std::string> cfs{"pikachu", "eevee", "Mewtwo"};
940 auto listener =
941 std::make_shared<ColumnFamilyHandleDeletionStartedListener>(cfs);
942 Options options;
943 options.env = CurrentOptions().env;
944 options.create_if_missing = true;
945 options.listeners.push_back(listener);
946 CreateAndReopenWithCF(cfs, options);
947 ASSERT_EQ(handles_.size(), 4);
948 delete handles_[3];
949 delete handles_[2];
950 delete handles_[1];
951 handles_.resize(1);
952 ASSERT_EQ(listener->getCounter(), 3);
953 }
954
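// Suppresses the first background error (triggered via
// SpecialEnv::drop_writes_) so a retry can succeed, and counts how often
// OnBackgroundError is invoked.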
955 class BackgroundErrorListener : public EventListener {
956 private:
957 SpecialEnv* env_;
958 int counter_;
959
960 public:
961 BackgroundErrorListener(SpecialEnv* env) : env_(env), counter_(0) {}
962
963 void OnBackgroundError(BackgroundErrorReason /*reason*/,
964 Status* bg_error) override {
965 if (counter_ == 0) {
966 // suppress the first error and disable write-dropping such that a retry
967 // can succeed.
968 *bg_error = Status::OK();
969 env_->drop_writes_.store(false, std::memory_order_release);
970 env_->SetMockSleep(false);
971 }
972 ++counter_;
973 }
974
975 int counter() { return counter_; }
976 };
977
978 TEST_F(EventListenerTest, BackgroundErrorListenerFailedFlushTest) {
979 auto listener = std::make_shared<BackgroundErrorListener>(env_);
980 Options options;
981 options.create_if_missing = true;
982 options.env = env_;
983 options.listeners.push_back(listener);
984 options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
985 options.paranoid_checks = true;
986 DestroyAndReopen(options);
987
988 // the usual TEST_WaitForFlushMemTable() doesn't work for failed flushes, so
989 // forge a custom one for the failed flush case.
990 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
991 {{"DBImpl::BGWorkFlush:done",
992 "EventListenerTest:BackgroundErrorListenerFailedFlushTest:1"}});
993 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
994
995 env_->drop_writes_.store(true, std::memory_order_release);
996 env_->SetMockSleep();
997
998 ASSERT_OK(Put("key0", "val"));
999 ASSERT_OK(Put("key1", "val"));
1000 TEST_SYNC_POINT("EventListenerTest:BackgroundErrorListenerFailedFlushTest:1");
1001 ASSERT_EQ(1, listener->counter());
1002 ASSERT_OK(Put("key2", "val"));
1003 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1004 ASSERT_EQ(1, NumTableFilesAtLevel(0));
1005 }
1006
1007 TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
1008 auto listener = std::make_shared<BackgroundErrorListener>(env_);
1009 Options options;
1010 options.create_if_missing = true;
1011 options.disable_auto_compactions = true;
1012 options.env = env_;
1013 options.level0_file_num_compaction_trigger = 2;
1014 options.listeners.push_back(listener);
1015 options.memtable_factory.reset(test::NewSpecialSkipListFactory(2));
1016 options.paranoid_checks = true;
1017 DestroyAndReopen(options);
1018
1019 // third iteration triggers the second memtable's flush
1020 for (int i = 0; i < 3; ++i) {
1021 ASSERT_OK(Put("key0", "val"));
1022 if (i > 0) {
1023 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1024 }
1025 ASSERT_OK(Put("key1", "val"));
1026 }
1027 ASSERT_EQ(2, NumTableFilesAtLevel(0));
1028
1029 env_->drop_writes_.store(true, std::memory_order_release);
1030 env_->SetMockSleep();
1031 ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
1032 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1033 ASSERT_EQ(1, listener->counter());
1034
1035 // trigger flush so compaction is triggered again; this time it succeeds
1036 // The previous failed compaction may get retried automatically, so we may
1037 // be left with 0 or 1 files in level 1, depending on when the retry gets
1038 // scheduled
1039 ASSERT_OK(Put("key0", "val"));
1040 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1041 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1042 ASSERT_LE(1, NumTableFilesAtLevel(0));
1043 }
1044
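// Counts file operation notifications (reads, writes, flushes, closes, syncs,
// truncates), tracking blob files and sequential MANIFEST reads separately.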
1045 class TestFileOperationListener : public EventListener {
1046 public:
1047 TestFileOperationListener() {
1048 file_reads_.store(0);
1049 file_reads_success_.store(0);
1050 file_writes_.store(0);
1051 file_writes_success_.store(0);
1052 file_flushes_.store(0);
1053 file_flushes_success_.store(0);
1054 file_closes_.store(0);
1055 file_closes_success_.store(0);
1056 file_syncs_.store(0);
1057 file_syncs_success_.store(0);
1058 file_truncates_.store(0);
1059 file_truncates_success_.store(0);
1060 file_seq_reads_.store(0);
1061 blob_file_reads_.store(0);
1062 blob_file_writes_.store(0);
1063 blob_file_flushes_.store(0);
1064 blob_file_closes_.store(0);
1065 blob_file_syncs_.store(0);
1066 blob_file_truncates_.store(0);
1067 }
1068
1069 void OnFileReadFinish(const FileOperationInfo& info) override {
1070 ++file_reads_;
1071 if (info.status.ok()) {
1072 ++file_reads_success_;
1073 }
1074 if (info.path.find("MANIFEST") != std::string::npos) {
1075 ++file_seq_reads_;
1076 }
1077 if (EndsWith(info.path, ".blob")) {
1078 ++blob_file_reads_;
1079 }
1080 ReportDuration(info);
1081 }
1082
1083 void OnFileWriteFinish(const FileOperationInfo& info) override {
1084 ++file_writes_;
1085 if (info.status.ok()) {
1086 ++file_writes_success_;
1087 }
1088 if (EndsWith(info.path, ".blob")) {
1089 ++blob_file_writes_;
1090 }
1091 ReportDuration(info);
1092 }
1093
1094 void OnFileFlushFinish(const FileOperationInfo& info) override {
1095 ++file_flushes_;
1096 if (info.status.ok()) {
1097 ++file_flushes_success_;
1098 }
1099 if (EndsWith(info.path, ".blob")) {
1100 ++blob_file_flushes_;
1101 }
1102 ReportDuration(info);
1103 }
1104
1105 void OnFileCloseFinish(const FileOperationInfo& info) override {
1106 ++file_closes_;
1107 if (info.status.ok()) {
1108 ++file_closes_success_;
1109 }
1110 if (EndsWith(info.path, ".blob")) {
1111 ++blob_file_closes_;
1112 }
1113 ReportDuration(info);
1114 }
1115
1116 void OnFileSyncFinish(const FileOperationInfo& info) override {
1117 ++file_syncs_;
1118 if (info.status.ok()) {
1119 ++file_syncs_success_;
1120 }
1121 if (EndsWith(info.path, ".blob")) {
1122 ++blob_file_syncs_;
1123 }
1124 ReportDuration(info);
1125 }
1126
1127 void OnFileTruncateFinish(const FileOperationInfo& info) override {
1128 ++file_truncates_;
1129 if (info.status.ok()) {
1130 ++file_truncates_success_;
1131 }
1132 if (EndsWith(info.path, ".blob")) {
1133 ++blob_file_truncates_;
1134 }
1135 ReportDuration(info);
1136 }
1137
1138 bool ShouldBeNotifiedOnFileIO() override { return true; }
1139
1140 std::atomic<size_t> file_reads_;
1141 std::atomic<size_t> file_reads_success_;
1142 std::atomic<size_t> file_writes_;
1143 std::atomic<size_t> file_writes_success_;
1144 std::atomic<size_t> file_flushes_;
1145 std::atomic<size_t> file_flushes_success_;
1146 std::atomic<size_t> file_closes_;
1147 std::atomic<size_t> file_closes_success_;
1148 std::atomic<size_t> file_syncs_;
1149 std::atomic<size_t> file_syncs_success_;
1150 std::atomic<size_t> file_truncates_;
1151 std::atomic<size_t> file_truncates_success_;
1152 std::atomic<size_t> file_seq_reads_;
1153 std::atomic<size_t> blob_file_reads_;
1154 std::atomic<size_t> blob_file_writes_;
1155 std::atomic<size_t> blob_file_flushes_;
1156 std::atomic<size_t> blob_file_closes_;
1157 std::atomic<size_t> blob_file_syncs_;
1158 std::atomic<size_t> blob_file_truncates_;
1159
1160 private:
1161 void ReportDuration(const FileOperationInfo& info) const {
1162 ASSERT_GT(info.duration.count(), 0);
1163 }
1164 };
1165
1166 TEST_F(EventListenerTest, OnFileOperationTest) {
1167 Options options;
1168 options.env = CurrentOptions().env;
1169 options.create_if_missing = true;
1170
1171 TestFileOperationListener* listener = new TestFileOperationListener();
1172 options.listeners.emplace_back(listener);
1173
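// Request direct I/O for flush/compaction and fall back to buffered I/O if
// the platform does not support it.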
1174 options.use_direct_io_for_flush_and_compaction = true;
1175 Status s = TryReopen(options);
1176 if (s.IsInvalidArgument()) {
1177 options.use_direct_io_for_flush_and_compaction = false;
1178 } else {
1179 ASSERT_OK(s);
1180 }
1181 DestroyAndReopen(options);
1182 ASSERT_OK(Put("foo", "aaa"));
1183 ASSERT_OK(dbfull()->Flush(FlushOptions()));
1184 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
1185 ASSERT_GE(listener->file_writes_.load(),
1186 listener->file_writes_success_.load());
1187 ASSERT_GT(listener->file_writes_.load(), 0);
1188 ASSERT_GE(listener->file_flushes_.load(),
1189 listener->file_flushes_success_.load());
1190 ASSERT_GT(listener->file_flushes_.load(), 0);
1191 Close();
1192
1193 Reopen(options);
1194 ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load());
1195 ASSERT_GT(listener->file_reads_.load(), 0);
1196 ASSERT_GE(listener->file_closes_.load(),
1197 listener->file_closes_success_.load());
1198 ASSERT_GT(listener->file_closes_.load(), 0);
1199 ASSERT_GE(listener->file_syncs_.load(), listener->file_syncs_success_.load());
1200 ASSERT_GT(listener->file_syncs_.load(), 0);
1201 if (true == options.use_direct_io_for_flush_and_compaction) {
1202 ASSERT_GE(listener->file_truncates_.load(),
1203 listener->file_truncates_success_.load());
1204 ASSERT_GT(listener->file_truncates_.load(), 0);
1205 }
1206 }
1207
1208 TEST_F(EventListenerTest, OnBlobFileOperationTest) {
1209 Options options;
1210 options.env = CurrentOptions().env;
1211 options.create_if_missing = true;
1212 TestFileOperationListener* listener = new TestFileOperationListener();
1213 options.listeners.emplace_back(listener);
1214 options.disable_auto_compactions = true;
1215 options.enable_blob_files = true;
1216 options.min_blob_size = 0;
1217 options.enable_blob_garbage_collection = true;
1218 options.blob_garbage_collection_age_cutoff = 0.5;
1219
1220 DestroyAndReopen(options);
1221
1222 ASSERT_OK(Put("Key1", "blob_value1"));
1223 ASSERT_OK(Put("Key2", "blob_value2"));
1224 ASSERT_OK(Put("Key3", "blob_value3"));
1225 ASSERT_OK(Put("Key4", "blob_value4"));
1226 ASSERT_OK(Flush());
1227
1228 ASSERT_OK(Put("Key3", "new_blob_value3"));
1229 ASSERT_OK(Put("Key4", "new_blob_value4"));
1230 ASSERT_OK(Flush());
1231
1232 ASSERT_OK(Put("Key5", "blob_value5"));
1233 ASSERT_OK(Put("Key6", "blob_value6"));
1234 ASSERT_OK(Flush());
1235
1236 ASSERT_GT(listener->blob_file_writes_.load(), 0U);
1237 ASSERT_GT(listener->blob_file_flushes_.load(), 0U);
1238 Close();
1239
1240 Reopen(options);
1241 ASSERT_GT(listener->blob_file_closes_.load(), 0U);
1242 ASSERT_GT(listener->blob_file_syncs_.load(), 0U);
1243 if (true == options.use_direct_io_for_flush_and_compaction) {
1244 ASSERT_GT(listener->blob_file_truncates_.load(), 0U);
1245 }
1246 }
1247
1248 TEST_F(EventListenerTest, ReadManifestAndWALOnRecovery) {
1249 Options options;
1250 options.env = CurrentOptions().env;
1251 options.create_if_missing = true;
1252
1253 TestFileOperationListener* listener = new TestFileOperationListener();
1254 options.listeners.emplace_back(listener);
1255
1256 options.use_direct_io_for_flush_and_compaction = true;
1257 Status s = TryReopen(options);
1258 if (s.IsInvalidArgument()) {
1259 options.use_direct_io_for_flush_and_compaction = false;
1260 } else {
1261 ASSERT_OK(s);
1262 }
1263 DestroyAndReopen(options);
1264 ASSERT_OK(Put("foo", "aaa"));
1265 Close();
1266
1267 size_t seq_reads = listener->file_seq_reads_.load();
1268 Reopen(options);
1269 ASSERT_GT(listener->file_seq_reads_.load(), seq_reads);
1270 }
1271
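// Job-level blob listener: validates the blob file addition and garbage info
// attached to flush and compaction notifications against the current
// VersionStorageInfo.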
1272 class BlobDBJobLevelEventListenerTest : public EventListener {
1273 public:
1274 explicit BlobDBJobLevelEventListenerTest(EventListenerTest* test)
1275 : test_(test), call_count_(0) {}
1276
1277 const VersionStorageInfo* GetVersionStorageInfo() const {
1278 VersionSet* const versions = test_->dbfull()->GetVersionSet();
1279 assert(versions);
1280
1281 ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
1282 EXPECT_NE(cfd, nullptr);
1283
1284 Version* const current = cfd->current();
1285 EXPECT_NE(current, nullptr);
1286
1287 const VersionStorageInfo* const storage_info = current->storage_info();
1288 EXPECT_NE(storage_info, nullptr);
1289
1290 return storage_info;
1291 }
1292
1293 void CheckBlobFileAdditions(
1294 const std::vector<BlobFileAdditionInfo>& blob_file_addition_infos) const {
1295 const auto* vstorage = GetVersionStorageInfo();
1296
1297 EXPECT_FALSE(blob_file_addition_infos.empty());
1298
1299 for (const auto& blob_file_addition_info : blob_file_addition_infos) {
1300 const auto meta = vstorage->GetBlobFileMetaData(
1301 blob_file_addition_info.blob_file_number);
1302
1303 EXPECT_NE(meta, nullptr);
1304 EXPECT_EQ(meta->GetBlobFileNumber(),
1305 blob_file_addition_info.blob_file_number);
1306 EXPECT_EQ(meta->GetTotalBlobBytes(),
1307 blob_file_addition_info.total_blob_bytes);
1308 EXPECT_EQ(meta->GetTotalBlobCount(),
1309 blob_file_addition_info.total_blob_count);
1310 EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
1311 }
1312 }
1313
1314 std::vector<std::string> GetFlushedFiles() {
1315 std::lock_guard<std::mutex> lock(mutex_);
1316 std::vector<std::string> result;
1317 for (const auto& fname : flushed_files_) {
1318 result.push_back(fname);
1319 }
1320 return result;
1321 }
1322
1323 void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
1324 call_count_++;
1325
1326 {
1327 std::lock_guard<std::mutex> lock(mutex_);
1328 flushed_files_.push_back(info.file_path);
1329 }
1330
1331 EXPECT_EQ(info.blob_compression_type, kNoCompression);
1332
1333 CheckBlobFileAdditions(info.blob_file_addition_infos);
1334 }
1335
1336 void OnCompactionCompleted(DB* /*db*/,
1337 const CompactionJobInfo& info) override {
1338 call_count_++;
1339
1340 EXPECT_EQ(info.blob_compression_type, kNoCompression);
1341
1342 CheckBlobFileAdditions(info.blob_file_addition_infos);
1343
1344 EXPECT_FALSE(info.blob_file_garbage_infos.empty());
1345
1346 for (const auto& blob_file_garbage_info : info.blob_file_garbage_infos) {
1347 EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
1348 EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
1349 EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
1350 EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
1351 }
1352 }
1353
1354 EventListenerTest* test_;
1355 uint32_t call_count_;
1356
1357 private:
1358 std::vector<std::string> flushed_files_;
1359 std::mutex mutex_;
1360 };
1361
1362 // Test OnFlushCompleted EventListener called for blob files
1363 TEST_F(EventListenerTest, BlobDBOnFlushCompleted) {
1364 Options options;
1365 options.env = CurrentOptions().env;
1366 options.enable_blob_files = true;
1367 options.create_if_missing = true;
1368 options.disable_auto_compactions = true;
1369
1370 options.min_blob_size = 0;
1371 BlobDBJobLevelEventListenerTest* blob_event_listener =
1372 new BlobDBJobLevelEventListenerTest(this);
1373 options.listeners.emplace_back(blob_event_listener);
1374
1375 DestroyAndReopen(options);
1376
1377 ASSERT_OK(Put("Key1", "blob_value1"));
1378 ASSERT_OK(Put("Key2", "blob_value2"));
1379 ASSERT_OK(Flush());
1380
1381 ASSERT_OK(Put("Key3", "blob_value3"));
1382 ASSERT_OK(Flush());
1383
1384 ASSERT_EQ(Get("Key1"), "blob_value1");
1385 ASSERT_EQ(Get("Key2"), "blob_value2");
1386 ASSERT_EQ(Get("Key3"), "blob_value3");
1387
1388 ASSERT_GT(blob_event_listener->call_count_, 0U);
1389 }
1390
1391 // Test OnCompactionCompleted EventListener called for blob files
1392 TEST_F(EventListenerTest, BlobDBOnCompactionCompleted) {
1393 Options options;
1394 options.env = CurrentOptions().env;
1395 options.enable_blob_files = true;
1396 options.create_if_missing = true;
1397 options.disable_auto_compactions = true;
1398 options.min_blob_size = 0;
1399 BlobDBJobLevelEventListenerTest* blob_event_listener =
1400 new BlobDBJobLevelEventListenerTest(this);
1401 options.listeners.emplace_back(blob_event_listener);
1402
1403 options.enable_blob_garbage_collection = true;
1404 options.blob_garbage_collection_age_cutoff = 0.5;
1405
1406 DestroyAndReopen(options);
1407
1408 ASSERT_OK(Put("Key1", "blob_value1"));
1409 ASSERT_OK(Put("Key2", "blob_value2"));
1410 ASSERT_OK(Put("Key3", "blob_value3"));
1411 ASSERT_OK(Put("Key4", "blob_value4"));
1412 ASSERT_OK(Flush());
1413
1414 ASSERT_OK(Put("Key3", "new_blob_value3"));
1415 ASSERT_OK(Put("Key4", "new_blob_value4"));
1416 ASSERT_OK(Flush());
1417
1418 ASSERT_OK(Put("Key5", "blob_value5"));
1419 ASSERT_OK(Put("Key6", "blob_value6"));
1420 ASSERT_OK(Flush());
1421
1422 blob_event_listener->call_count_ = 0;
1423 constexpr Slice* begin = nullptr;
1424 constexpr Slice* end = nullptr;
1425
1426 // On compaction, because of blob_garbage_collection_age_cutoff, it will
1427 // delete the oldest blob file and create new blob file during compaction.
1428 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
1429
1430 // Make sure, OnCompactionCompleted is called.
1431 ASSERT_GT(blob_event_listener->call_count_, 0U);
1432 }
1433
1434 // Test CompactFiles calls OnCompactionCompleted EventListener for blob files
1435 // and populate the blob files info.
1436 TEST_F(EventListenerTest, BlobDBCompactFiles) {
1437 Options options;
1438 options.env = CurrentOptions().env;
1439 options.enable_blob_files = true;
1440 options.create_if_missing = true;
1441 options.disable_auto_compactions = true;
1442 options.min_blob_size = 0;
1443 options.enable_blob_garbage_collection = true;
1444 options.blob_garbage_collection_age_cutoff = 0.5;
1445
1446 BlobDBJobLevelEventListenerTest* blob_event_listener =
1447 new BlobDBJobLevelEventListenerTest(this);
1448 options.listeners.emplace_back(blob_event_listener);
1449
1450 DestroyAndReopen(options);
1451
1452 ASSERT_OK(Put("Key1", "blob_value1"));
1453 ASSERT_OK(Put("Key2", "blob_value2"));
1454 ASSERT_OK(Put("Key3", "blob_value3"));
1455 ASSERT_OK(Put("Key4", "blob_value4"));
1456 ASSERT_OK(Flush());
1457
1458 ASSERT_OK(Put("Key3", "new_blob_value3"));
1459 ASSERT_OK(Put("Key4", "new_blob_value4"));
1460 ASSERT_OK(Flush());
1461
1462 ASSERT_OK(Put("Key5", "blob_value5"));
1463 ASSERT_OK(Put("Key6", "blob_value6"));
1464 ASSERT_OK(Flush());
1465
1466 std::vector<std::string> output_file_names;
1467 CompactionJobInfo compaction_job_info;
1468
1469 // On compaction, because of blob_garbage_collection_age_cutoff, it will
1470 // delete the oldest blob file and create new blob file during compaction
1471 // which will be populated in output_files_names.
1472 ASSERT_OK(dbfull()->CompactFiles(
1473 CompactionOptions(), blob_event_listener->GetFlushedFiles(), 1, -1,
1474 &output_file_names, &compaction_job_info));
1475
1476 bool is_blob_in_output = false;
1477 for (const auto& file : output_file_names) {
1478 if (EndsWith(file, ".blob")) {
1479 is_blob_in_output = true;
1480 }
1481 }
1482 ASSERT_TRUE(is_blob_in_output);
1483
1484 for (const auto& blob_file_addition_info :
1485 compaction_job_info.blob_file_addition_infos) {
1486 EXPECT_GT(blob_file_addition_info.blob_file_number, 0U);
1487 EXPECT_GT(blob_file_addition_info.total_blob_bytes, 0U);
1488 EXPECT_GT(blob_file_addition_info.total_blob_count, 0U);
1489 EXPECT_FALSE(blob_file_addition_info.blob_file_path.empty());
1490 }
1491
1492 for (const auto& blob_file_garbage_info :
1493 compaction_job_info.blob_file_garbage_infos) {
1494 EXPECT_GT(blob_file_garbage_info.blob_file_number, 0U);
1495 EXPECT_GT(blob_file_garbage_info.garbage_blob_count, 0U);
1496 EXPECT_GT(blob_file_garbage_info.garbage_blob_bytes, 0U);
1497 EXPECT_FALSE(blob_file_garbage_info.blob_file_path.empty());
1498 }
1499 }
1500
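// File-level blob listener: counts blob file creation-started, created, and
// deleted notifications and sanity-checks their payloads.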
1501 class BlobDBFileLevelEventListener : public EventListener {
1502 public:
1503 void OnBlobFileCreationStarted(
1504 const BlobFileCreationBriefInfo& info) override {
1505 files_started_++;
1506 EXPECT_FALSE(info.db_name.empty());
1507 EXPECT_FALSE(info.cf_name.empty());
1508 EXPECT_FALSE(info.file_path.empty());
1509 EXPECT_GT(info.job_id, 0);
1510 }
1511
1512 void OnBlobFileCreated(const BlobFileCreationInfo& info) override {
1513 files_created_++;
1514 EXPECT_FALSE(info.db_name.empty());
1515 EXPECT_FALSE(info.cf_name.empty());
1516 EXPECT_FALSE(info.file_path.empty());
1517 EXPECT_GT(info.job_id, 0);
1518 EXPECT_GT(info.total_blob_count, 0U);
1519 EXPECT_GT(info.total_blob_bytes, 0U);
1520 EXPECT_EQ(info.file_checksum, kUnknownFileChecksum);
1521 EXPECT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
1522 EXPECT_TRUE(info.status.ok());
1523 }
1524
1525 void OnBlobFileDeleted(const BlobFileDeletionInfo& info) override {
1526 files_deleted_++;
1527 EXPECT_FALSE(info.db_name.empty());
1528 EXPECT_FALSE(info.file_path.empty());
1529 EXPECT_GT(info.job_id, 0);
1530 EXPECT_TRUE(info.status.ok());
1531 }
1532
1533 void CheckCounters() {
1534 EXPECT_EQ(files_started_, files_created_);
1535 EXPECT_GT(files_started_, 0U);
1536 EXPECT_GT(files_deleted_, 0U);
1537 EXPECT_LT(files_deleted_, files_created_);
1538 }
1539
1540 private:
1541 std::atomic<uint32_t> files_started_{};
1542 std::atomic<uint32_t> files_created_{};
1543 std::atomic<uint32_t> files_deleted_{};
1544 };
1545
1546 TEST_F(EventListenerTest, BlobDBFileTest) {
1547 Options options;
1548 options.env = CurrentOptions().env;
1549 options.enable_blob_files = true;
1550 options.create_if_missing = true;
1551 options.disable_auto_compactions = true;
1552 options.min_blob_size = 0;
1553 options.enable_blob_garbage_collection = true;
1554 options.blob_garbage_collection_age_cutoff = 0.5;
1555
1556 BlobDBFileLevelEventListener* blob_event_listener =
1557 new BlobDBFileLevelEventListener();
1558 options.listeners.emplace_back(blob_event_listener);
1559
1560 DestroyAndReopen(options);
1561
1562 ASSERT_OK(Put("Key1", "blob_value1"));
1563 ASSERT_OK(Put("Key2", "blob_value2"));
1564 ASSERT_OK(Put("Key3", "blob_value3"));
1565 ASSERT_OK(Put("Key4", "blob_value4"));
1566 ASSERT_OK(Flush());
1567
1568 ASSERT_OK(Put("Key3", "new_blob_value3"));
1569 ASSERT_OK(Put("Key4", "new_blob_value4"));
1570 ASSERT_OK(Flush());
1571
1572 ASSERT_OK(Put("Key5", "blob_value5"));
1573 ASSERT_OK(Put("Key6", "blob_value6"));
1574 ASSERT_OK(Flush());
1575
1576 constexpr Slice* begin = nullptr;
1577 constexpr Slice* end = nullptr;
1578
1579 // On compaction, because of blob_garbage_collection_age_cutoff, it will
1580 // delete the oldest blob file and create new blob file during compaction.
1581 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
1582 ASSERT_OK(dbfull()->TEST_WaitForCompact());
1583
1584 blob_event_listener->CheckCounters();
1585 }
1586
1587 } // namespace ROCKSDB_NAMESPACE
1588
1589 #endif // ROCKSDB_LITE
1590
1591 int main(int argc, char** argv) {
1592 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1593 ::testing::InitGoogleTest(&argc, argv);
1594 return RUN_ALL_TESTS();
1595 }