]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2012 Inktank, Inc. | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | */ | |
13 | #ifndef CEPH_MONITOR_DB_STORE_H | |
14 | #define CEPH_MONITOR_DB_STORE_H | |
15 | ||
16 | #include "include/types.h" | |
17 | #include "include/buffer.h" | |
18 | #include <set> | |
19 | #include <map> | |
20 | #include <string> | |
21 | #include <boost/scoped_ptr.hpp> | |
22 | #include <sstream> | |
23 | #include <fstream> | |
24 | #include "kv/KeyValueDB.h" | |
25 | ||
11fdf7f2 | 26 | #include "include/ceph_assert.h" |
7c673cae FG |
27 | #include "common/Formatter.h" |
28 | #include "common/Finisher.h" | |
29 | #include "common/errno.h" | |
30 | #include "common/debug.h" | |
31 | #include "common/safe_io.h" | |
11fdf7f2 | 32 | #include "common/blkdev.h" |
eafe8130 | 33 | #include "common/PriorityCache.h" |
7c673cae FG |
34 | |
35 | #define dout_context g_ceph_context | |
36 | ||
37 | class MonitorDBStore | |
38 | { | |
f67539c2 | 39 | std::string path; |
7c673cae FG |
40 | boost::scoped_ptr<KeyValueDB> db; |
41 | bool do_dump; | |
42 | int dump_fd_binary; | |
43 | std::ofstream dump_fd_json; | |
f67539c2 | 44 | ceph::JSONFormatter dump_fmt; |
7c673cae FG |
45 | |
46 | ||
47 | Finisher io_work; | |
48 | ||
49 | bool is_open; | |
50 | ||
51 | public: | |
52 | ||
f67539c2 | 53 | std::string get_devname() { |
11fdf7f2 TL |
54 | char devname[4096] = {0}, partition[4096]; |
55 | get_device_by_path(path.c_str(), partition, devname, | |
56 | sizeof(devname)); | |
57 | return devname; | |
58 | } | |
59 | ||
20effc67 TL |
60 | std::string get_path() { |
61 | return path; | |
62 | } | |
63 | ||
eafe8130 TL |
64 | std::shared_ptr<PriorityCache::PriCache> get_priority_cache() const { |
65 | return db->get_priority_cache(); | |
66 | } | |
67 | ||
7c673cae FG |
68 | struct Op { |
69 | uint8_t type; | |
f67539c2 TL |
70 | std::string prefix; |
71 | std::string key, endkey; | |
72 | ceph::buffer::list bl; | |
7c673cae FG |
73 | |
74 | Op() | |
75 | : type(0) { } | |
f67539c2 | 76 | Op(int t, const std::string& p, const std::string& k) |
7c673cae | 77 | : type(t), prefix(p), key(k) { } |
f67539c2 | 78 | Op(int t, const std::string& p, const std::string& k, const ceph::buffer::list& b) |
7c673cae | 79 | : type(t), prefix(p), key(k), bl(b) { } |
f67539c2 | 80 | Op(int t, const std::string& p, const std::string& start, const std::string& end) |
7c673cae FG |
81 | : type(t), prefix(p), key(start), endkey(end) { } |
82 | ||
f67539c2 | 83 | void encode(ceph::buffer::list& encode_bl) const { |
7c673cae | 84 | ENCODE_START(2, 1, encode_bl); |
11fdf7f2 TL |
85 | encode(type, encode_bl); |
86 | encode(prefix, encode_bl); | |
87 | encode(key, encode_bl); | |
88 | encode(bl, encode_bl); | |
89 | encode(endkey, encode_bl); | |
7c673cae FG |
90 | ENCODE_FINISH(encode_bl); |
91 | } | |
92 | ||
f67539c2 | 93 | void decode(ceph::buffer::list::const_iterator& decode_bl) { |
7c673cae | 94 | DECODE_START(2, decode_bl); |
11fdf7f2 TL |
95 | decode(type, decode_bl); |
96 | decode(prefix, decode_bl); | |
97 | decode(key, decode_bl); | |
98 | decode(bl, decode_bl); | |
7c673cae | 99 | if (struct_v >= 2) |
11fdf7f2 | 100 | decode(endkey, decode_bl); |
7c673cae FG |
101 | DECODE_FINISH(decode_bl); |
102 | } | |
103 | ||
f67539c2 | 104 | void dump(ceph::Formatter *f) const { |
7c673cae FG |
105 | f->dump_int("type", type); |
106 | f->dump_string("prefix", prefix); | |
107 | f->dump_string("key", key); | |
9f95a23c | 108 | if (endkey.length()) { |
7c673cae | 109 | f->dump_string("endkey", endkey); |
9f95a23c TL |
110 | } |
111 | } | |
112 | ||
113 | int approx_size() const { | |
114 | return 6 + 1 + | |
115 | 4 + prefix.size() + | |
116 | 4 + key.size() + | |
117 | 4 + endkey.size() + | |
118 | 4 + bl.length(); | |
7c673cae FG |
119 | } |
120 | ||
f67539c2 | 121 | static void generate_test_instances(std::list<Op*>& ls) { |
7c673cae FG |
122 | ls.push_back(new Op); |
123 | // we get coverage here from the Transaction instances | |
124 | } | |
125 | }; | |
126 | ||
127 | struct Transaction; | |
11fdf7f2 | 128 | typedef std::shared_ptr<Transaction> TransactionRef; |
7c673cae | 129 | struct Transaction { |
f67539c2 | 130 | std::list<Op> ops; |
7c673cae FG |
131 | uint64_t bytes, keys; |
132 | ||
9f95a23c | 133 | Transaction() : bytes(6 + 4 + 8*2), keys(0) {} |
7c673cae FG |
134 | |
135 | enum { | |
136 | OP_PUT = 1, | |
137 | OP_ERASE = 2, | |
138 | OP_COMPACT = 3, | |
9f95a23c | 139 | OP_ERASE_RANGE = 4, |
7c673cae FG |
140 | }; |
141 | ||
f67539c2 | 142 | void put(const std::string& prefix, const std::string& key, const ceph::buffer::list& bl) { |
7c673cae FG |
143 | ops.push_back(Op(OP_PUT, prefix, key, bl)); |
144 | ++keys; | |
9f95a23c | 145 | bytes += ops.back().approx_size(); |
7c673cae FG |
146 | } |
147 | ||
f67539c2 TL |
148 | void put(const std::string& prefix, version_t ver, const ceph::buffer::list& bl) { |
149 | std::ostringstream os; | |
7c673cae FG |
150 | os << ver; |
151 | put(prefix, os.str(), bl); | |
152 | } | |
153 | ||
f67539c2 | 154 | void put(const std::string& prefix, const std::string& key, version_t ver) { |
11fdf7f2 | 155 | using ceph::encode; |
f67539c2 | 156 | ceph::buffer::list bl; |
11fdf7f2 | 157 | encode(ver, bl); |
7c673cae FG |
158 | put(prefix, key, bl); |
159 | } | |
160 | ||
f67539c2 | 161 | void erase(const std::string& prefix, const std::string& key) { |
7c673cae FG |
162 | ops.push_back(Op(OP_ERASE, prefix, key)); |
163 | ++keys; | |
9f95a23c | 164 | bytes += ops.back().approx_size(); |
7c673cae FG |
165 | } |
166 | ||
f67539c2 TL |
167 | void erase(const std::string& prefix, version_t ver) { |
168 | std::ostringstream os; | |
7c673cae FG |
169 | os << ver; |
170 | erase(prefix, os.str()); | |
171 | } | |
172 | ||
f67539c2 TL |
173 | void erase_range(const std::string& prefix, const std::string& begin, |
174 | const std::string& end) { | |
9f95a23c TL |
175 | ops.push_back(Op(OP_ERASE_RANGE, prefix, begin, end)); |
176 | ++keys; | |
177 | bytes += ops.back().approx_size(); | |
178 | } | |
179 | ||
f67539c2 TL |
180 | void compact_prefix(const std::string& prefix) { |
181 | ops.push_back(Op(OP_COMPACT, prefix, {})); | |
7c673cae FG |
182 | } |
183 | ||
f67539c2 TL |
184 | void compact_range(const std::string& prefix, const std::string& start, |
185 | const std::string& end) { | |
7c673cae FG |
186 | ops.push_back(Op(OP_COMPACT, prefix, start, end)); |
187 | } | |
188 | ||
f67539c2 | 189 | void encode(ceph::buffer::list& bl) const { |
7c673cae | 190 | ENCODE_START(2, 1, bl); |
11fdf7f2 TL |
191 | encode(ops, bl); |
192 | encode(bytes, bl); | |
193 | encode(keys, bl); | |
7c673cae FG |
194 | ENCODE_FINISH(bl); |
195 | } | |
196 | ||
f67539c2 | 197 | void decode(ceph::buffer::list::const_iterator& bl) { |
7c673cae | 198 | DECODE_START(2, bl); |
11fdf7f2 | 199 | decode(ops, bl); |
7c673cae | 200 | if (struct_v >= 2) { |
11fdf7f2 TL |
201 | decode(bytes, bl); |
202 | decode(keys, bl); | |
7c673cae FG |
203 | } |
204 | DECODE_FINISH(bl); | |
205 | } | |
206 | ||
f67539c2 | 207 | static void generate_test_instances(std::list<Transaction*>& ls) { |
7c673cae FG |
208 | ls.push_back(new Transaction); |
209 | ls.push_back(new Transaction); | |
f67539c2 | 210 | ceph::buffer::list bl; |
7c673cae FG |
211 | bl.append("value"); |
212 | ls.back()->put("prefix", "key", bl); | |
213 | ls.back()->erase("prefix2", "key2"); | |
9f95a23c | 214 | ls.back()->erase_range("prefix3", "key3", "key4"); |
7c673cae FG |
215 | ls.back()->compact_prefix("prefix3"); |
216 | ls.back()->compact_range("prefix4", "from", "to"); | |
217 | } | |
218 | ||
219 | void append(TransactionRef other) { | |
220 | ops.splice(ops.end(), other->ops); | |
221 | keys += other->keys; | |
222 | bytes += other->bytes; | |
223 | } | |
224 | ||
f67539c2 | 225 | void append_from_encoded(ceph::buffer::list& bl) { |
7c673cae | 226 | auto other(std::make_shared<Transaction>()); |
11fdf7f2 | 227 | auto it = bl.cbegin(); |
7c673cae FG |
228 | other->decode(it); |
229 | append(other); | |
230 | } | |
231 | ||
232 | bool empty() { | |
233 | return (size() == 0); | |
234 | } | |
235 | ||
236 | size_t size() const { | |
237 | return ops.size(); | |
238 | } | |
239 | uint64_t get_keys() const { | |
240 | return keys; | |
241 | } | |
242 | uint64_t get_bytes() const { | |
243 | return bytes; | |
244 | } | |
245 | ||
246 | void dump(ceph::Formatter *f, bool dump_val=false) const { | |
247 | f->open_object_section("transaction"); | |
248 | f->open_array_section("ops"); | |
7c673cae | 249 | int op_num = 0; |
f67539c2 | 250 | for (auto it = ops.begin(); it != ops.end(); ++it) { |
7c673cae FG |
251 | const Op& op = *it; |
252 | f->open_object_section("op"); | |
253 | f->dump_int("op_num", op_num++); | |
254 | switch (op.type) { | |
255 | case OP_PUT: | |
256 | { | |
257 | f->dump_string("type", "PUT"); | |
258 | f->dump_string("prefix", op.prefix); | |
259 | f->dump_string("key", op.key); | |
260 | f->dump_unsigned("length", op.bl.length()); | |
261 | if (dump_val) { | |
f67539c2 | 262 | std::ostringstream os; |
7c673cae FG |
263 | op.bl.hexdump(os); |
264 | f->dump_string("bl", os.str()); | |
265 | } | |
266 | } | |
267 | break; | |
268 | case OP_ERASE: | |
269 | { | |
270 | f->dump_string("type", "ERASE"); | |
271 | f->dump_string("prefix", op.prefix); | |
272 | f->dump_string("key", op.key); | |
273 | } | |
274 | break; | |
9f95a23c TL |
275 | case OP_ERASE_RANGE: |
276 | { | |
277 | f->dump_string("type", "ERASE_RANGE"); | |
278 | f->dump_string("prefix", op.prefix); | |
279 | f->dump_string("start", op.key); | |
280 | f->dump_string("end", op.endkey); | |
281 | } | |
282 | break; | |
7c673cae FG |
283 | case OP_COMPACT: |
284 | { | |
285 | f->dump_string("type", "COMPACT"); | |
286 | f->dump_string("prefix", op.prefix); | |
287 | f->dump_string("start", op.key); | |
288 | f->dump_string("end", op.endkey); | |
289 | } | |
290 | break; | |
291 | default: | |
292 | { | |
293 | f->dump_string("type", "unknown"); | |
294 | f->dump_unsigned("op_code", op.type); | |
295 | break; | |
296 | } | |
297 | } | |
298 | f->close_section(); | |
299 | } | |
300 | f->close_section(); | |
301 | f->dump_unsigned("num_keys", keys); | |
302 | f->dump_unsigned("num_bytes", bytes); | |
303 | f->close_section(); | |
304 | } | |
305 | }; | |
306 | ||
307 | int apply_transaction(MonitorDBStore::TransactionRef t) { | |
308 | KeyValueDB::Transaction dbt = db->get_transaction(); | |
309 | ||
310 | if (do_dump) { | |
11fdf7f2 | 311 | if (!g_conf()->mon_debug_dump_json) { |
f67539c2 | 312 | ceph::buffer::list bl; |
7c673cae FG |
313 | t->encode(bl); |
314 | bl.write_fd(dump_fd_binary); | |
315 | } else { | |
316 | t->dump(&dump_fmt, true); | |
317 | dump_fmt.flush(dump_fd_json); | |
318 | dump_fd_json.flush(); | |
319 | } | |
320 | } | |
321 | ||
f67539c2 TL |
322 | std::list<std::pair<std::string, std::pair<std::string,std::string>>> compact; |
323 | for (auto it = t->ops.begin(); it != t->ops.end(); ++it) { | |
7c673cae FG |
324 | const Op& op = *it; |
325 | switch (op.type) { | |
326 | case Transaction::OP_PUT: | |
327 | dbt->set(op.prefix, op.key, op.bl); | |
328 | break; | |
329 | case Transaction::OP_ERASE: | |
330 | dbt->rmkey(op.prefix, op.key); | |
331 | break; | |
9f95a23c TL |
332 | case Transaction::OP_ERASE_RANGE: |
333 | dbt->rm_range_keys(op.prefix, op.key, op.endkey); | |
334 | break; | |
7c673cae FG |
335 | case Transaction::OP_COMPACT: |
336 | compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey))); | |
337 | break; | |
338 | default: | |
339 | derr << __func__ << " unknown op type " << op.type << dendl; | |
340 | ceph_abort(); | |
341 | break; | |
342 | } | |
343 | } | |
344 | int r = db->submit_transaction_sync(dbt); | |
345 | if (r >= 0) { | |
346 | while (!compact.empty()) { | |
f67539c2 TL |
347 | if (compact.front().second.first == std::string() && |
348 | compact.front().second.second == std::string()) | |
7c673cae FG |
349 | db->compact_prefix_async(compact.front().first); |
350 | else | |
351 | db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second); | |
352 | compact.pop_front(); | |
353 | } | |
354 | } else { | |
11fdf7f2 | 355 | ceph_abort_msg("failed to write to db"); |
7c673cae FG |
356 | } |
357 | return r; | |
358 | } | |
359 | ||
360 | struct C_DoTransaction : public Context { | |
361 | MonitorDBStore *store; | |
362 | MonitorDBStore::TransactionRef t; | |
363 | Context *oncommit; | |
364 | C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t, | |
365 | Context *f) | |
366 | : store(s), t(t), oncommit(f) | |
367 | {} | |
368 | void finish(int r) override { | |
369 | /* The store serializes writes. Each transaction is handled | |
370 | * sequentially by the io_work Finisher. If a transaction takes longer | |
371 | * to apply its state to permanent storage, then no other transaction | |
372 | * will be handled meanwhile. | |
373 | * | |
374 | * We will now randomly inject random delays. We can safely sleep prior | |
375 | * to applying the transaction as it won't break the model. | |
376 | */ | |
11fdf7f2 | 377 | double delay_prob = g_conf()->mon_inject_transaction_delay_probability; |
7c673cae FG |
378 | if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) { |
379 | utime_t delay; | |
11fdf7f2 | 380 | double delay_max = g_conf()->mon_inject_transaction_delay_max; |
7c673cae FG |
381 | delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0); |
382 | lsubdout(g_ceph_context, mon, 1) | |
383 | << "apply_transaction will be delayed for " << delay | |
384 | << " seconds" << dendl; | |
385 | delay.sleep(); | |
386 | } | |
387 | int ret = store->apply_transaction(t); | |
388 | oncommit->complete(ret); | |
389 | } | |
390 | }; | |
391 | ||
392 | /** | |
393 | * queue transaction | |
394 | * | |
395 | * Queue a transaction to commit asynchronously. Trigger a context | |
396 | * on completion (without any locks held). | |
397 | */ | |
398 | void queue_transaction(MonitorDBStore::TransactionRef t, | |
399 | Context *oncommit) { | |
400 | io_work.queue(new C_DoTransaction(this, t, oncommit)); | |
401 | } | |
402 | ||
403 | /** | |
404 | * block and flush all io activity | |
405 | */ | |
406 | void flush() { | |
407 | io_work.wait_for_empty(); | |
408 | } | |
409 | ||
410 | class StoreIteratorImpl { | |
411 | protected: | |
412 | bool done; | |
f67539c2 TL |
413 | std::pair<std::string,std::string> last_key; |
414 | ceph::buffer::list crc_bl; | |
7c673cae FG |
415 | |
416 | StoreIteratorImpl() : done(false) { } | |
417 | virtual ~StoreIteratorImpl() { } | |
418 | ||
7c673cae FG |
419 | virtual bool _is_valid() = 0; |
420 | ||
421 | public: | |
422 | __u32 crc() { | |
11fdf7f2 | 423 | if (g_conf()->mon_sync_debug) |
7c673cae FG |
424 | return crc_bl.crc32c(0); |
425 | return 0; | |
426 | } | |
f67539c2 | 427 | std::pair<std::string,std::string> get_last_key() { |
7c673cae FG |
428 | return last_key; |
429 | } | |
430 | virtual bool has_next_chunk() { | |
431 | return !done && _is_valid(); | |
432 | } | |
9f95a23c TL |
433 | virtual void get_chunk_tx(TransactionRef tx, uint64_t max_bytes, |
434 | uint64_t max_keys) = 0; | |
f67539c2 | 435 | virtual std::pair<std::string,std::string> get_next_key() = 0; |
7c673cae | 436 | }; |
11fdf7f2 | 437 | typedef std::shared_ptr<StoreIteratorImpl> Synchronizer; |
7c673cae FG |
438 | |
439 | class WholeStoreIteratorImpl : public StoreIteratorImpl { | |
440 | KeyValueDB::WholeSpaceIterator iter; | |
f67539c2 | 441 | std::set<std::string> sync_prefixes; |
7c673cae FG |
442 | |
443 | public: | |
444 | WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter, | |
f67539c2 | 445 | std::set<std::string> &prefixes) |
7c673cae FG |
446 | : StoreIteratorImpl(), |
447 | iter(iter), | |
448 | sync_prefixes(prefixes) | |
449 | { } | |
450 | ||
451 | ~WholeStoreIteratorImpl() override { } | |
452 | ||
453 | /** | |
454 | * Obtain a chunk of the store | |
455 | * | |
456 | * @param bl Encoded transaction that will recreate the chunk | |
457 | * @param first_key Pair containing the first key to obtain, and that | |
458 | * will contain the first key in the chunk (that may | |
459 | * differ from the one passed on to the function) | |
460 | * @param last_key[out] Last key in the chunk | |
461 | */ | |
9f95a23c TL |
462 | void get_chunk_tx(TransactionRef tx, uint64_t max_bytes, |
463 | uint64_t max_keys) override { | |
f67539c2 | 464 | using ceph::encode; |
11fdf7f2 TL |
465 | ceph_assert(done == false); |
466 | ceph_assert(iter->valid() == true); | |
7c673cae FG |
467 | |
468 | while (iter->valid()) { | |
f67539c2 TL |
469 | std::string prefix(iter->raw_key().first); |
470 | std::string key(iter->raw_key().second); | |
7c673cae | 471 | if (sync_prefixes.count(prefix)) { |
f67539c2 | 472 | ceph::buffer::list value = iter->value(); |
9f95a23c TL |
473 | if (tx->empty() || |
474 | (tx->get_bytes() + value.length() + key.size() + | |
475 | prefix.size() < max_bytes && | |
476 | tx->get_keys() < max_keys)) { | |
477 | // NOTE: putting every key in a separate transaction is | |
478 | // questionable as far as efficiency goes | |
479 | auto tmp(std::make_shared<Transaction>()); | |
480 | tmp->put(prefix, key, value); | |
481 | tx->append(tmp); | |
482 | if (g_conf()->mon_sync_debug) { | |
483 | encode(prefix, crc_bl); | |
484 | encode(key, crc_bl); | |
485 | encode(value, crc_bl); | |
486 | } | |
487 | } else { | |
488 | last_key.first = prefix; | |
489 | last_key.second = key; | |
7c673cae | 490 | return; |
9f95a23c | 491 | } |
7c673cae FG |
492 | } |
493 | iter->next(); | |
494 | } | |
11fdf7f2 | 495 | ceph_assert(iter->valid() == false); |
7c673cae FG |
496 | done = true; |
497 | } | |
498 | ||
f67539c2 | 499 | std::pair<std::string,std::string> get_next_key() override { |
11fdf7f2 | 500 | ceph_assert(iter->valid()); |
7c673cae FG |
501 | |
502 | for (; iter->valid(); iter->next()) { | |
f67539c2 | 503 | std::pair<std::string,std::string> r = iter->raw_key(); |
7c673cae FG |
504 | if (sync_prefixes.count(r.first) > 0) { |
505 | iter->next(); | |
506 | return r; | |
507 | } | |
508 | } | |
f67539c2 | 509 | return std::pair<std::string,std::string>(); |
7c673cae FG |
510 | } |
511 | ||
512 | bool _is_valid() override { | |
513 | return iter->valid(); | |
514 | } | |
515 | }; | |
516 | ||
f67539c2 TL |
517 | Synchronizer get_synchronizer(std::pair<std::string,std::string> &key, |
518 | std::set<std::string> &prefixes) { | |
7c673cae | 519 | KeyValueDB::WholeSpaceIterator iter; |
11fdf7f2 | 520 | iter = db->get_wholespace_iterator(); |
7c673cae FG |
521 | |
522 | if (!key.first.empty() && !key.second.empty()) | |
523 | iter->upper_bound(key.first, key.second); | |
524 | else | |
525 | iter->seek_to_first(); | |
526 | ||
11fdf7f2 | 527 | return std::shared_ptr<StoreIteratorImpl>( |
7c673cae FG |
528 | new WholeStoreIteratorImpl(iter, prefixes) |
529 | ); | |
530 | } | |
531 | ||
f67539c2 | 532 | KeyValueDB::Iterator get_iterator(const std::string &prefix) { |
11fdf7f2 | 533 | ceph_assert(!prefix.empty()); |
7c673cae FG |
534 | KeyValueDB::Iterator iter = db->get_iterator(prefix); |
535 | iter->seek_to_first(); | |
536 | return iter; | |
537 | } | |
538 | ||
539 | KeyValueDB::WholeSpaceIterator get_iterator() { | |
540 | KeyValueDB::WholeSpaceIterator iter; | |
11fdf7f2 | 541 | iter = db->get_wholespace_iterator(); |
7c673cae FG |
542 | iter->seek_to_first(); |
543 | return iter; | |
544 | } | |
545 | ||
f67539c2 | 546 | int get(const std::string& prefix, const std::string& key, ceph::buffer::list& bl) { |
11fdf7f2 | 547 | ceph_assert(bl.length() == 0); |
7c673cae FG |
548 | return db->get(prefix, key, &bl); |
549 | } | |
550 | ||
f67539c2 TL |
551 | int get(const std::string& prefix, const version_t ver, ceph::buffer::list& bl) { |
552 | std::ostringstream os; | |
7c673cae FG |
553 | os << ver; |
554 | return get(prefix, os.str(), bl); | |
555 | } | |
556 | ||
f67539c2 TL |
557 | version_t get(const std::string& prefix, const std::string& key) { |
558 | using ceph::decode; | |
559 | ceph::buffer::list bl; | |
7c673cae FG |
560 | int err = get(prefix, key, bl); |
561 | if (err < 0) { | |
562 | if (err == -ENOENT) // if key doesn't exist, assume its value is 0 | |
563 | return 0; | |
564 | // we're not expecting any other negative return value, and we can't | |
565 | // just return a negative value if we're returning a version_t | |
566 | generic_dout(0) << "MonitorDBStore::get() error obtaining" | |
567 | << " (" << prefix << ":" << key << "): " | |
568 | << cpp_strerror(err) << dendl; | |
11fdf7f2 | 569 | ceph_abort_msg("error obtaining key"); |
7c673cae FG |
570 | } |
571 | ||
11fdf7f2 | 572 | ceph_assert(bl.length()); |
7c673cae | 573 | version_t ver; |
11fdf7f2 TL |
574 | auto p = bl.cbegin(); |
575 | decode(ver, p); | |
7c673cae FG |
576 | return ver; |
577 | } | |
578 | ||
f67539c2 | 579 | bool exists(const std::string& prefix, const std::string& key) { |
7c673cae FG |
580 | KeyValueDB::Iterator it = db->get_iterator(prefix); |
581 | int err = it->lower_bound(key); | |
582 | if (err < 0) | |
583 | return false; | |
584 | ||
585 | return (it->valid() && it->key() == key); | |
586 | } | |
587 | ||
f67539c2 TL |
588 | bool exists(const std::string& prefix, version_t ver) { |
589 | std::ostringstream os; | |
7c673cae FG |
590 | os << ver; |
591 | return exists(prefix, os.str()); | |
592 | } | |
593 | ||
f67539c2 TL |
594 | std::string combine_strings(const std::string& prefix, const std::string& value) { |
595 | std::string out = prefix; | |
7c673cae FG |
596 | out.push_back('_'); |
597 | out.append(value); | |
598 | return out; | |
599 | } | |
600 | ||
f67539c2 TL |
601 | std::string combine_strings(const std::string& prefix, const version_t ver) { |
602 | std::ostringstream os; | |
7c673cae FG |
603 | os << ver; |
604 | return combine_strings(prefix, os.str()); | |
605 | } | |
606 | ||
f67539c2 | 607 | void clear(std::set<std::string>& prefixes) { |
7c673cae FG |
608 | KeyValueDB::Transaction dbt = db->get_transaction(); |
609 | ||
f67539c2 | 610 | for (auto iter = prefixes.begin(); iter != prefixes.end(); ++iter) { |
7c673cae FG |
611 | dbt->rmkeys_by_prefix((*iter)); |
612 | } | |
613 | int r = db->submit_transaction_sync(dbt); | |
11fdf7f2 | 614 | ceph_assert(r >= 0); |
7c673cae FG |
615 | } |
616 | ||
f67539c2 | 617 | void _open(const std::string& kv_type) { |
7c673cae | 618 | int pos = 0; |
f67539c2 | 619 | for (auto rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) { |
7c673cae FG |
620 | if (*rit != '/') |
621 | break; | |
622 | } | |
f67539c2 | 623 | std::ostringstream os; |
7c673cae | 624 | os << path.substr(0, path.size() - pos) << "/store.db"; |
f67539c2 | 625 | std::string full_path = os.str(); |
7c673cae FG |
626 | |
627 | KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context, | |
628 | kv_type, | |
629 | full_path); | |
630 | if (!db_ptr) { | |
631 | derr << __func__ << " error initializing " | |
632 | << kv_type << " db back storage in " | |
633 | << full_path << dendl; | |
11fdf7f2 | 634 | ceph_abort_msg("MonitorDBStore: error initializing keyvaluedb back storage"); |
7c673cae FG |
635 | } |
636 | db.reset(db_ptr); | |
637 | ||
11fdf7f2 TL |
638 | if (g_conf()->mon_debug_dump_transactions) { |
639 | if (!g_conf()->mon_debug_dump_json) { | |
7c673cae | 640 | dump_fd_binary = ::open( |
11fdf7f2 | 641 | g_conf()->mon_debug_dump_location.c_str(), |
91327a77 | 642 | O_CREAT|O_APPEND|O_WRONLY|O_CLOEXEC, 0644); |
7c673cae FG |
643 | if (dump_fd_binary < 0) { |
644 | dump_fd_binary = -errno; | |
645 | derr << "Could not open log file, got " | |
646 | << cpp_strerror(dump_fd_binary) << dendl; | |
647 | } | |
648 | } else { | |
649 | dump_fmt.reset(); | |
650 | dump_fmt.open_array_section("dump"); | |
11fdf7f2 | 651 | dump_fd_json.open(g_conf()->mon_debug_dump_location.c_str()); |
7c673cae FG |
652 | } |
653 | do_dump = true; | |
654 | } | |
655 | if (kv_type == "rocksdb") | |
11fdf7f2 | 656 | db->init(g_conf()->mon_rocksdb_options); |
7c673cae FG |
657 | else |
658 | db->init(); | |
3efd9988 FG |
659 | |
660 | ||
7c673cae FG |
661 | } |
662 | ||
f67539c2 TL |
663 | int open(std::ostream &out) { |
664 | std::string kv_type; | |
7c673cae | 665 | int r = read_meta("kv_backend", &kv_type); |
31f18b77 FG |
666 | if (r < 0 || kv_type.empty()) { |
667 | // assume old monitors that did not mark the type were leveldb. | |
7c673cae | 668 | kv_type = "leveldb"; |
31f18b77 FG |
669 | r = write_meta("kv_backend", kv_type); |
670 | if (r < 0) | |
671 | return r; | |
672 | } | |
7c673cae FG |
673 | _open(kv_type); |
674 | r = db->open(out); | |
675 | if (r < 0) | |
676 | return r; | |
3efd9988 FG |
677 | |
678 | // Monitors are few in number, so the resource cost of exposing | |
679 | // very detailed stats is low: ramp up the priority of all the | |
680 | // KV store's perf counters. Do this after open, because backend may | |
681 | // not have constructed PerfCounters earlier. | |
682 | if (db->get_perf_counters()) { | |
683 | db->get_perf_counters()->set_prio_adjust( | |
684 | PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY); | |
685 | } | |
686 | ||
7c673cae FG |
687 | io_work.start(); |
688 | is_open = true; | |
689 | return 0; | |
690 | } | |
691 | ||
f67539c2 | 692 | int create_and_open(std::ostream &out) { |
7c673cae | 693 | // record the type before open |
f67539c2 | 694 | std::string kv_type; |
7c673cae FG |
695 | int r = read_meta("kv_backend", &kv_type); |
696 | if (r < 0) { | |
11fdf7f2 | 697 | kv_type = g_conf()->mon_keyvaluedb; |
7c673cae FG |
698 | r = write_meta("kv_backend", kv_type); |
699 | if (r < 0) | |
700 | return r; | |
701 | } | |
702 | _open(kv_type); | |
703 | r = db->create_and_open(out); | |
704 | if (r < 0) | |
705 | return r; | |
706 | io_work.start(); | |
707 | is_open = true; | |
708 | return 0; | |
709 | } | |
710 | ||
711 | void close() { | |
712 | // there should be no work queued! | |
713 | io_work.stop(); | |
714 | is_open = false; | |
715 | db.reset(NULL); | |
716 | } | |
717 | ||
718 | void compact() { | |
719 | db->compact(); | |
720 | } | |
721 | ||
11fdf7f2 TL |
722 | void compact_async() { |
723 | db->compact_async(); | |
724 | } | |
725 | ||
f67539c2 | 726 | void compact_prefix(const std::string& prefix) { |
7c673cae FG |
727 | db->compact_prefix(prefix); |
728 | } | |
729 | ||
f67539c2 | 730 | uint64_t get_estimated_size(std::map<std::string, uint64_t> &extras) { |
7c673cae FG |
731 | return db->get_estimated_size(extras); |
732 | } | |
733 | ||
734 | /** | |
735 | * write_meta - write a simple configuration key out-of-band | |
736 | * | |
737 | * Write a simple key/value pair for basic store configuration | |
738 | * (e.g., a uuid or magic number) to an unopened/unmounted store. | |
739 | * The default implementation writes this to a plaintext file in the | |
740 | * path. | |
741 | * | |
742 | * A newline is appended. | |
743 | * | |
744 | * @param key key name (e.g., "fsid") | |
745 | * @param value value (e.g., a uuid rendered as a string) | |
746 | * @returns 0 for success, or an error code | |
747 | */ | |
748 | int write_meta(const std::string& key, | |
749 | const std::string& value) const { | |
f67539c2 | 750 | std::string v = value; |
7c673cae FG |
751 | v += "\n"; |
752 | int r = safe_write_file(path.c_str(), key.c_str(), | |
eafe8130 TL |
753 | v.c_str(), v.length(), |
754 | 0600); | |
7c673cae FG |
755 | if (r < 0) |
756 | return r; | |
757 | return 0; | |
758 | } | |
759 | ||
760 | /** | |
761 | * read_meta - read a simple configuration key out-of-band | |
762 | * | |
763 | * Read a simple key value to an unopened/mounted store. | |
764 | * | |
765 | * Trailing whitespace is stripped off. | |
766 | * | |
767 | * @param key key name | |
768 | * @param value pointer to value string | |
769 | * @returns 0 for success, or an error code | |
770 | */ | |
771 | int read_meta(const std::string& key, | |
772 | std::string *value) const { | |
773 | char buf[4096]; | |
774 | int r = safe_read_file(path.c_str(), key.c_str(), | |
775 | buf, sizeof(buf)); | |
776 | if (r <= 0) | |
777 | return r; | |
778 | // drop trailing newlines | |
779 | while (r && isspace(buf[r-1])) { | |
780 | --r; | |
781 | } | |
f67539c2 | 782 | *value = std::string(buf, r); |
7c673cae FG |
783 | return 0; |
784 | } | |
785 | ||
f67539c2 | 786 | explicit MonitorDBStore(const std::string& path) |
7c673cae FG |
787 | : path(path), |
788 | db(0), | |
789 | do_dump(false), | |
790 | dump_fd_binary(-1), | |
791 | dump_fmt(true), | |
792 | io_work(g_ceph_context, "monstore", "fn_monstore"), | |
793 | is_open(false) { | |
794 | } | |
795 | ~MonitorDBStore() { | |
11fdf7f2 | 796 | ceph_assert(!is_open); |
7c673cae | 797 | if (do_dump) { |
11fdf7f2 | 798 | if (!g_conf()->mon_debug_dump_json) { |
7c673cae FG |
799 | ::close(dump_fd_binary); |
800 | } else { | |
801 | dump_fmt.close_section(); | |
802 | dump_fmt.flush(dump_fd_json); | |
803 | dump_fd_json.flush(); | |
804 | dump_fd_json.close(); | |
805 | } | |
806 | } | |
807 | } | |
808 | ||
809 | }; | |
810 | ||
// Presumably generates the free-standing encode()/decode() wrappers that
// forward to the member functions, which Transaction::encode() relies on
// when it encodes its std::list<Op> (verify against include/encoding.h).
WRITE_CLASS_ENCODER(MonitorDBStore::Op)
WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)

#endif /* CEPH_MONITOR_DB_STORE_H */