1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "IoCtxImpl.h"
19 #include "librados/AioCompletionImpl.h"
20 #include "librados/PoolAsyncCompletionImpl.h"
21 #include "librados/RadosClient.h"
22 #include "include/assert.h"
23 #include "common/valgrind.h"
24 #include "common/EventTrace.h"
26 #define dout_subsys ceph_subsys_rados
28 #define dout_prefix *_dout << "librados: "
33 struct C_notify_Finish
: public Context
{
37 Objecter::LingerOp
*linger_op
;
39 bufferlist
*preply_bl
;
41 size_t *preply_buf_len
;
43 C_notify_Finish(CephContext
*_cct
, Context
*_ctx
, Objecter
*_objecter
,
44 Objecter::LingerOp
*_linger_op
, bufferlist
*_preply_bl
,
45 char **_preply_buf
, size_t *_preply_buf_len
)
46 : cct(_cct
), ctx(_ctx
), objecter(_objecter
), linger_op(_linger_op
),
47 preply_bl(_preply_bl
), preply_buf(_preply_buf
),
48 preply_buf_len(_preply_buf_len
)
50 linger_op
->on_notify_finish
= this;
51 linger_op
->notify_result_bl
= &reply_bl
;
54 void finish(int r
) override
56 ldout(cct
, 10) << __func__
<< " completed notify (linger op "
57 << linger_op
<< "), r = " << r
<< dendl
;
59 // pass result back to user
60 // NOTE: we do this regardless of what error code we return
62 if (reply_bl
.length()) {
63 *preply_buf
= (char*)malloc(reply_bl
.length());
64 memcpy(*preply_buf
, reply_bl
.c_str(), reply_bl
.length());
70 *preply_buf_len
= reply_bl
.length();
72 preply_bl
->claim(reply_bl
);
78 struct C_aio_linger_cancel
: public Context
{
80 Objecter::LingerOp
*linger_op
;
82 C_aio_linger_cancel(Objecter
*_objecter
, Objecter::LingerOp
*_linger_op
)
83 : objecter(_objecter
), linger_op(_linger_op
)
87 void finish(int r
) override
89 objecter
->linger_cancel(linger_op
);
93 struct C_aio_linger_Complete
: public Context
{
95 Objecter::LingerOp
*linger_op
;
98 C_aio_linger_Complete(AioCompletionImpl
*_c
, Objecter::LingerOp
*_linger_op
, bool _cancel
)
99 : c(_c
), linger_op(_linger_op
), cancel(_cancel
)
104 void finish(int r
) override
{
106 c
->io
->client
->finisher
.queue(new C_aio_linger_cancel(c
->io
->objecter
,
114 if (c
->callback_complete
||
116 c
->io
->client
->finisher
.queue(new C_AioComplete(c
));
122 struct C_aio_notify_Complete
: public C_aio_linger_Complete
{
125 bool finished
= false;
128 C_aio_notify_Complete(AioCompletionImpl
*_c
, Objecter::LingerOp
*_linger_op
)
129 : C_aio_linger_Complete(_c
, _linger_op
, false),
130 lock("C_aio_notify_Complete::lock") {
133 void handle_ack(int r
) {
134 // invoked by C_aio_notify_Ack
140 void complete(int r
) override
{
141 // invoked by C_notify_Finish (or C_aio_notify_Ack on failure)
147 void complete_unlock(int r
) {
148 if (ret_val
== 0 && r
< 0) {
152 if (acked
&& finished
) {
155 C_aio_linger_Complete::complete(ret_val
);
162 struct C_aio_notify_Ack
: public Context
{
164 C_notify_Finish
*onfinish
;
165 C_aio_notify_Complete
*oncomplete
;
167 C_aio_notify_Ack(CephContext
*_cct
, C_notify_Finish
*_onfinish
,
168 C_aio_notify_Complete
*_oncomplete
)
169 : cct(_cct
), onfinish(_onfinish
), oncomplete(_oncomplete
)
173 void finish(int r
) override
175 ldout(cct
, 10) << __func__
<< " linger op " << oncomplete
->linger_op
<< " "
176 << "acked (" << r
<< ")" << dendl
;
177 oncomplete
->handle_ack(r
);
179 // on failure, we won't expect to see a notify_finish callback
180 onfinish
->complete(r
);
185 struct C_aio_selfmanaged_snap_op_Complete
: public Context
{
186 librados::RadosClient
*client
;
187 librados::AioCompletionImpl
*c
;
189 C_aio_selfmanaged_snap_op_Complete(librados::RadosClient
*client
,
190 librados::AioCompletionImpl
*c
)
191 : client(client
), c(c
) {
195 void finish(int r
) override
{
201 if (c
->callback_complete
|| c
->callback_safe
) {
202 client
->finisher
.queue(new librados::C_AioComplete(c
));
208 struct C_aio_selfmanaged_snap_create_Complete
: public C_aio_selfmanaged_snap_op_Complete
{
210 uint64_t *dest_snapid
;
212 C_aio_selfmanaged_snap_create_Complete(librados::RadosClient
*client
,
213 librados::AioCompletionImpl
*c
,
214 uint64_t *dest_snapid
)
215 : C_aio_selfmanaged_snap_op_Complete(client
, c
),
216 dest_snapid(dest_snapid
) {
219 void finish(int r
) override
{
221 *dest_snapid
= snapid
;
223 C_aio_selfmanaged_snap_op_Complete::finish(r
);
227 } // anonymous namespace
228 } // namespace librados
230 librados::IoCtxImpl::IoCtxImpl() :
231 ref_cnt(0), client(NULL
), poolid(0), assert_ver(0), last_objver(0),
232 notify_timeout(30), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
233 aio_write_seq(0), objecter(NULL
)
237 librados::IoCtxImpl::IoCtxImpl(RadosClient
*c
, Objecter
*objecter
,
238 int64_t poolid
, snapid_t s
)
239 : ref_cnt(0), client(c
), poolid(poolid
), snap_seq(s
),
240 assert_ver(0), last_objver(0),
241 notify_timeout(c
->cct
->_conf
->client_notify_timeout
),
242 oloc(poolid
), aio_write_list_lock("librados::IoCtxImpl::aio_write_list_lock"),
243 aio_write_seq(0), objecter(objecter
)
247 void librados::IoCtxImpl::set_snap_read(snapid_t s
)
251 ldout(client
->cct
, 10) << "set snap read " << snap_seq
<< " -> " << s
<< dendl
;
255 int librados::IoCtxImpl::set_snap_write_context(snapid_t seq
, vector
<snapid_t
>& snaps
)
258 ldout(client
->cct
, 10) << "set snap write context: seq = " << seq
259 << " and snaps = " << snaps
<< dendl
;
268 int librados::IoCtxImpl::get_object_hash_position(
269 const std::string
& oid
, uint32_t *hash_position
)
271 int64_t r
= objecter
->get_object_hash_position(poolid
, oid
, oloc
.nspace
);
274 *hash_position
= (uint32_t)r
;
278 int librados::IoCtxImpl::get_object_pg_hash_position(
279 const std::string
& oid
, uint32_t *pg_hash_position
)
281 int64_t r
= objecter
->get_object_pg_hash_position(poolid
, oid
, oloc
.nspace
);
284 *pg_hash_position
= (uint32_t)r
;
288 void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl
*c
)
291 aio_write_list_lock
.Lock();
292 assert(c
->io
== this);
293 c
->aio_write_seq
= ++aio_write_seq
;
294 ldout(client
->cct
, 20) << "queue_aio_write " << this << " completion " << c
295 << " write_seq " << aio_write_seq
<< dendl
;
296 aio_write_list
.push_back(&c
->aio_write_list_item
);
297 aio_write_list_lock
.Unlock();
300 void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl
*c
)
302 ldout(client
->cct
, 20) << "complete_aio_write " << c
<< dendl
;
303 aio_write_list_lock
.Lock();
304 assert(c
->io
== this);
305 c
->aio_write_list_item
.remove_myself();
307 map
<ceph_tid_t
, std::list
<AioCompletionImpl
*> >::iterator waiters
= aio_write_waiters
.begin();
308 while (waiters
!= aio_write_waiters
.end()) {
309 if (!aio_write_list
.empty() &&
310 aio_write_list
.front()->aio_write_seq
<= waiters
->first
) {
311 ldout(client
->cct
, 20) << " next outstanding write is " << aio_write_list
.front()->aio_write_seq
312 << " <= waiter " << waiters
->first
313 << ", stopping" << dendl
;
316 ldout(client
->cct
, 20) << " waking waiters on seq " << waiters
->first
<< dendl
;
317 for (std::list
<AioCompletionImpl
*>::iterator it
= waiters
->second
.begin();
318 it
!= waiters
->second
.end(); ++it
) {
319 client
->finisher
.queue(new C_AioCompleteAndSafe(*it
));
322 aio_write_waiters
.erase(waiters
++);
325 aio_write_cond
.Signal();
326 aio_write_list_lock
.Unlock();
330 void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl
*c
)
332 ldout(client
->cct
, 20) << "flush_aio_writes_async " << this
333 << " completion " << c
<< dendl
;
334 Mutex::Locker
l(aio_write_list_lock
);
335 ceph_tid_t seq
= aio_write_seq
;
336 if (aio_write_list
.empty()) {
337 ldout(client
->cct
, 20) << "flush_aio_writes_async no writes. (tid "
338 << seq
<< ")" << dendl
;
339 client
->finisher
.queue(new C_AioCompleteAndSafe(c
));
341 ldout(client
->cct
, 20) << "flush_aio_writes_async " << aio_write_list
.size()
342 << " writes in flight; waiting on tid " << seq
<< dendl
;
344 aio_write_waiters
[seq
].push_back(c
);
348 void librados::IoCtxImpl::flush_aio_writes()
350 ldout(client
->cct
, 20) << "flush_aio_writes" << dendl
;
351 aio_write_list_lock
.Lock();
352 ceph_tid_t seq
= aio_write_seq
;
353 while (!aio_write_list
.empty() &&
354 aio_write_list
.front()->aio_write_seq
<= seq
)
355 aio_write_cond
.Wait(aio_write_list_lock
);
356 aio_write_list_lock
.Unlock();
359 string
librados::IoCtxImpl::get_cached_pool_name()
362 client
->pool_get_name(get_id(), &pn
);
368 int librados::IoCtxImpl::snap_create(const char *snapName
)
371 string
sName(snapName
);
373 Mutex
mylock ("IoCtxImpl::snap_create::mylock");
376 Context
*onfinish
= new C_SafeCond(&mylock
, &cond
, &done
, &reply
);
377 reply
= objecter
->create_pool_snap(poolid
, sName
, onfinish
);
390 int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid
)
394 Mutex
mylock("IoCtxImpl::selfmanaged_snap_create::mylock");
397 Context
*onfinish
= new C_SafeCond(&mylock
, &cond
, &done
, &reply
);
399 reply
= objecter
->allocate_selfmanaged_snap(poolid
, &snapid
, onfinish
);
414 void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid
,
415 AioCompletionImpl
*c
)
417 C_aio_selfmanaged_snap_create_Complete
*onfinish
=
418 new C_aio_selfmanaged_snap_create_Complete(client
, c
, snapid
);
419 int r
= objecter
->allocate_selfmanaged_snap(poolid
, &onfinish
->snapid
,
422 onfinish
->complete(r
);
426 int librados::IoCtxImpl::snap_remove(const char *snapName
)
429 string
sName(snapName
);
431 Mutex
mylock ("IoCtxImpl::snap_remove::mylock");
434 Context
*onfinish
= new C_SafeCond(&mylock
, &cond
, &done
, &reply
);
435 reply
= objecter
->delete_pool_snap(poolid
, sName
, onfinish
);
448 int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t
& oid
,
449 ::SnapContext
& snapc
,
454 Mutex
mylock("IoCtxImpl::snap_rollback::mylock");
457 Context
*onack
= new C_SafeCond(&mylock
, &cond
, &done
, &reply
);
459 ::ObjectOperation op
;
460 prepare_assert_ops(&op
);
462 objecter
->mutate(oid
, oloc
,
463 op
, snapc
, ceph::real_clock::now(), 0,
467 while (!done
) cond
.Wait(mylock
);
472 int librados::IoCtxImpl::rollback(const object_t
& oid
, const char *snapName
)
476 int r
= objecter
->pool_snap_by_name(poolid
, snapName
, &snap
);
481 return selfmanaged_snap_rollback_object(oid
, snapc
, snap
);
484 int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid
)
488 Mutex
mylock("IoCtxImpl::selfmanaged_snap_remove::mylock");
491 objecter
->delete_selfmanaged_snap(poolid
, snapid_t(snapid
),
492 new C_SafeCond(&mylock
, &cond
, &done
, &reply
));
495 while (!done
) cond
.Wait(mylock
);
500 void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid
,
501 AioCompletionImpl
*c
)
503 Context
*onfinish
= new C_aio_selfmanaged_snap_op_Complete(client
, c
);
504 objecter
->delete_selfmanaged_snap(poolid
, snapid
, onfinish
);
507 int librados::IoCtxImpl::pool_change_auid(unsigned long long auid
)
511 Mutex
mylock("IoCtxImpl::pool_change_auid::mylock");
514 objecter
->change_pool_auid(poolid
,
515 new C_SafeCond(&mylock
, &cond
, &done
, &reply
),
519 while (!done
) cond
.Wait(mylock
);
524 int librados::IoCtxImpl::pool_change_auid_async(unsigned long long auid
,
525 PoolAsyncCompletionImpl
*c
)
527 objecter
->change_pool_auid(poolid
,
528 new C_PoolAsync_Safe(c
),
533 int librados::IoCtxImpl::snap_list(vector
<uint64_t> *snaps
)
535 return objecter
->pool_snap_list(poolid
, snaps
);
538 int librados::IoCtxImpl::snap_lookup(const char *name
, uint64_t *snapid
)
540 return objecter
->pool_snap_by_name(poolid
, name
, (snapid_t
*)snapid
);
543 int librados::IoCtxImpl::snap_get_name(uint64_t snapid
, std::string
*s
)
545 pool_snap_info_t info
;
546 int ret
= objecter
->pool_snap_get_info(poolid
, snapid
, &info
);
550 *s
= info
.name
.c_str();
554 int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid
, time_t *t
)
556 pool_snap_info_t info
;
557 int ret
= objecter
->pool_snap_get_info(poolid
, snapid
, &info
);
561 *t
= info
.stamp
.sec();
568 int librados::IoCtxImpl::nlist(Objecter::NListContext
*context
, int max_entries
)
573 Mutex
mylock("IoCtxImpl::nlist::mylock");
575 if (context
->at_end())
578 context
->max_entries
= max_entries
;
579 context
->nspace
= oloc
.nspace
;
581 objecter
->list_nobjects(context
, new C_SafeCond(&mylock
, &cond
, &done
, &r
));
591 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext
*context
,
594 context
->list
.clear();
595 return objecter
->list_nobjects_seek(context
, pos
);
598 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext
*context
,
599 const rados_object_list_cursor
& cursor
)
601 context
->list
.clear();
602 return objecter
->list_nobjects_seek(context
, *(const hobject_t
*)cursor
);
605 rados_object_list_cursor
librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext
*context
)
607 hobject_t
*c
= new hobject_t
;
609 objecter
->list_nobjects_get_cursor(context
, c
);
610 return (rados_object_list_cursor
)c
;
613 int librados::IoCtxImpl::create(const object_t
& oid
, bool exclusive
)
615 ::ObjectOperation op
;
616 prepare_assert_ops(&op
);
617 op
.create(exclusive
);
618 return operate(oid
, &op
, NULL
);
622 * add any version assert operations that are appropriate given the
623 * state in the IoCtx, either the target version assert or any src
624 * object asserts. these affect a single ioctx operation, so clear
625 * the ioctx state when we're done.
627 * return a pointer to the ObjectOperation if we added any events;
628 * this is convenient for passing the extra_ops argument into Objecter
631 ::ObjectOperation
*librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation
*op
)
633 ::ObjectOperation
*pop
= NULL
;
635 op
->assert_version(assert_ver
);
642 int librados::IoCtxImpl::write(const object_t
& oid
, bufferlist
& bl
,
643 size_t len
, uint64_t off
)
645 if (len
> UINT_MAX
/2)
647 ::ObjectOperation op
;
648 prepare_assert_ops(&op
);
650 mybl
.substr_of(bl
, 0, len
);
652 return operate(oid
, &op
, NULL
);
655 int librados::IoCtxImpl::append(const object_t
& oid
, bufferlist
& bl
, size_t len
)
657 if (len
> UINT_MAX
/2)
659 ::ObjectOperation op
;
660 prepare_assert_ops(&op
);
662 mybl
.substr_of(bl
, 0, len
);
664 return operate(oid
, &op
, NULL
);
667 int librados::IoCtxImpl::write_full(const object_t
& oid
, bufferlist
& bl
)
669 if (bl
.length() > UINT_MAX
/2)
671 ::ObjectOperation op
;
672 prepare_assert_ops(&op
);
674 return operate(oid
, &op
, NULL
);
677 int librados::IoCtxImpl::writesame(const object_t
& oid
, bufferlist
& bl
,
678 size_t write_len
, uint64_t off
)
680 if ((bl
.length() > UINT_MAX
/2) || (write_len
> UINT_MAX
/2))
682 if ((bl
.length() == 0) || (write_len
% bl
.length()))
684 ::ObjectOperation op
;
685 prepare_assert_ops(&op
);
687 mybl
.substr_of(bl
, 0, bl
.length());
688 op
.writesame(off
, write_len
, mybl
);
689 return operate(oid
, &op
, NULL
);
692 int librados::IoCtxImpl::operate(const object_t
& oid
, ::ObjectOperation
*o
,
693 ceph::real_time
*pmtime
, int flags
)
695 ceph::real_time ut
= (pmtime
? *pmtime
:
696 ceph::real_clock::now());
698 /* can't write to a snapshot */
699 if (snap_seq
!= CEPH_NOSNAP
)
705 Mutex
mylock("IoCtxImpl::operate::mylock");
711 Context
*oncommit
= new C_SafeCond(&mylock
, &cond
, &done
, &r
);
713 int op
= o
->ops
[0].op
.op
;
714 ldout(client
->cct
, 10) << ceph_osd_op_name(op
) << " oid=" << oid
715 << " nspace=" << oloc
.nspace
<< dendl
;
716 Objecter::Op
*objecter_op
= objecter
->prepare_mutate_op(oid
, oloc
,
717 *o
, snapc
, ut
, flags
,
719 objecter
->op_submit(objecter_op
);
725 ldout(client
->cct
, 10) << "Objecter returned from "
726 << ceph_osd_op_name(op
) << " r=" << r
<< dendl
;
728 set_sync_op_version(ver
);
733 int librados::IoCtxImpl::operate_read(const object_t
& oid
,
734 ::ObjectOperation
*o
,
741 Mutex
mylock("IoCtxImpl::operate_read::mylock");
747 Context
*onack
= new C_SafeCond(&mylock
, &cond
, &done
, &r
);
749 int op
= o
->ops
[0].op
.op
;
750 ldout(client
->cct
, 10) << ceph_osd_op_name(op
) << " oid=" << oid
<< " nspace=" << oloc
.nspace
<< dendl
;
751 Objecter::Op
*objecter_op
= objecter
->prepare_read_op(oid
, oloc
,
752 *o
, snap_seq
, pbl
, flags
,
754 objecter
->op_submit(objecter_op
);
760 ldout(client
->cct
, 10) << "Objecter returned from "
761 << ceph_osd_op_name(op
) << " r=" << r
<< dendl
;
763 set_sync_op_version(ver
);
768 int librados::IoCtxImpl::aio_operate_read(const object_t
&oid
,
769 ::ObjectOperation
*o
,
770 AioCompletionImpl
*c
,
773 const blkin_trace_info
*trace_info
)
776 Context
*oncomplete
= new C_aio_Complete(c
);
778 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
779 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
784 ZTracer::Trace trace
;
786 ZTracer::Trace
parent_trace("", nullptr, trace_info
);
787 trace
.init("rados operate read", &objecter
->trace_endpoint
, &parent_trace
);
790 trace
.event("init root span");
791 Objecter::Op
*objecter_op
= objecter
->prepare_read_op(oid
, oloc
,
792 *o
, snap_seq
, pbl
, flags
,
793 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
794 objecter
->op_submit(objecter_op
, &c
->tid
);
795 trace
.event("rados operate read submitted");
800 int librados::IoCtxImpl::aio_operate(const object_t
& oid
,
801 ::ObjectOperation
*o
, AioCompletionImpl
*c
,
802 const SnapContext
& snap_context
, int flags
,
803 const blkin_trace_info
*trace_info
)
806 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_WRITE_OP_BEGIN");
807 auto ut
= ceph::real_clock::now();
808 /* can't write to a snapshot */
809 if (snap_seq
!= CEPH_NOSNAP
)
812 Context
*oncomplete
= new C_aio_Complete(c
);
813 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
814 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
820 ZTracer::Trace trace
;
822 ZTracer::Trace
parent_trace("", nullptr, trace_info
);
823 trace
.init("rados operate", &objecter
->trace_endpoint
, &parent_trace
);
826 trace
.event("init root span");
827 Objecter::Op
*op
= objecter
->prepare_mutate_op(
828 oid
, oloc
, *o
, snap_context
, ut
, flags
,
829 oncomplete
, &c
->objver
, osd_reqid_t(), &trace
);
830 objecter
->op_submit(op
, &c
->tid
);
831 trace
.event("rados operate op submitted");
836 int librados::IoCtxImpl::aio_read(const object_t oid
, AioCompletionImpl
*c
,
837 bufferlist
*pbl
, size_t len
, uint64_t off
,
838 uint64_t snapid
, const blkin_trace_info
*info
)
841 if (len
> (size_t) INT_MAX
)
844 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_READ_OP_BEGIN");
845 Context
*oncomplete
= new C_aio_Complete(c
);
847 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
848 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
854 ZTracer::Trace trace
;
856 trace
.init("rados read", &objecter
->trace_endpoint
, info
);
858 Objecter::Op
*o
= objecter
->prepare_read_op(
860 off
, len
, snapid
, pbl
, 0,
861 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
862 objecter
->op_submit(o
, &c
->tid
);
866 int librados::IoCtxImpl::aio_read(const object_t oid
, AioCompletionImpl
*c
,
867 char *buf
, size_t len
, uint64_t off
,
868 uint64_t snapid
, const blkin_trace_info
*info
)
871 if (len
> (size_t) INT_MAX
)
874 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_READ_OP_BEGIN");
875 Context
*oncomplete
= new C_aio_Complete(c
);
877 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
878 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
883 c
->bl
.push_back(buffer::create_static(len
, buf
));
887 ZTracer::Trace trace
;
889 trace
.init("rados read", &objecter
->trace_endpoint
, info
);
891 Objecter::Op
*o
= objecter
->prepare_read_op(
893 off
, len
, snapid
, &c
->bl
, 0,
894 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
895 objecter
->op_submit(o
, &c
->tid
);
899 class C_ObjectOperation
: public Context
{
901 ::ObjectOperation m_ops
;
902 explicit C_ObjectOperation(Context
*c
) : m_ctx(c
) {}
903 void finish(int r
) override
{
910 int librados::IoCtxImpl::aio_sparse_read(const object_t oid
,
911 AioCompletionImpl
*c
,
912 std::map
<uint64_t,uint64_t> *m
,
913 bufferlist
*data_bl
, size_t len
,
914 uint64_t off
, uint64_t snapid
)
917 if (len
> (size_t) INT_MAX
)
920 Context
*nested
= new C_aio_Complete(c
);
921 C_ObjectOperation
*onack
= new C_ObjectOperation(nested
);
923 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
924 ((C_aio_Complete
*) nested
)->oid
= oid
;
929 onack
->m_ops
.sparse_read(off
, len
, m
, data_bl
, NULL
);
931 Objecter::Op
*o
= objecter
->prepare_read_op(
933 onack
->m_ops
, snapid
, NULL
, 0,
935 objecter
->op_submit(o
, &c
->tid
);
939 int librados::IoCtxImpl::aio_cmpext(const object_t
& oid
,
940 AioCompletionImpl
*c
,
944 if (cmp_bl
.length() > UINT_MAX
/2)
947 Context
*onack
= new C_aio_Complete(c
);
952 Objecter::Op
*o
= objecter
->prepare_cmpext_op(
953 oid
, oloc
, off
, cmp_bl
, snap_seq
, 0,
955 objecter
->op_submit(o
, &c
->tid
);
960 /* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
961 int librados::IoCtxImpl::aio_cmpext(const object_t
& oid
,
962 AioCompletionImpl
*c
,
967 if (cmp_len
> UINT_MAX
/2)
971 cmp_bl
.append(cmp_buf
, cmp_len
);
973 Context
*nested
= new C_aio_Complete(c
);
974 C_ObjectOperation
*onack
= new C_ObjectOperation(nested
);
979 onack
->m_ops
.cmpext(off
, cmp_len
, cmp_buf
, NULL
);
981 Objecter::Op
*o
= objecter
->prepare_read_op(
982 oid
, oloc
, onack
->m_ops
, snap_seq
, NULL
, 0, onack
, &c
->objver
);
983 objecter
->op_submit(o
, &c
->tid
);
987 int librados::IoCtxImpl::aio_write(const object_t
&oid
, AioCompletionImpl
*c
,
988 const bufferlist
& bl
, size_t len
,
989 uint64_t off
, const blkin_trace_info
*info
)
992 auto ut
= ceph::real_clock::now();
993 ldout(client
->cct
, 20) << "aio_write " << oid
<< " " << off
<< "~" << len
<< " snapc=" << snapc
<< " snap_seq=" << snap_seq
<< dendl
;
994 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_WRITE_OP_BEGIN");
996 if (len
> UINT_MAX
/2)
998 /* can't write to a snapshot */
999 if (snap_seq
!= CEPH_NOSNAP
)
1002 Context
*oncomplete
= new C_aio_Complete(c
);
1004 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1005 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1007 ZTracer::Trace trace
;
1009 trace
.init("rados write", &objecter
->trace_endpoint
, info
);
1014 Objecter::Op
*o
= objecter
->prepare_write_op(
1016 off
, len
, snapc
, bl
, ut
, 0,
1017 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
1018 objecter
->op_submit(o
, &c
->tid
);
1023 int librados::IoCtxImpl::aio_append(const object_t
&oid
, AioCompletionImpl
*c
,
1024 const bufferlist
& bl
, size_t len
)
1027 auto ut
= ceph::real_clock::now();
1029 if (len
> UINT_MAX
/2)
1031 /* can't write to a snapshot */
1032 if (snap_seq
!= CEPH_NOSNAP
)
1035 Context
*oncomplete
= new C_aio_Complete(c
);
1036 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1037 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1043 Objecter::Op
*o
= objecter
->prepare_append_op(
1045 len
, snapc
, bl
, ut
, 0,
1046 oncomplete
, &c
->objver
);
1047 objecter
->op_submit(o
, &c
->tid
);
1052 int librados::IoCtxImpl::aio_write_full(const object_t
&oid
,
1053 AioCompletionImpl
*c
,
1054 const bufferlist
& bl
)
1057 auto ut
= ceph::real_clock::now();
1059 if (bl
.length() > UINT_MAX
/2)
1061 /* can't write to a snapshot */
1062 if (snap_seq
!= CEPH_NOSNAP
)
1065 Context
*oncomplete
= new C_aio_Complete(c
);
1066 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1067 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1073 Objecter::Op
*o
= objecter
->prepare_write_full_op(
1076 oncomplete
, &c
->objver
);
1077 objecter
->op_submit(o
, &c
->tid
);
1082 int librados::IoCtxImpl::aio_writesame(const object_t
&oid
,
1083 AioCompletionImpl
*c
,
1084 const bufferlist
& bl
,
1089 auto ut
= ceph::real_clock::now();
1091 if ((bl
.length() > UINT_MAX
/2) || (write_len
> UINT_MAX
/2))
1093 if ((bl
.length() == 0) || (write_len
% bl
.length()))
1095 /* can't write to a snapshot */
1096 if (snap_seq
!= CEPH_NOSNAP
)
1099 Context
*oncomplete
= new C_aio_Complete(c
);
1101 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1102 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1107 Objecter::Op
*o
= objecter
->prepare_writesame_op(
1111 oncomplete
, &c
->objver
);
1112 objecter
->op_submit(o
, &c
->tid
);
1117 int librados::IoCtxImpl::aio_remove(const object_t
&oid
, AioCompletionImpl
*c
, int flags
)
1120 auto ut
= ceph::real_clock::now();
1122 /* can't write to a snapshot */
1123 if (snap_seq
!= CEPH_NOSNAP
)
1126 Context
*oncomplete
= new C_aio_Complete(c
);
1128 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1129 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1134 Objecter::Op
*o
= objecter
->prepare_remove_op(
1137 oncomplete
, &c
->objver
);
1138 objecter
->op_submit(o
, &c
->tid
);
1144 int librados::IoCtxImpl::aio_stat(const object_t
& oid
, AioCompletionImpl
*c
,
1145 uint64_t *psize
, time_t *pmtime
)
1147 C_aio_stat_Ack
*onack
= new C_aio_stat_Ack(c
, pmtime
);
1150 Objecter::Op
*o
= objecter
->prepare_stat_op(
1152 snap_seq
, psize
, &onack
->mtime
, 0,
1154 objecter
->op_submit(o
, &c
->tid
);
1158 int librados::IoCtxImpl::aio_stat2(const object_t
& oid
, AioCompletionImpl
*c
,
1159 uint64_t *psize
, struct timespec
*pts
)
1161 C_aio_stat2_Ack
*onack
= new C_aio_stat2_Ack(c
, pts
);
1164 Objecter::Op
*o
= objecter
->prepare_stat_op(
1166 snap_seq
, psize
, &onack
->mtime
, 0,
1168 objecter
->op_submit(o
, &c
->tid
);
1172 int librados::IoCtxImpl::aio_getxattr(const object_t
& oid
, AioCompletionImpl
*c
,
1173 const char *name
, bufferlist
& bl
)
1175 ::ObjectOperation rd
;
1176 prepare_assert_ops(&rd
);
1177 rd
.getxattr(name
, &bl
, NULL
);
1178 int r
= aio_operate_read(oid
, &rd
, c
, 0, &bl
);
1182 int librados::IoCtxImpl::aio_rmxattr(const object_t
& oid
, AioCompletionImpl
*c
,
1185 ::ObjectOperation op
;
1186 prepare_assert_ops(&op
);
1188 return aio_operate(oid
, &op
, c
, snapc
, 0);
1191 int librados::IoCtxImpl::aio_setxattr(const object_t
& oid
, AioCompletionImpl
*c
,
1192 const char *name
, bufferlist
& bl
)
1194 ::ObjectOperation op
;
1195 prepare_assert_ops(&op
);
1196 op
.setxattr(name
, bl
);
1197 return aio_operate(oid
, &op
, c
, snapc
, 0);
1201 struct AioGetxattrsData
{
1202 AioGetxattrsData(librados::AioCompletionImpl
*c
, map
<string
, bufferlist
>* attrset
,
1203 librados::RadosClient
*_client
) :
1204 user_completion(c
), user_attrset(attrset
), client(_client
) {}
1205 struct librados::C_AioCompleteAndSafe user_completion
;
1206 map
<string
, bufferlist
> result_attrset
;
1207 map
<std::string
, bufferlist
>* user_attrset
;
1208 librados::RadosClient
*client
;
1212 static void aio_getxattrs_complete(rados_completion_t c
, void *arg
) {
1213 AioGetxattrsData
*cdata
= reinterpret_cast<AioGetxattrsData
*>(arg
);
1214 int rc
= rados_aio_get_return_value(c
);
1215 cdata
->user_attrset
->clear();
1217 for (map
<string
,bufferlist
>::iterator p
= cdata
->result_attrset
.begin();
1218 p
!= cdata
->result_attrset
.end();
1220 ldout(cdata
->client
->cct
, 10) << "IoCtxImpl::getxattrs: xattr=" << p
->first
<< dendl
;
1221 (*cdata
->user_attrset
)[p
->first
] = p
->second
;
1224 cdata
->user_completion
.finish(rc
);
1225 ((librados::AioCompletionImpl
*)c
)->put();
1229 int librados::IoCtxImpl::aio_getxattrs(const object_t
& oid
, AioCompletionImpl
*c
,
1230 map
<std::string
, bufferlist
>& attrset
)
1232 AioGetxattrsData
*cdata
= new AioGetxattrsData(c
, &attrset
, client
);
1233 ::ObjectOperation rd
;
1234 prepare_assert_ops(&rd
);
1235 rd
.getxattrs(&cdata
->result_attrset
, NULL
);
1236 librados::AioCompletionImpl
*comp
= new librados::AioCompletionImpl
;
1237 comp
->set_complete_callback(cdata
, aio_getxattrs_complete
);
1238 return aio_operate_read(oid
, &rd
, comp
, 0, NULL
);
1241 int librados::IoCtxImpl::aio_cancel(AioCompletionImpl
*c
)
1243 return objecter
->op_cancel(c
->tid
, -ECANCELED
);
1247 int librados::IoCtxImpl::hit_set_list(uint32_t hash
, AioCompletionImpl
*c
,
1248 std::list
< std::pair
<time_t, time_t> > *pls
)
1250 Context
*oncomplete
= new C_aio_Complete(c
);
1254 ::ObjectOperation rd
;
1255 rd
.hit_set_ls(pls
, NULL
);
1256 object_locator_t
oloc(poolid
);
1257 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1258 hash
, oloc
, rd
, NULL
, 0, oncomplete
, NULL
, NULL
);
1259 objecter
->op_submit(o
, &c
->tid
);
1263 int librados::IoCtxImpl::hit_set_get(uint32_t hash
, AioCompletionImpl
*c
,
1267 Context
*oncomplete
= new C_aio_Complete(c
);
1271 ::ObjectOperation rd
;
1272 rd
.hit_set_get(ceph::real_clock::from_time_t(stamp
), pbl
, 0);
1273 object_locator_t
oloc(poolid
);
1274 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1275 hash
, oloc
, rd
, NULL
, 0, oncomplete
, NULL
, NULL
);
1276 objecter
->op_submit(o
, &c
->tid
);
1280 int librados::IoCtxImpl::remove(const object_t
& oid
)
1282 ::ObjectOperation op
;
1283 prepare_assert_ops(&op
);
1285 return operate(oid
, &op
, nullptr, librados::OPERATION_FULL_FORCE
);
1288 int librados::IoCtxImpl::remove(const object_t
& oid
, int flags
)
1290 ::ObjectOperation op
;
1291 prepare_assert_ops(&op
);
1293 return operate(oid
, &op
, NULL
, flags
);
1296 int librados::IoCtxImpl::trunc(const object_t
& oid
, uint64_t size
)
1298 ::ObjectOperation op
;
1299 prepare_assert_ops(&op
);
1301 return operate(oid
, &op
, NULL
);
1304 int librados::IoCtxImpl::get_inconsistent_objects(const pg_t
& pg
,
1305 const librados::object_id_t
& start_after
,
1306 uint64_t max_to_get
,
1307 AioCompletionImpl
*c
,
1308 std::vector
<inconsistent_obj_t
>* objects
,
1311 Context
*oncomplete
= new C_aio_Complete(c
);
1315 ::ObjectOperation op
;
1316 op
.scrub_ls(start_after
, max_to_get
, objects
, interval
, nullptr);
1317 object_locator_t oloc
{poolid
, pg
.ps()};
1318 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1319 oloc
.hash
, oloc
, op
, nullptr, CEPH_OSD_FLAG_PGOP
, oncomplete
,
1321 objecter
->op_submit(o
, &c
->tid
);
1325 int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t
& pg
,
1326 const librados::object_id_t
& start_after
,
1327 uint64_t max_to_get
,
1328 AioCompletionImpl
*c
,
1329 std::vector
<inconsistent_snapset_t
>* snapsets
,
1332 Context
*oncomplete
= new C_aio_Complete(c
);
1336 ::ObjectOperation op
;
1337 op
.scrub_ls(start_after
, max_to_get
, snapsets
, interval
, nullptr);
1338 object_locator_t oloc
{poolid
, pg
.ps()};
1339 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1340 oloc
.hash
, oloc
, op
, nullptr, CEPH_OSD_FLAG_PGOP
, oncomplete
,
1342 objecter
->op_submit(o
, &c
->tid
);
1346 int librados::IoCtxImpl::tmap_update(const object_t
& oid
, bufferlist
& cmdbl
)
1348 ::ObjectOperation wr
;
1349 prepare_assert_ops(&wr
);
1350 wr
.tmap_update(cmdbl
);
1351 return operate(oid
, &wr
, NULL
);
1354 int librados::IoCtxImpl::tmap_put(const object_t
& oid
, bufferlist
& bl
)
1356 ::ObjectOperation wr
;
1357 prepare_assert_ops(&wr
);
1359 return operate(oid
, &wr
, NULL
);
1362 int librados::IoCtxImpl::tmap_get(const object_t
& oid
, bufferlist
& bl
)
1364 ::ObjectOperation rd
;
1365 prepare_assert_ops(&rd
);
1366 rd
.tmap_get(&bl
, NULL
);
1367 return operate_read(oid
, &rd
, NULL
);
1370 int librados::IoCtxImpl::tmap_to_omap(const object_t
& oid
, bool nullok
)
1372 ::ObjectOperation wr
;
1373 prepare_assert_ops(&wr
);
1374 wr
.tmap_to_omap(nullok
);
1375 return operate(oid
, &wr
, NULL
);
1378 int librados::IoCtxImpl::exec(const object_t
& oid
,
1379 const char *cls
, const char *method
,
1380 bufferlist
& inbl
, bufferlist
& outbl
)
1382 ::ObjectOperation rd
;
1383 prepare_assert_ops(&rd
);
1384 rd
.call(cls
, method
, inbl
);
1385 return operate_read(oid
, &rd
, &outbl
);
1388 int librados::IoCtxImpl::aio_exec(const object_t
& oid
, AioCompletionImpl
*c
,
1389 const char *cls
, const char *method
,
1390 bufferlist
& inbl
, bufferlist
*outbl
)
1393 Context
*oncomplete
= new C_aio_Complete(c
);
1395 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1396 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1401 ::ObjectOperation rd
;
1402 prepare_assert_ops(&rd
);
1403 rd
.call(cls
, method
, inbl
);
1404 Objecter::Op
*o
= objecter
->prepare_read_op(
1405 oid
, oloc
, rd
, snap_seq
, outbl
, 0, oncomplete
, &c
->objver
);
1406 objecter
->op_submit(o
, &c
->tid
);
1410 int librados::IoCtxImpl::aio_exec(const object_t
& oid
, AioCompletionImpl
*c
,
1411 const char *cls
, const char *method
,
1412 bufferlist
& inbl
, char *buf
, size_t out_len
)
1415 Context
*oncomplete
= new C_aio_Complete(c
);
1417 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1418 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1423 c
->bl
.push_back(buffer::create_static(out_len
, buf
));
1427 ::ObjectOperation rd
;
1428 prepare_assert_ops(&rd
);
1429 rd
.call(cls
, method
, inbl
);
1430 Objecter::Op
*o
= objecter
->prepare_read_op(
1431 oid
, oloc
, rd
, snap_seq
, &c
->bl
, 0, oncomplete
, &c
->objver
);
1432 objecter
->op_submit(o
, &c
->tid
);
1436 int librados::IoCtxImpl::read(const object_t
& oid
,
1437 bufferlist
& bl
, size_t len
, uint64_t off
)
1439 if (len
> (size_t) INT_MAX
)
1441 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_READ_OP_BEGIN");
1443 ::ObjectOperation rd
;
1444 prepare_assert_ops(&rd
);
1445 rd
.read(off
, len
, &bl
, NULL
, NULL
);
1446 int r
= operate_read(oid
, &rd
, &bl
);
1450 if (bl
.length() < len
) {
1451 ldout(client
->cct
, 10) << "Returned length " << bl
.length()
1452 << " less than original length "<< len
<< dendl
;
1458 int librados::IoCtxImpl::cmpext(const object_t
& oid
, uint64_t off
,
1461 if (cmp_bl
.length() > UINT_MAX
/2)
1464 ::ObjectOperation op
;
1465 prepare_assert_ops(&op
);
1466 op
.cmpext(off
, cmp_bl
, NULL
);
1467 return operate_read(oid
, &op
, NULL
);
1470 int librados::IoCtxImpl::mapext(const object_t
& oid
,
1471 uint64_t off
, size_t len
,
1472 std::map
<uint64_t,uint64_t>& m
)
1476 Mutex
mylock("IoCtxImpl::read::mylock");
1480 Context
*onack
= new C_SafeCond(&mylock
, &cond
, &done
, &r
);
1482 objecter
->mapext(oid
, oloc
,
1483 off
, len
, snap_seq
, &bl
, 0,
1490 ldout(client
->cct
, 10) << "Objecter returned from read r=" << r
<< dendl
;
1495 bufferlist::iterator iter
= bl
.begin();
1501 int librados::IoCtxImpl::sparse_read(const object_t
& oid
,
1502 std::map
<uint64_t,uint64_t>& m
,
1503 bufferlist
& data_bl
, size_t len
,
1506 if (len
> (size_t) INT_MAX
)
1509 ::ObjectOperation rd
;
1510 prepare_assert_ops(&rd
);
1511 rd
.sparse_read(off
, len
, &m
, &data_bl
, NULL
);
1513 int r
= operate_read(oid
, &rd
, NULL
);
1520 int librados::IoCtxImpl::checksum(const object_t
& oid
, uint8_t type
,
1521 const bufferlist
&init_value
, size_t len
,
1522 uint64_t off
, size_t chunk_size
,
1525 if (len
> (size_t) INT_MAX
) {
1529 ::ObjectOperation rd
;
1530 prepare_assert_ops(&rd
);
1531 rd
.checksum(type
, init_value
, off
, len
, chunk_size
, pbl
, nullptr, nullptr);
1533 int r
= operate_read(oid
, &rd
, nullptr);
1541 int librados::IoCtxImpl::stat(const object_t
& oid
, uint64_t *psize
, time_t *pmtime
)
1549 ::ObjectOperation rd
;
1550 prepare_assert_ops(&rd
);
1551 rd
.stat(psize
, &mtime
, NULL
);
1552 int r
= operate_read(oid
, &rd
, NULL
);
1554 if (r
>= 0 && pmtime
) {
1555 *pmtime
= real_clock::to_time_t(mtime
);
1561 int librados::IoCtxImpl::stat2(const object_t
& oid
, uint64_t *psize
, struct timespec
*pts
)
1564 ceph::real_time mtime
;
1569 ::ObjectOperation rd
;
1570 prepare_assert_ops(&rd
);
1571 rd
.stat(psize
, &mtime
, NULL
);
1572 int r
= operate_read(oid
, &rd
, NULL
);
1578 *pts
= ceph::real_clock::to_timespec(mtime
);
1584 int librados::IoCtxImpl::getxattr(const object_t
& oid
,
1585 const char *name
, bufferlist
& bl
)
1587 ::ObjectOperation rd
;
1588 prepare_assert_ops(&rd
);
1589 rd
.getxattr(name
, &bl
, NULL
);
1590 int r
= operate_read(oid
, &rd
, &bl
);
1597 int librados::IoCtxImpl::rmxattr(const object_t
& oid
, const char *name
)
1599 ::ObjectOperation op
;
1600 prepare_assert_ops(&op
);
1602 return operate(oid
, &op
, NULL
);
1605 int librados::IoCtxImpl::setxattr(const object_t
& oid
,
1606 const char *name
, bufferlist
& bl
)
1608 ::ObjectOperation op
;
1609 prepare_assert_ops(&op
);
1610 op
.setxattr(name
, bl
);
1611 return operate(oid
, &op
, NULL
);
1614 int librados::IoCtxImpl::getxattrs(const object_t
& oid
,
1615 map
<std::string
, bufferlist
>& attrset
)
1617 map
<string
, bufferlist
> aset
;
1619 ::ObjectOperation rd
;
1620 prepare_assert_ops(&rd
);
1621 rd
.getxattrs(&aset
, NULL
);
1622 int r
= operate_read(oid
, &rd
, NULL
);
1626 for (map
<string
,bufferlist
>::iterator p
= aset
.begin(); p
!= aset
.end(); ++p
) {
1627 ldout(client
->cct
, 10) << "IoCtxImpl::getxattrs: xattr=" << p
->first
<< dendl
;
1628 attrset
[p
->first
.c_str()] = p
->second
;
1635 void librados::IoCtxImpl::set_sync_op_version(version_t ver
)
1637 ANNOTATE_BENIGN_RACE_SIZED(&last_objver
, sizeof(last_objver
),
1638 "IoCtxImpl last_objver");
1642 struct WatchInfo
: public Objecter::WatchContext
{
1643 librados::IoCtxImpl
*ioctx
;
1645 librados::WatchCtx
*ctx
;
1646 librados::WatchCtx2
*ctx2
;
1647 bool internal
= false;
1649 WatchInfo(librados::IoCtxImpl
*io
, object_t o
,
1650 librados::WatchCtx
*c
, librados::WatchCtx2
*c2
,
1652 : ioctx(io
), oid(o
), ctx(c
), ctx2(c2
), internal(inter
) {
1655 ~WatchInfo() override
{
1663 void handle_notify(uint64_t notify_id
,
1665 uint64_t notifier_id
,
1666 bufferlist
& bl
) override
{
1667 ldout(ioctx
->client
->cct
, 10) << __func__
<< " " << notify_id
1668 << " cookie " << cookie
1669 << " notifier_id " << notifier_id
1670 << " len " << bl
.length()
1674 ctx2
->handle_notify(notify_id
, cookie
, notifier_id
, bl
);
1676 ctx
->notify(0, 0, bl
);
1678 // send ACK back to OSD if using legacy protocol
1680 ioctx
->notify_ack(oid
, notify_id
, cookie
, empty
);
1683 void handle_error(uint64_t cookie
, int err
) override
{
1684 ldout(ioctx
->client
->cct
, 10) << __func__
<< " cookie " << cookie
1688 ctx2
->handle_error(cookie
, err
);
1692 int librados::IoCtxImpl::watch(const object_t
& oid
, uint64_t *handle
,
1693 librados::WatchCtx
*ctx
,
1694 librados::WatchCtx2
*ctx2
,
1697 return watch(oid
, handle
, ctx
, ctx2
, 0, internal
);
1700 int librados::IoCtxImpl::watch(const object_t
& oid
, uint64_t *handle
,
1701 librados::WatchCtx
*ctx
,
1702 librados::WatchCtx2
*ctx2
,
1706 ::ObjectOperation wr
;
1708 C_SaferCond onfinish
;
1710 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
, 0);
1711 *handle
= linger_op
->get_cookie();
1712 linger_op
->watch_context
= new WatchInfo(this,
1713 oid
, ctx
, ctx2
, internal
);
1715 prepare_assert_ops(&wr
);
1716 wr
.watch(*handle
, CEPH_OSD_WATCH_OP_WATCH
, timeout
);
1718 objecter
->linger_watch(linger_op
, wr
,
1719 snapc
, ceph::real_clock::now(), bl
,
1723 int r
= onfinish
.wait();
1725 set_sync_op_version(objver
);
1728 objecter
->linger_cancel(linger_op
);
1735 int librados::IoCtxImpl::aio_watch(const object_t
& oid
,
1736 AioCompletionImpl
*c
,
1738 librados::WatchCtx
*ctx
,
1739 librados::WatchCtx2
*ctx2
,
1741 return aio_watch(oid
, c
, handle
, ctx
, ctx2
, 0, internal
);
1744 int librados::IoCtxImpl::aio_watch(const object_t
& oid
,
1745 AioCompletionImpl
*c
,
1747 librados::WatchCtx
*ctx
,
1748 librados::WatchCtx2
*ctx2
,
1752 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
, 0);
1754 Context
*oncomplete
= new C_aio_linger_Complete(c
, linger_op
, false);
1756 ::ObjectOperation wr
;
1757 *handle
= linger_op
->get_cookie();
1758 linger_op
->watch_context
= new WatchInfo(this, oid
, ctx
, ctx2
, internal
);
1760 prepare_assert_ops(&wr
);
1761 wr
.watch(*handle
, CEPH_OSD_WATCH_OP_WATCH
, timeout
);
1763 objecter
->linger_watch(linger_op
, wr
,
1764 snapc
, ceph::real_clock::now(), bl
,
1765 oncomplete
, &c
->objver
);
1771 int librados::IoCtxImpl::notify_ack(
1772 const object_t
& oid
,
1777 ::ObjectOperation rd
;
1778 prepare_assert_ops(&rd
);
1779 rd
.notify_ack(notify_id
, cookie
, bl
);
1780 objecter
->read(oid
, oloc
, rd
, snap_seq
, (bufferlist
*)NULL
, 0, 0, 0);
1784 int librados::IoCtxImpl::watch_check(uint64_t cookie
)
1786 Objecter::LingerOp
*linger_op
= reinterpret_cast<Objecter::LingerOp
*>(cookie
);
1787 return objecter
->linger_check(linger_op
);
1790 int librados::IoCtxImpl::unwatch(uint64_t cookie
)
1792 Objecter::LingerOp
*linger_op
= reinterpret_cast<Objecter::LingerOp
*>(cookie
);
1793 C_SaferCond onfinish
;
1796 ::ObjectOperation wr
;
1797 prepare_assert_ops(&wr
);
1798 wr
.watch(cookie
, CEPH_OSD_WATCH_OP_UNWATCH
);
1799 objecter
->mutate(linger_op
->target
.base_oid
, oloc
, wr
,
1800 snapc
, ceph::real_clock::now(), 0,
1802 objecter
->linger_cancel(linger_op
);
1804 int r
= onfinish
.wait();
1805 set_sync_op_version(ver
);
1809 int librados::IoCtxImpl::aio_unwatch(uint64_t cookie
, AioCompletionImpl
*c
)
1812 Objecter::LingerOp
*linger_op
= reinterpret_cast<Objecter::LingerOp
*>(cookie
);
1813 Context
*oncomplete
= new C_aio_linger_Complete(c
, linger_op
, true);
1815 ::ObjectOperation wr
;
1816 prepare_assert_ops(&wr
);
1817 wr
.watch(cookie
, CEPH_OSD_WATCH_OP_UNWATCH
);
1818 objecter
->mutate(linger_op
->target
.base_oid
, oloc
, wr
,
1819 snapc
, ceph::real_clock::now(), 0,
1820 oncomplete
, &c
->objver
);
1824 int librados::IoCtxImpl::notify(const object_t
& oid
, bufferlist
& bl
,
1825 uint64_t timeout_ms
,
1826 bufferlist
*preply_bl
,
1827 char **preply_buf
, size_t *preply_buf_len
)
1829 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
, 0);
1831 C_SaferCond notify_finish_cond
;
1832 Context
*notify_finish
= new C_notify_Finish(client
->cct
, ¬ify_finish_cond
,
1833 objecter
, linger_op
, preply_bl
,
1834 preply_buf
, preply_buf_len
);
1836 uint32_t timeout
= notify_timeout
;
1838 timeout
= timeout_ms
/ 1000;
1840 // Construct RADOS op
1841 ::ObjectOperation rd
;
1842 prepare_assert_ops(&rd
);
1844 rd
.notify(linger_op
->get_cookie(), 1, timeout
, bl
, &inbl
);
1849 objecter
->linger_notify(linger_op
,
1850 rd
, snap_seq
, inbl
, NULL
,
1853 ldout(client
->cct
, 10) << __func__
<< " issued linger op " << linger_op
<< dendl
;
1854 int r
= onack
.wait();
1855 ldout(client
->cct
, 10) << __func__
<< " linger op " << linger_op
1856 << " acked (" << r
<< ")" << dendl
;
1859 ldout(client
->cct
, 10) << __func__
<< " waiting for watch_notify finish "
1860 << linger_op
<< dendl
;
1861 r
= notify_finish_cond
.wait();
1864 ldout(client
->cct
, 10) << __func__
<< " failed to initiate notify, r = "
1866 notify_finish
->complete(r
);
1869 objecter
->linger_cancel(linger_op
);
1871 set_sync_op_version(objver
);
1875 int librados::IoCtxImpl::aio_notify(const object_t
& oid
, AioCompletionImpl
*c
,
1876 bufferlist
& bl
, uint64_t timeout_ms
,
1877 bufferlist
*preply_bl
, char **preply_buf
,
1878 size_t *preply_buf_len
)
1880 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
, 0);
1884 C_aio_notify_Complete
*oncomplete
= new C_aio_notify_Complete(c
, linger_op
);
1885 C_notify_Finish
*onnotify
= new C_notify_Finish(client
->cct
, oncomplete
,
1886 objecter
, linger_op
,
1887 preply_bl
, preply_buf
,
1889 Context
*onack
= new C_aio_notify_Ack(client
->cct
, onnotify
, oncomplete
);
1891 uint32_t timeout
= notify_timeout
;
1893 timeout
= timeout_ms
/ 1000;
1895 // Construct RADOS op
1896 ::ObjectOperation rd
;
1897 prepare_assert_ops(&rd
);
1899 rd
.notify(linger_op
->get_cookie(), 1, timeout
, bl
, &inbl
);
1902 objecter
->linger_notify(linger_op
,
1903 rd
, snap_seq
, inbl
, NULL
,
1908 int librados::IoCtxImpl::set_alloc_hint(const object_t
& oid
,
1909 uint64_t expected_object_size
,
1910 uint64_t expected_write_size
,
1913 ::ObjectOperation wr
;
1914 prepare_assert_ops(&wr
);
1915 wr
.set_alloc_hint(expected_object_size
, expected_write_size
, flags
);
1916 return operate(oid
, &wr
, NULL
);
1919 version_t
librados::IoCtxImpl::last_version()
1924 void librados::IoCtxImpl::set_assert_version(uint64_t ver
)
1929 void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout
)
1931 notify_timeout
= timeout
;
1934 int librados::IoCtxImpl::cache_pin(const object_t
& oid
)
1936 ::ObjectOperation wr
;
1937 prepare_assert_ops(&wr
);
1939 return operate(oid
, &wr
, NULL
);
1942 int librados::IoCtxImpl::cache_unpin(const object_t
& oid
)
1944 ::ObjectOperation wr
;
1945 prepare_assert_ops(&wr
);
1947 return operate(oid
, &wr
, NULL
);
1951 ///////////////////////////// C_aio_stat_Ack ////////////////////////////
1953 librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl
*_c
,
1961 void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r
)
1968 if (r
>= 0 && pmtime
) {
1969 *pmtime
= real_clock::to_time_t(mtime
);
1972 if (c
->callback_complete
) {
1973 c
->io
->client
->finisher
.queue(new C_AioComplete(c
));
1979 ///////////////////////////// C_aio_stat2_Ack ////////////////////////////
1981 librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl
*_c
,
1982 struct timespec
*pt
)
1989 void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r
)
1996 if (r
>= 0 && pts
) {
1997 *pts
= real_clock::to_timespec(mtime
);
2000 if (c
->callback_complete
) {
2001 c
->io
->client
->finisher
.queue(new C_AioComplete(c
));
2007 //////////////////////////// C_aio_Complete ////////////////////////////////
2009 librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl
*_c
)
2015 void librados::IoCtxImpl::C_aio_Complete::finish(int r
)
2022 if (r
== 0 && c
->blp
&& c
->blp
->length() > 0) {
2023 if (c
->out_buf
&& !c
->blp
->is_provided_buffer(c
->out_buf
))
2024 c
->blp
->copy(0, c
->blp
->length(), c
->out_buf
);
2025 c
->rval
= c
->blp
->length();
2028 if (c
->callback_complete
||
2030 c
->io
->client
->finisher
.queue(new C_AioComplete(c
));
2033 if (c
->aio_write_seq
) {
2034 c
->io
->complete_aio_write(c
);
2037 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
2038 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_OP_COMPLETE");
2043 void librados::IoCtxImpl::object_list_slice(
2044 const hobject_t start
,
2045 const hobject_t finish
,
2048 hobject_t
*split_start
,
2049 hobject_t
*split_finish
)
2051 if (start
.is_max()) {
2052 *split_start
= hobject_t::get_max();
2053 *split_finish
= hobject_t::get_max();
2057 uint64_t start_hash
= hobject_t::_reverse_bits(start
.get_hash());
2058 uint64_t finish_hash
=
2059 finish
.is_max() ? 0x100000000 :
2060 hobject_t::_reverse_bits(finish
.get_hash());
2062 uint64_t diff
= finish_hash
- start_hash
;
2063 uint64_t rev_start
= start_hash
+ (diff
* n
/ m
);
2064 uint64_t rev_finish
= start_hash
+ (diff
* (n
+ 1) / m
);
2066 *split_start
= start
;
2068 *split_start
= hobject_t(
2069 object_t(), string(), CEPH_NOSNAP
,
2070 hobject_t::_reverse_bits(rev_start
), poolid
, string());
2074 *split_finish
= finish
;
2075 else if (rev_finish
>= 0x100000000)
2076 *split_finish
= hobject_t::get_max();
2078 *split_finish
= hobject_t(
2079 object_t(), string(), CEPH_NOSNAP
,
2080 hobject_t::_reverse_bits(rev_finish
), poolid
, string());
2083 int librados::IoCtxImpl::application_enable(const std::string
& app_name
,
2086 auto c
= new PoolAsyncCompletionImpl();
2087 application_enable_async(app_name
, force
, c
);
2092 r
= c
->get_return_value();
2098 return client
->wait_for_latest_osdmap();
2101 void librados::IoCtxImpl::application_enable_async(const std::string
& app_name
,
2103 PoolAsyncCompletionImpl
*c
)
2105 // pre-Luminous clusters will return -EINVAL and application won't be
2106 // preserved until Luminous is configured as minimum version.
2107 if (!client
->get_required_monitor_features().contains_all(
2108 ceph::features::mon::FEATURE_LUMINOUS
)) {
2109 client
->finisher
.queue(new C_PoolAsync_Safe(c
), -EOPNOTSUPP
);
2113 std::stringstream cmd
;
2115 << "\"prefix\": \"osd pool application enable\","
2116 << "\"pool\": \"" << get_cached_pool_name() << "\","
2117 << "\"app\": \"" << app_name
<< "\"";
2119 cmd
<< ",\"force\":\"--yes-i-really-mean-it\"";
2123 std::vector
<std::string
> cmds
;
2124 cmds
.push_back(cmd
.str());
2126 client
->mon_command_async(cmds
, inbl
, nullptr, nullptr,
2127 new C_PoolAsync_Safe(c
));
2130 int librados::IoCtxImpl::application_list(std::set
<std::string
> *app_names
)
2134 objecter
->with_osdmap([&](const OSDMap
& o
) {
2135 auto pg_pool
= o
.get_pg_pool(poolid
);
2136 if (pg_pool
== nullptr) {
2141 for (auto &pair
: pg_pool
->application_metadata
) {
2142 app_names
->insert(pair
.first
);
2148 int librados::IoCtxImpl::application_metadata_get(const std::string
& app_name
,
2149 const std::string
&key
,
2153 objecter
->with_osdmap([&](const OSDMap
& o
) {
2154 auto pg_pool
= o
.get_pg_pool(poolid
);
2155 if (pg_pool
== nullptr) {
2160 auto app_it
= pg_pool
->application_metadata
.find(app_name
);
2161 if (app_it
== pg_pool
->application_metadata
.end()) {
2166 auto it
= app_it
->second
.find(key
);
2167 if (it
== app_it
->second
.end()) {
2172 *value
= it
->second
;
2177 int librados::IoCtxImpl::application_metadata_set(const std::string
& app_name
,
2178 const std::string
&key
,
2179 const std::string
& value
)
2181 std::stringstream cmd
;
2183 << "\"prefix\":\"osd pool application set\","
2184 << "\"pool\":\"" << get_cached_pool_name() << "\","
2185 << "\"app\":\"" << app_name
<< "\","
2186 << "\"key\":\"" << key
<< "\","
2187 << "\"value\":\"" << value
<< "\""
2190 std::vector
<std::string
> cmds
;
2191 cmds
.push_back(cmd
.str());
2193 int r
= client
->mon_command(cmds
, inbl
, nullptr, nullptr);
2198 // ensure we have the latest osd map epoch before proceeding
2199 return client
->wait_for_latest_osdmap();
2202 int librados::IoCtxImpl::application_metadata_remove(const std::string
& app_name
,
2203 const std::string
&key
)
2205 std::stringstream cmd
;
2207 << "\"prefix\":\"osd pool application rm\","
2208 << "\"pool\":\"" << get_cached_pool_name() << "\","
2209 << "\"app\":\"" << app_name
<< "\","
2210 << "\"key\":\"" << key
<< "\""
2213 std::vector
<std::string
> cmds
;
2214 cmds
.push_back(cmd
.str());
2216 int r
= client
->mon_command(cmds
, inbl
, nullptr, nullptr);
2221 // ensure we have the latest osd map epoch before proceeding
2222 return client
->wait_for_latest_osdmap();
2225 int librados::IoCtxImpl::application_metadata_list(const std::string
& app_name
,
2226 std::map
<std::string
, std::string
> *values
)
2230 objecter
->with_osdmap([&](const OSDMap
& o
) {
2231 auto pg_pool
= o
.get_pg_pool(poolid
);
2232 if (pg_pool
== nullptr) {
2237 auto it
= pg_pool
->application_metadata
.find(app_name
);
2238 if (it
== pg_pool
->application_metadata
.end()) {
2243 *values
= it
->second
;