]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/env_librados.cc
import 15.2.4
[ceph.git] / ceph / src / rocksdb / utilities / env_librados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rocksdb/utilities/env_librados.h"
5 #include "util/random.h"
6 #include <mutex>
7 #include <cstdlib>
8
9 namespace rocksdb {
/* GLOBAL DEFINES */
// Uncomment to trace every Env call on stdout (thread id, file, line, function).
// #define DEBUG
#ifndef DEBUG
// Tracing disabled: the macro expands to nothing and its arguments are not evaluated.
#define LOG_DEBUG(...)
#else
// NOTE(review): these system headers end up inside namespace rocksdb because
// this block sits after the namespace opens; consider moving them to the top.
#include <cstdio>
#include <sys/syscall.h>
#include <unistd.h>
#define LOG_DEBUG(...) do { \
    printf("[%ld:%s:%i:%s]", syscall(SYS_gettid), __FILE__, __LINE__, __FUNCTION__); \
    printf(__VA_ARGS__); \
  } while (0)
#endif
23
/* GLOBAL CONSTANTS */
// Default database / pool names (their users are outside this part of the file).
const char *default_db_name{"default_envlibrados_db"};
const char *default_pool_name{"default_envlibrados_pool"};
// Name of the environment variable that holds the ceph config file path.
const char *default_config_path{"CEPH_CONFIG_PATH"};
// Maximum number of dirs/files that can be stored in the fs; also used as the
// limit for a single omap listing of the metadata object.
const int MAX_ITEMS_IN_FS{1 << 30};
// Metadata key of the root directory.
const std::string ROOT_DIR_KEY{"/"};
// Metadata value that marks a key as a directory rather than a file.
const std::string DIR_ID_VALUE{"<DIR>"};
33
34 /**
35 * @brief convert error code to status
36 * @details Convert internal linux error code to Status
37 *
38 * @param r [description]
39 * @return [description]
40 */
41 Status err_to_status(int r)
42 {
43 switch (r) {
44 case 0:
45 return Status::OK();
46 case -ENOENT:
47 return Status::IOError();
48 case -ENODATA:
49 case -ENOTDIR:
50 return Status::NotFound(Status::kNone);
51 case -EINVAL:
52 return Status::InvalidArgument(Status::kNone);
53 case -EIO:
54 return Status::IOError(Status::kNone);
55 default:
56 // FIXME :(
57 assert(0 == "unrecognized error code");
58 return Status::NotSupported(Status::kNone);
59 }
60 }
61
62 /**
63 * @brief split file path into dir path and file name
64 * @details
65 * Because rocksdb only need a 2-level structure (dir/file), all input path will be shortened to dir/file format
66 * For example:
67 * b/c => dir '/b', file 'c'
68 * /a/b/c => dir '/b', file 'c'
69 *
70 * @param fn [description]
71 * @param dir [description]
72 * @param file [description]
73 */
74 void split(const std::string &fn, std::string *dir, std::string *file) {
75 LOG_DEBUG("[IN]%s\n", fn.c_str());
76 int pos = fn.size() - 1;
77 while ('/' == fn[pos]) --pos;
78 size_t fstart = fn.rfind('/', pos);
79 *file = fn.substr(fstart + 1, pos - fstart);
80
81 pos = fstart;
82 while (pos >= 0 && '/' == fn[pos]) --pos;
83
84 if (pos < 0) {
85 *dir = "/";
86 } else {
87 size_t dstart = fn.rfind('/', pos);
88 *dir = fn.substr(dstart + 1, pos - dstart);
89 *dir = std::string("/") + *dir;
90 }
91
92 LOG_DEBUG("[OUT]%s | %s\n", dir->c_str(), file->c_str());
93 }
94
95 // A file abstraction for reading sequentially through a file
96 class LibradosSequentialFile : public SequentialFile {
97 librados::IoCtx * _io_ctx;
98 std::string _fid;
99 std::string _hint;
100 int _offset;
101 public:
102 LibradosSequentialFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
103 _io_ctx(io_ctx), _fid(fid), _hint(hint), _offset(0) {}
104
105 ~LibradosSequentialFile() {}
106
107 /**
108 * @brief read file
109 * @details
110 * Read up to "n" bytes from the file. "scratch[0..n-1]" may be
111 * written by this routine. Sets "*result" to the data that was
112 * read (including if fewer than "n" bytes were successfully read).
113 * May set "*result" to point at data in "scratch[0..n-1]", so
114 * "scratch[0..n-1]" must be live when "*result" is used.
115 * If an error was encountered, returns a non-OK status.
116 *
117 * REQUIRES: External synchronization
118 *
119 * @param n [description]
120 * @param result [description]
121 * @param scratch [description]
122 * @return [description]
123 */
124 Status Read(size_t n, Slice* result, char* scratch) {
125 LOG_DEBUG("[IN]%i\n", (int)n);
126 librados::bufferlist buffer;
127 Status s;
128 int r = _io_ctx->read(_fid, buffer, n, _offset);
129 if (r >= 0) {
130 buffer.begin().copy(r, scratch);
131 *result = Slice(scratch, r);
132 _offset += r;
133 s = Status::OK();
134 } else {
135 s = err_to_status(r);
136 if (s == Status::IOError()) {
137 *result = Slice();
138 s = Status::OK();
139 }
140 }
141 LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
142 return s;
143 }
144
145 /**
146 * @brief skip "n" bytes from the file
147 * @details
148 * Skip "n" bytes from the file. This is guaranteed to be no
149 * slower that reading the same data, but may be faster.
150 *
151 * If end of file is reached, skipping will stop at the end of the
152 * file, and Skip will return OK.
153 *
154 * REQUIRES: External synchronization
155 *
156 * @param n [description]
157 * @return [description]
158 */
159 Status Skip(uint64_t n) {
160 _offset += n;
161 return Status::OK();
162 }
163
164 /**
165 * @brief noop
166 * @details
167 * rocksdb has it's own caching capabilities that we should be able to use,
168 * without relying on a cache here. This can safely be a no-op.
169 *
170 * @param offset [description]
171 * @param length [description]
172 *
173 * @return [description]
174 */
175 Status InvalidateCache(size_t offset, size_t length) {
176 return Status::OK();
177 }
178 };
179
180 // A file abstraction for randomly reading the contents of a file.
181 class LibradosRandomAccessFile : public RandomAccessFile {
182 librados::IoCtx * _io_ctx;
183 std::string _fid;
184 std::string _hint;
185 public:
186 LibradosRandomAccessFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
187 _io_ctx(io_ctx), _fid(fid), _hint(hint) {}
188
189 ~LibradosRandomAccessFile() {}
190
191 /**
192 * @brief read file
193 * @details similar to LibradosSequentialFile::Read
194 *
195 * @param offset [description]
196 * @param n [description]
197 * @param result [description]
198 * @param scratch [description]
199 * @return [description]
200 */
201 Status Read(uint64_t offset, size_t n, Slice* result,
202 char* scratch) const {
203 LOG_DEBUG("[IN]%i\n", (int)n);
204 librados::bufferlist buffer;
205 Status s;
206 int r = _io_ctx->read(_fid, buffer, n, offset);
207 if (r >= 0) {
208 buffer.begin().copy(r, scratch);
209 *result = Slice(scratch, r);
210 s = Status::OK();
211 } else {
212 s = err_to_status(r);
213 if (s == Status::IOError()) {
214 *result = Slice();
215 s = Status::OK();
216 }
217 }
218 LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
219 return s;
220 }
221
222 /**
223 * @brief [brief description]
224 * @details Get unique id for each file and guarantee this id is different for each file
225 *
226 * @param id [description]
227 * @param max_size max size of id, it shoud be larger than 16
228 *
229 * @return [description]
230 */
231 size_t GetUniqueId(char* id, size_t max_size) const {
232 // All fid has the same db_id prefix, so we need to ignore db_id prefix
233 size_t s = std::min(max_size, _fid.size());
234 strncpy(id, _fid.c_str() + (_fid.size() - s), s);
235 id[s - 1] = '\0';
236 return s;
237 };
238
239 //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };
240 void Hint(AccessPattern pattern) {
241 /* Do nothing */
242 }
243
244 /**
245 * @brief noop
246 * @details [long description]
247 *
248 * @param offset [description]
249 * @param length [description]
250 *
251 * @return [description]
252 */
253 Status InvalidateCache(size_t offset, size_t length) {
254 return Status::OK();
255 }
256 };
257
258
259 // A file abstraction for sequential writing. The implementation
260 // must provide buffering since callers may append small fragments
261 // at a time to the file.
262 class LibradosWritableFile : public WritableFile {
263 librados::IoCtx * _io_ctx;
264 std::string _fid;
265 std::string _hint;
266 const EnvLibrados * const _env;
267
268 std::mutex _mutex; // used to protect modification of all following variables
269 librados::bufferlist _buffer; // write buffer
270 uint64_t _buffer_size; // write buffer size
271 uint64_t _file_size; // this file size doesn't include buffer size
272
273 /**
274 * @brief assuming caller holds lock
275 * @details [long description]
276 * @return [description]
277 */
278 int _SyncLocked() {
279 // 1. sync append data to RADOS
280 int r = _io_ctx->append(_fid, _buffer, _buffer_size);
281 assert(r >= 0);
282
283 // 2. update local variables
284 if (0 == r) {
285 _buffer.clear();
286 _file_size += _buffer_size;
287 _buffer_size = 0;
288 }
289
290 return r;
291 }
292
293 public:
294 LibradosWritableFile(librados::IoCtx * io_ctx,
295 std::string fid,
296 std::string hint,
297 const EnvLibrados * const env)
298 : _io_ctx(io_ctx), _fid(fid), _hint(hint), _env(env), _buffer(), _buffer_size(0), _file_size(0) {
299 int ret = _io_ctx->stat(_fid, &_file_size, nullptr);
300
301 // if file not exist
302 if (ret < 0) {
303 _file_size = 0;
304 }
305 }
306
307 ~LibradosWritableFile() {
308 // sync before closeing writable file
309 Sync();
310 }
311
312 /**
313 * @brief append data to file
314 * @details
315 * Append will save all written data in buffer util buffer size
316 * reaches buffer max size. Then, it will write buffer into rados
317 *
318 * @param data [description]
319 * @return [description]
320 */
321 Status Append(const Slice& data) {
322 // append buffer
323 LOG_DEBUG("[IN] %i | %s\n", (int)data.size(), data.data());
324 int r = 0;
325
326 std::lock_guard<std::mutex> lock(_mutex);
327 _buffer.append(data.data(), data.size());
328 _buffer_size += data.size();
329
330 if (_buffer_size > _env->_write_buffer_size) {
331 r = _SyncLocked();
332 }
333
334 LOG_DEBUG("[OUT] %i\n", r);
335 return err_to_status(r);
336 }
337
338 /**
339 * @brief not supported
340 * @details [long description]
341 * @return [description]
342 */
343 Status PositionedAppend(
344 const Slice& /* data */,
345 uint64_t /* offset */) {
346 return Status::NotSupported();
347 }
348
349 /**
350 * @brief truncate file to assigned size
351 * @details [long description]
352 *
353 * @param size [description]
354 * @return [description]
355 */
356 Status Truncate(uint64_t size) {
357 LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size, (long long)_file_size, (long long)_buffer_size);
358 int r = 0;
359
360 std::lock_guard<std::mutex> lock(_mutex);
361 if (_file_size > size) {
362 r = _io_ctx->trunc(_fid, size);
363
364 if (r == 0) {
365 _buffer.clear();
366 _buffer_size = 0;
367 _file_size = size;
368 }
369 } else if (_file_size == size) {
370 _buffer.clear();
371 _buffer_size = 0;
372 } else {
373 librados::bufferlist tmp;
374 tmp.claim(_buffer);
375 _buffer.substr_of(tmp, 0, size - _file_size);
376 _buffer_size = size - _file_size;
377 }
378
379 LOG_DEBUG("[OUT] %i\n", r);
380 return err_to_status(r);
381 }
382
383 /**
384 * @brief close file
385 * @details [long description]
386 * @return [description]
387 */
388 Status Close() {
389 LOG_DEBUG("%s | %lld | %lld\n", _hint.c_str(), (long long)_buffer_size, (long long)_file_size);
390 return Sync();
391 }
392
393 /**
394 * @brief flush file,
395 * @details initiate an aio write and not wait
396 *
397 * @return [description]
398 */
399 Status Flush() {
400 librados::AioCompletion *write_completion = librados::Rados::aio_create_completion();
401 int r = 0;
402
403 std::lock_guard<std::mutex> lock(_mutex);
404 r = _io_ctx->aio_append(_fid, write_completion, _buffer, _buffer_size);
405
406 if (0 == r) {
407 _file_size += _buffer_size;
408 _buffer.clear();
409 _buffer_size = 0;
410 }
411
412 write_completion->release();
413
414 return err_to_status(r);
415 }
416
417 /**
418 * @brief write buffer data to rados
419 * @details initiate an aio write and wait for result
420 * @return [description]
421 */
422 Status Sync() { // sync data
423 int r = 0;
424
425 std::lock_guard<std::mutex> lock(_mutex);
426 if (_buffer_size > 0) {
427 r = _SyncLocked();
428 }
429
430 return err_to_status(r);
431 }
432
433 /**
434 * @brief [brief description]
435 * @details [long description]
436 * @return true if Sync() and Fsync() are safe to call concurrently with Append()and Flush().
437 */
438 bool IsSyncThreadSafe() const {
439 return true;
440 }
441
442 /**
443 * @brief Indicates the upper layers if the current WritableFile implementation uses direct IO.
444 * @details [long description]
445 * @return [description]
446 */
447 bool use_direct_io() const {
448 return false;
449 }
450
451 /**
452 * @brief Get file size
453 * @details
454 * This API will use cached file_size.
455 * @return [description]
456 */
457 uint64_t GetFileSize() {
458 LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size, (long long)_file_size);
459
460 std::lock_guard<std::mutex> lock(_mutex);
461 int file_size = _file_size + _buffer_size;
462
463 return file_size;
464 }
465
466 /**
467 * @brief For documentation, refer to RandomAccessFile::GetUniqueId()
468 * @details [long description]
469 *
470 * @param id [description]
471 * @param max_size [description]
472 *
473 * @return [description]
474 */
475 size_t GetUniqueId(char* id, size_t max_size) const {
476 // All fid has the same db_id prefix, so we need to ignore db_id prefix
477 size_t s = std::min(max_size, _fid.size());
478 strncpy(id, _fid.c_str() + (_fid.size() - s), s);
479 id[s - 1] = '\0';
480 return s;
481 }
482
483 /**
484 * @brief noop
485 * @details [long description]
486 *
487 * @param offset [description]
488 * @param length [description]
489 *
490 * @return [description]
491 */
492 Status InvalidateCache(size_t offset, size_t length) {
493 return Status::OK();
494 }
495
496 using WritableFile::RangeSync;
497 /**
498 * @brief No RangeSync support, just call Sync()
499 * @details [long description]
500 *
501 * @param offset [description]
502 * @param nbytes [description]
503 *
504 * @return [description]
505 */
506 Status RangeSync(off_t offset, off_t nbytes) {
507 return Sync();
508 }
509
510 protected:
511 using WritableFile::Allocate;
512 /**
513 * @brief noop
514 * @details [long description]
515 *
516 * @param offset [description]
517 * @param len [description]
518 *
519 * @return [description]
520 */
521 Status Allocate(off_t offset, off_t len) {
522 return Status::OK();
523 }
524 };
525
526
527 // Directory object represents collection of files and implements
528 // filesystem operations that can be executed on directories.
529 class LibradosDirectory : public Directory {
530 librados::IoCtx * _io_ctx;
531 std::string _fid;
532 public:
533 explicit LibradosDirectory(librados::IoCtx * io_ctx, std::string fid):
534 _io_ctx(io_ctx), _fid(fid) {}
535
536 // Fsync directory. Can be called concurrently from multiple threads.
537 Status Fsync() {
538 return Status::OK();
539 }
540 };
541
542 // Identifies a locked file.
543 // This is exclusive lock and can't nested lock by same thread
544 class LibradosFileLock : public FileLock {
545 librados::IoCtx * _io_ctx;
546 const std::string _obj_name;
547 const std::string _lock_name;
548 const std::string _cookie;
549 int lock_state;
550 public:
551 LibradosFileLock(
552 librados::IoCtx * io_ctx,
553 const std::string obj_name):
554 _io_ctx(io_ctx),
555 _obj_name(obj_name),
556 _lock_name("lock_name"),
557 _cookie("cookie") {
558
559 // TODO: the lock will never expire. It may cause problem if the process crash or abnormally exit.
560 while (!_io_ctx->lock_exclusive(
561 _obj_name,
562 _lock_name,
563 _cookie,
564 "description", nullptr, 0));
565 }
566
567 ~LibradosFileLock() {
568 _io_ctx->unlock(_obj_name, _lock_name, _cookie);
569 }
570 };
571
572
573 // --------------------
574 // --- EnvLibrados ----
575 // --------------------
576 /**
577 * @brief EnvLibrados ctor
578 * @details [long description]
579 *
580 * @param db_name unique database name
581 * @param config_path the configure file path for rados
582 */
583 EnvLibrados::EnvLibrados(const std::string& db_name,
584 const std::string& config_path,
585 const std::string& db_pool)
586 : EnvLibrados("client.admin",
587 "ceph",
588 0,
589 db_name,
590 config_path,
591 db_pool,
592 "/wal",
593 db_pool,
594 1 << 20) {}
595
596 /**
597 * @brief EnvLibrados ctor
598 * @details [long description]
599 *
600 * @param client_name first 3 parameters is for RADOS client init
601 * @param cluster_name
602 * @param flags
603 * @param db_name unique database name, used as db_id key
604 * @param config_path the configure file path for rados
605 * @param db_pool the pool for db data
606 * @param wal_pool the pool for WAL data
607 * @param write_buffer_size WritableFile buffer max size
608 */
609 EnvLibrados::EnvLibrados(const std::string& client_name,
610 const std::string& cluster_name,
611 const uint64_t flags,
612 const std::string& db_name,
613 const std::string& config_path,
614 const std::string& db_pool,
615 const std::string& wal_dir,
616 const std::string& wal_pool,
617 const uint64_t write_buffer_size)
618 : EnvWrapper(Env::Default()),
619 _client_name(client_name),
620 _cluster_name(cluster_name),
621 _flags(flags),
622 _db_name(db_name),
623 _config_path(config_path),
624 _db_pool_name(db_pool),
625 _wal_dir(wal_dir),
626 _wal_pool_name(wal_pool),
627 _write_buffer_size(write_buffer_size) {
628 int ret = 0;
629
630 // 1. create a Rados object and initialize it
631 ret = _rados.init2(_client_name.c_str(), _cluster_name.c_str(), _flags); // just use the client.admin keyring
632 if (ret < 0) { // let's handle any error that might have come back
633 std::cerr << "couldn't initialize rados! error " << ret << std::endl;
634 ret = EXIT_FAILURE;
635 goto out;
636 }
637
638 // 2. read configure file
639 ret = _rados.conf_read_file(_config_path.c_str());
640 if (ret < 0) {
641 // This could fail if the config file is malformed, but it'd be hard.
642 std::cerr << "failed to parse config file " << _config_path
643 << "! error" << ret << std::endl;
644 ret = EXIT_FAILURE;
645 goto out;
646 }
647
648 // 3. we actually connect to the cluster
649 ret = _rados.connect();
650 if (ret < 0) {
651 std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
652 ret = EXIT_FAILURE;
653 goto out;
654 }
655
656 // 4. create db_pool if not exist
657 ret = _rados.pool_create(_db_pool_name.c_str());
658 if (ret < 0 && ret != -EEXIST && ret != -EPERM) {
659 std::cerr << "couldn't create pool! error " << ret << std::endl;
660 goto out;
661 }
662
663 // 5. create db_pool_ioctx
664 ret = _rados.ioctx_create(_db_pool_name.c_str(), _db_pool_ioctx);
665 if (ret < 0) {
666 std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
667 ret = EXIT_FAILURE;
668 goto out;
669 }
670
671 // 6. create wal_pool if not exist
672 ret = _rados.pool_create(_wal_pool_name.c_str());
673 if (ret < 0 && ret != -EEXIST && ret != -EPERM) {
674 std::cerr << "couldn't create pool! error " << ret << std::endl;
675 goto out;
676 }
677
678 // 7. create wal_pool_ioctx
679 ret = _rados.ioctx_create(_wal_pool_name.c_str(), _wal_pool_ioctx);
680 if (ret < 0) {
681 std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
682 ret = EXIT_FAILURE;
683 goto out;
684 }
685
686 // 8. add root dir
687 _AddFid(ROOT_DIR_KEY, DIR_ID_VALUE);
688
689 out:
690 LOG_DEBUG("rados connect result code : %i\n", ret);
691 }
692
693 /****************************************************
694 private functions to handle fid operation.
695 Dir also have fid, but the value is DIR_ID_VALUE
696 ****************************************************/
697
698 /**
699 * @brief generate a new fid
700 * @details [long description]
701 * @return [description]
702 */
703 std::string EnvLibrados::_CreateFid() {
704 return _db_name + "." + GenerateUniqueId();
705 }
706
707 /**
708 * @brief get fid
709 * @details [long description]
710 *
711 * @param fname [description]
712 * @param fid [description]
713 *
714 * @return
715 * Status::OK()
716 * Status::NotFound()
717 */
718 Status EnvLibrados::_GetFid(
719 const std::string &fname,
720 std::string& fid) {
721 std::set<std::string> keys;
722 std::map<std::string, librados::bufferlist> kvs;
723 keys.insert(fname);
724 int r = _db_pool_ioctx.omap_get_vals_by_keys(_db_name, keys, &kvs);
725
726 if (0 == r && 0 == kvs.size()) {
727 return Status::NotFound();
728 } else if (0 == r && 0 != kvs.size()) {
729 fid.assign(kvs[fname].c_str(), kvs[fname].length());
730 return Status::OK();
731 } else {
732 return err_to_status(r);
733 }
734 }
735
736 /**
737 * @brief rename fid
738 * @details Only modify object in rados once,
739 * so this rename operation is atomic in term of rados
740 *
741 * @param old_fname [description]
742 * @param new_fname [description]
743 *
744 * @return [description]
745 */
746 Status EnvLibrados::_RenameFid(const std::string& old_fname,
747 const std::string& new_fname) {
748 std::string fid;
749 Status s = _GetFid(old_fname, fid);
750
751 if (Status::OK() != s) {
752 return s;
753 }
754
755 librados::bufferlist bl;
756 std::set<std::string> keys;
757 std::map<std::string, librados::bufferlist> kvs;
758 librados::ObjectWriteOperation o;
759 bl.append(fid);
760 keys.insert(old_fname);
761 kvs[new_fname] = bl;
762 o.omap_rm_keys(keys);
763 o.omap_set(kvs);
764 int r = _db_pool_ioctx.operate(_db_name, &o);
765 return err_to_status(r);
766 }
767
768 /**
769 * @brief add <file path, fid> to metadata object. It may overwrite exist key.
770 * @details [long description]
771 *
772 * @param fname [description]
773 * @param fid [description]
774 *
775 * @return [description]
776 */
777 Status EnvLibrados::_AddFid(
778 const std::string& fname,
779 const std::string& fid) {
780 std::map<std::string, librados::bufferlist> kvs;
781 librados::bufferlist value;
782 value.append(fid);
783 kvs[fname] = value;
784 int r = _db_pool_ioctx.omap_set(_db_name, kvs);
785 return err_to_status(r);
786 }
787
788 /**
789 * @brief return subfile names of dir.
790 * @details
791 * RocksDB has a 2-level structure, so all keys
792 * that have dir as prefix are subfiles of dir.
793 * So we can just return these files' name.
794 *
795 * @param dir [description]
796 * @param result [description]
797 *
798 * @return [description]
799 */
800 Status EnvLibrados::_GetSubFnames(
801 const std::string& dir,
802 std::vector<std::string> * result
803 ) {
804 std::string start_after(dir);
805 std::string filter_prefix(dir);
806 std::map<std::string, librados::bufferlist> kvs;
807 _db_pool_ioctx.omap_get_vals(_db_name,
808 start_after, filter_prefix,
809 MAX_ITEMS_IN_FS, &kvs);
810
811 result->clear();
812 for (auto i = kvs.begin(); i != kvs.end(); i++) {
813 result->push_back(i->first.substr(dir.size() + 1));
814 }
815 return Status::OK();
816 }
817
818 /**
819 * @brief delete key fname from metadata object
820 * @details [long description]
821 *
822 * @param fname [description]
823 * @return [description]
824 */
825 Status EnvLibrados::_DelFid(
826 const std::string& fname) {
827 std::set<std::string> keys;
828 keys.insert(fname);
829 int r = _db_pool_ioctx.omap_rm_keys(_db_name, keys);
830 return err_to_status(r);
831 }
832
833 /**
834 * @brief get match IoCtx from _prefix_pool_map
835 * @details [long description]
836 *
837 * @param prefix [description]
838 * @return [description]
839 *
840 */
841 librados::IoCtx* EnvLibrados::_GetIoctx(const std::string& fpath) {
842 auto is_prefix = [](const std::string & s1, const std::string & s2) {
843 auto it1 = s1.begin(), it2 = s2.begin();
844 while (it1 != s1.end() && it2 != s2.end() && *it1 == *it2) ++it1, ++it2;
845 return it1 == s1.end();
846 };
847
848 if (is_prefix(_wal_dir, fpath)) {
849 return &_wal_pool_ioctx;
850 } else {
851 return &_db_pool_ioctx;
852 }
853 }
854
855 /************************************************************
856 public functions
857 ************************************************************/
858 /**
859 * @brief generate unique id
860 * @details Combine system time and random number.
861 * @return [description]
862 */
863 std::string EnvLibrados::GenerateUniqueId() {
864 Random64 r(time(nullptr));
865 uint64_t random_uuid_portion =
866 r.Uniform(std::numeric_limits<uint64_t>::max());
867 uint64_t nanos_uuid_portion = NowNanos();
868 char uuid2[200];
869 snprintf(uuid2,
870 200,
871 "%16lx-%16lx",
872 (unsigned long)nanos_uuid_portion,
873 (unsigned long)random_uuid_portion);
874 return uuid2;
875 }
876
877 /**
878 * @brief create a new sequential read file handler
879 * @details it will check the existence of fname
880 *
881 * @param fname [description]
882 * @param result [description]
883 * @param options [description]
884 * @return [description]
885 */
886 Status EnvLibrados::NewSequentialFile(
887 const std::string& fname,
888 std::unique_ptr<SequentialFile>* result,
889 const EnvOptions& options)
890 {
891 LOG_DEBUG("[IN]%s\n", fname.c_str());
892 std::string dir, file, fid;
893 split(fname, &dir, &file);
894 Status s;
895 std::string fpath = dir + "/" + file;
896 do {
897 s = _GetFid(dir, fid);
898
899 if (!s.ok() || fid != DIR_ID_VALUE) {
900 if (fid != DIR_ID_VALUE) s = Status::IOError();
901 break;
902 }
903
904 s = _GetFid(fpath, fid);
905
906 if (Status::NotFound() == s) {
907 s = Status::IOError();
908 errno = ENOENT;
909 break;
910 }
911
912 result->reset(new LibradosSequentialFile(_GetIoctx(fpath), fid, fpath));
913 s = Status::OK();
914 } while (0);
915
916 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
917 return s;
918 }
919
920 /**
921 * @brief create a new random access file handler
922 * @details it will check the existence of fname
923 *
924 * @param fname [description]
925 * @param result [description]
926 * @param options [description]
927 * @return [description]
928 */
929 Status EnvLibrados::NewRandomAccessFile(
930 const std::string& fname,
931 std::unique_ptr<RandomAccessFile>* result,
932 const EnvOptions& options)
933 {
934 LOG_DEBUG("[IN]%s\n", fname.c_str());
935 std::string dir, file, fid;
936 split(fname, &dir, &file);
937 Status s;
938 std::string fpath = dir + "/" + file;
939 do {
940 s = _GetFid(dir, fid);
941
942 if (!s.ok() || fid != DIR_ID_VALUE) {
943 s = Status::IOError();
944 break;
945 }
946
947 s = _GetFid(fpath, fid);
948
949 if (Status::NotFound() == s) {
950 s = Status::IOError();
951 errno = ENOENT;
952 break;
953 }
954
955 result->reset(new LibradosRandomAccessFile(_GetIoctx(fpath), fid, fpath));
956 s = Status::OK();
957 } while (0);
958
959 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
960 return s;
961 }
962
963 /**
964 * @brief create a new write file handler
965 * @details it will check the existence of fname
966 *
967 * @param fname [description]
968 * @param result [description]
969 * @param options [description]
970 * @return [description]
971 */
972 Status EnvLibrados::NewWritableFile(
973 const std::string& fname,
974 std::unique_ptr<WritableFile>* result,
975 const EnvOptions& options)
976 {
977 LOG_DEBUG("[IN]%s\n", fname.c_str());
978 std::string dir, file, fid;
979 split(fname, &dir, &file);
980 Status s;
981 std::string fpath = dir + "/" + file;
982
983 do {
984 // 1. check if dir exist
985 s = _GetFid(dir, fid);
986 if (!s.ok()) {
987 break;
988 }
989
990 if (fid != DIR_ID_VALUE) {
991 s = Status::IOError();
992 break;
993 }
994
995 // 2. check if file exist.
996 // 2.1 exist, use it
997 // 2.2 not exist, create it
998 s = _GetFid(fpath, fid);
999 if (Status::NotFound() == s) {
1000 fid = _CreateFid();
1001 _AddFid(fpath, fid);
1002 }
1003
1004 result->reset(new LibradosWritableFile(_GetIoctx(fpath), fid, fpath, this));
1005 s = Status::OK();
1006 } while (0);
1007
1008 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1009 return s;
1010 }
1011
1012 /**
1013 * @brief reuse write file handler
1014 * @details
1015 * This function will rename old_fname to new_fname,
1016 * then return the handler of new_fname
1017 *
1018 * @param new_fname [description]
1019 * @param old_fname [description]
1020 * @param result [description]
1021 * @param options [description]
1022 * @return [description]
1023 */
1024 Status EnvLibrados::ReuseWritableFile(
1025 const std::string& new_fname,
1026 const std::string& old_fname,
1027 std::unique_ptr<WritableFile>* result,
1028 const EnvOptions& options)
1029 {
1030 LOG_DEBUG("[IN]%s => %s\n", old_fname.c_str(), new_fname.c_str());
1031 std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
1032 split(old_fname, &src_dir, &src_file);
1033 split(new_fname, &dst_dir, &dst_file);
1034
1035 std::string src_fpath = src_dir + "/" + src_file;
1036 std::string dst_fpath = dst_dir + "/" + dst_file;
1037 Status r = Status::OK();
1038 do {
1039 r = _RenameFid(src_fpath,
1040 dst_fpath);
1041 if (!r.ok()) {
1042 break;
1043 }
1044
1045 result->reset(new LibradosWritableFile(_GetIoctx(dst_fpath), src_fid, dst_fpath, this));
1046 } while (0);
1047
1048 LOG_DEBUG("[OUT]%s\n", r.ToString().c_str());
1049 return r;
1050 }
1051
1052 /**
1053 * @brief create a new directory handler
1054 * @details [long description]
1055 *
1056 * @param name [description]
1057 * @param result [description]
1058 *
1059 * @return [description]
1060 */
1061 Status EnvLibrados::NewDirectory(
1062 const std::string& name,
1063 std::unique_ptr<Directory>* result)
1064 {
1065 LOG_DEBUG("[IN]%s\n", name.c_str());
1066 std::string fid, dir, file;
1067 /* just want to get dir name */
1068 split(name + "/tmp", &dir, &file);
1069 Status s;
1070
1071 do {
1072 s = _GetFid(dir, fid);
1073
1074 if (!s.ok() || DIR_ID_VALUE != fid) {
1075 s = Status::IOError(name, strerror(-ENOENT));
1076 break;
1077 }
1078
1079 if (Status::NotFound() == s) {
1080 s = _AddFid(dir, DIR_ID_VALUE);
1081 if (!s.ok()) break;
1082 } else if (!s.ok()) {
1083 break;
1084 }
1085
1086 result->reset(new LibradosDirectory(_GetIoctx(dir), dir));
1087 s = Status::OK();
1088 } while (0);
1089
1090 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1091 return s;
1092 }
1093
1094 /**
1095 * @brief check if fname is exist
1096 * @details [long description]
1097 *
1098 * @param fname [description]
1099 * @return [description]
1100 */
1101 Status EnvLibrados::FileExists(const std::string& fname)
1102 {
1103 LOG_DEBUG("[IN]%s\n", fname.c_str());
1104 std::string fid, dir, file;
1105 split(fname, &dir, &file);
1106 Status s = _GetFid(dir + "/" + file, fid);
1107
1108 if (s.ok() && fid != DIR_ID_VALUE) {
1109 s = Status::OK();
1110 }
1111
1112 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1113 return s;
1114 }
1115
1116 /**
1117 * @brief get subfile name of dir_in
1118 * @details [long description]
1119 *
1120 * @param dir_in [description]
1121 * @param result [description]
1122 *
1123 * @return [description]
1124 */
1125 Status EnvLibrados::GetChildren(
1126 const std::string& dir_in,
1127 std::vector<std::string>* result)
1128 {
1129 LOG_DEBUG("[IN]%s\n", dir_in.c_str());
1130 std::string fid, dir, file;
1131 split(dir_in + "/temp", &dir, &file);
1132 Status s;
1133
1134 do {
1135 s = _GetFid(dir, fid);
1136 if (!s.ok()) {
1137 break;
1138 }
1139
1140 if (fid != DIR_ID_VALUE) {
1141 s = Status::IOError();
1142 break;
1143 }
1144
1145 s = _GetSubFnames(dir, result);
1146 } while (0);
1147
1148 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1149 return s;
1150 }
1151
1152 /**
1153 * @brief delete fname
1154 * @details [long description]
1155 *
1156 * @param fname [description]
1157 * @return [description]
1158 */
1159 Status EnvLibrados::DeleteFile(const std::string& fname)
1160 {
1161 LOG_DEBUG("[IN]%s\n", fname.c_str());
1162 std::string fid, dir, file;
1163 split(fname, &dir, &file);
1164 Status s = _GetFid(dir + "/" + file, fid);
1165
1166 if (s.ok() && DIR_ID_VALUE != fid) {
1167 s = _DelFid(dir + "/" + file);
1168 } else {
1169 s = Status::NotFound();
1170 }
1171 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1172 return s;
1173 }
1174
1175 /**
1176 * @brief create new dir
1177 * @details [long description]
1178 *
1179 * @param dirname [description]
1180 * @return [description]
1181 */
1182 Status EnvLibrados::CreateDir(const std::string& dirname)
1183 {
1184 LOG_DEBUG("[IN]%s\n", dirname.c_str());
1185 std::string fid, dir, file;
1186 split(dirname + "/temp", &dir, &file);
1187 Status s = _GetFid(dir + "/" + file, fid);
1188
1189 do {
1190 if (Status::NotFound() != s && fid != DIR_ID_VALUE) {
1191 break;
1192 } else if (Status::OK() == s && fid == DIR_ID_VALUE) {
1193 break;
1194 }
1195
1196 s = _AddFid(dir, DIR_ID_VALUE);
1197 } while (0);
1198
1199 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1200 return s;
1201 }
1202
1203 /**
1204 * @brief create dir if missing
1205 * @details [long description]
1206 *
1207 * @param dirname [description]
1208 * @return [description]
1209 */
1210 Status EnvLibrados::CreateDirIfMissing(const std::string& dirname)
1211 {
1212 LOG_DEBUG("[IN]%s\n", dirname.c_str());
1213 std::string fid, dir, file;
1214 split(dirname + "/temp", &dir, &file);
1215 Status s = Status::OK();
1216
1217 do {
1218 s = _GetFid(dir, fid);
1219 if (Status::NotFound() != s) {
1220 break;
1221 }
1222
1223 s = _AddFid(dir, DIR_ID_VALUE);
1224 } while (0);
1225
1226 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1227 return s;
1228 }
1229
1230 /**
1231 * @brief delete dir
1232 * @details
1233 *
1234 * @param dirname [description]
1235 * @return [description]
1236 */
1237 Status EnvLibrados::DeleteDir(const std::string& dirname)
1238 {
1239 LOG_DEBUG("[IN]%s\n", dirname.c_str());
1240 std::string fid, dir, file;
1241 split(dirname + "/temp", &dir, &file);
1242 Status s = Status::OK();
1243
1244 s = _GetFid(dir, fid);
1245
1246 if (s.ok() && DIR_ID_VALUE == fid) {
1247 std::vector<std::string> subs;
1248 s = _GetSubFnames(dir, &subs);
1249 // if subfiles exist, can't delete dir
1250 if (subs.size() > 0) {
1251 s = Status::IOError();
1252 } else {
1253 s = _DelFid(dir);
1254 }
1255 } else {
1256 s = Status::NotFound();
1257 }
1258
1259 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1260 return s;
1261 }
1262
1263 /**
1264 * @brief return file size
1265 * @details [long description]
1266 *
1267 * @param fname [description]
1268 * @param file_size [description]
1269 *
1270 * @return [description]
1271 */
1272 Status EnvLibrados::GetFileSize(
1273 const std::string& fname,
1274 uint64_t* file_size)
1275 {
1276 LOG_DEBUG("[IN]%s\n", fname.c_str());
1277 std::string fid, dir, file;
1278 split(fname, &dir, &file);
1279 time_t mtime;
1280 Status s;
1281
1282 do {
1283 std::string fpath = dir + "/" + file;
1284 s = _GetFid(fpath, fid);
1285
1286 if (!s.ok()) {
1287 break;
1288 }
1289
1290 int ret = _GetIoctx(fpath)->stat(fid, file_size, &mtime);
1291 if (ret < 0) {
1292 LOG_DEBUG("%i\n", ret);
1293 if (-ENOENT == ret) {
1294 *file_size = 0;
1295 s = Status::OK();
1296 } else {
1297 s = err_to_status(ret);
1298 }
1299 } else {
1300 s = Status::OK();
1301 }
1302 } while (0);
1303
1304 LOG_DEBUG("[OUT]%s|%lld\n", s.ToString().c_str(), (long long)*file_size);
1305 return s;
1306 }
1307
1308 /**
1309 * @brief get file modification time
1310 * @details [long description]
1311 *
1312 * @param fname [description]
1313 * @param file_mtime [description]
1314 *
1315 * @return [description]
1316 */
1317 Status EnvLibrados::GetFileModificationTime(const std::string& fname,
1318 uint64_t* file_mtime)
1319 {
1320 LOG_DEBUG("[IN]%s\n", fname.c_str());
1321 std::string fid, dir, file;
1322 split(fname, &dir, &file);
1323 time_t mtime;
1324 uint64_t file_size;
1325 Status s = Status::OK();
1326 do {
1327 std::string fpath = dir + "/" + file;
1328 s = _GetFid(dir + "/" + file, fid);
1329
1330 if (!s.ok()) {
1331 break;
1332 }
1333
1334 int ret = _GetIoctx(fpath)->stat(fid, &file_size, &mtime);
1335 if (ret < 0) {
1336 if (Status::NotFound() == err_to_status(ret)) {
1337 *file_mtime = static_cast<uint64_t>(mtime);
1338 s = Status::OK();
1339 } else {
1340 s = err_to_status(ret);
1341 }
1342 } else {
1343 s = Status::OK();
1344 }
1345 } while (0);
1346
1347 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1348 return s;
1349 }
1350
1351 /**
1352 * @brief rename file
1353 * @details
1354 *
1355 * @param src [description]
1356 * @param target_in [description]
1357 *
1358 * @return [description]
1359 */
1360 Status EnvLibrados::RenameFile(
1361 const std::string& src,
1362 const std::string& target_in)
1363 {
1364 LOG_DEBUG("[IN]%s => %s\n", src.c_str(), target_in.c_str());
1365 std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
1366 split(src, &src_dir, &src_file);
1367 split(target_in, &dst_dir, &dst_file);
1368
1369 auto s = _RenameFid(src_dir + "/" + src_file,
1370 dst_dir + "/" + dst_file);
1371 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1372 return s;
1373 }
1374
1375 /**
1376 * @brief not support
1377 * @details [long description]
1378 *
1379 * @param src [description]
1380 * @param target_in [description]
1381 *
1382 * @return [description]
1383 */
1384 Status EnvLibrados::LinkFile(
1385 const std::string& src,
1386 const std::string& target_in)
1387 {
1388 LOG_DEBUG("[IO]%s => %s\n", src.c_str(), target_in.c_str());
1389 return Status::NotSupported();
1390 }
1391
1392 /**
1393 * @brief lock file. create if missing.
1394 * @details [long description]
1395 *
1396 * It seems that LockFile is used for preventing other instance of RocksDB
1397 * from opening up the database at the same time. From RocksDB source code,
1398 * the invokes of LockFile are at following locations:
1399 *
1400 * ./db/db_impl.cc:1159: s = env_->LockFile(LockFileName(dbname_), &db_lock_); // DBImpl::Recover
1401 * ./db/db_impl.cc:5839: Status result = env->LockFile(lockname, &lock); // Status DestroyDB
1402 *
1403 * When db recovery and db destroy, RocksDB will call LockFile
1404 *
1405 * @param fname [description]
1406 * @param lock [description]
1407 *
1408 * @return [description]
1409 */
1410 Status EnvLibrados::LockFile(
1411 const std::string& fname,
1412 FileLock** lock)
1413 {
1414 LOG_DEBUG("[IN]%s\n", fname.c_str());
1415 std::string fid, dir, file;
1416 split(fname, &dir, &file);
1417 Status s = Status::OK();
1418
1419 do {
1420 std::string fpath = dir + "/" + file;
1421 s = _GetFid(fpath, fid);
1422
1423 if (Status::OK() != s &&
1424 Status::NotFound() != s) {
1425 break;
1426 } else if (Status::NotFound() == s) {
1427 s = _AddFid(fpath, _CreateFid());
1428 if (!s.ok()) {
1429 break;
1430 }
1431 } else if (Status::OK() == s && DIR_ID_VALUE == fid) {
1432 s = Status::IOError();
1433 break;
1434 }
1435
1436 *lock = new LibradosFileLock(_GetIoctx(fpath), fpath);
1437 } while (0);
1438
1439 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1440 return s;
1441 }
1442
1443 /**
1444 * @brief unlock file
1445 * @details [long description]
1446 *
1447 * @param lock [description]
1448 * @return [description]
1449 */
1450 Status EnvLibrados::UnlockFile(FileLock* lock)
1451 {
1452 LOG_DEBUG("[IO]%p\n", lock);
1453 if (nullptr != lock) {
1454 delete lock;
1455 }
1456 return Status::OK();
1457 }
1458
1459
1460 /**
1461 * @brief not support
1462 * @details [long description]
1463 *
1464 * @param db_path [description]
1465 * @param output_path [description]
1466 *
1467 * @return [description]
1468 */
1469 Status EnvLibrados::GetAbsolutePath(
1470 const std::string& db_path,
1471 std::string* output_path)
1472 {
1473 LOG_DEBUG("[IO]%s\n", db_path.c_str());
1474 return Status::NotSupported();
1475 }
1476
1477 /**
1478 * @brief Get default EnvLibrados
1479 * @details [long description]
1480 * @return [description]
1481 */
1482 EnvLibrados* EnvLibrados::Default() {
1483 static EnvLibrados default_env(default_db_name,
1484 std::getenv(default_config_path),
1485 default_pool_name);
1486 return &default_env;
1487 }
1488 // @lint-ignore TXT4 T25377293 Grandfathered in
1489 }