]>
git.proxmox.com Git - ceph.git/blob - ceph/src/kv/MemDB.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * In-memory key/value DB; not crash-safe
5 * Author: Ramesh Chander, Ramesh.Chander@sandisk.com
8 #include "include/compat.h"
13 #if __has_include(<filesystem>)
15 namespace fs
= std::filesystem
;
16 #elif __has_include(<experimental/filesystem>)
17 #include <experimental/filesystem>
18 namespace fs
= std::experimental::filesystem
;
22 #include <sys/types.h>
25 #include "common/perf_counters.h"
26 #include "common/debug.h"
27 #include "include/str_list.h"
28 #include "include/str_map.h"
29 #include "KeyValueDB.h"
32 #include "include/ceph_assert.h"
33 #include "common/debug.h"
34 #include "common/errno.h"
35 #include "include/buffer.h"
36 #include "include/buffer_raw.h"
37 #include "include/compat.h"
39 #define dout_context g_ceph_context
40 #define dout_subsys ceph_subsys_memdb
42 #define dout_prefix *_dout << "memdb: "
43 #define dtrace dout(30)
52 using ceph::bufferlist
;
53 using ceph::bufferptr
;
/**
 * Break a raw map key into its two components.
 *
 * The raw key is cut at the first KEY_DELIM: everything before the
 * delimiter is stored in *prefix, everything after it in *key.
 * Asserts that the delimiter is present in raw_key.
 */
static void split_key(const string& raw_key, string *prefix, string *key)
{
  const size_t delim_at = raw_key.find(KEY_DELIM, 0);
  ceph_assert(delim_at != std::string::npos);

  *prefix = raw_key.substr(0, delim_at);
  // substr() without a count runs to the end of the string, which is
  // what an over-long count is clamped to as well.
  *key = raw_key.substr(delim_at + 1);
}
65 static string
make_key(const string
&prefix
, const string
&value
)
68 out
.push_back(KEY_DELIM
);
/**
 * Append one map entry to bl: the entry's key is encoded first,
 * immediately followed by its value.
 */
