1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors
13 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
19 #if defined(ROCKSDB_IOURING_PRESENT)
29 #if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
30 #include <sys/statfs.h>
32 #include <sys/statvfs.h>
34 #include <sys/types.h>
35 #if defined(ROCKSDB_IOURING_PRESENT)
40 // Get nano time includes
41 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD)
42 #elif defined(__MACH__)
43 #include <Availability.h>
44 #include <mach/clock.h>
45 #include <mach/mach.h>
53 #include "env/composite_env_wrapper.h"
54 #include "env/io_posix.h"
55 #include "logging/posix_logger.h"
56 #include "monitoring/iostats_context_imp.h"
57 #include "monitoring/thread_status_updater.h"
58 #include "port/port.h"
59 #include "rocksdb/options.h"
60 #include "rocksdb/slice.h"
61 #include "test_util/sync_point.h"
62 #include "util/coding.h"
63 #include "util/compression_context_cache.h"
64 #include "util/random.h"
65 #include "util/string_util.h"
66 #include "util/thread_local.h"
67 #include "util/threadpool_imp.h"
69 #if !defined(TMPFS_MAGIC)
70 #define TMPFS_MAGIC 0x01021994
72 #if !defined(XFS_SUPER_MAGIC)
73 #define XFS_SUPER_MAGIC 0x58465342
75 #if !defined(EXT4_SUPER_MAGIC)
76 #define EXT4_SUPER_MAGIC 0xEF53
79 namespace ROCKSDB_NAMESPACE
{
// Platform-dependent constants used by LoadLibrary() when locating shared
// libraries: kSharedLibExt is the file-name extension appended to a library
// name, and kPathSeparator is the delimiter between entries of a search path.
// Several alternative definitions are listed; presumably the preprocessor
// conditionals around them (only the OS_MACOSX test is visible here) leave
// exactly one definition of each name active -- TODO confirm.
81 static const std::string kSharedLibExt
= ".dll";
82 static const char kPathSeparator
= ';';
84 static const char kPathSeparator
= ':';
85 #if defined(OS_MACOSX)
86 static const std::string kSharedLibExt
= ".dylib";
88 static const std::string kSharedLibExt
= ".so";
// Heap-allocates a ThreadStatusUpdater and returns the raw pointer. The
// caller receives ownership; nothing in this function releases the object.
94 ThreadStatusUpdater
* CreateThreadStatusUpdater() {
95 return new ThreadStatusUpdater();
98 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
// DynamicLibrary implementation built on the dl* API (dlsym/dlerror/dlclose).
// An instance pairs a library handle with the name it was opened under.
99 class PosixDynamicLibrary
: public DynamicLibrary
{
// Stores `name` and `handle` as given. The handle is later passed to
// dlclose(), so it is expected to come from dlopen().
101 PosixDynamicLibrary(const std::string
& name
, void* handle
)
102 : name_(name
), handle_(handle
) {}
// Releases the library handle.
103 ~PosixDynamicLibrary() override
{ dlclose(handle_
); }
// Looks up `sym_name` with dlsym() and stores the resulting address in
// *func (`func` must be non-null; this is asserted). When the visible
// failure path is taken, the dlerror() text is attached to a NotFound status.
// NOTE(review): the success-path return is not visible in this excerpt.
105 Status
LoadSymbol(const std::string
& sym_name
, void** func
) override
{
106 assert(nullptr != func
);
107 dlerror(); // Clear any old error
108 *func
= dlsym(handle_
, sym_name
.c_str());
109 if (*func
!= nullptr) {
112 char* err
= dlerror();
113 return Status::NotFound("Error finding symbol: " + sym_name
, err
);
// Returns the stored library name as a C string; the pointer refers to the
// internal name_ member.
117 const char* Name() const override
{ return name_
.c_str(); }
123 #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
// Env implementation layered on CompositeEnvWrapper. Thread pools, the list
// of threads to join, the mutex guarding that list and the non-owner-access
// flag are all reached through reference members, which lets non-default
// instances share the state owned by the default instance.
125 class PosixEnv
: public CompositeEnvWrapper
{
127 // This constructor is for constructing non-default Envs, mainly by
128 // NewCompositeEnv(). It allows new instances to share the same
129 // threadpool and other resources as the default Env, while allowing
130 // a non-default FileSystem implementation
131 PosixEnv(const PosixEnv
* default_env
, std::shared_ptr
<FileSystem
> fs
);
133 ~PosixEnv() override
{
134 if (this == Env::Default()) {
135 for (const auto tid
: threads_to_join_
) {
136 pthread_join(tid
, nullptr);
138 for (int pool_id
= 0; pool_id
< Env::Priority::TOTAL
; ++pool_id
) {
139 thread_pools_
[pool_id
].JoinAllThreads();
141 // Do not delete the thread_status_updater_ in order to avoid the
142 // free after use when Env::Default() is destructed while some other
143 // child threads are still trying to update thread status. All
144 // PosixEnv instances use the same thread_status_updater_, so never
145 // explicitly delete it.
149 void SetFD_CLOEXEC(int fd
, const EnvOptions
* options
) {
150 if ((options
== nullptr || options
->set_fd_cloexec
) && fd
> 0) {
151 fcntl(fd
, F_SETFD
, fcntl(fd
, F_GETFD
) | FD_CLOEXEC
);
155 #ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
156 // Loads the named library into the result.
157 // If the input name is empty, the current executable is loaded.
158 // On *nix systems, a "lib" prefix is added to the name if one is not supplied
159 // Comparably, the appropriate shared library extension is added to the name
160 // if not supplied. If search_path is not specified, the shared library will
161 // be loaded using the default path (LD_LIBRARY_PATH). If search_path is
162 // specified, the shared library will be searched for in the directories
163 // provided by the search path
//
// `search_path` above refers to the `path` parameter; its entries are split
// on kPathSeparator. `result` must be non-null (asserted). On success *result
// is reset to a new PosixDynamicLibrary wrapping the dlopen() handle.
// Details visible in the code below:
//  - the extension test is a substring search, so a name that contains
//    kSharedLibExt anywhere is left unchanged;
//  - the "lib" prefix is only added when the name contains no '/'.
// NOTE(review): several control-flow lines (branch conditions and success
// returns) are not visible in this excerpt.
164 Status
LoadLibrary(const std::string
& name
, const std::string
& path
,
165 std::shared_ptr
<DynamicLibrary
>* result
) override
{
166 assert(result
!= nullptr);
// dlopen(NULL, ...) yields a handle for the main program.
168 void* hndl
= dlopen(NULL
, RTLD_NOW
);
169 if (hndl
!= nullptr) {
170 result
->reset(new PosixDynamicLibrary(name
, hndl
));
// Normalize the file name: append the extension and "lib" prefix if absent.
174 std::string library_name
= name
;
175 if (library_name
.find(kSharedLibExt
) == std::string::npos
) {
176 library_name
= library_name
+ kSharedLibExt
;
179 if (library_name
.find('/') == std::string::npos
&&
180 library_name
.compare(0, 3, "lib") != 0) {
181 library_name
= "lib" + library_name
;
// Attempt using the bare library name (loader's default search).
185 void* hndl
= dlopen(library_name
.c_str(), RTLD_NOW
);
186 if (hndl
!= nullptr) {
187 result
->reset(new PosixDynamicLibrary(library_name
, hndl
));
// Attempt each directory listed in `path` in order.
191 std::string local_path
;
192 std::stringstream
ss(path
);
193 while (getline(ss
, local_path
, kPathSeparator
)) {
195 std::string full_name
= local_path
+ "/" + library_name
;
196 void* hndl
= dlopen(full_name
.c_str(), RTLD_NOW
);
197 if (hndl
!= nullptr) {
198 result
->reset(new PosixDynamicLibrary(full_name
, hndl
));
// Every attempt failed: report the loader's last error text.
// NOTE(review): the trailing "xs" in the message below looks like a typo
// (possibly a leftover "%s") -- confirm before changing, since the text is
// emitted at runtime.
205 return Status::IOError(
206 IOErrorMsg("Failed to open shared library: xs", name
), dlerror());
208 #endif // !ROCKSDB_NO_DYNAMIC_EXTENSION
// Thread-pool and thread-management entry points. These are declarations
// only; the bodies are defined out of line as PosixEnv:: members later in
// the file.
210 void Schedule(void (*function
)(void* arg1
), void* arg
, Priority pri
= LOW
,
212 void (*unschedFunction
)(void* arg
) = nullptr) override
;
214 int UnSchedule(void* arg
, Priority pri
) override
;
216 void StartThread(void (*function
)(void* arg
), void* arg
) override
;
218 void WaitForJoin() override
;
220 unsigned int GetThreadPoolQueueLen(Priority pri
= LOW
) const override
;
// Delegates to thread_status_updater_->GetThreadList(thread_list) and returns
// its Status. thread_status_updater_ must be non-null (asserted).
222 Status
GetThreadList(std::vector
<ThreadStatus
>* thread_list
) override
{
223 assert(thread_status_updater_
);
224 return thread_status_updater_
->GetThreadList(thread_list
);
// Derives a 64-bit id from a pthread_t by copying its leading bytes -- at most
// sizeof(uint64_t) of them -- into a zero-initialized integer. The copy is
// done with memcpy because pthread_t is an opaque type.
// NOTE(review): the return statement is not visible in this excerpt.
227 static uint64_t gettid(pthread_t tid
) {
228 uint64_t thread_id
= 0;
229 memcpy(&thread_id
, &tid
, std::min(sizeof(thread_id
), sizeof(tid
)));
// Overload for the calling thread: starts from pthread_self().
// NOTE(review): the remainder of the body is not visible here; presumably it
// forwards to gettid(pthread_t) -- confirm.
233 static uint64_t gettid() {
234 pthread_t tid
= pthread_self();
// Returns the 64-bit id of the calling thread, as computed by
// gettid(pthread_self()).
238 uint64_t GetThreadID() const override
{ return gettid(pthread_self()); }
// Current time in microseconds as reported by gettimeofday():
// tv_sec * 1,000,000 + tv_usec. The return value of gettimeofday() is not
// checked. (The declaration of `tv` is not visible in this excerpt.)
240 uint64_t NowMicros() override
{
242 gettimeofday(&tv
, nullptr);
243 return static_cast<uint64_t>(tv
.tv_sec
) * 1000000 + tv
.tv_usec
;
// Returns a nanosecond timestamp; the clock source depends on the platform:
//  - first conditional branch: clock_gettime(CLOCK_MONOTONIC);
//  - OS_SOLARIS: body not visible in this excerpt;
//  - __MACH__: the Mach CALENDAR_CLOCK service (the port obtained from
//    host_get_clock_service is released with mach_port_deallocate);
//  - final branch: std::chrono::steady_clock.
// NOTE(review): the Mach branch reads a calendar clock while the other visible
// branches use monotonic/steady clocks -- confirm this difference is intended.
246 uint64_t NowNanos() override
{
247 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
250 clock_gettime(CLOCK_MONOTONIC
, &ts
);
251 return static_cast<uint64_t>(ts
.tv_sec
) * 1000000000 + ts
.tv_nsec
;
252 #elif defined(OS_SOLARIS)
254 #elif defined(__MACH__)
257 host_get_clock_service(mach_host_self(), CALENDAR_CLOCK
, &cclock
);
258 clock_get_time(cclock
, &ts
);
259 mach_port_deallocate(mach_task_self(), cclock
);
260 return static_cast<uint64_t>(ts
.tv_sec
) * 1000000000 + ts
.tv_nsec
;
262 return std::chrono::duration_cast
<std::chrono::nanoseconds
>(
263 std::chrono::steady_clock::now().time_since_epoch()).count();
// On the platforms named in the conditional, returns the value of
// clock_gettime(CLOCK_THREAD_CPUTIME_ID) converted to nanoseconds
// (tv_sec * 1e9 + tv_nsec). The behaviour on other platforms is not visible
// in this excerpt.
267 uint64_t NowCPUNanos() override
{
268 #if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
269 defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
271 clock_gettime(CLOCK_THREAD_CPUTIME_ID
, &ts
);
272 return static_cast<uint64_t>(ts
.tv_sec
) * 1000000000 + ts
.tv_nsec
;
// Suspends the calling thread by passing `micros` to usleep(). The return
// value of usleep() is ignored.
277 void SleepForMicroseconds(int micros
) override
{ usleep(micros
); }
// Fills `name` (capacity `len` bytes) using gethostname(). On the visible
// error paths, errno values EFAULT and EINVAL map to InvalidArgument and
// anything else to an IOError tagged "GetHostName".
// NOTE(review): the test of `ret` and the success return are not visible in
// this excerpt.
279 Status
GetHostName(char* name
, uint64_t len
) override
{
280 int ret
= gethostname(name
, static_cast<size_t>(len
));
282 if (errno
== EFAULT
|| errno
== EINVAL
) {
283 return Status::InvalidArgument(strerror(errno
));
285 return IOError("GetHostName", name
, errno
);
// Stores the result of time(nullptr) in *unix_time. If time() reports
// failure by returning (time_t)-1, an IOError built from errno is returned
// instead. (The success return is not visible in this excerpt.)
291 Status
GetCurrentTime(int64_t* unix_time
) override
{
292 time_t ret
= time(nullptr);
293 if (ret
== (time_t) -1) {
294 return IOError("GetCurrentTime", "", errno
);
296 *unix_time
= (int64_t) ret
;
// Forwards to the base-class implementation, Env::GetThreadStatusUpdater().
300 ThreadStatusUpdater
* GetThreadStatusUpdater() const override
{
301 return Env::GetThreadStatusUpdater();
// Forwards to the base-class implementation, Env::GenerateUniqueId().
304 std::string
GenerateUniqueId() override
{ return Env::GenerateUniqueId(); }
306 // Allow increasing the number of worker threads.
// Passes `num` to SetBackgroundThreads() on the pool selected by `pri`.
// `pri` must lie in [Priority::BOTTOM, Priority::HIGH] (asserted).
307 void SetBackgroundThreads(int num
, Priority pri
) override
{
308 assert(pri
>= Priority::BOTTOM
&& pri
<= Priority::HIGH
);
309 thread_pools_
[pri
].SetBackgroundThreads(num
);
// Returns GetBackgroundThreads() of the pool selected by `pri`.
// `pri` must lie in [Priority::BOTTOM, Priority::HIGH] (asserted).
312 int GetBackgroundThreads(Priority pri
) override
{
313 assert(pri
>= Priority::BOTTOM
&& pri
<= Priority::HIGH
);
314 return thread_pools_
[pri
].GetBackgroundThreads();
// Records the flag in allow_non_owner_access_. That member is a reference,
// so for instances that share the default Env's state the change is written
// to the shared flag. (The return statement is not visible in this excerpt.)
317 Status
SetAllowNonOwnerAccess(bool allow_non_owner_access
) override
{
318 allow_non_owner_access_
= allow_non_owner_access
;
322 // Allow increasing the number of worker threads.
// Passes `num` to IncBackgroundThreadsIfNeeded() on the pool selected by
// `pri`. `pri` must lie in [Priority::BOTTOM, Priority::HIGH] (asserted).
323 void IncBackgroundThreadsIfNeeded(int num
, Priority pri
) override
{
324 assert(pri
>= Priority::BOTTOM
&& pri
<= Priority::HIGH
);
325 thread_pools_
[pri
].IncBackgroundThreadsIfNeeded(num
);
// Calls LowerIOPriority() on the pool selected by `pool`, which must lie in
// [Priority::BOTTOM, Priority::HIGH] (asserted).
// NOTE(review): lines around the call are missing from this excerpt, so the
// call may be compiled conditionally -- confirm against the full file.
328 void LowerThreadPoolIOPriority(Priority pool
) override
{
329 assert(pool
>= Priority::BOTTOM
&& pool
<= Priority::HIGH
);
331 thread_pools_
[pool
].LowerIOPriority();
// Lowers the CPU priority of the pool selected by `pool` to CpuPriority::kLow.
// `pool` must lie in [Priority::BOTTOM, Priority::HIGH] (asserted).
337 void LowerThreadPoolCPUPriority(Priority pool
) override
{
338 assert(pool
>= Priority::BOTTOM
&& pool
<= Priority::HIGH
);
339 thread_pools_
[pool
].LowerCPUPriority(CpuPriority::kLow
);
// Overload taking an explicit target: passes `pri` to LowerCPUPriority() on
// the pool selected by `pool`. `pool` must lie in
// [Priority::BOTTOM, Priority::HIGH] (asserted). The returned Status is not
// visible in this excerpt.
342 Status
LowerThreadPoolCPUPriority(Priority pool
, CpuPriority pri
) override
{
343 assert(pool
>= Priority::BOTTOM
&& pool
<= Priority::HIGH
);
344 thread_pools_
[pool
].LowerCPUPriority(pri
);
// Formats `secondsSince1970` as a broken-down local time (via localtime_r)
// using the pattern "%04d/%02d/%02d-%02d:%02d:%02d " -- six zero-padded
// numeric fields followed by a trailing space. The output buffer `dummy` is
// sized to `maxsize` before formatting.
// NOTE(review): most of the body (declarations, the formatting call and the
// return) is not visible in this excerpt.
348 std::string
TimeToString(uint64_t secondsSince1970
) override
{
349 const time_t seconds
= (time_t)secondsSince1970
;
353 dummy
.reserve(maxsize
);
354 dummy
.resize(maxsize
);
356 localtime_r(&seconds
, &t
);
358 "%04d/%02d/%02d-%02d:%02d:%02d ",
// Env::Default() is a friend of this class.
369 friend Env
* Env::Default();
370 // Constructs the default Env, a singleton
373 // The below 4 members are only used by the default PosixEnv instance.
374 // Non-default instances simply maintain references to the backing
375 // members in the default instance
376 std::vector
<ThreadPoolImpl
> thread_pools_storage_
;
377 pthread_mutex_t mu_storage_
;
378 std::vector
<pthread_t
> threads_to_join_storage_
;
379 bool allow_non_owner_access_storage_
;
// Reference members through which all methods reach the state above. They
// bind either to this instance's own *_storage_ members or to the members of
// another PosixEnv, depending on which constructor ran.
381 std::vector
<ThreadPoolImpl
>& thread_pools_
;
// Locked by StartThread() while it appends to threads_to_join_.
382 pthread_mutex_t
& mu_
;
383 std::vector
<pthread_t
>& threads_to_join_
;
384 // If true, allow non owner read access for db files. Otherwise, non-owner
385 // has no access to db files.
386 bool& allow_non_owner_access_
;
// Initializer list and body of a PosixEnv constructor that uses
// FileSystem::Default(); its signature line is not visible in this excerpt
// (presumably the default constructor -- confirm). It binds the reference
// members to this instance's own *_storage_ members, creates Priority::TOTAL
// thread pools, enables non-owner access, initializes the mutex, gives each
// pool its priority and host Env, and finally creates the
// ThreadStatusUpdater.
390 : CompositeEnvWrapper(this, FileSystem::Default()),
391 thread_pools_storage_(Priority::TOTAL
),
392 allow_non_owner_access_storage_(true),
393 thread_pools_(thread_pools_storage_
),
395 threads_to_join_(threads_to_join_storage_
),
396 allow_non_owner_access_(allow_non_owner_access_storage_
) {
397 ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_
, nullptr));
398 for (int pool_id
= 0; pool_id
< Env::Priority::TOTAL
; ++pool_id
) {
399 thread_pools_
[pool_id
].SetThreadPriority(
400 static_cast<Env::Priority
>(pool_id
));
401 // This allows later initializing the thread-local-env of each thread.
402 thread_pools_
[pool_id
].SetHostEnv(this);
404 thread_status_updater_
= CreateThreadStatusUpdater();
// Constructs an Env that uses `fs` as its FileSystem while sharing the
// threading state of `default_env`: every reference member is bound to the
// corresponding member of `default_env`, and the thread_status_updater_
// pointer is copied from it. `default_env` is dereferenced unconditionally,
// so it must be non-null.
407 PosixEnv::PosixEnv(const PosixEnv
* default_env
, std::shared_ptr
<FileSystem
> fs
)
408 : CompositeEnvWrapper(this, fs
),
409 thread_pools_(default_env
->thread_pools_
),
410 mu_(default_env
->mu_
),
411 threads_to_join_(default_env
->threads_to_join_
),
412 allow_non_owner_access_(default_env
->allow_non_owner_access_
) {
413 thread_status_updater_
= default_env
->thread_status_updater_
;
// Hands `function`, `arg`, `tag` and `unschedFunction` to Schedule() on the
// pool selected by `pri`. `pri` must lie in
// [Priority::BOTTOM, Priority::HIGH] (asserted).
416 void PosixEnv::Schedule(void (*function
)(void* arg1
), void* arg
, Priority pri
,
417 void* tag
, void (*unschedFunction
)(void* arg
)) {
418 assert(pri
>= Priority::BOTTOM
&& pri
<= Priority::HIGH
);
419 thread_pools_
[pri
].Schedule(function
, arg
, tag
, unschedFunction
);
422 int PosixEnv::UnSchedule(void* arg
, Priority pri
) {
423 return thread_pools_
[pri
].UnSchedule(arg
);
// Returns GetQueueLen() of the pool selected by `pri`, which must lie in
// [Priority::BOTTOM, Priority::HIGH] (asserted).
426 unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri
) const {
427 assert(pri
>= Priority::BOTTOM
&& pri
<= Priority::HIGH
);
428 return thread_pools_
[pri
].GetQueueLen();
// Carries the user's entry point to the new thread; StartThread() allocates
// one per thread and StartThreadWrapper() reads it. StartThreadWrapper() also
// reads an `arg` field whose declaration is not visible in this excerpt.
431 struct StartThreadState
{
432 void (*user_function
)(void*);
// pthread entry point used by StartThread(): interprets `arg` as a
// StartThreadState* and invokes state->user_function(state->arg).
// NOTE(review): whatever follows the call (cleanup of `state`, the return
// value) is not visible in this excerpt.
436 static void* StartThreadWrapper(void* arg
) {
437 StartThreadState
* state
= reinterpret_cast<StartThreadState
*>(arg
);
438 state
->user_function(state
->arg
);
// Starts a new pthread running StartThreadWrapper with a heap-allocated
// StartThreadState, then appends the new thread's id to threads_to_join_
// while holding mu_. Each pthread call is routed through
// ThreadPoolImpl::PthreadCall together with a label.
// NOTE(review): the declaration of `t` and any assignment of the state's
// argument field are not visible in this excerpt.
443 void PosixEnv::StartThread(void (*function
)(void* arg
), void* arg
) {
445 StartThreadState
* state
= new StartThreadState
;
446 state
->user_function
= function
;
448 ThreadPoolImpl::PthreadCall(
449 "start thread", pthread_create(&t
, nullptr, &StartThreadWrapper
, state
));
450 ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_
));
451 threads_to_join_
.push_back(t
);
452 ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_
));
455 void PosixEnv::WaitForJoin() {
456 for (const auto tid
: threads_to_join_
) {
457 pthread_join(tid
, nullptr);
459 threads_to_join_
.clear();
// Produces a unique identifier string. The visible code first consults
// /proc/sys/kernel/random/uuid through the default FileSystem (existence
// check, then ReadFileToString). The fallback combines NowNanos() with a
// value drawn from a Random64 seeded with time(nullptr).
// NOTE(review): the branch conditions, the formatting call and the returns
// are not visible in this excerpt. Both 64-bit portions are cast to
// `unsigned long` before formatting, which would truncate on platforms where
// long is 32 bits -- confirm against the format string.
464 std::string
Env::GenerateUniqueId() {
465 std::string uuid_file
= "/proc/sys/kernel/random/uuid";
466 std::shared_ptr
<FileSystem
> fs
= FileSystem::Default();
468 Status s
= fs
->FileExists(uuid_file
, IOOptions(), nullptr);
471 s
= ReadFileToString(fs
.get(), uuid_file
, &uuid
);
476 // Could not read uuid_file - generate uuid using "nanos-random"
477 Random64
r(time(nullptr));
478 uint64_t random_uuid_portion
=
479 r
.Uniform(std::numeric_limits
<uint64_t>::max());
480 uint64_t nanos_uuid_portion
= NowNanos();
485 (unsigned long)nanos_uuid_portion
,
486 (unsigned long)random_uuid_portion
);
// Accessor for the default Env. The singleton is a function-local static
// PosixEnv; the helper singletons it depends on are initialized first so
// that they outlive it. (The return statement is not visible in this
// excerpt.)
493 Env
* Env::Default() {
494 // The following function call initializes the singletons of ThreadLocalPtr
495 // right before the static default_env. This guarantees default_env will
496 // always be destructed before the ThreadLocalPtr singletons get
497 // destructed, as C++ guarantees that the destruction of static variables
498 // is in the reverse order of their construction.
500 // Since static members are destructed in the reverse order
501 // of their construction, having this call here guarantees that
502 // the destructor of static PosixEnv will go first, then the
503 // singletons of ThreadLocalPtr.
504 ThreadLocalPtr::InitSingletons();
505 CompressionContextCache::InitSingleton();
506 INIT_SYNC_POINT_SINGLETONS();
507 static PosixEnv default_env
;
511 std::unique_ptr
<Env
> NewCompositeEnv(std::shared_ptr
<FileSystem
> fs
) {
512 PosixEnv
* default_env
= static_cast<PosixEnv
*>(Env::Default());
513 return std::unique_ptr
<Env
>(new PosixEnv(default_env
, fs
));
516 } // namespace ROCKSDB_NAMESPACE