]>
git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/env_librados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
4 #include "rocksdb/utilities/env_librados.h"
5 #include "util/random.h"
9 namespace ROCKSDB_NAMESPACE
{
14 #include <sys/syscall.h>
16 #define LOG_DEBUG(...) do{\
17 printf("[%ld:%s:%i:%s]", syscall(SYS_gettid), __FILE__, __LINE__, __FUNCTION__);\
21 #define LOG_DEBUG(...)
25 const char *default_db_name
= "default_envlibrados_db";
26 const char *default_pool_name
= "default_envlibrados_pool";
27 const char *default_config_path
= "CEPH_CONFIG_PATH"; // the env variable name of ceph configure file
28 // maximum number of dirs/files that can be stored in the fs
29 const int MAX_ITEMS_IN_FS
= 1 << 30;
31 const std::string ROOT_DIR_KEY
= "/";
32 const std::string DIR_ID_VALUE
= "<DIR>";
35 * @brief convert error code to status
36 * @details Convert internal linux error code to Status
38 * @param r [description]
39 * @return [description]
41 Status
err_to_status(int r
)
47 return Status::IOError();
50 return Status::NotFound(Status::kNone
);
52 return Status::InvalidArgument(Status::kNone
);
54 return Status::IOError(Status::kNone
);
57 assert(0 == "unrecognized error code");
58 return Status::NotSupported(Status::kNone
);
63 * @brief split file path into dir path and file name
65 * Because RocksDB only needs a 2-level structure (dir/file), every input path is shortened to dir/file format
67 * b/c => dir '/b', file 'c'
68 * /a/b/c => dir '/b', file 'c'
70 * @param fn [description]
71 * @param dir [description]
72 * @param file [description]
74 void split(const std::string
&fn
, std::string
*dir
, std::string
*file
) {
75 LOG_DEBUG("[IN]%s\n", fn
.c_str());
76 int pos
= fn
.size() - 1;
77 while ('/' == fn
[pos
]) --pos
;
78 size_t fstart
= fn
.rfind('/', pos
);
79 *file
= fn
.substr(fstart
+ 1, pos
- fstart
);
82 while (pos
>= 0 && '/' == fn
[pos
]) --pos
;
87 size_t dstart
= fn
.rfind('/', pos
);
88 *dir
= fn
.substr(dstart
+ 1, pos
- dstart
);
89 *dir
= std::string("/") + *dir
;
92 LOG_DEBUG("[OUT]%s | %s\n", dir
->c_str(), file
->c_str());
95 // A file abstraction for reading sequentially through a file
96 class LibradosSequentialFile
: public SequentialFile
{
97 librados::IoCtx
* _io_ctx
;
102 LibradosSequentialFile(librados::IoCtx
* io_ctx
, std::string fid
, std::string hint
):
103 _io_ctx(io_ctx
), _fid(fid
), _hint(hint
), _offset(0) {}
105 ~LibradosSequentialFile() {}
110 * Read up to "n" bytes from the file. "scratch[0..n-1]" may be
111 * written by this routine. Sets "*result" to the data that was
112 * read (including if fewer than "n" bytes were successfully read).
113 * May set "*result" to point at data in "scratch[0..n-1]", so
114 * "scratch[0..n-1]" must be live when "*result" is used.
115 * If an error was encountered, returns a non-OK status.
117 * REQUIRES: External synchronization
119 * @param n [description]
120 * @param result [description]
121 * @param scratch [description]
122 * @return [description]
124 Status
Read(size_t n
, Slice
* result
, char* scratch
) {
125 LOG_DEBUG("[IN]%i\n", (int)n
);
126 librados::bufferlist buffer
;
128 int r
= _io_ctx
->read(_fid
, buffer
, n
, _offset
);
130 buffer
.begin().copy(r
, scratch
);
131 *result
= Slice(scratch
, r
);
135 s
= err_to_status(r
);
136 if (s
== Status::IOError()) {
141 LOG_DEBUG("[OUT]%s, %i, %s\n", s
.ToString().c_str(), (int)r
, buffer
.c_str());
146 * @brief skip "n" bytes from the file
148 * Skip "n" bytes from the file. This is guaranteed to be no
149 * slower than reading the same data, but may be faster.
151 * If end of file is reached, skipping will stop at the end of the
152 * file, and Skip will return OK.
154 * REQUIRES: External synchronization
156 * @param n [description]
157 * @return [description]
159 Status
Skip(uint64_t n
) {
167 * rocksdb has its own caching capabilities that we should be able to use,
168 * without relying on a cache here. This can safely be a no-op.
170 * @param offset [description]
171 * @param length [description]
173 * @return [description]
175 Status
InvalidateCache(size_t offset
, size_t length
) {
180 // A file abstraction for randomly reading the contents of a file.
181 class LibradosRandomAccessFile
: public RandomAccessFile
{
182 librados::IoCtx
* _io_ctx
;
186 LibradosRandomAccessFile(librados::IoCtx
* io_ctx
, std::string fid
, std::string hint
):
187 _io_ctx(io_ctx
), _fid(fid
), _hint(hint
) {}
189 ~LibradosRandomAccessFile() {}
193 * @details similar to LibradosSequentialFile::Read
195 * @param offset [description]
196 * @param n [description]
197 * @param result [description]
198 * @param scratch [description]
199 * @return [description]
201 Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
202 char* scratch
) const {
203 LOG_DEBUG("[IN]%i\n", (int)n
);
204 librados::bufferlist buffer
;
206 int r
= _io_ctx
->read(_fid
, buffer
, n
, offset
);
208 buffer
.begin().copy(r
, scratch
);
209 *result
= Slice(scratch
, r
);
212 s
= err_to_status(r
);
213 if (s
== Status::IOError()) {
218 LOG_DEBUG("[OUT]%s, %i, %s\n", s
.ToString().c_str(), (int)r
, buffer
.c_str());
223 * @brief [brief description]
224 * @details Get unique id for each file and guarantee this id is different for each file
226 * @param id [description]
227 * @param max_size max size of id, it should be larger than 16
229 * @return [description]
231 size_t GetUniqueId(char* id
, size_t max_size
) const {
232 // All fid has the same db_id prefix, so we need to ignore db_id prefix
233 size_t s
= std::min(max_size
, _fid
.size());
234 strncpy(id
, _fid
.c_str() + (_fid
.size() - s
), s
);
239 //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };
240 void Hint(AccessPattern pattern
) {
246 * @details [long description]
248 * @param offset [description]
249 * @param length [description]
251 * @return [description]
253 Status
InvalidateCache(size_t offset
, size_t length
) {
259 // A file abstraction for sequential writing. The implementation
260 // must provide buffering since callers may append small fragments
261 // at a time to the file.
262 class LibradosWritableFile
: public WritableFile
{
263 librados::IoCtx
* _io_ctx
;
266 const EnvLibrados
* const _env
;
268 std::mutex _mutex
; // used to protect modification of all following variables
269 librados::bufferlist _buffer
; // write buffer
270 uint64_t _buffer_size
; // write buffer size
271 uint64_t _file_size
; // this file size doesn't include buffer size
274 * @brief assuming caller holds lock
275 * @details [long description]
276 * @return [description]
279 // 1. sync append data to RADOS
280 int r
= _io_ctx
->append(_fid
, _buffer
, _buffer_size
);
283 // 2. update local variables
286 _file_size
+= _buffer_size
;
294 LibradosWritableFile(librados::IoCtx
* io_ctx
, std::string fid
,
295 std::string hint
, const EnvLibrados
* const env
,
296 const EnvOptions
& options
)
297 : WritableFile(options
),
305 int ret
= _io_ctx
->stat(_fid
, &_file_size
, nullptr);
313 ~LibradosWritableFile() {
314 // sync before closing writable file
319 * @brief append data to file
321 * Append will save all written data in the buffer until the buffer size
322 * reaches the buffer max size. Then, it will write the buffer into rados
324 * @param data [description]
325 * @return [description]
327 Status
Append(const Slice
& data
) {
329 LOG_DEBUG("[IN] %i | %s\n", (int)data
.size(), data
.data());
332 std::lock_guard
<std::mutex
> lock(_mutex
);
333 _buffer
.append(data
.data(), data
.size());
334 _buffer_size
+= data
.size();
336 if (_buffer_size
> _env
->_write_buffer_size
) {
340 LOG_DEBUG("[OUT] %i\n", r
);
341 return err_to_status(r
);
345 * @brief not supported
346 * @details [long description]
347 * @return [description]
349 Status
PositionedAppend(
350 const Slice
& /* data */,
351 uint64_t /* offset */) {
352 return Status::NotSupported();
356 * @brief truncate file to assigned size
357 * @details [long description]
359 * @param size [description]
360 * @return [description]
362 Status
Truncate(uint64_t size
) {
363 LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size
, (long long)_file_size
, (long long)_buffer_size
);
366 std::lock_guard
<std::mutex
> lock(_mutex
);
367 if (_file_size
> size
) {
368 r
= _io_ctx
->trunc(_fid
, size
);
375 } else if (_file_size
== size
) {
379 librados::bufferlist tmp
;
381 _buffer
.substr_of(tmp
, 0, size
- _file_size
);
382 _buffer_size
= size
- _file_size
;
385 LOG_DEBUG("[OUT] %i\n", r
);
386 return err_to_status(r
);
391 * @details [long description]
392 * @return [description]
395 LOG_DEBUG("%s | %lld | %lld\n", _hint
.c_str(), (long long)_buffer_size
, (long long)_file_size
);
401 * @details initiate an aio write and not wait
403 * @return [description]
406 librados::AioCompletion
*write_completion
= librados::Rados::aio_create_completion();
409 std::lock_guard
<std::mutex
> lock(_mutex
);
410 r
= _io_ctx
->aio_append(_fid
, write_completion
, _buffer
, _buffer_size
);
413 _file_size
+= _buffer_size
;
418 write_completion
->release();
420 return err_to_status(r
);
424 * @brief write buffer data to rados
425 * @details initiate an aio write and wait for result
426 * @return [description]
428 Status
Sync() { // sync data
431 std::lock_guard
<std::mutex
> lock(_mutex
);
432 if (_buffer_size
> 0) {
436 return err_to_status(r
);
440 * @brief [brief description]
441 * @details [long description]
442 * @return true if Sync() and Fsync() are safe to call concurrently with Append() and Flush().
444 bool IsSyncThreadSafe() const {
449 * @brief Tells the upper layers whether the current WritableFile implementation uses direct IO.
450 * @details [long description]
451 * @return [description]
453 bool use_direct_io() const {
458 * @brief Get file size
460 * This API will use cached file_size.
461 * @return [description]
463 uint64_t GetFileSize() {
464 LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size
, (long long)_file_size
);
466 std::lock_guard
<std::mutex
> lock(_mutex
);
467 int file_size
= _file_size
+ _buffer_size
;
473 * @brief For documentation, refer to RandomAccessFile::GetUniqueId()
474 * @details [long description]
476 * @param id [description]
477 * @param max_size [description]
479 * @return [description]
481 size_t GetUniqueId(char* id
, size_t max_size
) const {
482 // All fid has the same db_id prefix, so we need to ignore db_id prefix
483 size_t s
= std::min(max_size
, _fid
.size());
484 strncpy(id
, _fid
.c_str() + (_fid
.size() - s
), s
);
491 * @details [long description]
493 * @param offset [description]
494 * @param length [description]
496 * @return [description]
498 Status
InvalidateCache(size_t offset
, size_t length
) {
502 using WritableFile::RangeSync
;
504 * @brief No RangeSync support, just call Sync()
505 * @details [long description]
507 * @param offset [description]
508 * @param nbytes [description]
510 * @return [description]
512 Status
RangeSync(off_t offset
, off_t nbytes
) {
517 using WritableFile::Allocate
;
520 * @details [long description]
522 * @param offset [description]
523 * @param len [description]
525 * @return [description]
527 Status
Allocate(off_t offset
, off_t len
) {
533 // Directory object represents collection of files and implements
534 // filesystem operations that can be executed on directories.
535 class LibradosDirectory
: public Directory
{
536 librados::IoCtx
* _io_ctx
;
539 explicit LibradosDirectory(librados::IoCtx
* io_ctx
, std::string fid
):
540 _io_ctx(io_ctx
), _fid(fid
) {}
542 // Fsync directory. Can be called concurrently from multiple threads.
548 // Identifies a locked file.
549 // This is an exclusive lock; it cannot be acquired again (nested) by the same thread
550 class LibradosFileLock
: public FileLock
{
551 librados::IoCtx
* _io_ctx
;
552 const std::string _obj_name
;
553 const std::string _lock_name
;
554 const std::string _cookie
;
558 librados::IoCtx
* io_ctx
,
559 const std::string obj_name
):
562 _lock_name("lock_name"),
565 // TODO: the lock will never expire. It may cause problems if the process crashes or exits abnormally.
566 while (!_io_ctx
->lock_exclusive(
570 "description", nullptr, 0));
573 ~LibradosFileLock() {
574 _io_ctx
->unlock(_obj_name
, _lock_name
, _cookie
);
579 // --------------------
580 // --- EnvLibrados ----
581 // --------------------
583 * @brief EnvLibrados ctor
584 * @details [long description]
586 * @param db_name unique database name
587 * @param config_path the configure file path for rados
589 EnvLibrados::EnvLibrados(const std::string
& db_name
,
590 const std::string
& config_path
,
591 const std::string
& db_pool
)
592 : EnvLibrados("client.admin",
603 * @brief EnvLibrados ctor
604 * @details [long description]
606 * @param client_name the first 3 parameters are for RADOS client init
607 * @param cluster_name
609 * @param db_name unique database name, used as db_id key
610 * @param config_path the configure file path for rados
611 * @param db_pool the pool for db data
612 * @param wal_pool the pool for WAL data
613 * @param write_buffer_size WritableFile buffer max size
615 EnvLibrados::EnvLibrados(const std::string
& client_name
,
616 const std::string
& cluster_name
,
617 const uint64_t flags
,
618 const std::string
& db_name
,
619 const std::string
& config_path
,
620 const std::string
& db_pool
,
621 const std::string
& wal_dir
,
622 const std::string
& wal_pool
,
623 const uint64_t write_buffer_size
)
624 : EnvWrapper(Env::Default()),
625 _client_name(client_name
),
626 _cluster_name(cluster_name
),
629 _config_path(config_path
),
630 _db_pool_name(db_pool
),
632 _wal_pool_name(wal_pool
),
633 _write_buffer_size(write_buffer_size
) {
636 // 1. create a Rados object and initialize it
637 ret
= _rados
.init2(_client_name
.c_str(), _cluster_name
.c_str(), _flags
); // just use the client.admin keyring
638 if (ret
< 0) { // let's handle any error that might have come back
639 std::cerr
<< "couldn't initialize rados! error " << ret
<< std::endl
;
644 // 2. read configure file
645 ret
= _rados
.conf_read_file(_config_path
.c_str());
647 // This could fail if the config file is malformed, but it'd be hard.
648 std::cerr
<< "failed to parse config file " << _config_path
649 << "! error" << ret
<< std::endl
;
654 // 3. we actually connect to the cluster
655 ret
= _rados
.connect();
657 std::cerr
<< "couldn't connect to cluster! error " << ret
<< std::endl
;
662 // 4. create db_pool if not exist
663 ret
= _rados
.pool_create(_db_pool_name
.c_str());
664 if (ret
< 0 && ret
!= -EEXIST
&& ret
!= -EPERM
) {
665 std::cerr
<< "couldn't create pool! error " << ret
<< std::endl
;
669 // 5. create db_pool_ioctx
670 ret
= _rados
.ioctx_create(_db_pool_name
.c_str(), _db_pool_ioctx
);
672 std::cerr
<< "couldn't set up ioctx! error " << ret
<< std::endl
;
677 // 6. create wal_pool if not exist
678 ret
= _rados
.pool_create(_wal_pool_name
.c_str());
679 if (ret
< 0 && ret
!= -EEXIST
&& ret
!= -EPERM
) {
680 std::cerr
<< "couldn't create pool! error " << ret
<< std::endl
;
684 // 7. create wal_pool_ioctx
685 ret
= _rados
.ioctx_create(_wal_pool_name
.c_str(), _wal_pool_ioctx
);
687 std::cerr
<< "couldn't set up ioctx! error " << ret
<< std::endl
;
693 _AddFid(ROOT_DIR_KEY
, DIR_ID_VALUE
);
696 LOG_DEBUG("rados connect result code : %i\n", ret
);
699 /****************************************************
700 private functions to handle fid operations.
701 Dirs also have a fid, but its value is DIR_ID_VALUE
702 ****************************************************/
705 * @brief generate a new fid
706 * @details [long description]
707 * @return [description]
709 std::string
EnvLibrados::_CreateFid() {
710 return _db_name
+ "." + GenerateUniqueId();
715 * @details [long description]
717 * @param fname [description]
718 * @param fid [description]
724 Status
EnvLibrados::_GetFid(
725 const std::string
&fname
,
727 std::set
<std::string
> keys
;
728 std::map
<std::string
, librados::bufferlist
> kvs
;
730 int r
= _db_pool_ioctx
.omap_get_vals_by_keys(_db_name
, keys
, &kvs
);
732 if (0 == r
&& 0 == kvs
.size()) {
733 return Status::NotFound();
734 } else if (0 == r
&& 0 != kvs
.size()) {
735 fid
.assign(kvs
[fname
].c_str(), kvs
[fname
].length());
738 return err_to_status(r
);
744 * @details Only modify object in rados once,
745 * so this rename operation is atomic in terms of rados
747 * @param old_fname [description]
748 * @param new_fname [description]
750 * @return [description]
752 Status
EnvLibrados::_RenameFid(const std::string
& old_fname
,
753 const std::string
& new_fname
) {
755 Status s
= _GetFid(old_fname
, fid
);
757 if (Status::OK() != s
) {
761 librados::bufferlist bl
;
762 std::set
<std::string
> keys
;
763 std::map
<std::string
, librados::bufferlist
> kvs
;
764 librados::ObjectWriteOperation o
;
766 keys
.insert(old_fname
);
768 o
.omap_rm_keys(keys
);
770 int r
= _db_pool_ioctx
.operate(_db_name
, &o
);
771 return err_to_status(r
);
775 * @brief add <file path, fid> to metadata object. It may overwrite an existing key.
776 * @details [long description]
778 * @param fname [description]
779 * @param fid [description]
781 * @return [description]
783 Status
EnvLibrados::_AddFid(
784 const std::string
& fname
,
785 const std::string
& fid
) {
786 std::map
<std::string
, librados::bufferlist
> kvs
;
787 librados::bufferlist value
;
790 int r
= _db_pool_ioctx
.omap_set(_db_name
, kvs
);
791 return err_to_status(r
);
795 * @brief return subfile names of dir.
797 * RocksDB has a 2-level structure, so all keys
798 * that have dir as prefix are subfiles of dir.
799 * So we can just return these files' name.
801 * @param dir [description]
802 * @param result [description]
804 * @return [description]
806 Status
EnvLibrados::_GetSubFnames(
807 const std::string
& dir
,
808 std::vector
<std::string
> * result
810 std::string
start_after(dir
);
811 std::string
filter_prefix(dir
);
812 std::map
<std::string
, librados::bufferlist
> kvs
;
813 _db_pool_ioctx
.omap_get_vals(_db_name
,
814 start_after
, filter_prefix
,
815 MAX_ITEMS_IN_FS
, &kvs
);
818 for (auto i
= kvs
.begin(); i
!= kvs
.end(); i
++) {
819 result
->push_back(i
->first
.substr(dir
.size() + 1));
825 * @brief delete key fname from metadata object
826 * @details [long description]
828 * @param fname [description]
829 * @return [description]
831 Status
EnvLibrados::_DelFid(
832 const std::string
& fname
) {
833 std::set
<std::string
> keys
;
835 int r
= _db_pool_ioctx
.omap_rm_keys(_db_name
, keys
);
836 return err_to_status(r
);
840 * @brief get match IoCtx from _prefix_pool_map
841 * @details [long description]
843 * @param prefix [description]
844 * @return [description]
847 librados::IoCtx
* EnvLibrados::_GetIoctx(const std::string
& fpath
) {
848 auto is_prefix
= [](const std::string
& s1
, const std::string
& s2
) {
849 auto it1
= s1
.begin(), it2
= s2
.begin();
850 while (it1
!= s1
.end() && it2
!= s2
.end() && *it1
== *it2
) ++it1
, ++it2
;
851 return it1
== s1
.end();
854 if (is_prefix(_wal_dir
, fpath
)) {
855 return &_wal_pool_ioctx
;
857 return &_db_pool_ioctx
;
861 /************************************************************
863 ************************************************************/
865 * @brief generate unique id
866 * @details Combine system time and random number.
867 * @return [description]
869 std::string
EnvLibrados::GenerateUniqueId() {
870 Random64
r(time(nullptr));
871 uint64_t random_uuid_portion
=
872 r
.Uniform(std::numeric_limits
<uint64_t>::max());
873 uint64_t nanos_uuid_portion
= NowNanos();
878 (unsigned long)nanos_uuid_portion
,
879 (unsigned long)random_uuid_portion
);
884 * @brief create a new sequential read file handler
885 * @details it will check the existence of fname
887 * @param fname [description]
888 * @param result [description]
889 * @param options [description]
890 * @return [description]
892 Status
EnvLibrados::NewSequentialFile(
893 const std::string
& fname
,
894 std::unique_ptr
<SequentialFile
>* result
,
895 const EnvOptions
& options
)
897 LOG_DEBUG("[IN]%s\n", fname
.c_str());
898 std::string dir
, file
, fid
;
899 split(fname
, &dir
, &file
);
901 std::string fpath
= dir
+ "/" + file
;
903 s
= _GetFid(dir
, fid
);
905 if (!s
.ok() || fid
!= DIR_ID_VALUE
) {
906 if (fid
!= DIR_ID_VALUE
) s
= Status::IOError();
910 s
= _GetFid(fpath
, fid
);
912 if (Status::NotFound() == s
) {
913 s
= Status::IOError();
918 result
->reset(new LibradosSequentialFile(_GetIoctx(fpath
), fid
, fpath
));
922 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
927 * @brief create a new random access file handler
928 * @details it will check the existence of fname
930 * @param fname [description]
931 * @param result [description]
932 * @param options [description]
933 * @return [description]
935 Status
EnvLibrados::NewRandomAccessFile(
936 const std::string
& fname
,
937 std::unique_ptr
<RandomAccessFile
>* result
,
938 const EnvOptions
& options
)
940 LOG_DEBUG("[IN]%s\n", fname
.c_str());
941 std::string dir
, file
, fid
;
942 split(fname
, &dir
, &file
);
944 std::string fpath
= dir
+ "/" + file
;
946 s
= _GetFid(dir
, fid
);
948 if (!s
.ok() || fid
!= DIR_ID_VALUE
) {
949 s
= Status::IOError();
953 s
= _GetFid(fpath
, fid
);
955 if (Status::NotFound() == s
) {
956 s
= Status::IOError();
961 result
->reset(new LibradosRandomAccessFile(_GetIoctx(fpath
), fid
, fpath
));
965 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
970 * @brief create a new write file handler
971 * @details it will check the existence of fname
973 * @param fname [description]
974 * @param result [description]
975 * @param options [description]
976 * @return [description]
978 Status
EnvLibrados::NewWritableFile(
979 const std::string
& fname
,
980 std::unique_ptr
<WritableFile
>* result
,
981 const EnvOptions
& options
)
983 LOG_DEBUG("[IN]%s\n", fname
.c_str());
984 std::string dir
, file
, fid
;
985 split(fname
, &dir
, &file
);
987 std::string fpath
= dir
+ "/" + file
;
990 // 1. check if dir exist
991 s
= _GetFid(dir
, fid
);
996 if (fid
!= DIR_ID_VALUE
) {
997 s
= Status::IOError();
1001 // 2. check if file exist.
1002 // 2.1 exist, use it
1003 // 2.2 not exist, create it
1004 s
= _GetFid(fpath
, fid
);
1005 if (Status::NotFound() == s
) {
1007 _AddFid(fpath
, fid
);
1011 new LibradosWritableFile(_GetIoctx(fpath
), fid
, fpath
, this, options
));
1015 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1020 * @brief reuse write file handler
1022 * This function will rename old_fname to new_fname,
1023 * then return the handler of new_fname
1025 * @param new_fname [description]
1026 * @param old_fname [description]
1027 * @param result [description]
1028 * @param options [description]
1029 * @return [description]
1031 Status
EnvLibrados::ReuseWritableFile(
1032 const std::string
& new_fname
,
1033 const std::string
& old_fname
,
1034 std::unique_ptr
<WritableFile
>* result
,
1035 const EnvOptions
& options
)
1037 LOG_DEBUG("[IN]%s => %s\n", old_fname
.c_str(), new_fname
.c_str());
1038 std::string src_fid
, tmp_fid
, src_dir
, src_file
, dst_dir
, dst_file
;
1039 split(old_fname
, &src_dir
, &src_file
);
1040 split(new_fname
, &dst_dir
, &dst_file
);
1042 std::string src_fpath
= src_dir
+ "/" + src_file
;
1043 std::string dst_fpath
= dst_dir
+ "/" + dst_file
;
1044 Status r
= Status::OK();
1046 r
= _RenameFid(src_fpath
,
1052 result
->reset(new LibradosWritableFile(_GetIoctx(dst_fpath
), src_fid
,
1053 dst_fpath
, this, options
));
1056 LOG_DEBUG("[OUT]%s\n", r
.ToString().c_str());
1061 * @brief create a new directory handler
1062 * @details [long description]
1064 * @param name [description]
1065 * @param result [description]
1067 * @return [description]
1069 Status
EnvLibrados::NewDirectory(
1070 const std::string
& name
,
1071 std::unique_ptr
<Directory
>* result
)
1073 LOG_DEBUG("[IN]%s\n", name
.c_str());
1074 std::string fid
, dir
, file
;
1075 /* just want to get dir name */
1076 split(name
+ "/tmp", &dir
, &file
);
1080 s
= _GetFid(dir
, fid
);
1082 if (!s
.ok() || DIR_ID_VALUE
!= fid
) {
1083 s
= Status::IOError(name
, strerror(-ENOENT
));
1087 if (Status::NotFound() == s
) {
1088 s
= _AddFid(dir
, DIR_ID_VALUE
);
1090 } else if (!s
.ok()) {
1094 result
->reset(new LibradosDirectory(_GetIoctx(dir
), dir
));
1098 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1103 * @brief check whether fname exists
1104 * @details [long description]
1106 * @param fname [description]
1107 * @return [description]
1109 Status
EnvLibrados::FileExists(const std::string
& fname
)
1111 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1112 std::string fid
, dir
, file
;
1113 split(fname
, &dir
, &file
);
1114 Status s
= _GetFid(dir
+ "/" + file
, fid
);
1116 if (s
.ok() && fid
!= DIR_ID_VALUE
) {
1120 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1125 * @brief get subfile name of dir_in
1126 * @details [long description]
1128 * @param dir_in [description]
1129 * @param result [description]
1131 * @return [description]
1133 Status
EnvLibrados::GetChildren(
1134 const std::string
& dir_in
,
1135 std::vector
<std::string
>* result
)
1137 LOG_DEBUG("[IN]%s\n", dir_in
.c_str());
1138 std::string fid
, dir
, file
;
1139 split(dir_in
+ "/temp", &dir
, &file
);
1143 s
= _GetFid(dir
, fid
);
1148 if (fid
!= DIR_ID_VALUE
) {
1149 s
= Status::IOError();
1153 s
= _GetSubFnames(dir
, result
);
1156 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1161 * @brief delete fname
1162 * @details [long description]
1164 * @param fname [description]
1165 * @return [description]
1167 Status
EnvLibrados::DeleteFile(const std::string
& fname
)
1169 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1170 std::string fid
, dir
, file
;
1171 split(fname
, &dir
, &file
);
1172 Status s
= _GetFid(dir
+ "/" + file
, fid
);
1174 if (s
.ok() && DIR_ID_VALUE
!= fid
) {
1175 s
= _DelFid(dir
+ "/" + file
);
1177 s
= Status::NotFound();
1179 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1184 * @brief create new dir
1185 * @details [long description]
1187 * @param dirname [description]
1188 * @return [description]
1190 Status
EnvLibrados::CreateDir(const std::string
& dirname
)
1192 LOG_DEBUG("[IN]%s\n", dirname
.c_str());
1193 std::string fid
, dir
, file
;
1194 split(dirname
+ "/temp", &dir
, &file
);
1195 Status s
= _GetFid(dir
+ "/" + file
, fid
);
1198 if (Status::NotFound() != s
&& fid
!= DIR_ID_VALUE
) {
1200 } else if (Status::OK() == s
&& fid
== DIR_ID_VALUE
) {
1204 s
= _AddFid(dir
, DIR_ID_VALUE
);
1207 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1212 * @brief create dir if missing
1213 * @details [long description]
1215 * @param dirname [description]
1216 * @return [description]
1218 Status
EnvLibrados::CreateDirIfMissing(const std::string
& dirname
)
1220 LOG_DEBUG("[IN]%s\n", dirname
.c_str());
1221 std::string fid
, dir
, file
;
1222 split(dirname
+ "/temp", &dir
, &file
);
1223 Status s
= Status::OK();
1226 s
= _GetFid(dir
, fid
);
1227 if (Status::NotFound() != s
) {
1231 s
= _AddFid(dir
, DIR_ID_VALUE
);
1234 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1242 * @param dirname [description]
1243 * @return [description]
1245 Status
EnvLibrados::DeleteDir(const std::string
& dirname
)
1247 LOG_DEBUG("[IN]%s\n", dirname
.c_str());
1248 std::string fid
, dir
, file
;
1249 split(dirname
+ "/temp", &dir
, &file
);
1250 Status s
= Status::OK();
1252 s
= _GetFid(dir
, fid
);
1254 if (s
.ok() && DIR_ID_VALUE
== fid
) {
1255 std::vector
<std::string
> subs
;
1256 s
= _GetSubFnames(dir
, &subs
);
1257 // if subfiles exist, can't delete dir
1258 if (subs
.size() > 0) {
1259 s
= Status::IOError();
1264 s
= Status::NotFound();
1267 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1272 * @brief return file size
1273 * @details [long description]
1275 * @param fname [description]
1276 * @param file_size [description]
1278 * @return [description]
1280 Status
EnvLibrados::GetFileSize(
1281 const std::string
& fname
,
1282 uint64_t* file_size
)
1284 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1285 std::string fid
, dir
, file
;
1286 split(fname
, &dir
, &file
);
1291 std::string fpath
= dir
+ "/" + file
;
1292 s
= _GetFid(fpath
, fid
);
1298 int ret
= _GetIoctx(fpath
)->stat(fid
, file_size
, &mtime
);
1300 LOG_DEBUG("%i\n", ret
);
1301 if (-ENOENT
== ret
) {
1305 s
= err_to_status(ret
);
1312 LOG_DEBUG("[OUT]%s|%lld\n", s
.ToString().c_str(), (long long)*file_size
);
1317 * @brief get file modification time
1318 * @details [long description]
1320 * @param fname [description]
1321 * @param file_mtime [description]
1323 * @return [description]
1325 Status
EnvLibrados::GetFileModificationTime(const std::string
& fname
,
1326 uint64_t* file_mtime
)
1328 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1329 std::string fid
, dir
, file
;
1330 split(fname
, &dir
, &file
);
1333 Status s
= Status::OK();
1335 std::string fpath
= dir
+ "/" + file
;
1336 s
= _GetFid(dir
+ "/" + file
, fid
);
1342 int ret
= _GetIoctx(fpath
)->stat(fid
, &file_size
, &mtime
);
1344 if (Status::NotFound() == err_to_status(ret
)) {
1345 *file_mtime
= static_cast<uint64_t>(mtime
);
1348 s
= err_to_status(ret
);
1355 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1360 * @brief rename file
1363 * @param src [description]
1364 * @param target_in [description]
1366 * @return [description]
1368 Status
EnvLibrados::RenameFile(
1369 const std::string
& src
,
1370 const std::string
& target_in
)
1372 LOG_DEBUG("[IN]%s => %s\n", src
.c_str(), target_in
.c_str());
1373 std::string src_fid
, tmp_fid
, src_dir
, src_file
, dst_dir
, dst_file
;
1374 split(src
, &src_dir
, &src_file
);
1375 split(target_in
, &dst_dir
, &dst_file
);
1377 auto s
= _RenameFid(src_dir
+ "/" + src_file
,
1378 dst_dir
+ "/" + dst_file
);
1379 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1384 * @brief not supported
1385 * @details [long description]
1387 * @param src [description]
1388 * @param target_in [description]
1390 * @return [description]
1392 Status
EnvLibrados::LinkFile(
1393 const std::string
& src
,
1394 const std::string
& target_in
)
1396 LOG_DEBUG("[IO]%s => %s\n", src
.c_str(), target_in
.c_str());
1397 return Status::NotSupported();
1401 * @brief lock file. create if missing.
1402 * @details [long description]
1404 * LockFile appears to be used to prevent other instances of RocksDB
1405 * from opening the database at the same time. In the RocksDB source code,
1406 * LockFile is invoked at the following locations:
1408 * ./db/db_impl.cc:1159: s = env_->LockFile(LockFileName(dbname_), &db_lock_); // DBImpl::Recover
1409 * ./db/db_impl.cc:5839: Status result = env->LockFile(lockname, &lock); // Status DestroyDB
1411 * RocksDB calls LockFile during DB recovery and DB destruction
1413 * @param fname [description]
1414 * @param lock [description]
1416 * @return [description]
1418 Status
EnvLibrados::LockFile(
1419 const std::string
& fname
,
1422 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1423 std::string fid
, dir
, file
;
1424 split(fname
, &dir
, &file
);
1425 Status s
= Status::OK();
1428 std::string fpath
= dir
+ "/" + file
;
1429 s
= _GetFid(fpath
, fid
);
1431 if (Status::OK() != s
&&
1432 Status::NotFound() != s
) {
1434 } else if (Status::NotFound() == s
) {
1435 s
= _AddFid(fpath
, _CreateFid());
1439 } else if (Status::OK() == s
&& DIR_ID_VALUE
== fid
) {
1440 s
= Status::IOError();
1444 *lock
= new LibradosFileLock(_GetIoctx(fpath
), fpath
);
1447 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1452 * @brief unlock file
1453 * @details [long description]
1455 * @param lock [description]
1456 * @return [description]
1458 Status
EnvLibrados::UnlockFile(FileLock
* lock
)
1460 LOG_DEBUG("[IO]%p\n", lock
);
1461 if (nullptr != lock
) {
1464 return Status::OK();
1469 * @brief not supported
1470 * @details [long description]
1472 * @param db_path [description]
1473 * @param output_path [description]
1475 * @return [description]
1477 Status
EnvLibrados::GetAbsolutePath(
1478 const std::string
& db_path
,
1479 std::string
* output_path
)
1481 LOG_DEBUG("[IO]%s\n", db_path
.c_str());
1482 return Status::NotSupported();
1486 * @brief Get default EnvLibrados
1487 * @details [long description]
1488 * @return [description]
1490 EnvLibrados
* EnvLibrados::Default() {
1491 static EnvLibrados
default_env(default_db_name
,
1492 std::getenv(default_config_path
),
1494 return &default_env
;
1497 } // namespace ROCKSDB_NAMESPACE