]>
git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/env_librados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "rocksdb/utilities/env_librados.h"
5 #include "util/random.h"
14 #include <sys/syscall.h>
16 #define LOG_DEBUG(...) do{\
17 printf("[%ld:%s:%i:%s]", syscall(SYS_gettid), __FILE__, __LINE__, __FUNCTION__);\
21 #define LOG_DEBUG(...)
25 const char *default_db_name
= "default_envlibrados_db";
26 const char *default_pool_name
= "default_envlibrados_pool";
27 const char *default_config_path
= "CEPH_CONFIG_PATH"; // the env variable name of ceph configure file
28 // maximum number of dirs/files that can be stored in the fs
29 const int MAX_ITEMS_IN_FS
= 1 << 30;
31 const std::string ROOT_DIR_KEY
= "/";
32 const std::string DIR_ID_VALUE
= "<DIR>";
35 * @brief convert error code to status
36 * @details Convert internal linux error code to Status
38 * @param r [description]
39 * @return [description]
41 Status
err_to_status(int r
)
47 return Status::IOError();
50 return Status::NotFound(Status::kNone
);
52 return Status::InvalidArgument(Status::kNone
);
54 return Status::IOError(Status::kNone
);
57 assert(0 == "unrecognized error code");
58 return Status::NotSupported(Status::kNone
);
63 * @brief split file path into dir path and file name
65 * Because rocksdb only needs a 2-level structure (dir/file), all input paths will be shortened to dir/file format
67 * b/c => dir '/b', file 'c'
68 * /a/b/c => dir '/b', file 'c'
70 * @param fn [description]
71 * @param dir [description]
72 * @param file [description]
74 void split(const std::string
&fn
, std::string
*dir
, std::string
*file
) {
75 LOG_DEBUG("[IN]%s\n", fn
.c_str());
76 int pos
= fn
.size() - 1;
77 while ('/' == fn
[pos
]) --pos
;
78 size_t fstart
= fn
.rfind('/', pos
);
79 *file
= fn
.substr(fstart
+ 1, pos
- fstart
);
82 while (pos
>= 0 && '/' == fn
[pos
]) --pos
;
87 size_t dstart
= fn
.rfind('/', pos
);
88 *dir
= fn
.substr(dstart
+ 1, pos
- dstart
);
89 *dir
= std::string("/") + *dir
;
92 LOG_DEBUG("[OUT]%s | %s\n", dir
->c_str(), file
->c_str());
95 // A file abstraction for reading sequentially through a file
96 class LibradosSequentialFile
: public SequentialFile
{
97 librados::IoCtx
* _io_ctx
;
102 LibradosSequentialFile(librados::IoCtx
* io_ctx
, std::string fid
, std::string hint
):
103 _io_ctx(io_ctx
), _fid(fid
), _hint(hint
), _offset(0) {}
105 ~LibradosSequentialFile() {}
110 * Read up to "n" bytes from the file. "scratch[0..n-1]" may be
111 * written by this routine. Sets "*result" to the data that was
112 * read (including if fewer than "n" bytes were successfully read).
113 * May set "*result" to point at data in "scratch[0..n-1]", so
114 * "scratch[0..n-1]" must be live when "*result" is used.
115 * If an error was encountered, returns a non-OK status.
117 * REQUIRES: External synchronization
119 * @param n [description]
120 * @param result [description]
121 * @param scratch [description]
122 * @return [description]
124 Status
Read(size_t n
, Slice
* result
, char* scratch
) {
125 LOG_DEBUG("[IN]%i\n", (int)n
);
126 librados::bufferlist buffer
;
128 int r
= _io_ctx
->read(_fid
, buffer
, n
, _offset
);
130 buffer
.begin().copy(r
, scratch
);
131 *result
= Slice(scratch
, r
);
135 s
= err_to_status(r
);
136 if (s
== Status::IOError()) {
141 LOG_DEBUG("[OUT]%s, %i, %s\n", s
.ToString().c_str(), (int)r
, buffer
.c_str());
146 * @brief skip "n" bytes from the file
148 * Skip "n" bytes from the file. This is guaranteed to be no
149 * slower than reading the same data, but may be faster.
151 * If end of file is reached, skipping will stop at the end of the
152 * file, and Skip will return OK.
154 * REQUIRES: External synchronization
156 * @param n [description]
157 * @return [description]
159 Status
Skip(uint64_t n
) {
167 * rocksdb has its own caching capabilities that we should be able to use,
168 * without relying on a cache here. This can safely be a no-op.
170 * @param offset [description]
171 * @param length [description]
173 * @return [description]
175 Status
InvalidateCache(size_t offset
, size_t length
) {
180 // A file abstraction for randomly reading the contents of a file.
181 class LibradosRandomAccessFile
: public RandomAccessFile
{
182 librados::IoCtx
* _io_ctx
;
186 LibradosRandomAccessFile(librados::IoCtx
* io_ctx
, std::string fid
, std::string hint
):
187 _io_ctx(io_ctx
), _fid(fid
), _hint(hint
) {}
189 ~LibradosRandomAccessFile() {}
193 * @details similar to LibradosSequentialFile::Read
195 * @param offset [description]
196 * @param n [description]
197 * @param result [description]
198 * @param scratch [description]
199 * @return [description]
201 Status
Read(uint64_t offset
, size_t n
, Slice
* result
,
202 char* scratch
) const {
203 LOG_DEBUG("[IN]%i\n", (int)n
);
204 librados::bufferlist buffer
;
206 int r
= _io_ctx
->read(_fid
, buffer
, n
, offset
);
208 buffer
.begin().copy(r
, scratch
);
209 *result
= Slice(scratch
, r
);
212 s
= err_to_status(r
);
213 if (s
== Status::IOError()) {
218 LOG_DEBUG("[OUT]%s, %i, %s\n", s
.ToString().c_str(), (int)r
, buffer
.c_str());
223 * @brief [brief description]
224 * @details Get unique id for each file and guarantee this id is different for each file
226 * @param id [description]
227 * @param max_size max size of id, it should be larger than 16
229 * @return [description]
231 size_t GetUniqueId(char* id
, size_t max_size
) const {
232 // All fid has the same db_id prefix, so we need to ignore db_id prefix
233 size_t s
= std::min(max_size
, _fid
.size());
234 strncpy(id
, _fid
.c_str() + (_fid
.size() - s
), s
);
239 //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };
240 void Hint(AccessPattern pattern
) {
246 * @details [long description]
248 * @param offset [description]
249 * @param length [description]
251 * @return [description]
253 Status
InvalidateCache(size_t offset
, size_t length
) {
259 // A file abstraction for sequential writing. The implementation
260 // must provide buffering since callers may append small fragments
261 // at a time to the file.
262 class LibradosWritableFile
: public WritableFile
{
263 librados::IoCtx
* _io_ctx
;
266 const EnvLibrados
* const _env
;
268 std::mutex _mutex
; // used to protect modification of all following variables
269 librados::bufferlist _buffer
; // write buffer
270 uint64_t _buffer_size
; // write buffer size
271 uint64_t _file_size
; // this file size doesn't include buffer size
274 * @brief assuming caller holds lock
275 * @details [long description]
276 * @return [description]
279 // 1. sync append data to RADOS
280 int r
= _io_ctx
->append(_fid
, _buffer
, _buffer_size
);
283 // 2. update local variables
286 _file_size
+= _buffer_size
;
294 LibradosWritableFile(librados::IoCtx
* io_ctx
,
297 const EnvLibrados
* const env
)
298 : _io_ctx(io_ctx
), _fid(fid
), _hint(hint
), _env(env
), _buffer(), _buffer_size(0), _file_size(0) {
299 int ret
= _io_ctx
->stat(_fid
, &_file_size
, nullptr);
307 ~LibradosWritableFile() {
308 // sync before closeing writable file
313 * @brief append data to file
315 * Append will save all written data in buffer until buffer size
316 * reaches buffer max size. Then, it will write buffer into rados
318 * @param data [description]
319 * @return [description]
321 Status
Append(const Slice
& data
) {
323 LOG_DEBUG("[IN] %i | %s\n", (int)data
.size(), data
.data());
326 std::lock_guard
<std::mutex
> lock(_mutex
);
327 _buffer
.append(data
.data(), data
.size());
328 _buffer_size
+= data
.size();
330 if (_buffer_size
> _env
->_write_buffer_size
) {
334 LOG_DEBUG("[OUT] %i\n", r
);
335 return err_to_status(r
);
339 * @brief not supported
340 * @details [long description]
341 * @return [description]
343 Status
PositionedAppend(
344 const Slice
& /* data */,
345 uint64_t /* offset */) {
346 return Status::NotSupported();
350 * @brief truncate file to assigned size
351 * @details [long description]
353 * @param size [description]
354 * @return [description]
356 Status
Truncate(uint64_t size
) {
357 LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size
, (long long)_file_size
, (long long)_buffer_size
);
360 std::lock_guard
<std::mutex
> lock(_mutex
);
361 if (_file_size
> size
) {
362 r
= _io_ctx
->trunc(_fid
, size
);
369 } else if (_file_size
== size
) {
373 librados::bufferlist tmp
;
375 _buffer
.substr_of(tmp
, 0, size
- _file_size
);
376 _buffer_size
= size
- _file_size
;
379 LOG_DEBUG("[OUT] %i\n", r
);
380 return err_to_status(r
);
385 * @details [long description]
386 * @return [description]
389 LOG_DEBUG("%s | %lld | %lld\n", _hint
.c_str(), (long long)_buffer_size
, (long long)_file_size
);
395 * @details initiate an aio write and not wait
397 * @return [description]
400 librados::AioCompletion
*write_completion
= librados::Rados::aio_create_completion();
403 std::lock_guard
<std::mutex
> lock(_mutex
);
404 r
= _io_ctx
->aio_append(_fid
, write_completion
, _buffer
, _buffer_size
);
407 _file_size
+= _buffer_size
;
412 write_completion
->release();
414 return err_to_status(r
);
418 * @brief write buffer data to rados
419 * @details initiate an aio write and wait for result
420 * @return [description]
422 Status
Sync() { // sync data
425 std::lock_guard
<std::mutex
> lock(_mutex
);
426 if (_buffer_size
> 0) {
430 return err_to_status(r
);
434 * @brief [brief description]
435 * @details [long description]
436 * @return true if Sync() and Fsync() are safe to call concurrently with Append() and Flush().
438 bool IsSyncThreadSafe() const {
443 * @brief Indicates to the upper layers whether the current WritableFile implementation uses direct IO.
444 * @details [long description]
445 * @return [description]
447 bool use_direct_io() const {
452 * @brief Get file size
454 * This API will use cached file_size.
455 * @return [description]
457 uint64_t GetFileSize() {
458 LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size
, (long long)_file_size
);
460 std::lock_guard
<std::mutex
> lock(_mutex
);
461 int file_size
= _file_size
+ _buffer_size
;
467 * @brief For documentation, refer to RandomAccessFile::GetUniqueId()
468 * @details [long description]
470 * @param id [description]
471 * @param max_size [description]
473 * @return [description]
475 size_t GetUniqueId(char* id
, size_t max_size
) const {
476 // All fid has the same db_id prefix, so we need to ignore db_id prefix
477 size_t s
= std::min(max_size
, _fid
.size());
478 strncpy(id
, _fid
.c_str() + (_fid
.size() - s
), s
);
485 * @details [long description]
487 * @param offset [description]
488 * @param length [description]
490 * @return [description]
492 Status
InvalidateCache(size_t offset
, size_t length
) {
496 using WritableFile::RangeSync
;
498 * @brief No RangeSync support, just call Sync()
499 * @details [long description]
501 * @param offset [description]
502 * @param nbytes [description]
504 * @return [description]
506 Status
RangeSync(off_t offset
, off_t nbytes
) {
511 using WritableFile::Allocate
;
514 * @details [long description]
516 * @param offset [description]
517 * @param len [description]
519 * @return [description]
521 Status
Allocate(off_t offset
, off_t len
) {
527 // Directory object represents collection of files and implements
528 // filesystem operations that can be executed on directories.
529 class LibradosDirectory
: public Directory
{
530 librados::IoCtx
* _io_ctx
;
533 explicit LibradosDirectory(librados::IoCtx
* io_ctx
, std::string fid
):
534 _io_ctx(io_ctx
), _fid(fid
) {}
536 // Fsync directory. Can be called concurrently from multiple threads.
542 // Identifies a locked file.
543 // This is an exclusive lock and can't be nested by the same thread
544 class LibradosFileLock
: public FileLock
{
545 librados::IoCtx
* _io_ctx
;
546 const std::string _obj_name
;
547 const std::string _lock_name
;
548 const std::string _cookie
;
552 librados::IoCtx
* io_ctx
,
553 const std::string obj_name
):
556 _lock_name("lock_name"),
559 // TODO: the lock will never expire. It may cause problem if the process crash or abnormally exit.
560 while (!_io_ctx
->lock_exclusive(
564 "description", nullptr, 0));
567 ~LibradosFileLock() {
568 _io_ctx
->unlock(_obj_name
, _lock_name
, _cookie
);
573 // --------------------
574 // --- EnvLibrados ----
575 // --------------------
577 * @brief EnvLibrados ctor
578 * @details [long description]
580 * @param db_name unique database name
581 * @param config_path the configure file path for rados
583 EnvLibrados::EnvLibrados(const std::string
& db_name
,
584 const std::string
& config_path
,
585 const std::string
& db_pool
)
586 : EnvLibrados("client.admin",
597 * @brief EnvLibrados ctor
598 * @details [long description]
600 * @param client_name the first 3 parameters are for RADOS client init
601 * @param cluster_name
603 * @param db_name unique database name, used as db_id key
604 * @param config_path the configure file path for rados
605 * @param db_pool the pool for db data
606 * @param wal_pool the pool for WAL data
607 * @param write_buffer_size WritableFile buffer max size
609 EnvLibrados::EnvLibrados(const std::string
& client_name
,
610 const std::string
& cluster_name
,
611 const uint64_t flags
,
612 const std::string
& db_name
,
613 const std::string
& config_path
,
614 const std::string
& db_pool
,
615 const std::string
& wal_dir
,
616 const std::string
& wal_pool
,
617 const uint64_t write_buffer_size
)
618 : EnvWrapper(Env::Default()),
619 _client_name(client_name
),
620 _cluster_name(cluster_name
),
623 _config_path(config_path
),
624 _db_pool_name(db_pool
),
626 _wal_pool_name(wal_pool
),
627 _write_buffer_size(write_buffer_size
) {
630 // 1. create a Rados object and initialize it
631 ret
= _rados
.init2(_client_name
.c_str(), _cluster_name
.c_str(), _flags
); // just use the client.admin keyring
632 if (ret
< 0) { // let's handle any error that might have come back
633 std::cerr
<< "couldn't initialize rados! error " << ret
<< std::endl
;
638 // 2. read configure file
639 ret
= _rados
.conf_read_file(_config_path
.c_str());
641 // This could fail if the config file is malformed, but it'd be hard.
642 std::cerr
<< "failed to parse config file " << _config_path
643 << "! error" << ret
<< std::endl
;
648 // 3. we actually connect to the cluster
649 ret
= _rados
.connect();
651 std::cerr
<< "couldn't connect to cluster! error " << ret
<< std::endl
;
656 // 4. create db_pool if not exist
657 ret
= _rados
.pool_create(_db_pool_name
.c_str());
658 if (ret
< 0 && ret
!= -EEXIST
&& ret
!= -EPERM
) {
659 std::cerr
<< "couldn't create pool! error " << ret
<< std::endl
;
663 // 5. create db_pool_ioctx
664 ret
= _rados
.ioctx_create(_db_pool_name
.c_str(), _db_pool_ioctx
);
666 std::cerr
<< "couldn't set up ioctx! error " << ret
<< std::endl
;
671 // 6. create wal_pool if not exist
672 ret
= _rados
.pool_create(_wal_pool_name
.c_str());
673 if (ret
< 0 && ret
!= -EEXIST
&& ret
!= -EPERM
) {
674 std::cerr
<< "couldn't create pool! error " << ret
<< std::endl
;
678 // 7. create wal_pool_ioctx
679 ret
= _rados
.ioctx_create(_wal_pool_name
.c_str(), _wal_pool_ioctx
);
681 std::cerr
<< "couldn't set up ioctx! error " << ret
<< std::endl
;
687 _AddFid(ROOT_DIR_KEY
, DIR_ID_VALUE
);
690 LOG_DEBUG("rados connect result code : %i\n", ret
);
693 /****************************************************
694 private functions to handle fid operation.
695 Dirs also have a fid, but the value is DIR_ID_VALUE
696 ****************************************************/
699 * @brief generate a new fid
700 * @details [long description]
701 * @return [description]
703 std::string
EnvLibrados::_CreateFid() {
704 return _db_name
+ "." + GenerateUniqueId();
709 * @details [long description]
711 * @param fname [description]
712 * @param fid [description]
718 Status
EnvLibrados::_GetFid(
719 const std::string
&fname
,
721 std::set
<std::string
> keys
;
722 std::map
<std::string
, librados::bufferlist
> kvs
;
724 int r
= _db_pool_ioctx
.omap_get_vals_by_keys(_db_name
, keys
, &kvs
);
726 if (0 == r
&& 0 == kvs
.size()) {
727 return Status::NotFound();
728 } else if (0 == r
&& 0 != kvs
.size()) {
729 fid
.assign(kvs
[fname
].c_str(), kvs
[fname
].length());
732 return err_to_status(r
);
738 * @details Only modify object in rados once,
739 * so this rename operation is atomic in terms of rados
741 * @param old_fname [description]
742 * @param new_fname [description]
744 * @return [description]
746 Status
EnvLibrados::_RenameFid(const std::string
& old_fname
,
747 const std::string
& new_fname
) {
749 Status s
= _GetFid(old_fname
, fid
);
751 if (Status::OK() != s
) {
755 librados::bufferlist bl
;
756 std::set
<std::string
> keys
;
757 std::map
<std::string
, librados::bufferlist
> kvs
;
758 librados::ObjectWriteOperation o
;
760 keys
.insert(old_fname
);
762 o
.omap_rm_keys(keys
);
764 int r
= _db_pool_ioctx
.operate(_db_name
, &o
);
765 return err_to_status(r
);
769 * @brief add <file path, fid> to metadata object. It may overwrite an existing key.
770 * @details [long description]
772 * @param fname [description]
773 * @param fid [description]
775 * @return [description]
777 Status
EnvLibrados::_AddFid(
778 const std::string
& fname
,
779 const std::string
& fid
) {
780 std::map
<std::string
, librados::bufferlist
> kvs
;
781 librados::bufferlist value
;
784 int r
= _db_pool_ioctx
.omap_set(_db_name
, kvs
);
785 return err_to_status(r
);
789 * @brief return subfile names of dir.
791 * RocksDB has a 2-level structure, so all keys
792 * that have dir as prefix are subfiles of dir.
793 * So we can just return these files' name.
795 * @param dir [description]
796 * @param result [description]
798 * @return [description]
800 Status
EnvLibrados::_GetSubFnames(
801 const std::string
& dir
,
802 std::vector
<std::string
> * result
804 std::string
start_after(dir
);
805 std::string
filter_prefix(dir
);
806 std::map
<std::string
, librados::bufferlist
> kvs
;
807 _db_pool_ioctx
.omap_get_vals(_db_name
,
808 start_after
, filter_prefix
,
809 MAX_ITEMS_IN_FS
, &kvs
);
812 for (auto i
= kvs
.begin(); i
!= kvs
.end(); i
++) {
813 result
->push_back(i
->first
.substr(dir
.size() + 1));
819 * @brief delete key fname from metadata object
820 * @details [long description]
822 * @param fname [description]
823 * @return [description]
825 Status
EnvLibrados::_DelFid(
826 const std::string
& fname
) {
827 std::set
<std::string
> keys
;
829 int r
= _db_pool_ioctx
.omap_rm_keys(_db_name
, keys
);
830 return err_to_status(r
);
834 * @brief get match IoCtx from _prefix_pool_map
835 * @details [long description]
837 * @param prefix [description]
838 * @return [description]
841 librados::IoCtx
* EnvLibrados::_GetIoctx(const std::string
& fpath
) {
842 auto is_prefix
= [](const std::string
& s1
, const std::string
& s2
) {
843 auto it1
= s1
.begin(), it2
= s2
.begin();
844 while (it1
!= s1
.end() && it2
!= s2
.end() && *it1
== *it2
) ++it1
, ++it2
;
845 return it1
== s1
.end();
848 if (is_prefix(_wal_dir
, fpath
)) {
849 return &_wal_pool_ioctx
;
851 return &_db_pool_ioctx
;
855 /************************************************************
857 ************************************************************/
859 * @brief generate unique id
860 * @details Combine system time and random number.
861 * @return [description]
863 std::string
EnvLibrados::GenerateUniqueId() {
864 Random64
r(time(nullptr));
865 uint64_t random_uuid_portion
=
866 r
.Uniform(std::numeric_limits
<uint64_t>::max());
867 uint64_t nanos_uuid_portion
= NowNanos();
872 (unsigned long)nanos_uuid_portion
,
873 (unsigned long)random_uuid_portion
);
878 * @brief create a new sequential read file handler
879 * @details it will check the existence of fname
881 * @param fname [description]
882 * @param result [description]
883 * @param options [description]
884 * @return [description]
886 Status
EnvLibrados::NewSequentialFile(
887 const std::string
& fname
,
888 std::unique_ptr
<SequentialFile
>* result
,
889 const EnvOptions
& options
)
891 LOG_DEBUG("[IN]%s\n", fname
.c_str());
892 std::string dir
, file
, fid
;
893 split(fname
, &dir
, &file
);
895 std::string fpath
= dir
+ "/" + file
;
897 s
= _GetFid(dir
, fid
);
899 if (!s
.ok() || fid
!= DIR_ID_VALUE
) {
900 if (fid
!= DIR_ID_VALUE
) s
= Status::IOError();
904 s
= _GetFid(fpath
, fid
);
906 if (Status::NotFound() == s
) {
907 s
= Status::IOError();
912 result
->reset(new LibradosSequentialFile(_GetIoctx(fpath
), fid
, fpath
));
916 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
921 * @brief create a new random access file handler
922 * @details it will check the existence of fname
924 * @param fname [description]
925 * @param result [description]
926 * @param options [description]
927 * @return [description]
929 Status
EnvLibrados::NewRandomAccessFile(
930 const std::string
& fname
,
931 std::unique_ptr
<RandomAccessFile
>* result
,
932 const EnvOptions
& options
)
934 LOG_DEBUG("[IN]%s\n", fname
.c_str());
935 std::string dir
, file
, fid
;
936 split(fname
, &dir
, &file
);
938 std::string fpath
= dir
+ "/" + file
;
940 s
= _GetFid(dir
, fid
);
942 if (!s
.ok() || fid
!= DIR_ID_VALUE
) {
943 s
= Status::IOError();
947 s
= _GetFid(fpath
, fid
);
949 if (Status::NotFound() == s
) {
950 s
= Status::IOError();
955 result
->reset(new LibradosRandomAccessFile(_GetIoctx(fpath
), fid
, fpath
));
959 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
964 * @brief create a new write file handler
965 * @details it will check the existence of fname
967 * @param fname [description]
968 * @param result [description]
969 * @param options [description]
970 * @return [description]
972 Status
EnvLibrados::NewWritableFile(
973 const std::string
& fname
,
974 std::unique_ptr
<WritableFile
>* result
,
975 const EnvOptions
& options
)
977 LOG_DEBUG("[IN]%s\n", fname
.c_str());
978 std::string dir
, file
, fid
;
979 split(fname
, &dir
, &file
);
981 std::string fpath
= dir
+ "/" + file
;
984 // 1. check if dir exist
985 s
= _GetFid(dir
, fid
);
990 if (fid
!= DIR_ID_VALUE
) {
991 s
= Status::IOError();
995 // 2. check if file exist.
997 // 2.2 not exist, create it
998 s
= _GetFid(fpath
, fid
);
999 if (Status::NotFound() == s
) {
1001 _AddFid(fpath
, fid
);
1004 result
->reset(new LibradosWritableFile(_GetIoctx(fpath
), fid
, fpath
, this));
1008 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1013 * @brief reuse write file handler
1015 * This function will rename old_fname to new_fname,
1016 * then return the handler of new_fname
1018 * @param new_fname [description]
1019 * @param old_fname [description]
1020 * @param result [description]
1021 * @param options [description]
1022 * @return [description]
1024 Status
EnvLibrados::ReuseWritableFile(
1025 const std::string
& new_fname
,
1026 const std::string
& old_fname
,
1027 std::unique_ptr
<WritableFile
>* result
,
1028 const EnvOptions
& options
)
1030 LOG_DEBUG("[IN]%s => %s\n", old_fname
.c_str(), new_fname
.c_str());
1031 std::string src_fid
, tmp_fid
, src_dir
, src_file
, dst_dir
, dst_file
;
1032 split(old_fname
, &src_dir
, &src_file
);
1033 split(new_fname
, &dst_dir
, &dst_file
);
1035 std::string src_fpath
= src_dir
+ "/" + src_file
;
1036 std::string dst_fpath
= dst_dir
+ "/" + dst_file
;
1037 Status r
= Status::OK();
1039 r
= _RenameFid(src_fpath
,
1045 result
->reset(new LibradosWritableFile(_GetIoctx(dst_fpath
), src_fid
, dst_fpath
, this));
1048 LOG_DEBUG("[OUT]%s\n", r
.ToString().c_str());
1053 * @brief create a new directory handler
1054 * @details [long description]
1056 * @param name [description]
1057 * @param result [description]
1059 * @return [description]
1061 Status
EnvLibrados::NewDirectory(
1062 const std::string
& name
,
1063 std::unique_ptr
<Directory
>* result
)
1065 LOG_DEBUG("[IN]%s\n", name
.c_str());
1066 std::string fid
, dir
, file
;
1067 /* just want to get dir name */
1068 split(name
+ "/tmp", &dir
, &file
);
1072 s
= _GetFid(dir
, fid
);
1074 if (!s
.ok() || DIR_ID_VALUE
!= fid
) {
1075 s
= Status::IOError(name
, strerror(-ENOENT
));
1079 if (Status::NotFound() == s
) {
1080 s
= _AddFid(dir
, DIR_ID_VALUE
);
1082 } else if (!s
.ok()) {
1086 result
->reset(new LibradosDirectory(_GetIoctx(dir
), dir
));
1090 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1095 * @brief check if fname exists
1096 * @details [long description]
1098 * @param fname [description]
1099 * @return [description]
1101 Status
EnvLibrados::FileExists(const std::string
& fname
)
1103 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1104 std::string fid
, dir
, file
;
1105 split(fname
, &dir
, &file
);
1106 Status s
= _GetFid(dir
+ "/" + file
, fid
);
1108 if (s
.ok() && fid
!= DIR_ID_VALUE
) {
1112 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1117 * @brief get subfile name of dir_in
1118 * @details [long description]
1120 * @param dir_in [description]
1121 * @param result [description]
1123 * @return [description]
1125 Status
EnvLibrados::GetChildren(
1126 const std::string
& dir_in
,
1127 std::vector
<std::string
>* result
)
1129 LOG_DEBUG("[IN]%s\n", dir_in
.c_str());
1130 std::string fid
, dir
, file
;
1131 split(dir_in
+ "/temp", &dir
, &file
);
1135 s
= _GetFid(dir
, fid
);
1140 if (fid
!= DIR_ID_VALUE
) {
1141 s
= Status::IOError();
1145 s
= _GetSubFnames(dir
, result
);
1148 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1153 * @brief delete fname
1154 * @details [long description]
1156 * @param fname [description]
1157 * @return [description]
1159 Status
EnvLibrados::DeleteFile(const std::string
& fname
)
1161 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1162 std::string fid
, dir
, file
;
1163 split(fname
, &dir
, &file
);
1164 Status s
= _GetFid(dir
+ "/" + file
, fid
);
1166 if (s
.ok() && DIR_ID_VALUE
!= fid
) {
1167 s
= _DelFid(dir
+ "/" + file
);
1169 s
= Status::NotFound();
1171 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1176 * @brief create new dir
1177 * @details [long description]
1179 * @param dirname [description]
1180 * @return [description]
1182 Status
EnvLibrados::CreateDir(const std::string
& dirname
)
1184 LOG_DEBUG("[IN]%s\n", dirname
.c_str());
1185 std::string fid
, dir
, file
;
1186 split(dirname
+ "/temp", &dir
, &file
);
1187 Status s
= _GetFid(dir
+ "/" + file
, fid
);
1190 if (Status::NotFound() != s
&& fid
!= DIR_ID_VALUE
) {
1192 } else if (Status::OK() == s
&& fid
== DIR_ID_VALUE
) {
1196 s
= _AddFid(dir
, DIR_ID_VALUE
);
1199 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1204 * @brief create dir if missing
1205 * @details [long description]
1207 * @param dirname [description]
1208 * @return [description]
1210 Status
EnvLibrados::CreateDirIfMissing(const std::string
& dirname
)
1212 LOG_DEBUG("[IN]%s\n", dirname
.c_str());
1213 std::string fid
, dir
, file
;
1214 split(dirname
+ "/temp", &dir
, &file
);
1215 Status s
= Status::OK();
1218 s
= _GetFid(dir
, fid
);
1219 if (Status::NotFound() != s
) {
1223 s
= _AddFid(dir
, DIR_ID_VALUE
);
1226 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1234 * @param dirname [description]
1235 * @return [description]
1237 Status
EnvLibrados::DeleteDir(const std::string
& dirname
)
1239 LOG_DEBUG("[IN]%s\n", dirname
.c_str());
1240 std::string fid
, dir
, file
;
1241 split(dirname
+ "/temp", &dir
, &file
);
1242 Status s
= Status::OK();
1244 s
= _GetFid(dir
, fid
);
1246 if (s
.ok() && DIR_ID_VALUE
== fid
) {
1247 std::vector
<std::string
> subs
;
1248 s
= _GetSubFnames(dir
, &subs
);
1249 // if subfiles exist, can't delete dir
1250 if (subs
.size() > 0) {
1251 s
= Status::IOError();
1256 s
= Status::NotFound();
1259 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1264 * @brief return file size
1265 * @details [long description]
1267 * @param fname [description]
1268 * @param file_size [description]
1270 * @return [description]
1272 Status
EnvLibrados::GetFileSize(
1273 const std::string
& fname
,
1274 uint64_t* file_size
)
1276 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1277 std::string fid
, dir
, file
;
1278 split(fname
, &dir
, &file
);
1283 std::string fpath
= dir
+ "/" + file
;
1284 s
= _GetFid(fpath
, fid
);
1290 int ret
= _GetIoctx(fpath
)->stat(fid
, file_size
, &mtime
);
1292 LOG_DEBUG("%i\n", ret
);
1293 if (-ENOENT
== ret
) {
1297 s
= err_to_status(ret
);
1304 LOG_DEBUG("[OUT]%s|%lld\n", s
.ToString().c_str(), (long long)*file_size
);
1309 * @brief get file modification time
1310 * @details [long description]
1312 * @param fname [description]
1313 * @param file_mtime [description]
1315 * @return [description]
1317 Status
EnvLibrados::GetFileModificationTime(const std::string
& fname
,
1318 uint64_t* file_mtime
)
1320 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1321 std::string fid
, dir
, file
;
1322 split(fname
, &dir
, &file
);
1325 Status s
= Status::OK();
1327 std::string fpath
= dir
+ "/" + file
;
1328 s
= _GetFid(dir
+ "/" + file
, fid
);
1334 int ret
= _GetIoctx(fpath
)->stat(fid
, &file_size
, &mtime
);
1336 if (Status::NotFound() == err_to_status(ret
)) {
1337 *file_mtime
= static_cast<uint64_t>(mtime
);
1340 s
= err_to_status(ret
);
1347 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1352 * @brief rename file
1355 * @param src [description]
1356 * @param target_in [description]
1358 * @return [description]
1360 Status
EnvLibrados::RenameFile(
1361 const std::string
& src
,
1362 const std::string
& target_in
)
1364 LOG_DEBUG("[IN]%s => %s\n", src
.c_str(), target_in
.c_str());
1365 std::string src_fid
, tmp_fid
, src_dir
, src_file
, dst_dir
, dst_file
;
1366 split(src
, &src_dir
, &src_file
);
1367 split(target_in
, &dst_dir
, &dst_file
);
1369 auto s
= _RenameFid(src_dir
+ "/" + src_file
,
1370 dst_dir
+ "/" + dst_file
);
1371 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1376 * @brief not supported
1377 * @details [long description]
1379 * @param src [description]
1380 * @param target_in [description]
1382 * @return [description]
1384 Status
EnvLibrados::LinkFile(
1385 const std::string
& src
,
1386 const std::string
& target_in
)
1388 LOG_DEBUG("[IO]%s => %s\n", src
.c_str(), target_in
.c_str());
1389 return Status::NotSupported();
1393 * @brief lock file. create if missing.
1394 * @details [long description]
1396 * It seems that LockFile is used for preventing other instances of RocksDB
1397 * from opening up the database at the same time. From RocksDB source code,
1398 * the invokes of LockFile are at following locations:
1400 * ./db/db_impl.cc:1159: s = env_->LockFile(LockFileName(dbname_), &db_lock_); // DBImpl::Recover
1401 * ./db/db_impl.cc:5839: Status result = env->LockFile(lockname, &lock); // Status DestroyDB
1403 * RocksDB calls LockFile during db recovery and db destroy
1405 * @param fname [description]
1406 * @param lock [description]
1408 * @return [description]
1410 Status
EnvLibrados::LockFile(
1411 const std::string
& fname
,
1414 LOG_DEBUG("[IN]%s\n", fname
.c_str());
1415 std::string fid
, dir
, file
;
1416 split(fname
, &dir
, &file
);
1417 Status s
= Status::OK();
1420 std::string fpath
= dir
+ "/" + file
;
1421 s
= _GetFid(fpath
, fid
);
1423 if (Status::OK() != s
&&
1424 Status::NotFound() != s
) {
1426 } else if (Status::NotFound() == s
) {
1427 s
= _AddFid(fpath
, _CreateFid());
1431 } else if (Status::OK() == s
&& DIR_ID_VALUE
== fid
) {
1432 s
= Status::IOError();
1436 *lock
= new LibradosFileLock(_GetIoctx(fpath
), fpath
);
1439 LOG_DEBUG("[OUT]%s\n", s
.ToString().c_str());
1444 * @brief unlock file
1445 * @details [long description]
1447 * @param lock [description]
1448 * @return [description]
1450 Status
EnvLibrados::UnlockFile(FileLock
* lock
)
1452 LOG_DEBUG("[IO]%p\n", lock
);
1453 if (nullptr != lock
) {
1456 return Status::OK();
1461 * @brief not supported
1462 * @details [long description]
1464 * @param db_path [description]
1465 * @param output_path [description]
1467 * @return [description]
1469 Status
EnvLibrados::GetAbsolutePath(
1470 const std::string
& db_path
,
1471 std::string
* output_path
)
1473 LOG_DEBUG("[IO]%s\n", db_path
.c_str());
1474 return Status::NotSupported();
1478 * @brief Get default EnvLibrados
1479 * @details [long description]
1480 * @return [description]
1482 EnvLibrados
* EnvLibrados::Default() {
1483 static EnvLibrados
default_env(default_db_name
,
1484 std::getenv(default_config_path
),
1486 return &default_env
;
1488 // @lint-ignore TXT4 T25377293 Grandfathered in