]> git.proxmox.com Git - ceph.git/blame - ceph/src/librados/RadosClient.cc
update sources to v12.1.1
[ceph.git] / ceph / src / librados / RadosClient.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <sys/types.h>
16#include <sys/stat.h>
17#include <fcntl.h>
18
19#include <iostream>
20#include <string>
21#include <sstream>
22#include <pthread.h>
23#include <errno.h>
24
25#include "common/ceph_context.h"
26#include "common/config.h"
27#include "common/common_init.h"
28#include "common/errno.h"
29#include "include/buffer.h"
30#include "include/stringify.h"
224ce89b 31#include "include/util.h"
7c673cae
FG
32
33#include "messages/MLog.h"
34#include "msg/Messenger.h"
35
36// needed for static_cast
37#include "messages/PaxosServiceMessage.h"
38#include "messages/MPoolOpReply.h"
39#include "messages/MStatfsReply.h"
40#include "messages/MGetPoolStatsReply.h"
41#include "messages/MOSDOpReply.h"
42#include "messages/MOSDMap.h"
43#include "messages/MCommandReply.h"
44
45#include "AioCompletionImpl.h"
46#include "IoCtxImpl.h"
47#include "PoolAsyncCompletionImpl.h"
48#include "RadosClient.h"
49
50#include "include/assert.h"
51#include "common/EventTrace.h"
52
53#define dout_subsys ceph_subsys_rados
54#undef dout_prefix
55#define dout_prefix *_dout << "librados: "
56
57bool librados::RadosClient::ms_get_authorizer(int dest_type,
58 AuthAuthorizer **authorizer,
59 bool force_new) {
60 //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl;
61 /* monitor authorization is being handled on different layer */
62 if (dest_type == CEPH_ENTITY_TYPE_MON)
63 return true;
64 *authorizer = monclient.build_authorizer(dest_type);
65 return *authorizer != NULL;
66}
67
68librados::RadosClient::RadosClient(CephContext *cct_)
69 : Dispatcher(cct_->get()),
70 cct_deleter{cct_, [](CephContext *p) {p->put();}},
71 conf(cct_->_conf),
72 state(DISCONNECTED),
73 monclient(cct_),
74 mgrclient(cct_, nullptr),
75 messenger(NULL),
76 instance_id(0),
77 objecter(NULL),
78 lock("librados::RadosClient::lock"),
79 timer(cct, lock),
80 refcnt(1),
31f18b77 81 log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
7c673cae
FG
82 finisher(cct, "radosclient", "fn-radosclient")
83{
84}
85
86int64_t librados::RadosClient::lookup_pool(const char *name)
87{
88 int r = wait_for_osdmap();
89 if (r < 0) {
90 return r;
91 }
92
93 int64_t ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
94 name);
95 if (-ENOENT == ret) {
96 // Make sure we have the latest map
97 int r = wait_for_latest_osdmap();
98 if (r < 0)
99 return r;
100 ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
101 name);
102 }
103
104 return ret;
105}
106
107bool librados::RadosClient::pool_requires_alignment(int64_t pool_id)
108{
109 bool requires;
110 int r = pool_requires_alignment2(pool_id, &requires);
111 if (r < 0) {
112 // Cast answer to false, this is a little bit problematic
113 // since we really don't know the answer yet, say.
114 return false;
115 }
116
117 return requires;
118}
119
120// a safer version of pool_requires_alignment
121int librados::RadosClient::pool_requires_alignment2(int64_t pool_id,
122 bool *requires)
123{
124 if (!requires)
125 return -EINVAL;
126
127 int r = wait_for_osdmap();
128 if (r < 0) {
129 return r;
130 }
131
132 return objecter->with_osdmap([requires, pool_id](const OSDMap& o) {
133 if (!o.have_pg_pool(pool_id)) {
134 return -ENOENT;
135 }
136 *requires = o.get_pg_pool(pool_id)->requires_aligned_append();
137 return 0;
138 });
139}
140
141uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id)
142{
143 uint64_t alignment;
144 int r = pool_required_alignment2(pool_id, &alignment);
145 if (r < 0) {
146 return 0;
147 }
148
149 return alignment;
150}
151
152// a safer version of pool_required_alignment
153int librados::RadosClient::pool_required_alignment2(int64_t pool_id,
154 uint64_t *alignment)
155{
156 if (!alignment)
157 return -EINVAL;
158
159 int r = wait_for_osdmap();
160 if (r < 0) {
161 return r;
162 }
163
164 return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) {
165 if (!o.have_pg_pool(pool_id)) {
166 return -ENOENT;
167 }
168 *alignment = o.get_pg_pool(pool_id)->required_alignment();
169 return 0;
170 });
171}
172
173int librados::RadosClient::pool_get_auid(uint64_t pool_id,
174 unsigned long long *auid)
175{
176 int r = wait_for_osdmap();
177 if (r < 0)
178 return r;
179 objecter->with_osdmap([&](const OSDMap& o) {
180 const pg_pool_t *pg = o.get_pg_pool(pool_id);
181 if (!pg) {
182 r = -ENOENT;
183 } else {
184 r = 0;
185 *auid = pg->auid;
186 }
187 });
188 return r;
189}
190
191int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s)
192{
193 int r = wait_for_osdmap();
194 if (r < 0)
195 return r;
196 objecter->with_osdmap([&](const OSDMap& o) {
197 if (!o.have_pg_pool(pool_id)) {
198 r = -ENOENT;
199 } else {
200 r = 0;
201 *s = o.get_pool_name(pool_id);
202 }
203 });
204 return r;
205}
206
207int librados::RadosClient::get_fsid(std::string *s)
208{
209 if (!s)
210 return -EINVAL;
211 Mutex::Locker l(lock);
212 ostringstream oss;
213 oss << monclient.get_fsid();
214 *s = oss.str();
215 return 0;
216}
217
218int librados::RadosClient::ping_monitor(const string mon_id, string *result)
219{
220 int err = 0;
221 /* If we haven't yet connected, we have no way of telling whether we
222 * already built monc's initial monmap. IF we are in CONNECTED state,
223 * then it is safe to assume that we went through connect(), which does
224 * build a monmap.
225 */
226 if (state != CONNECTED) {
227 ldout(cct, 10) << __func__ << " build monmap" << dendl;
228 err = monclient.build_initial_monmap();
229 }
230 if (err < 0) {
231 return err;
232 }
233
234 err = monclient.ping_monitor(mon_id, result);
235 return err;
236}
237
238int librados::RadosClient::connect()
239{
240 common_init_finish(cct);
241
242 int err;
243
244 // already connected?
245 if (state == CONNECTING)
246 return -EINPROGRESS;
247 if (state == CONNECTED)
248 return -EISCONN;
249 state = CONNECTING;
250
251 // get monmap
252 err = monclient.build_initial_monmap();
253 if (err < 0)
254 goto out;
255
256 err = -ENOMEM;
257 messenger = Messenger::create_client_messenger(cct, "radosclient");
258 if (!messenger)
259 goto out;
260
261 // require OSDREPLYMUX feature. this means we will fail to talk to
262 // old servers. this is necessary because otherwise we won't know
263 // how to decompose the reply data into its constituent pieces.
264 messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
265
266 ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl;
267
268 ldout(cct, 1) << "starting objecter" << dendl;
269
270 objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
271 &finisher,
272 cct->_conf->rados_mon_op_timeout,
273 cct->_conf->rados_osd_op_timeout);
274 if (!objecter)
275 goto out;
276 objecter->set_balanced_budget();
277
278 monclient.set_messenger(messenger);
279 mgrclient.set_messenger(messenger);
280
281 objecter->init();
282 messenger->add_dispatcher_head(&mgrclient);
283 messenger->add_dispatcher_tail(objecter);
284 messenger->add_dispatcher_tail(this);
285
286 messenger->start();
287
288 ldout(cct, 1) << "setting wanted keys" << dendl;
289 monclient.set_want_keys(
290 CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
291 ldout(cct, 1) << "calling monclient init" << dendl;
292 err = monclient.init();
293 if (err) {
294 ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
295 shutdown();
296 goto out;
297 }
298
299 err = monclient.authenticate(conf->client_mount_timeout);
300 if (err) {
301 ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
302 shutdown();
303 goto out;
304 }
305 messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
306
307 // MgrClient needs this (it doesn't have MonClient reference itself)
308 monclient.sub_want("mgrmap", 0, 0);
309 monclient.renew_subs();
310
224ce89b
WB
311 if (service_daemon) {
312 ldout(cct, 10) << __func__ << " registering as " << service_name << "."
313 << daemon_name << dendl;
314 mgrclient.service_daemon_register(service_name, daemon_name,
315 daemon_metadata);
316 }
7c673cae
FG
317 mgrclient.init();
318
319 objecter->set_client_incarnation(0);
320 objecter->start();
321 lock.Lock();
322
323 timer.init();
324
325 finisher.start();
326
327 state = CONNECTED;
328 instance_id = monclient.get_global_id();
329
330 lock.Unlock();
331
332 ldout(cct, 1) << "init done" << dendl;
333 err = 0;
334
335 out:
336 if (err) {
337 state = DISCONNECTED;
338
339 if (objecter) {
340 delete objecter;
341 objecter = NULL;
342 }
343 if (messenger) {
344 delete messenger;
345 messenger = NULL;
346 }
347 }
348
349 return err;
350}
351
352void librados::RadosClient::shutdown()
353{
354 lock.Lock();
355 if (state == DISCONNECTED) {
356 lock.Unlock();
357 return;
358 }
359
360 bool need_objecter = false;
31f18b77 361 if (objecter && objecter->initialized) {
7c673cae
FG
362 need_objecter = true;
363 }
364
365 if (state == CONNECTED) {
366 if (need_objecter) {
367 // make sure watch callbacks are flushed
368 watch_flush();
369 }
370 finisher.wait_for_empty();
371 finisher.stop();
372 }
373 state = DISCONNECTED;
374 instance_id = 0;
375 timer.shutdown(); // will drop+retake lock
376 lock.Unlock();
377 if (need_objecter) {
378 objecter->shutdown();
379 }
380 mgrclient.shutdown();
381
382 monclient.shutdown();
383 if (messenger) {
384 messenger->shutdown();
385 messenger->wait();
386 }
387 ldout(cct, 1) << "shutdown" << dendl;
388}
389
390int librados::RadosClient::watch_flush()
391{
392 ldout(cct, 10) << __func__ << " enter" << dendl;
393 Mutex mylock("RadosClient::watch_flush::mylock");
394 Cond cond;
395 bool done;
396 objecter->linger_callback_flush(new C_SafeCond(&mylock, &cond, &done));
397
398 mylock.Lock();
399 while (!done)
400 cond.Wait(mylock);
401 mylock.Unlock();
402
403 ldout(cct, 10) << __func__ << " exit" << dendl;
404 return 0;
405}
406
407struct C_aio_watch_flush_Complete : public Context {
408 librados::RadosClient *client;
409 librados::AioCompletionImpl *c;
410
411 C_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
412 : client(_client), c(_c) {
413 c->get();
414 }
415
416 void finish(int r) override {
417 c->lock.Lock();
418 c->rval = r;
419 c->complete = true;
420 c->cond.Signal();
421
422 if (c->callback_complete ||
423 c->callback_safe) {
424 client->finisher.queue(new librados::C_AioComplete(c));
425 }
426 c->put_unlock();
427 }
428};
429
430int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
431{
432 ldout(cct, 10) << __func__ << " enter" << dendl;
433 Context *oncomplete = new C_aio_watch_flush_Complete(this, c);
434 objecter->linger_callback_flush(oncomplete);
435 ldout(cct, 10) << __func__ << " exit" << dendl;
436 return 0;
437}
438
439uint64_t librados::RadosClient::get_instance_id()
440{
441 return instance_id;
442}
443
444librados::RadosClient::~RadosClient()
445{
446 if (messenger)
447 delete messenger;
448 if (objecter)
449 delete objecter;
450 cct = NULL;
451}
452
453int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
454{
455 int64_t poolid = lookup_pool(name);
456 if (poolid < 0) {
457 return (int)poolid;
458 }
459
460 *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
461 return 0;
462}
463
464int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
465{
466 *io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
467 return 0;
468}
469
470bool librados::RadosClient::ms_dispatch(Message *m)
471{
472 bool ret;
473
474 Mutex::Locker l(lock);
475 if (state == DISCONNECTED) {
476 ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
477 m->put();
478 ret = true;
479 } else {
480 ret = _dispatch(m);
481 }
482 return ret;
483}
484
485void librados::RadosClient::ms_handle_connect(Connection *con)
486{
487}
488
489bool librados::RadosClient::ms_handle_reset(Connection *con)
490{
491 return false;
492}
493
494void librados::RadosClient::ms_handle_remote_reset(Connection *con)
495{
496}
497
498bool librados::RadosClient::ms_handle_refused(Connection *con)
499{
500 return false;
501}
502
503bool librados::RadosClient::_dispatch(Message *m)
504{
505 assert(lock.is_locked());
506 switch (m->get_type()) {
507 // OSD
508 case CEPH_MSG_OSD_MAP:
509 cond.Signal();
510 m->put();
511 break;
512
513 case CEPH_MSG_MDS_MAP:
514 m->put();
515 break;
516
517 case MSG_LOG:
518 handle_log(static_cast<MLog *>(m));
519 break;
520
521 default:
522 return false;
523 }
524
525 return true;
526}
527
528
529int librados::RadosClient::wait_for_osdmap()
530{
531 assert(!lock.is_locked_by_me());
532
533 if (state != CONNECTED) {
534 return -ENOTCONN;
535 }
536
537 bool need_map = false;
538 objecter->with_osdmap([&](const OSDMap& o) {
539 if (o.get_epoch() == 0) {
540 need_map = true;
541 }
542 });
543
544 if (need_map) {
545 Mutex::Locker l(lock);
546
547 utime_t timeout;
548 if (cct->_conf->rados_mon_op_timeout > 0)
549 timeout.set_from_double(cct->_conf->rados_mon_op_timeout);
550
551 if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
552 ldout(cct, 10) << __func__ << " waiting" << dendl;
553 utime_t start = ceph_clock_now();
554 while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
555 if (timeout.is_zero()) {
556 cond.Wait(lock);
557 } else {
558 cond.WaitInterval(lock, timeout);
559 utime_t elapsed = ceph_clock_now() - start;
560 if (elapsed > timeout) {
561 lderr(cct) << "timed out waiting for first osdmap from monitors"
562 << dendl;
563 return -ETIMEDOUT;
564 }
565 }
566 }
567 ldout(cct, 10) << __func__ << " done waiting" << dendl;
568 }
569 return 0;
570 } else {
571 return 0;
572 }
573}
574
575
576int librados::RadosClient::wait_for_latest_osdmap()
577{
578 Mutex mylock("RadosClient::wait_for_latest_osdmap");
579 Cond cond;
580 bool done;
581
582 objecter->wait_for_latest_osdmap(new C_SafeCond(&mylock, &cond, &done));
583
584 mylock.Lock();
585 while (!done)
586 cond.Wait(mylock);
587 mylock.Unlock();
588
589 return 0;
590}
591
592int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
593{
594 int r = wait_for_osdmap();
595 if (r < 0)
596 return r;
597
598 objecter->with_osdmap([&](const OSDMap& o) {
599 for (auto p : o.get_pools())
600 v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
601 });
602 return 0;
603}
604
605int librados::RadosClient::get_pool_stats(std::list<string>& pools,
606 map<string,::pool_stat_t>& result)
607{
608 Mutex mylock("RadosClient::get_pool_stats::mylock");
609 Cond cond;
610 bool done;
611 int ret = 0;
612
613 objecter->get_pool_stats(pools, &result, new C_SafeCond(&mylock, &cond, &done,
614 &ret));
615
616 mylock.Lock();
617 while (!done)
618 cond.Wait(mylock);
619 mylock.Unlock();
620
621 return ret;
622}
623
624bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
625 const std::string& pool)
626{
627 bool ret = false;
628 objecter->with_osdmap([&](const OSDMap& osdmap) {
629 int64_t poolid = osdmap.lookup_pg_pool_name(pool);
630 if (poolid >= 0)
631 ret = osdmap.get_pg_pool(poolid)->is_unmanaged_snaps_mode();
632 });
633 return ret;
634}
635
636int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
637{
638 Mutex mylock ("RadosClient::get_fs_stats::mylock");
639 Cond cond;
640 bool done;
641 int ret = 0;
642
643 lock.Lock();
644 objecter->get_fs_stats(stats, new C_SafeCond(&mylock, &cond, &done, &ret));
645 lock.Unlock();
646
647 mylock.Lock();
648 while (!done) cond.Wait(mylock);
649 mylock.Unlock();
650
651 return ret;
652}
653
654void librados::RadosClient::get() {
655 Mutex::Locker l(lock);
656 assert(refcnt > 0);
657 refcnt++;
658}
659
660bool librados::RadosClient::put() {
661 Mutex::Locker l(lock);
662 assert(refcnt > 0);
663 refcnt--;
664 return (refcnt == 0);
665}
666
667int librados::RadosClient::pool_create(string& name, unsigned long long auid,
668 int16_t crush_rule)
669{
670 int r = wait_for_osdmap();
671 if (r < 0) {
672 return r;
673 }
674
675 Mutex mylock ("RadosClient::pool_create::mylock");
676 int reply;
677 Cond cond;
678 bool done;
679 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
680 reply = objecter->create_pool(name, onfinish, auid, crush_rule);
681
682 if (reply < 0) {
683 delete onfinish;
684 } else {
685 mylock.Lock();
686 while(!done)
687 cond.Wait(mylock);
688 mylock.Unlock();
689 }
690 return reply;
691}
692
693int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionImpl *c,
694 unsigned long long auid,
695 int16_t crush_rule)
696{
697 int r = wait_for_osdmap();
698 if (r < 0)
699 return r;
700
701 Context *onfinish = new C_PoolAsync_Safe(c);
702 r = objecter->create_pool(name, onfinish, auid, crush_rule);
703 if (r < 0) {
704 delete onfinish;
705 }
706 return r;
707}
708
709int librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tier)
710{
711 int r = wait_for_osdmap();
712 if (r < 0) {
713 return r;
714 }
715
716 objecter->with_osdmap([&](const OSDMap& o) {
717 const pg_pool_t* pool = o.get_pg_pool(pool_id);
718 if (pool) {
719 if (pool->tier_of < 0) {
720 *base_tier = pool_id;
721 } else {
722 *base_tier = pool->tier_of;
723 }
724 r = 0;
725 } else {
726 r = -ENOENT;
727 }
728 });
729 return r;
730}
731
732int librados::RadosClient::pool_delete(const char *name)
733{
734 int r = wait_for_osdmap();
735 if (r < 0) {
736 return r;
737 }
738
739 Mutex mylock("RadosClient::pool_delete::mylock");
740 Cond cond;
741 bool done;
742 int ret;
743 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &ret);
744 ret = objecter->delete_pool(name, onfinish);
745
746 if (ret < 0) {
747 delete onfinish;
748 } else {
749 mylock.Lock();
750 while (!done)
751 cond.Wait(mylock);
752 mylock.Unlock();
753 }
754 return ret;
755}
756
757int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c)
758{
759 int r = wait_for_osdmap();
760 if (r < 0)
761 return r;
762
763 Context *onfinish = new C_PoolAsync_Safe(c);
764 r = objecter->delete_pool(name, onfinish);
765 if (r < 0) {
766 delete onfinish;
767 }
768 return r;
769}
770
771void librados::RadosClient::blacklist_self(bool set) {
772 Mutex::Locker l(lock);
773 objecter->blacklist_self(set);
774}
775
776int librados::RadosClient::blacklist_add(const string& client_address,
777 uint32_t expire_seconds)
778{
779 entity_addr_t addr;
780 if (!addr.parse(client_address.c_str(), 0)) {
781 lderr(cct) << "unable to parse address " << client_address << dendl;
782 return -EINVAL;
783 }
784
785 std::stringstream cmd;
786 cmd << "{"
787 << "\"prefix\": \"osd blacklist\", "
788 << "\"blacklistop\": \"add\", "
789 << "\"addr\": \"" << client_address << "\"";
790 if (expire_seconds != 0) {
791 cmd << ", \"expire\": " << expire_seconds << ".0";
792 }
793 cmd << "}";
794
795 std::vector<std::string> cmds;
796 cmds.push_back(cmd.str());
797 bufferlist inbl;
798 int r = mon_command(cmds, inbl, NULL, NULL);
799 if (r < 0) {
800 return r;
801 }
802
803 // ensure we have the latest osd map epoch before proceeding
804 r = wait_for_latest_osdmap();
805 return r;
806}
807
808int librados::RadosClient::mon_command(const vector<string>& cmd,
809 const bufferlist &inbl,
810 bufferlist *outbl, string *outs)
811{
812 Mutex mylock("RadosClient::mon_command::mylock");
813 Cond cond;
814 bool done;
815 int rval;
816 lock.Lock();
817 monclient.start_mon_command(cmd, inbl, outbl, outs,
818 new C_SafeCond(&mylock, &cond, &done, &rval));
819 lock.Unlock();
820 mylock.Lock();
821 while (!done)
822 cond.Wait(mylock);
823 mylock.Unlock();
824 return rval;
825}
826
827
828int librados::RadosClient::mgr_command(const vector<string>& cmd,
829 const bufferlist &inbl,
830 bufferlist *outbl, string *outs)
831{
832 Mutex::Locker l(lock);
833
834 C_SaferCond cond;
835 int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
836 if (r < 0)
837 return r;
838
839 lock.Unlock();
840 r = cond.wait();
841 lock.Lock();
842
843 return r;
844}
845
846
847int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
848 const bufferlist &inbl,
849 bufferlist *outbl, string *outs)
850{
851 Mutex mylock("RadosClient::mon_command::mylock");
852 Cond cond;
853 bool done;
854 int rval;
855 lock.Lock();
856 monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
857 new C_SafeCond(&mylock, &cond, &done, &rval));
858 lock.Unlock();
859 mylock.Lock();
860 while (!done)
861 cond.Wait(mylock);
862 mylock.Unlock();
863 return rval;
864}
865
866int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
867 const bufferlist &inbl,
868 bufferlist *outbl, string *outs)
869{
870 Mutex mylock("RadosClient::mon_command::mylock");
871 Cond cond;
872 bool done;
873 int rval;
874 lock.Lock();
875 monclient.start_mon_command(name, cmd, inbl, outbl, outs,
876 new C_SafeCond(&mylock, &cond, &done, &rval));
877 lock.Unlock();
878 mylock.Lock();
879 while (!done)
880 cond.Wait(mylock);
881 mylock.Unlock();
882 return rval;
883}
884
885int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
886 const bufferlist& inbl,
887 bufferlist *poutbl, string *prs)
888{
889 Mutex mylock("RadosClient::osd_command::mylock");
890 Cond cond;
891 bool done;
892 int ret;
893 ceph_tid_t tid;
894
895 if (osd < 0)
896 return -EINVAL;
897
898 lock.Lock();
899 // XXX do anything with tid?
900 objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
901 new C_SafeCond(&mylock, &cond, &done, &ret));
902 lock.Unlock();
903 mylock.Lock();
904 while (!done)
905 cond.Wait(mylock);
906 mylock.Unlock();
907 return ret;
908}
909
910int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
911 const bufferlist& inbl,
912 bufferlist *poutbl, string *prs)
913{
914 Mutex mylock("RadosClient::pg_command::mylock");
915 Cond cond;
916 bool done;
917 int ret;
918 ceph_tid_t tid;
919 lock.Lock();
920 objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
921 new C_SafeCond(&mylock, &cond, &done, &ret));
922 lock.Unlock();
923 mylock.Lock();
924 while (!done)
925 cond.Wait(mylock);
926 mylock.Unlock();
927 return ret;
928}
929
31f18b77
FG
930int librados::RadosClient::monitor_log(const string& level,
931 rados_log_callback_t cb,
932 rados_log_callback2_t cb2,
933 void *arg)
7c673cae
FG
934{
935 Mutex::Locker l(lock);
936
937 if (state != CONNECTED) {
938 return -ENOTCONN;
939 }
940
31f18b77 941 if (cb == NULL && cb2 == NULL) {
7c673cae 942 // stop watch
31f18b77
FG
943 ldout(cct, 10) << __func__ << " removing cb " << (void*)log_cb
944 << " " << (void*)log_cb2 << dendl;
7c673cae
FG
945 monclient.sub_unwant(log_watch);
946 log_watch.clear();
947 log_cb = NULL;
31f18b77 948 log_cb2 = NULL;
7c673cae
FG
949 log_cb_arg = NULL;
950 return 0;
951 }
952
953 string watch_level;
954 if (level == "debug") {
955 watch_level = "log-debug";
956 } else if (level == "info") {
957 watch_level = "log-info";
958 } else if (level == "warn" || level == "warning") {
959 watch_level = "log-warn";
960 } else if (level == "err" || level == "error") {
961 watch_level = "log-error";
962 } else if (level == "sec") {
963 watch_level = "log-sec";
964 } else {
965 ldout(cct, 10) << __func__ << " invalid level " << level << dendl;
966 return -EINVAL;
967 }
968
31f18b77 969 if (log_cb || log_cb2)
7c673cae
FG
970 monclient.sub_unwant(log_watch);
971
972 // (re)start watch
31f18b77
FG
973 ldout(cct, 10) << __func__ << " add cb " << (void*)cb << " " << (void*)cb2
974 << " level " << level << dendl;
7c673cae
FG
975 monclient.sub_want(watch_level, 0, 0);
976
977 monclient.renew_subs();
978 log_cb = cb;
31f18b77 979 log_cb2 = cb2;
7c673cae
FG
980 log_cb_arg = arg;
981 log_watch = watch_level;
982 return 0;
983}
984
985void librados::RadosClient::handle_log(MLog *m)
986{
987 assert(lock.is_locked());
988 ldout(cct, 10) << __func__ << " version " << m->version << dendl;
989
990 if (log_last_version < m->version) {
991 log_last_version = m->version;
992
31f18b77 993 if (log_cb || log_cb2) {
7c673cae
FG
994 for (std::deque<LogEntry>::iterator it = m->entries.begin(); it != m->entries.end(); ++it) {
995 LogEntry e = *it;
996 ostringstream ss;
31f18b77 997 ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
7c673cae
FG
998 string line = ss.str();
999 string who = stringify(e.who);
31f18b77 1000 string name = stringify(e.name);
7c673cae
FG
1001 string level = stringify(e.prio);
1002 struct timespec stamp;
1003 e.stamp.to_timespec(&stamp);
1004
1005 ldout(cct, 20) << __func__ << " delivering " << ss.str() << dendl;
31f18b77
FG
1006 if (log_cb)
1007 log_cb(log_cb_arg, line.c_str(), who.c_str(),
1008 stamp.tv_sec, stamp.tv_nsec,
1009 e.seq, level.c_str(), e.msg.c_str());
1010 if (log_cb2)
224ce89b
WB
1011 log_cb2(log_cb_arg, line.c_str(),
1012 e.channel.c_str(),
1013 who.c_str(), name.c_str(),
31f18b77
FG
1014 stamp.tv_sec, stamp.tv_nsec,
1015 e.seq, level.c_str(), e.msg.c_str());
7c673cae
FG
1016 }
1017 }
1018
1019 monclient.sub_got(log_watch, log_last_version);
1020 }
1021
1022 m->put();
1023}
224ce89b
WB
1024
1025int librados::RadosClient::service_daemon_register(
1026 const std::string& service, ///< service name (e.g., 'rgw')
1027 const std::string& name, ///< daemon name (e.g., 'gwfoo')
1028 const std::map<std::string,std::string>& metadata)
1029{
1030 if (service_daemon) {
1031 return -EEXIST;
1032 }
1033 if (service == "osd" ||
1034 service == "mds" ||
1035 service == "client" ||
1036 service == "mon" ||
1037 service == "mgr") {
1038 // normal ceph entity types are not allowed!
1039 return -EINVAL;
1040 }
1041 if (service.empty() || name.empty()) {
1042 return -EINVAL;
1043 }
1044
1045 collect_sys_info(&daemon_metadata, cct);
1046
1047 ldout(cct,10) << __func__ << " " << service << "." << name << dendl;
1048 service_daemon = true;
1049 service_name = service;
1050 daemon_name = name;
1051 daemon_metadata.insert(metadata.begin(), metadata.end());
1052
1053 if (state == DISCONNECTED) {
1054 return 0;
1055 }
1056 if (state == CONNECTING) {
1057 return -EBUSY;
1058 }
1059 mgrclient.service_daemon_register(service_name, daemon_name,
1060 daemon_metadata);
1061 return 0;
1062}
1063
1064int librados::RadosClient::service_daemon_update_status(
1065 const std::map<std::string,std::string>& status)
1066{
1067 if (state != CONNECTED) {
1068 return -ENOTCONN;
1069 }
1070 return mgrclient.service_daemon_update_status(status);
1071}