]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/env/env_posix.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / env / env_posix.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors
9#include <dirent.h>
10#include <errno.h>
11#include <fcntl.h>
12#if defined(OS_LINUX)
13#include <linux/fs.h>
14#endif
15#include <pthread.h>
16#include <signal.h>
17#include <stdio.h>
18#include <stdlib.h>
19#include <string.h>
20#include <sys/ioctl.h>
21#include <sys/mman.h>
22#include <sys/stat.h>
11fdf7f2 23#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
7c673cae
FG
24#include <sys/statfs.h>
25#include <sys/syscall.h>
11fdf7f2 26#include <sys/sysmacros.h>
7c673cae 27#endif
11fdf7f2 28#include <sys/statvfs.h>
7c673cae
FG
29#include <sys/time.h>
30#include <sys/types.h>
31#include <time.h>
32#include <algorithm>
33// Get nano time includes
34#if defined(OS_LINUX) || defined(OS_FREEBSD)
35#elif defined(__MACH__)
36#include <mach/clock.h>
37#include <mach/mach.h>
38#else
39#include <chrono>
40#endif
41#include <deque>
42#include <set>
43#include <vector>
44
45#include "env/io_posix.h"
46#include "env/posix_logger.h"
47#include "monitoring/iostats_context_imp.h"
48#include "monitoring/thread_status_updater.h"
49#include "port/port.h"
50#include "rocksdb/options.h"
51#include "rocksdb/slice.h"
52#include "util/coding.h"
11fdf7f2 53#include "util/compression_context_cache.h"
7c673cae
FG
54#include "util/logging.h"
55#include "util/random.h"
56#include "util/string_util.h"
57#include "util/sync_point.h"
58#include "util/thread_local.h"
59#include "util/threadpool_imp.h"
60
61#if !defined(TMPFS_MAGIC)
62#define TMPFS_MAGIC 0x01021994
63#endif
64#if !defined(XFS_SUPER_MAGIC)
65#define XFS_SUPER_MAGIC 0x58465342
66#endif
67#if !defined(EXT4_SUPER_MAGIC)
68#define EXT4_SUPER_MAGIC 0xEF53
69#endif
70
71namespace rocksdb {
72
73namespace {
74
75ThreadStatusUpdater* CreateThreadStatusUpdater() {
76 return new ThreadStatusUpdater();
77}
78
11fdf7f2
TL
// Permission bits passed to open() for database files: 0644 when non-owner
// access is allowed, 0600 when it is not.
inline mode_t GetDBFileMode(bool allow_non_owner_access) {
  if (allow_non_owner_access) {
    return 0644;
  }
  return 0600;
}
82
7c673cae
FG
83// list of pathnames that are locked
84static std::set<std::string> lockedFiles;
85static port::Mutex mutex_lockedFiles;
86
// Requests (lock == true) or releases (lock == false) a write lock on fd
// through fcntl(F_SETLK). errno is cleared first; the return value is
// whatever fcntl() returned (-1 on failure, with errno set).
static int LockOrUnlock(int fd, bool lock) {
  errno = 0;
  struct flock request;
  memset(&request, 0, sizeof(request));
  request.l_whence = SEEK_SET;
  request.l_start = 0;
  request.l_len = 0;  // Lock/unlock entire file
  if (lock) {
    request.l_type = F_WRLCK;
  } else {
    request.l_type = F_UNLCK;
  }
  return fcntl(fd, F_SETLK, &request);
}
99
100class PosixFileLock : public FileLock {
101 public:
102 int fd_;
103 std::string filename;
104};
105
11fdf7f2
TL
106int cloexec_flags(int flags, const EnvOptions* options) {
107 // If the system supports opening the file with cloexec enabled,
108 // do so, as this avoids a race condition if a db is opened around
109 // the same time that a child process is forked
110#ifdef O_CLOEXEC
111 if (options == nullptr || options->set_fd_cloexec) {
112 flags |= O_CLOEXEC;
113 }
114#endif
115 return flags;
116}
117
7c673cae
FG
118class PosixEnv : public Env {
119 public:
120 PosixEnv();
121
494da23a 122 ~PosixEnv() override {
7c673cae
FG
123 for (const auto tid : threads_to_join_) {
124 pthread_join(tid, nullptr);
125 }
126 for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
127 thread_pools_[pool_id].JoinAllThreads();
128 }
129 // Delete the thread_status_updater_ only when the current Env is not
130 // Env::Default(). This is to avoid the free-after-use error when
131 // Env::Default() is destructed while some other child threads are
132 // still trying to update thread status.
133 if (this != Env::Default()) {
134 delete thread_status_updater_;
135 }
136 }
137
138 void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
139 if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
140 fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
141 }
142 }
143
494da23a
TL
144 Status NewSequentialFile(const std::string& fname,
145 std::unique_ptr<SequentialFile>* result,
146 const EnvOptions& options) override {
7c673cae
FG
147 result->reset();
148 int fd = -1;
11fdf7f2 149 int flags = cloexec_flags(O_RDONLY, &options);
7c673cae
FG
150 FILE* file = nullptr;
151
152 if (options.use_direct_reads && !options.use_mmap_reads) {
153#ifdef ROCKSDB_LITE
154 return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
155#endif // !ROCKSDB_LITE
156#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
157 flags |= O_DIRECT;
158#endif
159 }
160
161 do {
162 IOSTATS_TIMER_GUARD(open_nanos);
11fdf7f2 163 fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
7c673cae
FG
164 } while (fd < 0 && errno == EINTR);
165 if (fd < 0) {
11fdf7f2
TL
166 return IOError("While opening a file for sequentially reading", fname,
167 errno);
7c673cae
FG
168 }
169
170 SetFD_CLOEXEC(fd, &options);
171
172 if (options.use_direct_reads && !options.use_mmap_reads) {
173#ifdef OS_MACOSX
174 if (fcntl(fd, F_NOCACHE, 1) == -1) {
175 close(fd);
11fdf7f2 176 return IOError("While fcntl NoCache", fname, errno);
7c673cae
FG
177 }
178#endif
179 } else {
180 do {
181 IOSTATS_TIMER_GUARD(open_nanos);
182 file = fdopen(fd, "r");
183 } while (file == nullptr && errno == EINTR);
184 if (file == nullptr) {
185 close(fd);
11fdf7f2
TL
186 return IOError("While opening file for sequentially read", fname,
187 errno);
7c673cae
FG
188 }
189 }
190 result->reset(new PosixSequentialFile(fname, file, fd, options));
191 return Status::OK();
192 }
193
494da23a
TL
194 Status NewRandomAccessFile(const std::string& fname,
195 std::unique_ptr<RandomAccessFile>* result,
196 const EnvOptions& options) override {
7c673cae
FG
197 result->reset();
198 Status s;
199 int fd;
11fdf7f2
TL
200 int flags = cloexec_flags(O_RDONLY, &options);
201
7c673cae
FG
202 if (options.use_direct_reads && !options.use_mmap_reads) {
203#ifdef ROCKSDB_LITE
204 return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
205#endif // !ROCKSDB_LITE
206#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
207 flags |= O_DIRECT;
208 TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags);
209#endif
210 }
211
212 do {
213 IOSTATS_TIMER_GUARD(open_nanos);
11fdf7f2 214 fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
7c673cae
FG
215 } while (fd < 0 && errno == EINTR);
216 if (fd < 0) {
11fdf7f2 217 return IOError("While open a file for random read", fname, errno);
7c673cae
FG
218 }
219 SetFD_CLOEXEC(fd, &options);
220
221 if (options.use_mmap_reads && sizeof(void*) >= 8) {
222 // Use of mmap for random reads has been removed because it
223 // kills performance when storage is fast.
224 // Use mmap when virtual address-space is plentiful.
225 uint64_t size;
226 s = GetFileSize(fname, &size);
227 if (s.ok()) {
228 void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
229 if (base != MAP_FAILED) {
230 result->reset(new PosixMmapReadableFile(fd, fname, base,
231 size, options));
232 } else {
11fdf7f2
TL
233 s = IOError("while mmap file for read", fname, errno);
234 close(fd);
7c673cae
FG
235 }
236 }
7c673cae
FG
237 } else {
238 if (options.use_direct_reads && !options.use_mmap_reads) {
239#ifdef OS_MACOSX
240 if (fcntl(fd, F_NOCACHE, 1) == -1) {
241 close(fd);
11fdf7f2 242 return IOError("while fcntl NoCache", fname, errno);
7c673cae
FG
243 }
244#endif
245 }
246 result->reset(new PosixRandomAccessFile(fname, fd, options));
247 }
248 return s;
249 }
250
11fdf7f2 251 virtual Status OpenWritableFile(const std::string& fname,
494da23a 252 std::unique_ptr<WritableFile>* result,
11fdf7f2
TL
253 const EnvOptions& options,
254 bool reopen = false) {
7c673cae
FG
255 result->reset();
256 Status s;
257 int fd = -1;
11fdf7f2 258 int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC);
7c673cae
FG
259 // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
260 if (options.use_direct_writes && !options.use_mmap_writes) {
261 // Note: we should avoid O_APPEND here due to ta the following bug:
262 // POSIX requires that opening a file with the O_APPEND flag should
263 // have no affect on the location at which pwrite() writes data.
264 // However, on Linux, if a file is opened with O_APPEND, pwrite()
265 // appends data to the end of the file, regardless of the value of
266 // offset.
267 // More info here: https://linux.die.net/man/2/pwrite
268#ifdef ROCKSDB_LITE
269 return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
270#endif // ROCKSDB_LITE
271 flags |= O_WRONLY;
272#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
273 flags |= O_DIRECT;
274#endif
275 TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
276 } else if (options.use_mmap_writes) {
277 // non-direct I/O
278 flags |= O_RDWR;
279 } else {
280 flags |= O_WRONLY;
281 }
282
11fdf7f2
TL
283 flags = cloexec_flags(flags, &options);
284
7c673cae
FG
285 do {
286 IOSTATS_TIMER_GUARD(open_nanos);
11fdf7f2 287 fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
7c673cae
FG
288 } while (fd < 0 && errno == EINTR);
289
290 if (fd < 0) {
11fdf7f2 291 s = IOError("While open a file for appending", fname, errno);
7c673cae
FG
292 return s;
293 }
294 SetFD_CLOEXEC(fd, &options);
295
296 if (options.use_mmap_writes) {
297 if (!checkedDiskForMmap_) {
298 // this will be executed once in the program's lifetime.
299 // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
300 if (!SupportsFastAllocate(fname)) {
301 forceMmapOff_ = true;
302 }
303 checkedDiskForMmap_ = true;
304 }
305 }
306 if (options.use_mmap_writes && !forceMmapOff_) {
307 result->reset(new PosixMmapFile(fname, fd, page_size_, options));
308 } else if (options.use_direct_writes && !options.use_mmap_writes) {
309#ifdef OS_MACOSX
310 if (fcntl(fd, F_NOCACHE, 1) == -1) {
311 close(fd);
11fdf7f2
TL
312 s = IOError("While fcntl NoCache an opened file for appending", fname,
313 errno);
7c673cae
FG
314 return s;
315 }
316#elif defined(OS_SOLARIS)
317 if (directio(fd, DIRECTIO_ON) == -1) {
318 if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON
319 close(fd);
11fdf7f2 320 s = IOError("While calling directio()", fname, errno);
7c673cae
FG
321 return s;
322 }
323 }
324#endif
325 result->reset(new PosixWritableFile(fname, fd, options));
326 } else {
327 // disable mmap writes
328 EnvOptions no_mmap_writes_options = options;
329 no_mmap_writes_options.use_mmap_writes = false;
330 result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
331 }
332 return s;
333 }
334
494da23a
TL
335 Status NewWritableFile(const std::string& fname,
336 std::unique_ptr<WritableFile>* result,
337 const EnvOptions& options) override {
11fdf7f2
TL
338 return OpenWritableFile(fname, result, options, false);
339 }
340
494da23a
TL
341 Status ReopenWritableFile(const std::string& fname,
342 std::unique_ptr<WritableFile>* result,
343 const EnvOptions& options) override {
11fdf7f2
TL
344 return OpenWritableFile(fname, result, options, true);
345 }
346
494da23a
TL
347 Status ReuseWritableFile(const std::string& fname,
348 const std::string& old_fname,
349 std::unique_ptr<WritableFile>* result,
350 const EnvOptions& options) override {
7c673cae
FG
351 result->reset();
352 Status s;
353 int fd = -1;
354
355 int flags = 0;
356 // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
357 if (options.use_direct_writes && !options.use_mmap_writes) {
358#ifdef ROCKSDB_LITE
359 return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
360#endif // !ROCKSDB_LITE
361 flags |= O_WRONLY;
362#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
363 flags |= O_DIRECT;
364#endif
365 TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
366 } else if (options.use_mmap_writes) {
367 // mmap needs O_RDWR mode
368 flags |= O_RDWR;
369 } else {
370 flags |= O_WRONLY;
371 }
372
11fdf7f2
TL
373 flags = cloexec_flags(flags, &options);
374
7c673cae
FG
375 do {
376 IOSTATS_TIMER_GUARD(open_nanos);
11fdf7f2
TL
377 fd = open(old_fname.c_str(), flags,
378 GetDBFileMode(allow_non_owner_access_));
7c673cae
FG
379 } while (fd < 0 && errno == EINTR);
380 if (fd < 0) {
11fdf7f2 381 s = IOError("while reopen file for write", fname, errno);
7c673cae
FG
382 return s;
383 }
384
385 SetFD_CLOEXEC(fd, &options);
386 // rename into place
387 if (rename(old_fname.c_str(), fname.c_str()) != 0) {
11fdf7f2 388 s = IOError("while rename file to " + fname, old_fname, errno);
7c673cae
FG
389 close(fd);
390 return s;
391 }
392
393 if (options.use_mmap_writes) {
394 if (!checkedDiskForMmap_) {
395 // this will be executed once in the program's lifetime.
396 // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
397 if (!SupportsFastAllocate(fname)) {
398 forceMmapOff_ = true;
399 }
400 checkedDiskForMmap_ = true;
401 }
402 }
403 if (options.use_mmap_writes && !forceMmapOff_) {
404 result->reset(new PosixMmapFile(fname, fd, page_size_, options));
405 } else if (options.use_direct_writes && !options.use_mmap_writes) {
406#ifdef OS_MACOSX
407 if (fcntl(fd, F_NOCACHE, 1) == -1) {
408 close(fd);
11fdf7f2
TL
409 s = IOError("while fcntl NoCache for reopened file for append", fname,
410 errno);
7c673cae
FG
411 return s;
412 }
413#elif defined(OS_SOLARIS)
414 if (directio(fd, DIRECTIO_ON) == -1) {
415 if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON
416 close(fd);
11fdf7f2 417 s = IOError("while calling directio()", fname, errno);
7c673cae
FG
418 return s;
419 }
420 }
421#endif
422 result->reset(new PosixWritableFile(fname, fd, options));
423 } else {
424 // disable mmap writes
425 EnvOptions no_mmap_writes_options = options;
426 no_mmap_writes_options.use_mmap_writes = false;
427 result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
428 }
429 return s;
7c673cae
FG
430 }
431
494da23a
TL
432 Status NewRandomRWFile(const std::string& fname,
433 std::unique_ptr<RandomRWFile>* result,
434 const EnvOptions& options) override {
7c673cae 435 int fd = -1;
11fdf7f2
TL
436 int flags = cloexec_flags(O_RDWR, &options);
437
7c673cae
FG
438 while (fd < 0) {
439 IOSTATS_TIMER_GUARD(open_nanos);
11fdf7f2
TL
440
441 fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
7c673cae
FG
442 if (fd < 0) {
443 // Error while opening the file
444 if (errno == EINTR) {
445 continue;
446 }
11fdf7f2 447 return IOError("While open file for random read/write", fname, errno);
7c673cae
FG
448 }
449 }
450
451 SetFD_CLOEXEC(fd, &options);
452 result->reset(new PosixRandomRWFile(fname, fd, options));
453 return Status::OK();
454 }
455
494da23a 456 Status NewMemoryMappedFileBuffer(
11fdf7f2 457 const std::string& fname,
494da23a 458 std::unique_ptr<MemoryMappedFileBuffer>* result) override {
11fdf7f2
TL
459 int fd = -1;
460 Status status;
461 int flags = cloexec_flags(O_RDWR, nullptr);
462
463 while (fd < 0) {
464 IOSTATS_TIMER_GUARD(open_nanos);
465 fd = open(fname.c_str(), flags, 0644);
466 if (fd < 0) {
467 // Error while opening the file
468 if (errno == EINTR) {
469 continue;
470 }
471 status =
472 IOError("While open file for raw mmap buffer access", fname, errno);
473 break;
474 }
475 }
476 uint64_t size;
477 if (status.ok()) {
478 status = GetFileSize(fname, &size);
479 }
480 void* base = nullptr;
481 if (status.ok()) {
482 base = mmap(nullptr, static_cast<size_t>(size), PROT_READ | PROT_WRITE,
483 MAP_SHARED, fd, 0);
484 if (base == MAP_FAILED) {
485 status = IOError("while mmap file for read", fname, errno);
486 }
487 }
488 if (status.ok()) {
489 result->reset(
490 new PosixMemoryMappedFileBuffer(base, static_cast<size_t>(size)));
491 }
492 if (fd >= 0) {
493 // don't need to keep it open after mmap has been called
494 close(fd);
495 }
496 return status;
497 }
498
494da23a
TL
499 Status NewDirectory(const std::string& name,
500 std::unique_ptr<Directory>* result) override {
7c673cae
FG
501 result->reset();
502 int fd;
11fdf7f2 503 int flags = cloexec_flags(0, nullptr);
7c673cae
FG
504 {
505 IOSTATS_TIMER_GUARD(open_nanos);
11fdf7f2 506 fd = open(name.c_str(), flags);
7c673cae
FG
507 }
508 if (fd < 0) {
11fdf7f2 509 return IOError("While open directory", name, errno);
7c673cae
FG
510 } else {
511 result->reset(new PosixDirectory(fd));
512 }
513 return Status::OK();
514 }
515
494da23a 516 Status FileExists(const std::string& fname) override {
7c673cae
FG
517 int result = access(fname.c_str(), F_OK);
518
519 if (result == 0) {
520 return Status::OK();
521 }
522
11fdf7f2
TL
523 int err = errno;
524 switch (err) {
7c673cae
FG
525 case EACCES:
526 case ELOOP:
527 case ENAMETOOLONG:
528 case ENOENT:
529 case ENOTDIR:
530 return Status::NotFound();
531 default:
11fdf7f2
TL
532 assert(err == EIO || err == ENOMEM);
533 return Status::IOError("Unexpected error(" + ToString(err) +
7c673cae
FG
534 ") accessing file `" + fname + "' ");
535 }
536 }
537
494da23a
TL
538 Status GetChildren(const std::string& dir,
539 std::vector<std::string>* result) override {
7c673cae
FG
540 result->clear();
541 DIR* d = opendir(dir.c_str());
542 if (d == nullptr) {
543 switch (errno) {
544 case EACCES:
545 case ENOENT:
546 case ENOTDIR:
547 return Status::NotFound();
548 default:
11fdf7f2 549 return IOError("While opendir", dir, errno);
7c673cae
FG
550 }
551 }
552 struct dirent* entry;
553 while ((entry = readdir(d)) != nullptr) {
554 result->push_back(entry->d_name);
555 }
556 closedir(d);
557 return Status::OK();
558 }
559
494da23a 560 Status DeleteFile(const std::string& fname) override {
7c673cae
FG
561 Status result;
562 if (unlink(fname.c_str()) != 0) {
11fdf7f2 563 result = IOError("while unlink() file", fname, errno);
7c673cae
FG
564 }
565 return result;
566 };
567
494da23a 568 Status CreateDir(const std::string& name) override {
7c673cae
FG
569 Status result;
570 if (mkdir(name.c_str(), 0755) != 0) {
11fdf7f2 571 result = IOError("While mkdir", name, errno);
7c673cae
FG
572 }
573 return result;
574 };
575
494da23a 576 Status CreateDirIfMissing(const std::string& name) override {
7c673cae
FG
577 Status result;
578 if (mkdir(name.c_str(), 0755) != 0) {
579 if (errno != EEXIST) {
11fdf7f2 580 result = IOError("While mkdir if missing", name, errno);
7c673cae
FG
581 } else if (!DirExists(name)) { // Check that name is actually a
582 // directory.
583 // Message is taken from mkdir
584 result = Status::IOError("`"+name+"' exists but is not a directory");
585 }
586 }
587 return result;
588 };
589
494da23a 590 Status DeleteDir(const std::string& name) override {
7c673cae
FG
591 Status result;
592 if (rmdir(name.c_str()) != 0) {
11fdf7f2 593 result = IOError("file rmdir", name, errno);
7c673cae
FG
594 }
595 return result;
596 };
597
494da23a 598 Status GetFileSize(const std::string& fname, uint64_t* size) override {
7c673cae
FG
599 Status s;
600 struct stat sbuf;
601 if (stat(fname.c_str(), &sbuf) != 0) {
602 *size = 0;
11fdf7f2 603 s = IOError("while stat a file for size", fname, errno);
7c673cae
FG
604 } else {
605 *size = sbuf.st_size;
606 }
607 return s;
608 }
609
494da23a
TL
610 Status GetFileModificationTime(const std::string& fname,
611 uint64_t* file_mtime) override {
7c673cae
FG
612 struct stat s;
613 if (stat(fname.c_str(), &s) !=0) {
11fdf7f2 614 return IOError("while stat a file for modification time", fname, errno);
7c673cae
FG
615 }
616 *file_mtime = static_cast<uint64_t>(s.st_mtime);
617 return Status::OK();
618 }
494da23a
TL
619 Status RenameFile(const std::string& src,
620 const std::string& target) override {
7c673cae
FG
621 Status result;
622 if (rename(src.c_str(), target.c_str()) != 0) {
11fdf7f2 623 result = IOError("While renaming a file to " + target, src, errno);
7c673cae
FG
624 }
625 return result;
626 }
627
494da23a 628 Status LinkFile(const std::string& src, const std::string& target) override {
7c673cae
FG
629 Status result;
630 if (link(src.c_str(), target.c_str()) != 0) {
631 if (errno == EXDEV) {
632 return Status::NotSupported("No cross FS links allowed");
633 }
11fdf7f2 634 result = IOError("while link file to " + target, src, errno);
7c673cae
FG
635 }
636 return result;
637 }
638
11fdf7f2
TL
639 Status NumFileLinks(const std::string& fname, uint64_t* count) override {
640 struct stat s;
641 if (stat(fname.c_str(), &s) != 0) {
642 return IOError("while stat a file for num file links", fname, errno);
643 }
644 *count = static_cast<uint64_t>(s.st_nlink);
645 return Status::OK();
646 }
647
494da23a
TL
648 Status AreFilesSame(const std::string& first, const std::string& second,
649 bool* res) override {
11fdf7f2
TL
650 struct stat statbuf[2];
651 if (stat(first.c_str(), &statbuf[0]) != 0) {
652 return IOError("stat file", first, errno);
653 }
654 if (stat(second.c_str(), &statbuf[1]) != 0) {
655 return IOError("stat file", second, errno);
656 }
657
658 if (major(statbuf[0].st_dev) != major(statbuf[1].st_dev) ||
659 minor(statbuf[0].st_dev) != minor(statbuf[1].st_dev) ||
660 statbuf[0].st_ino != statbuf[1].st_ino) {
661 *res = false;
662 } else {
663 *res = true;
664 }
665 return Status::OK();
666 }
667
494da23a 668 Status LockFile(const std::string& fname, FileLock** lock) override {
7c673cae
FG
669 *lock = nullptr;
670 Status result;
11fdf7f2
TL
671
672 mutex_lockedFiles.Lock();
673 // If it already exists in the lockedFiles set, then it is already locked,
674 // and fail this lock attempt. Otherwise, insert it into lockedFiles.
675 // This check is needed because fcntl() does not detect lock conflict
676 // if the fcntl is issued by the same thread that earlier acquired
677 // this lock.
678 // We must do this check *before* opening the file:
679 // Otherwise, we will open a new file descriptor. Locks are associated with
680 // a process, not a file descriptor and when *any* file descriptor is closed,
681 // all locks the process holds for that *file* are released
682 if (lockedFiles.insert(fname).second == false) {
683 mutex_lockedFiles.Unlock();
684 errno = ENOLCK;
685 return IOError("lock ", fname, errno);
686 }
687
7c673cae 688 int fd;
11fdf7f2
TL
689 int flags = cloexec_flags(O_RDWR | O_CREAT, nullptr);
690
7c673cae
FG
691 {
692 IOSTATS_TIMER_GUARD(open_nanos);
11fdf7f2 693 fd = open(fname.c_str(), flags, 0644);
7c673cae
FG
694 }
695 if (fd < 0) {
11fdf7f2
TL
696 result = IOError("while open a file for lock", fname, errno);
697 } else if (LockOrUnlock(fd, true) == -1) {
698 // if there is an error in locking, then remove the pathname from lockedfiles
699 lockedFiles.erase(fname);
700 result = IOError("While lock file", fname, errno);
7c673cae
FG
701 close(fd);
702 } else {
703 SetFD_CLOEXEC(fd, nullptr);
704 PosixFileLock* my_lock = new PosixFileLock;
705 my_lock->fd_ = fd;
706 my_lock->filename = fname;
707 *lock = my_lock;
708 }
11fdf7f2
TL
709
710 mutex_lockedFiles.Unlock();
7c673cae
FG
711 return result;
712 }
713
494da23a 714 Status UnlockFile(FileLock* lock) override {
7c673cae
FG
715 PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
716 Status result;
11fdf7f2
TL
717 mutex_lockedFiles.Lock();
718 // If we are unlocking, then verify that we had locked it earlier,
719 // it should already exist in lockedFiles. Remove it from lockedFiles.
720 if (lockedFiles.erase(my_lock->filename) != 1) {
721 errno = ENOLCK;
722 result = IOError("unlock", my_lock->filename, errno);
723 } else if (LockOrUnlock(my_lock->fd_, false) == -1) {
724 result = IOError("unlock", my_lock->filename, errno);
7c673cae
FG
725 }
726 close(my_lock->fd_);
727 delete my_lock;
11fdf7f2 728 mutex_lockedFiles.Unlock();
7c673cae
FG
729 return result;
730 }
731
494da23a
TL
732 void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
733 void* tag = nullptr,
734 void (*unschedFunction)(void* arg) = nullptr) override;
7c673cae 735
494da23a 736 int UnSchedule(void* arg, Priority pri) override;
7c673cae 737
494da23a 738 void StartThread(void (*function)(void* arg), void* arg) override;
7c673cae 739
494da23a 740 void WaitForJoin() override;
7c673cae 741
494da23a 742 unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
7c673cae 743
494da23a 744 Status GetTestDirectory(std::string* result) override {
7c673cae
FG
745 const char* env = getenv("TEST_TMPDIR");
746 if (env && env[0] != '\0') {
747 *result = env;
748 } else {
749 char buf[100];
750 snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid()));
751 *result = buf;
752 }
753 // Directory may already exist
754 CreateDir(*result);
755 return Status::OK();
756 }
757
494da23a 758 Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
7c673cae
FG
759 assert(thread_status_updater_);
760 return thread_status_updater_->GetThreadList(thread_list);
761 }
762
// Converts a pthread_t into a 64-bit id by copying its leading bytes (at most
// eight) into a zero-initialised integer.
static uint64_t gettid(pthread_t tid) {
  const size_t bytes_to_copy = std::min(sizeof(uint64_t), sizeof(tid));
  uint64_t id = 0;
  memcpy(&id, &tid, bytes_to_copy);
  return id;
}
768
769 static uint64_t gettid() {
770 pthread_t tid = pthread_self();
771 return gettid(tid);
772 }
773
494da23a 774 uint64_t GetThreadID() const override { return gettid(pthread_self()); }
7c673cae 775
494da23a 776 Status GetFreeSpace(const std::string& fname, uint64_t* free_space) override {
11fdf7f2
TL
777 struct statvfs sbuf;
778
779 if (statvfs(fname.c_str(), &sbuf) < 0) {
780 return IOError("While doing statvfs", fname, errno);
781 }
782
783 *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree);
784 return Status::OK();
785 }
786
494da23a
TL
787 Status NewLogger(const std::string& fname,
788 std::shared_ptr<Logger>* result) override {
7c673cae
FG
789 FILE* f;
790 {
791 IOSTATS_TIMER_GUARD(open_nanos);
11fdf7f2
TL
792 f = fopen(fname.c_str(), "w"
793#ifdef __GLIBC_PREREQ
794#if __GLIBC_PREREQ(2, 7)
795 "e" // glibc extension to enable O_CLOEXEC
796#endif
797#endif
798 );
7c673cae
FG
799 }
800 if (f == nullptr) {
801 result->reset();
11fdf7f2 802 return IOError("when fopen a file for new logger", fname, errno);
7c673cae
FG
803 } else {
804 int fd = fileno(f);
805#ifdef ROCKSDB_FALLOCATE_PRESENT
806 fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
807#endif
808 SetFD_CLOEXEC(fd, nullptr);
809 result->reset(new PosixLogger(f, &PosixEnv::gettid, this));
810 return Status::OK();
811 }
812 }
813
494da23a 814 uint64_t NowMicros() override {
7c673cae
FG
815 struct timeval tv;
816 gettimeofday(&tv, nullptr);
817 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
818 }
819
494da23a 820 uint64_t NowNanos() override {
7c673cae
FG
821#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX)
822 struct timespec ts;
823 clock_gettime(CLOCK_MONOTONIC, &ts);
824 return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
825#elif defined(OS_SOLARIS)
826 return gethrtime();
827#elif defined(__MACH__)
828 clock_serv_t cclock;
829 mach_timespec_t ts;
830 host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
831 clock_get_time(cclock, &ts);
832 mach_port_deallocate(mach_task_self(), cclock);
833 return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
834#else
835 return std::chrono::duration_cast<std::chrono::nanoseconds>(
836 std::chrono::steady_clock::now().time_since_epoch()).count();
837#endif
838 }
839
494da23a
TL
840 uint64_t NowCPUNanos() override {
841#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \
842 defined(__MACH__)
843 struct timespec ts;
844 clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
845 return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
846#endif
847 return 0;
848 }
849
850 void SleepForMicroseconds(int micros) override { usleep(micros); }
7c673cae 851
494da23a 852 Status GetHostName(char* name, uint64_t len) override {
7c673cae
FG
853 int ret = gethostname(name, static_cast<size_t>(len));
854 if (ret < 0) {
855 if (errno == EFAULT || errno == EINVAL)
856 return Status::InvalidArgument(strerror(errno));
857 else
11fdf7f2 858 return IOError("GetHostName", name, errno);
7c673cae
FG
859 }
860 return Status::OK();
861 }
862
494da23a 863 Status GetCurrentTime(int64_t* unix_time) override {
7c673cae
FG
864 time_t ret = time(nullptr);
865 if (ret == (time_t) -1) {
11fdf7f2 866 return IOError("GetCurrentTime", "", errno);
7c673cae
FG
867 }
868 *unix_time = (int64_t) ret;
869 return Status::OK();
870 }
871
494da23a
TL
872 Status GetAbsolutePath(const std::string& db_path,
873 std::string* output_path) override {
11fdf7f2 874 if (!db_path.empty() && db_path[0] == '/') {
7c673cae
FG
875 *output_path = db_path;
876 return Status::OK();
877 }
878
879 char the_path[256];
880 char* ret = getcwd(the_path, 256);
881 if (ret == nullptr) {
882 return Status::IOError(strerror(errno));
883 }
884
885 *output_path = ret;
886 return Status::OK();
887 }
888
889 // Allow increasing the number of worker threads.
494da23a 890 void SetBackgroundThreads(int num, Priority pri) override {
11fdf7f2 891 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
7c673cae
FG
892 thread_pools_[pri].SetBackgroundThreads(num);
893 }
894
494da23a 895 int GetBackgroundThreads(Priority pri) override {
11fdf7f2
TL
896 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
897 return thread_pools_[pri].GetBackgroundThreads();
898 }
899
494da23a 900 Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
11fdf7f2
TL
901 allow_non_owner_access_ = allow_non_owner_access;
902 return Status::OK();
903 }
904
7c673cae 905 // Allow increasing the number of worker threads.
494da23a 906 void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
11fdf7f2 907 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
7c673cae
FG
908 thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
909 }
910
494da23a 911 void LowerThreadPoolIOPriority(Priority pool = LOW) override {
11fdf7f2 912 assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
7c673cae
FG
913#ifdef OS_LINUX
914 thread_pools_[pool].LowerIOPriority();
11fdf7f2
TL
915#else
916 (void)pool;
917#endif
918 }
919
494da23a 920 void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
11fdf7f2
TL
921 assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
922#ifdef OS_LINUX
923 thread_pools_[pool].LowerCPUPriority();
924#else
925 (void)pool;
7c673cae
FG
926#endif
927 }
928
494da23a 929 std::string TimeToString(uint64_t secondsSince1970) override {
7c673cae
FG
930 const time_t seconds = (time_t)secondsSince1970;
931 struct tm t;
932 int maxsize = 64;
933 std::string dummy;
934 dummy.reserve(maxsize);
935 dummy.resize(maxsize);
936 char* p = &dummy[0];
937 localtime_r(&seconds, &t);
938 snprintf(p, maxsize,
939 "%04d/%02d/%02d-%02d:%02d:%02d ",
940 t.tm_year + 1900,
941 t.tm_mon + 1,
942 t.tm_mday,
943 t.tm_hour,
944 t.tm_min,
945 t.tm_sec);
946 return dummy;
947 }
948
949 EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
950 const DBOptions& db_options) const override {
951 EnvOptions optimized = env_options;
952 optimized.use_mmap_writes = false;
953 optimized.use_direct_writes = false;
954 optimized.bytes_per_sync = db_options.wal_bytes_per_sync;
955 // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
956 // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
957 // test and make this false
958 optimized.fallocate_with_keep_size = true;
11fdf7f2
TL
959 optimized.writable_file_max_buffer_size =
960 db_options.writable_file_max_buffer_size;
7c673cae
FG
961 return optimized;
962 }
963
964 EnvOptions OptimizeForManifestWrite(
965 const EnvOptions& env_options) const override {
966 EnvOptions optimized = env_options;
967 optimized.use_mmap_writes = false;
968 optimized.use_direct_writes = false;
969 optimized.fallocate_with_keep_size = true;
970 return optimized;
971 }
972
973 private:
// Mmap-related flags; both are set to false by the PosixEnv constructor.
// NOTE(review): the code that reads/updates them is outside this chunk --
// the names suggest a one-time disk probe whose result can force mmap off;
// confirm against the users of these fields.
974 bool checkedDiskForMmap_;
975 bool forceMmapOff_; // do we override Env options?
976
977 // Returns true iff the named directory exists and is a directory.
978 virtual bool DirExists(const std::string& dname) {
979 struct stat statbuf;
980 if (stat(dname.c_str(), &statbuf) == 0) {
981 return S_ISDIR(statbuf.st_mode);
982 }
983 return false; // stat() failed return false
984 }
985
// Reports whether the filesystem holding `path` is one of the types this
// code treats as supporting fast allocation (ext4, XFS, tmpfs).
// Always returns false when ROCKSDB_FALLOCATE_PRESENT is not defined or
// when statfs() fails.
bool SupportsFastAllocate(const std::string& path) {
#ifndef ROCKSDB_FALLOCATE_PRESENT
  (void)path;
  return false;
#else
  struct statfs fs_info;
  if (statfs(path.c_str(), &fs_info) != 0) {
    return false;
  }
  switch (fs_info.f_type) {
    case EXT4_SUPER_MAGIC:
    case XFS_SUPER_MAGIC:
    case TMPFS_MAGIC:
      return true;
    default:
      return false;
  }
#endif
}
1007
// Page size, captured from getpagesize() when the env is constructed.
1008 size_t page_size_;
1009
// One pool per priority level: the constructor sizes this to
// Priority::TOTAL and it is indexed by priority.
1010 std::vector<ThreadPoolImpl> thread_pools_;
// Held by StartThread() while it appends to threads_to_join_.
1011 pthread_mutex_t mu_;
// Handles of the threads created through StartThread().
1012 std::vector<pthread_t> threads_to_join_;
11fdf7f2
TL
1013 // If true, allow non owner read access for db files. Otherwise, non-owner
1014 // has no access to db files.
1015 bool allow_non_owner_access_;
7c673cae
FG
1016};
1017
1018PosixEnv::PosixEnv()
1019 : checkedDiskForMmap_(false),
1020 forceMmapOff_(false),
1021 page_size_(getpagesize()),
11fdf7f2
TL
1022 thread_pools_(Priority::TOTAL),
1023 allow_non_owner_access_(true) {
7c673cae
FG
1024 ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
1025 for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
1026 thread_pools_[pool_id].SetThreadPriority(
1027 static_cast<Env::Priority>(pool_id));
1028 // This allows later initializing the thread-local-env of each thread.
1029 thread_pools_[pool_id].SetHostEnv(this);
1030 }
1031 thread_status_updater_ = CreateThreadStatusUpdater();
1032}
1033
1034void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
1035 void* tag, void (*unschedFunction)(void* arg)) {
11fdf7f2 1036 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
7c673cae
FG
1037 thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
1038}
1039
1040int PosixEnv::UnSchedule(void* arg, Priority pri) {
1041 return thread_pools_[pri].UnSchedule(arg);
1042}
1043
1044unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
11fdf7f2 1045 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
7c673cae
FG
1046 return thread_pools_[pri].GetQueueLen();
1047}
1048
// Heap-allocated bundle handed to a new thread: the callback to run and
// the opaque argument to run it with.
struct StartThreadState {
  void (*user_function)(void*);
  void* arg;
};

