1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #ifndef OS_WIN
11 #include <sys/ioctl.h>
12 #endif
13
14 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
15 #ifdef OS_FREEBSD
16 #include <malloc_np.h>
17 #else
18 #include <malloc.h>
19 #endif
20 #endif
21 #include <sys/types.h>
22
23 #include <iostream>
24 #include <unordered_set>
25 #include <atomic>
26 #include <list>
27
28 #ifdef OS_LINUX
29 #include <fcntl.h>
30 #include <linux/fs.h>
31 #include <stdlib.h>
32 #include <sys/stat.h>
33 #include <unistd.h>
34 #endif
35
36 #ifdef ROCKSDB_FALLOCATE_PRESENT
37 #include <errno.h>
38 #endif
39
40 #include "env/env_chroot.h"
41 #include "port/port.h"
42 #include "rocksdb/env.h"
43 #include "util/coding.h"
44 #include "util/log_buffer.h"
45 #include "util/mutexlock.h"
46 #include "util/string_util.h"
47 #include "util/sync_point.h"
48 #include "util/testharness.h"
49 #include "util/testutil.h"
50
51 #ifdef OS_LINUX
52 static const size_t kPageSize = sysconf(_SC_PAGESIZE);
53 #else
54 static const size_t kPageSize = 4 * 1024;
55 #endif
56
57 namespace rocksdb {
58
59 static const int kDelayMicros = 100000;
60
61 struct Deleter {
62 explicit Deleter(void (*fn)(void*)) : fn_(fn) {}
63
64 void operator()(void* ptr) {
65 assert(fn_);
66 assert(ptr);
67 (*fn_)(ptr);
68 }
69
70 void (*fn_)(void*);
71 };
72
73 std::unique_ptr<char, Deleter> NewAligned(const size_t size, const char ch) {
74 char* ptr = nullptr;
75 #ifdef OS_WIN
76 if (!(ptr = reinterpret_cast<char*>(_aligned_malloc(size, kPageSize)))) {
77 return std::unique_ptr<char, Deleter>(nullptr, Deleter(_aligned_free));
78 }
79 std::unique_ptr<char, Deleter> uptr(ptr, Deleter(_aligned_free));
80 #else
81 if (posix_memalign(reinterpret_cast<void**>(&ptr), kPageSize, size) != 0) {
82 return std::unique_ptr<char, Deleter>(nullptr, Deleter(free));
83 }
84 std::unique_ptr<char, Deleter> uptr(ptr, Deleter(free));
85 #endif
86 memset(uptr.get(), ch, size);
87 return uptr;
88 }
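// Example usage (sketch): pair NewAligned() with a Slice so that direct-I/O
// writes start from a page-aligned buffer:
//
//   auto buf = NewAligned(kPageSize, 'x');  // kPageSize-aligned, filled with 'x'
//   Slice data(buf.get(), kPageSize);       // wraps the buffer without copying
//
// 'data' can then be handed to WritableFile::PositionedAppend(), as the
// PositionedAppend test below does.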
89
90 class EnvPosixTest : public testing::Test {
91 private:
92 port::Mutex mu_;
93 std::string events_;
94
95 public:
96 Env* env_;
97 bool direct_io_;
98 EnvPosixTest() : env_(Env::Default()), direct_io_(false) {}
99 };
100
101 class EnvPosixTestWithParam
102 : public EnvPosixTest,
103 public ::testing::WithParamInterface<std::pair<Env*, bool>> {
104 public:
105 EnvPosixTestWithParam() {
106 std::pair<Env*, bool> param_pair = GetParam();
107 env_ = param_pair.first;
108 direct_io_ = param_pair.second;
109 }
110
111 void WaitThreadPoolsEmpty() {
112 // Wait until the thread pools are empty.
113 while (env_->GetThreadPoolQueueLen(Env::Priority::LOW) != 0) {
114 Env::Default()->SleepForMicroseconds(kDelayMicros);
115 }
116 while (env_->GetThreadPoolQueueLen(Env::Priority::HIGH) != 0) {
117 Env::Default()->SleepForMicroseconds(kDelayMicros);
118 }
119 }
120
121 ~EnvPosixTestWithParam() { WaitThreadPoolsEmpty(); }
122 };
123
124 static void SetBool(void* ptr) {
125 reinterpret_cast<std::atomic<bool>*>(ptr)->store(true);
126 }
127
128 TEST_P(EnvPosixTestWithParam, RunImmediately) {
129 std::atomic<bool> called(false);
130 env_->Schedule(&SetBool, &called);
131 Env::Default()->SleepForMicroseconds(kDelayMicros);
132 ASSERT_TRUE(called.load());
133 WaitThreadPoolsEmpty();
134 }
135
136 TEST_P(EnvPosixTestWithParam, UnSchedule) {
137 std::atomic<bool> called(false);
138 env_->SetBackgroundThreads(1, Env::LOW);
139
140 /* Block the low priority queue */
141 test::SleepingBackgroundTask sleeping_task, sleeping_task1;
142 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
143 Env::Priority::LOW);
144
145 /* Schedule another task */
146 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task1,
147 Env::Priority::LOW, &sleeping_task1);
148
149   /* Try to remove it with the wrong tag; nothing should be removed */
150 ASSERT_EQ(0, env_->UnSchedule(&called, Env::Priority::LOW));
151
152 /* Remove it from the queue with the right tag */
153 ASSERT_EQ(1, env_->UnSchedule(&sleeping_task1, Env::Priority::LOW));
154
155 // Unblock background thread
156 sleeping_task.WakeUp();
157
158 /* Schedule another task */
159 env_->Schedule(&SetBool, &called);
160 for (int i = 0; i < kDelayMicros; i++) {
161 if (called.load()) {
162 break;
163 }
164 Env::Default()->SleepForMicroseconds(1);
165 }
166 ASSERT_TRUE(called.load());
167
168 ASSERT_TRUE(!sleeping_task.IsSleeping() && !sleeping_task1.IsSleeping());
169 WaitThreadPoolsEmpty();
170 }
171
172 TEST_P(EnvPosixTestWithParam, RunMany) {
173 std::atomic<int> last_id(0);
174
175 struct CB {
176 std::atomic<int>* last_id_ptr; // Pointer to shared slot
177 int id; // Order# for the execution of this callback
178
179 CB(std::atomic<int>* p, int i) : last_id_ptr(p), id(i) {}
180
181 static void Run(void* v) {
182 CB* cb = reinterpret_cast<CB*>(v);
183 int cur = cb->last_id_ptr->load();
184 ASSERT_EQ(cb->id - 1, cur);
185 cb->last_id_ptr->store(cb->id);
186 }
187 };
188
189 // Schedule in different order than start time
190 CB cb1(&last_id, 1);
191 CB cb2(&last_id, 2);
192 CB cb3(&last_id, 3);
193 CB cb4(&last_id, 4);
194 env_->Schedule(&CB::Run, &cb1);
195 env_->Schedule(&CB::Run, &cb2);
196 env_->Schedule(&CB::Run, &cb3);
197 env_->Schedule(&CB::Run, &cb4);
198
199 Env::Default()->SleepForMicroseconds(kDelayMicros);
200 int cur = last_id.load(std::memory_order_acquire);
201 ASSERT_EQ(4, cur);
202 WaitThreadPoolsEmpty();
203 }
204
205 struct State {
206 port::Mutex mu;
207 int val;
208 int num_running;
209 };
210
211 static void ThreadBody(void* arg) {
212 State* s = reinterpret_cast<State*>(arg);
213 s->mu.Lock();
214 s->val += 1;
215 s->num_running -= 1;
216 s->mu.Unlock();
217 }
218
219 TEST_P(EnvPosixTestWithParam, StartThread) {
220 State state;
221 state.val = 0;
222 state.num_running = 3;
223 for (int i = 0; i < 3; i++) {
224 env_->StartThread(&ThreadBody, &state);
225 }
226 while (true) {
227 state.mu.Lock();
228 int num = state.num_running;
229 state.mu.Unlock();
230 if (num == 0) {
231 break;
232 }
233 Env::Default()->SleepForMicroseconds(kDelayMicros);
234 }
235 ASSERT_EQ(state.val, 3);
236 WaitThreadPoolsEmpty();
237 }
238
239 TEST_P(EnvPosixTestWithParam, TwoPools) {
240 // Data structures to signal tasks to run.
241 port::Mutex mutex;
242 port::CondVar cv(&mutex);
243 bool should_start = false;
244
245 class CB {
246 public:
247 CB(const std::string& pool_name, int pool_size, port::Mutex* trigger_mu,
248 port::CondVar* trigger_cv, bool* _should_start)
249 : mu_(),
250 num_running_(0),
251 num_finished_(0),
252 pool_size_(pool_size),
253 pool_name_(pool_name),
254 trigger_mu_(trigger_mu),
255 trigger_cv_(trigger_cv),
256 should_start_(_should_start) {}
257
258 static void Run(void* v) {
259 CB* cb = reinterpret_cast<CB*>(v);
260 cb->Run();
261 }
262
263 void Run() {
264 {
265 MutexLock l(&mu_);
266 num_running_++;
267 // make sure we don't have more than pool_size_ jobs running.
268 ASSERT_LE(num_running_, pool_size_.load());
269 }
270
271 {
272 MutexLock l(trigger_mu_);
273 while (!(*should_start_)) {
274 trigger_cv_->Wait();
275 }
276 }
277
278 {
279 MutexLock l(&mu_);
280 num_running_--;
281 num_finished_++;
282 }
283 }
284
285 int NumFinished() {
286 MutexLock l(&mu_);
287 return num_finished_;
288 }
289
290 void Reset(int pool_size) {
291 pool_size_.store(pool_size);
292 num_finished_ = 0;
293 }
294
295 private:
296 port::Mutex mu_;
297 int num_running_;
298 int num_finished_;
299 std::atomic<int> pool_size_;
300 std::string pool_name_;
301 port::Mutex* trigger_mu_;
302 port::CondVar* trigger_cv_;
303 bool* should_start_;
304 };
305
306 const int kLowPoolSize = 2;
307 const int kHighPoolSize = 4;
308 const int kJobs = 8;
309
310 CB low_pool_job("low", kLowPoolSize, &mutex, &cv, &should_start);
311 CB high_pool_job("high", kHighPoolSize, &mutex, &cv, &should_start);
312
313 env_->SetBackgroundThreads(kLowPoolSize);
314 env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH);
315
316 ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
317 ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
318
319 // schedule same number of jobs in each pool
320 for (int i = 0; i < kJobs; i++) {
321 env_->Schedule(&CB::Run, &low_pool_job);
322 env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH);
323 }
324 // Wait a short while for the jobs to be dispatched.
325 int sleep_count = 0;
326 while ((unsigned int)(kJobs - kLowPoolSize) !=
327 env_->GetThreadPoolQueueLen(Env::Priority::LOW) ||
328 (unsigned int)(kJobs - kHighPoolSize) !=
329 env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) {
330 env_->SleepForMicroseconds(kDelayMicros);
331 if (++sleep_count > 100) {
332 break;
333 }
334 }
335
336 ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
337 env_->GetThreadPoolQueueLen());
338 ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
339 env_->GetThreadPoolQueueLen(Env::Priority::LOW));
340 ASSERT_EQ((unsigned int)(kJobs - kHighPoolSize),
341 env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
342
343 // Trigger jobs to run.
344 {
345 MutexLock l(&mutex);
346 should_start = true;
347 cv.SignalAll();
348 }
349
350 // wait for all jobs to finish
351 while (low_pool_job.NumFinished() < kJobs ||
352 high_pool_job.NumFinished() < kJobs) {
353 env_->SleepForMicroseconds(kDelayMicros);
354 }
355
356 ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
357 ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
358
359   // Hold jobs to schedule.
360 should_start = false;
361
362   // Call IncBackgroundThreadsIfNeeded on the two pools: one increasing and
363   // the other decreasing.
364 env_->IncBackgroundThreadsIfNeeded(kLowPoolSize - 1, Env::Priority::LOW);
365 env_->IncBackgroundThreadsIfNeeded(kHighPoolSize + 1, Env::Priority::HIGH);
366 high_pool_job.Reset(kHighPoolSize + 1);
367 low_pool_job.Reset(kLowPoolSize);
368
369 // schedule same number of jobs in each pool
370 for (int i = 0; i < kJobs; i++) {
371 env_->Schedule(&CB::Run, &low_pool_job);
372 env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH);
373 }
374 // Wait a short while for the jobs to be dispatched.
375 sleep_count = 0;
376 while ((unsigned int)(kJobs - kLowPoolSize) !=
377 env_->GetThreadPoolQueueLen(Env::Priority::LOW) ||
378 (unsigned int)(kJobs - (kHighPoolSize + 1)) !=
379 env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) {
380 env_->SleepForMicroseconds(kDelayMicros);
381 if (++sleep_count > 100) {
382 break;
383 }
384 }
385 ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
386 env_->GetThreadPoolQueueLen());
387 ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
388 env_->GetThreadPoolQueueLen(Env::Priority::LOW));
389 ASSERT_EQ((unsigned int)(kJobs - (kHighPoolSize + 1)),
390 env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
391
392 // Trigger jobs to run.
393 {
394 MutexLock l(&mutex);
395 should_start = true;
396 cv.SignalAll();
397 }
398
399 // wait for all jobs to finish
400 while (low_pool_job.NumFinished() < kJobs ||
401 high_pool_job.NumFinished() < kJobs) {
402 env_->SleepForMicroseconds(kDelayMicros);
403 }
404
405 env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH);
406 WaitThreadPoolsEmpty();
407 }
408
409 TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
410 std::vector<test::SleepingBackgroundTask> tasks(10);
411
412   // Set the number of threads to 1 first.
413 env_->SetBackgroundThreads(1, Env::Priority::HIGH);
414 Env::Default()->SleepForMicroseconds(kDelayMicros);
415
416   // Schedule 3 tasks. Task 0 running; tasks 1, 2 waiting.
417 for (size_t i = 0; i < 3; i++) {
418 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i],
419 Env::Priority::HIGH);
420 Env::Default()->SleepForMicroseconds(kDelayMicros);
421 }
422 ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
423 ASSERT_TRUE(tasks[0].IsSleeping());
424 ASSERT_TRUE(!tasks[1].IsSleeping());
425 ASSERT_TRUE(!tasks[2].IsSleeping());
426
427 // Increase to 2 threads. Task 0, 1 running; 2 waiting
428 env_->SetBackgroundThreads(2, Env::Priority::HIGH);
429 Env::Default()->SleepForMicroseconds(kDelayMicros);
430 ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
431 ASSERT_TRUE(tasks[0].IsSleeping());
432 ASSERT_TRUE(tasks[1].IsSleeping());
433 ASSERT_TRUE(!tasks[2].IsSleeping());
434
435 // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting
436 env_->SetBackgroundThreads(1, Env::Priority::HIGH);
437 Env::Default()->SleepForMicroseconds(kDelayMicros);
438 ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
439 ASSERT_TRUE(tasks[0].IsSleeping());
440 ASSERT_TRUE(tasks[1].IsSleeping());
441 ASSERT_TRUE(!tasks[2].IsSleeping());
442
443   // Wake up task 1 so it finishes. Task 0 running, task 2 waiting.
444 tasks[1].WakeUp();
445 Env::Default()->SleepForMicroseconds(kDelayMicros);
446 ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
447 ASSERT_TRUE(tasks[0].IsSleeping());
448 ASSERT_TRUE(!tasks[1].IsSleeping());
449 ASSERT_TRUE(!tasks[2].IsSleeping());
450
451 // Increase to 5 threads. Task 0 and 2 running.
452 env_->SetBackgroundThreads(5, Env::Priority::HIGH);
453 Env::Default()->SleepForMicroseconds(kDelayMicros);
454 ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
455 ASSERT_TRUE(tasks[0].IsSleeping());
456 ASSERT_TRUE(tasks[2].IsSleeping());
457
458   // Change the number of threads a couple of times while there are not
459   // enough tasks.
460 env_->SetBackgroundThreads(7, Env::Priority::HIGH);
461 Env::Default()->SleepForMicroseconds(kDelayMicros);
462 tasks[2].WakeUp();
463 ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
464 env_->SetBackgroundThreads(3, Env::Priority::HIGH);
465 Env::Default()->SleepForMicroseconds(kDelayMicros);
466 ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
467 env_->SetBackgroundThreads(4, Env::Priority::HIGH);
468 Env::Default()->SleepForMicroseconds(kDelayMicros);
469 ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
470 env_->SetBackgroundThreads(5, Env::Priority::HIGH);
471 Env::Default()->SleepForMicroseconds(kDelayMicros);
472 ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
473 env_->SetBackgroundThreads(4, Env::Priority::HIGH);
474 Env::Default()->SleepForMicroseconds(kDelayMicros);
475 ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
476
477 Env::Default()->SleepForMicroseconds(kDelayMicros * 50);
478
479   // Enqueue 5 more tasks. The thread pool size is now 4.
480   // Tasks 0, 3, 4, 5 running; tasks 6, 7 waiting.
481 for (size_t i = 3; i < 8; i++) {
482 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i],
483 Env::Priority::HIGH);
484 }
485 Env::Default()->SleepForMicroseconds(kDelayMicros);
486 ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
487 ASSERT_TRUE(tasks[3].IsSleeping());
488 ASSERT_TRUE(tasks[4].IsSleeping());
489 ASSERT_TRUE(tasks[5].IsSleeping());
490 ASSERT_TRUE(!tasks[6].IsSleeping());
491 ASSERT_TRUE(!tasks[7].IsSleeping());
492
493 // Wake up task 0, 3 and 4. Task 5, 6, 7 running.
494 tasks[0].WakeUp();
495 tasks[3].WakeUp();
496 tasks[4].WakeUp();
497
498 Env::Default()->SleepForMicroseconds(kDelayMicros);
499 ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
500 for (size_t i = 5; i < 8; i++) {
501 ASSERT_TRUE(tasks[i].IsSleeping());
502 }
503
504 // Shrink back to 1 thread. Still task 5, 6, 7 running
505 env_->SetBackgroundThreads(1, Env::Priority::HIGH);
506 Env::Default()->SleepForMicroseconds(kDelayMicros);
507 ASSERT_TRUE(tasks[5].IsSleeping());
508 ASSERT_TRUE(tasks[6].IsSleeping());
509 ASSERT_TRUE(tasks[7].IsSleeping());
510
511 // Wake up task 6. Task 5, 7 running
512 tasks[6].WakeUp();
513 Env::Default()->SleepForMicroseconds(kDelayMicros);
514 ASSERT_TRUE(tasks[5].IsSleeping());
515 ASSERT_TRUE(!tasks[6].IsSleeping());
516 ASSERT_TRUE(tasks[7].IsSleeping());
517
518   // Wake up task 7. Task 5 running
519 tasks[7].WakeUp();
520 Env::Default()->SleepForMicroseconds(kDelayMicros);
521 ASSERT_TRUE(!tasks[7].IsSleeping());
522
523   // Enqueue tasks 8 and 9. Task 5 running; one of 8, 9 might be running.
524 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[8],
525 Env::Priority::HIGH);
526 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[9],
527 Env::Priority::HIGH);
528 Env::Default()->SleepForMicroseconds(kDelayMicros);
529 ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), (unsigned int)0);
530 ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping());
531
532 // Increase to 4 threads. Task 5, 8, 9 running.
533 env_->SetBackgroundThreads(4, Env::Priority::HIGH);
534 Env::Default()->SleepForMicroseconds(kDelayMicros);
535 ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
536 ASSERT_TRUE(tasks[8].IsSleeping());
537 ASSERT_TRUE(tasks[9].IsSleeping());
538
539 // Shrink to 1 thread
540 env_->SetBackgroundThreads(1, Env::Priority::HIGH);
541
542   // Wake up task 9.
543 tasks[9].WakeUp();
544 Env::Default()->SleepForMicroseconds(kDelayMicros);
545 ASSERT_TRUE(!tasks[9].IsSleeping());
546 ASSERT_TRUE(tasks[8].IsSleeping());
547
548   // Wake up task 8.
549 tasks[8].WakeUp();
550 Env::Default()->SleepForMicroseconds(kDelayMicros);
551 ASSERT_TRUE(!tasks[8].IsSleeping());
552
553   // Wake up the last task.
554 tasks[5].WakeUp();
555
556 Env::Default()->SleepForMicroseconds(kDelayMicros);
557 ASSERT_TRUE(!tasks[5].IsSleeping());
558 WaitThreadPoolsEmpty();
559 }
560
561 #if (defined OS_LINUX || defined OS_WIN)
562 // Travis doesn't support fallocate or getting unique ID from files for whatever
563 // reason.
564 #ifndef TRAVIS
565
566 namespace {
567 bool IsSingleVarint(const std::string& s) {
568 Slice slice(s);
569
570 uint64_t v;
571 if (!GetVarint64(&slice, &v)) {
572 return false;
573 }
574
575 return slice.size() == 0;
576 }
577
578 bool IsUniqueIDValid(const std::string& s) {
579 return !s.empty() && !IsSingleVarint(s);
580 }
581
582 const size_t MAX_ID_SIZE = 100;
583 char temp_id[MAX_ID_SIZE];
584
585
586 } // namespace
587
588 // Determine whether we can use the FS_IOC_GETVERSION ioctl
589 // on a file in directory DIR. Create a temporary file therein,
590 // try to apply the ioctl (save that result), cleanup and
591 // return the result. Return true if it is supported, and
592 // false if anything fails.
593 // Note that this function "knows" that dir has just been created
594 // and is empty, so we create a simply-named test file: "f".
595 bool ioctl_support__FS_IOC_GETVERSION(const std::string& dir) {
596 #ifdef OS_WIN
597 return true;
598 #else
599 const std::string file = dir + "/f";
600 int fd;
601 do {
602 fd = open(file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
603 } while (fd < 0 && errno == EINTR);
604 long int version;
605 bool ok = (fd >= 0 && ioctl(fd, FS_IOC_GETVERSION, &version) >= 0);
606
607 close(fd);
608 unlink(file.c_str());
609
610 return ok;
611 #endif
612 }
613
614 // To ensure that Env::GetUniqueId-related tests work correctly, the files
615 // should be stored in regular storage like "hard disk" or "flash device",
616 // and not on a tmpfs file system (like /dev/shm and /tmp on some systems).
617 // Otherwise we cannot get the correct id.
618 //
619 // This class serves as a replacement for test::TmpDir(), which may be
620 // customized to be on a file system that doesn't work with GetUniqueId().
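//
// For example (sketch), the tests can be pointed at a non-tmpfs directory by
// setting the environment variable read in the constructor below:
//
//   TEST_IOCTL_FRIENDLY_TMPDIR=/var/tmp ./env_test
//
// otherwise /var/tmp and /tmp are tried in that order.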
621
622 class IoctlFriendlyTmpdir {
623 public:
624 explicit IoctlFriendlyTmpdir() {
625 char dir_buf[100];
626
627 const char *fmt = "%s/rocksdb.XXXXXX";
628 const char *tmp = getenv("TEST_IOCTL_FRIENDLY_TMPDIR");
629
630 #ifdef OS_WIN
631 #define rmdir _rmdir
632 if(tmp == nullptr) {
633 tmp = getenv("TMP");
634 }
635
636 snprintf(dir_buf, sizeof dir_buf, fmt, tmp);
637 auto result = _mktemp(dir_buf);
638 assert(result != nullptr);
639 BOOL ret = CreateDirectory(dir_buf, NULL);
640 assert(ret == TRUE);
641 dir_ = dir_buf;
642 #else
643 std::list<std::string> candidate_dir_list = {"/var/tmp", "/tmp"};
644
645 // If $TEST_IOCTL_FRIENDLY_TMPDIR/rocksdb.XXXXXX fits, use
646 // $TEST_IOCTL_FRIENDLY_TMPDIR; subtract 2 for the "%s", and
647 // add 1 for the trailing NUL byte.
648 if (tmp && strlen(tmp) + strlen(fmt) - 2 + 1 <= sizeof dir_buf) {
649 // use $TEST_IOCTL_FRIENDLY_TMPDIR value
650 candidate_dir_list.push_front(tmp);
651 }
652
653 for (const std::string& d : candidate_dir_list) {
654 snprintf(dir_buf, sizeof dir_buf, fmt, d.c_str());
655 if (mkdtemp(dir_buf)) {
656 if (ioctl_support__FS_IOC_GETVERSION(dir_buf)) {
657 dir_ = dir_buf;
658 return;
659 } else {
660 // Diagnose ioctl-related failure only if this is the
661 // directory specified via that envvar.
662 if (tmp && tmp == d) {
663 fprintf(stderr, "TEST_IOCTL_FRIENDLY_TMPDIR-specified directory is "
664 "not suitable: %s\n", d.c_str());
665 }
666 rmdir(dir_buf); // ignore failure
667 }
668 } else {
669 // mkdtemp failed: diagnose it, but don't give up.
670 fprintf(stderr, "mkdtemp(%s/...) failed: %s\n", d.c_str(),
671 strerror(errno));
672 }
673 }
674
675 fprintf(stderr, "failed to find an ioctl-friendly temporary directory;"
676 " specify one via the TEST_IOCTL_FRIENDLY_TMPDIR envvar\n");
677 std::abort();
678 #endif
679 }
680
681 ~IoctlFriendlyTmpdir() {
682 rmdir(dir_.c_str());
683 }
684
685 const std::string& name() const {
686 return dir_;
687 }
688
689 private:
690 std::string dir_;
691 };
692
693 #ifndef ROCKSDB_LITE
694 TEST_F(EnvPosixTest, PositionedAppend) {
695 unique_ptr<WritableFile> writable_file;
696 EnvOptions options;
697 options.use_direct_writes = true;
698 options.use_mmap_writes = false;
699 IoctlFriendlyTmpdir ift;
700 ASSERT_OK(env_->NewWritableFile(ift.name() + "/f", &writable_file, options));
701 const size_t kBlockSize = 4096;
702 const size_t kPageSize = 4096;
703 const size_t kDataSize = kPageSize;
704 // Write a page worth of 'a'
705 auto data_ptr = NewAligned(kDataSize, 'a');
706 Slice data_a(data_ptr.get(), kDataSize);
707 ASSERT_OK(writable_file->PositionedAppend(data_a, 0U));
708   // Write a page worth of 'b' right after the first block
709 data_ptr = NewAligned(kDataSize, 'b');
710 Slice data_b(data_ptr.get(), kDataSize);
711 ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize));
712 ASSERT_OK(writable_file->Close());
713   // The file now has one block worth of 'a' followed by a page worth of 'b'
714
715 // Verify the above
716 unique_ptr<SequentialFile> seq_file;
717 ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options));
718 char scratch[kPageSize * 2];
719 Slice result;
720 ASSERT_OK(seq_file->Read(sizeof(scratch), &result, scratch));
721 ASSERT_EQ(kPageSize + kBlockSize, result.size());
722 ASSERT_EQ('a', result[kBlockSize - 1]);
723 ASSERT_EQ('b', result[kBlockSize]);
724 }
725 #endif // !ROCKSDB_LITE
726
727 // Only works on Linux platforms
728 TEST_P(EnvPosixTestWithParam, RandomAccessUniqueID) {
729 // Create file.
730 if (env_ == Env::Default()) {
731 EnvOptions soptions;
732 soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
733 IoctlFriendlyTmpdir ift;
734 std::string fname = ift.name() + "/testfile";
735 unique_ptr<WritableFile> wfile;
736 ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
737
738 unique_ptr<RandomAccessFile> file;
739
740 // Get Unique ID
741 ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
742 size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
743 ASSERT_TRUE(id_size > 0);
744 std::string unique_id1(temp_id, id_size);
745 ASSERT_TRUE(IsUniqueIDValid(unique_id1));
746
747 // Get Unique ID again
748 ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
749 id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
750 ASSERT_TRUE(id_size > 0);
751 std::string unique_id2(temp_id, id_size);
752 ASSERT_TRUE(IsUniqueIDValid(unique_id2));
753
754 // Get Unique ID again after waiting some time.
755 env_->SleepForMicroseconds(1000000);
756 ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
757 id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
758 ASSERT_TRUE(id_size > 0);
759 std::string unique_id3(temp_id, id_size);
760 ASSERT_TRUE(IsUniqueIDValid(unique_id3));
761
762 // Check IDs are the same.
763 ASSERT_EQ(unique_id1, unique_id2);
764 ASSERT_EQ(unique_id2, unique_id3);
765
766 // Delete the file
767 env_->DeleteFile(fname);
768 }
769 }
770
771 // Only works on Linux platforms
772 #ifdef ROCKSDB_FALLOCATE_PRESENT
773 TEST_P(EnvPosixTestWithParam, AllocateTest) {
774 if (env_ == Env::Default()) {
775 IoctlFriendlyTmpdir ift;
776 std::string fname = ift.name() + "/preallocate_testfile";
777
778 // Try fallocate in a file to see whether the target file system supports
779 // it.
780 // Skip the test if fallocate is not supported.
781 std::string fname_test_fallocate = ift.name() + "/preallocate_testfile_2";
782 int fd = -1;
783 do {
784 fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
785 } while (fd < 0 && errno == EINTR);
786 ASSERT_GT(fd, 0);
787
788 int alloc_status = fallocate(fd, 0, 0, 1);
789
790 int err_number = 0;
791 if (alloc_status != 0) {
792 err_number = errno;
793       fprintf(stderr, "Warning: fallocate() failed, %s\n", strerror(err_number));
794 }
795 close(fd);
796 ASSERT_OK(env_->DeleteFile(fname_test_fallocate));
797 if (alloc_status != 0 && err_number == EOPNOTSUPP) {
798 // The filesystem containing the file does not support fallocate
799 return;
800 }
801
802 EnvOptions soptions;
803 soptions.use_mmap_writes = false;
804 soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
805 unique_ptr<WritableFile> wfile;
806 ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
807
808 // allocate 100 MB
809 size_t kPreallocateSize = 100 * 1024 * 1024;
810 size_t kBlockSize = 512;
811 size_t kPageSize = 4096;
812 size_t kDataSize = 1024 * 1024;
813 auto data_ptr = NewAligned(kDataSize, 'A');
814 Slice data(data_ptr.get(), kDataSize);
815 wfile->SetPreallocationBlockSize(kPreallocateSize);
816 wfile->PrepareWrite(wfile->GetFileSize(), kDataSize);
817 ASSERT_OK(wfile->Append(data));
818 ASSERT_OK(wfile->Flush());
819
820 struct stat f_stat;
821 ASSERT_EQ(stat(fname.c_str(), &f_stat), 0);
822 ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size);
823 // verify that blocks are preallocated
824 // Note here that we don't check the exact number of blocks preallocated --
825     // we only require that the number of allocated blocks is at least what we
826     // expect.
827     // It looks like some FSes give us more blocks than we asked for. That's fine.
828 // It might be worth investigating further.
829 ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks);
830
831 // close the file, should deallocate the blocks
832 wfile.reset();
833
834 stat(fname.c_str(), &f_stat);
835 ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size);
836 // verify that preallocated blocks were deallocated on file close
837 // Because the FS might give us more blocks, we add a full page to the size
838     // and expect the number of blocks to be less than or equal to that.
839 ASSERT_GE((f_stat.st_size + kPageSize + kBlockSize - 1) / kBlockSize,
840 (unsigned int)f_stat.st_blocks);
841 }
842 }
843 #endif // ROCKSDB_FALLOCATE_PRESENT
844
845 // Returns true if any of the strings in ss is a prefix of another string in ss.
846 bool HasPrefix(const std::unordered_set<std::string>& ss) {
847 for (const std::string& s: ss) {
848 if (s.empty()) {
849 return true;
850 }
851 for (size_t i = 1; i < s.size(); ++i) {
852 if (ss.count(s.substr(0, i)) != 0) {
853 return true;
854 }
855 }
856 }
857 return false;
858 }
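// Worked example (sketch): HasPrefix({"abc", "abcd"}) is true because "abc"
// is a prefix of "abcd", while HasPrefix({"abc", "abd"}) is false since
// neither string is a prefix of the other. The unique-ID tests below use this
// to reject ID sets in which one ID is a prefix of another (or an ID is empty).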
859
860 // Only works on Linux and Windows platforms
861 TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDConcurrent) {
862 if (env_ == Env::Default()) {
863 // Check whether a bunch of concurrently existing files have unique IDs.
864 EnvOptions soptions;
865 soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
866
867 // Create the files
868 IoctlFriendlyTmpdir ift;
869 std::vector<std::string> fnames;
870 for (int i = 0; i < 1000; ++i) {
871 fnames.push_back(ift.name() + "/" + "testfile" + ToString(i));
872
873 // Create file.
874 unique_ptr<WritableFile> wfile;
875 ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions));
876 }
877
878 // Collect and check whether the IDs are unique.
879 std::unordered_set<std::string> ids;
880     for (const std::string& fname : fnames) {
881 unique_ptr<RandomAccessFile> file;
882 std::string unique_id;
883 ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
884 size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
885 ASSERT_TRUE(id_size > 0);
886 unique_id = std::string(temp_id, id_size);
887 ASSERT_TRUE(IsUniqueIDValid(unique_id));
888
889 ASSERT_TRUE(ids.count(unique_id) == 0);
890 ids.insert(unique_id);
891 }
892
893 // Delete the files
894     for (const std::string& fname : fnames) {
895 ASSERT_OK(env_->DeleteFile(fname));
896 }
897
898 ASSERT_TRUE(!HasPrefix(ids));
899 }
900 }
901
902 // Only works on Linux and Windows platforms
903 TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDDeletes) {
904 if (env_ == Env::Default()) {
905 EnvOptions soptions;
906 soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
907
908 IoctlFriendlyTmpdir ift;
909 std::string fname = ift.name() + "/" + "testfile";
910
911     // Check that after a file is deleted we don't get the same ID again in a
912     // new file.
913 std::unordered_set<std::string> ids;
914 for (int i = 0; i < 1000; ++i) {
915 // Create file.
916 {
917 unique_ptr<WritableFile> wfile;
918 ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
919 }
920
921 // Get Unique ID
922 std::string unique_id;
923 {
924 unique_ptr<RandomAccessFile> file;
925 ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
926 size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
927 ASSERT_TRUE(id_size > 0);
928 unique_id = std::string(temp_id, id_size);
929 }
930
931 ASSERT_TRUE(IsUniqueIDValid(unique_id));
932 ASSERT_TRUE(ids.count(unique_id) == 0);
933 ids.insert(unique_id);
934
935 // Delete the file
936 ASSERT_OK(env_->DeleteFile(fname));
937 }
938
939 ASSERT_TRUE(!HasPrefix(ids));
940 }
941 }
942
943 // Only works on Linux platforms
944 #ifdef OS_WIN
945 TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) {
946 #else
947 TEST_P(EnvPosixTestWithParam, InvalidateCache) {
948 #endif
949 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
950 EnvOptions soptions;
951 soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
952 std::string fname = test::TmpDir(env_) + "/" + "testfile";
953
954 const size_t kSectorSize = 512;
955 auto data = NewAligned(kSectorSize, 0);
956 Slice slice(data.get(), kSectorSize);
957
958 // Create file.
959 {
960 unique_ptr<WritableFile> wfile;
961 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
962 if (soptions.use_direct_writes) {
963 soptions.use_direct_writes = false;
964 }
965 #endif
966 ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
967 ASSERT_OK(wfile->Append(slice));
968 ASSERT_OK(wfile->InvalidateCache(0, 0));
969 ASSERT_OK(wfile->Close());
970 }
971
972 // Random Read
973 {
974 unique_ptr<RandomAccessFile> file;
975 auto scratch = NewAligned(kSectorSize, 0);
976 Slice result;
977 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
978 if (soptions.use_direct_reads) {
979 soptions.use_direct_reads = false;
980 }
981 #endif
982 ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
983 ASSERT_OK(file->Read(0, kSectorSize, &result, scratch.get()));
984 ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0);
985 ASSERT_OK(file->InvalidateCache(0, 11));
986 ASSERT_OK(file->InvalidateCache(0, 0));
987 }
988
989 // Sequential Read
990 {
991 unique_ptr<SequentialFile> file;
992 auto scratch = NewAligned(kSectorSize, 0);
993 Slice result;
994 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
995 if (soptions.use_direct_reads) {
996 soptions.use_direct_reads = false;
997 }
998 #endif
999 ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions));
1000 if (file->use_direct_io()) {
1001 ASSERT_OK(file->PositionedRead(0, kSectorSize, &result, scratch.get()));
1002 } else {
1003 ASSERT_OK(file->Read(kSectorSize, &result, scratch.get()));
1004 }
1005 ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0);
1006 ASSERT_OK(file->InvalidateCache(0, 11));
1007 ASSERT_OK(file->InvalidateCache(0, 0));
1008 }
1009 // Delete the file
1010 ASSERT_OK(env_->DeleteFile(fname));
1011 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1012 }
1013 #endif // not TRAVIS
1014 #endif // OS_LINUX || OS_WIN
1015
1016 class TestLogger : public Logger {
1017 public:
1018 using Logger::Logv;
1019 virtual void Logv(const char* format, va_list ap) override {
1020 log_count++;
1021
1022 char new_format[550];
1023 std::fill_n(new_format, sizeof(new_format), '2');
1024 {
1025 va_list backup_ap;
1026 va_copy(backup_ap, ap);
1027 int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap);
1028 // 48 bytes for extra information + bytes allocated
1029
1030       // When n == -1, no terminating zero is expected
1031 #ifdef OS_WIN
1032 if (n < 0) {
1033 char_0_count++;
1034 }
1035 #endif
1036
1037 if (new_format[0] == '[') {
1038 // "[DEBUG] "
1039 ASSERT_TRUE(n <= 56 + (512 - static_cast<int>(sizeof(struct timeval))));
1040 } else {
1041 ASSERT_TRUE(n <= 48 + (512 - static_cast<int>(sizeof(struct timeval))));
1042 }
1043 va_end(backup_ap);
1044 }
1045
1046 for (size_t i = 0; i < sizeof(new_format); i++) {
1047 if (new_format[i] == 'x') {
1048 char_x_count++;
1049 } else if (new_format[i] == '\0') {
1050 char_0_count++;
1051 }
1052 }
1053 }
1054 int log_count;
1055 int char_x_count;
1056 int char_0_count;
1057 };
1058
1059 TEST_P(EnvPosixTestWithParam, LogBufferTest) {
1060 TestLogger test_logger;
1061 test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL);
1062 test_logger.log_count = 0;
1063 test_logger.char_x_count = 0;
1064 test_logger.char_0_count = 0;
1065 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger);
1066 LogBuffer log_buffer_debug(DEBUG_LEVEL, &test_logger);
1067
1068 char bytes200[200];
1069 std::fill_n(bytes200, sizeof(bytes200), '1');
1070 bytes200[sizeof(bytes200) - 1] = '\0';
1071 char bytes600[600];
1072 std::fill_n(bytes600, sizeof(bytes600), '1');
1073 bytes600[sizeof(bytes600) - 1] = '\0';
1074 char bytes9000[9000];
1075 std::fill_n(bytes9000, sizeof(bytes9000), '1');
1076 bytes9000[sizeof(bytes9000) - 1] = '\0';
1077
1078 ROCKS_LOG_BUFFER(&log_buffer, "x%sx", bytes200);
1079 ROCKS_LOG_BUFFER(&log_buffer, "x%sx", bytes600);
1080 ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx%sx", bytes200, bytes200, bytes200);
1081 ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx", bytes200, bytes600);
1082 ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx", bytes600, bytes9000);
1083
1084 ROCKS_LOG_BUFFER(&log_buffer_debug, "x%sx", bytes200);
1085 test_logger.SetInfoLogLevel(DEBUG_LEVEL);
1086 ROCKS_LOG_BUFFER(&log_buffer_debug, "x%sx%sx%sx", bytes600, bytes9000,
1087 bytes200);
1088
1089 ASSERT_EQ(0, test_logger.log_count);
1090 log_buffer.FlushBufferToLog();
1091 log_buffer_debug.FlushBufferToLog();
1092 ASSERT_EQ(6, test_logger.log_count);
1093 ASSERT_EQ(6, test_logger.char_0_count);
1094 ASSERT_EQ(10, test_logger.char_x_count);
1095 }
1096
1097 class TestLogger2 : public Logger {
1098 public:
1099 explicit TestLogger2(size_t max_log_size) : max_log_size_(max_log_size) {}
1100 using Logger::Logv;
1101 virtual void Logv(const char* format, va_list ap) override {
1102 char new_format[2000];
1103 std::fill_n(new_format, sizeof(new_format), '2');
1104 {
1105 va_list backup_ap;
1106 va_copy(backup_ap, ap);
1107 int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap);
1108 // 48 bytes for extra information + bytes allocated
1109 ASSERT_TRUE(
1110 n <= 48 + static_cast<int>(max_log_size_ - sizeof(struct timeval)));
1111 ASSERT_TRUE(n > static_cast<int>(max_log_size_ - sizeof(struct timeval)));
1112 va_end(backup_ap);
1113 }
1114 }
1115 size_t max_log_size_;
1116 };
1117
1118 TEST_P(EnvPosixTestWithParam, LogBufferMaxSizeTest) {
1119 char bytes9000[9000];
1120 std::fill_n(bytes9000, sizeof(bytes9000), '1');
1121 bytes9000[sizeof(bytes9000) - 1] = '\0';
1122
1123 for (size_t max_log_size = 256; max_log_size <= 1024;
1124 max_log_size += 1024 - 256) {
1125 TestLogger2 test_logger(max_log_size);
1126 test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL);
1127 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger);
1128 ROCKS_LOG_BUFFER_MAX_SZ(&log_buffer, max_log_size, "%s", bytes9000);
1129 log_buffer.FlushBufferToLog();
1130 }
1131 }
1132
1133 TEST_P(EnvPosixTestWithParam, Preallocation) {
1134 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1135 const std::string src = test::TmpDir(env_) + "/" + "testfile";
1136 unique_ptr<WritableFile> srcfile;
1137 EnvOptions soptions;
1138 soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
1139 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
1140 if (soptions.use_direct_writes) {
1141 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1142 "NewWritableFile:O_DIRECT", [&](void* arg) {
1143 int* val = static_cast<int*>(arg);
1144 *val &= ~O_DIRECT;
1145 });
1146 }
1147 #endif
1148 ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions));
1149 srcfile->SetPreallocationBlockSize(1024 * 1024);
1150
1151 // No writes should mean no preallocation
1152 size_t block_size, last_allocated_block;
1153 srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
1154 ASSERT_EQ(last_allocated_block, 0UL);
1155
1156 // Small write should preallocate one block
1157 size_t kStrSize = 4096;
1158 auto data = NewAligned(kStrSize, 'A');
1159 Slice str(data.get(), kStrSize);
1160 srcfile->PrepareWrite(srcfile->GetFileSize(), kStrSize);
1161 srcfile->Append(str);
1162 srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
1163 ASSERT_EQ(last_allocated_block, 1UL);
1164
1165   // Write an entire preallocation block; make sure the count increased to two.
1166 {
1167 auto buf_ptr = NewAligned(block_size, ' ');
1168 Slice buf(buf_ptr.get(), block_size);
1169 srcfile->PrepareWrite(srcfile->GetFileSize(), block_size);
1170 srcfile->Append(buf);
1171 srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
1172 ASSERT_EQ(last_allocated_block, 2UL);
1173 }
1174
1175 // Write five more blocks at once, ensure we're where we need to be.
1176 {
1177 auto buf_ptr = NewAligned(block_size * 5, ' ');
1178 Slice buf = Slice(buf_ptr.get(), block_size * 5);
1179 srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size());
1180 srcfile->Append(buf);
1181 srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
1182 ASSERT_EQ(last_allocated_block, 7UL);
1183 }
1184 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1185 }
1186
1187 // Test that the two ways to get children file attributes (in bulk or
1188 // individually) behave consistently.
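// Concretely, the bulk path and the per-file path exercised below are
// (sketch):
//
//   std::vector<Env::FileAttributes> attrs;
//   env_->GetChildrenFileAttributes(dir, &attrs);   // bulk
//   uint64_t size;
//   env_->GetFileSize(dir + "/" + name, &size);     // per file
//
// and the test asserts that both report the same size for every child.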
1189 TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
1190 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1191 EnvOptions soptions;
1192 soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
1193 const int kNumChildren = 10;
1194
1195 std::string data;
1196 for (int i = 0; i < kNumChildren; ++i) {
1197 std::ostringstream oss;
1198 oss << test::TmpDir(env_) << "/testfile_" << i;
1199 const std::string path = oss.str();
1200 unique_ptr<WritableFile> file;
1201 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
1202 if (soptions.use_direct_writes) {
1203 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1204 "NewWritableFile:O_DIRECT", [&](void* arg) {
1205 int* val = static_cast<int*>(arg);
1206 *val &= ~O_DIRECT;
1207 });
1208 }
1209 #endif
1210 ASSERT_OK(env_->NewWritableFile(path, &file, soptions));
1211 auto buf_ptr = NewAligned(data.size(), 'T');
1212 Slice buf(buf_ptr.get(), data.size());
1213 file->Append(buf);
1214 data.append(std::string(4096, 'T'));
1215 }
1216
1217 std::vector<Env::FileAttributes> file_attrs;
1218 ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs));
1219 for (int i = 0; i < kNumChildren; ++i) {
1220 std::ostringstream oss;
1221 oss << "testfile_" << i;
1222 const std::string name = oss.str();
1223 const std::string path = test::TmpDir(env_) + "/" + name;
1224
1225 auto file_attrs_iter = std::find_if(
1226 file_attrs.begin(), file_attrs.end(),
1227 [&name](const Env::FileAttributes& fm) { return fm.name == name; });
1228 ASSERT_TRUE(file_attrs_iter != file_attrs.end());
1229 uint64_t size;
1230 ASSERT_OK(env_->GetFileSize(path, &size));
1231 ASSERT_EQ(size, 4096 * i);
1232 ASSERT_EQ(size, file_attrs_iter->size_bytes);
1233 }
1234 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1235 }
1236
1237 // Test that WritableFileWrapper forwards all calls to WritableFile.
1238 TEST_P(EnvPosixTestWithParam, WritableFileWrapper) {
1239 class Base : public WritableFile {
1240 public:
1241 mutable int *step_;
1242
1243 void inc(int x) const {
1244 EXPECT_EQ(x, (*step_)++);
1245 }
1246
1247 explicit Base(int* step) : step_(step) {
1248 inc(0);
1249 }
1250
1251 Status Append(const Slice& data) override { inc(1); return Status::OK(); }
1252 Status Truncate(uint64_t size) override { return Status::OK(); }
1253 Status Close() override { inc(2); return Status::OK(); }
1254 Status Flush() override { inc(3); return Status::OK(); }
1255 Status Sync() override { inc(4); return Status::OK(); }
1256 Status Fsync() override { inc(5); return Status::OK(); }
1257 void SetIOPriority(Env::IOPriority pri) override { inc(6); }
1258 uint64_t GetFileSize() override { inc(7); return 0; }
1259 void GetPreallocationStatus(size_t* block_size,
1260 size_t* last_allocated_block) override {
1261 inc(8);
1262 }
1263 size_t GetUniqueId(char* id, size_t max_size) const override {
1264 inc(9);
1265 return 0;
1266 }
1267 Status InvalidateCache(size_t offset, size_t length) override {
1268 inc(10);
1269 return Status::OK();
1270 }
1271
1272 protected:
1273 Status Allocate(uint64_t offset, uint64_t len) override {
1274 inc(11);
1275 return Status::OK();
1276 }
1277 Status RangeSync(uint64_t offset, uint64_t nbytes) override {
1278 inc(12);
1279 return Status::OK();
1280 }
1281
1282 public:
1283 ~Base() {
1284 inc(13);
1285 }
1286 };
1287
1288 class Wrapper : public WritableFileWrapper {
1289 public:
1290 explicit Wrapper(WritableFile* target) : WritableFileWrapper(target) {}
1291
1292 void CallProtectedMethods() {
1293 Allocate(0, 0);
1294 RangeSync(0, 0);
1295 }
1296 };
1297
1298 int step = 0;
1299
1300 {
1301 Base b(&step);
1302 Wrapper w(&b);
1303 w.Append(Slice());
1304 w.Close();
1305 w.Flush();
1306 w.Sync();
1307 w.Fsync();
1308 w.SetIOPriority(Env::IOPriority::IO_HIGH);
1309 w.GetFileSize();
1310 w.GetPreallocationStatus(nullptr, nullptr);
1311 w.GetUniqueId(nullptr, 0);
1312 w.InvalidateCache(0, 0);
1313 w.CallProtectedMethods();
1314 }
1315
1316 EXPECT_EQ(14, step);
1317 }
1318
1319 TEST_P(EnvPosixTestWithParam, PosixRandomRWFile) {
1320 const std::string path = test::TmpDir(env_) + "/random_rw_file";
1321
1322 env_->DeleteFile(path);
1323
1324 std::unique_ptr<RandomRWFile> file;
1325 ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
1326
1327 char buf[10000];
1328 Slice read_res;
1329
1330 ASSERT_OK(file->Write(0, "ABCD"));
1331 ASSERT_OK(file->Read(0, 10, &read_res, buf));
1332 ASSERT_EQ(read_res.ToString(), "ABCD");
1333
1334 ASSERT_OK(file->Write(2, "XXXX"));
1335 ASSERT_OK(file->Read(0, 10, &read_res, buf));
1336 ASSERT_EQ(read_res.ToString(), "ABXXXX");
1337
1338 ASSERT_OK(file->Write(10, "ZZZ"));
1339 ASSERT_OK(file->Read(10, 10, &read_res, buf));
1340 ASSERT_EQ(read_res.ToString(), "ZZZ");
1341
1342 ASSERT_OK(file->Write(11, "Y"));
1343 ASSERT_OK(file->Read(10, 10, &read_res, buf));
1344 ASSERT_EQ(read_res.ToString(), "ZYZ");
1345
1346 ASSERT_OK(file->Write(200, "FFFFF"));
1347 ASSERT_OK(file->Read(200, 10, &read_res, buf));
1348 ASSERT_EQ(read_res.ToString(), "FFFFF");
1349
1350 ASSERT_OK(file->Write(205, "XXXX"));
1351 ASSERT_OK(file->Read(200, 10, &read_res, buf));
1352 ASSERT_EQ(read_res.ToString(), "FFFFFXXXX");
1353
1354 ASSERT_OK(file->Write(5, "QQQQ"));
1355 ASSERT_OK(file->Read(0, 9, &read_res, buf));
1356 ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ");
1357
1358 ASSERT_OK(file->Read(2, 4, &read_res, buf));
1359 ASSERT_EQ(read_res.ToString(), "XXXQ");
1360
1361 // Close file and reopen it
1362 file->Close();
1363 ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
1364
1365 ASSERT_OK(file->Read(0, 9, &read_res, buf));
1366 ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ");
1367
1368 ASSERT_OK(file->Read(10, 3, &read_res, buf));
1369 ASSERT_EQ(read_res.ToString(), "ZYZ");
1370
1371 ASSERT_OK(file->Read(200, 9, &read_res, buf));
1372 ASSERT_EQ(read_res.ToString(), "FFFFFXXXX");
1373
1374 ASSERT_OK(file->Write(4, "TTTTTTTTTTTTTTTT"));
1375 ASSERT_OK(file->Read(0, 10, &read_res, buf));
1376 ASSERT_EQ(read_res.ToString(), "ABXXTTTTTT");
1377
1378 // Clean up
1379 env_->DeleteFile(path);
1380 }
1381
1382 class RandomRWFileWithMirrorString {
1383 public:
1384 explicit RandomRWFileWithMirrorString(RandomRWFile* _file) : file_(_file) {}
1385
1386 void Write(size_t offset, const std::string& data) {
1387 // Write to mirror string
1388 StringWrite(offset, data);
1389
1390 // Write to file
1391 Status s = file_->Write(offset, data);
1392 ASSERT_OK(s) << s.ToString();
1393 }
1394
1395 void Read(size_t offset = 0, size_t n = 1000000) {
1396 Slice str_res(nullptr, 0);
1397 if (offset < file_mirror_.size()) {
1398 size_t str_res_sz = std::min(file_mirror_.size() - offset, n);
1399 str_res = Slice(file_mirror_.data() + offset, str_res_sz);
1400 StopSliceAtNull(&str_res);
1401 }
1402
1403 Slice file_res;
1404 Status s = file_->Read(offset, n, &file_res, buf_);
1405 ASSERT_OK(s) << s.ToString();
1406 StopSliceAtNull(&file_res);
1407
1408 ASSERT_EQ(str_res.ToString(), file_res.ToString()) << offset << " " << n;
1409 }
1410
1411 void SetFile(RandomRWFile* _file) { file_ = _file; }
1412
1413 private:
1414 void StringWrite(size_t offset, const std::string& src) {
1415 if (offset + src.size() > file_mirror_.size()) {
1416 file_mirror_.resize(offset + src.size(), '\0');
1417 }
1418
1419 char* pos = const_cast<char*>(file_mirror_.data() + offset);
1420 memcpy(pos, src.data(), src.size());
1421 }
1422
1423 void StopSliceAtNull(Slice* slc) {
1424 for (size_t i = 0; i < slc->size(); i++) {
1425 if ((*slc)[i] == '\0') {
1426 *slc = Slice(slc->data(), i);
1427 break;
1428 }
1429 }
1430 }
1431
1432 char buf_[10000];
1433 RandomRWFile* file_;
1434 std::string file_mirror_;
1435 };
1436
1437 TEST_P(EnvPosixTestWithParam, PosixRandomRWFileRandomized) {
1438 const std::string path = test::TmpDir(env_) + "/random_rw_file_rand";
1439 env_->DeleteFile(path);
1440
1441 unique_ptr<RandomRWFile> file;
1442 ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
1443 RandomRWFileWithMirrorString file_with_mirror(file.get());
1444
1445 Random rnd(301);
1446 std::string buf;
1447 for (int i = 0; i < 10000; i++) {
1448     // Generate random data
1449 test::RandomString(&rnd, 10, &buf);
1450
1451 // Pick random offset for write
1452 size_t write_off = rnd.Next() % 1000;
1453 file_with_mirror.Write(write_off, buf);
1454
1455 // Pick random offset for read
1456 size_t read_off = rnd.Next() % 1000;
1457 size_t read_sz = rnd.Next() % 20;
1458 file_with_mirror.Read(read_off, read_sz);
1459
1460 if (i % 500 == 0) {
1461 // Reopen the file every 500 iters
1462 ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
1463 file_with_mirror.SetFile(file.get());
1464 }
1465 }
1466
1467 // clean up
1468 env_->DeleteFile(path);
1469 }
1470
1471 INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam,
1472 ::testing::Values(std::pair<Env*, bool>(Env::Default(),
1473 false)));
1474 #if !defined(ROCKSDB_LITE)
1475 INSTANTIATE_TEST_CASE_P(DefaultEnvWithDirectIO, EnvPosixTestWithParam,
1476 ::testing::Values(std::pair<Env*, bool>(Env::Default(),
1477 true)));
1478 #endif // !defined(ROCKSDB_LITE)
1479
1480 #if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
1481 static unique_ptr<Env> chroot_env(NewChrootEnv(Env::Default(),
1482 test::TmpDir(Env::Default())));
1483 INSTANTIATE_TEST_CASE_P(
1484 ChrootEnvWithoutDirectIO, EnvPosixTestWithParam,
1485 ::testing::Values(std::pair<Env*, bool>(chroot_env.get(), false)));
1486 INSTANTIATE_TEST_CASE_P(
1487 ChrootEnvWithDirectIO, EnvPosixTestWithParam,
1488 ::testing::Values(std::pair<Env*, bool>(chroot_env.get(), true)));
1489 #endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN)
1490
1491 } // namespace rocksdb
1492
1493 int main(int argc, char** argv) {
1494 ::testing::InitGoogleTest(&argc, argv);
1495 return RUN_ALL_TESTS();
1496 }