]> git.proxmox.com Git - ceph.git/blob - ceph/src/librados/RadosClient.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / librados / RadosClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18
19 #include <iostream>
20 #include <string>
21 #include <sstream>
22 #include <pthread.h>
23 #include <errno.h>
24
25 #include "common/ceph_context.h"
26 #include "common/config.h"
27 #include "common/common_init.h"
28 #include "common/ceph_json.h"
29 #include "common/errno.h"
30 #include "common/ceph_json.h"
31 #include "include/buffer.h"
32 #include "include/stringify.h"
33 #include "include/util.h"
34
35 #include "messages/MLog.h"
36 #include "msg/Messenger.h"
37
38 // needed for static_cast
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MPoolOpReply.h"
41 #include "messages/MStatfsReply.h"
42 #include "messages/MGetPoolStatsReply.h"
43 #include "messages/MOSDOpReply.h"
44 #include "messages/MOSDMap.h"
45 #include "messages/MCommandReply.h"
46
47 #include "AioCompletionImpl.h"
48 #include "IoCtxImpl.h"
49 #include "PoolAsyncCompletionImpl.h"
50 #include "RadosClient.h"
51
52 #include "include/ceph_assert.h"
53 #include "common/EventTrace.h"
54
55 #define dout_subsys ceph_subsys_rados
56 #undef dout_prefix
57 #define dout_prefix *_dout << "librados: "
58
// Construct a disconnected client around an existing CephContext.
// Dispatcher(cct_->get()) takes our own reference on the context; the
// cct_deleter shared_ptr releases that reference (p->put()) when the
// last owner of this client goes away.
librados::RadosClient::RadosClient(CephContext *cct_)
  : Dispatcher(cct_->get()),
    cct_deleter{cct_, [](CephContext *p) {p->put();}},
    conf(cct_->_conf),
    state(DISCONNECTED),   // connect() drives DISCONNECTED -> CONNECTING -> CONNECTED
    monclient(cct_),
    mgrclient(cct_, nullptr, &monclient.monmap),
    messenger(NULL),       // created in connect()
    instance_id(0),
    objecter(NULL),        // created in connect()
    timer(cct, lock),
    refcnt(1),
    log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
    finisher(cct, "radosclient", "fn-radosclient")
{
}
75
76 int64_t librados::RadosClient::lookup_pool(const char *name)
77 {
78 int r = wait_for_osdmap();
79 if (r < 0) {
80 return r;
81 }
82
83 int64_t ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
84 name);
85 if (-ENOENT == ret) {
86 // Make sure we have the latest map
87 int r = wait_for_latest_osdmap();
88 if (r < 0)
89 return r;
90 ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
91 name);
92 }
93
94 return ret;
95 }
96
97 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id)
98 {
99 bool requires;
100 int r = pool_requires_alignment2(pool_id, &requires);
101 if (r < 0) {
102 // Cast answer to false, this is a little bit problematic
103 // since we really don't know the answer yet, say.
104 return false;
105 }
106
107 return requires;
108 }
109
110 // a safer version of pool_requires_alignment
111 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id,
112 bool *requires)
113 {
114 if (!requires)
115 return -EINVAL;
116
117 int r = wait_for_osdmap();
118 if (r < 0) {
119 return r;
120 }
121
122 return objecter->with_osdmap([requires, pool_id](const OSDMap& o) {
123 if (!o.have_pg_pool(pool_id)) {
124 return -ENOENT;
125 }
126 *requires = o.get_pg_pool(pool_id)->requires_aligned_append();
127 return 0;
128 });
129 }
130
131 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id)
132 {
133 uint64_t alignment;
134 int r = pool_required_alignment2(pool_id, &alignment);
135 if (r < 0) {
136 return 0;
137 }
138
139 return alignment;
140 }
141
142 // a safer version of pool_required_alignment
143 int librados::RadosClient::pool_required_alignment2(int64_t pool_id,
144 uint64_t *alignment)
145 {
146 if (!alignment)
147 return -EINVAL;
148
149 int r = wait_for_osdmap();
150 if (r < 0) {
151 return r;
152 }
153
154 return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) {
155 if (!o.have_pg_pool(pool_id)) {
156 return -ENOENT;
157 }
158 *alignment = o.get_pg_pool(pool_id)->required_alignment();
159 return 0;
160 });
161 }
162
163 int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s, bool wait_latest_map)
164 {
165 int r = wait_for_osdmap();
166 if (r < 0)
167 return r;
168 retry:
169 objecter->with_osdmap([&](const OSDMap& o) {
170 if (!o.have_pg_pool(pool_id)) {
171 r = -ENOENT;
172 } else {
173 r = 0;
174 *s = o.get_pool_name(pool_id);
175 }
176 });
177 if (r == -ENOENT && wait_latest_map) {
178 r = wait_for_latest_osdmap();
179 if (r < 0)
180 return r;
181 wait_latest_map = false;
182 goto retry;
183 }
184
185 return r;
186 }
187
188 int librados::RadosClient::get_fsid(std::string *s)
189 {
190 if (!s)
191 return -EINVAL;
192 std::lock_guard l(lock);
193 ostringstream oss;
194 oss << monclient.get_fsid();
195 *s = oss.str();
196 return 0;
197 }
198
199 int librados::RadosClient::ping_monitor(const string mon_id, string *result)
200 {
201 int err = 0;
202 /* If we haven't yet connected, we have no way of telling whether we
203 * already built monc's initial monmap. IF we are in CONNECTED state,
204 * then it is safe to assume that we went through connect(), which does
205 * build a monmap.
206 */
207 if (state != CONNECTED) {
208 ldout(cct, 10) << __func__ << " build monmap" << dendl;
209 err = monclient.build_initial_monmap();
210 }
211 if (err < 0) {
212 return err;
213 }
214
215 err = monclient.ping_monitor(mon_id, result);
216 return err;
217 }
218
219 int librados::RadosClient::connect()
220 {
221 int err;
222
223 // already connected?
224 if (state == CONNECTING)
225 return -EINPROGRESS;
226 if (state == CONNECTED)
227 return -EISCONN;
228 state = CONNECTING;
229
230 if (cct->_conf->log_early &&
231 !cct->_log->is_started()) {
232 cct->_log->start();
233 }
234
235 {
236 MonClient mc_bootstrap(cct);
237 err = mc_bootstrap.get_monmap_and_config();
238 if (err < 0)
239 return err;
240 }
241
242 common_init_finish(cct);
243
244 // get monmap
245 err = monclient.build_initial_monmap();
246 if (err < 0)
247 goto out;
248
249 err = -ENOMEM;
250 messenger = Messenger::create_client_messenger(cct, "radosclient");
251 if (!messenger)
252 goto out;
253
254 // require OSDREPLYMUX feature. this means we will fail to talk to
255 // old servers. this is necessary because otherwise we won't know
256 // how to decompose the reply data into its constituent pieces.
257 messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
258
259 ldout(cct, 1) << "starting msgr at " << messenger->get_myaddrs() << dendl;
260
261 ldout(cct, 1) << "starting objecter" << dendl;
262
263 objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
264 &finisher,
265 cct->_conf->rados_mon_op_timeout,
266 cct->_conf->rados_osd_op_timeout);
267 if (!objecter)
268 goto out;
269 objecter->set_balanced_budget();
270
271 monclient.set_messenger(messenger);
272 mgrclient.set_messenger(messenger);
273
274 objecter->init();
275 messenger->add_dispatcher_head(&mgrclient);
276 messenger->add_dispatcher_tail(objecter);
277 messenger->add_dispatcher_tail(this);
278
279 messenger->start();
280
281 ldout(cct, 1) << "setting wanted keys" << dendl;
282 monclient.set_want_keys(
283 CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
284 ldout(cct, 1) << "calling monclient init" << dendl;
285 err = monclient.init();
286 if (err) {
287 ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
288 shutdown();
289 goto out;
290 }
291
292 err = monclient.authenticate(conf->client_mount_timeout);
293 if (err) {
294 ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
295 shutdown();
296 goto out;
297 }
298 messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
299
300 // Detect older cluster, put mgrclient into compatible mode
301 mgrclient.set_mgr_optional(
302 !get_required_monitor_features().contains_all(
303 ceph::features::mon::FEATURE_LUMINOUS));
304
305 // MgrClient needs this (it doesn't have MonClient reference itself)
306 monclient.sub_want("mgrmap", 0, 0);
307 monclient.renew_subs();
308
309 if (service_daemon) {
310 ldout(cct, 10) << __func__ << " registering as " << service_name << "."
311 << daemon_name << dendl;
312 mgrclient.service_daemon_register(service_name, daemon_name,
313 daemon_metadata);
314 }
315 mgrclient.init();
316
317 objecter->set_client_incarnation(0);
318 objecter->start();
319 lock.lock();
320
321 timer.init();
322
323 finisher.start();
324
325 state = CONNECTED;
326 instance_id = monclient.get_global_id();
327
328 lock.unlock();
329
330 ldout(cct, 1) << "init done" << dendl;
331 err = 0;
332
333 out:
334 if (err) {
335 state = DISCONNECTED;
336
337 if (objecter) {
338 delete objecter;
339 objecter = NULL;
340 }
341 if (messenger) {
342 delete messenger;
343 messenger = NULL;
344 }
345 }
346
347 return err;
348 }
349
// Tear the client down in dependency order.  Idempotent: a second call
// returns immediately once state is DISCONNECTED.
void librados::RadosClient::shutdown()
{
  std::unique_lock l{lock};
  if (state == DISCONNECTED) {
    return;
  }

  bool need_objecter = false;
  if (objecter && objecter->initialized) {
    need_objecter = true;
  }

  if (state == CONNECTED) {
    if (need_objecter) {
      // make sure watch callbacks are flushed
      watch_flush();
    }
    // drain queued completions before stopping the finisher thread
    finisher.wait_for_empty();
    finisher.stop();
  }
  state = DISCONNECTED;
  instance_id = 0;
  timer.shutdown();   // will drop+retake lock
  // the components below take their own locks; ours must be released
  l.unlock();
  if (need_objecter) {
    objecter->shutdown();
  }
  mgrclient.shutdown();

  monclient.shutdown();
  if (messenger) {
    messenger->shutdown();
    messenger->wait();
  }
  ldout(cct, 1) << "shutdown" << dendl;
}
386
387 int librados::RadosClient::watch_flush()
388 {
389 ldout(cct, 10) << __func__ << " enter" << dendl;
390 ceph::mutex mylock = ceph::make_mutex("RadosClient::watch_flush::mylock");
391 ceph::condition_variable cond;
392 bool done;
393 objecter->linger_callback_flush(new C_SafeCond(mylock, cond, &done));
394
395 std::unique_lock l{mylock};
396 cond.wait(l, [&done] { return done; });
397 ldout(cct, 10) << __func__ << " exit" << dendl;
398 return 0;
399 }
400
// Completion context bridging the objecter's linger-callback flush to a
// librados AioCompletionImpl.  Holds its own reference on the
// completion for the duration of the flush.
struct C_aio_watch_flush_Complete : public Context {
  librados::RadosClient *client;
  librados::AioCompletionImpl *c;

