]> git.proxmox.com Git - ceph.git/blob - ceph/src/librados/RadosClient.cc
e3215c96a025d7a4304e095f6ad31e20126e43ad
[ceph.git] / ceph / src / librados / RadosClient.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <sys/types.h>
16 #include <sys/stat.h>
17 #include <fcntl.h>
18
19 #include <iostream>
20 #include <string>
21 #include <sstream>
22 #include <pthread.h>
23 #include <errno.h>
24
25 #include "common/ceph_context.h"
26 #include "common/config.h"
27 #include "common/common_init.h"
28 #include "common/errno.h"
29 #include "include/buffer.h"
30 #include "include/stringify.h"
31
32 #include "messages/MLog.h"
33 #include "msg/Messenger.h"
34
35 // needed for static_cast
36 #include "messages/PaxosServiceMessage.h"
37 #include "messages/MPoolOpReply.h"
38 #include "messages/MStatfsReply.h"
39 #include "messages/MGetPoolStatsReply.h"
40 #include "messages/MOSDOpReply.h"
41 #include "messages/MOSDMap.h"
42 #include "messages/MCommandReply.h"
43
44 #include "AioCompletionImpl.h"
45 #include "IoCtxImpl.h"
46 #include "PoolAsyncCompletionImpl.h"
47 #include "RadosClient.h"
48
49 #include "include/assert.h"
50 #include "common/EventTrace.h"
51
52 #define dout_subsys ceph_subsys_rados
53 #undef dout_prefix
54 #define dout_prefix *_dout << "librados: "
55
56 bool librados::RadosClient::ms_get_authorizer(int dest_type,
57 AuthAuthorizer **authorizer,
58 bool force_new) {
59 //ldout(cct, 0) << "RadosClient::ms_get_authorizer type=" << dest_type << dendl;
60 /* monitor authorization is being handled on different layer */
61 if (dest_type == CEPH_ENTITY_TYPE_MON)
62 return true;
63 *authorizer = monclient.build_authorizer(dest_type);
64 return *authorizer != NULL;
65 }
66
67 librados::RadosClient::RadosClient(CephContext *cct_)
68 : Dispatcher(cct_->get()),
69 cct_deleter{cct_, [](CephContext *p) {p->put();}},
70 conf(cct_->_conf),
71 state(DISCONNECTED),
72 monclient(cct_),
73 mgrclient(cct_, nullptr),
74 messenger(NULL),
75 instance_id(0),
76 objecter(NULL),
77 lock("librados::RadosClient::lock"),
78 timer(cct, lock),
79 refcnt(1),
80 log_last_version(0), log_cb(NULL), log_cb2(NULL), log_cb_arg(NULL),
81 finisher(cct, "radosclient", "fn-radosclient")
82 {
83 }
84
85 int64_t librados::RadosClient::lookup_pool(const char *name)
86 {
87 int r = wait_for_osdmap();
88 if (r < 0) {
89 return r;
90 }
91
92 int64_t ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
93 name);
94 if (-ENOENT == ret) {
95 // Make sure we have the latest map
96 int r = wait_for_latest_osdmap();
97 if (r < 0)
98 return r;
99 ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
100 name);
101 }
102
103 return ret;
104 }
105
106 bool librados::RadosClient::pool_requires_alignment(int64_t pool_id)
107 {
108 bool requires;
109 int r = pool_requires_alignment2(pool_id, &requires);
110 if (r < 0) {
111 // Cast answer to false, this is a little bit problematic
112 // since we really don't know the answer yet, say.
113 return false;
114 }
115
116 return requires;
117 }
118
119 // a safer version of pool_requires_alignment
120 int librados::RadosClient::pool_requires_alignment2(int64_t pool_id,
121 bool *requires)
122 {
123 if (!requires)
124 return -EINVAL;
125
126 int r = wait_for_osdmap();
127 if (r < 0) {
128 return r;
129 }
130
131 return objecter->with_osdmap([requires, pool_id](const OSDMap& o) {
132 if (!o.have_pg_pool(pool_id)) {
133 return -ENOENT;
134 }
135 *requires = o.get_pg_pool(pool_id)->requires_aligned_append();
136 return 0;
137 });
138 }
139
140 uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id)
141 {
142 uint64_t alignment;
143 int r = pool_required_alignment2(pool_id, &alignment);
144 if (r < 0) {
145 return 0;
146 }
147
148 return alignment;
149 }
150
151 // a safer version of pool_required_alignment
152 int librados::RadosClient::pool_required_alignment2(int64_t pool_id,
153 uint64_t *alignment)
154 {
155 if (!alignment)
156 return -EINVAL;
157
158 int r = wait_for_osdmap();
159 if (r < 0) {
160 return r;
161 }
162
163 return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) {
164 if (!o.have_pg_pool(pool_id)) {
165 return -ENOENT;
166 }
167 *alignment = o.get_pg_pool(pool_id)->required_alignment();
168 return 0;
169 });
170 }
171
172 int librados::RadosClient::pool_get_auid(uint64_t pool_id,
173 unsigned long long *auid)
174 {
175 int r = wait_for_osdmap();
176 if (r < 0)
177 return r;
178 objecter->with_osdmap([&](const OSDMap& o) {
179 const pg_pool_t *pg = o.get_pg_pool(pool_id);
180 if (!pg) {
181 r = -ENOENT;
182 } else {
183 r = 0;
184 *auid = pg->auid;
185 }
186 });
187 return r;
188 }
189
190 int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s)
191 {
192 int r = wait_for_osdmap();
193 if (r < 0)
194 return r;
195 objecter->with_osdmap([&](const OSDMap& o) {
196 if (!o.have_pg_pool(pool_id)) {
197 r = -ENOENT;
198 } else {
199 r = 0;
200 *s = o.get_pool_name(pool_id);
201 }
202 });
203 return r;
204 }
205
206 int librados::RadosClient::get_fsid(std::string *s)
207 {
208 if (!s)
209 return -EINVAL;
210 Mutex::Locker l(lock);
211 ostringstream oss;
212 oss << monclient.get_fsid();
213 *s = oss.str();
214 return 0;
215 }
216
217 int librados::RadosClient::ping_monitor(const string mon_id, string *result)
218 {
219 int err = 0;
220 /* If we haven't yet connected, we have no way of telling whether we
221 * already built monc's initial monmap. IF we are in CONNECTED state,
222 * then it is safe to assume that we went through connect(), which does
223 * build a monmap.
224 */
225 if (state != CONNECTED) {
226 ldout(cct, 10) << __func__ << " build monmap" << dendl;
227 err = monclient.build_initial_monmap();
228 }
229 if (err < 0) {
230 return err;
231 }
232
233 err = monclient.ping_monitor(mon_id, result);
234 return err;
235 }
236
237 int librados::RadosClient::connect()
238 {
239 common_init_finish(cct);
240
241 int err;
242
243 // already connected?
244 if (state == CONNECTING)
245 return -EINPROGRESS;
246 if (state == CONNECTED)
247 return -EISCONN;
248 state = CONNECTING;
249
250 // get monmap
251 err = monclient.build_initial_monmap();
252 if (err < 0)
253 goto out;
254
255 err = -ENOMEM;
256 messenger = Messenger::create_client_messenger(cct, "radosclient");
257 if (!messenger)
258 goto out;
259
260 // require OSDREPLYMUX feature. this means we will fail to talk to
261 // old servers. this is necessary because otherwise we won't know
262 // how to decompose the reply data into its constituent pieces.
263 messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
264
265 ldout(cct, 1) << "starting msgr at " << messenger->get_myaddr() << dendl;
266
267 ldout(cct, 1) << "starting objecter" << dendl;
268
269 objecter = new (std::nothrow) Objecter(cct, messenger, &monclient,
270 &finisher,
271 cct->_conf->rados_mon_op_timeout,
272 cct->_conf->rados_osd_op_timeout);
273 if (!objecter)
274 goto out;
275 objecter->set_balanced_budget();
276
277 monclient.set_messenger(messenger);
278 mgrclient.set_messenger(messenger);
279
280 objecter->init();
281 messenger->add_dispatcher_head(&mgrclient);
282 messenger->add_dispatcher_tail(objecter);
283 messenger->add_dispatcher_tail(this);
284
285 messenger->start();
286
287 ldout(cct, 1) << "setting wanted keys" << dendl;
288 monclient.set_want_keys(
289 CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
290 ldout(cct, 1) << "calling monclient init" << dendl;
291 err = monclient.init();
292 if (err) {
293 ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
294 shutdown();
295 goto out;
296 }
297
298 err = monclient.authenticate(conf->client_mount_timeout);
299 if (err) {
300 ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
301 shutdown();
302 goto out;
303 }
304 messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
305
306 // MgrClient needs this (it doesn't have MonClient reference itself)
307 monclient.sub_want("mgrmap", 0, 0);
308 monclient.renew_subs();
309
310 mgrclient.init();
311
312 objecter->set_client_incarnation(0);
313 objecter->start();
314 lock.Lock();
315
316 timer.init();
317
318 finisher.start();
319
320 state = CONNECTED;
321 instance_id = monclient.get_global_id();
322
323 lock.Unlock();
324
325 ldout(cct, 1) << "init done" << dendl;
326 err = 0;
327
328 out:
329 if (err) {
330 state = DISCONNECTED;
331
332 if (objecter) {
333 delete objecter;
334 objecter = NULL;
335 }
336 if (messenger) {
337 delete messenger;
338 messenger = NULL;
339 }
340 }
341
342 return err;
343 }
344
345 void librados::RadosClient::shutdown()
346 {
347 lock.Lock();
348 if (state == DISCONNECTED) {
349 lock.Unlock();
350 return;
351 }
352
353 bool need_objecter = false;
354 if (objecter && objecter->initialized) {
355 need_objecter = true;
356 }
357
358 if (state == CONNECTED) {
359 if (need_objecter) {
360 // make sure watch callbacks are flushed
361 watch_flush();
362 }
363 finisher.wait_for_empty();
364 finisher.stop();
365 }
366 state = DISCONNECTED;
367 instance_id = 0;
368 timer.shutdown(); // will drop+retake lock
369 lock.Unlock();
370 if (need_objecter) {
371 objecter->shutdown();
372 }
373 mgrclient.shutdown();
374
375 monclient.shutdown();
376 if (messenger) {
377 messenger->shutdown();
378 messenger->wait();
379 }
380 ldout(cct, 1) << "shutdown" << dendl;
381 }
382
383 int librados::RadosClient::watch_flush()
384 {
385 ldout(cct, 10) << __func__ << " enter" << dendl;
386 Mutex mylock("RadosClient::watch_flush::mylock");
387 Cond cond;
388 bool done;
389 objecter->linger_callback_flush(new C_SafeCond(&mylock, &cond, &done));
390
391 mylock.Lock();
392 while (!done)
393 cond.Wait(mylock);
394 mylock.Unlock();
395
396 ldout(cct, 10) << __func__ << " exit" << dendl;
397 return 0;
398 }
399
400 struct C_aio_watch_flush_Complete : public Context {
401 librados::RadosClient *client;
402 librados::AioCompletionImpl *c;
403
404 C_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
405 : client(_client), c(_c) {
406 c->get();
407 }
408
409 void finish(int r) override {
410 c->lock.Lock();
411 c->rval = r;
412 c->complete = true;
413 c->cond.Signal();
414
415 if (c->callback_complete ||
416 c->callback_safe) {
417 client->finisher.queue(new librados::C_AioComplete(c));
418 }
419 c->put_unlock();
420 }
421 };
422
423 int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
424 {
425 ldout(cct, 10) << __func__ << " enter" << dendl;
426 Context *oncomplete = new C_aio_watch_flush_Complete(this, c);
427 objecter->linger_callback_flush(oncomplete);
428 ldout(cct, 10) << __func__ << " exit" << dendl;
429 return 0;
430 }
431
432 uint64_t librados::RadosClient::get_instance_id()
433 {
434 return instance_id;
435 }
436
437 librados::RadosClient::~RadosClient()
438 {
439 if (messenger)
440 delete messenger;
441 if (objecter)
442 delete objecter;
443 cct = NULL;
444 }
445
446 int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
447 {
448 int64_t poolid = lookup_pool(name);
449 if (poolid < 0) {
450 return (int)poolid;
451 }
452
453 *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
454 return 0;
455 }
456
457 int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
458 {
459 *io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
460 return 0;
461 }
462
463 bool librados::RadosClient::ms_dispatch(Message *m)
464 {
465 bool ret;
466
467 Mutex::Locker l(lock);
468 if (state == DISCONNECTED) {
469 ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
470 m->put();
471 ret = true;
472 } else {
473 ret = _dispatch(m);
474 }
475 return ret;
476 }
477
478 void librados::RadosClient::ms_handle_connect(Connection *con)
479 {
480 }
481
482 bool librados::RadosClient::ms_handle_reset(Connection *con)
483 {
484 return false;
485 }
486
487 void librados::RadosClient::ms_handle_remote_reset(Connection *con)
488 {
489 }
490
491 bool librados::RadosClient::ms_handle_refused(Connection *con)
492 {
493 return false;
494 }
495
496 bool librados::RadosClient::_dispatch(Message *m)
497 {
498 assert(lock.is_locked());
499 switch (m->get_type()) {
500 // OSD
501 case CEPH_MSG_OSD_MAP:
502 cond.Signal();
503 m->put();
504 break;
505
506 case CEPH_MSG_MDS_MAP:
507 m->put();
508 break;
509
510 case MSG_LOG:
511 handle_log(static_cast<MLog *>(m));
512 break;
513
514 default:
515 return false;
516 }
517
518 return true;
519 }
520
521
522 int librados::RadosClient::wait_for_osdmap()
523 {
524 assert(!lock.is_locked_by_me());
525
526 if (state != CONNECTED) {
527 return -ENOTCONN;
528 }
529
530 bool need_map = false;
531 objecter->with_osdmap([&](const OSDMap& o) {
532 if (o.get_epoch() == 0) {
533 need_map = true;
534 }
535 });
536
537 if (need_map) {
538 Mutex::Locker l(lock);
539
540 utime_t timeout;
541 if (cct->_conf->rados_mon_op_timeout > 0)
542 timeout.set_from_double(cct->_conf->rados_mon_op_timeout);
543
544 if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
545 ldout(cct, 10) << __func__ << " waiting" << dendl;
546 utime_t start = ceph_clock_now();
547 while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
548 if (timeout.is_zero()) {
549 cond.Wait(lock);
550 } else {
551 cond.WaitInterval(lock, timeout);
552 utime_t elapsed = ceph_clock_now() - start;
553 if (elapsed > timeout) {
554 lderr(cct) << "timed out waiting for first osdmap from monitors"
555 << dendl;
556 return -ETIMEDOUT;
557 }
558 }
559 }
560 ldout(cct, 10) << __func__ << " done waiting" << dendl;
561 }
562 return 0;
563 } else {
564 return 0;
565 }
566 }
567
568
569 int librados::RadosClient::wait_for_latest_osdmap()
570 {
571 Mutex mylock("RadosClient::wait_for_latest_osdmap");
572 Cond cond;
573 bool done;
574
575 objecter->wait_for_latest_osdmap(new C_SafeCond(&mylock, &cond, &done));
576
577 mylock.Lock();
578 while (!done)
579 cond.Wait(mylock);
580 mylock.Unlock();
581
582 return 0;
583 }
584
585 int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
586 {
587 int r = wait_for_osdmap();
588 if (r < 0)
589 return r;
590
591 objecter->with_osdmap([&](const OSDMap& o) {
592 for (auto p : o.get_pools())
593 v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
594 });
595 return 0;
596 }
597
598 int librados::RadosClient::get_pool_stats(std::list<string>& pools,
599 map<string,::pool_stat_t>& result)
600 {
601 Mutex mylock("RadosClient::get_pool_stats::mylock");
602 Cond cond;
603 bool done;
604 int ret = 0;
605
606 objecter->get_pool_stats(pools, &result, new C_SafeCond(&mylock, &cond, &done,
607 &ret));
608
609 mylock.Lock();
610 while (!done)
611 cond.Wait(mylock);
612 mylock.Unlock();
613
614 return ret;
615 }
616
617 bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
618 const std::string& pool)
619 {
620 bool ret = false;
621 objecter->with_osdmap([&](const OSDMap& osdmap) {
622 int64_t poolid = osdmap.lookup_pg_pool_name(pool);
623 if (poolid >= 0)
624 ret = osdmap.get_pg_pool(poolid)->is_unmanaged_snaps_mode();
625 });
626 return ret;
627 }
628
629 int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
630 {
631 Mutex mylock ("RadosClient::get_fs_stats::mylock");
632 Cond cond;
633 bool done;
634 int ret = 0;
635
636 lock.Lock();
637 objecter->get_fs_stats(stats, new C_SafeCond(&mylock, &cond, &done, &ret));
638 lock.Unlock();
639
640 mylock.Lock();
641 while (!done) cond.Wait(mylock);
642 mylock.Unlock();
643
644 return ret;
645 }
646
647 void librados::RadosClient::get() {
648 Mutex::Locker l(lock);
649 assert(refcnt > 0);
650 refcnt++;
651 }
652
653 bool librados::RadosClient::put() {
654 Mutex::Locker l(lock);
655 assert(refcnt > 0);
656 refcnt--;
657 return (refcnt == 0);
658 }
659
660 int librados::RadosClient::pool_create(string& name, unsigned long long auid,
661 int16_t crush_rule)
662 {
663 int r = wait_for_osdmap();
664 if (r < 0) {
665 return r;
666 }
667
668 Mutex mylock ("RadosClient::pool_create::mylock");
669 int reply;
670 Cond cond;
671 bool done;
672 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
673 reply = objecter->create_pool(name, onfinish, auid, crush_rule);
674
675 if (reply < 0) {
676 delete onfinish;
677 } else {
678 mylock.Lock();
679 while(!done)
680 cond.Wait(mylock);
681 mylock.Unlock();
682 }
683 return reply;
684 }
685
686 int librados::RadosClient::pool_create_async(string& name, PoolAsyncCompletionImpl *c,
687 unsigned long long auid,
688 int16_t crush_rule)
689 {
690 int r = wait_for_osdmap();
691 if (r < 0)
692 return r;
693
694 Context *onfinish = new C_PoolAsync_Safe(c);
695 r = objecter->create_pool(name, onfinish, auid, crush_rule);
696 if (r < 0) {
697 delete onfinish;
698 }
699 return r;
700 }
701
702 int librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tier)
703 {
704 int r = wait_for_osdmap();
705 if (r < 0) {
706 return r;
707 }
708
709 objecter->with_osdmap([&](const OSDMap& o) {
710 const pg_pool_t* pool = o.get_pg_pool(pool_id);
711 if (pool) {
712 if (pool->tier_of < 0) {
713 *base_tier = pool_id;
714 } else {
715 *base_tier = pool->tier_of;
716 }
717 r = 0;
718 } else {
719 r = -ENOENT;
720 }
721 });
722 return r;
723 }
724
725 int librados::RadosClient::pool_delete(const char *name)
726 {
727 int r = wait_for_osdmap();
728 if (r < 0) {
729 return r;
730 }
731
732 Mutex mylock("RadosClient::pool_delete::mylock");
733 Cond cond;
734 bool done;
735 int ret;
736 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &ret);
737 ret = objecter->delete_pool(name, onfinish);
738
739 if (ret < 0) {
740 delete onfinish;
741 } else {
742 mylock.Lock();
743 while (!done)
744 cond.Wait(mylock);
745 mylock.Unlock();
746 }
747 return ret;
748 }
749
750 int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c)
751 {
752 int r = wait_for_osdmap();
753 if (r < 0)
754 return r;
755
756 Context *onfinish = new C_PoolAsync_Safe(c);
757 r = objecter->delete_pool(name, onfinish);
758 if (r < 0) {
759 delete onfinish;
760 }
761 return r;
762 }
763
764 void librados::RadosClient::blacklist_self(bool set) {
765 Mutex::Locker l(lock);
766 objecter->blacklist_self(set);
767 }
768
769 int librados::RadosClient::blacklist_add(const string& client_address,
770 uint32_t expire_seconds)
771 {
772 entity_addr_t addr;
773 if (!addr.parse(client_address.c_str(), 0)) {
774 lderr(cct) << "unable to parse address " << client_address << dendl;
775 return -EINVAL;
776 }
777
778 std::stringstream cmd;
779 cmd << "{"
780 << "\"prefix\": \"osd blacklist\", "
781 << "\"blacklistop\": \"add\", "
782 << "\"addr\": \"" << client_address << "\"";
783 if (expire_seconds != 0) {
784 cmd << ", \"expire\": " << expire_seconds << ".0";
785 }
786 cmd << "}";
787
788 std::vector<std::string> cmds;
789 cmds.push_back(cmd.str());
790 bufferlist inbl;
791 int r = mon_command(cmds, inbl, NULL, NULL);
792 if (r < 0) {
793 return r;
794 }
795
796 // ensure we have the latest osd map epoch before proceeding
797 r = wait_for_latest_osdmap();
798 return r;
799 }
800
801 int librados::RadosClient::mon_command(const vector<string>& cmd,
802 const bufferlist &inbl,
803 bufferlist *outbl, string *outs)
804 {
805 Mutex mylock("RadosClient::mon_command::mylock");
806 Cond cond;
807 bool done;
808 int rval;
809 lock.Lock();
810 monclient.start_mon_command(cmd, inbl, outbl, outs,
811 new C_SafeCond(&mylock, &cond, &done, &rval));
812 lock.Unlock();
813 mylock.Lock();
814 while (!done)
815 cond.Wait(mylock);
816 mylock.Unlock();
817 return rval;
818 }
819
820
821 int librados::RadosClient::mgr_command(const vector<string>& cmd,
822 const bufferlist &inbl,
823 bufferlist *outbl, string *outs)
824 {
825 Mutex::Locker l(lock);
826
827 C_SaferCond cond;
828 int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
829 if (r < 0)
830 return r;
831
832 lock.Unlock();
833 r = cond.wait();
834 lock.Lock();
835
836 return r;
837 }
838
839
840 int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
841 const bufferlist &inbl,
842 bufferlist *outbl, string *outs)
843 {
844 Mutex mylock("RadosClient::mon_command::mylock");
845 Cond cond;
846 bool done;
847 int rval;
848 lock.Lock();
849 monclient.start_mon_command(rank, cmd, inbl, outbl, outs,
850 new C_SafeCond(&mylock, &cond, &done, &rval));
851 lock.Unlock();
852 mylock.Lock();
853 while (!done)
854 cond.Wait(mylock);
855 mylock.Unlock();
856 return rval;
857 }
858
859 int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
860 const bufferlist &inbl,
861 bufferlist *outbl, string *outs)
862 {
863 Mutex mylock("RadosClient::mon_command::mylock");
864 Cond cond;
865 bool done;
866 int rval;
867 lock.Lock();
868 monclient.start_mon_command(name, cmd, inbl, outbl, outs,
869 new C_SafeCond(&mylock, &cond, &done, &rval));
870 lock.Unlock();
871 mylock.Lock();
872 while (!done)
873 cond.Wait(mylock);
874 mylock.Unlock();
875 return rval;
876 }
877
878 int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
879 const bufferlist& inbl,
880 bufferlist *poutbl, string *prs)
881 {
882 Mutex mylock("RadosClient::osd_command::mylock");
883 Cond cond;
884 bool done;
885 int ret;
886 ceph_tid_t tid;
887
888 if (osd < 0)
889 return -EINVAL;
890
891 lock.Lock();
892 // XXX do anything with tid?
893 objecter->osd_command(osd, cmd, inbl, &tid, poutbl, prs,
894 new C_SafeCond(&mylock, &cond, &done, &ret));
895 lock.Unlock();
896 mylock.Lock();
897 while (!done)
898 cond.Wait(mylock);
899 mylock.Unlock();
900 return ret;
901 }
902
903 int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
904 const bufferlist& inbl,
905 bufferlist *poutbl, string *prs)
906 {
907 Mutex mylock("RadosClient::pg_command::mylock");
908 Cond cond;
909 bool done;
910 int ret;
911 ceph_tid_t tid;
912 lock.Lock();
913 objecter->pg_command(pgid, cmd, inbl, &tid, poutbl, prs,
914 new C_SafeCond(&mylock, &cond, &done, &ret));
915 lock.Unlock();
916 mylock.Lock();
917 while (!done)
918 cond.Wait(mylock);
919 mylock.Unlock();
920 return ret;
921 }
922
923 int librados::RadosClient::monitor_log(const string& level,
924 rados_log_callback_t cb,
925 rados_log_callback2_t cb2,
926 void *arg)
927 {
928 Mutex::Locker l(lock);
929
930 if (state != CONNECTED) {
931 return -ENOTCONN;
932 }
933
934 if (cb == NULL && cb2 == NULL) {
935 // stop watch
936 ldout(cct, 10) << __func__ << " removing cb " << (void*)log_cb
937 << " " << (void*)log_cb2 << dendl;
938 monclient.sub_unwant(log_watch);
939 log_watch.clear();
940 log_cb = NULL;
941 log_cb2 = NULL;
942 log_cb_arg = NULL;
943 return 0;
944 }
945
946 string watch_level;
947 if (level == "debug") {
948 watch_level = "log-debug";
949 } else if (level == "info") {
950 watch_level = "log-info";
951 } else if (level == "warn" || level == "warning") {
952 watch_level = "log-warn";
953 } else if (level == "err" || level == "error") {
954 watch_level = "log-error";
955 } else if (level == "sec") {
956 watch_level = "log-sec";
957 } else {
958 ldout(cct, 10) << __func__ << " invalid level " << level << dendl;
959 return -EINVAL;
960 }
961
962 if (log_cb || log_cb2)
963 monclient.sub_unwant(log_watch);
964
965 // (re)start watch
966 ldout(cct, 10) << __func__ << " add cb " << (void*)cb << " " << (void*)cb2
967 << " level " << level << dendl;
968 monclient.sub_want(watch_level, 0, 0);
969
970 monclient.renew_subs();
971 log_cb = cb;
972 log_cb2 = cb2;
973 log_cb_arg = arg;
974 log_watch = watch_level;
975 return 0;
976 }
977
978 void librados::RadosClient::handle_log(MLog *m)
979 {
980 assert(lock.is_locked());
981 ldout(cct, 10) << __func__ << " version " << m->version << dendl;
982
983 if (log_last_version < m->version) {
984 log_last_version = m->version;
985
986 if (log_cb || log_cb2) {
987 for (std::deque<LogEntry>::iterator it = m->entries.begin(); it != m->entries.end(); ++it) {
988 LogEntry e = *it;
989 ostringstream ss;
990 ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
991 string line = ss.str();
992 string who = stringify(e.who);
993 string name = stringify(e.name);
994 string level = stringify(e.prio);
995 struct timespec stamp;
996 e.stamp.to_timespec(&stamp);
997
998 ldout(cct, 20) << __func__ << " delivering " << ss.str() << dendl;
999 if (log_cb)
1000 log_cb(log_cb_arg, line.c_str(), who.c_str(),
1001 stamp.tv_sec, stamp.tv_nsec,
1002 e.seq, level.c_str(), e.msg.c_str());
1003 if (log_cb2)
1004 log_cb2(log_cb_arg, line.c_str(), who.c_str(), name.c_str(),
1005 stamp.tv_sec, stamp.tv_nsec,
1006 e.seq, level.c_str(), e.msg.c_str());
1007 }
1008 }
1009
1010 monclient.sub_got(log_watch, log_last_version);
1011 }
1012
1013 m->put();
1014 }