]> git.proxmox.com Git - ceph.git/blame - ceph/src/librados/RadosClient.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / librados / RadosClient.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <sys/types.h>
16#include <sys/stat.h>
17#include <fcntl.h>
18
19#include <iostream>
20#include <string>
21#include <sstream>
22#include <pthread.h>
23#include <errno.h>
24
25#include "common/ceph_context.h"
26#include "common/config.h"
27#include "common/common_init.h"
11fdf7f2 28#include "common/ceph_json.h"
7c673cae 29#include "common/errno.h"
11fdf7f2 30#include "common/ceph_json.h"
f67539c2 31#include "common/async/blocked_completion.h"
7c673cae
FG
32#include "include/buffer.h"
33#include "include/stringify.h"
224ce89b 34#include "include/util.h"
7c673cae 35
7c673cae
FG
36#include "msg/Messenger.h"
37
38// needed for static_cast
f67539c2 39#include "messages/MLog.h"
7c673cae
FG
40
41#include "AioCompletionImpl.h"
42#include "IoCtxImpl.h"
43#include "PoolAsyncCompletionImpl.h"
44#include "RadosClient.h"
45
11fdf7f2 46#include "include/ceph_assert.h"
7c673cae
FG
47#include "common/EventTrace.h"
48
49#define dout_subsys ceph_subsys_rados
50#undef dout_prefix
51#define dout_prefix *_dout << "librados: "
52
20effc67
TL
53using std::ostringstream;
54using std::string;
55using std::map;
56using std::vector;
57
f67539c2
TL
58namespace bc = boost::container;
59namespace bs = boost::system;
60namespace ca = ceph::async;
61namespace cb = ceph::buffer;
62
7c673cae
FG
63librados::RadosClient::RadosClient(CephContext *cct_)
64 : Dispatcher(cct_->get()),
f67539c2 65 cct_deleter{cct, [](CephContext *p) {p->put();}}
7c673cae 66{
f67539c2 67 auto& conf = cct->_conf;
f91f0fd5 68 conf.add_observer(this);
f67539c2 69 rados_mon_op_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
7c673cae
FG
70}
71
72int64_t librados::RadosClient::lookup_pool(const char *name)
73{
74 int r = wait_for_osdmap();
75 if (r < 0) {
76 return r;
77 }
78
79 int64_t ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
80 name);
81 if (-ENOENT == ret) {
82 // Make sure we have the latest map
83 int r = wait_for_latest_osdmap();
84 if (r < 0)
85 return r;
86 ret = objecter->with_osdmap(std::mem_fn(&OSDMap::lookup_pg_pool_name),
87 name);
88 }
89
90 return ret;
91}
92
93bool librados::RadosClient::pool_requires_alignment(int64_t pool_id)
94{
1e59de90
TL
95 bool required;
96 int r = pool_requires_alignment2(pool_id, &required);
7c673cae
FG
97 if (r < 0) {
98 // Cast answer to false, this is a little bit problematic
99 // since we really don't know the answer yet, say.
100 return false;
101 }
102
1e59de90 103 return required;
7c673cae
FG
104}
105
106// a safer version of pool_requires_alignment
107int librados::RadosClient::pool_requires_alignment2(int64_t pool_id,
1e59de90 108 bool *req)
7c673cae 109{
1e59de90 110 if (!req)
7c673cae
FG
111 return -EINVAL;
112
113 int r = wait_for_osdmap();
114 if (r < 0) {
115 return r;
116 }
117
1e59de90 118 return objecter->with_osdmap([req, pool_id](const OSDMap& o) {
7c673cae
FG
119 if (!o.have_pg_pool(pool_id)) {
120 return -ENOENT;
121 }
1e59de90 122 *req = o.get_pg_pool(pool_id)->requires_aligned_append();
7c673cae
FG
123 return 0;
124 });
125}
126
127uint64_t librados::RadosClient::pool_required_alignment(int64_t pool_id)
128{
129 uint64_t alignment;
130 int r = pool_required_alignment2(pool_id, &alignment);
131 if (r < 0) {
132 return 0;
133 }
134
135 return alignment;
136}
137
138// a safer version of pool_required_alignment
139int librados::RadosClient::pool_required_alignment2(int64_t pool_id,
140 uint64_t *alignment)
141{
142 if (!alignment)
143 return -EINVAL;
144
145 int r = wait_for_osdmap();
146 if (r < 0) {
147 return r;
148 }
149
150 return objecter->with_osdmap([alignment, pool_id](const OSDMap &o) {
151 if (!o.have_pg_pool(pool_id)) {
152 return -ENOENT;
153 }
154 *alignment = o.get_pg_pool(pool_id)->required_alignment();
155 return 0;
156 });
157}
158
11fdf7f2 159int librados::RadosClient::pool_get_name(uint64_t pool_id, std::string *s, bool wait_latest_map)
7c673cae
FG
160{
161 int r = wait_for_osdmap();
162 if (r < 0)
163 return r;
11fdf7f2 164 retry:
7c673cae
FG
165 objecter->with_osdmap([&](const OSDMap& o) {
166 if (!o.have_pg_pool(pool_id)) {
167 r = -ENOENT;
168 } else {
169 r = 0;
170 *s = o.get_pool_name(pool_id);
171 }
172 });
11fdf7f2
TL
173 if (r == -ENOENT && wait_latest_map) {
174 r = wait_for_latest_osdmap();
175 if (r < 0)
176 return r;
177 wait_latest_map = false;
178 goto retry;
179 }
180
7c673cae
FG
181 return r;
182}
183
184int librados::RadosClient::get_fsid(std::string *s)
185{
186 if (!s)
187 return -EINVAL;
11fdf7f2 188 std::lock_guard l(lock);
7c673cae
FG
189 ostringstream oss;
190 oss << monclient.get_fsid();
191 *s = oss.str();
192 return 0;
193}
194
195int librados::RadosClient::ping_monitor(const string mon_id, string *result)
196{
197 int err = 0;
198 /* If we haven't yet connected, we have no way of telling whether we
199 * already built monc's initial monmap. IF we are in CONNECTED state,
200 * then it is safe to assume that we went through connect(), which does
201 * build a monmap.
202 */
203 if (state != CONNECTED) {
204 ldout(cct, 10) << __func__ << " build monmap" << dendl;
205 err = monclient.build_initial_monmap();
206 }
207 if (err < 0) {
208 return err;
209 }
210
211 err = monclient.ping_monitor(mon_id, result);
212 return err;
213}
214
215int librados::RadosClient::connect()
216{
7c673cae
FG
217 int err;
218
219 // already connected?
220 if (state == CONNECTING)
221 return -EINPROGRESS;
222 if (state == CONNECTED)
223 return -EISCONN;
224 state = CONNECTING;
225
f67539c2 226 if (!cct->_log->is_started()) {
11fdf7f2
TL
227 cct->_log->start();
228 }
229
230 {
f67539c2 231 MonClient mc_bootstrap(cct, poolctx);
11fdf7f2
TL
232 err = mc_bootstrap.get_monmap_and_config();
233 if (err < 0)
234 return err;
235 }
236
237 common_init_finish(cct);
238
f67539c2
TL
239 poolctx.start(cct->_conf.get_val<std::uint64_t>("librados_thread_count"));
240
7c673cae
FG
241 // get monmap
242 err = monclient.build_initial_monmap();
243 if (err < 0)
244 goto out;
245
246 err = -ENOMEM;
247 messenger = Messenger::create_client_messenger(cct, "radosclient");
248 if (!messenger)
249 goto out;
250
251 // require OSDREPLYMUX feature. this means we will fail to talk to
252 // old servers. this is necessary because otherwise we won't know
253 // how to decompose the reply data into its constituent pieces.
254 messenger->set_default_policy(Messenger::Policy::lossy_client(CEPH_FEATURE_OSDREPLYMUX));
255
11fdf7f2 256 ldout(cct, 1) << "starting msgr at " << messenger->get_myaddrs() << dendl;
7c673cae
FG
257
258 ldout(cct, 1) << "starting objecter" << dendl;
259
f67539c2 260 objecter = new (std::nothrow) Objecter(cct, messenger, &monclient, poolctx);
7c673cae
FG
261 if (!objecter)
262 goto out;
263 objecter->set_balanced_budget();
264
265 monclient.set_messenger(messenger);
266 mgrclient.set_messenger(messenger);
267
268 objecter->init();
269 messenger->add_dispatcher_head(&mgrclient);
270 messenger->add_dispatcher_tail(objecter);
271 messenger->add_dispatcher_tail(this);
272
273 messenger->start();
274
275 ldout(cct, 1) << "setting wanted keys" << dendl;
276 monclient.set_want_keys(
277 CEPH_ENTITY_TYPE_MON | CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MGR);
278 ldout(cct, 1) << "calling monclient init" << dendl;
279 err = monclient.init();
280 if (err) {
281 ldout(cct, 0) << conf->name << " initialization error " << cpp_strerror(-err) << dendl;
282 shutdown();
283 goto out;
284 }
285
2a845540 286 err = monclient.authenticate(std::chrono::duration<double>(conf.get_val<std::chrono::seconds>("client_mount_timeout")).count());
7c673cae
FG
287 if (err) {
288 ldout(cct, 0) << conf->name << " authentication error " << cpp_strerror(-err) << dendl;
289 shutdown();
290 goto out;
291 }
292 messenger->set_myname(entity_name_t::CLIENT(monclient.get_global_id()));
293
11fdf7f2
TL
294 // Detect older cluster, put mgrclient into compatible mode
295 mgrclient.set_mgr_optional(
296 !get_required_monitor_features().contains_all(
297 ceph::features::mon::FEATURE_LUMINOUS));
298
7c673cae
FG
299 // MgrClient needs this (it doesn't have MonClient reference itself)
300 monclient.sub_want("mgrmap", 0, 0);
301 monclient.renew_subs();
302
224ce89b
WB
303 if (service_daemon) {
304 ldout(cct, 10) << __func__ << " registering as " << service_name << "."
305 << daemon_name << dendl;
306 mgrclient.service_daemon_register(service_name, daemon_name,
307 daemon_metadata);
308 }
7c673cae
FG
309 mgrclient.init();
310
311 objecter->set_client_incarnation(0);
312 objecter->start();
9f95a23c 313 lock.lock();
7c673cae 314
7c673cae
FG
315 state = CONNECTED;
316 instance_id = monclient.get_global_id();
317
9f95a23c 318 lock.unlock();
7c673cae
FG
319
320 ldout(cct, 1) << "init done" << dendl;
321 err = 0;
322
323 out:
324 if (err) {
325 state = DISCONNECTED;
326
327 if (objecter) {
328 delete objecter;
329 objecter = NULL;
330 }
331 if (messenger) {
332 delete messenger;
333 messenger = NULL;
334 }
335 }
336
337 return err;
338}
339
340void librados::RadosClient::shutdown()
341{
9f95a23c 342 std::unique_lock l{lock};
7c673cae 343 if (state == DISCONNECTED) {
7c673cae
FG
344 return;
345 }
346
347 bool need_objecter = false;
31f18b77 348 if (objecter && objecter->initialized) {
7c673cae
FG
349 need_objecter = true;
350 }
351
352 if (state == CONNECTED) {
353 if (need_objecter) {
354 // make sure watch callbacks are flushed
355 watch_flush();
356 }
7c673cae
FG
357 }
358 state = DISCONNECTED;
359 instance_id = 0;
9f95a23c 360 l.unlock();
7c673cae
FG
361 if (need_objecter) {
362 objecter->shutdown();
363 }
364 mgrclient.shutdown();
365
366 monclient.shutdown();
367 if (messenger) {
368 messenger->shutdown();
369 messenger->wait();
370 }
f67539c2 371 poolctx.stop();
7c673cae
FG
372 ldout(cct, 1) << "shutdown" << dendl;
373}
374
375int librados::RadosClient::watch_flush()
376{
377 ldout(cct, 10) << __func__ << " enter" << dendl;
f67539c2 378 objecter->linger_callback_flush(ca::use_blocked);
7c673cae
FG
379
380 ldout(cct, 10) << __func__ << " exit" << dendl;
381 return 0;
382}
383
f67539c2 384struct CB_aio_watch_flush_Complete {
7c673cae
FG
385 librados::RadosClient *client;
386 librados::AioCompletionImpl *c;
387
f67539c2 388 CB_aio_watch_flush_Complete(librados::RadosClient *_client, librados::AioCompletionImpl *_c)
7c673cae
FG
389 : client(_client), c(_c) {
390 c->get();
391 }
392
f67539c2
TL
393 CB_aio_watch_flush_Complete(const CB_aio_watch_flush_Complete&) = delete;
394 CB_aio_watch_flush_Complete operator =(const CB_aio_watch_flush_Complete&) = delete;
395 CB_aio_watch_flush_Complete(CB_aio_watch_flush_Complete&& rhs) {
396 client = rhs.client;
397 c = rhs.c;
398 }
399 CB_aio_watch_flush_Complete& operator =(CB_aio_watch_flush_Complete&& rhs) {
400 client = rhs.client;
401 c = rhs.c;
402 return *this;
403 }
404
405 void operator()() {
9f95a23c 406 c->lock.lock();
f67539c2 407 c->rval = 0;
7c673cae 408 c->complete = true;
9f95a23c 409 c->cond.notify_all();
7c673cae
FG
410
411 if (c->callback_complete ||
412 c->callback_safe) {
f67539c2 413 boost::asio::defer(client->finish_strand, librados::CB_AioComplete(c));
7c673cae
FG
414 }
415 c->put_unlock();
416 }
417};
418
419int librados::RadosClient::async_watch_flush(AioCompletionImpl *c)
420{
421 ldout(cct, 10) << __func__ << " enter" << dendl;
f67539c2 422 objecter->linger_callback_flush(CB_aio_watch_flush_Complete(this, c));
7c673cae
FG
423 ldout(cct, 10) << __func__ << " exit" << dendl;
424 return 0;
425}
426
427uint64_t librados::RadosClient::get_instance_id()
428{
429 return instance_id;
430}
431
11fdf7f2
TL
432int librados::RadosClient::get_min_compatible_osd(int8_t* require_osd_release)
433{
434 int r = wait_for_osdmap();
435 if (r < 0) {
436 return r;
437 }
438
439 objecter->with_osdmap(
440 [require_osd_release](const OSDMap& o) {
f67539c2 441 *require_osd_release = to_integer<int8_t>(o.require_osd_release);
11fdf7f2
TL
442 });
443 return 0;
444}
445
446int librados::RadosClient::get_min_compatible_client(int8_t* min_compat_client,
447 int8_t* require_min_compat_client)
448{
449 int r = wait_for_osdmap();
450 if (r < 0) {
451 return r;
452 }
453
454 objecter->with_osdmap(
455 [min_compat_client, require_min_compat_client](const OSDMap& o) {
f67539c2 456 *min_compat_client = to_integer<int8_t>(o.get_min_compat_client());
9f95a23c 457 *require_min_compat_client =
f67539c2 458 to_integer<int8_t>(o.get_require_min_compat_client());
11fdf7f2
TL
459 });
460 return 0;
461}
462
7c673cae
FG
463librados::RadosClient::~RadosClient()
464{
f67539c2 465 cct->_conf.remove_observer(this);
7c673cae
FG
466 if (messenger)
467 delete messenger;
468 if (objecter)
469 delete objecter;
470 cct = NULL;
471}
472
473int librados::RadosClient::create_ioctx(const char *name, IoCtxImpl **io)
474{
475 int64_t poolid = lookup_pool(name);
476 if (poolid < 0) {
477 return (int)poolid;
478 }
479
480 *io = new librados::IoCtxImpl(this, objecter, poolid, CEPH_NOSNAP);
481 return 0;
482}
483
484int librados::RadosClient::create_ioctx(int64_t pool_id, IoCtxImpl **io)
485{
11fdf7f2
TL
486 std::string pool_name;
487 int r = pool_get_name(pool_id, &pool_name, true);
488 if (r < 0)
489 return r;
7c673cae
FG
490 *io = new librados::IoCtxImpl(this, objecter, pool_id, CEPH_NOSNAP);
491 return 0;
492}
493
494bool librados::RadosClient::ms_dispatch(Message *m)
495{
496 bool ret;
497
11fdf7f2 498 std::lock_guard l(lock);
7c673cae
FG
499 if (state == DISCONNECTED) {
500 ldout(cct, 10) << "disconnected, discarding " << *m << dendl;
501 m->put();
502 ret = true;
503 } else {
504 ret = _dispatch(m);
505 }
506 return ret;
507}
508
509void librados::RadosClient::ms_handle_connect(Connection *con)
510{
511}
512
513bool librados::RadosClient::ms_handle_reset(Connection *con)
514{
515 return false;
516}
517
518void librados::RadosClient::ms_handle_remote_reset(Connection *con)
519{
520}
521
522bool librados::RadosClient::ms_handle_refused(Connection *con)
523{
524 return false;
525}
526
527bool librados::RadosClient::_dispatch(Message *m)
528{
9f95a23c 529 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
530 switch (m->get_type()) {
531 // OSD
532 case CEPH_MSG_OSD_MAP:
9f95a23c 533 cond.notify_all();
7c673cae
FG
534 m->put();
535 break;
536
537 case CEPH_MSG_MDS_MAP:
538 m->put();
539 break;
540
541 case MSG_LOG:
542 handle_log(static_cast<MLog *>(m));
543 break;
544
545 default:
546 return false;
547 }
548
549 return true;
550}
551
552
553int librados::RadosClient::wait_for_osdmap()
554{
9f95a23c 555 ceph_assert(ceph_mutex_is_not_locked_by_me(lock));
7c673cae
FG
556
557 if (state != CONNECTED) {
558 return -ENOTCONN;
559 }
560
561 bool need_map = false;
562 objecter->with_osdmap([&](const OSDMap& o) {
563 if (o.get_epoch() == 0) {
564 need_map = true;
565 }
566 });
567
568 if (need_map) {
9f95a23c 569 std::unique_lock l(lock);
7c673cae 570
f91f0fd5 571 ceph::timespan timeout = rados_mon_op_timeout;
7c673cae
FG
572 if (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
573 ldout(cct, 10) << __func__ << " waiting" << dendl;
7c673cae 574 while (objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch)) == 0) {
11fdf7f2 575 if (timeout == timeout.zero()) {
9f95a23c 576 cond.wait(l);
7c673cae 577 } else {
9f95a23c 578 if (cond.wait_for(l, timeout) == std::cv_status::timeout) {
7c673cae
FG
579 lderr(cct) << "timed out waiting for first osdmap from monitors"
580 << dendl;
581 return -ETIMEDOUT;
582 }
583 }
584 }
585 ldout(cct, 10) << __func__ << " done waiting" << dendl;
586 }
587 return 0;
588 } else {
589 return 0;
590 }
591}
592
593
594int librados::RadosClient::wait_for_latest_osdmap()
595{
f67539c2
TL
596 bs::error_code ec;
597 objecter->wait_for_latest_osdmap(ca::use_blocked[ec]);
598 return ceph::from_error_code(ec);
7c673cae
FG
599}
600
601int librados::RadosClient::pool_list(std::list<std::pair<int64_t, string> >& v)
602{
603 int r = wait_for_osdmap();
604 if (r < 0)
605 return r;
606
607 objecter->with_osdmap([&](const OSDMap& o) {
608 for (auto p : o.get_pools())
609 v.push_back(std::make_pair(p.first, o.get_pool_name(p.first)));
610 });
611 return 0;
612}
613
614int librados::RadosClient::get_pool_stats(std::list<string>& pools,
81eedcae 615 map<string,::pool_stat_t> *result,
f67539c2 616 bool *pper_pool)
7c673cae 617{
f67539c2 618 bs::error_code ec;
7c673cae 619
f67539c2 620 std::vector<std::string> v(pools.begin(), pools.end());
7c673cae 621
f67539c2
TL
622 auto [res, per_pool] = objecter->get_pool_stats(v, ca::use_blocked[ec]);
623 if (ec)
624 return ceph::from_error_code(ec);
625
626 if (per_pool)
627 *pper_pool = per_pool;
628 if (result)
629 result->insert(res.begin(), res.end());
630
631 return 0;
7c673cae
FG
632}
633
634bool librados::RadosClient::get_pool_is_selfmanaged_snaps_mode(
635 const std::string& pool)
636{
637 bool ret = false;
638 objecter->with_osdmap([&](const OSDMap& osdmap) {
639 int64_t poolid = osdmap.lookup_pg_pool_name(pool);
640 if (poolid >= 0)
641 ret = osdmap.get_pg_pool(poolid)->is_unmanaged_snaps_mode();
642 });
643 return ret;
644}
645
646int librados::RadosClient::get_fs_stats(ceph_statfs& stats)
647{
9f95a23c
TL
648 ceph::mutex mylock = ceph::make_mutex("RadosClient::get_fs_stats::mylock");
649 ceph::condition_variable cond;
7c673cae
FG
650 bool done;
651 int ret = 0;
9f95a23c
TL
652 {
653 std::lock_guard l{mylock};
20effc67 654 objecter->get_fs_stats(stats, std::optional<int64_t> (),
9f95a23c
TL
655 new C_SafeCond(mylock, cond, &done, &ret));
656 }
657 {
658 std::unique_lock l{mylock};
659 cond.wait(l, [&done] { return done;});
660 }
7c673cae
FG
661 return ret;
662}
663
664void librados::RadosClient::get() {
11fdf7f2
TL
665 std::lock_guard l(lock);
666 ceph_assert(refcnt > 0);
7c673cae
FG
667 refcnt++;
668}
669
670bool librados::RadosClient::put() {
11fdf7f2
TL
671 std::lock_guard l(lock);
672 ceph_assert(refcnt > 0);
7c673cae
FG
673 refcnt--;
674 return (refcnt == 0);
675}
676
11fdf7f2 677int librados::RadosClient::pool_create(string& name,
7c673cae
FG
678 int16_t crush_rule)
679{
11fdf7f2
TL
680 if (!name.length())
681 return -EINVAL;
682
7c673cae
FG
683 int r = wait_for_osdmap();
684 if (r < 0) {
685 return r;
686 }
687
9f95a23c 688 ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_create::mylock");
7c673cae 689 int reply;
9f95a23c 690 ceph::condition_variable cond;
7c673cae 691 bool done;
9f95a23c 692 Context *onfinish = new C_SafeCond(mylock, cond, &done, &reply);
f67539c2 693 objecter->create_pool(name, onfinish, crush_rule);
7c673cae 694
f67539c2
TL
695 std::unique_lock l{mylock};
696 cond.wait(l, [&done] { return done; });
7c673cae
FG
697 return reply;
698}
699
11fdf7f2
TL
700int librados::RadosClient::pool_create_async(string& name,
701 PoolAsyncCompletionImpl *c,
7c673cae
FG
702 int16_t crush_rule)
703{
704 int r = wait_for_osdmap();
705 if (r < 0)
706 return r;
707
f67539c2
TL
708 Context *onfinish = make_lambda_context(CB_PoolAsync_Safe(c));
709 objecter->create_pool(name, onfinish, crush_rule);
7c673cae
FG
710 return r;
711}
712
713int librados::RadosClient::pool_get_base_tier(int64_t pool_id, int64_t* base_tier)
714{
715 int r = wait_for_osdmap();
716 if (r < 0) {
717 return r;
718 }
719
720 objecter->with_osdmap([&](const OSDMap& o) {
721 const pg_pool_t* pool = o.get_pg_pool(pool_id);
722 if (pool) {
723 if (pool->tier_of < 0) {
724 *base_tier = pool_id;
725 } else {
726 *base_tier = pool->tier_of;
727 }
728 r = 0;
729 } else {
730 r = -ENOENT;
731 }
732 });
733 return r;
734}
735
736int librados::RadosClient::pool_delete(const char *name)
737{
738 int r = wait_for_osdmap();
739 if (r < 0) {
740 return r;
741 }
742
9f95a23c
TL
743 ceph::mutex mylock = ceph::make_mutex("RadosClient::pool_delete::mylock");
744 ceph::condition_variable cond;
7c673cae
FG
745 bool done;
746 int ret;
9f95a23c 747 Context *onfinish = new C_SafeCond(mylock, cond, &done, &ret);
f67539c2 748 objecter->delete_pool(name, onfinish);
7c673cae 749
f67539c2
TL
750 std::unique_lock l{mylock};
751 cond.wait(l, [&done] { return done;});
7c673cae
FG
752 return ret;
753}
754
755int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompletionImpl *c)
756{
757 int r = wait_for_osdmap();
758 if (r < 0)
759 return r;
760
f67539c2
TL
761 Context *onfinish = make_lambda_context(CB_PoolAsync_Safe(c));
762 objecter->delete_pool(name, onfinish);
7c673cae
FG
763 return r;
764}
765
f67539c2 766void librados::RadosClient::blocklist_self(bool set) {
11fdf7f2 767 std::lock_guard l(lock);
f67539c2 768 objecter->blocklist_self(set);
7c673cae
FG
769}
770
9f95a23c
TL
771std::string librados::RadosClient::get_addrs() const {
772 CachedStackStringStream cos;
773 *cos << messenger->get_myaddrs();
774 return std::string(cos->strv());
775}
776
f67539c2 777int librados::RadosClient::blocklist_add(const string& client_address,
7c673cae
FG
778 uint32_t expire_seconds)
779{
780 entity_addr_t addr;
20effc67 781 if (!addr.parse(client_address)) {
7c673cae
FG
782 lderr(cct) << "unable to parse address " << client_address << dendl;
783 return -EINVAL;
784 }
785
786 std::stringstream cmd;
787 cmd << "{"
f67539c2
TL
788 << "\"prefix\": \"osd blocklist\", "
789 << "\"blocklistop\": \"add\", "
7c673cae
FG
790 << "\"addr\": \"" << client_address << "\"";
791 if (expire_seconds != 0) {
792 cmd << ", \"expire\": " << expire_seconds << ".0";
793 }
794 cmd << "}";
795
796 std::vector<std::string> cmds;
797 cmds.push_back(cmd.str());
798 bufferlist inbl;
799 int r = mon_command(cmds, inbl, NULL, NULL);
f67539c2
TL
800 if (r == -EINVAL) {
801 // try legacy blacklist command
802 std::stringstream cmd;
803 cmd << "{"
804 << "\"prefix\": \"osd blacklist\", "
805 << "\"blacklistop\": \"add\", "
806 << "\"addr\": \"" << client_address << "\"";
807 if (expire_seconds != 0) {
808 cmd << ", \"expire\": " << expire_seconds << ".0";
809 }
810 cmd << "}";
811 cmds.clear();
812 cmds.push_back(cmd.str());
813 r = mon_command(cmds, inbl, NULL, NULL);
814 }
7c673cae
FG
815 if (r < 0) {
816 return r;
817 }
818
819 // ensure we have the latest osd map epoch before proceeding
820 r = wait_for_latest_osdmap();
821 return r;
822}
823
824int librados::RadosClient::mon_command(const vector<string>& cmd,
825 const bufferlist &inbl,
826 bufferlist *outbl, string *outs)
827{
c07f9fc5
FG
828 C_SaferCond ctx;
829 mon_command_async(cmd, inbl, outbl, outs, &ctx);
830 return ctx.wait();
831}
832
833void librados::RadosClient::mon_command_async(const vector<string>& cmd,
834 const bufferlist &inbl,
835 bufferlist *outbl, string *outs,
836 Context *on_finish)
837{
9f95a23c 838 std::lock_guard l{lock};
f67539c2
TL
839 monclient.start_mon_command(cmd, inbl,
840 [outs, outbl,
841 on_finish = std::unique_ptr<Context>(on_finish)]
842 (bs::error_code e,
843 std::string&& s,
844 ceph::bufferlist&& b) mutable {
845 if (outs)
846 *outs = std::move(s);
847 if (outbl)
848 *outbl = std::move(b);
849 if (on_finish)
850 on_finish.release()->complete(
851 ceph::from_error_code(e));
852 });
7c673cae
FG
853}
854
7c673cae
FG
855int librados::RadosClient::mgr_command(const vector<string>& cmd,
856 const bufferlist &inbl,
857 bufferlist *outbl, string *outs)
858{
11fdf7f2 859 std::lock_guard l(lock);
7c673cae
FG
860
861 C_SaferCond cond;
862 int r = mgrclient.start_command(cmd, inbl, outbl, outs, &cond);
863 if (r < 0)
864 return r;
865
9f95a23c 866 lock.unlock();
f91f0fd5
TL
867 if (rados_mon_op_timeout.count() > 0) {
868 r = cond.wait_for(rados_mon_op_timeout);
11fdf7f2
TL
869 } else {
870 r = cond.wait();
871 }
9f95a23c
TL
872 lock.lock();
873
874 return r;
875}
876
877int librados::RadosClient::mgr_command(
878 const string& name,
879 const vector<string>& cmd,
880 const bufferlist &inbl,
881 bufferlist *outbl, string *outs)
882{
883 std::lock_guard l(lock);
884
885 C_SaferCond cond;
886 int r = mgrclient.start_tell_command(name, cmd, inbl, outbl, outs, &cond);
887 if (r < 0)
888 return r;
889
890 lock.unlock();
f91f0fd5
TL
891 if (rados_mon_op_timeout.count() > 0) {
892 r = cond.wait_for(rados_mon_op_timeout);
9f95a23c
TL
893 } else {
894 r = cond.wait();
895 }
896 lock.lock();
7c673cae
FG
897
898 return r;
899}
900
901
902int librados::RadosClient::mon_command(int rank, const vector<string>& cmd,
903 const bufferlist &inbl,
904 bufferlist *outbl, string *outs)
905{
f67539c2
TL
906 bs::error_code ec;
907 auto&& [s, bl] = monclient.start_mon_command(rank, cmd, inbl,
908 ca::use_blocked[ec]);
909 if (outs)
910 *outs = std::move(s);
911 if (outbl)
912 *outbl = std::move(bl);
913
914 return ceph::from_error_code(ec);
7c673cae
FG
915}
916
917int librados::RadosClient::mon_command(string name, const vector<string>& cmd,
918 const bufferlist &inbl,
919 bufferlist *outbl, string *outs)
920{
f67539c2
TL
921 bs::error_code ec;
922 auto&& [s, bl] = monclient.start_mon_command(name, cmd, inbl,
923 ca::use_blocked[ec]);
924 if (outs)
925 *outs = std::move(s);
926 if (outbl)
927 *outbl = std::move(bl);
928
929 return ceph::from_error_code(ec);
7c673cae
FG
930}
931
932int librados::RadosClient::osd_command(int osd, vector<string>& cmd,
933 const bufferlist& inbl,
934 bufferlist *poutbl, string *prs)
935{
7c673cae
FG
936 ceph_tid_t tid;
937
938 if (osd < 0)
939 return -EINVAL;
940
f67539c2
TL
941
942 // XXX do anything with tid?
943 bs::error_code ec;
944 auto [s, bl] = objecter->osd_command(osd, std::move(cmd), cb::list(inbl),
945 &tid, ca::use_blocked[ec]);
946 if (poutbl)
947 *poutbl = std::move(bl);
948 if (prs)
949 *prs = std::move(s);
950 return ceph::from_error_code(ec);
7c673cae
FG
951}
952
953int librados::RadosClient::pg_command(pg_t pgid, vector<string>& cmd,
954 const bufferlist& inbl,
955 bufferlist *poutbl, string *prs)
956{
7c673cae 957 ceph_tid_t tid;
f67539c2
TL
958 bs::error_code ec;
959 auto [s, bl] = objecter->pg_command(pgid, std::move(cmd), inbl, &tid,
960 ca::use_blocked[ec]);
961 if (poutbl)
962 *poutbl = std::move(bl);
963 if (prs)
964 *prs = std::move(s);
965 return ceph::from_error_code(ec);
7c673cae
FG
966}
967
31f18b77
FG
968int librados::RadosClient::monitor_log(const string& level,
969 rados_log_callback_t cb,
970 rados_log_callback2_t cb2,
971 void *arg)
7c673cae 972{
11fdf7f2 973 std::lock_guard l(lock);
7c673cae
FG
974
975 if (state != CONNECTED) {
976 return -ENOTCONN;
977 }
978
31f18b77 979 if (cb == NULL && cb2 == NULL) {
7c673cae 980 // stop watch
31f18b77
FG
981 ldout(cct, 10) << __func__ << " removing cb " << (void*)log_cb
982 << " " << (void*)log_cb2 << dendl;
7c673cae
FG
983 monclient.sub_unwant(log_watch);
984 log_watch.clear();
985 log_cb = NULL;
31f18b77 986 log_cb2 = NULL;
7c673cae
FG
987 log_cb_arg = NULL;
988 return 0;
989 }
990
991 string watch_level;
992 if (level == "debug") {
993 watch_level = "log-debug";
994 } else if (level == "info") {
995 watch_level = "log-info";
996 } else if (level == "warn" || level == "warning") {
997 watch_level = "log-warn";
998 } else if (level == "err" || level == "error") {
999 watch_level = "log-error";
1000 } else if (level == "sec") {
1001 watch_level = "log-sec";
1002 } else {
1003 ldout(cct, 10) << __func__ << " invalid level " << level << dendl;
1004 return -EINVAL;
1005 }
1006
31f18b77 1007 if (log_cb || log_cb2)
7c673cae
FG
1008 monclient.sub_unwant(log_watch);
1009
1010 // (re)start watch
31f18b77
FG
1011 ldout(cct, 10) << __func__ << " add cb " << (void*)cb << " " << (void*)cb2
1012 << " level " << level << dendl;
7c673cae
FG
1013 monclient.sub_want(watch_level, 0, 0);
1014
1015 monclient.renew_subs();
1016 log_cb = cb;
31f18b77 1017 log_cb2 = cb2;
7c673cae
FG
1018 log_cb_arg = arg;
1019 log_watch = watch_level;
1020 return 0;
1021}
1022
1023void librados::RadosClient::handle_log(MLog *m)
1024{
9f95a23c 1025 ceph_assert(ceph_mutex_is_locked(lock));
7c673cae
FG
1026 ldout(cct, 10) << __func__ << " version " << m->version << dendl;
1027
1028 if (log_last_version < m->version) {
1029 log_last_version = m->version;
1030
31f18b77 1031 if (log_cb || log_cb2) {
7c673cae
FG
1032 for (std::deque<LogEntry>::iterator it = m->entries.begin(); it != m->entries.end(); ++it) {
1033 LogEntry e = *it;
1034 ostringstream ss;
31f18b77 1035 ss << e.stamp << " " << e.name << " " << e.prio << " " << e.msg;
7c673cae 1036 string line = ss.str();
11fdf7f2 1037 string who = stringify(e.rank) + " " + stringify(e.addrs);
31f18b77 1038 string name = stringify(e.name);
7c673cae
FG
1039 string level = stringify(e.prio);
1040 struct timespec stamp;
1041 e.stamp.to_timespec(&stamp);
1042
1043 ldout(cct, 20) << __func__ << " delivering " << ss.str() << dendl;
31f18b77
FG
1044 if (log_cb)
1045 log_cb(log_cb_arg, line.c_str(), who.c_str(),
1046 stamp.tv_sec, stamp.tv_nsec,
1047 e.seq, level.c_str(), e.msg.c_str());
1048 if (log_cb2)
224ce89b
WB
1049 log_cb2(log_cb_arg, line.c_str(),
1050 e.channel.c_str(),
1051 who.c_str(), name.c_str(),
31f18b77
FG
1052 stamp.tv_sec, stamp.tv_nsec,
1053 e.seq, level.c_str(), e.msg.c_str());
7c673cae
FG
1054 }
1055 }
1056
1057 monclient.sub_got(log_watch, log_last_version);
1058 }
1059
1060 m->put();
1061}
224ce89b
WB
1062
1063int librados::RadosClient::service_daemon_register(
1064 const std::string& service, ///< service name (e.g., 'rgw')
1065 const std::string& name, ///< daemon name (e.g., 'gwfoo')
1066 const std::map<std::string,std::string>& metadata)
1067{
1068 if (service_daemon) {
1069 return -EEXIST;
1070 }
1071 if (service == "osd" ||
1072 service == "mds" ||
1073 service == "client" ||
1074 service == "mon" ||
1075 service == "mgr") {
1076 // normal ceph entity types are not allowed!
1077 return -EINVAL;
1078 }
1079 if (service.empty() || name.empty()) {
1080 return -EINVAL;
1081 }
1082
1083 collect_sys_info(&daemon_metadata, cct);
1084
1085 ldout(cct,10) << __func__ << " " << service << "." << name << dendl;
1086 service_daemon = true;
1087 service_name = service;
1088 daemon_name = name;
1089 daemon_metadata.insert(metadata.begin(), metadata.end());
1090
1091 if (state == DISCONNECTED) {
1092 return 0;
1093 }
1094 if (state == CONNECTING) {
1095 return -EBUSY;
1096 }
1097 mgrclient.service_daemon_register(service_name, daemon_name,
1098 daemon_metadata);
1099 return 0;
1100}
1101
1102int librados::RadosClient::service_daemon_update_status(
11fdf7f2 1103 std::map<std::string,std::string>&& status)
224ce89b
WB
1104{
1105 if (state != CONNECTED) {
1106 return -ENOTCONN;
1107 }
11fdf7f2 1108 return mgrclient.service_daemon_update_status(std::move(status));
224ce89b 1109}
c07f9fc5
FG
1110
1111mon_feature_t librados::RadosClient::get_required_monitor_features() const
1112{
11fdf7f2
TL
1113 return monclient.with_monmap([](const MonMap &monmap) {
1114 return monmap.get_required_features(); } );
1115}
1116
1117int librados::RadosClient::get_inconsistent_pgs(int64_t pool_id,
1118 std::vector<std::string>* pgs)
1119{
1120 vector<string> cmd = {
1121 "{\"prefix\": \"pg ls\","
1122 "\"pool\": " + std::to_string(pool_id) + ","
1123 "\"states\": [\"inconsistent\"],"
1124 "\"format\": \"json\"}"
1125 };
1126 bufferlist inbl, outbl;
1127 string outstring;
1128 if (auto ret = mgr_command(cmd, inbl, &outbl, &outstring); ret) {
1129 return ret;
1130 }
1131 if (!outbl.length()) {
1132 // no pg returned
1133 return 0;
1134 }
1135 JSONParser parser;
1136 if (!parser.parse(outbl.c_str(), outbl.length())) {
1137 return -EINVAL;
1138 }
1139 vector<string> v;
1140 if (!parser.is_array()) {
1141 JSONObj *pgstat_obj = parser.find_obj("pg_stats");
1142 if (!pgstat_obj)
1143 return 0;
1144 auto s = pgstat_obj->get_data();
1145 JSONParser pg_stats;
1146 if (!pg_stats.parse(s.c_str(), s.length())) {
1147 return -EINVAL;
1148 }
1149 v = pg_stats.get_array_elements();
1150 } else {
1151 v = parser.get_array_elements();
1152 }
1153 for (auto i : v) {
1154 JSONParser pg_json;
1155 if (!pg_json.parse(i.c_str(), i.length())) {
1156 return -EINVAL;
1157 }
1158 string pgid;
1159 JSONDecoder::decode_json("pgid", pgid, &pg_json);
1160 pgs->emplace_back(std::move(pgid));
1161 }
1162 return 0;
c07f9fc5 1163}
f91f0fd5
TL
1164
1165const char** librados::RadosClient::get_tracked_conf_keys() const
1166{
1167 static const char *config_keys[] = {
f67539c2 1168 "librados_thread_count",
f91f0fd5
TL
1169 "rados_mon_op_timeout",
1170 nullptr
1171 };
1172 return config_keys;
1173}
1174
1175void librados::RadosClient::handle_conf_change(const ConfigProxy& conf,
1176 const std::set<std::string> &changed)
1177{
f67539c2
TL
1178 if (changed.count("librados_thread_count")) {
1179 poolctx.stop();
1180 poolctx.start(conf.get_val<std::uint64_t>("librados_thread_count"));
1181 }
f91f0fd5
TL
1182 if (changed.count("rados_mon_op_timeout")) {
1183 rados_mon_op_timeout = conf.get_val<std::chrono::seconds>("rados_mon_op_timeout");
1184 }
1185}