]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/env_librados.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / rocksdb / utilities / env_librados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 // Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
4 #include "rocksdb/utilities/env_librados.h"
5 #include "util/random.h"
6 #include <mutex>
7 #include <cstdlib>
8
9 namespace ROCKSDB_NAMESPACE {
/* GLOBAL DEFINE */
// Debug tracing. Uncomment the next line (or build with -DDEBUG) to make
// LOG_DEBUG print "[tid:file:line:function]" and then the printf-style
// message to stdout. Without DEBUG the macro expands to nothing, so its
// arguments are never evaluated.
// NOTE(review): these system headers end up included inside
// namespace ROCKSDB_NAMESPACE; harmless only while they were already
// included earlier through other headers.
// #define DEBUG
#if defined(DEBUG)
#include <cstdio>
#include <sys/syscall.h>
#include <unistd.h>
#define LOG_DEBUG(...)                                                   \
  do {                                                                   \
    printf("[%ld:%s:%i:%s]", syscall(SYS_gettid), __FILE__, __LINE__,    \
           __FUNCTION__);                                                \
    printf(__VA_ARGS__);                                                 \
  } while (0)
#else
#define LOG_DEBUG(...)
#endif
23
/* GLOBAL CONSTANT */
// Default names; not referenced in this part of the file (presumably used
// as defaults by callers / the header — TODO confirm).
const char *default_db_name = "default_envlibrados_db";
const char *default_pool_name = "default_envlibrados_pool";
// Name of the environment variable holding the ceph configuration file path.
const char *default_config_path = "CEPH_CONFIG_PATH";
// Maximum number of dir/file entries requested when listing the metadata
// object (passed as max_return to the omap listing).
const int MAX_ITEMS_IN_FS = 1 << 30;
// Metadata key of the root directory.
const std::string ROOT_DIR_KEY{"/"};
// Value stored under a metadata key to mark it as a directory, not a file.
const std::string DIR_ID_VALUE{"<DIR>"};
33
34 /**
35 * @brief convert error code to status
36 * @details Convert internal linux error code to Status
37 *
38 * @param r [description]
39 * @return [description]
40 */
41 Status err_to_status(int r)
42 {
43 switch (r) {
44 case 0:
45 return Status::OK();
46 case -ENOENT:
47 return Status::IOError();
48 case -ENODATA:
49 case -ENOTDIR:
50 return Status::NotFound(Status::kNone);
51 case -EINVAL:
52 return Status::InvalidArgument(Status::kNone);
53 case -EIO:
54 return Status::IOError(Status::kNone);
55 default:
56 // FIXME :(
57 assert(0 == "unrecognized error code");
58 return Status::NotSupported(Status::kNone);
59 }
60 }
61
62 /**
63 * @brief split file path into dir path and file name
64 * @details
65 * Because rocksdb only need a 2-level structure (dir/file), all input path will be shortened to dir/file format
66 * For example:
67 * b/c => dir '/b', file 'c'
68 * /a/b/c => dir '/b', file 'c'
69 *
70 * @param fn [description]
71 * @param dir [description]
72 * @param file [description]
73 */
74 void split(const std::string &fn, std::string *dir, std::string *file) {
75 LOG_DEBUG("[IN]%s\n", fn.c_str());
76 int pos = fn.size() - 1;
77 while ('/' == fn[pos]) --pos;
78 size_t fstart = fn.rfind('/', pos);
79 *file = fn.substr(fstart + 1, pos - fstart);
80
81 pos = fstart;
82 while (pos >= 0 && '/' == fn[pos]) --pos;
83
84 if (pos < 0) {
85 *dir = "/";
86 } else {
87 size_t dstart = fn.rfind('/', pos);
88 *dir = fn.substr(dstart + 1, pos - dstart);
89 *dir = std::string("/") + *dir;
90 }
91
92 LOG_DEBUG("[OUT]%s | %s\n", dir->c_str(), file->c_str());
93 }
94
95 // A file abstraction for reading sequentially through a file
96 class LibradosSequentialFile : public SequentialFile {
97 librados::IoCtx * _io_ctx;
98 std::string _fid;
99 std::string _hint;
100 int _offset;
101 public:
102 LibradosSequentialFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
103 _io_ctx(io_ctx), _fid(fid), _hint(hint), _offset(0) {}
104
105 ~LibradosSequentialFile() {}
106
107 /**
108 * @brief read file
109 * @details
110 * Read up to "n" bytes from the file. "scratch[0..n-1]" may be
111 * written by this routine. Sets "*result" to the data that was
112 * read (including if fewer than "n" bytes were successfully read).
113 * May set "*result" to point at data in "scratch[0..n-1]", so
114 * "scratch[0..n-1]" must be live when "*result" is used.
115 * If an error was encountered, returns a non-OK status.
116 *
117 * REQUIRES: External synchronization
118 *
119 * @param n [description]
120 * @param result [description]
121 * @param scratch [description]
122 * @return [description]
123 */
124 Status Read(size_t n, Slice* result, char* scratch) {
125 LOG_DEBUG("[IN]%i\n", (int)n);
126 librados::bufferlist buffer;
127 Status s;
128 int r = _io_ctx->read(_fid, buffer, n, _offset);
129 if (r >= 0) {
130 buffer.begin().copy(r, scratch);
131 *result = Slice(scratch, r);
132 _offset += r;
133 s = Status::OK();
134 } else {
135 s = err_to_status(r);
136 if (s == Status::IOError()) {
137 *result = Slice();
138 s = Status::OK();
139 }
140 }
141 LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
142 return s;
143 }
144
145 /**
146 * @brief skip "n" bytes from the file
147 * @details
148 * Skip "n" bytes from the file. This is guaranteed to be no
149 * slower that reading the same data, but may be faster.
150 *
151 * If end of file is reached, skipping will stop at the end of the
152 * file, and Skip will return OK.
153 *
154 * REQUIRES: External synchronization
155 *
156 * @param n [description]
157 * @return [description]
158 */
159 Status Skip(uint64_t n) {
160 _offset += n;
161 return Status::OK();
162 }
163
164 /**
165 * @brief noop
166 * @details
167 * rocksdb has it's own caching capabilities that we should be able to use,
168 * without relying on a cache here. This can safely be a no-op.
169 *
170 * @param offset [description]
171 * @param length [description]
172 *
173 * @return [description]
174 */
175 Status InvalidateCache(size_t offset, size_t length) {
176 return Status::OK();
177 }
178 };
179
180 // A file abstraction for randomly reading the contents of a file.
181 class LibradosRandomAccessFile : public RandomAccessFile {
182 librados::IoCtx * _io_ctx;
183 std::string _fid;
184 std::string _hint;
185 public:
186 LibradosRandomAccessFile(librados::IoCtx * io_ctx, std::string fid, std::string hint):
187 _io_ctx(io_ctx), _fid(fid), _hint(hint) {}
188
189 ~LibradosRandomAccessFile() {}
190
191 /**
192 * @brief read file
193 * @details similar to LibradosSequentialFile::Read
194 *
195 * @param offset [description]
196 * @param n [description]
197 * @param result [description]
198 * @param scratch [description]
199 * @return [description]
200 */
201 Status Read(uint64_t offset, size_t n, Slice* result,
202 char* scratch) const {
203 LOG_DEBUG("[IN]%i\n", (int)n);
204 librados::bufferlist buffer;
205 Status s;
206 int r = _io_ctx->read(_fid, buffer, n, offset);
207 if (r >= 0) {
208 buffer.begin().copy(r, scratch);
209 *result = Slice(scratch, r);
210 s = Status::OK();
211 } else {
212 s = err_to_status(r);
213 if (s == Status::IOError()) {
214 *result = Slice();
215 s = Status::OK();
216 }
217 }
218 LOG_DEBUG("[OUT]%s, %i, %s\n", s.ToString().c_str(), (int)r, buffer.c_str());
219 return s;
220 }
221
222 /**
223 * @brief [brief description]
224 * @details Get unique id for each file and guarantee this id is different for each file
225 *
226 * @param id [description]
227 * @param max_size max size of id, it shoud be larger than 16
228 *
229 * @return [description]
230 */
231 size_t GetUniqueId(char* id, size_t max_size) const {
232 // All fid has the same db_id prefix, so we need to ignore db_id prefix
233 size_t s = std::min(max_size, _fid.size());
234 strncpy(id, _fid.c_str() + (_fid.size() - s), s);
235 id[s - 1] = '\0';
236 return s;
237 };
238
239 //enum AccessPattern { NORMAL, RANDOM, SEQUENTIAL, WILLNEED, DONTNEED };
240 void Hint(AccessPattern pattern) {
241 /* Do nothing */
242 }
243
244 /**
245 * @brief noop
246 * @details [long description]
247 *
248 * @param offset [description]
249 * @param length [description]
250 *
251 * @return [description]
252 */
253 Status InvalidateCache(size_t offset, size_t length) {
254 return Status::OK();
255 }
256 };
257
258
259 // A file abstraction for sequential writing. The implementation
260 // must provide buffering since callers may append small fragments
261 // at a time to the file.
262 class LibradosWritableFile : public WritableFile {
263 librados::IoCtx * _io_ctx;
264 std::string _fid;
265 std::string _hint;
266 const EnvLibrados * const _env;
267
268 std::mutex _mutex; // used to protect modification of all following variables
269 librados::bufferlist _buffer; // write buffer
270 uint64_t _buffer_size; // write buffer size
271 uint64_t _file_size; // this file size doesn't include buffer size
272
273 /**
274 * @brief assuming caller holds lock
275 * @details [long description]
276 * @return [description]
277 */
278 int _SyncLocked() {
279 // 1. sync append data to RADOS
280 int r = _io_ctx->append(_fid, _buffer, _buffer_size);
281 assert(r >= 0);
282
283 // 2. update local variables
284 if (0 == r) {
285 _buffer.clear();
286 _file_size += _buffer_size;
287 _buffer_size = 0;
288 }
289
290 return r;
291 }
292
293 public:
294 LibradosWritableFile(librados::IoCtx* io_ctx, std::string fid,
295 std::string hint, const EnvLibrados* const env,
296 const EnvOptions& options)
297 : WritableFile(options),
298 _io_ctx(io_ctx),
299 _fid(fid),
300 _hint(hint),
301 _env(env),
302 _buffer(),
303 _buffer_size(0),
304 _file_size(0) {
305 int ret = _io_ctx->stat(_fid, &_file_size, nullptr);
306
307 // if file not exist
308 if (ret < 0) {
309 _file_size = 0;
310 }
311 }
312
313 ~LibradosWritableFile() {
314 // sync before closeing writable file
315 Sync();
316 }
317
318 /**
319 * @brief append data to file
320 * @details
321 * Append will save all written data in buffer util buffer size
322 * reaches buffer max size. Then, it will write buffer into rados
323 *
324 * @param data [description]
325 * @return [description]
326 */
327 Status Append(const Slice& data) {
328 // append buffer
329 LOG_DEBUG("[IN] %i | %s\n", (int)data.size(), data.data());
330 int r = 0;
331
332 std::lock_guard<std::mutex> lock(_mutex);
333 _buffer.append(data.data(), data.size());
334 _buffer_size += data.size();
335
336 if (_buffer_size > _env->_write_buffer_size) {
337 r = _SyncLocked();
338 }
339
340 LOG_DEBUG("[OUT] %i\n", r);
341 return err_to_status(r);
342 }
343
344 /**
345 * @brief not supported
346 * @details [long description]
347 * @return [description]
348 */
349 Status PositionedAppend(
350 const Slice& /* data */,
351 uint64_t /* offset */) {
352 return Status::NotSupported();
353 }
354
355 /**
356 * @brief truncate file to assigned size
357 * @details [long description]
358 *
359 * @param size [description]
360 * @return [description]
361 */
362 Status Truncate(uint64_t size) {
363 LOG_DEBUG("[IN]%lld|%lld|%lld\n", (long long)size, (long long)_file_size, (long long)_buffer_size);
364 int r = 0;
365
366 std::lock_guard<std::mutex> lock(_mutex);
367 if (_file_size > size) {
368 r = _io_ctx->trunc(_fid, size);
369
370 if (r == 0) {
371 _buffer.clear();
372 _buffer_size = 0;
373 _file_size = size;
374 }
375 } else if (_file_size == size) {
376 _buffer.clear();
377 _buffer_size = 0;
378 } else {
379 librados::bufferlist tmp;
380 tmp.claim(_buffer);
381 _buffer.substr_of(tmp, 0, size - _file_size);
382 _buffer_size = size - _file_size;
383 }
384
385 LOG_DEBUG("[OUT] %i\n", r);
386 return err_to_status(r);
387 }
388
389 /**
390 * @brief close file
391 * @details [long description]
392 * @return [description]
393 */
394 Status Close() {
395 LOG_DEBUG("%s | %lld | %lld\n", _hint.c_str(), (long long)_buffer_size, (long long)_file_size);
396 return Sync();
397 }
398
399 /**
400 * @brief flush file,
401 * @details initiate an aio write and not wait
402 *
403 * @return [description]
404 */
405 Status Flush() {
406 librados::AioCompletion *write_completion = librados::Rados::aio_create_completion();
407 int r = 0;
408
409 std::lock_guard<std::mutex> lock(_mutex);
410 r = _io_ctx->aio_append(_fid, write_completion, _buffer, _buffer_size);
411
412 if (0 == r) {
413 _file_size += _buffer_size;
414 _buffer.clear();
415 _buffer_size = 0;
416 }
417
418 write_completion->release();
419
420 return err_to_status(r);
421 }
422
423 /**
424 * @brief write buffer data to rados
425 * @details initiate an aio write and wait for result
426 * @return [description]
427 */
428 Status Sync() { // sync data
429 int r = 0;
430
431 std::lock_guard<std::mutex> lock(_mutex);
432 if (_buffer_size > 0) {
433 r = _SyncLocked();
434 }
435
436 return err_to_status(r);
437 }
438
439 /**
440 * @brief [brief description]
441 * @details [long description]
442 * @return true if Sync() and Fsync() are safe to call concurrently with Append()and Flush().
443 */
444 bool IsSyncThreadSafe() const {
445 return true;
446 }
447
448 /**
449 * @brief Indicates the upper layers if the current WritableFile implementation uses direct IO.
450 * @details [long description]
451 * @return [description]
452 */
453 bool use_direct_io() const {
454 return false;
455 }
456
457 /**
458 * @brief Get file size
459 * @details
460 * This API will use cached file_size.
461 * @return [description]
462 */
463 uint64_t GetFileSize() {
464 LOG_DEBUG("%lld|%lld\n", (long long)_buffer_size, (long long)_file_size);
465
466 std::lock_guard<std::mutex> lock(_mutex);
467 int file_size = _file_size + _buffer_size;
468
469 return file_size;
470 }
471
472 /**
473 * @brief For documentation, refer to RandomAccessFile::GetUniqueId()
474 * @details [long description]
475 *
476 * @param id [description]
477 * @param max_size [description]
478 *
479 * @return [description]
480 */
481 size_t GetUniqueId(char* id, size_t max_size) const {
482 // All fid has the same db_id prefix, so we need to ignore db_id prefix
483 size_t s = std::min(max_size, _fid.size());
484 strncpy(id, _fid.c_str() + (_fid.size() - s), s);
485 id[s - 1] = '\0';
486 return s;
487 }
488
489 /**
490 * @brief noop
491 * @details [long description]
492 *
493 * @param offset [description]
494 * @param length [description]
495 *
496 * @return [description]
497 */
498 Status InvalidateCache(size_t offset, size_t length) {
499 return Status::OK();
500 }
501
502 using WritableFile::RangeSync;
503 /**
504 * @brief No RangeSync support, just call Sync()
505 * @details [long description]
506 *
507 * @param offset [description]
508 * @param nbytes [description]
509 *
510 * @return [description]
511 */
512 Status RangeSync(off_t offset, off_t nbytes) {
513 return Sync();
514 }
515
516 protected:
517 using WritableFile::Allocate;
518 /**
519 * @brief noop
520 * @details [long description]
521 *
522 * @param offset [description]
523 * @param len [description]
524 *
525 * @return [description]
526 */
527 Status Allocate(off_t offset, off_t len) {
528 return Status::OK();
529 }
530 };
531
532
533 // Directory object represents collection of files and implements
534 // filesystem operations that can be executed on directories.
535 class LibradosDirectory : public Directory {
536 librados::IoCtx * _io_ctx;
537 std::string _fid;
538 public:
539 explicit LibradosDirectory(librados::IoCtx * io_ctx, std::string fid):
540 _io_ctx(io_ctx), _fid(fid) {}
541
542 // Fsync directory. Can be called concurrently from multiple threads.
543 Status Fsync() {
544 return Status::OK();
545 }
546 };
547
548 // Identifies a locked file.
549 // This is exclusive lock and can't nested lock by same thread
550 class LibradosFileLock : public FileLock {
551 librados::IoCtx * _io_ctx;
552 const std::string _obj_name;
553 const std::string _lock_name;
554 const std::string _cookie;
555 int lock_state;
556 public:
557 LibradosFileLock(
558 librados::IoCtx * io_ctx,
559 const std::string obj_name):
560 _io_ctx(io_ctx),
561 _obj_name(obj_name),
562 _lock_name("lock_name"),
563 _cookie("cookie") {
564
565 // TODO: the lock will never expire. It may cause problem if the process crash or abnormally exit.
566 while (!_io_ctx->lock_exclusive(
567 _obj_name,
568 _lock_name,
569 _cookie,
570 "description", nullptr, 0));
571 }
572
573 ~LibradosFileLock() {
574 _io_ctx->unlock(_obj_name, _lock_name, _cookie);
575 }
576 };
577
578
579 // --------------------
580 // --- EnvLibrados ----
581 // --------------------
582 /**
583 * @brief EnvLibrados ctor
584 * @details [long description]
585 *
586 * @param db_name unique database name
587 * @param config_path the configure file path for rados
588 */
589 EnvLibrados::EnvLibrados(const std::string& db_name,
590 const std::string& config_path,
591 const std::string& db_pool)
592 : EnvLibrados("client.admin",
593 "ceph",
594 0,
595 db_name,
596 config_path,
597 db_pool,
598 "/wal",
599 db_pool,
600 1 << 20) {}
601
602 /**
603 * @brief EnvLibrados ctor
604 * @details [long description]
605 *
606 * @param client_name first 3 parameters is for RADOS client init
607 * @param cluster_name
608 * @param flags
609 * @param db_name unique database name, used as db_id key
610 * @param config_path the configure file path for rados
611 * @param db_pool the pool for db data
612 * @param wal_pool the pool for WAL data
613 * @param write_buffer_size WritableFile buffer max size
614 */
615 EnvLibrados::EnvLibrados(const std::string& client_name,
616 const std::string& cluster_name,
617 const uint64_t flags,
618 const std::string& db_name,
619 const std::string& config_path,
620 const std::string& db_pool,
621 const std::string& wal_dir,
622 const std::string& wal_pool,
623 const uint64_t write_buffer_size)
624 : EnvWrapper(Env::Default()),
625 _client_name(client_name),
626 _cluster_name(cluster_name),
627 _flags(flags),
628 _db_name(db_name),
629 _config_path(config_path),
630 _db_pool_name(db_pool),
631 _wal_dir(wal_dir),
632 _wal_pool_name(wal_pool),
633 _write_buffer_size(write_buffer_size) {
634 int ret = 0;
635
636 // 1. create a Rados object and initialize it
637 ret = _rados.init2(_client_name.c_str(), _cluster_name.c_str(), _flags); // just use the client.admin keyring
638 if (ret < 0) { // let's handle any error that might have come back
639 std::cerr << "couldn't initialize rados! error " << ret << std::endl;
640 ret = EXIT_FAILURE;
641 goto out;
642 }
643
644 // 2. read configure file
645 ret = _rados.conf_read_file(_config_path.c_str());
646 if (ret < 0) {
647 // This could fail if the config file is malformed, but it'd be hard.
648 std::cerr << "failed to parse config file " << _config_path
649 << "! error" << ret << std::endl;
650 ret = EXIT_FAILURE;
651 goto out;
652 }
653
654 // 3. we actually connect to the cluster
655 ret = _rados.connect();
656 if (ret < 0) {
657 std::cerr << "couldn't connect to cluster! error " << ret << std::endl;
658 ret = EXIT_FAILURE;
659 goto out;
660 }
661
662 // 4. create db_pool if not exist
663 ret = _rados.pool_create(_db_pool_name.c_str());
664 if (ret < 0 && ret != -EEXIST && ret != -EPERM) {
665 std::cerr << "couldn't create pool! error " << ret << std::endl;
666 goto out;
667 }
668
669 // 5. create db_pool_ioctx
670 ret = _rados.ioctx_create(_db_pool_name.c_str(), _db_pool_ioctx);
671 if (ret < 0) {
672 std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
673 ret = EXIT_FAILURE;
674 goto out;
675 }
676
677 // 6. create wal_pool if not exist
678 ret = _rados.pool_create(_wal_pool_name.c_str());
679 if (ret < 0 && ret != -EEXIST && ret != -EPERM) {
680 std::cerr << "couldn't create pool! error " << ret << std::endl;
681 goto out;
682 }
683
684 // 7. create wal_pool_ioctx
685 ret = _rados.ioctx_create(_wal_pool_name.c_str(), _wal_pool_ioctx);
686 if (ret < 0) {
687 std::cerr << "couldn't set up ioctx! error " << ret << std::endl;
688 ret = EXIT_FAILURE;
689 goto out;
690 }
691
692 // 8. add root dir
693 _AddFid(ROOT_DIR_KEY, DIR_ID_VALUE);
694
695 out:
696 LOG_DEBUG("rados connect result code : %i\n", ret);
697 }
698
699 /****************************************************
700 private functions to handle fid operation.
701 Dir also have fid, but the value is DIR_ID_VALUE
702 ****************************************************/
703
704 /**
705 * @brief generate a new fid
706 * @details [long description]
707 * @return [description]
708 */
709 std::string EnvLibrados::_CreateFid() {
710 return _db_name + "." + GenerateUniqueId();
711 }
712
713 /**
714 * @brief get fid
715 * @details [long description]
716 *
717 * @param fname [description]
718 * @param fid [description]
719 *
720 * @return
721 * Status::OK()
722 * Status::NotFound()
723 */
724 Status EnvLibrados::_GetFid(
725 const std::string &fname,
726 std::string& fid) {
727 std::set<std::string> keys;
728 std::map<std::string, librados::bufferlist> kvs;
729 keys.insert(fname);
730 int r = _db_pool_ioctx.omap_get_vals_by_keys(_db_name, keys, &kvs);
731
732 if (0 == r && 0 == kvs.size()) {
733 return Status::NotFound();
734 } else if (0 == r && 0 != kvs.size()) {
735 fid.assign(kvs[fname].c_str(), kvs[fname].length());
736 return Status::OK();
737 } else {
738 return err_to_status(r);
739 }
740 }
741
742 /**
743 * @brief rename fid
744 * @details Only modify object in rados once,
745 * so this rename operation is atomic in term of rados
746 *
747 * @param old_fname [description]
748 * @param new_fname [description]
749 *
750 * @return [description]
751 */
752 Status EnvLibrados::_RenameFid(const std::string& old_fname,
753 const std::string& new_fname) {
754 std::string fid;
755 Status s = _GetFid(old_fname, fid);
756
757 if (Status::OK() != s) {
758 return s;
759 }
760
761 librados::bufferlist bl;
762 std::set<std::string> keys;
763 std::map<std::string, librados::bufferlist> kvs;
764 librados::ObjectWriteOperation o;
765 bl.append(fid);
766 keys.insert(old_fname);
767 kvs[new_fname] = bl;
768 o.omap_rm_keys(keys);
769 o.omap_set(kvs);
770 int r = _db_pool_ioctx.operate(_db_name, &o);
771 return err_to_status(r);
772 }
773
774 /**
775 * @brief add <file path, fid> to metadata object. It may overwrite exist key.
776 * @details [long description]
777 *
778 * @param fname [description]
779 * @param fid [description]
780 *
781 * @return [description]
782 */
783 Status EnvLibrados::_AddFid(
784 const std::string& fname,
785 const std::string& fid) {
786 std::map<std::string, librados::bufferlist> kvs;
787 librados::bufferlist value;
788 value.append(fid);
789 kvs[fname] = value;
790 int r = _db_pool_ioctx.omap_set(_db_name, kvs);
791 return err_to_status(r);
792 }
793
794 /**
795 * @brief return subfile names of dir.
796 * @details
797 * RocksDB has a 2-level structure, so all keys
798 * that have dir as prefix are subfiles of dir.
799 * So we can just return these files' name.
800 *
801 * @param dir [description]
802 * @param result [description]
803 *
804 * @return [description]
805 */
806 Status EnvLibrados::_GetSubFnames(
807 const std::string& dir,
808 std::vector<std::string> * result
809 ) {
810 std::string start_after(dir);
811 std::string filter_prefix(dir);
812 std::map<std::string, librados::bufferlist> kvs;
813 _db_pool_ioctx.omap_get_vals(_db_name,
814 start_after, filter_prefix,
815 MAX_ITEMS_IN_FS, &kvs);
816
817 result->clear();
818 for (auto i = kvs.begin(); i != kvs.end(); i++) {
819 result->push_back(i->first.substr(dir.size() + 1));
820 }
821 return Status::OK();
822 }
823
824 /**
825 * @brief delete key fname from metadata object
826 * @details [long description]
827 *
828 * @param fname [description]
829 * @return [description]
830 */
831 Status EnvLibrados::_DelFid(
832 const std::string& fname) {
833 std::set<std::string> keys;
834 keys.insert(fname);
835 int r = _db_pool_ioctx.omap_rm_keys(_db_name, keys);
836 return err_to_status(r);
837 }
838
839 /**
840 * @brief get match IoCtx from _prefix_pool_map
841 * @details [long description]
842 *
843 * @param prefix [description]
844 * @return [description]
845 *
846 */
847 librados::IoCtx* EnvLibrados::_GetIoctx(const std::string& fpath) {
848 auto is_prefix = [](const std::string & s1, const std::string & s2) {
849 auto it1 = s1.begin(), it2 = s2.begin();
850 while (it1 != s1.end() && it2 != s2.end() && *it1 == *it2) ++it1, ++it2;
851 return it1 == s1.end();
852 };
853
854 if (is_prefix(_wal_dir, fpath)) {
855 return &_wal_pool_ioctx;
856 } else {
857 return &_db_pool_ioctx;
858 }
859 }
860
861 /************************************************************
862 public functions
863 ************************************************************/
864 /**
865 * @brief generate unique id
866 * @details Combine system time and random number.
867 * @return [description]
868 */
869 std::string EnvLibrados::GenerateUniqueId() {
870 Random64 r(time(nullptr));
871 uint64_t random_uuid_portion =
872 r.Uniform(std::numeric_limits<uint64_t>::max());
873 uint64_t nanos_uuid_portion = NowNanos();
874 char uuid2[200];
875 snprintf(uuid2,
876 200,
877 "%16lx-%16lx",
878 (unsigned long)nanos_uuid_portion,
879 (unsigned long)random_uuid_portion);
880 return uuid2;
881 }
882
883 /**
884 * @brief create a new sequential read file handler
885 * @details it will check the existence of fname
886 *
887 * @param fname [description]
888 * @param result [description]
889 * @param options [description]
890 * @return [description]
891 */
892 Status EnvLibrados::NewSequentialFile(
893 const std::string& fname,
894 std::unique_ptr<SequentialFile>* result,
895 const EnvOptions& options)
896 {
897 LOG_DEBUG("[IN]%s\n", fname.c_str());
898 std::string dir, file, fid;
899 split(fname, &dir, &file);
900 Status s;
901 std::string fpath = dir + "/" + file;
902 do {
903 s = _GetFid(dir, fid);
904
905 if (!s.ok() || fid != DIR_ID_VALUE) {
906 if (fid != DIR_ID_VALUE) s = Status::IOError();
907 break;
908 }
909
910 s = _GetFid(fpath, fid);
911
912 if (Status::NotFound() == s) {
913 s = Status::IOError();
914 errno = ENOENT;
915 break;
916 }
917
918 result->reset(new LibradosSequentialFile(_GetIoctx(fpath), fid, fpath));
919 s = Status::OK();
920 } while (0);
921
922 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
923 return s;
924 }
925
926 /**
927 * @brief create a new random access file handler
928 * @details it will check the existence of fname
929 *
930 * @param fname [description]
931 * @param result [description]
932 * @param options [description]
933 * @return [description]
934 */
935 Status EnvLibrados::NewRandomAccessFile(
936 const std::string& fname,
937 std::unique_ptr<RandomAccessFile>* result,
938 const EnvOptions& options)
939 {
940 LOG_DEBUG("[IN]%s\n", fname.c_str());
941 std::string dir, file, fid;
942 split(fname, &dir, &file);
943 Status s;
944 std::string fpath = dir + "/" + file;
945 do {
946 s = _GetFid(dir, fid);
947
948 if (!s.ok() || fid != DIR_ID_VALUE) {
949 s = Status::IOError();
950 break;
951 }
952
953 s = _GetFid(fpath, fid);
954
955 if (Status::NotFound() == s) {
956 s = Status::IOError();
957 errno = ENOENT;
958 break;
959 }
960
961 result->reset(new LibradosRandomAccessFile(_GetIoctx(fpath), fid, fpath));
962 s = Status::OK();
963 } while (0);
964
965 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
966 return s;
967 }
968
969 /**
970 * @brief create a new write file handler
971 * @details it will check the existence of fname
972 *
973 * @param fname [description]
974 * @param result [description]
975 * @param options [description]
976 * @return [description]
977 */
978 Status EnvLibrados::NewWritableFile(
979 const std::string& fname,
980 std::unique_ptr<WritableFile>* result,
981 const EnvOptions& options)
982 {
983 LOG_DEBUG("[IN]%s\n", fname.c_str());
984 std::string dir, file, fid;
985 split(fname, &dir, &file);
986 Status s;
987 std::string fpath = dir + "/" + file;
988
989 do {
990 // 1. check if dir exist
991 s = _GetFid(dir, fid);
992 if (!s.ok()) {
993 break;
994 }
995
996 if (fid != DIR_ID_VALUE) {
997 s = Status::IOError();
998 break;
999 }
1000
1001 // 2. check if file exist.
1002 // 2.1 exist, use it
1003 // 2.2 not exist, create it
1004 s = _GetFid(fpath, fid);
1005 if (Status::NotFound() == s) {
1006 fid = _CreateFid();
1007 _AddFid(fpath, fid);
1008 }
1009
1010 result->reset(
1011 new LibradosWritableFile(_GetIoctx(fpath), fid, fpath, this, options));
1012 s = Status::OK();
1013 } while (0);
1014
1015 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1016 return s;
1017 }
1018
1019 /**
1020 * @brief reuse write file handler
1021 * @details
1022 * This function will rename old_fname to new_fname,
1023 * then return the handler of new_fname
1024 *
1025 * @param new_fname [description]
1026 * @param old_fname [description]
1027 * @param result [description]
1028 * @param options [description]
1029 * @return [description]
1030 */
1031 Status EnvLibrados::ReuseWritableFile(
1032 const std::string& new_fname,
1033 const std::string& old_fname,
1034 std::unique_ptr<WritableFile>* result,
1035 const EnvOptions& options)
1036 {
1037 LOG_DEBUG("[IN]%s => %s\n", old_fname.c_str(), new_fname.c_str());
1038 std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
1039 split(old_fname, &src_dir, &src_file);
1040 split(new_fname, &dst_dir, &dst_file);
1041
1042 std::string src_fpath = src_dir + "/" + src_file;
1043 std::string dst_fpath = dst_dir + "/" + dst_file;
1044 Status r = Status::OK();
1045 do {
1046 r = _RenameFid(src_fpath,
1047 dst_fpath);
1048 if (!r.ok()) {
1049 break;
1050 }
1051
1052 result->reset(new LibradosWritableFile(_GetIoctx(dst_fpath), src_fid,
1053 dst_fpath, this, options));
1054 } while (0);
1055
1056 LOG_DEBUG("[OUT]%s\n", r.ToString().c_str());
1057 return r;
1058 }
1059
1060 /**
1061 * @brief create a new directory handler
1062 * @details [long description]
1063 *
1064 * @param name [description]
1065 * @param result [description]
1066 *
1067 * @return [description]
1068 */
1069 Status EnvLibrados::NewDirectory(
1070 const std::string& name,
1071 std::unique_ptr<Directory>* result)
1072 {
1073 LOG_DEBUG("[IN]%s\n", name.c_str());
1074 std::string fid, dir, file;
1075 /* just want to get dir name */
1076 split(name + "/tmp", &dir, &file);
1077 Status s;
1078
1079 do {
1080 s = _GetFid(dir, fid);
1081
1082 if (!s.ok() || DIR_ID_VALUE != fid) {
1083 s = Status::IOError(name, strerror(-ENOENT));
1084 break;
1085 }
1086
1087 if (Status::NotFound() == s) {
1088 s = _AddFid(dir, DIR_ID_VALUE);
1089 if (!s.ok()) break;
1090 } else if (!s.ok()) {
1091 break;
1092 }
1093
1094 result->reset(new LibradosDirectory(_GetIoctx(dir), dir));
1095 s = Status::OK();
1096 } while (0);
1097
1098 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1099 return s;
1100 }
1101
1102 /**
1103 * @brief check if fname is exist
1104 * @details [long description]
1105 *
1106 * @param fname [description]
1107 * @return [description]
1108 */
1109 Status EnvLibrados::FileExists(const std::string& fname)
1110 {
1111 LOG_DEBUG("[IN]%s\n", fname.c_str());
1112 std::string fid, dir, file;
1113 split(fname, &dir, &file);
1114 Status s = _GetFid(dir + "/" + file, fid);
1115
1116 if (s.ok() && fid != DIR_ID_VALUE) {
1117 s = Status::OK();
1118 }
1119
1120 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1121 return s;
1122 }
1123
1124 /**
1125 * @brief get subfile name of dir_in
1126 * @details [long description]
1127 *
1128 * @param dir_in [description]
1129 * @param result [description]
1130 *
1131 * @return [description]
1132 */
1133 Status EnvLibrados::GetChildren(
1134 const std::string& dir_in,
1135 std::vector<std::string>* result)
1136 {
1137 LOG_DEBUG("[IN]%s\n", dir_in.c_str());
1138 std::string fid, dir, file;
1139 split(dir_in + "/temp", &dir, &file);
1140 Status s;
1141
1142 do {
1143 s = _GetFid(dir, fid);
1144 if (!s.ok()) {
1145 break;
1146 }
1147
1148 if (fid != DIR_ID_VALUE) {
1149 s = Status::IOError();
1150 break;
1151 }
1152
1153 s = _GetSubFnames(dir, result);
1154 } while (0);
1155
1156 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1157 return s;
1158 }
1159
1160 /**
1161 * @brief delete fname
1162 * @details [long description]
1163 *
1164 * @param fname [description]
1165 * @return [description]
1166 */
1167 Status EnvLibrados::DeleteFile(const std::string& fname)
1168 {
1169 LOG_DEBUG("[IN]%s\n", fname.c_str());
1170 std::string fid, dir, file;
1171 split(fname, &dir, &file);
1172 Status s = _GetFid(dir + "/" + file, fid);
1173
1174 if (s.ok() && DIR_ID_VALUE != fid) {
1175 s = _DelFid(dir + "/" + file);
1176 } else {
1177 s = Status::NotFound();
1178 }
1179 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1180 return s;
1181 }
1182
1183 /**
1184 * @brief create new dir
1185 * @details [long description]
1186 *
1187 * @param dirname [description]
1188 * @return [description]
1189 */
1190 Status EnvLibrados::CreateDir(const std::string& dirname)
1191 {
1192 LOG_DEBUG("[IN]%s\n", dirname.c_str());
1193 std::string fid, dir, file;
1194 split(dirname + "/temp", &dir, &file);
1195 Status s = _GetFid(dir + "/" + file, fid);
1196
1197 do {
1198 if (Status::NotFound() != s && fid != DIR_ID_VALUE) {
1199 break;
1200 } else if (Status::OK() == s && fid == DIR_ID_VALUE) {
1201 break;
1202 }
1203
1204 s = _AddFid(dir, DIR_ID_VALUE);
1205 } while (0);
1206
1207 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1208 return s;
1209 }
1210
1211 /**
1212 * @brief create dir if missing
1213 * @details [long description]
1214 *
1215 * @param dirname [description]
1216 * @return [description]
1217 */
1218 Status EnvLibrados::CreateDirIfMissing(const std::string& dirname)
1219 {
1220 LOG_DEBUG("[IN]%s\n", dirname.c_str());
1221 std::string fid, dir, file;
1222 split(dirname + "/temp", &dir, &file);
1223 Status s = Status::OK();
1224
1225 do {
1226 s = _GetFid(dir, fid);
1227 if (Status::NotFound() != s) {
1228 break;
1229 }
1230
1231 s = _AddFid(dir, DIR_ID_VALUE);
1232 } while (0);
1233
1234 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1235 return s;
1236 }
1237
1238 /**
1239 * @brief delete dir
1240 * @details
1241 *
1242 * @param dirname [description]
1243 * @return [description]
1244 */
1245 Status EnvLibrados::DeleteDir(const std::string& dirname)
1246 {
1247 LOG_DEBUG("[IN]%s\n", dirname.c_str());
1248 std::string fid, dir, file;
1249 split(dirname + "/temp", &dir, &file);
1250 Status s = Status::OK();
1251
1252 s = _GetFid(dir, fid);
1253
1254 if (s.ok() && DIR_ID_VALUE == fid) {
1255 std::vector<std::string> subs;
1256 s = _GetSubFnames(dir, &subs);
1257 // if subfiles exist, can't delete dir
1258 if (subs.size() > 0) {
1259 s = Status::IOError();
1260 } else {
1261 s = _DelFid(dir);
1262 }
1263 } else {
1264 s = Status::NotFound();
1265 }
1266
1267 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1268 return s;
1269 }
1270
1271 /**
1272 * @brief return file size
1273 * @details [long description]
1274 *
1275 * @param fname [description]
1276 * @param file_size [description]
1277 *
1278 * @return [description]
1279 */
1280 Status EnvLibrados::GetFileSize(
1281 const std::string& fname,
1282 uint64_t* file_size)
1283 {
1284 LOG_DEBUG("[IN]%s\n", fname.c_str());
1285 std::string fid, dir, file;
1286 split(fname, &dir, &file);
1287 time_t mtime;
1288 Status s;
1289
1290 do {
1291 std::string fpath = dir + "/" + file;
1292 s = _GetFid(fpath, fid);
1293
1294 if (!s.ok()) {
1295 break;
1296 }
1297
1298 int ret = _GetIoctx(fpath)->stat(fid, file_size, &mtime);
1299 if (ret < 0) {
1300 LOG_DEBUG("%i\n", ret);
1301 if (-ENOENT == ret) {
1302 *file_size = 0;
1303 s = Status::OK();
1304 } else {
1305 s = err_to_status(ret);
1306 }
1307 } else {
1308 s = Status::OK();
1309 }
1310 } while (0);
1311
1312 LOG_DEBUG("[OUT]%s|%lld\n", s.ToString().c_str(), (long long)*file_size);
1313 return s;
1314 }
1315
1316 /**
1317 * @brief get file modification time
1318 * @details [long description]
1319 *
1320 * @param fname [description]
1321 * @param file_mtime [description]
1322 *
1323 * @return [description]
1324 */
1325 Status EnvLibrados::GetFileModificationTime(const std::string& fname,
1326 uint64_t* file_mtime)
1327 {
1328 LOG_DEBUG("[IN]%s\n", fname.c_str());
1329 std::string fid, dir, file;
1330 split(fname, &dir, &file);
1331 time_t mtime;
1332 uint64_t file_size;
1333 Status s = Status::OK();
1334 do {
1335 std::string fpath = dir + "/" + file;
1336 s = _GetFid(dir + "/" + file, fid);
1337
1338 if (!s.ok()) {
1339 break;
1340 }
1341
1342 int ret = _GetIoctx(fpath)->stat(fid, &file_size, &mtime);
1343 if (ret < 0) {
1344 if (Status::NotFound() == err_to_status(ret)) {
1345 *file_mtime = static_cast<uint64_t>(mtime);
1346 s = Status::OK();
1347 } else {
1348 s = err_to_status(ret);
1349 }
1350 } else {
1351 s = Status::OK();
1352 }
1353 } while (0);
1354
1355 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1356 return s;
1357 }
1358
1359 /**
1360 * @brief rename file
1361 * @details
1362 *
1363 * @param src [description]
1364 * @param target_in [description]
1365 *
1366 * @return [description]
1367 */
1368 Status EnvLibrados::RenameFile(
1369 const std::string& src,
1370 const std::string& target_in)
1371 {
1372 LOG_DEBUG("[IN]%s => %s\n", src.c_str(), target_in.c_str());
1373 std::string src_fid, tmp_fid, src_dir, src_file, dst_dir, dst_file;
1374 split(src, &src_dir, &src_file);
1375 split(target_in, &dst_dir, &dst_file);
1376
1377 auto s = _RenameFid(src_dir + "/" + src_file,
1378 dst_dir + "/" + dst_file);
1379 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1380 return s;
1381 }
1382
1383 /**
1384 * @brief not support
1385 * @details [long description]
1386 *
1387 * @param src [description]
1388 * @param target_in [description]
1389 *
1390 * @return [description]
1391 */
1392 Status EnvLibrados::LinkFile(
1393 const std::string& src,
1394 const std::string& target_in)
1395 {
1396 LOG_DEBUG("[IO]%s => %s\n", src.c_str(), target_in.c_str());
1397 return Status::NotSupported();
1398 }
1399
1400 /**
1401 * @brief lock file. create if missing.
1402 * @details [long description]
1403 *
1404 * It seems that LockFile is used for preventing other instance of RocksDB
1405 * from opening up the database at the same time. From RocksDB source code,
1406 * the invokes of LockFile are at following locations:
1407 *
1408 * ./db/db_impl.cc:1159: s = env_->LockFile(LockFileName(dbname_), &db_lock_); // DBImpl::Recover
1409 * ./db/db_impl.cc:5839: Status result = env->LockFile(lockname, &lock); // Status DestroyDB
1410 *
1411 * When db recovery and db destroy, RocksDB will call LockFile
1412 *
1413 * @param fname [description]
1414 * @param lock [description]
1415 *
1416 * @return [description]
1417 */
1418 Status EnvLibrados::LockFile(
1419 const std::string& fname,
1420 FileLock** lock)
1421 {
1422 LOG_DEBUG("[IN]%s\n", fname.c_str());
1423 std::string fid, dir, file;
1424 split(fname, &dir, &file);
1425 Status s = Status::OK();
1426
1427 do {
1428 std::string fpath = dir + "/" + file;
1429 s = _GetFid(fpath, fid);
1430
1431 if (Status::OK() != s &&
1432 Status::NotFound() != s) {
1433 break;
1434 } else if (Status::NotFound() == s) {
1435 s = _AddFid(fpath, _CreateFid());
1436 if (!s.ok()) {
1437 break;
1438 }
1439 } else if (Status::OK() == s && DIR_ID_VALUE == fid) {
1440 s = Status::IOError();
1441 break;
1442 }
1443
1444 *lock = new LibradosFileLock(_GetIoctx(fpath), fpath);
1445 } while (0);
1446
1447 LOG_DEBUG("[OUT]%s\n", s.ToString().c_str());
1448 return s;
1449 }
1450
1451 /**
1452 * @brief unlock file
1453 * @details [long description]
1454 *
1455 * @param lock [description]
1456 * @return [description]
1457 */
1458 Status EnvLibrados::UnlockFile(FileLock* lock)
1459 {
1460 LOG_DEBUG("[IO]%p\n", lock);
1461 if (nullptr != lock) {
1462 delete lock;
1463 }
1464 return Status::OK();
1465 }
1466
1467
1468 /**
1469 * @brief not support
1470 * @details [long description]
1471 *
1472 * @param db_path [description]
1473 * @param output_path [description]
1474 *
1475 * @return [description]
1476 */
1477 Status EnvLibrados::GetAbsolutePath(
1478 const std::string& db_path,
1479 std::string* output_path)
1480 {
1481 LOG_DEBUG("[IO]%s\n", db_path.c_str());
1482 return Status::NotSupported();
1483 }
1484
1485 /**
1486 * @brief Get default EnvLibrados
1487 * @details [long description]
1488 * @return [description]
1489 */
1490 EnvLibrados* EnvLibrados::Default() {
1491 static EnvLibrados default_env(default_db_name,
1492 std::getenv(default_config_path),
1493 default_pool_name);
1494 return &default_env;
1495 }
1496
1497 } // namespace ROCKSDB_NAMESPACE