1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_CR_RADOS_H
5 #define CEPH_RGW_CR_RADOS_H
7 #include <boost/intrusive_ptr.hpp>
8 #include "include/ceph_assert.h"
9 #include "rgw_coroutine.h"
11 #include "rgw_sal_rados.h"
12 #include "common/WorkQueue.h"
13 #include "common/Throttle.h"
17 #include "services/svc_sys_obj.h"
18 #include "services/svc_bucket.h"
20 class RGWAsyncRadosRequest
: public RefCountedObject
{
22 RGWAioCompletionNotifier
*notifier
;
26 ceph::mutex lock
= ceph::make_mutex("RGWAsyncRadosRequest::lock");
29 virtual int _send_request() = 0;
31 RGWAsyncRadosRequest(RGWCoroutine
*_caller
, RGWAioCompletionNotifier
*_cn
)
32 : caller(_caller
), notifier(_cn
), retcode(0) {
34 ~RGWAsyncRadosRequest() override
{
42 retcode
= _send_request();
44 std::lock_guard l
{lock
};
46 notifier
->cb(); // drops its own ref
53 int get_ret_status() { return retcode
; }
57 std::lock_guard l
{lock
};
59 // we won't call notifier->cb() to drop its ref, so drop it here
69 class RGWAsyncRadosProcessor
{
70 deque
<RGWAsyncRadosRequest
*> m_req_queue
;
71 std::atomic
<bool> going_down
= { false };
75 Throttle req_throttle
;
77 struct RGWWQ
: public ThreadPool::WorkQueue
<RGWAsyncRadosRequest
> {
78 RGWAsyncRadosProcessor
*processor
;
79 RGWWQ(RGWAsyncRadosProcessor
*p
,
80 ceph::timespan timeout
, ceph::timespan suicide_timeout
,
82 : ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>("RGWWQ", timeout
, suicide_timeout
, tp
), processor(p
) {}
84 bool _enqueue(RGWAsyncRadosRequest
*req
) override
;
85 void _dequeue(RGWAsyncRadosRequest
*req
) override
{
88 bool _empty() override
;
89 RGWAsyncRadosRequest
*_dequeue() override
;
90 using ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>::_process
;
91 void _process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) override
;
93 void _clear() override
{
94 ceph_assert(processor
->m_req_queue
.empty());
99 RGWAsyncRadosProcessor(CephContext
*_cct
, int num_threads
);
100 ~RGWAsyncRadosProcessor() {}
103 void handle_request(RGWAsyncRadosRequest
*req
);
104 void queue(RGWAsyncRadosRequest
*req
);
106 bool is_going_down() {
112 class RGWSimpleWriteOnlyAsyncCR
: public RGWSimpleCoroutine
{
113 RGWAsyncRadosProcessor
*async_rados
;
114 rgw::sal::RGWRadosStore
*store
;
117 const DoutPrefixProvider
*dpp
;
119 class Request
: public RGWAsyncRadosRequest
{
120 rgw::sal::RGWRadosStore
*store
;
122 const DoutPrefixProvider
*dpp
;
124 int _send_request() override
;
126 Request(RGWCoroutine
*caller
,
127 RGWAioCompletionNotifier
*cn
,
128 rgw::sal::RGWRadosStore
*store
,
130 const DoutPrefixProvider
*dpp
) : RGWAsyncRadosRequest(caller
, cn
),
137 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
138 rgw::sal::RGWRadosStore
*_store
,
140 const DoutPrefixProvider
*_dpp
) : RGWSimpleCoroutine(_store
->ctx()),
141 async_rados(_async_rados
),
146 ~RGWSimpleWriteOnlyAsyncCR() override
{
149 void request_cleanup() override
{
156 int send_request() override
{
157 req
= new Request(this,
158 stack
->create_completion_notifier(),
163 async_rados
->queue(req
);
166 int request_complete() override
{
167 return req
->get_ret_status();
172 template <class P
, class R
>
173 class RGWSimpleAsyncCR
: public RGWSimpleCoroutine
{
174 RGWAsyncRadosProcessor
*async_rados
;
175 rgw::sal::RGWRadosStore
*store
;
178 std::shared_ptr
<R
> result
;
180 class Request
: public RGWAsyncRadosRequest
{
181 rgw::sal::RGWRadosStore
*store
;
183 std::shared_ptr
<R
> result
;
185 int _send_request() override
;
187 Request(RGWCoroutine
*caller
,
188 RGWAioCompletionNotifier
*cn
,
189 rgw::sal::RGWRadosStore
*_store
,
191 std::shared_ptr
<R
>& _result
) : RGWAsyncRadosRequest(caller
, cn
),
198 RGWSimpleAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
199 rgw::sal::RGWRadosStore
*_store
,
201 std::shared_ptr
<R
>& _result
) : RGWSimpleCoroutine(_store
->ctx()),
202 async_rados(_async_rados
),
207 ~RGWSimpleAsyncCR() override
{
210 void request_cleanup() override
{
217 int send_request() override
{
218 req
= new Request(this,
219 stack
->create_completion_notifier(),
224 async_rados
->queue(req
);
227 int request_complete() override
{
228 return req
->get_ret_status();
232 class RGWGenericAsyncCR
: public RGWSimpleCoroutine
{
233 RGWAsyncRadosProcessor
*async_rados
;
234 rgw::sal::RGWRadosStore
*store
;
241 virtual int operate() = 0;
245 std::shared_ptr
<Action
> action
;
247 class Request
: public RGWAsyncRadosRequest
{
248 std::shared_ptr
<Action
> action
;
250 int _send_request() override
{
254 return action
->operate();
257 Request(RGWCoroutine
*caller
,
258 RGWAioCompletionNotifier
*cn
,
259 std::shared_ptr
<Action
>& _action
) : RGWAsyncRadosRequest(caller
, cn
),
264 RGWGenericAsyncCR(CephContext
*_cct
,
265 RGWAsyncRadosProcessor
*_async_rados
,
266 std::shared_ptr
<Action
>& _action
) : RGWSimpleCoroutine(_cct
),
267 async_rados(_async_rados
),
270 RGWGenericAsyncCR(CephContext
*_cct
,
271 RGWAsyncRadosProcessor
*_async_rados
,
272 std::shared_ptr
<T
>& _action
) : RGWSimpleCoroutine(_cct
),
273 async_rados(_async_rados
),
274 action(std::static_pointer_cast
<Action
>(_action
)) {}
276 ~RGWGenericAsyncCR() override
{
279 void request_cleanup() override
{
286 int send_request() override
{
287 req
= new Request(this,
288 stack
->create_completion_notifier(),
291 async_rados
->queue(req
);
294 int request_complete() override
{
295 return req
->get_ret_status();
300 class RGWAsyncGetSystemObj
: public RGWAsyncRadosRequest
{
301 RGWSysObjectCtx obj_ctx
;
303 const bool want_attrs
;
304 const bool raw_attrs
;
306 int _send_request() override
;
308 RGWAsyncGetSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
309 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
310 bool want_attrs
, bool raw_attrs
);
313 map
<string
, bufferlist
> attrs
;
314 RGWObjVersionTracker objv_tracker
;
317 class RGWAsyncPutSystemObj
: public RGWAsyncRadosRequest
{
324 int _send_request() override
;
326 RGWAsyncPutSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
327 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
328 bool _exclusive
, bufferlist _bl
);
330 RGWObjVersionTracker objv_tracker
;
333 class RGWAsyncPutSystemObjAttrs
: public RGWAsyncRadosRequest
{
336 map
<string
, bufferlist
> attrs
;
339 int _send_request() override
;
341 RGWAsyncPutSystemObjAttrs(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
342 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
343 map
<string
, bufferlist
> _attrs
);
345 RGWObjVersionTracker objv_tracker
;
348 class RGWAsyncLockSystemObj
: public RGWAsyncRadosRequest
{
349 rgw::sal::RGWRadosStore
*store
;
353 uint32_t duration_secs
;
356 int _send_request() override
;
358 RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
359 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
360 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
);
363 class RGWAsyncUnlockSystemObj
: public RGWAsyncRadosRequest
{
364 rgw::sal::RGWRadosStore
*store
;
370 int _send_request() override
;
372 RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
373 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
374 const string
& _name
, const string
& _cookie
);
378 class RGWSimpleRadosReadCR
: public RGWSimpleCoroutine
{
379 RGWAsyncRadosProcessor
*async_rados
;
384 /// on ENOENT, call handle_data() with an empty object instead of failing
385 const bool empty_on_enoent
;
386 RGWObjVersionTracker
*objv_tracker
;
387 RGWAsyncGetSystemObj
*req
{nullptr};
390 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
391 const rgw_raw_obj
& _obj
,
392 T
*_result
, bool empty_on_enoent
= true,
393 RGWObjVersionTracker
*objv_tracker
= nullptr)
394 : RGWSimpleCoroutine(_svc
->ctx()), async_rados(_async_rados
), svc(_svc
),
395 obj(_obj
), result(_result
),
396 empty_on_enoent(empty_on_enoent
), objv_tracker(objv_tracker
) {}
397 ~RGWSimpleRadosReadCR() override
{
401 void request_cleanup() override
{
408 int send_request() override
;
409 int request_complete() override
;
411 virtual int handle_data(T
& data
) {
417 int RGWSimpleRadosReadCR
<T
>::send_request()
419 req
= new RGWAsyncGetSystemObj(this, stack
->create_completion_notifier(), svc
,
420 objv_tracker
, obj
, false, false);
421 async_rados
->queue(req
);
426 int RGWSimpleRadosReadCR
<T
>::request_complete()
428 int ret
= req
->get_ret_status();
430 if (ret
== -ENOENT
&& empty_on_enoent
) {
437 auto iter
= req
->bl
.cbegin();
439 // allow successful reads with empty buffers. ReadSyncStatus coroutines
440 // depend on this to be able to read without locking, because the
441 // cls lock from InitSyncStatus will create an empty object if it didn't
445 decode(*result
, iter
);
447 } catch (buffer::error
& err
) {
452 return handle_data(*result
);
455 class RGWSimpleRadosReadAttrsCR
: public RGWSimpleCoroutine
{
456 RGWAsyncRadosProcessor
*async_rados
;
460 map
<string
, bufferlist
> *pattrs
;
462 RGWObjVersionTracker
* objv_tracker
;
463 RGWAsyncGetSystemObj
*req
= nullptr;
466 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
467 const rgw_raw_obj
& _obj
, map
<string
, bufferlist
> *_pattrs
,
468 bool _raw_attrs
, RGWObjVersionTracker
* objv_tracker
= nullptr)
469 : RGWSimpleCoroutine(_svc
->ctx()),
470 async_rados(_async_rados
), svc(_svc
),
473 raw_attrs(_raw_attrs
),
474 objv_tracker(objv_tracker
)
476 ~RGWSimpleRadosReadAttrsCR() override
{
480 void request_cleanup() override
{
487 int send_request() override
;
488 int request_complete() override
;
492 class RGWSimpleRadosWriteCR
: public RGWSimpleCoroutine
{
493 RGWAsyncRadosProcessor
*async_rados
;
497 RGWObjVersionTracker
*objv_tracker
;
498 RGWAsyncPutSystemObj
*req
{nullptr};
501 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
502 const rgw_raw_obj
& _obj
,
503 const T
& _data
, RGWObjVersionTracker
*objv_tracker
= nullptr)
504 : RGWSimpleCoroutine(_svc
->ctx()), async_rados(_async_rados
),
505 svc(_svc
), obj(_obj
), objv_tracker(objv_tracker
) {
509 ~RGWSimpleRadosWriteCR() override
{
513 void request_cleanup() override
{
520 int send_request() override
{
521 req
= new RGWAsyncPutSystemObj(this, stack
->create_completion_notifier(),
522 svc
, objv_tracker
, obj
, false, std::move(bl
));
523 async_rados
->queue(req
);
527 int request_complete() override
{
528 if (objv_tracker
) { // copy the updated version
529 *objv_tracker
= req
->objv_tracker
;
531 return req
->get_ret_status();
535 class RGWSimpleRadosWriteAttrsCR
: public RGWSimpleCoroutine
{
536 RGWAsyncRadosProcessor
*async_rados
;
538 RGWObjVersionTracker
*objv_tracker
;
541 map
<string
, bufferlist
> attrs
;
542 RGWAsyncPutSystemObjAttrs
*req
= nullptr;
545 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor
*_async_rados
,
546 RGWSI_SysObj
*_svc
, const rgw_raw_obj
& _obj
,
547 map
<string
, bufferlist
> _attrs
,
548 RGWObjVersionTracker
*objv_tracker
= nullptr)
549 : RGWSimpleCoroutine(_svc
->ctx()), async_rados(_async_rados
),
550 svc(_svc
), objv_tracker(objv_tracker
), obj(_obj
),
551 attrs(std::move(_attrs
)) {
553 ~RGWSimpleRadosWriteAttrsCR() override
{
557 void request_cleanup() override
{
564 int send_request() override
{
565 req
= new RGWAsyncPutSystemObjAttrs(this, stack
->create_completion_notifier(),
566 svc
, objv_tracker
, obj
, std::move(attrs
));
567 async_rados
->queue(req
);
571 int request_complete() override
{
572 if (objv_tracker
) { // copy the updated version
573 *objv_tracker
= req
->objv_tracker
;
575 return req
->get_ret_status();
579 class RGWRadosSetOmapKeysCR
: public RGWSimpleCoroutine
{
580 rgw::sal::RGWRadosStore
*store
;
581 map
<string
, bufferlist
> entries
;
587 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
590 RGWRadosSetOmapKeysCR(rgw::sal::RGWRadosStore
*_store
,
591 const rgw_raw_obj
& _obj
,
592 map
<string
, bufferlist
>& _entries
);
594 int send_request() override
;
595 int request_complete() override
;
598 class RGWRadosGetOmapKeysCR
: public RGWSimpleCoroutine
{
602 std::set
<std::string
> entries
;
605 using ResultPtr
= std::shared_ptr
<Result
>;
607 RGWRadosGetOmapKeysCR(rgw::sal::RGWRadosStore
*_store
, const rgw_raw_obj
& _obj
,
608 const string
& _marker
, int _max_entries
,
611 int send_request() override
;
612 int request_complete() override
;
615 rgw::sal::RGWRadosStore
*store
;
620 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
623 class RGWRadosGetOmapValsCR
: public RGWSimpleCoroutine
{
627 std::map
<std::string
, bufferlist
> entries
;
630 using ResultPtr
= std::shared_ptr
<Result
>;
632 RGWRadosGetOmapValsCR(rgw::sal::RGWRadosStore
*_store
, const rgw_raw_obj
& _obj
,
633 const string
& _marker
, int _max_entries
,
636 int send_request() override
;
637 int request_complete() override
;
640 rgw::sal::RGWRadosStore
*store
;
645 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
648 class RGWRadosRemoveOmapKeysCR
: public RGWSimpleCoroutine
{
649 rgw::sal::RGWRadosStore
*store
;
657 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
660 RGWRadosRemoveOmapKeysCR(rgw::sal::RGWRadosStore
*_store
,
661 const rgw_raw_obj
& _obj
,
662 const set
<string
>& _keys
);
664 int send_request() override
;
666 int request_complete() override
;
669 class RGWRadosRemoveCR
: public RGWSimpleCoroutine
{
670 rgw::sal::RGWRadosStore
*store
;
671 librados::IoCtx ioctx
;
672 const rgw_raw_obj obj
;
673 RGWObjVersionTracker
* objv_tracker
;
674 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
677 RGWRadosRemoveCR(rgw::sal::RGWRadosStore
*store
, const rgw_raw_obj
& obj
,
678 RGWObjVersionTracker
* objv_tracker
= nullptr);
680 int send_request() override
;
681 int request_complete() override
;
684 class RGWSimpleRadosLockCR
: public RGWSimpleCoroutine
{
685 RGWAsyncRadosProcessor
*async_rados
;
686 rgw::sal::RGWRadosStore
*store
;
693 RGWAsyncLockSystemObj
*req
;
696 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
697 const rgw_raw_obj
& _obj
,
698 const string
& _lock_name
,
699 const string
& _cookie
,
701 ~RGWSimpleRadosLockCR() override
{
704 void request_cleanup() override
;
706 int send_request() override
;
707 int request_complete() override
;
709 static std::string
gen_random_cookie(CephContext
* cct
) {
710 #define COOKIE_LEN 16
711 char buf
[COOKIE_LEN
+ 1];
712 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
717 class RGWSimpleRadosUnlockCR
: public RGWSimpleCoroutine
{
718 RGWAsyncRadosProcessor
*async_rados
;
719 rgw::sal::RGWRadosStore
*store
;
725 RGWAsyncUnlockSystemObj
*req
;
728 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
729 const rgw_raw_obj
& _obj
,
730 const string
& _lock_name
,
731 const string
& _cookie
);
732 ~RGWSimpleRadosUnlockCR() override
{
735 void request_cleanup() override
;
737 int send_request() override
;
738 int request_complete() override
;
741 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
743 class RGWOmapAppend
: public RGWConsumerCR
<string
> {
744 RGWAsyncRadosProcessor
*async_rados
;
745 rgw::sal::RGWRadosStore
*store
;
751 int num_pending_entries
;
752 list
<string
> pending_entries
;
754 map
<string
, bufferlist
> entries
;
756 uint64_t window_size
;
757 uint64_t total_entries
;
759 RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
760 const rgw_raw_obj
& _obj
,
761 uint64_t _window_size
= OMAP_APPEND_MAX_ENTRIES_DEFAULT
);
762 int operate() override
;
763 void flush_pending();
764 bool append(const string
& s
);
767 uint64_t get_total_entries() {
768 return total_entries
;
771 const rgw_raw_obj
& get_obj() {
776 class RGWShardedOmapCRManager
{
777 RGWAsyncRadosProcessor
*async_rados
;
778 rgw::sal::RGWRadosStore
*store
;
783 vector
<RGWOmapAppend
*> shards
;
785 RGWShardedOmapCRManager(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
, RGWCoroutine
*_op
, int _num_shards
, const rgw_pool
& pool
, const string
& oid_prefix
)
786 : async_rados(_async_rados
),
787 store(_store
), op(_op
), num_shards(_num_shards
) {
788 shards
.reserve(num_shards
);
789 for (int i
= 0; i
< num_shards
; ++i
) {
790 char buf
[oid_prefix
.size() + 16];
791 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), i
);
792 RGWOmapAppend
*shard
= new RGWOmapAppend(async_rados
, store
, rgw_raw_obj(pool
, buf
));
794 shards
.push_back(shard
);
795 op
->spawn(shard
, false);
799 ~RGWShardedOmapCRManager() {
800 for (auto shard
: shards
) {
805 bool append(const string
& entry
, int shard_id
) {
806 return shards
[shard_id
]->append(entry
);
810 for (vector
<RGWOmapAppend
*>::iterator iter
= shards
.begin(); iter
!= shards
.end(); ++iter
) {
811 success
&= ((*iter
)->finish() && (!(*iter
)->is_error()));
816 uint64_t get_total_entries(int shard_id
) {
817 return shards
[shard_id
]->get_total_entries();
821 class RGWAsyncGetBucketInstanceInfo
: public RGWAsyncRadosRequest
{
822 rgw::sal::RGWRadosStore
*store
;
826 int _send_request() override
;
828 RGWAsyncGetBucketInstanceInfo(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
829 rgw::sal::RGWRadosStore
*_store
, const rgw_bucket
& bucket
)
830 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), bucket(bucket
) {}
832 RGWBucketInfo bucket_info
;
833 map
<string
, bufferlist
> attrs
;
836 class RGWGetBucketInstanceInfoCR
: public RGWSimpleCoroutine
{
837 RGWAsyncRadosProcessor
*async_rados
;
838 rgw::sal::RGWRadosStore
*store
;
840 RGWBucketInfo
*bucket_info
;
841 map
<string
, bufferlist
> *pattrs
;
843 RGWAsyncGetBucketInstanceInfo
*req
{nullptr};
846 // rgw_bucket constructor
847 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
848 const rgw_bucket
& _bucket
, RGWBucketInfo
*_bucket_info
,
849 map
<string
, bufferlist
> *_pattrs
)
850 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
851 bucket(_bucket
), bucket_info(_bucket_info
), pattrs(_pattrs
) {}
852 ~RGWGetBucketInstanceInfoCR() override
{
855 void request_cleanup() override
{
862 int send_request() override
{
863 req
= new RGWAsyncGetBucketInstanceInfo(this, stack
->create_completion_notifier(), store
, bucket
);
864 async_rados
->queue(req
);
867 int request_complete() override
{
869 *bucket_info
= std::move(req
->bucket_info
);
872 *pattrs
= std::move(req
->attrs
);
874 return req
->get_ret_status();
878 class RGWRadosBILogTrimCR
: public RGWSimpleCoroutine
{
879 RGWRados::BucketShard bs
;
880 std::string start_marker
;
881 std::string end_marker
;
882 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
884 RGWRadosBILogTrimCR(rgw::sal::RGWRadosStore
*store
, const RGWBucketInfo
& bucket_info
,
885 int shard_id
, const std::string
& start_marker
,
886 const std::string
& end_marker
);
888 int send_request() override
;
889 int request_complete() override
;
892 class RGWAsyncFetchRemoteObj
: public RGWAsyncRadosRequest
{
893 rgw::sal::RGWRadosStore
*store
;
894 rgw_zone_id source_zone
;
896 std::optional
<rgw_user
> user_id
;
898 rgw_bucket src_bucket
;
899 std::optional
<rgw_placement_rule
> dest_placement_rule
;
900 RGWBucketInfo dest_bucket_info
;
903 std::optional
<rgw_obj_key
> dest_key
;
904 std::optional
<uint64_t> versioned_epoch
;
909 std::shared_ptr
<RGWFetchObjFilter
> filter
;
910 rgw_zone_set zones_trace
;
911 PerfCounters
* counters
;
912 const DoutPrefixProvider
*dpp
;
915 int _send_request() override
;
917 RGWAsyncFetchRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
918 const rgw_zone_id
& _source_zone
,
919 std::optional
<rgw_user
>& _user_id
,
920 const rgw_bucket
& _src_bucket
,
921 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
922 const RGWBucketInfo
& _dest_bucket_info
,
923 const rgw_obj_key
& _key
,
924 const std::optional
<rgw_obj_key
>& _dest_key
,
925 std::optional
<uint64_t> _versioned_epoch
,
927 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
928 rgw_zone_set
*_zones_trace
,
929 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
930 : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
931 source_zone(_source_zone
),
933 src_bucket(_src_bucket
),
934 dest_placement_rule(_dest_placement_rule
),
935 dest_bucket_info(_dest_bucket_info
),
938 versioned_epoch(_versioned_epoch
),
939 copy_if_newer(_if_newer
),
945 zones_trace
= *_zones_trace
;
950 class RGWFetchRemoteObjCR
: public RGWSimpleCoroutine
{
952 RGWAsyncRadosProcessor
*async_rados
;
953 rgw::sal::RGWRadosStore
*store
;
954 rgw_zone_id source_zone
;
956 std::optional
<rgw_user
> user_id
;
958 rgw_bucket src_bucket
;
959 std::optional
<rgw_placement_rule
> dest_placement_rule
;
960 RGWBucketInfo dest_bucket_info
;
963 std::optional
<rgw_obj_key
> dest_key
;
964 std::optional
<uint64_t> versioned_epoch
;
970 std::shared_ptr
<RGWFetchObjFilter
> filter
;
972 RGWAsyncFetchRemoteObj
*req
;
973 rgw_zone_set
*zones_trace
;
974 PerfCounters
* counters
;
975 const DoutPrefixProvider
*dpp
;
978 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
979 const rgw_zone_id
& _source_zone
,
980 std::optional
<rgw_user
> _user_id
,
981 const rgw_bucket
& _src_bucket
,
982 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
983 const RGWBucketInfo
& _dest_bucket_info
,
984 const rgw_obj_key
& _key
,
985 const std::optional
<rgw_obj_key
>& _dest_key
,
986 std::optional
<uint64_t> _versioned_epoch
,
988 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
989 rgw_zone_set
*_zones_trace
,
990 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
991 : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
992 async_rados(_async_rados
), store(_store
),
993 source_zone(_source_zone
),
995 src_bucket(_src_bucket
),
996 dest_placement_rule(_dest_placement_rule
),
997 dest_bucket_info(_dest_bucket_info
),
1000 versioned_epoch(_versioned_epoch
),
1001 copy_if_newer(_if_newer
),
1004 zones_trace(_zones_trace
), counters(counters
), dpp(dpp
) {}
1007 ~RGWFetchRemoteObjCR() override
{
1011 void request_cleanup() override
{
1018 int send_request() override
{
1019 req
= new RGWAsyncFetchRemoteObj(this, stack
->create_completion_notifier(), store
,
1020 source_zone
, user_id
, src_bucket
, dest_placement_rule
, dest_bucket_info
,
1021 key
, dest_key
, versioned_epoch
, copy_if_newer
, filter
,
1022 zones_trace
, counters
, dpp
);
1023 async_rados
->queue(req
);
1027 int request_complete() override
{
1028 return req
->get_ret_status();
1032 class RGWAsyncStatRemoteObj
: public RGWAsyncRadosRequest
{
1033 rgw::sal::RGWRadosStore
*store
;
1034 rgw_zone_id source_zone
;
1036 rgw_bucket src_bucket
;
1039 ceph::real_time
*pmtime
;
1042 map
<string
, bufferlist
> *pattrs
;
1043 map
<string
, string
> *pheaders
;
1046 int _send_request() override
;
1048 RGWAsyncStatRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
1049 const rgw_zone_id
& _source_zone
,
1050 rgw_bucket
& _src_bucket
,
1051 const rgw_obj_key
& _key
,
1052 ceph::real_time
*_pmtime
,
1055 map
<string
, bufferlist
> *_pattrs
,
1056 map
<string
, string
> *_pheaders
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1057 source_zone(_source_zone
),
1058 src_bucket(_src_bucket
),
1064 pheaders(_pheaders
) {}
1067 class RGWStatRemoteObjCR
: public RGWSimpleCoroutine
{
1069 RGWAsyncRadosProcessor
*async_rados
;
1070 rgw::sal::RGWRadosStore
*store
;
1071 rgw_zone_id source_zone
;
1073 rgw_bucket src_bucket
;
1076 ceph::real_time
*pmtime
;
1079 map
<string
, bufferlist
> *pattrs
;
1080 map
<string
, string
> *pheaders
;
1082 RGWAsyncStatRemoteObj
*req
;
1085 RGWStatRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1086 const rgw_zone_id
& _source_zone
,
1087 rgw_bucket
& _src_bucket
,
1088 const rgw_obj_key
& _key
,
1089 ceph::real_time
*_pmtime
,
1092 map
<string
, bufferlist
> *_pattrs
,
1093 map
<string
, string
> *_pheaders
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1094 async_rados(_async_rados
), store(_store
),
1095 source_zone(_source_zone
),
1096 src_bucket(_src_bucket
),
1102 pheaders(_pheaders
),
1106 ~RGWStatRemoteObjCR() override
{
1110 void request_cleanup() override
{
1117 int send_request() override
{
1118 req
= new RGWAsyncStatRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
,
1119 src_bucket
, key
, pmtime
, psize
, petag
, pattrs
, pheaders
);
1120 async_rados
->queue(req
);
1124 int request_complete() override
{
1125 return req
->get_ret_status();
1129 class RGWAsyncRemoveObj
: public RGWAsyncRadosRequest
{
1130 rgw::sal::RGWRadosStore
*store
;
1131 rgw_zone_id source_zone
;
1133 RGWBucketInfo bucket_info
;
1137 string owner_display_name
;
1139 uint64_t versioned_epoch
;
1140 string marker_version_id
;
1143 ceph::real_time timestamp
;
1144 rgw_zone_set zones_trace
;
1147 int _send_request() override
;
1149 RGWAsyncRemoveObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
1150 const rgw_zone_id
& _source_zone
,
1151 RGWBucketInfo
& _bucket_info
,
1152 const rgw_obj_key
& _key
,
1153 const string
& _owner
,
1154 const string
& _owner_display_name
,
1156 uint64_t _versioned_epoch
,
1157 bool _delete_marker
,
1159 real_time
& _timestamp
,
1160 rgw_zone_set
* _zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1161 source_zone(_source_zone
),
1162 bucket_info(_bucket_info
),
1165 owner_display_name(_owner_display_name
),
1166 versioned(_versioned
),
1167 versioned_epoch(_versioned_epoch
),
1168 del_if_older(_if_older
),
1169 timestamp(_timestamp
) {
1170 if (_delete_marker
) {
1171 marker_version_id
= key
.instance
;
1175 zones_trace
= *_zones_trace
;
1180 class RGWRemoveObjCR
: public RGWSimpleCoroutine
{
1182 RGWAsyncRadosProcessor
*async_rados
;
1183 rgw::sal::RGWRadosStore
*store
;
1184 rgw_zone_id source_zone
;
1186 RGWBucketInfo bucket_info
;
1190 uint64_t versioned_epoch
;
1193 string owner_display_name
;
1196 real_time timestamp
;
1198 RGWAsyncRemoveObj
*req
;
1200 rgw_zone_set
*zones_trace
;
1203 RGWRemoveObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1204 const rgw_zone_id
& _source_zone
,
1205 RGWBucketInfo
& _bucket_info
,
1206 const rgw_obj_key
& _key
,
1208 uint64_t _versioned_epoch
,
1210 string
*_owner_display_name
,
1211 bool _delete_marker
,
1212 real_time
*_timestamp
,
1213 rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1214 async_rados(_async_rados
), store(_store
),
1215 source_zone(_source_zone
),
1216 bucket_info(_bucket_info
),
1218 versioned(_versioned
),
1219 versioned_epoch(_versioned_epoch
),
1220 delete_marker(_delete_marker
), req(NULL
), zones_trace(_zones_trace
) {
1221 del_if_older
= (_timestamp
!= NULL
);
1223 timestamp
= *_timestamp
;
1230 if (_owner_display_name
) {
1231 owner_display_name
= *_owner_display_name
;
1234 ~RGWRemoveObjCR() override
{
1238 void request_cleanup() override
{
1245 int send_request() override
{
1246 req
= new RGWAsyncRemoveObj(this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
1247 key
, owner
, owner_display_name
, versioned
, versioned_epoch
,
1248 delete_marker
, del_if_older
, timestamp
, zones_trace
);
1249 async_rados
->queue(req
);
1253 int request_complete() override
{
1254 return req
->get_ret_status();
1258 class RGWContinuousLeaseCR
: public RGWCoroutine
{
1259 RGWAsyncRadosProcessor
*async_rados
;
1260 rgw::sal::RGWRadosStore
*store
;
1262 const rgw_raw_obj obj
;
1264 const string lock_name
;
1265 const string cookie
;
1268 bool going_down
{ false };
1271 RGWCoroutine
*caller
;
1273 bool aborted
{false};
1276 RGWContinuousLeaseCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1277 const rgw_raw_obj
& _obj
,
1278 const string
& _lock_name
, int _interval
, RGWCoroutine
*_caller
)
1279 : RGWCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
1280 obj(_obj
), lock_name(_lock_name
),
1281 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
1282 interval(_interval
), caller(_caller
)
1285 int operate() override
;
1287 bool is_locked() const {
1291 void set_locked(bool status
) {
1305 class RGWRadosTimelogAddCR
: public RGWSimpleCoroutine
{
1306 rgw::sal::RGWRadosStore
*store
;
1307 list
<cls_log_entry
> entries
;
1311 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1314 RGWRadosTimelogAddCR(rgw::sal::RGWRadosStore
*_store
, const string
& _oid
,
1315 const cls_log_entry
& entry
);
1317 int send_request() override
;
1318 int request_complete() override
;
1321 class RGWRadosTimelogTrimCR
: public RGWSimpleCoroutine
{
1322 rgw::sal::RGWRadosStore
*store
;
1323 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1326 real_time start_time
;
1328 std::string from_marker
;
1329 std::string to_marker
;
1332 RGWRadosTimelogTrimCR(rgw::sal::RGWRadosStore
*store
, const std::string
& oid
,
1333 const real_time
& start_time
, const real_time
& end_time
,
1334 const std::string
& from_marker
,
1335 const std::string
& to_marker
);
1337 int send_request() override
;
1338 int request_complete() override
;
1341 // wrapper to update last_trim_marker on success
1342 class RGWSyncLogTrimCR
: public RGWRadosTimelogTrimCR
{
1344 std::string
*last_trim_marker
;
1346 static constexpr const char* max_marker
= "99999999";
1348 RGWSyncLogTrimCR(rgw::sal::RGWRadosStore
*store
, const std::string
& oid
,
1349 const std::string
& to_marker
, std::string
*last_trim_marker
);
1350 int request_complete() override
;
1353 class RGWAsyncStatObj
: public RGWAsyncRadosRequest
{
1354 rgw::sal::RGWRadosStore
*store
;
1355 RGWBucketInfo bucket_info
;
1360 RGWObjVersionTracker
*objv_tracker
;
1362 int _send_request() override
;
1364 RGWAsyncStatObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*store
,
1365 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1366 real_time
*pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1367 RGWObjVersionTracker
*objv_tracker
= nullptr)
1368 : RGWAsyncRadosRequest(caller
, cn
), store(store
), obj(obj
), psize(psize
),
1369 pmtime(pmtime
), pepoch(pepoch
), objv_tracker(objv_tracker
) {}
1372 class RGWStatObjCR
: public RGWSimpleCoroutine
{
1373 rgw::sal::RGWRadosStore
*store
;
1374 RGWAsyncRadosProcessor
*async_rados
;
1375 RGWBucketInfo bucket_info
;
1380 RGWObjVersionTracker
*objv_tracker
;
1381 RGWAsyncStatObj
*req
= nullptr;
1383 RGWStatObjCR(RGWAsyncRadosProcessor
*async_rados
, rgw::sal::RGWRadosStore
*store
,
1384 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1385 real_time
* pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1386 RGWObjVersionTracker
*objv_tracker
= nullptr);
1387 ~RGWStatObjCR() override
{
1390 void request_cleanup() override
;
1392 int send_request() override
;
1393 int request_complete() override
;
1396 /// coroutine wrapper for IoCtx::aio_notify()
1397 class RGWRadosNotifyCR
: public RGWSimpleCoroutine
{
1398 rgw::sal::RGWRadosStore
*const store
;
1399 const rgw_raw_obj obj
;
1401 const uint64_t timeout_ms
;
1402 bufferlist
*response
;
1404 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1407 RGWRadosNotifyCR(rgw::sal::RGWRadosStore
*store
, const rgw_raw_obj
& obj
,
1408 bufferlist
& request
, uint64_t timeout_ms
,
1409 bufferlist
*response
);
1411 int send_request() override
;
1412 int request_complete() override
;