// pthread entry point. `arg` must point to a StartThreadState allocated
// with `new`; the callback is invoked, then the state is freed here.
// Always returns nullptr.
static void* StartThreadWrapper(void* arg) {
  auto* launch = static_cast<StartThreadState*>(arg);
  launch->user_function(launch->arg);
  delete launch;
  return nullptr;
}
1060
1061void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
1062 pthread_t t;
1063 StartThreadState* state = new StartThreadState;
1064 state->user_function = function;
1065 state->arg = arg;
1066 ThreadPoolImpl::PthreadCall(
1067 "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
1068 ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
1069 threads_to_join_.push_back(t);
1070 ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
1071}
1072
1073void PosixEnv::WaitForJoin() {
1074 for (const auto tid : threads_to_join_) {
1075 pthread_join(tid, nullptr);
1076 }
1077 threads_to_join_.clear();
1078}
1079
1080} // namespace
1081
1082std::string Env::GenerateUniqueId() {
1083 std::string uuid_file = "/proc/sys/kernel/random/uuid";
1084
1085 Status s = FileExists(uuid_file);
1086 if (s.ok()) {
1087 std::string uuid;
1088 s = ReadFileToString(this, uuid_file, &uuid);
1089 if (s.ok()) {
1090 return uuid;
1091 }
1092 }
1093 // Could not read uuid_file - generate uuid using "nanos-random"
1094 Random64 r(time(nullptr));
1095 uint64_t random_uuid_portion =
1096 r.Uniform(std::numeric_limits<uint64_t>::max());
1097 uint64_t nanos_uuid_portion = NowNanos();
1098 char uuid2[200];
1099 snprintf(uuid2,
1100 200,
1101 "%lx-%lx",
1102 (unsigned long)nanos_uuid_portion,
1103 (unsigned long)random_uuid_portion);
1104 return uuid2;
1105}
1106
1107//
1108// Default Posix Env
1109//
1110Env* Env::Default() {
1111 // The following function call initializes the singletons of ThreadLocalPtr
1112 // right before the static default_env. This guarantees default_env will
1113 // always being destructed before the ThreadLocalPtr singletons get
1114 // destructed as C++ guarantees that the destructions of static variables
1115 // is in the reverse order of their constructions.
1116 //
1117 // Since static members are destructed in the reverse order
1118 // of their construction, having this call here guarantees that
1119 // the destructor of static PosixEnv will go first, then the
1120 // the singletons of ThreadLocalPtr.
1121 ThreadLocalPtr::InitSingletons();
11fdf7f2
TL
1122 CompressionContextCache::InitSingleton();
1123 INIT_SYNC_POINT_SINGLETONS();
7c673cae
FG
1124 static PosixEnv default_env;
1125 return &default_env;
1126}
1127
1128} // namespace rocksdb