]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/env/env_posix.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / env / env_posix.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors
9
10 #if !defined(OS_WIN)
11
12 #include <dirent.h>
13 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
14 #include <dlfcn.h>
15 #endif
16 #include <errno.h>
17 #include <fcntl.h>
18
19 #if defined(ROCKSDB_IOURING_PRESENT)
20 #include <liburing.h>
21 #endif
22 #include <pthread.h>
23 #include <signal.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <sys/mman.h>
28 #include <sys/stat.h>
29 #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
30 #include <sys/statfs.h>
31 #endif
32 #include <sys/statvfs.h>
33 #include <sys/time.h>
34 #include <sys/types.h>
35 #if defined(ROCKSDB_IOURING_PRESENT)
36 #include <sys/uio.h>
37 #endif
38 #include <time.h>
39 #include <algorithm>
40 // Get nano time includes
41 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD)
42 #elif defined(__MACH__)
43 #include <Availability.h>
44 #include <mach/clock.h>
45 #include <mach/mach.h>
46 #else
47 #include <chrono>
48 #endif
49 #include <deque>
50 #include <set>
51 #include <vector>
52
53 #include "env/composite_env_wrapper.h"
54 #include "env/io_posix.h"
55 #include "logging/posix_logger.h"
56 #include "monitoring/iostats_context_imp.h"
57 #include "monitoring/thread_status_updater.h"
58 #include "port/port.h"
59 #include "rocksdb/options.h"
60 #include "rocksdb/slice.h"
61 #include "test_util/sync_point.h"
62 #include "util/coding.h"
63 #include "util/compression_context_cache.h"
64 #include "util/random.h"
65 #include "util/string_util.h"
66 #include "util/thread_local.h"
67 #include "util/threadpool_imp.h"
68
69 #if !defined(TMPFS_MAGIC)
70 #define TMPFS_MAGIC 0x01021994
71 #endif
72 #if !defined(XFS_SUPER_MAGIC)
73 #define XFS_SUPER_MAGIC 0x58465342
74 #endif
75 #if !defined(EXT4_SUPER_MAGIC)
76 #define EXT4_SUPER_MAGIC 0xEF53
77 #endif
78
79 namespace ROCKSDB_NAMESPACE {
// Character separating the directories of a library search path.
#if defined(OS_WIN)
static const char kPathSeparator = ';';
#else
static const char kPathSeparator = ':';
#endif

// File-name extension appended to shared library names on this platform.
#if defined(OS_WIN)
static const std::string kSharedLibExt = ".dll";
#elif defined(OS_MACOSX)
static const std::string kSharedLibExt = ".dylib";
#else
static const std::string kSharedLibExt = ".so";
#endif
91
92 namespace {
93
94 ThreadStatusUpdater* CreateThreadStatusUpdater() {
95 return new ThreadStatusUpdater();
96 }
97
98 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
99 class PosixDynamicLibrary : public DynamicLibrary {
100 public:
101 PosixDynamicLibrary(const std::string& name, void* handle)
102 : name_(name), handle_(handle) {}
103 ~PosixDynamicLibrary() override { dlclose(handle_); }
104
105 Status LoadSymbol(const std::string& sym_name, void** func) override {
106 assert(nullptr != func);
107 dlerror(); // Clear any old error
108 *func = dlsym(handle_, sym_name.c_str());
109 if (*func != nullptr) {
110 return Status::OK();
111 } else {
112 char* err = dlerror();
113 return Status::NotFound("Error finding symbol: " + sym_name, err);
114 }
115 }
116
117 const char* Name() const override { return name_.c_str(); }
118
119 private:
120 std::string name_;
121 void* handle_;
122 };
123 #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
124
125 class PosixEnv : public CompositeEnvWrapper {
126 public:
127 // This constructor is for constructing non-default Envs, mainly by
128 // NewCompositeEnv(). It allows new instances to share the same
129 // threadpool and other resources as the default Env, while allowing
130 // a non-default FileSystem implementation
131 PosixEnv(const PosixEnv* default_env, std::shared_ptr<FileSystem> fs);
132
133 ~PosixEnv() override {
134 if (this == Env::Default()) {
135 for (const auto tid : threads_to_join_) {
136 pthread_join(tid, nullptr);
137 }
138 for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
139 thread_pools_[pool_id].JoinAllThreads();
140 }
141 // Do not delete the thread_status_updater_ in order to avoid the
142 // free after use when Env::Default() is destructed while some other
143 // child threads are still trying to update thread status. All
144 // PosixEnv instances use the same thread_status_updater_, so never
145 // explicitly delete it.
146 }
147 }
148
149 void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
150 if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
151 fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
152 }
153 }
154
155 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
156 // Loads the named library into the result.
157 // If the input name is empty, the current executable is loaded
158 // On *nix systems, a "lib" prefix is added to the name if one is not supplied
159 // Comparably, the appropriate shared library extension is added to the name
160 // if not supplied. If search_path is not specified, the shared library will
161 // be loaded using the default path (LD_LIBRARY_PATH) If search_path is
162 // specified, the shared library will be searched for in the directories
163 // provided by the search path
164 Status LoadLibrary(const std::string& name, const std::string& path,
165 std::shared_ptr<DynamicLibrary>* result) override {
166 assert(result != nullptr);
167 if (name.empty()) {
168 void* hndl = dlopen(NULL, RTLD_NOW);
169 if (hndl != nullptr) {
170 result->reset(new PosixDynamicLibrary(name, hndl));
171 return Status::OK();
172 }
173 } else {
174 std::string library_name = name;
175 if (library_name.find(kSharedLibExt) == std::string::npos) {
176 library_name = library_name + kSharedLibExt;
177 }
178 #if !defined(OS_WIN)
179 if (library_name.find('/') == std::string::npos &&
180 library_name.compare(0, 3, "lib") != 0) {
181 library_name = "lib" + library_name;
182 }
183 #endif
184 if (path.empty()) {
185 void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
186 if (hndl != nullptr) {
187 result->reset(new PosixDynamicLibrary(library_name, hndl));
188 return Status::OK();
189 }
190 } else {
191 std::string local_path;
192 std::stringstream ss(path);
193 while (getline(ss, local_path, kPathSeparator)) {
194 if (!path.empty()) {
195 std::string full_name = local_path + "/" + library_name;
196 void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
197 if (hndl != nullptr) {
198 result->reset(new PosixDynamicLibrary(full_name, hndl));
199 return Status::OK();
200 }
201 }
202 }
203 }
204 }
205 return Status::IOError(
206 IOErrorMsg("Failed to open shared library: xs", name), dlerror());
207 }
208 #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
209
210 void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
211 void* tag = nullptr,
212 void (*unschedFunction)(void* arg) = nullptr) override;
213
214 int UnSchedule(void* arg, Priority pri) override;
215
216 void StartThread(void (*function)(void* arg), void* arg) override;
217
218 void WaitForJoin() override;
219
220 unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
221
222 Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
223 assert(thread_status_updater_);
224 return thread_status_updater_->GetThreadList(thread_list);
225 }
226
227 static uint64_t gettid(pthread_t tid) {
228 uint64_t thread_id = 0;
229 memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
230 return thread_id;
231 }
232
233 static uint64_t gettid() {
234 pthread_t tid = pthread_self();
235 return gettid(tid);
236 }
237
238 uint64_t GetThreadID() const override { return gettid(pthread_self()); }
239
240 uint64_t NowMicros() override {
241 struct timeval tv;
242 gettimeofday(&tv, nullptr);
243 return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
244 }
245
246 uint64_t NowNanos() override {
247 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
248 defined(OS_AIX)
249 struct timespec ts;
250 clock_gettime(CLOCK_MONOTONIC, &ts);
251 return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
252 #elif defined(OS_SOLARIS)
253 return gethrtime();
254 #elif defined(__MACH__)
255 clock_serv_t cclock;
256 mach_timespec_t ts;
257 host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
258 clock_get_time(cclock, &ts);
259 mach_port_deallocate(mach_task_self(), cclock);
260 return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
261 #else
262 return std::chrono::duration_cast<std::chrono::nanoseconds>(
263 std::chrono::steady_clock::now().time_since_epoch()).count();
264 #endif
265 }
266
267 uint64_t NowCPUNanos() override {
268 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
269 defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
270 struct timespec ts;
271 clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
272 return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
273 #endif
274 return 0;
275 }
276
277 void SleepForMicroseconds(int micros) override { usleep(micros); }
278
279 Status GetHostName(char* name, uint64_t len) override {
280 int ret = gethostname(name, static_cast<size_t>(len));
281 if (ret < 0) {
282 if (errno == EFAULT || errno == EINVAL) {
283 return Status::InvalidArgument(strerror(errno));
284 } else {
285 return IOError("GetHostName", name, errno);
286 }
287 }
288 return Status::OK();
289 }
290
291 Status GetCurrentTime(int64_t* unix_time) override {
292 time_t ret = time(nullptr);
293 if (ret == (time_t) -1) {
294 return IOError("GetCurrentTime", "", errno);
295 }
296 *unix_time = (int64_t) ret;
297 return Status::OK();
298 }
299
300 ThreadStatusUpdater* GetThreadStatusUpdater() const override {
301 return Env::GetThreadStatusUpdater();
302 }
303
304 std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
305
306 // Allow increasing the number of worker threads.
307 void SetBackgroundThreads(int num, Priority pri) override {
308 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
309 thread_pools_[pri].SetBackgroundThreads(num);
310 }
311
312 int GetBackgroundThreads(Priority pri) override {
313 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
314 return thread_pools_[pri].GetBackgroundThreads();
315 }
316
317 Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
318 allow_non_owner_access_ = allow_non_owner_access;
319 return Status::OK();
320 }
321
322 // Allow increasing the number of worker threads.
323 void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
324 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
325 thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
326 }
327
328 void LowerThreadPoolIOPriority(Priority pool) override {
329 assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
330 #ifdef OS_LINUX
331 thread_pools_[pool].LowerIOPriority();
332 #else
333 (void)pool;
334 #endif
335 }
336
337 void LowerThreadPoolCPUPriority(Priority pool) override {
338 assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
339 thread_pools_[pool].LowerCPUPriority(CpuPriority::kLow);
340 }
341
342 Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
343 assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
344 thread_pools_[pool].LowerCPUPriority(pri);
345 return Status::OK();
346 }
347
348 std::string TimeToString(uint64_t secondsSince1970) override {
349 const time_t seconds = (time_t)secondsSince1970;
350 struct tm t;
351 int maxsize = 64;
352 std::string dummy;
353 dummy.reserve(maxsize);
354 dummy.resize(maxsize);
355 char* p = &dummy[0];
356 localtime_r(&seconds, &t);
357 snprintf(p, maxsize,
358 "%04d/%02d/%02d-%02d:%02d:%02d ",
359 t.tm_year + 1900,
360 t.tm_mon + 1,
361 t.tm_mday,
362 t.tm_hour,
363 t.tm_min,
364 t.tm_sec);
365 return dummy;
366 }
367
368 private:
369 friend Env* Env::Default();
370 // Constructs the default Env, a singleton
371 PosixEnv();
372
373 // The below 4 members are only used by the default PosixEnv instance.
374 // Non-default instances simply maintain references to the backing
375 // members in te default instance
376 std::vector<ThreadPoolImpl> thread_pools_storage_;
377 pthread_mutex_t mu_storage_;
378 std::vector<pthread_t> threads_to_join_storage_;
379 bool allow_non_owner_access_storage_;
380
381 std::vector<ThreadPoolImpl>& thread_pools_;
382 pthread_mutex_t& mu_;
383 std::vector<pthread_t>& threads_to_join_;
384 // If true, allow non owner read access for db files. Otherwise, non-owner
385 // has no access to db files.
386 bool& allow_non_owner_access_;
387 };
388
389 PosixEnv::PosixEnv()
390 : CompositeEnvWrapper(this, FileSystem::Default()),
391 thread_pools_storage_(Priority::TOTAL),
392 allow_non_owner_access_storage_(true),
393 thread_pools_(thread_pools_storage_),
394 mu_(mu_storage_),
395 threads_to_join_(threads_to_join_storage_),
396 allow_non_owner_access_(allow_non_owner_access_storage_) {
397 ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
398 for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
399 thread_pools_[pool_id].SetThreadPriority(
400 static_cast<Env::Priority>(pool_id));
401 // This allows later initializing the thread-local-env of each thread.
402 thread_pools_[pool_id].SetHostEnv(this);
403 }
404 thread_status_updater_ = CreateThreadStatusUpdater();
405 }
406
407 PosixEnv::PosixEnv(const PosixEnv* default_env, std::shared_ptr<FileSystem> fs)
408 : CompositeEnvWrapper(this, fs),
409 thread_pools_(default_env->thread_pools_),
410 mu_(default_env->mu_),
411 threads_to_join_(default_env->threads_to_join_),
412 allow_non_owner_access_(default_env->allow_non_owner_access_) {
413 thread_status_updater_ = default_env->thread_status_updater_;
414 }
415
416 void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
417 void* tag, void (*unschedFunction)(void* arg)) {
418 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
419 thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
420 }
421
422 int PosixEnv::UnSchedule(void* arg, Priority pri) {
423 return thread_pools_[pri].UnSchedule(arg);
424 }
425
426 unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
427 assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
428 return thread_pools_[pri].GetQueueLen();
429 }
430
// Bundle passed through pthread_create(): the user's entry point together
// with the argument it is to be invoked with.
struct StartThreadState {
  using EntryPoint = void (*)(void*);

