]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonitorDBStore.h
515a047279781743cd638611875c9de5825bd16a
[ceph.git] / ceph / src / mon / MonitorDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Inktank, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
15
16 #include "include/types.h"
17 #include "include/buffer.h"
18 #include <set>
19 #include <map>
20 #include <string>
21 #include <boost/scoped_ptr.hpp>
22 #include <sstream>
23 #include <fstream>
24 #include "kv/KeyValueDB.h"
25
26 #include "include/ceph_assert.h"
27 #include "common/Formatter.h"
28 #include "common/Finisher.h"
29 #include "common/errno.h"
30 #include "common/debug.h"
31 #include "common/safe_io.h"
32 #include "common/blkdev.h"
33 #include "common/PriorityCache.h"
34
35 #define dout_context g_ceph_context
36
37 class MonitorDBStore
38 {
39 std::string path;
40 boost::scoped_ptr<KeyValueDB> db;
41 bool do_dump;
42 int dump_fd_binary;
43 std::ofstream dump_fd_json;
44 ceph::JSONFormatter dump_fmt;
45
46
47 Finisher io_work;
48
49 bool is_open;
50
51 public:
52
53 std::string get_devname() {
54 char devname[4096] = {0}, partition[4096];
55 get_device_by_path(path.c_str(), partition, devname,
56 sizeof(devname));
57 return devname;
58 }
59
60 std::shared_ptr<PriorityCache::PriCache> get_priority_cache() const {
61 return db->get_priority_cache();
62 }
63
64 struct Op {
65 uint8_t type;
66 std::string prefix;
67 std::string key, endkey;
68 ceph::buffer::list bl;
69
70 Op()
71 : type(0) { }
72 Op(int t, const std::string& p, const std::string& k)
73 : type(t), prefix(p), key(k) { }
74 Op(int t, const std::string& p, const std::string& k, const ceph::buffer::list& b)
75 : type(t), prefix(p), key(k), bl(b) { }
76 Op(int t, const std::string& p, const std::string& start, const std::string& end)
77 : type(t), prefix(p), key(start), endkey(end) { }
78
79 void encode(ceph::buffer::list& encode_bl) const {
80 ENCODE_START(2, 1, encode_bl);
81 encode(type, encode_bl);
82 encode(prefix, encode_bl);
83 encode(key, encode_bl);
84 encode(bl, encode_bl);
85 encode(endkey, encode_bl);
86 ENCODE_FINISH(encode_bl);
87 }
88
89 void decode(ceph::buffer::list::const_iterator& decode_bl) {
90 DECODE_START(2, decode_bl);
91 decode(type, decode_bl);
92 decode(prefix, decode_bl);
93 decode(key, decode_bl);
94 decode(bl, decode_bl);
95 if (struct_v >= 2)
96 decode(endkey, decode_bl);
97 DECODE_FINISH(decode_bl);
98 }
99
100 void dump(ceph::Formatter *f) const {
101 f->dump_int("type", type);
102 f->dump_string("prefix", prefix);
103 f->dump_string("key", key);
104 if (endkey.length()) {
105 f->dump_string("endkey", endkey);
106 }
107 }
108
109 int approx_size() const {
110 return 6 + 1 +
111 4 + prefix.size() +
112 4 + key.size() +
113 4 + endkey.size() +
114 4 + bl.length();
115 }
116
117 static void generate_test_instances(std::list<Op*>& ls) {
118 ls.push_back(new Op);
119 // we get coverage here from the Transaction instances
120 }
121 };
122
123 struct Transaction;
124 typedef std::shared_ptr<Transaction> TransactionRef;
125 struct Transaction {
126 std::list<Op> ops;
127 uint64_t bytes, keys;
128
129 Transaction() : bytes(6 + 4 + 8*2), keys(0) {}
130
131 enum {
132 OP_PUT = 1,
133 OP_ERASE = 2,
134 OP_COMPACT = 3,
135 OP_ERASE_RANGE = 4,
136 };
137
138 void put(const std::string& prefix, const std::string& key, const ceph::buffer::list& bl) {
139 ops.push_back(Op(OP_PUT, prefix, key, bl));
140 ++keys;
141 bytes += ops.back().approx_size();
142 }
143
144 void put(const std::string& prefix, version_t ver, const ceph::buffer::list& bl) {
145 std::ostringstream os;
146 os << ver;
147 put(prefix, os.str(), bl);
148 }
149
150 void put(const std::string& prefix, const std::string& key, version_t ver) {
151 using ceph::encode;
152 ceph::buffer::list bl;
153 encode(ver, bl);
154 put(prefix, key, bl);
155 }
156
157 void erase(const std::string& prefix, const std::string& key) {
158 ops.push_back(Op(OP_ERASE, prefix, key));
159 ++keys;
160 bytes += ops.back().approx_size();
161 }
162
163 void erase(const std::string& prefix, version_t ver) {
164 std::ostringstream os;
165 os << ver;
166 erase(prefix, os.str());
167 }
168
169 void erase_range(const std::string& prefix, const std::string& begin,
170 const std::string& end) {
171 ops.push_back(Op(OP_ERASE_RANGE, prefix, begin, end));
172 ++keys;
173 bytes += ops.back().approx_size();
174 }
175
176 void compact_prefix(const std::string& prefix) {
177 ops.push_back(Op(OP_COMPACT, prefix, {}));
178 }
179
180 void compact_range(const std::string& prefix, const std::string& start,
181 const std::string& end) {
182 ops.push_back(Op(OP_COMPACT, prefix, start, end));
183 }
184
185 void encode(ceph::buffer::list& bl) const {
186 ENCODE_START(2, 1, bl);
187 encode(ops, bl);
188 encode(bytes, bl);
189 encode(keys, bl);
190 ENCODE_FINISH(bl);
191 }
192
193 void decode(ceph::buffer::list::const_iterator& bl) {
194 DECODE_START(2, bl);
195 decode(ops, bl);
196 if (struct_v >= 2) {
197 decode(bytes, bl);
198 decode(keys, bl);
199 }
200 DECODE_FINISH(bl);
201 }
202
203 static void generate_test_instances(std::list<Transaction*>& ls) {
204 ls.push_back(new Transaction);
205 ls.push_back(new Transaction);
206 ceph::buffer::list bl;
207 bl.append("value");
208 ls.back()->put("prefix", "key", bl);
209 ls.back()->erase("prefix2", "key2");
210 ls.back()->erase_range("prefix3", "key3", "key4");
211 ls.back()->compact_prefix("prefix3");
212 ls.back()->compact_range("prefix4", "from", "to");
213 }
214
215 void append(TransactionRef other) {
216 ops.splice(ops.end(), other->ops);
217 keys += other->keys;
218 bytes += other->bytes;
219 }
220
221 void append_from_encoded(ceph::buffer::list& bl) {
222 auto other(std::make_shared<Transaction>());
223 auto it = bl.cbegin();
224 other->decode(it);
225 append(other);
226 }
227
228 bool empty() {
229 return (size() == 0);
230 }
231
232 size_t size() const {
233 return ops.size();
234 }
235 uint64_t get_keys() const {
236 return keys;
237 }
238 uint64_t get_bytes() const {
239 return bytes;
240 }
241
242 void dump(ceph::Formatter *f, bool dump_val=false) const {
243 f->open_object_section("transaction");
244 f->open_array_section("ops");
245 int op_num = 0;
246 for (auto it = ops.begin(); it != ops.end(); ++it) {
247 const Op& op = *it;
248 f->open_object_section("op");
249 f->dump_int("op_num", op_num++);
250 switch (op.type) {
251 case OP_PUT:
252 {
253 f->dump_string("type", "PUT");
254 f->dump_string("prefix", op.prefix);
255 f->dump_string("key", op.key);
256 f->dump_unsigned("length", op.bl.length());
257 if (dump_val) {
258 std::ostringstream os;
259 op.bl.hexdump(os);
260 f->dump_string("bl", os.str());
261 }
262 }
263 break;
264 case OP_ERASE:
265 {
266 f->dump_string("type", "ERASE");
267 f->dump_string("prefix", op.prefix);
268 f->dump_string("key", op.key);
269 }
270 break;
271 case OP_ERASE_RANGE:
272 {
273 f->dump_string("type", "ERASE_RANGE");
274 f->dump_string("prefix", op.prefix);
275 f->dump_string("start", op.key);
276 f->dump_string("end", op.endkey);
277 }
278 break;
279 case OP_COMPACT:
280 {
281 f->dump_string("type", "COMPACT");
282 f->dump_string("prefix", op.prefix);
283 f->dump_string("start", op.key);
284 f->dump_string("end", op.endkey);
285 }
286 break;
287 default:
288 {
289 f->dump_string("type", "unknown");
290 f->dump_unsigned("op_code", op.type);
291 break;
292 }
293 }
294 f->close_section();
295 }
296 f->close_section();
297 f->dump_unsigned("num_keys", keys);
298 f->dump_unsigned("num_bytes", bytes);
299 f->close_section();
300 }
301 };
302
303 int apply_transaction(MonitorDBStore::TransactionRef t) {
304 KeyValueDB::Transaction dbt = db->get_transaction();
305
306 if (do_dump) {
307 if (!g_conf()->mon_debug_dump_json) {
308 ceph::buffer::list bl;
309 t->encode(bl);
310 bl.write_fd(dump_fd_binary);
311 } else {
312 t->dump(&dump_fmt, true);
313 dump_fmt.flush(dump_fd_json);
314 dump_fd_json.flush();
315 }
316 }
317
318 std::list<std::pair<std::string, std::pair<std::string,std::string>>> compact;
319 for (auto it = t->ops.begin(); it != t->ops.end(); ++it) {
320 const Op& op = *it;
321 switch (op.type) {
322 case Transaction::OP_PUT:
323 dbt->set(op.prefix, op.key, op.bl);
324 break;
325 case Transaction::OP_ERASE:
326 dbt->rmkey(op.prefix, op.key);
327 break;
328 case Transaction::OP_ERASE_RANGE:
329 dbt->rm_range_keys(op.prefix, op.key, op.endkey);
330 break;
331 case Transaction::OP_COMPACT:
332 compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
333 break;
334 default:
335 derr << __func__ << " unknown op type " << op.type << dendl;
336 ceph_abort();
337 break;
338 }
339 }
340 int r = db->submit_transaction_sync(dbt);
341 if (r >= 0) {
342 while (!compact.empty()) {
343 if (compact.front().second.first == std::string() &&
344 compact.front().second.second == std::string())
345 db->compact_prefix_async(compact.front().first);
346 else
347 db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
348 compact.pop_front();
349 }
350 } else {
351 ceph_abort_msg("failed to write to db");
352 }
353 return r;
354 }
355
356 struct C_DoTransaction : public Context {
357 MonitorDBStore *store;
358 MonitorDBStore::TransactionRef t;
359 Context *oncommit;
360 C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
361 Context *f)
362 : store(s), t(t), oncommit(f)
363 {}
364 void finish(int r) override {
365 /* The store serializes writes. Each transaction is handled
366 * sequentially by the io_work Finisher. If a transaction takes longer
367 * to apply its state to permanent storage, then no other transaction
368 * will be handled meanwhile.
369 *
370 * We will now randomly inject random delays. We can safely sleep prior
371 * to applying the transaction as it won't break the model.
372 */
373 double delay_prob = g_conf()->mon_inject_transaction_delay_probability;
374 if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
375 utime_t delay;
376 double delay_max = g_conf()->mon_inject_transaction_delay_max;
377 delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
378 lsubdout(g_ceph_context, mon, 1)
379 << "apply_transaction will be delayed for " << delay
380 << " seconds" << dendl;
381 delay.sleep();
382 }
383 int ret = store->apply_transaction(t);
384 oncommit->complete(ret);
385 }
386 };
387
388 /**
389 * queue transaction
390 *
391 * Queue a transaction to commit asynchronously. Trigger a context
392 * on completion (without any locks held).
393 */
394 void queue_transaction(MonitorDBStore::TransactionRef t,
395 Context *oncommit) {
396 io_work.queue(new C_DoTransaction(this, t, oncommit));
397 }
398
399 /**
400 * block and flush all io activity
401 */
402 void flush() {
403 io_work.wait_for_empty();
404 }
405
406 class StoreIteratorImpl {
407 protected:
408 bool done;
409 std::pair<std::string,std::string> last_key;
410 ceph::buffer::list crc_bl;
411
412 StoreIteratorImpl() : done(false) { }
413 virtual ~StoreIteratorImpl() { }
414
415 virtual bool _is_valid() = 0;
416
417 public:
418 __u32 crc() {
419 if (g_conf()->mon_sync_debug)
420 return crc_bl.crc32c(0);
421 return 0;
422 }
423 std::pair<std::string,std::string> get_last_key() {
424 return last_key;
425 }
426 virtual bool has_next_chunk() {
427 return !done && _is_valid();
428 }
429 virtual void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
430 uint64_t max_keys) = 0;
431 virtual std::pair<std::string,std::string> get_next_key() = 0;
432 };
433 typedef std::shared_ptr<StoreIteratorImpl> Synchronizer;
434
435 class WholeStoreIteratorImpl : public StoreIteratorImpl {
436 KeyValueDB::WholeSpaceIterator iter;
437 std::set<std::string> sync_prefixes;
438
439 public:
440 WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
441 std::set<std::string> &prefixes)
442 : StoreIteratorImpl(),
443 iter(iter),
444 sync_prefixes(prefixes)
445 { }
446
447 ~WholeStoreIteratorImpl() override { }
448
449 /**
450 * Obtain a chunk of the store
451 *
452 * @param bl Encoded transaction that will recreate the chunk
453 * @param first_key Pair containing the first key to obtain, and that
454 * will contain the first key in the chunk (that may
455 * differ from the one passed on to the function)
456 * @param last_key[out] Last key in the chunk
457 */
458 void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
459 uint64_t max_keys) override {
460 using ceph::encode;
461 ceph_assert(done == false);
462 ceph_assert(iter->valid() == true);
463
464 while (iter->valid()) {
465 std::string prefix(iter->raw_key().first);
466 std::string key(iter->raw_key().second);
467 if (sync_prefixes.count(prefix)) {
468 ceph::buffer::list value = iter->value();
469 if (tx->empty() ||
470 (tx->get_bytes() + value.length() + key.size() +
471 prefix.size() < max_bytes &&
472 tx->get_keys() < max_keys)) {
473 // NOTE: putting every key in a separate transaction is
474 // questionable as far as efficiency goes
475 auto tmp(std::make_shared<Transaction>());
476 tmp->put(prefix, key, value);
477 tx->append(tmp);
478 if (g_conf()->mon_sync_debug) {
479 encode(prefix, crc_bl);
480 encode(key, crc_bl);
481 encode(value, crc_bl);
482 }
483 } else {
484 last_key.first = prefix;
485 last_key.second = key;
486 return;
487 }
488 }
489 iter->next();
490 }
491 ceph_assert(iter->valid() == false);
492 done = true;
493 }
494
495 std::pair<std::string,std::string> get_next_key() override {
496 ceph_assert(iter->valid());
497
498 for (; iter->valid(); iter->next()) {
499 std::pair<std::string,std::string> r = iter->raw_key();
500 if (sync_prefixes.count(r.first) > 0) {
501 iter->next();
502 return r;
503 }
504 }
505 return std::pair<std::string,std::string>();
506 }
507
508 bool _is_valid() override {
509 return iter->valid();
510 }
511 };
512
513 Synchronizer get_synchronizer(std::pair<std::string,std::string> &key,
514 std::set<std::string> &prefixes) {
515 KeyValueDB::WholeSpaceIterator iter;
516 iter = db->get_wholespace_iterator();
517
518 if (!key.first.empty() && !key.second.empty())
519 iter->upper_bound(key.first, key.second);
520 else
521 iter->seek_to_first();
522
523 return std::shared_ptr<StoreIteratorImpl>(
524 new WholeStoreIteratorImpl(iter, prefixes)
525 );
526 }
527
528 KeyValueDB::Iterator get_iterator(const std::string &prefix) {
529 ceph_assert(!prefix.empty());
530 KeyValueDB::Iterator iter = db->get_iterator(prefix);
531 iter->seek_to_first();
532 return iter;
533 }
534
535 KeyValueDB::WholeSpaceIterator get_iterator() {
536 KeyValueDB::WholeSpaceIterator iter;
537 iter = db->get_wholespace_iterator();
538 iter->seek_to_first();
539 return iter;
540 }
541
542 int get(const std::string& prefix, const std::string& key, ceph::buffer::list& bl) {
543 ceph_assert(bl.length() == 0);
544 return db->get(prefix, key, &bl);
545 }
546
547 int get(const std::string& prefix, const version_t ver, ceph::buffer::list& bl) {
548 std::ostringstream os;
549 os << ver;
550 return get(prefix, os.str(), bl);
551 }
552
553 version_t get(const std::string& prefix, const std::string& key) {
554 using ceph::decode;
555 ceph::buffer::list bl;
556 int err = get(prefix, key, bl);
557 if (err < 0) {
558 if (err == -ENOENT) // if key doesn't exist, assume its value is 0
559 return 0;
560 // we're not expecting any other negative return value, and we can't
561 // just return a negative value if we're returning a version_t
562 generic_dout(0) << "MonitorDBStore::get() error obtaining"
563 << " (" << prefix << ":" << key << "): "
564 << cpp_strerror(err) << dendl;
565 ceph_abort_msg("error obtaining key");
566 }
567
568 ceph_assert(bl.length());
569 version_t ver;
570 auto p = bl.cbegin();
571 decode(ver, p);
572 return ver;
573 }
574
575 bool exists(const std::string& prefix, const std::string& key) {
576 KeyValueDB::Iterator it = db->get_iterator(prefix);
577 int err = it->lower_bound(key);
578 if (err < 0)
579 return false;
580
581 return (it->valid() && it->key() == key);
582 }
583
584 bool exists(const std::string& prefix, version_t ver) {
585 std::ostringstream os;
586 os << ver;
587 return exists(prefix, os.str());
588 }
589
/// Join `prefix` and `value` with a single underscore: "prefix_value".
std::string combine_strings(const std::string& prefix, const std::string& value) {
  std::string joined;
  joined.reserve(prefix.size() + 1 + value.size());
  joined += prefix;
  joined += '_';
  joined += value;
  return joined;
}
596
597 std::string combine_strings(const std::string& prefix, const version_t ver) {
598 std::ostringstream os;
599 os << ver;
600 return combine_strings(prefix, os.str());
601 }
602
603 void clear(std::set<std::string>& prefixes) {
604 KeyValueDB::Transaction dbt = db->get_transaction();
605
606 for (auto iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
607 dbt->rmkeys_by_prefix((*iter));
608 }
609 int r = db->submit_transaction_sync(dbt);
610 ceph_assert(r >= 0);
611 }
612
613 void _open(const std::string& kv_type) {
614 int pos = 0;
615 for (auto rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
616 if (*rit != '/')
617 break;
618 }
619 std::ostringstream os;
620 os << path.substr(0, path.size() - pos) << "/store.db";
621 std::string full_path = os.str();
622
623 KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context,
624 kv_type,
625 full_path);
626 if (!db_ptr) {
627 derr << __func__ << " error initializing "
628 << kv_type << " db back storage in "
629 << full_path << dendl;
630 ceph_abort_msg("MonitorDBStore: error initializing keyvaluedb back storage");
631 }
632 db.reset(db_ptr);
633
634 if (g_conf()->mon_debug_dump_transactions) {
635 if (!g_conf()->mon_debug_dump_json) {
636 dump_fd_binary = ::open(
637 g_conf()->mon_debug_dump_location.c_str(),
638 O_CREAT|O_APPEND|O_WRONLY|O_CLOEXEC, 0644);
639 if (dump_fd_binary < 0) {
640 dump_fd_binary = -errno;
641 derr << "Could not open log file, got "
642 << cpp_strerror(dump_fd_binary) << dendl;
643 }
644 } else {
645 dump_fmt.reset();
646 dump_fmt.open_array_section("dump");
647 dump_fd_json.open(g_conf()->mon_debug_dump_location.c_str());
648 }
649 do_dump = true;
650 }
651 if (kv_type == "rocksdb")
652 db->init(g_conf()->mon_rocksdb_options);
653 else
654 db->init();
655
656
657 }
658
659 int open(std::ostream &out) {
660 std::string kv_type;
661 int r = read_meta("kv_backend", &kv_type);
662 if (r < 0 || kv_type.empty()) {
663 // assume old monitors that did not mark the type were leveldb.
664 kv_type = "leveldb";
665 r = write_meta("kv_backend", kv_type);
666 if (r < 0)
667 return r;
668 }
669 _open(kv_type);
670 r = db->open(out);
671 if (r < 0)
672 return r;
673
674 // Monitors are few in number, so the resource cost of exposing
675 // very detailed stats is low: ramp up the priority of all the
676 // KV store's perf counters. Do this after open, because backend may
677 // not have constructed PerfCounters earlier.
678 if (db->get_perf_counters()) {
679 db->get_perf_counters()->set_prio_adjust(
680 PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY);
681 }
682
683 io_work.start();
684 is_open = true;
685 return 0;
686 }
687
688 int create_and_open(std::ostream &out) {
689 // record the type before open
690 std::string kv_type;
691 int r = read_meta("kv_backend", &kv_type);
692 if (r < 0) {
693 kv_type = g_conf()->mon_keyvaluedb;
694 r = write_meta("kv_backend", kv_type);
695 if (r < 0)
696 return r;
697 }
698 _open(kv_type);
699 r = db->create_and_open(out);
700 if (r < 0)
701 return r;
702 io_work.start();
703 is_open = true;
704 return 0;
705 }
706
707 void close() {
708 // there should be no work queued!
709 io_work.stop();
710 is_open = false;
711 db.reset(NULL);
712 }
713
714 void compact() {
715 db->compact();
716 }
717
718 void compact_async() {
719 db->compact_async();
720 }
721
722 void compact_prefix(const std::string& prefix) {
723 db->compact_prefix(prefix);
724 }
725
726 uint64_t get_estimated_size(std::map<std::string, uint64_t> &extras) {
727 return db->get_estimated_size(extras);
728 }
729
730 /**
731 * write_meta - write a simple configuration key out-of-band
732 *
733 * Write a simple key/value pair for basic store configuration
734 * (e.g., a uuid or magic number) to an unopened/unmounted store.
735 * The default implementation writes this to a plaintext file in the
736 * path.
737 *
738 * A newline is appended.
739 *
740 * @param key key name (e.g., "fsid")
741 * @param value value (e.g., a uuid rendered as a string)
742 * @returns 0 for success, or an error code
743 */
744 int write_meta(const std::string& key,
745 const std::string& value) const {
746 std::string v = value;
747 v += "\n";
748 int r = safe_write_file(path.c_str(), key.c_str(),
749 v.c_str(), v.length(),
750 0600);
751 if (r < 0)
752 return r;
753 return 0;
754 }
755
756 /**
757 * read_meta - read a simple configuration key out-of-band
758 *
759 * Read a simple key value to an unopened/mounted store.
760 *
761 * Trailing whitespace is stripped off.
762 *
763 * @param key key name
764 * @param value pointer to value string
765 * @returns 0 for success, or an error code
766 */
767 int read_meta(const std::string& key,
768 std::string *value) const {
769 char buf[4096];
770 int r = safe_read_file(path.c_str(), key.c_str(),
771 buf, sizeof(buf));
772 if (r <= 0)
773 return r;
774 // drop trailing newlines
775 while (r && isspace(buf[r-1])) {
776 --r;
777 }
778 *value = std::string(buf, r);
779 return 0;
780 }
781
782 explicit MonitorDBStore(const std::string& path)
783 : path(path),
784 db(0),
785 do_dump(false),
786 dump_fd_binary(-1),
787 dump_fmt(true),
788 io_work(g_ceph_context, "monstore", "fn_monstore"),
789 is_open(false) {
790 }
791 ~MonitorDBStore() {
792 ceph_assert(!is_open);
793 if (do_dump) {
794 if (!g_conf()->mon_debug_dump_json) {
795 ::close(dump_fd_binary);
796 } else {
797 dump_fmt.close_section();
798 dump_fmt.flush(dump_fd_json);
799 dump_fd_json.flush();
800 dump_fd_json.close();
801 }
802 }
803 }
804
805 };
806
807 WRITE_CLASS_ENCODER(MonitorDBStore::Op)
808 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)
809
810 #endif /* CEPH_MONITOR_DB_STORE_H */