  C_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
    : client(_client), c(_c) {
    c->get();  // released by put_unlock() in finish()
  }

  void finish(int r) override {
    c->lock.lock();
    c->rval = r;
    c->complete = true;
    c->cond.notify_all();

    // user callbacks are delivered on the client's finisher thread,
    // never inline here
    if (c->callback_complete ||
	c->callback_safe) {
      client->finisher.queue(new librados::C_AioComplete(c));
    }
    c->put_unlock();  // drops our ref and the lock taken above
  }
};
423
424 int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
425 {
426 ldout(cct, 10) << __func__ << " enter" << dendl;
427 Context *oncomplete = new C_aio_watch_flush_Complete(this, c);
428 objecter->linger_callback_flush(oncomplete);
429 ldout(cct, 10) << __func__ << " exit" << dendl;
430 return 0;
431 }
432
// Global id assigned by the monitors during connect(); 0 when
// disconnected.
uint64_t librados::RadosClient::get_instance_id()
{
  return instance_id;
}
437
438 int librados::RadosClient::get_min_compatible_osd(int8_t* require_osd_release)
439 {
440 int r = wait_for_osdmap();
441 if (r < 0) {
442 return r;
443 }
444
445 objecter->with_osdmap(
446 [require_osd_release](const OSDMap& o) {
447 *require_osd_release = ceph::to_integer<int8_t>(o.require_osd_release);
448 });
449 return 0;
450 }
451
452 int librados::RadosClient::get_min_compatible_client(int8_t* min_compat_client,
453 int8_t* require_min_compat_client)
454 {
455 int r = wait_for_osdmap();
456 if (r < 0) {
457 return r;
458 }
459
460 objecter->with_osdmap(
461 [min_compat_client, require_min_compat_client](const OSDMap& o) {
462 *min_compat_client =
463 ceph::to_integer<int8_t>(o.get_min_compat_client());
464 *require_min_compat_client =
465 ceph::to_integer<int8_t>(o.get_require_min_compat_client());
466 });
467 return 0;
468 }
469
470 librados::RadosClient::~RadosClient()
471 {
472 if (messenger)
473 delete messenger;
474 if (objecter)
475 delete objecter;
476 cct = NULL;
477 }
478
479 int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
480 {
481 int64_t poolid = lookup_pool(name);
482 if (poolid < 0) {
483 return (int)poolid;
484 }
485
486 *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
487 return 0;
488 }
489
490 int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
491 {
492 std::string pool_name;
493 int r = pool_get_name(pool_id, &pool_name, true);
494 if (r < 0)
495 return r;
496 *io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
497 return 0;
498 }
499
500 bool librados::RadosClient::ms_dispatch(Message *m)
501 {
502 bool ret;
503
504 std::lock_guard l(lock);
505 if (state == DISCONNECTED) {
506 ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
507 m->put();
508 ret = true;
509 } else {
510 ret = _dispatch(m);
511 }
512 return ret;
513 }
514
// Messenger hook: nothing to do when a connection is established.
void librados::RadosClient::ms_handle_connect(Connection *con)
{
}
518
// Messenger hook: we don't claim session resets (return false so other
// dispatchers may handle them).
bool librados::RadosClient::ms_handle_reset(Connection *con)
{
  return false;
}
523
// Messenger hook: nothing to do on a remote-side reset.
void librados::RadosClient::ms_handle_remote_reset(Connection *con)
{
}
527
// Messenger hook: we don't claim refused connections.
bool librados::RadosClient::ms_handle_refused(Connection *con)
{
  return false;
}
532
// Handle a message with the client lock held.  Returns true when the
// message was consumed (its ref released), false to let another
// dispatcher try it.
bool librados::RadosClient::_dispatch(Message *m)
{
  ceph_assert(ceph_mutex_is_locked(lock));
  switch (m->get_type()) {
  // OSD
  case CEPH_MSG_OSD_MAP:
    // wake anyone blocked in wait_for_osdmap()
    cond.notify_all();
    m->put();
    break;

  case CEPH_MSG_MDS_MAP:
    // not interesting to librados; just drop the ref
    m->put();
    break;

  case MSG_LOG:
    // handle_log() releases the message ref itself
    handle_log(static_cast<MLog *>(m));
    break;

  default:
    return false;
  }

  return true;
}
557
558
// Block until we have an initial osdmap (epoch > 0).  Returns 0 once a
// map is present, -ENOTCONN when not connected, or -ETIMEDOUT if
// rados_mon_op_timeout expires first.  Must be called without the
// client lock held (we take it ourselves to wait on cond).
int librados::RadosClient::wait_for_osdmap()
{
  ceph_assert(ceph_mutex_is_not_locked_by_me(lock));

  if (state != CONNECTED) {
    return -ENOTCONN;
  }

  // cheap unlocked peek first; most callers already have a map
  bool need_map = false;
  objecter->with_osdmap([&](const OSDMap& o) {
    if (o.get_epoch() == 0) {
      need_map = true;
    }
  });

  if (need_map) {
    std::unique_lock l(lock);

    ceph::timespan timeout{0};
    if (cct->_conf->rados_mon_op_timeout > 0) {
      timeout = ceph::make_timespan(cct->_conf->rados_mon_op_timeout);
    }

    // re-check under the lock; a map may have arrived in the meantime
    if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
      ldout(cct, 10) << __func__ << " waiting" << dendl;
      // cond is notified by _dispatch() on CEPH_MSG_OSD_MAP
      while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
	if (timeout == timeout.zero()) {
	  cond.wait(l);
	} else {
	  if (cond.wait_for(l, timeout) == std::cv_status::timeout) {
	    lderr(cct) << "timed out waiting for first osdmap from monitors"
		       << dendl;
	    return -ETIMEDOUT;
	  }
	}
      }
      ldout(cct, 10) << __func__ << " done waiting" << dendl;
    }
    return 0;
  } else {
    return 0;
  }
}
602
603
604 int librados::RadosClient::wait_for_latest_osdmap()
605 {
606 ceph::mutex mylock = ceph::make_mutex("RadosClient::wait_for_latest_osdmap");
607 ceph::condition_variable cond;
608 bool done;
609
610 objecter->wait_for_latest_osdmap(new C_SafeCond(mylock, cond, &done));
611
612 std::unique_lock l{mylock};
613 cond.wait(l, [&done] {return done;});
614 return 0;
615 }
616
617 int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
618 {
619 int r = wait_for_osdmap();
620 if (r < 0)
621 return r;
622
623 objecter->with_osdmap([&](const OSDMap& o) {
624 for (auto p : o.get_pools())
625 v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
626 });
627 return 0;
628 }
629
630 int librados::RadosClient::get_pool_stats(std::list<string>& pools,
631 map<string,::pool_stat_t> *result,
632 bool *per_pool)
633 {
634 ceph::mutex mylock = ceph::make_mutex("RadosClient::get_pool_stats::mylock");
635 ceph::condition_variable cond;
636 bool done;
637 int ret = 0;
638
639 objecter->get_pool_stats(pools, result, per_pool,
640 new C_SafeCond(mylock, cond, &done,
641 &ret));
642
643 unique_lock l{mylock};
644 cond.wait(l, [&done] { return done;});
645 return ret;
646 }
647
648 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
649 const std::string& pool)
650 {
651 bool ret = false;
652 objecter->with_osdmap([&](const OSDMap& osdmap) {
653 int64_t poolid = osdmap.lookup_pg_pool_name(pool);
654 if (poolid >= 0)
655 ret = osdmap.get_pg_pool(poolid)->is_unmanaged_snaps_mode();
656 });
657 return ret;
658 }
659
660 int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
661 {
662 ceph::mutex mylock = ceph::make_mutex("RadosClient::get_fs_stats::mylock");
663 ceph::condition_variable cond;
664 bool done;
665 int ret = 0;
666 {
667 std::lock_guard l{mylock};
668 objecter->get_fs_stats(stats, boost::optional<int64_t> (),
669 new C_SafeCond(mylock, cond, &done, &ret));
670 }
671 {
672 std::unique_lock l{mylock};
673 cond.wait(l, [&done] { return done;});
674 }
675 return ret;
676 }
677
678 void librados::RadosClient::get() {
679 std::lock_guard l(lock);
680 ceph_assert(refcnt > 0);
681 refcnt++;
682 }
683
684 bool librados::RadosClient::put() {
685 std::lock_guard l(lock);
686 ceph_assert(refcnt > 0);
687 refcnt--;
688 return (refcnt == 0);
689 }
690
691 int librados::RadosClient::pool_create(string& name,
692 int16_t crush_rule)
693 {
694 if (!name.length())
695 return -EINVAL;
696
697 int r = wait_for_osdmap();
698 if (r < 0) {
699 return r;
700 }
701
702 ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_create::mylock");
703 int reply;
704 ceph::condition_variable cond;
705 bool done;
706 Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
707 reply = objecter->create_pool(name, onfinish, crush_rule);
708
709 if (reply < 0) {
710 delete onfinish;
711 } else {
712 std::unique_lock l{mylock};
713 cond.wait(l, [&done] { return done; });
714 }
715 return reply;
716 }
717
718 int librados::RadosClient::pool_create_async(string& name,
719 PoolAsyncCompletionImpl *c,
720 int16_t crush_rule)
721 {
722 int r = wait_for_osdmap();
723 if (r < 0)
724 return r;
725
726 Context *onfinish = new C_PoolAsync_Safe(c);
727 r = objecter->create_pool(name, onfinish, crush_rule);
728 if (r < 0) {
729 delete onfinish;
730 }
731 return r;
732 }
733
734 int librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tier)
735 {
736 int r = wait_for_osdmap();
737 if (r < 0) {
738 return r;
739 }
740
741 objecter->with_osdmap([&](const OSDMap& o) {
742 const pg_pool_t* pool = o.get_pg_pool(pool_id);
743 if (pool) {
744 if (pool->tier_of < 0) {
745 *base_tier = pool_id;
746 } else {
747 *base_tier = pool->tier_of;
748 }
749 r = 0;
750 } else {
751 r = -ENOENT;
752 }
753 });
754 return r;
755 }
756
757 int librados::RadosClient::pool_delete(const char *name)
758 {
759 int r = wait_for_osdmap();
760 if (r < 0) {
761 return r;
762 }
763
764 ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_delete::mylock");
765 ceph::condition_variable cond;
766 bool done;
767 int ret;
768 Context *onfinish = new C_SafeCond(mylock, cond, &done, &ret);
769 ret = objecter->delete_pool(name, onfinish);
770
771 if (ret < 0) {
772 delete onfinish;
773 } else {
774 std::unique_lock l{mylock};
775 cond.wait(l, [&done] { return done;});
776 }
777 return ret;
778 }
779
780 int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c)
781 {
782 int r = wait_for_osdmap();
783 if (r < 0)
784 return r;
785
786 Context *onfinish = new C_PoolAsync_Safe(c);
787 r = objecter->delete_pool(name, onfinish);
788 if (r < 0) {
789 delete onfinish;
790 }
791 return r;
792 }
793
// Ask the objecter to add (set=true) or remove (set=false) this
// client's own address from the blacklist.
void librados::RadosClient::blacklist_self(bool set) {
  std::lock_guard l(lock);
  objecter->blacklist_self(set);
}
798
// Render the messenger's address vector as a string.
// NOTE(review): assumes connect() already created the messenger --
// calling this while disconnected would dereference a null pointer.
std::string librados::RadosClient::get_addrs() const {
  CachedStackStringStream cos;
  *cos << messenger->get_myaddrs();
  return std::string(cos->strv());
}
804
805 int librados::RadosClient::blacklist_add(const string& client_address,
806 uint32_t expire_seconds)
807 {
808 entity_addr_t addr;
809 if (!addr.parse(client_address.c_str(), 0)) {
810 lderr(cct) << "unable to parse address " << client_address << dendl;
811 return -EINVAL;
812 }
813
814 std::stringstream cmd;
815 cmd << "{"
816 << "\"prefix\": \"osd blacklist\", "
817 << "\"blacklistop\": \"add\", "
818 << "\"addr\": \"" << client_address << "\"";
819 if (expire_seconds != 0) {
820 cmd << ", \"expire\": " << expire_seconds << ".0";
821 }
822 cmd << "}";
823
824 std::vector<std::string> cmds;
825 cmds.push_back(cmd.str());
826 bufferlist inbl;
827 int r = mon_command(cmds, inbl, NULL, NULL);
828 if (r < 0) {
829 return r;
830 }
831
832 // ensure we have the latest osd map epoch before proceeding
833 r = wait_for_latest_osdmap();
834 return r;
835 }
836
// Synchronous wrapper over mon_command_async(): submit, then block for
// the reply's status code.
int librados::RadosClient::mon_command(const vector<string>& cmd,
				       const bufferlist &inbl,
				       bufferlist *outbl, string *outs)
{
  C_SaferCond ctx;
  mon_command_async(cmd, inbl, outbl, outs, &ctx);
  return ctx.wait();
}
845
// Submit a mon command; on_finish fires (via MonClient) when the reply
// arrives, with outbl/outs filled in.
void librados::RadosClient::mon_command_async(const vector<string>& cmd,
					      const bufferlist &inbl,
					      bufferlist *outbl, string *outs,
					      Context *on_finish)
{
  std::lock_guard l{lock};
  monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish);
}
854
// Run a mgr command synchronously, honoring rados_mon_op_timeout.
int librados::RadosClient::mgr_command(const vector<string>& cmd,
				       const bufferlist &inbl,
				       bufferlist *outbl, string *outs)
{
  std::lock_guard l(lock);

  C_SaferCond cond;
  int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
  if (r < 0)
    return r;

  // Manual unlock/relock: the lock_guard above must unwind with the
  // lock held, but we must not hold it while blocking on the reply.
  lock.unlock();
  if (conf->rados_mon_op_timeout) {
    r = cond.wait_for(conf->rados_mon_op_timeout);
  } else {
    r = cond.wait();
  }
  lock.lock();

  return r;
}
876
// Run a command against a specific mgr daemon (by name) synchronously,
// honoring rados_mon_op_timeout.
int librados::RadosClient::mgr_command(
  const string& name,
  const vector<string>& cmd,
  const bufferlist &inbl,
  bufferlist *outbl, string *outs)
{
  std::lock_guard l(lock);

  C_SaferCond cond;
  int r = mgrclient.start_tell_command(name, cmd, inbl, outbl, outs, &cond);
  if (r < 0)
    return r;

  // Manual unlock/relock: the lock_guard above must unwind with the
  // lock held, but we must not hold it while blocking on the reply.
  lock.unlock();
  if (conf->rados_mon_op_timeout) {
    r = cond.wait_for(conf->rados_mon_op_timeout);
  } else {
    r = cond.wait();
  }
  lock.lock();

  return r;
}
900
901
902 int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
903 const bufferlist &inbl,
904 bufferlist *outbl, string *outs)
905 {
906 ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
907 ceph::condition_variable cond;
908 bool done;
909 int rval;
910 {
911 std::lock_guard l{mylock};
912 monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
913 new C_SafeCond(mylock, cond, &done, &rval));
914 }
915 std::unique_lock l{mylock};
916 cond.wait(l, [&done] { return done;});
917 return rval;
918 }
919
920 int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
921 const bufferlist &inbl,
922 bufferlist *outbl, string *outs)
923 {
924 ceph::mutex mylock = ceph::make_mutex("RadosClient::mon_command::mylock");
925 ceph::condition_variable cond;
926 bool done;
927 int rval;
928 {
929 std::lock_guard l{mylock};
930 monclient.start_mon_command(name, cmd, inbl, outbl, outs,
931 new C_SafeCond(mylock, cond, &done, &rval));
932 }
933 std::unique_lock l{mylock};
934 cond.wait(l, [&done] { return done;});
935 return rval;
936 }
937
938 int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
939 const bufferlist& inbl,
940 bufferlist *poutbl, string *prs)
941 {
942 ceph::mutex mylock = ceph::make_mutex("RadosClient::osd_command::mylock");
943 ceph::condition_variable cond;
944 bool done;
945 int ret;
946 ceph_tid_t tid;
947
948 if (osd < 0)
949 return -EINVAL;
950
951 {
952 std::lock_guard l{mylock};
953 // XXX do anything with tid?
954 objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
955 new C_SafeCond(mylock, cond, &done, &ret));
956 }
957 std::unique_lock l{mylock};
958 cond.wait(l, [&done] { return done;});
959 return ret;
960 }
961
// Run a command against one PG; blocks for the reply and returns its
// status code.
int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
				      const bufferlist& inbl,
				      bufferlist *poutbl, string *prs)
{
  ceph::mutex mylock = ceph::make_mutex("RadosClient::pg_command::mylock");
  ceph::condition_variable cond;
  bool done;   // presumably initialized by C_SafeCond's ctor -- confirm
  int ret;
  ceph_tid_t tid;   // assigned by the objecter; not used afterwards

  {
    // NOTE(review): unlike osd_command() this takes the client-wide
    // 'lock' (not 'mylock') around submission -- looks intentional to
    // serialize with other client state; confirm before changing.
    std::lock_guard l{lock};
    objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
			 new C_SafeCond(mylock, cond, &done, &ret));
  }
  std::unique_lock l{mylock};
  cond.wait(l, [&done] { return done;});
  return ret;
}
981
982 int librados::RadosClient::monitor_log(const string& level,
983 rados_log_callback_t cb,
984 rados_log_callback2_t cb2,
985 void *arg)
986 {
987 std::lock_guard l(lock);
988
989 if (state != CONNECTED) {
990 return -ENOTCONN;
991 }
992
993 if (cb == NULL && cb2 == NULL) {
994 // stop watch
995 ldout(cct, 10) << __func__ << " removing cb " << (void*)log_cb
996 << " " << (void*)log_cb2 << dendl;
997 monclient.sub_unwant(log_watch);
998 log_watch.clear();
999 log_cb = NULL;
1000 log_cb2 = NULL;
1001 log_cb_arg = NULL;
1002 return 0;
1003 }
1004
1005 string watch_level;
1006 if (level == "debug") {
1007 watch_level = "log-debug";
1008 } else if (level == "info") {
1009 watch_level = "log-info";
1010 } else if (level == "warn" || level == "warning") {
1011 watch_level = "log-warn";
1012 } else if (level == "err" || level == "error") {
1013 watch_level = "log-error";
1014 } else if (level == "sec") {
1015 watch_level = "log-sec";
1016 } else {
1017 ldout(cct, 10) << __func__ << " invalid level " << level << dendl;
1018 return -EINVAL;
1019 }
1020
1021 if (log_cb || log_cb2)
1022 monclient.sub_unwant(log_watch);
1023
1024 // (re)start watch
1025 ldout(cct, 10) << __func__ << " add cb " << (void*)cb << " " << (void*)cb2
1026 << " level " << level << dendl;
1027 monclient.sub_want(watch_level, 0, 0);
1028
1029 monclient.renew_subs();
1030 log_cb = cb;
1031 log_cb2 = cb2;
1032 log_cb_arg = arg;
1033 log_watch = watch_level;
1034 return 0;
1035 }
1036
1037 void librados::RadosClient::handle_log(MLog *m)
1038 {
1039 ceph_assert(ceph_mutex_is_locked(lock));
1040 ldout(cct, 10) << __func__ << " version " << m->version << dendl;
1041
1042 if (log_last_version < m->version) {
1043 log_last_version = m->version;
1044
1045 if (log_cb || log_cb2) {
1046 for (std::deque<LogEntry>::iterator it = m->entries.begin(); it != m->entries.end(); ++it) {
1047 LogEntry e = *it;
1048 ostringstream ss;
1049 ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
1050 string line = ss.str();
1051 string who = stringify(e.rank) + " " + stringify(e.addrs);
1052 string name = stringify(e.name);
1053 string level = stringify(e.prio);
1054 struct timespec stamp;
1055 e.stamp.to_timespec(&stamp);
1056
1057 ldout(cct, 20) << __func__ << " delivering " << ss.str() << dendl;
1058 if (log_cb)
1059 log_cb(log_cb_arg, line.c_str(), who.c_str(),
1060 stamp.tv_sec, stamp.tv_nsec,
1061 e.seq, level.c_str(), e.msg.c_str());
1062 if (log_cb2)
1063 log_cb2(log_cb_arg, line.c_str(),
1064 e.channel.c_str(),
1065 who.c_str(), name.c_str(),
1066 stamp.tv_sec, stamp.tv_nsec,
1067 e.seq, level.c_str(), e.msg.c_str());
1068 }
1069 }
1070
1071 monclient.sub_got(log_watch, log_last_version);
1072 }
1073
1074 m->put();
1075 }
1076
1077 int librados::RadosClient::service_daemon_register(
1078 const std::string& service, ///< service name (e.g., 'rgw')
1079 const std::string& name, ///< daemon name (e.g., 'gwfoo')
1080 const std::map<std::string,std::string>& metadata)
1081 {
1082 if (service_daemon) {
1083 return -EEXIST;
1084 }
1085 if (service == "osd" ||
1086 service == "mds" ||
1087 service == "client" ||
1088 service == "mon" ||
1089 service == "mgr") {
1090 // normal ceph entity types are not allowed!
1091 return -EINVAL;
1092 }
1093 if (service.empty() || name.empty()) {
1094 return -EINVAL;
1095 }
1096
1097 collect_sys_info(&daemon_metadata, cct);
1098
1099 ldout(cct,10) << __func__ << " " << service << "." << name << dendl;
1100 service_daemon = true;
1101 service_name = service;
1102 daemon_name = name;
1103 daemon_metadata.insert(metadata.begin(), metadata.end());
1104
1105 if (state == DISCONNECTED) {
1106 return 0;
1107 }
1108 if (state == CONNECTING) {
1109 return -EBUSY;
1110 }
1111 mgrclient.service_daemon_register(service_name, daemon_name,
1112 daemon_metadata);
1113 return 0;
1114 }
1115
// Push an updated status map for a registered service daemon to the
// mgr.  Returns -ENOTCONN unless connected.
int librados::RadosClient::service_daemon_update_status(
  std::map<std::string,std::string>&& status)
{
  if (state != CONNECTED) {
    return -ENOTCONN;
  }
  return mgrclient.service_daemon_update_status(std::move(status));
}
1124
// Features the current monmap requires of monitors (used e.g. to
// detect pre-luminous clusters in connect()).
mon_feature_t librados::RadosClient::get_required_monitor_features() const
{
  return monclient.with_monmap([](const MonMap &monmap) {
      return monmap.get_required_features(); } );
}
1130
1131 int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id,
1132 std::vector<std::string>* pgs)
1133 {
1134 vector<string> cmd = {
1135 "{\"prefix\": \"pg ls\","
1136 "\"pool\": " + std::to_string(pool_id) + ","
1137 "\"states\": [\"inconsistent\"],"
1138 "\"format\": \"json\"}"
1139 };
1140 bufferlist inbl, outbl;
1141 string outstring;
1142 if (auto ret = mgr_command(cmd, inbl, &outbl, &outstring); ret) {
1143 return ret;
1144 }
1145 if (!outbl.length()) {
1146 // no pg returned
1147 return 0;
1148 }
1149 JSONParser parser;
1150 if (!parser.parse(outbl.c_str(), outbl.length())) {
1151 return -EINVAL;
1152 }
1153 vector<string> v;
1154 if (!parser.is_array()) {
1155 JSONObj *pgstat_obj = parser.find_obj("pg_stats");
1156 if (!pgstat_obj)
1157 return 0;
1158 auto s = pgstat_obj->get_data();
1159 JSONParser pg_stats;
1160 if (!pg_stats.parse(s.c_str(), s.length())) {
1161 return -EINVAL;
1162 }
1163 v = pg_stats.get_array_elements();
1164 } else {
1165 v = parser.get_array_elements();
1166 }
1167 for (auto i : v) {
1168 JSONParser pg_json;
1169 if (!pg_json.parse(i.c_str(), i.length())) {
1170 return -EINVAL;
1171 }
1172 string pgid;
1173 JSONDecoder::decode_json("pgid", pgid, &pg_json);
1174 pgs->emplace_back(std::move(pgid));
1175 }
1176 return 0;
1177 }