2 #include "rgw_coroutine.h"
3 #include "rgw_cr_rados.h"
5 #include "cls/lock/cls_lock_client.h"
6 #include "cls/rgw/cls_rgw_client.h"
8 #include <boost/asio/yield.hpp>
10 #define dout_context g_ceph_context
11 #define dout_subsys ceph_subsys_rgw
13 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest
*req
) {
14 if (processor
->is_going_down()) {
18 processor
->m_req_queue
.push_back(req
);
19 dout(20) << "enqueued request req=" << hex
<< req
<< dec
<< dendl
;
24 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
25 return processor
->m_req_queue
.empty();
28 RGWAsyncRadosRequest
*RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
29 if (processor
->m_req_queue
.empty())
31 RGWAsyncRadosRequest
*req
= processor
->m_req_queue
.front();
32 processor
->m_req_queue
.pop_front();
33 dout(20) << "dequeued request req=" << hex
<< req
<< dec
<< dendl
;
38 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) {
39 processor
->handle_request(req
);
40 processor
->req_throttle
.put(1);
43 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
44 if (!g_conf
->subsys
.should_gather(ceph_subsys_rgw
, 20)) {
47 deque
<RGWAsyncRadosRequest
*>::iterator iter
;
48 if (processor
->m_req_queue
.empty()) {
49 dout(20) << "RGWWQ: empty" << dendl
;
52 dout(20) << "RGWWQ:" << dendl
;
53 for (iter
= processor
->m_req_queue
.begin(); iter
!= processor
->m_req_queue
.end(); ++iter
) {
54 dout(20) << "req: " << hex
<< *iter
<< dec
<< dendl
;
58 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(RGWRados
*_store
, int num_threads
)
59 : store(_store
), m_tp(store
->ctx(), "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads
),
60 req_throttle(store
->ctx(), "rgw_async_rados_ops", num_threads
* 2),
61 req_wq(this, g_conf
->rgw_op_thread_timeout
,
62 g_conf
->rgw_op_thread_suicide_timeout
, &m_tp
) {
65 void RGWAsyncRadosProcessor::start() {
69 void RGWAsyncRadosProcessor::stop() {
73 for (auto iter
= m_req_queue
.begin(); iter
!= m_req_queue
.end(); ++iter
) {
78 void RGWAsyncRadosProcessor::handle_request(RGWAsyncRadosRequest
*req
) {
83 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest
*req
) {
88 int RGWAsyncGetSystemObj::_send_request()
90 map
<string
, bufferlist
> *pattrs
= want_attrs
? &attrs
: nullptr;
92 return store
->get_system_obj(obj_ctx
, read_state
, &objv_tracker
,
93 obj
, bl
, ofs
, end
, pattrs
, nullptr);
96 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
97 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
98 off_t _ofs
, off_t _end
, bool want_attrs
)
99 : RGWAsyncRadosRequest(caller
, cn
), store(_store
), obj_ctx(_store
),
100 obj(_obj
), ofs(_ofs
), end(_end
), want_attrs(want_attrs
)
103 objv_tracker
= *_objv_tracker
;
107 int RGWSimpleRadosReadAttrsCR::send_request()
109 req
= new RGWAsyncGetSystemObj(this, stack
->create_completion_notifier(),
110 store
, nullptr, obj
, 0, -1, true);
111 async_rados
->queue(req
);
115 int RGWSimpleRadosReadAttrsCR::request_complete()
118 *pattrs
= std::move(req
->attrs
);
120 return req
->get_ret_status();
123 int RGWAsyncPutSystemObj::_send_request()
125 return store
->put_system_obj_data(NULL
, obj
, bl
, -1, exclusive
, &objv_tracker
);
128 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
129 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
130 bool _exclusive
, bufferlist _bl
)
131 : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
132 obj(_obj
), exclusive(_exclusive
), bl(std::move(_bl
))
135 objv_tracker
= *_objv_tracker
;
139 int RGWAsyncPutSystemObjAttrs::_send_request()
141 return store
->system_obj_set_attrs(nullptr, obj
, attrs
, nullptr, &objv_tracker
);
144 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
145 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
146 map
<string
, bufferlist
> _attrs
)
147 : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
148 obj(_obj
), attrs(std::move(_attrs
))
153 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
, const rgw_raw_obj
& _obj
,
154 uint64_t _window_size
)
155 : RGWConsumerCR
<string
>(_store
->ctx()), async_rados(_async_rados
),
156 store(_store
), obj(_obj
), going_down(false), num_pending_entries(0), window_size(_window_size
), total_entries(0)
160 int RGWAsyncLockSystemObj::_send_request()
163 int r
= store
->get_raw_obj_ref(obj
, &ref
);
165 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
169 rados::cls::lock::Lock
l(lock_name
);
170 utime_t
duration(duration_secs
, 0);
171 l
.set_duration(duration
);
172 l
.set_cookie(cookie
);
175 return l
.lock_exclusive(&ref
.ioctx
, ref
.oid
);
178 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
179 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
180 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
184 duration_secs(_duration_secs
)
188 int RGWAsyncUnlockSystemObj::_send_request()
191 int r
= store
->get_raw_obj_ref(obj
, &ref
);
193 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
197 rados::cls::lock::Lock
l(lock_name
);
199 l
.set_cookie(cookie
);
201 return l
.unlock(&ref
.ioctx
, ref
.oid
);
204 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWRados
*_store
,
205 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
206 const string
& _name
, const string
& _cookie
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
208 lock_name(_name
), cookie(_cookie
)
// RGWRadosSetOmapKeysCR constructor: passes the store's context to the
// RGWSimpleCoroutine base and starts building the coroutine description from
// the destination object and the entries map.
212 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(RGWRados
*_store
,
213 const rgw_raw_obj
& _obj
,
214 map
<string
, bufferlist
>& _entries
) : RGWSimpleCoroutine(_store
->ctx()),
219 stringstream
& s
= set_description();
// NOTE(review): the statement below streams s.str() -- the description
// stream's own current text -- back into s, followed by "]". That looks
// unintended, since the loop that follows iterates the entries right after
// the "keys=[" prefix. Confirm against the rest of the loop and consider
// dropping the `<< s.str() << "]"` part.
220 s
<< "set omap keys dest=" << obj
<< " keys=[" << s
.str() << "]";
// Walk every entry; the branch below is taken for all but the first one.
221 for (auto i
= entries
.begin(); i
!= entries
.end(); ++i
) {
222 if (i
!= entries
.begin()) {
230 int RGWRadosSetOmapKeysCR::send_request()
232 int r
= store
->get_raw_obj_ref(obj
, &ref
);
234 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
238 set_status() << "sending request";
240 librados::ObjectWriteOperation op
;
241 op
.omap_set(entries
);
243 cn
= stack
->create_completion_notifier();
244 return ref
.ioctx
.aio_operate(ref
.oid
, cn
->completion(), &op
);
247 int RGWRadosSetOmapKeysCR::request_complete()
249 int r
= cn
->completion()->get_return_value();
251 set_status() << "request complete; ret=" << r
;
// RGWRadosGetOmapKeysCR constructor: passes the store's context to the
// RGWSimpleCoroutine base, records the caller's output set and the entry
// limit, and sets the coroutine description from the object and the marker.
256 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(RGWRados
*_store
,
257 const rgw_raw_obj
& _obj
,
258 const string
& _marker
,
259 std::set
<std::string
> *_entries
, int _max_entries
) : RGWSimpleCoroutine(_store
->ctx()),
262 entries(_entries
), max_entries(_max_entries
),
// The description names the operation as a "get": send_request() of this
// coroutine issues omap_get_keys2, not an omap write.
265 set_description() << "get omap keys dest=" << obj
<< " marker=" << marker
;
268 int RGWRadosGetOmapKeysCR::send_request() {
269 int r
= store
->get_raw_obj_ref(obj
, &ref
);
271 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
275 set_status() << "send request";
277 librados::ObjectReadOperation op
;
278 op
.omap_get_keys2(marker
, max_entries
, entries
, nullptr, nullptr);
280 cn
= stack
->create_completion_notifier();
281 return ref
.ioctx
.aio_operate(ref
.oid
, cn
->completion(), &op
, NULL
);
284 int RGWRadosGetOmapKeysCR::request_complete()
286 int r
= cn
->completion()->get_return_value();
288 set_status() << "request complete; ret=" << r
;
293 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(RGWRados
*_store
,
294 const rgw_raw_obj
& _obj
,
295 const set
<string
>& _keys
) : RGWSimpleCoroutine(_store
->ctx()),
300 set_description() << "remove omap keys dest=" << obj
<< " keys=" << keys
;
303 int RGWRadosRemoveOmapKeysCR::send_request() {
304 int r
= store
->get_raw_obj_ref(obj
, &ref
);
306 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
310 set_status() << "send request";
312 librados::ObjectWriteOperation op
;
313 op
.omap_rm_keys(keys
);
315 cn
= stack
->create_completion_notifier();
316 return ref
.ioctx
.aio_operate(ref
.oid
, cn
->completion(), &op
);
319 int RGWRadosRemoveOmapKeysCR::request_complete()
321 int r
= cn
->completion()->get_return_value();
323 set_status() << "request complete; ret=" << r
;
328 RGWRadosRemoveCR::RGWRadosRemoveCR(RGWRados
*store
, const rgw_raw_obj
& obj
)
329 : RGWSimpleCoroutine(store
->ctx()), store(store
), obj(obj
)
331 set_description() << "remove dest=" << obj
;
334 int RGWRadosRemoveCR::send_request()
336 auto rados
= store
->get_rados_handle();
337 int r
= rados
->ioctx_create(obj
.pool
.name
.c_str(), ioctx
);
339 lderr(cct
) << "ERROR: failed to open pool (" << obj
.pool
.name
<< ") ret=" << r
<< dendl
;
342 ioctx
.locator_set_key(obj
.loc
);
344 set_status() << "send request";
346 librados::ObjectWriteOperation op
;
349 cn
= stack
->create_completion_notifier();
350 return ioctx
.aio_operate(obj
.oid
, cn
->completion(), &op
);
353 int RGWRadosRemoveCR::request_complete()
355 int r
= cn
->completion()->get_return_value();
357 set_status() << "request complete; ret=" << r
;
362 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
363 const rgw_raw_obj
& _obj
,
364 const string
& _lock_name
,
365 const string
& _cookie
,
366 uint32_t _duration
) : RGWSimpleCoroutine(_store
->ctx()),
367 async_rados(_async_rados
),
369 lock_name(_lock_name
),
375 set_description() << "rados lock dest=" << obj
<< " lock=" << lock_name
<< " cookie=" << cookie
<< " duration=" << duration
;
378 void RGWSimpleRadosLockCR::request_cleanup()
386 int RGWSimpleRadosLockCR::send_request()
388 set_status() << "sending request";
389 req
= new RGWAsyncLockSystemObj(this, stack
->create_completion_notifier(),
390 store
, NULL
, obj
, lock_name
, cookie
, duration
);
391 async_rados
->queue(req
);
395 int RGWSimpleRadosLockCR::request_complete()
397 set_status() << "request complete; ret=" << req
->get_ret_status();
398 return req
->get_ret_status();
401 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, RGWRados
*_store
,
402 const rgw_raw_obj
& _obj
,
403 const string
& _lock_name
,
404 const string
& _cookie
) : RGWSimpleCoroutine(_store
->ctx()),
405 async_rados(_async_rados
),
407 lock_name(_lock_name
),
412 set_description() << "rados unlock dest=" << obj
<< " lock=" << lock_name
<< " cookie=" << cookie
;
415 void RGWSimpleRadosUnlockCR::request_cleanup()
423 int RGWSimpleRadosUnlockCR::send_request()
425 set_status() << "sending request";
427 req
= new RGWAsyncUnlockSystemObj(this, stack
->create_completion_notifier(),
428 store
, NULL
, obj
, lock_name
, cookie
);
429 async_rados
->queue(req
);
433 int RGWSimpleRadosUnlockCR::request_complete()
435 set_status() << "request complete; ret=" << req
->get_ret_status();
436 return req
->get_ret_status();
439 int RGWOmapAppend::operate() {
442 if (!has_product() && going_down
) {
443 set_status() << "going down";
446 set_status() << "waiting for product";
447 yield
wait_for_product();
450 while (consume(&entry
)) {
451 set_status() << "adding entry: " << entry
;
452 entries
[entry
] = bufferlist();
453 if (entries
.size() >= window_size
) {
457 if (entries
.size() >= window_size
|| going_down
) {
458 set_status() << "flushing to omap";
459 call(new RGWRadosSetOmapKeysCR(store
, obj
, entries
));
463 if (get_ret_status() < 0) {
464 ldout(cct
, 0) << "ERROR: failed to store entries in omap" << dendl
;
465 return set_state(RGWCoroutine_Error
);
468 /* done with coroutine */
469 return set_state(RGWCoroutine_Done
);
474 void RGWOmapAppend::flush_pending() {
475 receive(pending_entries
);
476 num_pending_entries
= 0;
479 bool RGWOmapAppend::append(const string
& s
) {
484 pending_entries
.push_back(s
);
485 if (++num_pending_entries
>= (int)window_size
) {
491 bool RGWOmapAppend::finish() {
498 int RGWAsyncGetBucketInstanceInfo::_send_request()
500 RGWObjectCtx
obj_ctx(store
);
501 int r
= store
->get_bucket_instance_from_oid(obj_ctx
, oid
, bucket_info
, NULL
, NULL
);
503 ldout(store
->ctx(), 0) << "ERROR: failed to get bucket instance info for "
511 RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(RGWRados
*store
,
512 const RGWBucketInfo
& bucket_info
,
514 const std::string
& start_marker
,
515 const std::string
& end_marker
)
516 : RGWSimpleCoroutine(store
->ctx()), bs(store
),
517 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker
)),
518 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker
))
520 bs
.init(bucket_info
, shard_id
);
// Issue the bucket-index log trim: fill a cls_rgw_bi_log_trim_op with the two
// markers, exec RGW_CLASS/RGW_BI_LOG_TRIM with `in` as input in a write
// operation, and submit it asynchronously against the shard's bucket object.
523 int RGWRadosBILogTrimCR::send_request()
526 cls_rgw_bi_log_trim_op call
;
// NOTE(review): both markers are moved out of the start_marker/end_marker
// members, so they are left in a moved-from state afterwards. Confirm
// send_request() is never invoked a second time on the same coroutine.
527 call
.start_marker
= std::move(start_marker
);
528 call
.end_marker
= std::move(end_marker
);
531 librados::ObjectWriteOperation op
;
// `in` is the input buffer for the class call; how it is filled is not
// visible in this excerpt -- presumably the encoded `call`, TODO confirm.
532 op
.exec(RGW_CLASS
, RGW_BI_LOG_TRIM
, in
);
// The completion notifier is kept in `cn`; request_complete() reads the
// return value from it.
534 cn
= stack
->create_completion_notifier();
535 return bs
.index_ctx
.aio_operate(bs
.bucket_obj
, cn
->completion(), &op
);
538 int RGWRadosBILogTrimCR::request_complete()
540 int r
= cn
->completion()->get_return_value();
541 set_status() << "request complete; ret=" << r
;
545 int RGWAsyncFetchRemoteObj::_send_request()
547 RGWObjectCtx
obj_ctx(store
);
551 snprintf(buf
, sizeof(buf
), ".%lld", (long long)store
->instance_id());
552 string client_id
= store
->zone_id() + buf
;
553 string op_id
= store
->unique_id(store
->get_new_req_id());
554 map
<string
, bufferlist
> attrs
;
556 rgw_obj
src_obj(bucket_info
.bucket
, key
);
558 rgw_obj
dest_obj(src_obj
);
560 int r
= store
->fetch_remote_obj(obj_ctx
,
564 false, /* don't record op state in ops log */
569 bucket_info
, /* dest */
570 bucket_info
, /* source */
571 NULL
, /* real_time* src_mtime, */
572 NULL
, /* real_time* mtime, */
573 NULL
, /* const real_time* mod_ptr, */
574 NULL
, /* const real_time* unmod_ptr, */
575 false, /* high precision time */
576 NULL
, /* const char *if_match, */
577 NULL
, /* const char *if_nomatch, */
578 RGWRados::ATTRSMOD_NONE
,
581 RGW_OBJ_CATEGORY_MAIN
,
583 real_time(), /* delete_at */
584 &key
.instance
, /* string *version_id, */
585 NULL
, /* string *ptag, */
586 NULL
, /* string *petag, */
587 NULL
, /* void (*progress_cb)(off_t, void *), */
588 NULL
, /* void *progress_data*); */
592 ldout(store
->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r
<< dendl
;
// Stat an object through store->stat_remote_obj(): builds a client id from
// the zone id plus the instance id, an op id from a fresh request id, and a
// source rgw_obj from the bucket and key, then forwards the caller's
// pmtime/psize output pointers to the store.
597 int RGWAsyncStatRemoteObj::_send_request()
599 RGWObjectCtx
obj_ctx(store
);
// buf receives "." followed by the store's instance id.
603 snprintf(buf
, sizeof(buf
), ".%lld", (long long)store
->instance_id());
604 string client_id
= store
->zone_id() + buf
;
605 string op_id
= store
->unique_id(store
->get_new_req_id());
607 rgw_obj
src_obj(bucket_info
.bucket
, key
);
609 rgw_obj
dest_obj(src_obj
);
611 int r
= store
->stat_remote_obj(obj_ctx
,
614 nullptr, /* req_info */
617 bucket_info
, /* source */
618 pmtime
, /* real_time* src_mtime, */
619 psize
, /* uint64_t * */
620 nullptr, /* const real_time* mod_ptr, */
621 nullptr, /* const real_time* unmod_ptr, */
622 true, /* high precision time */
623 nullptr, /* const char *if_match, */
624 nullptr, /* const char *if_nomatch, */
627 nullptr, /* string *ptag, */
628 nullptr); /* string *petag, */
// The message names the call actually made above (stat_remote_obj), so the
// log line can be told apart from RGWAsyncFetchRemoteObj's.
631 ldout(store
->ctx(), 0) << "store->stat_remote_obj() returned r=" << r
<< dendl
;
637 int RGWAsyncRemoveObj::_send_request()
639 RGWObjectCtx
obj_ctx(store
);
641 rgw_obj
obj(bucket_info
.bucket
, key
);
643 ldout(store
->ctx(), 0) << __func__
<< "(): deleting obj=" << obj
<< dendl
;
645 obj_ctx
.obj
.set_atomic(obj
);
649 int ret
= store
->get_obj_state(&obj_ctx
, bucket_info
, obj
, &state
);
651 ldout(store
->ctx(), 20) << __func__
<< "(): get_obj_state() obj=" << obj
<< " returned ret=" << ret
<< dendl
;
655 /* has there been any racing object write? */
656 if (del_if_older
&& (state
->mtime
> timestamp
)) {
657 ldout(store
->ctx(), 20) << __func__
<< "(): skipping object removal obj=" << obj
<< " (obj mtime=" << state
->mtime
<< ", request timestamp=" << timestamp
<< ")" << dendl
;
661 RGWAccessControlPolicy policy
;
664 map
<string
, bufferlist
>::iterator iter
= state
->attrset
.find(RGW_ATTR_ACL
);
665 if (iter
!= state
->attrset
.end()) {
666 bufferlist::iterator bliter
= iter
->second
.begin();
668 policy
.decode(bliter
);
669 } catch (buffer::error
& err
) {
670 ldout(store
->ctx(), 0) << "ERROR: could not decode policy, caught buffer::error" << dendl
;
675 RGWRados::Object
del_target(store
, bucket_info
, obj_ctx
, obj
);
676 RGWRados::Object::Delete
del_op(&del_target
);
678 del_op
.params
.bucket_owner
= bucket_info
.owner
;
679 del_op
.params
.obj_owner
= policy
.get_owner();
681 del_op
.params
.unmod_since
= timestamp
;
684 del_op
.params
.versioning_status
= BUCKET_VERSIONED
;
686 del_op
.params
.olh_epoch
= versioned_epoch
;
687 del_op
.params
.marker_version_id
= marker_version_id
;
688 del_op
.params
.obj_owner
.set_id(owner
);
689 del_op
.params
.obj_owner
.set_name(owner_display_name
);
690 del_op
.params
.mtime
= timestamp
;
691 del_op
.params
.high_precision_time
= true;
692 del_op
.params
.zones_trace
= &zones_trace
;
694 ret
= del_op
.delete_obj();
696 ldout(store
->ctx(), 20) << __func__
<< "(): delete_obj() obj=" << obj
<< " returned ret=" << ret
<< dendl
;
701 int RGWContinuousLeaseCR::operate()
704 caller
->set_sleeping(false);
705 return set_cr_done();
708 while (!going_down
) {
709 yield
call(new RGWSimpleRadosLockCR(async_rados
, store
, obj
, lock_name
, cookie
, interval
));
711 caller
->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
714 ldout(store
->ctx(), 20) << *this << ": couldn't lock " << obj
<< ":" << lock_name
<< ": retcode=" << retcode
<< dendl
;
715 return set_state(RGWCoroutine_Error
, retcode
);
718 yield
wait(utime_t(interval
/ 2, 0));
720 set_locked(false); /* moot at this point anyway */
721 yield
call(new RGWSimpleRadosUnlockCR(async_rados
, store
, obj
, lock_name
, cookie
));
722 return set_state(RGWCoroutine_Done
);
// RGWRadosTimelogAddCR constructor: passes the store's context to the
// RGWSimpleCoroutine base, describes the coroutine with the oid and the
// entry's id/section/name, and queues the entry in `entries`.
727 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(RGWRados
*_store
, const string
& _oid
,
728 const cls_log_entry
& entry
) : RGWSimpleCoroutine(_store
->ctx()),
732 stringstream
& s
= set_description();
// A leading space separates the oid value from the "entry={...}" part, in
// line with the " key=" separators used by the other descriptions here.
733 s
<< "timelog add entry oid=" << oid
<< " entry={id=" << entry
.id
<< ", section=" << entry
.section
<< ", name=" << entry
.name
<< "}";
734 entries
.push_back(entry
);
737 int RGWRadosTimelogAddCR::send_request()
739 set_status() << "sending request";
741 cn
= stack
->create_completion_notifier();
742 return store
->time_log_add(oid
, entries
, cn
->completion(), true);
745 int RGWRadosTimelogAddCR::request_complete()
747 int r
= cn
->completion()->get_return_value();
749 set_status() << "request complete; ret=" << r
;
754 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(RGWRados
*store
,
755 const std::string
& oid
,
756 const real_time
& start_time
,
757 const real_time
& end_time
,
758 const std::string
& from_marker
,
759 const std::string
& to_marker
)
760 : RGWSimpleCoroutine(store
->ctx()), store(store
), oid(oid
),
761 start_time(start_time
), end_time(end_time
),
762 from_marker(from_marker
), to_marker(to_marker
)
764 set_description() << "timelog trim oid=" << oid
765 << " start_time=" << start_time
<< " end_time=" << end_time
766 << " from_marker=" << from_marker
<< " to_marker=" << to_marker
;
769 int RGWRadosTimelogTrimCR::send_request()
771 set_status() << "sending request";
773 cn
= stack
->create_completion_notifier();
774 return store
->time_log_trim(oid
, start_time
, end_time
, from_marker
,
775 to_marker
, cn
->completion());
778 int RGWRadosTimelogTrimCR::request_complete()
780 int r
= cn
->completion()->get_return_value();
782 set_status() << "request complete; ret=" << r
;
788 RGWSyncLogTrimCR::RGWSyncLogTrimCR(RGWRados
*store
, const std::string
& oid
,
789 const std::string
& to_marker
,
790 std::string
*last_trim_marker
)
791 : RGWRadosTimelogTrimCR(store
, oid
, real_time
{}, real_time
{},
792 std::string
{}, to_marker
),
793 cct(store
->ctx()), last_trim_marker(last_trim_marker
)
797 int RGWSyncLogTrimCR::request_complete()
799 int r
= RGWRadosTimelogTrimCR::request_complete();
800 if (r
< 0 && r
!= -ENODATA
) {
803 if (*last_trim_marker
< to_marker
) {
804 *last_trim_marker
= to_marker
;
810 int RGWAsyncStatObj::_send_request()
813 store
->obj_to_raw(bucket_info
.placement_rule
, obj
, &raw_obj
);
814 return store
->raw_obj_stat(raw_obj
, psize
, pmtime
, pepoch
,
815 nullptr, nullptr, objv_tracker
);
818 RGWStatObjCR::RGWStatObjCR(RGWAsyncRadosProcessor
*async_rados
, RGWRados
*store
,
819 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
,
820 real_time
* pmtime
, uint64_t *pepoch
,
821 RGWObjVersionTracker
*objv_tracker
)
822 : RGWSimpleCoroutine(store
->ctx()), store(store
), async_rados(async_rados
),
823 bucket_info(_bucket_info
), obj(obj
), psize(psize
), pmtime(pmtime
), pepoch(pepoch
),
824 objv_tracker(objv_tracker
)
828 void RGWStatObjCR::request_cleanup()
836 int RGWStatObjCR::send_request()
838 req
= new RGWAsyncStatObj(this, stack
->create_completion_notifier(),
839 store
, bucket_info
, obj
, psize
, pmtime
, pepoch
, objv_tracker
);
840 async_rados
->queue(req
);
844 int RGWStatObjCR::request_complete()
846 return req
->get_ret_status();
849 RGWRadosNotifyCR::RGWRadosNotifyCR(RGWRados
*store
, const rgw_raw_obj
& obj
,
850 bufferlist
& request
, uint64_t timeout_ms
,
851 bufferlist
*response
)
852 : RGWSimpleCoroutine(store
->ctx()), store(store
), obj(obj
),
853 request(request
), timeout_ms(timeout_ms
), response(response
)
855 set_description() << "notify dest=" << obj
;
858 int RGWRadosNotifyCR::send_request()
860 int r
= store
->get_raw_obj_ref(obj
, &ref
);
862 lderr(store
->ctx()) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
866 set_status() << "sending request";
868 cn
= stack
->create_completion_notifier();
869 return ref
.ioctx
.aio_notify(ref
.oid
, cn
->completion(), request
,
870 timeout_ms
, response
);
873 int RGWRadosNotifyCR::request_complete()
875 int r
= cn
->completion()->get_return_value();
877 set_status() << "request complete; ret=" << r
;