1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_CR_RADOS_H
5 #define CEPH_RGW_CR_RADOS_H
7 #include <boost/intrusive_ptr.hpp>
8 #include "include/ceph_assert.h"
9 #include "rgw_coroutine.h"
11 #include "common/WorkQueue.h"
12 #include "common/Throttle.h"
16 #include "services/svc_sys_obj.h"
17 #include "services/svc_bucket.h"
19 class RGWAsyncRadosRequest
: public RefCountedObject
{
21 RGWAioCompletionNotifier
*notifier
;
25 ceph::mutex lock
= ceph::make_mutex("RGWAsyncRadosRequest::lock");
28 virtual int _send_request() = 0;
30 RGWAsyncRadosRequest(RGWCoroutine
*_caller
, RGWAioCompletionNotifier
*_cn
)
31 : caller(_caller
), notifier(_cn
), retcode(0) {
33 ~RGWAsyncRadosRequest() override
{
41 retcode
= _send_request();
43 std::lock_guard l
{lock
};
45 notifier
->cb(); // drops its own ref
52 int get_ret_status() { return retcode
; }
56 std::lock_guard l
{lock
};
58 // we won't call notifier->cb() to drop its ref, so drop it here
68 class RGWAsyncRadosProcessor
{
69 deque
<RGWAsyncRadosRequest
*> m_req_queue
;
70 std::atomic
<bool> going_down
= { false };
74 Throttle req_throttle
;
76 struct RGWWQ
: public ThreadPool::WorkQueue
<RGWAsyncRadosRequest
> {
77 RGWAsyncRadosProcessor
*processor
;
78 RGWWQ(RGWAsyncRadosProcessor
*p
, time_t timeout
, time_t suicide_timeout
, ThreadPool
*tp
)
79 : ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>("RGWWQ", timeout
, suicide_timeout
, tp
), processor(p
) {}
81 bool _enqueue(RGWAsyncRadosRequest
*req
) override
;
82 void _dequeue(RGWAsyncRadosRequest
*req
) override
{
85 bool _empty() override
;
86 RGWAsyncRadosRequest
*_dequeue() override
;
87 using ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>::_process
;
88 void _process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) override
;
90 void _clear() override
{
91 ceph_assert(processor
->m_req_queue
.empty());
96 RGWAsyncRadosProcessor(CephContext
*_cct
, int num_threads
);
97 ~RGWAsyncRadosProcessor() {}
100 void handle_request(RGWAsyncRadosRequest
*req
);
101 void queue(RGWAsyncRadosRequest
*req
);
103 bool is_going_down() {
109 class RGWSimpleWriteOnlyAsyncCR
: public RGWSimpleCoroutine
{
110 RGWAsyncRadosProcessor
*async_rados
;
111 rgw::sal::RGWRadosStore
*store
;
114 const DoutPrefixProvider
*dpp
;
116 class Request
: public RGWAsyncRadosRequest
{
117 rgw::sal::RGWRadosStore
*store
;
119 const DoutPrefixProvider
*dpp
;
121 int _send_request() override
;
123 Request(RGWCoroutine
*caller
,
124 RGWAioCompletionNotifier
*cn
,
125 rgw::sal::RGWRadosStore
*store
,
127 const DoutPrefixProvider
*dpp
) : RGWAsyncRadosRequest(caller
, cn
),
134 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
135 rgw::sal::RGWRadosStore
*_store
,
137 const DoutPrefixProvider
*_dpp
) : RGWSimpleCoroutine(_store
->ctx()),
138 async_rados(_async_rados
),
143 ~RGWSimpleWriteOnlyAsyncCR() override
{
146 void request_cleanup() override
{
153 int send_request() override
{
154 req
= new Request(this,
155 stack
->create_completion_notifier(),
160 async_rados
->queue(req
);
163 int request_complete() override
{
164 return req
->get_ret_status();
169 template <class P
, class R
>
170 class RGWSimpleAsyncCR
: public RGWSimpleCoroutine
{
171 RGWAsyncRadosProcessor
*async_rados
;
172 rgw::sal::RGWRadosStore
*store
;
175 std::shared_ptr
<R
> result
;
177 class Request
: public RGWAsyncRadosRequest
{
178 rgw::sal::RGWRadosStore
*store
;
180 std::shared_ptr
<R
> result
;
182 int _send_request() override
;
184 Request(RGWCoroutine
*caller
,
185 RGWAioCompletionNotifier
*cn
,
186 rgw::sal::RGWRadosStore
*_store
,
188 std::shared_ptr
<R
>& _result
) : RGWAsyncRadosRequest(caller
, cn
),
195 RGWSimpleAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
196 rgw::sal::RGWRadosStore
*_store
,
198 std::shared_ptr
<R
>& _result
) : RGWSimpleCoroutine(_store
->ctx()),
199 async_rados(_async_rados
),
204 ~RGWSimpleAsyncCR() override
{
207 void request_cleanup() override
{
214 int send_request() override
{
215 req
= new Request(this,
216 stack
->create_completion_notifier(),
221 async_rados
->queue(req
);
224 int request_complete() override
{
225 return req
->get_ret_status();
229 class RGWGenericAsyncCR
: public RGWSimpleCoroutine
{
230 RGWAsyncRadosProcessor
*async_rados
;
231 rgw::sal::RGWRadosStore
*store
;
238 virtual int operate() = 0;
242 std::shared_ptr
<Action
> action
;
244 class Request
: public RGWAsyncRadosRequest
{
245 std::shared_ptr
<Action
> action
;
247 int _send_request() override
{
251 return action
->operate();
254 Request(RGWCoroutine
*caller
,
255 RGWAioCompletionNotifier
*cn
,
256 std::shared_ptr
<Action
>& _action
) : RGWAsyncRadosRequest(caller
, cn
),
261 RGWGenericAsyncCR(CephContext
*_cct
,
262 RGWAsyncRadosProcessor
*_async_rados
,
263 std::shared_ptr
<Action
>& _action
) : RGWSimpleCoroutine(_cct
),
264 async_rados(_async_rados
),
267 RGWGenericAsyncCR(CephContext
*_cct
,
268 RGWAsyncRadosProcessor
*_async_rados
,
269 std::shared_ptr
<T
>& _action
) : RGWSimpleCoroutine(_cct
),
270 async_rados(_async_rados
),
271 action(std::static_pointer_cast
<Action
>(_action
)) {}
273 ~RGWGenericAsyncCR() override
{
276 void request_cleanup() override
{
283 int send_request() override
{
284 req
= new Request(this,
285 stack
->create_completion_notifier(),
288 async_rados
->queue(req
);
291 int request_complete() override
{
292 return req
->get_ret_status();
297 class RGWAsyncGetSystemObj
: public RGWAsyncRadosRequest
{
298 RGWSysObjectCtx obj_ctx
;
299 RGWObjVersionTracker objv_tracker
;
301 const bool want_attrs
;
302 const bool raw_attrs
;
304 int _send_request() override
;
306 RGWAsyncGetSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
307 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
308 bool want_attrs
, bool raw_attrs
);
311 map
<string
, bufferlist
> attrs
;
314 class RGWAsyncPutSystemObj
: public RGWAsyncRadosRequest
{
321 int _send_request() override
;
323 RGWAsyncPutSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
324 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
325 bool _exclusive
, bufferlist _bl
);
327 RGWObjVersionTracker objv_tracker
;
330 class RGWAsyncPutSystemObjAttrs
: public RGWAsyncRadosRequest
{
333 map
<string
, bufferlist
> attrs
;
336 int _send_request() override
;
338 RGWAsyncPutSystemObjAttrs(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
339 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
340 map
<string
, bufferlist
> _attrs
);
342 RGWObjVersionTracker objv_tracker
;
345 class RGWAsyncLockSystemObj
: public RGWAsyncRadosRequest
{
346 rgw::sal::RGWRadosStore
*store
;
350 uint32_t duration_secs
;
353 int _send_request() override
;
355 RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
356 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
357 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
);
360 class RGWAsyncUnlockSystemObj
: public RGWAsyncRadosRequest
{
361 rgw::sal::RGWRadosStore
*store
;
367 int _send_request() override
;
369 RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
370 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
371 const string
& _name
, const string
& _cookie
);
375 class RGWSimpleRadosReadCR
: public RGWSimpleCoroutine
{
376 RGWAsyncRadosProcessor
*async_rados
;
381 /// on ENOENT, call handle_data() with an empty object instead of failing
382 const bool empty_on_enoent
;
383 RGWObjVersionTracker
*objv_tracker
;
384 RGWAsyncGetSystemObj
*req
{nullptr};
387 RGWSimpleRadosReadCR(RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
388 const rgw_raw_obj
& _obj
,
389 T
*_result
, bool empty_on_enoent
= true,
390 RGWObjVersionTracker
*objv_tracker
= nullptr)
391 : RGWSimpleCoroutine(_svc
->ctx()), async_rados(_async_rados
), svc(_svc
),
392 obj(_obj
), result(_result
),
393 empty_on_enoent(empty_on_enoent
), objv_tracker(objv_tracker
) {}
394 ~RGWSimpleRadosReadCR() override
{
398 void request_cleanup() override
{
405 int send_request() override
;
406 int request_complete() override
;
408 virtual int handle_data(T
& data
) {
414 int RGWSimpleRadosReadCR
<T
>::send_request()
416 req
= new RGWAsyncGetSystemObj(this, stack
->create_completion_notifier(), svc
,
417 objv_tracker
, obj
, false, false);
418 async_rados
->queue(req
);
423 int RGWSimpleRadosReadCR
<T
>::request_complete()
425 int ret
= req
->get_ret_status();
427 if (ret
== -ENOENT
&& empty_on_enoent
) {
434 auto iter
= req
->bl
.cbegin();
436 // allow successful reads with empty buffers. ReadSyncStatus coroutines
437 // depend on this to be able to read without locking, because the
438 // cls lock from InitSyncStatus will create an empty object if it didn't
442 decode(*result
, iter
);
444 } catch (buffer::error
& err
) {
449 return handle_data(*result
);
452 class RGWSimpleRadosReadAttrsCR
: public RGWSimpleCoroutine
{
453 RGWAsyncRadosProcessor
*async_rados
;
457 map
<string
, bufferlist
> *pattrs
;
459 RGWAsyncGetSystemObj
*req
;
462 RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
463 const rgw_raw_obj
& _obj
,
464 map
<string
, bufferlist
> *_pattrs
, bool _raw_attrs
) : RGWSimpleCoroutine(_svc
->ctx()),
465 async_rados(_async_rados
), svc(_svc
),
468 raw_attrs(_raw_attrs
),
470 ~RGWSimpleRadosReadAttrsCR() override
{
474 void request_cleanup() override
{
481 int send_request() override
;
482 int request_complete() override
;
486 class RGWSimpleRadosWriteCR
: public RGWSimpleCoroutine
{
487 RGWAsyncRadosProcessor
*async_rados
;
491 RGWObjVersionTracker
*objv_tracker
;
492 RGWAsyncPutSystemObj
*req
{nullptr};
495 RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
496 const rgw_raw_obj
& _obj
,
497 const T
& _data
, RGWObjVersionTracker
*objv_tracker
= nullptr)
498 : RGWSimpleCoroutine(_svc
->ctx()), async_rados(_async_rados
),
499 svc(_svc
), obj(_obj
), objv_tracker(objv_tracker
) {
503 ~RGWSimpleRadosWriteCR() override
{
507 void request_cleanup() override
{
514 int send_request() override
{
515 req
= new RGWAsyncPutSystemObj(this, stack
->create_completion_notifier(),
516 svc
, objv_tracker
, obj
, false, std::move(bl
));
517 async_rados
->queue(req
);
521 int request_complete() override
{
522 if (objv_tracker
) { // copy the updated version
523 *objv_tracker
= req
->objv_tracker
;
525 return req
->get_ret_status();
529 class RGWSimpleRadosWriteAttrsCR
: public RGWSimpleCoroutine
{
530 RGWAsyncRadosProcessor
*async_rados
;
532 RGWObjVersionTracker
*objv_tracker
;
535 map
<string
, bufferlist
> attrs
;
536 RGWAsyncPutSystemObjAttrs
*req
= nullptr;
539 RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor
*_async_rados
,
540 RGWSI_SysObj
*_svc
, const rgw_raw_obj
& _obj
,
541 map
<string
, bufferlist
> _attrs
,
542 RGWObjVersionTracker
*objv_tracker
= nullptr)
543 : RGWSimpleCoroutine(_svc
->ctx()), async_rados(_async_rados
),
544 svc(_svc
), objv_tracker(objv_tracker
), obj(_obj
),
545 attrs(std::move(_attrs
)) {
547 ~RGWSimpleRadosWriteAttrsCR() override
{
551 void request_cleanup() override
{
558 int send_request() override
{
559 req
= new RGWAsyncPutSystemObjAttrs(this, stack
->create_completion_notifier(),
560 svc
, objv_tracker
, obj
, std::move(attrs
));
561 async_rados
->queue(req
);
565 int request_complete() override
{
566 if (objv_tracker
) { // copy the updated version
567 *objv_tracker
= req
->objv_tracker
;
569 return req
->get_ret_status();
573 class RGWRadosSetOmapKeysCR
: public RGWSimpleCoroutine
{
574 rgw::sal::RGWRadosStore
*store
;
575 map
<string
, bufferlist
> entries
;
581 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
584 RGWRadosSetOmapKeysCR(rgw::sal::RGWRadosStore
*_store
,
585 const rgw_raw_obj
& _obj
,
586 map
<string
, bufferlist
>& _entries
);
588 int send_request() override
;
589 int request_complete() override
;
592 class RGWRadosGetOmapKeysCR
: public RGWSimpleCoroutine
{
596 std::set
<std::string
> entries
;
599 using ResultPtr
= std::shared_ptr
<Result
>;
601 RGWRadosGetOmapKeysCR(rgw::sal::RGWRadosStore
*_store
, const rgw_raw_obj
& _obj
,
602 const string
& _marker
, int _max_entries
,
605 int send_request() override
;
606 int request_complete() override
;
609 rgw::sal::RGWRadosStore
*store
;
614 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
617 class RGWRadosRemoveOmapKeysCR
: public RGWSimpleCoroutine
{
618 rgw::sal::RGWRadosStore
*store
;
626 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
629 RGWRadosRemoveOmapKeysCR(rgw::sal::RGWRadosStore
*_store
,
630 const rgw_raw_obj
& _obj
,
631 const set
<string
>& _keys
);
633 int send_request() override
;
635 int request_complete() override
;
638 class RGWRadosRemoveCR
: public RGWSimpleCoroutine
{
639 rgw::sal::RGWRadosStore
*store
;
640 librados::IoCtx ioctx
;
641 const rgw_raw_obj obj
;
642 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
645 RGWRadosRemoveCR(rgw::sal::RGWRadosStore
*store
, const rgw_raw_obj
& obj
);
647 int send_request() override
;
648 int request_complete() override
;
651 class RGWSimpleRadosLockCR
: public RGWSimpleCoroutine
{
652 RGWAsyncRadosProcessor
*async_rados
;
653 rgw::sal::RGWRadosStore
*store
;
660 RGWAsyncLockSystemObj
*req
;
663 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
664 const rgw_raw_obj
& _obj
,
665 const string
& _lock_name
,
666 const string
& _cookie
,
668 ~RGWSimpleRadosLockCR() override
{
671 void request_cleanup() override
;
673 int send_request() override
;
674 int request_complete() override
;
676 static std::string
gen_random_cookie(CephContext
* cct
) {
677 #define COOKIE_LEN 16
678 char buf
[COOKIE_LEN
+ 1];
679 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
684 class RGWSimpleRadosUnlockCR
: public RGWSimpleCoroutine
{
685 RGWAsyncRadosProcessor
*async_rados
;
686 rgw::sal::RGWRadosStore
*store
;
692 RGWAsyncUnlockSystemObj
*req
;
695 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
696 const rgw_raw_obj
& _obj
,
697 const string
& _lock_name
,
698 const string
& _cookie
);
699 ~RGWSimpleRadosUnlockCR() override
{
702 void request_cleanup() override
;
704 int send_request() override
;
705 int request_complete() override
;
708 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
710 class RGWOmapAppend
: public RGWConsumerCR
<string
> {
711 RGWAsyncRadosProcessor
*async_rados
;
712 rgw::sal::RGWRadosStore
*store
;
718 int num_pending_entries
;
719 list
<string
> pending_entries
;
721 map
<string
, bufferlist
> entries
;
723 uint64_t window_size
;
724 uint64_t total_entries
;
726 RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
727 const rgw_raw_obj
& _obj
,
728 uint64_t _window_size
= OMAP_APPEND_MAX_ENTRIES_DEFAULT
);
729 int operate() override
;
730 void flush_pending();
731 bool append(const string
& s
);
734 uint64_t get_total_entries() {
735 return total_entries
;
738 const rgw_raw_obj
& get_obj() {
743 class RGWAsyncWait
: public RGWAsyncRadosRequest
{
746 ceph::condition_variable
*cond
;
747 std::chrono::seconds interval
;
749 int _send_request() override
{
750 std::unique_lock l
{*lock
};
751 return (cond
->wait_for(l
, interval
) == std::cv_status::timeout
?
755 RGWAsyncWait(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, CephContext
*_cct
,
756 ceph::mutex
*_lock
, ceph::condition_variable
*_cond
, int _secs
)
757 : RGWAsyncRadosRequest(caller
, cn
),
759 lock(_lock
), cond(_cond
), interval(_secs
) {}
762 std::lock_guard l
{*lock
};
767 class RGWWaitCR
: public RGWSimpleCoroutine
{
769 RGWAsyncRadosProcessor
*async_rados
;
771 ceph::condition_variable
*cond
;
777 RGWWaitCR(RGWAsyncRadosProcessor
*_async_rados
, CephContext
*_cct
,
778 ceph::mutex
*_lock
, ceph::condition_variable
*_cond
,
779 int _secs
) : RGWSimpleCoroutine(_cct
), cct(_cct
),
780 async_rados(_async_rados
), lock(_lock
), cond(_cond
), secs(_secs
), req(NULL
) {
782 ~RGWWaitCR() override
{
786 void request_cleanup() override
{
794 int send_request() override
{
795 req
= new RGWAsyncWait(this, stack
->create_completion_notifier(), cct
, lock
, cond
, secs
);
796 async_rados
->queue(req
);
800 int request_complete() override
{
801 return req
->get_ret_status();
809 class RGWShardedOmapCRManager
{
810 RGWAsyncRadosProcessor
*async_rados
;
811 rgw::sal::RGWRadosStore
*store
;
816 vector
<RGWOmapAppend
*> shards
;
818 RGWShardedOmapCRManager(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
, RGWCoroutine
*_op
, int _num_shards
, const rgw_pool
& pool
, const string
& oid_prefix
)
819 : async_rados(_async_rados
),
820 store(_store
), op(_op
), num_shards(_num_shards
) {
821 shards
.reserve(num_shards
);
822 for (int i
= 0; i
< num_shards
; ++i
) {
823 char buf
[oid_prefix
.size() + 16];
824 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), i
);
825 RGWOmapAppend
*shard
= new RGWOmapAppend(async_rados
, store
, rgw_raw_obj(pool
, buf
));
827 shards
.push_back(shard
);
828 op
->spawn(shard
, false);
832 ~RGWShardedOmapCRManager() {
833 for (auto shard
: shards
) {
838 bool append(const string
& entry
, int shard_id
) {
839 return shards
[shard_id
]->append(entry
);
843 for (vector
<RGWOmapAppend
*>::iterator iter
= shards
.begin(); iter
!= shards
.end(); ++iter
) {
844 success
&= ((*iter
)->finish() && (!(*iter
)->is_error()));
849 uint64_t get_total_entries(int shard_id
) {
850 return shards
[shard_id
]->get_total_entries();
854 class RGWAsyncGetBucketInstanceInfo
: public RGWAsyncRadosRequest
{
855 rgw::sal::RGWRadosStore
*store
;
859 int _send_request() override
;
861 RGWAsyncGetBucketInstanceInfo(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
862 rgw::sal::RGWRadosStore
*_store
, const rgw_bucket
& bucket
)
863 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), bucket(bucket
) {}
865 RGWBucketInfo bucket_info
;
866 map
<string
, bufferlist
> attrs
;
869 class RGWGetBucketInstanceInfoCR
: public RGWSimpleCoroutine
{
870 RGWAsyncRadosProcessor
*async_rados
;
871 rgw::sal::RGWRadosStore
*store
;
873 RGWBucketInfo
*bucket_info
;
874 map
<string
, bufferlist
> *pattrs
;
876 RGWAsyncGetBucketInstanceInfo
*req
{nullptr};
879 // rgw_bucket constructor
880 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
881 const rgw_bucket
& _bucket
, RGWBucketInfo
*_bucket_info
,
882 map
<string
, bufferlist
> *_pattrs
)
883 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
884 bucket(_bucket
), bucket_info(_bucket_info
), pattrs(_pattrs
) {}
885 ~RGWGetBucketInstanceInfoCR() override
{
888 void request_cleanup() override
{
895 int send_request() override
{
896 req
= new RGWAsyncGetBucketInstanceInfo(this, stack
->create_completion_notifier(), store
, bucket
);
897 async_rados
->queue(req
);
900 int request_complete() override
{
902 *bucket_info
= std::move(req
->bucket_info
);
905 *pattrs
= std::move(req
->attrs
);
907 return req
->get_ret_status();
911 class RGWRadosBILogTrimCR
: public RGWSimpleCoroutine
{
912 RGWRados::BucketShard bs
;
913 std::string start_marker
;
914 std::string end_marker
;
915 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
917 RGWRadosBILogTrimCR(rgw::sal::RGWRadosStore
*store
, const RGWBucketInfo
& bucket_info
,
918 int shard_id
, const std::string
& start_marker
,
919 const std::string
& end_marker
);
921 int send_request() override
;
922 int request_complete() override
;
925 class RGWAsyncFetchRemoteObj
: public RGWAsyncRadosRequest
{
926 rgw::sal::RGWRadosStore
*store
;
927 rgw_zone_id source_zone
;
929 std::optional
<rgw_user
> user_id
;
931 rgw_bucket src_bucket
;
932 std::optional
<rgw_placement_rule
> dest_placement_rule
;
933 RGWBucketInfo dest_bucket_info
;
936 std::optional
<rgw_obj_key
> dest_key
;
937 std::optional
<uint64_t> versioned_epoch
;
942 std::shared_ptr
<RGWFetchObjFilter
> filter
;
943 rgw_zone_set zones_trace
;
944 PerfCounters
* counters
;
945 const DoutPrefixProvider
*dpp
;
948 int _send_request() override
;
950 RGWAsyncFetchRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
951 const rgw_zone_id
& _source_zone
,
952 std::optional
<rgw_user
>& _user_id
,
953 const rgw_bucket
& _src_bucket
,
954 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
955 const RGWBucketInfo
& _dest_bucket_info
,
956 const rgw_obj_key
& _key
,
957 const std::optional
<rgw_obj_key
>& _dest_key
,
958 std::optional
<uint64_t> _versioned_epoch
,
960 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
961 rgw_zone_set
*_zones_trace
,
962 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
963 : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
964 source_zone(_source_zone
),
966 src_bucket(_src_bucket
),
967 dest_placement_rule(_dest_placement_rule
),
968 dest_bucket_info(_dest_bucket_info
),
971 versioned_epoch(_versioned_epoch
),
972 copy_if_newer(_if_newer
),
978 zones_trace
= *_zones_trace
;
983 class RGWFetchRemoteObjCR
: public RGWSimpleCoroutine
{
985 RGWAsyncRadosProcessor
*async_rados
;
986 rgw::sal::RGWRadosStore
*store
;
987 rgw_zone_id source_zone
;
989 std::optional
<rgw_user
> user_id
;
991 rgw_bucket src_bucket
;
992 std::optional
<rgw_placement_rule
> dest_placement_rule
;
993 RGWBucketInfo dest_bucket_info
;
996 std::optional
<rgw_obj_key
> dest_key
;
997 std::optional
<uint64_t> versioned_epoch
;
1003 std::shared_ptr
<RGWFetchObjFilter
> filter
;
1005 RGWAsyncFetchRemoteObj
*req
;
1006 rgw_zone_set
*zones_trace
;
1007 PerfCounters
* counters
;
1008 const DoutPrefixProvider
*dpp
;
1011 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1012 const rgw_zone_id
& _source_zone
,
1013 std::optional
<rgw_user
> _user_id
,
1014 const rgw_bucket
& _src_bucket
,
1015 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
1016 const RGWBucketInfo
& _dest_bucket_info
,
1017 const rgw_obj_key
& _key
,
1018 const std::optional
<rgw_obj_key
>& _dest_key
,
1019 std::optional
<uint64_t> _versioned_epoch
,
1021 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
1022 rgw_zone_set
*_zones_trace
,
1023 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
1024 : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1025 async_rados(_async_rados
), store(_store
),
1026 source_zone(_source_zone
),
1028 src_bucket(_src_bucket
),
1029 dest_placement_rule(_dest_placement_rule
),
1030 dest_bucket_info(_dest_bucket_info
),
1032 dest_key(_dest_key
),
1033 versioned_epoch(_versioned_epoch
),
1034 copy_if_newer(_if_newer
),
1037 zones_trace(_zones_trace
), counters(counters
), dpp(dpp
) {}
1040 ~RGWFetchRemoteObjCR() override
{
1044 void request_cleanup() override
{
1051 int send_request() override
{
1052 req
= new RGWAsyncFetchRemoteObj(this, stack
->create_completion_notifier(), store
,
1053 source_zone
, user_id
, src_bucket
, dest_placement_rule
, dest_bucket_info
,
1054 key
, dest_key
, versioned_epoch
, copy_if_newer
, filter
,
1055 zones_trace
, counters
, dpp
);
1056 async_rados
->queue(req
);
1060 int request_complete() override
{
1061 return req
->get_ret_status();
1065 class RGWAsyncStatRemoteObj
: public RGWAsyncRadosRequest
{
1066 rgw::sal::RGWRadosStore
*store
;
1067 rgw_zone_id source_zone
;
1069 rgw_bucket src_bucket
;
1072 ceph::real_time
*pmtime
;
1075 map
<string
, bufferlist
> *pattrs
;
1076 map
<string
, string
> *pheaders
;
1079 int _send_request() override
;
1081 RGWAsyncStatRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
1082 const rgw_zone_id
& _source_zone
,
1083 rgw_bucket
& _src_bucket
,
1084 const rgw_obj_key
& _key
,
1085 ceph::real_time
*_pmtime
,
1088 map
<string
, bufferlist
> *_pattrs
,
1089 map
<string
, string
> *_pheaders
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1090 source_zone(_source_zone
),
1091 src_bucket(_src_bucket
),
1097 pheaders(_pheaders
) {}
1100 class RGWStatRemoteObjCR
: public RGWSimpleCoroutine
{
1102 RGWAsyncRadosProcessor
*async_rados
;
1103 rgw::sal::RGWRadosStore
*store
;
1104 rgw_zone_id source_zone
;
1106 rgw_bucket src_bucket
;
1109 ceph::real_time
*pmtime
;
1112 map
<string
, bufferlist
> *pattrs
;
1113 map
<string
, string
> *pheaders
;
1115 RGWAsyncStatRemoteObj
*req
;
1118 RGWStatRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1119 const rgw_zone_id
& _source_zone
,
1120 rgw_bucket
& _src_bucket
,
1121 const rgw_obj_key
& _key
,
1122 ceph::real_time
*_pmtime
,
1125 map
<string
, bufferlist
> *_pattrs
,
1126 map
<string
, string
> *_pheaders
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1127 async_rados(_async_rados
), store(_store
),
1128 source_zone(_source_zone
),
1129 src_bucket(_src_bucket
),
1135 pheaders(_pheaders
),
1139 ~RGWStatRemoteObjCR() override
{
1143 void request_cleanup() override
{
1150 int send_request() override
{
1151 req
= new RGWAsyncStatRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
,
1152 src_bucket
, key
, pmtime
, psize
, petag
, pattrs
, pheaders
);
1153 async_rados
->queue(req
);
1157 int request_complete() override
{
1158 return req
->get_ret_status();
1162 class RGWAsyncRemoveObj
: public RGWAsyncRadosRequest
{
1163 rgw::sal::RGWRadosStore
*store
;
1164 rgw_zone_id source_zone
;
1166 RGWBucketInfo bucket_info
;
1170 string owner_display_name
;
1172 uint64_t versioned_epoch
;
1173 string marker_version_id
;
1176 ceph::real_time timestamp
;
1177 rgw_zone_set zones_trace
;
1180 int _send_request() override
;
1182 RGWAsyncRemoveObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
1183 const rgw_zone_id
& _source_zone
,
1184 RGWBucketInfo
& _bucket_info
,
1185 const rgw_obj_key
& _key
,
1186 const string
& _owner
,
1187 const string
& _owner_display_name
,
1189 uint64_t _versioned_epoch
,
1190 bool _delete_marker
,
1192 real_time
& _timestamp
,
1193 rgw_zone_set
* _zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1194 source_zone(_source_zone
),
1195 bucket_info(_bucket_info
),
1198 owner_display_name(_owner_display_name
),
1199 versioned(_versioned
),
1200 versioned_epoch(_versioned_epoch
),
1201 del_if_older(_if_older
),
1202 timestamp(_timestamp
) {
1203 if (_delete_marker
) {
1204 marker_version_id
= key
.instance
;
1208 zones_trace
= *_zones_trace
;
1213 class RGWRemoveObjCR
: public RGWSimpleCoroutine
{
1215 RGWAsyncRadosProcessor
*async_rados
;
1216 rgw::sal::RGWRadosStore
*store
;
1217 rgw_zone_id source_zone
;
1219 RGWBucketInfo bucket_info
;
1223 uint64_t versioned_epoch
;
1226 string owner_display_name
;
1229 real_time timestamp
;
1231 RGWAsyncRemoveObj
*req
;
1233 rgw_zone_set
*zones_trace
;
1236 RGWRemoveObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1237 const rgw_zone_id
& _source_zone
,
1238 RGWBucketInfo
& _bucket_info
,
1239 const rgw_obj_key
& _key
,
1241 uint64_t _versioned_epoch
,
1243 string
*_owner_display_name
,
1244 bool _delete_marker
,
1245 real_time
*_timestamp
,
1246 rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1247 async_rados(_async_rados
), store(_store
),
1248 source_zone(_source_zone
),
1249 bucket_info(_bucket_info
),
1251 versioned(_versioned
),
1252 versioned_epoch(_versioned_epoch
),
1253 delete_marker(_delete_marker
), req(NULL
), zones_trace(_zones_trace
) {
1254 del_if_older
= (_timestamp
!= NULL
);
1256 timestamp
= *_timestamp
;
1263 if (_owner_display_name
) {
1264 owner_display_name
= *_owner_display_name
;
1267 ~RGWRemoveObjCR() override
{
1271 void request_cleanup() override
{
1278 int send_request() override
{
1279 req
= new RGWAsyncRemoveObj(this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
1280 key
, owner
, owner_display_name
, versioned
, versioned_epoch
,
1281 delete_marker
, del_if_older
, timestamp
, zones_trace
);
1282 async_rados
->queue(req
);
1286 int request_complete() override
{
1287 return req
->get_ret_status();
1291 class RGWContinuousLeaseCR
: public RGWCoroutine
{
1292 RGWAsyncRadosProcessor
*async_rados
;
1293 rgw::sal::RGWRadosStore
*store
;
1295 const rgw_raw_obj obj
;
1297 const string lock_name
;
1298 const string cookie
;
1302 ceph::mutex lock
= ceph::make_mutex("RGWContinuousLeaseCR");
1303 std::atomic
<bool> going_down
= { false };
1306 RGWCoroutine
*caller
;
1308 bool aborted
{false};
1311 RGWContinuousLeaseCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1312 const rgw_raw_obj
& _obj
,
1313 const string
& _lock_name
, int _interval
, RGWCoroutine
*_caller
)
1314 : RGWCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
1315 obj(_obj
), lock_name(_lock_name
),
1316 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
1317 interval(_interval
), caller(_caller
)
1320 int operate() override
;
1323 std::lock_guard l
{lock
};
1327 void set_locked(bool status
) {
1328 std::lock_guard l
{lock
};
1342 class RGWRadosTimelogAddCR
: public RGWSimpleCoroutine
{
1343 rgw::sal::RGWRadosStore
*store
;
1344 list
<cls_log_entry
> entries
;
1348 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1351 RGWRadosTimelogAddCR(rgw::sal::RGWRadosStore
*_store
, const string
& _oid
,
1352 const cls_log_entry
& entry
);
1354 int send_request() override
;
1355 int request_complete() override
;
1358 class RGWRadosTimelogTrimCR
: public RGWSimpleCoroutine
{
1359 rgw::sal::RGWRadosStore
*store
;
1360 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1363 real_time start_time
;
1365 std::string from_marker
;
1366 std::string to_marker
;
1369 RGWRadosTimelogTrimCR(rgw::sal::RGWRadosStore
*store
, const std::string
& oid
,
1370 const real_time
& start_time
, const real_time
& end_time
,
1371 const std::string
& from_marker
,
1372 const std::string
& to_marker
);
1374 int send_request() override
;
1375 int request_complete() override
;
1378 // wrapper to update last_trim_marker on success
1379 class RGWSyncLogTrimCR
: public RGWRadosTimelogTrimCR
{
1381 std::string
*last_trim_marker
;
1383 // a marker that compares greater than any timestamp-based index
1384 static constexpr const char* max_marker
= "99999999";
1386 RGWSyncLogTrimCR(rgw::sal::RGWRadosStore
*store
, const std::string
& oid
,
1387 const std::string
& to_marker
, std::string
*last_trim_marker
);
1388 int request_complete() override
;
1391 class RGWAsyncStatObj
: public RGWAsyncRadosRequest
{
1392 rgw::sal::RGWRadosStore
*store
;
1393 RGWBucketInfo bucket_info
;
1398 RGWObjVersionTracker
*objv_tracker
;
1400 int _send_request() override
;
1402 RGWAsyncStatObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*store
,
1403 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1404 real_time
*pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1405 RGWObjVersionTracker
*objv_tracker
= nullptr)
1406 : RGWAsyncRadosRequest(caller
, cn
), store(store
), obj(obj
), psize(psize
),
1407 pmtime(pmtime
), pepoch(pepoch
), objv_tracker(objv_tracker
) {}
1410 class RGWStatObjCR
: public RGWSimpleCoroutine
{
1411 rgw::sal::RGWRadosStore
*store
;
1412 RGWAsyncRadosProcessor
*async_rados
;
1413 RGWBucketInfo bucket_info
;
1418 RGWObjVersionTracker
*objv_tracker
;
1419 RGWAsyncStatObj
*req
= nullptr;
1421 RGWStatObjCR(RGWAsyncRadosProcessor
*async_rados
, rgw::sal::RGWRadosStore
*store
,
1422 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1423 real_time
* pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1424 RGWObjVersionTracker
*objv_tracker
= nullptr);
1425 ~RGWStatObjCR() override
{
1428 void request_cleanup() override
;
1430 int send_request() override
;
1431 int request_complete() override
;
1434 /// coroutine wrapper for IoCtx::aio_notify()
1435 class RGWRadosNotifyCR
: public RGWSimpleCoroutine
{
1436 rgw::sal::RGWRadosStore
*const store
;
1437 const rgw_raw_obj obj
;
1439 const uint64_t timeout_ms
;
1440 bufferlist
*response
;
1442 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1445 RGWRadosNotifyCR(rgw::sal::RGWRadosStore
*store
, const rgw_raw_obj
& obj
,
1446 bufferlist
& request
, uint64_t timeout_ms
,
1447 bufferlist
*response
);
1449 int send_request() override
;
1450 int request_complete() override
;