// Origin: ceph.git, ceph/src/kv/MemDB.cc (as published on git.proxmox.com)
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * In-memory crash non-safe keyvalue db
 * Author: Ramesh Chander, Ramesh.Chander@sandisk.com
 */
8 #include "include/compat.h"
16 #include <sys/types.h>
19 #include "common/perf_counters.h"
20 #include "common/debug.h"
21 #include "include/str_list.h"
22 #include "include/str_map.h"
23 #include "KeyValueDB.h"
26 #include "include/ceph_assert.h"
27 #include "common/debug.h"
28 #include "common/errno.h"
29 #include "include/buffer.h"
30 #include "include/buffer_raw.h"
31 #include "include/compat.h"
// Logging configuration for this translation unit: which context and
// subsystem the dout() macros use, and the prefix put on every message.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_memdb
#define dout_prefix *_dout << "memdb: "
// Very verbose tracing; emitted at debug level 30.
#define dtrace dout(30)
41 namespace fs
= std::filesystem
;
48 using ceph::bufferlist
;
49 using ceph::bufferptr
;
53 static void split_key(const string
& raw_key
, string
*prefix
, string
*key
)
55 size_t pos
= raw_key
.find(KEY_DELIM
, 0);
56 ceph_assert(pos
!= std::string::npos
);
57 *prefix
= raw_key
.substr(0, pos
);
58 *key
= raw_key
.substr(pos
+ 1, raw_key
.length());
61 static string
make_key(const string
&prefix
, const string
&value
)
64 out
.push_back(KEY_DELIM
);
69 void MemDB::_encode(mdb_iter_t iter
, bufferlist
&bl
)
71 encode(iter
->first
, bl
);
72 encode(iter
->second
, bl
);
75 std::string
MemDB::_get_data_fn()
77 string fn
= m_db_path
+ "/" + "MemDB.db";
83 std::lock_guard
<std::mutex
> l(m_lock
);
84 dout(10) << __func__
<< " Saving MemDB to file: "<< _get_data_fn().c_str() << dendl
;
86 int fd
= TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(),
87 O_WRONLY
|O_CREAT
|O_TRUNC
|O_CLOEXEC
, mode
));
90 cerr
<< "write_file(" << _get_data_fn().c_str() << "): failed to open file: "
91 << cpp_strerror(err
) << std::endl
;
95 mdb_iter_t iter
= m_map
.begin();
96 while (iter
!= m_map
.end()) {
97 dout(10) << __func__
<< " Key:"<< iter
->first
<< dendl
;
103 VOID_TEMP_FAILURE_RETRY(::close(fd
));
108 std::lock_guard
<std::mutex
> l(m_lock
);
109 dout(10) << __func__
<< " Reading MemDB from file: "<< _get_data_fn().c_str() << dendl
;
111 * Open file and read it in single shot.
113 int fd
= TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(), O_RDONLY
|O_CLOEXEC
));
116 cerr
<< "can't open " << _get_data_fn().c_str() << ": "
117 << cpp_strerror(err
) << std::endl
;
122 memset(&st
, 0, sizeof(st
));
123 if (::fstat(fd
, &st
) < 0) {
125 cerr
<< "can't stat file " << _get_data_fn().c_str() << ": "
126 << cpp_strerror(err
) << std::endl
;
127 VOID_TEMP_FAILURE_RETRY(::close(fd
));
131 ssize_t file_size
= st
.st_size
;
132 ssize_t bytes_done
= 0;
133 while (bytes_done
< file_size
) {
137 bytes_done
+= ceph::decode_file(fd
, key
);
138 bytes_done
+= ceph::decode_file(fd
, datap
);
140 dout(10) << __func__
<< " Key:"<< key
<< dendl
;
142 m_total_bytes
+= datap
.length();
144 VOID_TEMP_FAILURE_RETRY(::close(fd
));
148 int MemDB::_init(bool create
)
151 dout(1) << __func__
<< dendl
;
153 if (fs::exists(m_db_path
)) {
154 r
= 0; // ignore EEXIST
157 if (!fs::create_directory(m_db_path
, ec
)) {
158 derr
<< __func__
<< " mkdir failed: " << ec
.message() << dendl
;
161 fs::permissions(m_db_path
, fs::perms::owner_all
);
167 PerfCountersBuilder
plb(g_ceph_context
, "memdb", l_memdb_first
, l_memdb_last
);
168 plb
.add_u64_counter(l_memdb_gets
, "get", "Gets");
169 plb
.add_u64_counter(l_memdb_txns
, "submit_transaction", "Submit transactions");
170 plb
.add_time_avg(l_memdb_get_latency
, "get_latency", "Get latency");
171 plb
.add_time_avg(l_memdb_submit_latency
, "submit_latency", "Submit Latency");
172 logger
= plb
.create_perf_counters();
173 m_cct
->get_perfcounters_collection()->add(logger
);
178 int MemDB::set_merge_operator(
179 const string
& prefix
,
180 std::shared_ptr
<KeyValueDB::MergeOperator
> mop
)
182 merge_ops
.push_back(std::make_pair(prefix
, mop
));
186 int MemDB::do_open(ostream
&out
, bool create
)
189 m_allocated_bytes
= 1;
191 return _init(create
);
194 int MemDB::open(ostream
&out
, const std::string
& cfs
) {
196 ceph_abort_msg("Not implemented");
198 return do_open(out
, false);
201 int MemDB::create_and_open(ostream
&out
, const std::string
& cfs
) {
203 ceph_abort_msg("Not implemented");
205 return do_open(out
, true);
211 dout(10) << __func__
<< " Destroying MemDB instance: "<< dendl
;
217 * Save whatever in memory btree.
221 m_cct
->get_perfcounters_collection()->remove(logger
);
224 int MemDB::submit_transaction(KeyValueDB::Transaction t
)
226 utime_t start
= ceph_clock_now();
228 MDBTransactionImpl
* mt
= static_cast<MDBTransactionImpl
*>(t
.get());
230 dtrace
<< __func__
<< " " << mt
->get_ops().size() << dendl
;
231 for(auto& op
: mt
->get_ops()) {
232 if(op
.first
== MDBTransactionImpl::WRITE
) {
233 ms_op_t set_op
= op
.second
;
235 } else if (op
.first
== MDBTransactionImpl::MERGE
) {
236 ms_op_t merge_op
= op
.second
;
239 ms_op_t rm_op
= op
.second
;
240 ceph_assert(op
.first
== MDBTransactionImpl::DELETE
);
245 utime_t lat
= ceph_clock_now() - start
;
246 logger
->inc(l_memdb_txns
);
247 logger
->tinc(l_memdb_submit_latency
, lat
);
252 int MemDB::submit_transaction_sync(KeyValueDB::Transaction tsync
)
254 dtrace
<< __func__
<< " " << dendl
;
255 submit_transaction(tsync
);
259 int MemDB::transaction_rollback(KeyValueDB::Transaction t
)
261 MDBTransactionImpl
* mt
= static_cast<MDBTransactionImpl
*>(t
.get());
266 void MemDB::MDBTransactionImpl::set(
267 const string
&prefix
, const string
&k
, const bufferlist
&to_set_bl
)
269 dtrace
<< __func__
<< " " << prefix
<< " " << k
<< dendl
;
270 ops
.push_back(make_pair(WRITE
, std::make_pair(std::make_pair(prefix
, k
),
274 void MemDB::MDBTransactionImpl::rmkey(const string
&prefix
,
277 dtrace
<< __func__
<< " " << prefix
<< " " << k
<< dendl
;
278 ops
.push_back(make_pair(DELETE
,
279 std::make_pair(std::make_pair(prefix
, k
),
283 void MemDB::MDBTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
285 KeyValueDB::Iterator it
= m_db
->get_iterator(prefix
);
286 for (it
->seek_to_first(); it
->valid(); it
->next()) {
287 rmkey(prefix
, it
->key());
291 void MemDB::MDBTransactionImpl::rm_range_keys(const string
&prefix
, const string
&start
, const string
&end
)
293 KeyValueDB::Iterator it
= m_db
->get_iterator(prefix
);
294 it
->lower_bound(start
);
295 while (it
->valid()) {
296 if (it
->key() >= end
) {
299 rmkey(prefix
, it
->key());
304 void MemDB::MDBTransactionImpl::merge(
305 const std::string
&prefix
, const std::string
&key
, const bufferlist
&value
)
308 dtrace
<< __func__
<< " " << prefix
<< " " << key
<< dendl
;
309 ops
.push_back(make_pair(MERGE
, make_pair(std::make_pair(prefix
, key
), value
)));
313 int MemDB::_setkey(ms_op_t
&op
)
315 std::lock_guard
<std::mutex
> l(m_lock
);
316 std::string key
= make_key(op
.first
.first
, op
.first
.second
);
317 bufferlist bl
= op
.second
;
319 m_total_bytes
+= bl
.length();
322 if (_get(op
.first
.first
, op
.first
.second
, &bl_old
)) {
324 * delete and free existing key.
326 ceph_assert(m_total_bytes
>= bl_old
.length());
327 m_total_bytes
-= bl_old
.length();
331 m_map
[key
] = bufferptr((char *) bl
.c_str(), bl
.length());
336 int MemDB::_rmkey(ms_op_t
&op
)
338 std::lock_guard
<std::mutex
> l(m_lock
);
339 std::string key
= make_key(op
.first
.first
, op
.first
.second
);
342 if (_get(op
.first
.first
, op
.first
.second
, &bl_old
)) {
343 ceph_assert(m_total_bytes
>= bl_old
.length());
344 m_total_bytes
-= bl_old
.length();
348 * Erase will call the destructor for bufferptr.
350 return m_map
.erase(key
);
353 std::shared_ptr
<KeyValueDB::MergeOperator
> MemDB::_find_merge_op(const std::string
&prefix
)
355 for (const auto& i
: merge_ops
) {
356 if (i
.first
== prefix
) {
361 dtrace
<< __func__
<< " No merge op for " << prefix
<< dendl
;
366 int MemDB::_merge(ms_op_t
&op
)
368 std::lock_guard
<std::mutex
> l(m_lock
);
369 std::string prefix
= op
.first
.first
;
370 std::string key
= make_key(op
.first
.first
, op
.first
.second
);
371 bufferlist bl
= op
.second
;
372 int64_t bytes_adjusted
= bl
.length();
375 * find the operator for this prefix
377 std::shared_ptr
<MergeOperator
> mop
= _find_merge_op(prefix
);
381 * call the merge operator with value and non value
384 if (_get(op
.first
.first
, op
.first
.second
, &bl_old
) == false) {
387 * Merge non existent.
389 mop
->merge_nonexistent(bl
.c_str(), bl
.length(), &new_val
);
390 m_map
[key
] = bufferptr(new_val
.c_str(), new_val
.length());
396 mop
->merge(bl_old
.c_str(), bl_old
.length(), bl
.c_str(), bl
.length(), &new_val
);
397 m_map
[key
] = bufferptr(new_val
.c_str(), new_val
.length());
398 bytes_adjusted
-= bl_old
.length();
402 ceph_assert((int64_t)m_total_bytes
+ bytes_adjusted
>= 0);
403 m_total_bytes
+= bytes_adjusted
;
409 * Caller take btree lock.
411 bool MemDB::_get(const string
&prefix
, const string
&k
, bufferlist
*out
)
413 string key
= make_key(prefix
, k
);
415 mdb_iter_t iter
= m_map
.find(key
);
416 if (iter
== m_map
.end()) {
420 out
->push_back((m_map
[key
].clone()));
424 bool MemDB::_get_locked(const string
&prefix
, const string
&k
, bufferlist
*out
)
426 std::lock_guard
<std::mutex
> l(m_lock
);
427 return _get(prefix
, k
, out
);
431 int MemDB::get(const string
&prefix
, const std::string
& key
,
434 utime_t start
= ceph_clock_now();
437 if (_get_locked(prefix
, key
, out
)) {
443 utime_t lat
= ceph_clock_now() - start
;
444 logger
->inc(l_memdb_gets
);
445 logger
->tinc(l_memdb_get_latency
, lat
);
450 int MemDB::get(const string
&prefix
, const std::set
<string
> &keys
,
451 std::map
<string
, bufferlist
> *out
)
453 utime_t start
= ceph_clock_now();
455 for (const auto& i
: keys
) {
457 if (_get_locked(prefix
, i
, &bl
))
458 out
->insert(make_pair(i
, bl
));
461 utime_t lat
= ceph_clock_now() - start
;
462 logger
->inc(l_memdb_gets
);
463 logger
->tinc(l_memdb_get_latency
, lat
);
468 void MemDB::MDBWholeSpaceIteratorImpl::fill_current()
471 bl
.push_back(m_iter
->second
.clone());
472 m_key_value
= std::make_pair(m_iter
->first
, bl
);
475 bool MemDB::MDBWholeSpaceIteratorImpl::valid()
477 if (m_key_value
.first
.empty()) {
483 bool MemDB::MDBWholeSpaceIteratorImpl::iterator_validate() {
485 if (this_seq_no
!= *global_seq_no
) {
486 auto key
= m_key_value
.first
;
487 ceph_assert(!key
.empty());
489 bool restart_iter
= false;
490 if (!m_using_btree
) {
492 * Map is modified and marker key does not exists,
493 * restart the iterator from next key.
495 if (m_map_p
->find(key
) == m_map_p
->end()) {
503 m_iter
= m_map_p
->lower_bound(key
);
504 if (m_iter
== m_map_p
->end()) {
510 * This iter is valid now.
512 this_seq_no
= *global_seq_no
;
519 MemDB::MDBWholeSpaceIteratorImpl::free_last()
521 m_key_value
.first
.clear();
522 m_key_value
.second
.clear();
525 string
MemDB::MDBWholeSpaceIteratorImpl::key()
527 dtrace
<< __func__
<< " " << m_key_value
.first
<< dendl
;
529 split_key(m_key_value
.first
, &prefix
, &key
);
533 std::pair
<string
,string
> MemDB::MDBWholeSpaceIteratorImpl::raw_key()
536 split_key(m_key_value
.first
, &prefix
, &key
);
537 return { prefix
, key
};
540 bool MemDB::MDBWholeSpaceIteratorImpl::raw_key_is_prefixed(
541 const string
&prefix
)
544 split_key(m_key_value
.first
, &p
, &k
);
545 return (p
== prefix
);
548 bufferlist
MemDB::MDBWholeSpaceIteratorImpl::value()
550 dtrace
<< __func__
<< " " << m_key_value
<< dendl
;
551 return m_key_value
.second
;
554 int MemDB::MDBWholeSpaceIteratorImpl::next()
556 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
557 if (!iterator_validate()) {
563 if (m_iter
!= m_map_p
->end()) {
571 int MemDB::MDBWholeSpaceIteratorImpl:: prev()
573 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
574 if (!iterator_validate()) {
579 if (m_iter
!= m_map_p
->begin()) {
589 * First key >= to given key, if key is null then first key in btree.
591 int MemDB::MDBWholeSpaceIteratorImpl::seek_to_first(const std::string
&k
)
593 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
596 m_iter
= m_map_p
->begin();
598 m_iter
= m_map_p
->lower_bound(k
);
601 if (m_iter
== m_map_p
->end()) {
608 int MemDB::MDBWholeSpaceIteratorImpl::seek_to_last(const std::string
&k
)
610 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
613 m_iter
= m_map_p
->end();
616 m_iter
= m_map_p
->lower_bound(k
);
619 if (m_iter
== m_map_p
->end()) {
626 MemDB::MDBWholeSpaceIteratorImpl::~MDBWholeSpaceIteratorImpl()
631 int MemDB::MDBWholeSpaceIteratorImpl::upper_bound(const std::string
&prefix
,
632 const std::string
&after
) {
634 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
636 dtrace
<< "upper_bound " << prefix
.c_str() << after
.c_str() << dendl
;
637 string k
= make_key(prefix
, after
);
638 m_iter
= m_map_p
->upper_bound(k
);
639 if (m_iter
!= m_map_p
->end()) {
646 int MemDB::MDBWholeSpaceIteratorImpl::lower_bound(const std::string
&prefix
,
647 const std::string
&to
) {
648 std::lock_guard
<std::mutex
> l(*m_map_lock_p
);
649 dtrace
<< "lower_bound " << prefix
.c_str() << to
.c_str() << dendl
;
650 string k
= make_key(prefix
, to
);
651 m_iter
= m_map_p
->lower_bound(k
);
652 if (m_iter
!= m_map_p
->end()) {