]> git.proxmox.com Git - ceph.git/blob - ceph/src/librados/RadosClient.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / librados / RadosClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18
19 #include <iostream>
20 #include <string>
21 #include <sstream>
22 #include <pthread.h>
23 #include <errno.h>
24
25 #include "common/ceph_context.h"
26 #include "common/config.h"
27 #include "common/common_init.h"
28 #include "common/ceph_json.h"
29 #include "common/errno.h"
30 #include "common/ceph_json.h"
31 #include "include/buffer.h"
32 #include "include/stringify.h"
33 #include "include/util.h"
34
35 #include "messages/MLog.h"
36 #include "msg/Messenger.h"
37
38 // needed for static_cast
39 #include "messages/PaxosServiceMessage.h"
40 #include "messages/MPoolOpReply.h"
41 #include "messages/MStatfsReply.h"
42 #include "messages/MGetPoolStatsReply.h"
43 #include "messages/MOSDOpReply.h"
44 #include "messages/MOSDMap.h"
45 #include "messages/MCommandReply.h"
46
47 #include "AioCompletionImpl.h"
48 #include "IoCtxImpl.h"
49 #include "PoolAsyncCompletionImpl.h"
50 #include "RadosClient.h"
51
52 #include "include/ceph_assert.h"
53 #include "common/EventTrace.h"
54
55 #define dout_subsys ceph_subsys_rados
56 #undef dout_prefix
57 #define dout_prefix *_dout << "librados: "
58
59 bool librados::RadosClient::ms_get_authorizer(int dest_type,
60 AuthAuthorizer **authorizer) {
61 //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl;
62 /* monitor authorization is being handled on different layer */
63 if (dest_type == CEPH_ENTITY_TYPE_MON)
64 return true;
65 *authorizer = monclient.build_authorizer(dest_type);
66 return *authorizer != NULL;
67 }
68
69 librados::RadosClient::RadosClient(CephContext *cct_)
70 : Dispatcher(cct_->get()),
71 cct_deleter{cct_, [](CephContext *p) {p->put();}},
72 conf(cct_->_conf),
73 state(DISCONNECTED),
74 monclient(cct_),
75 mgrclient(cct_, nullptr),
76 messenger(NULL),
77 instance_id(0),
78 objecter(NULL),
79 lock("librados::RadosClient::lock"),
80 timer(cct, lock),
81 refcnt(1),
82 log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
83 finisher(cct, "radosclient", "fn-radosclient")
84 {
85 }
86
87 int64_t librados::RadosClient::lookup_pool(const char *name)
88 {
89 int r = wait_for_osdmap();
90 if (r < 0) {
91 return r;
92 }
93
94 int64_t ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
95 name);
96 if (-ENOENT == ret) {
97 // Make sure we have the latest map
98 int r = wait_for_latest_osdmap();
99 if (r < 0)
100 return r;
101 ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
102 name);
103 }
104
105 return ret;
106 }
107
108 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id)
109 {
110 bool requires;
111 int r = pool_requires_alignment2(pool_id, &requires);
112 if (r < 0) {
113 // Cast answer to false, this is a little bit problematic
114 // since we really don't know the answer yet, say.
115 return false;
116 }
117
118 return requires;
119 }
120
121 // a safer version of pool_requires_alignment
122 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id,
123 bool *requires)
124 {
125 if (!requires)
126 return -EINVAL;
127
128 int r = wait_for_osdmap();
129 if (r < 0) {
130 return r;
131 }
132
133 return objecter->with_osdmap([requires, pool_id](const OSDMap& o) {
134 if (!o.have_pg_pool(pool_id)) {
135 return -ENOENT;
136 }
137 *requires = o.get_pg_pool(pool_id)->requires_aligned_append();
138 return 0;
139 });
140 }
141
142 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id)
143 {
144 uint64_t alignment;
145 int r = pool_required_alignment2(pool_id, &alignment);
146 if (r < 0) {
147 return 0;
148 }
149
150 return alignment;
151 }
152
153 // a safer version of pool_required_alignment
154 int librados::RadosClient::pool_required_alignment2(int64_t pool_id,
155 uint64_t *alignment)
156 {
157 if (!alignment)
158 return -EINVAL;
159
160 int r = wait_for_osdmap();
161 if (r < 0) {
162 return r;
163 }
164
165 return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) {
166 if (!o.have_pg_pool(pool_id)) {
167 return -ENOENT;
168 }
169 *alignment = o.get_pg_pool(pool_id)->required_alignment();
170 return 0;
171 });
172 }
173
174 int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s, bool wait_latest_map)
175 {
176 int r = wait_for_osdmap();
177 if (r < 0)
178 return r;
179 retry:
180 objecter->with_osdmap([&](const OSDMap& o) {
181 if (!o.have_pg_pool(pool_id)) {
182 r = -ENOENT;
183 } else {
184 r = 0;
185 *s = o.get_pool_name(pool_id);
186 }
187 });
188 if (r == -ENOENT && wait_latest_map) {
189 r = wait_for_latest_osdmap();
190 if (r < 0)
191 return r;
192 wait_latest_map = false;
193 goto retry;
194 }
195
196 return r;
197 }
198
199 int librados::RadosClient::get_fsid(std::string *s)
200 {
201 if (!s)
202 return -EINVAL;
203 std::lock_guard l(lock);
204 ostringstream oss;
205 oss << monclient.get_fsid();
206 *s = oss.str();
207 return 0;
208 }
209
210 int librados::RadosClient::ping_monitor(const string mon_id, string *result)
211 {
212 int err = 0;
213 /* If we haven't yet connected, we have no way of telling whether we
214 * already built monc's initial monmap. IF we are in CONNECTED state,
215 * then it is safe to assume that we went through connect(), which does
216 * build a monmap.
217 */
218 if (state != CONNECTED) {
219 ldout(cct, 10) << __func__ << " build monmap" << dendl;
220 err = monclient.build_initial_monmap();
221 }
222 if (err < 0) {
223 return err;
224 }
225
226 err = monclient.ping_monitor(mon_id, result);
227 return err;
228 }
229
230 int librados::RadosClient::connect()
231 {
232 int err;
233
234 // already connected?
235 if (state == CONNECTING)
236 return -EINPROGRESS;
237 if (state == CONNECTED)
238 return -EISCONN;
239 state = CONNECTING;
240
241 if (cct->_conf->log_early &&
242 !cct->_log->is_started()) {
243 cct->_log->start();
244 }
245
246 {
247 MonClient mc_bootstrap(cct);
248 err = mc_bootstrap.get_monmap_and_config();
249 if (err < 0)
250 return err;
251 }
252
253 common_init_finish(cct);
254
255 // get monmap
256 err = monclient.build_initial_monmap();
257 if (err < 0)
258 goto out;
259
260 err = -ENOMEM;
261 messenger = Messenger::create_client_messenger(cct, "radosclient");
262 if (!messenger)
263 goto out;
264
265 // require OSDREPLYMUX feature. this means we will fail to talk to
266 // old servers. this is necessary because otherwise we won't know
267 // how to decompose the reply data into its constituent pieces.
268 messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
269
270 ldout(cct, 1) << "starting msgr at " << messenger->get_myaddrs() << dendl;
271
272 ldout(cct, 1) << "starting objecter" << dendl;
273
274 objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
275 &finisher,
276 cct->_conf->rados_mon_op_timeout,
277 cct->_conf->rados_osd_op_timeout);
278 if (!objecter)
279 goto out;
280 objecter->set_balanced_budget();
281
282 monclient.set_messenger(messenger);
283 mgrclient.set_messenger(messenger);
284
285 objecter->init();
286 messenger->add_dispatcher_head(&mgrclient);
287 messenger->add_dispatcher_tail(objecter);
288 messenger->add_dispatcher_tail(this);
289
290 messenger->start();
291
292 ldout(cct, 1) << "setting wanted keys" << dendl;
293 monclient.set_want_keys(
294 CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
295 ldout(cct, 1) << "calling monclient init" << dendl;
296 err = monclient.init();
297 if (err) {
298 ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
299 shutdown();
300 goto out;
301 }
302
303 err = monclient.authenticate(conf->client_mount_timeout);
304 if (err) {
305 ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
306 shutdown();
307 goto out;
308 }
309 messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
310
311 // Detect older cluster, put mgrclient into compatible mode
312 mgrclient.set_mgr_optional(
313 !get_required_monitor_features().contains_all(
314 ceph::features::mon::FEATURE_LUMINOUS));
315
316 // MgrClient needs this (it doesn't have MonClient reference itself)
317 monclient.sub_want("mgrmap", 0, 0);
318 monclient.renew_subs();
319
320 if (service_daemon) {
321 ldout(cct, 10) << __func__ << " registering as " << service_name << "."
322 << daemon_name << dendl;
323 mgrclient.service_daemon_register(service_name, daemon_name,
324 daemon_metadata);
325 }
326 mgrclient.init();
327
328 objecter->set_client_incarnation(0);
329 objecter->start();
330 lock.Lock();
331
332 timer.init();
333
334 finisher.start();
335
336 state = CONNECTED;
337 instance_id = monclient.get_global_id();
338
339 lock.Unlock();
340
341 ldout(cct, 1) << "init done" << dendl;
342 err = 0;
343
344 out:
345 if (err) {
346 state = DISCONNECTED;
347
348 if (objecter) {
349 delete objecter;
350 objecter = NULL;
351 }
352 if (messenger) {
353 delete messenger;
354 messenger = NULL;
355 }
356 }
357
358 return err;
359 }
360
361 void librados::RadosClient::shutdown()
362 {
363 lock.Lock();
364 if (state == DISCONNECTED) {
365 lock.Unlock();
366 return;
367 }
368
369 bool need_objecter = false;
370 if (objecter && objecter->initialized) {
371 need_objecter = true;
372 }
373
374 if (state == CONNECTED) {
375 if (need_objecter) {
376 // make sure watch callbacks are flushed
377 watch_flush();
378 }
379 finisher.wait_for_empty();
380 finisher.stop();
381 }
382 state = DISCONNECTED;
383 instance_id = 0;
384 timer.shutdown(); // will drop+retake lock
385 lock.Unlock();
386 if (need_objecter) {
387 objecter->shutdown();
388 }
389 mgrclient.shutdown();
390
391 monclient.shutdown();
392 if (messenger) {
393 messenger->shutdown();
394 messenger->wait();
395 }
396 ldout(cct, 1) << "shutdown" << dendl;
397 }
398
399 int librados::RadosClient::watch_flush()
400 {
401 ldout(cct, 10) << __func__ << " enter" << dendl;
402 Mutex mylock("RadosClient::watch_flush::mylock");
403 Cond cond;
404 bool done;
405 objecter->linger_callback_flush(new C_SafeCond(&mylock, &cond, &done));
406
407 mylock.Lock();
408 while (!done)
409 cond.Wait(mylock);
410 mylock.Unlock();
411
412 ldout(cct, 10) << __func__ << " exit" << dendl;
413 return 0;
414 }
415
416 struct C_aio_watch_flush_Complete : public Context {
417 librados::RadosClient *client;
418 librados::AioCompletionImpl *c;
419
420 C_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
421 : client(_client), c(_c) {
422 c->get();
423 }
424
425 void finish(int r) override {
426 c->lock.Lock();
427 c->rval = r;
428 c->complete = true;
429 c->cond.Signal();
430
431 if (c->callback_complete ||
432 c->callback_safe) {
433 client->finisher.queue(new librados::C_AioComplete(c));
434 }
435 c->put_unlock();
436 }
437 };
438
439 int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
440 {
441 ldout(cct, 10) << __func__ << " enter" << dendl;
442 Context *oncomplete = new C_aio_watch_flush_Complete(this, c);
443 objecter->linger_callback_flush(oncomplete);
444 ldout(cct, 10) << __func__ << " exit" << dendl;
445 return 0;
446 }
447
448 uint64_t librados::RadosClient::get_instance_id()
449 {
450 return instance_id;
451 }
452
453 int librados::RadosClient::get_min_compatible_osd(int8_t* require_osd_release)
454 {
455 int r = wait_for_osdmap();
456 if (r < 0) {
457 return r;
458 }
459
460 objecter->with_osdmap(
461 [require_osd_release](const OSDMap& o) {
462 *require_osd_release = o.require_osd_release;
463 });
464 return 0;
465 }
466
467 int librados::RadosClient::get_min_compatible_client(int8_t* min_compat_client,
468 int8_t* require_min_compat_client)
469 {
470 int r = wait_for_osdmap();
471 if (r < 0) {
472 return r;
473 }
474
475 objecter->with_osdmap(
476 [min_compat_client, require_min_compat_client](const OSDMap& o) {
477 *min_compat_client = o.get_min_compat_client();
478 *require_min_compat_client = o.get_require_min_compat_client();
479 });
480 return 0;
481 }
482
483 librados::RadosClient::~RadosClient()
484 {
485 if (messenger)
486 delete messenger;
487 if (objecter)
488 delete objecter;
489 cct = NULL;
490 }
491
492 int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
493 {
494 int64_t poolid = lookup_pool(name);
495 if (poolid < 0) {
496 return (int)poolid;
497 }
498
499 *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
500 return 0;
501 }
502
503 int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
504 {
505 std::string pool_name;
506 int r = pool_get_name(pool_id, &pool_name, true);
507 if (r < 0)
508 return r;
509 *io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
510 return 0;
511 }
512
513 bool librados::RadosClient::ms_dispatch(Message *m)
514 {
515 bool ret;
516
517 std::lock_guard l(lock);
518 if (state == DISCONNECTED) {
519 ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
520 m->put();
521 ret = true;
522 } else {
523 ret = _dispatch(m);
524 }
525 return ret;
526 }
527
528 void librados::RadosClient::ms_handle_connect(Connection *con)
529 {
530 }
531
532 bool librados::RadosClient::ms_handle_reset(Connection *con)
533 {
534 return false;
535 }
536
537 void librados::RadosClient::ms_handle_remote_reset(Connection *con)
538 {
539 }
540
541 bool librados::RadosClient::ms_handle_refused(Connection *con)
542 {
543 return false;
544 }
545
546 bool librados::RadosClient::_dispatch(Message *m)
547 {
548 ceph_assert(lock.is_locked());
549 switch (m->get_type()) {
550 // OSD
551 case CEPH_MSG_OSD_MAP:
552 cond.Signal();
553 m->put();
554 break;
555
556 case CEPH_MSG_MDS_MAP:
557 m->put();
558 break;
559
560 case MSG_LOG:
561 handle_log(static_cast<MLog *>(m));
562 break;
563
564 default:
565 return false;
566 }
567
568 return true;
569 }
570
571
572 int librados::RadosClient::wait_for_osdmap()
573 {
574 ceph_assert(!lock.is_locked_by_me());
575
576 if (state != CONNECTED) {
577 return -ENOTCONN;
578 }
579
580 bool need_map = false;
581 objecter->with_osdmap([&](const OSDMap& o) {
582 if (o.get_epoch() == 0) {
583 need_map = true;
584 }
585 });
586
587 if (need_map) {
588 std::lock_guard l(lock);
589
590 ceph::timespan timeout{0};
591 if (cct->_conf->rados_mon_op_timeout > 0) {
592 timeout = ceph::make_timespan(cct->_conf->rados_mon_op_timeout);
593 }
594
595 if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
596 ldout(cct, 10) << __func__ << " waiting" << dendl;
597 while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
598 if (timeout == timeout.zero()) {
599 cond.Wait(lock);
600 } else {
601 int r = cond.WaitInterval(lock, timeout);
602 if (r == ETIMEDOUT) {
603 lderr(cct) << "timed out waiting for first osdmap from monitors"
604 << dendl;
605 return -ETIMEDOUT;
606 }
607 }
608 }
609 ldout(cct, 10) << __func__ << " done waiting" << dendl;
610 }
611 return 0;
612 } else {
613 return 0;
614 }
615 }
616
617
618 int librados::RadosClient::wait_for_latest_osdmap()
619 {
620 Mutex mylock("RadosClient::wait_for_latest_osdmap");
621 Cond cond;
622 bool done;
623
624 objecter->wait_for_latest_osdmap(new C_SafeCond(&mylock, &cond, &done));
625
626 mylock.Lock();
627 while (!done)
628 cond.Wait(mylock);
629 mylock.Unlock();
630
631 return 0;
632 }
633
634 int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
635 {
636 int r = wait_for_osdmap();
637 if (r < 0)
638 return r;
639
640 objecter->with_osdmap([&](const OSDMap& o) {
641 for (auto p : o.get_pools())
642 v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
643 });
644 return 0;
645 }
646
647 int librados::RadosClient::get_pool_stats(std::list<string>& pools,
648 map<string,::pool_stat_t>& result)
649 {
650 Mutex mylock("RadosClient::get_pool_stats::mylock");
651 Cond cond;
652 bool done;
653 int ret = 0;
654
655 objecter->get_pool_stats(pools, &result, new C_SafeCond(&mylock, &cond, &done,
656 &ret));
657
658 mylock.Lock();
659 while (!done)
660 cond.Wait(mylock);
661 mylock.Unlock();
662
663 return ret;
664 }
665
666 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
667 const std::string& pool)
668 {
669 bool ret = false;
670 objecter->with_osdmap([&](const OSDMap& osdmap) {
671 int64_t poolid = osdmap.lookup_pg_pool_name(pool);
672 if (poolid >= 0)
673 ret = osdmap.get_pg_pool(poolid)->is_unmanaged_snaps_mode();
674 });
675 return ret;
676 }
677
678 int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
679 {
680 Mutex mylock ("RadosClient::get_fs_stats::mylock");
681 Cond cond;
682 bool done;
683 int ret = 0;
684
685 lock.Lock();
686 objecter->get_fs_stats(stats, boost::optional<int64_t> (),
687 new C_SafeCond(&mylock, &cond, &done, &ret));
688 lock.Unlock();
689
690 mylock.Lock();
691 while (!done) cond.Wait(mylock);
692 mylock.Unlock();
693
694 return ret;
695 }
696
697 void librados::RadosClient::get() {
698 std::lock_guard l(lock);
699 ceph_assert(refcnt > 0);
700 refcnt++;
701 }
702
703 bool librados::RadosClient::put() {
704 std::lock_guard l(lock);
705 ceph_assert(refcnt > 0);
706 refcnt--;
707 return (refcnt == 0);
708 }
709
710 int librados::RadosClient::pool_create(string& name,
711 int16_t crush_rule)
712 {
713 if (!name.length())
714 return -EINVAL;
715
716 int r = wait_for_osdmap();
717 if (r < 0) {
718 return r;
719 }
720
721 Mutex mylock ("RadosClient::pool_create::mylock");
722 int reply;
723 Cond cond;
724 bool done;
725 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
726 reply = objecter->create_pool(name, onfinish, crush_rule);
727
728 if (reply < 0) {
729 delete onfinish;
730 } else {
731 mylock.Lock();
732 while(!done)
733 cond.Wait(mylock);
734 mylock.Unlock();
735 }
736 return reply;
737 }
738
739 int librados::RadosClient::pool_create_async(string& name,
740 PoolAsyncCompletionImpl *c,
741 int16_t crush_rule)
742 {
743 int r = wait_for_osdmap();
744 if (r < 0)
745 return r;
746
747 Context *onfinish = new C_PoolAsync_Safe(c);
748 r = objecter->create_pool(name, onfinish, crush_rule);
749 if (r < 0) {
750 delete onfinish;
751 }
752 return r;
753 }
754
755 int librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tier)
756 {
757 int r = wait_for_osdmap();
758 if (r < 0) {
759 return r;
760 }
761
762 objecter->with_osdmap([&](const OSDMap& o) {
763 const pg_pool_t* pool = o.get_pg_pool(pool_id);
764 if (pool) {
765 if (pool->tier_of < 0) {
766 *base_tier = pool_id;
767 } else {
768 *base_tier = pool->tier_of;
769 }
770 r = 0;
771 } else {
772 r = -ENOENT;
773 }
774 });
775 return r;
776 }
777
778 int librados::RadosClient::pool_delete(const char *name)
779 {
780 int r = wait_for_osdmap();
781 if (r < 0) {
782 return r;
783 }
784
785 Mutex mylock("RadosClient::pool_delete::mylock");
786 Cond cond;
787 bool done;
788 int ret;
789 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &ret);
790 ret = objecter->delete_pool(name, onfinish);
791
792 if (ret < 0) {
793 delete onfinish;
794 } else {
795 mylock.Lock();
796 while (!done)
797 cond.Wait(mylock);
798 mylock.Unlock();
799 }
800 return ret;
801 }
802
803 int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c)
804 {
805 int r = wait_for_osdmap();
806 if (r < 0)
807 return r;
808
809 Context *onfinish = new C_PoolAsync_Safe(c);
810 r = objecter->delete_pool(name, onfinish);
811 if (r < 0) {
812 delete onfinish;
813 }
814 return r;
815 }
816
817 void librados::RadosClient::blacklist_self(bool set) {
818 std::lock_guard l(lock);
819 objecter->blacklist_self(set);
820 }
821
822 int librados::RadosClient::blacklist_add(const string& client_address,
823 uint32_t expire_seconds)
824 {
825 entity_addr_t addr;
826 if (!addr.parse(client_address.c_str(), 0)) {
827 lderr(cct) << "unable to parse address " << client_address << dendl;
828 return -EINVAL;
829 }
830
831 std::stringstream cmd;
832 cmd << "{"
833 << "\"prefix\": \"osd blacklist\", "
834 << "\"blacklistop\": \"add\", "
835 << "\"addr\": \"" << client_address << "\"";
836 if (expire_seconds != 0) {
837 cmd << ", \"expire\": " << expire_seconds << ".0";
838 }
839 cmd << "}";
840
841 std::vector<std::string> cmds;
842 cmds.push_back(cmd.str());
843 bufferlist inbl;
844 int r = mon_command(cmds, inbl, NULL, NULL);
845 if (r < 0) {
846 return r;
847 }
848
849 // ensure we have the latest osd map epoch before proceeding
850 r = wait_for_latest_osdmap();
851 return r;
852 }
853
854 int librados::RadosClient::mon_command(const vector<string>& cmd,
855 const bufferlist &inbl,
856 bufferlist *outbl, string *outs)
857 {
858 C_SaferCond ctx;
859 mon_command_async(cmd, inbl, outbl, outs, &ctx);
860 return ctx.wait();
861 }
862
863 void librados::RadosClient::mon_command_async(const vector<string>& cmd,
864 const bufferlist &inbl,
865 bufferlist *outbl, string *outs,
866 Context *on_finish)
867 {
868 lock.Lock();
869 monclient.start_mon_command(cmd, inbl, outbl, outs, on_finish);
870 lock.Unlock();
871 }
872
873 int librados::RadosClient::mgr_command(const vector<string>& cmd,
874 const bufferlist &inbl,
875 bufferlist *outbl, string *outs)
876 {
877 std::lock_guard l(lock);
878
879 C_SaferCond cond;
880 int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
881 if (r < 0)
882 return r;
883
884 lock.Unlock();
885 if (conf->rados_mon_op_timeout) {
886 r = cond.wait_for(conf->rados_mon_op_timeout);
887 } else {
888 r = cond.wait();
889 }
890 lock.Lock();
891
892 return r;
893 }
894
895
896 int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
897 const bufferlist &inbl,
898 bufferlist *outbl, string *outs)
899 {
900 Mutex mylock("RadosClient::mon_command::mylock");
901 Cond cond;
902 bool done;
903 int rval;
904 lock.Lock();
905 monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
906 new C_SafeCond(&mylock, &cond, &done, &rval));
907 lock.Unlock();
908 mylock.Lock();
909 while (!done)
910 cond.Wait(mylock);
911 mylock.Unlock();
912 return rval;
913 }
914
915 int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
916 const bufferlist &inbl,
917 bufferlist *outbl, string *outs)
918 {
919 Mutex mylock("RadosClient::mon_command::mylock");
920 Cond cond;
921 bool done;
922 int rval;
923 lock.Lock();
924 monclient.start_mon_command(name, cmd, inbl, outbl, outs,
925 new C_SafeCond(&mylock, &cond, &done, &rval));
926 lock.Unlock();
927 mylock.Lock();
928 while (!done)
929 cond.Wait(mylock);
930 mylock.Unlock();
931 return rval;
932 }
933
934 int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
935 const bufferlist& inbl,
936 bufferlist *poutbl, string *prs)
937 {
938 Mutex mylock("RadosClient::osd_command::mylock");
939 Cond cond;
940 bool done;
941 int ret;
942 ceph_tid_t tid;
943
944 if (osd < 0)
945 return -EINVAL;
946
947 lock.Lock();
948 // XXX do anything with tid?
949 objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
950 new C_SafeCond(&mylock, &cond, &done, &ret));
951 lock.Unlock();
952 mylock.Lock();
953 while (!done)
954 cond.Wait(mylock);
955 mylock.Unlock();
956 return ret;
957 }
958
959 int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
960 const bufferlist& inbl,
961 bufferlist *poutbl, string *prs)
962 {
963 Mutex mylock("RadosClient::pg_command::mylock");
964 Cond cond;
965 bool done;
966 int ret;
967 ceph_tid_t tid;
968 lock.Lock();
969 objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
970 new C_SafeCond(&mylock, &cond, &done, &ret));
971 lock.Unlock();
972 mylock.Lock();
973 while (!done)
974 cond.Wait(mylock);
975 mylock.Unlock();
976 return ret;
977 }
978
979 int librados::RadosClient::monitor_log(const string& level,
980 rados_log_callback_t cb,
981 rados_log_callback2_t cb2,
982 void *arg)
983 {
984 std::lock_guard l(lock);
985
986 if (state != CONNECTED) {
987 return -ENOTCONN;
988 }
989
990 if (cb == NULL && cb2 == NULL) {
991 // stop watch
992 ldout(cct, 10) << __func__ << " removing cb " << (void*)log_cb
993 << " " << (void*)log_cb2 << dendl;
994 monclient.sub_unwant(log_watch);
995 log_watch.clear();
996 log_cb = NULL;
997 log_cb2 = NULL;
998 log_cb_arg = NULL;
999 return 0;
1000 }
1001
1002 string watch_level;
1003 if (level == "debug") {
1004 watch_level = "log-debug";
1005 } else if (level == "info") {
1006 watch_level = "log-info";
1007 } else if (level == "warn" || level == "warning") {
1008 watch_level = "log-warn";
1009 } else if (level == "err" || level == "error") {
1010 watch_level = "log-error";
1011 } else if (level == "sec") {
1012 watch_level = "log-sec";
1013 } else {
1014 ldout(cct, 10) << __func__ << " invalid level " << level << dendl;
1015 return -EINVAL;
1016 }
1017
1018 if (log_cb || log_cb2)
1019 monclient.sub_unwant(log_watch);
1020
1021 // (re)start watch
1022 ldout(cct, 10) << __func__ << " add cb " << (void*)cb << " " << (void*)cb2
1023 << " level " << level << dendl;
1024 monclient.sub_want(watch_level, 0, 0);
1025
1026 monclient.renew_subs();
1027 log_cb = cb;
1028 log_cb2 = cb2;
1029 log_cb_arg = arg;
1030 log_watch = watch_level;
1031 return 0;
1032 }
1033
1034 void librados::RadosClient::handle_log(MLog *m)
1035 {
1036 ceph_assert(lock.is_locked());
1037 ldout(cct, 10) << __func__ << " version " << m->version << dendl;
1038
1039 if (log_last_version < m->version) {
1040 log_last_version = m->version;
1041
1042 if (log_cb || log_cb2) {
1043 for (std::deque<LogEntry>::iterator it = m->entries.begin(); it != m->entries.end(); ++it) {
1044 LogEntry e = *it;
1045 ostringstream ss;
1046 ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
1047 string line = ss.str();
1048 string who = stringify(e.rank) + " " + stringify(e.addrs);
1049 string name = stringify(e.name);
1050 string level = stringify(e.prio);
1051 struct timespec stamp;
1052 e.stamp.to_timespec(&stamp);
1053
1054 ldout(cct, 20) << __func__ << " delivering " << ss.str() << dendl;
1055 if (log_cb)
1056 log_cb(log_cb_arg, line.c_str(), who.c_str(),
1057 stamp.tv_sec, stamp.tv_nsec,
1058 e.seq, level.c_str(), e.msg.c_str());
1059 if (log_cb2)
1060 log_cb2(log_cb_arg, line.c_str(),
1061 e.channel.c_str(),
1062 who.c_str(), name.c_str(),
1063 stamp.tv_sec, stamp.tv_nsec,
1064 e.seq, level.c_str(), e.msg.c_str());
1065 }
1066 }
1067
1068 monclient.sub_got(log_watch, log_last_version);
1069 }
1070
1071 m->put();
1072 }
1073
1074 int librados::RadosClient::service_daemon_register(
1075 const std::string& service, ///< service name (e.g., 'rgw')
1076 const std::string& name, ///< daemon name (e.g., 'gwfoo')
1077 const std::map<std::string,std::string>& metadata)
1078 {
1079 if (service_daemon) {
1080 return -EEXIST;
1081 }
1082 if (service == "osd" ||
1083 service == "mds" ||
1084 service == "client" ||
1085 service == "mon" ||
1086 service == "mgr") {
1087 // normal ceph entity types are not allowed!
1088 return -EINVAL;
1089 }
1090 if (service.empty() || name.empty()) {
1091 return -EINVAL;
1092 }
1093
1094 collect_sys_info(&daemon_metadata, cct);
1095
1096 ldout(cct,10) << __func__ << " " << service << "." << name << dendl;
1097 service_daemon = true;
1098 service_name = service;
1099 daemon_name = name;
1100 daemon_metadata.insert(metadata.begin(), metadata.end());
1101
1102 if (state == DISCONNECTED) {
1103 return 0;
1104 }
1105 if (state == CONNECTING) {
1106 return -EBUSY;
1107 }
1108 mgrclient.service_daemon_register(service_name, daemon_name,
1109 daemon_metadata);
1110 return 0;
1111 }
1112
1113 int librados::RadosClient::service_daemon_update_status(
1114 std::map<std::string,std::string>&& status)
1115 {
1116 if (state != CONNECTED) {
1117 return -ENOTCONN;
1118 }
1119 return mgrclient.service_daemon_update_status(std::move(status));
1120 }
1121
1122 mon_feature_t librados::RadosClient::get_required_monitor_features() const
1123 {
1124 return monclient.with_monmap([](const MonMap &monmap) {
1125 return monmap.get_required_features(); } );
1126 }
1127
1128 int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id,
1129 std::vector<std::string>* pgs)
1130 {
1131 vector<string> cmd = {
1132 "{\"prefix\": \"pg ls\","
1133 "\"pool\": " + std::to_string(pool_id) + ","
1134 "\"states\": [\"inconsistent\"],"
1135 "\"format\": \"json\"}"
1136 };
1137 bufferlist inbl, outbl;
1138 string outstring;
1139 if (auto ret = mgr_command(cmd, inbl, &outbl, &outstring); ret) {
1140 return ret;
1141 }
1142 if (!outbl.length()) {
1143 // no pg returned
1144 return 0;
1145 }
1146 JSONParser parser;
1147 if (!parser.parse(outbl.c_str(), outbl.length())) {
1148 return -EINVAL;
1149 }
1150 vector<string> v;
1151 if (!parser.is_array()) {
1152 JSONObj *pgstat_obj = parser.find_obj("pg_stats");
1153 if (!pgstat_obj)
1154 return 0;
1155 auto s = pgstat_obj->get_data();
1156 JSONParser pg_stats;
1157 if (!pg_stats.parse(s.c_str(), s.length())) {
1158 return -EINVAL;
1159 }
1160 v = pg_stats.get_array_elements();
1161 } else {
1162 v = parser.get_array_elements();
1163 }
1164 for (auto i : v) {
1165 JSONParser pg_json;
1166 if (!pg_json.parse(i.c_str(), i.length())) {
1167 return -EINVAL;
1168 }
1169 string pgid;
1170 JSONDecoder::decode_json("pgid", pgid, &pg_json);
1171 pgs->emplace_back(std::move(pgid));
1172 }
1173 return 0;
1174 }