1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
11 #include <sys/ioctl.h>
14 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
16 #include <malloc_np.h>
21 #include <sys/types.h>
24 #include <unordered_set>
36 #ifdef ROCKSDB_FALLOCATE_PRESENT
40 #include "env/env_chroot.h"
41 #include "port/port.h"
42 #include "rocksdb/env.h"
43 #include "util/coding.h"
44 #include "util/log_buffer.h"
45 #include "util/mutexlock.h"
46 #include "util/string_util.h"
47 #include "util/sync_point.h"
48 #include "util/testharness.h"
49 #include "util/testutil.h"
// Page size used by NewAligned() for aligned allocations. One definition
// queries sysconf(_SC_PAGESIZE); the other hard-codes 4 KiB. The
// preprocessor conditional that selects between them is not visible here.
52 static const size_t kPageSize
= sysconf(_SC_PAGESIZE
);
54 static const size_t kPageSize
= 4 * 1024;
// Delay in microseconds (100 ms) used throughout the tests, mostly as the
// argument to SleepForMicroseconds() while waiting for background work.
59 static const int kDelayMicros
= 100000;
// Deleter for std::unique_ptr<char, Deleter>: stores a C-style free function
// (NewAligned() passes _aligned_free or free) that the unique_ptr uses to
// release its pointer. The struct header, the rest of operator()'s body and
// the fn_ member declaration are not visible in this view.
62 explicit Deleter(void (*fn
)(void*)) : fn_(fn
) {}
64 void operator()(void* ptr
) {
// Allocates `size` bytes aligned to kPageSize, fills them with `ch` via
// memset, and returns them in a unique_ptr whose Deleter matches the
// allocator that was used. Two platform branches are visible (the #ifdef
// selecting between them is not): _aligned_malloc/_aligned_free, and
// posix_memalign/free. If allocation fails, an empty (nullptr) unique_ptr
// is returned.
73 std::unique_ptr
<char, Deleter
> NewAligned(const size_t size
, const char ch
) {
76 if (nullptr == (ptr
= reinterpret_cast<char*>(_aligned_malloc(size
, kPageSize
)))) {
77 return std::unique_ptr
<char, Deleter
>(nullptr, Deleter(_aligned_free
));
79 std::unique_ptr
<char, Deleter
> uptr(ptr
, Deleter(_aligned_free
));
// posix_memalign() signals failure with a non-zero return value.
81 if (posix_memalign(reinterpret_cast<void**>(&ptr
), kPageSize
, size
) != 0) {
82 return std::unique_ptr
<char, Deleter
>(nullptr, Deleter(free
));
84 std::unique_ptr
<char, Deleter
> uptr(ptr
, Deleter(free
));
86 memset(uptr
.get(), ch
, size
);
// Base gtest fixture: env_ defaults to Env::Default() and direct I/O is off.
// (The member declarations are not visible in this view.)
90 class EnvPosixTest
: public testing::Test
{
98 EnvPosixTest() : env_(Env::Default()), direct_io_(false) {}
// Parameterized fixture. Each test instance receives an (Env*, bool) pair
// from GetParam(), which overrides env_ and direct_io_ inherited from
// EnvPosixTest.
101 class EnvPosixTestWithParam
102 : public EnvPosixTest
,
103 public ::testing::WithParamInterface
<std::pair
<Env
*, bool>> {
105 EnvPosixTestWithParam() {
106 std::pair
<Env
*, bool> param_pair
= GetParam();
107 env_
= param_pair
.first
;
108 direct_io_
= param_pair
.second
;
// Polls env_'s LOW and then HIGH queue lengths, sleeping kDelayMicros
// (through Env::Default()) between checks.
111 void WaitThreadPoolsEmpty() {
112 // Wait until the thread pools are empty.
113 while (env_
->GetThreadPoolQueueLen(Env::Priority::LOW
) != 0) {
114 Env::Default()->SleepForMicroseconds(kDelayMicros
);
116 while (env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
) != 0) {
117 Env::Default()->SleepForMicroseconds(kDelayMicros
);
// Wait for the LOW and HIGH queues of env_ to drain before teardown.
121 ~EnvPosixTestWithParam() override
{ WaitThreadPoolsEmpty(); }
// Callback for Schedule()/StartThread(): `ptr` must point to a
// std::atomic<bool>, which is set to true.
124 static void SetBool(void* ptr
) {
125 reinterpret_cast<std::atomic
<bool>*>(ptr
)->store(true);
// For every priority from Env::BOTTOM up to (but excluding) Env::TOTAL:
// give that pool one thread, schedule SetBool, sleep kDelayMicros, and
// expect the flag to be set. The assertion depends on the task finishing
// within that fixed sleep, which is presumably why the test is disabled.
128 TEST_F(EnvPosixTest
, DISABLED_RunImmediately
) {
129 for (int pri
= Env::BOTTOM
; pri
< Env::TOTAL
; ++pri
) {
130 std::atomic
<bool> called(false);
131 env_
->SetBackgroundThreads(1, static_cast<Env::Priority
>(pri
));
132 env_
->Schedule(&SetBool
, &called
, static_cast<Env::Priority
>(pri
));
133 Env::Default()->SleepForMicroseconds(kDelayMicros
);
134 ASSERT_TRUE(called
.load());
// Runs SetBool on a thread started with StartThread and checks the flag.
// The statement between StartThread and the assertion is not visible here;
// presumably it waits for the thread to finish -- confirm.
138 TEST_F(EnvPosixTest
, RunEventually
) {
139 std::atomic
<bool> called(false);
140 env_
->StartThread(&SetBool
, &called
);
142 ASSERT_TRUE(called
.load());
// Writes a file, creates a second name for it with LinkFile, and queries
// AreFilesSame on the pair (the assertion on `result` is not visible in
// this view). The test is skipped when the Env reports AreFilesSame as
// NotSupported.
146 TEST_F(EnvPosixTest
, AreFilesSame
) {
149 if (env_
->AreFilesSame("", "", &tmp
).IsNotSupported()) {
151 "skipping EnvPosixTest.AreFilesSame due to "
152 "unsupported Env::AreFilesSame\n");
157 const EnvOptions soptions
;
158 auto* env
= Env::Default();
159 std::string same_file_name
= test::PerThreadDBPath(env
, "same_file");
160 std::string same_file_link_name
= same_file_name
+ "_link";
162 std::unique_ptr
<WritableFile
> same_file
;
163 ASSERT_OK(env
->NewWritableFile(same_file_name
,
164 &same_file
, soptions
));
// Check the write itself, not only the subsequent Flush().
165 ASSERT_OK(same_file
->Append("random_data"));
166 ASSERT_OK(same_file
->Flush());
169 ASSERT_OK(env
->LinkFile(same_file_name
, same_file_link_name
));
171 ASSERT_OK(env
->AreFilesSame(same_file_name
, same_file_link_name
, &result
));
// Checks the permission bits of files created through env_: 0644 by default
// and 0600 after SetAllowNonOwnerAccess(false). Both files are created with
// NewWritableFile, the second is also opened with NewRandomRWFile, and each
// file is deleted after its mode is checked. Runs only when env_ is
// Env::Default().
177 TEST_F(EnvPosixTest
, DISABLED_FilePermission
) {
178 // Only works for Linux environment
179 if (env_
== Env::Default()) {
181 std::vector
<std::string
> fileNames
{
182 test::PerThreadDBPath(env_
, "testfile"),
183 test::PerThreadDBPath(env_
, "testfile1")};
184 std::unique_ptr
<WritableFile
> wfile
;
185 ASSERT_OK(env_
->NewWritableFile(fileNames
[0], &wfile
, soptions
));
186 ASSERT_OK(env_
->NewWritableFile(fileNames
[1], &wfile
, soptions
));
188 std::unique_ptr
<RandomRWFile
> rwfile
;
189 ASSERT_OK(env_
->NewRandomRWFile(fileNames
[1], &rwfile
, soptions
));
192 for (const auto& filename
: fileNames
) {
193 if (::stat(filename
.c_str(), &sb
) == 0) {
194 ASSERT_EQ(sb
.st_mode
& 0777, 0644);
196 env_
->DeleteFile(filename
);
// Repeat with non-owner access disabled; the expected mode becomes 0600.
199 env_
->SetAllowNonOwnerAccess(false);
200 ASSERT_OK(env_
->NewWritableFile(fileNames
[0], &wfile
, soptions
));
201 ASSERT_OK(env_
->NewWritableFile(fileNames
[1], &wfile
, soptions
));
203 ASSERT_OK(env_
->NewRandomRWFile(fileNames
[1], &rwfile
, soptions
));
205 for (const auto& filename
: fileNames
) {
206 if (::stat(filename
.c_str(), &sb
) == 0) {
207 ASSERT_EQ(sb
.st_mode
& 0777, 0600);
209 env_
->DeleteFile(filename
);
// Writes kFileBytes (32 KB) of random data to a file, maps it with
// NewMemoryMappedFileBuffer, and checks the mapping's base pointer, length
// and contents against what was written. Outside OS_LINUX builds, a
// NotSupported status reports a skip message instead.
215 TEST_F(EnvPosixTest
, MemoryMappedFileBuffer
) {
216 const int kFileBytes
= 1 << 15; // 32 KB
217 std::string expected_data
;
218 std::string fname
= test::PerThreadDBPath(env_
, "testfile");
220 std::unique_ptr
<WritableFile
> wfile
;
221 const EnvOptions soptions
;
222 ASSERT_OK(env_
->NewWritableFile(fname
, &wfile
, soptions
));
225 test::RandomString(&rnd
, kFileBytes
, &expected_data
);
226 ASSERT_OK(wfile
->Append(expected_data
));
229 std::unique_ptr
<MemoryMappedFileBuffer
> mmap_buffer
;
230 Status status
= env_
->NewMemoryMappedFileBuffer(fname
, &mmap_buffer
);
231 // it should be supported at least on linux
232 #if !defined(OS_LINUX)
233 if (status
.IsNotSupported()) {
235 "skipping EnvPosixTest.MemoryMappedFileBuffer due to "
236 "unsupported Env::NewMemoryMappedFileBuffer\n");
239 #endif // !defined(OS_LINUX)
242 ASSERT_NE(nullptr, mmap_buffer
.get());
243 ASSERT_NE(nullptr, mmap_buffer
->GetBase());
244 ASSERT_EQ(kFileBytes
, mmap_buffer
->GetLen());
245 std::string
actual_data(reinterpret_cast<const char*>(mmap_buffer
->GetBase()),
246 mmap_buffer
->GetLen());
247 ASSERT_EQ(expected_data
, actual_data
);
// With a single LOW-priority thread occupied by sleeping_task, queues
// sleeping_task1 tagged with &sleeping_task1. It then checks that
// UnSchedule removes 0 jobs for a non-matching tag and 1 job for the
// matching tag. Finally it wakes the blocker, schedules SetBool, and polls
// up to kDelayMicros times, sleeping 1 us each time (the loop body is only
// partly visible here), before expecting the flag to be set.
250 TEST_P(EnvPosixTestWithParam
, UnSchedule
) {
251 std::atomic
<bool> called(false);
252 env_
->SetBackgroundThreads(1, Env::LOW
);
254 /* Block the low priority queue */
255 test::SleepingBackgroundTask sleeping_task
, sleeping_task1
;
256 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task
,
259 /* Schedule another task */
260 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task1
,
261 Env::Priority::LOW
, &sleeping_task1
);
263 /* Remove it with a different tag */
264 ASSERT_EQ(0, env_
->UnSchedule(&called
, Env::Priority::LOW
));
266 /* Remove it from the queue with the right tag */
267 ASSERT_EQ(1, env_
->UnSchedule(&sleeping_task1
, Env::Priority::LOW
));
269 // Unblock background thread
270 sleeping_task
.WakeUp();
272 /* Schedule another task */
273 env_
->Schedule(&SetBool
, &called
);
274 for (int i
= 0; i
< kDelayMicros
; i
++) {
278 Env::Default()->SleepForMicroseconds(1);
280 ASSERT_TRUE(called
.load());
282 ASSERT_TRUE(!sleeping_task
.IsSleeping() && !sleeping_task1
.IsSleeping());
283 WaitThreadPoolsEmpty();
286 // This test assumes that the last scheduled
287 // task will run last. In fact, in the allotted
288 // sleeping time nothing may actually run or they may
289 // run in any order. The purpose of the test is unclear.
// CB::Run asserts that the last recorded id equals its own id - 1 and then
// stores its own id, so an out-of-order execution fails the assertion.
291 TEST_P(EnvPosixTestWithParam
, RunMany
) {
292 std::atomic
<int> last_id(0);
295 std::atomic
<int>* last_id_ptr
; // Pointer to shared slot
296 int id
; // Order# for the execution of this callback
298 CB(std::atomic
<int>* p
, int i
) : last_id_ptr(p
), id(i
) {}
300 static void Run(void* v
) {
301 CB
* cb
= reinterpret_cast<CB
*>(v
);
302 int cur
= cb
->last_id_ptr
->load();
303 ASSERT_EQ(cb
->id
- 1, cur
);
304 cb
->last_id_ptr
->store(cb
->id
);
308 // Schedule in different order than start time
313 env_
->Schedule(&CB::Run
, &cb1
);
314 env_
->Schedule(&CB::Run
, &cb2
);
315 env_
->Schedule(&CB::Run
, &cb3
);
316 env_
->Schedule(&CB::Run
, &cb4
);
318 Env::Default()->SleepForMicroseconds(kDelayMicros
);
319 int cur
= last_id
.load(std::memory_order_acquire
);
321 WaitThreadPoolsEmpty();
// Thread entry point used by the StartThread test; `arg` is a State*.
// The rest of the body and the State definition are not visible here.
331 static void ThreadBody(void* arg
) {
332 State
* s
= reinterpret_cast<State
*>(arg
);
// Starts three ThreadBody threads sharing `state` (num_running = 3) and,
// after waiting, expects state.val == 3. The wait loop is only partly
// visible here.
339 TEST_P(EnvPosixTestWithParam
, StartThread
) {
342 state
.num_running
= 3;
343 for (int i
= 0; i
< 3; i
++) {
344 env_
->StartThread(&ThreadBody
, &state
);
348 int num
= state
.num_running
;
353 Env::Default()->SleepForMicroseconds(kDelayMicros
);
355 ASSERT_EQ(state
.val
, 3);
356 WaitThreadPoolsEmpty();
// Checks that the LOW and HIGH thread pools honour their sizes
// independently.
// - kJobs CB jobs are scheduled into each pool. CB::Run asserts that no more
//   than pool_size_ jobs run at once, and jobs wait while *should_start_ is
//   false (under trigger_mu_).
// - The test waits until each queue holds kJobs minus the pool size, then
//   releases the jobs and waits for them all to finish.
// - The round is repeated after IncBackgroundThreadsIfNeeded(kLowPoolSize - 1)
//   on LOW and IncBackgroundThreadsIfNeeded(kHighPoolSize + 1) on HIGH.
// - Finally the HIGH pool is reset to kHighPoolSize.
359 TEST_P(EnvPosixTestWithParam
, TwoPools
) {
360 // Data structures to signal tasks to run.
362 port::CondVar
cv(&mutex
);
363 bool should_start
= false;
367 CB(const std::string
& pool_name
, int pool_size
, port::Mutex
* trigger_mu
,
368 port::CondVar
* trigger_cv
, bool* _should_start
)
372 pool_size_(pool_size
),
373 pool_name_(pool_name
),
374 trigger_mu_(trigger_mu
),
375 trigger_cv_(trigger_cv
),
376 should_start_(_should_start
) {}
378 static void Run(void* v
) {
379 CB
* cb
= reinterpret_cast<CB
*>(v
);
387 // make sure we don't have more than pool_size_ jobs running.
388 ASSERT_LE(num_running_
, pool_size_
.load());
392 MutexLock
l(trigger_mu_
);
393 while (!(*should_start_
)) {
407 return num_finished_
;
410 void Reset(int pool_size
) {
411 pool_size_
.store(pool_size
);
419 std::atomic
<int> pool_size_
;
420 std::string pool_name_
;
421 port::Mutex
* trigger_mu_
;
422 port::CondVar
* trigger_cv_
;
426 const int kLowPoolSize
= 2;
427 const int kHighPoolSize
= 4;
430 CB
low_pool_job("low", kLowPoolSize
, &mutex
, &cv
, &should_start
);
431 CB
high_pool_job("high", kHighPoolSize
, &mutex
, &cv
, &should_start
);
433 env_
->SetBackgroundThreads(kLowPoolSize
);
434 env_
->SetBackgroundThreads(kHighPoolSize
, Env::Priority::HIGH
);
436 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::LOW
));
437 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
439 // schedule same number of jobs in each pool
440 for (int i
= 0; i
< kJobs
; i
++) {
441 env_
->Schedule(&CB::Run
, &low_pool_job
);
442 env_
->Schedule(&CB::Run
, &high_pool_job
, Env::Priority::HIGH
);
444 // Wait a short while for the jobs to be dispatched.
446 while ((unsigned int)(kJobs
- kLowPoolSize
) !=
447 env_
->GetThreadPoolQueueLen(Env::Priority::LOW
) ||
448 (unsigned int)(kJobs
- kHighPoolSize
) !=
449 env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
)) {
450 env_
->SleepForMicroseconds(kDelayMicros
);
451 if (++sleep_count
> 100) {
456 ASSERT_EQ((unsigned int)(kJobs
- kLowPoolSize
),
457 env_
->GetThreadPoolQueueLen());
458 ASSERT_EQ((unsigned int)(kJobs
- kLowPoolSize
),
459 env_
->GetThreadPoolQueueLen(Env::Priority::LOW
));
460 ASSERT_EQ((unsigned int)(kJobs
- kHighPoolSize
),
461 env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
463 // Trigger jobs to run.
470 // wait for all jobs to finish
471 while (low_pool_job
.NumFinished() < kJobs
||
472 high_pool_job
.NumFinished() < kJobs
) {
473 env_
->SleepForMicroseconds(kDelayMicros
);
476 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::LOW
));
477 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
479 // Hold jobs to schedule;
480 should_start
= false;
482 // call IncBackgroundThreadsIfNeeded to two pools. One increasing and
483 // the other decreasing
// Requesting fewer threads than configured does not shrink the pool: the
// LOW queue length is still expected to be kJobs - kLowPoolSize below,
// while HIGH gains one thread.
484 env_
->IncBackgroundThreadsIfNeeded(kLowPoolSize
- 1, Env::Priority::LOW
);
485 env_
->IncBackgroundThreadsIfNeeded(kHighPoolSize
+ 1, Env::Priority::HIGH
);
486 high_pool_job
.Reset(kHighPoolSize
+ 1);
487 low_pool_job
.Reset(kLowPoolSize
);
489 // schedule same number of jobs in each pool
490 for (int i
= 0; i
< kJobs
; i
++) {
491 env_
->Schedule(&CB::Run
, &low_pool_job
);
492 env_
->Schedule(&CB::Run
, &high_pool_job
, Env::Priority::HIGH
);
494 // Wait a short while for the jobs to be dispatched.
496 while ((unsigned int)(kJobs
- kLowPoolSize
) !=
497 env_
->GetThreadPoolQueueLen(Env::Priority::LOW
) ||
498 (unsigned int)(kJobs
- (kHighPoolSize
+ 1)) !=
499 env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
)) {
500 env_
->SleepForMicroseconds(kDelayMicros
);
501 if (++sleep_count
> 100) {
505 ASSERT_EQ((unsigned int)(kJobs
- kLowPoolSize
),
506 env_
->GetThreadPoolQueueLen());
507 ASSERT_EQ((unsigned int)(kJobs
- kLowPoolSize
),
508 env_
->GetThreadPoolQueueLen(Env::Priority::LOW
));
509 ASSERT_EQ((unsigned int)(kJobs
- (kHighPoolSize
+ 1)),
510 env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
512 // Trigger jobs to run.
519 // wait for all jobs to finish
520 while (low_pool_job
.NumFinished() < kJobs
||
521 high_pool_job
.NumFinished() < kJobs
) {
522 env_
->SleepForMicroseconds(kDelayMicros
);
525 env_
->SetBackgroundThreads(kHighPoolSize
, Env::Priority::HIGH
);
526 WaitThreadPoolsEmpty();
// Grows and shrinks the HIGH pool with SetBackgroundThreads while
// SleepingBackgroundTasks are running or queued. After each change it
// sleeps kDelayMicros and checks the queue length and which tasks are
// sleeping. Shrinking the pool does not interrupt tasks that are already
// running: for example, tasks 0 and 1 keep running after the pool shrinks
// back to one thread.
529 TEST_P(EnvPosixTestWithParam
, DecreaseNumBgThreads
) {
530 std::vector
<test::SleepingBackgroundTask
> tasks(10);
532 // Set number of thread to 1 first.
533 env_
->SetBackgroundThreads(1, Env::Priority::HIGH
);
534 Env::Default()->SleepForMicroseconds(kDelayMicros
);
536 // Schedule 3 tasks. 0 running; Task 1, 2 waiting.
537 for (size_t i
= 0; i
< 3; i
++) {
538 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &tasks
[i
],
539 Env::Priority::HIGH
);
540 Env::Default()->SleepForMicroseconds(kDelayMicros
);
542 ASSERT_EQ(2U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
543 ASSERT_TRUE(tasks
[0].IsSleeping());
544 ASSERT_TRUE(!tasks
[1].IsSleeping());
545 ASSERT_TRUE(!tasks
[2].IsSleeping());
547 // Increase to 2 threads. Task 0, 1 running; 2 waiting
548 env_
->SetBackgroundThreads(2, Env::Priority::HIGH
);
549 Env::Default()->SleepForMicroseconds(kDelayMicros
);
550 ASSERT_EQ(1U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
551 ASSERT_TRUE(tasks
[0].IsSleeping());
552 ASSERT_TRUE(tasks
[1].IsSleeping());
553 ASSERT_TRUE(!tasks
[2].IsSleeping());
555 // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting
556 env_
->SetBackgroundThreads(1, Env::Priority::HIGH
);
557 Env::Default()->SleepForMicroseconds(kDelayMicros
);
558 ASSERT_EQ(1U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
559 ASSERT_TRUE(tasks
[0].IsSleeping());
560 ASSERT_TRUE(tasks
[1].IsSleeping());
561 ASSERT_TRUE(!tasks
[2].IsSleeping());
563 // The last task finishes. Task 0 running, 2 waiting.
565 Env::Default()->SleepForMicroseconds(kDelayMicros
);
566 ASSERT_EQ(1U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
567 ASSERT_TRUE(tasks
[0].IsSleeping());
568 ASSERT_TRUE(!tasks
[1].IsSleeping());
569 ASSERT_TRUE(!tasks
[2].IsSleeping());
571 // Increase to 5 threads. Task 0 and 2 running.
572 env_
->SetBackgroundThreads(5, Env::Priority::HIGH
);
573 Env::Default()->SleepForMicroseconds(kDelayMicros
);
574 ASSERT_EQ((unsigned int)0, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
575 ASSERT_TRUE(tasks
[0].IsSleeping());
576 ASSERT_TRUE(tasks
[2].IsSleeping());
578 // Change number of threads a couple of times while there is no sufficient
580 env_
->SetBackgroundThreads(7, Env::Priority::HIGH
);
581 Env::Default()->SleepForMicroseconds(kDelayMicros
);
583 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
584 env_
->SetBackgroundThreads(3, Env::Priority::HIGH
);
585 Env::Default()->SleepForMicroseconds(kDelayMicros
);
586 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
587 env_
->SetBackgroundThreads(4, Env::Priority::HIGH
);
588 Env::Default()->SleepForMicroseconds(kDelayMicros
);
589 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
590 env_
->SetBackgroundThreads(5, Env::Priority::HIGH
);
591 Env::Default()->SleepForMicroseconds(kDelayMicros
);
592 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
593 env_
->SetBackgroundThreads(4, Env::Priority::HIGH
);
594 Env::Default()->SleepForMicroseconds(kDelayMicros
);
595 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
597 Env::Default()->SleepForMicroseconds(kDelayMicros
* 50);
599 // Enqueue 5 more tasks. Thread pool size now is 4.
600 // Task 0, 3, 4, 5 running;6, 7 waiting.
601 for (size_t i
= 3; i
< 8; i
++) {
602 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &tasks
[i
],
603 Env::Priority::HIGH
);
605 Env::Default()->SleepForMicroseconds(kDelayMicros
);
606 ASSERT_EQ(2U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
607 ASSERT_TRUE(tasks
[3].IsSleeping());
608 ASSERT_TRUE(tasks
[4].IsSleeping());
609 ASSERT_TRUE(tasks
[5].IsSleeping());
610 ASSERT_TRUE(!tasks
[6].IsSleeping());
611 ASSERT_TRUE(!tasks
[7].IsSleeping());
613 // Wake up task 0, 3 and 4. Task 5, 6, 7 running.
618 Env::Default()->SleepForMicroseconds(kDelayMicros
);
619 ASSERT_EQ((unsigned int)0, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
620 for (size_t i
= 5; i
< 8; i
++) {
621 ASSERT_TRUE(tasks
[i
].IsSleeping());
624 // Shrink back to 1 thread. Still task 5, 6, 7 running
625 env_
->SetBackgroundThreads(1, Env::Priority::HIGH
);
626 Env::Default()->SleepForMicroseconds(kDelayMicros
);
627 ASSERT_TRUE(tasks
[5].IsSleeping());
628 ASSERT_TRUE(tasks
[6].IsSleeping());
629 ASSERT_TRUE(tasks
[7].IsSleeping());
631 // Wake up task 6. Task 5, 7 running
633 Env::Default()->SleepForMicroseconds(kDelayMicros
);
634 ASSERT_TRUE(tasks
[5].IsSleeping());
635 ASSERT_TRUE(!tasks
[6].IsSleeping());
636 ASSERT_TRUE(tasks
[7].IsSleeping());
638 // Wake up threads 7. Task 5 running
640 Env::Default()->SleepForMicroseconds(kDelayMicros
);
641 ASSERT_TRUE(!tasks
[7].IsSleeping());
643 // Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running.
644 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &tasks
[8],
645 Env::Priority::HIGH
);
646 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &tasks
[9],
647 Env::Priority::HIGH
);
648 Env::Default()->SleepForMicroseconds(kDelayMicros
);
649 ASSERT_GT(env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
), (unsigned int)0);
650 ASSERT_TRUE(!tasks
[8].IsSleeping() || !tasks
[9].IsSleeping());
652 // Increase to 4 threads. Task 5, 8, 9 running.
653 env_
->SetBackgroundThreads(4, Env::Priority::HIGH
);
654 Env::Default()->SleepForMicroseconds(kDelayMicros
);
655 ASSERT_EQ((unsigned int)0, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
656 ASSERT_TRUE(tasks
[8].IsSleeping());
657 ASSERT_TRUE(tasks
[9].IsSleeping());
659 // Shrink to 1 thread
660 env_
->SetBackgroundThreads(1, Env::Priority::HIGH
);
664 Env::Default()->SleepForMicroseconds(kDelayMicros
);
665 ASSERT_TRUE(!tasks
[9].IsSleeping());
666 ASSERT_TRUE(tasks
[8].IsSleeping());
670 Env::Default()->SleepForMicroseconds(kDelayMicros
);
671 ASSERT_TRUE(!tasks
[8].IsSleeping());
673 // Wake up the last thread
676 Env::Default()->SleepForMicroseconds(kDelayMicros
);
677 ASSERT_TRUE(!tasks
[5].IsSleeping());
678 WaitThreadPoolsEmpty();
681 #if (defined OS_LINUX || defined OS_WIN)
682 // Travis doesn't support fallocate or getting unique ID from files for whatever
// Reports whether `s` decodes as exactly one varint64 with no trailing bytes
// (slice.size() == 0 after GetVarint64). The declarations of `slice` and `v`
// and the body of the GetVarint64 failure branch are not visible here;
// presumably a decode failure returns false -- confirm.
687 bool IsSingleVarint(const std::string
& s
) {
691 if (!GetVarint64(&slice
, &v
)) {
695 return slice
.size() == 0;
// A unique ID is considered valid when it is non-empty and is not merely a
// single varint.
698 bool IsUniqueIDValid(const std::string
& s
) {
699 return !s
.empty() && !IsSingleVarint(s
);
// Shared scratch buffer that the unique-ID tests pass to
// GetUniqueId(temp_id, MAX_ID_SIZE).
702 const size_t MAX_ID_SIZE
= 100;
703 char temp_id
[MAX_ID_SIZE
];
708 // Determine whether we can use the FS_IOC_GETVERSION ioctl
709 // on a file in directory DIR. Create a temporary file therein,
710 // try to apply the ioctl (save that result), cleanup and
711 // return the result. Return true if it is supported, and
712 // false if anything fails.
713 // Note that this function "knows" that dir has just been created
714 // and is empty, so we create a simply-named test file: "f".
// open() is retried while it fails with EINTR; `ok` is false if the file
// could not be opened or the ioctl fails.
715 bool ioctl_support__FS_IOC_GETVERSION(const std::string
& dir
) {
719 const std::string file
= dir
+ "/f";
722 fd
= open(file
.c_str(), O_CREAT
| O_RDWR
| O_TRUNC
, 0644);
723 } while (fd
< 0 && errno
== EINTR
);
725 bool ok
= (fd
>= 0 && ioctl(fd
, FS_IOC_GETVERSION
, &version
) >= 0);
728 unlink(file
.c_str());
734 // To ensure that Env::GetUniqueId-related tests work correctly, the files
735 // should be stored in regular storage like "hard disk" or "flash device",
736 // and not on a tmpfs file system (like /dev/shm and /tmp on some systems).
737 // Otherwise we cannot get the correct id.
739 // This class serves as the replacement for test::TmpDir(), which may be
740 // customized to be on a file system that doesn't work with GetUniqueId().
742 class IoctlFriendlyTmpdir
{
// Constructor. Two platform branches are visible:
// - One formats $TEST_IOCTL_FRIENDLY_TMPDIR/rocksdb.XXXXXX, fills in the
//   template with _mktemp, and creates it with CreateDirectory.
// - The other tries, in order, $TEST_IOCTL_FRIENDLY_TMPDIR (only if the
//   resulting path fits in dir_buf), /var/tmp and /tmp. For each it creates
//   a mkdtemp directory and checks ioctl_support__FS_IOC_GETVERSION,
//   removing the directory when the check fails. The success path is not
//   fully visible here. If no candidate works, a hint about the envvar is
//   printed to stderr.
744 explicit IoctlFriendlyTmpdir() {
747 const char *fmt
= "%s/rocksdb.XXXXXX";
748 const char *tmp
= getenv("TEST_IOCTL_FRIENDLY_TMPDIR");
756 snprintf(dir_buf
, sizeof dir_buf
, fmt
, tmp
);
757 auto result
= _mktemp(dir_buf
);
758 assert(result
!= nullptr);
759 BOOL ret
= CreateDirectory(dir_buf
, NULL
);
763 std::list
<std::string
> candidate_dir_list
= {"/var/tmp", "/tmp"};
765 // If $TEST_IOCTL_FRIENDLY_TMPDIR/rocksdb.XXXXXX fits, use
766 // $TEST_IOCTL_FRIENDLY_TMPDIR; subtract 2 for the "%s", and
767 // add 1 for the trailing NUL byte.
768 if (tmp
&& strlen(tmp
) + strlen(fmt
) - 2 + 1 <= sizeof dir_buf
) {
769 // use $TEST_IOCTL_FRIENDLY_TMPDIR value
770 candidate_dir_list
.push_front(tmp
);
773 for (const std::string
& d
: candidate_dir_list
) {
774 snprintf(dir_buf
, sizeof dir_buf
, fmt
, d
.c_str());
775 if (mkdtemp(dir_buf
)) {
776 if (ioctl_support__FS_IOC_GETVERSION(dir_buf
)) {
780 // Diagnose ioctl-related failure only if this is the
781 // directory specified via that envvar.
782 if (tmp
&& tmp
== d
) {
783 fprintf(stderr
, "TEST_IOCTL_FRIENDLY_TMPDIR-specified directory is "
784 "not suitable: %s\n", d
.c_str());
786 rmdir(dir_buf
); // ignore failure
789 // mkdtemp failed: diagnose it, but don't give up.
790 fprintf(stderr
, "mkdtemp(%s/...) failed: %s\n", d
.c_str(),
795 fprintf(stderr
, "failed to find an ioctl-friendly temporary directory;"
796 " specify one via the TEST_IOCTL_FRIENDLY_TMPDIR envvar\n");
801 ~IoctlFriendlyTmpdir() {
805 const std::string
& name() const {
// Writes a page of 'a' at offset 0 and a page of 'b' at offset kBlockSize
// with PositionedAppend and direct writes enabled. It then reads the file
// back and expects kPageSize + kBlockSize bytes, with the 'a'/'b' boundary
// at offset kBlockSize.
814 TEST_F(EnvPosixTest
, PositionedAppend
) {
815 std::unique_ptr
<WritableFile
> writable_file
;
817 options
.use_direct_writes
= true;
818 options
.use_mmap_writes
= false;
819 IoctlFriendlyTmpdir ift
;
820 ASSERT_OK(env_
->NewWritableFile(ift
.name() + "/f", &writable_file
, options
));
821 const size_t kBlockSize
= 4096;
822 const size_t kDataSize
= kPageSize
;
823 // Write a page worth of 'a'
824 auto data_ptr
= NewAligned(kDataSize
, 'a');
825 Slice
data_a(data_ptr
.get(), kDataSize
);
826 ASSERT_OK(writable_file
->PositionedAppend(data_a
, 0U));
827 // Write a page worth of 'b' right after the first sector
828 data_ptr
= NewAligned(kDataSize
, 'b');
829 Slice
data_b(data_ptr
.get(), kDataSize
);
830 ASSERT_OK(writable_file
->PositionedAppend(data_b
, kBlockSize
));
831 ASSERT_OK(writable_file
->Close());
832 // The file now has 1 sector worth of a followed by a page worth of b
835 std::unique_ptr
<SequentialFile
> seq_file
;
836 ASSERT_OK(env_
->NewSequentialFile(ift
.name() + "/f", &seq_file
, options
));
// kPageSize may be initialized from sysconf() and is then not a
// compile-time constant, so `char scratch[kPageSize * 2]` would be a
// non-standard variable-length array on the stack. Use a heap buffer.
837 std::vector<char> scratch(kPageSize * 2);
839 ASSERT_OK(seq_file
->Read(scratch
.size(), &result
, scratch
.data()));
840 ASSERT_EQ(kPageSize
+ kBlockSize
, result
.size());
841 ASSERT_EQ('a', result
[kBlockSize
- 1]);
842 ASSERT_EQ('b', result
[kBlockSize
]);
844 #endif // !ROCKSDB_LITE
846 // Only works in linux platforms
// Opens the same file three times (the last after a 1 s sleep) and requires
// GetUniqueId to return a valid, identical ID each time. Runs only when env_
// is Env::Default(); the file is deleted at the end.
847 TEST_P(EnvPosixTestWithParam
, RandomAccessUniqueID
) {
849 if (env_
== Env::Default()) {
851 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
852 IoctlFriendlyTmpdir ift
;
853 std::string fname
= ift
.name() + "/testfile";
854 std::unique_ptr
<WritableFile
> wfile
;
855 ASSERT_OK(env_
->NewWritableFile(fname
, &wfile
, soptions
));
857 std::unique_ptr
<RandomAccessFile
> file
;
860 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
861 size_t id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
862 ASSERT_TRUE(id_size
> 0);
863 std::string
unique_id1(temp_id
, id_size
);
864 ASSERT_TRUE(IsUniqueIDValid(unique_id1
));
866 // Get Unique ID again
867 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
868 id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
869 ASSERT_TRUE(id_size
> 0);
870 std::string
unique_id2(temp_id
, id_size
);
871 ASSERT_TRUE(IsUniqueIDValid(unique_id2
));
873 // Get Unique ID again after waiting some time.
874 env_
->SleepForMicroseconds(1000000);
875 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
876 id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
877 ASSERT_TRUE(id_size
> 0);
878 std::string
unique_id3(temp_id
, id_size
);
879 ASSERT_TRUE(IsUniqueIDValid(unique_id3
));
881 // Check IDs are the same.
882 ASSERT_EQ(unique_id1
, unique_id2
);
883 ASSERT_EQ(unique_id2
, unique_id3
);
// Check the cleanup status, as the sibling unique-ID tests do.
886 ASSERT_OK(env_
->DeleteFile(fname
));
890 // only works in linux platforms
891 #ifdef ROCKSDB_FALLOCATE_PRESENT
// First probes fallocate() on a scratch file and skips the test if the file
// system returns EOPNOTSUPP. It then checks that SetPreallocationBlockSize +
// PrepareWrite leave at least kPreallocateSize bytes worth of blocks
// allocated while the file is open, and that the extra blocks are released
// once the file is closed.
892 TEST_P(EnvPosixTestWithParam
, AllocateTest
) {
893 if (env_
== Env::Default()) {
894 IoctlFriendlyTmpdir ift
;
895 std::string fname
= ift
.name() + "/preallocate_testfile";
897 // Try fallocate in a file to see whether the target file system supports
899 // Skip the test if fallocate is not supported.
900 std::string fname_test_fallocate
= ift
.name() + "/preallocate_testfile_2";
903 fd
= open(fname_test_fallocate
.c_str(), O_CREAT
| O_RDWR
| O_TRUNC
, 0644);
904 } while (fd
< 0 && errno
== EINTR
);
907 int alloc_status
= fallocate(fd
, 0, 0, 1);
910 if (alloc_status
!= 0) {
912 fprintf(stderr
, "Warning: fallocate() fails, %s\n", strerror(err_number
));
915 ASSERT_OK(env_
->DeleteFile(fname_test_fallocate
));
916 if (alloc_status
!= 0 && err_number
== EOPNOTSUPP
) {
917 // The filesystem containing the file does not support fallocate
922 soptions
.use_mmap_writes
= false;
923 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
924 std::unique_ptr
<WritableFile
> wfile
;
925 ASSERT_OK(env_
->NewWritableFile(fname
, &wfile
, soptions
));
// kBlockSize matches the 512-byte units in which Linux reports st_blocks
// (see stat(2)). The local kPageSize deliberately shadows the file-level one.
928 const size_t kPreallocateSize
= 100 * 1024 * 1024;
929 const size_t kBlockSize
= 512;
930 const size_t kPageSize
= 4096;
931 const size_t kDataSize
= 1024 * 1024;
932 auto data_ptr
= NewAligned(kDataSize
, 'A');
933 Slice
data(data_ptr
.get(), kDataSize
);
934 wfile
->SetPreallocationBlockSize(kPreallocateSize
);
935 wfile
->PrepareWrite(wfile
->GetFileSize(), kDataSize
);
936 ASSERT_OK(wfile
->Append(data
));
937 ASSERT_OK(wfile
->Flush());
940 ASSERT_EQ(stat(fname
.c_str(), &f_stat
), 0);
941 ASSERT_EQ((unsigned int)kDataSize
, f_stat
.st_size
);
942 // verify that blocks are preallocated
943 // Note here that we don't check the exact number of blocks preallocated --
944 // we only require that number of allocated blocks is at least what we
946 // It looks like some FS give us more blocks than we asked for. That's fine.
947 // It might be worth investigating further.
948 ASSERT_LE((unsigned int)(kPreallocateSize
/ kBlockSize
), f_stat
.st_blocks
);
950 // close the file, should deallocate the blocks
// Check stat() succeeds here as well; otherwise f_stat would still hold the
// pre-close values and the checks below would be meaningless.
953 ASSERT_EQ(stat(fname
.c_str(), &f_stat
), 0);
954 ASSERT_EQ((unsigned int)kDataSize
, f_stat
.st_size
);
955 // verify that preallocated blocks were deallocated on file close
956 // Because the FS might give us more blocks, we add a full page to the size
957 // and expect the number of blocks to be less or equal to that.
958 ASSERT_GE((f_stat
.st_size
+ kPageSize
+ kBlockSize
- 1) / kBlockSize
,
959 (unsigned int)f_stat
.st_blocks
);
962 #endif // ROCKSDB_FALLOCATE_PRESENT
964 // Returns true if any of the strings in ss are the prefix of another string.
// For each string it looks up every proper, non-empty prefix (lengths
// 1 .. size() - 1) in ss, building one substring per lookup. Some statements
// of the loop body are not visible here.
965 bool HasPrefix(const std::unordered_set
<std::string
>& ss
) {
966 for (const std::string
& s
: ss
) {
970 for (size_t i
= 1; i
< s
.size(); ++i
) {
971 if (ss
.count(s
.substr(0, i
)) != 0) {
979 // Only works in linux and WIN platforms
// Creates 1000 files that exist at the same time and requires their
// GetUniqueId results to be valid, pairwise distinct, and free of
// prefix relationships. Runs only when env_ is Env::Default().
980 TEST_P(EnvPosixTestWithParam
, RandomAccessUniqueIDConcurrent
) {
981 if (env_
== Env::Default()) {
982 // Check whether a bunch of concurrently existing files have unique IDs.
984 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
987 IoctlFriendlyTmpdir ift
;
988 std::vector
<std::string
> fnames
;
989 for (int i
= 0; i
< 1000; ++i
) {
990 fnames
.push_back(ift
.name() + "/" + "testfile" + ToString(i
));
993 std::unique_ptr
<WritableFile
> wfile
;
994 ASSERT_OK(env_
->NewWritableFile(fnames
[i
], &wfile
, soptions
));
997 // Collect and check whether the IDs are unique.
998 std::unordered_set
<std::string
> ids
;
// Bind by reference: the loop only reads each name, so copying 1000
// strings per pass is unnecessary.
999 for (const std::string& fname
: fnames
) {
1000 std::unique_ptr
<RandomAccessFile
> file
;
1001 std::string unique_id
;
1002 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
1003 size_t id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
1004 ASSERT_TRUE(id_size
> 0);
1005 unique_id
= std::string(temp_id
, id_size
);
1006 ASSERT_TRUE(IsUniqueIDValid(unique_id
));
1008 ASSERT_TRUE(ids
.count(unique_id
) == 0);
1009 ids
.insert(unique_id
);
1013 for (const std::string& fname
: fnames
) {
1014 ASSERT_OK(env_
->DeleteFile(fname
));
1017 ASSERT_TRUE(!HasPrefix(ids
));
1021 // Only works in linux and WIN platforms
// 1000 times: creates the same path, reads its unique ID, and deletes it.
// Every ID must be valid and distinct from all earlier ones, and at the end
// no ID may be a prefix of another. Runs only when env_ is Env::Default().
1022 TEST_P(EnvPosixTestWithParam
, RandomAccessUniqueIDDeletes
) {
1023 if (env_
== Env::Default()) {
1024 EnvOptions soptions
;
1025 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
1027 IoctlFriendlyTmpdir ift
;
1028 std::string fname
= ift
.name() + "/" + "testfile";
1030 // Check that after file is deleted we don't get same ID again in a new
1032 std::unordered_set
<std::string
> ids
;
1033 for (int i
= 0; i
< 1000; ++i
) {
1036 std::unique_ptr
<WritableFile
> wfile
;
1037 ASSERT_OK(env_
->NewWritableFile(fname
, &wfile
, soptions
));
1041 std::string unique_id
;
1043 std::unique_ptr
<RandomAccessFile
> file
;
1044 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
1045 size_t id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
1046 ASSERT_TRUE(id_size
> 0);
1047 unique_id
= std::string(temp_id
, id_size
);
1050 ASSERT_TRUE(IsUniqueIDValid(unique_id
));
1051 ASSERT_TRUE(ids
.count(unique_id
) == 0);
1052 ids
.insert(unique_id
);
1055 ASSERT_OK(env_
->DeleteFile(fname
));
1058 ASSERT_TRUE(!HasPrefix(ids
));
// Writes one 512-byte sector, then reads it back through a RandomAccessFile
// and through a SequentialFile, calling InvalidateCache on the writer
// (0, 0) and on each reader ((0, 11) and (0, 0)). On platforms other than
// the excluded ones (OS_MACOSX, OS_WIN, OS_SOLARIS, OS_AIX), direct
// reads/writes are switched off before each file is opened. Either the
// DISABLED_ or the enabled variant is compiled, under a condition that is
// not visible here (its #endif is marked "not TRAVIS").
// NOTE(review): `data` and each `scratch` buffer are both zero-filled by
// NewAligned(kSectorSize, 0), so the memcmp checks would also pass if no
// bytes were read. The visible lines do not check result.size().
1062 // Only works in linux platforms
1064 TEST_P(EnvPosixTestWithParam
, DISABLED_InvalidateCache
) {
1066 TEST_P(EnvPosixTestWithParam
, InvalidateCache
) {
1068 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1069 EnvOptions soptions
;
1070 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
1071 std::string fname
= test::PerThreadDBPath(env_
, "testfile");
1073 const size_t kSectorSize
= 512;
1074 auto data
= NewAligned(kSectorSize
, 0);
1075 Slice
slice(data
.get(), kSectorSize
);
1079 std::unique_ptr
<WritableFile
> wfile
;
1080 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
1081 if (soptions
.use_direct_writes
) {
1082 soptions
.use_direct_writes
= false;
1085 ASSERT_OK(env_
->NewWritableFile(fname
, &wfile
, soptions
));
1086 ASSERT_OK(wfile
->Append(slice
));
1087 ASSERT_OK(wfile
->InvalidateCache(0, 0));
1088 ASSERT_OK(wfile
->Close());
1093 std::unique_ptr
<RandomAccessFile
> file
;
1094 auto scratch
= NewAligned(kSectorSize
, 0);
1096 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
1097 if (soptions
.use_direct_reads
) {
1098 soptions
.use_direct_reads
= false;
1101 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
1102 ASSERT_OK(file
->Read(0, kSectorSize
, &result
, scratch
.get()));
1103 ASSERT_EQ(memcmp(scratch
.get(), data
.get(), kSectorSize
), 0);
1104 ASSERT_OK(file
->InvalidateCache(0, 11));
1105 ASSERT_OK(file
->InvalidateCache(0, 0));
1110 std::unique_ptr
<SequentialFile
> file
;
1111 auto scratch
= NewAligned(kSectorSize
, 0);
1113 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
1114 if (soptions
.use_direct_reads
) {
1115 soptions
.use_direct_reads
= false;
1118 ASSERT_OK(env_
->NewSequentialFile(fname
, &file
, soptions
));
1119 if (file
->use_direct_io()) {
1120 ASSERT_OK(file
->PositionedRead(0, kSectorSize
, &result
, scratch
.get()));
1122 ASSERT_OK(file
->Read(kSectorSize
, &result
, scratch
.get()));
1124 ASSERT_EQ(memcmp(scratch
.get(), data
.get(), kSectorSize
), 0);
1125 ASSERT_OK(file
->InvalidateCache(0, 11));
1126 ASSERT_OK(file
->InvalidateCache(0, 0));
1129 ASSERT_OK(env_
->DeleteFile(fname
));
1130 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1132 #endif // not TRAVIS
1133 #endif // OS_LINUX || OS_WIN
1135 class TestLogger
: public Logger
{
1138 void Logv(const char* format
, va_list ap
) override
{
1141 char new_format
[550];
1142 std::fill_n(new_format
, sizeof(new_format
), '2');
1145 va_copy(backup_ap
, ap
);
1146 int n
= vsnprintf(new_format
, sizeof(new_format
) - 1, format
, backup_ap
);
1147 // 48 bytes for extra information + bytes allocated
1149 // When we have n == -1 there is not a terminating zero expected
1156 if (new_format
[0] == '[') {
1158 ASSERT_TRUE(n
<= 56 + (512 - static_cast<int>(sizeof(struct timeval
))));
1160 ASSERT_TRUE(n
<= 48 + (512 - static_cast<int>(sizeof(struct timeval
))));
1165 for (size_t i
= 0; i
< sizeof(new_format
); i
++) {
1166 if (new_format
[i
] == 'x') {
1168 } else if (new_format
[i
] == '\0') {
1178 TEST_P(EnvPosixTestWithParam
, LogBufferTest
) {
1179 TestLogger test_logger
;
1180 test_logger
.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL
);
1181 test_logger
.log_count
= 0;
1182 test_logger
.char_x_count
= 0;
1183 test_logger
.char_0_count
= 0;
1184 LogBuffer
log_buffer(InfoLogLevel::INFO_LEVEL
, &test_logger
);
1185 LogBuffer
log_buffer_debug(DEBUG_LEVEL
, &test_logger
);
1188 std::fill_n(bytes200
, sizeof(bytes200
), '1');
1189 bytes200
[sizeof(bytes200
) - 1] = '\0';
1191 std::fill_n(bytes600
, sizeof(bytes600
), '1');
1192 bytes600
[sizeof(bytes600
) - 1] = '\0';
1193 char bytes9000
[9000];
1194 std::fill_n(bytes9000
, sizeof(bytes9000
), '1');
1195 bytes9000
[sizeof(bytes9000
) - 1] = '\0';
1197 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx", bytes200
);
1198 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx", bytes600
);
1199 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx%sx%sx", bytes200
, bytes200
, bytes200
);
1200 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx%sx", bytes200
, bytes600
);
1201 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx%sx", bytes600
, bytes9000
);
1203 ROCKS_LOG_BUFFER(&log_buffer_debug
, "x%sx", bytes200
);
1204 test_logger
.SetInfoLogLevel(DEBUG_LEVEL
);
1205 ROCKS_LOG_BUFFER(&log_buffer_debug
, "x%sx%sx%sx", bytes600
, bytes9000
,
1208 ASSERT_EQ(0, test_logger
.log_count
);
1209 log_buffer
.FlushBufferToLog();
1210 log_buffer_debug
.FlushBufferToLog();
1211 ASSERT_EQ(6, test_logger
.log_count
);
1212 ASSERT_EQ(6, test_logger
.char_0_count
);
1213 ASSERT_EQ(10, test_logger
.char_x_count
);
1216 class TestLogger2
: public Logger
{
1218 explicit TestLogger2(size_t max_log_size
) : max_log_size_(max_log_size
) {}
1220 void Logv(const char* format
, va_list ap
) override
{
1221 char new_format
[2000];
1222 std::fill_n(new_format
, sizeof(new_format
), '2');
1225 va_copy(backup_ap
, ap
);
1226 int n
= vsnprintf(new_format
, sizeof(new_format
) - 1, format
, backup_ap
);
1227 // 48 bytes for extra information + bytes allocated
1229 n
<= 48 + static_cast<int>(max_log_size_
- sizeof(struct timeval
)));
1230 ASSERT_TRUE(n
> static_cast<int>(max_log_size_
- sizeof(struct timeval
)));
1234 size_t max_log_size_
;
1237 TEST_P(EnvPosixTestWithParam
, LogBufferMaxSizeTest
) {
1238 char bytes9000
[9000];
1239 std::fill_n(bytes9000
, sizeof(bytes9000
), '1');
1240 bytes9000
[sizeof(bytes9000
) - 1] = '\0';
1242 for (size_t max_log_size
= 256; max_log_size
<= 1024;
1243 max_log_size
+= 1024 - 256) {
1244 TestLogger2
test_logger(max_log_size
);
1245 test_logger
.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL
);
1246 LogBuffer
log_buffer(InfoLogLevel::INFO_LEVEL
, &test_logger
);
1247 ROCKS_LOG_BUFFER_MAX_SZ(&log_buffer
, max_log_size
, "%s", bytes9000
);
1248 log_buffer
.FlushBufferToLog();
1252 TEST_P(EnvPosixTestWithParam
, Preallocation
) {
1253 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1254 const std::string src
= test::PerThreadDBPath(env_
, "testfile");
1255 std::unique_ptr
<WritableFile
> srcfile
;
1256 EnvOptions soptions
;
1257 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
1258 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD) && !defined(OS_FREEBSD)
1259 if (soptions
.use_direct_writes
) {
1260 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1261 "NewWritableFile:O_DIRECT", [&](void* arg
) {
1262 int* val
= static_cast<int*>(arg
);
1267 ASSERT_OK(env_
->NewWritableFile(src
, &srcfile
, soptions
));
1268 srcfile
->SetPreallocationBlockSize(1024 * 1024);
1270 // No writes should mean no preallocation
1271 size_t block_size
, last_allocated_block
;
1272 srcfile
->GetPreallocationStatus(&block_size
, &last_allocated_block
);
1273 ASSERT_EQ(last_allocated_block
, 0UL);
1275 // Small write should preallocate one block
1276 size_t kStrSize
= 4096;
1277 auto data
= NewAligned(kStrSize
, 'A');
1278 Slice
str(data
.get(), kStrSize
);
1279 srcfile
->PrepareWrite(srcfile
->GetFileSize(), kStrSize
);
1280 srcfile
->Append(str
);
1281 srcfile
->GetPreallocationStatus(&block_size
, &last_allocated_block
);
1282 ASSERT_EQ(last_allocated_block
, 1UL);
1284 // Write an entire preallocation block, make sure we increased by two.
1286 auto buf_ptr
= NewAligned(block_size
, ' ');
1287 Slice
buf(buf_ptr
.get(), block_size
);
1288 srcfile
->PrepareWrite(srcfile
->GetFileSize(), block_size
);
1289 srcfile
->Append(buf
);
1290 srcfile
->GetPreallocationStatus(&block_size
, &last_allocated_block
);
1291 ASSERT_EQ(last_allocated_block
, 2UL);
1294 // Write five more blocks at once, ensure we're where we need to be.
1296 auto buf_ptr
= NewAligned(block_size
* 5, ' ');
1297 Slice buf
= Slice(buf_ptr
.get(), block_size
* 5);
1298 srcfile
->PrepareWrite(srcfile
->GetFileSize(), buf
.size());
1299 srcfile
->Append(buf
);
1300 srcfile
->GetPreallocationStatus(&block_size
, &last_allocated_block
);
1301 ASSERT_EQ(last_allocated_block
, 7UL);
1303 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1306 // Test that the two ways to get children file attributes (in bulk or
1307 // individually) behave consistently.
1308 TEST_P(EnvPosixTestWithParam
, ConsistentChildrenAttributes
) {
1309 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1310 EnvOptions soptions
;
1311 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
1312 const int kNumChildren
= 10;
1315 for (int i
= 0; i
< kNumChildren
; ++i
) {
1316 const std::string path
=
1317 test::TmpDir(env_
) + "/" + "testfile_" + std::to_string(i
);
1318 std::unique_ptr
<WritableFile
> file
;
1319 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD) && !defined(OS_FREEBSD)
1320 if (soptions
.use_direct_writes
) {
1321 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1322 "NewWritableFile:O_DIRECT", [&](void* arg
) {
1323 int* val
= static_cast<int*>(arg
);
1328 ASSERT_OK(env_
->NewWritableFile(path
, &file
, soptions
));
1329 auto buf_ptr
= NewAligned(data
.size(), 'T');
1330 Slice
buf(buf_ptr
.get(), data
.size());
1332 data
.append(std::string(4096, 'T'));
1335 std::vector
<Env::FileAttributes
> file_attrs
;
1336 ASSERT_OK(env_
->GetChildrenFileAttributes(test::TmpDir(env_
), &file_attrs
));
1337 for (int i
= 0; i
< kNumChildren
; ++i
) {
1338 const std::string name
= "testfile_" + std::to_string(i
);
1339 const std::string path
= test::TmpDir(env_
) + "/" + name
;
1341 auto file_attrs_iter
= std::find_if(
1342 file_attrs
.begin(), file_attrs
.end(),
1343 [&name
](const Env::FileAttributes
& fm
) { return fm
.name
== name
; });
1344 ASSERT_TRUE(file_attrs_iter
!= file_attrs
.end());
1346 ASSERT_OK(env_
->GetFileSize(path
, &size
));
1347 ASSERT_EQ(size
, 4096 * i
);
1348 ASSERT_EQ(size
, file_attrs_iter
->size_bytes
);
1350 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1353 // Test that all WritableFileWrapper forwards all calls to WritableFile.
1354 TEST_P(EnvPosixTestWithParam
, WritableFileWrapper
) {
1355 class Base
: public WritableFile
{
1359 void inc(int x
) const {
1360 EXPECT_EQ(x
, (*step_
)++);
1363 explicit Base(int* step
) : step_(step
) {
1367 Status
Append(const Slice
& /*data*/) override
{
1369 return Status::OK();
1372 Status
PositionedAppend(const Slice
& /*data*/,
1373 uint64_t /*offset*/) override
{
1375 return Status::OK();
1378 Status
Truncate(uint64_t /*size*/) override
{
1380 return Status::OK();
1383 Status
Close() override
{
1385 return Status::OK();
1388 Status
Flush() override
{
1390 return Status::OK();
1393 Status
Sync() override
{
1395 return Status::OK();
1398 Status
Fsync() override
{
1400 return Status::OK();
1403 bool IsSyncThreadSafe() const override
{
1408 bool use_direct_io() const override
{
1413 size_t GetRequiredBufferAlignment() const override
{
1418 void SetIOPriority(Env::IOPriority
/*pri*/) override
{ inc(11); }
1420 Env::IOPriority
GetIOPriority() override
{
1422 return Env::IOPriority::IO_LOW
;
1425 void SetWriteLifeTimeHint(Env::WriteLifeTimeHint
/*hint*/) override
{
1429 Env::WriteLifeTimeHint
GetWriteLifeTimeHint() override
{
1431 return Env::WriteLifeTimeHint::WLTH_NOT_SET
;
1434 uint64_t GetFileSize() override
{
1439 void SetPreallocationBlockSize(size_t /*size*/) override
{ inc(16); }
1441 void GetPreallocationStatus(size_t* /*block_size*/,
1442 size_t* /*last_allocated_block*/) override
{
1446 size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override
{
1451 Status
InvalidateCache(size_t /*offset*/, size_t /*length*/) override
{
1453 return Status::OK();
1456 Status
RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/) override
{
1458 return Status::OK();
1461 void PrepareWrite(size_t /*offset*/, size_t /*len*/) override
{ inc(21); }
1463 Status
Allocate(uint64_t /*offset*/, uint64_t /*len*/) override
{
1465 return Status::OK();
1469 ~Base() override
{ inc(23); }
1472 class Wrapper
: public WritableFileWrapper
{
1474 explicit Wrapper(WritableFile
* target
) : WritableFileWrapper(target
) {}
1483 w
.PositionedAppend(Slice(), 0);
1489 w
.IsSyncThreadSafe();
1491 w
.GetRequiredBufferAlignment();
1492 w
.SetIOPriority(Env::IOPriority::IO_HIGH
);
1494 w
.SetWriteLifeTimeHint(Env::WriteLifeTimeHint::WLTH_NOT_SET
);
1495 w
.GetWriteLifeTimeHint();
1497 w
.SetPreallocationBlockSize(0);
1498 w
.GetPreallocationStatus(nullptr, nullptr);
1499 w
.GetUniqueId(nullptr, 0);
1500 w
.InvalidateCache(0, 0);
1502 w
.PrepareWrite(0, 0);
1506 EXPECT_EQ(24, step
);
1509 TEST_P(EnvPosixTestWithParam
, PosixRandomRWFile
) {
1510 const std::string path
= test::PerThreadDBPath(env_
, "random_rw_file");
1512 env_
->DeleteFile(path
);
1514 std::unique_ptr
<RandomRWFile
> file
;
1516 // Cannot open non-existing file.
1517 ASSERT_NOK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
1519 // Create the file using WriteableFile
1521 std::unique_ptr
<WritableFile
> wf
;
1522 ASSERT_OK(env_
->NewWritableFile(path
, &wf
, EnvOptions()));
1525 ASSERT_OK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
1530 ASSERT_OK(file
->Write(0, "ABCD"));
1531 ASSERT_OK(file
->Read(0, 10, &read_res
, buf
));
1532 ASSERT_EQ(read_res
.ToString(), "ABCD");
1534 ASSERT_OK(file
->Write(2, "XXXX"));
1535 ASSERT_OK(file
->Read(0, 10, &read_res
, buf
));
1536 ASSERT_EQ(read_res
.ToString(), "ABXXXX");
1538 ASSERT_OK(file
->Write(10, "ZZZ"));
1539 ASSERT_OK(file
->Read(10, 10, &read_res
, buf
));
1540 ASSERT_EQ(read_res
.ToString(), "ZZZ");
1542 ASSERT_OK(file
->Write(11, "Y"));
1543 ASSERT_OK(file
->Read(10, 10, &read_res
, buf
));
1544 ASSERT_EQ(read_res
.ToString(), "ZYZ");
1546 ASSERT_OK(file
->Write(200, "FFFFF"));
1547 ASSERT_OK(file
->Read(200, 10, &read_res
, buf
));
1548 ASSERT_EQ(read_res
.ToString(), "FFFFF");
1550 ASSERT_OK(file
->Write(205, "XXXX"));
1551 ASSERT_OK(file
->Read(200, 10, &read_res
, buf
));
1552 ASSERT_EQ(read_res
.ToString(), "FFFFFXXXX");
1554 ASSERT_OK(file
->Write(5, "QQQQ"));
1555 ASSERT_OK(file
->Read(0, 9, &read_res
, buf
));
1556 ASSERT_EQ(read_res
.ToString(), "ABXXXQQQQ");
1558 ASSERT_OK(file
->Read(2, 4, &read_res
, buf
));
1559 ASSERT_EQ(read_res
.ToString(), "XXXQ");
1561 // Close file and reopen it
1563 ASSERT_OK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
1565 ASSERT_OK(file
->Read(0, 9, &read_res
, buf
));
1566 ASSERT_EQ(read_res
.ToString(), "ABXXXQQQQ");
1568 ASSERT_OK(file
->Read(10, 3, &read_res
, buf
));
1569 ASSERT_EQ(read_res
.ToString(), "ZYZ");
1571 ASSERT_OK(file
->Read(200, 9, &read_res
, buf
));
1572 ASSERT_EQ(read_res
.ToString(), "FFFFFXXXX");
1574 ASSERT_OK(file
->Write(4, "TTTTTTTTTTTTTTTT"));
1575 ASSERT_OK(file
->Read(0, 10, &read_res
, buf
));
1576 ASSERT_EQ(read_res
.ToString(), "ABXXTTTTTT");
1579 env_
->DeleteFile(path
);
1582 class RandomRWFileWithMirrorString
{
1584 explicit RandomRWFileWithMirrorString(RandomRWFile
* _file
) : file_(_file
) {}
1586 void Write(size_t offset
, const std::string
& data
) {
1587 // Write to mirror string
1588 StringWrite(offset
, data
);
1591 Status s
= file_
->Write(offset
, data
);
1592 ASSERT_OK(s
) << s
.ToString();
1595 void Read(size_t offset
= 0, size_t n
= 1000000) {
1596 Slice
str_res(nullptr, 0);
1597 if (offset
< file_mirror_
.size()) {
1598 size_t str_res_sz
= std::min(file_mirror_
.size() - offset
, n
);
1599 str_res
= Slice(file_mirror_
.data() + offset
, str_res_sz
);
1600 StopSliceAtNull(&str_res
);
1604 Status s
= file_
->Read(offset
, n
, &file_res
, buf_
);
1605 ASSERT_OK(s
) << s
.ToString();
1606 StopSliceAtNull(&file_res
);
1608 ASSERT_EQ(str_res
.ToString(), file_res
.ToString()) << offset
<< " " << n
;
1611 void SetFile(RandomRWFile
* _file
) { file_
= _file
; }
1614 void StringWrite(size_t offset
, const std::string
& src
) {
1615 if (offset
+ src
.size() > file_mirror_
.size()) {
1616 file_mirror_
.resize(offset
+ src
.size(), '\0');
1619 char* pos
= const_cast<char*>(file_mirror_
.data() + offset
);
1620 memcpy(pos
, src
.data(), src
.size());
1623 void StopSliceAtNull(Slice
* slc
) {
1624 for (size_t i
= 0; i
< slc
->size(); i
++) {
1625 if ((*slc
)[i
] == '\0') {
1626 *slc
= Slice(slc
->data(), i
);
1633 RandomRWFile
* file_
;
1634 std::string file_mirror_
;
1637 TEST_P(EnvPosixTestWithParam
, PosixRandomRWFileRandomized
) {
1638 const std::string path
= test::PerThreadDBPath(env_
, "random_rw_file_rand");
1639 env_
->DeleteFile(path
);
1641 std::unique_ptr
<RandomRWFile
> file
;
1644 // Cannot open non-existing file.
1645 ASSERT_NOK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
1648 // Create the file using WriteableFile
1650 std::unique_ptr
<WritableFile
> wf
;
1651 ASSERT_OK(env_
->NewWritableFile(path
, &wf
, EnvOptions()));
1654 ASSERT_OK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
1655 RandomRWFileWithMirrorString
file_with_mirror(file
.get());
1659 for (int i
= 0; i
< 10000; i
++) {
1660 // Genrate random data
1661 test::RandomString(&rnd
, 10, &buf
);
1663 // Pick random offset for write
1664 size_t write_off
= rnd
.Next() % 1000;
1665 file_with_mirror
.Write(write_off
, buf
);
1667 // Pick random offset for read
1668 size_t read_off
= rnd
.Next() % 1000;
1669 size_t read_sz
= rnd
.Next() % 20;
1670 file_with_mirror
.Read(read_off
, read_sz
);
1673 // Reopen the file every 500 iters
1674 ASSERT_OK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
1675 file_with_mirror
.SetFile(file
.get());
1680 env_
->DeleteFile(path
);
1683 class TestEnv
: public EnvWrapper
{
1685 explicit TestEnv() : EnvWrapper(Env::Default()),
1688 class TestLogger
: public Logger
{
1691 TestLogger(TestEnv
* env_ptr
) : Logger() { env
= env_ptr
; }
1692 ~TestLogger() override
{
1697 void Logv(const char* /*format*/, va_list /*ap*/) override
{};
1700 Status
CloseImpl() override
{ return CloseHelper(); }
1703 Status
CloseHelper() {
1704 env
->CloseCountInc();;
1705 return Status::OK();
1710 void CloseCountInc() { close_count
++; }
1712 int GetCloseCount() { return close_count
; }
1714 Status
NewLogger(const std::string
& /*fname*/,
1715 std::shared_ptr
<Logger
>* result
) override
{
1716 result
->reset(new TestLogger(this));
1717 return Status::OK();
1724 class EnvTest
: public testing::Test
{};
1726 TEST_F(EnvTest
, Close
) {
1727 TestEnv
* env
= new TestEnv();
1728 std::shared_ptr
<Logger
> logger
;
1731 s
= env
->NewLogger("", &logger
);
1732 ASSERT_EQ(s
, Status::OK());
1733 logger
.get()->Close();
1734 ASSERT_EQ(env
->GetCloseCount(), 1);
1735 // Call Close() again. CloseHelper() should not be called again
1736 logger
.get()->Close();
1737 ASSERT_EQ(env
->GetCloseCount(), 1);
1739 ASSERT_EQ(env
->GetCloseCount(), 1);
1741 s
= env
->NewLogger("", &logger
);
1742 ASSERT_EQ(s
, Status::OK());
1744 ASSERT_EQ(env
->GetCloseCount(), 2);
1749 INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO
, EnvPosixTestWithParam
,
1750 ::testing::Values(std::pair
<Env
*, bool>(Env::Default(),
1752 #if !defined(ROCKSDB_LITE)
1753 INSTANTIATE_TEST_CASE_P(DefaultEnvWithDirectIO
, EnvPosixTestWithParam
,
1754 ::testing::Values(std::pair
<Env
*, bool>(Env::Default(),
1756 #endif // !defined(ROCKSDB_LITE)
1758 #if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
1759 static std::unique_ptr
<Env
> chroot_env(
1760 NewChrootEnv(Env::Default(), test::TmpDir(Env::Default())));
1761 INSTANTIATE_TEST_CASE_P(
1762 ChrootEnvWithoutDirectIO
, EnvPosixTestWithParam
,
1763 ::testing::Values(std::pair
<Env
*, bool>(chroot_env
.get(), false)));
1764 INSTANTIATE_TEST_CASE_P(
1765 ChrootEnvWithDirectIO
, EnvPosixTestWithParam
,
1766 ::testing::Values(std::pair
<Env
*, bool>(chroot_env
.get(), true)));
1767 #endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN)
1769 } // namespace rocksdb
1771 int main(int argc
, char** argv
) {
1772 ::testing::InitGoogleTest(&argc
, argv
);
1773 return RUN_ALL_TESTS();