]> git.proxmox.com Git - ceph.git/blob - ceph/src/librados/IoCtxImpl.cc
update sources to 12.2.7
[ceph.git] / ceph / src / librados / IoCtxImpl.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <limits.h>
16
17 #include "IoCtxImpl.h"
18
19 #include "librados/AioCompletionImpl.h"
20 #include "librados/PoolAsyncCompletionImpl.h"
21 #include "librados/RadosClient.h"
22 #include "include/assert.h"
23 #include "common/valgrind.h"
24 #include "common/EventTrace.h"
25
26 #define dout_subsys ceph_subsys_rados
27 #undef dout_prefix
28 #define dout_prefix *_dout << "librados: "
29
30 namespace librados {
31 namespace {
32
33 struct C_notify_Finish : public Context {
34 CephContext *cct;
35 Context *ctx;
36 Objecter *objecter;
37 Objecter::LingerOp *linger_op;
38 bufferlist reply_bl;
39 bufferlist *preply_bl;
40 char **preply_buf;
41 size_t *preply_buf_len;
42
43 C_notify_Finish(CephContext *_cct, Context *_ctx, Objecter *_objecter,
44 Objecter::LingerOp *_linger_op, bufferlist *_preply_bl,
45 char **_preply_buf, size_t *_preply_buf_len)
46 : cct(_cct), ctx(_ctx), objecter(_objecter), linger_op(_linger_op),
47 preply_bl(_preply_bl), preply_buf(_preply_buf),
48 preply_buf_len(_preply_buf_len)
49 {
50 linger_op->on_notify_finish = this;
51 linger_op->notify_result_bl = &reply_bl;
52 }
53
54 void finish(int r) override
55 {
56 ldout(cct, 10) << __func__ << " completed notify (linger op "
57 << linger_op << "), r = " << r << dendl;
58
59 // pass result back to user
60 // NOTE: we do this regardless of what error code we return
61 if (preply_buf) {
62 if (reply_bl.length()) {
63 *preply_buf = (char*)malloc(reply_bl.length());
64 memcpy(*preply_buf, reply_bl.c_str(), reply_bl.length());
65 } else {
66 *preply_buf = NULL;
67 }
68 }
69 if (preply_buf_len)
70 *preply_buf_len = reply_bl.length();
71 if (preply_bl)
72 preply_bl->claim(reply_bl);
73
74 ctx->complete(r);
75 }
76 };
77
78 struct C_aio_linger_cancel : public Context {
79 Objecter *objecter;
80 Objecter::LingerOp *linger_op;
81
82 C_aio_linger_cancel(Objecter *_objecter, Objecter::LingerOp *_linger_op)
83 : objecter(_objecter), linger_op(_linger_op)
84 {
85 }
86
87 void finish(int r) override
88 {
89 objecter->linger_cancel(linger_op);
90 }
91 };
92
93 struct C_aio_linger_Complete : public Context {
94 AioCompletionImpl *c;
95 Objecter::LingerOp *linger_op;
96 bool cancel;
97
98 C_aio_linger_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op, bool _cancel)
99 : c(_c), linger_op(_linger_op), cancel(_cancel)
100 {
101 c->get();
102 }
103
104 void finish(int r) override {
105 if (cancel || r < 0)
106 c->io->client->finisher.queue(new C_aio_linger_cancel(c->io->objecter,
107 linger_op));
108
109 c->lock.Lock();
110 c->rval = r;
111 c->complete = true;
112 c->cond.Signal();
113
114 if (c->callback_complete ||
115 c->callback_safe) {
116 c->io->client->finisher.queue(new C_AioComplete(c));
117 }
118 c->put_unlock();
119 }
120 };
121
122 struct C_aio_notify_Complete : public C_aio_linger_Complete {
123 Mutex lock;
124 bool acked = false;
125 bool finished = false;
126 int ret_val = 0;
127
128 C_aio_notify_Complete(AioCompletionImpl *_c, Objecter::LingerOp *_linger_op)
129 : C_aio_linger_Complete(_c, _linger_op, false),
130 lock("C_aio_notify_Complete::lock") {
131 }
132
133 void handle_ack(int r) {
134 // invoked by C_aio_notify_Ack
135 lock.Lock();
136 acked = true;
137 complete_unlock(r);
138 }
139
140 void complete(int r) override {
141 // invoked by C_notify_Finish
142 lock.Lock();
143 finished = true;
144 complete_unlock(r);
145 }
146
147 void complete_unlock(int r) {
148 if (ret_val == 0 && r < 0) {
149 ret_val = r;
150 }
151
152 if (acked && finished) {
153 lock.Unlock();
154 cancel = true;
155 C_aio_linger_Complete::complete(ret_val);
156 } else {
157 lock.Unlock();
158 }
159 }
160 };
161
162 struct C_aio_notify_Ack : public Context {
163 CephContext *cct;
164 C_notify_Finish *onfinish;
165 C_aio_notify_Complete *oncomplete;
166
167 C_aio_notify_Ack(CephContext *_cct, C_notify_Finish *_onfinish,
168 C_aio_notify_Complete *_oncomplete)
169 : cct(_cct), onfinish(_onfinish), oncomplete(_oncomplete)
170 {
171 }
172
173 void finish(int r) override
174 {
175 ldout(cct, 10) << __func__ << " linger op " << oncomplete->linger_op << " "
176 << "acked (" << r << ")" << dendl;
177 oncomplete->handle_ack(r);
178 }
179 };
180
181 struct C_aio_selfmanaged_snap_op_Complete : public Context {
182 librados::RadosClient *client;
183 librados::AioCompletionImpl *c;
184
185 C_aio_selfmanaged_snap_op_Complete(librados::RadosClient *client,
186 librados::AioCompletionImpl *c)
187 : client(client), c(c) {
188 c->get();
189 }
190
191 void finish(int r) override {
192 c->lock.Lock();
193 c->rval = r;
194 c->complete = true;
195 c->cond.Signal();
196
197 if (c->callback_complete || c->callback_safe) {
198 client->finisher.queue(new librados::C_AioComplete(c));
199 }
200 c->put_unlock();
201 }
202 };
203
204 struct C_aio_selfmanaged_snap_create_Complete : public C_aio_selfmanaged_snap_op_Complete {
205 snapid_t snapid;
206 uint64_t *dest_snapid;
207
208 C_aio_selfmanaged_snap_create_Complete(librados::RadosClient *client,
209 librados::AioCompletionImpl *c,
210 uint64_t *dest_snapid)
211 : C_aio_selfmanaged_snap_op_Complete(client, c),
212 dest_snapid(dest_snapid) {
213 }
214
215 void finish(int r) override {
216 if (r >= 0) {
217 *dest_snapid = snapid;
218 }
219 C_aio_selfmanaged_snap_op_Complete::finish(r);
220 }
221 };
222
223 } // anonymous namespace
224 } // namespace librados
225
226 librados::IoCtxImpl::IoCtxImpl() :
227 ref_cnt(0), client(NULL), poolid(0), assert_ver(0), last_objver(0),
228 notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
229 aio_write_seq(0), objecter(NULL)
230 {
231 }
232
233 librados::IoCtxImpl::IoCtxImpl(RadosClient *c, Objecter *objecter,
234 int64_t poolid, snapid_t s)
235 : ref_cnt(0), client(c), poolid(poolid), snap_seq(s),
236 assert_ver(0), last_objver(0),
237 notify_timeout(c->cct->_conf->client_notify_timeout),
238 oloc(poolid), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
239 aio_write_seq(0), objecter(objecter)
240 {
241 }
242
243 void librados::IoCtxImpl::set_snap_read(snapid_t s)
244 {
245 if (!s)
246 s = CEPH_NOSNAP;
247 ldout(client->cct, 10) << "set snap read " << snap_seq << " -> " << s << dendl;
248 snap_seq = s;
249 }
250
251 int librados::IoCtxImpl::set_snap_write_context(snapid_t seq, vector<snapid_t>& snaps)
252 {
253 ::SnapContext n;
254 ldout(client->cct, 10) << "set snap write context: seq = " << seq
255 << " and snaps = " << snaps << dendl;
256 n.seq = seq;
257 n.snaps = snaps;
258 if (!n.is_valid())
259 return -EINVAL;
260 snapc = n;
261 return 0;
262 }
263
264 int librados::IoCtxImpl::get_object_hash_position(
265 const std::string& oid, uint32_t *hash_position)
266 {
267 int64_t r = objecter->get_object_hash_position(poolid, oid, oloc.nspace);
268 if (r < 0)
269 return r;
270 *hash_position = (uint32_t)r;
271 return 0;
272 }
273
274 int librados::IoCtxImpl::get_object_pg_hash_position(
275 const std::string& oid, uint32_t *pg_hash_position)
276 {
277 int64_t r = objecter->get_object_pg_hash_position(poolid, oid, oloc.nspace);
278 if (r < 0)
279 return r;
280 *pg_hash_position = (uint32_t)r;
281 return 0;
282 }
283
284 void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl *c)
285 {
286 get();
287 aio_write_list_lock.Lock();
288 assert(c->io == this);
289 c->aio_write_seq = ++aio_write_seq;
290 ldout(client->cct, 20) << "queue_aio_write " << this << " completion " << c
291 << " write_seq " << aio_write_seq << dendl;
292 aio_write_list.push_back(&c->aio_write_list_item);
293 aio_write_list_lock.Unlock();
294 }
295
296 void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl *c)
297 {
298 ldout(client->cct, 20) << "complete_aio_write " << c << dendl;
299 aio_write_list_lock.Lock();
300 assert(c->io == this);
301 c->aio_write_list_item.remove_myself();
302
303 map<ceph_tid_t, std::list<AioCompletionImpl*> >::iterator waiters = aio_write_waiters.begin();
304 while (waiters != aio_write_waiters.end()) {
305 if (!aio_write_list.empty() &&
306 aio_write_list.front()->aio_write_seq <= waiters->first) {
307 ldout(client->cct, 20) << " next outstanding write is " << aio_write_list.front()->aio_write_seq
308 << " <= waiter " << waiters->first
309 << ", stopping" << dendl;
310 break;
311 }
312 ldout(client->cct, 20) << " waking waiters on seq " << waiters->first << dendl;
313 for (std::list<AioCompletionImpl*>::iterator it = waiters->second.begin();
314 it != waiters->second.end(); ++it) {
315 client->finisher.queue(new C_AioCompleteAndSafe(*it));
316 (*it)->put();
317 }
318 aio_write_waiters.erase(waiters++);
319 }
320
321 aio_write_cond.Signal();
322 aio_write_list_lock.Unlock();
323 put();
324 }
325
326 void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl *c)
327 {
328 ldout(client->cct, 20) << "flush_aio_writes_async " << this
329 << " completion " << c << dendl;
330 Mutex::Locker l(aio_write_list_lock);
331 ceph_tid_t seq = aio_write_seq;
332 if (aio_write_list.empty()) {
333 ldout(client->cct, 20) << "flush_aio_writes_async no writes. (tid "
334 << seq << ")" << dendl;
335 client->finisher.queue(new C_AioCompleteAndSafe(c));
336 } else {
337 ldout(client->cct, 20) << "flush_aio_writes_async " << aio_write_list.size()
338 << " writes in flight; waiting on tid " << seq << dendl;
339 c->get();
340 aio_write_waiters[seq].push_back(c);
341 }
342 }
343
344 void librados::IoCtxImpl::flush_aio_writes()
345 {
346 ldout(client->cct, 20) << "flush_aio_writes" << dendl;
347 aio_write_list_lock.Lock();
348 ceph_tid_t seq = aio_write_seq;
349 while (!aio_write_list.empty() &&
350 aio_write_list.front()->aio_write_seq <= seq)
351 aio_write_cond.Wait(aio_write_list_lock);
352 aio_write_list_lock.Unlock();
353 }
354
355 string librados::IoCtxImpl::get_cached_pool_name()
356 {
357 std::string pn;
358 client->pool_get_name(get_id(), &pn);
359 return pn;
360 }
361
362 // SNAPS
363
364 int librados::IoCtxImpl::snap_create(const char *snapName)
365 {
366 int reply;
367 string sName(snapName);
368
369 Mutex mylock ("IoCtxImpl::snap_create::mylock");
370 Cond cond;
371 bool done;
372 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
373 reply = objecter->create_pool_snap(poolid, sName, onfinish);
374
375 if (reply < 0) {
376 delete onfinish;
377 } else {
378 mylock.Lock();
379 while (!done)
380 cond.Wait(mylock);
381 mylock.Unlock();
382 }
383 return reply;
384 }
385
386 int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid)
387 {
388 int reply;
389
390 Mutex mylock("IoCtxImpl::selfmanaged_snap_create::mylock");
391 Cond cond;
392 bool done;
393 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
394 snapid_t snapid;
395 reply = objecter->allocate_selfmanaged_snap(poolid, &snapid, onfinish);
396
397 if (reply < 0) {
398 delete onfinish;
399 } else {
400 mylock.Lock();
401 while (!done)
402 cond.Wait(mylock);
403 mylock.Unlock();
404 if (reply == 0)
405 *psnapid = snapid;
406 }
407 return reply;
408 }
409
410 void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid,
411 AioCompletionImpl *c)
412 {
413 C_aio_selfmanaged_snap_create_Complete *onfinish =
414 new C_aio_selfmanaged_snap_create_Complete(client, c, snapid);
415 int r = objecter->allocate_selfmanaged_snap(poolid, &onfinish->snapid,
416 onfinish);
417 if (r < 0) {
418 onfinish->complete(r);
419 }
420 }
421
422 int librados::IoCtxImpl::snap_remove(const char *snapName)
423 {
424 int reply;
425 string sName(snapName);
426
427 Mutex mylock ("IoCtxImpl::snap_remove::mylock");
428 Cond cond;
429 bool done;
430 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &reply);
431 reply = objecter->delete_pool_snap(poolid, sName, onfinish);
432
433 if (reply < 0) {
434 delete onfinish;
435 } else {
436 mylock.Lock();
437 while(!done)
438 cond.Wait(mylock);
439 mylock.Unlock();
440 }
441 return reply;
442 }
443
444 int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
445 ::SnapContext& snapc,
446 uint64_t snapid)
447 {
448 int reply;
449
450 Mutex mylock("IoCtxImpl::snap_rollback::mylock");
451 Cond cond;
452 bool done;
453 Context *onack = new C_SafeCond(&mylock, &cond, &done, &reply);
454
455 ::ObjectOperation op;
456 prepare_assert_ops(&op);
457 op.rollback(snapid);
458 objecter->mutate(oid, oloc,
459 op, snapc, ceph::real_clock::now(), 0,
460 onack, NULL);
461
462 mylock.Lock();
463 while (!done) cond.Wait(mylock);
464 mylock.Unlock();
465 return reply;
466 }
467
468 int librados::IoCtxImpl::rollback(const object_t& oid, const char *snapName)
469 {
470 snapid_t snap;
471
472 int r = objecter->pool_snap_by_name(poolid, snapName, &snap);
473 if (r < 0) {
474 return r;
475 }
476
477 return selfmanaged_snap_rollback_object(oid, snapc, snap);
478 }
479
480 int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid)
481 {
482 int reply;
483
484 Mutex mylock("IoCtxImpl::selfmanaged_snap_remove::mylock");
485 Cond cond;
486 bool done;
487 objecter->delete_selfmanaged_snap(poolid, snapid_t(snapid),
488 new C_SafeCond(&mylock, &cond, &done, &reply));
489
490 mylock.Lock();
491 while (!done) cond.Wait(mylock);
492 mylock.Unlock();
493 return (int)reply;
494 }
495
496 void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid,
497 AioCompletionImpl *c)
498 {
499 Context *onfinish = new C_aio_selfmanaged_snap_op_Complete(client, c);
500 objecter->delete_selfmanaged_snap(poolid, snapid, onfinish);
501 }
502
503 int librados::IoCtxImpl::pool_change_auid(unsigned long long auid)
504 {
505 int reply;
506
507 Mutex mylock("IoCtxImpl::pool_change_auid::mylock");
508 Cond cond;
509 bool done;
510 objecter->change_pool_auid(poolid,
511 new C_SafeCond(&mylock, &cond, &done, &reply),
512 auid);
513
514 mylock.Lock();
515 while (!done) cond.Wait(mylock);
516 mylock.Unlock();
517 return reply;
518 }
519
520 int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid,
521 PoolAsyncCompletionImpl *c)
522 {
523 objecter->change_pool_auid(poolid,
524 new C_PoolAsync_Safe(c),
525 auid);
526 return 0;
527 }
528
529 int librados::IoCtxImpl::snap_list(vector<uint64_t> *snaps)
530 {
531 return objecter->pool_snap_list(poolid, snaps);
532 }
533
534 int librados::IoCtxImpl::snap_lookup(const char *name, uint64_t *snapid)
535 {
536 return objecter->pool_snap_by_name(poolid, name, (snapid_t *)snapid);
537 }
538
539 int librados::IoCtxImpl::snap_get_name(uint64_t snapid, std::string *s)
540 {
541 pool_snap_info_t info;
542 int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
543 if (ret < 0) {
544 return ret;
545 }
546 *s = info.name.c_str();
547 return 0;
548 }
549
550 int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid, time_t *t)
551 {
552 pool_snap_info_t info;
553 int ret = objecter->pool_snap_get_info(poolid, snapid, &info);
554 if (ret < 0) {
555 return ret;
556 }
557 *t = info.stamp.sec();
558 return 0;
559 }
560
561
562 // IO
563
564 int librados::IoCtxImpl::nlist(Objecter::NListContext *context, int max_entries)
565 {
566 Cond cond;
567 bool done;
568 int r = 0;
569 Mutex mylock("IoCtxImpl::nlist::mylock");
570
571 if (context->at_end())
572 return 0;
573
574 context->max_entries = max_entries;
575 context->nspace = oloc.nspace;
576
577 objecter->list_nobjects(context, new C_SafeCond(&mylock, &cond, &done, &r));
578
579 mylock.Lock();
580 while(!done)
581 cond.Wait(mylock);
582 mylock.Unlock();
583
584 return r;
585 }
586
587 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
588 uint32_t pos)
589 {
590 context->list.clear();
591 return objecter->list_nobjects_seek(context, pos);
592 }
593
594 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext *context,
595 const rados_object_list_cursor& cursor)
596 {
597 context->list.clear();
598 return objecter->list_nobjects_seek(context, *(const hobject_t *)cursor);
599 }
600
601 rados_object_list_cursor librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext *context)
602 {
603 hobject_t *c = new hobject_t;
604
605 objecter->list_nobjects_get_cursor(context, c);
606 return (rados_object_list_cursor)c;
607 }
608
609 int librados::IoCtxImpl::create(const object_t& oid, bool exclusive)
610 {
611 ::ObjectOperation op;
612 prepare_assert_ops(&op);
613 op.create(exclusive);
614 return operate(oid, &op, NULL);
615 }
616
617 /*
618 * add any version assert operations that are appropriate given the
619 * stat in the IoCtx, either the target version assert or any src
620 * object asserts. these affect a single ioctx operation, so clear
621 * the ioctx state when we're doing.
622 *
623 * return a pointer to the ObjectOperation if we added any events;
624 * this is convenient for passing the extra_ops argument into Objecter
625 * methods.
626 */
627 ::ObjectOperation *librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation *op)
628 {
629 ::ObjectOperation *pop = NULL;
630 if (assert_ver) {
631 op->assert_version(assert_ver);
632 assert_ver = 0;
633 pop = op;
634 }
635 return pop;
636 }
637
638 int librados::IoCtxImpl::write(const object_t& oid, bufferlist& bl,
639 size_t len, uint64_t off)
640 {
641 if (len > UINT_MAX/2)
642 return -E2BIG;
643 ::ObjectOperation op;
644 prepare_assert_ops(&op);
645 bufferlist mybl;
646 mybl.substr_of(bl, 0, len);
647 op.write(off, mybl);
648 return operate(oid, &op, NULL);
649 }
650
651 int librados::IoCtxImpl::append(const object_t& oid, bufferlist& bl, size_t len)
652 {
653 if (len > UINT_MAX/2)
654 return -E2BIG;
655 ::ObjectOperation op;
656 prepare_assert_ops(&op);
657 bufferlist mybl;
658 mybl.substr_of(bl, 0, len);
659 op.append(mybl);
660 return operate(oid, &op, NULL);
661 }
662
663 int librados::IoCtxImpl::write_full(const object_t& oid, bufferlist& bl)
664 {
665 if (bl.length() > UINT_MAX/2)
666 return -E2BIG;
667 ::ObjectOperation op;
668 prepare_assert_ops(&op);
669 op.write_full(bl);
670 return operate(oid, &op, NULL);
671 }
672
673 int librados::IoCtxImpl::writesame(const object_t& oid, bufferlist& bl,
674 size_t write_len, uint64_t off)
675 {
676 if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
677 return -E2BIG;
678 if ((bl.length() == 0) || (write_len % bl.length()))
679 return -EINVAL;
680 ::ObjectOperation op;
681 prepare_assert_ops(&op);
682 bufferlist mybl;
683 mybl.substr_of(bl, 0, bl.length());
684 op.writesame(off, write_len, mybl);
685 return operate(oid, &op, NULL);
686 }
687
688 int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o,
689 ceph::real_time *pmtime, int flags)
690 {
691 ceph::real_time ut = (pmtime ? *pmtime :
692 ceph::real_clock::now());
693
694 /* can't write to a snapshot */
695 if (snap_seq != CEPH_NOSNAP)
696 return -EROFS;
697
698 if (!o->size())
699 return 0;
700
701 Mutex mylock("IoCtxImpl::operate::mylock");
702 Cond cond;
703 bool done;
704 int r;
705 version_t ver;
706
707 Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
708
709 int op = o->ops[0].op.op;
710 ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid
711 << " nspace=" << oloc.nspace << dendl;
712 Objecter::Op *objecter_op = objecter->prepare_mutate_op(oid, oloc,
713 *o, snapc, ut, flags,
714 oncommit, &ver);
715 objecter->op_submit(objecter_op);
716
717 mylock.Lock();
718 while (!done)
719 cond.Wait(mylock);
720 mylock.Unlock();
721 ldout(client->cct, 10) << "Objecter returned from "
722 << ceph_osd_op_name(op) << " r=" << r << dendl;
723
724 set_sync_op_version(ver);
725
726 return r;
727 }
728
729 int librados::IoCtxImpl::operate_read(const object_t& oid,
730 ::ObjectOperation *o,
731 bufferlist *pbl,
732 int flags)
733 {
734 if (!o->size())
735 return 0;
736
737 Mutex mylock("IoCtxImpl::operate_read::mylock");
738 Cond cond;
739 bool done;
740 int r;
741 version_t ver;
742
743 Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
744
745 int op = o->ops[0].op.op;
746 ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl;
747 Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
748 *o, snap_seq, pbl, flags,
749 onack, &ver);
750 objecter->op_submit(objecter_op);
751
752 mylock.Lock();
753 while (!done)
754 cond.Wait(mylock);
755 mylock.Unlock();
756 ldout(client->cct, 10) << "Objecter returned from "
757 << ceph_osd_op_name(op) << " r=" << r << dendl;
758
759 set_sync_op_version(ver);
760
761 return r;
762 }
763
764 int librados::IoCtxImpl::aio_operate_read(const object_t &oid,
765 ::ObjectOperation *o,
766 AioCompletionImpl *c,
767 int flags,
768 bufferlist *pbl,
769 const blkin_trace_info *trace_info)
770 {
771 FUNCTRACE();
772 Context *oncomplete = new C_aio_Complete(c);
773
774 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
775 ((C_aio_Complete *) oncomplete)->oid = oid;
776 #endif
777 c->is_read = true;
778 c->io = this;
779
780 ZTracer::Trace trace;
781 if (trace_info) {
782 ZTracer::Trace parent_trace("", nullptr, trace_info);
783 trace.init("rados operate read", &objecter->trace_endpoint, &parent_trace);
784 }
785
786 trace.event("init root span");
787 Objecter::Op *objecter_op = objecter->prepare_read_op(oid, oloc,
788 *o, snap_seq, pbl, flags,
789 oncomplete, &c->objver, nullptr, 0, &trace);
790 objecter->op_submit(objecter_op, &c->tid);
791 trace.event("rados operate read submitted");
792
793 return 0;
794 }
795
796 int librados::IoCtxImpl::aio_operate(const object_t& oid,
797 ::ObjectOperation *o, AioCompletionImpl *c,
798 const SnapContext& snap_context, int flags,
799 const blkin_trace_info *trace_info)
800 {
801 FUNCTRACE();
802 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
803 auto ut = ceph::real_clock::now();
804 /* can't write to a snapshot */
805 if (snap_seq != CEPH_NOSNAP)
806 return -EROFS;
807
808 Context *oncomplete = new C_aio_Complete(c);
809 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
810 ((C_aio_Complete *) oncomplete)->oid = oid;
811 #endif
812
813 c->io = this;
814 queue_aio_write(c);
815
816 ZTracer::Trace trace;
817 if (trace_info) {
818 ZTracer::Trace parent_trace("", nullptr, trace_info);
819 trace.init("rados operate", &objecter->trace_endpoint, &parent_trace);
820 }
821
822 trace.event("init root span");
823 Objecter::Op *op = objecter->prepare_mutate_op(
824 oid, oloc, *o, snap_context, ut, flags,
825 oncomplete, &c->objver, osd_reqid_t(), &trace);
826 objecter->op_submit(op, &c->tid);
827 trace.event("rados operate op submitted");
828
829 return 0;
830 }
831
832 int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
833 bufferlist *pbl, size_t len, uint64_t off,
834 uint64_t snapid, const blkin_trace_info *info)
835 {
836 FUNCTRACE();
837 if (len > (size_t) INT_MAX)
838 return -EDOM;
839
840 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
841 Context *oncomplete = new C_aio_Complete(c);
842
843 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
844 ((C_aio_Complete *) oncomplete)->oid = oid;
845 #endif
846 c->is_read = true;
847 c->io = this;
848 c->blp = pbl;
849
850 ZTracer::Trace trace;
851 if (info)
852 trace.init("rados read", &objecter->trace_endpoint, info);
853
854 Objecter::Op *o = objecter->prepare_read_op(
855 oid, oloc,
856 off, len, snapid, pbl, 0,
857 oncomplete, &c->objver, nullptr, 0, &trace);
858 objecter->op_submit(o, &c->tid);
859 return 0;
860 }
861
862 int librados::IoCtxImpl::aio_read(const object_t oid, AioCompletionImpl *c,
863 char *buf, size_t len, uint64_t off,
864 uint64_t snapid, const blkin_trace_info *info)
865 {
866 FUNCTRACE();
867 if (len > (size_t) INT_MAX)
868 return -EDOM;
869
870 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
871 Context *oncomplete = new C_aio_Complete(c);
872
873 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
874 ((C_aio_Complete *) oncomplete)->oid = oid;
875 #endif
876 c->is_read = true;
877 c->io = this;
878 c->bl.clear();
879 c->bl.push_back(buffer::create_static(len, buf));
880 c->blp = &c->bl;
881 c->out_buf = buf;
882
883 ZTracer::Trace trace;
884 if (info)
885 trace.init("rados read", &objecter->trace_endpoint, info);
886
887 Objecter::Op *o = objecter->prepare_read_op(
888 oid, oloc,
889 off, len, snapid, &c->bl, 0,
890 oncomplete, &c->objver, nullptr, 0, &trace);
891 objecter->op_submit(o, &c->tid);
892 return 0;
893 }
894
895 class C_ObjectOperation : public Context {
896 public:
897 ::ObjectOperation m_ops;
898 explicit C_ObjectOperation(Context *c) : m_ctx(c) {}
899 void finish(int r) override {
900 m_ctx->complete(r);
901 }
902 private:
903 Context *m_ctx;
904 };
905
906 int librados::IoCtxImpl::aio_sparse_read(const object_t oid,
907 AioCompletionImpl *c,
908 std::map<uint64_t,uint64_t> *m,
909 bufferlist *data_bl, size_t len,
910 uint64_t off, uint64_t snapid)
911 {
912 FUNCTRACE();
913 if (len > (size_t) INT_MAX)
914 return -EDOM;
915
916 Context *nested = new C_aio_Complete(c);
917 C_ObjectOperation *onack = new C_ObjectOperation(nested);
918
919 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
920 ((C_aio_Complete *) nested)->oid = oid;
921 #endif
922 c->is_read = true;
923 c->io = this;
924
925 onack->m_ops.sparse_read(off, len, m, data_bl, NULL);
926
927 Objecter::Op *o = objecter->prepare_read_op(
928 oid, oloc,
929 onack->m_ops, snapid, NULL, 0,
930 onack, &c->objver);
931 objecter->op_submit(o, &c->tid);
932 return 0;
933 }
934
935 int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
936 AioCompletionImpl *c,
937 uint64_t off,
938 bufferlist& cmp_bl)
939 {
940 if (cmp_bl.length() > UINT_MAX/2)
941 return -E2BIG;
942
943 Context *onack = new C_aio_Complete(c);
944
945 c->is_read = true;
946 c->io = this;
947
948 Objecter::Op *o = objecter->prepare_cmpext_op(
949 oid, oloc, off, cmp_bl, snap_seq, 0,
950 onack, &c->objver);
951 objecter->op_submit(o, &c->tid);
952
953 return 0;
954 }
955
956 /* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
957 int librados::IoCtxImpl::aio_cmpext(const object_t& oid,
958 AioCompletionImpl *c,
959 const char *cmp_buf,
960 size_t cmp_len,
961 uint64_t off)
962 {
963 if (cmp_len > UINT_MAX/2)
964 return -E2BIG;
965
966 bufferlist cmp_bl;
967 cmp_bl.append(cmp_buf, cmp_len);
968
969 Context *nested = new C_aio_Complete(c);
970 C_ObjectOperation *onack = new C_ObjectOperation(nested);
971
972 c->is_read = true;
973 c->io = this;
974
975 onack->m_ops.cmpext(off, cmp_len, cmp_buf, NULL);
976
977 Objecter::Op *o = objecter->prepare_read_op(
978 oid, oloc, onack->m_ops, snap_seq, NULL, 0, onack, &c->objver);
979 objecter->op_submit(o, &c->tid);
980 return 0;
981 }
982
983 int librados::IoCtxImpl::aio_write(const object_t &oid, AioCompletionImpl *c,
984 const bufferlist& bl, size_t len,
985 uint64_t off, const blkin_trace_info *info)
986 {
987 FUNCTRACE();
988 auto ut = ceph::real_clock::now();
989 ldout(client->cct, 20) << "aio_write " << oid << " " << off << "~" << len << " snapc=" << snapc << " snap_seq=" << snap_seq << dendl;
990 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_WRITE_OP_BEGIN");
991
992 if (len > UINT_MAX/2)
993 return -E2BIG;
994 /* can't write to a snapshot */
995 if (snap_seq != CEPH_NOSNAP)
996 return -EROFS;
997
998 Context *oncomplete = new C_aio_Complete(c);
999
1000 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1001 ((C_aio_Complete *) oncomplete)->oid = oid;
1002 #endif
1003 ZTracer::Trace trace;
1004 if (info)
1005 trace.init("rados write", &objecter->trace_endpoint, info);
1006
1007 c->io = this;
1008 queue_aio_write(c);
1009
1010 Objecter::Op *o = objecter->prepare_write_op(
1011 oid, oloc,
1012 off, len, snapc, bl, ut, 0,
1013 oncomplete, &c->objver, nullptr, 0, &trace);
1014 objecter->op_submit(o, &c->tid);
1015
1016 return 0;
1017 }
1018
1019 int librados::IoCtxImpl::aio_append(const object_t &oid, AioCompletionImpl *c,
1020 const bufferlist& bl, size_t len)
1021 {
1022 FUNCTRACE();
1023 auto ut = ceph::real_clock::now();
1024
1025 if (len > UINT_MAX/2)
1026 return -E2BIG;
1027 /* can't write to a snapshot */
1028 if (snap_seq != CEPH_NOSNAP)
1029 return -EROFS;
1030
1031 Context *oncomplete = new C_aio_Complete(c);
1032 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1033 ((C_aio_Complete *) oncomplete)->oid = oid;
1034 #endif
1035
1036 c->io = this;
1037 queue_aio_write(c);
1038
1039 Objecter::Op *o = objecter->prepare_append_op(
1040 oid, oloc,
1041 len, snapc, bl, ut, 0,
1042 oncomplete, &c->objver);
1043 objecter->op_submit(o, &c->tid);
1044
1045 return 0;
1046 }
1047
1048 int librados::IoCtxImpl::aio_write_full(const object_t &oid,
1049 AioCompletionImpl *c,
1050 const bufferlist& bl)
1051 {
1052 FUNCTRACE();
1053 auto ut = ceph::real_clock::now();
1054
1055 if (bl.length() > UINT_MAX/2)
1056 return -E2BIG;
1057 /* can't write to a snapshot */
1058 if (snap_seq != CEPH_NOSNAP)
1059 return -EROFS;
1060
1061 Context *oncomplete = new C_aio_Complete(c);
1062 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1063 ((C_aio_Complete *) oncomplete)->oid = oid;
1064 #endif
1065
1066 c->io = this;
1067 queue_aio_write(c);
1068
1069 Objecter::Op *o = objecter->prepare_write_full_op(
1070 oid, oloc,
1071 snapc, bl, ut, 0,
1072 oncomplete, &c->objver);
1073 objecter->op_submit(o, &c->tid);
1074
1075 return 0;
1076 }
1077
1078 int librados::IoCtxImpl::aio_writesame(const object_t &oid,
1079 AioCompletionImpl *c,
1080 const bufferlist& bl,
1081 size_t write_len,
1082 uint64_t off)
1083 {
1084 FUNCTRACE();
1085 auto ut = ceph::real_clock::now();
1086
1087 if ((bl.length() > UINT_MAX/2) || (write_len > UINT_MAX/2))
1088 return -E2BIG;
1089 if ((bl.length() == 0) || (write_len % bl.length()))
1090 return -EINVAL;
1091 /* can't write to a snapshot */
1092 if (snap_seq != CEPH_NOSNAP)
1093 return -EROFS;
1094
1095 Context *oncomplete = new C_aio_Complete(c);
1096
1097 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1098 ((C_aio_Complete *) oncomplete)->oid = oid;
1099 #endif
1100 c->io = this;
1101 queue_aio_write(c);
1102
1103 Objecter::Op *o = objecter->prepare_writesame_op(
1104 oid, oloc,
1105 write_len, off,
1106 snapc, bl, ut, 0,
1107 oncomplete, &c->objver);
1108 objecter->op_submit(o, &c->tid);
1109
1110 return 0;
1111 }
1112
1113 int librados::IoCtxImpl::aio_remove(const object_t &oid, AioCompletionImpl *c, int flags)
1114 {
1115 FUNCTRACE();
1116 auto ut = ceph::real_clock::now();
1117
1118 /* can't write to a snapshot */
1119 if (snap_seq != CEPH_NOSNAP)
1120 return -EROFS;
1121
1122 Context *oncomplete = new C_aio_Complete(c);
1123
1124 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1125 ((C_aio_Complete *) oncomplete)->oid = oid;
1126 #endif
1127 c->io = this;
1128 queue_aio_write(c);
1129
1130 Objecter::Op *o = objecter->prepare_remove_op(
1131 oid, oloc,
1132 snapc, ut, flags,
1133 oncomplete, &c->objver);
1134 objecter->op_submit(o, &c->tid);
1135
1136 return 0;
1137 }
1138
1139
1140 int librados::IoCtxImpl::aio_stat(const object_t& oid, AioCompletionImpl *c,
1141 uint64_t *psize, time_t *pmtime)
1142 {
1143 C_aio_stat_Ack *onack = new C_aio_stat_Ack(c, pmtime);
1144 c->is_read = true;
1145 c->io = this;
1146 Objecter::Op *o = objecter->prepare_stat_op(
1147 oid, oloc,
1148 snap_seq, psize, &onack->mtime, 0,
1149 onack, &c->objver);
1150 objecter->op_submit(o, &c->tid);
1151 return 0;
1152 }
1153
1154 int librados::IoCtxImpl::aio_stat2(const object_t& oid, AioCompletionImpl *c,
1155 uint64_t *psize, struct timespec *pts)
1156 {
1157 C_aio_stat2_Ack *onack = new C_aio_stat2_Ack(c, pts);
1158 c->is_read = true;
1159 c->io = this;
1160 Objecter::Op *o = objecter->prepare_stat_op(
1161 oid, oloc,
1162 snap_seq, psize, &onack->mtime, 0,
1163 onack, &c->objver);
1164 objecter->op_submit(o, &c->tid);
1165 return 0;
1166 }
1167
1168 int librados::IoCtxImpl::aio_getxattr(const object_t& oid, AioCompletionImpl *c,
1169 const char *name, bufferlist& bl)
1170 {
1171 ::ObjectOperation rd;
1172 prepare_assert_ops(&rd);
1173 rd.getxattr(name, &bl, NULL);
1174 int r = aio_operate_read(oid, &rd, c, 0, &bl);
1175 return r;
1176 }
1177
1178 int librados::IoCtxImpl::aio_rmxattr(const object_t& oid, AioCompletionImpl *c,
1179 const char *name)
1180 {
1181 ::ObjectOperation op;
1182 prepare_assert_ops(&op);
1183 op.rmxattr(name);
1184 return aio_operate(oid, &op, c, snapc, 0);
1185 }
1186
1187 int librados::IoCtxImpl::aio_setxattr(const object_t& oid, AioCompletionImpl *c,
1188 const char *name, bufferlist& bl)
1189 {
1190 ::ObjectOperation op;
1191 prepare_assert_ops(&op);
1192 op.setxattr(name, bl);
1193 return aio_operate(oid, &op, c, snapc, 0);
1194 }
1195
1196 namespace {
1197 struct AioGetxattrsData {
1198 AioGetxattrsData(librados::AioCompletionImpl *c, map<string, bufferlist>* attrset,
1199 librados::RadosClient *_client) :
1200 user_completion(c), user_attrset(attrset), client(_client) {}
1201 struct librados::C_AioCompleteAndSafe user_completion;
1202 map<string, bufferlist> result_attrset;
1203 map<std::string, bufferlist>* user_attrset;
1204 librados::RadosClient *client;
1205 };
1206 }
1207
1208 static void aio_getxattrs_complete(rados_completion_t c, void *arg) {
1209 AioGetxattrsData *cdata = reinterpret_cast<AioGetxattrsData*>(arg);
1210 int rc = rados_aio_get_return_value(c);
1211 cdata->user_attrset->clear();
1212 if (rc >= 0) {
1213 for (map<string,bufferlist>::iterator p = cdata->result_attrset.begin();
1214 p != cdata->result_attrset.end();
1215 ++p) {
1216 ldout(cdata->client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1217 (*cdata->user_attrset)[p->first] = p->second;
1218 }
1219 }
1220 cdata->user_completion.finish(rc);
1221 ((librados::AioCompletionImpl*)c)->put();
1222 delete cdata;
1223 }
1224
1225 int librados::IoCtxImpl::aio_getxattrs(const object_t& oid, AioCompletionImpl *c,
1226 map<std::string, bufferlist>& attrset)
1227 {
1228 AioGetxattrsData *cdata = new AioGetxattrsData(c, &attrset, client);
1229 ::ObjectOperation rd;
1230 prepare_assert_ops(&rd);
1231 rd.getxattrs(&cdata->result_attrset, NULL);
1232 librados::AioCompletionImpl *comp = new librados::AioCompletionImpl;
1233 comp->set_complete_callback(cdata, aio_getxattrs_complete);
1234 return aio_operate_read(oid, &rd, comp, 0, NULL);
1235 }
1236
1237 int librados::IoCtxImpl::aio_cancel(AioCompletionImpl *c)
1238 {
1239 return objecter->op_cancel(c->tid, -ECANCELED);
1240 }
1241
1242
1243 int librados::IoCtxImpl::hit_set_list(uint32_t hash, AioCompletionImpl *c,
1244 std::list< std::pair<time_t, time_t> > *pls)
1245 {
1246 Context *oncomplete = new C_aio_Complete(c);
1247 c->is_read = true;
1248 c->io = this;
1249
1250 ::ObjectOperation rd;
1251 rd.hit_set_ls(pls, NULL);
1252 object_locator_t oloc(poolid);
1253 Objecter::Op *o = objecter->prepare_pg_read_op(
1254 hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
1255 objecter->op_submit(o, &c->tid);
1256 return 0;
1257 }
1258
1259 int librados::IoCtxImpl::hit_set_get(uint32_t hash, AioCompletionImpl *c,
1260 time_t stamp,
1261 bufferlist *pbl)
1262 {
1263 Context *oncomplete = new C_aio_Complete(c);
1264 c->is_read = true;
1265 c->io = this;
1266
1267 ::ObjectOperation rd;
1268 rd.hit_set_get(ceph::real_clock::from_time_t(stamp), pbl, 0);
1269 object_locator_t oloc(poolid);
1270 Objecter::Op *o = objecter->prepare_pg_read_op(
1271 hash, oloc, rd, NULL, 0, oncomplete, NULL, NULL);
1272 objecter->op_submit(o, &c->tid);
1273 return 0;
1274 }
1275
1276 int librados::IoCtxImpl::remove(const object_t& oid)
1277 {
1278 ::ObjectOperation op;
1279 prepare_assert_ops(&op);
1280 op.remove();
1281 return operate(oid, &op, nullptr, librados::OPERATION_FULL_FORCE);
1282 }
1283
1284 int librados::IoCtxImpl::remove(const object_t& oid, int flags)
1285 {
1286 ::ObjectOperation op;
1287 prepare_assert_ops(&op);
1288 op.remove();
1289 return operate(oid, &op, NULL, flags);
1290 }
1291
1292 int librados::IoCtxImpl::trunc(const object_t& oid, uint64_t size)
1293 {
1294 ::ObjectOperation op;
1295 prepare_assert_ops(&op);
1296 op.truncate(size);
1297 return operate(oid, &op, NULL);
1298 }
1299
1300 int librados::IoCtxImpl::get_inconsistent_objects(const pg_t& pg,
1301 const librados::object_id_t& start_after,
1302 uint64_t max_to_get,
1303 AioCompletionImpl *c,
1304 std::vector<inconsistent_obj_t>* objects,
1305 uint32_t* interval)
1306 {
1307 Context *oncomplete = new C_aio_Complete(c);
1308 c->is_read = true;
1309 c->io = this;
1310
1311 ::ObjectOperation op;
1312 op.scrub_ls(start_after, max_to_get, objects, interval, nullptr);
1313 object_locator_t oloc{poolid, pg.ps()};
1314 Objecter::Op *o = objecter->prepare_pg_read_op(
1315 oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
1316 nullptr, nullptr);
1317 objecter->op_submit(o, &c->tid);
1318 return 0;
1319 }
1320
1321 int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t& pg,
1322 const librados::object_id_t& start_after,
1323 uint64_t max_to_get,
1324 AioCompletionImpl *c,
1325 std::vector<inconsistent_snapset_t>* snapsets,
1326 uint32_t* interval)
1327 {
1328 Context *oncomplete = new C_aio_Complete(c);
1329 c->is_read = true;
1330 c->io = this;
1331
1332 ::ObjectOperation op;
1333 op.scrub_ls(start_after, max_to_get, snapsets, interval, nullptr);
1334 object_locator_t oloc{poolid, pg.ps()};
1335 Objecter::Op *o = objecter->prepare_pg_read_op(
1336 oloc.hash, oloc, op, nullptr, CEPH_OSD_FLAG_PGOP, oncomplete,
1337 nullptr, nullptr);
1338 objecter->op_submit(o, &c->tid);
1339 return 0;
1340 }
1341
1342 int librados::IoCtxImpl::tmap_update(const object_t& oid, bufferlist& cmdbl)
1343 {
1344 ::ObjectOperation wr;
1345 prepare_assert_ops(&wr);
1346 wr.tmap_update(cmdbl);
1347 return operate(oid, &wr, NULL);
1348 }
1349
1350 int librados::IoCtxImpl::tmap_put(const object_t& oid, bufferlist& bl)
1351 {
1352 ::ObjectOperation wr;
1353 prepare_assert_ops(&wr);
1354 wr.tmap_put(bl);
1355 return operate(oid, &wr, NULL);
1356 }
1357
1358 int librados::IoCtxImpl::tmap_get(const object_t& oid, bufferlist& bl)
1359 {
1360 ::ObjectOperation rd;
1361 prepare_assert_ops(&rd);
1362 rd.tmap_get(&bl, NULL);
1363 return operate_read(oid, &rd, NULL);
1364 }
1365
1366 int librados::IoCtxImpl::tmap_to_omap(const object_t& oid, bool nullok)
1367 {
1368 ::ObjectOperation wr;
1369 prepare_assert_ops(&wr);
1370 wr.tmap_to_omap(nullok);
1371 return operate(oid, &wr, NULL);
1372 }
1373
1374 int librados::IoCtxImpl::exec(const object_t& oid,
1375 const char *cls, const char *method,
1376 bufferlist& inbl, bufferlist& outbl)
1377 {
1378 ::ObjectOperation rd;
1379 prepare_assert_ops(&rd);
1380 rd.call(cls, method, inbl);
1381 return operate_read(oid, &rd, &outbl);
1382 }
1383
1384 int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1385 const char *cls, const char *method,
1386 bufferlist& inbl, bufferlist *outbl)
1387 {
1388 FUNCTRACE();
1389 Context *oncomplete = new C_aio_Complete(c);
1390
1391 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1392 ((C_aio_Complete *) oncomplete)->oid = oid;
1393 #endif
1394 c->is_read = true;
1395 c->io = this;
1396
1397 ::ObjectOperation rd;
1398 prepare_assert_ops(&rd);
1399 rd.call(cls, method, inbl);
1400 Objecter::Op *o = objecter->prepare_read_op(
1401 oid, oloc, rd, snap_seq, outbl, 0, oncomplete, &c->objver);
1402 objecter->op_submit(o, &c->tid);
1403 return 0;
1404 }
1405
1406 int librados::IoCtxImpl::aio_exec(const object_t& oid, AioCompletionImpl *c,
1407 const char *cls, const char *method,
1408 bufferlist& inbl, char *buf, size_t out_len)
1409 {
1410 FUNCTRACE();
1411 Context *oncomplete = new C_aio_Complete(c);
1412
1413 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1414 ((C_aio_Complete *) oncomplete)->oid = oid;
1415 #endif
1416 c->is_read = true;
1417 c->io = this;
1418 c->bl.clear();
1419 c->bl.push_back(buffer::create_static(out_len, buf));
1420 c->blp = &c->bl;
1421 c->out_buf = buf;
1422
1423 ::ObjectOperation rd;
1424 prepare_assert_ops(&rd);
1425 rd.call(cls, method, inbl);
1426 Objecter::Op *o = objecter->prepare_read_op(
1427 oid, oloc, rd, snap_seq, &c->bl, 0, oncomplete, &c->objver);
1428 objecter->op_submit(o, &c->tid);
1429 return 0;
1430 }
1431
1432 int librados::IoCtxImpl::read(const object_t& oid,
1433 bufferlist& bl, size_t len, uint64_t off)
1434 {
1435 if (len > (size_t) INT_MAX)
1436 return -EDOM;
1437 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_READ_OP_BEGIN");
1438
1439 ::ObjectOperation rd;
1440 prepare_assert_ops(&rd);
1441 rd.read(off, len, &bl, NULL, NULL);
1442 int r = operate_read(oid, &rd, &bl);
1443 if (r < 0)
1444 return r;
1445
1446 if (bl.length() < len) {
1447 ldout(client->cct, 10) << "Returned length " << bl.length()
1448 << " less than original length "<< len << dendl;
1449 }
1450
1451 return bl.length();
1452 }
1453
1454 int librados::IoCtxImpl::cmpext(const object_t& oid, uint64_t off,
1455 bufferlist& cmp_bl)
1456 {
1457 if (cmp_bl.length() > UINT_MAX/2)
1458 return -E2BIG;
1459
1460 ::ObjectOperation op;
1461 prepare_assert_ops(&op);
1462 op.cmpext(off, cmp_bl, NULL);
1463 return operate_read(oid, &op, NULL);
1464 }
1465
1466 int librados::IoCtxImpl::mapext(const object_t& oid,
1467 uint64_t off, size_t len,
1468 std::map<uint64_t,uint64_t>& m)
1469 {
1470 bufferlist bl;
1471
1472 Mutex mylock("IoCtxImpl::read::mylock");
1473 Cond cond;
1474 bool done;
1475 int r;
1476 Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
1477
1478 objecter->mapext(oid, oloc,
1479 off, len, snap_seq, &bl, 0,
1480 onack);
1481
1482 mylock.Lock();
1483 while (!done)
1484 cond.Wait(mylock);
1485 mylock.Unlock();
1486 ldout(client->cct, 10) << "Objecter returned from read r=" << r << dendl;
1487
1488 if (r < 0)
1489 return r;
1490
1491 bufferlist::iterator iter = bl.begin();
1492 ::decode(m, iter);
1493
1494 return m.size();
1495 }
1496
1497 int librados::IoCtxImpl::sparse_read(const object_t& oid,
1498 std::map<uint64_t,uint64_t>& m,
1499 bufferlist& data_bl, size_t len,
1500 uint64_t off)
1501 {
1502 if (len > (size_t) INT_MAX)
1503 return -EDOM;
1504
1505 ::ObjectOperation rd;
1506 prepare_assert_ops(&rd);
1507 rd.sparse_read(off, len, &m, &data_bl, NULL);
1508
1509 int r = operate_read(oid, &rd, NULL);
1510 if (r < 0)
1511 return r;
1512
1513 return m.size();
1514 }
1515
1516 int librados::IoCtxImpl::checksum(const object_t& oid, uint8_t type,
1517 const bufferlist &init_value, size_t len,
1518 uint64_t off, size_t chunk_size,
1519 bufferlist *pbl)
1520 {
1521 if (len > (size_t) INT_MAX) {
1522 return -EDOM;
1523 }
1524
1525 ::ObjectOperation rd;
1526 prepare_assert_ops(&rd);
1527 rd.checksum(type, init_value, off, len, chunk_size, pbl, nullptr, nullptr);
1528
1529 int r = operate_read(oid, &rd, nullptr);
1530 if (r < 0) {
1531 return r;
1532 }
1533
1534 return 0;
1535 }
1536
1537 int librados::IoCtxImpl::stat(const object_t& oid, uint64_t *psize, time_t *pmtime)
1538 {
1539 uint64_t size;
1540 real_time mtime;
1541
1542 if (!psize)
1543 psize = &size;
1544
1545 ::ObjectOperation rd;
1546 prepare_assert_ops(&rd);
1547 rd.stat(psize, &mtime, NULL);
1548 int r = operate_read(oid, &rd, NULL);
1549
1550 if (r >= 0 && pmtime) {
1551 *pmtime = real_clock::to_time_t(mtime);
1552 }
1553
1554 return r;
1555 }
1556
1557 int librados::IoCtxImpl::stat2(const object_t& oid, uint64_t *psize, struct timespec *pts)
1558 {
1559 uint64_t size;
1560 ceph::real_time mtime;
1561
1562 if (!psize)
1563 psize = &size;
1564
1565 ::ObjectOperation rd;
1566 prepare_assert_ops(&rd);
1567 rd.stat(psize, &mtime, NULL);
1568 int r = operate_read(oid, &rd, NULL);
1569 if (r < 0) {
1570 return r;
1571 }
1572
1573 if (pts) {
1574 *pts = ceph::real_clock::to_timespec(mtime);
1575 }
1576
1577 return 0;
1578 }
1579
1580 int librados::IoCtxImpl::getxattr(const object_t& oid,
1581 const char *name, bufferlist& bl)
1582 {
1583 ::ObjectOperation rd;
1584 prepare_assert_ops(&rd);
1585 rd.getxattr(name, &bl, NULL);
1586 int r = operate_read(oid, &rd, &bl);
1587 if (r < 0)
1588 return r;
1589
1590 return bl.length();
1591 }
1592
1593 int librados::IoCtxImpl::rmxattr(const object_t& oid, const char *name)
1594 {
1595 ::ObjectOperation op;
1596 prepare_assert_ops(&op);
1597 op.rmxattr(name);
1598 return operate(oid, &op, NULL);
1599 }
1600
1601 int librados::IoCtxImpl::setxattr(const object_t& oid,
1602 const char *name, bufferlist& bl)
1603 {
1604 ::ObjectOperation op;
1605 prepare_assert_ops(&op);
1606 op.setxattr(name, bl);
1607 return operate(oid, &op, NULL);
1608 }
1609
1610 int librados::IoCtxImpl::getxattrs(const object_t& oid,
1611 map<std::string, bufferlist>& attrset)
1612 {
1613 map<string, bufferlist> aset;
1614
1615 ::ObjectOperation rd;
1616 prepare_assert_ops(&rd);
1617 rd.getxattrs(&aset, NULL);
1618 int r = operate_read(oid, &rd, NULL);
1619
1620 attrset.clear();
1621 if (r >= 0) {
1622 for (map<string,bufferlist>::iterator p = aset.begin(); p != aset.end(); ++p) {
1623 ldout(client->cct, 10) << "IoCtxImpl::getxattrs: xattr=" << p->first << dendl;
1624 attrset[p->first.c_str()] = p->second;
1625 }
1626 }
1627
1628 return r;
1629 }
1630
1631 void librados::IoCtxImpl::set_sync_op_version(version_t ver)
1632 {
1633 ANNOTATE_BENIGN_RACE_SIZED(&last_objver, sizeof(last_objver),
1634 "IoCtxImpl last_objver");
1635 last_objver = ver;
1636 }
1637
1638 struct WatchInfo : public Objecter::WatchContext {
1639 librados::IoCtxImpl *ioctx;
1640 object_t oid;
1641 librados::WatchCtx *ctx;
1642 librados::WatchCtx2 *ctx2;
1643 bool internal = false;
1644
1645 WatchInfo(librados::IoCtxImpl *io, object_t o,
1646 librados::WatchCtx *c, librados::WatchCtx2 *c2,
1647 bool inter)
1648 : ioctx(io), oid(o), ctx(c), ctx2(c2), internal(inter) {
1649 ioctx->get();
1650 }
1651 ~WatchInfo() override {
1652 ioctx->put();
1653 if (internal) {
1654 delete ctx;
1655 delete ctx2;
1656 }
1657 }
1658
1659 void handle_notify(uint64_t notify_id,
1660 uint64_t cookie,
1661 uint64_t notifier_id,
1662 bufferlist& bl) override {
1663 ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
1664 << " cookie " << cookie
1665 << " notifier_id " << notifier_id
1666 << " len " << bl.length()
1667 << dendl;
1668
1669 if (ctx2)
1670 ctx2->handle_notify(notify_id, cookie, notifier_id, bl);
1671 if (ctx) {
1672 ctx->notify(0, 0, bl);
1673
1674 // send ACK back to OSD if using legacy protocol
1675 bufferlist empty;
1676 ioctx->notify_ack(oid, notify_id, cookie, empty);
1677 }
1678 }
1679 void handle_error(uint64_t cookie, int err) override {
1680 ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie
1681 << " err " << err
1682 << dendl;
1683 if (ctx2)
1684 ctx2->handle_error(cookie, err);
1685 }
1686 };
1687
1688 int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1689 librados::WatchCtx *ctx,
1690 librados::WatchCtx2 *ctx2,
1691 bool internal)
1692 {
1693 return watch(oid, handle, ctx, ctx2, 0, internal);
1694 }
1695
1696 int librados::IoCtxImpl::watch(const object_t& oid, uint64_t *handle,
1697 librados::WatchCtx *ctx,
1698 librados::WatchCtx2 *ctx2,
1699 uint32_t timeout,
1700 bool internal)
1701 {
1702 ::ObjectOperation wr;
1703 version_t objver;
1704 C_SaferCond onfinish;
1705
1706 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1707 *handle = linger_op->get_cookie();
1708 linger_op->watch_context = new WatchInfo(this,
1709 oid, ctx, ctx2, internal);
1710
1711 prepare_assert_ops(&wr);
1712 wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1713 bufferlist bl;
1714 objecter->linger_watch(linger_op, wr,
1715 snapc, ceph::real_clock::now(), bl,
1716 &onfinish,
1717 &objver);
1718
1719 int r = onfinish.wait();
1720
1721 set_sync_op_version(objver);
1722
1723 if (r < 0) {
1724 objecter->linger_cancel(linger_op);
1725 *handle = 0;
1726 }
1727
1728 return r;
1729 }
1730
1731 int librados::IoCtxImpl::aio_watch(const object_t& oid,
1732 AioCompletionImpl *c,
1733 uint64_t *handle,
1734 librados::WatchCtx *ctx,
1735 librados::WatchCtx2 *ctx2,
1736 bool internal) {
1737 return aio_watch(oid, c, handle, ctx, ctx2, 0, internal);
1738 }
1739
1740 int librados::IoCtxImpl::aio_watch(const object_t& oid,
1741 AioCompletionImpl *c,
1742 uint64_t *handle,
1743 librados::WatchCtx *ctx,
1744 librados::WatchCtx2 *ctx2,
1745 uint32_t timeout,
1746 bool internal)
1747 {
1748 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1749 c->io = this;
1750 Context *oncomplete = new C_aio_linger_Complete(c, linger_op, false);
1751
1752 ::ObjectOperation wr;
1753 *handle = linger_op->get_cookie();
1754 linger_op->watch_context = new WatchInfo(this, oid, ctx, ctx2, internal);
1755
1756 prepare_assert_ops(&wr);
1757 wr.watch(*handle, CEPH_OSD_WATCH_OP_WATCH, timeout);
1758 bufferlist bl;
1759 objecter->linger_watch(linger_op, wr,
1760 snapc, ceph::real_clock::now(), bl,
1761 oncomplete, &c->objver);
1762
1763 return 0;
1764 }
1765
1766
1767 int librados::IoCtxImpl::notify_ack(
1768 const object_t& oid,
1769 uint64_t notify_id,
1770 uint64_t cookie,
1771 bufferlist& bl)
1772 {
1773 ::ObjectOperation rd;
1774 prepare_assert_ops(&rd);
1775 rd.notify_ack(notify_id, cookie, bl);
1776 objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
1777 return 0;
1778 }
1779
1780 int librados::IoCtxImpl::watch_check(uint64_t cookie)
1781 {
1782 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1783 return objecter->linger_check(linger_op);
1784 }
1785
1786 int librados::IoCtxImpl::unwatch(uint64_t cookie)
1787 {
1788 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1789 C_SaferCond onfinish;
1790 version_t ver = 0;
1791
1792 ::ObjectOperation wr;
1793 prepare_assert_ops(&wr);
1794 wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1795 objecter->mutate(linger_op->target.base_oid, oloc, wr,
1796 snapc, ceph::real_clock::now(), 0,
1797 &onfinish, &ver);
1798 objecter->linger_cancel(linger_op);
1799
1800 int r = onfinish.wait();
1801 set_sync_op_version(ver);
1802 return r;
1803 }
1804
1805 int librados::IoCtxImpl::aio_unwatch(uint64_t cookie, AioCompletionImpl *c)
1806 {
1807 c->io = this;
1808 Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
1809 Context *oncomplete = new C_aio_linger_Complete(c, linger_op, true);
1810
1811 ::ObjectOperation wr;
1812 prepare_assert_ops(&wr);
1813 wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
1814 objecter->mutate(linger_op->target.base_oid, oloc, wr,
1815 snapc, ceph::real_clock::now(), 0,
1816 oncomplete, &c->objver);
1817 return 0;
1818 }
1819
1820 int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl,
1821 uint64_t timeout_ms,
1822 bufferlist *preply_bl,
1823 char **preply_buf, size_t *preply_buf_len)
1824 {
1825 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1826
1827 C_SaferCond notify_finish_cond;
1828 Context *notify_finish = new C_notify_Finish(client->cct, &notify_finish_cond,
1829 objecter, linger_op, preply_bl,
1830 preply_buf, preply_buf_len);
1831 (void) notify_finish;
1832
1833 uint32_t timeout = notify_timeout;
1834 if (timeout_ms)
1835 timeout = timeout_ms / 1000;
1836
1837 // Construct RADOS op
1838 ::ObjectOperation rd;
1839 prepare_assert_ops(&rd);
1840 bufferlist inbl;
1841 rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1842
1843 // Issue RADOS op
1844 C_SaferCond onack;
1845 version_t objver;
1846 objecter->linger_notify(linger_op,
1847 rd, snap_seq, inbl, NULL,
1848 &onack, &objver);
1849
1850 ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
1851 int r = onack.wait();
1852 ldout(client->cct, 10) << __func__ << " linger op " << linger_op
1853 << " acked (" << r << ")" << dendl;
1854
1855 if (r == 0) {
1856 ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
1857 << linger_op << dendl;
1858 r = notify_finish_cond.wait();
1859
1860 } else {
1861 ldout(client->cct, 10) << __func__ << " failed to initiate notify, r = "
1862 << r << dendl;
1863 notify_finish_cond.wait();
1864 }
1865
1866 objecter->linger_cancel(linger_op);
1867
1868 set_sync_op_version(objver);
1869 return r;
1870 }
1871
1872 int librados::IoCtxImpl::aio_notify(const object_t& oid, AioCompletionImpl *c,
1873 bufferlist& bl, uint64_t timeout_ms,
1874 bufferlist *preply_bl, char **preply_buf,
1875 size_t *preply_buf_len)
1876 {
1877 Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
1878
1879 c->io = this;
1880
1881 C_aio_notify_Complete *oncomplete = new C_aio_notify_Complete(c, linger_op);
1882 C_notify_Finish *onnotify = new C_notify_Finish(client->cct, oncomplete,
1883 objecter, linger_op,
1884 preply_bl, preply_buf,
1885 preply_buf_len);
1886 Context *onack = new C_aio_notify_Ack(client->cct, onnotify, oncomplete);
1887
1888 uint32_t timeout = notify_timeout;
1889 if (timeout_ms)
1890 timeout = timeout_ms / 1000;
1891
1892 // Construct RADOS op
1893 ::ObjectOperation rd;
1894 prepare_assert_ops(&rd);
1895 bufferlist inbl;
1896 rd.notify(linger_op->get_cookie(), 1, timeout, bl, &inbl);
1897
1898 // Issue RADOS op
1899 objecter->linger_notify(linger_op,
1900 rd, snap_seq, inbl, NULL,
1901 onack, &c->objver);
1902 return 0;
1903 }
1904
1905 int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
1906 uint64_t expected_object_size,
1907 uint64_t expected_write_size,
1908 uint32_t flags)
1909 {
1910 ::ObjectOperation wr;
1911 prepare_assert_ops(&wr);
1912 wr.set_alloc_hint(expected_object_size, expected_write_size, flags);
1913 return operate(oid, &wr, NULL);
1914 }
1915
1916 version_t librados::IoCtxImpl::last_version()
1917 {
1918 return last_objver;
1919 }
1920
1921 void librados::IoCtxImpl::set_assert_version(uint64_t ver)
1922 {
1923 assert_ver = ver;
1924 }
1925
1926 void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout)
1927 {
1928 notify_timeout = timeout;
1929 }
1930
1931 int librados::IoCtxImpl::cache_pin(const object_t& oid)
1932 {
1933 ::ObjectOperation wr;
1934 prepare_assert_ops(&wr);
1935 wr.cache_pin();
1936 return operate(oid, &wr, NULL);
1937 }
1938
1939 int librados::IoCtxImpl::cache_unpin(const object_t& oid)
1940 {
1941 ::ObjectOperation wr;
1942 prepare_assert_ops(&wr);
1943 wr.cache_unpin();
1944 return operate(oid, &wr, NULL);
1945 }
1946
1947
1948 ///////////////////////////// C_aio_stat_Ack ////////////////////////////
1949
1950 librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl *_c,
1951 time_t *pm)
1952 : c(_c), pmtime(pm)
1953 {
1954 assert(!c->io);
1955 c->get();
1956 }
1957
1958 void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r)
1959 {
1960 c->lock.Lock();
1961 c->rval = r;
1962 c->complete = true;
1963 c->cond.Signal();
1964
1965 if (r >= 0 && pmtime) {
1966 *pmtime = real_clock::to_time_t(mtime);
1967 }
1968
1969 if (c->callback_complete) {
1970 c->io->client->finisher.queue(new C_AioComplete(c));
1971 }
1972
1973 c->put_unlock();
1974 }
1975
1976 ///////////////////////////// C_aio_stat2_Ack ////////////////////////////
1977
1978 librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl *_c,
1979 struct timespec *pt)
1980 : c(_c), pts(pt)
1981 {
1982 assert(!c->io);
1983 c->get();
1984 }
1985
1986 void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r)
1987 {
1988 c->lock.Lock();
1989 c->rval = r;
1990 c->complete = true;
1991 c->cond.Signal();
1992
1993 if (r >= 0 && pts) {
1994 *pts = real_clock::to_timespec(mtime);
1995 }
1996
1997 if (c->callback_complete) {
1998 c->io->client->finisher.queue(new C_AioComplete(c));
1999 }
2000
2001 c->put_unlock();
2002 }
2003
2004 //////////////////////////// C_aio_Complete ////////////////////////////////
2005
2006 librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl *_c)
2007 : c(_c)
2008 {
2009 c->get();
2010 }
2011
2012 void librados::IoCtxImpl::C_aio_Complete::finish(int r)
2013 {
2014 c->lock.Lock();
2015 c->rval = r;
2016 c->complete = true;
2017 c->cond.Signal();
2018
2019 if (r == 0 && c->blp && c->blp->length() > 0) {
2020 if (c->out_buf && !c->blp->is_provided_buffer(c->out_buf))
2021 c->blp->copy(0, c->blp->length(), c->out_buf);
2022 c->rval = c->blp->length();
2023 }
2024
2025 if (c->callback_complete ||
2026 c->callback_safe) {
2027 c->io->client->finisher.queue(new C_AioComplete(c));
2028 }
2029
2030 if (c->aio_write_seq) {
2031 c->io->complete_aio_write(c);
2032 }
2033
2034 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
2035 OID_EVENT_TRACE(oid.name.c_str(), "RADOS_OP_COMPLETE");
2036 #endif
2037 c->put_unlock();
2038 }
2039
2040 void librados::IoCtxImpl::object_list_slice(
2041 const hobject_t start,
2042 const hobject_t finish,
2043 const size_t n,
2044 const size_t m,
2045 hobject_t *split_start,
2046 hobject_t *split_finish)
2047 {
2048 if (start.is_max()) {
2049 *split_start = hobject_t::get_max();
2050 *split_finish = hobject_t::get_max();
2051 return;
2052 }
2053
2054 uint64_t start_hash = hobject_t::_reverse_bits(start.get_hash());
2055 uint64_t finish_hash =
2056 finish.is_max() ? 0x100000000 :
2057 hobject_t::_reverse_bits(finish.get_hash());
2058
2059 uint64_t diff = finish_hash - start_hash;
2060 uint64_t rev_start = start_hash + (diff * n / m);
2061 uint64_t rev_finish = start_hash + (diff * (n + 1) / m);
2062 if (n == 0) {
2063 *split_start = start;
2064 } else {
2065 *split_start = hobject_t(
2066 object_t(), string(), CEPH_NOSNAP,
2067 hobject_t::_reverse_bits(rev_start), poolid, string());
2068 }
2069
2070 if (n == m - 1)
2071 *split_finish = finish;
2072 else if (rev_finish >= 0x100000000)
2073 *split_finish = hobject_t::get_max();
2074 else
2075 *split_finish = hobject_t(
2076 object_t(), string(), CEPH_NOSNAP,
2077 hobject_t::_reverse_bits(rev_finish), poolid, string());
2078 }
2079
2080 int librados::IoCtxImpl::application_enable(const std::string& app_name,
2081 bool force)
2082 {
2083 auto c = new PoolAsyncCompletionImpl();
2084 application_enable_async(app_name, force, c);
2085
2086 int r = c->wait();
2087 assert(r == 0);
2088
2089 r = c->get_return_value();
2090 c->release();
2091 if (r < 0) {
2092 return r;
2093 }
2094
2095 return client->wait_for_latest_osdmap();
2096 }
2097
2098 void librados::IoCtxImpl::application_enable_async(const std::string& app_name,
2099 bool force,
2100 PoolAsyncCompletionImpl *c)
2101 {
2102 // pre-Luminous clusters will return -EINVAL and application won't be
2103 // preserved until Luminous is configured as minimim version.
2104 if (!client->get_required_monitor_features().contains_all(
2105 ceph::features::mon::FEATURE_LUMINOUS)) {
2106 client->finisher.queue(new C_PoolAsync_Safe(c), -EOPNOTSUPP);
2107 return;
2108 }
2109
2110 std::stringstream cmd;
2111 cmd << "{"
2112 << "\"prefix\": \"osd pool application enable\","
2113 << "\"pool\": \"" << get_cached_pool_name() << "\","
2114 << "\"app\": \"" << app_name << "\"";
2115 if (force) {
2116 cmd << ",\"force\":\"--yes-i-really-mean-it\"";
2117 }
2118 cmd << "}";
2119
2120 std::vector<std::string> cmds;
2121 cmds.push_back(cmd.str());
2122 bufferlist inbl;
2123 client->mon_command_async(cmds, inbl, nullptr, nullptr,
2124 new C_PoolAsync_Safe(c));
2125 }
2126
2127 int librados::IoCtxImpl::application_list(std::set<std::string> *app_names)
2128 {
2129 int r = 0;
2130 app_names->clear();
2131 objecter->with_osdmap([&](const OSDMap& o) {
2132 auto pg_pool = o.get_pg_pool(poolid);
2133 if (pg_pool == nullptr) {
2134 r = -ENOENT;
2135 return;
2136 }
2137
2138 for (auto &pair : pg_pool->application_metadata) {
2139 app_names->insert(pair.first);
2140 }
2141 });
2142 return r;
2143 }
2144
2145 int librados::IoCtxImpl::application_metadata_get(const std::string& app_name,
2146 const std::string &key,
2147 std::string* value)
2148 {
2149 int r = 0;
2150 objecter->with_osdmap([&](const OSDMap& o) {
2151 auto pg_pool = o.get_pg_pool(poolid);
2152 if (pg_pool == nullptr) {
2153 r = -ENOENT;
2154 return;
2155 }
2156
2157 auto app_it = pg_pool->application_metadata.find(app_name);
2158 if (app_it == pg_pool->application_metadata.end()) {
2159 r = -ENOENT;
2160 return;
2161 }
2162
2163 auto it = app_it->second.find(key);
2164 if (it == app_it->second.end()) {
2165 r = -ENOENT;
2166 return;
2167 }
2168
2169 *value = it->second;
2170 });
2171 return r;
2172 }
2173
2174 int librados::IoCtxImpl::application_metadata_set(const std::string& app_name,
2175 const std::string &key,
2176 const std::string& value)
2177 {
2178 std::stringstream cmd;
2179 cmd << "{"
2180 << "\"prefix\":\"osd pool application set\","
2181 << "\"pool\":\"" << get_cached_pool_name() << "\","
2182 << "\"app\":\"" << app_name << "\","
2183 << "\"key\":\"" << key << "\","
2184 << "\"value\":\"" << value << "\""
2185 << "}";
2186
2187 std::vector<std::string> cmds;
2188 cmds.push_back(cmd.str());
2189 bufferlist inbl;
2190 int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2191 if (r < 0) {
2192 return r;
2193 }
2194
2195 // ensure we have the latest osd map epoch before proceeding
2196 return client->wait_for_latest_osdmap();
2197 }
2198
2199 int librados::IoCtxImpl::application_metadata_remove(const std::string& app_name,
2200 const std::string &key)
2201 {
2202 std::stringstream cmd;
2203 cmd << "{"
2204 << "\"prefix\":\"osd pool application rm\","
2205 << "\"pool\":\"" << get_cached_pool_name() << "\","
2206 << "\"app\":\"" << app_name << "\","
2207 << "\"key\":\"" << key << "\""
2208 << "}";
2209
2210 std::vector<std::string> cmds;
2211 cmds.push_back(cmd.str());
2212 bufferlist inbl;
2213 int r = client->mon_command(cmds, inbl, nullptr, nullptr);
2214 if (r < 0) {
2215 return r;
2216 }
2217
2218 // ensure we have the latest osd map epoch before proceeding
2219 return client->wait_for_latest_osdmap();
2220 }
2221
2222 int librados::IoCtxImpl::application_metadata_list(const std::string& app_name,
2223 std::map<std::string, std::string> *values)
2224 {
2225 int r = 0;
2226 values->clear();
2227 objecter->with_osdmap([&](const OSDMap& o) {
2228 auto pg_pool = o.get_pg_pool(poolid);
2229 if (pg_pool == nullptr) {
2230 r = -ENOENT;
2231 return;
2232 }
2233
2234 auto it = pg_pool->application_metadata.find(app_name);
2235 if (it == pg_pool->application_metadata.end()) {
2236 r = -ENOENT;
2237 return;
2238 }
2239
2240 *values = it->second;
2241 });
2242 return r;
2243 }
2244