]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/KVMonitor.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / mon / KVMonitor.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "mon/Monitor.h"
5#include "mon/KVMonitor.h"
6#include "include/stringify.h"
7#include "messages/MKVData.h"
8
9#define dout_subsys ceph_subsys_mon
10#undef dout_prefix
11#define dout_prefix _prefix(_dout, mon, this)
12
20effc67
TL
13using std::ostream;
14using std::ostringstream;
15using std::set;
16using std::string;
17using std::stringstream;
18
f67539c2
TL
19static ostream& _prefix(std::ostream *_dout, const Monitor &mon,
20 const KVMonitor *hmon) {
21 return *_dout << "mon." << mon.name << "@" << mon.rank
22 << "(" << mon.get_state_name() << ").kv ";
23}
24
25const string KV_PREFIX = "mon_config_key";
26
27const int MAX_HISTORY = 50;
28
29
// Return true when `s` contains any byte that should not be dumped as
// text: a control character other than newline/tab, DEL (0x7f), or any
// byte with the high bit set.
static bool is_binary_string(const std::string& s)
{
  for (const unsigned char byte : s) {
    // \n and \t are escaped in JSON; other control characters are not.
    const bool escaped_whitespace = (byte == '\n' || byte == '\t');
    const bool raw_control = (byte < 0x20) && !escaped_whitespace;
    if (raw_control || byte >= 0x7f) {
      return true;
    }
  }
  return false;
}
40
41
42KVMonitor::KVMonitor(Monitor &m, Paxos &p, const string& service_name)
43 : PaxosService(m, p, service_name) {
44}
45
46void KVMonitor::init()
47{
48 dout(10) << __func__ << dendl;
49}
50
51void KVMonitor::create_initial()
52{
53 dout(10) << __func__ << dendl;
54 version = 0;
55 pending.clear();
56}
57
58void KVMonitor::update_from_paxos(bool *need_bootstrap)
59{
60 if (version == get_last_committed()) {
61 return;
62 }
63 version = get_last_committed();
64 dout(10) << __func__ << " " << version << dendl;
65 check_all_subs();
66}
67
68void KVMonitor::create_pending()
69{
70 dout(10) << " " << version << dendl;
71 pending.clear();
72}
73
74void KVMonitor::encode_pending(MonitorDBStore::TransactionRef t)
75{
76 dout(10) << " " << (version+1) << dendl;
77 put_last_committed(t, version+1);
78
79 // record the delta for this commit point
80 bufferlist bl;
81 encode(pending, bl);
82 put_version(t, version+1, bl);
83
84 // make actual changes
85 for (auto& p : pending) {
86 string key = p.first;
87 if (p.second) {
88 dout(20) << __func__ << " set " << key << dendl;
89 t->put(KV_PREFIX, key, *p.second);
90 } else {
91 dout(20) << __func__ << " rm " << key << dendl;
92 t->erase(KV_PREFIX, key);
93 }
94 }
95}
96
97version_t KVMonitor::get_trim_to() const
98{
99 // we don't need that many old states, but keep a few
100 if (version > MAX_HISTORY) {
101 return version - MAX_HISTORY;
102 }
103 return 0;
104}
105
106void KVMonitor::get_store_prefixes(set<string>& s) const
107{
108 s.insert(service_name);
109 s.insert(KV_PREFIX);
110}
111
112void KVMonitor::tick()
113{
114 if (!is_active() || !mon.is_leader()) {
115 return;
116 }
117 dout(10) << __func__ << dendl;
118}
119
120void KVMonitor::on_active()
121{
122}
123
124
125bool KVMonitor::preprocess_query(MonOpRequestRef op)
126{
127 switch (op->get_req()->get_type()) {
128 case MSG_MON_COMMAND:
129 try {
130 return preprocess_command(op);
131 } catch (const bad_cmd_get& e) {
132 bufferlist bl;
133 mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
134 return true;
135 }
136 }
137 return false;
138}
139
140bool KVMonitor::preprocess_command(MonOpRequestRef op)
141{
142 auto m = op->get_req<MMonCommand>();
143 std::stringstream ss;
144 int err = 0;
145
146 cmdmap_t cmdmap;
147 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
148 string rs = ss.str();
149 mon.reply_command(op, -EINVAL, rs, get_last_committed());
150 return true;
151 }
20effc67 152 string format = cmd_getval_or<string>(cmdmap, "format", "plain");
f67539c2
TL
153 boost::scoped_ptr<Formatter> f(Formatter::create(format));
154
155 string prefix;
156 cmd_getval(cmdmap, "prefix", prefix);
157 string key;
158 cmd_getval(cmdmap, "key", key);
159
160 bufferlist odata;
161
162 if (prefix == "config-key get") {
163 err = mon.store->get(KV_PREFIX, key, odata);
164 }
165 else if (prefix == "config-key exists") {
166 bool exists = mon.store->exists(KV_PREFIX, key);
167 ss << "key '" << key << "'";
168 if (exists) {
169 ss << " exists";
170 err = 0;
171 } else {
172 ss << " doesn't exist";
173 err = -ENOENT;
174 }
175 }
176 else if (prefix == "config-key list" ||
177 prefix == "config-key ls") {
178 if (!f) {
179 f.reset(Formatter::create("json-pretty"));
180 }
181 KeyValueDB::Iterator iter = mon.store->get_iterator(KV_PREFIX);
182 f->open_array_section("keys");
183 while (iter->valid()) {
184 string key(iter->key());
185 f->dump_string("key", key);
186 iter->next();
187 }
188 f->close_section();
189
190 stringstream tmp_ss;
191 f->flush(tmp_ss);
192 odata.append(tmp_ss);
193 err = 0;
194 }
195 else if (prefix == "config-key dump") {
196 if (!f) {
197 f.reset(Formatter::create("json-pretty"));
198 }
199
200 KeyValueDB::Iterator iter = mon.store->get_iterator(KV_PREFIX);
201 if (key.size()) {
202 iter->lower_bound(key);
203 }
204 f->open_object_section("config-key store");
205 while (iter->valid()) {
206 if (key.size() &&
207 iter->key().find(key) != 0) {
208 break;
209 }
210 string s = iter->value().to_str();
211 if (is_binary_string(s)) {
212 ostringstream ss;
213 ss << "<<< binary blob of length " << s.size() << " >>>";
214 f->dump_string(iter->key().c_str(), ss.str());
215 } else {
216 f->dump_string(iter->key().c_str(), s);
217 }
218 iter->next();
219 }
220 f->close_section();
221
222 stringstream tmp_ss;
223 f->flush(tmp_ss);
224 odata.append(tmp_ss);
225 err = 0;
226 }
227 else {
228 return false;
229 }
230
231 mon.reply_command(op, err, ss.str(), odata, get_last_committed());
232 return true;
233}
234
235bool KVMonitor::prepare_update(MonOpRequestRef op)
236{
237 Message *m = op->get_req();
238 dout(7) << "prepare_update " << *m
239 << " from " << m->get_orig_source_inst() << dendl;
240 switch (m->get_type()) {
241 case MSG_MON_COMMAND:
242 try {
243 return prepare_command(op);
244 } catch (const bad_cmd_get& e) {
245 bufferlist bl;
246 mon.reply_command(op, -EINVAL, e.what(), bl, get_last_committed());
247 return true;
248 }
249 }
250 return false;
251}
252
253
254bool KVMonitor::prepare_command(MonOpRequestRef op)
255{
256 auto m = op->get_req<MMonCommand>();
257 std::stringstream ss;
258 int err = 0;
259 bufferlist odata;
260
261 cmdmap_t cmdmap;
262 if (!cmdmap_from_json(m->cmd, &cmdmap, ss)) {
263 string rs = ss.str();
264 mon.reply_command(op, -EINVAL, rs, get_last_committed());
265 return true;
266 }
267
268 string prefix;
269 cmd_getval(cmdmap, "prefix", prefix);
270 string key;
271 if (!cmd_getval(cmdmap, "key", key)) {
272 err = -EINVAL;
273 ss << "must specify a key";
274 goto reply;
275 }
276
277
278 if (prefix == "config-key set" ||
279 prefix == "config-key put") {
280 bufferlist data;
281 string val;
282 if (cmd_getval(cmdmap, "val", val)) {
283 // they specified a value in the command instead of a file
284 data.append(val);
285 } else if (m->get_data_len() > 0) {
286 // they specified '-i <file>'
287 data = m->get_data();
288 }
289 if (data.length() > (size_t) g_conf()->mon_config_key_max_entry_size) {
290 err = -EFBIG; // File too large
291 ss << "error: entry size limited to "
292 << g_conf()->mon_config_key_max_entry_size << " bytes. "
293 << "Use 'mon config key max entry size' to manually adjust";
294 goto reply;
295 }
296
297 ss << "set " << key;
298 pending[key] = data;
299 goto update;
300 }
301 else if (prefix == "config-key del" ||
302 prefix == "config-key rm") {
303 ss << "key deleted";
20effc67 304 pending[key].reset();
f67539c2
TL
305 goto update;
306 }
307 else {
308 ss << "unknown command " << prefix;
309 err = -EINVAL;
310 }
311
312reply:
313 mon.reply_command(op, err, ss.str(), odata, get_last_committed());
314 return false;
315
316update:
317 // see if there is an actual change
318 if (pending.empty()) {
319 err = 0;
320 goto reply;
321 }
322 force_immediate_propose(); // faster response
323 wait_for_finished_proposal(
324 op,
325 new Monitor::C_Command(
326 mon, op, 0, ss.str(), odata,
327 get_last_committed() + 1));
328 return true;
329}
330
331
332
333
334static string _get_dmcrypt_prefix(const uuid_d& uuid, const string k)
335{
336 return "dm-crypt/osd/" + stringify(uuid) + "/" + k;
337}
338
339bool KVMonitor::_have_prefix(const string &prefix)
340{
341 KeyValueDB::Iterator iter = mon.store->get_iterator(KV_PREFIX);
342
343 while (iter->valid()) {
344 string key(iter->key());
345 size_t p = key.find(prefix);
346 if (p != string::npos && p == 0) {
347 return true;
348 }
349 iter->next();
350 }
351 return false;
352}
353
354int KVMonitor::validate_osd_destroy(
355 const int32_t id,
356 const uuid_d& uuid)
357{
358 string dmcrypt_prefix = _get_dmcrypt_prefix(uuid, "");
359 string daemon_prefix =
360 "daemon-private/osd." + stringify(id) + "/";
361
362 if (!_have_prefix(dmcrypt_prefix) &&
363 !_have_prefix(daemon_prefix)) {
364 return -ENOENT;
365 }
366 return 0;
367}
368
369void KVMonitor::do_osd_destroy(int32_t id, uuid_d& uuid)
370{
371 string dmcrypt_prefix = _get_dmcrypt_prefix(uuid, "");
372 string daemon_prefix =
373 "daemon-private/osd." + stringify(id) + "/";
374
375 for (auto& prefix : { dmcrypt_prefix, daemon_prefix }) {
376 KeyValueDB::Iterator iter = mon.store->get_iterator(KV_PREFIX);
377 iter->lower_bound(prefix);
378 if (iter->key().find(prefix) != 0) {
379 break;
380 }
20effc67 381 pending[iter->key()].reset();
f67539c2
TL
382 }
383
384 propose_pending();
385}
386
387int KVMonitor::validate_osd_new(
388 const uuid_d& uuid,
389 const string& dmcrypt_key,
390 stringstream& ss)
391{
392 string dmcrypt_prefix = _get_dmcrypt_prefix(uuid, "luks");
393 bufferlist value;
394 value.append(dmcrypt_key);
395
396 if (mon.store->exists(KV_PREFIX, dmcrypt_prefix)) {
397 bufferlist existing_value;
398 int err = mon.store->get(KV_PREFIX, dmcrypt_prefix, existing_value);
399 if (err < 0) {
400 dout(10) << __func__ << " unable to get dm-crypt key from store (r = "
401 << err << ")" << dendl;
402 return err;
403 }
404 if (existing_value.contents_equal(value)) {
405 // both values match; this will be an idempotent op.
406 return EEXIST;
407 }
408 ss << "dm-crypt key already exists and does not match";
409 return -EEXIST;
410 }
411 return 0;
412}
413
414void KVMonitor::do_osd_new(
415 const uuid_d& uuid,
416 const string& dmcrypt_key)
417{
418 ceph_assert(paxos.is_plugged());
419
420 string dmcrypt_key_prefix = _get_dmcrypt_prefix(uuid, "luks");
421 bufferlist dmcrypt_key_value;
422 dmcrypt_key_value.append(dmcrypt_key);
423
424 pending[dmcrypt_key_prefix] = dmcrypt_key_value;
425
426 propose_pending();
427}
428
429
430void KVMonitor::check_sub(MonSession *s)
431{
432 if (!s->authenticated) {
433 dout(20) << __func__ << " not authenticated " << s->entity_name << dendl;
434 return;
435 }
436 for (auto& p : s->sub_map) {
437 if (p.first.find("kv:") == 0) {
438 check_sub(p.second);
439 }
440 }
441}
442
443void KVMonitor::check_sub(Subscription *sub)
444{
445 dout(10) << __func__
446 << " next " << sub->next
447 << " have " << version << dendl;
448 if (sub->next <= version) {
449 maybe_send_update(sub);
450 if (sub->onetime) {
451 mon.with_session_map([sub](MonSessionMap& session_map) {
452 session_map.remove_sub(sub);
453 });
454 }
455 }
456}
457
458void KVMonitor::check_all_subs()
459{
460 dout(10) << __func__ << dendl;
461 int updated = 0, total = 0;
462 for (auto& i : mon.session_map.subs) {
463 if (i.first.find("kv:") == 0) {
464 auto p = i.second->begin();
465 while (!p.end()) {
466 auto sub = *p;
467 ++p;
468 ++total;
469 if (maybe_send_update(sub)) {
470 ++updated;
471 }
472 }
473 }
474 }
475 dout(10) << __func__ << " updated " << updated << " / " << total << dendl;
476}
477
478bool KVMonitor::maybe_send_update(Subscription *sub)
479{
480 if (sub->next > version) {
481 return false;
482 }
483
484 auto m = new MKVData;
485 m->prefix = sub->type.substr(3);
486 m->version = version;
487
488 if (sub->next && sub->next > get_first_committed()) {
489 // incremental
490 m->incremental = true;
491
492 for (version_t cur = sub->next; cur <= version; ++cur) {
493 bufferlist bl;
494 int err = get_version(cur, bl);
495 ceph_assert(err == 0);
496
20effc67 497 std::map<std::string,std::optional<ceph::buffer::list>> pending;
f67539c2
TL
498 auto p = bl.cbegin();
499 ceph::decode(pending, p);
500
501 for (auto& i : pending) {
502 if (i.first.find(m->prefix) == 0) {
503 m->data[i.first] = i.second;
504 }
505 }
506 }
507
508 dout(10) << __func__ << " incremental keys for " << m->prefix
509 << ", v " << sub->next << ".." << version
510 << ", " << m->data.size() << " keys"
511 << dendl;
512 } else {
513 m->incremental = false;
514
515 KeyValueDB::Iterator iter = mon.store->get_iterator(KV_PREFIX);
516 iter->lower_bound(m->prefix);
517 while (iter->valid() &&
518 iter->key().find(m->prefix) == 0) {
519 m->data[iter->key()] = iter->value();
520 iter->next();
521 }
522
523 dout(10) << __func__ << " sending full dump of " << m->prefix
524 << ", " << m->data.size() << " keys"
525 << dendl;
526 }
527 sub->session->con->send_message(m);
528 sub->next = version + 1;
529 return true;
530}