1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_CR_RADOS_H
5 #define CEPH_RGW_CR_RADOS_H
7 #include <boost/intrusive_ptr.hpp>
8 #include "include/ceph_assert.h"
9 #include "rgw_coroutine.h"
11 #include "rgw_sal_rados.h"
12 #include "common/WorkQueue.h"
13 #include "common/Throttle.h"
17 #include "services/svc_sys_obj.h"
18 #include "services/svc_bucket.h"
20 #define dout_subsys ceph_subsys_rgw
22 class RGWAsyncRadosRequest
: public RefCountedObject
{
24 RGWAioCompletionNotifier
*notifier
;
28 ceph::mutex lock
= ceph::make_mutex("RGWAsyncRadosRequest::lock");
31 virtual int _send_request(const DoutPrefixProvider
*dpp
) = 0;
33 RGWAsyncRadosRequest(RGWCoroutine
*_caller
, RGWAioCompletionNotifier
*_cn
)
34 : caller(_caller
), notifier(_cn
), retcode(0) {
36 ~RGWAsyncRadosRequest() override
{
42 void send_request(const DoutPrefixProvider
*dpp
) {
44 retcode
= _send_request(dpp
);
46 std::lock_guard l
{lock
};
48 notifier
->cb(); // drops its own ref
55 int get_ret_status() { return retcode
; }
59 std::lock_guard l
{lock
};
61 // we won't call notifier->cb() to drop its ref, so drop it here
71 class RGWAsyncRadosProcessor
{
72 std::deque
<RGWAsyncRadosRequest
*> m_req_queue
;
73 std::atomic
<bool> going_down
= { false };
77 Throttle req_throttle
;
79 struct RGWWQ
: public DoutPrefixProvider
, public ThreadPool::WorkQueue
<RGWAsyncRadosRequest
> {
80 RGWAsyncRadosProcessor
*processor
;
81 RGWWQ(RGWAsyncRadosProcessor
*p
,
82 ceph::timespan timeout
, ceph::timespan suicide_timeout
,
84 : ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>("RGWWQ", timeout
, suicide_timeout
, tp
), processor(p
) {}
86 bool _enqueue(RGWAsyncRadosRequest
*req
) override
;
87 void _dequeue(RGWAsyncRadosRequest
*req
) override
{
90 bool _empty() override
;
91 RGWAsyncRadosRequest
*_dequeue() override
;
92 using ThreadPool::WorkQueue
<RGWAsyncRadosRequest
>::_process
;
93 void _process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) override
;
95 void _clear() override
{
96 ceph_assert(processor
->m_req_queue
.empty());
99 CephContext
*get_cct() const { return processor
->cct
; }
100 unsigned get_subsys() const { return ceph_subsys_rgw
; }
101 std::ostream
& gen_prefix(std::ostream
& out
) const { return out
<< "rgw async rados processor: ";}
106 RGWAsyncRadosProcessor(CephContext
*_cct
, int num_threads
);
107 ~RGWAsyncRadosProcessor() {}
110 void handle_request(const DoutPrefixProvider
*dpp
, RGWAsyncRadosRequest
*req
);
111 void queue(RGWAsyncRadosRequest
*req
);
113 bool is_going_down() {
120 class RGWSimpleWriteOnlyAsyncCR
: public RGWSimpleCoroutine
{
121 RGWAsyncRadosProcessor
*async_rados
;
122 rgw::sal::RadosStore
* store
;
125 const DoutPrefixProvider
*dpp
;
127 class Request
: public RGWAsyncRadosRequest
{
128 rgw::sal::RadosStore
* store
;
130 const DoutPrefixProvider
*dpp
;
132 int _send_request(const DoutPrefixProvider
*dpp
) override
;
134 Request(RGWCoroutine
*caller
,
135 RGWAioCompletionNotifier
*cn
,
136 rgw::sal::RadosStore
* store
,
138 const DoutPrefixProvider
*dpp
) : RGWAsyncRadosRequest(caller
, cn
),
145 RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
146 rgw::sal::RadosStore
* _store
,
148 const DoutPrefixProvider
*_dpp
) : RGWSimpleCoroutine(_store
->ctx()),
149 async_rados(_async_rados
),
154 ~RGWSimpleWriteOnlyAsyncCR() override
{
157 void request_cleanup() override
{
164 int send_request(const DoutPrefixProvider
*dpp
) override
{
165 req
= new Request(this,
166 stack
->create_completion_notifier(),
171 async_rados
->queue(req
);
174 int request_complete() override
{
175 return req
->get_ret_status();
180 template <class P
, class R
>
181 class RGWSimpleAsyncCR
: public RGWSimpleCoroutine
{
182 RGWAsyncRadosProcessor
*async_rados
;
183 rgw::sal::RadosStore
* store
;
186 std::shared_ptr
<R
> result
;
187 const DoutPrefixProvider
*dpp
;
189 class Request
: public RGWAsyncRadosRequest
{
190 rgw::sal::RadosStore
* store
;
192 std::shared_ptr
<R
> result
;
193 const DoutPrefixProvider
*dpp
;
195 int _send_request(const DoutPrefixProvider
*dpp
) override
;
197 Request(const DoutPrefixProvider
*dpp
,
198 RGWCoroutine
*caller
,
199 RGWAioCompletionNotifier
*cn
,
200 rgw::sal::RadosStore
* _store
,
202 std::shared_ptr
<R
>& _result
,
203 const DoutPrefixProvider
*_dpp
) : RGWAsyncRadosRequest(caller
, cn
),
211 RGWSimpleAsyncCR(RGWAsyncRadosProcessor
*_async_rados
,
212 rgw::sal::RadosStore
* _store
,
214 std::shared_ptr
<R
>& _result
,
215 const DoutPrefixProvider
*_dpp
) : RGWSimpleCoroutine(_store
->ctx()),
216 async_rados(_async_rados
),
222 ~RGWSimpleAsyncCR() override
{
225 void request_cleanup() override
{
232 int send_request(const DoutPrefixProvider
*dpp
) override
{
233 req
= new Request(dpp
,
235 stack
->create_completion_notifier(),
241 async_rados
->queue(req
);
244 int request_complete() override
{
245 return req
->get_ret_status();
249 class RGWGenericAsyncCR
: public RGWSimpleCoroutine
{
250 RGWAsyncRadosProcessor
*async_rados
;
251 rgw::sal::RadosStore
* store
;
258 virtual int operate() = 0;
262 std::shared_ptr
<Action
> action
;
264 class Request
: public RGWAsyncRadosRequest
{
265 std::shared_ptr
<Action
> action
;
267 int _send_request(const DoutPrefixProvider
*dpp
) override
{
271 return action
->operate();
274 Request(const DoutPrefixProvider
*dpp
,
275 RGWCoroutine
*caller
,
276 RGWAioCompletionNotifier
*cn
,
277 std::shared_ptr
<Action
>& _action
) : RGWAsyncRadosRequest(caller
, cn
),
282 RGWGenericAsyncCR(CephContext
*_cct
,
283 RGWAsyncRadosProcessor
*_async_rados
,
284 std::shared_ptr
<Action
>& _action
) : RGWSimpleCoroutine(_cct
),
285 async_rados(_async_rados
),
288 RGWGenericAsyncCR(CephContext
*_cct
,
289 RGWAsyncRadosProcessor
*_async_rados
,
290 std::shared_ptr
<T
>& _action
) : RGWSimpleCoroutine(_cct
),
291 async_rados(_async_rados
),
292 action(std::static_pointer_cast
<Action
>(_action
)) {}
294 ~RGWGenericAsyncCR() override
{
297 void request_cleanup() override
{
304 int send_request(const DoutPrefixProvider
*dpp
) override
{
305 req
= new Request(dpp
, this,
306 stack
->create_completion_notifier(),
309 async_rados
->queue(req
);
312 int request_complete() override
{
313 return req
->get_ret_status();
318 class RGWAsyncGetSystemObj
: public RGWAsyncRadosRequest
{
319 const DoutPrefixProvider
*dpp
;
320 RGWSysObjectCtx obj_ctx
;
322 const bool want_attrs
;
323 const bool raw_attrs
;
325 int _send_request(const DoutPrefixProvider
*dpp
) override
;
327 RGWAsyncGetSystemObj(const DoutPrefixProvider
*dpp
,
328 RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
329 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
330 bool want_attrs
, bool raw_attrs
);
333 std::map
<std::string
, bufferlist
> attrs
;
334 RGWObjVersionTracker objv_tracker
;
337 class RGWAsyncPutSystemObj
: public RGWAsyncRadosRequest
{
338 const DoutPrefixProvider
*dpp
;
345 int _send_request(const DoutPrefixProvider
*dpp
) override
;
347 RGWAsyncPutSystemObj(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
,
348 RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
349 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
350 bool _exclusive
, bufferlist _bl
);
352 RGWObjVersionTracker objv_tracker
;
355 class RGWAsyncPutSystemObjAttrs
: public RGWAsyncRadosRequest
{
356 const DoutPrefixProvider
*dpp
;
359 std::map
<std::string
, bufferlist
> attrs
;
362 int _send_request(const DoutPrefixProvider
*dpp
) override
;
364 RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
365 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
366 std::map
<std::string
, bufferlist
> _attrs
);
368 RGWObjVersionTracker objv_tracker
;
371 class RGWAsyncLockSystemObj
: public RGWAsyncRadosRequest
{
372 rgw::sal::RadosStore
* store
;
374 std::string lock_name
;
376 uint32_t duration_secs
;
379 int _send_request(const DoutPrefixProvider
*dpp
) override
;
381 RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
382 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
383 const std::string
& _name
, const std::string
& _cookie
, uint32_t _duration_secs
);
386 class RGWAsyncUnlockSystemObj
: public RGWAsyncRadosRequest
{
387 rgw::sal::RadosStore
* store
;
389 std::string lock_name
;
393 int _send_request(const DoutPrefixProvider
*dpp
) override
;
395 RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
396 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
397 const std::string
& _name
, const std::string
& _cookie
);
401 class RGWSimpleRadosReadCR
: public RGWSimpleCoroutine
{
402 const DoutPrefixProvider
*dpp
;
403 RGWAsyncRadosProcessor
*async_rados
;
408 /// on ENOENT, call handle_data() with an empty object instead of failing
409 const bool empty_on_enoent
;
410 RGWObjVersionTracker
*objv_tracker
;
411 RGWAsyncGetSystemObj
*req
{nullptr};
414 RGWSimpleRadosReadCR(const DoutPrefixProvider
*_dpp
,
415 RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
416 const rgw_raw_obj
& _obj
,
417 T
*_result
, bool empty_on_enoent
= true,
418 RGWObjVersionTracker
*objv_tracker
= nullptr)
419 : RGWSimpleCoroutine(_svc
->ctx()), dpp(_dpp
), async_rados(_async_rados
), svc(_svc
),
420 obj(_obj
), result(_result
),
421 empty_on_enoent(empty_on_enoent
), objv_tracker(objv_tracker
) {}
422 ~RGWSimpleRadosReadCR() override
{
426 void request_cleanup() override
{
433 int send_request(const DoutPrefixProvider
*dpp
) override
;
434 int request_complete() override
;
436 virtual int handle_data(T
& data
) {
442 int RGWSimpleRadosReadCR
<T
>::send_request(const DoutPrefixProvider
*dpp
)
444 req
= new RGWAsyncGetSystemObj(dpp
, this, stack
->create_completion_notifier(), svc
,
445 objv_tracker
, obj
, false, false);
446 async_rados
->queue(req
);
451 int RGWSimpleRadosReadCR
<T
>::request_complete()
453 int ret
= req
->get_ret_status();
455 if (ret
== -ENOENT
&& empty_on_enoent
) {
462 auto iter
= req
->bl
.cbegin();
464 // allow successful reads with empty buffers. ReadSyncStatus coroutines
465 // depend on this to be able to read without locking, because the
466 // cls lock from InitSyncStatus will create an empty object if it didn't
470 decode(*result
, iter
);
472 } catch (buffer::error
& err
) {
477 return handle_data(*result
);
480 class RGWSimpleRadosReadAttrsCR
: public RGWSimpleCoroutine
{
481 const DoutPrefixProvider
*dpp
;
482 RGWAsyncRadosProcessor
*async_rados
;
486 std::map
<std::string
, bufferlist
> *pattrs
;
488 RGWObjVersionTracker
* objv_tracker
;
489 RGWAsyncGetSystemObj
*req
= nullptr;
492 RGWSimpleRadosReadAttrsCR(const DoutPrefixProvider
*_dpp
, RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
493 const rgw_raw_obj
& _obj
, std::map
<std::string
, bufferlist
> *_pattrs
,
494 bool _raw_attrs
, RGWObjVersionTracker
* objv_tracker
= nullptr)
495 : RGWSimpleCoroutine(_svc
->ctx()),
497 async_rados(_async_rados
), svc(_svc
),
500 raw_attrs(_raw_attrs
),
501 objv_tracker(objv_tracker
)
503 ~RGWSimpleRadosReadAttrsCR() override
{
507 void request_cleanup() override
{
514 int send_request(const DoutPrefixProvider
*dpp
) override
;
515 int request_complete() override
;
519 class RGWSimpleRadosWriteCR
: public RGWSimpleCoroutine
{
520 const DoutPrefixProvider
*dpp
;
521 RGWAsyncRadosProcessor
*async_rados
;
525 RGWObjVersionTracker
*objv_tracker
;
526 RGWAsyncPutSystemObj
*req
{nullptr};
529 RGWSimpleRadosWriteCR(const DoutPrefixProvider
*_dpp
,
530 RGWAsyncRadosProcessor
*_async_rados
, RGWSI_SysObj
*_svc
,
531 const rgw_raw_obj
& _obj
,
532 const T
& _data
, RGWObjVersionTracker
*objv_tracker
= nullptr)
533 : RGWSimpleCoroutine(_svc
->ctx()), dpp(_dpp
), async_rados(_async_rados
),
534 svc(_svc
), obj(_obj
), objv_tracker(objv_tracker
) {
538 ~RGWSimpleRadosWriteCR() override
{
542 void request_cleanup() override
{
549 int send_request(const DoutPrefixProvider
*dpp
) override
{
550 req
= new RGWAsyncPutSystemObj(dpp
, this, stack
->create_completion_notifier(),
551 svc
, objv_tracker
, obj
, false, std::move(bl
));
552 async_rados
->queue(req
);
556 int request_complete() override
{
557 if (objv_tracker
) { // copy the updated version
558 *objv_tracker
= req
->objv_tracker
;
560 return req
->get_ret_status();
564 class RGWSimpleRadosWriteAttrsCR
: public RGWSimpleCoroutine
{
565 const DoutPrefixProvider
*dpp
;
566 RGWAsyncRadosProcessor
*async_rados
;
568 RGWObjVersionTracker
*objv_tracker
;
571 std::map
<std::string
, bufferlist
> attrs
;
572 RGWAsyncPutSystemObjAttrs
*req
= nullptr;
575 RGWSimpleRadosWriteAttrsCR(const DoutPrefixProvider
*_dpp
,
576 RGWAsyncRadosProcessor
*_async_rados
,
577 RGWSI_SysObj
*_svc
, const rgw_raw_obj
& _obj
,
578 std::map
<std::string
, bufferlist
> _attrs
,
579 RGWObjVersionTracker
*objv_tracker
= nullptr)
580 : RGWSimpleCoroutine(_svc
->ctx()), dpp(_dpp
), async_rados(_async_rados
),
581 svc(_svc
), objv_tracker(objv_tracker
), obj(_obj
),
582 attrs(std::move(_attrs
)) {
584 ~RGWSimpleRadosWriteAttrsCR() override
{
588 void request_cleanup() override
{
595 int send_request(const DoutPrefixProvider
*dpp
) override
{
596 req
= new RGWAsyncPutSystemObjAttrs(dpp
, this, stack
->create_completion_notifier(),
597 svc
, objv_tracker
, obj
, std::move(attrs
));
598 async_rados
->queue(req
);
602 int request_complete() override
{
603 if (objv_tracker
) { // copy the updated version
604 *objv_tracker
= req
->objv_tracker
;
606 return req
->get_ret_status();
610 class RGWRadosSetOmapKeysCR
: public RGWSimpleCoroutine
{
611 rgw::sal::RadosStore
* store
;
612 std::map
<std::string
, bufferlist
> entries
;
618 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
621 RGWRadosSetOmapKeysCR(rgw::sal::RadosStore
* _store
,
622 const rgw_raw_obj
& _obj
,
623 std::map
<std::string
, bufferlist
>& _entries
);
625 int send_request(const DoutPrefixProvider
*dpp
) override
;
626 int request_complete() override
;
629 class RGWRadosGetOmapKeysCR
: public RGWSimpleCoroutine
{
633 std::set
<std::string
> entries
;
636 using ResultPtr
= std::shared_ptr
<Result
>;
638 RGWRadosGetOmapKeysCR(rgw::sal::RadosStore
* _store
, const rgw_raw_obj
& _obj
,
639 const std::string
& _marker
, int _max_entries
,
642 int send_request(const DoutPrefixProvider
*dpp
) override
;
643 int request_complete() override
;
646 rgw::sal::RadosStore
* store
;
651 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
654 class RGWRadosGetOmapValsCR
: public RGWSimpleCoroutine
{
658 std::map
<std::string
, bufferlist
> entries
;
661 using ResultPtr
= std::shared_ptr
<Result
>;
663 RGWRadosGetOmapValsCR(rgw::sal::RadosStore
* _store
, const rgw_raw_obj
& _obj
,
664 const std::string
& _marker
, int _max_entries
,
667 int send_request(const DoutPrefixProvider
*dpp
) override
;
668 int request_complete() override
;
671 rgw::sal::RadosStore
* store
;
676 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
679 class RGWRadosRemoveOmapKeysCR
: public RGWSimpleCoroutine
{
680 rgw::sal::RadosStore
* store
;
684 std::set
<std::string
> keys
;
688 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
691 RGWRadosRemoveOmapKeysCR(rgw::sal::RadosStore
* _store
,
692 const rgw_raw_obj
& _obj
,
693 const std::set
<std::string
>& _keys
);
695 int send_request(const DoutPrefixProvider
*dpp
) override
;
697 int request_complete() override
;
700 class RGWRadosRemoveCR
: public RGWSimpleCoroutine
{
701 rgw::sal::RadosStore
* store
;
702 librados::IoCtx ioctx
;
703 const rgw_raw_obj obj
;
704 RGWObjVersionTracker
* objv_tracker
;
705 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
708 RGWRadosRemoveCR(rgw::sal::RadosStore
* store
, const rgw_raw_obj
& obj
,
709 RGWObjVersionTracker
* objv_tracker
= nullptr);
711 int send_request(const DoutPrefixProvider
*dpp
) override
;
712 int request_complete() override
;
715 class RGWSimpleRadosLockCR
: public RGWSimpleCoroutine
{
716 RGWAsyncRadosProcessor
*async_rados
;
717 rgw::sal::RadosStore
* store
;
718 std::string lock_name
;
724 RGWAsyncLockSystemObj
*req
;
727 RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
728 const rgw_raw_obj
& _obj
,
729 const std::string
& _lock_name
,
730 const std::string
& _cookie
,
732 ~RGWSimpleRadosLockCR() override
{
735 void request_cleanup() override
;
737 int send_request(const DoutPrefixProvider
*dpp
) override
;
738 int request_complete() override
;
740 static std::string
gen_random_cookie(CephContext
* cct
) {
741 #define COOKIE_LEN 16
742 char buf
[COOKIE_LEN
+ 1];
743 gen_rand_alphanumeric(cct
, buf
, sizeof(buf
) - 1);
748 class RGWSimpleRadosUnlockCR
: public RGWSimpleCoroutine
{
749 RGWAsyncRadosProcessor
*async_rados
;
750 rgw::sal::RadosStore
* store
;
751 std::string lock_name
;
756 RGWAsyncUnlockSystemObj
*req
;
759 RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
760 const rgw_raw_obj
& _obj
,
761 const std::string
& _lock_name
,
762 const std::string
& _cookie
);
763 ~RGWSimpleRadosUnlockCR() override
{
766 void request_cleanup() override
;
768 int send_request(const DoutPrefixProvider
*dpp
) override
;
769 int request_complete() override
;
772 #define OMAP_APPEND_MAX_ENTRIES_DEFAULT 100
774 class RGWOmapAppend
: public RGWConsumerCR
<std::string
> {
775 RGWAsyncRadosProcessor
*async_rados
;
776 rgw::sal::RadosStore
* store
;
782 int num_pending_entries
;
783 std::list
<std::string
> pending_entries
;
785 std::map
<std::string
, bufferlist
> entries
;
787 uint64_t window_size
;
788 uint64_t total_entries
;
790 RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
791 const rgw_raw_obj
& _obj
,
792 uint64_t _window_size
= OMAP_APPEND_MAX_ENTRIES_DEFAULT
);
793 int operate(const DoutPrefixProvider
*dpp
) override
;
794 void flush_pending();
795 bool append(const std::string
& s
);
798 uint64_t get_total_entries() {
799 return total_entries
;
802 const rgw_raw_obj
& get_obj() {
807 class RGWShardedOmapCRManager
{
808 RGWAsyncRadosProcessor
*async_rados
;
809 rgw::sal::RadosStore
* store
;
814 std::vector
<RGWOmapAppend
*> shards
;
816 RGWShardedOmapCRManager(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
, RGWCoroutine
*_op
, int _num_shards
, const rgw_pool
& pool
, const std::string
& oid_prefix
)
817 : async_rados(_async_rados
),
818 store(_store
), op(_op
), num_shards(_num_shards
) {
819 shards
.reserve(num_shards
);
820 for (int i
= 0; i
< num_shards
; ++i
) {
821 char buf
[oid_prefix
.size() + 16];
822 snprintf(buf
, sizeof(buf
), "%s.%d", oid_prefix
.c_str(), i
);
823 RGWOmapAppend
*shard
= new RGWOmapAppend(async_rados
, store
, rgw_raw_obj(pool
, buf
));
825 shards
.push_back(shard
);
826 op
->spawn(shard
, false);
830 ~RGWShardedOmapCRManager() {
831 for (auto shard
: shards
) {
836 bool append(const std::string
& entry
, int shard_id
) {
837 return shards
[shard_id
]->append(entry
);
841 for (auto& append_op
: shards
) {
842 success
&= (append_op
->finish() && (!append_op
->is_error()));
847 uint64_t get_total_entries(int shard_id
) {
848 return shards
[shard_id
]->get_total_entries();
852 class RGWAsyncGetBucketInstanceInfo
: public RGWAsyncRadosRequest
{
853 rgw::sal::RadosStore
* store
;
855 const DoutPrefixProvider
*dpp
;
858 int _send_request(const DoutPrefixProvider
*dpp
) override
;
860 RGWAsyncGetBucketInstanceInfo(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
861 rgw::sal::RadosStore
* _store
, const rgw_bucket
& bucket
,
862 const DoutPrefixProvider
*dpp
)
863 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), bucket(bucket
), dpp(dpp
) {}
865 RGWBucketInfo bucket_info
;
866 std::map
<std::string
, bufferlist
> attrs
;
869 class RGWGetBucketInstanceInfoCR
: public RGWSimpleCoroutine
{
870 RGWAsyncRadosProcessor
*async_rados
;
871 rgw::sal::RadosStore
* store
;
873 RGWBucketInfo
*bucket_info
;
874 std::map
<std::string
, bufferlist
> *pattrs
;
875 const DoutPrefixProvider
*dpp
;
877 RGWAsyncGetBucketInstanceInfo
*req
{nullptr};
880 // rgw_bucket constructor
881 RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
882 const rgw_bucket
& _bucket
, RGWBucketInfo
*_bucket_info
,
883 std::map
<std::string
, bufferlist
> *_pattrs
, const DoutPrefixProvider
*dpp
)
884 : RGWSimpleCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
885 bucket(_bucket
), bucket_info(_bucket_info
), pattrs(_pattrs
), dpp(dpp
) {}
886 ~RGWGetBucketInstanceInfoCR() override
{
889 void request_cleanup() override
{
896 int send_request(const DoutPrefixProvider
*dpp
) override
{
897 req
= new RGWAsyncGetBucketInstanceInfo(this, stack
->create_completion_notifier(), store
, bucket
, dpp
);
898 async_rados
->queue(req
);
901 int request_complete() override
{
903 *bucket_info
= std::move(req
->bucket_info
);
906 *pattrs
= std::move(req
->attrs
);
908 return req
->get_ret_status();
912 class RGWRadosBILogTrimCR
: public RGWSimpleCoroutine
{
913 const RGWBucketInfo
& bucket_info
;
915 RGWRados::BucketShard bs
;
916 std::string start_marker
;
917 std::string end_marker
;
918 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
920 RGWRadosBILogTrimCR(const DoutPrefixProvider
*dpp
,
921 rgw::sal::RadosStore
* store
, const RGWBucketInfo
& bucket_info
,
922 int shard_id
, const std::string
& start_marker
,
923 const std::string
& end_marker
);
925 int send_request(const DoutPrefixProvider
*dpp
) override
;
926 int request_complete() override
;
929 class RGWAsyncFetchRemoteObj
: public RGWAsyncRadosRequest
{
930 rgw::sal::RadosStore
* store
;
931 rgw_zone_id source_zone
;
933 std::optional
<rgw_user
> user_id
;
935 rgw_bucket src_bucket
;
936 std::optional
<rgw_placement_rule
> dest_placement_rule
;
937 RGWBucketInfo dest_bucket_info
;
940 std::optional
<rgw_obj_key
> dest_key
;
941 std::optional
<uint64_t> versioned_epoch
;
946 std::shared_ptr
<RGWFetchObjFilter
> filter
;
947 rgw_zone_set zones_trace
;
948 PerfCounters
* counters
;
949 const DoutPrefixProvider
*dpp
;
952 int _send_request(const DoutPrefixProvider
*dpp
) override
;
954 RGWAsyncFetchRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
955 const rgw_zone_id
& _source_zone
,
956 std::optional
<rgw_user
>& _user_id
,
957 const rgw_bucket
& _src_bucket
,
958 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
959 const RGWBucketInfo
& _dest_bucket_info
,
960 const rgw_obj_key
& _key
,
961 const std::optional
<rgw_obj_key
>& _dest_key
,
962 std::optional
<uint64_t> _versioned_epoch
,
964 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
965 rgw_zone_set
*_zones_trace
,
966 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
967 : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
968 source_zone(_source_zone
),
970 src_bucket(_src_bucket
),
971 dest_placement_rule(_dest_placement_rule
),
972 dest_bucket_info(_dest_bucket_info
),
975 versioned_epoch(_versioned_epoch
),
976 copy_if_newer(_if_newer
),
982 zones_trace
= *_zones_trace
;
987 class RGWFetchRemoteObjCR
: public RGWSimpleCoroutine
{
989 RGWAsyncRadosProcessor
*async_rados
;
990 rgw::sal::RadosStore
* store
;
991 rgw_zone_id source_zone
;
993 std::optional
<rgw_user
> user_id
;
995 rgw_bucket src_bucket
;
996 std::optional
<rgw_placement_rule
> dest_placement_rule
;
997 RGWBucketInfo dest_bucket_info
;
1000 std::optional
<rgw_obj_key
> dest_key
;
1001 std::optional
<uint64_t> versioned_epoch
;
1003 real_time src_mtime
;
1007 std::shared_ptr
<RGWFetchObjFilter
> filter
;
1009 RGWAsyncFetchRemoteObj
*req
;
1010 rgw_zone_set
*zones_trace
;
1011 PerfCounters
* counters
;
1012 const DoutPrefixProvider
*dpp
;
1015 RGWFetchRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
1016 const rgw_zone_id
& _source_zone
,
1017 std::optional
<rgw_user
> _user_id
,
1018 const rgw_bucket
& _src_bucket
,
1019 std::optional
<rgw_placement_rule
> _dest_placement_rule
,
1020 const RGWBucketInfo
& _dest_bucket_info
,
1021 const rgw_obj_key
& _key
,
1022 const std::optional
<rgw_obj_key
>& _dest_key
,
1023 std::optional
<uint64_t> _versioned_epoch
,
1025 std::shared_ptr
<RGWFetchObjFilter
> _filter
,
1026 rgw_zone_set
*_zones_trace
,
1027 PerfCounters
* counters
, const DoutPrefixProvider
*dpp
)
1028 : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1029 async_rados(_async_rados
), store(_store
),
1030 source_zone(_source_zone
),
1032 src_bucket(_src_bucket
),
1033 dest_placement_rule(_dest_placement_rule
),
1034 dest_bucket_info(_dest_bucket_info
),
1036 dest_key(_dest_key
),
1037 versioned_epoch(_versioned_epoch
),
1038 copy_if_newer(_if_newer
),
1041 zones_trace(_zones_trace
), counters(counters
), dpp(dpp
) {}
1044 ~RGWFetchRemoteObjCR() override
{
1048 void request_cleanup() override
{
1055 int send_request(const DoutPrefixProvider
*dpp
) override
{
1056 req
= new RGWAsyncFetchRemoteObj(this, stack
->create_completion_notifier(), store
,
1057 source_zone
, user_id
, src_bucket
, dest_placement_rule
, dest_bucket_info
,
1058 key
, dest_key
, versioned_epoch
, copy_if_newer
, filter
,
1059 zones_trace
, counters
, dpp
);
1060 async_rados
->queue(req
);
1064 int request_complete() override
{
1065 return req
->get_ret_status();
1069 class RGWAsyncStatRemoteObj
: public RGWAsyncRadosRequest
{
1070 rgw::sal::RadosStore
* store
;
1071 rgw_zone_id source_zone
;
1073 rgw_bucket src_bucket
;
1076 ceph::real_time
*pmtime
;
1079 std::map
<std::string
, bufferlist
> *pattrs
;
1080 std::map
<std::string
, std::string
> *pheaders
;
1083 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1085 RGWAsyncStatRemoteObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
1086 const rgw_zone_id
& _source_zone
,
1087 rgw_bucket
& _src_bucket
,
1088 const rgw_obj_key
& _key
,
1089 ceph::real_time
*_pmtime
,
1091 std::string
*_petag
,
1092 std::map
<std::string
, bufferlist
> *_pattrs
,
1093 std::map
<std::string
, std::string
> *_pheaders
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
1094 source_zone(_source_zone
),
1095 src_bucket(_src_bucket
),
1101 pheaders(_pheaders
) {}
1104 class RGWStatRemoteObjCR
: public RGWSimpleCoroutine
{
1106 RGWAsyncRadosProcessor
*async_rados
;
1107 rgw::sal::RadosStore
* store
;
1108 rgw_zone_id source_zone
;
1110 rgw_bucket src_bucket
;
1113 ceph::real_time
*pmtime
;
1116 std::map
<std::string
, bufferlist
> *pattrs
;
1117 std::map
<std::string
, std::string
> *pheaders
;
1119 RGWAsyncStatRemoteObj
*req
;
1122 RGWStatRemoteObjCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
1123 const rgw_zone_id
& _source_zone
,
1124 rgw_bucket
& _src_bucket
,
1125 const rgw_obj_key
& _key
,
1126 ceph::real_time
*_pmtime
,
1128 std::string
*_petag
,
1129 std::map
<std::string
, bufferlist
> *_pattrs
,
1130 std::map
<std::string
, std::string
> *_pheaders
) : RGWSimpleCoroutine(_store
->ctx()), cct(_store
->ctx()),
1131 async_rados(_async_rados
), store(_store
),
1132 source_zone(_source_zone
),
1133 src_bucket(_src_bucket
),
1139 pheaders(_pheaders
),
1143 ~RGWStatRemoteObjCR() override
{
1147 void request_cleanup() override
{
1154 int send_request(const DoutPrefixProvider
*dpp
) override
{
1155 req
= new RGWAsyncStatRemoteObj(this, stack
->create_completion_notifier(), store
, source_zone
,
1156 src_bucket
, key
, pmtime
, psize
, petag
, pattrs
, pheaders
);
1157 async_rados
->queue(req
);
1161 int request_complete() override
{
1162 return req
->get_ret_status();
1166 class RGWAsyncRemoveObj
: public RGWAsyncRadosRequest
{
1167 const DoutPrefixProvider
*dpp
;
1168 rgw::sal::RadosStore
* store
;
1169 rgw_zone_id source_zone
;
1171 RGWBucketInfo bucket_info
;
1175 std::string owner_display_name
;
1177 uint64_t versioned_epoch
;
1178 std::string marker_version_id
;
1181 ceph::real_time timestamp
;
1182 rgw_zone_set zones_trace
;
1185 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1187 RGWAsyncRemoveObj(const DoutPrefixProvider
*_dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
1188 rgw::sal::RadosStore
* _store
,
1189 const rgw_zone_id
& _source_zone
,
1190 RGWBucketInfo
& _bucket_info
,
1191 const rgw_obj_key
& _key
,
1192 const std::string
& _owner
,
1193 const std::string
& _owner_display_name
,
1195 uint64_t _versioned_epoch
,
1196 bool _delete_marker
,
1198 real_time
& _timestamp
,
1199 rgw_zone_set
* _zones_trace
) : RGWAsyncRadosRequest(caller
, cn
), dpp(_dpp
), store(_store
),
1200 source_zone(_source_zone
),
1201 bucket_info(_bucket_info
),
1204 owner_display_name(_owner_display_name
),
1205 versioned(_versioned
),
1206 versioned_epoch(_versioned_epoch
),
1207 del_if_older(_if_older
),
1208 timestamp(_timestamp
) {
1209 if (_delete_marker
) {
1210 marker_version_id
= key
.instance
;
1214 zones_trace
= *_zones_trace
;
1219 class RGWRemoveObjCR
: public RGWSimpleCoroutine
{
1220 const DoutPrefixProvider
*dpp
;
1222 RGWAsyncRadosProcessor
*async_rados
;
1223 rgw::sal::RadosStore
* store
;
1224 rgw_zone_id source_zone
;
1226 RGWBucketInfo bucket_info
;
1230 uint64_t versioned_epoch
;
1233 std::string owner_display_name
;
1236 real_time timestamp
;
1238 RGWAsyncRemoveObj
*req
;
1240 rgw_zone_set
*zones_trace
;
1243 RGWRemoveObjCR(const DoutPrefixProvider
*_dpp
, RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
1244 const rgw_zone_id
& _source_zone
,
1245 RGWBucketInfo
& _bucket_info
,
1246 const rgw_obj_key
& _key
,
1248 uint64_t _versioned_epoch
,
1249 std::string
*_owner
,
1250 std::string
*_owner_display_name
,
1251 bool _delete_marker
,
1252 real_time
*_timestamp
,
1253 rgw_zone_set
*_zones_trace
) : RGWSimpleCoroutine(_store
->ctx()), dpp(_dpp
), cct(_store
->ctx()),
1254 async_rados(_async_rados
), store(_store
),
1255 source_zone(_source_zone
),
1256 bucket_info(_bucket_info
),
1258 versioned(_versioned
),
1259 versioned_epoch(_versioned_epoch
),
1260 delete_marker(_delete_marker
), req(NULL
), zones_trace(_zones_trace
) {
1261 del_if_older
= (_timestamp
!= NULL
);
1263 timestamp
= *_timestamp
;
1270 if (_owner_display_name
) {
1271 owner_display_name
= *_owner_display_name
;
1274 ~RGWRemoveObjCR() override
{
1278 void request_cleanup() override
{
1285 int send_request(const DoutPrefixProvider
*dpp
) override
{
1286 req
= new RGWAsyncRemoveObj(dpp
, this, stack
->create_completion_notifier(), store
, source_zone
, bucket_info
,
1287 key
, owner
, owner_display_name
, versioned
, versioned_epoch
,
1288 delete_marker
, del_if_older
, timestamp
, zones_trace
);
1289 async_rados
->queue(req
);
1293 int request_complete() override
{
1294 return req
->get_ret_status();
1298 class RGWContinuousLeaseCR
: public RGWCoroutine
{
1299 RGWAsyncRadosProcessor
*async_rados
;
1300 rgw::sal::RadosStore
* store
;
1302 const rgw_raw_obj obj
;
1304 const std::string lock_name
;
1305 const std::string cookie
;
1308 bool going_down
{ false };
1311 RGWCoroutine
*caller
;
1313 bool aborted
{false};
1316 RGWContinuousLeaseCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
1317 const rgw_raw_obj
& _obj
,
1318 const std::string
& _lock_name
, int _interval
, RGWCoroutine
*_caller
)
1319 : RGWCoroutine(_store
->ctx()), async_rados(_async_rados
), store(_store
),
1320 obj(_obj
), lock_name(_lock_name
),
1321 cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct
)),
1322 interval(_interval
), caller(_caller
)
1325 virtual ~RGWContinuousLeaseCR() override
;
1327 int operate(const DoutPrefixProvider
*dpp
) override
;
1329 bool is_locked() const {
1333 void set_locked(bool status
) {
1347 class RGWRadosTimelogAddCR
: public RGWSimpleCoroutine
{
1348 const DoutPrefixProvider
*dpp
;
1349 rgw::sal::RadosStore
* store
;
1350 std::list
<cls_log_entry
> entries
;
1354 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1357 RGWRadosTimelogAddCR(const DoutPrefixProvider
*dpp
, rgw::sal::RadosStore
* _store
, const std::string
& _oid
,
1358 const cls_log_entry
& entry
);
1360 int send_request(const DoutPrefixProvider
*dpp
) override
;
1361 int request_complete() override
;
1364 class RGWRadosTimelogTrimCR
: public RGWSimpleCoroutine
{
1365 const DoutPrefixProvider
*dpp
;
1366 rgw::sal::RadosStore
* store
;
1367 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1370 real_time start_time
;
1372 std::string from_marker
;
1373 std::string to_marker
;
1376 RGWRadosTimelogTrimCR(const DoutPrefixProvider
*dpp
,
1377 rgw::sal::RadosStore
* store
, const std::string
& oid
,
1378 const real_time
& start_time
, const real_time
& end_time
,
1379 const std::string
& from_marker
,
1380 const std::string
& to_marker
);
1382 int send_request(const DoutPrefixProvider
*dpp
) override
;
1383 int request_complete() override
;
1386 // wrapper to update last_trim_marker on success
1387 class RGWSyncLogTrimCR
: public RGWRadosTimelogTrimCR
{
1389 std::string
*last_trim_marker
;
1391 static constexpr const char* max_marker
= "99999999";
1393 RGWSyncLogTrimCR(const DoutPrefixProvider
*dpp
,
1394 rgw::sal::RadosStore
* store
, const std::string
& oid
,
1395 const std::string
& to_marker
, std::string
*last_trim_marker
);
1396 int request_complete() override
;
1399 class RGWAsyncStatObj
: public RGWAsyncRadosRequest
{
1400 const DoutPrefixProvider
*dpp
;
1401 rgw::sal::RadosStore
* store
;
1402 RGWBucketInfo bucket_info
;
1407 RGWObjVersionTracker
*objv_tracker
;
1409 int _send_request(const DoutPrefixProvider
*dpp
) override
;
1411 RGWAsyncStatObj(const DoutPrefixProvider
*dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* store
,
1412 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1413 real_time
*pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1414 RGWObjVersionTracker
*objv_tracker
= nullptr)
1415 : RGWAsyncRadosRequest(caller
, cn
), dpp(dpp
), store(store
), obj(obj
), psize(psize
),
1416 pmtime(pmtime
), pepoch(pepoch
), objv_tracker(objv_tracker
) {}
1419 class RGWStatObjCR
: public RGWSimpleCoroutine
{
1420 const DoutPrefixProvider
*dpp
;
1421 rgw::sal::RadosStore
* store
;
1422 RGWAsyncRadosProcessor
*async_rados
;
1423 RGWBucketInfo bucket_info
;
1428 RGWObjVersionTracker
*objv_tracker
;
1429 RGWAsyncStatObj
*req
= nullptr;
1431 RGWStatObjCR(const DoutPrefixProvider
*dpp
, RGWAsyncRadosProcessor
*async_rados
, rgw::sal::RadosStore
* store
,
1432 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
= nullptr,
1433 real_time
* pmtime
= nullptr, uint64_t *pepoch
= nullptr,
1434 RGWObjVersionTracker
*objv_tracker
= nullptr);
1435 ~RGWStatObjCR() override
{
1438 void request_cleanup() override
;
1440 int send_request(const DoutPrefixProvider
*dpp
) override
;
1441 int request_complete() override
;
1444 /// coroutine wrapper for IoCtx::aio_notify()
1445 class RGWRadosNotifyCR
: public RGWSimpleCoroutine
{
1446 rgw::sal::RadosStore
* const store
;
1447 const rgw_raw_obj obj
;
1449 const uint64_t timeout_ms
;
1450 bufferlist
*response
;
1452 boost::intrusive_ptr
<RGWAioCompletionNotifier
> cn
;
1455 RGWRadosNotifyCR(rgw::sal::RadosStore
* store
, const rgw_raw_obj
& obj
,
1456 bufferlist
& request
, uint64_t timeout_ms
,
1457 bufferlist
*response
);
1459 int send_request(const DoutPrefixProvider
*dpp
) override
;
1460 int request_complete() override
;