1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_CR_RADOS_H
5 #define CEPH_RGW_CR_RADOS_H
7 #include <boost/intrusive_ptr.hpp>
8 #include "include/ceph_assert.h"
9 #include "rgw_coroutine.h"
11 #include "rgw_sal_rados.h"
12 #include "common/WorkQueue.h"
13 #include "common/Throttle.h"
17 #include "services/svc_sys_obj.h"
18 #include "services/svc_bucket.h"
20 #define dout_subsys ceph_subsys_rgw
22 class RGWAsyncRadosRequest
: public RefCountedObject
{
24 RGWAioCompletionNotifier
*notifier
;
28 ceph::mutex lock
= ceph::make_mutex("RGWAsyncRadosRequest::lock");
31 virtual int _send_request(const DoutPrefixProvider
*dpp
) = 0;
33 RGWAsyncRadosRequest(RGWCoroutine
*_caller
, RGWAioCompletionNotifier
*_cn
)
34 : caller(_caller
), notifier(_cn
), retcode(0) {
36 ~RGWAsyncRadosRequest() override
{
42 void send_request(const DoutPrefixProvider
*dpp
) {
44 retcode
= _send_request(dpp
);
46 std::lock_guard l
{lock
};
48 notifier
->cb(); // drops its own ref
55 int get_ret_status() { return retcode
; }
59 std::lock_guard l
{lock
};
61 // we won't call notifier->cb() to drop its ref, so drop it here
71 class RGWAsyncRadosProcessor
{
72 deque
<RGWAsyncRadosRequest
*> m_req_queue
;
73 std::atomic
<bool> going_down
= { false };
77 Throttle req_throttle
;
79 struct RGWWQ
: public DoutPrefixProvider
, public ThreadPool::WorkQueue
<RGWAsyncRadosRequest
> {
80 RGWAsyncRadosProcessor
*processor
;
81 RGWWQ(RGWAsyncRadosProcessor
*p
,
82 ceph::timespan timeout
, ceph::timespan suicide_timeout
,
84 : ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>("RGWWQ", timeout
, suicide_timeout
, tp
), processor(p
) {}
86 bool _enqueue(RGWAsyncRadosRequest
*req
) override
;
87 void _dequeue(RGWAsyncRadosRequest
*req
) override
{
90 bool _empty() override
;
91 RGWAsyncRadosRequest
*_dequeue() override
;
92 using ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>::_process
;
93 void _process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) override
;
95 void _clear() override
{
96 ceph_assert(processor
->m_req_queue
.empty());
99 CephContext
*get_cct() const { return processor
->cct
; }
100 unsigned get_subsys() const { return ceph_subsys_rgw
; }
101 std::ostream
& gen_prefix(std::ostream
& out
) const { return out
<< "rgw async rados processor: ";}
106 RGWAsyncRadosProcessor(CephContext
*_cct
, int num_threads
);
107 ~RGWAsyncRadosProcessor() {}
110 void handle_request(const DoutPrefixProvider
*dpp
, RGWAsyncRadosRequest
*req
);
111 void queue(RGWAsyncRadosRequest
*req
);
113 bool is_going_down() {
120 class RGWSimpleWriteOnlyAsyncCR
: public RGWSimpleCoroutine
{
121 RGWAsyncRadosProcessor
*async_rados
;
122 rgw::sal::RGWRadosStore
*store
;
125 const DoutPrefixProvider
*dpp
;
127 class Request
: public RGWAsyncRadosRequest
{
128 rgw::sal::RGWRadosStore
*store
;
130 const DoutPrefixProvider
*dpp
;
132 int _send_request(const DoutPrefixProvider
*dpp
) override
;
134 Request(RGWCoroutine
*caller
,
135 RGWAioCompletionNotifier
*cn
,
136 rgw::sal::RGWRadosStore
*store
,
138 const DoutPrefixProvider
*dpp
) : RGWAsyncRadosRequest(caller
, cn
),
145 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
146 rgw::sal::RGWRadosStore
*_store
,
148 const DoutPrefixProvider
*_dpp
) : RGWSimpleCoroutine(_store
->ctx()),
149 async_rados(_async_rados
),
154 ~RGWSimpleWriteOnlyAsyncCR() override
{
157 void request_cleanup() override
{
164 int send_request(const DoutPrefixProvider
*dpp
) override
{
165 req
= new Request(this,
166 stack
->create_completion_notifier(),
171 async_rados
->queue(req
);
174 int request_complete() override
{
175 return req
->get_ret_status();
180 template <class P
, class R
>
181 class RGWSimpleAsyncCR
: public RGWSimpleCoroutine
{
182 RGWAsyncRadosProcessor
*async_rados
;
183 rgw::sal::RGWRadosStore
*store
;
186 std::shared_ptr
<R
> result
;
187 const DoutPrefixProvider
*dpp
;
189 class Request
: public RGWAsyncRadosRequest
{
190 rgw::sal::RGWRadosStore
*store
;
192 std::shared_ptr
<R
> result
;
193 const DoutPrefixProvider
*dpp
;
195 int _send_request(const DoutPrefixProvider
*dpp
) override
;
197 Request(const DoutPrefixProvider
*dpp
,
198 RGWCoroutine
*caller
,
199 RGWAioCompletionNotifier
*cn
,
200 rgw::sal::RGWRadosStore
*_store
,
202 std::shared_ptr
<R
>& _result
,
203 const DoutPrefixProvider
*_dpp
) : RGWAsyncRadosRequest(caller
, cn
),
211 RGWSimpleAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
212 rgw::sal::RGWRadosStore
*_store
,
214 std::shared_ptr
<R
>& _result
,
215 const DoutPrefixProvider
*_dpp
) : RGWSimpleCoroutine(_store
->ctx()),
216 async_rados(_async_rados
),
222 ~RGWSimpleAsyncCR() override
{
225 void request_cleanup() override
{
232 int send_request(const DoutPrefixProvider
*dpp
) override
{
233 req
= new Request(dpp
,
235 stack
->create_completion_notifier(),
241 async_rados
->queue(req
);
244 int request_complete() override
{
245 return req
->get_ret_status();
249 class RGWGenericAsyncCR
: public RGWSimpleCoroutine
{
250 RGWAsyncRadosProcessor
*async_rados
;
251 rgw::sal::RGWRadosStore
*store
;
258 virtual int operate() = 0;
262 std::shared_ptr
<Action
> action
;
264 class Request
: public RGWAsyncRadosRequest
{
265 std::shared_ptr
<Action
> action
;
267 int _send_request(const DoutPrefixProvider
*dpp
) override
{
271 return action
->operate();
274 Request(const DoutPrefixProvider
*dpp
,
275 RGWCoroutine
*caller
,
276 RGWAioCompletionNotifier
*cn
,
277 std::shared_ptr
<Action
>& _action
) : RGWAsyncRadosRequest(caller
, cn
),
282 RGWGenericAsyncCR(CephContext
*_cct
,
283 RGWAsyncRadosProcessor
*_async_rados
,
284 std::shared_ptr
<Action
>& _action
) : RGWSimpleCoroutine(_cct
),
285 async_rados(_async_rados
),
288 RGWGenericAsyncCR(CephContext
*_cct
,
289 RGWAsyncRadosProcessor
*_async_rados
,
290 std::shared_ptr
<T
>& _action
) : RGWSimpleCoroutine(_cct
),
291 async_rados(_async_rados
),
292 action(std::static_pointer_cast
<Action
>(_action
)) {}
294 ~RGWGenericAsyncCR() override
{
297 void request_cleanup() override
{
304 int send_request(const DoutPrefixProvider
*dpp
) override
{
305 req
= new Request(dpp
, this,
306 stack
->create_completion_notifier(),
309 async_rados
->queue(req
);
312 int request_complete() override
{
313 return req
->get_ret_status();
318 class RGWAsyncGetSystemObj
: public RGWAsyncRadosRequest
{
319 const DoutPrefixProvider
*dpp
;
320 RGWSysObjectCtx obj_ctx
;
322 const bool want_attrs
;
323 const bool raw_attrs
;
325 int _send_request(const DoutPrefixProvider
*dpp
) override
;
327 RGWAsyncGetSystemObj(const DoutPrefixProvider
*dpp
,
328 RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
329 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
330 bool want_attrs
, bool raw_attrs
);
333 map
<string
, bufferlist
> attrs
;
334 RGWObjVersionTracker objv_tracker
;
337 class RGWAsyncPutSystemObj
: public RGWAsyncRadosRequest
{
338 const DoutPrefixProvider
*dpp
;
345 int _send_request(const DoutPrefixProvider
*dpp
) override
;
347 RGWAsyncPutSystemObj(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
,
348 RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
349 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
350 bool _exclusive
, bufferlist _bl
);
352 RGWObjVersionTracker objv_tracker
;
355 class RGWAsyncPutSystemObjAttrs
: public RGWAsyncRadosRequest
{
356 const DoutPrefixProvider
*dpp
;
359 map
<string
, bufferlist
> attrs
;
362 int _send_request(const DoutPrefixProvider
*dpp
) override
;
364 RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
365 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
366 map
<string
, bufferlist
> _attrs
);
368 RGWObjVersionTracker objv_tracker
;
371 class RGWAsyncLockSystemObj
: public RGWAsyncRadosRequest
{
372 rgw::sal::RGWRadosStore
*store
;
376 uint32_t duration_secs
;
379 int _send_request(const DoutPrefixProvider
*dpp
) override
;
381 RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
382 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
383 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
);
386 class RGWAsyncUnlockSystemObj
: public RGWAsyncRadosRequest
{
387 rgw::sal::RGWRadosStore
*store
;
393 int _send_request(const DoutPrefixProvider
*dpp
) override
;
395 RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
396 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
397 const string
& _name
, const string
& _cookie
);
401 class RGWSimpleRadosReadCR
: public RGWSimpleCoroutine
{
402 const DoutPrefixProvider
*dpp
;
403 RGWAsyncRadosProcessor
*async_rados
;
408 /// on ENOENT, call handle_data() with an empty object instead of failing
409 const bool empty_on_enoent
;
410 RGWObjVersionTracker
*objv_tracker
;
411 RGWAsyncGetSystemObj
*req
{nullptr};
414 RGWSimpleRadosReadCR(const DoutPrefixProvider
*_dpp
,
415 RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
416 const rgw_raw_obj
& _obj
,
417 T
*_result
, bool empty_on_enoent
= true,
418 RGWObjVersionTracker
*objv_tracker
= nullptr)
419 : RGWSimpleCoroutine(_svc
->ctx()), dpp(_dpp
), async_rados(_async_rados
), svc(_svc
),
420 obj(_obj
), result(_result
),
421 empty_on_enoent(empty_on_enoent
), objv_tracker(objv_tracker
) {}
422 ~RGWSimpleRadosReadCR() override
{
426 void request_cleanup() override
{
433 int send_request(const DoutPrefixProvider
*dpp
) override
;
434 int request_complete() override
;
436 virtual int handle_data(T
& data
) {
442 int RGWSimpleRadosReadCR
<T
>::send_request(const DoutPrefixProvider
*dpp
)
444 req
= new RGWAsyncGetSystemObj(dpp
, this, stack
->create_completion_notifier(), svc
,
445 objv_tracker
, obj
, false, false);
446 async_rados
->queue(req
);
451 int RGWSimpleRadosReadCR
<T
>::request_complete()
453 int ret
= req
->get_ret_status();
455 if (ret
== -ENOENT
&& empty_on_enoent
) {
462 auto iter
= req
->bl
.cbegin();
464 // allow successful reads with empty buffers. ReadSyncStatus coroutines
465 // depend on this to be able to read without locking, because the
466 // cls lock from InitSyncStatus will create an empty object if it didn't
470 decode(*result
, iter
);
472 } catch (buffer::error
& err
) {
477 return handle_data(*result
);
480 class RGWSimpleRadosReadAttrsCR
: public RGWSimpleCoroutine
{
481 const DoutPrefixProvider
*dpp
;
482 RGWAsyncRadosProcessor
*async_rados
;
486 map
<string
, bufferlist
> *pattrs
;
488 RGWObjVersionTracker
* objv_tracker
;
489 RGWAsyncGetSystemObj
*req
= nullptr;
492 RGWSimpleRadosReadAttrsCR(const DoutPrefixProvider
*_dpp
, RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
493 const rgw_raw_obj
& _obj
, map
<string
, bufferlist
> *_pattrs
,
494 bool _raw_attrs
, RGWObjVersionTracker
* objv_tracker
= nullptr)
495 : RGWSimpleCoroutine(_svc
->ctx()),
497 async_rados(_async_rados
), svc(_svc
),
500 raw_attrs(_raw_attrs
),
501 objv_tracker(objv_tracker
)
503 ~RGWSimpleRadosReadAttrsCR() override
{
507 void request_cleanup() override
{
514 int send_request(const DoutPrefixProvider
*dpp
) override
;
515 int request_complete() override
;
519 class RGWSimpleRadosWriteCR
: public RGWSimpleCoroutine
{
520 const DoutPrefixProvider
*dpp
;
521 RGWAsyncRadosProcessor
*async_rados
;
525 RGWObjVersionTracker
*objv_tracker
;
526 RGWAsyncPutSystemObj
*req
{nullptr};
529 RGWSimpleRadosWriteCR(const DoutPrefixProvider
*_dpp
,
530 RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
531 const rgw_raw_obj
& _obj
,
532 const T
& _data
, RGWObjVersionTracker
*objv_tracker
= nullptr)
533 : RGWSimpleCoroutine(_svc
->ctx()), dpp(_dpp
), async_rados(_async_rados
),
534 svc(_svc
), obj(_obj
), objv_tracker(objv_tracker
) {
538 ~RGWSimpleRadosWriteCR() override
{
542 void request_cleanup() override
{
549 int send_request(const DoutPrefixProvider
*dpp
) override
{
550 req
= new RGWAsyncPutSystemObj(dpp
, this, stack
->create_completion_notifier(),
551 svc
, objv_tracker
, obj
, false, std::move(bl
));
552 async_rados
->queue(req
);
556 int request_complete() override
{
557 if (objv_tracker
) { // copy the updated version
558 *objv_tracker
= req
->objv_tracker
;
560 return req
->get_ret_status();
564 class RGWSimpleRadosWriteAttrsCR
: public RGWSimpleCoroutine
{
565 const DoutPrefixProvider
*dpp
;
566 RGWAsyncRadosProcessor
*async_rados
;
568 RGWObjVersionTracker
*objv_tracker
;
571 map
<string
, bufferlist
> attrs
;
572 RGWAsyncPutSystemObjAttrs
*req
= nullptr;
575 RGWSimpleRadosWriteAttrsCR(const DoutPrefixProvider
*_dpp
,
576 RGWAsyncRadosProcessor
*_async_rados
,
577 RGWSI_SysObj
*_svc
, const rgw_raw_obj
& _obj
,
578 map
<string
, bufferlist
> _attrs
,
579 RGWObjVersionTracker
*objv_tracker
= nullptr)
580 : RGWSimpleCoroutine(_svc
->ctx()), dpp(_dpp
), async_rados(_async_rados
),
581 svc(_svc
), objv_tracker(objv_tracker
), obj(_obj
),
582 attrs(std::move(_attrs
)) {
584 ~RGWSimpleRadosWriteAttrsCR() override
{
588 void request_cleanup() override
{
595 int send_request(const DoutPrefixProvider
*dpp
) override
{
596 req
= new RGWAsyncPutSystemObjAttrs(dpp
, this, stack
->create_completion_notifier(),
597 svc
, objv_tracker
, obj
, std::move(attrs
));
598 async_rados
->queue(req
);
602 int request_complete() override
{
603 if (objv_tracker
) { // copy the updated version
604 *objv_tracker
= req
->objv_tracker
;
606 return req
->get_ret_status();
610 class RGWRadosSetOmapKeysCR
: public RGWSimpleCoroutine
{
611 rgw::sal::RGWRadosStore
*store
;
612 map
<string
, bufferlist
> entries
;
618 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
621 RGWRadosSetOmapKeysCR(rgw::sal::RGWRadosStore
*_store
,
622 const rgw_raw_obj
& _obj
,
623 map
<string
, bufferlist
>& _entries
);
625 int send_request(const DoutPrefixProvider
*dpp
) override
;
626 int request_complete() override
;
629 class RGWRadosGetOmapKeysCR
: public RGWSimpleCoroutine
{
633 std::set
<std::string
> entries
;
636 using ResultPtr
= std::shared_ptr
<Result
>;
638 RGWRadosGetOmapKeysCR(rgw::sal::RGWRadosStore
*_store
, const rgw_raw_obj
& _obj
,
639 const string
& _marker
, int _max_entries
,
642 int send_request(const DoutPrefixProvider
*dpp
) override
;
643 int request_complete() override
;
646 rgw::sal::RGWRadosStore
*store
;
651 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
654 class RGWRadosGetOmapValsCR
: public RGWSimpleCoroutine
{
658 std::map
<std::string
, bufferlist
> entries
;
661 using ResultPtr
= std::shared_ptr
<Result
>;
663 RGWRadosGetOmapValsCR(rgw::sal::RGWRadosStore
*_store
, const rgw_raw_obj
& _obj
,
664 const string
& _marker
, int _max_entries
,
667 int send_request(const DoutPrefixProvider
*dpp
) override
;
668 int request_complete() override
;
671 rgw::sal::RGWRadosStore
*store
;
676 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
679 class RGWRadosRemoveOmapKeysCR
: public RGWSimpleCoroutine
{
680 rgw::sal::RGWRadosStore
*store
;
688 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
691 RGWRadosRemoveOmapKeysCR(rgw::sal::RGWRadosStore
*_store
,
692 const rgw_raw_obj
& _obj
,
693 const set
<string
>& _keys
);
695 int send_request(const DoutPrefixProvider
*dpp
) override
;
697 int request_complete() override
;
700 class RGWRadosRemoveCR
: public RGWSimpleCoroutine
{
701 rgw::sal::RGWRadosStore
*store
;
702 librados::IoCtx ioctx
;
703 const rgw_raw_obj obj
;
704 RGWObjVersionTracker
* objv_tracker
;
705 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
708 RGWRadosRemoveCR(rgw::sal::RGWRadosStore
*store
, const rgw_raw_obj
& obj
,
709 RGWObjVersionTracker
* objv_tracker
= nullptr);
711 int send_request(const DoutPrefixProvider
*dpp
) override
;
712 int request_complete() override
;
715 class RGWSimpleRadosLockCR
: public RGWSimpleCoroutine
{
716 RGWAsyncRadosProcessor
*async_rados
;
717 rgw::sal::RGWRadosStore
*store
;
724 RGWAsyncLockSystemObj
*req
;
727 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
728 const rgw_raw_obj
& _obj
,
729 const string
& _lock_name
,
730 const string
& _cookie
,
732 ~RGWSimpleRadosLockCR() override
{
735 void request_cleanup() override
;
737 int send_request(const DoutPrefixProvider
*dpp
) override
;
738 int request_complete() override
;
740 static std::string
gen_random_cookie(CephContext
* cct
) {
741 #define COOKIE_LEN 16
742 char buf
[COOKIE_LEN
+ 1];
743 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
748 class RGWSimpleRadosUnlockCR
: public RGWSimpleCoroutine
{
749 RGWAsyncRadosProcessor
*async_rados
;
750 rgw::sal::RGWRadosStore
*store
;
756 RGWAsyncUnlockSystemObj
*req
;
759 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
760 const rgw_raw_obj
& _obj
,
761 const string
& _lock_name
,
762 const string
& _cookie
);
763 ~RGWSimpleRadosUnlockCR() override
{
766 void request_cleanup() override
;
768 int send_request(const DoutPrefixProvider
*dpp
) override
;
769 int request_complete() override
;
772 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
774 class RGWOmapAppend
: public RGWConsumerCR
<string
> {
775 RGWAsyncRadosProcessor
*async_rados
;
776 rgw::sal::RGWRadosStore
*store
;
782 int num_pending_entries
;
783 list
<string
> pending_entries
;
785 map
<string
, bufferlist
> entries
;
787 uint64_t window_size
;
788 uint64_t total_entries
;
790 RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
791 const rgw_raw_obj
& _obj
,
792 uint64_t _window_size
= OMAP_APPEND_MAX_ENTRIES_DEFAULT
);
793 int operate(const DoutPrefixProvider
*dpp
) override
;
794 void flush_pending();
795 bool append(const string
& s
);
798 uint64_t get_total_entries() {
799 return total_entries
;
802 const rgw_raw_obj
& get_obj() {
807 class RGWShardedOmapCRManager
{
808 RGWAsyncRadosProcessor
*async_rados
;
809 rgw::sal::RGWRadosStore
*store
;
814 vector
<RGWOmapAppend
*> shards
;
816 RGWShardedOmapCRManager(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
, RGWCoroutine
*_op
, int _num_shards
, const rgw_pool
& pool
, const string
& oid_prefix
)
817 : async_rados(_async_rados
),
818 store(_store
), op(_op
), num_shards(_num_shards
) {
819 shards
.reserve(num_shards
);
820 for (int i
= 0; i
< num_shards
; ++i
) {
821 char buf
[oid_prefix
.size() + 16];
822 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), i
);
823 RGWOmapAppend
*shard
= new RGWOmapAppend(async_rados
, store
, rgw_raw_obj(pool
, buf
));
825 shards
.push_back(shard
);
826 op
->spawn(shard
, false);
830 ~RGWShardedOmapCRManager() {
831 for (auto shard
: shards
) {
836 bool append(const string
& entry
, int shard_id
) {
837 return shards
[shard_id
]->append(entry
);
841 for (vector
<RGWOmapAppend
*>::iterator iter
= shards
.begin(); iter
!= shards
.end(); ++iter
) {
842 success
&= ((*iter
)->finish() && (!(*iter
)->is_error()));
847 uint64_t get_total_entries(int shard_id
) {
848 return shards
[shard_id
]->get_total_entries();
852 class RGWAsyncGetBucketInstanceInfo
: public RGWAsyncRadosRequest
{
853 rgw::sal::RGWRadosStore
*store
;
855 const DoutPrefixProvider
*dpp
;
858 int _send_request(const DoutPrefixProvider
*dpp
) override
;
860 RGWAsyncGetBucketInstanceInfo(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
861 rgw::sal::RGWRadosStore
*_store
, const rgw_bucket
& bucket
,
862 const DoutPrefixProvider
*dpp
)
863 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), bucket(bucket
), dpp(dpp
) {}
865 RGWBucketInfo bucket_info
;
866 map
<string
, bufferlist
> attrs
;
869 class RGWGetBucketInstanceInfoCR
: public RGWSimpleCoroutine
{
870 RGWAsyncRadosProcessor
*async_rados
;
871 rgw::sal::RGWRadosStore
*store
;
873 RGWBucketInfo
*bucket_info
;
874 map
<string
, bufferlist
> *pattrs
;
875 const DoutPrefixProvider
*dpp
;
877 RGWAsyncGetBucketInstanceInfo
*req
{nullptr};
880 // rgw_bucket constructor
881 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
882 const rgw_bucket
& _bucket
, RGWBucketInfo
*_bucket_info
,
883 map
<string
, bufferlist
> *_pattrs
, const DoutPrefixProvider
*dpp
)
884 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
885 bucket(_bucket
), bucket_info(_bucket_info
), pattrs(_pattrs
), dpp(dpp
) {}
886 ~RGWGetBucketInstanceInfoCR() override
{
889 void request_cleanup() override
{
896 int send_request(const DoutPrefixProvider
*dpp
) override
{
897 req
= new RGWAsyncGetBucketInstanceInfo(this, stack
->create_completion_notifier(), store
, bucket
, dpp
);
898 async_rados
->queue(req
);
901 int request_complete() override
{
903 *bucket_info
= std::move(req
->bucket_info
);
906 *pattrs
= std::move(req
->attrs
);
908 return req
->get_ret_status();
912 class RGWRadosBILogTrimCR
: public RGWSimpleCoroutine
{
913 RGWRados::BucketShard bs
;
914 std::string start_marker
;
915 std::string end_marker
;
916 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
918 RGWRadosBILogTrimCR(const DoutPrefixProvider
*dpp
,
919 rgw::sal::RGWRadosStore
*store
, const RGWBucketInfo
& bucket_info
,
920 int shard_id
, const std::string
& start_marker
,
921 const std::string
& end_marker
);
923 int send_request(const DoutPrefixProvider
*dpp
) override
;
924 int request_complete() override
;
927 class RGWAsyncFetchRemoteObj
: public RGWAsyncRadosRequest
{
928 rgw::sal::RGWRadosStore
*store
;
929 rgw_zone_id source_zone
;
931 std::optional
<rgw_user
> user_id
;
933 rgw_bucket src_bucket
;
934 std::optional
<rgw_placement_rule
> dest_placement_rule
;
935 RGWBucketInfo dest_bucket_info
;
938 std::optional
<rgw_obj_key
> dest_key
;
939 std::optional
<uint64_t> versioned_epoch
;
944 std::shared_ptr
<RGWFetchObjFilter
> filter
;
945 rgw_zone_set zones_trace
;
946 PerfCounters
* counters
;
947 const DoutPrefixProvider
*dpp
;
950 int _send_request(const DoutPrefixProvider
*dpp
) override
;
952 RGWAsyncFetchRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
953 const rgw_zone_id
& _source_zone
,
954 std::optional
<rgw_user
>& _user_id
,
955 const rgw_bucket
& _src_bucket
,
956 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
957 const RGWBucketInfo
& _dest_bucket_info
,
958 const rgw_obj_key
& _key
,
959 const std::optional
<rgw_obj_key
>& _dest_key
,
960 std::optional
<uint64_t> _versioned_epoch
,
962 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
963 rgw_zone_set
*_zones_trace
,
964 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
965 : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
966 source_zone(_source_zone
),
968 src_bucket(_src_bucket
),
969 dest_placement_rule(_dest_placement_rule
),
970 dest_bucket_info(_dest_bucket_info
),
973 versioned_epoch(_versioned_epoch
),
974 copy_if_newer(_if_newer
),
980 zones_trace
= *_zones_trace
;
985 class RGWFetchRemoteObjCR
: public RGWSimpleCoroutine
{
987 RGWAsyncRadosProcessor
*async_rados
;
988 rgw::sal::RGWRadosStore
*store
;
989 rgw_zone_id source_zone
;
991 std::optional
<rgw_user
> user_id
;
993 rgw_bucket src_bucket
;
994 std::optional
<rgw_placement_rule
> dest_placement_rule
;
995 RGWBucketInfo dest_bucket_info
;
998 std::optional
<rgw_obj_key
> dest_key
;
999 std::optional
<uint64_t> versioned_epoch
;
1001 real_time src_mtime
;
1005 std::shared_ptr
<RGWFetchObjFilter
> filter
;
1007 RGWAsyncFetchRemoteObj
*req
;
1008 rgw_zone_set
*zones_trace
;
1009 PerfCounters
* counters
;
1010 const DoutPrefixProvider
*dpp
;
1013 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1014 const rgw_zone_id
& _source_zone
,
1015 std::optional
<rgw_user
> _user_id
,
1016 const rgw_bucket
& _src_bucket
,
1017 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
1018 const RGWBucketInfo
& _dest_bucket_info
,
1019 const rgw_obj_key
& _key
,
1020 const std::optional
<rgw_obj_key
>& _dest_key
,
1021 std::optional
<uint64_t> _versioned_epoch
,
1023 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
1024 rgw_zone_set
*_zones_trace
,
1025 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
1026 : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1027 async_rados(_async_rados
), store(_store
),
1028 source_zone(_source_zone
),
1030 src_bucket(_src_bucket
),
1031 dest_placement_rule(_dest_placement_rule
),
1032 dest_bucket_info(_dest_bucket_info
),
1034 dest_key(_dest_key
),
1035 versioned_epoch(_versioned_epoch
),
1036 copy_if_newer(_if_newer
),
1039 zones_trace(_zones_trace
), counters(counters
), dpp(dpp
) {}
1042 ~RGWFetchRemoteObjCR() override
{
1046 void request_cleanup() override
{
1053 int send_request(const DoutPrefixProvider
*dpp
) override
{
1054 req
= new RGWAsyncFetchRemoteObj(this, stack
->create_completion_notifier(), store
,
1055 source_zone
, user_id
, src_bucket
, dest_placement_rule
, dest_bucket_info
,
1056 key
, dest_key
, versioned_epoch
, copy_if_newer
, filter
,
1057 zones_trace
, counters
, dpp
);
1058 async_rados
->queue(req
);
1062 int request_complete() override
{
1063 return req
->get_ret_status();
1067 class RGWAsyncStatRemoteObj
: public RGWAsyncRadosRequest
{
1068 rgw::sal::RGWRadosStore
*store
;
1069 rgw_zone_id source_zone
;
1071 rgw_bucket src_bucket
;
1074 ceph::real_time
*pmtime
;
1077 map
<string
, bufferlist
> *pattrs
;
1078 map
<string
, string
> *pheaders
;
1081 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1083 RGWAsyncStatRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
1084 const rgw_zone_id
& _source_zone
,
1085 rgw_bucket
& _src_bucket
,
1086 const rgw_obj_key
& _key
,
1087 ceph::real_time
*_pmtime
,
1090 map
<string
, bufferlist
> *_pattrs
,
1091 map
<string
, string
> *_pheaders
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1092 source_zone(_source_zone
),
1093 src_bucket(_src_bucket
),
1099 pheaders(_pheaders
) {}
1102 class RGWStatRemoteObjCR
: public RGWSimpleCoroutine
{
1104 RGWAsyncRadosProcessor
*async_rados
;
1105 rgw::sal::RGWRadosStore
*store
;
1106 rgw_zone_id source_zone
;
1108 rgw_bucket src_bucket
;
1111 ceph::real_time
*pmtime
;
1114 map
<string
, bufferlist
> *pattrs
;
1115 map
<string
, string
> *pheaders
;
1117 RGWAsyncStatRemoteObj
*req
;
1120 RGWStatRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1121 const rgw_zone_id
& _source_zone
,
1122 rgw_bucket
& _src_bucket
,
1123 const rgw_obj_key
& _key
,
1124 ceph::real_time
*_pmtime
,
1127 map
<string
, bufferlist
> *_pattrs
,
1128 map
<string
, string
> *_pheaders
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1129 async_rados(_async_rados
), store(_store
),
1130 source_zone(_source_zone
),
1131 src_bucket(_src_bucket
),
1137 pheaders(_pheaders
),
1141 ~RGWStatRemoteObjCR() override
{
1145 void request_cleanup() override
{
1152 int send_request(const DoutPrefixProvider
*dpp
) override
{
1153 req
= new RGWAsyncStatRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
,
1154 src_bucket
, key
, pmtime
, psize
, petag
, pattrs
, pheaders
);
1155 async_rados
->queue(req
);
1159 int request_complete() override
{
1160 return req
->get_ret_status();
1164 class RGWAsyncRemoveObj
: public RGWAsyncRadosRequest
{
1165 const DoutPrefixProvider
*dpp
;
1166 rgw::sal::RGWRadosStore
*store
;
1167 rgw_zone_id source_zone
;
1169 RGWBucketInfo bucket_info
;
1173 string owner_display_name
;
1175 uint64_t versioned_epoch
;
1176 string marker_version_id
;
1179 ceph::real_time timestamp
;
1180 rgw_zone_set zones_trace
;
1183 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1185 RGWAsyncRemoveObj(const DoutPrefixProvider
*_dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
1186 rgw::sal::RGWRadosStore
*_store
,
1187 const rgw_zone_id
& _source_zone
,
1188 RGWBucketInfo
& _bucket_info
,
1189 const rgw_obj_key
& _key
,
1190 const string
& _owner
,
1191 const string
& _owner_display_name
,
1193 uint64_t _versioned_epoch
,
1194 bool _delete_marker
,
1196 real_time
& _timestamp
,
1197 rgw_zone_set
* _zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), dpp(_dpp
), store(_store
),
1198 source_zone(_source_zone
),
1199 bucket_info(_bucket_info
),
1202 owner_display_name(_owner_display_name
),
1203 versioned(_versioned
),
1204 versioned_epoch(_versioned_epoch
),
1205 del_if_older(_if_older
),
1206 timestamp(_timestamp
) {
1207 if (_delete_marker
) {
1208 marker_version_id
= key
.instance
;
1212 zones_trace
= *_zones_trace
;
1217 class RGWRemoveObjCR
: public RGWSimpleCoroutine
{
1218 const DoutPrefixProvider
*dpp
;
1220 RGWAsyncRadosProcessor
*async_rados
;
1221 rgw::sal::RGWRadosStore
*store
;
1222 rgw_zone_id source_zone
;
1224 RGWBucketInfo bucket_info
;
1228 uint64_t versioned_epoch
;
1231 string owner_display_name
;
1234 real_time timestamp
;
1236 RGWAsyncRemoveObj
*req
;
1238 rgw_zone_set
*zones_trace
;
1241 RGWRemoveObjCR(const DoutPrefixProvider
*_dpp
, RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1242 const rgw_zone_id
& _source_zone
,
1243 RGWBucketInfo
& _bucket_info
,
1244 const rgw_obj_key
& _key
,
1246 uint64_t _versioned_epoch
,
1248 string
*_owner_display_name
,
1249 bool _delete_marker
,
1250 real_time
*_timestamp
,
1251 rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), dpp(_dpp
), cct(_store
->ctx()),
1252 async_rados(_async_rados
), store(_store
),
1253 source_zone(_source_zone
),
1254 bucket_info(_bucket_info
),
1256 versioned(_versioned
),
1257 versioned_epoch(_versioned_epoch
),
1258 delete_marker(_delete_marker
), req(NULL
), zones_trace(_zones_trace
) {
1259 del_if_older
= (_timestamp
!= NULL
);
1261 timestamp
= *_timestamp
;
1268 if (_owner_display_name
) {
1269 owner_display_name
= *_owner_display_name
;
1272 ~RGWRemoveObjCR() override
{
1276 void request_cleanup() override
{
1283 int send_request(const DoutPrefixProvider
*dpp
) override
{
1284 req
= new RGWAsyncRemoveObj(dpp
, this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
1285 key
, owner
, owner_display_name
, versioned
, versioned_epoch
,
1286 delete_marker
, del_if_older
, timestamp
, zones_trace
);
1287 async_rados
->queue(req
);
1291 int request_complete() override
{
1292 return req
->get_ret_status();
1296 class RGWContinuousLeaseCR
: public RGWCoroutine
{
1297 RGWAsyncRadosProcessor
*async_rados
;
1298 rgw::sal::RGWRadosStore
*store
;
1300 const rgw_raw_obj obj
;
1302 const string lock_name
;
1303 const string cookie
;
1306 bool going_down
{ false };
1309 RGWCoroutine
*caller
;
1311 bool aborted
{false};
1314 RGWContinuousLeaseCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
1315 const rgw_raw_obj
& _obj
,
1316 const string
& _lock_name
, int _interval
, RGWCoroutine
*_caller
)
1317 : RGWCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
1318 obj(_obj
), lock_name(_lock_name
),
1319 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
1320 interval(_interval
), caller(_caller
)
1323 int operate(const DoutPrefixProvider
*dpp
) override
;
1325 bool is_locked() const {
1329 void set_locked(bool status
) {
1343 class RGWRadosTimelogAddCR
: public RGWSimpleCoroutine
{
1344 const DoutPrefixProvider
*dpp
;
1345 rgw::sal::RGWRadosStore
*store
;
1346 list
<cls_log_entry
> entries
;
1350 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1353 RGWRadosTimelogAddCR(const DoutPrefixProvider
*dpp
, rgw::sal::RGWRadosStore
*_store
, const string
& _oid
,
1354 const cls_log_entry
& entry
);
1356 int send_request(const DoutPrefixProvider
*dpp
) override
;
1357 int request_complete() override
;
1360 class RGWRadosTimelogTrimCR
: public RGWSimpleCoroutine
{
1361 const DoutPrefixProvider
*dpp
;
1362 rgw::sal::RGWRadosStore
*store
;
1363 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1366 real_time start_time
;
1368 std::string from_marker
;
1369 std::string to_marker
;
1372 RGWRadosTimelogTrimCR(const DoutPrefixProvider
*dpp
,
1373 rgw::sal::RGWRadosStore
*store
, const std::string
& oid
,
1374 const real_time
& start_time
, const real_time
& end_time
,
1375 const std::string
& from_marker
,
1376 const std::string
& to_marker
);
1378 int send_request(const DoutPrefixProvider
*dpp
) override
;
1379 int request_complete() override
;
1382 // wrapper to update last_trim_marker on success
1383 class RGWSyncLogTrimCR
: public RGWRadosTimelogTrimCR
{
1385 std::string
*last_trim_marker
;
1387 static constexpr const char* max_marker
= "99999999";
1389 RGWSyncLogTrimCR(const DoutPrefixProvider
*dpp
,
1390 rgw::sal::RGWRadosStore
*store
, const std::string
& oid
,
1391 const std::string
& to_marker
, std::string
*last_trim_marker
);
1392 int request_complete() override
;
1395 class RGWAsyncStatObj
: public RGWAsyncRadosRequest
{
1396 const DoutPrefixProvider
*dpp
;
1397 rgw::sal::RGWRadosStore
*store
;
1398 RGWBucketInfo bucket_info
;
1403 RGWObjVersionTracker
*objv_tracker
;
1405 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1407 RGWAsyncStatObj(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*store
,
1408 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1409 real_time
*pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1410 RGWObjVersionTracker
*objv_tracker
= nullptr)
1411 : RGWAsyncRadosRequest(caller
, cn
), dpp(dpp
), store(store
), obj(obj
), psize(psize
),
1412 pmtime(pmtime
), pepoch(pepoch
), objv_tracker(objv_tracker
) {}
1415 class RGWStatObjCR
: public RGWSimpleCoroutine
{
1416 const DoutPrefixProvider
*dpp
;
1417 rgw::sal::RGWRadosStore
*store
;
1418 RGWAsyncRadosProcessor
*async_rados
;
1419 RGWBucketInfo bucket_info
;
1424 RGWObjVersionTracker
*objv_tracker
;
1425 RGWAsyncStatObj
*req
= nullptr;
1427 RGWStatObjCR(const DoutPrefixProvider
*dpp
, RGWAsyncRadosProcessor
*async_rados
, rgw::sal::RGWRadosStore
*store
,
1428 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1429 real_time
* pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1430 RGWObjVersionTracker
*objv_tracker
= nullptr);
1431 ~RGWStatObjCR() override
{
1434 void request_cleanup() override
;
1436 int send_request(const DoutPrefixProvider
*dpp
) override
;
1437 int request_complete() override
;
1440 /// coroutine wrapper for IoCtx::aio_notify()
1441 class RGWRadosNotifyCR
: public RGWSimpleCoroutine
{
1442 rgw::sal::RGWRadosStore
*const store
;
1443 const rgw_raw_obj obj
;
1445 const uint64_t timeout_ms
;
1446 bufferlist
*response
;
1448 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1451 RGWRadosNotifyCR(rgw::sal::RGWRadosStore
*store
, const rgw_raw_obj
& obj
,
1452 bufferlist
& request
, uint64_t timeout_ms
,
1453 bufferlist
*response
);
1455 int send_request(const DoutPrefixProvider
*dpp
) override
;
1456 int request_complete() override
;