  EntryPoint user_function;  // function to run on the new thread
  void* arg;                 // opaque argument forwarded to user_function
};
435
436 static void* StartThreadWrapper(void* arg) {
437 StartThreadState* state = reinterpret_cast<StartThreadState*>(arg);
438 state->user_function(state->arg);
439 delete state;
440 return nullptr;
441 }
442
443 void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
444 pthread_t t;
445 StartThreadState* state = new StartThreadState;
446 state->user_function = function;
447 state->arg = arg;
448 ThreadPoolImpl::PthreadCall(
449 "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
450 ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
451 threads_to_join_.push_back(t);
452 ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
453 }
454
455 void PosixEnv::WaitForJoin() {
456 for (const auto tid : threads_to_join_) {
457 pthread_join(tid, nullptr);
458 }
459 threads_to_join_.clear();
460 }
461
462 } // namespace
463
464 std::string Env::GenerateUniqueId() {
465 std::string uuid_file = "/proc/sys/kernel/random/uuid";
466 std::shared_ptr<FileSystem> fs = FileSystem::Default();
467
468 Status s = fs->FileExists(uuid_file, IOOptions(), nullptr);
469 if (s.ok()) {
470 std::string uuid;
471 s = ReadFileToString(fs.get(), uuid_file, &uuid);
472 if (s.ok()) {
473 return uuid;
474 }
475 }
476 // Could not read uuid_file - generate uuid using "nanos-random"
477 Random64 r(time(nullptr));
478 uint64_t random_uuid_portion =
479 r.Uniform(std::numeric_limits<uint64_t>::max());
480 uint64_t nanos_uuid_portion = NowNanos();
481 char uuid2[200];
482 snprintf(uuid2,
483 200,
484 "%lx-%lx",
485 (unsigned long)nanos_uuid_portion,
486 (unsigned long)random_uuid_portion);
487 return uuid2;
488 }
489
490 //
491 // Default Posix Env
492 //
493 Env* Env::Default() {
494 // The following function call initializes the singletons of ThreadLocalPtr
495 // right before the static default_env. This guarantees default_env will
496 // always being destructed before the ThreadLocalPtr singletons get
497 // destructed as C++ guarantees that the destructions of static variables
498 // is in the reverse order of their constructions.
499 //
500 // Since static members are destructed in the reverse order
501 // of their construction, having this call here guarantees that
502 // the destructor of static PosixEnv will go first, then the
503 // the singletons of ThreadLocalPtr.
504 ThreadLocalPtr::InitSingletons();
505 CompressionContextCache::InitSingleton();
506 INIT_SYNC_POINT_SINGLETONS();
507 static PosixEnv default_env;
508 return &default_env;
509 }
510
511 std::unique_ptr<Env> NewCompositeEnv(std::shared_ptr<FileSystem> fs) {
512 PosixEnv* default_env = static_cast<PosixEnv*>(Env::Default());
513 return std::unique_ptr<Env>(new PosixEnv(default_env, fs));
514 }
515
516 } // namespace ROCKSDB_NAMESPACE
517
518 #endif