1 #ifndef CEPH_RGW_CR_RADOS_H
2 #define CEPH_RGW_CR_RADOS_H
4 #include <boost/intrusive_ptr.hpp>
5 #include "include/assert.h"
6 #include "rgw_coroutine.h"
8 #include "common/WorkQueue.h"
9 #include "common/Throttle.h"
13 class RGWAsyncRadosRequest
: public RefCountedObject
{
15 RGWAioCompletionNotifier
*notifier
;
22 virtual int _send_request() = 0;
24 RGWAsyncRadosRequest(RGWCoroutine
*_caller
, RGWAioCompletionNotifier
*_cn
) : caller(_caller
), notifier(_cn
), retcode(0),
25 lock("RGWAsyncRadosRequest::lock") {
27 ~RGWAsyncRadosRequest() override
{
35 retcode
= _send_request();
37 Mutex::Locker
l(lock
);
39 notifier
->cb(); // drops its own ref
46 int get_ret_status() { return retcode
; }
50 Mutex::Locker
l(lock
);
52 // we won't call notifier->cb() to drop its ref, so drop it here
62 class RGWAsyncRadosProcessor
{
63 deque
<RGWAsyncRadosRequest
*> m_req_queue
;
64 std::atomic
<bool> going_down
= { false };
68 Throttle req_throttle
;
70 struct RGWWQ
: public ThreadPool::WorkQueue
<RGWAsyncRadosRequest
> {
71 RGWAsyncRadosProcessor
*processor
;
72 RGWWQ(RGWAsyncRadosProcessor
*p
, time_t timeout
, time_t suicide_timeout
, ThreadPool
*tp
)
73 : ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>("RGWWQ", timeout
, suicide_timeout
, tp
), processor(p
) {}
75 bool _enqueue(RGWAsyncRadosRequest
*req
) override
;
76 void _dequeue(RGWAsyncRadosRequest
*req
) override
{
79 bool _empty() override
;
80 RGWAsyncRadosRequest
*_dequeue() override
;
81 using ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>::_process
;
82 void _process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) override
;
84 void _clear() override
{
85 assert(processor
->m_req_queue
.empty());
90 RGWAsyncRadosProcessor(RGWRados
*_store
, int num_threads
);
91 ~RGWAsyncRadosProcessor() {}
94 void handle_request(RGWAsyncRadosRequest
*req
);
95 void queue(RGWAsyncRadosRequest
*req
);
97 bool is_going_down() {
103 class RGWAsyncGetSystemObj
: public RGWAsyncRadosRequest
{
105 RGWObjectCtx obj_ctx
;
106 RGWRados::SystemObject::Read::GetObjState read_state
;
107 RGWObjVersionTracker objv_tracker
;
111 const bool want_attrs
;
113 int _send_request() override
;
115 RGWAsyncGetSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
116 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
117 off_t _ofs
, off_t _end
, bool want_attrs
);
120 map
<string
, bufferlist
> attrs
;
123 class RGWAsyncPutSystemObj
: public RGWAsyncRadosRequest
{
130 int _send_request() override
;
132 RGWAsyncPutSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
133 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
134 bool _exclusive
, bufferlist _bl
);
136 RGWObjVersionTracker objv_tracker
;
139 class RGWAsyncPutSystemObjAttrs
: public RGWAsyncRadosRequest
{
142 map
<string
, bufferlist
> attrs
;
145 int _send_request() override
;
147 RGWAsyncPutSystemObjAttrs(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
148 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
149 map
<string
, bufferlist
> _attrs
);
151 RGWObjVersionTracker objv_tracker
;
154 class RGWAsyncLockSystemObj
: public RGWAsyncRadosRequest
{
159 uint32_t duration_secs
;
162 int _send_request() override
;
164 RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
165 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
166 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
);
169 class RGWAsyncUnlockSystemObj
: public RGWAsyncRadosRequest
{
176 int _send_request() override
;
178 RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
179 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
180 const string
& _name
, const string
& _cookie
);
184 class RGWSimpleRadosReadCR
: public RGWSimpleCoroutine
{
185 RGWAsyncRadosProcessor
*async_rados
;
189 /// on ENOENT, call handle_data() with an empty object instead of failing
190 const bool empty_on_enoent
;
191 RGWObjVersionTracker
*objv_tracker
;
192 RGWAsyncGetSystemObj
*req
{nullptr};
195 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
196 const rgw_raw_obj
& _obj
,
197 T
*_result
, bool empty_on_enoent
= true,
198 RGWObjVersionTracker
*objv_tracker
= nullptr)
199 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
200 obj(_obj
), result(_result
),
201 empty_on_enoent(empty_on_enoent
), objv_tracker(objv_tracker
) {}
202 ~RGWSimpleRadosReadCR() override
{
206 void request_cleanup() override
{
213 int send_request() override
;
214 int request_complete() override
;
216 virtual int handle_data(T
& data
) {
222 int RGWSimpleRadosReadCR
<T
>::send_request()
224 req
= new RGWAsyncGetSystemObj(this, stack
->create_completion_notifier(),
225 store
, objv_tracker
, obj
, 0, -1, false);
226 async_rados
->queue(req
);
231 int RGWSimpleRadosReadCR
<T
>::request_complete()
233 int ret
= req
->get_ret_status();
235 if (ret
== -ENOENT
&& empty_on_enoent
) {
242 bufferlist::iterator iter
= req
->bl
.begin();
244 // allow successful reads with empty buffers. ReadSyncStatus coroutines
245 // depend on this to be able to read without locking, because the
246 // cls lock from InitSyncStatus will create an empty object if it didnt
250 ::decode(*result
, iter
);
252 } catch (buffer::error
& err
) {
257 return handle_data(*result
);
260 class RGWSimpleRadosReadAttrsCR
: public RGWSimpleCoroutine
{
261 RGWAsyncRadosProcessor
*async_rados
;
264 map
<string
, bufferlist
> *pattrs
;
265 RGWAsyncGetSystemObj
*req
;
268 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
269 const rgw_raw_obj
& _obj
,
270 map
<string
, bufferlist
> *_pattrs
) : RGWSimpleCoroutine(_store
->ctx()),
271 async_rados(_async_rados
), store(_store
),
275 ~RGWSimpleRadosReadAttrsCR() override
{
279 void request_cleanup() override
{
286 int send_request() override
;
287 int request_complete() override
;
291 class RGWSimpleRadosWriteCR
: public RGWSimpleCoroutine
{
292 RGWAsyncRadosProcessor
*async_rados
;
296 RGWObjVersionTracker
*objv_tracker
;
297 RGWAsyncPutSystemObj
*req
{nullptr};
300 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
301 const rgw_raw_obj
& _obj
,
302 const T
& _data
, RGWObjVersionTracker
*objv_tracker
= nullptr)
303 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
),
304 store(_store
), obj(_obj
), objv_tracker(objv_tracker
) {
308 ~RGWSimpleRadosWriteCR() override
{
312 void request_cleanup() override
{
319 int send_request() override
{
320 req
= new RGWAsyncPutSystemObj(this, stack
->create_completion_notifier(),
321 store
, objv_tracker
, obj
, false, std::move(bl
));
322 async_rados
->queue(req
);
326 int request_complete() override
{
327 if (objv_tracker
) { // copy the updated version
328 *objv_tracker
= req
->objv_tracker
;
330 return req
->get_ret_status();
334 class RGWSimpleRadosWriteAttrsCR
: public RGWSimpleCoroutine
{
335 RGWAsyncRadosProcessor
*async_rados
;
337 RGWObjVersionTracker
*objv_tracker
;
339 map
<string
, bufferlist
> attrs
;
340 RGWAsyncPutSystemObjAttrs
*req
= nullptr;
343 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor
*_async_rados
,
344 RGWRados
*_store
, const rgw_raw_obj
& _obj
,
345 map
<string
, bufferlist
> _attrs
,
346 RGWObjVersionTracker
*objv_tracker
= nullptr)
347 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
),
348 store(_store
), objv_tracker(objv_tracker
), obj(_obj
),
349 attrs(std::move(_attrs
)) {
351 ~RGWSimpleRadosWriteAttrsCR() override
{
355 void request_cleanup() override
{
362 int send_request() override
{
363 req
= new RGWAsyncPutSystemObjAttrs(this, stack
->create_completion_notifier(),
364 store
, objv_tracker
, obj
, std::move(attrs
));
365 async_rados
->queue(req
);
369 int request_complete() override
{
370 if (objv_tracker
) { // copy the updated version
371 *objv_tracker
= req
->objv_tracker
;
373 return req
->get_ret_status();
377 class RGWRadosSetOmapKeysCR
: public RGWSimpleCoroutine
{
379 map
<string
, bufferlist
> entries
;
385 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
388 RGWRadosSetOmapKeysCR(RGWRados
*_store
,
389 const rgw_raw_obj
& _obj
,
390 map
<string
, bufferlist
>& _entries
);
392 int send_request() override
;
393 int request_complete() override
;
396 class RGWRadosGetOmapKeysCR
: public RGWSimpleCoroutine
{
400 std::set
<std::string
> *entries
;
407 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
410 RGWRadosGetOmapKeysCR(RGWRados
*_store
,
411 const rgw_raw_obj
& _obj
,
412 const string
& _marker
,
413 std::set
<std::string
> *_entries
, int _max_entries
);
415 int send_request() override
;
416 int request_complete() override
;
419 class RGWRadosRemoveOmapKeysCR
: public RGWSimpleCoroutine
{
428 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
431 RGWRadosRemoveOmapKeysCR(RGWRados
*_store
,
432 const rgw_raw_obj
& _obj
,
433 const set
<string
>& _keys
);
435 int send_request() override
;
437 int request_complete() override
;
440 class RGWRadosRemoveCR
: public RGWSimpleCoroutine
{
442 librados::IoCtx ioctx
;
443 const rgw_raw_obj obj
;
444 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
447 RGWRadosRemoveCR(RGWRados
*store
, const rgw_raw_obj
& obj
);
450 int request_complete();
453 class RGWSimpleRadosLockCR
: public RGWSimpleCoroutine
{
454 RGWAsyncRadosProcessor
*async_rados
;
462 RGWAsyncLockSystemObj
*req
;
465 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
466 const rgw_raw_obj
& _obj
,
467 const string
& _lock_name
,
468 const string
& _cookie
,
470 ~RGWSimpleRadosLockCR() override
{
473 void request_cleanup() override
;
475 int send_request() override
;
476 int request_complete() override
;
478 static std::string
gen_random_cookie(CephContext
* cct
) {
479 #define COOKIE_LEN 16
480 char buf
[COOKIE_LEN
+ 1];
481 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
486 class RGWSimpleRadosUnlockCR
: public RGWSimpleCoroutine
{
487 RGWAsyncRadosProcessor
*async_rados
;
494 RGWAsyncUnlockSystemObj
*req
;
497 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
498 const rgw_raw_obj
& _obj
,
499 const string
& _lock_name
,
500 const string
& _cookie
);
501 ~RGWSimpleRadosUnlockCR() override
{
504 void request_cleanup() override
;
506 int send_request() override
;
507 int request_complete() override
;
510 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
512 class RGWOmapAppend
: public RGWConsumerCR
<string
> {
513 RGWAsyncRadosProcessor
*async_rados
;
520 int num_pending_entries
;
521 list
<string
> pending_entries
;
523 map
<string
, bufferlist
> entries
;
525 uint64_t window_size
;
526 uint64_t total_entries
;
528 RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
529 const rgw_raw_obj
& _obj
,
530 uint64_t _window_size
= OMAP_APPEND_MAX_ENTRIES_DEFAULT
);
531 int operate() override
;
532 void flush_pending();
533 bool append(const string
& s
);
536 uint64_t get_total_entries() {
537 return total_entries
;
540 const rgw_raw_obj
& get_obj() {
545 class RGWAsyncWait
: public RGWAsyncRadosRequest
{
551 int _send_request() override
{
552 Mutex::Locker
l(*lock
);
553 return cond
->WaitInterval(*lock
, interval
);
556 RGWAsyncWait(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, CephContext
*_cct
,
557 Mutex
*_lock
, Cond
*_cond
, int _secs
) : RGWAsyncRadosRequest(caller
, cn
),
559 lock(_lock
), cond(_cond
), interval(_secs
, 0) {}
562 Mutex::Locker
l(*lock
);
567 class RGWWaitCR
: public RGWSimpleCoroutine
{
569 RGWAsyncRadosProcessor
*async_rados
;
577 RGWWaitCR(RGWAsyncRadosProcessor
*_async_rados
, CephContext
*_cct
,
578 Mutex
*_lock
, Cond
*_cond
,
579 int _secs
) : RGWSimpleCoroutine(_cct
), cct(_cct
),
580 async_rados(_async_rados
), lock(_lock
), cond(_cond
), secs(_secs
), req(NULL
) {
582 ~RGWWaitCR() override
{
586 void request_cleanup() override
{
594 int send_request() override
{
595 req
= new RGWAsyncWait(this, stack
->create_completion_notifier(), cct
, lock
, cond
, secs
);
596 async_rados
->queue(req
);
600 int request_complete() override
{
601 return req
->get_ret_status();
609 class RGWShardedOmapCRManager
{
610 RGWAsyncRadosProcessor
*async_rados
;
616 vector
<RGWOmapAppend
*> shards
;
618 RGWShardedOmapCRManager(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
, RGWCoroutine
*_op
, int _num_shards
, const rgw_pool
& pool
, const string
& oid_prefix
)
619 : async_rados(_async_rados
),
620 store(_store
), op(_op
), num_shards(_num_shards
) {
621 shards
.reserve(num_shards
);
622 for (int i
= 0; i
< num_shards
; ++i
) {
623 char buf
[oid_prefix
.size() + 16];
624 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), i
);
625 RGWOmapAppend
*shard
= new RGWOmapAppend(async_rados
, store
, rgw_raw_obj(pool
, buf
));
627 shards
.push_back(shard
);
628 op
->spawn(shard
, false);
632 ~RGWShardedOmapCRManager() {
633 for (auto shard
: shards
) {
638 bool append(const string
& entry
, int shard_id
) {
639 return shards
[shard_id
]->append(entry
);
643 for (vector
<RGWOmapAppend
*>::iterator iter
= shards
.begin(); iter
!= shards
.end(); ++iter
) {
644 success
&= ((*iter
)->finish() && (!(*iter
)->is_error()));
649 uint64_t get_total_entries(int shard_id
) {
650 return shards
[shard_id
]->get_total_entries();
654 class RGWAsyncGetBucketInstanceInfo
: public RGWAsyncRadosRequest
{
656 const std::string oid
;
659 int _send_request() override
;
661 RGWAsyncGetBucketInstanceInfo(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
662 RGWRados
*_store
, const std::string
& oid
)
663 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), oid(oid
) {}
665 RGWBucketInfo bucket_info
;
668 class RGWGetBucketInstanceInfoCR
: public RGWSimpleCoroutine
{
669 RGWAsyncRadosProcessor
*async_rados
;
671 const std::string oid
;
672 RGWBucketInfo
*bucket_info
;
674 RGWAsyncGetBucketInstanceInfo
*req
{nullptr};
677 // metadata key constructor
678 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
679 const std::string
& meta_key
, RGWBucketInfo
*_bucket_info
)
680 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
681 oid(RGW_BUCKET_INSTANCE_MD_PREFIX
+ meta_key
),
682 bucket_info(_bucket_info
) {}
683 // rgw_bucket constructor
684 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
685 const rgw_bucket
& bucket
, RGWBucketInfo
*_bucket_info
)
686 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
687 oid(RGW_BUCKET_INSTANCE_MD_PREFIX
+ bucket
.get_key(':')),
688 bucket_info(_bucket_info
) {}
689 ~RGWGetBucketInstanceInfoCR() override
{
692 void request_cleanup() override
{
699 int send_request() override
{
700 req
= new RGWAsyncGetBucketInstanceInfo(this, stack
->create_completion_notifier(), store
, oid
);
701 async_rados
->queue(req
);
704 int request_complete() override
{
706 *bucket_info
= std::move(req
->bucket_info
);
708 return req
->get_ret_status();
712 class RGWRadosBILogTrimCR
: public RGWSimpleCoroutine
{
713 RGWRados::BucketShard bs
;
714 std::string start_marker
;
715 std::string end_marker
;
716 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
718 RGWRadosBILogTrimCR(RGWRados
*store
, const RGWBucketInfo
& bucket_info
,
719 int shard_id
, const std::string
& start_marker
,
720 const std::string
& end_marker
);
722 int send_request() override
;
723 int request_complete() override
;
726 class RGWAsyncFetchRemoteObj
: public RGWAsyncRadosRequest
{
730 RGWBucketInfo bucket_info
;
733 boost::optional
<uint64_t> versioned_epoch
;
738 rgw_zone_set zones_trace
;
741 int _send_request() override
;
743 RGWAsyncFetchRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
744 const string
& _source_zone
,
745 RGWBucketInfo
& _bucket_info
,
746 const rgw_obj_key
& _key
,
747 boost::optional
<uint64_t> _versioned_epoch
,
748 bool _if_newer
, rgw_zone_set
*_zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
749 source_zone(_source_zone
),
750 bucket_info(_bucket_info
),
752 versioned_epoch(_versioned_epoch
),
753 copy_if_newer(_if_newer
)
756 zones_trace
= *_zones_trace
;
761 class RGWFetchRemoteObjCR
: public RGWSimpleCoroutine
{
763 RGWAsyncRadosProcessor
*async_rados
;
767 RGWBucketInfo bucket_info
;
770 boost::optional
<uint64_t> versioned_epoch
;
776 RGWAsyncFetchRemoteObj
*req
;
777 rgw_zone_set
*zones_trace
;
780 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
781 const string
& _source_zone
,
782 RGWBucketInfo
& _bucket_info
,
783 const rgw_obj_key
& _key
,
784 boost::optional
<uint64_t> _versioned_epoch
,
785 bool _if_newer
, rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
786 async_rados(_async_rados
), store(_store
),
787 source_zone(_source_zone
),
788 bucket_info(_bucket_info
),
790 versioned_epoch(_versioned_epoch
),
791 copy_if_newer(_if_newer
), req(NULL
), zones_trace(_zones_trace
) {}
794 ~RGWFetchRemoteObjCR() override
{
798 void request_cleanup() override
{
805 int send_request() override
{
806 req
= new RGWAsyncFetchRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
807 key
, versioned_epoch
, copy_if_newer
, zones_trace
);
808 async_rados
->queue(req
);
812 int request_complete() override
{
813 return req
->get_ret_status();
817 class RGWAsyncStatRemoteObj
: public RGWAsyncRadosRequest
{
821 RGWBucketInfo bucket_info
;
825 ceph::real_time
*pmtime
;
827 map
<string
, bufferlist
> *pattrs
;
830 int _send_request() override
;
832 RGWAsyncStatRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
833 const string
& _source_zone
,
834 RGWBucketInfo
& _bucket_info
,
835 const rgw_obj_key
& _key
,
836 ceph::real_time
*_pmtime
,
838 map
<string
, bufferlist
> *_pattrs
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
839 source_zone(_source_zone
),
840 bucket_info(_bucket_info
),
847 class RGWStatRemoteObjCR
: public RGWSimpleCoroutine
{
849 RGWAsyncRadosProcessor
*async_rados
;
853 RGWBucketInfo bucket_info
;
857 ceph::real_time
*pmtime
;
859 map
<string
, bufferlist
> *pattrs
;
861 RGWAsyncStatRemoteObj
*req
;
864 RGWStatRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
865 const string
& _source_zone
,
866 RGWBucketInfo
& _bucket_info
,
867 const rgw_obj_key
& _key
,
868 ceph::real_time
*_pmtime
,
870 map
<string
, bufferlist
> *_pattrs
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
871 async_rados(_async_rados
), store(_store
),
872 source_zone(_source_zone
),
873 bucket_info(_bucket_info
),
881 ~RGWStatRemoteObjCR() override
{
885 void request_cleanup() override
{
892 int send_request() override
{
893 req
= new RGWAsyncStatRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
,
894 bucket_info
, key
, pmtime
, psize
, pattrs
);
895 async_rados
->queue(req
);
899 int request_complete() override
{
900 return req
->get_ret_status();
904 class RGWAsyncRemoveObj
: public RGWAsyncRadosRequest
{
908 RGWBucketInfo bucket_info
;
912 string owner_display_name
;
914 uint64_t versioned_epoch
;
915 string marker_version_id
;
918 ceph::real_time timestamp
;
919 rgw_zone_set zones_trace
;
922 int _send_request() override
;
924 RGWAsyncRemoveObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
925 const string
& _source_zone
,
926 RGWBucketInfo
& _bucket_info
,
927 const rgw_obj_key
& _key
,
928 const string
& _owner
,
929 const string
& _owner_display_name
,
931 uint64_t _versioned_epoch
,
934 real_time
& _timestamp
,
935 rgw_zone_set
* _zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
936 source_zone(_source_zone
),
937 bucket_info(_bucket_info
),
940 owner_display_name(_owner_display_name
),
941 versioned(_versioned
),
942 versioned_epoch(_versioned_epoch
),
943 del_if_older(_if_older
),
944 timestamp(_timestamp
) {
945 if (_delete_marker
) {
946 marker_version_id
= key
.instance
;
950 zones_trace
= *_zones_trace
;
955 class RGWRemoveObjCR
: public RGWSimpleCoroutine
{
957 RGWAsyncRadosProcessor
*async_rados
;
961 RGWBucketInfo bucket_info
;
965 uint64_t versioned_epoch
;
968 string owner_display_name
;
973 RGWAsyncRemoveObj
*req
;
975 rgw_zone_set
*zones_trace
;
978 RGWRemoveObjCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
979 const string
& _source_zone
,
980 RGWBucketInfo
& _bucket_info
,
981 const rgw_obj_key
& _key
,
983 uint64_t _versioned_epoch
,
985 string
*_owner_display_name
,
987 real_time
*_timestamp
,
988 rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
989 async_rados(_async_rados
), store(_store
),
990 source_zone(_source_zone
),
991 bucket_info(_bucket_info
),
993 versioned(_versioned
),
994 versioned_epoch(_versioned_epoch
),
995 delete_marker(_delete_marker
), req(NULL
), zones_trace(_zones_trace
) {
996 del_if_older
= (_timestamp
!= NULL
);
998 timestamp
= *_timestamp
;
1005 if (_owner_display_name
) {
1006 owner_display_name
= *_owner_display_name
;
1009 ~RGWRemoveObjCR() override
{
1013 void request_cleanup() override
{
1020 int send_request() override
{
1021 req
= new RGWAsyncRemoveObj(this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
1022 key
, owner
, owner_display_name
, versioned
, versioned_epoch
,
1023 delete_marker
, del_if_older
, timestamp
, zones_trace
);
1024 async_rados
->queue(req
);
1028 int request_complete() override
{
1029 return req
->get_ret_status();
1033 class RGWContinuousLeaseCR
: public RGWCoroutine
{
1034 RGWAsyncRadosProcessor
*async_rados
;
1037 const rgw_raw_obj obj
;
1039 const string lock_name
;
1040 const string cookie
;
1045 std::atomic
<bool> going_down
= { false };
1048 RGWCoroutine
*caller
;
1050 bool aborted
{false};
1053 RGWContinuousLeaseCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
1054 const rgw_raw_obj
& _obj
,
1055 const string
& _lock_name
, int _interval
, RGWCoroutine
*_caller
)
1056 : RGWCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
1057 obj(_obj
), lock_name(_lock_name
),
1058 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
1059 interval(_interval
), lock("RGWContinuousLeaseCR"), caller(_caller
)
1062 int operate() override
;
1065 Mutex::Locker
l(lock
);
1069 void set_locked(bool status
) {
1070 Mutex::Locker
l(lock
);
1084 class RGWRadosTimelogAddCR
: public RGWSimpleCoroutine
{
1086 list
<cls_log_entry
> entries
;
1090 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1093 RGWRadosTimelogAddCR(RGWRados
*_store
, const string
& _oid
,
1094 const cls_log_entry
& entry
);
1096 int send_request() override
;
1097 int request_complete() override
;
1100 class RGWRadosTimelogTrimCR
: public RGWSimpleCoroutine
{
1102 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1105 real_time start_time
;
1107 std::string from_marker
;
1108 std::string to_marker
;
1111 RGWRadosTimelogTrimCR(RGWRados
*store
, const std::string
& oid
,
1112 const real_time
& start_time
, const real_time
& end_time
,
1113 const std::string
& from_marker
,
1114 const std::string
& to_marker
);
1116 int send_request() override
;
1117 int request_complete() override
;
1120 // wrapper to update last_trim_marker on success
1121 class RGWSyncLogTrimCR
: public RGWRadosTimelogTrimCR
{
1123 std::string
*last_trim_marker
;
1125 RGWSyncLogTrimCR(RGWRados
*store
, const std::string
& oid
,
1126 const std::string
& to_marker
, std::string
*last_trim_marker
);
1127 int request_complete() override
;
1130 class RGWAsyncStatObj
: public RGWAsyncRadosRequest
{
1132 RGWBucketInfo bucket_info
;
1137 RGWObjVersionTracker
*objv_tracker
;
1139 int _send_request() override
;
1141 RGWAsyncStatObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*store
,
1142 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1143 real_time
*pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1144 RGWObjVersionTracker
*objv_tracker
= nullptr)
1145 : RGWAsyncRadosRequest(caller
, cn
), store(store
), obj(obj
), psize(psize
),
1146 pmtime(pmtime
), pepoch(pepoch
), objv_tracker(objv_tracker
) {}
1149 class RGWStatObjCR
: public RGWSimpleCoroutine
{
1151 RGWAsyncRadosProcessor
*async_rados
;
1152 RGWBucketInfo bucket_info
;
1157 RGWObjVersionTracker
*objv_tracker
;
1158 RGWAsyncStatObj
*req
= nullptr;
1160 RGWStatObjCR(RGWAsyncRadosProcessor
*async_rados
, RGWRados
*store
,
1161 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1162 real_time
* pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1163 RGWObjVersionTracker
*objv_tracker
= nullptr);
1164 ~RGWStatObjCR() override
{
1167 void request_cleanup() override
;
1169 int send_request() override
;
1170 int request_complete() override
;
1173 /// coroutine wrapper for IoCtx::aio_notify()
1174 class RGWRadosNotifyCR
: public RGWSimpleCoroutine
{
1175 RGWRados
*const store
;
1176 const rgw_raw_obj obj
;
1178 const uint64_t timeout_ms
;
1179 bufferlist
*response
;
1181 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1184 RGWRadosNotifyCR(RGWRados
*store
, const rgw_raw_obj
& obj
,
1185 bufferlist
& request
, uint64_t timeout_ms
,
1186 bufferlist
*response
);
1188 int send_request() override
;
1189 int request_complete() override
;