]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2012 Inktank, Inc. | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | */ | |
13 | #ifndef CEPH_MONITOR_DB_STORE_H | |
14 | #define CEPH_MONITOR_DB_STORE_H | |
15 | ||
16 | #include "include/types.h" | |
17 | #include "include/buffer.h" | |
18 | #include <set> | |
19 | #include <map> | |
20 | #include <string> | |
21 | #include <boost/scoped_ptr.hpp> | |
22 | #include <sstream> | |
23 | #include <fstream> | |
24 | #include "kv/KeyValueDB.h" | |
25 | ||
26 | #include "include/assert.h" | |
27 | #include "common/Formatter.h" | |
28 | #include "common/Finisher.h" | |
29 | #include "common/errno.h" | |
30 | #include "common/debug.h" | |
31 | #include "common/safe_io.h" | |
32 | ||
33 | #define dout_context g_ceph_context | |
34 | ||
35 | class MonitorDBStore | |
36 | { | |
37 | string path; | |
38 | boost::scoped_ptr<KeyValueDB> db; | |
39 | bool do_dump; | |
40 | int dump_fd_binary; | |
41 | std::ofstream dump_fd_json; | |
42 | JSONFormatter dump_fmt; | |
43 | ||
44 | ||
45 | Finisher io_work; | |
46 | ||
47 | bool is_open; | |
48 | ||
49 | public: | |
50 | ||
51 | struct Op { | |
52 | uint8_t type; | |
53 | string prefix; | |
54 | string key, endkey; | |
55 | bufferlist bl; | |
56 | ||
57 | Op() | |
58 | : type(0) { } | |
59 | Op(int t, string p, string k) | |
60 | : type(t), prefix(p), key(k) { } | |
61 | Op(int t, const string& p, string k, bufferlist& b) | |
62 | : type(t), prefix(p), key(k), bl(b) { } | |
63 | Op(int t, const string& p, string start, string end) | |
64 | : type(t), prefix(p), key(start), endkey(end) { } | |
65 | ||
66 | void encode(bufferlist& encode_bl) const { | |
67 | ENCODE_START(2, 1, encode_bl); | |
68 | ::encode(type, encode_bl); | |
69 | ::encode(prefix, encode_bl); | |
70 | ::encode(key, encode_bl); | |
71 | ::encode(bl, encode_bl); | |
72 | ::encode(endkey, encode_bl); | |
73 | ENCODE_FINISH(encode_bl); | |
74 | } | |
75 | ||
76 | void decode(bufferlist::iterator& decode_bl) { | |
77 | DECODE_START(2, decode_bl); | |
78 | ::decode(type, decode_bl); | |
79 | ::decode(prefix, decode_bl); | |
80 | ::decode(key, decode_bl); | |
81 | ::decode(bl, decode_bl); | |
82 | if (struct_v >= 2) | |
83 | ::decode(endkey, decode_bl); | |
84 | DECODE_FINISH(decode_bl); | |
85 | } | |
86 | ||
87 | void dump(Formatter *f) const { | |
88 | f->dump_int("type", type); | |
89 | f->dump_string("prefix", prefix); | |
90 | f->dump_string("key", key); | |
91 | if (endkey.length()) | |
92 | f->dump_string("endkey", endkey); | |
93 | } | |
94 | ||
95 | static void generate_test_instances(list<Op*>& ls) { | |
96 | ls.push_back(new Op); | |
97 | // we get coverage here from the Transaction instances | |
98 | } | |
99 | }; | |
100 | ||
101 | struct Transaction; | |
102 | typedef ceph::shared_ptr<Transaction> TransactionRef; | |
103 | struct Transaction { | |
104 | list<Op> ops; | |
105 | uint64_t bytes, keys; | |
106 | ||
107 | Transaction() : bytes(0), keys(0) {} | |
108 | ||
109 | enum { | |
110 | OP_PUT = 1, | |
111 | OP_ERASE = 2, | |
112 | OP_COMPACT = 3, | |
113 | }; | |
114 | ||
115 | void put(string prefix, string key, bufferlist& bl) { | |
116 | ops.push_back(Op(OP_PUT, prefix, key, bl)); | |
117 | ++keys; | |
118 | bytes += prefix.length() + key.length() + bl.length(); | |
119 | } | |
120 | ||
121 | void put(string prefix, version_t ver, bufferlist& bl) { | |
122 | ostringstream os; | |
123 | os << ver; | |
124 | put(prefix, os.str(), bl); | |
125 | } | |
126 | ||
127 | void put(string prefix, string key, version_t ver) { | |
128 | bufferlist bl; | |
129 | ::encode(ver, bl); | |
130 | put(prefix, key, bl); | |
131 | } | |
132 | ||
133 | void erase(string prefix, string key) { | |
134 | ops.push_back(Op(OP_ERASE, prefix, key)); | |
135 | ++keys; | |
136 | bytes += prefix.length() + key.length(); | |
137 | } | |
138 | ||
139 | void erase(string prefix, version_t ver) { | |
140 | ostringstream os; | |
141 | os << ver; | |
142 | erase(prefix, os.str()); | |
143 | } | |
144 | ||
145 | void compact_prefix(string prefix) { | |
146 | ops.push_back(Op(OP_COMPACT, prefix, string())); | |
147 | } | |
148 | ||
149 | void compact_range(string prefix, string start, string end) { | |
150 | ops.push_back(Op(OP_COMPACT, prefix, start, end)); | |
151 | } | |
152 | ||
153 | void encode(bufferlist& bl) const { | |
154 | ENCODE_START(2, 1, bl); | |
155 | ::encode(ops, bl); | |
156 | ::encode(bytes, bl); | |
157 | ::encode(keys, bl); | |
158 | ENCODE_FINISH(bl); | |
159 | } | |
160 | ||
161 | void decode(bufferlist::iterator& bl) { | |
162 | DECODE_START(2, bl); | |
163 | ::decode(ops, bl); | |
164 | if (struct_v >= 2) { | |
165 | ::decode(bytes, bl); | |
166 | ::decode(keys, bl); | |
167 | } | |
168 | DECODE_FINISH(bl); | |
169 | } | |
170 | ||
171 | static void generate_test_instances(list<Transaction*>& ls) { | |
172 | ls.push_back(new Transaction); | |
173 | ls.push_back(new Transaction); | |
174 | bufferlist bl; | |
175 | bl.append("value"); | |
176 | ls.back()->put("prefix", "key", bl); | |
177 | ls.back()->erase("prefix2", "key2"); | |
178 | ls.back()->compact_prefix("prefix3"); | |
179 | ls.back()->compact_range("prefix4", "from", "to"); | |
180 | } | |
181 | ||
182 | void append(TransactionRef other) { | |
183 | ops.splice(ops.end(), other->ops); | |
184 | keys += other->keys; | |
185 | bytes += other->bytes; | |
186 | } | |
187 | ||
188 | void append_from_encoded(bufferlist& bl) { | |
189 | auto other(std::make_shared<Transaction>()); | |
190 | bufferlist::iterator it = bl.begin(); | |
191 | other->decode(it); | |
192 | append(other); | |
193 | } | |
194 | ||
195 | bool empty() { | |
196 | return (size() == 0); | |
197 | } | |
198 | ||
199 | size_t size() const { | |
200 | return ops.size(); | |
201 | } | |
202 | uint64_t get_keys() const { | |
203 | return keys; | |
204 | } | |
205 | uint64_t get_bytes() const { | |
206 | return bytes; | |
207 | } | |
208 | ||
209 | void dump(ceph::Formatter *f, bool dump_val=false) const { | |
210 | f->open_object_section("transaction"); | |
211 | f->open_array_section("ops"); | |
212 | list<Op>::const_iterator it; | |
213 | int op_num = 0; | |
214 | for (it = ops.begin(); it != ops.end(); ++it) { | |
215 | const Op& op = *it; | |
216 | f->open_object_section("op"); | |
217 | f->dump_int("op_num", op_num++); | |
218 | switch (op.type) { | |
219 | case OP_PUT: | |
220 | { | |
221 | f->dump_string("type", "PUT"); | |
222 | f->dump_string("prefix", op.prefix); | |
223 | f->dump_string("key", op.key); | |
224 | f->dump_unsigned("length", op.bl.length()); | |
225 | if (dump_val) { | |
226 | ostringstream os; | |
227 | op.bl.hexdump(os); | |
228 | f->dump_string("bl", os.str()); | |
229 | } | |
230 | } | |
231 | break; | |
232 | case OP_ERASE: | |
233 | { | |
234 | f->dump_string("type", "ERASE"); | |
235 | f->dump_string("prefix", op.prefix); | |
236 | f->dump_string("key", op.key); | |
237 | } | |
238 | break; | |
239 | case OP_COMPACT: | |
240 | { | |
241 | f->dump_string("type", "COMPACT"); | |
242 | f->dump_string("prefix", op.prefix); | |
243 | f->dump_string("start", op.key); | |
244 | f->dump_string("end", op.endkey); | |
245 | } | |
246 | break; | |
247 | default: | |
248 | { | |
249 | f->dump_string("type", "unknown"); | |
250 | f->dump_unsigned("op_code", op.type); | |
251 | break; | |
252 | } | |
253 | } | |
254 | f->close_section(); | |
255 | } | |
256 | f->close_section(); | |
257 | f->dump_unsigned("num_keys", keys); | |
258 | f->dump_unsigned("num_bytes", bytes); | |
259 | f->close_section(); | |
260 | } | |
261 | }; | |
262 | ||
263 | int apply_transaction(MonitorDBStore::TransactionRef t) { | |
264 | KeyValueDB::Transaction dbt = db->get_transaction(); | |
265 | ||
266 | if (do_dump) { | |
267 | if (!g_conf->mon_debug_dump_json) { | |
268 | bufferlist bl; | |
269 | t->encode(bl); | |
270 | bl.write_fd(dump_fd_binary); | |
271 | } else { | |
272 | t->dump(&dump_fmt, true); | |
273 | dump_fmt.flush(dump_fd_json); | |
274 | dump_fd_json.flush(); | |
275 | } | |
276 | } | |
277 | ||
278 | list<pair<string, pair<string,string> > > compact; | |
279 | for (list<Op>::const_iterator it = t->ops.begin(); | |
280 | it != t->ops.end(); | |
281 | ++it) { | |
282 | const Op& op = *it; | |
283 | switch (op.type) { | |
284 | case Transaction::OP_PUT: | |
285 | dbt->set(op.prefix, op.key, op.bl); | |
286 | break; | |
287 | case Transaction::OP_ERASE: | |
288 | dbt->rmkey(op.prefix, op.key); | |
289 | break; | |
290 | case Transaction::OP_COMPACT: | |
291 | compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey))); | |
292 | break; | |
293 | default: | |
294 | derr << __func__ << " unknown op type " << op.type << dendl; | |
295 | ceph_abort(); | |
296 | break; | |
297 | } | |
298 | } | |
299 | int r = db->submit_transaction_sync(dbt); | |
300 | if (r >= 0) { | |
301 | while (!compact.empty()) { | |
302 | if (compact.front().second.first == string() && | |
303 | compact.front().second.second == string()) | |
304 | db->compact_prefix_async(compact.front().first); | |
305 | else | |
306 | db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second); | |
307 | compact.pop_front(); | |
308 | } | |
309 | } else { | |
310 | assert(0 == "failed to write to db"); | |
311 | } | |
312 | return r; | |
313 | } | |
314 | ||
315 | struct C_DoTransaction : public Context { | |
316 | MonitorDBStore *store; | |
317 | MonitorDBStore::TransactionRef t; | |
318 | Context *oncommit; | |
319 | C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t, | |
320 | Context *f) | |
321 | : store(s), t(t), oncommit(f) | |
322 | {} | |
323 | void finish(int r) override { | |
324 | /* The store serializes writes. Each transaction is handled | |
325 | * sequentially by the io_work Finisher. If a transaction takes longer | |
326 | * to apply its state to permanent storage, then no other transaction | |
327 | * will be handled meanwhile. | |
328 | * | |
329 | * We will now randomly inject random delays. We can safely sleep prior | |
330 | * to applying the transaction as it won't break the model. | |
331 | */ | |
332 | double delay_prob = g_conf->mon_inject_transaction_delay_probability; | |
333 | if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) { | |
334 | utime_t delay; | |
335 | double delay_max = g_conf->mon_inject_transaction_delay_max; | |
336 | delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0); | |
337 | lsubdout(g_ceph_context, mon, 1) | |
338 | << "apply_transaction will be delayed for " << delay | |
339 | << " seconds" << dendl; | |
340 | delay.sleep(); | |
341 | } | |
342 | int ret = store->apply_transaction(t); | |
343 | oncommit->complete(ret); | |
344 | } | |
345 | }; | |
346 | ||
347 | /** | |
348 | * queue transaction | |
349 | * | |
350 | * Queue a transaction to commit asynchronously. Trigger a context | |
351 | * on completion (without any locks held). | |
352 | */ | |
353 | void queue_transaction(MonitorDBStore::TransactionRef t, | |
354 | Context *oncommit) { | |
355 | io_work.queue(new C_DoTransaction(this, t, oncommit)); | |
356 | } | |
357 | ||
358 | /** | |
359 | * block and flush all io activity | |
360 | */ | |
361 | void flush() { | |
362 | io_work.wait_for_empty(); | |
363 | } | |
364 | ||
365 | class StoreIteratorImpl { | |
366 | protected: | |
367 | bool done; | |
368 | pair<string,string> last_key; | |
369 | bufferlist crc_bl; | |
370 | ||
371 | StoreIteratorImpl() : done(false) { } | |
372 | virtual ~StoreIteratorImpl() { } | |
373 | ||
374 | bool add_chunk_entry(TransactionRef tx, | |
375 | string &prefix, | |
376 | string &key, | |
377 | bufferlist &value, | |
378 | uint64_t max) { | |
379 | auto tmp(std::make_shared<Transaction>()); | |
380 | bufferlist tmp_bl; | |
381 | tmp->put(prefix, key, value); | |
382 | tmp->encode(tmp_bl); | |
383 | ||
384 | bufferlist tx_bl; | |
385 | tx->encode(tx_bl); | |
386 | ||
387 | size_t len = tx_bl.length() + tmp_bl.length(); | |
388 | ||
389 | if (!tx->empty() && (len > max)) { | |
390 | return false; | |
391 | } | |
392 | ||
393 | tx->append(tmp); | |
394 | last_key.first = prefix; | |
395 | last_key.second = key; | |
396 | ||
397 | if (g_conf->mon_sync_debug) { | |
398 | ::encode(prefix, crc_bl); | |
399 | ::encode(key, crc_bl); | |
400 | ::encode(value, crc_bl); | |
401 | } | |
402 | ||
403 | return true; | |
404 | } | |
405 | ||
406 | virtual bool _is_valid() = 0; | |
407 | ||
408 | public: | |
409 | __u32 crc() { | |
410 | if (g_conf->mon_sync_debug) | |
411 | return crc_bl.crc32c(0); | |
412 | return 0; | |
413 | } | |
414 | pair<string,string> get_last_key() { | |
415 | return last_key; | |
416 | } | |
417 | virtual bool has_next_chunk() { | |
418 | return !done && _is_valid(); | |
419 | } | |
420 | virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0; | |
421 | virtual pair<string,string> get_next_key() = 0; | |
422 | }; | |
423 | typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer; | |
424 | ||
425 | class WholeStoreIteratorImpl : public StoreIteratorImpl { | |
426 | KeyValueDB::WholeSpaceIterator iter; | |
427 | set<string> sync_prefixes; | |
428 | ||
429 | public: | |
430 | WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter, | |
431 | set<string> &prefixes) | |
432 | : StoreIteratorImpl(), | |
433 | iter(iter), | |
434 | sync_prefixes(prefixes) | |
435 | { } | |
436 | ||
437 | ~WholeStoreIteratorImpl() override { } | |
438 | ||
439 | /** | |
440 | * Obtain a chunk of the store | |
441 | * | |
442 | * @param bl Encoded transaction that will recreate the chunk | |
443 | * @param first_key Pair containing the first key to obtain, and that | |
444 | * will contain the first key in the chunk (that may | |
445 | * differ from the one passed on to the function) | |
446 | * @param last_key[out] Last key in the chunk | |
447 | */ | |
448 | void get_chunk_tx(TransactionRef tx, uint64_t max) override { | |
449 | assert(done == false); | |
450 | assert(iter->valid() == true); | |
451 | ||
452 | while (iter->valid()) { | |
453 | string prefix(iter->raw_key().first); | |
454 | string key(iter->raw_key().second); | |
455 | if (sync_prefixes.count(prefix)) { | |
456 | bufferlist value = iter->value(); | |
457 | if (!add_chunk_entry(tx, prefix, key, value, max)) | |
458 | return; | |
459 | } | |
460 | iter->next(); | |
461 | } | |
462 | assert(iter->valid() == false); | |
463 | done = true; | |
464 | } | |
465 | ||
466 | pair<string,string> get_next_key() override { | |
467 | assert(iter->valid()); | |
468 | ||
469 | for (; iter->valid(); iter->next()) { | |
470 | pair<string,string> r = iter->raw_key(); | |
471 | if (sync_prefixes.count(r.first) > 0) { | |
472 | iter->next(); | |
473 | return r; | |
474 | } | |
475 | } | |
476 | return pair<string,string>(); | |
477 | } | |
478 | ||
479 | bool _is_valid() override { | |
480 | return iter->valid(); | |
481 | } | |
482 | }; | |
483 | ||
484 | Synchronizer get_synchronizer(pair<string,string> &key, | |
485 | set<string> &prefixes) { | |
486 | KeyValueDB::WholeSpaceIterator iter; | |
487 | iter = db->get_iterator(); | |
488 | ||
489 | if (!key.first.empty() && !key.second.empty()) | |
490 | iter->upper_bound(key.first, key.second); | |
491 | else | |
492 | iter->seek_to_first(); | |
493 | ||
494 | return ceph::shared_ptr<StoreIteratorImpl>( | |
495 | new WholeStoreIteratorImpl(iter, prefixes) | |
496 | ); | |
497 | } | |
498 | ||
499 | KeyValueDB::Iterator get_iterator(const string &prefix) { | |
500 | assert(!prefix.empty()); | |
501 | KeyValueDB::Iterator iter = db->get_iterator(prefix); | |
502 | iter->seek_to_first(); | |
503 | return iter; | |
504 | } | |
505 | ||
506 | KeyValueDB::WholeSpaceIterator get_iterator() { | |
507 | KeyValueDB::WholeSpaceIterator iter; | |
508 | iter = db->get_iterator(); | |
509 | iter->seek_to_first(); | |
510 | return iter; | |
511 | } | |
512 | ||
513 | int get(const string& prefix, const string& key, bufferlist& bl) { | |
514 | assert(bl.length() == 0); | |
515 | return db->get(prefix, key, &bl); | |
516 | } | |
517 | ||
518 | int get(const string& prefix, const version_t ver, bufferlist& bl) { | |
519 | ostringstream os; | |
520 | os << ver; | |
521 | return get(prefix, os.str(), bl); | |
522 | } | |
523 | ||
524 | version_t get(const string& prefix, const string& key) { | |
525 | bufferlist bl; | |
526 | int err = get(prefix, key, bl); | |
527 | if (err < 0) { | |
528 | if (err == -ENOENT) // if key doesn't exist, assume its value is 0 | |
529 | return 0; | |
530 | // we're not expecting any other negative return value, and we can't | |
531 | // just return a negative value if we're returning a version_t | |
532 | generic_dout(0) << "MonitorDBStore::get() error obtaining" | |
533 | << " (" << prefix << ":" << key << "): " | |
534 | << cpp_strerror(err) << dendl; | |
535 | assert(0 == "error obtaining key"); | |
536 | } | |
537 | ||
538 | assert(bl.length()); | |
539 | version_t ver; | |
540 | bufferlist::iterator p = bl.begin(); | |
541 | ::decode(ver, p); | |
542 | return ver; | |
543 | } | |
544 | ||
545 | bool exists(const string& prefix, const string& key) { | |
546 | KeyValueDB::Iterator it = db->get_iterator(prefix); | |
547 | int err = it->lower_bound(key); | |
548 | if (err < 0) | |
549 | return false; | |
550 | ||
551 | return (it->valid() && it->key() == key); | |
552 | } | |
553 | ||
554 | bool exists(const string& prefix, version_t ver) { | |
555 | ostringstream os; | |
556 | os << ver; | |
557 | return exists(prefix, os.str()); | |
558 | } | |
559 | ||
/**
 * Join `prefix` and `value` with a single underscore: "<prefix>_<value>".
 */
