1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2012 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include "IoCtxImpl.h"
19 #include "librados/librados_c.h"
20 #include "librados/AioCompletionImpl.h"
21 #include "librados/PoolAsyncCompletionImpl.h"
22 #include "librados/RadosClient.h"
23 #include "include/ceph_assert.h"
24 #include "common/valgrind.h"
25 #include "common/EventTrace.h"
27 #define dout_subsys ceph_subsys_rados
29 #define dout_prefix *_dout << "librados: "
// Short namespace aliases used in this file: bs for boost::system,
// ca for ceph::async and cb for ceph::buffer.
31 namespace bs
= boost::system
;
32 namespace ca
= ceph::async
;
33 namespace cb
= ceph::buffer
;
// Callable invoked when a notify on a linger op finishes. It receives an
// error code plus the reply bufferlist, hands the reply to the output
// locations supplied at construction (preply_buf, preply_buf_len,
// preply_bl) and then completes ctx with the converted error code.
// NOTE(review): some member declarations and the guards around the output
// pointers are not visible in this listing - confirm against the full file.
38 struct CB_notify_Finish
{
42 Objecter::LingerOp
*linger_op
;
43 bufferlist
*preply_bl
;
45 size_t *preply_buf_len
;
// Stores every argument in the member of the same name; nothing else is
// done at construction time.
47 CB_notify_Finish(CephContext
*_cct
, Context
*_ctx
, Objecter
*_objecter
,
48 Objecter::LingerOp
*_linger_op
, bufferlist
*_preply_bl
,
49 char **_preply_buf
, size_t *_preply_buf_len
)
50 : cct(_cct
), ctx(_ctx
), objecter(_objecter
), linger_op(_linger_op
),
51 preply_bl(_preply_bl
), preply_buf(_preply_buf
),
52 preply_buf_len(_preply_buf_len
) {}
// Copying is deleted; moving is defaulted.
56 CB_notify_Finish(const CB_notify_Finish
&) = delete;
57 CB_notify_Finish
& operator =(const CB_notify_Finish
&) = delete;
58 CB_notify_Finish(CB_notify_Finish
&&) = default;
59 CB_notify_Finish
& operator =(CB_notify_Finish
&&) = default;
// Completion entry point: logs the linger op and error code, then
// publishes the reply.
61 void operator()(bs::error_code ec
, bufferlist
&& reply_bl
) {
62 ldout(cct
, 10) << __func__
<< " completed notify (linger op "
63 << linger_op
<< "), ec = " << ec
<< dendl
;
65 // pass result back to user
66 // NOTE: we do this regardless of what error code we return
// A non-empty reply is copied into a freshly malloc-ed buffer.
// NOTE(review): no null check of the malloc result is visible before the
// memcpy - confirm whether allocation failure is handled elsewhere.
68 if (reply_bl
.length()) {
69 *preply_buf
= (char*)malloc(reply_bl
.length());
70 memcpy(*preply_buf
, reply_bl
.c_str(), reply_bl
.length());
76 *preply_buf_len
= reply_bl
.length();
// The bufferlist output takes the reply by move.
78 *preply_bl
= std::move(reply_bl
);
// Translate the error_code with ceph::from_error_code and finish ctx.
80 ctx
->complete(ceph::from_error_code(ec
));
// Small callable that remembers an Objecter and a linger op; the visible
// body cancels that linger op through objecter->linger_cancel().
84 struct CB_aio_linger_cancel
{
86 Objecter::LingerOp
*linger_op
;
88 CB_aio_linger_cancel(Objecter
*_objecter
, Objecter::LingerOp
*_linger_op
)
89 : objecter(_objecter
), linger_op(_linger_op
)
94 objecter
->linger_cancel(linger_op
);
// Context that finishes an AioCompletionImpl tied to a linger op. The
// constructor records the completion, the linger op and a cancel flag.
98 struct C_aio_linger_Complete
: public Context
{
100 Objecter::LingerOp
*linger_op
;
103 C_aio_linger_Complete(AioCompletionImpl
*_c
, Objecter::LingerOp
*_linger_op
, bool _cancel
)
104 : c(_c
), linger_op(_linger_op
), cancel(_cancel
)
// finish(): the visible statements defer a CB_aio_linger_cancel onto the
// client finish_strand, wake waiters on c->cond, and defer CB_AioComplete
// when a completion callback is registered.
// NOTE(review): the conditions guarding the cancel and the locking around
// the completion state are not visible in this listing.
109 void finish(int r
) override
{
111 boost::asio::defer(c
->io
->client
->finish_strand
,
112 CB_aio_linger_cancel(c
->io
->objecter
,
118 c
->cond
.notify_all();
120 if (c
->callback_complete
||
122 boost::asio::defer(c
->io
->client
->finish_strand
, CB_AioComplete(c
));
// Completion for an async notify. It extends C_aio_linger_Complete (built
// with cancel = false) and owns a mutex plus a finished flag. The base
// complete() is only forwarded once both acked and finished are set.
128 struct C_aio_notify_Complete
: public C_aio_linger_Complete
{
129 ceph::mutex lock
= ceph::make_mutex("C_aio_notify_Complete::lock");
131 bool finished
= false;
134 C_aio_notify_Complete(AioCompletionImpl
*_c
, Objecter::LingerOp
*_linger_op
)
135 : C_aio_linger_Complete(_c
, _linger_op
, false) {
138 void handle_ack(int r
) {
139 // invoked by C_aio_notify_Ack
145 void complete(int r
) override
{
146 // invoked by C_notify_Finish
// complete_unlock(): the first check tests that ret_val is still zero while
// r is negative (the statement it guards is not visible here); the second
// forwards ret_val to the base class once acked and finished both hold.
152 void complete_unlock(int r
) {
153 if (ret_val
== 0 && r
< 0) {
157 if (acked
&& finished
) {
160 C_aio_linger_Complete::complete(ret_val
);
// Context fired when a notify is acked. finish() logs the linger op and the
// result, then forwards the result to oncomplete->handle_ack().
167 struct C_aio_notify_Ack
: public Context
{
169 C_aio_notify_Complete
*oncomplete
;
171 C_aio_notify_Ack(CephContext
*_cct
,
172 C_aio_notify_Complete
*_oncomplete
)
173 : cct(_cct
), oncomplete(_oncomplete
)
177 void finish(int r
) override
179 ldout(cct
, 10) << __func__
<< " linger op " << oncomplete
->linger_op
<< " "
180 << "acked (" << r
<< ")" << dendl
;
181 oncomplete
->handle_ack(r
);
// Context that finishes an AioCompletionImpl for a self-managed snapshot
// operation. It keeps the RadosClient and the completion it was given.
185 struct C_aio_selfmanaged_snap_op_Complete
: public Context
{
186 librados::RadosClient
*client
;
187 librados::AioCompletionImpl
*c
;
189 C_aio_selfmanaged_snap_op_Complete(librados::RadosClient
*client
,
190 librados::AioCompletionImpl
*c
)
191 : client(client
), c(c
) {
// finish(): wakes waiters on c->cond and, when either callback_complete or
// callback_safe is set, defers CB_AioComplete on the client finish_strand.
// NOTE(review): the statements before the notify are not visible here.
195 void finish(int r
) override
{
199 c
->cond
.notify_all();
201 if (c
->callback_complete
|| c
->callback_safe
) {
202 boost::asio::defer(client
->finish_strand
, librados::CB_AioComplete(c
));
// Snapshot-create flavour of C_aio_selfmanaged_snap_op_Complete. It also
// remembers where the caller wants the new snap id written.
208 struct C_aio_selfmanaged_snap_create_Complete
: public C_aio_selfmanaged_snap_op_Complete
{
210 uint64_t *dest_snapid
;
212 C_aio_selfmanaged_snap_create_Complete(librados::RadosClient
*client
,
213 librados::AioCompletionImpl
*c
,
214 uint64_t *dest_snapid
)
215 : C_aio_selfmanaged_snap_op_Complete(client
, c
),
216 dest_snapid(dest_snapid
) {
// finish(): copies the snapid member to *dest_snapid, then runs the base
// class finish().
// NOTE(review): any condition on r guarding the copy is not visible here.
219 void finish(int r
) override
{
221 *dest_snapid
= snapid
;
223 C_aio_selfmanaged_snap_op_Complete::finish(r
);
227 } // anonymous namespace
228 } // namespace librados
// Default constructor is compiler-generated.
230 librados::IoCtxImpl::IoCtxImpl() = default;
// Builds an IoCtxImpl for one pool: stores the client, pool id, initial
// snap_seq and objecter, reads notify_timeout from the client config value
// client_notify_timeout, and starts aio_write_seq at 0.
232 librados::IoCtxImpl::IoCtxImpl(RadosClient
*c
, Objecter
*objecter
,
233 int64_t poolid
, snapid_t s
)
234 : client(c
), poolid(poolid
), snap_seq(s
),
235 notify_timeout(c
->cct
->_conf
->client_notify_timeout
),
237 aio_write_seq(0), objecter(objecter
)
// Logs the transition from the current snap_seq to s.
// NOTE(review): the assignment itself is not visible in this listing.
241 void librados::IoCtxImpl::set_snap_read(snapid_t s
)
245 ldout(client
->cct
, 10) << "set snap read " << snap_seq
<< " -> " << s
<< dendl
;
// Logs the requested write snap context (seq and snaps).
// NOTE(review): validation, storage and the return value are not visible in
// this listing.
249 int librados::IoCtxImpl::set_snap_write_context(snapid_t seq
, vector
<snapid_t
>& snaps
)
252 ldout(client
->cct
, 10) << "set snap write context: seq = " << seq
253 << " and snaps = " << snaps
<< dendl
;
// Asks the objecter for the hash position of oid in this pool and namespace
// and stores the result, narrowed to uint32_t, in *hash_position.
// NOTE(review): the handling of a negative r and the return value are not
// visible in this listing.
262 int librados::IoCtxImpl::get_object_hash_position(
263 const std::string
& oid
, uint32_t *hash_position
)
265 int64_t r
= objecter
->get_object_hash_position(poolid
, oid
, oloc
.nspace
);
268 *hash_position
= (uint32_t)r
;
// Same shape as get_object_hash_position but calls the objecter
// get_object_pg_hash_position() and writes to *pg_hash_position.
// NOTE(review): the handling of a negative r and the return value are not
// visible in this listing.
272 int librados::IoCtxImpl::get_object_pg_hash_position(
273 const std::string
& oid
, uint32_t *pg_hash_position
)
275 int64_t r
= objecter
->get_object_pg_hash_position(poolid
, oid
, oloc
.nspace
);
278 *pg_hash_position
= (uint32_t)r
;
// Registers completion c as an in-flight write. While holding
// aio_write_list_lock it asserts that c belongs to this IoCtxImpl, assigns
// it the next value of aio_write_seq, logs it, and appends its list item to
// aio_write_list.
282 void librados::IoCtxImpl::queue_aio_write(AioCompletionImpl
*c
)
285 std::scoped_lock l
{aio_write_list_lock
};
286 ceph_assert(c
->io
== this);
287 c
->aio_write_seq
= ++aio_write_seq
;
288 ldout(client
->cct
, 20) << "queue_aio_write " << this << " completion " << c
289 << " write_seq " << aio_write_seq
<< dendl
;
290 aio_write_list
.push_back(&c
->aio_write_list_item
);
// Removes completion c from the in-flight write list and wakes flush
// waiters. The lock is taken and released manually around the whole body.
293 void librados::IoCtxImpl::complete_aio_write(AioCompletionImpl
*c
)
295 ldout(client
->cct
, 20) << "complete_aio_write " << c
<< dendl
;
296 aio_write_list_lock
.lock();
297 ceph_assert(c
->io
== this);
298 c
->aio_write_list_item
.remove_myself();
// Walk the waiters map (keyed by sequence number) from the beginning.
300 map
<ceph_tid_t
, std::list
<AioCompletionImpl
*> >::iterator waiters
= aio_write_waiters
.begin();
301 while (waiters
!= aio_write_waiters
.end()) {
// If the oldest outstanding write is at or before this waiter sequence, the
// waiter cannot be released yet; the log text says the walk stops here.
302 if (!aio_write_list
.empty() &&
303 aio_write_list
.front()->aio_write_seq
<= waiters
->first
) {
304 ldout(client
->cct
, 20) << " next outstanding write is " << aio_write_list
.front()->aio_write_seq
305 << " <= waiter " << waiters
->first
306 << ", stopping" << dendl
;
// Otherwise every completion queued under this sequence is finished via
// CB_AioCompleteAndSafe on the client finish_strand.
309 ldout(client
->cct
, 20) << " waking waiters on seq " << waiters
->first
<< dendl
;
310 for (std::list
<AioCompletionImpl
*>::iterator it
= waiters
->second
.begin();
311 it
!= waiters
->second
.end(); ++it
) {
312 boost::asio::defer(client
->finish_strand
, CB_AioCompleteAndSafe(*it
));
// Post-increment keeps the iterator valid across the erase.
315 aio_write_waiters
.erase(waiters
++);
// Wake threads waiting on aio_write_cond, then drop the lock.
318 aio_write_cond
.notify_all();
319 aio_write_list_lock
.unlock();
// Asynchronous flush: completion c is finished once all writes queued so
// far are done. Under aio_write_list_lock it samples the current
// aio_write_seq; with no writes in flight c is completed right away through
// the finish_strand, otherwise c is parked in aio_write_waiters under that
// sequence number.
323 void librados::IoCtxImpl::flush_aio_writes_async(AioCompletionImpl
*c
)
325 ldout(client
->cct
, 20) << "flush_aio_writes_async " << this
326 << " completion " << c
<< dendl
;
327 std::lock_guard
l(aio_write_list_lock
);
328 ceph_tid_t seq
= aio_write_seq
;
329 if (aio_write_list
.empty()) {
330 ldout(client
->cct
, 20) << "flush_aio_writes_async no writes. (tid "
331 << seq
<< ")" << dendl
;
332 boost::asio::defer(client
->finish_strand
, CB_AioCompleteAndSafe(c
));
334 ldout(client
->cct
, 20) << "flush_aio_writes_async " << aio_write_list
.size()
335 << " writes in flight; waiting on tid " << seq
<< dendl
;
337 aio_write_waiters
[seq
].push_back(c
);
// Blocking flush: waits on aio_write_cond until the in-flight list is empty
// or its oldest entry has a sequence number greater than the value of
// aio_write_seq captured when the wait started.
341 void librados::IoCtxImpl::flush_aio_writes()
343 ldout(client
->cct
, 20) << "flush_aio_writes" << dendl
;
344 std::unique_lock l
{aio_write_list_lock
};
345 aio_write_cond
.wait(l
, [seq
=aio_write_seq
, this] {
346 return (aio_write_list
.empty() ||
347 aio_write_list
.front()->aio_write_seq
> seq
);
// Looks up the name of this pool via client->pool_get_name() into pn.
// NOTE(review): the declaration of pn and the return are not visible here.
351 string
librados::IoCtxImpl::get_cached_pool_name()
354 client
->pool_get_name(get_id(), &pn
);
// Synchronously creates a pool snapshot named snapName. It builds a
// C_SafeCond over a local mutex and condition variable, passes it to
// objecter->create_pool_snap() and waits until done is set.
// NOTE(review): the declarations of done and reply and the return are not
// visible in this listing.
360 int librados::IoCtxImpl::snap_create(const char *snapName
)
363 string
sName(snapName
);
365 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::snap_create::mylock");
366 ceph::condition_variable cond
;
368 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
369 objecter
->create_pool_snap(poolid
, sName
, onfinish
);
371 std::unique_lock l
{mylock
};
372 cond
.wait(l
, [&done
] { return done
; });
// Synchronously allocates a self-managed snapshot id. The objecter writes
// the id into the local snapid and signals a C_SafeCond; this function
// waits on it.
// NOTE(review): the copy into *psnapid and the return are not visible here.
376 int librados::IoCtxImpl::selfmanaged_snap_create(uint64_t *psnapid
)
380 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::selfmanaged_snap_create::mylock");
381 ceph::condition_variable cond
;
383 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
385 objecter
->allocate_selfmanaged_snap(poolid
, &snapid
, onfinish
);
388 std::unique_lock l
{mylock
};
389 cond
.wait(l
, [&done
] { return done
; });
// Asynchronous variant: allocates a C_aio_selfmanaged_snap_create_Complete
// that will copy the new id to *snapid, and asks the objecter to allocate
// the snap id into that context's own snapid member.
396 void librados::IoCtxImpl::aio_selfmanaged_snap_create(uint64_t *snapid
,
397 AioCompletionImpl
*c
)
399 C_aio_selfmanaged_snap_create_Complete
*onfinish
=
400 new C_aio_selfmanaged_snap_create_Complete(client
, c
, snapid
);
401 objecter
->allocate_selfmanaged_snap(poolid
, &onfinish
->snapid
,
// Synchronously deletes the pool snapshot named snapName via
// objecter->delete_pool_snap(), waiting on a local C_SafeCond until done.
// NOTE(review): the declarations of done and reply and the return are not
// visible in this listing.
405 int librados::IoCtxImpl::snap_remove(const char *snapName
)
408 string
sName(snapName
);
410 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::snap_remove::mylock");
411 ceph::condition_variable cond
;
413 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
414 objecter
->delete_pool_snap(poolid
, sName
, onfinish
);
415 unique_lock l
{mylock
};
416 cond
.wait(l
, [&done
] { return done
; });
// Synchronously submits a mutate on oid with the given snap context and
// waits for completion on a local C_SafeCond. The op starts with the
// pending assert ops from prepare_assert_ops().
// NOTE(review): the rollback operation added to op and the remaining
// mutate arguments are not visible in this listing.
420 int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t
& oid
,
421 ::SnapContext
& snapc
,
426 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::snap_rollback::mylock");
427 ceph::condition_variable cond
;
429 Context
*onack
= new C_SafeCond(mylock
, cond
, &done
, &reply
);
431 ::ObjectOperation op
;
432 prepare_assert_ops(&op
);
434 objecter
->mutate(oid
, oloc
,
435 op
, snapc
, ceph::real_clock::now(),
439 std::unique_lock l
{mylock
};
440 cond
.wait(l
, [&done
] { return done
; });
// Rolls oid back to the pool snapshot named snapName: resolves the name to
// a snap id with objecter->pool_snap_by_name(), then delegates to
// selfmanaged_snap_rollback_object().
// NOTE(review): the check of r is not visible in this listing.
444 int librados::IoCtxImpl::rollback(const object_t
& oid
, const char *snapName
)
448 int r
= objecter
->pool_snap_by_name(poolid
, snapName
, &snap
);
453 return selfmanaged_snap_rollback_object(oid
, snapc
, snap
);
// Synchronously deletes a self-managed snapshot: passes a C_SafeCond to
// objecter->delete_selfmanaged_snap() and waits until done is set.
// NOTE(review): the declarations of done and reply and the return are not
// visible in this listing.
456 int librados::IoCtxImpl::selfmanaged_snap_remove(uint64_t snapid
)
460 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::selfmanaged_snap_remove::mylock");
461 ceph::condition_variable cond
;
463 objecter
->delete_selfmanaged_snap(poolid
, snapid_t(snapid
),
464 new C_SafeCond(mylock
, cond
, &done
, &reply
));
466 std::unique_lock l
{mylock
};
467 cond
.wait(l
, [&done
] { return done
; });
// Asynchronous self-managed snapshot delete: completion c is finished by a
// C_aio_selfmanaged_snap_op_Complete passed to the objecter.
471 void librados::IoCtxImpl::aio_selfmanaged_snap_remove(uint64_t snapid
,
472 AioCompletionImpl
*c
)
474 Context
*onfinish
= new C_aio_selfmanaged_snap_op_Complete(client
, c
);
475 objecter
->delete_selfmanaged_snap(poolid
, snapid
, onfinish
);
// Thin wrapper: returns objecter->pool_snap_list() for this pool, filling
// *snaps.
478 int librados::IoCtxImpl::snap_list(vector
<uint64_t> *snaps
)
480 return objecter
->pool_snap_list(poolid
, snaps
);
// Thin wrapper: resolves a pool snapshot name to its id through
// objecter->pool_snap_by_name().
// NOTE(review): the uint64_t pointer is reinterpreted as snapid_t* with a
// C-style cast; this presumably relies on snapid_t having the same layout
// as uint64_t - confirm.
483 int librados::IoCtxImpl::snap_lookup(const char *name
, uint64_t *snapid
)
485 return objecter
->pool_snap_by_name(poolid
, name
, (snapid_t
*)snapid
);
// Fetches pool_snap_info_t for snapid and copies its name into *s.
// NOTE(review): the name is assigned through c_str(), so anything after an
// embedded NUL would be dropped; the check of ret and the return are not
// visible in this listing.
488 int librados::IoCtxImpl::snap_get_name(uint64_t snapid
, std::string
*s
)
490 pool_snap_info_t info
;
491 int ret
= objecter
->pool_snap_get_info(poolid
, snapid
, &info
);
495 *s
= info
.name
.c_str();
// Fetches pool_snap_info_t for snapid and stores the seconds part of its
// stamp in *t.
// NOTE(review): the check of ret and the return are not visible here.
499 int librados::IoCtxImpl::snap_get_stamp(uint64_t snapid
, time_t *t
)
501 pool_snap_info_t info
;
502 int ret
= objecter
->pool_snap_get_info(poolid
, snapid
, &info
);
506 *t
= info
.stamp
.sec();
// Synchronously fetches the next batch of object names. It tests
// context->at_end() first, then sets max_entries and the namespace on the
// context, calls objecter->list_nobjects() with a C_SafeCond, and waits
// until done is set.
// NOTE(review): the action taken when at_end() is true and the return value
// are not visible in this listing.
513 int librados::IoCtxImpl::nlist(Objecter::NListContext
*context
, int max_entries
)
517 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::nlist::mylock");
518 ceph::condition_variable cond
;
520 if (context
->at_end())
523 context
->max_entries
= max_entries
;
524 context
->nspace
= oloc
.nspace
;
526 objecter
->list_nobjects(context
, new C_SafeCond(mylock
, cond
, &done
, &r
));
528 std::unique_lock l
{mylock
};
529 cond
.wait(l
, [&done
] { return done
; });
// Seeks a listing to position pos: clears the entries already buffered in
// the context, then returns objecter->list_nobjects_seek().
533 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext
*context
,
536 context
->list
.clear();
537 return objecter
->list_nobjects_seek(context
, pos
);
// Cursor overload of nlist_seek: clears buffered entries and seeks to the
// hobject_t that the opaque cursor is cast to.
// NOTE(review): assumes cursor really points at a hobject_t, as produced by
// nlist_get_cursor() - confirm for other callers.
540 uint32_t librados::IoCtxImpl::nlist_seek(Objecter::NListContext
*context
,
541 const rados_object_list_cursor
& cursor
)
543 context
->list
.clear();
544 return objecter
->list_nobjects_seek(context
, *(const hobject_t
*)cursor
);
// Allocates a hobject_t with new, asks the objecter to fill it with the
// current listing position, and returns it cast to the opaque cursor type.
// NOTE(review): who frees the hobject_t is not visible in this listing -
// presumably the caller through a cursor-free API; confirm.
547 rados_object_list_cursor
librados::IoCtxImpl::nlist_get_cursor(Objecter::NListContext
*context
)
549 hobject_t
*c
= new hobject_t
;
551 objecter
->list_nobjects_get_cursor(context
, c
);
552 return (rados_object_list_cursor
)c
;
// Synchronously creates oid. The op carries any pending assert ops plus a
// create(exclusive) and is submitted through operate() with no mtime.
555 int librados::IoCtxImpl::create(const object_t
& oid
, bool exclusive
)
557 ::ObjectOperation op
;
558 prepare_assert_ops(&op
);
559 op
.create(exclusive
);
560 return operate(oid
, &op
, NULL
);
564 * add any version assert operations that are appropriate given the
565 * state in the IoCtx, either the target version assert or any src
566 * object asserts. these affect a single ioctx operation, so clear
567 * the ioctx state when we're done.
569 * return a pointer to the ObjectOperation if we added any events;
570 * this is convenient for passing the extra_ops argument into Objecter
// Appends an assert_version(assert_ver) to op; pop starts out as NULL.
// NOTE(review): the condition guarding the assert, any reset of assert_ver
// and the returned value are not visible in this listing.
573 ::ObjectOperation
*librados::IoCtxImpl::prepare_assert_ops(::ObjectOperation
*op
)
575 ::ObjectOperation
*pop
= NULL
;
577 op
->assert_version(assert_ver
);
// Synchronous write of the first len bytes of bl to oid. There is a guard
// on len exceeding UINT_MAX/2; the data is sliced into mybl with
// substr_of() and the op is submitted through operate().
// NOTE(review): the action taken by the size guard, the declaration of mybl
// and the op.write call are not visible in this listing.
584 int librados::IoCtxImpl::write(const object_t
& oid
, bufferlist
& bl
,
585 size_t len
, uint64_t off
)
587 if (len
> UINT_MAX
/2)
589 ::ObjectOperation op
;
590 prepare_assert_ops(&op
);
592 mybl
.substr_of(bl
, 0, len
);
594 return operate(oid
, &op
, NULL
);
// Synchronous append of the first len bytes of bl to oid, with the same
// UINT_MAX/2 size guard as write(); submitted through operate().
// NOTE(review): the action taken by the size guard and the op.append call
// are not visible in this listing.
597 int librados::IoCtxImpl::append(const object_t
& oid
, bufferlist
& bl
, size_t len
)
599 if (len
> UINT_MAX
/2)
601 ::ObjectOperation op
;
602 prepare_assert_ops(&op
);
604 mybl
.substr_of(bl
, 0, len
);
606 return operate(oid
, &op
, NULL
);
// Synchronous whole-object write, guarded on bl.length() exceeding
// UINT_MAX/2 and submitted through operate().
// NOTE(review): the action taken by the size guard and the op.write_full
// call are not visible in this listing.
609 int librados::IoCtxImpl::write_full(const object_t
& oid
, bufferlist
& bl
)
611 if (bl
.length() > UINT_MAX
/2)
613 ::ObjectOperation op
;
614 prepare_assert_ops(&op
);
616 return operate(oid
, &op
, NULL
);
// Synchronous writesame: repeats the pattern in bl over write_len bytes
// starting at off. Two guards come first: one on either length exceeding
// UINT_MAX/2, one on an empty pattern or a write_len that is not a multiple
// of the pattern length.
// NOTE(review): the actions taken by both guards are not visible here.
619 int librados::IoCtxImpl::writesame(const object_t
& oid
, bufferlist
& bl
,
620 size_t write_len
, uint64_t off
)
622 if ((bl
.length() > UINT_MAX
/2) || (write_len
> UINT_MAX
/2))
624 if ((bl
.length() == 0) || (write_len
% bl
.length()))
626 ::ObjectOperation op
;
627 prepare_assert_ops(&op
);
629 mybl
.substr_of(bl
, 0, bl
.length());
630 op
.writesame(off
, write_len
, mybl
);
631 return operate(oid
, &op
, NULL
);
// Synchronous mutating operation on oid. The mtime is *pmtime when given,
// otherwise the current real-clock time. The op is prepared with
// prepare_mutate_op(), submitted, and the caller blocks on a C_SafeCond
// until done; afterwards the result is logged and set_sync_op_version(ver)
// is called.
// NOTE(review): the action taken by the snapshot guard, the handling of an
// empty ops vector before o->ops[0] is read, and the return value are not
// visible in this listing.
634 int librados::IoCtxImpl::operate(const object_t
& oid
, ::ObjectOperation
*o
,
635 ceph::real_time
*pmtime
, int flags
)
637 ceph::real_time ut
= (pmtime
? *pmtime
:
638 ceph::real_clock::now());
640 /* can't write to a snapshot */
641 if (snap_seq
!= CEPH_NOSNAP
)
647 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::operate::mylock");
648 ceph::condition_variable cond
;
653 Context
*oncommit
= new C_SafeCond(mylock
, cond
, &done
, &r
);
// The opcode of the first sub-op is only used for log messages.
655 int op
= o
->ops
[0].op
.op
;
656 ldout(client
->cct
, 10) << ceph_osd_op_name(op
) << " oid=" << oid
657 << " nspace=" << oloc
.nspace
<< dendl
;
658 Objecter::Op
*objecter_op
= objecter
->prepare_mutate_op(
661 flags
| extra_op_flags
,
663 objecter
->op_submit(objecter_op
);
666 std::unique_lock l
{mylock
};
667 cond
.wait(l
, [&done
] { return done
;});
669 ldout(client
->cct
, 10) << "Objecter returned from "
670 << ceph_osd_op_name(op
) << " r=" << r
<< dendl
;
672 set_sync_op_version(ver
);
// Synchronous read-side counterpart of operate(): prepares the op with
// prepare_read_op(), submits it, blocks on a C_SafeCond until done, logs
// the result and calls set_sync_op_version(ver).
// NOTE(review): the remaining parameters, the handling of an empty ops
// vector before o->ops[0] is read, and the return value are not visible in
// this listing.
677 int librados::IoCtxImpl::operate_read(const object_t
& oid
,
678 ::ObjectOperation
*o
,
685 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::operate_read::mylock");
686 ceph::condition_variable cond
;
691 Context
*onack
= new C_SafeCond(mylock
, cond
, &done
, &r
);
// The opcode of the first sub-op is only used for log messages.
693 int op
= o
->ops
[0].op
.op
;
694 ldout(client
->cct
, 10) << ceph_osd_op_name(op
) << " oid=" << oid
<< " nspace=" << oloc
.nspace
<< dendl
;
695 Objecter::Op
*objecter_op
= objecter
->prepare_read_op(
698 flags
| extra_op_flags
,
700 objecter
->op_submit(objecter_op
);
703 std::unique_lock l
{mylock
};
704 cond
.wait(l
, [&done
] { return done
; });
706 ldout(client
->cct
, 10) << "Objecter returned from "
707 << ceph_osd_op_name(op
) << " r=" << r
<< dendl
;
709 set_sync_op_version(ver
);
// Asynchronous read operation: completion c is finished by a
// C_aio_Complete. A trace span named "rados operate read" is initialised as
// a child of a parent built from trace_info, the op is prepared with
// prepare_read_op() (flags OR-ed with extra_op_flags) and submitted, and
// the tid is stored in c->tid.
// NOTE(review): the remaining parameters, the condition around the trace
// setup and the return value are not visible in this listing.
714 int librados::IoCtxImpl::aio_operate_read(const object_t
&oid
,
715 ::ObjectOperation
*o
,
716 AioCompletionImpl
*c
,
719 const blkin_trace_info
*trace_info
)
721 FUNCTRACE(client
->cct
);
722 Context
*oncomplete
= new C_aio_Complete(c
);
724 #if defined(WITH_EVENTTRACE)
725 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
730 ZTracer::Trace trace
;
732 ZTracer::Trace
parent_trace("", nullptr, trace_info
);
733 trace
.init("rados operate read", &objecter
->trace_endpoint
, &parent_trace
);
736 trace
.event("init root span");
737 Objecter::Op
*objecter_op
= objecter
->prepare_read_op(
739 *o
, snap_seq
, pbl
, flags
| extra_op_flags
,
740 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
741 objecter
->op_submit(objecter_op
, &c
->tid
);
742 trace
.event("rados operate read submitted");
// Asynchronous mutating operation: timestamps the op with the current
// real-clock time, has a guard on snap_seq differing from CEPH_NOSNAP,
// wraps c in a C_aio_Complete, sets up a "rados operate" trace span,
// prepares the op with prepare_mutate_op() using the caller snap_context,
// submits it and stores the tid in c->tid.
// NOTE(review): the action taken by the snapshot guard and the return value
// are not visible in this listing.
747 int librados::IoCtxImpl::aio_operate(const object_t
& oid
,
748 ::ObjectOperation
*o
, AioCompletionImpl
*c
,
749 const SnapContext
& snap_context
, int flags
,
750 const blkin_trace_info
*trace_info
)
752 FUNCTRACE(client
->cct
);
753 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_WRITE_OP_BEGIN");
754 auto ut
= ceph::real_clock::now();
755 /* can't write to a snapshot */
756 if (snap_seq
!= CEPH_NOSNAP
)
759 Context
*oncomplete
= new C_aio_Complete(c
);
760 #if defined(WITH_EVENTTRACE)
761 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
767 ZTracer::Trace trace
;
769 ZTracer::Trace
parent_trace("", nullptr, trace_info
);
770 trace
.init("rados operate", &objecter
->trace_endpoint
, &parent_trace
);
773 trace
.event("init root span");
774 Objecter::Op
*op
= objecter
->prepare_mutate_op(
775 oid
, oloc
, *o
, snap_context
, ut
, flags
| extra_op_flags
,
776 oncomplete
, &c
->objver
, osd_reqid_t(), &trace
);
777 objecter
->op_submit(op
, &c
->tid
);
778 trace
.event("rados operate op submitted");
// Asynchronous read of len bytes at off from snapshot snapid into the
// caller bufferlist pbl. There is a guard on len exceeding INT_MAX; the
// completion is wrapped in a C_aio_Complete, a "rados read" trace is
// initialised from info, and the op is prepared and submitted with the tid
// stored in c->tid.
// NOTE(review): the action taken by the size guard and the return value are
// not visible in this listing.
783 int librados::IoCtxImpl::aio_read(const object_t oid
, AioCompletionImpl
*c
,
784 bufferlist
*pbl
, size_t len
, uint64_t off
,
785 uint64_t snapid
, const blkin_trace_info
*info
)
787 FUNCTRACE(client
->cct
);
788 if (len
> (size_t) INT_MAX
)
791 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_READ_OP_BEGIN");
792 Context
*oncomplete
= new C_aio_Complete(c
);
794 #if defined(WITH_EVENTTRACE)
795 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
801 ZTracer::Trace trace
;
803 trace
.init("rados read", &objecter
->trace_endpoint
, info
);
805 Objecter::Op
*o
= objecter
->prepare_read_op(
807 off
, len
, snapid
, pbl
, extra_op_flags
,
808 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
809 objecter
->op_submit(o
, &c
->tid
);
// Raw-buffer overload of aio_read: the caller memory buf of len bytes is
// wrapped with buffer::create_static() and pushed onto c->bl, which is then
// used as the read target.
// NOTE(review): the action taken by the size guard, any clearing of c->bl
// before the push, and the return value are not visible in this listing.
813 int librados::IoCtxImpl::aio_read(const object_t oid
, AioCompletionImpl
*c
,
814 char *buf
, size_t len
, uint64_t off
,
815 uint64_t snapid
, const blkin_trace_info
*info
)
817 FUNCTRACE(client
->cct
);
818 if (len
> (size_t) INT_MAX
)
821 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_READ_OP_BEGIN");
822 Context
*oncomplete
= new C_aio_Complete(c
);
824 #if defined(WITH_EVENTTRACE)
825 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
830 c
->bl
.push_back(buffer::create_static(len
, buf
));
834 ZTracer::Trace trace
;
836 trace
.init("rados read", &objecter
->trace_endpoint
, info
);
838 Objecter::Op
*o
= objecter
->prepare_read_op(
840 off
, len
, snapid
, &c
->bl
, extra_op_flags
,
841 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
842 objecter
->op_submit(o
, &c
->tid
);
// Context that owns an ObjectOperation (m_ops) alongside a nested Context
// supplied at construction. In the visible callers, m_ops is passed to the
// objecter and this object is the op completion.
// NOTE(review): the body of finish() is not visible in this listing -
// presumably it completes the nested context; confirm.
846 class C_ObjectOperation
: public Context
{
848 ::ObjectOperation m_ops
;
849 explicit C_ObjectOperation(Context
*c
) : m_ctx(c
) {}
850 void finish(int r
) override
{
// Asynchronous sparse read: the extent map goes to *m and the data to
// *data_bl. The sparse_read op lives inside a C_ObjectOperation that wraps
// the C_aio_Complete for c.
// NOTE(review): the action taken by the size guard, the remaining
// prepare_read_op arguments and the return value are not visible here.
857 int librados::IoCtxImpl::aio_sparse_read(const object_t oid
,
858 AioCompletionImpl
*c
,
859 std::map
<uint64_t,uint64_t> *m
,
860 bufferlist
*data_bl
, size_t len
,
861 uint64_t off
, uint64_t snapid
)
863 FUNCTRACE(client
->cct
);
864 if (len
> (size_t) INT_MAX
)
867 Context
*nested
= new C_aio_Complete(c
);
868 C_ObjectOperation
*onack
= new C_ObjectOperation(nested
);
870 #if defined(WITH_EVENTTRACE)
871 ((C_aio_Complete
*) nested
)->oid
= oid
;
876 onack
->m_ops
.sparse_read(off
, len
, m
, data_bl
, NULL
);
878 Objecter::Op
*o
= objecter
->prepare_read_op(
880 onack
->m_ops
, snapid
, NULL
, extra_op_flags
,
882 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous compare-extent against cmp_bl at off, using
// prepare_cmpext_op(). There is a guard on cmp_bl.length() exceeding
// UINT_MAX/2.
// NOTE(review): the action taken by the size guard, the remaining arguments
// and the return value are not visible in this listing.
886 int librados::IoCtxImpl::aio_cmpext(const object_t
& oid
,
887 AioCompletionImpl
*c
,
891 if (cmp_bl
.length() > UINT_MAX
/2)
894 Context
*onack
= new C_aio_Complete(c
);
899 Objecter::Op
*o
= objecter
->prepare_cmpext_op(
900 oid
, oloc
, off
, cmp_bl
, snap_seq
, extra_op_flags
,
902 objecter
->op_submit(o
, &c
->tid
);
907 /* use m_ops.cmpext() + prepare_read_op() for non-bufferlist C API */
// Raw-buffer overload of aio_cmpext. The compare bytes are appended to
// cmp_bl, and a cmpext op taking cmp_buf directly is placed in a
// C_ObjectOperation that wraps the C_aio_Complete for c; the op is sent via
// prepare_read_op().
// NOTE(review): the action taken by the size guard, the declaration of
// cmp_bl and the return value are not visible in this listing.
908 int librados::IoCtxImpl::aio_cmpext(const object_t
& oid
,
909 AioCompletionImpl
*c
,
914 if (cmp_len
> UINT_MAX
/2)
918 cmp_bl
.append(cmp_buf
, cmp_len
);
920 Context
*nested
= new C_aio_Complete(c
);
921 C_ObjectOperation
*onack
= new C_ObjectOperation(nested
);
926 onack
->m_ops
.cmpext(off
, cmp_len
, cmp_buf
, NULL
);
928 Objecter::Op
*o
= objecter
->prepare_read_op(
929 oid
, oloc
, onack
->m_ops
, snap_seq
, NULL
, extra_op_flags
, onack
, &c
->objver
);
930 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous write of len bytes from bl at off. It logs the request, has
// guards on len exceeding UINT_MAX/2 and on snap_seq differing from
// CEPH_NOSNAP, wraps c in a C_aio_Complete, initialises a "rados write"
// trace from info, and prepares and submits the op using the member snapc
// and the current time.
// NOTE(review): the actions taken by both guards and the return value are
// not visible in this listing.
934 int librados::IoCtxImpl::aio_write(const object_t
&oid
, AioCompletionImpl
*c
,
935 const bufferlist
& bl
, size_t len
,
936 uint64_t off
, const blkin_trace_info
*info
)
938 FUNCTRACE(client
->cct
);
939 auto ut
= ceph::real_clock::now();
940 ldout(client
->cct
, 20) << "aio_write " << oid
<< " " << off
<< "~" << len
<< " snapc=" << snapc
<< " snap_seq=" << snap_seq
<< dendl
;
941 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_WRITE_OP_BEGIN");
943 if (len
> UINT_MAX
/2)
945 /* can't write to a snapshot */
946 if (snap_seq
!= CEPH_NOSNAP
)
949 Context
*oncomplete
= new C_aio_Complete(c
);
951 #if defined(WITH_EVENTTRACE)
952 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
954 ZTracer::Trace trace
;
956 trace
.init("rados write", &objecter
->trace_endpoint
, info
);
961 Objecter::Op
*o
= objecter
->prepare_write_op(
963 off
, len
, snapc
, bl
, ut
, extra_op_flags
,
964 oncomplete
, &c
->objver
, nullptr, 0, &trace
);
965 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous append of len bytes from bl, with the same two guards as
// aio_write; uses prepare_append_op() with the member snapc and the current
// time, and stores the tid in c->tid.
// NOTE(review): the actions taken by both guards and the return value are
// not visible in this listing.
970 int librados::IoCtxImpl::aio_append(const object_t
&oid
, AioCompletionImpl
*c
,
971 const bufferlist
& bl
, size_t len
)
973 FUNCTRACE(client
->cct
);
974 auto ut
= ceph::real_clock::now();
976 if (len
> UINT_MAX
/2)
978 /* can't write to a snapshot */
979 if (snap_seq
!= CEPH_NOSNAP
)
982 Context
*oncomplete
= new C_aio_Complete(c
);
983 #if defined(WITH_EVENTTRACE)
984 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
990 Objecter::Op
*o
= objecter
->prepare_append_op(
992 len
, snapc
, bl
, ut
, extra_op_flags
,
993 oncomplete
, &c
->objver
);
994 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous whole-object write of bl, guarded on bl.length() exceeding
// UINT_MAX/2 and on snap_seq differing from CEPH_NOSNAP; uses
// prepare_write_full_op() and stores the tid in c->tid.
// NOTE(review): the actions taken by both guards and the return value are
// not visible in this listing.
999 int librados::IoCtxImpl::aio_write_full(const object_t
&oid
,
1000 AioCompletionImpl
*c
,
1001 const bufferlist
& bl
)
1003 FUNCTRACE(client
->cct
);
1004 auto ut
= ceph::real_clock::now();
1006 if (bl
.length() > UINT_MAX
/2)
1008 /* can't write to a snapshot */
1009 if (snap_seq
!= CEPH_NOSNAP
)
1012 Context
*oncomplete
= new C_aio_Complete(c
);
1013 #if defined(WITH_EVENTTRACE)
1014 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1020 Objecter::Op
*o
= objecter
->prepare_write_full_op(
1022 snapc
, bl
, ut
, extra_op_flags
,
1023 oncomplete
, &c
->objver
);
1024 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous writesame. It has the same length guards as the synchronous
// writesame() plus the snapshot guard, then prepares the op with
// prepare_writesame_op() and stores the tid in c->tid.
// NOTE(review): the remaining parameters, the actions taken by the guards
// and the return value are not visible in this listing.
1029 int librados::IoCtxImpl::aio_writesame(const object_t
&oid
,
1030 AioCompletionImpl
*c
,
1031 const bufferlist
& bl
,
1035 FUNCTRACE(client
->cct
);
1036 auto ut
= ceph::real_clock::now();
1038 if ((bl
.length() > UINT_MAX
/2) || (write_len
> UINT_MAX
/2))
1040 if ((bl
.length() == 0) || (write_len
% bl
.length()))
1042 /* can't write to a snapshot */
1043 if (snap_seq
!= CEPH_NOSNAP
)
1046 Context
*oncomplete
= new C_aio_Complete(c
);
1048 #if defined(WITH_EVENTTRACE)
1049 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1054 Objecter::Op
*o
= objecter
->prepare_writesame_op(
1057 snapc
, bl
, ut
, extra_op_flags
,
1058 oncomplete
, &c
->objver
);
1059 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous object removal: has the snapshot guard, wraps c in a
// C_aio_Complete, prepares the op with prepare_remove_op() (flags OR-ed
// with extra_op_flags) and stores the tid in c->tid.
// NOTE(review): the action taken by the snapshot guard and the return value
// are not visible in this listing.
1064 int librados::IoCtxImpl::aio_remove(const object_t
&oid
, AioCompletionImpl
*c
, int flags
)
1066 FUNCTRACE(client
->cct
);
1067 auto ut
= ceph::real_clock::now();
1069 /* can't write to a snapshot */
1070 if (snap_seq
!= CEPH_NOSNAP
)
1073 Context
*oncomplete
= new C_aio_Complete(c
);
1075 #if defined(WITH_EVENTTRACE)
1076 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1081 Objecter::Op
*o
= objecter
->prepare_remove_op(
1083 snapc
, ut
, flags
| extra_op_flags
,
1084 oncomplete
, &c
->objver
);
1085 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous stat: the size goes straight to *psize while the mtime is
// first written into the mtime member of the C_aio_stat_Ack, which was
// constructed with the caller time_t pointer.
// NOTE(review): the conversion into *pmtime is presumably done by
// C_aio_stat_Ack on completion; that class is not visible here.
1091 int librados::IoCtxImpl::aio_stat(const object_t
& oid
, AioCompletionImpl
*c
,
1092 uint64_t *psize
, time_t *pmtime
)
1094 C_aio_stat_Ack
*onack
= new C_aio_stat_Ack(c
, pmtime
);
1097 Objecter::Op
*o
= objecter
->prepare_stat_op(
1099 snap_seq
, psize
, &onack
->mtime
, extra_op_flags
,
1101 objecter
->op_submit(o
, &c
->tid
);
// Like aio_stat but the caller receives the mtime as a struct timespec; the
// completion context is a C_aio_stat2_Ack constructed with pts.
// NOTE(review): the conversion into *pts is presumably done by
// C_aio_stat2_Ack on completion; that class is not visible here.
1105 int librados::IoCtxImpl::aio_stat2(const object_t
& oid
, AioCompletionImpl
*c
,
1106 uint64_t *psize
, struct timespec
*pts
)
1108 C_aio_stat2_Ack
*onack
= new C_aio_stat2_Ack(c
, pts
);
1111 Objecter::Op
*o
= objecter
->prepare_stat_op(
1113 snap_seq
, psize
, &onack
->mtime
, extra_op_flags
,
1115 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous read of xattr name into bl: builds a read op with any
// pending assert ops plus getxattr and submits it through
// aio_operate_read() with flags 0.
// NOTE(review): the return statement is not visible in this listing.
1119 int librados::IoCtxImpl::aio_getxattr(const object_t
& oid
, AioCompletionImpl
*c
,
1120 const char *name
, bufferlist
& bl
)
1122 ::ObjectOperation rd
;
1123 prepare_assert_ops(&rd
);
1124 rd
.getxattr(name
, &bl
, NULL
);
1125 int r
= aio_operate_read(oid
, &rd
, c
, 0, &bl
);
// Asynchronous xattr removal, submitted through aio_operate() with the
// member snapc and flags 0.
// NOTE(review): the name parameter and the op.rmxattr call are not visible
// in this listing.
1129 int librados::IoCtxImpl::aio_rmxattr(const object_t
& oid
, AioCompletionImpl
*c
,
1132 ::ObjectOperation op
;
1133 prepare_assert_ops(&op
);
1135 return aio_operate(oid
, &op
, c
, snapc
, 0);
// Asynchronously sets xattr name to the contents of bl: the op carries any
// pending assert ops plus setxattr and is submitted through aio_operate()
// with the member snapc and flags 0.
1138 int librados::IoCtxImpl::aio_setxattr(const object_t
& oid
, AioCompletionImpl
*c
,
1139 const char *name
, bufferlist
& bl
)
1141 ::ObjectOperation op
;
1142 prepare_assert_ops(&op
);
1143 op
.setxattr(name
, bl
);
1144 return aio_operate(oid
, &op
, c
, snapc
, 0);
// State carried across an asynchronous getxattrs call: a
// CB_AioCompleteAndSafe built from the user completion, a scratch map that
// receives the result, a pointer to the user map, and the RadosClient used
// for logging in aio_getxattrs_complete.
1148 struct AioGetxattrsData
{
1149 AioGetxattrsData(librados::AioCompletionImpl
*c
, map
<string
, bufferlist
>* attrset
,
1150 librados::RadosClient
*_client
) :
1151 user_completion(c
), user_attrset(attrset
), client(_client
) {}
1152 struct librados::CB_AioCompleteAndSafe user_completion
;
1153 map
<string
, bufferlist
> result_attrset
;
1154 map
<std::string
, bufferlist
>* user_attrset
;
1155 librados::RadosClient
*client
;
// Completion callback for aio_getxattrs. arg is the AioGetxattrsData. It
// reads the return value of the internal completion c, clears the user map,
// copies each entry from result_attrset into it (logging each name),
// invokes the user completion with rc, and drops a reference on c.
// NOTE(review): the condition on rc around the copy loop and the release of
// cdata are not visible in this listing.
1159 static void aio_getxattrs_complete(rados_completion_t c
, void *arg
) {
1160 AioGetxattrsData
*cdata
= reinterpret_cast<AioGetxattrsData
*>(arg
);
1161 int rc
= rados_aio_get_return_value(c
);
1162 cdata
->user_attrset
->clear();
1164 for (map
<string
,bufferlist
>::iterator p
= cdata
->result_attrset
.begin();
1165 p
!= cdata
->result_attrset
.end();
1167 ldout(cdata
->client
->cct
, 10) << "IoCtxImpl::getxattrs: xattr=" << p
->first
<< dendl
;
1168 (*cdata
->user_attrset
)[p
->first
] = p
->second
;
1171 cdata
->user_completion(rc
);
1172 ((librados::AioCompletionImpl
*)c
)->put();
// Asynchronously reads all xattrs of oid into attrset. The objecter fills
// the scratch map inside a heap-allocated AioGetxattrsData, and an internal
// AioCompletionImpl whose callback is aio_getxattrs_complete then publishes
// the result and finishes the user completion c.
// NOTE(review): both cdata and comp are allocated with raw new. The visible
// callback calls put() on comp, but the release of cdata is not visible in
// this listing - confirm it is freed.
1176 int librados::IoCtxImpl::aio_getxattrs(const object_t
& oid
, AioCompletionImpl
*c
,
1177 map
<std::string
, bufferlist
>& attrset
)
1179 AioGetxattrsData
*cdata
= new AioGetxattrsData(c
, &attrset
, client
);
1180 ::ObjectOperation rd
;
1181 prepare_assert_ops(&rd
);
1182 rd
.getxattrs(&cdata
->result_attrset
, NULL
);
1183 librados::AioCompletionImpl
*comp
= new librados::AioCompletionImpl
;
1184 comp
->set_complete_callback(cdata
, aio_getxattrs_complete
);
1185 return aio_operate_read(oid
, &rd
, comp
, 0, NULL
);
// Cancels the in-flight op recorded in c->tid, asking the objecter to
// finish it with -ECANCELED; returns whatever op_cancel() returns.
1188 int librados::IoCtxImpl::aio_cancel(AioCompletionImpl
*c
)
1190 return objecter
->op_cancel(c
->tid
, -ECANCELED
);
// Asynchronously lists hit sets for the PG addressed by hash, writing the
// time ranges to *pls. It uses a local object_locator_t built from poolid,
// which shadows the member oloc, and a PG read op.
// NOTE(review): the return statement is not visible in this listing.
1194 int librados::IoCtxImpl::hit_set_list(uint32_t hash
, AioCompletionImpl
*c
,
1195 std::list
< std::pair
<time_t, time_t> > *pls
)
1197 Context
*oncomplete
= new C_aio_Complete(c
);
1201 ::ObjectOperation rd
;
1202 rd
.hit_set_ls(pls
, NULL
);
1203 object_locator_t
oloc(poolid
);
1204 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1205 hash
, oloc
, rd
, NULL
, extra_op_flags
, oncomplete
, NULL
, NULL
);
1206 objecter
->op_submit(o
, &c
->tid
);
// Asynchronously fetches the hit set covering stamp for the PG addressed by
// hash into pbl. stamp is converted with ceph::real_clock::from_time_t(),
// and the op is sent as a PG read with a locator built from poolid.
// NOTE(review): the remaining parameters and the return statement are not
// visible in this listing.
1210 int librados::IoCtxImpl::hit_set_get(uint32_t hash
, AioCompletionImpl
*c
,
1214 Context
*oncomplete
= new C_aio_Complete(c
);
1218 ::ObjectOperation rd
;
1219 rd
.hit_set_get(ceph::real_clock::from_time_t(stamp
), pbl
, 0);
1220 object_locator_t
oloc(poolid
);
1221 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1222 hash
, oloc
, rd
, NULL
, extra_op_flags
, oncomplete
, NULL
, NULL
);
1223 objecter
->op_submit(o
, &c
->tid
);
// Synchronous object removal, submitted through operate() with the
// OPERATION_FULL_FORCE flag and no mtime.
// NOTE(review): the op.remove call is not visible in this listing.
1227 int librados::IoCtxImpl::remove(const object_t
& oid
)
1229 ::ObjectOperation op
;
1230 prepare_assert_ops(&op
);
1232 return operate(oid
, &op
, nullptr, librados::OPERATION_FULL_FORCE
);
// Synchronous object removal with caller-supplied operation flags, passed
// straight through to operate().
// NOTE(review): the op.remove call is not visible in this listing.
1235 int librados::IoCtxImpl::remove(const object_t
& oid
, int flags
)
1237 ::ObjectOperation op
;
1238 prepare_assert_ops(&op
);
1240 return operate(oid
, &op
, NULL
, flags
);
// Synchronous truncate of oid, submitted through operate().
// NOTE(review): the op.truncate call that consumes size is not visible in
// this listing.
1243 int librados::IoCtxImpl::trunc(const object_t
& oid
, uint64_t size
)
1245 ::ObjectOperation op
;
1246 prepare_assert_ops(&op
);
1248 return operate(oid
, &op
, NULL
);
// Asynchronously lists up to max_to_get inconsistent objects of a PG,
// starting after start_after, into *objects. The scrub_ls op writes its
// result code into c->rval and is sent as a PG op, with CEPH_OSD_FLAG_PGOP
// set, to the locator built from poolid and pg.ps().
// NOTE(review): the interval parameter declaration and the return statement
// are not visible in this listing.
1251 int librados::IoCtxImpl::get_inconsistent_objects(const pg_t
& pg
,
1252 const librados::object_id_t
& start_after
,
1253 uint64_t max_to_get
,
1254 AioCompletionImpl
*c
,
1255 std::vector
<inconsistent_obj_t
>* objects
,
1258 Context
*oncomplete
= new C_aio_Complete(c
);
1262 ::ObjectOperation op
;
1263 op
.scrub_ls(start_after
, max_to_get
, objects
, interval
, &c
->rval
);
1264 object_locator_t oloc
{poolid
, pg
.ps()};
1265 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1266 oloc
.hash
, oloc
, op
, nullptr, CEPH_OSD_FLAG_PGOP
| extra_op_flags
, oncomplete
,
1268 objecter
->op_submit(o
, &c
->tid
);
// Snapset counterpart of get_inconsistent_objects: same flow, but the
// scrub_ls op fills *snapsets.
// NOTE(review): the interval parameter declaration and the return statement
// are not visible in this listing.
1272 int librados::IoCtxImpl::get_inconsistent_snapsets(const pg_t
& pg
,
1273 const librados::object_id_t
& start_after
,
1274 uint64_t max_to_get
,
1275 AioCompletionImpl
*c
,
1276 std::vector
<inconsistent_snapset_t
>* snapsets
,
1279 Context
*oncomplete
= new C_aio_Complete(c
);
1283 ::ObjectOperation op
;
1284 op
.scrub_ls(start_after
, max_to_get
, snapsets
, interval
, &c
->rval
);
1285 object_locator_t oloc
{poolid
, pg
.ps()};
1286 Objecter::Op
*o
= objecter
->prepare_pg_read_op(
1287 oloc
.hash
, oloc
, op
, nullptr, CEPH_OSD_FLAG_PGOP
| extra_op_flags
, oncomplete
,
1289 objecter
->op_submit(o
, &c
->tid
);
// Synchronously applies the tmap update commands in cmdbl to oid: the op
// carries any pending assert ops plus tmap_update and goes through
// operate().
1293 int librados::IoCtxImpl::tmap_update(const object_t
& oid
, bufferlist
& cmdbl
)
1295 ::ObjectOperation wr
;
1296 prepare_assert_ops(&wr
);
1297 wr
.tmap_update(cmdbl
);
1298 return operate(oid
, &wr
, NULL
);
// Synchronously invokes object class method cls.method on oid with input
// inbl; the reply is collected in outbl. The call is issued as a read op
// through operate_read().
1301 int librados::IoCtxImpl::exec(const object_t
& oid
,
1302 const char *cls
, const char *method
,
1303 bufferlist
& inbl
, bufferlist
& outbl
)
1305 ::ObjectOperation rd
;
1306 prepare_assert_ops(&rd
);
1307 rd
.call(cls
, method
, inbl
);
1308 return operate_read(oid
, &rd
, &outbl
);
// Asynchronous object class call with the reply delivered into *outbl. The
// op is built like exec(), prepared with prepare_read_op() at snap_seq and
// submitted, with the tid stored in c->tid.
// NOTE(review): the return statement is not visible in this listing.
1311 int librados::IoCtxImpl::aio_exec(const object_t
& oid
, AioCompletionImpl
*c
,
1312 const char *cls
, const char *method
,
1313 bufferlist
& inbl
, bufferlist
*outbl
)
1315 FUNCTRACE(client
->cct
);
1316 Context
*oncomplete
= new C_aio_Complete(c
);
1318 #if defined(WITH_EVENTTRACE)
1319 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1324 ::ObjectOperation rd
;
1325 prepare_assert_ops(&rd
);
1326 rd
.call(cls
, method
, inbl
);
1327 Objecter::Op
*o
= objecter
->prepare_read_op(
1328 oid
, oloc
, rd
, snap_seq
, outbl
, extra_op_flags
, oncomplete
, &c
->objver
);
1329 objecter
->op_submit(o
, &c
->tid
);
// Asynchronous object-class call whose reply targets a caller-supplied raw
// buffer.  `buf`/`out_len` are wrapped with buffer::create_static() and
// appended to c->bl, and the read op is prepared with &c->bl as its output
// bufferlist; otherwise the visible steps match the bufferlist overload
// (C_aio_Complete, prepare_assert_ops, call, prepare_read_op, op_submit).
// NOTE(review): the matching #endif, some statements, the braces and the
// return are not present in this view.
1333 int librados::IoCtxImpl::aio_exec(const object_t
& oid
, AioCompletionImpl
*c
,
1334 const char *cls
, const char *method
,
1335 bufferlist
& inbl
, char *buf
, size_t out_len
)
1337 FUNCTRACE(client
->cct
);
1338 Context
*oncomplete
= new C_aio_Complete(c
);
1340 #if defined(WITH_EVENTTRACE)
1341 ((C_aio_Complete
*) oncomplete
)->oid
= oid
;
1346 c
->bl
.push_back(buffer::create_static(out_len
, buf
));
1350 ::ObjectOperation rd
;
1351 prepare_assert_ops(&rd
);
1352 rd
.call(cls
, method
, inbl
);
1353 Objecter::Op
*o
= objecter
->prepare_read_op(
1354 oid
, oloc
, rd
, snap_seq
, &c
->bl
, extra_op_flags
, oncomplete
, &c
->objver
);
1355 objecter
->op_submit(o
, &c
->tid
);
// Read up to `len` bytes at offset `off` of object `oid` into `bl`.
// Visible steps: a guard on len > INT_MAX, an event-trace marker, a read
// operation built after prepare_assert_ops(), execution via operate_read(),
// and a level-10 log line when fewer than `len` bytes came back.
// NOTE(review): the action taken by the INT_MAX guard, the handling of `r`
// and the return statement are not present in this view.
1359 int librados::IoCtxImpl::read(const object_t
& oid
,
1360 bufferlist
& bl
, size_t len
, uint64_t off
)
1362 if (len
> (size_t) INT_MAX
)
1364 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_READ_OP_BEGIN");
1366 ::ObjectOperation rd
;
1367 prepare_assert_ops(&rd
);
1368 rd
.read(off
, len
, &bl
, NULL
, NULL
);
1369 int r
= operate_read(oid
, &rd
, &bl
);
1373 if (bl
.length() < len
) {
1374 ldout(client
->cct
, 10) << "Returned length " << bl
.length()
1375 << " less than original length "<< len
<< dendl
;
// Compare the extent of object `oid` starting at `off` against `cmp_bl`.
// Visible steps: a guard on cmp_bl.length() > UINT_MAX/2, then a cmpext
// operation built after prepare_assert_ops() and issued via operate_read(),
// whose result is returned.
// NOTE(review): the remaining parameter line (declaring `cmp_bl`) and the
// action taken by the length guard are not present in this view.
1381 int librados::IoCtxImpl::cmpext(const object_t
& oid
, uint64_t off
,
1384 if (cmp_bl
.length() > UINT_MAX
/2)
1387 ::ObjectOperation op
;
1388 prepare_assert_ops(&op
);
1389 op
.cmpext(off
, cmp_bl
, NULL
);
1390 return operate_read(oid
, &op
, NULL
);
// Blocking extent-map query for [off, off+len) of object `oid`, with the
// result intended for `m`.
// Visible steps: set up a mutex/condition pair and a C_SafeCond that
// reports completion through `done` and `r`, issue objecter->mapext() at
// snap_seq with extra_op_flags, block on the condition until `done` is set,
// log `r`, and start iterating the reply bufferlist `bl`.
// NOTE(review): the declarations of `bl`, `done` and `r`, the tail of the
// mapext() call, the error handling, the decode into `m` and the return are
// not present in this view.
1393 int librados::IoCtxImpl::mapext(const object_t
& oid
,
1394 uint64_t off
, size_t len
,
1395 std::map
<uint64_t,uint64_t>& m
)
1399 ceph::mutex mylock
= ceph::make_mutex("IoCtxImpl::read::mylock");
1400 ceph::condition_variable cond
;
1403 Context
*onack
= new C_SafeCond(mylock
, cond
, &done
, &r
);
1405 objecter
->mapext(oid
, oloc
,
1406 off
, len
, snap_seq
, &bl
, extra_op_flags
,
1410 unique_lock l
{mylock
};
1411 cond
.wait(l
, [&done
] { return done
;});
1413 ldout(client
->cct
, 10) << "Objecter returned from read r=" << r
<< dendl
;
1418 auto iter
= bl
.cbegin();
// Sparse read of object `oid`: the extent map is written to `m` and the
// data to `data_bl`.
// Visible steps: a guard on len > INT_MAX, a sparse_read(off, len, &m,
// &data_bl) operation built after prepare_assert_ops(), and execution via
// operate_read().
// NOTE(review): the final parameter line (declaring `off`), the action of
// the INT_MAX guard, and everything after `r` is obtained (including the
// return) are not present in this view.
1424 int librados::IoCtxImpl::sparse_read(const object_t
& oid
,
1425 std::map
<uint64_t,uint64_t>& m
,
1426 bufferlist
& data_bl
, size_t len
,
1429 if (len
> (size_t) INT_MAX
)
1432 ::ObjectOperation rd
;
1433 prepare_assert_ops(&rd
);
1434 rd
.sparse_read(off
, len
, &m
, &data_bl
, NULL
);
1436 int r
= operate_read(oid
, &rd
, NULL
);
// Request a checksum of kind `type` over `len` bytes at `off` of object
// `oid`, seeded with `init_value` and computed per `chunk_size`; the reply
// is directed at `pbl`.
// Visible steps: a guard on len > INT_MAX, a checksum operation built after
// prepare_assert_ops(), and execution via operate_read().
// NOTE(review): the parameter line declaring `pbl`, the body of the INT_MAX
// guard, and the handling/return of `r` are not present in this view.
1443 int librados::IoCtxImpl::checksum(const object_t
& oid
, uint8_t type
,
1444 const bufferlist
&init_value
, size_t len
,
1445 uint64_t off
, size_t chunk_size
,
1448 if (len
> (size_t) INT_MAX
) {
1452 ::ObjectOperation rd
;
1453 prepare_assert_ops(&rd
);
1454 rd
.checksum(type
, init_value
, off
, len
, chunk_size
, pbl
, nullptr, nullptr);
1456 int r
= operate_read(oid
, &rd
, nullptr);
// Stat object `oid`: the size is written through `psize` and, when the
// read succeeds (r >= 0) and `pmtime` is non-null, the modification time is
// converted with real_clock::to_time_t() and stored in *pmtime.
// NOTE(review): the declaration of `mtime`, any handling of a null `psize`,
// and the return statement are not present in this view.
1464 int librados::IoCtxImpl::stat(const object_t
& oid
, uint64_t *psize
, time_t *pmtime
)
1472 ::ObjectOperation rd
;
1473 prepare_assert_ops(&rd
);
1474 rd
.stat(psize
, &mtime
, nullptr);
1475 int r
= operate_read(oid
, &rd
, NULL
);
1477 if (r
>= 0 && pmtime
) {
1478 *pmtime
= real_clock::to_time_t(mtime
);
// Stat object `oid` with a timespec-resolution modification time: the size
// is written through `psize` and the mtime is converted with
// ceph::real_clock::to_timespec() into *pts.
// NOTE(review): the condition guarding the *pts assignment, any handling of
// a null `psize`, and the return statement are not present in this view.
1484 int librados::IoCtxImpl::stat2(const object_t
& oid
, uint64_t *psize
, struct timespec
*pts
)
1487 ceph::real_time mtime
;
1492 ::ObjectOperation rd
;
1493 prepare_assert_ops(&rd
);
1494 rd
.stat(psize
, &mtime
, nullptr);
1495 int r
= operate_read(oid
, &rd
, NULL
);
1501 *pts
= ceph::real_clock::to_timespec(mtime
);
// Fetch extended attribute `name` of object `oid` into `bl`, using a
// getxattr operation built after prepare_assert_ops() and issued through
// operate_read().
// NOTE(review): the handling of `r` and the return statement are not
// present in this view.
1507 int librados::IoCtxImpl::getxattr(const object_t
& oid
,
1508 const char *name
, bufferlist
& bl
)
1510 ::ObjectOperation rd
;
1511 prepare_assert_ops(&rd
);
1512 rd
.getxattr(name
, &bl
, NULL
);
1513 int r
= operate_read(oid
, &rd
, &bl
);
// Remove extended attribute `name` from object `oid`; the operation is
// passed through prepare_assert_ops() and issued with operate(), whose
// result is returned.
// NOTE(review): the statement that adds the removal step to `op` (between
// prepare_assert_ops and operate) is not present in this view.
1520 int librados::IoCtxImpl::rmxattr(const object_t
& oid
, const char *name
)
1522 ::ObjectOperation op
;
1523 prepare_assert_ops(&op
);
1525 return operate(oid
, &op
, NULL
);
1528 int librados::IoCtxImpl::setxattr(const object_t
& oid
,
1529 const char *name
, bufferlist
& bl
)
1531 ::ObjectOperation op
;
1532 prepare_assert_ops(&op
);
1533 op
.setxattr(name
, bl
);
1534 return operate(oid
, &op
, NULL
);
// Fetch all extended attributes of object `oid`.
// The attributes are read into the local map `aset` via a getxattrs
// operation and operate_read(), then copied entry by entry into `attrset`,
// logging each name at level 10.
// Note: the destination key is built from p->first.c_str(), i.e. from the
// C-string form of the attribute name.
// NOTE(review): the handling of `r` (including any early return) and the
// final return statement are not present in this view.
1537 int librados::IoCtxImpl::getxattrs(const object_t
& oid
,
1538 map
<std::string
, bufferlist
>& attrset
)
1540 map
<string
, bufferlist
> aset
;
1542 ::ObjectOperation rd
;
1543 prepare_assert_ops(&rd
);
1544 rd
.getxattrs(&aset
, NULL
);
1545 int r
= operate_read(oid
, &rd
, NULL
);
1549 for (map
<string
,bufferlist
>::iterator p
= aset
.begin(); p
!= aset
.end(); ++p
) {
1550 ldout(client
->cct
, 10) << "IoCtxImpl::getxattrs: xattr=" << p
->first
<< dendl
;
1551 attrset
[p
->first
.c_str()] = p
->second
;
// Takes the object version `ver` of a completed synchronous operation.
// The only visible statement annotates accesses to last_objver as a benign
// race for valgrind-style tools.
// NOTE(review): the statement that uses `ver` is not present in this view;
// presumably it stores it in last_objver -- confirm.
1558 void librados::IoCtxImpl::set_sync_op_version(version_t ver
)
1560 ANNOTATE_BENIGN_RACE_SIZED(&last_objver
, sizeof(last_objver
),
1561 "IoCtxImpl last_objver");
1565 namespace librados
{
1566 void intrusive_ptr_add_ref(IoCtxImpl
*p
) { p
->get(); }
1567 void intrusive_ptr_release(IoCtxImpl
*p
) { p
->put(); }
// Watch callback state attached to a linger op (see the assignments to
// linger_op->handle in watch()/aio_watch()).
// Members visible here: an intrusive_ptr to the owning IoCtxImpl, and raw
// pointers to the two user callback flavours (WatchCtx and WatchCtx2).
//  - handle_notify() logs the event and forwards it to ctx2->handle_notify()
//    or to ctx->notify(0, 0, bl), then sends a notify_ack for the legacy
//    protocol.
//  - handle_error() logs and forwards to ctx2->handle_error().
//  - operator() dispatches to handle_error() (with the error converted by
//    ceph::from_error_code) or to handle_notify().
// NOTE(review): the struct header, the `oid` member declaration, several
// parameter lines and all of the if/else conditions that choose between the
// calls above are not present in this view -- confirm against the file.
1571 boost::intrusive_ptr
<librados::IoCtxImpl
> ioctx
;
1573 librados::WatchCtx
*ctx
;
1574 librados::WatchCtx2
*ctx2
;
1576 WatchInfo(librados::IoCtxImpl
*io
, object_t o
,
1577 librados::WatchCtx
*c
, librados::WatchCtx2
*c2
)
1578 : ioctx(io
), oid(o
), ctx(c
), ctx2(c2
) {}
1580 void handle_notify(uint64_t notify_id
,
1582 uint64_t notifier_id
,
1584 ldout(ioctx
->client
->cct
, 10) << __func__
<< " " << notify_id
1585 << " cookie " << cookie
1586 << " notifier_id " << notifier_id
1587 << " len " << bl
.length()
1591 ctx2
->handle_notify(notify_id
, cookie
, notifier_id
, bl
);
1593 ctx
->notify(0, 0, bl
);
1595 // send ACK back to OSD if using legacy protocol
1597 ioctx
->notify_ack(oid
, notify_id
, cookie
, empty
);
1600 void handle_error(uint64_t cookie
, int err
) {
1601 ldout(ioctx
->client
->cct
, 10) << __func__
<< " cookie " << cookie
1605 ctx2
->handle_error(cookie
, err
);
1608 void operator()(bs::error_code ec
,
1611 uint64_t notifier_id
,
1614 handle_error(cookie
, ceph::from_error_code(ec
));
1616 handle_notify(notify_id
, cookie
, notifier_id
, bl
);
1621 // internal WatchInfo that owns the context memory
1622 struct InternalWatchInfo
: public WatchInfo
{
1623 std::unique_ptr
<librados::WatchCtx
> ctx
;
1624 std::unique_ptr
<librados::WatchCtx2
> ctx2
;
1626 InternalWatchInfo(librados::IoCtxImpl
*io
, object_t o
,
1627 librados::WatchCtx
*c
, librados::WatchCtx2
*c2
)
1628 : WatchInfo(io
, o
, c
, c2
), ctx(c
), ctx2(c2
) {}
// Convenience overload: forwards to the watch() overload that takes an
// explicit timeout, passing 0 for it.
// NOTE(review): the parameter line declaring `internal` is not present in
// this view.
1631 int librados::IoCtxImpl::watch(const object_t
& oid
, uint64_t *handle
,
1632 librados::WatchCtx
*ctx
,
1633 librados::WatchCtx2
*ctx2
,
1636 return watch(oid
, handle
, ctx
, ctx2
, 0, internal
);
// Blocking registration of a watch on object `oid`.
// Visible steps: register a linger op and publish its cookie through
// *handle; attach either an InternalWatchInfo or a plain WatchInfo as the
// linger op's handler; build a CEPH_OSD_WATCH_OP_WATCH operation with
// `timeout`; start it with linger_watch(); wait on `onfinish`; record the
// object version with set_sync_op_version(); and cancel the linger op with
// linger_cancel().
// NOTE(review): the parameter lines declaring `timeout` and `internal`, the
// declarations of `bl` and `objver`, the condition choosing the handler
// type, the tail of the linger_watch() call, the condition under which the
// linger op is cancelled, and the return are not present in this view.
1639 int librados::IoCtxImpl::watch(const object_t
& oid
, uint64_t *handle
,
1640 librados::WatchCtx
*ctx
,
1641 librados::WatchCtx2
*ctx2
,
1645 ::ObjectOperation wr
;
1647 C_SaferCond onfinish
;
1649 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
, 0);
1650 *handle
= linger_op
->get_cookie();
1652 linger_op
->handle
= InternalWatchInfo(this, oid
, ctx
, ctx2
);
1654 linger_op
->handle
= WatchInfo(this, oid
, ctx
, ctx2
);
1656 prepare_assert_ops(&wr
);
1657 wr
.watch(*handle
, CEPH_OSD_WATCH_OP_WATCH
, timeout
);
1659 objecter
->linger_watch(linger_op
, wr
,
1660 snapc
, ceph::real_clock::now(), bl
,
1664 int r
= onfinish
.wait();
1666 set_sync_op_version(objver
);
1669 objecter
->linger_cancel(linger_op
);
// Convenience overload: forwards to the aio_watch() overload that takes an
// explicit timeout, passing 0 for it.
// NOTE(review): the parameter lines declaring `handle` and `internal` are
// not present in this view.
1676 int librados::IoCtxImpl::aio_watch(const object_t
& oid
,
1677 AioCompletionImpl
*c
,
1679 librados::WatchCtx
*ctx
,
1680 librados::WatchCtx2
*ctx2
,
1682 return aio_watch(oid
, c
, handle
, ctx
, ctx2
, 0, internal
);
// Asynchronous registration of a watch on object `oid`.
// Visible steps: register a linger op; create a C_aio_linger_Complete for
// `c` (third argument false); publish the cookie through *handle; attach an
// InternalWatchInfo or WatchInfo handler; build a CEPH_OSD_WATCH_OP_WATCH
// operation with `timeout`; and start it with linger_watch(), reporting the
// object version into c->objver.
// NOTE(review): the parameter lines declaring `handle`, `timeout` and
// `internal`, the declaration of `bl`, the condition choosing the handler
// type, and the return are not present in this view.
1685 int librados::IoCtxImpl::aio_watch(const object_t
& oid
,
1686 AioCompletionImpl
*c
,
1688 librados::WatchCtx
*ctx
,
1689 librados::WatchCtx2
*ctx2
,
1693 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
, 0);
1695 Context
*oncomplete
= new C_aio_linger_Complete(c
, linger_op
, false);
1697 ::ObjectOperation wr
;
1698 *handle
= linger_op
->get_cookie();
1700 linger_op
->handle
= InternalWatchInfo(this, oid
, ctx
, ctx2
);
1702 linger_op
->handle
= WatchInfo(this, oid
, ctx
, ctx2
);
1705 prepare_assert_ops(&wr
);
1706 wr
.watch(*handle
, CEPH_OSD_WATCH_OP_WATCH
, timeout
);
1708 objecter
->linger_watch(linger_op
, wr
,
1709 snapc
, ceph::real_clock::now(), bl
,
1710 oncomplete
, &c
->objver
);
// Acknowledge a received notify on object `oid`.
// A notify_ack(notify_id, cookie, bl) operation is built after
// prepare_assert_ops() and sent with objecter->read() at snap_seq; no output
// bufferlist is supplied (NULL) and the trailing arguments are 0, 0.
// NOTE(review): the parameter lines declaring `notify_id`, `cookie` and
// `bl`, and the return statement, are not present in this view.
1716 int librados::IoCtxImpl::notify_ack(
1717 const object_t
& oid
,
1722 ::ObjectOperation rd
;
1723 prepare_assert_ops(&rd
);
1724 rd
.notify_ack(notify_id
, cookie
, bl
);
1725 objecter
->read(oid
, oloc
, rd
, snap_seq
, (bufferlist
*)NULL
, extra_op_flags
, 0, 0);
// Check the state of the watch identified by `cookie`.
// The cookie is reinterpreted as the Objecter::LingerOp pointer and passed
// to objecter->linger_check().  Two returns are visible: one yields
// 1 + the checked duration in milliseconds (taken from *r), the other the
// error carried by `r`, converted with ceph::from_error_code().
// NOTE(review): the condition selecting between the two returns is not
// present in this view; presumably it tests whether `r` holds a value.
1729 int librados::IoCtxImpl::watch_check(uint64_t cookie
)
1731 auto linger_op
= reinterpret_cast<Objecter::LingerOp
*>(cookie
);
1732 auto r
= objecter
->linger_check(linger_op
);
1734 return 1 + std::chrono::duration_cast
<
1735 std::chrono::milliseconds
>(*r
).count();
1737 return ceph::from_error_code(r
.error());
// Blocking removal of the watch identified by `cookie`.
// Visible steps: reinterpret the cookie as the LingerOp pointer, build a
// CEPH_OSD_WATCH_OP_UNWATCH operation, send it with objecter->mutate()
// against the linger op's target object, cancel the linger op, wait on
// `onfinish`, and record the version with set_sync_op_version().
// Note that linger_cancel() is issued before waiting for the mutate reply.
// NOTE(review): the declaration of `ver`, the tail of the mutate() call and
// the return statement are not present in this view.
1740 int librados::IoCtxImpl::unwatch(uint64_t cookie
)
1742 Objecter::LingerOp
*linger_op
= reinterpret_cast<Objecter::LingerOp
*>(cookie
);
1743 C_SaferCond onfinish
;
1746 ::ObjectOperation wr
;
1747 prepare_assert_ops(&wr
);
1748 wr
.watch(cookie
, CEPH_OSD_WATCH_OP_UNWATCH
);
1749 objecter
->mutate(linger_op
->target
.base_oid
, oloc
, wr
,
1750 snapc
, ceph::real_clock::now(), extra_op_flags
,
1752 objecter
->linger_cancel(linger_op
);
1754 int r
= onfinish
.wait();
1755 set_sync_op_version(ver
);
// Asynchronous removal of the watch identified by `cookie`.
// Visible steps: reinterpret the cookie as the LingerOp pointer, create a
// C_aio_linger_Complete for `c` (third argument true), build a
// CEPH_OSD_WATCH_OP_UNWATCH operation and send it with objecter->mutate()
// against the linger op's target object, reporting the object version into
// c->objver.
// NOTE(review): lines between the signature and the first statement, and
// the return statement, are not present in this view.
1759 int librados::IoCtxImpl::aio_unwatch(uint64_t cookie
, AioCompletionImpl
*c
)
1762 Objecter::LingerOp
*linger_op
= reinterpret_cast<Objecter::LingerOp
*>(cookie
);
1763 Context
*oncomplete
= new C_aio_linger_Complete(c
, linger_op
, true);
1765 ::ObjectOperation wr
;
1766 prepare_assert_ops(&wr
);
1767 wr
.watch(cookie
, CEPH_OSD_WATCH_OP_UNWATCH
);
1768 objecter
->mutate(linger_op
->target
.base_oid
, oloc
, wr
,
1769 snapc
, ceph::real_clock::now(), extra_op_flags
,
1770 oncomplete
, &c
->objver
);
// Blocking notify on object `oid` with payload `bl`.
// Visible steps: register a linger op; install a CB_notify_Finish as its
// on_notify_finish completion, wired to `notify_finish_cond` and to the
// caller's reply destinations (preply_bl / preply_buf / preply_buf_len);
// pick the timeout (notify_timeout, or timeout_ms / 1000 -- integer
// division, so sub-second remainders are dropped); build the notify
// operation; start it with linger_notify(); wait for the ack (`onack`);
// then wait on `notify_finish_cond`; cancel the linger op; and record the
// version with set_sync_op_version().
// NOTE(review): `¬ify_finish_cond` below looks like a mis-encoded
// `&notify_finish_cond` (CB_notify_Finish takes a Context*) -- confirm
// against the real file.
// NOTE(review): the declarations of `onack`, `inbl` and `objver`, the
// condition guarding the timeout override, the tail of the linger_notify()
// call, the success/failure branch structure and the return are not
// present in this view.
1774 int librados::IoCtxImpl::notify(const object_t
& oid
, bufferlist
& bl
,
1775 uint64_t timeout_ms
,
1776 bufferlist
*preply_bl
,
1777 char **preply_buf
, size_t *preply_buf_len
)
1779 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
, 0);
1781 C_SaferCond notify_finish_cond
;
1782 linger_op
->on_notify_finish
=
1783 Objecter::LingerOp::OpComp::create(
1784 objecter
->service
.get_executor(),
1785 CB_notify_Finish(client
->cct
, ¬ify_finish_cond
,
1786 objecter
, linger_op
, preply_bl
,
1787 preply_buf
, preply_buf_len
));
1788 uint32_t timeout
= notify_timeout
;
1790 timeout
= timeout_ms
/ 1000;
1792 // Construct RADOS op
1793 ::ObjectOperation rd
;
1794 prepare_assert_ops(&rd
);
1796 rd
.notify(linger_op
->get_cookie(), 1, timeout
, bl
, &inbl
);
1801 objecter
->linger_notify(linger_op
,
1802 rd
, snap_seq
, inbl
, NULL
,
1805 ldout(client
->cct
, 10) << __func__
<< " issued linger op " << linger_op
<< dendl
;
1806 int r
= onack
.wait();
1807 ldout(client
->cct
, 10) << __func__
<< " linger op " << linger_op
1808 << " acked (" << r
<< ")" << dendl
;
1811 ldout(client
->cct
, 10) << __func__
<< " waiting for watch_notify finish "
1812 << linger_op
<< dendl
;
1813 r
= notify_finish_cond
.wait();
1816 ldout(client
->cct
, 10) << __func__
<< " failed to initiate notify, r = "
1818 notify_finish_cond
.wait();
1821 objecter
->linger_cancel(linger_op
);
1823 set_sync_op_version(objver
);
// Asynchronous notify on object `oid` with payload `bl`.
// Visible steps: register a linger op; create a C_aio_notify_Complete for
// `c`; install a CB_notify_Finish (wired to that completion and to the
// caller's reply destinations) as on_notify_finish; create a
// C_aio_notify_Ack for the ack; pick the timeout (notify_timeout, or
// timeout_ms / 1000 with integer division); build the notify operation;
// and start it with linger_notify().
// NOTE(review): the last CB_notify_Finish argument line, the declaration of
// `inbl`, the condition guarding the timeout override, the tail of the
// linger_notify() call and the return are not present in this view.
1827 int librados::IoCtxImpl::aio_notify(const object_t
& oid
, AioCompletionImpl
*c
,
1828 bufferlist
& bl
, uint64_t timeout_ms
,
1829 bufferlist
*preply_bl
, char **preply_buf
,
1830 size_t *preply_buf_len
)
1832 Objecter::LingerOp
*linger_op
= objecter
->linger_register(oid
, oloc
, 0);
1836 C_aio_notify_Complete
*oncomplete
= new C_aio_notify_Complete(c
, linger_op
);
1837 linger_op
->on_notify_finish
=
1838 Objecter::LingerOp::OpComp::create(
1839 objecter
->service
.get_executor(),
1840 CB_notify_Finish(client
->cct
, oncomplete
,
1841 objecter
, linger_op
,
1842 preply_bl
, preply_buf
,
1844 Context
*onack
= new C_aio_notify_Ack(client
->cct
, oncomplete
);
1846 uint32_t timeout
= notify_timeout
;
1848 timeout
= timeout_ms
/ 1000;
1850 // Construct RADOS op
1851 ::ObjectOperation rd
;
1852 prepare_assert_ops(&rd
);
1854 rd
.notify(linger_op
->get_cookie(), 1, timeout
, bl
, &inbl
);
1857 objecter
->linger_notify(linger_op
,
1858 rd
, snap_seq
, inbl
, NULL
,
// Send an allocation hint for object `oid`: the expected object size, the
// expected write size and `flags` are placed in a set_alloc_hint operation
// (after prepare_assert_ops()) and issued with operate(), whose result is
// returned.
// NOTE(review): the parameter line declaring `flags` is not present in
// this view.
1863 int librados::IoCtxImpl::set_alloc_hint(const object_t
& oid
,
1864 uint64_t expected_object_size
,
1865 uint64_t expected_write_size
,
1868 ::ObjectOperation wr
;
1869 prepare_assert_ops(&wr
);
1870 wr
.set_alloc_hint(expected_object_size
, expected_write_size
, flags
);
1871 return operate(oid
, &wr
, NULL
);
// Accessor returning a version_t.
// NOTE(review): only the signature is present in this view; presumably it
// reports the version tracked through set_sync_op_version() -- confirm.
1874 version_t
librados::IoCtxImpl::last_version()
// Takes the object version `ver` to assert on.
// NOTE(review): only the signature is present in this view; presumably the
// value is consumed later by prepare_assert_ops() -- confirm.
1879 void librados::IoCtxImpl::set_assert_version(uint64_t ver
)
1884 void librados::IoCtxImpl::set_notify_timeout(uint32_t timeout
)
1886 notify_timeout
= timeout
;
// Issue a write-class operation for object `oid` through operate() and
// return its result.
// NOTE(review): the statement that adds the pin step to `wr` (between
// prepare_assert_ops and operate) is not present in this view.
1889 int librados::IoCtxImpl::cache_pin(const object_t
& oid
)
1891 ::ObjectOperation wr
;
1892 prepare_assert_ops(&wr
);
1894 return operate(oid
, &wr
, NULL
);
// Counterpart of cache_pin(): issues a write-class operation for object
// `oid` through operate() and returns its result.
// NOTE(review): the statement that adds the unpin step to `wr` (between
// prepare_assert_ops and operate) is not present in this view.
1897 int librados::IoCtxImpl::cache_unpin(const object_t
& oid
)
1899 ::ObjectOperation wr
;
1900 prepare_assert_ops(&wr
);
1902 return operate(oid
, &wr
, NULL
);
1906 ///////////////////////////// C_aio_stat_Ack ////////////////////////////
// Constructor of the stat completion context.  The visible body asserts
// that the completion's `io` pointer is still null at this point.
// NOTE(review): the remaining parameter lines, the initializer list and the
// rest of the body are not present in this view.
1908 librados::IoCtxImpl::C_aio_stat_Ack::C_aio_stat_Ack(AioCompletionImpl
*_c
,
1912 ceph_assert(!c
->io
);
// Completion handler for an asynchronous stat with result `r`.
// Visible steps: wake every waiter on c->cond; when the stat succeeded
// (r >= 0) and a destination was supplied, convert `mtime` to time_t into
// *pmtime; and, if a completion callback is registered, defer a
// CB_AioComplete for `c` onto the client's finish_strand.
// NOTE(review): the locking around `c`, the updates of the completion's
// state fields and the final release of `c` are not present in this view.
1916 void librados::IoCtxImpl::C_aio_stat_Ack::finish(int r
)
1921 c
->cond
.notify_all();
1923 if (r
>= 0 && pmtime
) {
1924 *pmtime
= real_clock::to_time_t(mtime
);
1927 if (c
->callback_complete
) {
1928 boost::asio::defer(c
->io
->client
->finish_strand
, CB_AioComplete(c
));
1934 ///////////////////////////// C_aio_stat2_Ack ////////////////////////////
// Constructor of the stat2 completion context; `pt` is the caller's
// timespec destination.  The visible body asserts that the completion's
// `io` pointer is still null at this point.
// NOTE(review): the initializer list and the rest of the body are not
// present in this view.
1936 librados::IoCtxImpl::C_aio_stat2_Ack::C_aio_stat2_Ack(AioCompletionImpl
*_c
,
1937 struct timespec
*pt
)
1940 ceph_assert(!c
->io
);
// Completion handler for an asynchronous stat2 with result `r`.
// Visible steps: wake every waiter on c->cond; when the stat succeeded
// (r >= 0) and a destination was supplied, convert `mtime` to a timespec
// into *pts; and, if a completion callback is registered, defer a
// CB_AioComplete for `c` onto the client's finish_strand.
// NOTE(review): the locking around `c`, the updates of the completion's
// state fields and the final release of `c` are not present in this view.
1944 void librados::IoCtxImpl::C_aio_stat2_Ack::finish(int r
)
1949 c
->cond
.notify_all();
1951 if (r
>= 0 && pts
) {
1952 *pts
= real_clock::to_timespec(mtime
);
1955 if (c
->callback_complete
) {
1956 boost::asio::defer(c
->io
->client
->finish_strand
, CB_AioComplete(c
));
1962 //////////////////////////// C_aio_Complete ////////////////////////////////
// Constructor of the generic AIO completion context wrapping `_c`.
// NOTE(review): only the signature is present in this view; the
// initializer list and body are not shown.
1964 librados::IoCtxImpl::C_aio_Complete::C_aio_Complete(AioCompletionImpl
*_c
)
// Generic completion handler for asynchronous operations, result `r`.
// Visible steps:
//  - store `r` in c->rval (the existing comments say an rval already set is
//    kept unless r != 0);
//  - wake every waiter on c->cond;
//  - on success with a non-empty result bufferlist (c->blp): deal with a
//    caller buffer (c->out_buf) that the bufferlist is not contiguous in,
//    copy the data into c->out_buf when the bufferlist is not already
//    backed by it, and set c->rval to the number of bytes returned;
//  - defer a CB_AioComplete for `c` onto the client's finish_strand when a
//    callback is registered;
//  - call complete_aio_write() on the IoCtx when the completion carries an
//    aio_write_seq;
//  - under WITH_EVENTTRACE, emit the "RADOS_OP_COMPLETE" trace event.
// NOTE(review): the locking around `c`, the condition guarding the rval
// assignment, the body of the non-contiguous branch, the second half of the
// callback condition, the matching #endif and the final release of `c` are
// not present in this view.
1970 void librados::IoCtxImpl::C_aio_Complete::finish(int r
)
1973 // Leave an existing rval unless r != 0
1975 c
->rval
= r
; // This clears the error set in C_ObjectOperation_scrub_ls::finish()
1977 c
->cond
.notify_all();
1979 if (r
== 0 && c
->blp
&& c
->blp
->length() > 0) {
1980 if (c
->out_buf
&& !c
->blp
->is_contiguous()) {
1983 if (c
->out_buf
&& !c
->blp
->is_provided_buffer(c
->out_buf
))
1984 c
->blp
->begin().copy(c
->blp
->length(), c
->out_buf
);
1986 c
->rval
= c
->blp
->length();
1990 if (c
->callback_complete
||
1992 boost::asio::defer(c
->io
->client
->finish_strand
, CB_AioComplete(c
));
1995 if (c
->aio_write_seq
) {
1996 c
->io
->complete_aio_write(c
);
1999 #if defined(WITH_EVENTTRACE)
2000 OID_EVENT_TRACE(oid
.name
.c_str(), "RADOS_OP_COMPLETE");
// Compute slice `n` of `m` of the object range [start, finish) and return
// its bounds through *split_start / *split_finish.
// Visible logic:
//  - if `start` is already the max object, both outputs are set to max;
//  - otherwise the hashes are bit-reversed into 64-bit values, with a max
//    `finish` treated as 0x100000000 (one past the 32-bit hash space);
//  - the slice bounds are start_hash + diff*n/m and start_hash +
//    diff*(n+1)/m, where diff = finish_hash - start_hash;
//  - each bound is emitted either as the caller's own start/finish, as max
//    (when rev_finish reaches 0x100000000), or as an hobject_t rebuilt from
//    the bit-reversed bound with poolid and CEPH_NOSNAP.
// NOTE(review): the parameter lines declaring `n` and `m`, the early
// return after the is_max() case, and the conditions that select the
// caller's own start/finish over the computed bounds are not present in
// this view.
2005 void librados::IoCtxImpl::object_list_slice(
2006 const hobject_t start
,
2007 const hobject_t finish
,
2010 hobject_t
*split_start
,
2011 hobject_t
*split_finish
)
2013 if (start
.is_max()) {
2014 *split_start
= hobject_t::get_max();
2015 *split_finish
= hobject_t::get_max();
2019 uint64_t start_hash
= hobject_t::_reverse_bits(start
.get_hash());
2020 uint64_t finish_hash
=
2021 finish
.is_max() ? 0x100000000 :
2022 hobject_t::_reverse_bits(finish
.get_hash());
2024 uint64_t diff
= finish_hash
- start_hash
;
2025 uint64_t rev_start
= start_hash
+ (diff
* n
/ m
);
2026 uint64_t rev_finish
= start_hash
+ (diff
* (n
+ 1) / m
);
2028 *split_start
= start
;
2030 *split_start
= hobject_t(
2031 object_t(), string(), CEPH_NOSNAP
,
2032 hobject_t::_reverse_bits(rev_start
), poolid
, string());
2036 *split_finish
= finish
;
2037 else if (rev_finish
>= 0x100000000)
2038 *split_finish
= hobject_t::get_max();
2040 *split_finish
= hobject_t(
2041 object_t(), string(), CEPH_NOSNAP
,
2042 hobject_t::_reverse_bits(rev_finish
), poolid
, string());
// Blocking wrapper around application_enable_async().
// Visible steps: allocate a PoolAsyncCompletionImpl, start the async
// request with it, assert that an intermediate result `r` is 0, take the
// completion's return value, and finish by returning
// client->wait_for_latest_osdmap().
// NOTE(review): the parameter line declaring `force`, the statement that
// produces the asserted `r` (presumably a wait on the completion), the
// release of `c` and the error early-return are not present in this view.
2045 int librados::IoCtxImpl::application_enable(const std::string
& app_name
,
2048 auto c
= new PoolAsyncCompletionImpl();
2049 application_enable_async(app_name
, force
, c
);
2052 ceph_assert(r
== 0);
2054 r
= c
->get_return_value();
2061 return client
->wait_for_latest_osdmap();
// Asynchronously enable application `app_name` on this pool, completing
// `c` when done.
// Visible steps:
//  - if the monitors do not all require FEATURE_LUMINOUS, a callback built
//    from CB_PoolAsync_Safe(c) is deferred onto the client's finish_strand
//    instead of sending a command;
//  - otherwise a JSON "osd pool application enable" monitor command is
//    assembled from the cached pool name and `app_name` (optionally with
//    "yes_i_really_mean_it": true) and sent with mon_command_async(),
//    completing through CB_PoolAsync_Safe(c).
// Note: `app_name` and the pool name are streamed into the JSON text
// as-is, without escaping.
// NOTE(review): the parameter line declaring `force`, the body of the
// deferred lambda, the condition guarding the yes_i_really_mean_it field,
// the closing of the JSON text and the declaration of `inbl` are not
// present in this view.
2064 void librados::IoCtxImpl::application_enable_async(const std::string
& app_name
,
2066 PoolAsyncCompletionImpl
*c
)
2068 // pre-Luminous clusters will return -EINVAL and application won't be
2069 // preserved until Luminous is configured as minimum version.
2070 if (!client
->get_required_monitor_features().contains_all(
2071 ceph::features::mon::FEATURE_LUMINOUS
)) {
2072 boost::asio::defer(client
->finish_strand
,
2073 [cb
= CB_PoolAsync_Safe(c
)]() mutable {
2079 std::stringstream cmd
;
2081 << "\"prefix\": \"osd pool application enable\","
2082 << "\"pool\": \"" << get_cached_pool_name() << "\","
2083 << "\"app\": \"" << app_name
<< "\"";
2085 cmd
<< ",\"yes_i_really_mean_it\": true";
2089 std::vector
<std::string
> cmds
;
2090 cmds
.push_back(cmd
.str());
2092 client
->mon_command_async(cmds
, inbl
, nullptr, nullptr,
2093 make_lambda_context(CB_PoolAsync_Safe(c
)));
// Collect the names of the applications enabled on this pool into
// *app_names.
// The lookup runs inside objecter->with_osdmap(): the pool entry for
// `poolid` is fetched from the OSDMap and every key of its
// application_metadata map is inserted into the output set.
// NOTE(review): the handling of a missing pool (pg_pool == nullptr), the
// result variable and the return statement are not present in this view.
2096 int librados::IoCtxImpl::application_list(std::set
<std::string
> *app_names
)
2100 objecter
->with_osdmap([&](const OSDMap
& o
) {
2101 auto pg_pool
= o
.get_pg_pool(poolid
);
2102 if (pg_pool
== nullptr) {
2107 for (auto &pair
: pg_pool
->application_metadata
) {
2108 app_names
->insert(pair
.first
);
// Look up metadata `key` of application `app_name` on this pool and store
// it in *value.
// The lookup runs inside objecter->with_osdmap() and has three visible
// failure points: the pool is missing from the OSDMap, the application has
// no entry in application_metadata, or the key is absent from that entry.
// NOTE(review): the parameter line declaring `value`, the result recorded
// at each failure point and the return statement are not present in this
// view.
2114 int librados::IoCtxImpl::application_metadata_get(const std::string
& app_name
,
2115 const std::string
&key
,
2119 objecter
->with_osdmap([&](const OSDMap
& o
) {
2120 auto pg_pool
= o
.get_pg_pool(poolid
);
2121 if (pg_pool
== nullptr) {
2126 auto app_it
= pg_pool
->application_metadata
.find(app_name
);
2127 if (app_it
== pg_pool
->application_metadata
.end()) {
2132 auto it
= app_it
->second
.find(key
);
2133 if (it
== app_it
->second
.end()) {
2138 *value
= it
->second
;
// Set metadata `key` = `value` for application `app_name` on this pool.
// A JSON "osd pool application set" monitor command is assembled from the
// cached pool name, `app_name`, `key` and `value`, sent synchronously with
// client->mon_command(), and followed by wait_for_latest_osdmap() so the
// caller sees the updated map.
// Note: the arguments are streamed into the JSON text as-is, without
// escaping.
// NOTE(review): the opening and closing of the JSON text, the declaration
// of `inbl` and the early return on a mon_command() error are not present
// in this view.
2143 int librados::IoCtxImpl::application_metadata_set(const std::string
& app_name
,
2144 const std::string
&key
,
2145 const std::string
& value
)
2147 std::stringstream cmd
;
2149 << "\"prefix\":\"osd pool application set\","
2150 << "\"pool\":\"" << get_cached_pool_name() << "\","
2151 << "\"app\":\"" << app_name
<< "\","
2152 << "\"key\":\"" << key
<< "\","
2153 << "\"value\":\"" << value
<< "\""
2156 std::vector
<std::string
> cmds
;
2157 cmds
.push_back(cmd
.str());
2159 int r
= client
->mon_command(cmds
, inbl
, nullptr, nullptr);
2164 // ensure we have the latest osd map epoch before proceeding
2165 return client
->wait_for_latest_osdmap();
// Remove metadata `key` of application `app_name` on this pool.
// A JSON "osd pool application rm" monitor command is assembled from the
// cached pool name, `app_name` and `key`, sent synchronously with
// client->mon_command(), and followed by wait_for_latest_osdmap().
// Note: the arguments are streamed into the JSON text as-is, without
// escaping.
// NOTE(review): the opening and closing of the JSON text, the declaration
// of `inbl` and the early return on a mon_command() error are not present
// in this view.
2168 int librados::IoCtxImpl::application_metadata_remove(const std::string
& app_name
,
2169 const std::string
&key
)
2171 std::stringstream cmd
;
2173 << "\"prefix\":\"osd pool application rm\","
2174 << "\"pool\":\"" << get_cached_pool_name() << "\","
2175 << "\"app\":\"" << app_name
<< "\","
2176 << "\"key\":\"" << key
<< "\""
2179 std::vector
<std::string
> cmds
;
2180 cmds
.push_back(cmd
.str());
2182 int r
= client
->mon_command(cmds
, inbl
, nullptr, nullptr);
2187 // ensure we have the latest osd map epoch before proceeding
2188 return client
->wait_for_latest_osdmap();
2191 int librados::IoCtxImpl::application_metadata_list(const std::string
& app_name
,
2192 std::map
<std::string
, std::string
> *values
)
2196 objecter
->with_osdmap([&](const OSDMap
& o
) {
2197 auto pg_pool
= o
.get_pg_pool(poolid
);
2198 if (pg_pool
== nullptr) {
2203 auto it
= pg_pool
->application_metadata
.find(app_name
);
2204 if (it
== pg_pool
->application_metadata
.end()) {
2209 *values
= it
->second
;