]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonitorDBStore.h
import ceph 14.2.5
[ceph.git] / ceph / src / mon / MonitorDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Inktank, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
15
16 #include "include/types.h"
17 #include "include/buffer.h"
18 #include <set>
19 #include <map>
20 #include <string>
21 #include <boost/scoped_ptr.hpp>
22 #include <sstream>
23 #include <fstream>
24 #include "kv/KeyValueDB.h"
25
26 #include "include/ceph_assert.h"
27 #include "common/Formatter.h"
28 #include "common/Finisher.h"
29 #include "common/errno.h"
30 #include "common/debug.h"
31 #include "common/safe_io.h"
32 #include "common/blkdev.h"
33 #include "common/PriorityCache.h"
34
35 #define dout_context g_ceph_context
36
37 class MonitorDBStore
38 {
39 string path;
40 boost::scoped_ptr<KeyValueDB> db;
41 bool do_dump;
42 int dump_fd_binary;
43 std::ofstream dump_fd_json;
44 JSONFormatter dump_fmt;
45
46
47 Finisher io_work;
48
49 bool is_open;
50
51 public:
52
53 string get_devname() {
54 char devname[4096] = {0}, partition[4096];
55 get_device_by_path(path.c_str(), partition, devname,
56 sizeof(devname));
57 return devname;
58 }
59
60 std::shared_ptr<PriorityCache::PriCache> get_priority_cache() const {
61 return db->get_priority_cache();
62 }
63
64 struct Op {
65 uint8_t type;
66 string prefix;
67 string key, endkey;
68 bufferlist bl;
69
70 Op()
71 : type(0) { }
72 Op(int t, string p, string k)
73 : type(t), prefix(p), key(k) { }
74 Op(int t, const string& p, string k, bufferlist& b)
75 : type(t), prefix(p), key(k), bl(b) { }
76 Op(int t, const string& p, string start, string end)
77 : type(t), prefix(p), key(start), endkey(end) { }
78
79 void encode(bufferlist& encode_bl) const {
80 ENCODE_START(2, 1, encode_bl);
81 encode(type, encode_bl);
82 encode(prefix, encode_bl);
83 encode(key, encode_bl);
84 encode(bl, encode_bl);
85 encode(endkey, encode_bl);
86 ENCODE_FINISH(encode_bl);
87 }
88
89 void decode(bufferlist::const_iterator& decode_bl) {
90 DECODE_START(2, decode_bl);
91 decode(type, decode_bl);
92 decode(prefix, decode_bl);
93 decode(key, decode_bl);
94 decode(bl, decode_bl);
95 if (struct_v >= 2)
96 decode(endkey, decode_bl);
97 DECODE_FINISH(decode_bl);
98 }
99
100 void dump(Formatter *f) const {
101 f->dump_int("type", type);
102 f->dump_string("prefix", prefix);
103 f->dump_string("key", key);
104 if (endkey.length())
105 f->dump_string("endkey", endkey);
106 }
107
108 static void generate_test_instances(list<Op*>& ls) {
109 ls.push_back(new Op);
110 // we get coverage here from the Transaction instances
111 }
112 };
113
114 struct Transaction;
115 typedef std::shared_ptr<Transaction> TransactionRef;
116 struct Transaction {
117 list<Op> ops;
118 uint64_t bytes, keys;
119
120 Transaction() : bytes(0), keys(0) {}
121
122 enum {
123 OP_PUT = 1,
124 OP_ERASE = 2,
125 OP_COMPACT = 3,
126 };
127
128 void put(string prefix, string key, bufferlist& bl) {
129 ops.push_back(Op(OP_PUT, prefix, key, bl));
130 ++keys;
131 bytes += prefix.length() + key.length() + bl.length();
132 }
133
134 void put(string prefix, version_t ver, bufferlist& bl) {
135 ostringstream os;
136 os << ver;
137 put(prefix, os.str(), bl);
138 }
139
140 void put(string prefix, string key, version_t ver) {
141 using ceph::encode;
142 bufferlist bl;
143 encode(ver, bl);
144 put(prefix, key, bl);
145 }
146
147 void erase(string prefix, string key) {
148 ops.push_back(Op(OP_ERASE, prefix, key));
149 ++keys;
150 bytes += prefix.length() + key.length();
151 }
152
153 void erase(string prefix, version_t ver) {
154 ostringstream os;
155 os << ver;
156 erase(prefix, os.str());
157 }
158
159 void compact_prefix(string prefix) {
160 ops.push_back(Op(OP_COMPACT, prefix, string()));
161 }
162
163 void compact_range(string prefix, string start, string end) {
164 ops.push_back(Op(OP_COMPACT, prefix, start, end));
165 }
166
167 void encode(bufferlist& bl) const {
168 ENCODE_START(2, 1, bl);
169 encode(ops, bl);
170 encode(bytes, bl);
171 encode(keys, bl);
172 ENCODE_FINISH(bl);
173 }
174
175 void decode(bufferlist::const_iterator& bl) {
176 DECODE_START(2, bl);
177 decode(ops, bl);
178 if (struct_v >= 2) {
179 decode(bytes, bl);
180 decode(keys, bl);
181 }
182 DECODE_FINISH(bl);
183 }
184
185 static void generate_test_instances(list<Transaction*>& ls) {
186 ls.push_back(new Transaction);
187 ls.push_back(new Transaction);
188 bufferlist bl;
189 bl.append("value");
190 ls.back()->put("prefix", "key", bl);
191 ls.back()->erase("prefix2", "key2");
192 ls.back()->compact_prefix("prefix3");
193 ls.back()->compact_range("prefix4", "from", "to");
194 }
195
196 void append(TransactionRef other) {
197 ops.splice(ops.end(), other->ops);
198 keys += other->keys;
199 bytes += other->bytes;
200 }
201
202 void append_from_encoded(bufferlist& bl) {
203 auto other(std::make_shared<Transaction>());
204 auto it = bl.cbegin();
205 other->decode(it);
206 append(other);
207 }
208
209 bool empty() {
210 return (size() == 0);
211 }
212
213 size_t size() const {
214 return ops.size();
215 }
216 uint64_t get_keys() const {
217 return keys;
218 }
219 uint64_t get_bytes() const {
220 return bytes;
221 }
222
223 void dump(ceph::Formatter *f, bool dump_val=false) const {
224 f->open_object_section("transaction");
225 f->open_array_section("ops");
226 list<Op>::const_iterator it;
227 int op_num = 0;
228 for (it = ops.begin(); it != ops.end(); ++it) {
229 const Op& op = *it;
230 f->open_object_section("op");
231 f->dump_int("op_num", op_num++);
232 switch (op.type) {
233 case OP_PUT:
234 {
235 f->dump_string("type", "PUT");
236 f->dump_string("prefix", op.prefix);
237 f->dump_string("key", op.key);
238 f->dump_unsigned("length", op.bl.length());
239 if (dump_val) {
240 ostringstream os;
241 op.bl.hexdump(os);
242 f->dump_string("bl", os.str());
243 }
244 }
245 break;
246 case OP_ERASE:
247 {
248 f->dump_string("type", "ERASE");
249 f->dump_string("prefix", op.prefix);
250 f->dump_string("key", op.key);
251 }
252 break;
253 case OP_COMPACT:
254 {
255 f->dump_string("type", "COMPACT");
256 f->dump_string("prefix", op.prefix);
257 f->dump_string("start", op.key);
258 f->dump_string("end", op.endkey);
259 }
260 break;
261 default:
262 {
263 f->dump_string("type", "unknown");
264 f->dump_unsigned("op_code", op.type);
265 break;
266 }
267 }
268 f->close_section();
269 }
270 f->close_section();
271 f->dump_unsigned("num_keys", keys);
272 f->dump_unsigned("num_bytes", bytes);
273 f->close_section();
274 }
275 };
276
277 int apply_transaction(MonitorDBStore::TransactionRef t) {
278 KeyValueDB::Transaction dbt = db->get_transaction();
279
280 if (do_dump) {
281 if (!g_conf()->mon_debug_dump_json) {
282 bufferlist bl;
283 t->encode(bl);
284 bl.write_fd(dump_fd_binary);
285 } else {
286 t->dump(&dump_fmt, true);
287 dump_fmt.flush(dump_fd_json);
288 dump_fd_json.flush();
289 }
290 }
291
292 list<pair<string, pair<string,string> > > compact;
293 for (list<Op>::const_iterator it = t->ops.begin();
294 it != t->ops.end();
295 ++it) {
296 const Op& op = *it;
297 switch (op.type) {
298 case Transaction::OP_PUT:
299 dbt->set(op.prefix, op.key, op.bl);
300 break;
301 case Transaction::OP_ERASE:
302 dbt->rmkey(op.prefix, op.key);
303 break;
304 case Transaction::OP_COMPACT:
305 compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
306 break;
307 default:
308 derr << __func__ << " unknown op type " << op.type << dendl;
309 ceph_abort();
310 break;
311 }
312 }
313 int r = db->submit_transaction_sync(dbt);
314 if (r >= 0) {
315 while (!compact.empty()) {
316 if (compact.front().second.first == string() &&
317 compact.front().second.second == string())
318 db->compact_prefix_async(compact.front().first);
319 else
320 db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
321 compact.pop_front();
322 }
323 } else {
324 ceph_abort_msg("failed to write to db");
325 }
326 return r;
327 }
328
329 struct C_DoTransaction : public Context {
330 MonitorDBStore *store;
331 MonitorDBStore::TransactionRef t;
332 Context *oncommit;
333 C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
334 Context *f)
335 : store(s), t(t), oncommit(f)
336 {}
337 void finish(int r) override {
338 /* The store serializes writes. Each transaction is handled
339 * sequentially by the io_work Finisher. If a transaction takes longer
340 * to apply its state to permanent storage, then no other transaction
341 * will be handled meanwhile.
342 *
343 * We will now randomly inject random delays. We can safely sleep prior
344 * to applying the transaction as it won't break the model.
345 */
346 double delay_prob = g_conf()->mon_inject_transaction_delay_probability;
347 if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
348 utime_t delay;
349 double delay_max = g_conf()->mon_inject_transaction_delay_max;
350 delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
351 lsubdout(g_ceph_context, mon, 1)
352 << "apply_transaction will be delayed for " << delay
353 << " seconds" << dendl;
354 delay.sleep();
355 }
356 int ret = store->apply_transaction(t);
357 oncommit->complete(ret);
358 }
359 };
360
361 /**
362 * queue transaction
363 *
364 * Queue a transaction to commit asynchronously. Trigger a context
365 * on completion (without any locks held).
366 */
367 void queue_transaction(MonitorDBStore::TransactionRef t,
368 Context *oncommit) {
369 io_work.queue(new C_DoTransaction(this, t, oncommit));
370 }
371
372 /**
373 * block and flush all io activity
374 */
375 void flush() {
376 io_work.wait_for_empty();
377 }
378
379 class StoreIteratorImpl {
380 protected:
381 bool done;
382 pair<string,string> last_key;
383 bufferlist crc_bl;
384
385 StoreIteratorImpl() : done(false) { }
386 virtual ~StoreIteratorImpl() { }
387
388 bool add_chunk_entry(TransactionRef tx,
389 string &prefix,
390 string &key,
391 bufferlist &value,
392 uint64_t max) {
393 auto tmp(std::make_shared<Transaction>());
394 bufferlist tmp_bl;
395 tmp->put(prefix, key, value);
396 tmp->encode(tmp_bl);
397
398 bufferlist tx_bl;
399 tx->encode(tx_bl);
400
401 size_t len = tx_bl.length() + tmp_bl.length();
402
403 if (!tx->empty() && (len > max)) {
404 return false;
405 }
406
407 tx->append(tmp);
408 last_key.first = prefix;
409 last_key.second = key;
410
411 if (g_conf()->mon_sync_debug) {
412 encode(prefix, crc_bl);
413 encode(key, crc_bl);
414 encode(value, crc_bl);
415 }
416
417 return true;
418 }
419
420 virtual bool _is_valid() = 0;
421
422 public:
423 __u32 crc() {
424 if (g_conf()->mon_sync_debug)
425 return crc_bl.crc32c(0);
426 return 0;
427 }
428 pair<string,string> get_last_key() {
429 return last_key;
430 }
431 virtual bool has_next_chunk() {
432 return !done && _is_valid();
433 }
434 virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
435 virtual pair<string,string> get_next_key() = 0;
436 };
437 typedef std::shared_ptr<StoreIteratorImpl> Synchronizer;
438
439 class WholeStoreIteratorImpl : public StoreIteratorImpl {
440 KeyValueDB::WholeSpaceIterator iter;
441 set<string> sync_prefixes;
442
443 public:
444 WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
445 set<string> &prefixes)
446 : StoreIteratorImpl(),
447 iter(iter),
448 sync_prefixes(prefixes)
449 { }
450
451 ~WholeStoreIteratorImpl() override { }
452
453 /**
454 * Obtain a chunk of the store
455 *
456 * @param bl Encoded transaction that will recreate the chunk
457 * @param first_key Pair containing the first key to obtain, and that
458 * will contain the first key in the chunk (that may
459 * differ from the one passed on to the function)
460 * @param last_key[out] Last key in the chunk
461 */
462 void get_chunk_tx(TransactionRef tx, uint64_t max) override {
463 ceph_assert(done == false);
464 ceph_assert(iter->valid() == true);
465
466 while (iter->valid()) {
467 string prefix(iter->raw_key().first);
468 string key(iter->raw_key().second);
469 if (sync_prefixes.count(prefix)) {
470 bufferlist value = iter->value();
471 if (!add_chunk_entry(tx, prefix, key, value, max))
472 return;
473 }
474 iter->next();
475 }
476 ceph_assert(iter->valid() == false);
477 done = true;
478 }
479
480 pair<string,string> get_next_key() override {
481 ceph_assert(iter->valid());
482
483 for (; iter->valid(); iter->next()) {
484 pair<string,string> r = iter->raw_key();
485 if (sync_prefixes.count(r.first) > 0) {
486 iter->next();
487 return r;
488 }
489 }
490 return pair<string,string>();
491 }
492
493 bool _is_valid() override {
494 return iter->valid();
495 }
496 };
497
498 Synchronizer get_synchronizer(pair<string,string> &key,
499 set<string> &prefixes) {
500 KeyValueDB::WholeSpaceIterator iter;
501 iter = db->get_wholespace_iterator();
502
503 if (!key.first.empty() && !key.second.empty())
504 iter->upper_bound(key.first, key.second);
505 else
506 iter->seek_to_first();
507
508 return std::shared_ptr<StoreIteratorImpl>(
509 new WholeStoreIteratorImpl(iter, prefixes)
510 );
511 }
512
513 KeyValueDB::Iterator get_iterator(const string &prefix) {
514 ceph_assert(!prefix.empty());
515 KeyValueDB::Iterator iter = db->get_iterator(prefix);
516 iter->seek_to_first();
517 return iter;
518 }
519
520 KeyValueDB::WholeSpaceIterator get_iterator() {
521 KeyValueDB::WholeSpaceIterator iter;
522 iter = db->get_wholespace_iterator();
523 iter->seek_to_first();
524 return iter;
525 }
526
527 int get(const string& prefix, const string& key, bufferlist& bl) {
528 ceph_assert(bl.length() == 0);
529 return db->get(prefix, key, &bl);
530 }
531
532 int get(const string& prefix, const version_t ver, bufferlist& bl) {
533 ostringstream os;
534 os << ver;
535 return get(prefix, os.str(), bl);
536 }
537
538 version_t get(const string& prefix, const string& key) {
539 bufferlist bl;
540 int err = get(prefix, key, bl);
541 if (err < 0) {
542 if (err == -ENOENT) // if key doesn't exist, assume its value is 0
543 return 0;
544 // we're not expecting any other negative return value, and we can't
545 // just return a negative value if we're returning a version_t
546 generic_dout(0) << "MonitorDBStore::get() error obtaining"
547 << " (" << prefix << ":" << key << "): "
548 << cpp_strerror(err) << dendl;
549 ceph_abort_msg("error obtaining key");
550 }
551
552 ceph_assert(bl.length());
553 version_t ver;
554 auto p = bl.cbegin();
555 decode(ver, p);
556 return ver;
557 }
558
559 bool exists(const string& prefix, const string& key) {
560 KeyValueDB::Iterator it = db->get_iterator(prefix);
561 int err = it->lower_bound(key);
562 if (err < 0)
563 return false;
564
565 return (it->valid() && it->key() == key);
566 }
567
568 bool exists(const string& prefix, version_t ver) {
569 ostringstream os;
570 os << ver;
571 return exists(prefix, os.str());
572 }
573
/**
 * Join two strings with an underscore: "<prefix>_<value>".
 */