string combine_strings(const string& prefix, const string& value)
{
  return prefix + '_' + value;
}
566 | ||
567 | string combine_strings(const string& prefix, const version_t ver) { | |
568 | ostringstream os; | |
569 | os << ver; | |
570 | return combine_strings(prefix, os.str()); | |
571 | } | |
572 | ||
573 | void clear(set<string>& prefixes) { | |
574 | set<string>::iterator iter; | |
575 | KeyValueDB::Transaction dbt = db->get_transaction(); | |
576 | ||
577 | for (iter = prefixes.begin(); iter != prefixes.end(); ++iter) { | |
578 | dbt->rmkeys_by_prefix((*iter)); | |
579 | } | |
580 | int r = db->submit_transaction_sync(dbt); | |
581 | assert(r >= 0); | |
582 | } | |
583 | ||
584 | void _open(string kv_type) { | |
585 | string::const_reverse_iterator rit; | |
586 | int pos = 0; | |
587 | for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) { | |
588 | if (*rit != '/') | |
589 | break; | |
590 | } | |
591 | ostringstream os; | |
592 | os << path.substr(0, path.size() - pos) << "/store.db"; | |
593 | string full_path = os.str(); | |
594 | ||
595 | KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context, | |
596 | kv_type, | |
597 | full_path); | |
598 | if (!db_ptr) { | |
599 | derr << __func__ << " error initializing " | |
600 | << kv_type << " db back storage in " | |
601 | << full_path << dendl; | |
602 | assert(0 == "MonitorDBStore: error initializing keyvaluedb back storage"); | |
603 | } | |
604 | db.reset(db_ptr); | |
605 | ||
606 | if (g_conf->mon_debug_dump_transactions) { | |
607 | if (!g_conf->mon_debug_dump_json) { | |
608 | dump_fd_binary = ::open( | |
609 | g_conf->mon_debug_dump_location.c_str(), | |
610 | O_CREAT|O_APPEND|O_WRONLY, 0644); | |
611 | if (dump_fd_binary < 0) { | |
612 | dump_fd_binary = -errno; | |
613 | derr << "Could not open log file, got " | |
614 | << cpp_strerror(dump_fd_binary) << dendl; | |
615 | } | |
616 | } else { | |
617 | dump_fmt.reset(); | |
618 | dump_fmt.open_array_section("dump"); | |
619 | dump_fd_json.open(g_conf->mon_debug_dump_location.c_str()); | |
620 | } | |
621 | do_dump = true; | |
622 | } | |
623 | if (kv_type == "rocksdb") | |
624 | db->init(g_conf->mon_rocksdb_options); | |
625 | else | |
626 | db->init(); | |
627 | } | |
628 | ||
629 | int open(ostream &out) { | |
630 | string kv_type; | |
631 | int r = read_meta("kv_backend", &kv_type); | |
31f18b77 FG |
632 | if (r < 0 || kv_type.empty()) { |
633 | // assume old monitors that did not mark the type were leveldb. | |
7c673cae | 634 | kv_type = "leveldb"; |
31f18b77 FG |
635 | r = write_meta("kv_backend", kv_type); |
636 | if (r < 0) | |
637 | return r; | |
638 | } | |
7c673cae FG |
639 | _open(kv_type); |
640 | r = db->open(out); | |
641 | if (r < 0) | |
642 | return r; | |
643 | io_work.start(); | |
644 | is_open = true; | |
645 | return 0; | |
646 | } | |
647 | ||
648 | int create_and_open(ostream &out) { | |
649 | // record the type before open | |
650 | string kv_type; | |
651 | int r = read_meta("kv_backend", &kv_type); | |
652 | if (r < 0) { | |
31f18b77 | 653 | kv_type = g_conf->mon_keyvaluedb; |
7c673cae FG |
654 | r = write_meta("kv_backend", kv_type); |
655 | if (r < 0) | |
656 | return r; | |
657 | } | |
658 | _open(kv_type); | |
659 | r = db->create_and_open(out); | |
660 | if (r < 0) | |
661 | return r; | |
662 | io_work.start(); | |
663 | is_open = true; | |
664 | return 0; | |
665 | } | |
666 | ||
667 | void close() { | |
668 | // there should be no work queued! | |
669 | io_work.stop(); | |
670 | is_open = false; | |
671 | db.reset(NULL); | |
672 | } | |
673 | ||
674 | void compact() { | |
675 | db->compact(); | |
676 | } | |
677 | ||
678 | void compact_prefix(const string& prefix) { | |
679 | db->compact_prefix(prefix); | |
680 | } | |
681 | ||
682 | uint64_t get_estimated_size(map<string, uint64_t> &extras) { | |
683 | return db->get_estimated_size(extras); | |
684 | } | |
685 | ||
686 | /** | |
687 | * write_meta - write a simple configuration key out-of-band | |
688 | * | |
689 | * Write a simple key/value pair for basic store configuration | |
690 | * (e.g., a uuid or magic number) to an unopened/unmounted store. | |
691 | * The default implementation writes this to a plaintext file in the | |
692 | * path. | |
693 | * | |
694 | * A newline is appended. | |
695 | * | |
696 | * @param key key name (e.g., "fsid") | |
697 | * @param value value (e.g., a uuid rendered as a string) | |
698 | * @returns 0 for success, or an error code | |
699 | */ | |
700 | int write_meta(const std::string& key, | |
701 | const std::string& value) const { | |
702 | string v = value; | |
703 | v += "\n"; | |
704 | int r = safe_write_file(path.c_str(), key.c_str(), | |
705 | v.c_str(), v.length()); | |
706 | if (r < 0) | |
707 | return r; | |
708 | return 0; | |
709 | } | |
710 | ||
711 | /** | |
712 | * read_meta - read a simple configuration key out-of-band | |
713 | * | |
714 | * Read a simple key value to an unopened/mounted store. | |
715 | * | |
716 | * Trailing whitespace is stripped off. | |
717 | * | |
718 | * @param key key name | |
719 | * @param value pointer to value string | |
720 | * @returns 0 for success, or an error code | |
721 | */ | |
722 | int read_meta(const std::string& key, | |
723 | std::string *value) const { | |
724 | char buf[4096]; | |
725 | int r = safe_read_file(path.c_str(), key.c_str(), | |
726 | buf, sizeof(buf)); | |
727 | if (r <= 0) | |
728 | return r; | |
729 | // drop trailing newlines | |
730 | while (r && isspace(buf[r-1])) { | |
731 | --r; | |
732 | } | |
733 | *value = string(buf, r); | |
734 | return 0; | |
735 | } | |
736 | ||
737 | explicit MonitorDBStore(const string& path) | |
738 | : path(path), | |
739 | db(0), | |
740 | do_dump(false), | |
741 | dump_fd_binary(-1), | |
742 | dump_fmt(true), | |
743 | io_work(g_ceph_context, "monstore", "fn_monstore"), | |
744 | is_open(false) { | |
745 | } | |
746 | ~MonitorDBStore() { | |
747 | assert(!is_open); | |
748 | if (do_dump) { | |
749 | if (!g_conf->mon_debug_dump_json) { | |
750 | ::close(dump_fd_binary); | |
751 | } else { | |
752 | dump_fmt.close_section(); | |
753 | dump_fmt.flush(dump_fd_json); | |
754 | dump_fd_json.flush(); | |
755 | dump_fd_json.close(); | |
756 | } | |
757 | } | |
758 | } | |
759 | ||
760 | }; | |
761 | ||
762 | WRITE_CLASS_ENCODER(MonitorDBStore::Op) | |
763 | WRITE_CLASS_ENCODER(MonitorDBStore::Transaction) | |
764 | ||
765 | #endif /* CEPH_MONITOR_DB_STORE_H */ |