1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include <boost/intrusive_ptr.hpp>
7 #include "include/ceph_assert.h"
8 #include "rgw_coroutine.h"
10 #include "rgw_sal_rados.h"
11 #include "common/WorkQueue.h"
12 #include "common/Throttle.h"
15 #include "common/ceph_time.h"
17 #include "services/svc_sys_obj.h"
18 #include "services/svc_bucket.h"
20 struct rgw_http_param_pair
;
23 class RGWAsyncRadosRequest
: public RefCountedObject
{
25 RGWAioCompletionNotifier
*notifier
;
29 ceph::mutex lock
= ceph::make_mutex("RGWAsyncRadosRequest::lock");
32 virtual int _send_request(const DoutPrefixProvider
*dpp
) = 0;
34 RGWAsyncRadosRequest(RGWCoroutine
*_caller
, RGWAioCompletionNotifier
*_cn
)
35 : caller(_caller
), notifier(_cn
), retcode(0) {
37 ~RGWAsyncRadosRequest() override
{
43 void send_request(const DoutPrefixProvider
*dpp
) {
45 retcode
= _send_request(dpp
);
47 std::lock_guard l
{lock
};
49 notifier
->cb(); // drops its own ref
56 int get_ret_status() { return retcode
; }
60 std::lock_guard l
{lock
};
62 // we won't call notifier->cb() to drop its ref, so drop it here
72 class RGWAsyncRadosProcessor
{
73 std::deque
<RGWAsyncRadosRequest
*> m_req_queue
;
74 std::atomic
<bool> going_down
= { false };
78 Throttle req_throttle
;
80 struct RGWWQ
: public DoutPrefixProvider
, public ThreadPool::WorkQueue
<RGWAsyncRadosRequest
> {
81 RGWAsyncRadosProcessor
*processor
;
82 RGWWQ(RGWAsyncRadosProcessor
*p
,
83 ceph::timespan timeout
, ceph::timespan suicide_timeout
,
85 : ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>("RGWWQ", timeout
, suicide_timeout
, tp
), processor(p
) {}
87 bool _enqueue(RGWAsyncRadosRequest
*req
) override
;
88 void _dequeue(RGWAsyncRadosRequest
*req
) override
{
91 bool _empty() override
;
92 RGWAsyncRadosRequest
*_dequeue() override
;
93 using ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>::_process
;
94 void _process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) override
;
96 void _clear() override
{
97 ceph_assert(processor
->m_req_queue
.empty());
100 CephContext
*get_cct() const { return processor
->cct
; }
101 unsigned get_subsys() const { return ceph_subsys_rgw
; }
102 std::ostream
& gen_prefix(std::ostream
& out
) const { return out
<< "rgw async rados processor: ";}
107 RGWAsyncRadosProcessor(CephContext
*_cct
, int num_threads
);
108 ~RGWAsyncRadosProcessor() {}
111 void handle_request(const DoutPrefixProvider
*dpp
, RGWAsyncRadosRequest
*req
);
112 void queue(RGWAsyncRadosRequest
*req
);
114 bool is_going_down() {
121 class RGWSimpleWriteOnlyAsyncCR
: public RGWSimpleCoroutine
{
122 RGWAsyncRadosProcessor
*async_rados
;
123 rgw::sal::RadosStore
* store
;
126 const DoutPrefixProvider
*dpp
;
128 class Request
: public RGWAsyncRadosRequest
{
129 rgw::sal::RadosStore
* store
;
131 const DoutPrefixProvider
*dpp
;
133 int _send_request(const DoutPrefixProvider
*dpp
) override
;
135 Request(RGWCoroutine
*caller
,
136 RGWAioCompletionNotifier
*cn
,
137 rgw::sal::RadosStore
* store
,
139 const DoutPrefixProvider
*dpp
) : RGWAsyncRadosRequest(caller
, cn
),
146 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
147 rgw::sal::RadosStore
* _store
,
149 const DoutPrefixProvider
*_dpp
) : RGWSimpleCoroutine(_store
->ctx()),
150 async_rados(_async_rados
),
155 ~RGWSimpleWriteOnlyAsyncCR() override
{
158 void request_cleanup() override
{
165 int send_request(const DoutPrefixProvider
*dpp
) override
{
166 req
= new Request(this,
167 stack
->create_completion_notifier(),
172 async_rados
->queue(req
);
175 int request_complete() override
{
176 return req
->get_ret_status();
181 template <class P
, class R
>
182 class RGWSimpleAsyncCR
: public RGWSimpleCoroutine
{
183 RGWAsyncRadosProcessor
*async_rados
;
184 rgw::sal::RadosStore
* store
;
187 std::shared_ptr
<R
> result
;
188 const DoutPrefixProvider
*dpp
;
190 class Request
: public RGWAsyncRadosRequest
{
191 rgw::sal::RadosStore
* store
;
193 std::shared_ptr
<R
> result
;
194 const DoutPrefixProvider
*dpp
;
196 int _send_request(const DoutPrefixProvider
*dpp
) override
;
198 Request(const DoutPrefixProvider
*dpp
,
199 RGWCoroutine
*caller
,
200 RGWAioCompletionNotifier
*cn
,
201 rgw::sal::RadosStore
* _store
,
203 std::shared_ptr
<R
>& _result
,
204 const DoutPrefixProvider
*_dpp
) : RGWAsyncRadosRequest(caller
, cn
),
212 RGWSimpleAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
213 rgw::sal::RadosStore
* _store
,
215 std::shared_ptr
<R
>& _result
,
216 const DoutPrefixProvider
*_dpp
) : RGWSimpleCoroutine(_store
->ctx()),
217 async_rados(_async_rados
),
223 ~RGWSimpleAsyncCR() override
{
226 void request_cleanup() override
{
233 int send_request(const DoutPrefixProvider
*dpp
) override
{
234 req
= new Request(dpp
,
236 stack
->create_completion_notifier(),
242 async_rados
->queue(req
);
245 int request_complete() override
{
246 return req
->get_ret_status();
250 class RGWGenericAsyncCR
: public RGWSimpleCoroutine
{
251 RGWAsyncRadosProcessor
*async_rados
;
252 rgw::sal::RadosStore
* store
;
259 virtual int operate() = 0;
263 std::shared_ptr
<Action
> action
;
265 class Request
: public RGWAsyncRadosRequest
{
266 std::shared_ptr
<Action
> action
;
268 int _send_request(const DoutPrefixProvider
*dpp
) override
{
272 return action
->operate();
275 Request(const DoutPrefixProvider
*dpp
,
276 RGWCoroutine
*caller
,
277 RGWAioCompletionNotifier
*cn
,
278 std::shared_ptr
<Action
>& _action
) : RGWAsyncRadosRequest(caller
, cn
),
283 RGWGenericAsyncCR(CephContext
*_cct
,
284 RGWAsyncRadosProcessor
*_async_rados
,
285 std::shared_ptr
<Action
>& _action
) : RGWSimpleCoroutine(_cct
),
286 async_rados(_async_rados
),
289 RGWGenericAsyncCR(CephContext
*_cct
,
290 RGWAsyncRadosProcessor
*_async_rados
,
291 std::shared_ptr
<T
>& _action
) : RGWSimpleCoroutine(_cct
),
292 async_rados(_async_rados
),
293 action(std::static_pointer_cast
<Action
>(_action
)) {}
295 ~RGWGenericAsyncCR() override
{
298 void request_cleanup() override
{
305 int send_request(const DoutPrefixProvider
*dpp
) override
{
306 req
= new Request(dpp
, this,
307 stack
->create_completion_notifier(),
310 async_rados
->queue(req
);
313 int request_complete() override
{
314 return req
->get_ret_status();
319 class RGWAsyncGetSystemObj
: public RGWAsyncRadosRequest
{
320 const DoutPrefixProvider
*dpp
;
321 RGWSI_SysObj
* svc_sysobj
;
323 const bool want_attrs
;
324 const bool raw_attrs
;
326 int _send_request(const DoutPrefixProvider
*dpp
) override
;
328 RGWAsyncGetSystemObj(const DoutPrefixProvider
*dpp
,
329 RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
330 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
331 bool want_attrs
, bool raw_attrs
);
334 std::map
<std::string
, bufferlist
> attrs
;
335 RGWObjVersionTracker objv_tracker
;
338 class RGWAsyncPutSystemObj
: public RGWAsyncRadosRequest
{
339 const DoutPrefixProvider
*dpp
;
346 int _send_request(const DoutPrefixProvider
*dpp
) override
;
348 RGWAsyncPutSystemObj(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
,
349 RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
350 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
351 bool _exclusive
, bufferlist _bl
);
353 RGWObjVersionTracker objv_tracker
;
356 class RGWAsyncPutSystemObjAttrs
: public RGWAsyncRadosRequest
{
357 const DoutPrefixProvider
*dpp
;
360 std::map
<std::string
, bufferlist
> attrs
;
364 int _send_request(const DoutPrefixProvider
*dpp
) override
;
366 RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
367 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
368 std::map
<std::string
, bufferlist
> _attrs
, bool exclusive
);
370 RGWObjVersionTracker objv_tracker
;
373 class RGWAsyncLockSystemObj
: public RGWAsyncRadosRequest
{
374 rgw::sal::RadosStore
* store
;
376 std::string lock_name
;
378 uint32_t duration_secs
;
381 int _send_request(const DoutPrefixProvider
*dpp
) override
;
383 RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
384 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
385 const std::string
& _name
, const std::string
& _cookie
, uint32_t _duration_secs
);
388 class RGWAsyncUnlockSystemObj
: public RGWAsyncRadosRequest
{
389 rgw::sal::RadosStore
* store
;
391 std::string lock_name
;
395 int _send_request(const DoutPrefixProvider
*dpp
) override
;
397 RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
398 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
399 const std::string
& _name
, const std::string
& _cookie
);
403 class RGWSimpleRadosReadCR
: public RGWSimpleCoroutine
{
404 const DoutPrefixProvider
* dpp
;
405 rgw::sal::RadosStore
* store
;
408 /// on ENOENT, call handle_data() with an empty object instead of failing
409 const bool empty_on_enoent
;
410 RGWObjVersionTracker
* objv_tracker
;
414 ceph::buffer::list bl
;
415 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
418 RGWSimpleRadosReadCR(const DoutPrefixProvider
* dpp
,
419 rgw::sal::RadosStore
* store
,
420 const rgw_raw_obj
& obj
,
421 T
* result
, bool empty_on_enoent
= true,
422 RGWObjVersionTracker
* objv_tracker
= nullptr)
423 : RGWSimpleCoroutine(store
->ctx()), dpp(dpp
), store(store
),
424 obj(obj
), result(result
), empty_on_enoent(empty_on_enoent
),
425 objv_tracker(objv_tracker
) {
431 int send_request(const DoutPrefixProvider
*dpp
) {
432 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
434 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret="
439 set_status() << "sending request";
441 librados::ObjectReadOperation op
;
443 objv_tracker
->prepare_op_for_read(&op
);
446 op
.read(0, -1, &bl
, nullptr);
448 cn
= stack
->create_completion_notifier();
449 return ref
.pool
.ioctx().aio_operate(ref
.obj
.oid
, cn
->completion(), &op
,
453 int request_complete() {
454 int ret
= cn
->completion()->get_return_value();
455 set_status() << "request complete; ret=" << ret
;
457 if (ret
== -ENOENT
&& empty_on_enoent
) {
464 auto iter
= bl
.cbegin();
466 // allow successful reads with empty buffers. ReadSyncStatus coroutines
467 // depend on this to be able to read without locking, because the
468 // cls lock from InitSyncStatus will create an empty object if it didn't
472 decode(*result
, iter
);
474 } catch (buffer::error
& err
) {
479 return handle_data(*result
);
482 virtual int handle_data(T
& data
) {
487 class RGWSimpleRadosReadAttrsCR
: public RGWSimpleCoroutine
{
488 const DoutPrefixProvider
* dpp
;
489 rgw::sal::RadosStore
* const store
;
491 const rgw_raw_obj obj
;
492 std::map
<std::string
, bufferlist
>* const pattrs
;
493 const bool raw_attrs
;
494 RGWObjVersionTracker
* const objv_tracker
;
497 std::map
<std::string
, bufferlist
> unfiltered_attrs
;
498 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
501 RGWSimpleRadosReadAttrsCR(const DoutPrefixProvider
* dpp
,
502 rgw::sal::RadosStore
* store
,
504 std::map
<std::string
, bufferlist
>* pattrs
,
506 RGWObjVersionTracker
* objv_tracker
= nullptr)
507 : RGWSimpleCoroutine(store
->ctx()), dpp(dpp
), store(store
),
508 obj(std::move(obj
)), pattrs(pattrs
), raw_attrs(raw_attrs
),
509 objv_tracker(objv_tracker
) {}
511 int send_request(const DoutPrefixProvider
*dpp
) override
;
512 int request_complete() override
;
516 class RGWSimpleRadosWriteCR
: public RGWSimpleCoroutine
{
517 const DoutPrefixProvider
* dpp
;
518 rgw::sal::RadosStore
* const store
;
520 RGWObjVersionTracker
* objv_tracker
;
525 std::map
<std::string
, bufferlist
> unfiltered_attrs
;
526 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
530 RGWSimpleRadosWriteCR(const DoutPrefixProvider
* dpp
,
531 rgw::sal::RadosStore
* const store
,
532 rgw_raw_obj obj
, const T
& data
,
533 RGWObjVersionTracker
* objv_tracker
= nullptr,
534 bool exclusive
= false)
535 : RGWSimpleCoroutine(store
->ctx()), dpp(dpp
), store(store
),
536 obj(std::move(obj
)), objv_tracker(objv_tracker
), exclusive(exclusive
) {
540 int send_request(const DoutPrefixProvider
*dpp
) override
{
541 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
543 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret="
548 set_status() << "sending request";
550 librados::ObjectWriteOperation op
;
555 objv_tracker
->prepare_op_for_write(&op
);
559 cn
= stack
->create_completion_notifier();
560 return ref
.pool
.ioctx().aio_operate(ref
.obj
.oid
, cn
->completion(), &op
);
563 int request_complete() override
{
564 int ret
= cn
->completion()->get_return_value();
565 set_status() << "request complete; ret=" << ret
;
566 if (ret
>= 0 && objv_tracker
) {
567 objv_tracker
->apply_write();
573 class RGWSimpleRadosWriteAttrsCR
: public RGWSimpleCoroutine
{
574 const DoutPrefixProvider
* dpp
;
575 rgw::sal::RadosStore
* const store
;
576 RGWObjVersionTracker
* objv_tracker
;
578 std::map
<std::string
, bufferlist
> attrs
;
582 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
586 RGWSimpleRadosWriteAttrsCR(const DoutPrefixProvider
* dpp
,
587 rgw::sal::RadosStore
* const store
,
589 std::map
<std::string
, bufferlist
> attrs
,
590 RGWObjVersionTracker
* objv_tracker
= nullptr,
591 bool exclusive
= false)
592 : RGWSimpleCoroutine(store
->ctx()), dpp(dpp
),
593 store(store
), objv_tracker(objv_tracker
),
594 obj(std::move(obj
)), attrs(std::move(attrs
)),
595 exclusive(exclusive
) {}
597 int send_request(const DoutPrefixProvider
*dpp
) override
{
598 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
600 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret="
605 set_status() << "sending request";
607 librados::ObjectWriteOperation op
;
612 objv_tracker
->prepare_op_for_write(&op
);
615 for (const auto& [name
, bl
] : attrs
) {
618 op
.setxattr(name
.c_str(), bl
);
621 cn
= stack
->create_completion_notifier();
627 return ref
.pool
.ioctx().aio_operate(ref
.obj
.oid
, cn
->completion(), &op
);
630 int request_complete() override
{
631 int ret
= cn
->completion()->get_return_value();
632 set_status() << "request complete; ret=" << ret
;
633 if (ret
>= 0 && objv_tracker
) {
634 objv_tracker
->apply_write();
640 class RGWRadosSetOmapKeysCR
: public RGWSimpleCoroutine
{
641 rgw::sal::RadosStore
* store
;
642 std::map
<std::string
, bufferlist
> entries
;
648 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
651 RGWRadosSetOmapKeysCR(rgw::sal::RadosStore
* _store
,
652 const rgw_raw_obj
& _obj
,
653 std::map
<std::string
, bufferlist
>& _entries
);
655 int send_request(const DoutPrefixProvider
*dpp
) override
;
656 int request_complete() override
;
659 class RGWRadosGetOmapKeysCR
: public RGWSimpleCoroutine
{
663 std::set
<std::string
> entries
;
666 using ResultPtr
= std::shared_ptr
<Result
>;
668 RGWRadosGetOmapKeysCR(rgw::sal::RadosStore
* _store
, const rgw_raw_obj
& _obj
,
669 const std::string
& _marker
, int _max_entries
,
672 int send_request(const DoutPrefixProvider
*dpp
) override
;
673 int request_complete() override
;
676 rgw::sal::RadosStore
* store
;
681 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
684 class RGWRadosGetOmapValsCR
: public RGWSimpleCoroutine
{
688 std::map
<std::string
, bufferlist
> entries
;
691 using ResultPtr
= std::shared_ptr
<Result
>;
693 RGWRadosGetOmapValsCR(rgw::sal::RadosStore
* _store
, const rgw_raw_obj
& _obj
,
694 const std::string
& _marker
, int _max_entries
,
697 int send_request(const DoutPrefixProvider
*dpp
) override
;
698 int request_complete() override
;
701 rgw::sal::RadosStore
* store
;
706 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
709 class RGWRadosRemoveOmapKeysCR
: public RGWSimpleCoroutine
{
710 rgw::sal::RadosStore
* store
;
714 std::set
<std::string
> keys
;
718 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
721 RGWRadosRemoveOmapKeysCR(rgw::sal::RadosStore
* _store
,
722 const rgw_raw_obj
& _obj
,
723 const std::set
<std::string
>& _keys
);
725 int send_request(const DoutPrefixProvider
*dpp
) override
;
727 int request_complete() override
;
730 class RGWRadosRemoveCR
: public RGWSimpleCoroutine
{
731 rgw::sal::RadosStore
* store
;
732 librados::IoCtx ioctx
;
733 const rgw_raw_obj obj
;
734 RGWObjVersionTracker
* objv_tracker
;
735 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
738 RGWRadosRemoveCR(rgw::sal::RadosStore
* store
, const rgw_raw_obj
& obj
,
739 RGWObjVersionTracker
* objv_tracker
= nullptr);
741 int send_request(const DoutPrefixProvider
*dpp
) override
;
742 int request_complete() override
;
745 class RGWRadosRemoveOidCR
: public RGWSimpleCoroutine
{
746 librados::IoCtx ioctx
;
747 const std::string oid
;
748 RGWObjVersionTracker
* objv_tracker
;
749 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
752 RGWRadosRemoveOidCR(rgw::sal::RadosStore
* store
,
753 librados::IoCtx
&& ioctx
, std::string_view oid
,
754 RGWObjVersionTracker
* objv_tracker
= nullptr);
756 RGWRadosRemoveOidCR(rgw::sal::RadosStore
* store
,
757 RGWSI_RADOS::Obj
& obj
,
758 RGWObjVersionTracker
* objv_tracker
= nullptr);
760 RGWRadosRemoveOidCR(rgw::sal::RadosStore
* store
,
761 RGWSI_RADOS::Obj
&& obj
,
762 RGWObjVersionTracker
* objv_tracker
= nullptr);
764 int send_request(const DoutPrefixProvider
*dpp
) override
;
765 int request_complete() override
;
768 class RGWSimpleRadosLockCR
: public RGWSimpleCoroutine
{
769 RGWAsyncRadosProcessor
*async_rados
;
770 rgw::sal::RadosStore
* store
;
771 std::string lock_name
;
777 RGWAsyncLockSystemObj
*req
;
780 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
781 const rgw_raw_obj
& _obj
,
782 const std::string
& _lock_name
,
783 const std::string
& _cookie
,
785 ~RGWSimpleRadosLockCR() override
{
788 void request_cleanup() override
;
790 int send_request(const DoutPrefixProvider
*dpp
) override
;
791 int request_complete() override
;
793 static std::string
gen_random_cookie(CephContext
* cct
) {
794 static constexpr std::size_t COOKIE_LEN
= 16;
795 char buf
[COOKIE_LEN
+ 1];
796 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
801 class RGWSimpleRadosUnlockCR
: public RGWSimpleCoroutine
{
802 RGWAsyncRadosProcessor
*async_rados
;
803 rgw::sal::RadosStore
* store
;
804 std::string lock_name
;
809 RGWAsyncUnlockSystemObj
*req
;
812 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
813 const rgw_raw_obj
& _obj
,
814 const std::string
& _lock_name
,
815 const std::string
& _cookie
);
816 ~RGWSimpleRadosUnlockCR() override
{
819 void request_cleanup() override
;
821 int send_request(const DoutPrefixProvider
*dpp
) override
;
822 int request_complete() override
;
825 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
827 class RGWOmapAppend
: public RGWConsumerCR
<std::string
> {
828 RGWAsyncRadosProcessor
*async_rados
;
829 rgw::sal::RadosStore
* store
;
835 int num_pending_entries
;
836 std::list
<std::string
> pending_entries
;
838 std::map
<std::string
, bufferlist
> entries
;
840 uint64_t window_size
;
841 uint64_t total_entries
;
843 RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
844 const rgw_raw_obj
& _obj
,
845 uint64_t _window_size
= OMAP_APPEND_MAX_ENTRIES_DEFAULT
);
846 int operate(const DoutPrefixProvider
*dpp
) override
;
847 void flush_pending();
848 bool append(const std::string
& s
);
851 uint64_t get_total_entries() {
852 return total_entries
;
855 const rgw_raw_obj
& get_obj() {
860 class RGWShardedOmapCRManager
{
861 RGWAsyncRadosProcessor
*async_rados
;
862 rgw::sal::RadosStore
* store
;
867 std::vector
<RGWOmapAppend
*> shards
;
869 RGWShardedOmapCRManager(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
, RGWCoroutine
*_op
, int _num_shards
, const rgw_pool
& pool
, const std::string
& oid_prefix
)
870 : async_rados(_async_rados
),
871 store(_store
), op(_op
), num_shards(_num_shards
) {
872 shards
.reserve(num_shards
);
873 for (int i
= 0; i
< num_shards
; ++i
) {
874 char buf
[oid_prefix
.size() + 16];
875 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), i
);
876 RGWOmapAppend
*shard
= new RGWOmapAppend(async_rados
, store
, rgw_raw_obj(pool
, buf
));
878 shards
.push_back(shard
);
879 op
->spawn(shard
, false);
883 ~RGWShardedOmapCRManager() {
884 for (auto shard
: shards
) {
889 bool append(const std::string
& entry
, int shard_id
) {
890 return shards
[shard_id
]->append(entry
);
894 for (auto& append_op
: shards
) {
895 success
&= (append_op
->finish() && (!append_op
->is_error()));
900 uint64_t get_total_entries(int shard_id
) {
901 return shards
[shard_id
]->get_total_entries();
905 class RGWAsyncGetBucketInstanceInfo
: public RGWAsyncRadosRequest
{
906 rgw::sal::RadosStore
* store
;
908 const DoutPrefixProvider
*dpp
;
911 int _send_request(const DoutPrefixProvider
*dpp
) override
;
913 RGWAsyncGetBucketInstanceInfo(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
914 rgw::sal::RadosStore
* _store
, const rgw_bucket
& bucket
,
915 const DoutPrefixProvider
*dpp
)
916 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), bucket(bucket
), dpp(dpp
) {}
918 RGWBucketInfo bucket_info
;
919 std::map
<std::string
, bufferlist
> attrs
;
922 class RGWAsyncPutBucketInstanceInfo
: public RGWAsyncRadosRequest
{
923 rgw::sal::RadosStore
* store
;
924 RGWBucketInfo
& bucket_info
;
927 std::map
<std::string
, ceph::bufferlist
>* attrs
;
928 const DoutPrefixProvider
*dpp
;
931 int _send_request(const DoutPrefixProvider
*dpp
) override
;
933 RGWAsyncPutBucketInstanceInfo(RGWCoroutine
* caller
,
934 RGWAioCompletionNotifier
* cn
,
935 rgw::sal::RadosStore
* store
,
936 RGWBucketInfo
& bucket_info
,
939 std::map
<std::string
, ceph::bufferlist
>* attrs
,
940 const DoutPrefixProvider
* dpp
)
941 : RGWAsyncRadosRequest(caller
, cn
), store(store
), bucket_info(bucket_info
),
942 exclusive(exclusive
), mtime(mtime
), attrs(attrs
), dpp(dpp
) {}
945 class RGWGetBucketInstanceInfoCR
: public RGWSimpleCoroutine
{
946 RGWAsyncRadosProcessor
*async_rados
;
947 rgw::sal::RadosStore
* store
;
949 RGWBucketInfo
*bucket_info
;
950 std::map
<std::string
, bufferlist
> *pattrs
;
951 const DoutPrefixProvider
*dpp
;
953 RGWAsyncGetBucketInstanceInfo
*req
{nullptr};
956 // rgw_bucket constructor
957 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
958 const rgw_bucket
& _bucket
, RGWBucketInfo
*_bucket_info
,
959 std::map
<std::string
, bufferlist
> *_pattrs
, const DoutPrefixProvider
*dpp
)
960 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
961 bucket(_bucket
), bucket_info(_bucket_info
), pattrs(_pattrs
), dpp(dpp
) {}
962 ~RGWGetBucketInstanceInfoCR() override
{
965 void request_cleanup() override
{
972 int send_request(const DoutPrefixProvider
*dpp
) override
{
973 req
= new RGWAsyncGetBucketInstanceInfo(this, stack
->create_completion_notifier(), store
, bucket
, dpp
);
974 async_rados
->queue(req
);
977 int request_complete() override
{
979 *bucket_info
= std::move(req
->bucket_info
);
982 *pattrs
= std::move(req
->attrs
);
984 return req
->get_ret_status();
988 class RGWPutBucketInstanceInfoCR
: public RGWSimpleCoroutine
{
989 RGWAsyncRadosProcessor
*async_rados
;
990 rgw::sal::RadosStore
* store
;
991 RGWBucketInfo
& bucket_info
;
994 std::map
<std::string
, ceph::bufferlist
>* attrs
;
995 const DoutPrefixProvider
*dpp
;
997 RGWAsyncPutBucketInstanceInfo
* req
= nullptr;
1000 // rgw_bucket constructor
1001 RGWPutBucketInstanceInfoCR(RGWAsyncRadosProcessor
*async_rados
,
1002 rgw::sal::RadosStore
* store
,
1003 RGWBucketInfo
& bucket_info
,
1006 std::map
<std::string
, ceph::bufferlist
>* attrs
,
1007 const DoutPrefixProvider
*dpp
)
1008 : RGWSimpleCoroutine(store
->ctx()), async_rados(async_rados
), store(store
),
1009 bucket_info(bucket_info
), exclusive(exclusive
),
1010 mtime(mtime
), attrs(attrs
), dpp(dpp
) {}
1011 ~RGWPutBucketInstanceInfoCR() override
{
1014 void request_cleanup() override
{
1021 int send_request(const DoutPrefixProvider
*dpp
) override
{
1022 req
= new RGWAsyncPutBucketInstanceInfo(this,
1023 stack
->create_completion_notifier(),
1024 store
, bucket_info
, exclusive
,
1026 async_rados
->queue(req
);
1029 int request_complete() override
{
1030 return req
->get_ret_status();
1034 class RGWRadosBILogTrimCR
: public RGWSimpleCoroutine
{
1035 const RGWBucketInfo
& bucket_info
;
1037 const rgw::bucket_index_layout_generation generation
;
1038 RGWRados::BucketShard bs
;
1039 std::string start_marker
;
1040 std::string end_marker
;
1041 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1043 RGWRadosBILogTrimCR(const DoutPrefixProvider
*dpp
,
1044 rgw::sal::RadosStore
* store
, const RGWBucketInfo
& bucket_info
,
1046 const rgw::bucket_index_layout_generation
& generation
,
1047 const std::string
& start_marker
,
1048 const std::string
& end_marker
);
1050 int send_request(const DoutPrefixProvider
*dpp
) override
;
1051 int request_complete() override
;
1054 class RGWAsyncFetchRemoteObj
: public RGWAsyncRadosRequest
{
1055 rgw::sal::RadosStore
* store
;
1056 rgw_zone_id source_zone
;
1058 std::optional
<rgw_user
> user_id
;
1060 rgw_bucket src_bucket
;
1061 std::optional
<rgw_placement_rule
> dest_placement_rule
;
1062 RGWBucketInfo dest_bucket_info
;
1065 std::optional
<rgw_obj_key
> dest_key
;
1066 std::optional
<uint64_t> versioned_epoch
;
1068 real_time src_mtime
;
1071 std::shared_ptr
<RGWFetchObjFilter
> filter
;
1072 rgw_zone_set_entry source_trace_entry
;
1073 rgw_zone_set zones_trace
;
1074 PerfCounters
* counters
;
1075 const DoutPrefixProvider
*dpp
;
1078 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1080 RGWAsyncFetchRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
1081 const rgw_zone_id
& _source_zone
,
1082 std::optional
<rgw_user
>& _user_id
,
1083 const rgw_bucket
& _src_bucket
,
1084 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
1085 const RGWBucketInfo
& _dest_bucket_info
,
1086 const rgw_obj_key
& _key
,
1087 const std::optional
<rgw_obj_key
>& _dest_key
,
1088 std::optional
<uint64_t> _versioned_epoch
,
1090 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
1091 const rgw_zone_set_entry
& source_trace_entry
,
1092 rgw_zone_set
*_zones_trace
,
1093 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
1094 : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1095 source_zone(_source_zone
),
1097 src_bucket(_src_bucket
),
1098 dest_placement_rule(_dest_placement_rule
),
1099 dest_bucket_info(_dest_bucket_info
),
1101 dest_key(_dest_key
),
1102 versioned_epoch(_versioned_epoch
),
1103 copy_if_newer(_if_newer
),
1105 source_trace_entry(source_trace_entry
),
1110 zones_trace
= *_zones_trace
;
1115 class RGWFetchRemoteObjCR
: public RGWSimpleCoroutine
{
1117 RGWAsyncRadosProcessor
*async_rados
;
1118 rgw::sal::RadosStore
* store
;
1119 rgw_zone_id source_zone
;
1121 std::optional
<rgw_user
> user_id
;
1123 rgw_bucket src_bucket
;
1124 std::optional
<rgw_placement_rule
> dest_placement_rule
;
1125 RGWBucketInfo dest_bucket_info
;
1128 std::optional
<rgw_obj_key
> dest_key
;
1129 std::optional
<uint64_t> versioned_epoch
;
1131 real_time src_mtime
;
1135 std::shared_ptr
<RGWFetchObjFilter
> filter
;
1137 RGWAsyncFetchRemoteObj
*req
;
1138 const rgw_zone_set_entry
& source_trace_entry
;
1139 rgw_zone_set
*zones_trace
;
1140 PerfCounters
* counters
;
1141 const DoutPrefixProvider
*dpp
;
1144 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
1145 const rgw_zone_id
& _source_zone
,
1146 std::optional
<rgw_user
> _user_id
,
1147 const rgw_bucket
& _src_bucket
,
1148 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
1149 const RGWBucketInfo
& _dest_bucket_info
,
1150 const rgw_obj_key
& _key
,
1151 const std::optional
<rgw_obj_key
>& _dest_key
,
1152 std::optional
<uint64_t> _versioned_epoch
,
1154 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
1155 const rgw_zone_set_entry
& source_trace_entry
,
1156 rgw_zone_set
*_zones_trace
,
1157 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
1158 : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1159 async_rados(_async_rados
), store(_store
),
1160 source_zone(_source_zone
),
1162 src_bucket(_src_bucket
),
1163 dest_placement_rule(_dest_placement_rule
),
1164 dest_bucket_info(_dest_bucket_info
),
1166 dest_key(_dest_key
),
1167 versioned_epoch(_versioned_epoch
),
1168 copy_if_newer(_if_newer
),
1171 source_trace_entry(source_trace_entry
),
1172 zones_trace(_zones_trace
), counters(counters
), dpp(dpp
) {}
1175 ~RGWFetchRemoteObjCR() override
{
1179 void request_cleanup() override
{
1186 int send_request(const DoutPrefixProvider
*dpp
) override
{
1187 req
= new RGWAsyncFetchRemoteObj(this, stack
->create_completion_notifier(), store
,
1188 source_zone
, user_id
, src_bucket
, dest_placement_rule
, dest_bucket_info
,
1189 key
, dest_key
, versioned_epoch
, copy_if_newer
, filter
,
1190 source_trace_entry
, zones_trace
, counters
, dpp
);
1191 async_rados
->queue(req
);
1195 int request_complete() override
{
1196 return req
->get_ret_status();
1200 class RGWAsyncStatRemoteObj
: public RGWAsyncRadosRequest
{
1201 rgw::sal::RadosStore
* store
;
1202 rgw_zone_id source_zone
;
1204 rgw_bucket src_bucket
;
1207 ceph::real_time
*pmtime
;
1210 std::map
<std::string
, bufferlist
> *pattrs
;
1211 std::map
<std::string
, std::string
> *pheaders
;
1214 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1216 RGWAsyncStatRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
1217 const rgw_zone_id
& _source_zone
,
1218 rgw_bucket
& _src_bucket
,
1219 const rgw_obj_key
& _key
,
1220 ceph::real_time
*_pmtime
,
1222 std::string
*_petag
,
1223 std::map
<std::string
, bufferlist
> *_pattrs
,
1224 std::map
<std::string
, std::string
> *_pheaders
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1225 source_zone(_source_zone
),
1226 src_bucket(_src_bucket
),
1232 pheaders(_pheaders
) {}
1235 class RGWStatRemoteObjCR
: public RGWSimpleCoroutine
{
1237 RGWAsyncRadosProcessor
*async_rados
;
1238 rgw::sal::RadosStore
* store
;
1239 rgw_zone_id source_zone
;
1241 rgw_bucket src_bucket
;
1244 ceph::real_time
*pmtime
;
1247 std::map
<std::string
, bufferlist
> *pattrs
;
1248 std::map
<std::string
, std::string
> *pheaders
;
1250 RGWAsyncStatRemoteObj
*req
;
1253 RGWStatRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
1254 const rgw_zone_id
& _source_zone
,
1255 rgw_bucket
& _src_bucket
,
1256 const rgw_obj_key
& _key
,
1257 ceph::real_time
*_pmtime
,
1259 std::string
*_petag
,
1260 std::map
<std::string
, bufferlist
> *_pattrs
,
1261 std::map
<std::string
, std::string
> *_pheaders
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1262 async_rados(_async_rados
), store(_store
),
1263 source_zone(_source_zone
),
1264 src_bucket(_src_bucket
),
1270 pheaders(_pheaders
),
1274 ~RGWStatRemoteObjCR() override
{
1278 void request_cleanup() override
{
1285 int send_request(const DoutPrefixProvider
*dpp
) override
{
1286 req
= new RGWAsyncStatRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
,
1287 src_bucket
, key
, pmtime
, psize
, petag
, pattrs
, pheaders
);
1288 async_rados
->queue(req
);
1292 int request_complete() override
{
1293 return req
->get_ret_status();
1297 class RGWAsyncRemoveObj
: public RGWAsyncRadosRequest
{
1298 const DoutPrefixProvider
*dpp
;
1299 rgw::sal::RadosStore
* store
;
1300 rgw_zone_id source_zone
;
1302 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
1303 std::unique_ptr
<rgw::sal::Object
> obj
;
1306 std::string owner_display_name
;
1308 uint64_t versioned_epoch
;
1309 std::string marker_version_id
;
1312 ceph::real_time timestamp
;
1313 rgw_zone_set zones_trace
;
1316 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1318 RGWAsyncRemoveObj(const DoutPrefixProvider
*_dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
1319 rgw::sal::RadosStore
* _store
,
1320 const rgw_zone_id
& _source_zone
,
1321 RGWBucketInfo
& _bucket_info
,
1322 const rgw_obj_key
& _key
,
1323 const std::string
& _owner
,
1324 const std::string
& _owner_display_name
,
1326 uint64_t _versioned_epoch
,
1327 bool _delete_marker
,
1329 real_time
& _timestamp
,
1330 rgw_zone_set
* _zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), dpp(_dpp
), store(_store
),
1331 source_zone(_source_zone
),
1333 owner_display_name(_owner_display_name
),
1334 versioned(_versioned
),
1335 versioned_epoch(_versioned_epoch
),
1336 del_if_older(_if_older
),
1337 timestamp(_timestamp
) {
1338 if (_delete_marker
) {
1339 marker_version_id
= _key
.instance
;
1343 zones_trace
= *_zones_trace
;
1345 store
->get_bucket(nullptr, _bucket_info
, &bucket
);
1346 obj
= bucket
->get_object(_key
);
1350 class RGWRemoveObjCR
: public RGWSimpleCoroutine
{
1351 const DoutPrefixProvider
*dpp
;
1353 RGWAsyncRadosProcessor
*async_rados
;
1354 rgw::sal::RadosStore
* store
;
1355 rgw_zone_id source_zone
;
1357 RGWBucketInfo bucket_info
;
1361 uint64_t versioned_epoch
;
1364 std::string owner_display_name
;
1367 real_time timestamp
;
1369 RGWAsyncRemoveObj
*req
;
1371 rgw_zone_set
*zones_trace
;
1374 RGWRemoveObjCR(const DoutPrefixProvider
*_dpp
, RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
1375 const rgw_zone_id
& _source_zone
,
1376 RGWBucketInfo
& _bucket_info
,
1377 const rgw_obj_key
& _key
,
1379 uint64_t _versioned_epoch
,
1380 std::string
*_owner
,
1381 std::string
*_owner_display_name
,
1382 bool _delete_marker
,
1383 real_time
*_timestamp
,
1384 rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), dpp(_dpp
), cct(_store
->ctx()),
1385 async_rados(_async_rados
), store(_store
),
1386 source_zone(_source_zone
),
1387 bucket_info(_bucket_info
),
1389 versioned(_versioned
),
1390 versioned_epoch(_versioned_epoch
),
1391 delete_marker(_delete_marker
), req(NULL
), zones_trace(_zones_trace
) {
1392 del_if_older
= (_timestamp
!= NULL
);
1394 timestamp
= *_timestamp
;
1401 if (_owner_display_name
) {
1402 owner_display_name
= *_owner_display_name
;
1405 ~RGWRemoveObjCR() override
{
1409 void request_cleanup() override
{
1416 int send_request(const DoutPrefixProvider
*dpp
) override
{
1417 req
= new RGWAsyncRemoveObj(dpp
, this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
1418 key
, owner
, owner_display_name
, versioned
, versioned_epoch
,
1419 delete_marker
, del_if_older
, timestamp
, zones_trace
);
1420 async_rados
->queue(req
);
1424 int request_complete() override
{
1425 return req
->get_ret_status();
1429 /// \brief Collect average latency
1431 /// Used in data sync to back off on concurrency when latency of lock
1432 /// operations rises.
1434 /// \warning This class is not thread safe. We do not use a mutex
1435 /// because all coroutines spawned by RGWDataSyncCR share a single thread.
1436 class LatencyMonitor
{
1437 ceph::timespan total
;
1438 std::uint64_t count
= 0;
1442 LatencyMonitor() = default;
1443 void add_latency(ceph::timespan latency
) {
1448 ceph::timespan
avg_latency() {
1449 using namespace std::literals
;
1450 return count
== 0 ? 0s
: total
/ count
;
1454 class RGWContinuousLeaseCR
: public RGWCoroutine
{
1455 RGWAsyncRadosProcessor
* async_rados
;
1456 rgw::sal::RadosStore
* store
;
1458 const rgw_raw_obj obj
;
1460 const std::string lock_name
;
1461 const std::string cookie
{RGWSimpleRadosLockCR::gen_random_cookie(cct
)};
1464 bool going_down
{false};
1467 const ceph::timespan interval_tolerance
;
1468 const ceph::timespan ts_interval
;
1470 RGWCoroutine
* caller
;
1472 bool aborted
{false};
1474 ceph::coarse_mono_time last_renew_try_time
;
1475 ceph::coarse_mono_time current_time
;
1477 LatencyMonitor
* latency
;
1480 RGWContinuousLeaseCR(RGWAsyncRadosProcessor
* async_rados
,
1481 rgw::sal::RadosStore
* _store
,
1482 rgw_raw_obj obj
, std::string lock_name
,
1483 int interval
, RGWCoroutine
* caller
,
1484 LatencyMonitor
* const latency
)
1485 : RGWCoroutine(_store
->ctx()), async_rados(async_rados
), store(_store
),
1486 obj(std::move(obj
)), lock_name(std::move(lock_name
)),
1487 interval(interval
), interval_tolerance(ceph::make_timespan(9*interval
/10)),
1488 ts_interval(ceph::make_timespan(interval
)), caller(caller
), latency(latency
)
1491 virtual ~RGWContinuousLeaseCR() override
;
1493 int operate(const DoutPrefixProvider
*dpp
) override
;
1495 bool is_locked() const {
1496 if (ceph::coarse_mono_clock::now() - last_renew_try_time
> ts_interval
) {
1502 void set_locked(bool status
) {
1516 class RGWRadosTimelogAddCR
: public RGWSimpleCoroutine
{
1517 const DoutPrefixProvider
*dpp
;
1518 rgw::sal::RadosStore
* store
;
1519 std::list
<cls_log_entry
> entries
;
1523 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1526 RGWRadosTimelogAddCR(const DoutPrefixProvider
*dpp
, rgw::sal::RadosStore
* _store
, const std::string
& _oid
,
1527 const cls_log_entry
& entry
);
1529 int send_request(const DoutPrefixProvider
*dpp
) override
;
1530 int request_complete() override
;
1533 class RGWRadosTimelogTrimCR
: public RGWSimpleCoroutine
{
1534 const DoutPrefixProvider
*dpp
;
1535 rgw::sal::RadosStore
* store
;
1536 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1539 real_time start_time
;
1541 std::string from_marker
;
1542 std::string to_marker
;
1545 RGWRadosTimelogTrimCR(const DoutPrefixProvider
*dpp
,
1546 rgw::sal::RadosStore
* store
, const std::string
& oid
,
1547 const real_time
& start_time
, const real_time
& end_time
,
1548 const std::string
& from_marker
,
1549 const std::string
& to_marker
);
1551 int send_request(const DoutPrefixProvider
*dpp
) override
;
1552 int request_complete() override
;
1555 // wrapper to update last_trim_marker on success
1556 class RGWSyncLogTrimCR
: public RGWRadosTimelogTrimCR
{
1558 std::string
*last_trim_marker
;
1560 static constexpr const char* max_marker
= "99999999";
1562 RGWSyncLogTrimCR(const DoutPrefixProvider
*dpp
,
1563 rgw::sal::RadosStore
* store
, const std::string
& oid
,
1564 const std::string
& to_marker
, std::string
*last_trim_marker
);
1565 int request_complete() override
;
1568 class RGWAsyncStatObj
: public RGWAsyncRadosRequest
{
1569 const DoutPrefixProvider
*dpp
;
1570 rgw::sal::RadosStore
* store
;
1571 RGWBucketInfo bucket_info
;
1576 RGWObjVersionTracker
*objv_tracker
;
1578 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1580 RGWAsyncStatObj(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* store
,
1581 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1582 real_time
*pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1583 RGWObjVersionTracker
*objv_tracker
= nullptr)
1584 : RGWAsyncRadosRequest(caller
, cn
), dpp(dpp
), store(store
), obj(obj
), psize(psize
),
1585 pmtime(pmtime
), pepoch(pepoch
), objv_tracker(objv_tracker
) {}
1588 class RGWStatObjCR
: public RGWSimpleCoroutine
{
1589 const DoutPrefixProvider
*dpp
;
1590 rgw::sal::RadosStore
* store
;
1591 RGWAsyncRadosProcessor
*async_rados
;
1592 RGWBucketInfo bucket_info
;
1597 RGWObjVersionTracker
*objv_tracker
;
1598 RGWAsyncStatObj
*req
= nullptr;
1600 RGWStatObjCR(const DoutPrefixProvider
*dpp
, RGWAsyncRadosProcessor
*async_rados
, rgw::sal::RadosStore
* store
,
1601 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1602 real_time
* pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1603 RGWObjVersionTracker
*objv_tracker
= nullptr);
1604 ~RGWStatObjCR() override
{
1607 void request_cleanup() override
;
1609 int send_request(const DoutPrefixProvider
*dpp
) override
;
1610 int request_complete() override
;
1613 /// coroutine wrapper for IoCtx::aio_notify()
1614 class RGWRadosNotifyCR
: public RGWSimpleCoroutine
{
1615 rgw::sal::RadosStore
* const store
;
1616 const rgw_raw_obj obj
;
1618 const uint64_t timeout_ms
;
1619 bufferlist
*response
;
1621 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1624 RGWRadosNotifyCR(rgw::sal::RadosStore
* store
, const rgw_raw_obj
& obj
,
1625 bufferlist
& request
, uint64_t timeout_ms
,
1626 bufferlist
*response
);
1628 int send_request(const DoutPrefixProvider
*dpp
) override
;
1629 int request_complete() override
;
1632 class RGWDataPostNotifyCR
: public RGWCoroutine
{
1634 RGWHTTPManager
& http_manager
;
1635 bc::flat_map
<int, bc::flat_set
<rgw_data_notify_entry
> >& shards
;
1636 const char *source_zone
;
1640 RGWDataPostNotifyCR(RGWRados
*_store
, RGWHTTPManager
& _http_manager
, bc::flat_map
<int,
1641 bc::flat_set
<rgw_data_notify_entry
> >& _shards
, const char *_zone
, RGWRESTConn
*_conn
)
1642 : RGWCoroutine(_store
->ctx()), store(_store
), http_manager(_http_manager
),
1643 shards(_shards
), source_zone(_zone
), conn(_conn
) {}
1645 int operate(const DoutPrefixProvider
* dpp
) override
;