]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #include "db/db_impl.h" | |
7 | #include "db/db_test_util.h" | |
8 | #include "db/dbformat.h" | |
9 | #include "db/version_set.h" | |
10 | #include "db/write_batch_internal.h" | |
11 | #include "memtable/hash_linklist_rep.h" | |
12 | #include "monitoring/statistics.h" | |
13 | #include "rocksdb/cache.h" | |
14 | #include "rocksdb/compaction_filter.h" | |
15 | #include "rocksdb/db.h" | |
16 | #include "rocksdb/env.h" | |
17 | #include "rocksdb/filter_policy.h" | |
18 | #include "rocksdb/options.h" | |
19 | #include "rocksdb/perf_context.h" | |
20 | #include "rocksdb/slice.h" | |
21 | #include "rocksdb/slice_transform.h" | |
22 | #include "rocksdb/table.h" | |
23 | #include "rocksdb/table_properties.h" | |
24 | #include "table/block_based_table_factory.h" | |
25 | #include "table/plain_table_factory.h" | |
26 | #include "util/filename.h" | |
27 | #include "util/hash.h" | |
28 | #include "util/logging.h" | |
29 | #include "util/mutexlock.h" | |
30 | #include "util/rate_limiter.h" | |
31 | #include "util/string_util.h" | |
32 | #include "util/sync_point.h" | |
33 | #include "util/testharness.h" | |
34 | #include "util/testutil.h" | |
35 | #include "utilities/merge_operators.h" | |
36 | ||
37 | #ifndef ROCKSDB_LITE | |
38 | ||
39 | namespace rocksdb { | |
40 | ||
41 | class EventListenerTest : public DBTestBase { | |
42 | public: | |
43 | EventListenerTest() : DBTestBase("/listener_test") {} | |
44 | ||
45 | const size_t k110KB = 110 << 10; | |
46 | }; | |
47 | ||
48 | struct TestPropertiesCollector : public rocksdb::TablePropertiesCollector { | |
494da23a TL |
49 | rocksdb::Status AddUserKey(const rocksdb::Slice& /*key*/, |
50 | const rocksdb::Slice& /*value*/, | |
51 | rocksdb::EntryType /*type*/, | |
52 | rocksdb::SequenceNumber /*seq*/, | |
53 | uint64_t /*file_size*/) override { | |
7c673cae FG |
54 | return Status::OK(); |
55 | } | |
494da23a | 56 | rocksdb::Status Finish( |
7c673cae FG |
57 | rocksdb::UserCollectedProperties* properties) override { |
58 | properties->insert({"0", "1"}); | |
59 | return Status::OK(); | |
60 | } | |
61 | ||
494da23a | 62 | const char* Name() const override { return "TestTablePropertiesCollector"; } |
7c673cae FG |
63 | |
64 | rocksdb::UserCollectedProperties GetReadableProperties() const override { | |
65 | rocksdb::UserCollectedProperties ret; | |
66 | ret["2"] = "3"; | |
67 | return ret; | |
68 | } | |
69 | }; | |
70 | ||
71 | class TestPropertiesCollectorFactory : public TablePropertiesCollectorFactory { | |
72 | public: | |
494da23a | 73 | TablePropertiesCollector* CreateTablePropertiesCollector( |
11fdf7f2 | 74 | TablePropertiesCollectorFactory::Context /*context*/) override { |
7c673cae FG |
75 | return new TestPropertiesCollector; |
76 | } | |
77 | const char* Name() const override { return "TestTablePropertiesCollector"; } | |
78 | }; | |
79 | ||
80 | class TestCompactionListener : public EventListener { | |
81 | public: | |
82 | void OnCompactionCompleted(DB *db, const CompactionJobInfo& ci) override { | |
83 | std::lock_guard<std::mutex> lock(mutex_); | |
84 | compacted_dbs_.push_back(db); | |
85 | ASSERT_GT(ci.input_files.size(), 0U); | |
86 | ASSERT_GT(ci.output_files.size(), 0U); | |
87 | ASSERT_EQ(db->GetEnv()->GetThreadID(), ci.thread_id); | |
88 | ASSERT_GT(ci.thread_id, 0U); | |
89 | ||
90 | for (auto fl : {ci.input_files, ci.output_files}) { | |
91 | for (auto fn : fl) { | |
92 | auto it = ci.table_properties.find(fn); | |
93 | ASSERT_NE(it, ci.table_properties.end()); | |
94 | auto tp = it->second; | |
95 | ASSERT_TRUE(tp != nullptr); | |
96 | ASSERT_EQ(tp->user_collected_properties.find("0")->second, "1"); | |
97 | } | |
98 | } | |
99 | } | |
100 | ||
101 | std::vector<DB*> compacted_dbs_; | |
102 | std::mutex mutex_; | |
103 | }; | |
104 | ||
105 | TEST_F(EventListenerTest, OnSingleDBCompactionTest) { | |
106 | const int kTestKeySize = 16; | |
107 | const int kTestValueSize = 984; | |
108 | const int kEntrySize = kTestKeySize + kTestValueSize; | |
109 | const int kEntriesPerBuffer = 100; | |
110 | const int kNumL0Files = 4; | |
111 | ||
112 | Options options; | |
11fdf7f2 | 113 | options.env = CurrentOptions().env; |
7c673cae FG |
114 | options.create_if_missing = true; |
115 | options.write_buffer_size = kEntrySize * kEntriesPerBuffer; | |
116 | options.compaction_style = kCompactionStyleLevel; | |
117 | options.target_file_size_base = options.write_buffer_size; | |
118 | options.max_bytes_for_level_base = options.target_file_size_base * 2; | |
119 | options.max_bytes_for_level_multiplier = 2; | |
120 | options.compression = kNoCompression; | |
121 | #ifdef ROCKSDB_USING_THREAD_STATUS | |
122 | options.enable_thread_tracking = true; | |
123 | #endif // ROCKSDB_USING_THREAD_STATUS | |
124 | options.level0_file_num_compaction_trigger = kNumL0Files; | |
125 | options.table_properties_collector_factories.push_back( | |
126 | std::make_shared<TestPropertiesCollectorFactory>()); | |
127 | ||
128 | TestCompactionListener* listener = new TestCompactionListener(); | |
129 | options.listeners.emplace_back(listener); | |
130 | std::vector<std::string> cf_names = { | |
131 | "pikachu", "ilya", "muromec", "dobrynia", | |
132 | "nikitich", "alyosha", "popovich"}; | |
133 | CreateAndReopenWithCF(cf_names, options); | |
134 | ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p'))); | |
135 | ASSERT_OK(Put(2, "ilya", std::string(90000, 'i'))); | |
136 | ASSERT_OK(Put(3, "muromec", std::string(90000, 'm'))); | |
137 | ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd'))); | |
138 | ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n'))); | |
139 | ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a'))); | |
140 | ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); | |
141 | for (int i = 1; i < 8; ++i) { | |
142 | ASSERT_OK(Flush(i)); | |
143 | const Slice kRangeStart = "a"; | |
144 | const Slice kRangeEnd = "z"; | |
145 | ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[i], | |
146 | &kRangeStart, &kRangeEnd)); | |
147 | dbfull()->TEST_WaitForFlushMemTable(); | |
148 | dbfull()->TEST_WaitForCompact(); | |
149 | } | |
150 | ||
151 | ASSERT_EQ(listener->compacted_dbs_.size(), cf_names.size()); | |
152 | for (size_t i = 0; i < cf_names.size(); ++i) { | |
153 | ASSERT_EQ(listener->compacted_dbs_[i], db_); | |
154 | } | |
155 | } | |
156 | ||
157 | // This simple Listener can only handle one flush at a time. | |
158 | class TestFlushListener : public EventListener { | |
159 | public: | |
160 | explicit TestFlushListener(Env* env) | |
161 | : slowdown_count(0), stop_count(0), db_closed(), env_(env) { | |
162 | db_closed = false; | |
163 | } | |
164 | void OnTableFileCreated( | |
165 | const TableFileCreationInfo& info) override { | |
166 | // remember the info for later checking the FlushJobInfo. | |
167 | prev_fc_info_ = info; | |
168 | ASSERT_GT(info.db_name.size(), 0U); | |
169 | ASSERT_GT(info.cf_name.size(), 0U); | |
170 | ASSERT_GT(info.file_path.size(), 0U); | |
171 | ASSERT_GT(info.job_id, 0); | |
172 | ASSERT_GT(info.table_properties.data_size, 0U); | |
173 | ASSERT_GT(info.table_properties.raw_key_size, 0U); | |
174 | ASSERT_GT(info.table_properties.raw_value_size, 0U); | |
175 | ASSERT_GT(info.table_properties.num_data_blocks, 0U); | |
176 | ASSERT_GT(info.table_properties.num_entries, 0U); | |
177 | ||
178 | #ifdef ROCKSDB_USING_THREAD_STATUS | |
179 | // Verify the id of the current thread that created this table | |
180 | // file matches the id of any active flush or compaction thread. | |
181 | uint64_t thread_id = env_->GetThreadID(); | |
182 | std::vector<ThreadStatus> thread_list; | |
183 | ASSERT_OK(env_->GetThreadList(&thread_list)); | |
184 | bool found_match = false; | |
185 | for (auto thread_status : thread_list) { | |
186 | if (thread_status.operation_type == ThreadStatus::OP_FLUSH || | |
187 | thread_status.operation_type == ThreadStatus::OP_COMPACTION) { | |
188 | if (thread_id == thread_status.thread_id) { | |
189 | found_match = true; | |
190 | break; | |
191 | } | |
192 | } | |
193 | } | |
194 | ASSERT_TRUE(found_match); | |
195 | #endif // ROCKSDB_USING_THREAD_STATUS | |
196 | } | |
197 | ||
198 | void OnFlushCompleted( | |
199 | DB* db, const FlushJobInfo& info) override { | |
200 | flushed_dbs_.push_back(db); | |
201 | flushed_column_family_names_.push_back(info.cf_name); | |
202 | if (info.triggered_writes_slowdown) { | |
203 | slowdown_count++; | |
204 | } | |
205 | if (info.triggered_writes_stop) { | |
206 | stop_count++; | |
207 | } | |
208 | // verify whether the previously created file matches the flushed file. | |
209 | ASSERT_EQ(prev_fc_info_.db_name, db->GetName()); | |
210 | ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name); | |
211 | ASSERT_EQ(prev_fc_info_.job_id, info.job_id); | |
212 | ASSERT_EQ(prev_fc_info_.file_path, info.file_path); | |
213 | ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id); | |
214 | ASSERT_GT(info.thread_id, 0U); | |
215 | ASSERT_EQ(info.table_properties.user_collected_properties.find("0")->second, | |
216 | "1"); | |
217 | } | |
218 | ||
219 | std::vector<std::string> flushed_column_family_names_; | |
220 | std::vector<DB*> flushed_dbs_; | |
221 | int slowdown_count; | |
222 | int stop_count; | |
223 | bool db_closing; | |
224 | std::atomic_bool db_closed; | |
225 | TableFileCreationInfo prev_fc_info_; | |
226 | ||
227 | protected: | |
228 | Env* env_; | |
229 | }; | |
230 | ||
231 | TEST_F(EventListenerTest, OnSingleDBFlushTest) { | |
232 | Options options; | |
11fdf7f2 | 233 | options.env = CurrentOptions().env; |
7c673cae FG |
234 | options.write_buffer_size = k110KB; |
235 | #ifdef ROCKSDB_USING_THREAD_STATUS | |
236 | options.enable_thread_tracking = true; | |
237 | #endif // ROCKSDB_USING_THREAD_STATUS | |
238 | TestFlushListener* listener = new TestFlushListener(options.env); | |
239 | options.listeners.emplace_back(listener); | |
240 | std::vector<std::string> cf_names = { | |
241 | "pikachu", "ilya", "muromec", "dobrynia", | |
242 | "nikitich", "alyosha", "popovich"}; | |
243 | options.table_properties_collector_factories.push_back( | |
244 | std::make_shared<TestPropertiesCollectorFactory>()); | |
245 | CreateAndReopenWithCF(cf_names, options); | |
246 | ||
247 | ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p'))); | |
248 | ASSERT_OK(Put(2, "ilya", std::string(90000, 'i'))); | |
249 | ASSERT_OK(Put(3, "muromec", std::string(90000, 'm'))); | |
250 | ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd'))); | |
251 | ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n'))); | |
252 | ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a'))); | |
253 | ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); | |
254 | for (int i = 1; i < 8; ++i) { | |
255 | ASSERT_OK(Flush(i)); | |
256 | dbfull()->TEST_WaitForFlushMemTable(); | |
257 | ASSERT_EQ(listener->flushed_dbs_.size(), i); | |
258 | ASSERT_EQ(listener->flushed_column_family_names_.size(), i); | |
259 | } | |
260 | ||
11fdf7f2 | 261 | // make sure callback functions are called in the right order |
7c673cae FG |
262 | for (size_t i = 0; i < cf_names.size(); ++i) { |
263 | ASSERT_EQ(listener->flushed_dbs_[i], db_); | |
264 | ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]); | |
265 | } | |
266 | } | |
267 | ||
268 | TEST_F(EventListenerTest, MultiCF) { | |
269 | Options options; | |
11fdf7f2 | 270 | options.env = CurrentOptions().env; |
7c673cae FG |
271 | options.write_buffer_size = k110KB; |
272 | #ifdef ROCKSDB_USING_THREAD_STATUS | |
273 | options.enable_thread_tracking = true; | |
274 | #endif // ROCKSDB_USING_THREAD_STATUS | |
275 | TestFlushListener* listener = new TestFlushListener(options.env); | |
276 | options.listeners.emplace_back(listener); | |
277 | options.table_properties_collector_factories.push_back( | |
278 | std::make_shared<TestPropertiesCollectorFactory>()); | |
279 | std::vector<std::string> cf_names = { | |
280 | "pikachu", "ilya", "muromec", "dobrynia", | |
281 | "nikitich", "alyosha", "popovich"}; | |
282 | CreateAndReopenWithCF(cf_names, options); | |
283 | ||
284 | ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p'))); | |
285 | ASSERT_OK(Put(2, "ilya", std::string(90000, 'i'))); | |
286 | ASSERT_OK(Put(3, "muromec", std::string(90000, 'm'))); | |
287 | ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd'))); | |
288 | ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n'))); | |
289 | ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a'))); | |
290 | ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); | |
291 | for (int i = 1; i < 8; ++i) { | |
292 | ASSERT_OK(Flush(i)); | |
293 | ASSERT_EQ(listener->flushed_dbs_.size(), i); | |
294 | ASSERT_EQ(listener->flushed_column_family_names_.size(), i); | |
295 | } | |
296 | ||
11fdf7f2 | 297 | // make sure callback functions are called in the right order |
7c673cae FG |
298 | for (size_t i = 0; i < cf_names.size(); i++) { |
299 | ASSERT_EQ(listener->flushed_dbs_[i], db_); | |
300 | ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]); | |
301 | } | |
302 | } | |
303 | ||
304 | TEST_F(EventListenerTest, MultiDBMultiListeners) { | |
305 | Options options; | |
11fdf7f2 | 306 | options.env = CurrentOptions().env; |
7c673cae FG |
307 | #ifdef ROCKSDB_USING_THREAD_STATUS |
308 | options.enable_thread_tracking = true; | |
309 | #endif // ROCKSDB_USING_THREAD_STATUS | |
310 | options.table_properties_collector_factories.push_back( | |
311 | std::make_shared<TestPropertiesCollectorFactory>()); | |
312 | std::vector<TestFlushListener*> listeners; | |
313 | const int kNumDBs = 5; | |
314 | const int kNumListeners = 10; | |
315 | for (int i = 0; i < kNumListeners; ++i) { | |
316 | listeners.emplace_back(new TestFlushListener(options.env)); | |
317 | } | |
318 | ||
319 | std::vector<std::string> cf_names = { | |
320 | "pikachu", "ilya", "muromec", "dobrynia", | |
321 | "nikitich", "alyosha", "popovich"}; | |
322 | ||
323 | options.create_if_missing = true; | |
324 | for (int i = 0; i < kNumListeners; ++i) { | |
325 | options.listeners.emplace_back(listeners[i]); | |
326 | } | |
327 | DBOptions db_opts(options); | |
328 | ColumnFamilyOptions cf_opts(options); | |
329 | ||
330 | std::vector<DB*> dbs; | |
331 | std::vector<std::vector<ColumnFamilyHandle *>> vec_handles; | |
332 | ||
333 | for (int d = 0; d < kNumDBs; ++d) { | |
334 | ASSERT_OK(DestroyDB(dbname_ + ToString(d), options)); | |
335 | DB* db; | |
336 | std::vector<ColumnFamilyHandle*> handles; | |
337 | ASSERT_OK(DB::Open(options, dbname_ + ToString(d), &db)); | |
338 | for (size_t c = 0; c < cf_names.size(); ++c) { | |
339 | ColumnFamilyHandle* handle; | |
340 | db->CreateColumnFamily(cf_opts, cf_names[c], &handle); | |
341 | handles.push_back(handle); | |
342 | } | |
343 | ||
344 | vec_handles.push_back(std::move(handles)); | |
345 | dbs.push_back(db); | |
346 | } | |
347 | ||
348 | for (int d = 0; d < kNumDBs; ++d) { | |
349 | for (size_t c = 0; c < cf_names.size(); ++c) { | |
350 | ASSERT_OK(dbs[d]->Put(WriteOptions(), vec_handles[d][c], | |
351 | cf_names[c], cf_names[c])); | |
352 | } | |
353 | } | |
354 | ||
355 | for (size_t c = 0; c < cf_names.size(); ++c) { | |
356 | for (int d = 0; d < kNumDBs; ++d) { | |
357 | ASSERT_OK(dbs[d]->Flush(FlushOptions(), vec_handles[d][c])); | |
358 | reinterpret_cast<DBImpl*>(dbs[d])->TEST_WaitForFlushMemTable(); | |
359 | } | |
360 | } | |
361 | ||
362 | for (auto* listener : listeners) { | |
363 | int pos = 0; | |
364 | for (size_t c = 0; c < cf_names.size(); ++c) { | |
365 | for (int d = 0; d < kNumDBs; ++d) { | |
366 | ASSERT_EQ(listener->flushed_dbs_[pos], dbs[d]); | |
367 | ASSERT_EQ(listener->flushed_column_family_names_[pos], cf_names[c]); | |
368 | pos++; | |
369 | } | |
370 | } | |
371 | } | |
372 | ||
373 | ||
374 | for (auto handles : vec_handles) { | |
375 | for (auto h : handles) { | |
376 | delete h; | |
377 | } | |
378 | handles.clear(); | |
379 | } | |
380 | vec_handles.clear(); | |
381 | ||
382 | for (auto db : dbs) { | |
383 | delete db; | |
384 | } | |
385 | } | |
386 | ||
// Verifies that FlushJobInfo reports triggered_writes_slowdown once the L0
// file count passes the slowdown trigger while compaction is disabled.
TEST_F(EventListenerTest, DisableBGCompaction) {
  Options options;
  options.env = CurrentOptions().env;
#ifdef ROCKSDB_USING_THREAD_STATUS
  options.enable_thread_tracking = true;
#endif  // ROCKSDB_USING_THREAD_STATUS
  TestFlushListener* listener = new TestFlushListener(options.env);
  const int kCompactionTrigger = 1;
  const int kSlowdownTrigger = 5;
  const int kStopTrigger = 100;
  options.level0_file_num_compaction_trigger = kCompactionTrigger;
  options.level0_slowdown_writes_trigger = kSlowdownTrigger;
  // Stop trigger is set high enough that writes are slowed but never fully
  // stopped in this test.
  options.level0_stop_writes_trigger = kStopTrigger;
  options.max_write_buffer_number = 10;
  options.listeners.emplace_back(listener);
  // BG compaction is disabled. Number of L0 files will simply keeps
  // increasing in this test.
  options.compaction_style = kCompactionStyleNone;
  options.compression = kNoCompression;
  options.write_buffer_size = 100000;  // Small write buffer

  options.table_properties_collector_factories.push_back(
      std::make_shared<TestPropertiesCollectorFactory>());

  CreateAndReopenWithCF({"pikachu"}, options);
  ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);

  // keep writing until writes are forced to stop.
  for (int i = 0; static_cast<int>(cf_meta.file_count) < kSlowdownTrigger * 10;
       ++i) {
    // Statuses deliberately unchecked: writes may be slowed down here.
    Put(1, ToString(i), std::string(10000, 'x'), WriteOptions());
    FlushOptions fo;
    fo.allow_write_stall = true;  // flush even while writes are stalled
    db_->Flush(fo, handles_[1]);
    db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
  }
  // Nearly every flush past the trigger should have reported a slowdown.
  ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
}
425 | ||
426 | class TestCompactionReasonListener : public EventListener { | |
427 | public: | |
11fdf7f2 | 428 | void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override { |
7c673cae FG |
429 | std::lock_guard<std::mutex> lock(mutex_); |
430 | compaction_reasons_.push_back(ci.compaction_reason); | |
431 | } | |
432 | ||
433 | std::vector<CompactionReason> compaction_reasons_; | |
434 | std::mutex mutex_; | |
435 | }; | |
436 | ||
// Exercises the three level-style compaction reasons in turn: L0 file
// count, level-max-size, and manual compaction.
TEST_F(EventListenerTest, CompactionReasonLevel) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));

  TestCompactionReasonListener* listener = new TestCompactionReasonListener();
  options.listeners.emplace_back(listener);

  options.level0_file_num_compaction_trigger = 4;
  options.compaction_style = kCompactionStyleLevel;

  DestroyAndReopen(options);
  Random rnd(301);

  // Write 4 files in L0
  for (int i = 0; i < 4; i++) {
    GenerateNewRandomFile(&rnd);
  }
  dbfull()->TEST_WaitForCompact();

  // Hitting the L0 file-count trigger should yield exactly one compaction.
  ASSERT_EQ(listener->compaction_reasons_.size(), 1);
  ASSERT_EQ(listener->compaction_reasons_[0],
            CompactionReason::kLevelL0FilesNum);

  DestroyAndReopen(options);

  // Write 3 non-overlapping files in L0
  for (int k = 1; k <= 30; k++) {
    ASSERT_OK(Put(Key(k), Key(k)));
    if (k % 10 == 0) {
      Flush();
    }
  }

  // Do a trivial move from L0 -> L1
  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  // Reopen with a tiny level budget so every existing file now exceeds its
  // level's max size.
  options.max_bytes_for_level_base = 1;
  Close();
  listener->compaction_reasons_.clear();
  Reopen(options);

  dbfull()->TEST_WaitForCompact();
  ASSERT_GT(listener->compaction_reasons_.size(), 1);

  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kLevelMaxLevelSize);
  }

  // With auto compaction off, a forced CompactRange must be attributed to
  // kManualCompaction.
  options.disable_auto_compactions = true;
  Close();
  listener->compaction_reasons_.clear();
  Reopen(options);

  Put("key", "value");
  CompactRangeOptions cro;
  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
  }
}
502 | ||
// Exercises the universal-compaction reasons in turn: size ratio, size
// amplification, and manual compaction.
TEST_F(EventListenerTest, CompactionReasonUniversal) {
  Options options;
  options.env = CurrentOptions().env;
  options.create_if_missing = true;
  options.memtable_factory.reset(
      new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile));

  TestCompactionReasonListener* listener = new TestCompactionReasonListener();
  options.listeners.emplace_back(listener);

  options.compaction_style = kCompactionStyleUniversal;

  Random rnd(301);

  // Phase 1: huge amplification / size-ratio limits, so compactions
  // triggered by the L0 file count are attributed to the size ratio.
  options.level0_file_num_compaction_trigger = 8;
  options.compaction_options_universal.max_size_amplification_percent = 100000;
  options.compaction_options_universal.size_ratio = 100000;
  DestroyAndReopen(options);
  listener->compaction_reasons_.clear();

  // Write 8 files in L0
  for (int i = 0; i < 8; i++) {
    GenerateNewRandomFile(&rnd);
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeRatio);
  }

  // Phase 2: a 1% amplification limit forces size-amplification
  // compactions instead.
  options.level0_file_num_compaction_trigger = 8;
  options.compaction_options_universal.max_size_amplification_percent = 1;
  options.compaction_options_universal.size_ratio = 100000;

  DestroyAndReopen(options);
  listener->compaction_reasons_.clear();

  // Write 8 files in L0
  for (int i = 0; i < 8; i++) {
    GenerateNewRandomFile(&rnd);
  }
  dbfull()->TEST_WaitForCompact();

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kUniversalSizeAmplification);
  }

  // Phase 3: with auto compaction disabled, CompactRange must report
  // kManualCompaction.
  options.disable_auto_compactions = true;
  Close();
  listener->compaction_reasons_.clear();
  Reopen(options);

  db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);

  ASSERT_GT(listener->compaction_reasons_.size(), 0);
  for (auto compaction_reason : listener->compaction_reasons_) {
    ASSERT_EQ(compaction_reason, CompactionReason::kManualCompaction);
  }
}
564 | ||
565 | TEST_F(EventListenerTest, CompactionReasonFIFO) { | |
566 | Options options; | |
11fdf7f2 | 567 | options.env = CurrentOptions().env; |
7c673cae FG |
568 | options.create_if_missing = true; |
569 | options.memtable_factory.reset( | |
570 | new SpecialSkipListFactory(DBTestBase::kNumKeysByGenerateNewRandomFile)); | |
571 | ||
572 | TestCompactionReasonListener* listener = new TestCompactionReasonListener(); | |
573 | options.listeners.emplace_back(listener); | |
574 | ||
575 | options.level0_file_num_compaction_trigger = 4; | |
576 | options.compaction_style = kCompactionStyleFIFO; | |
577 | options.compaction_options_fifo.max_table_files_size = 1; | |
578 | ||
579 | DestroyAndReopen(options); | |
580 | Random rnd(301); | |
581 | ||
582 | // Write 4 files in L0 | |
583 | for (int i = 0; i < 4; i++) { | |
584 | GenerateNewRandomFile(&rnd); | |
585 | } | |
586 | dbfull()->TEST_WaitForCompact(); | |
587 | ||
588 | ASSERT_GT(listener->compaction_reasons_.size(), 0); | |
589 | for (auto compaction_reason : listener->compaction_reasons_) { | |
590 | ASSERT_EQ(compaction_reason, CompactionReason::kFIFOMaxSize); | |
591 | } | |
592 | } | |
593 | ||
594 | class TableFileCreationListener : public EventListener { | |
595 | public: | |
596 | class TestEnv : public EnvWrapper { | |
597 | public: | |
598 | TestEnv() : EnvWrapper(Env::Default()) {} | |
599 | ||
600 | void SetStatus(Status s) { status_ = s; } | |
601 | ||
602 | Status NewWritableFile(const std::string& fname, | |
603 | std::unique_ptr<WritableFile>* result, | |
494da23a | 604 | const EnvOptions& options) override { |
7c673cae FG |
605 | if (fname.size() > 4 && fname.substr(fname.size() - 4) == ".sst") { |
606 | if (!status_.ok()) { | |
607 | return status_; | |
608 | } | |
609 | } | |
610 | return Env::Default()->NewWritableFile(fname, result, options); | |
611 | } | |
612 | ||
613 | private: | |
614 | Status status_; | |
615 | }; | |
616 | ||
617 | TableFileCreationListener() { | |
618 | for (int i = 0; i < 2; i++) { | |
619 | started_[i] = finished_[i] = failure_[i] = 0; | |
620 | } | |
621 | } | |
622 | ||
623 | int Index(TableFileCreationReason reason) { | |
624 | int idx; | |
625 | switch (reason) { | |
626 | case TableFileCreationReason::kFlush: | |
627 | idx = 0; | |
628 | break; | |
629 | case TableFileCreationReason::kCompaction: | |
630 | idx = 1; | |
631 | break; | |
632 | default: | |
633 | idx = -1; | |
634 | } | |
635 | return idx; | |
636 | } | |
637 | ||
638 | void CheckAndResetCounters(int flush_started, int flush_finished, | |
639 | int flush_failure, int compaction_started, | |
640 | int compaction_finished, int compaction_failure) { | |
641 | ASSERT_EQ(started_[0], flush_started); | |
642 | ASSERT_EQ(finished_[0], flush_finished); | |
643 | ASSERT_EQ(failure_[0], flush_failure); | |
644 | ASSERT_EQ(started_[1], compaction_started); | |
645 | ASSERT_EQ(finished_[1], compaction_finished); | |
646 | ASSERT_EQ(failure_[1], compaction_failure); | |
647 | for (int i = 0; i < 2; i++) { | |
648 | started_[i] = finished_[i] = failure_[i] = 0; | |
649 | } | |
650 | } | |
651 | ||
652 | void OnTableFileCreationStarted( | |
653 | const TableFileCreationBriefInfo& info) override { | |
654 | int idx = Index(info.reason); | |
655 | if (idx >= 0) { | |
656 | started_[idx]++; | |
657 | } | |
658 | ASSERT_GT(info.db_name.size(), 0U); | |
659 | ASSERT_GT(info.cf_name.size(), 0U); | |
660 | ASSERT_GT(info.file_path.size(), 0U); | |
661 | ASSERT_GT(info.job_id, 0); | |
662 | } | |
663 | ||
664 | void OnTableFileCreated(const TableFileCreationInfo& info) override { | |
665 | int idx = Index(info.reason); | |
666 | if (idx >= 0) { | |
667 | finished_[idx]++; | |
668 | } | |
669 | ASSERT_GT(info.db_name.size(), 0U); | |
670 | ASSERT_GT(info.cf_name.size(), 0U); | |
671 | ASSERT_GT(info.file_path.size(), 0U); | |
672 | ASSERT_GT(info.job_id, 0); | |
673 | if (info.status.ok()) { | |
674 | ASSERT_GT(info.table_properties.data_size, 0U); | |
675 | ASSERT_GT(info.table_properties.raw_key_size, 0U); | |
676 | ASSERT_GT(info.table_properties.raw_value_size, 0U); | |
677 | ASSERT_GT(info.table_properties.num_data_blocks, 0U); | |
678 | ASSERT_GT(info.table_properties.num_entries, 0U); | |
679 | } else { | |
680 | if (idx >= 0) { | |
681 | failure_[idx]++; | |
682 | } | |
683 | } | |
684 | } | |
685 | ||
686 | TestEnv test_env; | |
687 | int started_[2]; | |
688 | int finished_[2]; | |
689 | int failure_[2]; | |
690 | }; | |
691 | ||
// Drives TableFileCreationListener through successful and failing flushes
// and compactions, checking the started/finished/failure counters after
// each phase.
TEST_F(EventListenerTest, TableFileCreationListenersTest) {
  auto listener = std::make_shared<TableFileCreationListener>();
  Options options;
  options.create_if_missing = true;
  options.listeners.push_back(listener);
  options.env = &listener->test_env;
  DestroyAndReopen(options);

  // Successful flush: 1 started, 1 finished, 0 failures.
  ASSERT_OK(Put("foo", "aaa"));
  ASSERT_OK(Put("bar", "bbb"));
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();
  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);

  // Failing flush: the env rejects the .sst file, so the creation both
  // finishes and counts as a failure.
  ASSERT_OK(Put("foo", "aaa1"));
  ASSERT_OK(Put("bar", "bbb1"));
  listener->test_env.SetStatus(Status::NotSupported("not supported"));
  ASSERT_NOK(Flush());
  listener->CheckAndResetCounters(1, 1, 1, 0, 0, 0);
  listener->test_env.SetStatus(Status::OK());

  // Fresh handle, successful flush again.
  Reopen(options);
  ASSERT_OK(Put("foo", "aaa2"));
  ASSERT_OK(Put("bar", "bbb2"));
  ASSERT_OK(Flush());
  dbfull()->TEST_WaitForFlushMemTable();
  listener->CheckAndResetCounters(1, 1, 0, 0, 0, 0);

  // Successful compaction over the whole key range.
  const Slice kRangeStart = "a";
  const Slice kRangeEnd = "z";
  dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
  dbfull()->TEST_WaitForCompact();
  listener->CheckAndResetCounters(0, 0, 0, 1, 1, 0);

  // Failing compaction (the preceding flush succeeds before the env is
  // switched to the error status).
  ASSERT_OK(Put("foo", "aaa3"));
  ASSERT_OK(Put("bar", "bbb3"));
  ASSERT_OK(Flush());
  listener->test_env.SetStatus(Status::NotSupported("not supported"));
  dbfull()->CompactRange(CompactRangeOptions(), &kRangeStart, &kRangeEnd);
  dbfull()->TEST_WaitForCompact();
  listener->CheckAndResetCounters(1, 1, 0, 1, 1, 1);
}
734 | ||
735 | class MemTableSealedListener : public EventListener { | |
736 | private: | |
737 | SequenceNumber latest_seq_number_; | |
738 | public: | |
739 | MemTableSealedListener() {} | |
740 | void OnMemTableSealed(const MemTableInfo& info) override { | |
741 | latest_seq_number_ = info.first_seqno; | |
742 | } | |
743 | ||
744 | void OnFlushCompleted(DB* /*db*/, | |
745 | const FlushJobInfo& flush_job_info) override { | |
746 | ASSERT_LE(flush_job_info.smallest_seqno, latest_seq_number_); | |
747 | } | |
748 | }; | |
749 | ||
750 | TEST_F(EventListenerTest, MemTableSealedListenerTest) { | |
751 | auto listener = std::make_shared<MemTableSealedListener>(); | |
752 | Options options; | |
753 | options.create_if_missing = true; | |
754 | options.listeners.push_back(listener); | |
755 | DestroyAndReopen(options); | |
756 | ||
757 | for (unsigned int i = 0; i < 10; i++) { | |
758 | std::string tag = std::to_string(i); | |
759 | ASSERT_OK(Put("foo"+tag, "aaa")); | |
760 | ASSERT_OK(Put("bar"+tag, "bbb")); | |
761 | ||
762 | ASSERT_OK(Flush()); | |
763 | } | |
764 | } | |
765 | ||
766 | class ColumnFamilyHandleDeletionStartedListener : public EventListener { | |
767 | private: | |
768 | std::vector<std::string> cfs_; | |
769 | int counter; | |
770 | ||
771 | public: | |
772 | explicit ColumnFamilyHandleDeletionStartedListener( | |
773 | const std::vector<std::string>& cfs) | |
774 | : cfs_(cfs), counter(0) { | |
775 | cfs_.insert(cfs_.begin(), kDefaultColumnFamilyName); | |
776 | } | |
777 | void OnColumnFamilyHandleDeletionStarted( | |
778 | ColumnFamilyHandle* handle) override { | |
779 | ASSERT_EQ(cfs_[handle->GetID()], handle->GetName()); | |
780 | counter++; | |
781 | } | |
782 | int getCounter() { return counter; } | |
783 | }; | |
784 | ||
785 | TEST_F(EventListenerTest, ColumnFamilyHandleDeletionStartedListenerTest) { | |
786 | std::vector<std::string> cfs{"pikachu", "eevee", "Mewtwo"}; | |
787 | auto listener = | |
788 | std::make_shared<ColumnFamilyHandleDeletionStartedListener>(cfs); | |
789 | Options options; | |
11fdf7f2 | 790 | options.env = CurrentOptions().env; |
7c673cae FG |
791 | options.create_if_missing = true; |
792 | options.listeners.push_back(listener); | |
793 | CreateAndReopenWithCF(cfs, options); | |
794 | ASSERT_EQ(handles_.size(), 4); | |
795 | delete handles_[3]; | |
796 | delete handles_[2]; | |
797 | delete handles_[1]; | |
798 | handles_.resize(1); | |
799 | ASSERT_EQ(listener->getCounter(), 3); | |
800 | } | |
801 | ||
11fdf7f2 TL |
802 | class BackgroundErrorListener : public EventListener { |
803 | private: | |
804 | SpecialEnv* env_; | |
805 | int counter_; | |
806 | ||
807 | public: | |
808 | BackgroundErrorListener(SpecialEnv* env) : env_(env), counter_(0) {} | |
809 | ||
810 | void OnBackgroundError(BackgroundErrorReason /*reason*/, | |
811 | Status* bg_error) override { | |
812 | if (counter_ == 0) { | |
813 | // suppress the first error and disable write-dropping such that a retry | |
814 | // can succeed. | |
815 | *bg_error = Status::OK(); | |
816 | env_->drop_writes_.store(false, std::memory_order_release); | |
817 | env_->no_slowdown_ = false; | |
818 | } | |
819 | ++counter_; | |
820 | } | |
821 | ||
822 | int counter() { return counter_; } | |
823 | }; | |
824 | ||
// Forces a flush to fail (via SpecialEnv write-dropping) and verifies the
// BackgroundErrorListener sees exactly one background error, clears it, and
// that a subsequent flush then succeeds.
TEST_F(EventListenerTest, BackgroundErrorListenerFailedFlushTest) {
  auto listener = std::make_shared<BackgroundErrorListener>(env_);
  Options options;
  options.create_if_missing = true;
  options.env = env_;
  options.listeners.push_back(listener);
  // One entry per memtable, so every Put after the first seals a memtable
  // and schedules a flush.
  options.memtable_factory.reset(new SpecialSkipListFactory(1));
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  // the usual TEST_WaitForFlushMemTable() doesn't work for failed flushes, so
  // forge a custom one for the failed flush case.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::BGWorkFlush:done",
        "EventListenerTest:BackgroundErrorListenerFailedFlushTest:1"}});
  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  // Make the next background flush fail by dropping its writes.
  env_->drop_writes_.store(true, std::memory_order_release);
  env_->no_slowdown_ = true;

  ASSERT_OK(Put("key0", "val"));
  ASSERT_OK(Put("key1", "val"));
  // Blocks until the (failed) background flush has finished, per the sync
  // point dependency loaded above.
  TEST_SYNC_POINT("EventListenerTest:BackgroundErrorListenerFailedFlushTest:1");
  ASSERT_EQ(1, listener->counter());
  // The listener re-enabled writes after the first error, so this write and
  // the retried flush should now succeed and produce one L0 file.
  ASSERT_OK(Put("key2", "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
853 | ||
// Forces a compaction to fail (via SpecialEnv write-dropping) and verifies
// the BackgroundErrorListener sees exactly one background error, clears it,
// and that compaction subsequently succeeds.
TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
  auto listener = std::make_shared<BackgroundErrorListener>(env_);
  Options options;
  options.create_if_missing = true;
  // Keep compactions off while we stage exactly two L0 files.
  options.disable_auto_compactions = true;
  options.env = env_;
  options.level0_file_num_compaction_trigger = 2;
  options.listeners.push_back(listener);
  // Two entries per memtable, so every other Put seals a memtable.
  options.memtable_factory.reset(new SpecialSkipListFactory(2));
  options.paranoid_checks = true;
  DestroyAndReopen(options);

  // third iteration triggers the second memtable's flush
  for (int i = 0; i < 3; ++i) {
    ASSERT_OK(Put("key0", "val"));
    if (i > 0) {
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
    }
    ASSERT_OK(Put("key1", "val"));
  }
  ASSERT_EQ(2, NumTableFilesAtLevel(0));

  // Make the compaction (triggered by re-enabling auto compactions with two
  // L0 files present) fail by dropping its writes.
  env_->drop_writes_.store(true, std::memory_order_release);
  env_->no_slowdown_ = true;
  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(1, listener->counter());

  // trigger flush so compaction is triggered again; this time it succeeds
  // The previous failed compaction may get retried automatically, so we may
  // be left with 0 or 1 files in level 1, depending on when the retry gets
  // scheduled
  ASSERT_OK(Put("key0", "val"));
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_LE(1, NumTableFilesAtLevel(0));
}
891 | ||
494da23a TL |
892 | class TestFileOperationListener : public EventListener { |
893 | public: | |
894 | TestFileOperationListener() { | |
895 | file_reads_.store(0); | |
896 | file_reads_success_.store(0); | |
897 | file_writes_.store(0); | |
898 | file_writes_success_.store(0); | |
899 | } | |
900 | ||
901 | void OnFileReadFinish(const FileOperationInfo& info) override { | |
902 | ++file_reads_; | |
903 | if (info.status.ok()) { | |
904 | ++file_reads_success_; | |
905 | } | |
906 | ReportDuration(info); | |
907 | } | |
908 | ||
909 | void OnFileWriteFinish(const FileOperationInfo& info) override { | |
910 | ++file_writes_; | |
911 | if (info.status.ok()) { | |
912 | ++file_writes_success_; | |
913 | } | |
914 | ReportDuration(info); | |
915 | } | |
916 | ||
917 | bool ShouldBeNotifiedOnFileIO() override { return true; } | |
918 | ||
919 | std::atomic<size_t> file_reads_; | |
920 | std::atomic<size_t> file_reads_success_; | |
921 | std::atomic<size_t> file_writes_; | |
922 | std::atomic<size_t> file_writes_success_; | |
923 | ||
924 | private: | |
925 | void ReportDuration(const FileOperationInfo& info) const { | |
926 | auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>( | |
927 | info.finish_timestamp - info.start_timestamp); | |
928 | ASSERT_GT(duration.count(), 0); | |
929 | } | |
930 | }; | |
931 | ||
932 | TEST_F(EventListenerTest, OnFileOperationTest) { | |
933 | Options options; | |
934 | options.env = CurrentOptions().env; | |
935 | options.create_if_missing = true; | |
936 | ||
937 | TestFileOperationListener* listener = new TestFileOperationListener(); | |
938 | options.listeners.emplace_back(listener); | |
939 | ||
940 | DestroyAndReopen(options); | |
941 | ASSERT_OK(Put("foo", "aaa")); | |
942 | dbfull()->Flush(FlushOptions()); | |
943 | dbfull()->TEST_WaitForFlushMemTable(); | |
944 | ASSERT_GE(listener->file_writes_.load(), | |
945 | listener->file_writes_success_.load()); | |
946 | ASSERT_GT(listener->file_writes_.load(), 0); | |
947 | Close(); | |
948 | ||
949 | Reopen(options); | |
950 | ASSERT_GE(listener->file_reads_.load(), listener->file_reads_success_.load()); | |
951 | ASSERT_GT(listener->file_reads_.load(), 0); | |
952 | } | |
953 | ||
7c673cae FG |
954 | } // namespace rocksdb |
955 | ||
956 | #endif // ROCKSDB_LITE | |
957 | ||
// Standard gtest entry point: register and run every test in this binary.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}