1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RGW_CR_RADOS_H
5 #define CEPH_RGW_CR_RADOS_H
7 #include <boost/intrusive_ptr.hpp>
8 #include "include/ceph_assert.h"
9 #include "rgw_coroutine.h"
10 #include "rgw_rados.h"
11 #include "common/WorkQueue.h"
12 #include "common/Throttle.h"
16 #include "services/svc_sys_obj.h"
18 class RGWAsyncRadosRequest
: public RefCountedObject
{
20 RGWAioCompletionNotifier
*notifier
;
27 virtual int _send_request() = 0;
29 RGWAsyncRadosRequest(RGWCoroutine
*_caller
, RGWAioCompletionNotifier
*_cn
) : caller(_caller
), notifier(_cn
), retcode(0),
30 lock("RGWAsyncRadosRequest::lock") {
32 ~RGWAsyncRadosRequest() override
{
40 retcode
= _send_request();
42 Mutex::Locker
l(lock
);
44 notifier
->cb(); // drops its own ref
51 int get_ret_status() { return retcode
; }
55 Mutex::Locker
l(lock
);
57 // we won't call notifier->cb() to drop its ref, so drop it here
67 class RGWAsyncRadosProcessor
{
68 deque
<RGWAsyncRadosRequest
*> m_req_queue
;
69 std::atomic
<bool> going_down
= { false };
73 Throttle req_throttle
;
75 struct RGWWQ
: public ThreadPool::WorkQueue
<RGWAsyncRadosRequest
> {
76 RGWAsyncRadosProcessor
*processor
;
77 RGWWQ(RGWAsyncRadosProcessor
*p
, time_t timeout
, time_t suicide_timeout
, ThreadPool
*tp
)
78 : ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>("RGWWQ", timeout
, suicide_timeout
, tp
), processor(p
) {}
80 bool _enqueue(RGWAsyncRadosRequest
*req
) override
;
81 void _dequeue(RGWAsyncRadosRequest
*req
) override
{
84 bool _empty() override
;
85 RGWAsyncRadosRequest
*_dequeue() override
;
86 using ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>::_process
;
87 void _process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) override
;
89 void _clear() override
{
90 ceph_assert(processor
->m_req_queue
.empty());
95 RGWAsyncRadosProcessor(RGWRados
*_store
, int num_threads
);
96 ~RGWAsyncRadosProcessor() {}
99 void handle_request(RGWAsyncRadosRequest
*req
);
100 void queue(RGWAsyncRadosRequest
*req
);
102 bool is_going_down() {
108 class RGWSimpleWriteOnlyAsyncCR
: public RGWSimpleCoroutine
{
109 RGWAsyncRadosProcessor
*async_rados
;
114 class Request
: public RGWAsyncRadosRequest
{
118 int _send_request() override
;
120 Request(RGWCoroutine
*caller
,
121 RGWAioCompletionNotifier
*cn
,
123 const P
& _params
) : RGWAsyncRadosRequest(caller
, cn
),
129 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
131 const P
& _params
) : RGWSimpleCoroutine(_store
->ctx()),
132 async_rados(_async_rados
),
136 ~RGWSimpleWriteOnlyAsyncCR() override
{
139 void request_cleanup() override
{
146 int send_request() override
{
147 req
= new Request(this,
148 stack
->create_completion_notifier(),
152 async_rados
->queue(req
);
155 int request_complete() override
{
156 return req
->get_ret_status();
161 template <class P
, class R
>
162 class RGWSimpleAsyncCR
: public RGWSimpleCoroutine
{
163 RGWAsyncRadosProcessor
*async_rados
;
167 std::shared_ptr
<R
> result
;
169 class Request
: public RGWAsyncRadosRequest
{
172 std::shared_ptr
<R
> result
;
174 int _send_request() override
;
176 Request(RGWCoroutine
*caller
,
177 RGWAioCompletionNotifier
*cn
,
180 std::shared_ptr
<R
>& _result
) : RGWAsyncRadosRequest(caller
, cn
),
187 RGWSimpleAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
190 std::shared_ptr
<R
>& _result
) : RGWSimpleCoroutine(_store
->ctx()),
191 async_rados(_async_rados
),
196 ~RGWSimpleAsyncCR() override
{
199 void request_cleanup() override
{
206 int send_request() override
{
207 req
= new Request(this,
208 stack
->create_completion_notifier(),
213 async_rados
->queue(req
);
216 int request_complete() override
{
217 return req
->get_ret_status();
222 class RGWAsyncGetSystemObj
: public RGWAsyncRadosRequest
{
223 RGWSysObjectCtx obj_ctx
;
224 RGWObjVersionTracker objv_tracker
;
226 const bool want_attrs
;
227 const bool raw_attrs
;
229 int _send_request() override
;
231 RGWAsyncGetSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
232 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
233 bool want_attrs
, bool raw_attrs
);
236 map
<string
, bufferlist
> attrs
;
239 class RGWAsyncPutSystemObj
: public RGWAsyncRadosRequest
{
246 int _send_request() override
;
248 RGWAsyncPutSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
249 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
250 bool _exclusive
, bufferlist _bl
);
252 RGWObjVersionTracker objv_tracker
;
255 class RGWAsyncPutSystemObjAttrs
: public RGWAsyncRadosRequest
{
258 map
<string
, bufferlist
> attrs
;
261 int _send_request() override
;
263 RGWAsyncPutSystemObjAttrs(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
264 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
265 map
<string
, bufferlist
> _attrs
);
267 RGWObjVersionTracker objv_tracker
;
270 class RGWAsyncLockSystemObj
: public RGWAsyncRadosRequest
{
275 uint32_t duration_secs
;
278 int _send_request() override
;
280 RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
281 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
282 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
);
285 class RGWAsyncUnlockSystemObj
: public RGWAsyncRadosRequest
{
292 int _send_request() override
;
294 RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
295 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
296 const string
& _name
, const string
& _cookie
);
300 class RGWSimpleRadosReadCR
: public RGWSimpleCoroutine
{
301 RGWAsyncRadosProcessor
*async_rados
;
306 /// on ENOENT, call handle_data() with an empty object instead of failing
307 const bool empty_on_enoent
;
308 RGWObjVersionTracker
*objv_tracker
;
309 RGWAsyncGetSystemObj
*req
{nullptr};
312 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
313 const rgw_raw_obj
& _obj
,
314 T
*_result
, bool empty_on_enoent
= true,
315 RGWObjVersionTracker
*objv_tracker
= nullptr)
316 : RGWSimpleCoroutine(_svc
->ctx()), async_rados(_async_rados
), svc(_svc
),
317 obj(_obj
), result(_result
),
318 empty_on_enoent(empty_on_enoent
), objv_tracker(objv_tracker
) {}
319 ~RGWSimpleRadosReadCR() override
{
323 void request_cleanup() override
{
330 int send_request() override
;
331 int request_complete() override
;
333 virtual int handle_data(T
& data
) {
339 int RGWSimpleRadosReadCR
<T
>::send_request()
341 req
= new RGWAsyncGetSystemObj(this, stack
->create_completion_notifier(), svc
,
342 objv_tracker
, obj
, false, false);
343 async_rados
->queue(req
);
348 int RGWSimpleRadosReadCR
<T
>::request_complete()
350 int ret
= req
->get_ret_status();
352 if (ret
== -ENOENT
&& empty_on_enoent
) {
359 auto iter
= req
->bl
.cbegin();
361 // allow successful reads with empty buffers. ReadSyncStatus coroutines
362 // depend on this to be able to read without locking, because the
363 // cls lock from InitSyncStatus will create an empty object if it didn't
367 decode(*result
, iter
);
369 } catch (buffer::error
& err
) {
374 return handle_data(*result
);
377 class RGWSimpleRadosReadAttrsCR
: public RGWSimpleCoroutine
{
378 RGWAsyncRadosProcessor
*async_rados
;
382 map
<string
, bufferlist
> *pattrs
;
384 RGWAsyncGetSystemObj
*req
;
387 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
388 const rgw_raw_obj
& _obj
,
389 map
<string
, bufferlist
> *_pattrs
, bool _raw_attrs
) : RGWSimpleCoroutine(_svc
->ctx()),
390 async_rados(_async_rados
), svc(_svc
),
393 raw_attrs(_raw_attrs
),
395 ~RGWSimpleRadosReadAttrsCR() override
{
399 void request_cleanup() override
{
406 int send_request() override
;
407 int request_complete() override
;
411 class RGWSimpleRadosWriteCR
: public RGWSimpleCoroutine
{
412 RGWAsyncRadosProcessor
*async_rados
;
416 RGWObjVersionTracker
*objv_tracker
;
417 RGWAsyncPutSystemObj
*req
{nullptr};
420 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
421 const rgw_raw_obj
& _obj
,
422 const T
& _data
, RGWObjVersionTracker
*objv_tracker
= nullptr)
423 : RGWSimpleCoroutine(_svc
->ctx()), async_rados(_async_rados
),
424 svc(_svc
), obj(_obj
), objv_tracker(objv_tracker
) {
428 ~RGWSimpleRadosWriteCR() override
{
432 void request_cleanup() override
{
439 int send_request() override
{
440 req
= new RGWAsyncPutSystemObj(this, stack
->create_completion_notifier(),
441 svc
, objv_tracker
, obj
, false, std::move(bl
));
442 async_rados
->queue(req
);
446 int request_complete() override
{
447 if (objv_tracker
) { // copy the updated version
448 *objv_tracker
= req
->objv_tracker
;
450 return req
->get_ret_status();
454 class RGWSimpleRadosWriteAttrsCR
: public RGWSimpleCoroutine
{
455 RGWAsyncRadosProcessor
*async_rados
;
457 RGWObjVersionTracker
*objv_tracker
;
460 map
<string
, bufferlist
> attrs
;
461 RGWAsyncPutSystemObjAttrs
*req
= nullptr;
464 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor
*_async_rados
,
465 RGWSI_SysObj
*_svc
, const rgw_raw_obj
& _obj
,
466 map
<string
, bufferlist
> _attrs
,
467 RGWObjVersionTracker
*objv_tracker
= nullptr)
468 : RGWSimpleCoroutine(_svc
->ctx()), async_rados(_async_rados
),
469 svc(_svc
), objv_tracker(objv_tracker
), obj(_obj
),
470 attrs(std::move(_attrs
)) {
472 ~RGWSimpleRadosWriteAttrsCR() override
{
476 void request_cleanup() override
{
483 int send_request() override
{
484 req
= new RGWAsyncPutSystemObjAttrs(this, stack
->create_completion_notifier(),
485 svc
, objv_tracker
, obj
, std::move(attrs
));
486 async_rados
->queue(req
);
490 int request_complete() override
{
491 if (objv_tracker
) { // copy the updated version
492 *objv_tracker
= req
->objv_tracker
;
494 return req
->get_ret_status();
498 class RGWRadosSetOmapKeysCR
: public RGWSimpleCoroutine
{
500 map
<string
, bufferlist
> entries
;
506 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
509 RGWRadosSetOmapKeysCR(RGWRados
*_store
,
510 const rgw_raw_obj
& _obj
,
511 map
<string
, bufferlist
>& _entries
);
513 int send_request() override
;
514 int request_complete() override
;
517 class RGWRadosGetOmapKeysCR
: public RGWSimpleCoroutine
{
521 std::set
<std::string
> entries
;
524 using ResultPtr
= std::shared_ptr
<Result
>;
526 RGWRadosGetOmapKeysCR(RGWRados
*_store
, const rgw_raw_obj
& _obj
,
527 const string
& _marker
, int _max_entries
,
530 int send_request() override
;
531 int request_complete() override
;
539 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
542 class RGWRadosRemoveOmapKeysCR
: public RGWSimpleCoroutine
{
551 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
554 RGWRadosRemoveOmapKeysCR(RGWRados
*_store
,
555 const rgw_raw_obj
& _obj
,
556 const set
<string
>& _keys
);
558 int send_request() override
;
560 int request_complete() override
;
563 class RGWRadosRemoveCR
: public RGWSimpleCoroutine
{
565 librados::IoCtx ioctx
;
566 const rgw_raw_obj obj
;
567 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
570 RGWRadosRemoveCR(RGWRados
*store
, const rgw_raw_obj
& obj
);
572 int send_request() override
;
573 int request_complete() override
;
576 class RGWSimpleRadosLockCR
: public RGWSimpleCoroutine
{
577 RGWAsyncRadosProcessor
*async_rados
;
585 RGWAsyncLockSystemObj
*req
;
588 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
589 const rgw_raw_obj
& _obj
,
590 const string
& _lock_name
,
591 const string
& _cookie
,
593 ~RGWSimpleRadosLockCR() override
{
596 void request_cleanup() override
;
598 int send_request() override
;
599 int request_complete() override
;
601 static std::string
gen_random_cookie(CephContext
* cct
) {
602 #define COOKIE_LEN 16
603 char buf
[COOKIE_LEN
+ 1];
604 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
609 class RGWSimpleRadosUnlockCR
: public RGWSimpleCoroutine
{
610 RGWAsyncRadosProcessor
*async_rados
;
617 RGWAsyncUnlockSystemObj
*req
;
620 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
621 const rgw_raw_obj
& _obj
,
622 const string
& _lock_name
,
623 const string
& _cookie
);
624 ~RGWSimpleRadosUnlockCR() override
{
627 void request_cleanup() override
;
629 int send_request() override
;
630 int request_complete() override
;
633 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
635 class RGWOmapAppend
: public RGWConsumerCR
<string
> {
636 RGWAsyncRadosProcessor
*async_rados
;
643 int num_pending_entries
;
644 list
<string
> pending_entries
;
646 map
<string
, bufferlist
> entries
;
648 uint64_t window_size
;
649 uint64_t total_entries
;
651 RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
652 const rgw_raw_obj
& _obj
,
653 uint64_t _window_size
= OMAP_APPEND_MAX_ENTRIES_DEFAULT
);
654 int operate() override
;
655 void flush_pending();
656 bool append(const string
& s
);
659 uint64_t get_total_entries() {
660 return total_entries
;
663 const rgw_raw_obj
& get_obj() {
668 class RGWAsyncWait
: public RGWAsyncRadosRequest
{
674 int _send_request() override
{
675 Mutex::Locker
l(*lock
);
676 return cond
->WaitInterval(*lock
, interval
);
679 RGWAsyncWait(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, CephContext
*_cct
,
680 Mutex
*_lock
, Cond
*_cond
, int _secs
) : RGWAsyncRadosRequest(caller
, cn
),
682 lock(_lock
), cond(_cond
), interval(_secs
, 0) {}
685 Mutex::Locker
l(*lock
);
690 class RGWWaitCR
: public RGWSimpleCoroutine
{
692 RGWAsyncRadosProcessor
*async_rados
;
700 RGWWaitCR(RGWAsyncRadosProcessor
*_async_rados
, CephContext
*_cct
,
701 Mutex
*_lock
, Cond
*_cond
,
702 int _secs
) : RGWSimpleCoroutine(_cct
), cct(_cct
),
703 async_rados(_async_rados
), lock(_lock
), cond(_cond
), secs(_secs
), req(NULL
) {
705 ~RGWWaitCR() override
{
709 void request_cleanup() override
{
717 int send_request() override
{
718 req
= new RGWAsyncWait(this, stack
->create_completion_notifier(), cct
, lock
, cond
, secs
);
719 async_rados
->queue(req
);
723 int request_complete() override
{
724 return req
->get_ret_status();
732 class RGWShardedOmapCRManager
{
733 RGWAsyncRadosProcessor
*async_rados
;
739 vector
<RGWOmapAppend
*> shards
;
741 RGWShardedOmapCRManager(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
, RGWCoroutine
*_op
, int _num_shards
, const rgw_pool
& pool
, const string
& oid_prefix
)
742 : async_rados(_async_rados
),
743 store(_store
), op(_op
), num_shards(_num_shards
) {
744 shards
.reserve(num_shards
);
745 for (int i
= 0; i
< num_shards
; ++i
) {
746 char buf
[oid_prefix
.size() + 16];
747 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), i
);
748 RGWOmapAppend
*shard
= new RGWOmapAppend(async_rados
, store
, rgw_raw_obj(pool
, buf
));
750 shards
.push_back(shard
);
751 op
->spawn(shard
, false);
755 ~RGWShardedOmapCRManager() {
756 for (auto shard
: shards
) {
761 bool append(const string
& entry
, int shard_id
) {
762 return shards
[shard_id
]->append(entry
);
766 for (vector
<RGWOmapAppend
*>::iterator iter
= shards
.begin(); iter
!= shards
.end(); ++iter
) {
767 success
&= ((*iter
)->finish() && (!(*iter
)->is_error()));
772 uint64_t get_total_entries(int shard_id
) {
773 return shards
[shard_id
]->get_total_entries();
777 class RGWAsyncGetBucketInstanceInfo
: public RGWAsyncRadosRequest
{
779 const std::string oid
;
782 int _send_request() override
;
784 RGWAsyncGetBucketInstanceInfo(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
785 RGWRados
*_store
, const std::string
& oid
)
786 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), oid(oid
) {}
788 RGWBucketInfo bucket_info
;
791 class RGWGetBucketInstanceInfoCR
: public RGWSimpleCoroutine
{
792 RGWAsyncRadosProcessor
*async_rados
;
794 const std::string oid
;
795 RGWBucketInfo
*bucket_info
;
797 RGWAsyncGetBucketInstanceInfo
*req
{nullptr};
800 // metadata key constructor
801 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
802 const std::string
& meta_key
, RGWBucketInfo
*_bucket_info
)
803 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
804 oid(RGW_BUCKET_INSTANCE_MD_PREFIX
+ meta_key
),
805 bucket_info(_bucket_info
) {}
806 // rgw_bucket constructor
807 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
808 const rgw_bucket
& bucket
, RGWBucketInfo
*_bucket_info
)
809 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
810 oid(RGW_BUCKET_INSTANCE_MD_PREFIX
+ bucket
.get_key(':')),
811 bucket_info(_bucket_info
) {}
812 ~RGWGetBucketInstanceInfoCR() override
{
815 void request_cleanup() override
{
822 int send_request() override
{
823 req
= new RGWAsyncGetBucketInstanceInfo(this, stack
->create_completion_notifier(), store
, oid
);
824 async_rados
->queue(req
);
827 int request_complete() override
{
829 *bucket_info
= std::move(req
->bucket_info
);
831 return req
->get_ret_status();
835 class RGWRadosBILogTrimCR
: public RGWSimpleCoroutine
{
836 RGWRados::BucketShard bs
;
837 std::string start_marker
;
838 std::string end_marker
;
839 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
841 RGWRadosBILogTrimCR(RGWRados
*store
, const RGWBucketInfo
& bucket_info
,
842 int shard_id
, const std::string
& start_marker
,
843 const std::string
& end_marker
);
845 int send_request() override
;
846 int request_complete() override
;
849 class RGWAsyncFetchRemoteObj
: public RGWAsyncRadosRequest
{
853 RGWBucketInfo bucket_info
;
854 std::optional
<rgw_placement_rule
> dest_placement_rule
;
857 std::optional
<rgw_obj_key
> dest_key
;
858 std::optional
<uint64_t> versioned_epoch
;
863 rgw_zone_set zones_trace
;
864 PerfCounters
* counters
;
867 int _send_request() override
;
869 RGWAsyncFetchRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
870 const string
& _source_zone
,
871 RGWBucketInfo
& _bucket_info
,
872 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
873 const rgw_obj_key
& _key
,
874 const std::optional
<rgw_obj_key
>& _dest_key
,
875 std::optional
<uint64_t> _versioned_epoch
,
876 bool _if_newer
, rgw_zone_set
*_zones_trace
,
877 PerfCounters
* counters
)
878 : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
879 source_zone(_source_zone
),
880 bucket_info(_bucket_info
),
881 dest_placement_rule(_dest_placement_rule
),
884 versioned_epoch(_versioned_epoch
),
885 copy_if_newer(_if_newer
), counters(counters
)
888 zones_trace
= *_zones_trace
;
893 class RGWFetchRemoteObjCR
: public RGWSimpleCoroutine
{
895 RGWAsyncRadosProcessor
*async_rados
;
899 RGWBucketInfo bucket_info
;
900 std::optional
<rgw_placement_rule
> dest_placement_rule
;
903 std::optional
<rgw_obj_key
> dest_key
;
904 std::optional
<uint64_t> versioned_epoch
;
910 RGWAsyncFetchRemoteObj
*req
;
911 rgw_zone_set
*zones_trace
;
912 PerfCounters
* counters
;
915 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
916 const string
& _source_zone
,
917 RGWBucketInfo
& _bucket_info
,
918 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
919 const rgw_obj_key
& _key
,
920 const std::optional
<rgw_obj_key
>& _dest_key
,
921 std::optional
<uint64_t> _versioned_epoch
,
922 bool _if_newer
, rgw_zone_set
*_zones_trace
,
923 PerfCounters
* counters
)
924 : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
925 async_rados(_async_rados
), store(_store
),
926 source_zone(_source_zone
),
927 bucket_info(_bucket_info
),
928 dest_placement_rule(_dest_placement_rule
),
931 versioned_epoch(_versioned_epoch
),
932 copy_if_newer(_if_newer
), req(NULL
),
933 zones_trace(_zones_trace
), counters(counters
) {}
936 ~RGWFetchRemoteObjCR() override
{
940 void request_cleanup() override
{
947 int send_request() override
{
948 req
= new RGWAsyncFetchRemoteObj(this, stack
->create_completion_notifier(), store
,
949 source_zone
, bucket_info
, dest_placement_rule
,
950 key
, dest_key
, versioned_epoch
, copy_if_newer
,
951 zones_trace
, counters
);
952 async_rados
->queue(req
);
956 int request_complete() override
{
957 return req
->get_ret_status();
961 class RGWAsyncStatRemoteObj
: public RGWAsyncRadosRequest
{
965 RGWBucketInfo bucket_info
;
969 ceph::real_time
*pmtime
;
972 map
<string
, bufferlist
> *pattrs
;
973 map
<string
, string
> *pheaders
;
976 int _send_request() override
;
978 RGWAsyncStatRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
979 const string
& _source_zone
,
980 RGWBucketInfo
& _bucket_info
,
981 const rgw_obj_key
& _key
,
982 ceph::real_time
*_pmtime
,
985 map
<string
, bufferlist
> *_pattrs
,
986 map
<string
, string
> *_pheaders
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
987 source_zone(_source_zone
),
988 bucket_info(_bucket_info
),
994 pheaders(_pheaders
) {}
997 class RGWStatRemoteObjCR
: public RGWSimpleCoroutine
{
999 RGWAsyncRadosProcessor
*async_rados
;
1003 RGWBucketInfo bucket_info
;
1007 ceph::real_time
*pmtime
;
1010 map
<string
, bufferlist
> *pattrs
;
1011 map
<string
, string
> *pheaders
;
1013 RGWAsyncStatRemoteObj
*req
;
1016 RGWStatRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
1017 const string
& _source_zone
,
1018 RGWBucketInfo
& _bucket_info
,
1019 const rgw_obj_key
& _key
,
1020 ceph::real_time
*_pmtime
,
1023 map
<string
, bufferlist
> *_pattrs
,
1024 map
<string
, string
> *_pheaders
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1025 async_rados(_async_rados
), store(_store
),
1026 source_zone(_source_zone
),
1027 bucket_info(_bucket_info
),
1033 pheaders(_pheaders
),
1037 ~RGWStatRemoteObjCR() override
{
1041 void request_cleanup() override
{
1048 int send_request() override
{
1049 req
= new RGWAsyncStatRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
,
1050 bucket_info
, key
, pmtime
, psize
, petag
, pattrs
, pheaders
);
1051 async_rados
->queue(req
);
1055 int request_complete() override
{
1056 return req
->get_ret_status();
1060 class RGWAsyncRemoveObj
: public RGWAsyncRadosRequest
{
1064 RGWBucketInfo bucket_info
;
1068 string owner_display_name
;
1070 uint64_t versioned_epoch
;
1071 string marker_version_id
;
1074 ceph::real_time timestamp
;
1075 rgw_zone_set zones_trace
;
1078 int _send_request() override
;
1080 RGWAsyncRemoveObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
1081 const string
& _source_zone
,
1082 RGWBucketInfo
& _bucket_info
,
1083 const rgw_obj_key
& _key
,
1084 const string
& _owner
,
1085 const string
& _owner_display_name
,
1087 uint64_t _versioned_epoch
,
1088 bool _delete_marker
,
1090 real_time
& _timestamp
,
1091 rgw_zone_set
* _zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1092 source_zone(_source_zone
),
1093 bucket_info(_bucket_info
),
1096 owner_display_name(_owner_display_name
),
1097 versioned(_versioned
),
1098 versioned_epoch(_versioned_epoch
),
1099 del_if_older(_if_older
),
1100 timestamp(_timestamp
) {
1101 if (_delete_marker
) {
1102 marker_version_id
= key
.instance
;
1106 zones_trace
= *_zones_trace
;
1111 class RGWRemoveObjCR
: public RGWSimpleCoroutine
{
1113 RGWAsyncRadosProcessor
*async_rados
;
1117 RGWBucketInfo bucket_info
;
1121 uint64_t versioned_epoch
;
1124 string owner_display_name
;
1127 real_time timestamp
;
1129 RGWAsyncRemoveObj
*req
;
1131 rgw_zone_set
*zones_trace
;
1134 RGWRemoveObjCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
1135 const string
& _source_zone
,
1136 RGWBucketInfo
& _bucket_info
,
1137 const rgw_obj_key
& _key
,
1139 uint64_t _versioned_epoch
,
1141 string
*_owner_display_name
,
1142 bool _delete_marker
,
1143 real_time
*_timestamp
,
1144 rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1145 async_rados(_async_rados
), store(_store
),
1146 source_zone(_source_zone
),
1147 bucket_info(_bucket_info
),
1149 versioned(_versioned
),
1150 versioned_epoch(_versioned_epoch
),
1151 delete_marker(_delete_marker
), req(NULL
), zones_trace(_zones_trace
) {
1152 del_if_older
= (_timestamp
!= NULL
);
1154 timestamp
= *_timestamp
;
1161 if (_owner_display_name
) {
1162 owner_display_name
= *_owner_display_name
;
1165 ~RGWRemoveObjCR() override
{
1169 void request_cleanup() override
{
1176 int send_request() override
{
1177 req
= new RGWAsyncRemoveObj(this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
1178 key
, owner
, owner_display_name
, versioned
, versioned_epoch
,
1179 delete_marker
, del_if_older
, timestamp
, zones_trace
);
1180 async_rados
->queue(req
);
1184 int request_complete() override
{
1185 return req
->get_ret_status();
1189 class RGWContinuousLeaseCR
: public RGWCoroutine
{
1190 RGWAsyncRadosProcessor
*async_rados
;
1193 const rgw_raw_obj obj
;
1195 const string lock_name
;
1196 const string cookie
;
1201 std::atomic
<bool> going_down
= { false };
1204 RGWCoroutine
*caller
;
1206 bool aborted
{false};
1209 RGWContinuousLeaseCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
1210 const rgw_raw_obj
& _obj
,
1211 const string
& _lock_name
, int _interval
, RGWCoroutine
*_caller
)
1212 : RGWCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
1213 obj(_obj
), lock_name(_lock_name
),
1214 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
1215 interval(_interval
), lock("RGWContinuousLeaseCR"), caller(_caller
)
1218 int operate() override
;
1221 Mutex::Locker
l(lock
);
1225 void set_locked(bool status
) {
1226 Mutex::Locker
l(lock
);
1240 class RGWRadosTimelogAddCR
: public RGWSimpleCoroutine
{
1242 list
<cls_log_entry
> entries
;
1246 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1249 RGWRadosTimelogAddCR(RGWRados
*_store
, const string
& _oid
,
1250 const cls_log_entry
& entry
);
1252 int send_request() override
;
1253 int request_complete() override
;
1256 class RGWRadosTimelogTrimCR
: public RGWSimpleCoroutine
{
1258 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1261 real_time start_time
;
1263 std::string from_marker
;
1264 std::string to_marker
;
1267 RGWRadosTimelogTrimCR(RGWRados
*store
, const std::string
& oid
,
1268 const real_time
& start_time
, const real_time
& end_time
,
1269 const std::string
& from_marker
,
1270 const std::string
& to_marker
);
1272 int send_request() override
;
1273 int request_complete() override
;
1276 // wrapper to update last_trim_marker on success
1277 class RGWSyncLogTrimCR
: public RGWRadosTimelogTrimCR
{
1279 std::string
*last_trim_marker
;
1281 RGWSyncLogTrimCR(RGWRados
*store
, const std::string
& oid
,
1282 const std::string
& to_marker
, std::string
*last_trim_marker
);
1283 int request_complete() override
;
1286 class RGWAsyncStatObj
: public RGWAsyncRadosRequest
{
1288 RGWBucketInfo bucket_info
;
1293 RGWObjVersionTracker
*objv_tracker
;
1295 int _send_request() override
;
1297 RGWAsyncStatObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*store
,
1298 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1299 real_time
*pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1300 RGWObjVersionTracker
*objv_tracker
= nullptr)
1301 : RGWAsyncRadosRequest(caller
, cn
), store(store
), obj(obj
), psize(psize
),
1302 pmtime(pmtime
), pepoch(pepoch
), objv_tracker(objv_tracker
) {}
1305 class RGWStatObjCR
: public RGWSimpleCoroutine
{
1307 RGWAsyncRadosProcessor
*async_rados
;
1308 RGWBucketInfo bucket_info
;
1313 RGWObjVersionTracker
*objv_tracker
;
1314 RGWAsyncStatObj
*req
= nullptr;
1316 RGWStatObjCR(RGWAsyncRadosProcessor
*async_rados
, RGWRados
*store
,
1317 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1318 real_time
* pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1319 RGWObjVersionTracker
*objv_tracker
= nullptr);
1320 ~RGWStatObjCR() override
{
1323 void request_cleanup() override
;
1325 int send_request() override
;
1326 int request_complete() override
;
1329 /// coroutine wrapper for IoCtx::aio_notify()
1330 class RGWRadosNotifyCR
: public RGWSimpleCoroutine
{
1331 RGWRados
*const store
;
1332 const rgw_raw_obj obj
;
1334 const uint64_t timeout_ms
;
1335 bufferlist
*response
;
1337 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1340 RGWRadosNotifyCR(RGWRados
*store
, const rgw_raw_obj
& obj
,
1341 bufferlist
& request
, uint64_t timeout_ms
,
1342 bufferlist
*response
);
1344 int send_request() override
;
1345 int request_complete() override
;