]> git.proxmox.com Git - ceph.git/blame - ceph/src/librados/IoCtxImpl.cc
bump version to 12.2.5-pve1
[ceph.git] / ceph / src / librados / IoCtxImpl.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include <limits.h>
16
17#include "IoCtxImpl.h"
18
19#include "librados/AioCompletionImpl.h"
20#include "librados/PoolAsyncCompletionImpl.h"
21#include "librados/RadosClient.h"
22#include "include/assert.h"
23#include "common/valgrind.h"
24#include "common/EventTrace.h"
25
26#define dout_subsys ceph_subsys_rados
27#undef dout_prefix
28#define dout_prefix *_dout << "librados: "
29
30namespace librados {
31namespace {
32
33struct C_notify_Finish : public Context {
34 CephContext *cct;
35 Context *ctx;
36 Objecter *objecter;
37 Objecter::LingerOp *linger_op;
38 bufferlist reply_bl;
39 bufferlist *preply_bl;
40 char **preply_buf;
41 size_t *preply_buf_len;
42
43 C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
44 Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
45 char **_preply_buf, size_t *_preply_buf_len)
46 : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
47 preply_bl(_preply_bl), preply_buf(_preply_buf),
48 preply_buf_len(_preply_buf_len)
49 {
50 linger_op->on_notify_finish = this;
51 linger_op->notify_result_bl = &reply_bl;
52 }
53
54 void finish(int r) override
55 {
56 ldout(cct, 10) << __func__ << " completed notify (linger op "
57 << linger_op << "), r = " << r << dendl;
58
59 // pass result back to user
60 // NOTE: we do this regardless of what error code we return
61 if (preply_buf) {
62 if (reply_bl.length()) {
63 *preply_buf = (char*)malloc(reply_bl.length());
64 memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length());
65 } else {
66 *preply_buf = NULL;
67 }
68 }
69 if (preply_buf_len)
70 *preply_buf_len = reply_bl.length();
71 if (preply_bl)
72 preply_bl->claim(reply_bl);
73
74 ctx->complete(r);
75 }
76};
77
78struct C_aio_linger_cancel : public Context {
79 Objecter *objecter;
80 Objecter::LingerOp *linger_op;
81
82 C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op)
83 : objecter(_objecter), linger_op(_linger_op)
84 {
85 }
86
87 void finish(int r) override
88 {
89 objecter->linger_cancel(linger_op);
90 }
91};
92
93struct C_aio_linger_Complete : public Context {
94 AioCompletionImpl *c;
95 Objecter::LingerOp *linger_op;
96 bool cancel;
97
98 C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel)
99 : c(_c), linger_op(_linger_op), cancel(_cancel)
100 {
101 c->get();
102 }
103
104 void finish(int r) override {
105 if (cancel || r < 0)
106 c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
107 linger_op));
108
109 c->lock.Lock();
110 c->rval = r;
111 c->complete = true;
112 c->cond.Signal();
113
114 if (c->callback_complete ||
115 c->callback_safe) {
116 c->io->client->finisher.queue(new C_AioComplete(c));
117 }
118 c->put_unlock();
119 }
120};
121
122struct C_aio_notify_Complete : public C_aio_linger_Complete {
123 Mutex lock;
124 bool acked = false;
125 bool finished = false;
126 int ret_val = 0;
127
128 C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
129 : C_aio_linger_Complete(_c, _linger_op, false),
130 lock("C_aio_notify_Complete::lock") {
131 }
132
133 void handle_ack(int r) {
134 // invoked by C_aio_notify_Ack
135 lock.Lock();
136 acked = true;
137 complete_unlock(r);
138 }
139
140 void complete(int r) override {
141 // invoked by C_notify_Finish (or C_aio_notify_Ack on failure)
142 lock.Lock();
143 finished = true;
144 complete_unlock(r);
145 }
146
147 void complete_unlock(int r) {
148 if (ret_val == 0 && r < 0) {
149 ret_val = r;
150 }
151
152 if (acked && finished) {
153 lock.Unlock();
154 cancel = true;
155 C_aio_linger_Complete::complete(ret_val);
156 } else {
157 lock.Unlock();
158 }
159 }
160};
161
162struct C_aio_notify_Ack : public Context {
163 CephContext *cct;
164 C_notify_Finish *onfinish;
165 C_aio_notify_Complete *oncomplete;
166
167 C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_onfinish,
168 C_aio_notify_Complete *_oncomplete)
169 : cct(_cct), onfinish(_onfinish), oncomplete(_oncomplete)
170 {
171 }
172
173 void finish(int r) override
174 {
175 ldout(cct, 10) << __func__ << " linger op " << oncomplete->linger_op << " "
176 << "acked (" << r << ")" << dendl;
177 oncomplete->handle_ack(r);
178 if (r < 0) {
179 // on failure, we won't expect to see a notify_finish callback
180 onfinish->complete(r);
181 }
182 }
183};
184
185struct C_aio_selfmanaged_snap_op_Complete : public Context {
186 librados::RadosClient *client;
187 librados::AioCompletionImpl *c;
188
189 C_aio_selfmanaged_snap_op_Complete(librados::RadosClient *client,
190 librados::AioCompletionImpl *c)
191 : client(client), c(c) {
192 c->get();
193 }
194
195 void finish(int r) override {
196 c->lock.Lock();
197 c->rval = r;
198 c->complete = true;
199 c->cond.Signal();
200
201 if (c->callback_complete || c->callback_safe) {
202 client->finisher.queue(new librados::C_AioComplete(c));
203 }
204 c->put_unlock();
205 }
206};
207
208struct C_aio_selfmanaged_snap_create_Complete : public C_aio_selfmanaged_snap_op_Complete {
209 snapid_t snapid;
210 uint64_t *dest_snapid;
211
212 C_aio_selfmanaged_snap_create_Complete(librados::RadosClient *client,
213 librados::AioCompletionImpl *c,
214 uint64_t *dest_snapid)
215 : C_aio_selfmanaged_snap_op_Complete(client, c),
216 dest_snapid(dest_snapid) {
217 }
218
219 void finish(int r) override {
220 if (r >= 0) {
221 *dest_snapid = snapid;
222 }
223 C_aio_selfmanaged_snap_op_Complete::finish(r);
224 }
225};
226
227} // anonymous namespace
228} // namespace librados
229
230librados::IoCtxImpl::IoCtxImpl() :
231 ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0),
232 notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
233 aio_write_seq(0), objecter(NULL)
234{
235}
236
237librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter,
238 int64_t poolid, snapid_t s)
239 : ref_cnt(0), client(c), poolid(poolid), snap_seq(s),
240 assert_ver(0), last_objver(0),
241 notify_timeout(c->cct->_conf->client_notify_timeout),
242 oloc(poolid), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
243 aio_write_seq(0), objecter(objecter)
244{
245}
246
247void librados::IoCtxImpl::set_snap_read(snapid_t s)
248{
249 if (!s)
250 s = CEPH_NOSNAP;
251 ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl;
252 snap_seq = s;
253}
254
255int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps)
256{
257 ::SnapContext n;
258 ldout(client->cct, 10) << "set snap write context: seq = " << seq
259 << " and snaps = " << snaps << dendl;
260 n.seq = seq;
261 n.snaps = snaps;
262 if (!n.is_valid())
263 return -EINVAL;
264 snapc = n;
265 return 0;
266}
267
268int librados::IoCtxImpl::get_object_hash_position(
269 const std::string& oid, uint32_t *hash_position)
270{
271 int64_t r = objecter->get_object_hash_position(poolid, oid, oloc.nspace);
272 if (r < 0)
273 return r;
274 *hash_position = (uint32_t)r;
275 return 0;
276}
277
278int librados::IoCtxImpl::get_object_pg_hash_position(
279 const std::string& oid, uint32_t *pg_hash_position)
280{
281 int64_t r = objecter->get_object_pg_hash_position(poolid, oid, oloc.nspace);
282 if (r < 0)
283 return r;
284 *pg_hash_position = (uint32_t)r;
285 return 0;
286}
287
288void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
289{
290 get();
291 aio_write_list_lock.Lock();
292 assert(c->io == this);
293 c->aio_write_seq = ++aio_write_seq;
294 ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
295 << " write_seq " << aio_write_seq << dendl;
296 aio_write_list.push_back(&c->aio_write_list_item);
297 aio_write_list_lock.Unlock();
298}
299
300void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
301{
302 ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
303 aio_write_list_lock.Lock();
304 assert(c->io == this);
305 c->aio_write_list_item.remove_myself();
306
307 map<ceph_tid_t, std::list<AioCompletionImpl*> >::iterator waiters = aio_write_waiters.begin();
308 while (waiters != aio_write_waiters.end()) {
309 if (!aio_write_list.empty() &&
310 aio_write_list.front()->aio_write_seq <= waiters->first) {
311 ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq
312 << " <= waiter " << waiters->first
313 << ", stopping" << dendl;
314 break;
315 }
316 ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl;
317 for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
318 it != waiters->second.end(); ++it) {
319 client->finisher.queue(new C_AioCompleteAndSafe(*it));
320 (*it)->put();
321 }
322 aio_write_waiters.erase(waiters++);
323 }
324
325 aio_write_cond.Signal();
326 aio_write_list_lock.Unlock();
327 put();
328}
329
330void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
331{
332 ldout(client->cct, 20) << "flush_aio_writes_async " << this
333 << " completion " << c << dendl;
334 Mutex::Locker l(aio_write_list_lock);
335 ceph_tid_t seq = aio_write_seq;
336 if (aio_write_list.empty()) {
337 ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid "
338 << seq << ")" << dendl;
339 client->finisher.queue(new C_AioCompleteAndSafe(c));
340 } else {
341 ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size()
342 << " writes in flight; waiting on tid " << seq << dendl;
343 c->get();
344 aio_write_waiters[seq].push_back(c);
345 }
346}
347
348void librados::IoCtxImpl::flush_aio_writes()
349{
350 ldout(client->cct, 20) << "flush_aio_writes" << dendl;
351 aio_write_list_lock.Lock();
352 ceph_tid_t seq = aio_write_seq;
353 while (!aio_write_list.empty() &&
354 aio_write_list.front()->aio_write_seq <= seq)
355 aio_write_cond.Wait(aio_write_list_lock);
356 aio_write_list_lock.Unlock();
357}
358
359string librados::IoCtxImpl::get_cached_pool_name()
360{
361 std::string pn;
362 client->pool_get_name(get_id(), &pn);
363 return pn;
364}
365
366// SNAPS
367
368int librados::IoCtxImpl::snap_create(const char *snapName)
369{
370 int reply;
371 string sName(snapName);
372
373 Mutex mylock ("IoCtxImpl::snap_create::mylock");
374 Cond cond;
375 bool done;
376 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
377 reply = objecter->create_pool_snap(poolid, sName, onfinish);
378
379 if (reply < 0) {
380 delete onfinish;
381 } else {
382 mylock.Lock();
383 while (!done)
384 cond.Wait(mylock);
385 mylock.Unlock();
386 }
387 return reply;
388}
389
390int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid)
391{
392 int reply;
393
394 Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock");
395 Cond cond;
396 bool done;
397 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
398 snapid_t snapid;
399 reply = objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish);
400
401 if (reply < 0) {
402 delete onfinish;
403 } else {
404 mylock.Lock();
405 while (!done)
406 cond.Wait(mylock);
407 mylock.Unlock();
408 if (reply == 0)
409 *psnapid = snapid;
410 }
411 return reply;
412}
413
414void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid,
415 AioCompletionImpl *c)
416{
417 C_aio_selfmanaged_snap_create_Complete *onfinish =
418 new C_aio_selfmanaged_snap_create_Complete(client, c, snapid);
419 int r = objecter->allocate_selfmanaged_snap(poolid, &onfinish->snapid,
420 onfinish);
421 if (r < 0) {
422 onfinish->complete(r);
423 }
424}
425
426int librados::IoCtxImpl::snap_remove(const char *snapName)
427{
428 int reply;
429 string sName(snapName);
430
431 Mutex mylock ("IoCtxImpl::snap_remove::mylock");
432 Cond cond;
433 bool done;
434 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
435 reply = objecter->delete_pool_snap(poolid, sName, onfinish);
436
437 if (reply < 0) {
438 delete onfinish;
439 } else {
440 mylock.Lock();
441 while(!done)
442 cond.Wait(mylock);
443 mylock.Unlock();
444 }
445 return reply;
446}
447
448int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
449 ::SnapContext& snapc,
450 uint64_t snapid)
451{
452 int reply;
453
454 Mutex mylock("IoCtxImpl::snap_rollback::mylock");
455 Cond cond;
456 bool done;
457 Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply);
458
459 ::ObjectOperation op;
460 prepare_assert_ops(&op);
461 op.rollback(snapid);
462 objecter->mutate(oid, oloc,
463 op, snapc, ceph::real_clock::now(), 0,
464 onack, NULL);
465
466 mylock.Lock();
467 while (!done) cond.Wait(mylock);
468 mylock.Unlock();
469 return reply;
470}
471
472int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName)
473{
474 snapid_t snap;
475
476 int r = objecter->pool_snap_by_name(poolid, snapName, &snap);
477 if (r < 0) {
478 return r;
479 }
480
481 return selfmanaged_snap_rollback_object(oid, snapc, snap);
482}
483
484int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid)
485{
486 int reply;
487
488 Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock");
489 Cond cond;
490 bool done;
491 objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid),
492 new C_SafeCond(&mylock, &cond, &done, &reply));
493
494 mylock.Lock();
495 while (!done) cond.Wait(mylock);
496 mylock.Unlock();
497 return (int)reply;
498}
499
500void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid,
501 AioCompletionImpl *c)
502{
503 Context *onfinish = new C_aio_selfmanaged_snap_op_Complete(client, c);
504 objecter->delete_selfmanaged_snap(poolid, snapid, onfinish);
505}
506
507int librados::IoCtxImpl::pool_change_auid(unsigned long long auid)
508{
509 int reply;
510
511 Mutex mylock("IoCtxImpl::pool_change_auid::mylock");
512 Cond cond;
513 bool done;
514 objecter->change_pool_auid(poolid,
515 new C_SafeCond(&mylock, &cond, &done, &reply),
516 auid);
517
518 mylock.Lock();
519 while (!done) cond.Wait(mylock);
520 mylock.Unlock();
521 return reply;
522}
523
524int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid,
525 PoolAsyncCompletionImpl *c)
526{
527 objecter->change_pool_auid(poolid,
528 new C_PoolAsync_Safe(c),
529 auid);
530 return 0;
531}
532
533int librados::IoCtxImpl::snap_list(vector<uint64_t> *snaps)
534{
535 return objecter->pool_snap_list(poolid, snaps);
536}
537
538int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid)
539{
540 return objecter->pool_snap_by_name(poolid, name, (snapid_t *)snapid);
541}
542
543int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s)
544{
545 pool_snap_info_t info;
546 int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
547 if (ret < 0) {
548 return ret;
549 }
550 *s = info.name.c_str();
551 return 0;
552}
553
554int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t)
555{
556 pool_snap_info_t info;
557 int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
558 if (ret < 0) {
559 return ret;
560 }
561 *t = info.stamp.sec();
562 return 0;
563}
564
565
566// IO
567
568int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries)
569{
570 Cond cond;
571 bool done;
572 int r = 0;
573 Mutex mylock("IoCtxImpl::nlist::mylock");
574
575 if (context->at_end())
576 return 0;
577
578 context->max_entries = max_entries;
579 context->nspace = oloc.nspace;
580
581 objecter->list_nobjects(context, new C_SafeCond(&mylock, &cond, &done, &r));
582
583 mylock.Lock();
584 while(!done)
585 cond.Wait(mylock);
586 mylock.Unlock();
587
588 return r;
589}
590
591uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
592 uint32_t pos)
593{
594 context->list.clear();
595 return objecter->list_nobjects_seek(context, pos);
596}
597
598uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
599 const rados_object_list_cursor& cursor)
600{
601 context->list.clear();
602 return objecter->list_nobjects_seek(context, *(const hobject_t *)cursor);
603}
604
605rados_object_list_cursor librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext *context)
606{
607 hobject_t *c = new hobject_t;
608
609 objecter->list_nobjects_get_cursor(context, c);
610 return (rados_object_list_cursor)c;
611}
612
613int librados::IoCtxImpl::create(const object_t& oid, bool exclusive)
614{
615 ::ObjectOperation op;
616 prepare_assert_ops(&op);
617 op.create(exclusive);
618 return operate(oid, &op, NULL);
619}
620
621/*
622 * add any version assert operations that are appropriate given the
623 * stat in the IoCtx, either the target version assert or any src
624 * object asserts. these affect a single ioctx operation, so clear
625 * the ioctx state when we're doing.
626 *
627 * return a pointer to the ObjectOperation if we added any events;
628 * this is convenient for passing the extra_ops argument into Objecter
629 * methods.
630 */
631::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op)
632{
633 ::ObjectOperation *pop = NULL;
634 if (assert_ver) {
635 op->assert_version(assert_ver);
636 assert_ver = 0;
637 pop = op;
638 }
639 return pop;
640}
641
642int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
643 size_t len, uint64_t off)
644{
645 if (len > UINT_MAX/2)
646 return -E2BIG;
647 ::ObjectOperation op;
648 prepare_assert_ops(&op);
649 bufferlist mybl;
650 mybl.substr_of(bl, 0, len);
651 op.write(off, mybl);
652 return operate(oid, &op, NULL);
653}
654
655int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len)
656{
657 if (len > UINT_MAX/2)
658 return -E2BIG;
659 ::ObjectOperation op;
660 prepare_assert_ops(&op);
661 bufferlist mybl;
662 mybl.substr_of(bl, 0, len);
663 op.append(mybl);
664 return operate(oid, &op, NULL);
665}
666
667int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl)
668{
669 if (bl.length() > UINT_MAX/2)
670 return -E2BIG;
671 ::ObjectOperation op;
672 prepare_assert_ops(&op);
673 op.write_full(bl);
674 return operate(oid, &op, NULL);
675}
676
677int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl,
678 size_t write_len, uint64_t off)
679{
680 if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
681 return -E2BIG;
682 if ((bl.length() == 0) || (write_len % bl.length()))
683 return -EINVAL;
684 ::ObjectOperation op;
685 prepare_assert_ops(&op);
686 bufferlist mybl;
687 mybl.substr_of(bl, 0, bl.length());
688 op.writesame(off, write_len, mybl);
689 return operate(oid, &op, NULL);
690}
691
692int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
693 ceph::real_time *pmtime, int flags)
694{
695 ceph::real_time ut = (pmtime ? *pmtime :
696 ceph::real_clock::now());
697
698 /* can't write to a snapshot */
699 if (snap_seq != CEPH_NOSNAP)
700 return -EROFS;
701
702 if (!o->size())
703 return 0;
704
705 Mutex mylock("IoCtxImpl::operate::mylock");
706 Cond cond;
707 bool done;
708 int r;
709 version_t ver;
710
711 Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
712
713 int op = o->ops[0].op.op;
714 ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid
715 << " nspace=" << oloc.nspace << dendl;
716 Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,
717 *o, snapc, ut, flags,
718 oncommit, &ver);
719 objecter->op_submit(objecter_op);
720
721 mylock.Lock();
722 while (!done)
723 cond.Wait(mylock);
724 mylock.Unlock();
725 ldout(client->cct, 10) << "Objecter returned from "
726 << ceph_osd_op_name(op) << " r=" << r << dendl;
727
728 set_sync_op_version(ver);
729
730 return r;
731}
732
733int librados::IoCtxImpl::operate_read(const object_t& oid,
734 ::ObjectOperation *o,
735 bufferlist *pbl,
736 int flags)
737{
738 if (!o->size())
739 return 0;
740
741 Mutex mylock("IoCtxImpl::operate_read::mylock");
742 Cond cond;
743 bool done;
744 int r;
745 version_t ver;
746
747 Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
748
749 int op = o->ops[0].op.op;
750 ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
751 Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
752 *o, snap_seq, pbl, flags,
753 onack, &ver);
754 objecter->op_submit(objecter_op);
755
756 mylock.Lock();
757 while (!done)
758 cond.Wait(mylock);
759 mylock.Unlock();
760 ldout(client->cct, 10) << "Objecter returned from "
761 << ceph_osd_op_name(op) << " r=" << r << dendl;
762
763 set_sync_op_version(ver);
764
765 return r;
766}
767
768int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
769 ::ObjectOperation *o,
770 AioCompletionImpl *c,
771 int flags,
772 bufferlist *pbl,
31f18b77 773 const blkin_trace_info *trace_info)
7c673cae
FG
774{
775 FUNCTRACE();
776 Context *oncomplete = new C_aio_Complete(c);
777
778#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
779 ((C_aio_Complete *) oncomplete)->oid = oid;
780#endif
781 c->is_read = true;
782 c->io = this;
783
784 ZTracer::Trace trace;
31f18b77
FG
785 if (trace_info) {
786 ZTracer::Trace parent_trace("", nullptr, trace_info);
787 trace.init("rados operate read", &objecter->trace_endpoint, &parent_trace);
788 }
7c673cae
FG
789
790 trace.event("init root span");
791 Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
792 *o, snap_seq, pbl, flags,
793 oncomplete, &c->objver, nullptr, 0, &trace);
794 objecter->op_submit(objecter_op, &c->tid);
795 trace.event("rados operate read submitted");
796
797 return 0;
798}
799
800int librados::IoCtxImpl::aio_operate(const object_t& oid,
801 ::ObjectOperation *o, AioCompletionImpl *c,
802 const SnapContext& snap_context, int flags,
31f18b77 803 const blkin_trace_info *trace_info)
7c673cae
FG
804{
805 FUNCTRACE();
806 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
807 auto ut = ceph::real_clock::now();
808 /* can't write to a snapshot */
809 if (snap_seq != CEPH_NOSNAP)
810 return -EROFS;
811
812 Context *oncomplete = new C_aio_Complete(c);
813#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
814 ((C_aio_Complete *) oncomplete)->oid = oid;
815#endif
816
817 c->io = this;
818 queue_aio_write(c);
819
820 ZTracer::Trace trace;
31f18b77
FG
821 if (trace_info) {
822 ZTracer::Trace parent_trace("", nullptr, trace_info);
823 trace.init("rados operate", &objecter->trace_endpoint, &parent_trace);
824 }
7c673cae
FG
825
826 trace.event("init root span");
827 Objecter::Op *op = objecter->prepare_mutate_op(
828 oid, oloc, *o, snap_context, ut, flags,
829 oncomplete, &c->objver, osd_reqid_t(), &trace);
830 objecter->op_submit(op, &c->tid);
831 trace.event("rados operate op submitted");
832
833 return 0;
834}
835
836int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
837 bufferlist *pbl, size_t len, uint64_t off,
838 uint64_t snapid, const blkin_trace_info *info)
839{
840 FUNCTRACE();
841 if (len > (size_t) INT_MAX)
842 return -EDOM;
843
844 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
845 Context *oncomplete = new C_aio_Complete(c);
846
847#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
848 ((C_aio_Complete *) oncomplete)->oid = oid;
849#endif
850 c->is_read = true;
851 c->io = this;
852 c->blp = pbl;
853
854 ZTracer::Trace trace;
855 if (info)
856 trace.init("rados read", &objecter->trace_endpoint, info);
857
858 Objecter::Op *o = objecter->prepare_read_op(
859 oid, oloc,
860 off, len, snapid, pbl, 0,
861 oncomplete, &c->objver, nullptr, 0, &trace);
862 objecter->op_submit(o, &c->tid);
863 return 0;
864}
865
866int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
867 char *buf, size_t len, uint64_t off,
868 uint64_t snapid, const blkin_trace_info *info)
869{
870 FUNCTRACE();
871 if (len > (size_t) INT_MAX)
872 return -EDOM;
873
874 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
875 Context *oncomplete = new C_aio_Complete(c);
876
877#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
878 ((C_aio_Complete *) oncomplete)->oid = oid;
879#endif
880 c->is_read = true;
881 c->io = this;
882 c->bl.clear();
883 c->bl.push_back(buffer::create_static(len, buf));
884 c->blp = &c->bl;
885 c->out_buf = buf;
886
887 ZTracer::Trace trace;
888 if (info)
889 trace.init("rados read", &objecter->trace_endpoint, info);
890
891 Objecter::Op *o = objecter->prepare_read_op(
892 oid, oloc,
893 off, len, snapid, &c->bl, 0,
894 oncomplete, &c->objver, nullptr, 0, &trace);
895 objecter->op_submit(o, &c->tid);
896 return 0;
897}
898
899class C_ObjectOperation : public Context {
900public:
901 ::ObjectOperation m_ops;
902 explicit C_ObjectOperation(Context *c) : m_ctx(c) {}
903 void finish(int r) override {
904 m_ctx->complete(r);
905 }
906private:
907 Context *m_ctx;
908};
909
910int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
911 AioCompletionImpl *c,
912 std::map<uint64_t,uint64_t> *m,
913 bufferlist *data_bl, size_t len,
914 uint64_t off, uint64_t snapid)
915{
916 FUNCTRACE();
917 if (len > (size_t) INT_MAX)
918 return -EDOM;
919
920 Context *nested = new C_aio_Complete(c);
921 C_ObjectOperation *onack = new C_ObjectOperation(nested);
922
923#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
924 ((C_aio_Complete *) nested)->oid = oid;
925#endif
926 c->is_read = true;
927 c->io = this;
928
929 onack->m_ops.sparse_read(off, len, m, data_bl, NULL);
930
931 Objecter::Op *o = objecter->prepare_read_op(
932 oid, oloc,
933 onack->m_ops, snapid, NULL, 0,
934 onack, &c->objver);
935 objecter->op_submit(o, &c->tid);
936 return 0;
937}
938
939int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
940 AioCompletionImpl *c,
941 uint64_t off,
942 bufferlist& cmp_bl)
943{
944 if (cmp_bl.length() > UINT_MAX/2)
945 return -E2BIG;
946
947 Context *onack = new C_aio_Complete(c);
948
949 c->is_read = true;
950 c->io = this;
951
952 Objecter::Op *o = objecter->prepare_cmpext_op(
953 oid, oloc, off, cmp_bl, snap_seq, 0,
954 onack, &c->objver);
955 objecter->op_submit(o, &c->tid);
956
957 return 0;
958}
959
960/* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
961int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
962 AioCompletionImpl *c,
963 const char *cmp_buf,
964 size_t cmp_len,
965 uint64_t off)
966{
967 if (cmp_len > UINT_MAX/2)
968 return -E2BIG;
969
970 bufferlist cmp_bl;
971 cmp_bl.append(cmp_buf, cmp_len);
972
973 Context *nested = new C_aio_Complete(c);
974 C_ObjectOperation *onack = new C_ObjectOperation(nested);
975
976 c->is_read = true;
977 c->io = this;
978
979 onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL);
980
981 Objecter::Op *o = objecter->prepare_read_op(
982 oid, oloc, onack->m_ops, snap_seq, NULL, 0, onack, &c->objver);
983 objecter->op_submit(o, &c->tid);
984 return 0;
985}
986
987int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
988 const bufferlist& bl, size_t len,
989 uint64_t off, const blkin_trace_info *info)
990{
991 FUNCTRACE();
992 auto ut = ceph::real_clock::now();
993 ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl;
994 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
995
996 if (len > UINT_MAX/2)
997 return -E2BIG;
998 /* can't write to a snapshot */
999 if (snap_seq != CEPH_NOSNAP)
1000 return -EROFS;
1001
1002 Context *oncomplete = new C_aio_Complete(c);
1003
1004#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1005 ((C_aio_Complete *) oncomplete)->oid = oid;
1006#endif
1007 ZTracer::Trace trace;
1008 if (info)
1009 trace.init("rados write", &objecter->trace_endpoint, info);
1010
1011 c->io = this;
1012 queue_aio_write(c);
1013
1014 Objecter::Op *o = objecter->prepare_write_op(
1015 oid, oloc,
1016 off, len, snapc, bl, ut, 0,
1017 oncomplete, &c->objver, nullptr, 0, &trace);
1018 objecter->op_submit(o, &c->tid);
1019
1020 return 0;
1021}
1022
1023int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
1024 const bufferlist& bl, size_t len)
1025{
1026 FUNCTRACE();
1027 auto ut = ceph::real_clock::now();
1028
1029 if (len > UINT_MAX/2)
1030 return -E2BIG;
1031 /* can't write to a snapshot */
1032 if (snap_seq != CEPH_NOSNAP)
1033 return -EROFS;
1034
1035 Context *oncomplete = new C_aio_Complete(c);
1036#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1037 ((C_aio_Complete *) oncomplete)->oid = oid;
1038#endif
1039
1040 c->io = this;
1041 queue_aio_write(c);
1042
1043 Objecter::Op *o = objecter->prepare_append_op(
1044 oid, oloc,
1045 len, snapc, bl, ut, 0,
1046 oncomplete, &c->objver);
1047 objecter->op_submit(o, &c->tid);
1048
1049 return 0;
1050}
1051
1052int librados::IoCtxImpl::aio_write_full(const object_t &oid,
1053 AioCompletionImpl *c,
1054 const bufferlist& bl)
1055{
1056 FUNCTRACE();
1057 auto ut = ceph::real_clock::now();
1058
1059 if (bl.length() > UINT_MAX/2)
1060 return -E2BIG;
1061 /* can't write to a snapshot */
1062 if (snap_seq != CEPH_NOSNAP)
1063 return -EROFS;
1064
1065 Context *oncomplete = new C_aio_Complete(c);
1066#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1067 ((C_aio_Complete *) oncomplete)->oid = oid;
1068#endif
1069
1070 c->io = this;
1071 queue_aio_write(c);
1072
1073 Objecter::Op *o = objecter->prepare_write_full_op(
1074 oid, oloc,
1075 snapc, bl, ut, 0,
1076 oncomplete, &c->objver);
1077 objecter->op_submit(o, &c->tid);
1078
1079 return 0;
1080}
1081
1082int librados::IoCtxImpl::aio_writesame(const object_t &oid,
1083 AioCompletionImpl *c,
1084 const bufferlist& bl,
1085 size_t write_len,
1086 uint64_t off)
1087{
1088 FUNCTRACE();
1089 auto ut = ceph::real_clock::now();
1090
1091 if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
1092 return -E2BIG;
1093 if ((bl.length() == 0) || (write_len % bl.length()))
1094 return -EINVAL;
1095 /* can't write to a snapshot */
1096 if (snap_seq != CEPH_NOSNAP)
1097 return -EROFS;
1098
1099 Context *oncomplete = new C_aio_Complete(c);
1100
1101#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1102 ((C_aio_Complete *) oncomplete)->oid = oid;
1103#endif
1104 c->io = this;
1105 queue_aio_write(c);
1106
1107 Objecter::Op *o = objecter->prepare_writesame_op(
1108 oid, oloc,
1109 write_len, off,
1110 snapc, bl, ut, 0,
1111 oncomplete, &c->objver);
1112 objecter->op_submit(o, &c->tid);
1113
1114 return 0;
1115}
1116
1117int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags)
1118{
1119 FUNCTRACE();
1120 auto ut = ceph::real_clock::now();
1121
1122 /* can't write to a snapshot */
1123 if (snap_seq != CEPH_NOSNAP)
1124 return -EROFS;
1125
1126 Context *oncomplete = new C_aio_Complete(c);
1127
1128#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1129 ((C_aio_Complete *) oncomplete)->oid = oid;
1130#endif
1131 c->io = this;
1132 queue_aio_write(c);
1133
1134 Objecter::Op *o = objecter->prepare_remove_op(
1135 oid, oloc,
1136 snapc, ut, flags,
1137 oncomplete, &c->objver);
1138 objecter->op_submit(o, &c->tid);
1139
1140 return 0;
1141}
1142
1143
1144int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c,
1145 uint64_t *psize, time_t *pmtime)
1146{
1147 C_aio_stat_Ack *onack = new C_aio_stat_Ack(c, pmtime);
1148 c->is_read = true;
1149 c->io = this;
1150 Objecter::Op *o = objecter->prepare_stat_op(
1151 oid, oloc,
1152 snap_seq, psize, &onack->mtime, 0,
1153 onack, &c->objver);
1154 objecter->op_submit(o, &c->tid);
1155 return 0;
1156}
1157
1158int librados::IoCtxImpl::aio_stat2(const object_t& oid, AioCompletionImpl *c,
1159 uint64_t *psize, struct timespec *pts)
1160{
1161 C_aio_stat2_Ack *onack = new C_aio_stat2_Ack(c, pts);
1162 c->is_read = true;
1163 c->io = this;
1164 Objecter::Op *o = objecter->prepare_stat_op(
1165 oid, oloc,
1166 snap_seq, psize, &onack->mtime, 0,
1167 onack, &c->objver);
1168 objecter->op_submit(o, &c->tid);
1169 return 0;
1170}
1171
1172int librados::IoCtxImpl::aio_getxattr(const object_t& oid, AioCompletionImpl *c,
1173 const char *name, bufferlist& bl)
1174{
1175 ::ObjectOperation rd;
1176 prepare_assert_ops(&rd);
1177 rd.getxattr(name, &bl, NULL);
1178 int r = aio_operate_read(oid, &rd, c, 0, &bl);
1179 return r;
1180}
1181
1182int librados::IoCtxImpl::aio_rmxattr(const object_t& oid, AioCompletionImpl *c,
1183 const char *name)
1184{
1185 ::ObjectOperation op;
1186 prepare_assert_ops(&op);
1187 op.rmxattr(name);
1188 return aio_operate(oid, &op, c, snapc, 0);
1189}
1190
1191int librados::IoCtxImpl::aio_setxattr(const object_t& oid, AioCompletionImpl *c,
1192 const char *name, bufferlist& bl)
1193{
1194 ::ObjectOperation op;
1195 prepare_assert_ops(&op);
1196 op.setxattr(name, bl);
1197 return aio_operate(oid, &op, c, snapc, 0);
1198}
1199
1200namespace {
1201struct AioGetxattrsData {
1202 AioGetxattrsData(librados::AioCompletionImpl *c, map<string, bufferlist>* attrset,
1203 librados::RadosClient *_client) :
1204 user_completion(c), user_attrset(attrset), client(_client) {}
1205 struct librados::C_AioCompleteAndSafe user_completion;
1206 map<string, bufferlist> result_attrset;
1207 map<std::string, bufferlist>* user_attrset;
1208 librados::RadosClient *client;
1209};
1210}
1211
1212static void aio_getxattrs_complete(rados_completion_t c, void *arg) {
1213 AioGetxattrsData *cdata = reinterpret_cast<AioGetxattrsData*>(arg);
1214 int rc = rados_aio_get_return_value(c);
1215 cdata->user_attrset->clear();
1216 if (rc >= 0) {
1217 for (map<string,bufferlist>::iterator p = cdata->result_attrset.begin();
1218 p != cdata->result_attrset.end();
1219 ++p) {
1220 ldout(cdata->client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1221 (*cdata->user_attrset)[p->first] = p->second;
1222 }
1223 }
1224 cdata->user_completion.finish(rc);
1225 ((librados::AioCompletionImpl*)c)->put();
1226 delete cdata;
1227}
1228
1229int librados::IoCtxImpl::aio_getxattrs(const object_t& oid, AioCompletionImpl *c,
1230 map<std::string, bufferlist>& attrset)
1231{
1232 AioGetxattrsData *cdata = new AioGetxattrsData(c, &attrset, client);
1233 ::ObjectOperation rd;
1234 prepare_assert_ops(&rd);
1235 rd.getxattrs(&cdata->result_attrset, NULL);
1236 librados::AioCompletionImpl *comp = new librados::AioCompletionImpl;
1237 comp->set_complete_callback(cdata, aio_getxattrs_complete);
1238 return aio_operate_read(oid, &rd, comp, 0, NULL);
1239}
1240
1241int librados::IoCtxImpl::aio_cancel(AioCompletionImpl *c)
1242{
1243 return objecter->op_cancel(c->tid, -ECANCELED);
1244}
1245
1246
1247int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
1248 std::list< std::pair<time_t, time_t> > *pls)
1249{
1250 Context *oncomplete = new C_aio_Complete(c);
1251 c->is_read = true;
1252 c->io = this;
1253
1254 ::ObjectOperation rd;
1255 rd.hit_set_ls(pls, NULL);
1256 object_locator_t oloc(poolid);
1257 Objecter::Op *o = objecter->prepare_pg_read_op(
1258 hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
1259 objecter->op_submit(o, &c->tid);
1260 return 0;
1261}
1262
1263int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
1264 time_t stamp,
1265 bufferlist *pbl)
1266{
1267 Context *oncomplete = new C_aio_Complete(c);
1268 c->is_read = true;
1269 c->io = this;
1270
1271 ::ObjectOperation rd;
1272 rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0);
1273 object_locator_t oloc(poolid);
1274 Objecter::Op *o = objecter->prepare_pg_read_op(
1275 hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
1276 objecter->op_submit(o, &c->tid);
1277 return 0;
1278}
1279
1280int librados::IoCtxImpl::remove(const object_t& oid)
1281{
1282 ::ObjectOperation op;
1283 prepare_assert_ops(&op);
1284 op.remove();
94b18763 1285 return operate(oid, &op, nullptr, librados::OPERATION_FULL_FORCE);
7c673cae
FG
1286}
1287
1288int librados::IoCtxImpl::remove(const object_t& oid, int flags)
1289{
1290 ::ObjectOperation op;
1291 prepare_assert_ops(&op);
1292 op.remove();
1293 return operate(oid, &op, NULL, flags);
1294}
1295
1296int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size)
1297{
1298 ::ObjectOperation op;
1299 prepare_assert_ops(&op);
1300 op.truncate(size);
1301 return operate(oid, &op, NULL);
1302}
1303
1304int librados::IoCtxImpl::get_inconsistent_objects(const pg_t& pg,
1305 const librados::object_id_t& start_after,
1306 uint64_t max_to_get,
1307 AioCompletionImpl *c,
1308 std::vector<inconsistent_obj_t>* objects,
1309 uint32_t* interval)
1310{
1311 Context *oncomplete = new C_aio_Complete(c);
1312 c->is_read = true;
1313 c->io = this;
1314
1315 ::ObjectOperation op;
1316 op.scrub_ls(start_after, max_to_get, objects, interval, nullptr);
1317 object_locator_t oloc{poolid, pg.ps()};
1318 Objecter::Op *o = objecter->prepare_pg_read_op(
1319 oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
1320 nullptr, nullptr);
1321 objecter->op_submit(o, &c->tid);
1322 return 0;
1323}
1324
1325int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t& pg,
1326 const librados::object_id_t& start_after,
1327 uint64_t max_to_get,
1328 AioCompletionImpl *c,
1329 std::vector<inconsistent_snapset_t>* snapsets,
1330 uint32_t* interval)
1331{
1332 Context *oncomplete = new C_aio_Complete(c);
1333 c->is_read = true;
1334 c->io = this;
1335
1336 ::ObjectOperation op;
1337 op.scrub_ls(start_after, max_to_get, snapsets, interval, nullptr);
1338 object_locator_t oloc{poolid, pg.ps()};
1339 Objecter::Op *o = objecter->prepare_pg_read_op(
1340 oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
1341 nullptr, nullptr);
1342 objecter->op_submit(o, &c->tid);
1343 return 0;
1344}
1345
1346int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl)
1347{
1348 ::ObjectOperation wr;
1349 prepare_assert_ops(&wr);
1350 wr.tmap_update(cmdbl);
1351 return operate(oid, &wr, NULL);
1352}
1353
1354int librados::IoCtxImpl::tmap_put(const object_t& oid, bufferlist& bl)
1355{
1356 ::ObjectOperation wr;
1357 prepare_assert_ops(&wr);
1358 wr.tmap_put(bl);
1359 return operate(oid, &wr, NULL);
1360}
1361
1362int librados::IoCtxImpl::tmap_get(const object_t& oid, bufferlist& bl)
1363{
1364 ::ObjectOperation rd;
1365 prepare_assert_ops(&rd);
1366 rd.tmap_get(&bl, NULL);
1367 return operate_read(oid, &rd, NULL);
1368}
1369
1370int librados::IoCtxImpl::tmap_to_omap(const object_t& oid, bool nullok)
1371{
1372 ::ObjectOperation wr;
1373 prepare_assert_ops(&wr);
1374 wr.tmap_to_omap(nullok);
1375 return operate(oid, &wr, NULL);
1376}
1377
1378int librados::IoCtxImpl::exec(const object_t& oid,
1379 const char *cls, const char *method,
1380 bufferlist& inbl, bufferlist& outbl)
1381{
1382 ::ObjectOperation rd;
1383 prepare_assert_ops(&rd);
1384 rd.call(cls, method, inbl);
1385 return operate_read(oid, &rd, &outbl);
1386}
1387
1388int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1389 const char *cls, const char *method,
1390 bufferlist& inbl, bufferlist *outbl)
1391{
1392 FUNCTRACE();
1393 Context *oncomplete = new C_aio_Complete(c);
1394
1395#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1396 ((C_aio_Complete *) oncomplete)->oid = oid;
1397#endif
1398 c->is_read = true;
1399 c->io = this;
1400
1401 ::ObjectOperation rd;
1402 prepare_assert_ops(&rd);
1403 rd.call(cls, method, inbl);
1404 Objecter::Op *o = objecter->prepare_read_op(
1405 oid, oloc, rd, snap_seq, outbl, 0, oncomplete, &c->objver);
1406 objecter->op_submit(o, &c->tid);
1407 return 0;
1408}
1409
1410int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1411 const char *cls, const char *method,
1412 bufferlist& inbl, char *buf, size_t out_len)
1413{
1414 FUNCTRACE();
1415 Context *oncomplete = new C_aio_Complete(c);
1416
1417#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1418 ((C_aio_Complete *) oncomplete)->oid = oid;
1419#endif
1420 c->is_read = true;
1421 c->io = this;
1422 c->bl.clear();
1423 c->bl.push_back(buffer::create_static(out_len, buf));
1424 c->blp = &c->bl;
1425 c->out_buf = buf;
1426
1427 ::ObjectOperation rd;
1428 prepare_assert_ops(&rd);
1429 rd.call(cls, method, inbl);
1430 Objecter::Op *o = objecter->prepare_read_op(
1431 oid, oloc, rd, snap_seq, &c->bl, 0, oncomplete, &c->objver);
1432 objecter->op_submit(o, &c->tid);
1433 return 0;
1434}
1435
1436int librados::IoCtxImpl::read(const object_t& oid,
1437 bufferlist& bl, size_t len, uint64_t off)
1438{
1439 if (len > (size_t) INT_MAX)
1440 return -EDOM;
1441 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
1442
1443 ::ObjectOperation rd;
1444 prepare_assert_ops(&rd);
1445 rd.read(off, len, &bl, NULL, NULL);
1446 int r = operate_read(oid, &rd, &bl);
1447 if (r < 0)
1448 return r;
1449
1450 if (bl.length() < len) {
1451 ldout(client->cct, 10) << "Returned length " << bl.length()
1452 << " less than original length "<< len << dendl;
1453 }
1454
1455 return bl.length();
1456}
1457
1458int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off,
1459 bufferlist& cmp_bl)
1460{
1461 if (cmp_bl.length() > UINT_MAX/2)
1462 return -E2BIG;
1463
1464 ::ObjectOperation op;
1465 prepare_assert_ops(&op);
1466 op.cmpext(off, cmp_bl, NULL);
1467 return operate_read(oid, &op, NULL);
1468}
1469
1470int librados::IoCtxImpl::mapext(const object_t& oid,
1471 uint64_t off, size_t len,
1472 std::map<uint64_t,uint64_t>& m)
1473{
1474 bufferlist bl;
1475
1476 Mutex mylock("IoCtxImpl::read::mylock");
1477 Cond cond;
1478 bool done;
1479 int r;
1480 Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
1481
1482 objecter->mapext(oid, oloc,
1483 off, len, snap_seq, &bl, 0,
1484 onack);
1485
1486 mylock.Lock();
1487 while (!done)
1488 cond.Wait(mylock);
1489 mylock.Unlock();
1490 ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
1491
1492 if (r < 0)
1493 return r;
1494
1495 bufferlist::iterator iter = bl.begin();
1496 ::decode(m, iter);
1497
1498 return m.size();
1499}
1500
1501int librados::IoCtxImpl::sparse_read(const object_t& oid,
1502 std::map<uint64_t,uint64_t>& m,
1503 bufferlist& data_bl, size_t len,
1504 uint64_t off)
1505{
1506 if (len > (size_t) INT_MAX)
1507 return -EDOM;
1508
1509 ::ObjectOperation rd;
1510 prepare_assert_ops(&rd);
1511 rd.sparse_read(off, len, &m, &data_bl, NULL);
1512
1513 int r = operate_read(oid, &rd, NULL);
1514 if (r < 0)
1515 return r;
1516
1517 return m.size();
1518}
1519
1520int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type,
1521 const bufferlist &init_value, size_t len,
1522 uint64_t off, size_t chunk_size,
1523 bufferlist *pbl)
1524{
1525 if (len > (size_t) INT_MAX) {
1526 return -EDOM;
1527 }
1528
1529 ::ObjectOperation rd;
1530 prepare_assert_ops(&rd);
1531 rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr);
1532
1533 int r = operate_read(oid, &rd, nullptr);
1534 if (r < 0) {
1535 return r;
1536 }
1537
1538 return 0;
1539}
1540
1541int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
1542{
1543 uint64_t size;
1544 real_time mtime;
1545
1546 if (!psize)
1547 psize = &size;
1548
1549 ::ObjectOperation rd;
1550 prepare_assert_ops(&rd);
1551 rd.stat(psize, &mtime, NULL);
1552 int r = operate_read(oid, &rd, NULL);
1553
1554 if (r >= 0 && pmtime) {
1555 *pmtime = real_clock::to_time_t(mtime);
1556 }
1557
1558 return r;
1559}
1560
1561int librados::IoCtxImpl::stat2(const object_t& oid, uint64_t *psize, struct timespec *pts)
1562{
1563 uint64_t size;
1564 ceph::real_time mtime;
1565
1566 if (!psize)
1567 psize = &size;
1568
1569 ::ObjectOperation rd;
1570 prepare_assert_ops(&rd);
1571 rd.stat(psize, &mtime, NULL);
1572 int r = operate_read(oid, &rd, NULL);
1573 if (r < 0) {
1574 return r;
1575 }
1576
1577 if (pts) {
1578 *pts = ceph::real_clock::to_timespec(mtime);
1579 }
1580
1581 return 0;
1582}
1583
1584int librados::IoCtxImpl::getxattr(const object_t& oid,
1585 const char *name, bufferlist& bl)
1586{
1587 ::ObjectOperation rd;
1588 prepare_assert_ops(&rd);
1589 rd.getxattr(name, &bl, NULL);
1590 int r = operate_read(oid, &rd, &bl);
1591 if (r < 0)
1592 return r;
1593
1594 return bl.length();
1595}
1596
1597int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name)
1598{
1599 ::ObjectOperation op;
1600 prepare_assert_ops(&op);
1601 op.rmxattr(name);
1602 return operate(oid, &op, NULL);
1603}
1604
1605int librados::IoCtxImpl::setxattr(const object_t& oid,
1606 const char *name, bufferlist& bl)
1607{
1608 ::ObjectOperation op;
1609 prepare_assert_ops(&op);
1610 op.setxattr(name, bl);
1611 return operate(oid, &op, NULL);
1612}
1613
1614int librados::IoCtxImpl::getxattrs(const object_t& oid,
1615 map<std::string, bufferlist>& attrset)
1616{
1617 map<string, bufferlist> aset;
1618
1619 ::ObjectOperation rd;
1620 prepare_assert_ops(&rd);
1621 rd.getxattrs(&aset, NULL);
1622 int r = operate_read(oid, &rd, NULL);
1623
1624 attrset.clear();
1625 if (r >= 0) {
1626 for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); ++p) {
1627 ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1628 attrset[p->first.c_str()] = p->second;
1629 }
1630 }
1631
1632 return r;
1633}
1634
1635void librados::IoCtxImpl::set_sync_op_version(version_t ver)
1636{
1637 ANNOTATE_BENIGN_RACE_SIZED(&last_objver, sizeof(last_objver),
1638 "IoCtxImpl last_objver");
1639 last_objver = ver;
1640}
1641
1642struct WatchInfo : public Objecter::WatchContext {
1643 librados::IoCtxImpl *ioctx;
1644 object_t oid;
1645 librados::WatchCtx *ctx;
1646 librados::WatchCtx2 *ctx2;
1647 bool internal = false;
1648
1649 WatchInfo(librados::IoCtxImpl *io, object_t o,
1650 librados::WatchCtx *c, librados::WatchCtx2 *c2,
1651 bool inter)
1652 : ioctx(io), oid(o), ctx(c), ctx2(c2), internal(inter) {
1653 ioctx->get();
1654 }
1655 ~WatchInfo() override {
1656 ioctx->put();
1657 if (internal) {
1658 delete ctx;
1659 delete ctx2;
1660 }
1661 }
1662
1663 void handle_notify(uint64_t notify_id,
1664 uint64_t cookie,
1665 uint64_t notifier_id,
1666 bufferlist& bl) override {
1667 ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
1668 << " cookie " << cookie
1669 << " notifier_id " << notifier_id
1670 << " len " << bl.length()
1671 << dendl;
1672
1673 if (ctx2)
1674 ctx2->handle_notify(notify_id, cookie, notifier_id, bl);
1675 if (ctx) {
1676 ctx->notify(0, 0, bl);
1677
1678 // send ACK back to OSD if using legacy protocol
1679 bufferlist empty;
1680 ioctx->notify_ack(oid, notify_id, cookie, empty);
1681 }
1682 }
1683 void handle_error(uint64_t cookie, int err) override {
1684 ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie
1685 << " err " << err
1686 << dendl;
1687 if (ctx2)
1688 ctx2->handle_error(cookie, err);
1689 }
1690};
1691
1692int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1693 librados::WatchCtx *ctx,
1694 librados::WatchCtx2 *ctx2,
1695 bool internal)
1696{
1697 return watch(oid, handle, ctx, ctx2, 0, internal);
1698}
1699
1700int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1701 librados::WatchCtx *ctx,
1702 librados::WatchCtx2 *ctx2,
1703 uint32_t timeout,
1704 bool internal)
1705{
1706 ::ObjectOperation wr;
1707 version_t objver;
1708 C_SaferCond onfinish;
1709
1710 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1711 *handle = linger_op->get_cookie();
1712 linger_op->watch_context = new WatchInfo(this,
1713 oid, ctx, ctx2, internal);
1714
1715 prepare_assert_ops(&wr);
1716 wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1717 bufferlist bl;
1718 objecter->linger_watch(linger_op, wr,
1719 snapc, ceph::real_clock::now(), bl,
1720 &onfinish,
1721 &objver);
1722
1723 int r = onfinish.wait();
1724
1725 set_sync_op_version(objver);
1726
1727 if (r < 0) {
1728 objecter->linger_cancel(linger_op);
1729 *handle = 0;
1730 }
1731
1732 return r;
1733}
1734
1735int librados::IoCtxImpl::aio_watch(const object_t& oid,
1736 AioCompletionImpl *c,
1737 uint64_t *handle,
1738 librados::WatchCtx *ctx,
1739 librados::WatchCtx2 *ctx2,
1740 bool internal) {
1741 return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
1742}
1743
1744int librados::IoCtxImpl::aio_watch(const object_t& oid,
1745 AioCompletionImpl *c,
1746 uint64_t *handle,
1747 librados::WatchCtx *ctx,
1748 librados::WatchCtx2 *ctx2,
1749 uint32_t timeout,
1750 bool internal)
1751{
1752 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1753 c->io = this;
1754 Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);
1755
1756 ::ObjectOperation wr;
1757 *handle = linger_op->get_cookie();
1758 linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);
1759
1760 prepare_assert_ops(&wr);
1761 wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1762 bufferlist bl;
1763 objecter->linger_watch(linger_op, wr,
1764 snapc, ceph::real_clock::now(), bl,
1765 oncomplete, &c->objver);
1766
1767 return 0;
1768}
1769
1770
1771int librados::IoCtxImpl::notify_ack(
1772 const object_t& oid,
1773 uint64_t notify_id,
1774 uint64_t cookie,
1775 bufferlist& bl)
1776{
1777 ::ObjectOperation rd;
1778 prepare_assert_ops(&rd);
1779 rd.notify_ack(notify_id, cookie, bl);
1780 objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
1781 return 0;
1782}
1783
1784int librados::IoCtxImpl::watch_check(uint64_t cookie)
1785{
1786 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1787 return objecter->linger_check(linger_op);
1788}
1789
1790int librados::IoCtxImpl::unwatch(uint64_t cookie)
1791{
1792 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1793 C_SaferCond onfinish;
1794 version_t ver = 0;
1795
1796 ::ObjectOperation wr;
1797 prepare_assert_ops(&wr);
1798 wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1799 objecter->mutate(linger_op->target.base_oid, oloc, wr,
1800 snapc, ceph::real_clock::now(), 0,
1801 &onfinish, &ver);
1802 objecter->linger_cancel(linger_op);
1803
1804 int r = onfinish.wait();
1805 set_sync_op_version(ver);
1806 return r;
1807}
1808
1809int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
1810{
1811 c->io = this;
1812 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1813 Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
1814
1815 ::ObjectOperation wr;
1816 prepare_assert_ops(&wr);
1817 wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1818 objecter->mutate(linger_op->target.base_oid, oloc, wr,
1819 snapc, ceph::real_clock::now(), 0,
1820 oncomplete, &c->objver);
1821 return 0;
1822}
1823
1824int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
1825 uint64_t timeout_ms,
1826 bufferlist *preply_bl,
1827 char **preply_buf, size_t *preply_buf_len)
1828{
1829 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1830
1831 C_SaferCond notify_finish_cond;
1832 Context *notify_finish = new C_notify_Finish(client->cct, &notify_finish_cond,
1833 objecter, linger_op, preply_bl,
1834 preply_buf, preply_buf_len);
1835
1836 uint32_t timeout = notify_timeout;
1837 if (timeout_ms)
1838 timeout = timeout_ms / 1000;
1839
1840 // Construct RADOS op
1841 ::ObjectOperation rd;
1842 prepare_assert_ops(&rd);
1843 bufferlist inbl;
1844 rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1845
1846 // Issue RADOS op
1847 C_SaferCond onack;
1848 version_t objver;
1849 objecter->linger_notify(linger_op,
1850 rd, snap_seq, inbl, NULL,
1851 &onack, &objver);
1852
1853 ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
1854 int r = onack.wait();
1855 ldout(client->cct, 10) << __func__ << " linger op " << linger_op
1856 << " acked (" << r << ")" << dendl;
1857
1858 if (r == 0) {
1859 ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
1860 << linger_op << dendl;
1861 r = notify_finish_cond.wait();
1862
1863 } else {
1864 ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
1865 << r << dendl;
1866 notify_finish->complete(r);
1867 }
1868
1869 objecter->linger_cancel(linger_op);
1870
1871 set_sync_op_version(objver);
1872 return r;
1873}
1874
1875int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
1876 bufferlist& bl, uint64_t timeout_ms,
1877 bufferlist *preply_bl, char **preply_buf,
1878 size_t *preply_buf_len)
1879{
1880 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1881
1882 c->io = this;
1883
1884 C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);
1885 C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
1886 objecter, linger_op,
1887 preply_bl, preply_buf,
1888 preply_buf_len);
1889 Context *onack = new C_aio_notify_Ack(client->cct, onnotify, oncomplete);
1890
1891 uint32_t timeout = notify_timeout;
1892 if (timeout_ms)
1893 timeout = timeout_ms / 1000;
1894
1895 // Construct RADOS op
1896 ::ObjectOperation rd;
1897 prepare_assert_ops(&rd);
1898 bufferlist inbl;
1899 rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1900
1901 // Issue RADOS op
1902 objecter->linger_notify(linger_op,
1903 rd, snap_seq, inbl, NULL,
1904 onack, &c->objver);
1905 return 0;
1906}
1907
1908int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
1909 uint64_t expected_object_size,
1910 uint64_t expected_write_size,
1911 uint32_t flags)
1912{
1913 ::ObjectOperation wr;
1914 prepare_assert_ops(&wr);
1915 wr.set_alloc_hint(expected_object_size, expected_write_size, flags);
1916 return operate(oid, &wr, NULL);
1917}
1918
1919version_t librados::IoCtxImpl::last_version()
1920{
1921 return last_objver;
1922}
1923
1924void librados::IoCtxImpl::set_assert_version(uint64_t ver)
1925{
1926 assert_ver = ver;
1927}
1928
1929void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout)
1930{
1931 notify_timeout = timeout;
1932}
1933
1934int librados::IoCtxImpl::cache_pin(const object_t& oid)
1935{
1936 ::ObjectOperation wr;
1937 prepare_assert_ops(&wr);
1938 wr.cache_pin();
1939 return operate(oid, &wr, NULL);
1940}
1941
1942int librados::IoCtxImpl::cache_unpin(const object_t& oid)
1943{
1944 ::ObjectOperation wr;
1945 prepare_assert_ops(&wr);
1946 wr.cache_unpin();
1947 return operate(oid, &wr, NULL);
1948}
1949
1950
1951///////////////////////////// C_aio_stat_Ack ////////////////////////////
1952
1953librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl *_c,
1954 time_t *pm)
1955 : c(_c), pmtime(pm)
1956{
1957 assert(!c->io);
1958 c->get();
1959}
1960
1961void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r)
1962{
1963 c->lock.Lock();
1964 c->rval = r;
1965 c->complete = true;
1966 c->cond.Signal();
1967
1968 if (r >= 0 && pmtime) {
1969 *pmtime = real_clock::to_time_t(mtime);
1970 }
1971
1972 if (c->callback_complete) {
1973 c->io->client->finisher.queue(new C_AioComplete(c));
1974 }
1975
1976 c->put_unlock();
1977}
1978
1979///////////////////////////// C_aio_stat2_Ack ////////////////////////////
1980
1981librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl *_c,
1982 struct timespec *pt)
1983 : c(_c), pts(pt)
1984{
1985 assert(!c->io);
1986 c->get();
1987}
1988
1989void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r)
1990{
1991 c->lock.Lock();
1992 c->rval = r;
1993 c->complete = true;
1994 c->cond.Signal();
1995
1996 if (r >= 0 && pts) {
1997 *pts = real_clock::to_timespec(mtime);
1998 }
1999
2000 if (c->callback_complete) {
2001 c->io->client->finisher.queue(new C_AioComplete(c));
2002 }
2003
2004 c->put_unlock();
2005}
2006
2007//////////////////////////// C_aio_Complete ////////////////////////////////
2008
2009librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl *_c)
2010 : c(_c)
2011{
2012 c->get();
2013}
2014
2015void librados::IoCtxImpl::C_aio_Complete::finish(int r)
2016{
2017 c->lock.Lock();
2018 c->rval = r;
2019 c->complete = true;
2020 c->cond.Signal();
2021
2022 if (r == 0 && c->blp && c->blp->length() > 0) {
2023 if (c->out_buf && !c->blp->is_provided_buffer(c->out_buf))
2024 c->blp->copy(0, c->blp->length(), c->out_buf);
2025 c->rval = c->blp->length();
2026 }
2027
2028 if (c->callback_complete ||
2029 c->callback_safe) {
2030 c->io->client->finisher.queue(new C_AioComplete(c));
2031 }
2032
2033 if (c->aio_write_seq) {
2034 c->io->complete_aio_write(c);
2035 }
2036
2037#if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
2038 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE");
2039#endif
2040 c->put_unlock();
2041}
2042
2043void librados::IoCtxImpl::object_list_slice(
2044 const hobject_t start,
2045 const hobject_t finish,
2046 const size_t n,
2047 const size_t m,
2048 hobject_t *split_start,
2049 hobject_t *split_finish)
2050{
2051 if (start.is_max()) {
2052 *split_start = hobject_t::get_max();
2053 *split_finish = hobject_t::get_max();
2054 return;
2055 }
2056
2057 uint64_t start_hash = hobject_t::_reverse_bits(start.get_hash());
2058 uint64_t finish_hash =
2059 finish.is_max() ? 0x100000000 :
2060 hobject_t::_reverse_bits(finish.get_hash());
2061
2062 uint64_t diff = finish_hash - start_hash;
2063 uint64_t rev_start = start_hash + (diff * n / m);
2064 uint64_t rev_finish = start_hash + (diff * (n + 1) / m);
2065 if (n == 0) {
2066 *split_start = start;
2067 } else {
2068 *split_start = hobject_t(
2069 object_t(), string(), CEPH_NOSNAP,
2070 hobject_t::_reverse_bits(rev_start), poolid, string());
2071 }
2072
2073 if (n == m - 1)
2074 *split_finish = finish;
2075 else if (rev_finish >= 0x100000000)
2076 *split_finish = hobject_t::get_max();
2077 else
2078 *split_finish = hobject_t(
2079 object_t(), string(), CEPH_NOSNAP,
2080 hobject_t::_reverse_bits(rev_finish), poolid, string());
2081}
2082
c07f9fc5
FG
2083int librados::IoCtxImpl::application_enable(const std::string& app_name,
2084 bool force)
2085{
2086 auto c = new PoolAsyncCompletionImpl();
2087 application_enable_async(app_name, force, c);
2088
2089 int r = c->wait();
2090 assert(r == 0);
2091
2092 r = c->get_return_value();
2093 c->release();
2094 if (r < 0) {
2095 return r;
2096 }
2097
2098 return client->wait_for_latest_osdmap();
2099}
2100
2101void librados::IoCtxImpl::application_enable_async(const std::string& app_name,
2102 bool force,
2103 PoolAsyncCompletionImpl *c)
2104{
2105 // pre-Luminous clusters will return -EINVAL and application won't be
2106 // preserved until Luminous is configured as minimim version.
2107 if (!client->get_required_monitor_features().contains_all(
2108 ceph::features::mon::FEATURE_LUMINOUS)) {
2109 client->finisher.queue(new C_PoolAsync_Safe(c), -EOPNOTSUPP);
2110 return;
2111 }
2112
2113 std::stringstream cmd;
2114 cmd << "{"
2115 << "\"prefix\": \"osd pool application enable\","
2116 << "\"pool\": \"" << get_cached_pool_name() << "\","
2117 << "\"app\": \"" << app_name << "\"";
2118 if (force) {
2119 cmd << ",\"force\":\"--yes-i-really-mean-it\"";
2120 }
2121 cmd << "}";
2122
2123 std::vector<std::string> cmds;
2124 cmds.push_back(cmd.str());
2125 bufferlist inbl;
2126 client->mon_command_async(cmds, inbl, nullptr, nullptr,
2127 new C_PoolAsync_Safe(c));
2128}
2129
2130int librados::IoCtxImpl::application_list(std::set<std::string> *app_names)
2131{
2132 int r = 0;
2133 app_names->clear();
2134 objecter->with_osdmap([&](const OSDMap& o) {
2135 auto pg_pool = o.get_pg_pool(poolid);
2136 if (pg_pool == nullptr) {
2137 r = -ENOENT;
2138 return;
2139 }
2140
2141 for (auto &pair : pg_pool->application_metadata) {
2142 app_names->insert(pair.first);
2143 }
2144 });
2145 return r;
2146}
2147
2148int librados::IoCtxImpl::application_metadata_get(const std::string& app_name,
2149 const std::string &key,
2150 std::string* value)
2151{
2152 int r = 0;
2153 objecter->with_osdmap([&](const OSDMap& o) {
2154 auto pg_pool = o.get_pg_pool(poolid);
2155 if (pg_pool == nullptr) {
2156 r = -ENOENT;
2157 return;
2158 }
2159
2160 auto app_it = pg_pool->application_metadata.find(app_name);
2161 if (app_it == pg_pool->application_metadata.end()) {
2162 r = -ENOENT;
2163 return;
2164 }
2165
2166 auto it = app_it->second.find(key);
2167 if (it == app_it->second.end()) {
2168 r = -ENOENT;
2169 return;
2170 }
2171
2172 *value = it->second;
2173 });
2174 return r;
2175}
2176
2177int librados::IoCtxImpl::application_metadata_set(const std::string& app_name,
2178 const std::string &key,
2179 const std::string& value)
2180{
2181 std::stringstream cmd;
2182 cmd << "{"
2183 << "\"prefix\":\"osd pool application set\","
2184 << "\"pool\":\"" << get_cached_pool_name() << "\","
2185 << "\"app\":\"" << app_name << "\","
2186 << "\"key\":\"" << key << "\","
2187 << "\"value\":\"" << value << "\""
2188 << "}";
2189
2190 std::vector<std::string> cmds;
2191 cmds.push_back(cmd.str());
2192 bufferlist inbl;
2193 int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2194 if (r < 0) {
2195 return r;
2196 }
2197
2198 // ensure we have the latest osd map epoch before proceeding
2199 return client->wait_for_latest_osdmap();
2200}
2201
2202int librados::IoCtxImpl::application_metadata_remove(const std::string& app_name,
2203 const std::string &key)
2204{
2205 std::stringstream cmd;
2206 cmd << "{"
2207 << "\"prefix\":\"osd pool application rm\","
2208 << "\"pool\":\"" << get_cached_pool_name() << "\","
2209 << "\"app\":\"" << app_name << "\","
2210 << "\"key\":\"" << key << "\""
2211 << "}";
2212
2213 std::vector<std::string> cmds;
2214 cmds.push_back(cmd.str());
2215 bufferlist inbl;
2216 int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2217 if (r < 0) {
2218 return r;
2219 }
2220
2221 // ensure we have the latest osd map epoch before proceeding
2222 return client->wait_for_latest_osdmap();
2223}
2224
2225int librados::IoCtxImpl::application_metadata_list(const std::string& app_name,
2226 std::map<std::string, std::string> *values)
2227{
2228 int r = 0;
2229 values->clear();
2230 objecter->with_osdmap([&](const OSDMap& o) {
2231 auto pg_pool = o.get_pg_pool(poolid);
2232 if (pg_pool == nullptr) {
2233 r = -ENOENT;
2234 return;
2235 }
2236
2237 auto it = pg_pool->application_metadata.find(app_name);
2238 if (it == pg_pool->application_metadata.end()) {
2239 r = -ENOENT;
2240 return;
2241 }
2242
2243 *values = it->second;
2244 });
2245 return r;
2246}
2247