]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/MemDB.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / kv / MemDB.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * In-memory crash non-safe keyvalue db
5 * Author: Ramesh Chander, Ramesh.Chander@sandisk.com
6 */
7
8 #include "include/compat.h"
9 #include <set>
10 #include <map>
11 #include <string>
12 #include <memory>
13 #if __has_include(<filesystem>)
14 #include <filesystem>
15 namespace fs = std::filesystem;
16 #elif __has_include(<experimental/filesystem>)
17 #include <experimental/filesystem>
18 namespace fs = std::experimental::filesystem;
19 #endif
20 #include <errno.h>
21 #include <unistd.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24
25 #include "common/perf_counters.h"
26 #include "common/debug.h"
27 #include "include/str_list.h"
28 #include "include/str_map.h"
29 #include "KeyValueDB.h"
30 #include "MemDB.h"
31
32 #include "include/ceph_assert.h"
33 #include "common/debug.h"
34 #include "common/errno.h"
35 #include "include/buffer.h"
36 #include "include/buffer_raw.h"
37 #include "include/compat.h"
38
39 #define dout_context g_ceph_context
40 #define dout_subsys ceph_subsys_memdb
41 #undef dout_prefix
42 #define dout_prefix *_dout << "memdb: "
43 #define dtrace dout(30)
44 #define dwarn dout(0)
45 #define dinfo dout(0)
46
47 using std::cerr;
48 using std::ostream;
49 using std::string;
50 using std::vector;
51
52 using ceph::bufferlist;
53 using ceph::bufferptr;
54 using ceph::decode;
55 using ceph::encode;
56
57 static void split_key(const string& raw_key, string *prefix, string *key)
58 {
59 size_t pos = raw_key.find(KEY_DELIM, 0);
60 ceph_assert(pos != std::string::npos);
61 *prefix = raw_key.substr(0, pos);
62 *key = raw_key.substr(pos + 1, raw_key.length());
63 }
64
65 static string make_key(const string &prefix, const string &value)
66 {
67 string out = prefix;
68 out.push_back(KEY_DELIM);
69 out.append(value);
70 return out;
71 }
72
73 void MemDB::_encode(mdb_iter_t iter, bufferlist &bl)
74 {
75 encode(iter->first, bl);
76 encode(iter->second, bl);
77 }
78
79 std::string MemDB::_get_data_fn()
80 {
81 string fn = m_db_path + "/" + "MemDB.db";
82 return fn;
83 }
84
85 void MemDB::_save()
86 {
87 std::lock_guard<std::mutex> l(m_lock);
88 dout(10) << __func__ << " Saving MemDB to file: "<< _get_data_fn().c_str() << dendl;
89 int mode = 0644;
90 int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(),
91 O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode));
92 if (fd < 0) {
93 int err = errno;
94 cerr << "write_file(" << _get_data_fn().c_str() << "): failed to open file: "
95 << cpp_strerror(err) << std::endl;
96 return;
97 }
98 bufferlist bl;
99 mdb_iter_t iter = m_map.begin();
100 while (iter != m_map.end()) {
101 dout(10) << __func__ << " Key:"<< iter->first << dendl;
102 _encode(iter, bl);
103 ++iter;
104 }
105 bl.write_fd(fd);
106
107 VOID_TEMP_FAILURE_RETRY(::close(fd));
108 }
109
110 int MemDB::_load()
111 {
112 std::lock_guard<std::mutex> l(m_lock);
113 dout(10) << __func__ << " Reading MemDB from file: "<< _get_data_fn().c_str() << dendl;
114 /*
115 * Open file and read it in single shot.
116 */
117 int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(), O_RDONLY|O_CLOEXEC));
118 if (fd < 0) {
119 int err = errno;
120 cerr << "can't open " << _get_data_fn().c_str() << ": "
121 << cpp_strerror(err) << std::endl;
122 return -err;
123 }
124
125 struct stat st;
126 memset(&st, 0, sizeof(st));
127 if (::fstat(fd, &st) < 0) {
128 int err = errno;
129 cerr << "can't stat file " << _get_data_fn().c_str() << ": "
130 << cpp_strerror(err) << std::endl;
131 VOID_TEMP_FAILURE_RETRY(::close(fd));
132 return -err;
133 }
134
135 ssize_t file_size = st.st_size;
136 ssize_t bytes_done = 0;
137 while (bytes_done < file_size) {
138 string key;
139 bufferptr datap;
140
141 bytes_done += ceph::decode_file(fd, key);
142 bytes_done += ceph::decode_file(fd, datap);
143
144 dout(10) << __func__ << " Key:"<< key << dendl;
145 m_map[key] = datap;
146 m_total_bytes += datap.length();
147 }
148 VOID_TEMP_FAILURE_RETRY(::close(fd));
149 return 0;
150 }
151
152 int MemDB::_init(bool create)
153 {
154 int r = 0;
155 dout(1) << __func__ << dendl;
156 if (create) {
157 if (fs::exists(m_db_path)) {
158 r = 0; // ignore EEXIST
159 } else {
160 std::error_code ec;
161 if (!fs::create_directory(m_db_path, ec)) {
162 derr << __func__ << " mkdir failed: " << ec.message() << dendl;
163 return -ec.value();
164 }
165 fs::permissions(m_db_path, fs::perms::owner_all);
166 }
167 } else {
168 r = _load();
169 }
170
171 PerfCountersBuilder plb(g_ceph_context, "memdb", l_memdb_first, l_memdb_last);
172 plb.add_u64_counter(l_memdb_gets, "get", "Gets");
173 plb.add_u64_counter(l_memdb_txns, "submit_transaction", "Submit transactions");
174 plb.add_time_avg(l_memdb_get_latency, "get_latency", "Get latency");
175 plb.add_time_avg(l_memdb_submit_latency, "submit_latency", "Submit Latency");
176 logger = plb.create_perf_counters();
177 m_cct->get_perfcounters_collection()->add(logger);
178
179 return r;
180 }
181
182 int MemDB::set_merge_operator(
183 const string& prefix,
184 std::shared_ptr<KeyValueDB::MergeOperator> mop)
185 {
186 merge_ops.push_back(std::make_pair(prefix, mop));
187 return 0;
188 }
189
190 int MemDB::do_open(ostream &out, bool create)
191 {
192 m_total_bytes = 0;
193 m_allocated_bytes = 1;
194
195 return _init(create);
196 }
197
198 int MemDB::open(ostream &out, const std::string& cfs) {
199 if (!cfs.empty()) {
200 ceph_abort_msg("Not implemented");
201 }
202 return do_open(out, false);
203 }
204
205 int MemDB::create_and_open(ostream &out, const std::string& cfs) {
206 if (!cfs.empty()) {
207 ceph_abort_msg("Not implemented");
208 }
209 return do_open(out, true);
210 }
211
212 MemDB::~MemDB()
213 {
214 close();
215 dout(10) << __func__ << " Destroying MemDB instance: "<< dendl;
216 }
217
218 void MemDB::close()
219 {
220 /*
221 * Save whatever in memory btree.
222 */
223 _save();
224 if (logger)
225 m_cct->get_perfcounters_collection()->remove(logger);
226 }
227
228 int MemDB::submit_transaction(KeyValueDB::Transaction t)
229 {
230 utime_t start = ceph_clock_now();
231
232 MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get());
233
234 dtrace << __func__ << " " << mt->get_ops().size() << dendl;
235 for(auto& op : mt->get_ops()) {
236 if(op.first == MDBTransactionImpl::WRITE) {
237 ms_op_t set_op = op.second;
238 _setkey(set_op);
239 } else if (op.first == MDBTransactionImpl::MERGE) {
240 ms_op_t merge_op = op.second;
241 _merge(merge_op);
242 } else {
243 ms_op_t rm_op = op.second;
244 ceph_assert(op.first == MDBTransactionImpl::DELETE);
245 _rmkey(rm_op);
246 }
247 }
248
249 utime_t lat = ceph_clock_now() - start;
250 logger->inc(l_memdb_txns);
251 logger->tinc(l_memdb_submit_latency, lat);
252
253 return 0;
254 }
255
256 int MemDB::submit_transaction_sync(KeyValueDB::Transaction tsync)
257 {
258 dtrace << __func__ << " " << dendl;
259 submit_transaction(tsync);
260 return 0;
261 }
262
263 int MemDB::transaction_rollback(KeyValueDB::Transaction t)
264 {
265 MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get());
266 mt->clear();
267 return 0;
268 }
269
270 void MemDB::MDBTransactionImpl::set(
271 const string &prefix, const string &k, const bufferlist &to_set_bl)
272 {
273 dtrace << __func__ << " " << prefix << " " << k << dendl;
274 ops.push_back(make_pair(WRITE, std::make_pair(std::make_pair(prefix, k),
275 to_set_bl)));
276 }
277
278 void MemDB::MDBTransactionImpl::rmkey(const string &prefix,
279 const string &k)
280 {
281 dtrace << __func__ << " " << prefix << " " << k << dendl;
282 ops.push_back(make_pair(DELETE,
283 std::make_pair(std::make_pair(prefix, k),
284 bufferlist())));
285 }
286
287 void MemDB::MDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
288 {
289 KeyValueDB::Iterator it = m_db->get_iterator(prefix);
290 for (it->seek_to_first(); it->valid(); it->next()) {
291 rmkey(prefix, it->key());
292 }
293 }
294
295 void MemDB::MDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
296 {
297 KeyValueDB::Iterator it = m_db->get_iterator(prefix);
298 it->lower_bound(start);
299 while (it->valid()) {
300 if (it->key() >= end) {
301 break;
302 }
303 rmkey(prefix, it->key());
304 it->next();
305 }
306 }
307
308 void MemDB::MDBTransactionImpl::merge(
309 const std::string &prefix, const std::string &key, const bufferlist &value)
310 {
311
312 dtrace << __func__ << " " << prefix << " " << key << dendl;
313 ops.push_back(make_pair(MERGE, make_pair(std::make_pair(prefix, key), value)));
314 return;
315 }
316
317 int MemDB::_setkey(ms_op_t &op)
318 {
319 std::lock_guard<std::mutex> l(m_lock);
320 std::string key = make_key(op.first.first, op.first.second);
321 bufferlist bl = op.second;
322
323 m_total_bytes += bl.length();
324
325 bufferlist bl_old;
326 if (_get(op.first.first, op.first.second, &bl_old)) {
327 /*
328 * delete and free existing key.
329 */
330 ceph_assert(m_total_bytes >= bl_old.length());
331 m_total_bytes -= bl_old.length();
332 m_map.erase(key);
333 }
334
335 m_map[key] = bufferptr((char *) bl.c_str(), bl.length());
336 iterator_seq_no++;
337 return 0;
338 }
339
340 int MemDB::_rmkey(ms_op_t &op)
341 {
342 std::lock_guard<std::mutex> l(m_lock);
343 std::string key = make_key(op.first.first, op.first.second);
344
345 bufferlist bl_old;
346 if (_get(op.first.first, op.first.second, &bl_old)) {
347 ceph_assert(m_total_bytes >= bl_old.length());
348 m_total_bytes -= bl_old.length();
349 }
350 iterator_seq_no++;
351 /*
352 * Erase will call the destructor for bufferptr.
353 */
354 return m_map.erase(key);
355 }
356
357 std::shared_ptr<KeyValueDB::MergeOperator> MemDB::_find_merge_op(const std::string &prefix)
358 {
359 for (const auto& i : merge_ops) {
360 if (i.first == prefix) {
361 return i.second;
362 }
363 }
364
365 dtrace << __func__ << " No merge op for " << prefix << dendl;
366 return NULL;
367 }
368
369
370 int MemDB::_merge(ms_op_t &op)
371 {
372 std::lock_guard<std::mutex> l(m_lock);
373 std::string prefix = op.first.first;
374 std::string key = make_key(op.first.first, op.first.second);
375 bufferlist bl = op.second;
376 int64_t bytes_adjusted = bl.length();
377
378 /*
379 * find the operator for this prefix
380 */
381 std::shared_ptr<MergeOperator> mop = _find_merge_op(prefix);
382 ceph_assert(mop);
383
384 /*
385 * call the merge operator with value and non value
386 */
387 bufferlist bl_old;
388 if (_get(op.first.first, op.first.second, &bl_old) == false) {
389 std::string new_val;
390 /*
391 * Merge non existent.
392 */
393 mop->merge_nonexistent(bl.c_str(), bl.length(), &new_val);
394 m_map[key] = bufferptr(new_val.c_str(), new_val.length());
395 } else {
396 /*
397 * Merge existing.
398 */
399 std::string new_val;
400 mop->merge(bl_old.c_str(), bl_old.length(), bl.c_str(), bl.length(), &new_val);
401 m_map[key] = bufferptr(new_val.c_str(), new_val.length());
402 bytes_adjusted -= bl_old.length();
403 bl_old.clear();
404 }
405
406 ceph_assert((int64_t)m_total_bytes + bytes_adjusted >= 0);
407 m_total_bytes += bytes_adjusted;
408 iterator_seq_no++;
409 return 0;
410 }
411
412 /*
413 * Caller take btree lock.
414 */
415 bool MemDB::_get(const string &prefix, const string &k, bufferlist *out)
416 {
417 string key = make_key(prefix, k);
418
419 mdb_iter_t iter = m_map.find(key);
420 if (iter == m_map.end()) {
421 return false;
422 }
423
424 out->push_back((m_map[key].clone()));
425 return true;
426 }
427
428 bool MemDB::_get_locked(const string &prefix, const string &k, bufferlist *out)
429 {
430 std::lock_guard<std::mutex> l(m_lock);
431 return _get(prefix, k, out);
432 }
433
434
435 int MemDB::get(const string &prefix, const std::string& key,
436 bufferlist *out)
437 {
438 utime_t start = ceph_clock_now();
439 int ret;
440
441 if (_get_locked(prefix, key, out)) {
442 ret = 0;
443 } else {
444 ret = -ENOENT;
445 }
446
447 utime_t lat = ceph_clock_now() - start;
448 logger->inc(l_memdb_gets);
449 logger->tinc(l_memdb_get_latency, lat);
450
451 return ret;
452 }
453
454 int MemDB::get(const string &prefix, const std::set<string> &keys,
455 std::map<string, bufferlist> *out)
456 {
457 utime_t start = ceph_clock_now();
458
459 for (const auto& i : keys) {
460 bufferlist bl;
461 if (_get_locked(prefix, i, &bl))
462 out->insert(make_pair(i, bl));
463 }
464
465 utime_t lat = ceph_clock_now() - start;
466 logger->inc(l_memdb_gets);
467 logger->tinc(l_memdb_get_latency, lat);
468
469 return 0;
470 }
471
472 void MemDB::MDBWholeSpaceIteratorImpl::fill_current()
473 {
474 bufferlist bl;
475 bl.push_back(m_iter->second.clone());
476 m_key_value = std::make_pair(m_iter->first, bl);
477 }
478
479 bool MemDB::MDBWholeSpaceIteratorImpl::valid()
480 {
481 if (m_key_value.first.empty()) {
482 return false;
483 }
484 return true;
485 }
486
487 bool MemDB::MDBWholeSpaceIteratorImpl::iterator_validate() {
488
489 if (this_seq_no != *global_seq_no) {
490 auto key = m_key_value.first;
491 ceph_assert(!key.empty());
492
493 bool restart_iter = false;
494 if (!m_using_btree) {
495 /*
496 * Map is modified and marker key does not exists,
497 * restart the iterator from next key.
498 */
499 if (m_map_p->find(key) == m_map_p->end()) {
500 restart_iter = true;
501 }
502 } else {
503 restart_iter = true;
504 }
505
506 if (restart_iter) {
507 m_iter = m_map_p->lower_bound(key);
508 if (m_iter == m_map_p->end()) {
509 return false;
510 }
511 }
512
513 /*
514 * This iter is valid now.
515 */
516 this_seq_no = *global_seq_no;
517 }
518
519 return true;
520 }
521
522 void
523 MemDB::MDBWholeSpaceIteratorImpl::free_last()
524 {
525 m_key_value.first.clear();
526 m_key_value.second.clear();
527 }
528
529 string MemDB::MDBWholeSpaceIteratorImpl::key()
530 {
531 dtrace << __func__ << " " << m_key_value.first << dendl;
532 string prefix, key;
533 split_key(m_key_value.first, &prefix, &key);
534 return key;
535 }
536
537 std::pair<string,string> MemDB::MDBWholeSpaceIteratorImpl::raw_key()
538 {
539 string prefix, key;
540 split_key(m_key_value.first, &prefix, &key);
541 return { prefix, key };
542 }
543
544 bool MemDB::MDBWholeSpaceIteratorImpl::raw_key_is_prefixed(
545 const string &prefix)
546 {
547 string p, k;
548 split_key(m_key_value.first, &p, &k);
549 return (p == prefix);
550 }
551
552 bufferlist MemDB::MDBWholeSpaceIteratorImpl::value()
553 {
554 dtrace << __func__ << " " << m_key_value << dendl;
555 return m_key_value.second;
556 }
557
558 int MemDB::MDBWholeSpaceIteratorImpl::next()
559 {
560 std::lock_guard<std::mutex> l(*m_map_lock_p);
561 if (!iterator_validate()) {
562 free_last();
563 return -1;
564 }
565 free_last();
566 ++m_iter;
567 if (m_iter != m_map_p->end()) {
568 fill_current();
569 return 0;
570 } else {
571 return -1;
572 }
573 }
574
575 int MemDB::MDBWholeSpaceIteratorImpl:: prev()
576 {
577 std::lock_guard<std::mutex> l(*m_map_lock_p);
578 if (!iterator_validate()) {
579 free_last();
580 return -1;
581 }
582 free_last();
583 if (m_iter != m_map_p->begin()) {
584 --m_iter;
585 fill_current();
586 return 0;
587 } else {
588 return -1;
589 }
590 }
591
592 /*
593 * First key >= to given key, if key is null then first key in btree.
594 */
595 int MemDB::MDBWholeSpaceIteratorImpl::seek_to_first(const std::string &k)
596 {
597 std::lock_guard<std::mutex> l(*m_map_lock_p);
598 free_last();
599 if (k.empty()) {
600 m_iter = m_map_p->begin();
601 } else {
602 m_iter = m_map_p->lower_bound(k);
603 }
604
605 if (m_iter == m_map_p->end()) {
606 return -1;
607 }
608 fill_current();
609 return 0;
610 }
611
612 int MemDB::MDBWholeSpaceIteratorImpl::seek_to_last(const std::string &k)
613 {
614 std::lock_guard<std::mutex> l(*m_map_lock_p);
615 free_last();
616 if (k.empty()) {
617 m_iter = m_map_p->end();
618 --m_iter;
619 } else {
620 m_iter = m_map_p->lower_bound(k);
621 }
622
623 if (m_iter == m_map_p->end()) {
624 return -1;
625 }
626 fill_current();
627 return 0;
628 }
629
630 MemDB::MDBWholeSpaceIteratorImpl::~MDBWholeSpaceIteratorImpl()
631 {
632 free_last();
633 }
634
635 int MemDB::MDBWholeSpaceIteratorImpl::upper_bound(const std::string &prefix,
636 const std::string &after) {
637
638 std::lock_guard<std::mutex> l(*m_map_lock_p);
639
640 dtrace << "upper_bound " << prefix.c_str() << after.c_str() << dendl;
641 string k = make_key(prefix, after);
642 m_iter = m_map_p->upper_bound(k);
643 if (m_iter != m_map_p->end()) {
644 fill_current();
645 return 0;
646 }
647 return -1;
648 }
649
650 int MemDB::MDBWholeSpaceIteratorImpl::lower_bound(const std::string &prefix,
651 const std::string &to) {
652 std::lock_guard<std::mutex> l(*m_map_lock_p);
653 dtrace << "lower_bound " << prefix.c_str() << to.c_str() << dendl;
654 string k = make_key(prefix, to);
655 m_iter = m_map_p->lower_bound(k);
656 if (m_iter != m_map_p->end()) {
657 fill_current();
658 return 0;
659 }
660 return -1;
661 }