void MemDB::_encode(mdb_iter_t iter, bufferlist &bl)
{
  auto& entry = *iter;
  encode(entry.first, bl);
  encode(entry.second, bl);
}
79 std::string
MemDB::_get_data_fn()
81 string fn
= m_db_path
+ "/" + "MemDB.db";
87 std::lock_guard
<std::mutex
> l(m_lock
);
88 dout(10) << __func__
<< " Saving MemDB to file: "<< _get_data_fn().c_str() << dendl
;
90 int fd
= TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(),
91 O_WRONLY
|O_CREAT
|O_TRUNC
|O_CLOEXEC
, mode
));
94 cerr
<< "write_file(" << _get_data_fn().c_str() << "): failed to open file: "
95 << cpp_strerror(err
) << std::endl
;
99 mdb_iter_t iter
= m_map
.begin();
100 while (iter
!= m_map
.end()) {
101 dout(10) << __func__
<< " Key:"<< iter
->first
<< dendl
;
107 VOID_TEMP_FAILURE_RETRY(::close(fd
));
112 std::lock_guard
<std::mutex
> l(m_lock
);
113 dout(10) << __func__
<< " Reading MemDB from file: "<< _get_data_fn().c_str() << dendl
;
115 * Open file and read it in single shot.
117 int fd
= TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(), O_RDONLY
|O_CLOEXEC
));
120 cerr
<< "can't open " << _get_data_fn().c_str() << ": "
121 << cpp_strerror(err
) << std::endl
;
126 memset(&st
, 0, sizeof(st
));
127 if (::fstat(fd
, &st
) < 0) {
129 cerr
<< "can't stat file " << _get_data_fn().c_str() << ": "
130 << cpp_strerror(err
) << std::endl
;
131 VOID_TEMP_FAILURE_RETRY(::close(fd
));
135 ssize_t file_size
= st
.st_size
;
136 ssize_t bytes_done
= 0;
137 while (bytes_done
< file_size
) {
141 bytes_done
+= ceph::decode_file(fd
, key
);
142 bytes_done
+= ceph::decode_file(fd
, datap
);
144 dout(10) << __func__
<< " Key:"<< key
<< dendl
;
146 m_total_bytes
+= datap
.length();
148 VOID_TEMP_FAILURE_RETRY(::close(fd
));
152 int MemDB::_init(bool create
)
155 dout(1) << __func__
<< dendl
;
157 if (fs::exists(m_db_path
)) {
158 r
= 0; // ignore EEXIST
161 if (!fs::create_directory(m_db_path
, ec
)) {
162 derr
<< __func__
<< " mkdir failed: " << ec
.message() << dendl
;
165 fs::permissions(m_db_path
, fs::perms::owner_all
);
171 PerfCountersBuilder
plb(g_ceph_context
, "memdb", l_memdb_first
, l_memdb_last
);
172 plb
.add_u64_counter(l_memdb_gets
, "get", "Gets");
173 plb
.add_u64_counter(l_memdb_txns
, "submit_transaction", "Submit transactions");
174 plb
.add_time_avg(l_memdb_get_latency
, "get_latency", "Get latency");
175 plb
.add_time_avg(l_memdb_submit_latency
, "submit_latency", "Submit Latency");
176 logger
= plb
.create_perf_counters();
177 m_cct
->get_perfcounters_collection()->add(logger
);
182 int MemDB::set_merge_operator(
183 const string
& prefix
,
184 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
)
186 merge_ops
.push_back(std::make_pair(prefix
, mop
));
190 int MemDB::do_open(ostream
&out
, bool create
)
193 m_allocated_bytes
= 1;
195 return _init(create
);
198 int MemDB::open(ostream
&out
, const std::string
& cfs
) {
200 ceph_abort_msg("Not implemented");
202 return do_open(out
, false);
205 int MemDB::create_and_open(ostream
&out
, const std::string
& cfs
) {
207 ceph_abort_msg("Not implemented");
209 return do_open(out
, true);
215 dout(10) << __func__
<< " Destroying MemDB instance: "<< dendl
;
221 * Save whatever is in the in-memory btree.
225 m_cct
->get_perfcounters_collection()->remove(logger
);
228 int MemDB::submit_transaction(KeyValueDB::Transaction t
)
230 utime_t start
= ceph_clock_now();
232 MDBTransactionImpl
* mt
= static_cast<MDBTransactionImpl
*>(t
.get());
234 dtrace
<< __func__
<< " " << mt
->get_ops().size() << dendl
;
235 for(auto& op
: mt
->get_ops()) {
236 if(op
.first
== MDBTransactionImpl::WRITE
) {
237 ms_op_t set_op
= op
.second
;
239 } else if (op
.first
== MDBTransactionImpl::MERGE
) {
240 ms_op_t merge_op
= op
.second
;
243 ms_op_t rm_op
= op
.second
;
244 ceph_assert(op
.first
== MDBTransactionImpl::DELETE
);
249 utime_t lat
= ceph_clock_now() - start
;
250 logger
->inc(l_memdb_txns
);
251 logger
->tinc(l_memdb_submit_latency
, lat
);
256 int MemDB::submit_transaction_sync(KeyValueDB::Transaction tsync
)
258 dtrace
<< __func__
<< " " << dendl
;
259 submit_transaction(tsync
);
263 int MemDB::transaction_rollback(KeyValueDB::Transaction t
)
265 MDBTransactionImpl
* mt
= static_cast<MDBTransactionImpl
*>(t
.get());
270 void MemDB::MDBTransactionImpl::set(
271 const string
&prefix
, const string
&k
, const bufferlist
&to_set_bl
)
273 dtrace
<< __func__
<< " " << prefix
<< " " << k
<< dendl
;
274 ops
.push_back(make_pair(WRITE
, std::make_pair(std::make_pair(prefix
, k
),
278 void MemDB::MDBTransactionImpl::rmkey(const string
&prefix
,
281 dtrace
<< __func__
<< " " << prefix
<< " " << k
<< dendl
;
282 ops
.push_back(make_pair(DELETE
,
283 std::make_pair(std::make_pair(prefix
, k
),
/**
 * Call rmkey() for every key that the database iterator obtained for
 * `prefix` yields, walking it from its first entry until it is no
 * longer valid.
 */
void MemDB::MDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
{
  KeyValueDB::Iterator cursor = m_db->get_iterator(prefix);
  cursor->seek_to_first();
  while (cursor->valid()) {
    rmkey(prefix, cursor->key());
    cursor->next();
  }
}
295 void MemDB::MDBTransactionImpl::rm_range_keys(const string
&prefix
, const string
&start
, const string
&end
)
297 KeyValueDB::Iterator it
= m_db
->get_iterator(prefix
);
298 it
->lower_bound(start
);
299 while (it
->valid()) {
300 if (it
->key() >= end
) {
303 rmkey(prefix
, it
->key());
308 void MemDB::MDBTransactionImpl::merge(
309 const std::string
&prefix
, const std::string
&key
, const bufferlist
&value
)
312 dtrace
<< __func__
<< " " << prefix
<< " " << key
<< dendl
;
313 ops
.push_back(make_pair(MERGE
, make_pair(std::make_pair(prefix
, key
), value
)));
317 int MemDB::_setkey(ms_op_t
&op
)
319 std::lock_guard
<std::mutex
> l(m_lock
);
320 std::string key
= make_key(op
.first
.first
, op
.first
.second
);
321 bufferlist bl
= op
.second
;
323 m_total_bytes
+= bl
.length();
326 if (_get(op
.first
.first
, op
.first
.second
, &bl_old
)) {
328 * delete and free existing key.
330 ceph_assert(m_total_bytes
>= bl_old
.length());
331 m_total_bytes
-= bl_old
.length();
335 m_map
[key
] = bufferptr((char *) bl
.c_str(), bl
.length());
340 int MemDB::_rmkey(ms_op_t
&op
)
342 std::lock_guard
<std::mutex
> l(m_lock
);
343 std::string key
= make_key(op
.first
.first
, op
.first
.second
);
346 if (_get(op
.first
.first
, op
.first
.second
, &bl_old
)) {
347 ceph_assert(m_total_bytes
>= bl_old
.length());
348 m_total_bytes
-= bl_old
.length();
352 * Erase will call the destructor for bufferptr.
354 return m_map
.erase(key
);
357 std::shared_ptr
<KeyValueDB::MergeOperator
> MemDB::_find_merge_op(const std::string
&prefix
)
359 for (const auto& i
: merge_ops
) {
360 if (i
.first
== prefix
) {
365 dtrace
<< __func__
<< " No merge op for " << prefix
<< dendl
;
370 int MemDB::_merge(ms_op_t
&op
)
372 std::lock_guard
<std::mutex
> l(m_lock
);
373 std::string prefix
= op
.first
.first
;
374 std::string key
= make_key(op
.first
.first
, op
.first
.second
);
375 bufferlist bl
= op
.second
;
376 int64_t bytes_adjusted
= bl
.length();
379 * find the operator for this prefix
381 std::shared_ptr
<MergeOperator
> mop
= _find_merge_op(prefix
);
385 * call the merge operator with value and non value
388 if (_get(op
.first
.first
, op
.first
.second
, &bl_old
) == false) {
391 * Merge non existent.
393 mop
->merge_nonexistent(bl
.c_str(), bl
.length(), &new_val
);
394 m_map
[key
] = bufferptr(new_val
.c_str(), new_val
.length());
400 mop
->merge(bl_old
.c_str(), bl_old
.length(), bl
.c_str(), bl
.length(), &new_val
);
401 m_map
[key
] = bufferptr(new_val
.c_str(), new_val
.length());
402 bytes_adjusted
-= bl_old
.length();
406 ceph_assert((int64_t)m_total_bytes
+ bytes_adjusted
>= 0);
407 m_total_bytes
+= bytes_adjusted
;
413 * The caller must hold the btree lock.
415 bool MemDB::_get(const string
&prefix
, const string
&k
, bufferlist
*out
)
417 string key
= make_key(prefix
, k
);
419 mdb_iter_t iter
= m_map
.find(key
);
420 if (iter
== m_map
.end()) {
424 out
->push_back((m_map
[key
].clone()));
428 bool MemDB::_get_locked(const string
&prefix
, const string
&k
, bufferlist
*out
)
430 std::lock_guard
<std::mutex
> l(m_lock
);
431 return _get(prefix
, k
, out
);
435 int MemDB::get(const string
&prefix
, const std::string
& key
,
438 utime_t start
= ceph_clock_now();
441 if (_get_locked(prefix
, key
, out
)) {
447 utime_t lat
= ceph_clock_now() - start
;
448 logger
->inc(l_memdb_gets
);
449 logger
->tinc(l_memdb_get_latency
, lat
);
454 int MemDB::get(const string
&prefix
, const std::set
<string
> &keys
,
455 std::map
<string
, bufferlist
> *out
)
457 utime_t start
= ceph_clock_now();
459 for (const auto& i
: keys
) {
461 if (_get_locked(prefix
, i
, &bl
))
462 out
->insert(make_pair(i
, bl
));
465 utime_t lat
= ceph_clock_now() - start
;
466 logger
->inc(l_memdb_gets
);
467 logger
->tinc(l_memdb_get_latency
, lat
);
472 void MemDB::MDBWholeSpaceIteratorImpl::fill_current()
475 bl
.push_back(m_iter
->second
.clone());
476 m_key_value
= std::make_pair(m_iter
->first
, bl
);
479 bool MemDB::MDBWholeSpaceIteratorImpl::valid()
481 if (m_key_value
.first
.empty()) {
487 bool MemDB::MDBWholeSpaceIteratorImpl::iterator_validate() {
489 if (this_seq_no
!= *global_seq_no
) {
490 auto key
= m_key_value
.first
;
491 ceph_assert(!key
.empty());
493 bool restart_iter
= false;
494 if (!m_using_btree
) {
496 * Map is modified and marker key does not exist,
497 * restart the iterator from next key.
499 if (m_map_p
->find(key
) == m_map_p
->end()) {
507 m_iter
= m_map_p
->lower_bound(key
);
508 if (m_iter
== m_map_p
->end()) {
514 * This iter is valid now.
516 this_seq_no
= *global_seq_no
;
523 MemDB::MDBWholeSpaceIteratorImpl::free_last()
525 m_key_value
.first
.clear();
526 m_key_value
.second
.clear();
529 string
MemDB::MDBWholeSpaceIteratorImpl::key()
531 dtrace
<< __func__
<< " " << m_key_value
.first
<< dendl
;
533 split_key(m_key_value
.first
, &prefix
, &key
);
537 std::pair
<string
,string
> MemDB::MDBWholeSpaceIteratorImpl::raw_key()
540 split_key(m_key_value
.first
, &prefix
, &key
);
541 return { prefix
, key
};
544 bool MemDB::MDBWholeSpaceIteratorImpl::raw_key_is_prefixed(
545 const string
&prefix
)
548 split_key(m_key_value
.first
, &p
, &k
);
549 return (p
== prefix
);
/**
 * Return a copy of the value half of the iterator's cached key/value
 * pair, after logging the cached pair at trace level.
 */
bufferlist MemDB::MDBWholeSpaceIteratorImpl::value()
{
  dtrace << __func__ << " " << m_key_value << dendl;

  bufferlist cached = m_key_value.second;
  return cached;
}
558 int MemDB::MDBWholeSpaceIteratorImpl::next()
560 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
561 if (!iterator_validate()) {
567 if (m_iter
!= m_map_p
->end()) {
575 int MemDB::MDBWholeSpaceIteratorImpl:: prev()
577 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
578 if (!iterator_validate()) {
583 if (m_iter
!= m_map_p
->begin()) {
593 * First key >= to given key, if key is null then first key in btree.
595 int MemDB::MDBWholeSpaceIteratorImpl::seek_to_first(const std::string
&k
)
597 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
600 m_iter
= m_map_p
->begin();
602 m_iter
= m_map_p
->lower_bound(k
);
605 if (m_iter
== m_map_p
->end()) {
612 int MemDB::MDBWholeSpaceIteratorImpl::seek_to_last(const std::string
&k
)
614 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
617 m_iter
= m_map_p
->end();
620 m_iter
= m_map_p
->lower_bound(k
);
623 if (m_iter
== m_map_p
->end()) {
630 MemDB::MDBWholeSpaceIteratorImpl::~MDBWholeSpaceIteratorImpl()
635 int MemDB::MDBWholeSpaceIteratorImpl::upper_bound(const std::string
&prefix
,
636 const std::string
&after
) {
638 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
640 dtrace
<< "upper_bound " << prefix
.c_str() << after
.c_str() << dendl
;
641 string k
= make_key(prefix
, after
);
642 m_iter
= m_map_p
->upper_bound(k
);
643 if (m_iter
!= m_map_p
->end()) {
650 int MemDB::MDBWholeSpaceIteratorImpl::lower_bound(const std::string
&prefix
,
651 const std::string
&to
) {
652 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
653 dtrace
<< "lower_bound " << prefix
.c_str() << to
.c_str() << dendl
;
654 string k
= make_key(prefix
, to
);
655 m_iter
= m_map_p
->lower_bound(k
);
656 if (m_iter
!= m_map_p
->end()) {