]> git.proxmox.com Git - ceph.git/blob - ceph/src/mon/MonitorDBStore.h
update sources to v12.1.0
[ceph.git] / ceph / src / mon / MonitorDBStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2012 Inktank, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
15
16 #include "include/types.h"
17 #include "include/buffer.h"
18 #include <set>
19 #include <map>
20 #include <string>
21 #include <boost/scoped_ptr.hpp>
22 #include <sstream>
23 #include <fstream>
24 #include "kv/KeyValueDB.h"
25
26 #include "include/assert.h"
27 #include "common/Formatter.h"
28 #include "common/Finisher.h"
29 #include "common/errno.h"
30 #include "common/debug.h"
31 #include "common/safe_io.h"
32
33 #define dout_context g_ceph_context
34
// MonitorDBStore wraps a KeyValueDB and exposes transactional reads and writes to it.
35 class MonitorDBStore
36 {
37 string path; // base directory: _open() appends "/store.db"; read_meta()/write_meta() hand it to the safe_*_file helpers
38 boost::scoped_ptr<KeyValueDB> db; // backend created in _open(), released in close()
39 bool do_dump; // set in _open() when mon_debug_dump_transactions is enabled
40 int dump_fd_binary; // fd for encoded transaction dumps; -1 until _open() opens mon_debug_dump_location
41 std::ofstream dump_fd_json; // stream for JSON transaction dumps (used when mon_debug_dump_json is set)
42 JSONFormatter dump_fmt; // formatter the JSON dumps are rendered through
43
44
45 Finisher io_work; // runs the contexts queued by queue_transaction()
46
47 bool is_open; // set by open()/create_and_open(), cleared by close(); asserted false in the destructor
48
49 public:
50
51 struct Op {
52 uint8_t type;
53 string prefix;
54 string key, endkey;
55 bufferlist bl;
56
57 Op()
58 : type(0) { }
59 Op(int t, string p, string k)
60 : type(t), prefix(p), key(k) { }
61 Op(int t, const string& p, string k, bufferlist& b)
62 : type(t), prefix(p), key(k), bl(b) { }
63 Op(int t, const string& p, string start, string end)
64 : type(t), prefix(p), key(start), endkey(end) { }
65
66 void encode(bufferlist& encode_bl) const {
67 ENCODE_START(2, 1, encode_bl);
68 ::encode(type, encode_bl);
69 ::encode(prefix, encode_bl);
70 ::encode(key, encode_bl);
71 ::encode(bl, encode_bl);
72 ::encode(endkey, encode_bl);
73 ENCODE_FINISH(encode_bl);
74 }
75
76 void decode(bufferlist::iterator& decode_bl) {
77 DECODE_START(2, decode_bl);
78 ::decode(type, decode_bl);
79 ::decode(prefix, decode_bl);
80 ::decode(key, decode_bl);
81 ::decode(bl, decode_bl);
82 if (struct_v >= 2)
83 ::decode(endkey, decode_bl);
84 DECODE_FINISH(decode_bl);
85 }
86
87 void dump(Formatter *f) const {
88 f->dump_int("type", type);
89 f->dump_string("prefix", prefix);
90 f->dump_string("key", key);
91 if (endkey.length())
92 f->dump_string("endkey", endkey);
93 }
94
95 static void generate_test_instances(list<Op*>& ls) {
96 ls.push_back(new Op);
97 // we get coverage here from the Transaction instances
98 }
99 };
100
101 struct Transaction;
102 typedef ceph::shared_ptr<Transaction> TransactionRef;
103 struct Transaction {
104 list<Op> ops;
105 uint64_t bytes, keys;
106
107 Transaction() : bytes(0), keys(0) {}
108
109 enum {
110 OP_PUT = 1,
111 OP_ERASE = 2,
112 OP_COMPACT = 3,
113 };
114
115 void put(string prefix, string key, bufferlist& bl) {
116 ops.push_back(Op(OP_PUT, prefix, key, bl));
117 ++keys;
118 bytes += prefix.length() + key.length() + bl.length();
119 }
120
121 void put(string prefix, version_t ver, bufferlist& bl) {
122 ostringstream os;
123 os << ver;
124 put(prefix, os.str(), bl);
125 }
126
127 void put(string prefix, string key, version_t ver) {
128 bufferlist bl;
129 ::encode(ver, bl);
130 put(prefix, key, bl);
131 }
132
133 void erase(string prefix, string key) {
134 ops.push_back(Op(OP_ERASE, prefix, key));
135 ++keys;
136 bytes += prefix.length() + key.length();
137 }
138
139 void erase(string prefix, version_t ver) {
140 ostringstream os;
141 os << ver;
142 erase(prefix, os.str());
143 }
144
145 void compact_prefix(string prefix) {
146 ops.push_back(Op(OP_COMPACT, prefix, string()));
147 }
148
149 void compact_range(string prefix, string start, string end) {
150 ops.push_back(Op(OP_COMPACT, prefix, start, end));
151 }
152
153 void encode(bufferlist& bl) const {
154 ENCODE_START(2, 1, bl);
155 ::encode(ops, bl);
156 ::encode(bytes, bl);
157 ::encode(keys, bl);
158 ENCODE_FINISH(bl);
159 }
160
161 void decode(bufferlist::iterator& bl) {
162 DECODE_START(2, bl);
163 ::decode(ops, bl);
164 if (struct_v >= 2) {
165 ::decode(bytes, bl);
166 ::decode(keys, bl);
167 }
168 DECODE_FINISH(bl);
169 }
170
171 static void generate_test_instances(list<Transaction*>& ls) {
172 ls.push_back(new Transaction);
173 ls.push_back(new Transaction);
174 bufferlist bl;
175 bl.append("value");
176 ls.back()->put("prefix", "key", bl);
177 ls.back()->erase("prefix2", "key2");
178 ls.back()->compact_prefix("prefix3");
179 ls.back()->compact_range("prefix4", "from", "to");
180 }
181
182 void append(TransactionRef other) {
183 ops.splice(ops.end(), other->ops);
184 keys += other->keys;
185 bytes += other->bytes;
186 }
187
188 void append_from_encoded(bufferlist& bl) {
189 auto other(std::make_shared<Transaction>());
190 bufferlist::iterator it = bl.begin();
191 other->decode(it);
192 append(other);
193 }
194
195 bool empty() {
196 return (size() == 0);
197 }
198
199 size_t size() const {
200 return ops.size();
201 }
202 uint64_t get_keys() const {
203 return keys;
204 }
205 uint64_t get_bytes() const {
206 return bytes;
207 }
208
209 void dump(ceph::Formatter *f, bool dump_val=false) const {
210 f->open_object_section("transaction");
211 f->open_array_section("ops");
212 list<Op>::const_iterator it;
213 int op_num = 0;
214 for (it = ops.begin(); it != ops.end(); ++it) {
215 const Op& op = *it;
216 f->open_object_section("op");
217 f->dump_int("op_num", op_num++);
218 switch (op.type) {
219 case OP_PUT:
220 {
221 f->dump_string("type", "PUT");
222 f->dump_string("prefix", op.prefix);
223 f->dump_string("key", op.key);
224 f->dump_unsigned("length", op.bl.length());
225 if (dump_val) {
226 ostringstream os;
227 op.bl.hexdump(os);
228 f->dump_string("bl", os.str());
229 }
230 }
231 break;
232 case OP_ERASE:
233 {
234 f->dump_string("type", "ERASE");
235 f->dump_string("prefix", op.prefix);
236 f->dump_string("key", op.key);
237 }
238 break;
239 case OP_COMPACT:
240 {
241 f->dump_string("type", "COMPACT");
242 f->dump_string("prefix", op.prefix);
243 f->dump_string("start", op.key);
244 f->dump_string("end", op.endkey);
245 }
246 break;
247 default:
248 {
249 f->dump_string("type", "unknown");
250 f->dump_unsigned("op_code", op.type);
251 break;
252 }
253 }
254 f->close_section();
255 }
256 f->close_section();
257 f->dump_unsigned("num_keys", keys);
258 f->dump_unsigned("num_bytes", bytes);
259 f->close_section();
260 }
261 };
262
263 int apply_transaction(MonitorDBStore::TransactionRef t) {
264 KeyValueDB::Transaction dbt = db->get_transaction();
265
266 if (do_dump) {
267 if (!g_conf->mon_debug_dump_json) {
268 bufferlist bl;
269 t->encode(bl);
270 bl.write_fd(dump_fd_binary);
271 } else {
272 t->dump(&dump_fmt, true);
273 dump_fmt.flush(dump_fd_json);
274 dump_fd_json.flush();
275 }
276 }
277
278 list<pair<string, pair<string,string> > > compact;
279 for (list<Op>::const_iterator it = t->ops.begin();
280 it != t->ops.end();
281 ++it) {
282 const Op& op = *it;
283 switch (op.type) {
284 case Transaction::OP_PUT:
285 dbt->set(op.prefix, op.key, op.bl);
286 break;
287 case Transaction::OP_ERASE:
288 dbt->rmkey(op.prefix, op.key);
289 break;
290 case Transaction::OP_COMPACT:
291 compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
292 break;
293 default:
294 derr << __func__ << " unknown op type " << op.type << dendl;
295 ceph_abort();
296 break;
297 }
298 }
299 int r = db->submit_transaction_sync(dbt);
300 if (r >= 0) {
301 while (!compact.empty()) {
302 if (compact.front().second.first == string() &&
303 compact.front().second.second == string())
304 db->compact_prefix_async(compact.front().first);
305 else
306 db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
307 compact.pop_front();
308 }
309 } else {
310 assert(0 == "failed to write to db");
311 }
312 return r;
313 }
314
315 struct C_DoTransaction : public Context {
316 MonitorDBStore *store;
317 MonitorDBStore::TransactionRef t;
318 Context *oncommit;
319 C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
320 Context *f)
321 : store(s), t(t), oncommit(f)
322 {}
323 void finish(int r) override {
324 /* The store serializes writes. Each transaction is handled
325 * sequentially by the io_work Finisher. If a transaction takes longer
326 * to apply its state to permanent storage, then no other transaction
327 * will be handled meanwhile.
328 *
329 * We will now randomly inject random delays. We can safely sleep prior
330 * to applying the transaction as it won't break the model.
331 */
332 double delay_prob = g_conf->mon_inject_transaction_delay_probability;
333 if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
334 utime_t delay;
335 double delay_max = g_conf->mon_inject_transaction_delay_max;
336 delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
337 lsubdout(g_ceph_context, mon, 1)
338 << "apply_transaction will be delayed for " << delay
339 << " seconds" << dendl;
340 delay.sleep();
341 }
342 int ret = store->apply_transaction(t);
343 oncommit->complete(ret);
344 }
345 };
346
347 /**
348 * queue transaction
349 *
350 * Queue a transaction to commit asynchronously. Trigger a context
351 * on completion (without any locks held).
352 */
353 void queue_transaction(MonitorDBStore::TransactionRef t,
354 Context *oncommit) {
355 io_work.queue(new C_DoTransaction(this, t, oncommit));
356 }
357
358 /**
359 * block and flush all io activity
360 */
361 void flush() {
362 io_work.wait_for_empty();
363 }
364
365 class StoreIteratorImpl {
366 protected:
367 bool done;
368 pair<string,string> last_key;
369 bufferlist crc_bl;
370
371 StoreIteratorImpl() : done(false) { }
372 virtual ~StoreIteratorImpl() { }
373
374 bool add_chunk_entry(TransactionRef tx,
375 string &prefix,
376 string &key,
377 bufferlist &value,
378 uint64_t max) {
379 auto tmp(std::make_shared<Transaction>());
380 bufferlist tmp_bl;
381 tmp->put(prefix, key, value);
382 tmp->encode(tmp_bl);
383
384 bufferlist tx_bl;
385 tx->encode(tx_bl);
386
387 size_t len = tx_bl.length() + tmp_bl.length();
388
389 if (!tx->empty() && (len > max)) {
390 return false;
391 }
392
393 tx->append(tmp);
394 last_key.first = prefix;
395 last_key.second = key;
396
397 if (g_conf->mon_sync_debug) {
398 ::encode(prefix, crc_bl);
399 ::encode(key, crc_bl);
400 ::encode(value, crc_bl);
401 }
402
403 return true;
404 }
405
406 virtual bool _is_valid() = 0;
407
408 public:
409 __u32 crc() {
410 if (g_conf->mon_sync_debug)
411 return crc_bl.crc32c(0);
412 return 0;
413 }
414 pair<string,string> get_last_key() {
415 return last_key;
416 }
417 virtual bool has_next_chunk() {
418 return !done && _is_valid();
419 }
420 virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
421 virtual pair<string,string> get_next_key() = 0;
422 };
423 typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer;
424
425 class WholeStoreIteratorImpl : public StoreIteratorImpl {
426 KeyValueDB::WholeSpaceIterator iter;
427 set<string> sync_prefixes;
428
429 public:
430 WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
431 set<string> &prefixes)
432 : StoreIteratorImpl(),
433 iter(iter),
434 sync_prefixes(prefixes)
435 { }
436
437 ~WholeStoreIteratorImpl() override { }
438
439 /**
440 * Obtain a chunk of the store
441 *
442 * @param bl Encoded transaction that will recreate the chunk
443 * @param first_key Pair containing the first key to obtain, and that
444 * will contain the first key in the chunk (that may
445 * differ from the one passed on to the function)
446 * @param last_key[out] Last key in the chunk
447 */
448 void get_chunk_tx(TransactionRef tx, uint64_t max) override {
449 assert(done == false);
450 assert(iter->valid() == true);
451
452 while (iter->valid()) {
453 string prefix(iter->raw_key().first);
454 string key(iter->raw_key().second);
455 if (sync_prefixes.count(prefix)) {
456 bufferlist value = iter->value();
457 if (!add_chunk_entry(tx, prefix, key, value, max))
458 return;
459 }
460 iter->next();
461 }
462 assert(iter->valid() == false);
463 done = true;
464 }
465
466 pair<string,string> get_next_key() override {
467 assert(iter->valid());
468
469 for (; iter->valid(); iter->next()) {
470 pair<string,string> r = iter->raw_key();
471 if (sync_prefixes.count(r.first) > 0) {
472 iter->next();
473 return r;
474 }
475 }
476 return pair<string,string>();
477 }
478
479 bool _is_valid() override {
480 return iter->valid();
481 }
482 };
483
484 Synchronizer get_synchronizer(pair<string,string> &key,
485 set<string> &prefixes) {
486 KeyValueDB::WholeSpaceIterator iter;
487 iter = db->get_iterator();
488
489 if (!key.first.empty() && !key.second.empty())
490 iter->upper_bound(key.first, key.second);
491 else
492 iter->seek_to_first();
493
494 return ceph::shared_ptr<StoreIteratorImpl>(
495 new WholeStoreIteratorImpl(iter, prefixes)
496 );
497 }
498
499 KeyValueDB::Iterator get_iterator(const string &prefix) {
500 assert(!prefix.empty());
501 KeyValueDB::Iterator iter = db->get_iterator(prefix);
502 iter->seek_to_first();
503 return iter;
504 }
505
506 KeyValueDB::WholeSpaceIterator get_iterator() {
507 KeyValueDB::WholeSpaceIterator iter;
508 iter = db->get_iterator();
509 iter->seek_to_first();
510 return iter;
511 }
512
513 int get(const string& prefix, const string& key, bufferlist& bl) {
514 assert(bl.length() == 0);
515 return db->get(prefix, key, &bl);
516 }
517
518 int get(const string& prefix, const version_t ver, bufferlist& bl) {
519 ostringstream os;
520 os << ver;
521 return get(prefix, os.str(), bl);
522 }
523
524 version_t get(const string& prefix, const string& key) {
525 bufferlist bl;
526 int err = get(prefix, key, bl);
527 if (err < 0) {
528 if (err == -ENOENT) // if key doesn't exist, assume its value is 0
529 return 0;
530 // we're not expecting any other negative return value, and we can't
531 // just return a negative value if we're returning a version_t
532 generic_dout(0) << "MonitorDBStore::get() error obtaining"
533 << " (" << prefix << ":" << key << "): "
534 << cpp_strerror(err) << dendl;
535 assert(0 == "error obtaining key");
536 }
537
538 assert(bl.length());
539 version_t ver;
540 bufferlist::iterator p = bl.begin();
541 ::decode(ver, p);
542 return ver;
543 }
544
545 bool exists(const string& prefix, const string& key) {
546 KeyValueDB::Iterator it = db->get_iterator(prefix);
547 int err = it->lower_bound(key);
548 if (err < 0)
549 return false;
550
551 return (it->valid() && it->key() == key);
552 }
553
554 bool exists(const string& prefix, version_t ver) {
555 ostringstream os;
556 os << ver;
557 return exists(prefix, os.str());
558 }
559
/// Join `prefix` and `value` with a single '_' in between.
string combine_strings(const string& prefix, const string& value)
{
  string joined;
  joined.reserve(prefix.size() + 1 + value.size());
  joined += prefix;
  joined += '_';
  joined += value;
  return joined;
}
566
567 string combine_strings(const string& prefix, const version_t ver) {
568 ostringstream os;
569 os << ver;
570 return combine_strings(prefix, os.str());
571 }
572
573 void clear(set<string>& prefixes) {
574 set<string>::iterator iter;
575 KeyValueDB::Transaction dbt = db->get_transaction();
576
577 for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
578 dbt->rmkeys_by_prefix((*iter));
579 }
580 int r = db->submit_transaction_sync(dbt);
581 assert(r >= 0);
582 }
583
584 void _open(string kv_type) {
585 string::const_reverse_iterator rit;
586 int pos = 0;
587 for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
588 if (*rit != '/')
589 break;
590 }
591 ostringstream os;
592 os << path.substr(0, path.size() - pos) << "/store.db";
593 string full_path = os.str();
594
595 KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context,
596 kv_type,
597 full_path);
598 if (!db_ptr) {
599 derr << __func__ << " error initializing "
600 << kv_type << " db back storage in "
601 << full_path << dendl;
602 assert(0 == "MonitorDBStore: error initializing keyvaluedb back storage");
603 }
604 db.reset(db_ptr);
605
606 if (g_conf->mon_debug_dump_transactions) {
607 if (!g_conf->mon_debug_dump_json) {
608 dump_fd_binary = ::open(
609 g_conf->mon_debug_dump_location.c_str(),
610 O_CREAT|O_APPEND|O_WRONLY, 0644);
611 if (dump_fd_binary < 0) {
612 dump_fd_binary = -errno;
613 derr << "Could not open log file, got "
614 << cpp_strerror(dump_fd_binary) << dendl;
615 }
616 } else {
617 dump_fmt.reset();
618 dump_fmt.open_array_section("dump");
619 dump_fd_json.open(g_conf->mon_debug_dump_location.c_str());
620 }
621 do_dump = true;
622 }
623 if (kv_type == "rocksdb")
624 db->init(g_conf->mon_rocksdb_options);
625 else
626 db->init();
627 }
628
629 int open(ostream &out) {
630 string kv_type;
631 int r = read_meta("kv_backend", &kv_type);
632 if (r < 0 || kv_type.empty()) {
633 // assume old monitors that did not mark the type were leveldb.
634 kv_type = "leveldb";
635 r = write_meta("kv_backend", kv_type);
636 if (r < 0)
637 return r;
638 }
639 _open(kv_type);
640 r = db->open(out);
641 if (r < 0)
642 return r;
643 io_work.start();
644 is_open = true;
645 return 0;
646 }
647
648 int create_and_open(ostream &out) {
649 // record the type before open
650 string kv_type;
651 int r = read_meta("kv_backend", &kv_type);
652 if (r < 0) {
653 kv_type = g_conf->mon_keyvaluedb;
654 r = write_meta("kv_backend", kv_type);
655 if (r < 0)
656 return r;
657 }
658 _open(kv_type);
659 r = db->create_and_open(out);
660 if (r < 0)
661 return r;
662 io_work.start();
663 is_open = true;
664 return 0;
665 }
666
667 void close() {
668 // there should be no work queued!
669 io_work.stop();
670 is_open = false;
671 db.reset(NULL);
672 }
673
674 void compact() {
675 db->compact();
676 }
677
678 void compact_prefix(const string& prefix) {
679 db->compact_prefix(prefix);
680 }
681
682 uint64_t get_estimated_size(map<string, uint64_t> &extras) {
683 return db->get_estimated_size(extras);
684 }
685
686 /**
687 * write_meta - write a simple configuration key out-of-band
688 *
689 * Write a simple key/value pair for basic store configuration
690 * (e.g., a uuid or magic number) to an unopened/unmounted store.
691 * The default implementation writes this to a plaintext file in the
692 * path.
693 *
694 * A newline is appended.
695 *
696 * @param key key name (e.g., "fsid")
697 * @param value value (e.g., a uuid rendered as a string)
698 * @returns 0 for success, or an error code
699 */
700 int write_meta(const std::string& key,
701 const std::string& value) const {
702 string v = value;
703 v += "\n";
704 int r = safe_write_file(path.c_str(), key.c_str(),
705 v.c_str(), v.length());
706 if (r < 0)
707 return r;
708 return 0;
709 }
710
711 /**
712 * read_meta - read a simple configuration key out-of-band
713 *
714 * Read a simple key value from an unopened/mounted store.
715 *
716 * Trailing whitespace is stripped off.
717 *
718 * @param key key name
719 * @param value pointer to value string
720 * @returns 0 for success, or an error code
721 */
722 int read_meta(const std::string& key,
723 std::string *value) const {
724 char buf[4096];
725 int r = safe_read_file(path.c_str(), key.c_str(),
726 buf, sizeof(buf));
727 if (r <= 0)
728 return r;
729 // drop trailing newlines
730 while (r && isspace(buf[r-1])) {
731 --r;
732 }
733 *value = string(buf, r);
734 return 0;
735 }
736
737 explicit MonitorDBStore(const string& path)
738 : path(path),
739 db(0),
740 do_dump(false),
741 dump_fd_binary(-1),
742 dump_fmt(true),
743 io_work(g_ceph_context, "monstore", "fn_monstore"),
744 is_open(false) {
745 }
746 ~MonitorDBStore() {
747 assert(!is_open);
748 if (do_dump) {
749 if (!g_conf->mon_debug_dump_json) {
750 ::close(dump_fd_binary);
751 } else {
752 dump_fmt.close_section();
753 dump_fmt.flush(dump_fd_json);
754 dump_fd_json.flush();
755 dump_fd_json.close();
756 }
757 }
758 }
759
760 };
761
762 WRITE_CLASS_ENCODER(MonitorDBStore::Op)
763 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)
764
765 #endif /* CEPH_MONITOR_DB_STORE_H */