1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/compat.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_cr_rados.h"
9 #include "rgw_sync_counters.h"
11 #include "services/svc_zone.h"
12 #include "services/svc_zone_utils.h"
13 #include "services/svc_sys_obj.h"
15 #include "cls/lock/cls_lock_client.h"
16 #include "cls/rgw/cls_rgw_client.h"
18 #include <boost/asio/yield.hpp>
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rgw
23 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest
*req
) {
24 if (processor
->is_going_down()) {
28 processor
->m_req_queue
.push_back(req
);
29 dout(20) << "enqueued request req=" << hex
<< req
<< dec
<< dendl
;
34 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
35 return processor
->m_req_queue
.empty();
38 RGWAsyncRadosRequest
*RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
39 if (processor
->m_req_queue
.empty())
41 RGWAsyncRadosRequest
*req
= processor
->m_req_queue
.front();
42 processor
->m_req_queue
.pop_front();
43 dout(20) << "dequeued request req=" << hex
<< req
<< dec
<< dendl
;
48 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) {
49 processor
->handle_request(req
);
50 processor
->req_throttle
.put(1);
53 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
54 if (!g_conf()->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
57 deque
<RGWAsyncRadosRequest
*>::iterator iter
;
58 if (processor
->m_req_queue
.empty()) {
59 dout(20) << "RGWWQ: empty" << dendl
;
62 dout(20) << "RGWWQ:" << dendl
;
63 for (iter
= processor
->m_req_queue
.begin(); iter
!= processor
->m_req_queue
.end(); ++iter
) {
64 dout(20) << "req: " << hex
<< *iter
<< dec
<< dendl
;
68 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados
*_store
, int num_threads
)
69 : store(_store
), m_tp(store
->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads
),
70 req_throttle(store
->ctx(), "rgw_async_rados_ops", num_threads
* 2),
71 req_wq(this, g_conf()->rgw_op_thread_timeout
,
72 g_conf()->rgw_op_thread_suicide_timeout
, &m_tp
) {
75 void RGWAsyncRadosProcessor::start() {
79 void RGWAsyncRadosProcessor::stop() {
83 for (auto iter
= m_req_queue
.begin(); iter
!= m_req_queue
.end(); ++iter
) {
88 void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest
*req
) {
93 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest
*req
) {
98 int RGWAsyncGetSystemObj::_send_request()
100 map
<string
, bufferlist
> *pattrs
= want_attrs
? &attrs
: nullptr;
102 auto sysobj
= obj_ctx
.get_obj(obj
);
104 .set_objv_tracker(&objv_tracker
)
106 .set_raw_attrs(raw_attrs
)
110 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
111 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
112 bool want_attrs
, bool raw_attrs
)
113 : RGWAsyncRadosRequest(caller
, cn
), obj_ctx(_svc
),
114 obj(_obj
), want_attrs(want_attrs
), raw_attrs(raw_attrs
)
117 objv_tracker
= *_objv_tracker
;
121 int RGWSimpleRadosReadAttrsCR::send_request()
123 req
= new RGWAsyncGetSystemObj(this, stack
->create_completion_notifier(),
124 svc
, nullptr, obj
, true, raw_attrs
);
125 async_rados
->queue(req
);
129 int RGWSimpleRadosReadAttrsCR::request_complete()
132 *pattrs
= std::move(req
->attrs
);
134 return req
->get_ret_status();
137 int RGWAsyncPutSystemObj::_send_request()
139 auto obj_ctx
= svc
->init_obj_ctx();
140 auto sysobj
= obj_ctx
.get_obj(obj
);
142 .set_objv_tracker(&objv_tracker
)
143 .set_exclusive(exclusive
)
147 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
149 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
150 bool _exclusive
, bufferlist _bl
)
151 : RGWAsyncRadosRequest(caller
, cn
), svc(_svc
),
152 obj(_obj
), exclusive(_exclusive
), bl(std::move(_bl
))
155 objv_tracker
= *_objv_tracker
;
159 int RGWAsyncPutSystemObjAttrs::_send_request()
161 auto obj_ctx
= svc
->init_obj_ctx();
162 auto sysobj
= obj_ctx
.get_obj(obj
);
164 .set_objv_tracker(&objv_tracker
)
165 .set_exclusive(false)
170 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
172 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
173 map
<string
, bufferlist
> _attrs
)
174 : RGWAsyncRadosRequest(caller
, cn
), svc(_svc
),
175 obj(_obj
), attrs(std::move(_attrs
))
178 objv_tracker
= *_objv_tracker
;
183 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
, const rgw_raw_obj
& _obj
,
184 uint64_t _window_size
)
185 : RGWConsumerCR
<string
>(_store
->ctx()), async_rados(_async_rados
),
186 store(_store
), obj(_obj
), going_down(false), num_pending_entries(0), window_size(_window_size
), total_entries(0)
190 int RGWAsyncLockSystemObj::_send_request()
193 int r
= store
->get_raw_obj_ref(obj
, &ref
);
195 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
199 rados::cls::lock::Lock
l(lock_name
);
200 utime_t
duration(duration_secs
, 0);
201 l
.set_duration(duration
);
202 l
.set_cookie(cookie
);
203 l
.set_may_renew(true);
205 return l
.lock_exclusive(&ref
.ioctx
, ref
.obj
.oid
);
208 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
209 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
210 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
214 duration_secs(_duration_secs
)
218 int RGWAsyncUnlockSystemObj::_send_request()
221 int r
= store
->get_raw_obj_ref(obj
, &ref
);
223 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
227 rados::cls::lock::Lock
l(lock_name
);
229 l
.set_cookie(cookie
);
231 return l
.unlock(&ref
.ioctx
, ref
.obj
.oid
);
234 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
235 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
236 const string
& _name
, const string
& _cookie
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
238 lock_name(_name
), cookie(_cookie
)
242 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados
*_store
,
243 const rgw_raw_obj
& _obj
,
244 map
<string
, bufferlist
>& _entries
) : RGWSimpleCoroutine(_store
->ctx()),
249 stringstream
& s
= set_description();
250 s
<< "set omap keys dest=" << obj
<< " keys=[" << s
.str() << "]";
251 for (auto i
= entries
.begin(); i
!= entries
.end(); ++i
) {
252 if (i
!= entries
.begin()) {
260 int RGWRadosSetOmapKeysCR::send_request()
262 int r
= store
->get_raw_obj_ref(obj
, &ref
);
264 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
268 set_status() << "sending request";
270 librados::ObjectWriteOperation op
;
271 op
.omap_set(entries
);
273 cn
= stack
->create_completion_notifier();
274 return ref
.ioctx
.aio_operate(ref
.obj
.oid
, cn
->completion(), &op
);
277 int RGWRadosSetOmapKeysCR::request_complete()
279 int r
= cn
->completion()->get_return_value();
281 set_status() << "request complete; ret=" << r
;
286 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados
*_store
,
287 const rgw_raw_obj
& _obj
,
288 const string
& _marker
,
291 : RGWSimpleCoroutine(_store
->ctx()), store(_store
), obj(_obj
),
292 marker(_marker
), max_entries(_max_entries
),
293 result(std::move(_result
))
295 ceph_assert(result
); // must be allocated
296 set_description() << "get omap keys dest=" << obj
<< " marker=" << marker
;
299 int RGWRadosGetOmapKeysCR::send_request() {
300 int r
= store
->get_raw_obj_ref(obj
, &result
->ref
);
302 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
306 set_status() << "send request";
308 librados::ObjectReadOperation op
;
309 op
.omap_get_keys2(marker
, max_entries
, &result
->entries
, &result
->more
, nullptr);
311 cn
= stack
->create_completion_notifier(result
);
312 return result
->ref
.ioctx
.aio_operate(result
->ref
.obj
.oid
, cn
->completion(), &op
, NULL
);
315 int RGWRadosGetOmapKeysCR::request_complete()
317 int r
= cn
->completion()->get_return_value();
319 set_status() << "request complete; ret=" << r
;
324 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(RGWRados
*_store
,
325 const rgw_raw_obj
& _obj
,
326 const set
<string
>& _keys
) : RGWSimpleCoroutine(_store
->ctx()),
331 set_description() << "remove omap keys dest=" << obj
<< " keys=" << keys
;
334 int RGWRadosRemoveOmapKeysCR::send_request() {
335 int r
= store
->get_raw_obj_ref(obj
, &ref
);
337 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
341 set_status() << "send request";
343 librados::ObjectWriteOperation op
;
344 op
.omap_rm_keys(keys
);
346 cn
= stack
->create_completion_notifier();
347 return ref
.ioctx
.aio_operate(ref
.obj
.oid
, cn
->completion(), &op
);
350 int RGWRadosRemoveOmapKeysCR::request_complete()
352 int r
= cn
->completion()->get_return_value();
354 set_status() << "request complete; ret=" << r
;
359 RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados
*store
, const rgw_raw_obj
& obj
)
360 : RGWSimpleCoroutine(store
->ctx()), store(store
), obj(obj
)
362 set_description() << "remove dest=" << obj
;
365 int RGWRadosRemoveCR::send_request()
367 auto rados
= store
->get_rados_handle();
368 int r
= rados
->ioctx_create(obj
.pool
.name
.c_str(), ioctx
);
370 lderr(cct
) << "ERROR: failed to open pool (" << obj
.pool
.name
<< ") ret=" << r
<< dendl
;
373 ioctx
.locator_set_key(obj
.loc
);
375 set_status() << "send request";
377 librados::ObjectWriteOperation op
;
380 cn
= stack
->create_completion_notifier();
381 return ioctx
.aio_operate(obj
.oid
, cn
->completion(), &op
);
384 int RGWRadosRemoveCR::request_complete()
386 int r
= cn
->completion()->get_return_value();
388 set_status() << "request complete; ret=" << r
;
393 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
394 const rgw_raw_obj
& _obj
,
395 const string
& _lock_name
,
396 const string
& _cookie
,
397 uint32_t _duration
) : RGWSimpleCoroutine(_store
->ctx()),
398 async_rados(_async_rados
),
400 lock_name(_lock_name
),
406 set_description() << "rados lock dest=" << obj
<< " lock=" << lock_name
<< " cookie=" << cookie
<< " duration=" << duration
;
409 void RGWSimpleRadosLockCR::request_cleanup()
417 int RGWSimpleRadosLockCR::send_request()
419 set_status() << "sending request";
420 req
= new RGWAsyncLockSystemObj(this, stack
->create_completion_notifier(),
421 store
, NULL
, obj
, lock_name
, cookie
, duration
);
422 async_rados
->queue(req
);
426 int RGWSimpleRadosLockCR::request_complete()
428 set_status() << "request complete; ret=" << req
->get_ret_status();
429 return req
->get_ret_status();
432 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
433 const rgw_raw_obj
& _obj
,
434 const string
& _lock_name
,
435 const string
& _cookie
) : RGWSimpleCoroutine(_store
->ctx()),
436 async_rados(_async_rados
),
438 lock_name(_lock_name
),
443 set_description() << "rados unlock dest=" << obj
<< " lock=" << lock_name
<< " cookie=" << cookie
;
446 void RGWSimpleRadosUnlockCR::request_cleanup()
454 int RGWSimpleRadosUnlockCR::send_request()
456 set_status() << "sending request";
458 req
= new RGWAsyncUnlockSystemObj(this, stack
->create_completion_notifier(),
459 store
, NULL
, obj
, lock_name
, cookie
);
460 async_rados
->queue(req
);
464 int RGWSimpleRadosUnlockCR::request_complete()
466 set_status() << "request complete; ret=" << req
->get_ret_status();
467 return req
->get_ret_status();
470 int RGWOmapAppend::operate() {
473 if (!has_product() && going_down
) {
474 set_status() << "going down";
477 set_status() << "waiting for product";
478 yield
wait_for_product();
481 while (consume(&entry
)) {
482 set_status() << "adding entry: " << entry
;
483 entries
[entry
] = bufferlist();
484 if (entries
.size() >= window_size
) {
488 if (entries
.size() >= window_size
|| going_down
) {
489 set_status() << "flushing to omap";
490 call(new RGWRadosSetOmapKeysCR(store
, obj
, entries
));
494 if (get_ret_status() < 0) {
495 ldout(cct
, 0) << "ERROR: failed to store entries in omap" << dendl
;
496 return set_state(RGWCoroutine_Error
);
499 /* done with coroutine */
500 return set_state(RGWCoroutine_Done
);
505 void RGWOmapAppend::flush_pending() {
506 receive(pending_entries
);
507 num_pending_entries
= 0;
510 bool RGWOmapAppend::append(const string
& s
) {
515 pending_entries
.push_back(s
);
516 if (++num_pending_entries
>= (int)window_size
) {
522 bool RGWOmapAppend::finish() {
529 int RGWAsyncGetBucketInstanceInfo::_send_request()
531 RGWSysObjectCtx obj_ctx
= store
->svc
.sysobj
->init_obj_ctx();
532 int r
= store
->get_bucket_instance_from_oid(obj_ctx
, oid
, bucket_info
, NULL
, NULL
);
534 ldout(store
->ctx(), 0) << "ERROR: failed to get bucket instance info for "
542 RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(RGWRados
*store
,
543 const RGWBucketInfo
& bucket_info
,
545 const std::string
& start_marker
,
546 const std::string
& end_marker
)
547 : RGWSimpleCoroutine(store
->ctx()), bs(store
),
548 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker
)),
549 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker
))
551 bs
.init(bucket_info
, shard_id
);
554 int RGWRadosBILogTrimCR::send_request()
557 cls_rgw_bi_log_trim_op call
;
558 call
.start_marker
= std::move(start_marker
);
559 call
.end_marker
= std::move(end_marker
);
562 librados::ObjectWriteOperation op
;
563 op
.exec(RGW_CLASS
, RGW_BI_LOG_TRIM
, in
);
565 cn
= stack
->create_completion_notifier();
566 return bs
.index_ctx
.aio_operate(bs
.bucket_obj
, cn
->completion(), &op
);
569 int RGWRadosBILogTrimCR::request_complete()
571 int r
= cn
->completion()->get_return_value();
572 set_status() << "request complete; ret=" << r
;
576 int RGWAsyncFetchRemoteObj::_send_request()
578 RGWObjectCtx
obj_ctx(store
);
582 snprintf(buf
, sizeof(buf
), ".%lld", (long long)store
->instance_id());
583 map
<string
, bufferlist
> attrs
;
585 rgw_obj
src_obj(bucket_info
.bucket
, key
);
587 rgw_obj
dest_obj(bucket_info
.bucket
, dest_key
.value_or(key
));
589 std::optional
<uint64_t> bytes_transferred
;
590 int r
= store
->fetch_remote_obj(obj_ctx
,
596 bucket_info
, /* dest */
597 bucket_info
, /* source */
599 NULL
, /* real_time* src_mtime, */
600 NULL
, /* real_time* mtime, */
601 NULL
, /* const real_time* mod_ptr, */
602 NULL
, /* const real_time* unmod_ptr, */
603 false, /* high precision time */
604 NULL
, /* const char *if_match, */
605 NULL
, /* const char *if_nomatch, */
606 RGWRados::ATTRSMOD_NONE
,
609 RGWObjCategory::Main
,
611 real_time(), /* delete_at */
612 NULL
, /* string *ptag, */
613 NULL
, /* string *petag, */
614 NULL
, /* void (*progress_cb)(off_t, void *), */
615 NULL
, /* void *progress_data*); */
620 ldout(store
->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r
<< dendl
;
622 counters
->inc(sync_counters::l_fetch_err
, 1);
624 } else if (counters
) {
625 if (bytes_transferred
) {
626 counters
->inc(sync_counters::l_fetch
, *bytes_transferred
);
628 counters
->inc(sync_counters::l_fetch_not_modified
);
634 int RGWAsyncStatRemoteObj::_send_request()
636 RGWObjectCtx
obj_ctx(store
);
640 snprintf(buf
, sizeof(buf
), ".%lld", (long long)store
->instance_id());
642 rgw_obj
src_obj(bucket_info
.bucket
, key
);
644 rgw_obj
dest_obj(src_obj
);
646 int r
= store
->stat_remote_obj(obj_ctx
,
648 nullptr, /* req_info */
651 bucket_info
, /* source */
652 pmtime
, /* real_time* src_mtime, */
653 psize
, /* uint64_t * */
654 nullptr, /* const real_time* mod_ptr, */
655 nullptr, /* const real_time* unmod_ptr, */
656 true, /* high precision time */
657 nullptr, /* const char *if_match, */
658 nullptr, /* const char *if_nomatch, */
662 nullptr, /* string *ptag, */
663 petag
); /* string *petag, */
666 ldout(store
->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r
<< dendl
;
672 int RGWAsyncRemoveObj::_send_request()
674 RGWObjectCtx
obj_ctx(store
);
676 rgw_obj
obj(bucket_info
.bucket
, key
);
678 ldout(store
->ctx(), 0) << __func__
<< "(): deleting obj=" << obj
<< dendl
;
680 obj_ctx
.set_atomic(obj
);
684 int ret
= store
->get_obj_state(&obj_ctx
, bucket_info
, obj
, &state
);
686 ldout(store
->ctx(), 20) << __func__
<< "(): get_obj_state() obj=" << obj
<< " returned ret=" << ret
<< dendl
;
690 /* has there been any racing object write? */
691 if (del_if_older
&& (state
->mtime
> timestamp
)) {
692 ldout(store
->ctx(), 20) << __func__
<< "(): skipping object removal obj=" << obj
<< " (obj mtime=" << state
->mtime
<< ", request timestamp=" << timestamp
<< ")" << dendl
;
696 RGWAccessControlPolicy policy
;
699 map
<string
, bufferlist
>::iterator iter
= state
->attrset
.find(RGW_ATTR_ACL
);
700 if (iter
!= state
->attrset
.end()) {
701 auto bliter
= iter
->second
.cbegin();
703 policy
.decode(bliter
);
704 } catch (buffer::error
& err
) {
705 ldout(store
->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl
;
710 RGWRados::Object
del_target(store
, bucket_info
, obj_ctx
, obj
);
711 RGWRados::Object::Delete
del_op(&del_target
);
713 del_op
.params
.bucket_owner
= bucket_info
.owner
;
714 del_op
.params
.obj_owner
= policy
.get_owner();
716 del_op
.params
.unmod_since
= timestamp
;
719 del_op
.params
.versioning_status
= BUCKET_VERSIONED
;
721 del_op
.params
.olh_epoch
= versioned_epoch
;
722 del_op
.params
.marker_version_id
= marker_version_id
;
723 del_op
.params
.obj_owner
.set_id(owner
);
724 del_op
.params
.obj_owner
.set_name(owner_display_name
);
725 del_op
.params
.mtime
= timestamp
;
726 del_op
.params
.high_precision_time
= true;
727 del_op
.params
.zones_trace
= &zones_trace
;
729 ret
= del_op
.delete_obj();
731 ldout(store
->ctx(), 20) << __func__
<< "(): delete_obj() obj=" << obj
<< " returned ret=" << ret
<< dendl
;
736 int RGWContinuousLeaseCR::operate()
739 caller
->set_sleeping(false);
740 return set_cr_done();
743 while (!going_down
) {
744 yield
call(new RGWSimpleRadosLockCR(async_rados
, store
, obj
, lock_name
, cookie
, interval
));
746 caller
->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
749 ldout(store
->ctx(), 20) << *this << ": couldn't lock " << obj
<< ":" << lock_name
<< ": retcode=" << retcode
<< dendl
;
750 return set_state(RGWCoroutine_Error
, retcode
);
753 yield
wait(utime_t(interval
/ 2, 0));
755 set_locked(false); /* moot at this point anyway */
756 yield
call(new RGWSimpleRadosUnlockCR(async_rados
, store
, obj
, lock_name
, cookie
));
757 return set_state(RGWCoroutine_Done
);
762 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(RGWRados
*_store
, const string
& _oid
,
763 const cls_log_entry
& entry
) : RGWSimpleCoroutine(_store
->ctx()),
767 stringstream
& s
= set_description();
768 s
<< "timelog add entry oid=" << oid
<< "entry={id=" << entry
.id
<< ", section=" << entry
.section
<< ", name=" << entry
.name
<< "}";
769 entries
.push_back(entry
);
772 int RGWRadosTimelogAddCR::send_request()
774 set_status() << "sending request";
776 cn
= stack
->create_completion_notifier();
777 return store
->time_log_add(oid
, entries
, cn
->completion(), true);
780 int RGWRadosTimelogAddCR::request_complete()
782 int r
= cn
->completion()->get_return_value();
784 set_status() << "request complete; ret=" << r
;
789 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados
*store
,
790 const std::string
& oid
,
791 const real_time
& start_time
,
792 const real_time
& end_time
,
793 const std::string
& from_marker
,
794 const std::string
& to_marker
)
795 : RGWSimpleCoroutine(store
->ctx()), store(store
), oid(oid
),
796 start_time(start_time
), end_time(end_time
),
797 from_marker(from_marker
), to_marker(to_marker
)
799 set_description() << "timelog trim oid=" << oid
800 << " start_time=" << start_time
<< " end_time=" << end_time
801 << " from_marker=" << from_marker
<< " to_marker=" << to_marker
;
804 int RGWRadosTimelogTrimCR::send_request()
806 set_status() << "sending request";
808 cn
= stack
->create_completion_notifier();
809 return store
->time_log_trim(oid
, start_time
, end_time
, from_marker
,
810 to_marker
, cn
->completion());
813 int RGWRadosTimelogTrimCR::request_complete()
815 int r
= cn
->completion()->get_return_value();
817 set_status() << "request complete; ret=" << r
;
823 RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados
*store
, const std::string
& oid
,
824 const std::string
& to_marker
,
825 std::string
*last_trim_marker
)
826 : RGWRadosTimelogTrimCR(store
, oid
, real_time
{}, real_time
{},
827 std::string
{}, to_marker
),
828 cct(store
->ctx()), last_trim_marker(last_trim_marker
)
832 int RGWSyncLogTrimCR::request_complete()
834 int r
= RGWRadosTimelogTrimCR::request_complete();
838 // nothing left to trim, update last_trim_marker
839 if (*last_trim_marker
< to_marker
) {
840 *last_trim_marker
= to_marker
;
846 int RGWAsyncStatObj::_send_request()
849 store
->obj_to_raw(bucket_info
.placement_rule
, obj
, &raw_obj
);
850 return store
->raw_obj_stat(raw_obj
, psize
, pmtime
, pepoch
,
851 nullptr, nullptr, objv_tracker
);
854 RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor
*async_rados
, RGWRados
*store
,
855 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
,
856 real_time
* pmtime
, uint64_t *pepoch
,
857 RGWObjVersionTracker
*objv_tracker
)
858 : RGWSimpleCoroutine(store
->ctx()), store(store
), async_rados(async_rados
),
859 bucket_info(_bucket_info
), obj(obj
), psize(psize
), pmtime(pmtime
), pepoch(pepoch
),
860 objv_tracker(objv_tracker
)
864 void RGWStatObjCR::request_cleanup()
872 int RGWStatObjCR::send_request()
874 req
= new RGWAsyncStatObj(this, stack
->create_completion_notifier(),
875 store
, bucket_info
, obj
, psize
, pmtime
, pepoch
, objv_tracker
);
876 async_rados
->queue(req
);
880 int RGWStatObjCR::request_complete()
882 return req
->get_ret_status();
885 RGWRadosNotifyCR::RGWRadosNotifyCR(RGWRados
*store
, const rgw_raw_obj
& obj
,
886 bufferlist
& request
, uint64_t timeout_ms
,
887 bufferlist
*response
)
888 : RGWSimpleCoroutine(store
->ctx()), store(store
), obj(obj
),
889 request(request
), timeout_ms(timeout_ms
), response(response
)
891 set_description() << "notify dest=" << obj
;
894 int RGWRadosNotifyCR::send_request()
896 int r
= store
->get_raw_obj_ref(obj
, &ref
);
898 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
902 set_status() << "sending request";
904 cn
= stack
->create_completion_notifier();
905 return ref
.ioctx
.aio_notify(ref
.obj
.oid
, cn
->completion(), request
,
906 timeout_ms
, response
);
909 int RGWRadosNotifyCR::request_complete()
911 int r
= cn
->completion()->get_return_value();
913 set_status() << "request complete; ret=" << r
;