1 #ifndef CEPH_RGW_CR_RADOS_H
2 #define CEPH_RGW_CR_RADOS_H
4 #include <boost/intrusive_ptr.hpp>
5 #include "include/assert.h"
6 #include "rgw_coroutine.h"
8 #include "common/WorkQueue.h"
9 #include "common/Throttle.h"
// Ref-counted (RefCountedObject) base class for asynchronous requests.
// Concrete request types supply the actual work by overriding the pure
// virtual _send_request(); its return value is stored in retcode and is
// read back through get_ret_status().
13 class RGWAsyncRadosRequest
: public RefCountedObject
{
// Completion notifier handed in by the constructor; cb() is called on it
// after _send_request() has produced retcode.
15 RGWAioCompletionNotifier
*notifier
;
// Hook implemented by every concrete request type; returns the status code.
22 virtual int _send_request() = 0;
// Stores the calling coroutine and the notifier, starts with retcode 0.
24 RGWAsyncRadosRequest(RGWCoroutine
*_caller
, RGWAioCompletionNotifier
*_cn
) : caller(_caller
), notifier(_cn
), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
27 ~RGWAsyncRadosRequest() override
{
// Run the subclass hook and remember its result for get_ret_status().
35 retcode
= _send_request();
// The notifier is only touched while `lock` is held.
37 Mutex::Locker
l(lock
);
39 notifier
->cb(); // drops its own ref
// Status returned by the last _send_request() call (0 until then).
46 int get_ret_status() { return retcode
; }
50 Mutex::Locker
l(lock
);
52 // we won't call notifier->cb() to drop its ref, so drop it here
// Processor for RGWAsyncRadosRequest objects. It keeps the pending requests
// in m_req_queue and exposes them to a ThreadPool through the nested RGWWQ
// work-queue adapter. Requests are submitted with queue().
62 class RGWAsyncRadosProcessor
{
// Pending requests (raw pointers).
63 deque
<RGWAsyncRadosRequest
*> m_req_queue
;
// Shutdown flag, initially false; presumably reported by is_going_down().
64 std::atomic
<bool> going_down
= { false };
68 Throttle req_throttle
;
// ThreadPool::WorkQueue adapter that forwards to the owning processor.
70 struct RGWWQ
: public ThreadPool::WorkQueue
<RGWAsyncRadosRequest
> {
// Back-pointer to the processor whose m_req_queue this work queue serves.
71 RGWAsyncRadosProcessor
*processor
;
// Registers the queue under the name "RGWWQ" on thread pool `tp`.
72 RGWWQ(RGWAsyncRadosProcessor
*p
, time_t timeout
, time_t suicide_timeout
, ThreadPool
*tp
)
73 : ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>("RGWWQ", timeout
, suicide_timeout
, tp
), processor(p
) {}
75 bool _enqueue(RGWAsyncRadosRequest
*req
) override
;
76 void _dequeue(RGWAsyncRadosRequest
*req
) override
{
79 bool _empty() override
;
80 RGWAsyncRadosRequest
*_dequeue() override
;
// Keep the base-class _process overloads visible next to the override below.
81 using ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>::_process
;
82 void _process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) override
;
// Clearing is only legal once the processor's queue has been drained.
84 void _clear() override
{
85 assert(processor
->m_req_queue
.empty());
// Builds a processor for `_store` with `num_threads` worker threads
// (presumably the size of the thread pool - confirm in the .cc file).
90 RGWAsyncRadosProcessor(RGWRados
*_store
, int num_threads
);
91 ~RGWAsyncRadosProcessor() {}
94 void handle_request(RGWAsyncRadosRequest
*req
);
// Submit a request for asynchronous execution.
95 void queue(RGWAsyncRadosRequest
*req
);
97 bool is_going_down() {
// Asynchronous request that reads a system object. The constructor takes
// the object context, an optional version tracker, the raw object, the
// destination bufferlist and the [_ofs, _end] range; attributes are only
// returned when a destination map was registered with set_read_attrs().
103 class RGWAsyncGetSystemObj
: public RGWAsyncRadosRequest
{
105 RGWObjectCtx
*obj_ctx
;
106 RGWRados::SystemObject::Read::GetObjState read_state
;
107 RGWObjVersionTracker
*objv_tracker
;
// Destination for the object's attributes; set through set_read_attrs().
110 map
<string
, bufferlist
> *pattrs
;
114 int _send_request() override
;
116 RGWAsyncGetSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
, RGWObjectCtx
*_obj_ctx
,
117 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
118 bufferlist
*_pbl
, off_t _ofs
, off_t _end
);
// Remember where attributes should be stored; the map is not owned here.
119 void set_read_attrs(map
<string
, bufferlist
> *_pattrs
) { pattrs
= _pattrs
; }
// Asynchronous request that writes bufferlist `_bl` to raw object `_obj`.
// `_exclusive` is forwarded from the caller (presumably "create only if the
// object does not exist" - confirm against _send_request()).
122 class RGWAsyncPutSystemObj
: public RGWAsyncRadosRequest
{
// Optional version tracker supplied by the caller (may be null).
124 RGWObjVersionTracker
*objv_tracker
;
130 int _send_request() override
;
132 RGWAsyncPutSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
133 RGWObjVersionTracker
*_objv_tracker
, rgw_raw_obj
& _obj
,
134 bool _exclusive
, bufferlist
& _bl
);
// Asynchronous request that stores a set of attributes on raw object `_obj`.
137 class RGWAsyncPutSystemObjAttrs
: public RGWAsyncRadosRequest
{
// Optional version tracker supplied by the caller (may be null).
139 RGWObjVersionTracker
*objv_tracker
;
// Attributes to write; held by pointer, so the map belongs to the caller.
141 map
<string
, bufferlist
> *attrs
;
144 int _send_request() override
;
146 RGWAsyncPutSystemObjAttrs(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
147 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
148 map
<string
, bufferlist
> *_attrs
);
// Asynchronous request that takes a named lock on raw object `_obj`,
// identified by `_name` and `_cookie`, for `_duration_secs` seconds.
151 class RGWAsyncLockSystemObj
: public RGWAsyncRadosRequest
{
// Requested lock duration in seconds.
156 uint32_t duration_secs
;
159 int _send_request() override
;
161 RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
162 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
163 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
);
// Asynchronous request that releases the named lock (`_name`, `_cookie`)
// on raw object `_obj`; counterpart of RGWAsyncLockSystemObj.
166 class RGWAsyncUnlockSystemObj
: public RGWAsyncRadosRequest
{
173 int _send_request() override
;
175 RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
176 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
177 const string
& _name
, const string
& _cookie
);
// Coroutine that reads raw object `_obj` through an RGWAsyncGetSystemObj
// request and decodes the payload into `*_result` (type T). After decoding,
// the virtual handle_data() hook is called with the decoded value.
181 class RGWSimpleRadosReadCR
: public RGWSimpleCoroutine
{
182 RGWAsyncRadosProcessor
*async_rados
;
184 RGWObjectCtx obj_ctx
;
// Optional destination for the object's attributes; null by default.
189 map
<string
, bufferlist
> *pattrs
{nullptr};
192 /// on ENOENT, call handle_data() with an empty object instead of failing
193 const bool empty_on_enoent
;
194 RGWObjVersionTracker
*objv_tracker
;
// Outstanding async request, created in send_request().
196 RGWAsyncGetSystemObj
*req
{nullptr};
// `empty_on_enoent` defaults to true and `objv_tracker` to nullptr.
199 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
200 const rgw_raw_obj
& _obj
,
201 T
*_result
, bool empty_on_enoent
= true,
202 RGWObjVersionTracker
*objv_tracker
= nullptr)
203 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
204 obj_ctx(store
), obj(_obj
), result(_result
),
205 empty_on_enoent(empty_on_enoent
), objv_tracker(objv_tracker
) {}
206 ~RGWSimpleRadosReadCR() override
{
210 void request_cleanup() override
{
217 int send_request() override
;
218 int request_complete() override
;
// Hook for subclasses to post-process the decoded value.
220 virtual int handle_data(T
& data
) {
// Creates the RGWAsyncGetSystemObj request (with a fresh completion notifier
// from the coroutine stack), registers the attribute destination and hands
// the request to the async processor.
226 int RGWSimpleRadosReadCR
<T
>::send_request()
228 req
= new RGWAsyncGetSystemObj(this, stack
->create_completion_notifier(),
229 store
, &obj_ctx
, objv_tracker
,
// pattrs may be null, in which case no attributes are requested
// (presumably - confirm against RGWAsyncGetSystemObj::_send_request()).
233 req
->set_read_attrs(pattrs
);
235 async_rados
->queue(req
);
// Collects the status of the finished read. ENOENT is special-cased when
// empty_on_enoent is set; otherwise the payload in `bl` is decoded into
// *result (decode errors are caught as buffer::error) and the outcome of
// handle_data(*result) is returned.
240 int RGWSimpleRadosReadCR
<T
>::request_complete()
242 int ret
= req
->get_ret_status();
244 if (ret
== -ENOENT
&& empty_on_enoent
) {
251 bufferlist::iterator iter
= bl
.begin();
253 // allow successful reads with empty buffers. ReadSyncStatus coroutines
254 // depend on this to be able to read without locking, because the
255 // cls lock from InitSyncStatus will create an empty object if it didn't
259 ::decode(*result
, iter
);
261 } catch (buffer::error
& err
) {
266 return handle_data(*result
);
// Coroutine that fetches the attributes of raw object `_obj` into the
// caller-provided map `_pattrs`, using an RGWAsyncGetSystemObj request.
269 class RGWSimpleRadosReadAttrsCR
: public RGWSimpleCoroutine
{
270 RGWAsyncRadosProcessor
*async_rados
;
272 RGWObjectCtx obj_ctx
;
// Destination map for the attributes; belongs to the caller.
277 map
<string
, bufferlist
> *pattrs
;
// Outstanding async request.
279 RGWAsyncGetSystemObj
*req
;
282 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
283 const rgw_raw_obj
& _obj
,
284 map
<string
, bufferlist
> *_pattrs
) : RGWSimpleCoroutine(_store
->ctx()),
285 async_rados(_async_rados
), store(_store
),
290 ~RGWSimpleRadosReadAttrsCR() override
{
294 void request_cleanup() override
{
301 int send_request() override
;
302 int request_complete() override
;
// Coroutine that writes `_data` (type T) to raw object `_obj` through an
// RGWAsyncPutSystemObj request. The write is issued with exclusive=false.
306 class RGWSimpleRadosWriteCR
: public RGWSimpleCoroutine
{
307 RGWAsyncRadosProcessor
*async_rados
;
// Optional version tracker; nullptr unless the caller passes one.
312 RGWObjVersionTracker
*objv_tracker
;
// Outstanding async request, created in send_request().
314 RGWAsyncPutSystemObj
*req
{nullptr};
317 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
318 const rgw_raw_obj
& _obj
,
319 const T
& _data
, RGWObjVersionTracker
*objv_tracker
= nullptr)
320 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
),
321 store(_store
), obj(_obj
), objv_tracker(objv_tracker
) {
325 ~RGWSimpleRadosWriteCR() override
{
329 void request_cleanup() override
{
// Build the put request for buffer `bl` and queue it on the processor.
336 int send_request() override
{
337 req
= new RGWAsyncPutSystemObj(this, stack
->create_completion_notifier(),
338 store
, objv_tracker
, obj
, false, bl
);
339 async_rados
->queue(req
);
// The coroutine's result is the status of the async request.
343 int request_complete() override
{
344 return req
->get_ret_status();
// Coroutine that writes a set of attributes to raw object `_obj` through an
// RGWAsyncPutSystemObjAttrs request. The attributes are copied into the
// coroutine at construction; no version tracker is passed to the request.
348 class RGWSimpleRadosWriteAttrsCR
: public RGWSimpleCoroutine
{
349 RGWAsyncRadosProcessor
*async_rados
;
// Private copy of the attributes to write.
354 map
<string
, bufferlist
> attrs
;
// Outstanding async request, created in send_request().
356 RGWAsyncPutSystemObjAttrs
*req
;
359 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
360 const rgw_raw_obj
& _obj
,
361 map
<string
, bufferlist
>& _attrs
) : RGWSimpleCoroutine(_store
->ctx()),
362 async_rados(_async_rados
),
365 attrs(_attrs
), req(NULL
) {
367 ~RGWSimpleRadosWriteAttrsCR() override
{
371 void request_cleanup() override
{
// Build the request (objv_tracker argument is NULL) and queue it.
378 int send_request() override
{
379 req
= new RGWAsyncPutSystemObjAttrs(this, stack
->create_completion_notifier(),
380 store
, NULL
, obj
, &attrs
);
381 async_rados
->queue(req
);
// The coroutine's result is the status of the async request.
385 int request_complete() override
{
386 return req
->get_ret_status();
// Coroutine that sets omap key/value pairs (`_entries`) on raw object `_obj`.
// Unlike the RGWSimpleRados*CR classes above it takes no
// RGWAsyncRadosProcessor; it holds its own completion notifier `cn`.
390 class RGWRadosSetOmapKeysCR
: public RGWSimpleCoroutine
{
// Key/value pairs to store, kept by value.
392 map
<string
, bufferlist
> entries
;
// Completion notifier, held through a ref-counting intrusive_ptr.
398 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
401 RGWRadosSetOmapKeysCR(RGWRados
*_store
,
402 const rgw_raw_obj
& _obj
,
403 map
<string
, bufferlist
>& _entries
);
405 int send_request() override
;
406 int request_complete() override
;
// Coroutine that lists omap entries of raw object `_obj` into the
// caller-provided map `_entries`. `_marker` and `_max_entries` are taken
// from the caller (presumably the listing start position and page size -
// confirm against send_request()).
409 class RGWRadosGetOmapKeysCR
: public RGWSimpleCoroutine
{
// Destination map; belongs to the caller.
413 map
<string
, bufferlist
> *entries
;
// Completion notifier, held through a ref-counting intrusive_ptr.
421 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
424 RGWRadosGetOmapKeysCR(RGWRados
*_store
,
425 const rgw_raw_obj
& _obj
,
426 const string
& _marker
,
427 map
<string
, bufferlist
> *_entries
, int _max_entries
);
429 int send_request() override
;
431 int request_complete() override
{
// Coroutine that removes the omap keys listed in `_keys` from raw object
// `_obj`.
436 class RGWRadosRemoveOmapKeysCR
: public RGWSimpleCoroutine
{
// Completion notifier, held through a ref-counting intrusive_ptr.
445 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
448 RGWRadosRemoveOmapKeysCR(RGWRados
*_store
,
449 const rgw_raw_obj
& _obj
,
450 const set
<string
>& _keys
);
452 int send_request() override
;
454 int request_complete() override
;
// Coroutine that removes raw object `obj`. It keeps its own librados IoCtx
// and completion notifier.
457 class RGWRadosRemoveCR
: public RGWSimpleCoroutine
{
459 librados::IoCtx ioctx
;
// Object to remove; copied at construction.
460 const rgw_raw_obj obj
;
461 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
464 RGWRadosRemoveCR(RGWRados
*store
, const rgw_raw_obj
& obj
);
// NOTE(review): unlike the sibling coroutines this declaration carries no
// `override` specifier; consider adding it for consistency.
467 int request_complete();
// Coroutine that takes a named lock (`_lock_name`, `_cookie`) on raw object
// `_obj` through an RGWAsyncLockSystemObj request. Also provides the
// gen_random_cookie() helper for producing lock cookies.
470 class RGWSimpleRadosLockCR
: public RGWSimpleCoroutine
{
471 RGWAsyncRadosProcessor
*async_rados
;
// Outstanding async request.
479 RGWAsyncLockSystemObj
*req
;
482 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
483 const rgw_raw_obj
& _obj
,
484 const string
& _lock_name
,
485 const string
& _cookie
,
487 ~RGWSimpleRadosLockCR() override
{
490 void request_cleanup() override
;
492 int send_request() override
;
493 int request_complete() override
;
// Produces a random alphanumeric cookie. The buffer holds COOKIE_LEN (16)
// characters plus one extra byte; gen_rand_alphanumeric() is given
// sizeof(buf) - 1, i.e. COOKIE_LEN, as its size argument.
495 static std::string
gen_random_cookie(CephContext
* cct
) {
496 #define COOKIE_LEN 16
497 char buf
[COOKIE_LEN
+ 1];
498 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
// Coroutine that releases the named lock (`_lock_name`, `_cookie`) on raw
// object `_obj` through an RGWAsyncUnlockSystemObj request.
503 class RGWSimpleRadosUnlockCR
: public RGWSimpleCoroutine
{
504 RGWAsyncRadosProcessor
*async_rados
;
// Outstanding async request.
511 RGWAsyncUnlockSystemObj
*req
;
514 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
515 const rgw_raw_obj
& _obj
,
516 const string
& _lock_name
,
517 const string
& _cookie
);
518 ~RGWSimpleRadosUnlockCR() override
{
521 void request_cleanup() override
;
523 int send_request() override
;
524 int request_complete() override
;
527 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
// Consumer coroutine (RGWConsumerCR<string>) that receives string entries
// via append() and writes them to the omap of raw object `_obj`.
// `_window_size` defaults to OMAP_APPEND_MAX_ENTRIES_DEFAULT (presumably
// the number of entries batched per write - confirm in operate()).
529 class RGWOmapAppend
: public RGWConsumerCR
<string
> {
530 RGWAsyncRadosProcessor
*async_rados
;
// Entries received but not yet written, and how many there are.
537 int num_pending_entries
;
538 list
<string
> pending_entries
;
540 map
<string
, bufferlist
> entries
;
542 uint64_t window_size
;
// Running counter reported by get_total_entries().
543 uint64_t total_entries
;
545 RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
546 const rgw_raw_obj
& _obj
,
547 uint64_t _window_size
= OMAP_APPEND_MAX_ENTRIES_DEFAULT
);
548 int operate() override
;
549 void flush_pending();
550 bool append(const string
& s
);
553 uint64_t get_total_entries() {
554 return total_entries
;
557 const rgw_raw_obj
& get_obj() {
// Asynchronous request that waits on condition variable `_cond` (guarded by
// `_lock`) for at most `_secs` seconds. Both the mutex and the condition
// variable are supplied by the caller as pointers.
562 class RGWAsyncWait
: public RGWAsyncRadosRequest
{
// Takes *lock, then waits on *cond for `interval`; the result of
// WaitInterval() becomes the request's return code.
568 int _send_request() override
{
569 Mutex::Locker
l(*lock
);
570 return cond
->WaitInterval(*lock
, interval
);
// `interval` is built from (_secs, 0), i.e. whole seconds only.
573 RGWAsyncWait(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, CephContext
*_cct
,
574 Mutex
*_lock
, Cond
*_cond
, int _secs
) : RGWAsyncRadosRequest(caller
, cn
),
576 lock(_lock
), cond(_cond
), interval(_secs
, 0) {}
579 Mutex::Locker
l(*lock
);
// Coroutine wrapper around RGWAsyncWait: waits on `_cond`/`_lock` for up to
// `_secs` seconds on the async processor.
584 class RGWWaitCR
: public RGWSimpleCoroutine
{
586 RGWAsyncRadosProcessor
*async_rados
;
594 RGWWaitCR(RGWAsyncRadosProcessor
*_async_rados
, CephContext
*_cct
,
595 Mutex
*_lock
, Cond
*_cond
,
596 int _secs
) : RGWSimpleCoroutine(_cct
), cct(_cct
),
597 async_rados(_async_rados
), lock(_lock
), cond(_cond
), secs(_secs
), req(NULL
) {
599 ~RGWWaitCR() override
{
603 void request_cleanup() override
{
// Build the wait request with a fresh completion notifier and queue it.
611 int send_request() override
{
612 req
= new RGWAsyncWait(this, stack
->create_completion_notifier(), cct
, lock
, cond
, secs
);
613 async_rados
->queue(req
);
// The coroutine's result is the status of the async wait.
617 int request_complete() override
{
618 return req
->get_ret_status();
// Manages one RGWOmapAppend coroutine per shard. The constructor creates
// `_num_shards` appenders on objects named "<oid_prefix>.<shard index>" in
// `pool` and spawns each of them on the coroutine `_op`.
626 class RGWShardedOmapCRManager
{
627 RGWAsyncRadosProcessor
*async_rados
;
// One appender per shard, indexed by shard id.
633 vector
<RGWOmapAppend
*> shards
;
635 RGWShardedOmapCRManager(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
, RGWCoroutine
*_op
, int _num_shards
, const rgw_pool
& pool
, const string
& oid_prefix
)
636 : async_rados(_async_rados
),
637 store(_store
), op(_op
), num_shards(_num_shards
) {
638 shards
.reserve(num_shards
);
639 for (int i
= 0; i
< num_shards
; ++i
) {
// NOTE(review): buf is a variable-length array (its size depends on
// oid_prefix.size()), which is a compiler extension in C++. The 16 extra
// bytes leave room for '.', the decimal shard index and the terminator.
640 char buf
[oid_prefix
.size() + 16];
641 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), i
);
642 RGWOmapAppend
*shard
= new RGWOmapAppend(async_rados
, store
, rgw_raw_obj(pool
, buf
));
644 shards
.push_back(shard
);
645 op
->spawn(shard
, false);
649 ~RGWShardedOmapCRManager() {
650 for (auto shard
: shards
) {
// Forwards `entry` to the appender of shard `shard_id`.
// NOTE(review): no bounds check on shard_id is visible here.
655 bool append(const string
& entry
, int shard_id
) {
656 return shards
[shard_id
]->append(entry
);
// Overall success requires every shard to finish() without error.
660 for (vector
<RGWOmapAppend
*>::iterator iter
= shards
.begin(); iter
!= shards
.end(); ++iter
) {
661 success
&= ((*iter
)->finish() && (!(*iter
)->is_error()));
// Entry counter of a single shard's appender.
666 uint64_t get_total_entries(int shard_id
) {
667 return shards
[shard_id
]->get_total_entries();
// Asynchronous request that loads bucket instance info identified by `oid`
// into the caller-provided `*_bucket_info`.
671 class RGWAsyncGetBucketInstanceInfo
: public RGWAsyncRadosRequest
{
// Object id of the bucket instance metadata; copied at construction.
673 const std::string oid
;
// Destination; belongs to the caller.
674 RGWBucketInfo
*bucket_info
;
677 int _send_request() override
;
679 RGWAsyncGetBucketInstanceInfo(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
680 RGWRados
*_store
, const std::string
& oid
,
681 RGWBucketInfo
*_bucket_info
)
682 : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
683 oid(oid
), bucket_info(_bucket_info
) {}
// Coroutine wrapper around RGWAsyncGetBucketInstanceInfo. The object id is
// always RGW_BUCKET_INSTANCE_MD_PREFIX followed by a key, which is either
// given directly (metadata key constructor) or derived from an rgw_bucket
// via get_key(':').
686 class RGWGetBucketInstanceInfoCR
: public RGWSimpleCoroutine
{
687 RGWAsyncRadosProcessor
*async_rados
;
689 const std::string oid
;
// Destination for the loaded info; belongs to the caller.
690 RGWBucketInfo
*bucket_info
;
// Outstanding async request, created in send_request().
692 RGWAsyncGetBucketInstanceInfo
*req
{nullptr};
695 // metadata key constructor
696 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
697 const std::string
& meta_key
, RGWBucketInfo
*_bucket_info
)
698 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
699 oid(RGW_BUCKET_INSTANCE_MD_PREFIX
+ meta_key
),
700 bucket_info(_bucket_info
) {}
701 // rgw_bucket constructor
702 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
703 const rgw_bucket
& bucket
, RGWBucketInfo
*_bucket_info
)
704 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
705 oid(RGW_BUCKET_INSTANCE_MD_PREFIX
+ bucket
.get_key(':')),
706 bucket_info(_bucket_info
) {}
707 ~RGWGetBucketInstanceInfoCR() override
{
710 void request_cleanup() override
{
// Build the request with a fresh completion notifier and queue it.
717 int send_request() override
{
718 req
= new RGWAsyncGetBucketInstanceInfo(this, stack
->create_completion_notifier(), store
, oid
, bucket_info
);
719 async_rados
->queue(req
);
// The coroutine's result is the status of the async request.
722 int request_complete() override
{
723 return req
->get_ret_status();
// Coroutine that trims the bucket index log of one bucket shard
// (`bucket_info`, `shard_id`) between `start_marker` and `end_marker`.
727 class RGWRadosBILogTrimCR
: public RGWSimpleCoroutine
{
728 RGWRados::BucketShard bs
;
// Trim range; both markers are copied at construction.
729 std::string start_marker
;
730 std::string end_marker
;
731 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
733 RGWRadosBILogTrimCR(RGWRados
*store
, const RGWBucketInfo
& bucket_info
,
734 int shard_id
, const std::string
& start_marker
,
735 const std::string
& end_marker
);
737 int send_request() override
;
738 int request_complete() override
;
// Asynchronous request that fetches object `_key` of bucket `_bucket_info`
// from zone `_source_zone`. `_if_newer` is stored as copy_if_newer and
// `_zones_trace` is kept as a pointer supplied by the caller.
741 class RGWAsyncFetchRemoteObj
: public RGWAsyncRadosRequest
{
// Private copy of the bucket info passed to the constructor.
745 RGWBucketInfo bucket_info
;
748 uint64_t versioned_epoch
;
753 rgw_zone_set
*zones_trace
;
756 int _send_request() override
;
758 RGWAsyncFetchRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
759 const string
& _source_zone
,
760 RGWBucketInfo
& _bucket_info
,
761 const rgw_obj_key
& _key
,
762 uint64_t _versioned_epoch
,
763 bool _if_newer
, rgw_zone_set
*_zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
764 source_zone(_source_zone
),
765 bucket_info(_bucket_info
),
767 versioned_epoch(_versioned_epoch
),
768 copy_if_newer(_if_newer
), zones_trace(_zones_trace
) {}
// Coroutine wrapper around RGWAsyncFetchRemoteObj; it stores the fetch
// parameters at construction and forwards them unchanged in send_request().
771 class RGWFetchRemoteObjCR
: public RGWSimpleCoroutine
{
773 RGWAsyncRadosProcessor
*async_rados
;
// Private copy of the bucket info passed to the constructor.
777 RGWBucketInfo bucket_info
;
780 uint64_t versioned_epoch
;
// Outstanding async request, created in send_request().
786 RGWAsyncFetchRemoteObj
*req
;
787 rgw_zone_set
*zones_trace
;
790 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
791 const string
& _source_zone
,
792 RGWBucketInfo
& _bucket_info
,
793 const rgw_obj_key
& _key
,
794 uint64_t _versioned_epoch
,
795 bool _if_newer
, rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
796 async_rados(_async_rados
), store(_store
),
797 source_zone(_source_zone
),
798 bucket_info(_bucket_info
),
800 versioned_epoch(_versioned_epoch
),
801 copy_if_newer(_if_newer
), req(NULL
), zones_trace(_zones_trace
) {}
804 ~RGWFetchRemoteObjCR() override
{
808 void request_cleanup() override
{
// Build the fetch request with a fresh completion notifier and queue it.
815 int send_request() override
{
816 req
= new RGWAsyncFetchRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
817 key
, versioned_epoch
, copy_if_newer
, zones_trace
);
818 async_rados
->queue(req
);
// The coroutine's result is the status of the async request.
822 int request_complete() override
{
823 return req
->get_ret_status();
// Asynchronous request that stats object `_key` of bucket `_bucket_info` in
// zone `_source_zone`. Results are delivered through caller-provided output
// pointers such as `_pmtime` and `_pattrs`.
827 class RGWAsyncStatRemoteObj
: public RGWAsyncRadosRequest
{
// Private copy of the bucket info passed to the constructor.
831 RGWBucketInfo bucket_info
;
// Output: modification time (caller-provided pointer).
835 ceph::real_time
*pmtime
;
// Output: object attributes (caller-provided pointer).
837 map
<string
, bufferlist
> *pattrs
;
840 int _send_request() override
;
842 RGWAsyncStatRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
843 const string
& _source_zone
,
844 RGWBucketInfo
& _bucket_info
,
845 const rgw_obj_key
& _key
,
846 ceph::real_time
*_pmtime
,
848 map
<string
, bufferlist
> *_pattrs
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
849 source_zone(_source_zone
),
850 bucket_info(_bucket_info
),
// Coroutine wrapper around RGWAsyncStatRemoteObj; the output pointers
// (pmtime, psize, pattrs) are forwarded to the request in send_request().
857 class RGWStatRemoteObjCR
: public RGWSimpleCoroutine
{
859 RGWAsyncRadosProcessor
*async_rados
;
// Private copy of the bucket info passed to the constructor.
863 RGWBucketInfo bucket_info
;
867 ceph::real_time
*pmtime
;
869 map
<string
, bufferlist
> *pattrs
;
// Outstanding async request, created in send_request().
871 RGWAsyncStatRemoteObj
*req
;
874 RGWStatRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
875 const string
& _source_zone
,
876 RGWBucketInfo
& _bucket_info
,
877 const rgw_obj_key
& _key
,
878 ceph::real_time
*_pmtime
,
880 map
<string
, bufferlist
> *_pattrs
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
881 async_rados(_async_rados
), store(_store
),
882 source_zone(_source_zone
),
883 bucket_info(_bucket_info
),
891 ~RGWStatRemoteObjCR() override
{
895 void request_cleanup() override
{
// Build the stat request with a fresh completion notifier and queue it.
902 int send_request() override
{
903 req
= new RGWAsyncStatRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
,
904 bucket_info
, key
, pmtime
, psize
, pattrs
);
905 async_rados
->queue(req
);
// The coroutine's result is the status of the async request.
909 int request_complete() override
{
910 return req
->get_ret_status();
// Asynchronous request that removes object `_key` of bucket `_bucket_info`.
// All parameters are copied at construction; when `_delete_marker` is set,
// marker_version_id is taken from key.instance.
914 class RGWAsyncRemoveObj
: public RGWAsyncRadosRequest
{
// Private copy of the bucket info passed to the constructor.
918 RGWBucketInfo bucket_info
;
922 string owner_display_name
;
924 uint64_t versioned_epoch
;
// Only filled in when the constructor is called with _delete_marker set.
925 string marker_version_id
;
// Copied from `_timestamp`; initialized alongside del_if_older.
928 ceph::real_time timestamp
;
929 rgw_zone_set
*zones_trace
;
932 int _send_request() override
;
934 RGWAsyncRemoveObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
935 const string
& _source_zone
,
936 RGWBucketInfo
& _bucket_info
,
937 const rgw_obj_key
& _key
,
938 const string
& _owner
,
939 const string
& _owner_display_name
,
941 uint64_t _versioned_epoch
,
944 real_time
& _timestamp
,
945 rgw_zone_set
* _zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
946 source_zone(_source_zone
),
947 bucket_info(_bucket_info
),
950 owner_display_name(_owner_display_name
),
951 versioned(_versioned
),
952 versioned_epoch(_versioned_epoch
),
953 del_if_older(_if_older
),
954 timestamp(_timestamp
), zones_trace(_zones_trace
) {
// A delete-marker removal records the key's instance as the marker version.
955 if (_delete_marker
) {
956 marker_version_id
= key
.instance
;
// Coroutine wrapper around RGWAsyncRemoveObj. Optional inputs arrive as
// pointers: del_if_older is set exactly when `_timestamp` is non-null, and
// the owner display name is copied only when its pointer is non-null.
961 class RGWRemoveObjCR
: public RGWSimpleCoroutine
{
963 RGWAsyncRadosProcessor
*async_rados
;
// Private copy of the bucket info passed to the constructor.
967 RGWBucketInfo bucket_info
;
971 uint64_t versioned_epoch
;
974 string owner_display_name
;
// Outstanding async request, created in send_request().
979 RGWAsyncRemoveObj
*req
;
981 rgw_zone_set
*zones_trace
;
984 RGWRemoveObjCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
985 const string
& _source_zone
,
986 RGWBucketInfo
& _bucket_info
,
987 const rgw_obj_key
& _key
,
989 uint64_t _versioned_epoch
,
991 string
*_owner_display_name
,
993 real_time
*_timestamp
,
994 rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
995 async_rados(_async_rados
), store(_store
),
996 source_zone(_source_zone
),
997 bucket_info(_bucket_info
),
999 versioned(_versioned
),
1000 versioned_epoch(_versioned_epoch
),
1001 delete_marker(_delete_marker
), req(NULL
), zones_trace(_zones_trace
) {
// "Delete only if older" is requested by passing a timestamp.
1002 del_if_older
= (_timestamp
!= NULL
);
// NOTE(review): *_timestamp must only be read when _timestamp is non-null;
// the guard is not visible here - confirm it exists.
1004 timestamp
= *_timestamp
;
1011 if (_owner_display_name
) {
1012 owner_display_name
= *_owner_display_name
;
1015 ~RGWRemoveObjCR() override
{
1019 void request_cleanup() override
{
// Build the remove request with a fresh completion notifier and queue it.
1026 int send_request() override
{
1027 req
= new RGWAsyncRemoveObj(this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
1028 key
, owner
, owner_display_name
, versioned
, versioned_epoch
,
1029 delete_marker
, del_if_older
, timestamp
, zones_trace
);
1030 async_rados
->queue(req
);
// The coroutine's result is the status of the async request.
1034 int request_complete() override
{
1035 return req
->get_ret_status();
// Coroutine that maintains a lease (lock `_lock_name`) on raw object `_obj`
// on behalf of `_caller`. A random cookie is generated for the lock at
// construction; `_interval` is stored as the lease interval (presumably in
// seconds - confirm in operate()).
1039 class RGWContinuousLeaseCR
: public RGWCoroutine
{
1040 RGWAsyncRadosProcessor
*async_rados
;
1043 const rgw_raw_obj obj
;
1045 const string lock_name
;
// Generated once by RGWSimpleRadosLockCR::gen_random_cookie().
1046 const string cookie
;
// Shutdown flag, initially false.
1051 std::atomic
<bool> going_down
= { false };
// Coroutine on whose behalf the lease is held.
1054 RGWCoroutine
*caller
;
1056 bool aborted
{false};
1059 RGWContinuousLeaseCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
1060 const rgw_raw_obj
& _obj
,
1061 const string
& _lock_name
, int _interval
, RGWCoroutine
*_caller
)
1062 : RGWCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
1063 obj(_obj
), lock_name(_lock_name
),
1064 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
1065 interval(_interval
), lock("RGWContinuousLeaseCR"), caller(_caller
)
1068 int operate() override
;
1071 Mutex::Locker
l(lock
);
// Updates the locked state while holding `lock`.
1075 void set_locked(bool status
) {
1076 Mutex::Locker
l(lock
);
// Coroutine that adds a cls_log entry to the time log object `_oid`.
1090 class RGWRadosTimelogAddCR
: public RGWSimpleCoroutine
{
// Entries to add; the constructor receives a single `entry`.
1092 list
<cls_log_entry
> entries
;
1096 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1099 RGWRadosTimelogAddCR(RGWRados
*_store
, const string
& _oid
,
1100 const cls_log_entry
& entry
);
1102 int send_request() override
;
1103 int request_complete() override
;
// Coroutine that trims the time log object `oid`. The range to trim is
// described by a time window (`start_time`, `end_time`) and by a marker
// range (`from_marker`, `to_marker`).
1106 class RGWRadosTimelogTrimCR
: public RGWSimpleCoroutine
{
1108 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1111 real_time start_time
;
1113 std::string from_marker
;
1114 std::string to_marker
;
1117 RGWRadosTimelogTrimCR(RGWRados
*store
, const std::string
& oid
,
1118 const real_time
& start_time
, const real_time
& end_time
,
1119 const std::string
& from_marker
,
1120 const std::string
& to_marker
);
1122 int send_request() override
;
1123 int request_complete() override
;
1126 // wrapper to update last_trim_marker on success
// Specialization of RGWRadosTimelogTrimCR that trims up to `to_marker` and
// overrides request_complete() so that `*last_trim_marker` can be updated
// when the trim succeeded.
1127 class RGWSyncLogTrimCR
: public RGWRadosTimelogTrimCR
{
// Caller-owned string that receives the new trim position.
1129 std::string
*last_trim_marker
;
1131 RGWSyncLogTrimCR(RGWRados
*store
, const std::string
& oid
,
1132 const std::string
& to_marker
, std::string
*last_trim_marker
);
1133 int request_complete() override
;
// Asynchronous request that stats object `obj`. All output pointers
// (psize, pmtime, pepoch, objv_tracker) are optional and default to nullptr.
1136 class RGWAsyncStatObj
: public RGWAsyncRadosRequest
{
// NOTE(review): the initializer list visible below does not mention
// bucket_info although `_bucket_info` is a constructor parameter - confirm
// the member is initialized.
1138 RGWBucketInfo bucket_info
;
1143 RGWObjVersionTracker
*objv_tracker
;
1145 int _send_request() override
;
1147 RGWAsyncStatObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*store
,
1148 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1149 real_time
*pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1150 RGWObjVersionTracker
*objv_tracker
= nullptr)
1151 : RGWAsyncRadosRequest(caller
, cn
), store(store
), obj(obj
), psize(psize
),
1152 pmtime(pmtime
), pepoch(pepoch
), objv_tracker(objv_tracker
) {}
// Coroutine wrapper around RGWAsyncStatObj; takes the same optional output
// pointers (psize, pmtime, pepoch, objv_tracker), all defaulting to nullptr.
1155 class RGWStatObjCR
: public RGWSimpleCoroutine
{
1157 RGWAsyncRadosProcessor
*async_rados
;
1158 RGWBucketInfo bucket_info
;
1163 RGWObjVersionTracker
*objv_tracker
;
// Outstanding async request; null until one has been created.
1164 RGWAsyncStatObj
*req
= nullptr;
1166 RGWStatObjCR(RGWAsyncRadosProcessor
*async_rados
, RGWRados
*store
,
1167 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1168 real_time
* pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1169 RGWObjVersionTracker
*objv_tracker
= nullptr);
1170 ~RGWStatObjCR() override
{
1173 void request_cleanup() override
;
1175 int send_request() override
;
1176 int request_complete() override
;
1179 /// coroutine wrapper for IoCtx::aio_notify()
// Sends notification payload `request` on raw object `obj` with a timeout
// of `timeout_ms` milliseconds; `response` is a caller-provided bufferlist
// pointer (presumably where the reply is stored - confirm in send_request()).
1180 class RGWRadosNotifyCR
: public RGWSimpleCoroutine
{
1181 RGWRados
*const store
;
// Target object; copied at construction.
1182 const rgw_raw_obj obj
;
1184 const uint64_t timeout_ms
;
1185 bufferlist
*response
;
1187 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1190 RGWRadosNotifyCR(RGWRados
*store
, const rgw_raw_obj
& obj
,
1191 bufferlist
& request
, uint64_t timeout_ms
,
1192 bufferlist
*response
);
1194 int send_request() override
;
1195 int request_complete() override
;