string combine_strings(const string& prefix, const string& value) {
  string joined(prefix);
  joined += '_';
  joined += value;
  return joined;
}
580
581 string combine_strings(const string& prefix, const version_t ver) {
582 ostringstream os;
583 os << ver;
584 return combine_strings(prefix, os.str());
585 }
586
587 void clear(set<string>& prefixes) {
588 set<string>::iterator iter;
589 KeyValueDB::Transaction dbt = db->get_transaction();
590
591 for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
592 dbt->rmkeys_by_prefix((*iter));
593 }
594 int r = db->submit_transaction_sync(dbt);
595 ceph_assert(r >= 0);
596 }
597
598 void _open(string kv_type) {
599 string::const_reverse_iterator rit;
600 int pos = 0;
601 for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
602 if (*rit != '/')
603 break;
604 }
605 ostringstream os;
606 os << path.substr(0, path.size() - pos) << "/store.db";
607 string full_path = os.str();
608
609 KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context,
610 kv_type,
611 full_path);
612 if (!db_ptr) {
613 derr << __func__ << " error initializing "
614 << kv_type << " db back storage in "
615 << full_path << dendl;
616 ceph_abort_msg("MonitorDBStore: error initializing keyvaluedb back storage");
617 }
618 db.reset(db_ptr);
619
620 if (g_conf()->mon_debug_dump_transactions) {
621 if (!g_conf()->mon_debug_dump_json) {
622 dump_fd_binary = ::open(
623 g_conf()->mon_debug_dump_location.c_str(),
624 O_CREAT|O_APPEND|O_WRONLY|O_CLOEXEC, 0644);
625 if (dump_fd_binary < 0) {
626 dump_fd_binary = -errno;
627 derr << "Could not open log file, got "
628 << cpp_strerror(dump_fd_binary) << dendl;
629 }
630 } else {
631 dump_fmt.reset();
632 dump_fmt.open_array_section("dump");
633 dump_fd_json.open(g_conf()->mon_debug_dump_location.c_str());
634 }
635 do_dump = true;
636 }
637 if (kv_type == "rocksdb")
638 db->init(g_conf()->mon_rocksdb_options);
639 else
640 db->init();
641
642
643 }
644
645 int open(ostream &out) {
646 string kv_type;
647 int r = read_meta("kv_backend", &kv_type);
648 if (r < 0 || kv_type.empty()) {
649 // assume old monitors that did not mark the type were leveldb.
650 kv_type = "leveldb";
651 r = write_meta("kv_backend", kv_type);
652 if (r < 0)
653 return r;
654 }
655 _open(kv_type);
656 r = db->open(out);
657 if (r < 0)
658 return r;
659
660 // Monitors are few in number, so the resource cost of exposing
661 // very detailed stats is low: ramp up the priority of all the
662 // KV store's perf counters. Do this after open, because backend may
663 // not have constructed PerfCounters earlier.
664 if (db->get_perf_counters()) {
665 db->get_perf_counters()->set_prio_adjust(
666 PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY);
667 }
668
669 io_work.start();
670 is_open = true;
671 return 0;
672 }
673
674 int create_and_open(ostream &out) {
675 // record the type before open
676 string kv_type;
677 int r = read_meta("kv_backend", &kv_type);
678 if (r < 0) {
679 kv_type = g_conf()->mon_keyvaluedb;
680 r = write_meta("kv_backend", kv_type);
681 if (r < 0)
682 return r;
683 }
684 _open(kv_type);
685 r = db->create_and_open(out);
686 if (r < 0)
687 return r;
688 io_work.start();
689 is_open = true;
690 return 0;
691 }
692
693 void close() {
694 // there should be no work queued!
695 io_work.stop();
696 is_open = false;
697 db.reset(NULL);
698 }
699
700 void compact() {
701 db->compact();
702 }
703
704 void compact_async() {
705 db->compact_async();
706 }
707
708 void compact_prefix(const string& prefix) {
709 db->compact_prefix(prefix);
710 }
711
712 uint64_t get_estimated_size(map<string, uint64_t> &extras) {
713 return db->get_estimated_size(extras);
714 }
715
716 /**
717 * write_meta - write a simple configuration key out-of-band
718 *
719 * Write a simple key/value pair for basic store configuration
720 * (e.g., a uuid or magic number) to an unopened/unmounted store.
721 * The default implementation writes this to a plaintext file in the
722 * path.
723 *
724 * A newline is appended.
725 *
726 * @param key key name (e.g., "fsid")
727 * @param value value (e.g., a uuid rendered as a string)
728 * @returns 0 for success, or an error code
729 */
730 int write_meta(const std::string& key,
731 const std::string& value) const {
732 string v = value;
733 v += "\n";
734 int r = safe_write_file(path.c_str(), key.c_str(),
735 v.c_str(), v.length(),
736 0600);
737 if (r < 0)
738 return r;
739 return 0;
740 }
741
742 /**
743 * read_meta - read a simple configuration key out-of-band
744 *
745 * Read a simple key value to an unopened/mounted store.
746 *
747 * Trailing whitespace is stripped off.
748 *
749 * @param key key name
750 * @param value pointer to value string
751 * @returns 0 for success, or an error code
752 */
753 int read_meta(const std::string& key,
754 std::string *value) const {
755 char buf[4096];
756 int r = safe_read_file(path.c_str(), key.c_str(),
757 buf, sizeof(buf));
758 if (r <= 0)
759 return r;
760 // drop trailing newlines
761 while (r && isspace(buf[r-1])) {
762 --r;
763 }
764 *value = string(buf, r);
765 return 0;
766 }
767
768 explicit MonitorDBStore(const string& path)
769 : path(path),
770 db(0),
771 do_dump(false),
772 dump_fd_binary(-1),
773 dump_fmt(true),
774 io_work(g_ceph_context, "monstore", "fn_monstore"),
775 is_open(false) {
776 }
777 ~MonitorDBStore() {
778 ceph_assert(!is_open);
779 if (do_dump) {
780 if (!g_conf()->mon_debug_dump_json) {
781 ::close(dump_fd_binary);
782 } else {
783 dump_fmt.close_section();
784 dump_fmt.flush(dump_fd_json);
785 dump_fd_json.flush();
786 dump_fd_json.close();
787 }
788 }
789 }
790
791 };
792
793 WRITE_CLASS_ENCODER(MonitorDBStore::Op)
794 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)
795
796 #endif /* CEPH_MONITOR_DB_STORE_H */