]> git.proxmox.com Git - ceph.git/blame - ceph/src/kv/MemDB.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / kv / MemDB.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * In-memory crash non-safe keyvalue db
5 * Author: Ramesh Chander, Ramesh.Chander@sandisk.com
6 */
7
8#include "include/compat.h"
9#include <set>
10#include <map>
11#include <string>
12#include <memory>
13#include <errno.h>
14#include <unistd.h>
15#include <sys/types.h>
16#include <sys/stat.h>
17
18#include "common/perf_counters.h"
19#include "common/debug.h"
20#include "include/str_list.h"
21#include "include/str_map.h"
22#include "KeyValueDB.h"
23#include "MemDB.h"
24
11fdf7f2 25#include "include/ceph_assert.h"
7c673cae
FG
26#include "common/debug.h"
27#include "common/errno.h"
11fdf7f2
TL
28#include "include/buffer.h"
29#include "include/buffer_raw.h"
7c673cae
FG
30#include "include/compat.h"
31
32#define dout_context g_ceph_context
33#define dout_subsys ceph_subsys_memdb
34#undef dout_prefix
35#define dout_prefix *_dout << "memdb: "
36#define dtrace dout(30)
37#define dwarn dout(0)
38#define dinfo dout(0)
39
40static void split_key(const string& raw_key, string *prefix, string *key)
41{
42 size_t pos = raw_key.find(KEY_DELIM, 0);
11fdf7f2 43 ceph_assert(pos != std::string::npos);
7c673cae
FG
44 *prefix = raw_key.substr(0, pos);
45 *key = raw_key.substr(pos + 1, raw_key.length());
46}
47
48static string make_key(const string &prefix, const string &value)
49{
50 string out = prefix;
51 out.push_back(KEY_DELIM);
52 out.append(value);
53 return out;
54}
55
56void MemDB::_encode(mdb_iter_t iter, bufferlist &bl)
57{
11fdf7f2
TL
58 encode(iter->first, bl);
59 encode(iter->second, bl);
7c673cae
FG
60}
61
62std::string MemDB::_get_data_fn()
63{
64 string fn = m_db_path + "/" + "MemDB.db";
65 return fn;
66}
67
68void MemDB::_save()
69{
70 std::lock_guard<std::mutex> l(m_lock);
71 dout(10) << __func__ << " Saving MemDB to file: "<< _get_data_fn().c_str() << dendl;
72 int mode = 0644;
73 int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(),
91327a77 74 O_WRONLY|O_CREAT|O_TRUNC|O_CLOEXEC, mode));
7c673cae
FG
75 if (fd < 0) {
76 int err = errno;
77 cerr << "write_file(" << _get_data_fn().c_str() << "): failed to open file: "
78 << cpp_strerror(err) << std::endl;
79 return;
80 }
81 bufferlist bl;
82 mdb_iter_t iter = m_map.begin();
83 while (iter != m_map.end()) {
84 dout(10) << __func__ << " Key:"<< iter->first << dendl;
85 _encode(iter, bl);
86 ++iter;
87 }
88 bl.write_fd(fd);
89
90 VOID_TEMP_FAILURE_RETRY(::close(fd));
91}
92
93int MemDB::_load()
94{
95 std::lock_guard<std::mutex> l(m_lock);
96 dout(10) << __func__ << " Reading MemDB from file: "<< _get_data_fn().c_str() << dendl;
97 /*
98 * Open file and read it in single shot.
99 */
91327a77 100 int fd = TEMP_FAILURE_RETRY(::open(_get_data_fn().c_str(), O_RDONLY|O_CLOEXEC));
7c673cae
FG
101 if (fd < 0) {
102 int err = errno;
103 cerr << "can't open " << _get_data_fn().c_str() << ": "
104 << cpp_strerror(err) << std::endl;
105 return -err;
106 }
107
108 struct stat st;
109 memset(&st, 0, sizeof(st));
110 if (::fstat(fd, &st) < 0) {
111 int err = errno;
112 cerr << "can't stat file " << _get_data_fn().c_str() << ": "
113 << cpp_strerror(err) << std::endl;
114 VOID_TEMP_FAILURE_RETRY(::close(fd));
115 return -err;
116 }
117
118 ssize_t file_size = st.st_size;
119 ssize_t bytes_done = 0;
120 while (bytes_done < file_size) {
121 string key;
122 bufferptr datap;
123
124 bytes_done += ::decode_file(fd, key);
125 bytes_done += ::decode_file(fd, datap);
126
127 dout(10) << __func__ << " Key:"<< key << dendl;
128 m_map[key] = datap;
129 m_total_bytes += datap.length();
130 }
131 VOID_TEMP_FAILURE_RETRY(::close(fd));
132 return 0;
133}
134
135int MemDB::_init(bool create)
136{
137 int r;
138 dout(1) << __func__ << dendl;
139 if (create) {
140 r = ::mkdir(m_db_path.c_str(), 0700);
141 if (r < 0) {
142 r = -errno;
143 if (r != -EEXIST) {
11fdf7f2
TL
144 derr << __func__ << " mkdir failed: " << cpp_strerror(r) << dendl;
145 return r;
7c673cae 146 }
11fdf7f2 147 r = 0; // ignore EEXIST
7c673cae
FG
148 }
149 } else {
150 r = _load();
151 }
152
11fdf7f2
TL
153 PerfCountersBuilder plb(g_ceph_context, "memdb", l_memdb_first, l_memdb_last);
154 plb.add_u64_counter(l_memdb_gets, "get", "Gets");
155 plb.add_u64_counter(l_memdb_txns, "submit_transaction", "Submit transactions");
156 plb.add_time_avg(l_memdb_get_latency, "get_latency", "Get latency");
157 plb.add_time_avg(l_memdb_submit_latency, "submit_latency", "Submit Latency");
158 logger = plb.create_perf_counters();
159 m_cct->get_perfcounters_collection()->add(logger);
160
7c673cae
FG
161 return r;
162}
163
164int MemDB::set_merge_operator(
165 const string& prefix,
166 std::shared_ptr<KeyValueDB::MergeOperator> mop)
167{
168 merge_ops.push_back(std::make_pair(prefix, mop));
169 return 0;
170}
171
172int MemDB::do_open(ostream &out, bool create)
173{
174 m_total_bytes = 0;
175 m_allocated_bytes = 1;
176
177 return _init(create);
178}
179
11fdf7f2
TL
180int MemDB::open(ostream &out, const vector<ColumnFamily>& cfs) {
181 if (!cfs.empty()) {
182 ceph_abort_msg("Not implemented");
183 }
184 return do_open(out, false);
185}
186
187int MemDB::create_and_open(ostream &out, const vector<ColumnFamily>& cfs) {
188 if (!cfs.empty()) {
189 ceph_abort_msg("Not implemented");
190 }
191 return do_open(out, true);
192}
193
7c673cae
FG
194MemDB::~MemDB()
195{
196 close();
197 dout(10) << __func__ << " Destroying MemDB instance: "<< dendl;
198}
199
200void MemDB::close()
201{
202 /*
203 * Save whatever in memory btree.
204 */
205 _save();
11fdf7f2
TL
206 if (logger)
207 m_cct->get_perfcounters_collection()->remove(logger);
7c673cae
FG
208}
209
210int MemDB::submit_transaction(KeyValueDB::Transaction t)
211{
11fdf7f2
TL
212 utime_t start = ceph_clock_now();
213
7c673cae
FG
214 MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get());
215
216 dtrace << __func__ << " " << mt->get_ops().size() << dendl;
217 for(auto& op : mt->get_ops()) {
218 if(op.first == MDBTransactionImpl::WRITE) {
219 ms_op_t set_op = op.second;
220 _setkey(set_op);
221 } else if (op.first == MDBTransactionImpl::MERGE) {
222 ms_op_t merge_op = op.second;
223 _merge(merge_op);
224 } else {
225 ms_op_t rm_op = op.second;
11fdf7f2 226 ceph_assert(op.first == MDBTransactionImpl::DELETE);
7c673cae
FG
227 _rmkey(rm_op);
228 }
229 }
230
11fdf7f2
TL
231 utime_t lat = ceph_clock_now() - start;
232 logger->inc(l_memdb_txns);
233 logger->tinc(l_memdb_submit_latency, lat);
234
7c673cae
FG
235 return 0;
236}
237
238int MemDB::submit_transaction_sync(KeyValueDB::Transaction tsync)
239{
240 dtrace << __func__ << " " << dendl;
241 submit_transaction(tsync);
242 return 0;
243}
244
245int MemDB::transaction_rollback(KeyValueDB::Transaction t)
246{
247 MDBTransactionImpl* mt = static_cast<MDBTransactionImpl*>(t.get());
248 mt->clear();
249 return 0;
250}
251
252void MemDB::MDBTransactionImpl::set(
253 const string &prefix, const string &k, const bufferlist &to_set_bl)
254{
255 dtrace << __func__ << " " << prefix << " " << k << dendl;
256 ops.push_back(make_pair(WRITE, std::make_pair(std::make_pair(prefix, k),
257 to_set_bl)));
258}
259
260void MemDB::MDBTransactionImpl::rmkey(const string &prefix,
261 const string &k)
262{
263 dtrace << __func__ << " " << prefix << " " << k << dendl;
264 ops.push_back(make_pair(DELETE,
265 std::make_pair(std::make_pair(prefix, k),
266 bufferlist())));
267}
268
269void MemDB::MDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
270{
271 KeyValueDB::Iterator it = m_db->get_iterator(prefix);
272 for (it->seek_to_first(); it->valid(); it->next()) {
273 rmkey(prefix, it->key());
274 }
275}
276
277void MemDB::MDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
278{
279 KeyValueDB::Iterator it = m_db->get_iterator(prefix);
280 it->lower_bound(start);
281 while (it->valid()) {
282 if (it->key() >= end) {
283 break;
284 }
285 rmkey(prefix, it->key());
286 it->next();
287 }
288}
289
290void MemDB::MDBTransactionImpl::merge(
291 const std::string &prefix, const std::string &key, const bufferlist &value)
292{
293
294 dtrace << __func__ << " " << prefix << " " << key << dendl;
295 ops.push_back(make_pair(MERGE, make_pair(std::make_pair(prefix, key), value)));
296 return;
297}
298
299int MemDB::_setkey(ms_op_t &op)
300{
301 std::lock_guard<std::mutex> l(m_lock);
302 std::string key = make_key(op.first.first, op.first.second);
303 bufferlist bl = op.second;
304
305 m_total_bytes += bl.length();
306
307 bufferlist bl_old;
308 if (_get(op.first.first, op.first.second, &bl_old)) {
309 /*
310 * delete and free existing key.
311 */
11fdf7f2 312 ceph_assert(m_total_bytes >= bl_old.length());
7c673cae
FG
313 m_total_bytes -= bl_old.length();
314 m_map.erase(key);
315 }
316
317 m_map[key] = bufferptr((char *) bl.c_str(), bl.length());
318 iterator_seq_no++;
319 return 0;
320}
321
322int MemDB::_rmkey(ms_op_t &op)
323{
324 std::lock_guard<std::mutex> l(m_lock);
325 std::string key = make_key(op.first.first, op.first.second);
326
327 bufferlist bl_old;
328 if (_get(op.first.first, op.first.second, &bl_old)) {
11fdf7f2 329 ceph_assert(m_total_bytes >= bl_old.length());
7c673cae
FG
330 m_total_bytes -= bl_old.length();
331 }
332 iterator_seq_no++;
333 /*
334 * Erase will call the destructor for bufferptr.
335 */
336 return m_map.erase(key);
337}
338
11fdf7f2 339std::shared_ptr<KeyValueDB::MergeOperator> MemDB::_find_merge_op(const std::string &prefix)
7c673cae
FG
340{
341 for (const auto& i : merge_ops) {
342 if (i.first == prefix) {
343 return i.second;
344 }
345 }
346
347 dtrace << __func__ << " No merge op for " << prefix << dendl;
348 return NULL;
349}
350
351
352int MemDB::_merge(ms_op_t &op)
353{
354 std::lock_guard<std::mutex> l(m_lock);
355 std::string prefix = op.first.first;
356 std::string key = make_key(op.first.first, op.first.second);
357 bufferlist bl = op.second;
358 int64_t bytes_adjusted = bl.length();
359
360 /*
361 * find the operator for this prefix
362 */
363 std::shared_ptr<MergeOperator> mop = _find_merge_op(prefix);
11fdf7f2 364 ceph_assert(mop);
7c673cae
FG
365
366 /*
367 * call the merge operator with value and non value
368 */
369 bufferlist bl_old;
370 if (_get(op.first.first, op.first.second, &bl_old) == false) {
371 std::string new_val;
372 /*
373 * Merge non existent.
374 */
375 mop->merge_nonexistent(bl.c_str(), bl.length(), &new_val);
376 m_map[key] = bufferptr(new_val.c_str(), new_val.length());
377 } else {
378 /*
379 * Merge existing.
380 */
381 std::string new_val;
382 mop->merge(bl_old.c_str(), bl_old.length(), bl.c_str(), bl.length(), &new_val);
383 m_map[key] = bufferptr(new_val.c_str(), new_val.length());
384 bytes_adjusted -= bl_old.length();
385 bl_old.clear();
386 }
387
11fdf7f2 388 ceph_assert((int64_t)m_total_bytes + bytes_adjusted >= 0);
7c673cae
FG
389 m_total_bytes += bytes_adjusted;
390 iterator_seq_no++;
391 return 0;
392}
393
394/*
395 * Caller take btree lock.
396 */
397bool MemDB::_get(const string &prefix, const string &k, bufferlist *out)
398{
399 string key = make_key(prefix, k);
400
401 mdb_iter_t iter = m_map.find(key);
402 if (iter == m_map.end()) {
403 return false;
404 }
405
406 out->push_back((m_map[key].clone()));
407 return true;
408}
409
410bool MemDB::_get_locked(const string &prefix, const string &k, bufferlist *out)
411{
412 std::lock_guard<std::mutex> l(m_lock);
413 return _get(prefix, k, out);
414}
415
416
417int MemDB::get(const string &prefix, const std::string& key,
418 bufferlist *out)
419{
11fdf7f2
TL
420 utime_t start = ceph_clock_now();
421 int ret;
422
7c673cae 423 if (_get_locked(prefix, key, out)) {
11fdf7f2
TL
424 ret = 0;
425 } else {
426 ret = -ENOENT;
7c673cae 427 }
11fdf7f2
TL
428
429 utime_t lat = ceph_clock_now() - start;
430 logger->inc(l_memdb_gets);
431 logger->tinc(l_memdb_get_latency, lat);
432
433 return ret;
7c673cae
FG
434}
435
436int MemDB::get(const string &prefix, const std::set<string> &keys,
437 std::map<string, bufferlist> *out)
438{
11fdf7f2
TL
439 utime_t start = ceph_clock_now();
440
7c673cae
FG
441 for (const auto& i : keys) {
442 bufferlist bl;
443 if (_get_locked(prefix, i, &bl))
444 out->insert(make_pair(i, bl));
445 }
446
11fdf7f2
TL
447 utime_t lat = ceph_clock_now() - start;
448 logger->inc(l_memdb_gets);
449 logger->tinc(l_memdb_get_latency, lat);
450
7c673cae
FG
451 return 0;
452}
453
454void MemDB::MDBWholeSpaceIteratorImpl::fill_current()
455{
456 bufferlist bl;
11fdf7f2 457 bl.push_back(m_iter->second.clone());
7c673cae
FG
458 m_key_value = std::make_pair(m_iter->first, bl);
459}
460
461bool MemDB::MDBWholeSpaceIteratorImpl::valid()
462{
463 if (m_key_value.first.empty()) {
464 return false;
465 }
466 return true;
467}
468
469bool MemDB::MDBWholeSpaceIteratorImpl::iterator_validate() {
470
471 if (this_seq_no != *global_seq_no) {
472 auto key = m_key_value.first;
11fdf7f2 473 ceph_assert(!key.empty());
7c673cae
FG
474
475 bool restart_iter = false;
476 if (!m_using_btree) {
477 /*
478 * Map is modified and marker key does not exists,
479 * restart the iterator from next key.
480 */
481 if (m_map_p->find(key) == m_map_p->end()) {
482 restart_iter = true;
483 }
484 } else {
485 restart_iter = true;
486 }
487
488 if (restart_iter) {
489 m_iter = m_map_p->lower_bound(key);
490 if (m_iter == m_map_p->end()) {
491 return false;
492 }
493 }
494
495 /*
496 * This iter is valid now.
497 */
498 this_seq_no = *global_seq_no;
499 }
500
501 return true;
502}
503
504void
505MemDB::MDBWholeSpaceIteratorImpl::free_last()
506{
507 m_key_value.first.clear();
508 m_key_value.second.clear();
509}
510
511string MemDB::MDBWholeSpaceIteratorImpl::key()
512{
513 dtrace << __func__ << " " << m_key_value.first << dendl;
514 string prefix, key;
515 split_key(m_key_value.first, &prefix, &key);
516 return key;
517}
518
519pair<string,string> MemDB::MDBWholeSpaceIteratorImpl::raw_key()
520{
521 string prefix, key;
522 split_key(m_key_value.first, &prefix, &key);
523 return make_pair(prefix, key);
524}
525
526bool MemDB::MDBWholeSpaceIteratorImpl::raw_key_is_prefixed(
527 const string &prefix)
528{
529 string p, k;
530 split_key(m_key_value.first, &p, &k);
531 return (p == prefix);
532}
533
534bufferlist MemDB::MDBWholeSpaceIteratorImpl::value()
535{
536 dtrace << __func__ << " " << m_key_value << dendl;
537 return m_key_value.second;
538}
539
540int MemDB::MDBWholeSpaceIteratorImpl::next()
541{
542 std::lock_guard<std::mutex> l(*m_map_lock_p);
543 if (!iterator_validate()) {
544 free_last();
545 return -1;
546 }
547 free_last();
548 ++m_iter;
549 if (m_iter != m_map_p->end()) {
550 fill_current();
551 return 0;
552 } else {
553 return -1;
554 }
555}
556
557int MemDB::MDBWholeSpaceIteratorImpl:: prev()
558{
559 std::lock_guard<std::mutex> l(*m_map_lock_p);
560 if (!iterator_validate()) {
561 free_last();
562 return -1;
563 }
564 free_last();
565 if (m_iter != m_map_p->begin()) {
566 --m_iter;
567 fill_current();
568 return 0;
569 } else {
570 return -1;
571 }
572}
573
574/*
575 * First key >= to given key, if key is null then first key in btree.
576 */
577int MemDB::MDBWholeSpaceIteratorImpl::seek_to_first(const std::string &k)
578{
579 std::lock_guard<std::mutex> l(*m_map_lock_p);
580 free_last();
581 if (k.empty()) {
582 m_iter = m_map_p->begin();
583 } else {
584 m_iter = m_map_p->lower_bound(k);
585 }
586
587 if (m_iter == m_map_p->end()) {
588 return -1;
589 }
590 fill_current();
591 return 0;
592}
593
594int MemDB::MDBWholeSpaceIteratorImpl::seek_to_last(const std::string &k)
595{
596 std::lock_guard<std::mutex> l(*m_map_lock_p);
597 free_last();
598 if (k.empty()) {
599 m_iter = m_map_p->end();
600 --m_iter;
601 } else {
602 m_iter = m_map_p->lower_bound(k);
603 }
604
605 if (m_iter == m_map_p->end()) {
606 return -1;
607 }
608 fill_current();
609 return 0;
610}
611
612MemDB::MDBWholeSpaceIteratorImpl::~MDBWholeSpaceIteratorImpl()
613{
614 free_last();
615}
616
617int MemDB::MDBWholeSpaceIteratorImpl::upper_bound(const std::string &prefix,
618 const std::string &after) {
619
620 std::lock_guard<std::mutex> l(*m_map_lock_p);
621
622 dtrace << "upper_bound " << prefix.c_str() << after.c_str() << dendl;
623 string k = make_key(prefix, after);
624 m_iter = m_map_p->upper_bound(k);
625 if (m_iter != m_map_p->end()) {
626 fill_current();
627 return 0;
628 }
629 return -1;
630}
631
632int MemDB::MDBWholeSpaceIteratorImpl::lower_bound(const std::string &prefix,
633 const std::string &to) {
634 std::lock_guard<std::mutex> l(*m_map_lock_p);
635 dtrace << "lower_bound " << prefix.c_str() << to.c_str() << dendl;
636 string k = make_key(prefix, to);
637 m_iter = m_map_p->lower_bound(k);
638 if (m_iter != m_map_p->end()) {
639 fill_current();
640 return 0;
641 }
642 return -1;
643}