1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "IoCtxImpl.h"
19 #include "librados/librados_c.h"
20 #include "librados/AioCompletionImpl.h"
21 #include "librados/PoolAsyncCompletionImpl.h"
22 #include "librados/RadosClient.h"
23 #include "include/ceph_assert.h"
24 #include "common/valgrind.h"
25 #include "common/EventTrace.h"
27 #define dout_subsys ceph_subsys_rados
29 #define dout_prefix *_dout << "librados: "
33 using std::unique_lock
;
36 namespace bs
= boost::system
;
37 namespace ca
= ceph::async
;
38 namespace cb
= ceph::buffer
;
43 struct CB_notify_Finish
{
47 Objecter::LingerOp
*linger_op
;
48 bufferlist
*preply_bl
;
50 size_t *preply_buf_len
;
52 CB_notify_Finish(CephContext
*_cct
, Context
*_ctx
, Objecter
*_objecter
,
53 Objecter::LingerOp
*_linger_op
, bufferlist
*_preply_bl
,
54 char **_preply_buf
, size_t *_preply_buf_len
)
55 : cct(_cct
), ctx(_ctx
), objecter(_objecter
), linger_op(_linger_op
),
56 preply_bl(_preply_bl
), preply_buf(_preply_buf
),
57 preply_buf_len(_preply_buf_len
) {}
61 CB_notify_Finish(const CB_notify_Finish
&) = delete;
62 CB_notify_Finish
& operator =(const CB_notify_Finish
&) = delete;
63 CB_notify_Finish(CB_notify_Finish
&&) = default;
64 CB_notify_Finish
& operator =(CB_notify_Finish
&&) = default;
66 void operator()(bs::error_code ec
, bufferlist
&& reply_bl
) {
67 ldout(cct
, 10) << __func__
<< " completed notify (linger op "
68 << linger_op
<< "), ec = " << ec
<< dendl
;
70 // pass result back to user
71 // NOTE: we do this regardless of what error code we return
73 if (reply_bl
.length()) {
74 *preply_buf
= (char*)malloc(reply_bl
.length());
75 memcpy(*preply_buf
, reply_bl
.c_str(), reply_bl
.length());
81 *preply_buf_len
= reply_bl
.length();
83 *preply_bl
= std::move(reply_bl
);
85 ctx
->complete(ceph::from_error_code(ec
));
89 struct CB_aio_linger_cancel
{
91 Objecter::LingerOp
*linger_op
;
93 CB_aio_linger_cancel(Objecter
*_objecter
, Objecter::LingerOp
*_linger_op
)
94 : objecter(_objecter
), linger_op(_linger_op
)
99 objecter
->linger_cancel(linger_op
);
103 struct C_aio_linger_Complete
: public Context
{
104 AioCompletionImpl
*c
;
105 Objecter::LingerOp
*linger_op
;
108 C_aio_linger_Complete(AioCompletionImpl
*_c
, Objecter::LingerOp
*_linger_op
, bool _cancel
)
109 : c(_c
), linger_op(_linger_op
), cancel(_cancel
)
114 void finish(int r
) override
{
116 boost::asio::defer(c
->io
->client
->finish_strand
,
117 CB_aio_linger_cancel(c
->io
->objecter
,
123 c
->cond
.notify_all();
125 if (c
->callback_complete
||
127 boost::asio::defer(c
->io
->client
->finish_strand
, CB_AioComplete(c
));
133 struct C_aio_notify_Complete
: public C_aio_linger_Complete
{
134 ceph::mutex lock
= ceph::make_mutex("C_aio_notify_Complete::lock");
136 bool finished
= false;
139 C_aio_notify_Complete(AioCompletionImpl
*_c
, Objecter::LingerOp
*_linger_op
)
140 : C_aio_linger_Complete(_c
, _linger_op
, false) {
143 void handle_ack(int r
) {
144 // invoked by C_aio_notify_Ack
150 void complete(int r
) override
{
151 // invoked by CB_notify_Finish
157 void complete_unlock(int r
) {
158 if (ret_val
== 0 && r
< 0) {
162 if (acked
&& finished
) {
165 C_aio_linger_Complete::complete(ret_val
);
172 struct C_aio_notify_Ack
: public Context
{
174 C_aio_notify_Complete
*oncomplete
;
176 C_aio_notify_Ack(CephContext
*_cct
,
177 C_aio_notify_Complete
*_oncomplete
)
178 : cct(_cct
), oncomplete(_oncomplete
)
182 void finish(int r
) override
184 ldout(cct
, 10) << __func__
<< " linger op " << oncomplete
->linger_op
<< " "
185 << "acked (" << r
<< ")" << dendl
;
186 oncomplete
->handle_ack(r
);
190 struct C_aio_selfmanaged_snap_op_Complete
: public Context
{
191 librados::RadosClient
*client
;
192 librados::AioCompletionImpl
*c
;
194 C_aio_selfmanaged_snap_op_Complete(librados::RadosClient
*client
,
195 librados::AioCompletionImpl
*c
)
196 : client(client
), c(c
) {
200 void finish(int r
) override
{
204 c
->cond
.notify_all();
206 if (c
->callback_complete
|| c
->callback_safe
) {
207 boost::asio::defer(client
->finish_strand
, librados::CB_AioComplete(c
));
213 struct C_aio_selfmanaged_snap_create_Complete
: public C_aio_selfmanaged_snap_op_Complete
{
215 uint64_t *dest_snapid
;
217 C_aio_selfmanaged_snap_create_Complete(librados::RadosClient
*client
,
218 librados::AioCompletionImpl
*c
,
219 uint64_t *dest_snapid
)
220 : C_aio_selfmanaged_snap_op_Complete(client
, c
),
221 dest_snapid(dest_snapid
) {
224 void finish(int r
) override
{
226 *dest_snapid
= snapid
;
228 C_aio_selfmanaged_snap_op_Complete::finish(r
);
232 } // anonymous namespace
233 } // namespace librados
235 librados::IoCtxImpl::IoCtxImpl() = default;
237 librados::IoCtxImpl::IoCtxImpl(RadosClient
*c
, Objecter
*objecter
,
238 int64_t poolid
, snapid_t s
)
239 : client(c
), poolid(poolid
), snap_seq(s
),
240 notify_timeout(c
->cct
->_conf
->client_notify_timeout
),
242 aio_write_seq(0), objecter(objecter
)
246 void librados::IoCtxImpl::set_snap_read(snapid_t s
)
250 ldout(client
->cct
, 10) << "set snap read " << snap_seq
<< " -> " << s
<< dendl
;
254 int librados::IoCtxImpl::set_snap_write_context(snapid_t seq
, vector
<snapid_t
>& snaps
)
257 ldout(client
->cct
, 10) << "set snap write context: seq = " << seq
258 << " and snaps = " << snaps
<< dendl
;
267 int librados::IoCtxImpl::get_object_hash_position(
268 const std::string
& oid
, uint32_t *hash_position
)
270 int64_t r
= objecter
->get_object_hash_position(poolid
, oid
, oloc
.nspace
);
273 *hash_position
= (uint32_t)r
;
277 int librados::IoCtxImpl::get_object_pg_hash_position(
278 const std::string
& oid
, uint32_t *pg_hash_position
)
280 int64_t r
= objecter
->get_object_pg_hash_position(poolid
, oid
, oloc
.nspace
);
283 *pg_hash_position
= (uint32_t)r
;
287 void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl
*c
)
290 std::scoped_lock l
{aio_write_list_lock
};
291 ceph_assert(c
->io
== this);
292 c
->aio_write_seq
= ++aio_write_seq
;
293 ldout(client
->cct
, 20) << "queue_aio_write " << this << " completion " << c
294 << " write_seq " << aio_write_seq
<< dendl
;
295 aio_write_list
.push_back(&c
->aio_write_list_item
);
298 void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl
*c
)
300 ldout(client
->cct
, 20) << "complete_aio_write " << c
<< dendl
;
301 aio_write_list_lock
.lock();
302 ceph_assert(c
->io
== this);
303 c
->aio_write_list_item
.remove_myself();
305 map
<ceph_tid_t
, std::list
<AioCompletionImpl
*> >::iterator waiters
= aio_write_waiters
.begin();
306 while (waiters
!= aio_write_waiters
.end()) {
307 if (!aio_write_list
.empty() &&
308 aio_write_list
.front()->aio_write_seq
<= waiters
->first
) {
309 ldout(client
->cct
, 20) << " next outstanding write is " << aio_write_list
.front()->aio_write_seq
310 << " <= waiter " << waiters
->first
311 << ", stopping" << dendl
;
314 ldout(client
->cct
, 20) << " waking waiters on seq " << waiters
->first
<< dendl
;
315 for (std::list
<AioCompletionImpl
*>::iterator it
= waiters
->second
.begin();
316 it
!= waiters
->second
.end(); ++it
) {
317 boost::asio::defer(client
->finish_strand
, CB_AioCompleteAndSafe(*it
));
320 aio_write_waiters
.erase(waiters
++);
323 aio_write_cond
.notify_all();
324 aio_write_list_lock
.unlock();
328 void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl
*c
)
330 ldout(client
->cct
, 20) << "flush_aio_writes_async " << this
331 << " completion " << c
<< dendl
;
332 std::lock_guard
l(aio_write_list_lock
);
333 ceph_tid_t seq
= aio_write_seq
;
334 if (aio_write_list
.empty()) {
335 ldout(client
->cct
, 20) << "flush_aio_writes_async no writes. (tid "
336 << seq
<< ")" << dendl
;
337 boost::asio::defer(client
->finish_strand
, CB_AioCompleteAndSafe(c
));
339 ldout(client
->cct
, 20) << "flush_aio_writes_async " << aio_write_list
.size()
340 << " writes in flight; waiting on tid " << seq
<< dendl
;
342 aio_write_waiters
[seq
].push_back(c
);
346 void librados::IoCtxImpl::flush_aio_writes()
348 ldout(client
->cct
, 20) << "flush_aio_writes" << dendl
;
349 std::unique_lock l
{aio_write_list_lock
};
350 aio_write_cond
.wait(l
, [seq
=aio_write_seq
, this] {
351 return (aio_write_list
.empty() ||
352 aio_write_list
.front()->aio_write_seq
> seq
);
356 string
librados::IoCtxImpl::get_cached_pool_name()
359 client
->pool_get_name(get_id(), &pn
);
365 int librados::IoCtxImpl::snap_create(const char *snapName
)
368 string
sName(snapName
);
370 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::snap_create::mylock");
371 ceph::condition_variable cond
;
373 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
374 objecter
->create_pool_snap(poolid
, sName
, onfinish
);
376 std::unique_lock l
{mylock
};
377 cond
.wait(l
, [&done
] { return done
; });
381 int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid
)
385 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::selfmanaged_snap_create::mylock");
386 ceph::condition_variable cond
;
388 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
390 objecter
->allocate_selfmanaged_snap(poolid
, &snapid
, onfinish
);
393 std::unique_lock l
{mylock
};
394 cond
.wait(l
, [&done
] { return done
; });
401 void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid
,
402 AioCompletionImpl
*c
)
404 C_aio_selfmanaged_snap_create_Complete
*onfinish
=
405 new C_aio_selfmanaged_snap_create_Complete(client
, c
, snapid
);
406 objecter
->allocate_selfmanaged_snap(poolid
, &onfinish
->snapid
,
410 int librados::IoCtxImpl::snap_remove(const char *snapName
)
413 string
sName(snapName
);
415 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::snap_remove::mylock");
416 ceph::condition_variable cond
;
418 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
419 objecter
->delete_pool_snap(poolid
, sName
, onfinish
);
420 unique_lock l
{mylock
};
421 cond
.wait(l
, [&done
] { return done
; });
425 int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t
& oid
,
426 ::SnapContext
& snapc
,
431 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::snap_rollback::mylock");
432 ceph::condition_variable cond
;
434 Context
*onack
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
436 ::ObjectOperation op
;
437 prepare_assert_ops(&op
);
439 objecter
->mutate(oid
, oloc
,
440 op
, snapc
, ceph::real_clock::now(),
444 std::unique_lock l
{mylock
};
445 cond
.wait(l
, [&done
] { return done
; });
449 int librados::IoCtxImpl::rollback(const object_t
& oid
, const char *snapName
)
453 int r
= objecter
->pool_snap_by_name(poolid
, snapName
, &snap
);
458 return selfmanaged_snap_rollback_object(oid
, snapc
, snap
);
461 int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid
)
465 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::selfmanaged_snap_remove::mylock");
466 ceph::condition_variable cond
;
468 objecter
->delete_selfmanaged_snap(poolid
, snapid_t(snapid
),
469 new C_SafeCond(mylock
, cond
, &done
, &reply
));
471 std::unique_lock l
{mylock
};
472 cond
.wait(l
, [&done
] { return done
; });
476 void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid
,
477 AioCompletionImpl
*c
)
479 Context
*onfinish
= new C_aio_selfmanaged_snap_op_Complete(client
, c
);
480 objecter
->delete_selfmanaged_snap(poolid
, snapid
, onfinish
);
483 int librados::IoCtxImpl::snap_list(vector
<uint64_t> *snaps
)
485 return objecter
->pool_snap_list(poolid
, snaps
);
488 int librados::IoCtxImpl::snap_lookup(const char *name
, uint64_t *snapid
)
490 return objecter
->pool_snap_by_name(poolid
, name
, (snapid_t
*)snapid
);
493 int librados::IoCtxImpl::snap_get_name(uint64_t snapid
, std::string
*s
)
495 pool_snap_info_t info
;
496 int ret
= objecter
->pool_snap_get_info(poolid
, snapid
, &info
);
500 *s
= info
.name
.c_str();
504 int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid
, time_t *t
)
506 pool_snap_info_t info
;
507 int ret
= objecter
->pool_snap_get_info(poolid
, snapid
, &info
);
511 *t
= info
.stamp
.sec();
518 int librados::IoCtxImpl::nlist(Objecter::NListContext
*context
, int max_entries
)
522 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::nlist::mylock");
523 ceph::condition_variable cond
;
525 if (context
->at_end())
528 context
->max_entries
= max_entries
;
529 context
->nspace
= oloc
.nspace
;
531 objecter
->list_nobjects(context
, new C_SafeCond(mylock
, cond
, &done
, &r
));
533 std::unique_lock l
{mylock
};
534 cond
.wait(l
, [&done
] { return done
; });
538 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext
*context
,
541 context
->list
.clear();
542 return objecter
->list_nobjects_seek(context
, pos
);
545 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext
*context
,
546 const rados_object_list_cursor
& cursor
)
548 context
->list
.clear();
549 return objecter
->list_nobjects_seek(context
, *(const hobject_t
*)cursor
);
552 rados_object_list_cursor
librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext
*context
)
554 hobject_t
*c
= new hobject_t
;
556 objecter
->list_nobjects_get_cursor(context
, c
);
557 return (rados_object_list_cursor
)c
;
560 int librados::IoCtxImpl::create(const object_t
& oid
, bool exclusive
)
562 ::ObjectOperation op
;
563 prepare_assert_ops(&op
);
564 op
.create(exclusive
);
565 return operate(oid
, &op
, NULL
);
569 * add any version assert operations that are appropriate given the
570 * state in the IoCtx, either the target version assert or any src
571 * object asserts. these affect a single ioctx operation, so clear
572 * the ioctx state when we're done.
574 * return a pointer to the ObjectOperation if we added any events;
575 * this is convenient for passing the extra_ops argument into Objecter
578 ::ObjectOperation
*librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation
*op
)
580 ::ObjectOperation
*pop
= NULL
;
582 op
->assert_version(assert_ver
);
589 int librados::IoCtxImpl::write(const object_t
& oid
, bufferlist
& bl
,
590 size_t len
, uint64_t off
)
592 if (len
> UINT_MAX
/2)
594 ::ObjectOperation op
;
595 prepare_assert_ops(&op
);
597 mybl
.substr_of(bl
, 0, len
);
599 return operate(oid
, &op
, NULL
);
602 int librados::IoCtxImpl::append(const object_t
& oid
, bufferlist
& bl
, size_t len
)
604 if (len
> UINT_MAX
/2)
606 ::ObjectOperation op
;
607 prepare_assert_ops(&op
);
609 mybl
.substr_of(bl
, 0, len
);
611 return operate(oid
, &op
, NULL
);
614 int librados::IoCtxImpl::write_full(const object_t
& oid
, bufferlist
& bl
)
616 if (bl
.length() > UINT_MAX
/2)
618 ::ObjectOperation op
;
619 prepare_assert_ops(&op
);
621 return operate(oid
, &op
, NULL
);
624 int librados::IoCtxImpl::writesame(const object_t
& oid
, bufferlist
& bl
,
625 size_t write_len
, uint64_t off
)
627 if ((bl
.length() > UINT_MAX
/2) || (write_len
> UINT_MAX
/2))
629 if ((bl
.length() == 0) || (write_len
% bl
.length()))
631 ::ObjectOperation op
;
632 prepare_assert_ops(&op
);
634 mybl
.substr_of(bl
, 0, bl
.length());
635 op
.writesame(off
, write_len
, mybl
);
636 return operate(oid
, &op
, NULL
);
639 int librados::IoCtxImpl::operate(const object_t
& oid
, ::ObjectOperation
*o
,
640 ceph::real_time
*pmtime
, int flags
)
642 ceph::real_time ut
= (pmtime
? *pmtime
:
643 ceph::real_clock::now());
645 /* can't write to a snapshot */
646 if (snap_seq
!= CEPH_NOSNAP
)
652 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::operate::mylock");
653 ceph::condition_variable cond
;
658 Context
*oncommit
= new C_SafeCond(mylock
, cond
, &done
, &r
);
660 int op
= o
->ops
[0].op
.op
;
661 ldout(client
->cct
, 10) << ceph_osd_op_name(op
) << " oid=" << oid
662 << " nspace=" << oloc
.nspace
<< dendl
;
663 Objecter::Op
*objecter_op
= objecter
->prepare_mutate_op(
666 flags
| extra_op_flags
,
668 objecter
->op_submit(objecter_op
);
671 std::unique_lock l
{mylock
};
672 cond
.wait(l
, [&done
] { return done
;});
674 ldout(client
->cct
, 10) << "Objecter returned from "
675 << ceph_osd_op_name(op
) << " r=" << r
<< dendl
;
677 set_sync_op_version(ver
);
682 int librados::IoCtxImpl::operate_read(const object_t
& oid
,
683 ::ObjectOperation
*o
,
690 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::operate_read::mylock");
691 ceph::condition_variable cond
;
696 Context
*onack
= new C_SafeCond(mylock
, cond
, &done
, &r
);
698 int op
= o
->ops
[0].op
.op
;
699 ldout(client
->cct
, 10) << ceph_osd_op_name(op
) << " oid=" << oid
<< " nspace=" << oloc
.nspace
<< dendl
;
700 Objecter::Op
*objecter_op
= objecter
->prepare_read_op(
703 flags
| extra_op_flags
,
705 objecter
->op_submit(objecter_op
);
708 std::unique_lock l
{mylock
};
709 cond
.wait(l
, [&done
] { return done
; });
711 ldout(client
->cct
, 10) << "Objecter returned from "
712 << ceph_osd_op_name(op
) << " r=" << r
<< dendl
;
714 set_sync_op_version(ver
);
719 int librados::IoCtxImpl::aio_operate_read(const object_t
&oid
,
720 ::ObjectOperation
*o
,
721 AioCompletionImpl
*c
,
724 const blkin_trace_info
*trace_info
)
726 FUNCTRACE(client
->cct
);
727 Context
*oncomplete
= new C_aio_Complete(c
);
729 #if defined(WITH_EVENTTRACE)
730 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
735 ZTracer::Trace trace
;
737 ZTracer::Trace
parent_trace("", nullptr, trace_info
);
738 trace
.init("rados operate read", &objecter
->trace_endpoint
, &parent_trace
);
741 trace
.event("init root span");
742 Objecter::Op
*objecter_op
= objecter
->prepare_read_op(
744 *o
, snap_seq
, pbl
, flags
| extra_op_flags
,
745 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
746 objecter
->op_submit(objecter_op
, &c
->tid
);
747 trace
.event("rados operate read submitted");
752 int librados::IoCtxImpl::aio_operate(const object_t
& oid
,
753 ::ObjectOperation
*o
, AioCompletionImpl
*c
,
754 const SnapContext
& snap_context
,
755 const ceph::real_time
*pmtime
, int flags
,
756 const blkin_trace_info
*trace_info
)
758 FUNCTRACE(client
->cct
);
759 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_WRITE_OP_BEGIN");
760 const ceph::real_time ut
= (pmtime
? *pmtime
: ceph::real_clock::now());
761 /* can't write to a snapshot */
762 if (snap_seq
!= CEPH_NOSNAP
)
765 Context
*oncomplete
= new C_aio_Complete(c
);
766 #if defined(WITH_EVENTTRACE)
767 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
773 ZTracer::Trace trace
;
775 ZTracer::Trace
parent_trace("", nullptr, trace_info
);
776 trace
.init("rados operate", &objecter
->trace_endpoint
, &parent_trace
);
779 trace
.event("init root span");
780 Objecter::Op
*op
= objecter
->prepare_mutate_op(
781 oid
, oloc
, *o
, snap_context
, ut
, flags
| extra_op_flags
,
782 oncomplete
, &c
->objver
, osd_reqid_t(), &trace
);
783 objecter
->op_submit(op
, &c
->tid
);
784 trace
.event("rados operate op submitted");
789 int librados::IoCtxImpl::aio_read(const object_t oid
, AioCompletionImpl
*c
,
790 bufferlist
*pbl
, size_t len
, uint64_t off
,
791 uint64_t snapid
, const blkin_trace_info
*info
)
793 FUNCTRACE(client
->cct
);
794 if (len
> (size_t) INT_MAX
)
797 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_READ_OP_BEGIN");
798 Context
*oncomplete
= new C_aio_Complete(c
);
800 #if defined(WITH_EVENTTRACE)
801 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
807 ZTracer::Trace trace
;
809 trace
.init("rados read", &objecter
->trace_endpoint
, info
);
811 Objecter::Op
*o
= objecter
->prepare_read_op(
813 off
, len
, snapid
, pbl
, extra_op_flags
,
814 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
815 objecter
->op_submit(o
, &c
->tid
);
819 int librados::IoCtxImpl::aio_read(const object_t oid
, AioCompletionImpl
*c
,
820 char *buf
, size_t len
, uint64_t off
,
821 uint64_t snapid
, const blkin_trace_info
*info
)
823 FUNCTRACE(client
->cct
);
824 if (len
> (size_t) INT_MAX
)
827 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_READ_OP_BEGIN");
828 Context
*oncomplete
= new C_aio_Complete(c
);
830 #if defined(WITH_EVENTTRACE)
831 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
836 c
->bl
.push_back(buffer::create_static(len
, buf
));
840 ZTracer::Trace trace
;
842 trace
.init("rados read", &objecter
->trace_endpoint
, info
);
844 Objecter::Op
*o
= objecter
->prepare_read_op(
846 off
, len
, snapid
, &c
->bl
, extra_op_flags
,
847 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
848 objecter
->op_submit(o
, &c
->tid
);
852 class C_ObjectOperation
: public Context
{
854 ::ObjectOperation m_ops
;
855 explicit C_ObjectOperation(Context
*c
) : m_ctx(c
) {}
856 void finish(int r
) override
{
863 int librados::IoCtxImpl::aio_sparse_read(const object_t oid
,
864 AioCompletionImpl
*c
,
865 std::map
<uint64_t,uint64_t> *m
,
866 bufferlist
*data_bl
, size_t len
,
867 uint64_t off
, uint64_t snapid
)
869 FUNCTRACE(client
->cct
);
870 if (len
> (size_t) INT_MAX
)
873 Context
*nested
= new C_aio_Complete(c
);
874 C_ObjectOperation
*onack
= new C_ObjectOperation(nested
);
876 #if defined(WITH_EVENTTRACE)
877 ((C_aio_Complete
*) nested
)->oid
= oid
;
882 onack
->m_ops
.sparse_read(off
, len
, m
, data_bl
, NULL
);
884 Objecter::Op
*o
= objecter
->prepare_read_op(
886 onack
->m_ops
, snapid
, NULL
, extra_op_flags
,
888 objecter
->op_submit(o
, &c
->tid
);
892 int librados::IoCtxImpl::aio_cmpext(const object_t
& oid
,
893 AioCompletionImpl
*c
,
897 if (cmp_bl
.length() > UINT_MAX
/2)
900 Context
*onack
= new C_aio_Complete(c
);
905 Objecter::Op
*o
= objecter
->prepare_cmpext_op(
906 oid
, oloc
, off
, cmp_bl
, snap_seq
, extra_op_flags
,
908 objecter
->op_submit(o
, &c
->tid
);
913 /* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
914 int librados::IoCtxImpl::aio_cmpext(const object_t
& oid
,
915 AioCompletionImpl
*c
,
920 if (cmp_len
> UINT_MAX
/2)
924 cmp_bl
.append(cmp_buf
, cmp_len
);
926 Context
*nested
= new C_aio_Complete(c
);
927 C_ObjectOperation
*onack
= new C_ObjectOperation(nested
);
932 onack
->m_ops
.cmpext(off
, cmp_len
, cmp_buf
, NULL
);
934 Objecter::Op
*o
= objecter
->prepare_read_op(
935 oid
, oloc
, onack
->m_ops
, snap_seq
, NULL
, extra_op_flags
, onack
, &c
->objver
);
936 objecter
->op_submit(o
, &c
->tid
);
940 int librados::IoCtxImpl::aio_write(const object_t
&oid
, AioCompletionImpl
*c
,
941 const bufferlist
& bl
, size_t len
,
942 uint64_t off
, const blkin_trace_info
*info
)
944 FUNCTRACE(client
->cct
);
945 auto ut
= ceph::real_clock::now();
946 ldout(client
->cct
, 20) << "aio_write " << oid
<< " " << off
<< "~" << len
<< " snapc=" << snapc
<< " snap_seq=" << snap_seq
<< dendl
;
947 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_WRITE_OP_BEGIN");
949 if (len
> UINT_MAX
/2)
951 /* can't write to a snapshot */
952 if (snap_seq
!= CEPH_NOSNAP
)
955 Context
*oncomplete
= new C_aio_Complete(c
);
957 #if defined(WITH_EVENTTRACE)
958 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
960 ZTracer::Trace trace
;
962 trace
.init("rados write", &objecter
->trace_endpoint
, info
);
967 Objecter::Op
*o
= objecter
->prepare_write_op(
969 off
, len
, snapc
, bl
, ut
, extra_op_flags
,
970 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
971 objecter
->op_submit(o
, &c
->tid
);
976 int librados::IoCtxImpl::aio_append(const object_t
&oid
, AioCompletionImpl
*c
,
977 const bufferlist
& bl
, size_t len
)
979 FUNCTRACE(client
->cct
);
980 auto ut
= ceph::real_clock::now();
982 if (len
> UINT_MAX
/2)
984 /* can't write to a snapshot */
985 if (snap_seq
!= CEPH_NOSNAP
)
988 Context
*oncomplete
= new C_aio_Complete(c
);
989 #if defined(WITH_EVENTTRACE)
990 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
996 Objecter::Op
*o
= objecter
->prepare_append_op(
998 len
, snapc
, bl
, ut
, extra_op_flags
,
999 oncomplete
, &c
->objver
);
1000 objecter
->op_submit(o
, &c
->tid
);
1005 int librados::IoCtxImpl::aio_write_full(const object_t
&oid
,
1006 AioCompletionImpl
*c
,
1007 const bufferlist
& bl
)
1009 FUNCTRACE(client
->cct
);
1010 auto ut
= ceph::real_clock::now();
1012 if (bl
.length() > UINT_MAX
/2)
1014 /* can't write to a snapshot */
1015 if (snap_seq
!= CEPH_NOSNAP
)
1018 Context
*oncomplete
= new C_aio_Complete(c
);
1019 #if defined(WITH_EVENTTRACE)
1020 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1026 Objecter::Op
*o
= objecter
->prepare_write_full_op(
1028 snapc
, bl
, ut
, extra_op_flags
,
1029 oncomplete
, &c
->objver
);
1030 objecter
->op_submit(o
, &c
->tid
);
1035 int librados::IoCtxImpl::aio_writesame(const object_t
&oid
,
1036 AioCompletionImpl
*c
,
1037 const bufferlist
& bl
,
1041 FUNCTRACE(client
->cct
);
1042 auto ut
= ceph::real_clock::now();
1044 if ((bl
.length() > UINT_MAX
/2) || (write_len
> UINT_MAX
/2))
1046 if ((bl
.length() == 0) || (write_len
% bl
.length()))
1048 /* can't write to a snapshot */
1049 if (snap_seq
!= CEPH_NOSNAP
)
1052 Context
*oncomplete
= new C_aio_Complete(c
);
1054 #if defined(WITH_EVENTTRACE)
1055 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1060 Objecter::Op
*o
= objecter
->prepare_writesame_op(
1063 snapc
, bl
, ut
, extra_op_flags
,
1064 oncomplete
, &c
->objver
);
1065 objecter
->op_submit(o
, &c
->tid
);
1070 int librados::IoCtxImpl::aio_remove(const object_t
&oid
, AioCompletionImpl
*c
, int flags
)
1072 FUNCTRACE(client
->cct
);
1073 auto ut
= ceph::real_clock::now();
1075 /* can't write to a snapshot */
1076 if (snap_seq
!= CEPH_NOSNAP
)
1079 Context
*oncomplete
= new C_aio_Complete(c
);
1081 #if defined(WITH_EVENTTRACE)
1082 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1087 Objecter::Op
*o
= objecter
->prepare_remove_op(
1089 snapc
, ut
, flags
| extra_op_flags
,
1090 oncomplete
, &c
->objver
);
1091 objecter
->op_submit(o
, &c
->tid
);
1097 int librados::IoCtxImpl::aio_stat(const object_t
& oid
, AioCompletionImpl
*c
,
1098 uint64_t *psize
, time_t *pmtime
)
1100 C_aio_stat_Ack
*onack
= new C_aio_stat_Ack(c
, pmtime
);
1103 Objecter::Op
*o
= objecter
->prepare_stat_op(
1105 snap_seq
, psize
, &onack
->mtime
, extra_op_flags
,
1107 objecter
->op_submit(o
, &c
->tid
);
1111 int librados::IoCtxImpl::aio_stat2(const object_t
& oid
, AioCompletionImpl
*c
,
1112 uint64_t *psize
, struct timespec
*pts
)
1114 C_aio_stat2_Ack
*onack
= new C_aio_stat2_Ack(c
, pts
);
1117 Objecter::Op
*o
= objecter
->prepare_stat_op(
1119 snap_seq
, psize
, &onack
->mtime
, extra_op_flags
,
1121 objecter
->op_submit(o
, &c
->tid
);
1125 int librados::IoCtxImpl::aio_getxattr(const object_t
& oid
, AioCompletionImpl
*c
,
1126 const char *name
, bufferlist
& bl
)
1128 ::ObjectOperation rd
;
1129 prepare_assert_ops(&rd
);
1130 rd
.getxattr(name
, &bl
, NULL
);
1131 int r
= aio_operate_read(oid
, &rd
, c
, 0, &bl
);
1135 int librados::IoCtxImpl::aio_rmxattr(const object_t
& oid
, AioCompletionImpl
*c
,
1138 ::ObjectOperation op
;
1139 prepare_assert_ops(&op
);
1141 return aio_operate(oid
, &op
, c
, snapc
, nullptr, 0);
1144 int librados::IoCtxImpl::aio_setxattr(const object_t
& oid
, AioCompletionImpl
*c
,
1145 const char *name
, bufferlist
& bl
)
1147 ::ObjectOperation op
;
1148 prepare_assert_ops(&op
);
1149 op
.setxattr(name
, bl
);
1150 return aio_operate(oid
, &op
, c
, snapc
, nullptr, 0);
1154 struct AioGetxattrsData
{
1155 AioGetxattrsData(librados::AioCompletionImpl
*c
, map
<string
, bufferlist
>* attrset
,
1156 librados::RadosClient
*_client
) :
1157 user_completion(c
), user_attrset(attrset
), client(_client
) {}
1158 struct librados::CB_AioCompleteAndSafe user_completion
;
1159 map
<string
, bufferlist
> result_attrset
;
1160 map
<std::string
, bufferlist
>* user_attrset
;
1161 librados::RadosClient
*client
;
1165 static void aio_getxattrs_complete(rados_completion_t c
, void *arg
) {
1166 AioGetxattrsData
*cdata
= reinterpret_cast<AioGetxattrsData
*>(arg
);
1167 int rc
= rados_aio_get_return_value(c
);
1168 cdata
->user_attrset
->clear();
1170 for (map
<string
,bufferlist
>::iterator p
= cdata
->result_attrset
.begin();
1171 p
!= cdata
->result_attrset
.end();
1173 ldout(cdata
->client
->cct
, 10) << "IoCtxImpl::getxattrs: xattr=" << p
->first
<< dendl
;
1174 (*cdata
->user_attrset
)[p
->first
] = p
->second
;
1177 cdata
->user_completion(rc
);
1178 ((librados::AioCompletionImpl
*)c
)->put();
1182 int librados::IoCtxImpl::aio_getxattrs(const object_t
& oid
, AioCompletionImpl
*c
,
1183 map
<std::string
, bufferlist
>& attrset
)
1185 AioGetxattrsData
*cdata
= new AioGetxattrsData(c
, &attrset
, client
);
1186 ::ObjectOperation rd
;
1187 prepare_assert_ops(&rd
);
1188 rd
.getxattrs(&cdata
->result_attrset
, NULL
);
1189 librados::AioCompletionImpl
*comp
= new librados::AioCompletionImpl
;
1190 comp
->set_complete_callback(cdata
, aio_getxattrs_complete
);
1191 return aio_operate_read(oid
, &rd
, comp
, 0, NULL
);
1194 int librados::IoCtxImpl::aio_cancel(AioCompletionImpl
*c
)
1196 return objecter
->op_cancel(c
->tid
, -ECANCELED
);
1200 int librados::IoCtxImpl::hit_set_list(uint32_t hash
, AioCompletionImpl
*c
,
1201 std::list
< std::pair
<time_t, time_t> > *pls
)
1203 Context
*oncomplete
= new C_aio_Complete(c
);
1207 ::ObjectOperation rd
;
1208 rd
.hit_set_ls(pls
, NULL
);
1209 object_locator_t
oloc(poolid
);
1210 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1211 hash
, oloc
, rd
, NULL
, extra_op_flags
, oncomplete
, NULL
, NULL
);
1212 objecter
->op_submit(o
, &c
->tid
);
1216 int librados::IoCtxImpl::hit_set_get(uint32_t hash
, AioCompletionImpl
*c
,
1220 Context
*oncomplete
= new C_aio_Complete(c
);
1224 ::ObjectOperation rd
;
1225 rd
.hit_set_get(ceph::real_clock::from_time_t(stamp
), pbl
, 0);
1226 object_locator_t
oloc(poolid
);
1227 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1228 hash
, oloc
, rd
, NULL
, extra_op_flags
, oncomplete
, NULL
, NULL
);
1229 objecter
->op_submit(o
, &c
->tid
);
1233 int librados::IoCtxImpl::remove(const object_t
& oid
)
1235 ::ObjectOperation op
;
1236 prepare_assert_ops(&op
);
1238 return operate(oid
, &op
, nullptr, librados::OPERATION_FULL_FORCE
);
1241 int librados::IoCtxImpl::remove(const object_t
& oid
, int flags
)
1243 ::ObjectOperation op
;
1244 prepare_assert_ops(&op
);
1246 return operate(oid
, &op
, NULL
, flags
);
1249 int librados::IoCtxImpl::trunc(const object_t
& oid
, uint64_t size
)
1251 ::ObjectOperation op
;
1252 prepare_assert_ops(&op
);
1254 return operate(oid
, &op
, NULL
);
1257 int librados::IoCtxImpl::get_inconsistent_objects(const pg_t
& pg
,
1258 const librados::object_id_t
& start_after
,
1259 uint64_t max_to_get
,
1260 AioCompletionImpl
*c
,
1261 std::vector
<inconsistent_obj_t
>* objects
,
1264 Context
*oncomplete
= new C_aio_Complete(c
);
1268 ::ObjectOperation op
;
1269 op
.scrub_ls(start_after
, max_to_get
, objects
, interval
, &c
->rval
);
1270 object_locator_t oloc
{poolid
, pg
.ps()};
1271 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1272 oloc
.hash
, oloc
, op
, nullptr, CEPH_OSD_FLAG_PGOP
| extra_op_flags
, oncomplete
,
1274 objecter
->op_submit(o
, &c
->tid
);
1278 int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t
& pg
,
1279 const librados::object_id_t
& start_after
,
1280 uint64_t max_to_get
,
1281 AioCompletionImpl
*c
,
1282 std::vector
<inconsistent_snapset_t
>* snapsets
,
1285 Context
*oncomplete
= new C_aio_Complete(c
);
1289 ::ObjectOperation op
;
1290 op
.scrub_ls(start_after
, max_to_get
, snapsets
, interval
, &c
->rval
);
1291 object_locator_t oloc
{poolid
, pg
.ps()};
1292 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1293 oloc
.hash
, oloc
, op
, nullptr, CEPH_OSD_FLAG_PGOP
| extra_op_flags
, oncomplete
,
1295 objecter
->op_submit(o
, &c
->tid
);
1299 int librados::IoCtxImpl::tmap_update(const object_t
& oid
, bufferlist
& cmdbl
)
1301 ::ObjectOperation wr
;
1302 prepare_assert_ops(&wr
);
1303 wr
.tmap_update(cmdbl
);
1304 return operate(oid
, &wr
, NULL
);
1307 int librados::IoCtxImpl::exec(const object_t
& oid
,
1308 const char *cls
, const char *method
,
1309 bufferlist
& inbl
, bufferlist
& outbl
)
1311 ::ObjectOperation rd
;
1312 prepare_assert_ops(&rd
);
1313 rd
.call(cls
, method
, inbl
);
1314 return operate_read(oid
, &rd
, &outbl
);
// Asynchronous object-class call (bufferlist output overload).
//   c     : completion; wrapped in a C_aio_Complete, receives the object
//           version in c->objver and the op's transaction id in c->tid
//   outbl : output buffer pointer handed to prepare_read_op()
// The op is read at snap_seq with extra_op_flags.
// NOTE(review): lines between the visible statements (embedded 1323,
// 1326-1329, 1336-1337) are absent from this extract, including the matching
// #endif and the return statement.
1317 int librados::IoCtxImpl::aio_exec(const object_t
& oid
, AioCompletionImpl
*c
,
1318 const char *cls
, const char *method
,
1319 bufferlist
& inbl
, bufferlist
*outbl
)
1321 FUNCTRACE(client
->cct
);
1322 Context
*oncomplete
= new C_aio_Complete(c
);
// Only when built with event tracing: record the object id on the completion
// context.
1324 #if defined(WITH_EVENTTRACE)
1325 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1330 ::ObjectOperation rd
;
1331 prepare_assert_ops(&rd
);
1332 rd
.call(cls
, method
, inbl
);
1333 Objecter::Op
*o
= objecter
->prepare_read_op(
1334 oid
, oloc
, rd
, snap_seq
, outbl
, extra_op_flags
, oncomplete
, &c
->objver
);
1335 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous object-class call (raw caller buffer overload).
//   buf / out_len : caller-supplied output buffer and its size
// The caller buffer is wrapped with buffer::create_static() and appended to
// c->bl, and &c->bl is what prepare_read_op() receives as the output buffer.
// NOTE(review): embedded lines 1345, 1348-1351, 1353-1355 and 1362-1363 are
// absent from this extract (including the matching #endif and the return);
// how the reply ends up in `buf` cannot be confirmed from what is visible.
1339 int librados::IoCtxImpl::aio_exec(const object_t
& oid
, AioCompletionImpl
*c
,
1340 const char *cls
, const char *method
,
1341 bufferlist
& inbl
, char *buf
, size_t out_len
)
1343 FUNCTRACE(client
->cct
);
1344 Context
*oncomplete
= new C_aio_Complete(c
);
// Only when built with event tracing: record the object id on the completion
// context.
1346 #if defined(WITH_EVENTTRACE)
1347 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1352 c
->bl
.push_back(buffer::create_static(out_len
, buf
));
1356 ::ObjectOperation rd
;
1357 prepare_assert_ops(&rd
);
1358 rd
.call(cls
, method
, inbl
);
1359 Objecter::Op
*o
= objecter
->prepare_read_op(
1360 oid
, oloc
, rd
, snap_seq
, &c
->bl
, extra_op_flags
, oncomplete
, &c
->objver
);
1361 objecter
->op_submit(o
, &c
->tid
);
// Synchronous read of up to `len` bytes at offset `off` of object `oid`
// into `bl`.
// NOTE(review): the statements guarded by the visible conditions, and the
// final return, are absent from this extract (embedded lines 1369, 1376-1378,
// 1382-1385), so the exact return values cannot be documented from here.
1365 int librados::IoCtxImpl::read(const object_t
& oid
,
1366 bufferlist
& bl
, size_t len
, uint64_t off
)
// Requests larger than INT_MAX are special-cased (action not visible here).
1368 if (len
> (size_t) INT_MAX
)
1370 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_READ_OP_BEGIN");
1372 ::ObjectOperation rd
;
1373 prepare_assert_ops(&rd
);
1374 rd
.read(off
, len
, &bl
, NULL
, NULL
);
1375 int r
= operate_read(oid
, &rd
, &bl
);
// A result shorter than requested is only logged at debug level 10.
1379 if (bl
.length() < len
) {
1380 ldout(client
->cct
, 10) << "Returned length " << bl
.length()
1381 << " less than original length "<< len
<< dendl
;
// Synchronous compare-extent: adds a cmpext step at offset `off` with the
// comparison payload `cmp_bl` and returns the result of operate_read().
// `cmp_bl` is used below but its parameter line is not visible in this
// extract (embedded lines 1388-1389 are absent).
1387 int librados::IoCtxImpl::cmpext(const object_t
& oid
, uint64_t off
,
// Payloads longer than UINT_MAX/2 are special-cased (action not visible here,
// embedded line 1391 is absent).
1390 if (cmp_bl
.length() > UINT_MAX
/2)
1393 ::ObjectOperation op
;
1394 prepare_assert_ops(&op
);
1395 op
.cmpext(off
, cmp_bl
, NULL
);
1396 return operate_read(oid
, &op
, NULL
);
// Synchronous extent-map query for [off, off+len) of object `oid`; the result
// map is `m`. The call blocks on a local mutex/condition pair until the
// C_SafeCond context sets `done`.
// NOTE(review): declarations of `bl`, `r` and `done`, the tail of the
// objecter->mapext() argument list, and the decode into `m` are absent from
// this extract.
// NOTE(review): the mutex name ("IoCtxImpl::read::mylock") and the log text
// ("returned from read") both say "read" although this is mapext -- looks
// like a copy/paste leftover; confirm before changing, since both are runtime
// strings.
1399 int librados::IoCtxImpl::mapext(const object_t
& oid
,
1400 uint64_t off
, size_t len
,
1401 std::map
<uint64_t,uint64_t>& m
)
1405 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::read::mylock");
1406 ceph::condition_variable cond
;
1409 Context
*onack
= new C_SafeCond(mylock
, cond
, &done
, &r
);
1411 objecter
->mapext(oid
, oloc
,
1412 off
, len
, snap_seq
, &bl
, extra_op_flags
,
// Block until the completion context has flagged `done`.
1416 unique_lock l
{mylock
};
1417 cond
.wait(l
, [&done
] { return done
;});
1419 ldout(client
->cct
, 10) << "Objecter returned from read r=" << r
<< dendl
;
1424 auto iter
= bl
.cbegin();
// Synchronous sparse read: the sparse_read step is given `&m` and `&data_bl`
// as outputs (by position, presumably the extent map and the data buffer --
// TODO confirm) for `len` bytes at `off`, and is run through operate_read().
// `off` is used below but its parameter line is not visible in this extract
// (embedded line 1433 is absent), and the handling of `r` after
// operate_read() is also absent.
1430 int librados::IoCtxImpl::sparse_read(const object_t
& oid
,
1431 std::map
<uint64_t,uint64_t>& m
,
1432 bufferlist
& data_bl
, size_t len
,
// Requests larger than INT_MAX are special-cased (action not visible here).
1435 if (len
> (size_t) INT_MAX
)
1438 ::ObjectOperation rd
;
1439 prepare_assert_ops(&rd
);
1440 rd
.sparse_read(off
, len
, &m
, &data_bl
, NULL
);
1442 int r
= operate_read(oid
, &rd
, NULL
);
// Synchronous checksum request over `len` bytes at `off` of object `oid`,
// using checksum `type`, seed `init_value` and `chunk_size`; `pbl` is handed
// to the checksum step as the output buffer.
// `pbl` is used below but its parameter line is not visible in this extract
// (embedded line 1452 is absent); the handling of `r` is absent as well.
1449 int librados::IoCtxImpl::checksum(const object_t
& oid
, uint8_t type
,
1450 const bufferlist
&init_value
, size_t len
,
1451 uint64_t off
, size_t chunk_size
,
// Requests larger than INT_MAX are special-cased (action not visible here).
1454 if (len
> (size_t) INT_MAX
) {
1458 ::ObjectOperation rd
;
1459 prepare_assert_ops(&rd
);
1460 rd
.checksum(type
, init_value
, off
, len
, chunk_size
, pbl
, nullptr, nullptr);
1462 int r
= operate_read(oid
, &rd
, nullptr);
// Synchronous stat of object `oid`.
//   psize  : handed to the stat step as the size output
//   pmtime : optional; on success (r >= 0) receives the modification time
//            converted with real_clock::to_time_t()
// NOTE(review): the declaration of `mtime` and the return statement are
// absent from this extract (embedded lines 1471-1477, 1485-1488).
1470 int librados::IoCtxImpl::stat(const object_t
& oid
, uint64_t *psize
, time_t *pmtime
)
1478 ::ObjectOperation rd
;
1479 prepare_assert_ops(&rd
);
1480 rd
.stat(psize
, &mtime
, nullptr);
1481 int r
= operate_read(oid
, &rd
, NULL
);
1483 if (r
>= 0 && pmtime
) {
1484 *pmtime
= real_clock::to_time_t(mtime
);
// Synchronous stat of object `oid` with a timespec result: the modification
// time obtained by the stat step is converted with
// ceph::real_clock::to_timespec() and stored through `pts`.
// NOTE(review): the condition guarding the write to *pts and the return
// statement are absent from this extract (embedded lines 1502-1506,
// 1508-1511).
1490 int librados::IoCtxImpl::stat2(const object_t
& oid
, uint64_t *psize
, struct timespec
*pts
)
1493 ceph::real_time mtime
;
1498 ::ObjectOperation rd
;
1499 prepare_assert_ops(&rd
);
1500 rd
.stat(psize
, &mtime
, nullptr);
1501 int r
= operate_read(oid
, &rd
, NULL
);
1507 *pts
= ceph::real_clock::to_timespec(mtime
);
// Synchronously fetches extended attribute `name` of object `oid` into `bl`.
// `bl` is passed both to the getxattr step and to operate_read().
// NOTE(review): what is returned after operate_read() is not visible in this
// extract (embedded lines 1520-1524 are absent).
1513 int librados::IoCtxImpl::getxattr(const object_t
& oid
,
1514 const char *name
, bufferlist
& bl
)
1516 ::ObjectOperation rd
;
1517 prepare_assert_ops(&rd
);
1518 rd
.getxattr(name
, &bl
, NULL
);
1519 int r
= operate_read(oid
, &rd
, &bl
);
// Synchronous removal of extended attribute `name` from object `oid`; returns
// the result of operate().
// NOTE(review): no statement using `name` is visible in this extract
// (embedded line 1530 is absent) -- presumably the rmxattr step; confirm.
1526 int librados::IoCtxImpl::rmxattr(const object_t
& oid
, const char *name
)
1528 ::ObjectOperation op
;
1529 prepare_assert_ops(&op
);
1531 return operate(oid
, &op
, NULL
);
// Synchronously sets extended attribute `name` of object `oid` to the
// contents of `bl`; returns the result of operate().
1534 int librados::IoCtxImpl::setxattr(const object_t
& oid
,
1535 const char *name
, bufferlist
& bl
)
1537 ::ObjectOperation op
;
1538 prepare_assert_ops(&op
);
1539 op
.setxattr(name
, bl
);
1540 return operate(oid
, &op
, NULL
);
// Synchronously fetches all extended attributes of object `oid`. The
// getxattrs step fills the local map `aset`, whose entries are then copied
// into the caller's `attrset` one by one (each name is logged at level 10).
// NOTE(review): the handling of `r` and the return statement are absent from
// this extract (embedded lines 1552-1554, 1558-1562).
1543 int librados::IoCtxImpl::getxattrs(const object_t
& oid
,
1544 map
<std::string
, bufferlist
>& attrset
)
1546 map
<string
, bufferlist
> aset
;
1548 ::ObjectOperation rd
;
1549 prepare_assert_ops(&rd
);
1550 rd
.getxattrs(&aset
, NULL
);
1551 int r
= operate_read(oid
, &rd
, NULL
);
1555 for (map
<string
,bufferlist
>::iterator p
= aset
.begin(); p
!= aset
.end(); ++p
) {
1556 ldout(client
->cct
, 10) << "IoCtxImpl::getxattrs: xattr=" << p
->first
<< dendl
;
// NOTE(review): the key is rebuilt from p->first.c_str(), so a name with an
// embedded NUL would be cut at that byte; entries already in `attrset` under
// the same key are overwritten.
1557 attrset
[p
->first
.c_str()] = p
->second
;
// Takes the version `ver` produced by a synchronous operation.
// The visible statement only annotates `last_objver` as a benign race for
// Valgrind-style tools (see common/valgrind.h).
// NOTE(review): the store into last_objver itself is not visible in this
// extract (embedded lines 1565, 1568-1569 are absent) -- presumably
// `last_objver = ver`; confirm.
1564 void librados::IoCtxImpl::set_sync_op_version(version_t ver
)
1566 ANNOTATE_BENIGN_RACE_SIZED(&last_objver
, sizeof(last_objver
),
1567 "IoCtxImpl last_objver");
// Hooks that let boost::intrusive_ptr<IoCtxImpl> manage the reference count:
// add_ref forwards to IoCtxImpl::get(), release forwards to IoCtxImpl::put().
1571 namespace librados
{
1572 void intrusive_ptr_add_ref(IoCtxImpl
*p
) { p
->get(); }
1573 void intrusive_ptr_release(IoCtxImpl
*p
) { p
->put(); }
// WatchInfo: callable watch handler stored on a linger op (see
// `linger_op->handle = WatchInfo(...)` in the watch functions). It holds the
// IoCtxImpl through an intrusive_ptr, plus the two user callback pointers as
// raw pointers.
// NOTE(review): the struct header, the `oid` member declaration and several
// parameter lines / conditions are absent from this extract.
1577 boost::intrusive_ptr
<librados::IoCtxImpl
> ioctx
;
1579 librados::WatchCtx
*ctx
;
1580 librados::WatchCtx2
*ctx2
;
1582 WatchInfo(librados::IoCtxImpl
*io
, object_t o
,
1583 librados::WatchCtx
*c
, librados::WatchCtx2
*c2
)
1584 : ioctx(io
), oid(o
), ctx(c
), ctx2(c2
) {}
// Delivers a notify to the user callback: ctx2->handle_notify() gets the full
// argument set, while the ctx->notify() path passes (0, 0, bl) and then
// acknowledges through ioctx->notify_ack(). The conditions selecting between
// them are not visible here.
1586 void handle_notify(uint64_t notify_id
,
1588 uint64_t notifier_id
,
1590 ldout(ioctx
->client
->cct
, 10) << __func__
<< " " << notify_id
1591 << " cookie " << cookie
1592 << " notifier_id " << notifier_id
1593 << " len " << bl
.length()
1597 ctx2
->handle_notify(notify_id
, cookie
, notifier_id
, bl
);
1599 ctx
->notify(0, 0, bl
);
1601 // send ACK back to OSD if using legacy protocol
1603 ioctx
->notify_ack(oid
, notify_id
, cookie
, empty
);
// Forwards a watch error to ctx2->handle_error(); any guard on ctx2 is not
// visible here.
1606 void handle_error(uint64_t cookie
, int err
) {
1607 ldout(ioctx
->client
->cct
, 10) << __func__
<< " cookie " << cookie
1611 ctx2
->handle_error(cookie
, err
);
// Call operator: routes to handle_error() with the error code converted by
// ceph::from_error_code(), or to handle_notify(). The branch condition is not
// visible here (presumably `if (ec)`; confirm).
1614 void operator()(bs::error_code ec
,
1617 uint64_t notifier_id
,
1620 handle_error(cookie
, ceph::from_error_code(ec
));
1622 handle_notify(notify_id
, cookie
, notifier_id
, bl
);
1627 // internal WatchInfo that owns the context memory
// The unique_ptr members below have the same names as the raw-pointer members
// of the WatchInfo base and hide them inside this struct. The constructor
// passes the same `c` / `c2` pointers both to the base (as raw pointers) and
// to the unique_ptrs, so the callbacks are deleted when this object is
// destroyed.
1628 struct InternalWatchInfo
: public WatchInfo
{
1629 std::unique_ptr
<librados::WatchCtx
> ctx
;
1630 std::unique_ptr
<librados::WatchCtx2
> ctx2
;
1632 InternalWatchInfo(librados::IoCtxImpl
*io
, object_t o
,
1633 librados::WatchCtx
*c
, librados::WatchCtx2
*c2
)
1634 : WatchInfo(io
, o
, c
, c2
), ctx(c
), ctx2(c2
) {}
// Convenience overload: forwards to the six-argument watch() with 0 in the
// fifth position (presumably the timeout -- the overload's parameter list is
// only partly visible in this extract; confirm).
// `internal` is forwarded but its parameter line is not visible here.
1637 int librados::IoCtxImpl::watch(const object_t
& oid
, uint64_t *handle
,
1638 librados::WatchCtx
*ctx
,
1639 librados::WatchCtx2
*ctx2
,
1642 return watch(oid
, handle
, ctx
, ctx2
, 0, internal
);
// Synchronously establishes a watch on object `oid`.
//   handle : receives the linger op's cookie, which is also what is sent in
//            the watch op
//   ctx / ctx2 : user callbacks, wrapped in an InternalWatchInfo or a
//            WatchInfo stored on the linger op
// The call blocks on a C_SaferCond until the linger watch completes, then
// records the object version through set_sync_op_version().
// NOTE(review): the `timeout` / `internal` parameter lines, the branch
// choosing between the two handler types, the declarations of `bl`, `snapc`,
// `objver`, and the condition guarding linger_cancel() are absent from this
// extract.
1645 int librados::IoCtxImpl::watch(const object_t
& oid
, uint64_t *handle
,
1646 librados::WatchCtx
*ctx
,
1647 librados::WatchCtx2
*ctx2
,
1651 ::ObjectOperation wr
;
1653 C_SaferCond onfinish
;
1655 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
,
1657 *handle
= linger_op
->get_cookie();
// Owning handler variant (deletes the callbacks with the handler).
1659 linger_op
->handle
= InternalWatchInfo(this, oid
, ctx
, ctx2
);
// Non-owning handler variant.
1661 linger_op
->handle
= WatchInfo(this, oid
, ctx
, ctx2
);
1663 prepare_assert_ops(&wr
);
1664 wr
.watch(*handle
, CEPH_OSD_WATCH_OP_WATCH
, timeout
);
1666 objecter
->linger_watch(linger_op
, wr
,
1667 snapc
, ceph::real_clock::now(), bl
,
1671 int r
= onfinish
.wait();
1673 set_sync_op_version(objver
);
1676 objecter
->linger_cancel(linger_op
);
// Convenience overload: forwards to the seven-argument aio_watch() with 0 in
// the sixth position (presumably the timeout -- the overload's parameter list
// is only partly visible in this extract; confirm).
// `handle` and `internal` are forwarded but their parameter lines are not
// visible here.
1683 int librados::IoCtxImpl::aio_watch(const object_t
& oid
,
1684 AioCompletionImpl
*c
,
1686 librados::WatchCtx
*ctx
,
1687 librados::WatchCtx2
*ctx2
,
1689 return aio_watch(oid
, c
, handle
, ctx
, ctx2
, 0, internal
);
// Asynchronously establishes a watch on object `oid`.
//   c      : completion, wrapped together with the linger op in a
//            C_aio_linger_Complete (third argument false); c->objver receives
//            the object version
//   handle : receives the linger op's cookie
//   ctx / ctx2 : user callbacks, wrapped in an InternalWatchInfo or a
//            WatchInfo stored on the linger op
// NOTE(review): the `handle` / `timeout` / `internal` parameter lines, the
// branch choosing between the two handler types, the declarations of `bl` and
// `snapc`, and the return statement are absent from this extract.
1692 int librados::IoCtxImpl::aio_watch(const object_t
& oid
,
1693 AioCompletionImpl
*c
,
1695 librados::WatchCtx
*ctx
,
1696 librados::WatchCtx2
*ctx2
,
1700 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
,
1703 Context
*oncomplete
= new C_aio_linger_Complete(c
, linger_op
, false);
1705 ::ObjectOperation wr
;
1706 *handle
= linger_op
->get_cookie();
// Owning handler variant (deletes the callbacks with the handler).
1708 linger_op
->handle
= InternalWatchInfo(this, oid
, ctx
, ctx2
);
// Non-owning handler variant.
1710 linger_op
->handle
= WatchInfo(this, oid
, ctx
, ctx2
);
1713 prepare_assert_ops(&wr
);
1714 wr
.watch(*handle
, CEPH_OSD_WATCH_OP_WATCH
, timeout
);
1716 objecter
->linger_watch(linger_op
, wr
,
1717 snapc
, ceph::real_clock::now(), bl
,
1718 oncomplete
, &c
->objver
);
// Sends a notify acknowledgement for (`notify_id`, `cookie`) on object `oid`
// with payload `bl`, as a read op at snap_seq.
// The read is issued with a null output bufferlist and 0, 0 as the last two
// arguments (by position presumably the completion and version pointers --
// TODO confirm), and nothing visible here waits for it.
// NOTE(review): the notify_id / cookie / bl parameter lines and the return
// statement are absent from this extract (embedded lines 1726-1729,
// 1734-1735).
1724 int librados::IoCtxImpl::notify_ack(
1725 const object_t
& oid
,
1730 ::ObjectOperation rd
;
1731 prepare_assert_ops(&rd
);
1732 rd
.notify_ack(notify_id
, cookie
, bl
);
1733 objecter
->read(oid
, oloc
, rd
, snap_seq
, (bufferlist
*)NULL
, extra_op_flags
, 0, 0);
// Checks the watch identified by `cookie`. The cookie is reinterpreted as the
// Objecter::LingerOp pointer, so it must be a handle handed out by watch() /
// aio_watch() that has not been cancelled.
// Two returns are visible: 1 + the value *r converted to whole milliseconds,
// and the error from r.error() converted with ceph::from_error_code(). The
// condition choosing between them is not visible in this extract (embedded
// lines 1741, 1744 are absent).
// NOTE(review): the millisecond count is narrowed to int on return.
1737 int librados::IoCtxImpl::watch_check(uint64_t cookie
)
1739 auto linger_op
= reinterpret_cast<Objecter::LingerOp
*>(cookie
);
1740 auto r
= objecter
->linger_check(linger_op
);
1742 return 1 + std::chrono::duration_cast
<
1743 std::chrono::milliseconds
>(*r
).count();
1745 return ceph::from_error_code(r
.error());
// Synchronously removes the watch identified by `cookie`. The cookie is
// reinterpreted as the Objecter::LingerOp pointer; the unwatch op is sent to
// that linger op's target.base_oid. The linger op is cancelled right after
// the mutate is issued, before waiting on the completion, and the resulting
// version is then recorded through set_sync_op_version().
// NOTE(review): the declaration of `ver`, the tail of the mutate() argument
// list and the return statement are absent from this extract (embedded lines
// 1752-1753, 1759, 1764-1765).
1748 int librados::IoCtxImpl::unwatch(uint64_t cookie
)
1750 Objecter::LingerOp
*linger_op
= reinterpret_cast<Objecter::LingerOp
*>(cookie
);
1751 C_SaferCond onfinish
;
1754 ::ObjectOperation wr
;
1755 prepare_assert_ops(&wr
);
1756 wr
.watch(cookie
, CEPH_OSD_WATCH_OP_UNWATCH
);
1757 objecter
->mutate(linger_op
->target
.base_oid
, oloc
, wr
,
1758 snapc
, ceph::real_clock::now(), extra_op_flags
,
1760 objecter
->linger_cancel(linger_op
);
1762 int r
= onfinish
.wait();
1763 set_sync_op_version(ver
);
// Asynchronously removes the watch identified by `cookie`. The cookie is
// reinterpreted as the Objecter::LingerOp pointer; the linger op is handed,
// together with `c`, to a C_aio_linger_Complete (third argument true), and no
// linger_cancel() call is visible in this function. c->objver receives the
// object version.
// NOTE(review): the return statement is absent from this extract (embedded
// lines 1779-1780).
1767 int librados::IoCtxImpl::aio_unwatch(uint64_t cookie
, AioCompletionImpl
*c
)
1770 Objecter::LingerOp
*linger_op
= reinterpret_cast<Objecter::LingerOp
*>(cookie
);
1771 Context
*oncomplete
= new C_aio_linger_Complete(c
, linger_op
, true);
1773 ::ObjectOperation wr
;
1774 prepare_assert_ops(&wr
);
1775 wr
.watch(cookie
, CEPH_OSD_WATCH_OP_UNWATCH
);
1776 objecter
->mutate(linger_op
->target
.base_oid
, oloc
, wr
,
1777 snapc
, ceph::real_clock::now(), extra_op_flags
,
1778 oncomplete
, &c
->objver
);
// Synchronously sends a notify with payload `bl` on object `oid`.
//   timeout_ms : caller timeout in milliseconds, see the note at the division
//   preply_bl / preply_buf / preply_buf_len : reply destinations, handed
//       unchanged to CB_notify_Finish
// Flow visible here: register a linger op, install a CB_notify_Finish
// completion on it, issue the notify, wait for the ack (`onack`), wait on
// `notify_finish_cond` (on both the success and the failure path), then
// cancel the linger op and record the version via set_sync_op_version().
// NOTE(review): declarations of `inbl`, `onack`, `objver`, the branch
// conditions and the return statement are absent from this extract.
1782 int librados::IoCtxImpl::notify(const object_t
& oid
, bufferlist
& bl
,
1783 uint64_t timeout_ms
,
1784 bufferlist
*preply_bl
,
1785 char **preply_buf
, size_t *preply_buf_len
)
1787 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
,
1790 C_SaferCond notify_finish_cond
;
// NOTE(review): the second CB_notify_Finish argument below reads
// `¬ify_finish_cond`; this looks like a mis-decoded `&notify_finish_cond`
// ("&not" turned into the NOT SIGN by an HTML-entity pass) -- the constructor
// takes a Context* there and notify_finish_cond is declared just above.
// Restore from upstream before building.
1791 linger_op
->on_notify_finish
=
1792 Objecter::LingerOp::OpComp::create(
1793 objecter
->service
.get_executor(),
1794 CB_notify_Finish(client
->cct
, ¬ify_finish_cond
,
1795 objecter
, linger_op
, preply_bl
,
1796 preply_buf
, preply_buf_len
));
// Start from the context-wide default (notify_timeout); the override below is
// presumably guarded by `if (timeout_ms)` -- the guard is not visible.
1797 uint32_t timeout
= notify_timeout
;
// NOTE(review): integer division -- a timeout_ms below 1000 becomes 0 here,
// and the uint64_t quotient is narrowed to uint32_t.
1799 timeout
= timeout_ms
/ 1000;
1801 // Construct RADOS op
1802 ::ObjectOperation rd
;
1803 prepare_assert_ops(&rd
);
1805 rd
.notify(linger_op
->get_cookie(), 1, timeout
, bl
, &inbl
);
1810 objecter
->linger_notify(linger_op
,
1811 rd
, snap_seq
, inbl
, NULL
,
1814 ldout(client
->cct
, 10) << __func__
<< " issued linger op " << linger_op
<< dendl
;
1815 int r
= onack
.wait();
1816 ldout(client
->cct
, 10) << __func__
<< " linger op " << linger_op
1817 << " acked (" << r
<< ")" << dendl
;
// Success path: the function result becomes the notify-finish result.
1820 ldout(client
->cct
, 10) << __func__
<< " waiting for watch_notify finish "
1821 << linger_op
<< dendl
;
1822 r
= notify_finish_cond
.wait();
// Failure path: the finish condition is still waited on, but its result is
// discarded so `r` keeps the ack error.
1825 ldout(client
->cct
, 10) << __func__
<< " failed to initiate notify, r = "
1827 notify_finish_cond
.wait();
1830 objecter
->linger_cancel(linger_op
);
1832 set_sync_op_version(objver
);
// Asynchronous counterpart of notify(): sends a notify with payload `bl` on
// object `oid` and reports through completion `c`.
// A C_aio_notify_Complete (built from `c` and the linger op) is used twice:
// as the Context given to CB_notify_Finish, and wrapped in a C_aio_notify_Ack
// that serves as the ack context.
// NOTE(review): the tail of the CB_notify_Finish argument list, the
// declaration of `inbl`, the tail of the linger_notify() call and the return
// statement are absent from this extract.
1836 int librados::IoCtxImpl::aio_notify(const object_t
& oid
, AioCompletionImpl
*c
,
1837 bufferlist
& bl
, uint64_t timeout_ms
,
1838 bufferlist
*preply_bl
, char **preply_buf
,
1839 size_t *preply_buf_len
)
1841 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
,
1846 C_aio_notify_Complete
*oncomplete
= new C_aio_notify_Complete(c
, linger_op
);
1847 linger_op
->on_notify_finish
=
1848 Objecter::LingerOp::OpComp::create(
1849 objecter
->service
.get_executor(),
1850 CB_notify_Finish(client
->cct
, oncomplete
,
1851 objecter
, linger_op
,
1852 preply_bl
, preply_buf
,
1854 Context
*onack
= new C_aio_notify_Ack(client
->cct
, oncomplete
);
// Start from the context-wide default (notify_timeout); the override below is
// presumably guarded by `if (timeout_ms)` -- the guard is not visible.
1856 uint32_t timeout
= notify_timeout
;
// NOTE(review): integer division -- a timeout_ms below 1000 becomes 0 here,
// and the uint64_t quotient is narrowed to uint32_t.
1858 timeout
= timeout_ms
/ 1000;
1860 // Construct RADOS op
1861 ::ObjectOperation rd
;
1862 prepare_assert_ops(&rd
);
1864 rd
.notify(linger_op
->get_cookie(), 1, timeout
, bl
, &inbl
);
1867 objecter
->linger_notify(linger_op
,
1868 rd
, snap_seq
, inbl
, NULL
,
// Synchronously sends an allocation hint for object `oid`: the expected
// object size, expected write size and `flags` are forwarded unchanged to the
// set_alloc_hint step, and the result of operate() is returned.
// `flags` is used below but its parameter line is not visible in this extract
// (embedded line 1876 is absent).
1873 int librados::IoCtxImpl::set_alloc_hint(const object_t
& oid
,
1874 uint64_t expected_object_size
,
1875 uint64_t expected_write_size
,
1878 ::ObjectOperation wr
;
1879 prepare_assert_ops(&wr
);
1880 wr
.set_alloc_hint(expected_object_size
, expected_write_size
, flags
);
1881 return operate(oid
, &wr
, NULL
);
// Accessor returning a version_t.
// NOTE(review): the body is absent from this extract (embedded lines
// 1885-1887); presumably it returns last_objver, the value annotated in
// set_sync_op_version() -- confirm.
1884 version_t
librados::IoCtxImpl::last_version()
// Takes a version `ver` to assert on.
// NOTE(review): the body is absent from this extract (embedded lines
// 1890-1892); presumably it stores `ver` for prepare_assert_ops() to consume
// -- confirm.
1889 void librados::IoCtxImpl::set_assert_version(uint64_t ver
)
// Sets the default notify timeout; notify() and aio_notify() initialise their
// local `timeout` from notify_timeout. The unit is not stated here (the
// notify paths derive the alternative value as timeout_ms / 1000).
1894 void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout
)
1896 notify_timeout
= timeout
;
// Synchronous cache-pin request for object `oid`; returns the result of
// operate().
// NOTE(review): the step added to `wr` is not visible in this extract
// (embedded line 1903 is absent) -- presumably a cache_pin step; confirm.
1899 int librados::IoCtxImpl::cache_pin(const object_t
& oid
)
1901 ::ObjectOperation wr
;
1902 prepare_assert_ops(&wr
);
1904 return operate(oid
, &wr
, NULL
);
// Synchronous cache-unpin request for object `oid`; returns the result of
// operate().
// NOTE(review): the step added to `wr` is not visible in this extract
// (embedded line 1911 is absent) -- presumably a cache_unpin step; confirm.
1907 int librados::IoCtxImpl::cache_unpin(const object_t
& oid
)
1909 ::ObjectOperation wr
;
1910 prepare_assert_ops(&wr
);
1912 return operate(oid
, &wr
, NULL
);
1916 ///////////////////////////// C_aio_stat_Ack ////////////////////////////
// Constructor of the stat completion context. The visible statement asserts
// that the completion's `io` pointer is still null at construction time.
// NOTE(review): the remaining parameters, the member initialisers and the
// rest of the body are absent from this extract (embedded lines 1919-1921,
// 1923-1924).
1918 librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl
*_c
,
1922 ceph_assert(!c
->io
);
// Completion handler for an asynchronous stat with result code `r`: wakes all
// waiters on c->cond, stores the modification time through pmtime (as time_t)
// when the op succeeded and the caller supplied a pointer, and defers the
// user callback onto the client's finish_strand when one is registered.
// NOTE(review): locking and the updates to the completion's state are absent
// from this extract (embedded lines 1927-1930, 1939-1942).
1926 void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r
)
1931 c
->cond
.notify_all();
1933 if (r
>= 0 && pmtime
) {
1934 *pmtime
= real_clock::to_time_t(mtime
);
1937 if (c
->callback_complete
) {
1938 boost::asio::defer(c
->io
->client
->finish_strand
, CB_AioComplete(c
));
1944 ///////////////////////////// C_aio_stat2_Ack ////////////////////////////
// Constructor of the timespec-flavoured stat completion context; `pt` is the
// caller's timespec destination. The visible statement asserts that the
// completion's `io` pointer is still null at construction time.
// NOTE(review): the member initialisers and the rest of the body are absent
// from this extract (embedded lines 1948-1949, 1951-1952).
1946 librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl
*_c
,
1947 struct timespec
*pt
)
1950 ceph_assert(!c
->io
);
// Completion handler for an asynchronous stat2 with result code `r`: wakes
// all waiters on c->cond, stores the modification time through pts (as a
// timespec) when the op succeeded and the caller supplied a pointer, and
// defers the user callback onto the client's finish_strand when one is
// registered.
// NOTE(review): locking and the updates to the completion's state are absent
// from this extract (embedded lines 1955-1958, 1967-1970).
1954 void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r
)
1959 c
->cond
.notify_all();
1961 if (r
>= 0 && pts
) {
1962 *pts
= real_clock::to_timespec(mtime
);
1965 if (c
->callback_complete
) {
1966 boost::asio::defer(c
->io
->client
->finish_strand
, CB_AioComplete(c
));
1972 //////////////////////////// C_aio_Complete ////////////////////////////////
// Constructor of the generic AIO completion context, taking the completion it
// will finish.
// NOTE(review): the initialiser list and body are absent from this extract
// (embedded lines 1975-1978).
1974 librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl
*_c
)
// Generic AIO completion handler with result code `r`. Visible steps:
//   1. store r into c->rval (the guard described by the comment on embedded
//      line 1983 is itself not visible),
//   2. wake all waiters on c->cond,
//   3. on success with a non-empty reply buffer, copy the reply into the
//      caller's out_buf unless the data already lives there, and report the
//      byte count in c->rval,
//   4. defer the user callback onto the client's finish_strand,
//   5. for writes carrying an aio_write_seq, tell the IoCtx the write is
//      complete.
// NOTE(review): locking, the action taken for a non-contiguous buffer, the
// second half of the callback condition and the closing #endif are absent
// from this extract.
1980 void librados::IoCtxImpl::C_aio_Complete::finish(int r
)
1983 // Leave an existing rval unless r != 0
1985 c
->rval
= r
; // This clears the error set in C_ObjectOperation_scrub_ls::finish()
1987 c
->cond
.notify_all();
1989 if (r
== 0 && c
->blp
&& c
->blp
->length() > 0) {
// Non-contiguous reply with a caller buffer: handled specially (action not
// visible here).
1990 if (c
->out_buf
&& !c
->blp
->is_contiguous()) {
// Copy only when the reply data is not already backed by out_buf.
1993 if (c
->out_buf
&& !c
->blp
->is_provided_buffer(c
->out_buf
))
1994 c
->blp
->begin().copy(c
->blp
->length(), c
->out_buf
);
1996 c
->rval
= c
->blp
->length();
2000 if (c
->callback_complete
||
2002 boost::asio::defer(c
->io
->client
->finish_strand
, CB_AioComplete(c
));
2005 if (c
->aio_write_seq
) {
2006 c
->io
->complete_aio_write(c
);
2009 #if defined(WITH_EVENTTRACE)
2010 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_OP_COMPLETE");
// Computes slice `n` out of `m` of the object range [start, finish) and
// writes its bounds to *split_start / *split_finish.
// The arithmetic is done on bit-reversed hashes: the span between the
// reversed start and finish hashes is cut proportionally (diff * n / m and
// diff * (n + 1) / m), and each boundary is turned back into an hobject_t in
// this pool by reversing the bits again. A `finish` that is max is treated as
// 0x100000000, i.e. one past the largest 32-bit value.
// `n` and `m` are used below but their parameter lines are not visible in
// this extract (embedded lines 2018-2019 are absent), as are the conditions
// selecting each assignment.
// NOTE(review): no guard on `m` is visible; m == 0 would divide by zero.
2015 void librados::IoCtxImpl::object_list_slice(
2016 const hobject_t start
,
2017 const hobject_t finish
,
2020 hobject_t
*split_start
,
2021 hobject_t
*split_finish
)
// A start already at max yields an empty slice: both bounds are max.
2023 if (start
.is_max()) {
2024 *split_start
= hobject_t::get_max();
2025 *split_finish
= hobject_t::get_max();
2029 uint64_t start_hash
= hobject_t::_reverse_bits(start
.get_hash());
2030 uint64_t finish_hash
=
2031 finish
.is_max() ? 0x100000000 :
2032 hobject_t::_reverse_bits(finish
.get_hash());
2034 uint64_t diff
= finish_hash
- start_hash
;
2035 uint64_t rev_start
= start_hash
+ (diff
* n
/ m
);
2036 uint64_t rev_finish
= start_hash
+ (diff
* (n
+ 1) / m
);
// Lower bound: either the caller's `start` verbatim or a synthetic object at
// the computed hash (selection condition not visible here).
2038 *split_start
= start
;
2040 *split_start
= hobject_t(
2041 object_t(), string(), CEPH_NOSNAP
,
2042 hobject_t::_reverse_bits(rev_start
), poolid
, string());
// Upper bound: the caller's `finish` verbatim, max when the computed hash
// runs past 32 bits, or a synthetic object at the computed hash.
2046 *split_finish
= finish
;
2047 else if (rev_finish
>= 0x100000000)
2048 *split_finish
= hobject_t::get_max();
2050 *split_finish
= hobject_t(
2051 object_t(), string(), CEPH_NOSNAP
,
2052 hobject_t::_reverse_bits(rev_finish
), poolid
, string());
// Synchronous wrapper around application_enable_async(): allocates a
// PoolAsyncCompletionImpl, starts the async request, takes the completion's
// return value, and finally returns the result of waiting for the latest OSD
// map.
// `force` is used below but its parameter line is not visible in this extract
// (embedded line 2056 is absent).
// NOTE(review): the wait on the completion, its release, and the early return
// on error are absent from this extract (embedded lines 2060-2061, 2063,
// 2065-2070).
2055 int librados::IoCtxImpl::application_enable(const std::string
& app_name
,
2058 auto c
= new PoolAsyncCompletionImpl();
2059 application_enable_async(app_name
, force
, c
);
2062 ceph_assert(r
== 0);
2064 r
= c
->get_return_value();
2071 return client
->wait_for_latest_osdmap();
// Asynchronously enables application `app_name` on this pool by sending an
// "osd pool application enable" monitor command; completion `c` is finished
// through CB_PoolAsync_Safe.
// When the monitors do not require the Luminous feature, no command is sent:
// a callback built from `c` is deferred onto the client's finish_strand
// instead.
// `force` belongs to the parameter list on a line that is not visible in this
// extract (embedded line 2075); the condition under which
// "yes_i_really_mean_it" is appended is not visible either.
// NOTE(review): app_name and the pool name are inserted into the JSON text
// verbatim; a quote or backslash in either would produce malformed JSON.
2074 void librados::IoCtxImpl::application_enable_async(const std::string
& app_name
,
2076 PoolAsyncCompletionImpl
*c
)
2078 // pre-Luminous clusters will return -EINVAL and application won't be
2079 // preserved until Luminous is configured as minimum version.
2080 if (!client
->get_required_monitor_features().contains_all(
2081 ceph::features::mon::FEATURE_LUMINOUS
)) {
2082 boost::asio::defer(client
->finish_strand
,
2083 [cb
= CB_PoolAsync_Safe(c
)]() mutable {
2089 std::stringstream cmd
;
2091 << "\"prefix\": \"osd pool application enable\","
2092 << "\"pool\": \"" << get_cached_pool_name() << "\","
2093 << "\"app\": \"" << app_name
<< "\"";
2095 cmd
<< ",\"yes_i_really_mean_it\": true";
2099 std::vector
<std::string
> cmds
;
2100 cmds
.push_back(cmd
.str());
2102 client
->mon_command_async(cmds
, inbl
, nullptr, nullptr,
2103 make_lambda_context(CB_PoolAsync_Safe(c
)));
// Fills *app_names with the names of the applications recorded in this pool's
// application_metadata, read from the current OSD map under with_osdmap().
// Names are inserted into the caller's set; no clear() of the set is visible
// in this extract.
// NOTE(review): the action taken when the pool is missing from the map, and
// the return statements, are absent from this extract.
2106 int librados::IoCtxImpl::application_list(std::set
<std::string
> *app_names
)
2110 objecter
->with_osdmap([&](const OSDMap
& o
) {
2111 auto pg_pool
= o
.get_pg_pool(poolid
);
2112 if (pg_pool
== nullptr) {
2117 for (auto &pair
: pg_pool
->application_metadata
) {
2118 app_names
->insert(pair
.first
);
// Looks up metadata `key` of application `app_name` in this pool's
// application_metadata (read from the current OSD map under with_osdmap())
// and stores the value through `value`.
// Three miss cases are checked in order: pool not in the map, application not
// present, key not present.
// `value` is used below but its parameter line is not visible in this extract
// (embedded line 2126 is absent).
// NOTE(review): the error codes assigned in each miss case and the return
// statements are absent from this extract.
2124 int librados::IoCtxImpl::application_metadata_get(const std::string
& app_name
,
2125 const std::string
&key
,
2129 objecter
->with_osdmap([&](const OSDMap
& o
) {
2130 auto pg_pool
= o
.get_pg_pool(poolid
);
2131 if (pg_pool
== nullptr) {
2136 auto app_it
= pg_pool
->application_metadata
.find(app_name
);
2137 if (app_it
== pg_pool
->application_metadata
.end()) {
2142 auto it
= app_it
->second
.find(key
);
2143 if (it
== app_it
->second
.end()) {
2148 *value
= it
->second
;
// Sets metadata `key` = `value` for application `app_name` on this pool by
// sending an "osd pool application set" monitor command synchronously, then
// returns the result of waiting for the latest OSD map.
// NOTE(review): app_name, key, value and the pool name are inserted into the
// JSON text verbatim; a quote or backslash in any of them would produce
// malformed JSON.
// NOTE(review): the opening/closing braces of the JSON text, the declaration
// of `inbl` and the early return on mon_command failure are absent from this
// extract.
2153 int librados::IoCtxImpl::application_metadata_set(const std::string
& app_name
,
2154 const std::string
&key
,
2155 const std::string
& value
)
2157 std::stringstream cmd
;
2159 << "\"prefix\":\"osd pool application set\","
2160 << "\"pool\":\"" << get_cached_pool_name() << "\","
2161 << "\"app\":\"" << app_name
<< "\","
2162 << "\"key\":\"" << key
<< "\","
2163 << "\"value\":\"" << value
<< "\""
2166 std::vector
<std::string
> cmds
;
2167 cmds
.push_back(cmd
.str());
2169 int r
= client
->mon_command(cmds
, inbl
, nullptr, nullptr);
2174 // ensure we have the latest osd map epoch before proceeding
2175 return client
->wait_for_latest_osdmap();
// Removes metadata `key` of application `app_name` on this pool by sending an
// "osd pool application rm" monitor command synchronously, then returns the
// result of waiting for the latest OSD map.
// NOTE(review): app_name, key and the pool name are inserted into the JSON
// text verbatim; a quote or backslash in any of them would produce malformed
// JSON.
// NOTE(review): the opening/closing braces of the JSON text, the declaration
// of `inbl` and the early return on mon_command failure are absent from this
// extract.
2178 int librados::IoCtxImpl::application_metadata_remove(const std::string
& app_name
,
2179 const std::string
&key
)
2181 std::stringstream cmd
;
2183 << "\"prefix\":\"osd pool application rm\","
2184 << "\"pool\":\"" << get_cached_pool_name() << "\","
2185 << "\"app\":\"" << app_name
<< "\","
2186 << "\"key\":\"" << key
<< "\""
2189 std::vector
<std::string
> cmds
;
2190 cmds
.push_back(cmd
.str());
2192 int r
= client
->mon_command(cmds
, inbl
, nullptr, nullptr);
2197 // ensure we have the latest osd map epoch before proceeding
2198 return client
->wait_for_latest_osdmap();
2201 int librados::IoCtxImpl::application_metadata_list(const std::string
& app_name
,
2202 std::map
<std::string
, std::string
> *values
)
2206 objecter
->with_osdmap([&](const OSDMap
& o
) {
2207 auto pg_pool
= o
.get_pg_pool(poolid
);
2208 if (pg_pool
== nullptr) {
2213 auto it
= pg_pool
->application_metadata
.find(app_name
);
2214 if (it
== pg_pool
->application_metadata
.end()) {
2219 *values
= it
->second
;