1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "include/compat.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_cr_rados.h"
9 #include "rgw_sync_counters.h"
10 #include "rgw_bucket.h"
12 #include "services/svc_zone.h"
13 #include "services/svc_zone_utils.h"
14 #include "services/svc_sys_obj.h"
15 #include "services/svc_cls.h"
17 #include "cls/lock/cls_lock_client.h"
18 #include "cls/rgw/cls_rgw_client.h"
20 #include <boost/asio/yield.hpp>
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rgw
// Work-queue hook that adds a request to the processor's pending queue.
// Visible steps: test processor->is_going_down(), push req onto
// processor->m_req_queue, then log the request pointer in hex at level 20.
// NOTE(review): the embedded line numbers jump 26 -> 30 and stop at 31, so the
// going-down branch body and the return statement are missing from this extract.
25 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest
*req
) {
26 if (processor
->is_going_down()) {
30 processor
->m_req_queue
.push_back(req
);
31 dout(20) << "enqueued request req=" << hex
<< req
<< dec
<< dendl
;
// Work-queue hook: reports whether the processor's m_req_queue is empty.
36 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
37 return processor
->m_req_queue
.empty();
// Work-queue hook that removes the oldest pending request.
// Visible steps: check for an empty queue, take the front element of
// processor->m_req_queue, pop it, and log the pointer in hex at level 20.
// NOTE(review): the statement guarded by the empty() check (embedded line 42)
// and the final return are missing from this extract.
40 RGWAsyncRadosRequest
*RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
41 if (processor
->m_req_queue
.empty())
43 RGWAsyncRadosRequest
*req
= processor
->m_req_queue
.front();
44 processor
->m_req_queue
.pop_front();
45 dout(20) << "dequeued request req=" << hex
<< req
<< dec
<< dendl
;
// Work-queue hook that runs one dequeued request.
// Passes this queue object and req to processor->handle_request(), then calls
// req_throttle.put(1) on the processor.
// `handle` is not referenced in the visible lines.
50 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) {
51 processor
->handle_request(this, req
);
52 processor
->req_throttle
.put(1);
// Debug helper that logs the contents of the pending-request queue at level 20.
// It first checks should_gather<ceph_subsys_rgw, 20>(); then logs
// "RGWWQ: empty" for an empty queue, otherwise "RGWWQ:" followed by one
// "req: <pointer>" line per queued request.
// NOTE(review): the bodies of the early-exit branches (embedded lines 57-58 and
// 62-63) are missing from this extract.
55 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
56 if (!g_conf()->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
59 deque
<RGWAsyncRadosRequest
*>::iterator iter
;
60 if (processor
->m_req_queue
.empty()) {
61 dout(20) << "RGWWQ: empty" << dendl
;
64 dout(20) << "RGWWQ:" << dendl
;
65 for (iter
= processor
->m_req_queue
.begin(); iter
!= processor
->m_req_queue
.end(); ++iter
) {
66 dout(20) << "req: " << hex
<< *iter
<< dec
<< dendl
;
// Constructs the processor.
// Initializes cct, the thread pool m_tp (named "RGWAsyncRadosProcessor::m_tp" /
// "rados_async", sized by num_threads) and req_throttle ("rgw_async_rados_ops",
// constructed with num_threads * 2).
// NOTE(review): the two make_timespan() arguments (rgw_op_thread_timeout and
// rgw_op_thread_suicide_timeout) belong to a member initializer whose first
// line (embedded line 73) is missing from this extract.
70 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(CephContext
*_cct
, int num_threads
)
71 : cct(_cct
), m_tp(cct
, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads
),
72 req_throttle(_cct
, "rgw_async_rados_ops", num_threads
* 2),
74 ceph::make_timespan(g_conf()->rgw_op_thread_timeout
),
75 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout
),
// Starts the processor.
// NOTE(review): only the signature survives in this extract; the body
// (embedded lines 80-82) is missing.
79 void RGWAsyncRadosProcessor::start() {
// Stops the processor.
// The only visible statement is a loop over every element of m_req_queue.
// NOTE(review): the statements before the loop and the loop body are missing
// from this extract.
83 void RGWAsyncRadosProcessor::stop() {
87 for (auto iter
= m_req_queue
.begin(); iter
!= m_req_queue
.end(); ++iter
) {
// Executes a single request by calling req->send_request(dpp).
// NOTE(review): any statements after the call are missing from this extract.
92 void RGWAsyncRadosProcessor::handle_request(const DoutPrefixProvider
*dpp
, RGWAsyncRadosRequest
*req
) {
93 req
->send_request(dpp
);
// Submits a request to the processor.
// NOTE(review): only the signature survives in this extract; the body
// (embedded lines 98-100) is missing.
97 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest
*req
) {
// Reads a system object into bl.
// pattrs points at the attrs member only when want_attrs is set, otherwise it
// is nullptr. The object handle comes from obj_ctx.get_obj(obj), and the read
// is configured with set_objv_tracker(&objv_tracker) and
// set_raw_attrs(raw_attrs) before read(dpp, &bl, null_yield).
// NOTE(review): the head of the call chain (embedded lines 107 and 109) is
// missing from this extract, so how pattrs is consumed is not visible.
102 int RGWAsyncGetSystemObj::_send_request(const DoutPrefixProvider
*dpp
)
104 map
<string
, bufferlist
> *pattrs
= want_attrs
? &attrs
: nullptr;
106 auto sysobj
= obj_ctx
.get_obj(obj
);
108 .set_objv_tracker(&objv_tracker
)
110 .set_raw_attrs(raw_attrs
)
111 .read(dpp
, &bl
, null_yield
);
// Builds an async "get system object" request.
// Forwards caller and cn to RGWAsyncRadosRequest, stores dpp, builds obj_ctx
// from _svc, and copies obj, want_attrs and raw_attrs.
// The version tracker is copied by value from *_objv_tracker.
// NOTE(review): the line guarding that dereference (embedded line 120) is
// missing from this extract -- confirm a null check exists upstream.
114 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(const DoutPrefixProvider
*_dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
115 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
116 bool want_attrs
, bool raw_attrs
)
117 : RGWAsyncRadosRequest(caller
, cn
), dpp(_dpp
), obj_ctx(_svc
),
118 obj(_obj
), want_attrs(want_attrs
), raw_attrs(raw_attrs
)
121 objv_tracker
= *_objv_tracker
;
// Allocates an RGWAsyncGetSystemObj (want_attrs fixed to true, raw_attrs taken
// from this coroutine) with a fresh completion notifier from the stack, and
// queues it on async_rados.
// NOTE(review): the return statement is missing from this extract.
125 int RGWSimpleRadosReadAttrsCR::send_request(const DoutPrefixProvider
*dpp
)
127 req
= new RGWAsyncGetSystemObj(dpp
, this, stack
->create_completion_notifier(),
128 svc
, objv_tracker
, obj
, true, raw_attrs
);
129 async_rados
->queue(req
);
// Publishes the finished request's results to the caller's outputs.
// Moves req->attrs into *pattrs, copies req->objv_tracker into *objv_tracker,
// and returns the request's status code.
// NOTE(review): the conditions guarding the two assignments (embedded lines
// 135 and 138) are missing from this extract.
133 int RGWSimpleRadosReadAttrsCR::request_complete()
136 *pattrs
= std::move(req
->attrs
);
139 *objv_tracker
= req
->objv_tracker
;
141 return req
->get_ret_status();
// Writes bl to a system object.
// Creates an object context from svc, gets the handle for obj, and configures
// the write with set_objv_tracker(&objv_tracker) and set_exclusive(exclusive)
// before write_data(dpp, bl, null_yield).
// NOTE(review): the head of the call chain (embedded line 148) is missing from
// this extract.
144 int RGWAsyncPutSystemObj::_send_request(const DoutPrefixProvider
*dpp
)
146 auto obj_ctx
= svc
->init_obj_ctx();
147 auto sysobj
= obj_ctx
.get_obj(obj
);
149 .set_objv_tracker(&objv_tracker
)
150 .set_exclusive(exclusive
)
151 .write_data(dpp
, bl
, null_yield
);
// Builds an async "put system object" request.
// Stores dpp, svc, obj and the exclusive flag, and takes ownership of the
// payload by moving _bl into bl. The version tracker is copied from
// *_objv_tracker.
// NOTE(review): the parameter line declaring _svc (embedded line 157) and the
// guard around the tracker copy (embedded line 163) are missing from this
// extract.
154 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(const DoutPrefixProvider
*_dpp
,
155 RGWCoroutine
*caller
,
156 RGWAioCompletionNotifier
*cn
,
158 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
159 bool _exclusive
, bufferlist _bl
)
160 : RGWAsyncRadosRequest(caller
, cn
), dpp(_dpp
), svc(_svc
),
161 obj(_obj
), exclusive(_exclusive
), bl(std::move(_bl
))
164 objv_tracker
= *_objv_tracker
;
// Writes attributes to a system object.
// Creates an object context from svc, gets the handle for obj, and configures
// the write with set_objv_tracker(&objv_tracker) and set_exclusive(false)
// before write_attrs(dpp, null_yield).
// NOTE(review): embedded lines 172 and 175 of the call chain are missing from
// this extract, so how the attrs are attached is not visible.
168 int RGWAsyncPutSystemObjAttrs::_send_request(const DoutPrefixProvider
*dpp
)
170 auto obj_ctx
= svc
->init_obj_ctx();
171 auto sysobj
= obj_ctx
.get_obj(obj
);
173 .set_objv_tracker(&objv_tracker
)
174 .set_exclusive(false)
176 .write_attrs(dpp
, null_yield
);
// Builds an async "put system object attrs" request.
// Stores dpp, svc and obj, and moves the by-value _attrs map into attrs.
// The version tracker is copied from *_objv_tracker.
// NOTE(review): the parameter line declaring _svc (embedded line 180) and the
// guard around the tracker copy (embedded line 186) are missing from this
// extract.
179 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider
*_dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
181 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
182 map
<string
, bufferlist
> _attrs
)
183 : RGWAsyncRadosRequest(caller
, cn
), dpp(_dpp
), svc(_svc
),
184 obj(_obj
), attrs(std::move(_attrs
))
187 objv_tracker
= *_objv_tracker
;
// Constructs the omap appender.
// Passes the store's CephContext to the RGWConsumerCR<string> base and records
// async_rados, store, obj and window_size. going_down starts false and both
// counters (num_pending_entries, total_entries) start at 0.
192 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
, const rgw_raw_obj
& _obj
,
193 uint64_t _window_size
)
194 : RGWConsumerCR
<string
>(_store
->ctx()), async_rados(_async_rados
),
195 store(_store
), obj(_obj
), going_down(false), num_pending_entries(0), window_size(_window_size
), total_entries(0)
// Takes an exclusive cls lock on a raw object.
// Resolves obj to a rados ref, logging an error that includes r on the failure
// path. Then builds a Lock named lock_name with a duration of duration_secs
// seconds, the given cookie and may_renew enabled, and returns the result of
// lock_exclusive() on the object's ioctx and oid.
// NOTE(review): the declaration of ref, the `r < 0` test and its return are
// missing from this extract.
199 int RGWAsyncLockSystemObj::_send_request(const DoutPrefixProvider
*dpp
)
202 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
204 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
208 rados::cls::lock::Lock
l(lock_name
);
209 utime_t
duration(duration_secs
, 0);
210 l
.set_duration(duration
);
211 l
.set_cookie(cookie
);
212 l
.set_may_renew(true);
214 return l
.lock_exclusive(&ref
.pool
.ioctx(), ref
.obj
.oid
);
// Builds an async lock request.
// Forwards caller and cn to the base request and records store and
// duration_secs.
// NOTE(review): the initializers between store and duration_secs (embedded
// lines 220-222) are missing from this extract. _objv_tracker is not referenced
// in the visible lines.
217 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
218 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
219 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
223 duration_secs(_duration_secs
)
// Releases a cls lock on a raw object.
// Resolves obj to a rados ref, logging an error that includes r on the failure
// path. Then builds a Lock named lock_name with the given cookie and returns
// the result of unlock() on the object's ioctx and oid.
// NOTE(review): the declaration of ref, the `r < 0` test and its return are
// missing from this extract.
227 int RGWAsyncUnlockSystemObj::_send_request(const DoutPrefixProvider
*dpp
)
230 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
232 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
236 rados::cls::lock::Lock
l(lock_name
);
238 l
.set_cookie(cookie
);
240 return l
.unlock(&ref
.pool
.ioctx(), ref
.obj
.oid
);
// Builds an async unlock request.
// Forwards caller and cn to the base request and records store, lock_name and
// cookie.
// NOTE(review): the initializer on embedded line 246 is missing from this
// extract. _objv_tracker is not referenced in the visible lines.
243 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RGWRadosStore
*_store
,
244 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
245 const string
& _name
, const string
& _cookie
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
247 lock_name(_name
), cookie(_cookie
)
// Constructs the "set omap keys" coroutine and composes its description from
// the destination object and the entry keys.
// NOTE(review): the first insertion streams s.str() -- the description stream
// itself -- back into s and closes the bracket with "]" before the loop over
// entries runs. That looks unintended; the likely intent is to emit " keys=["
// and let the loop list the keys. The loop body and trailing lines (embedded
// lines 262-266) are missing from this extract, so confirm before changing.
251 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(rgw::sal::RGWRadosStore
*_store
,
252 const rgw_raw_obj
& _obj
,
253 map
<string
, bufferlist
>& _entries
) : RGWSimpleCoroutine(_store
->ctx()),
258 stringstream
& s
= set_description();
259 s
<< "set omap keys dest=" << obj
<< " keys=[" << s
.str() << "]";
260 for (auto i
= entries
.begin(); i
!= entries
.end(); ++i
) {
// Every entry except the first takes this branch.
// NOTE(review): presumably it emits a separator -- the branch body is missing.
261 if (i
!= entries
.begin()) {
// Issues an asynchronous omap_set of `entries` on the target object.
// Resolves obj to a rados ref (logging an error that includes r on the failure
// path), sets the status to "sending request", builds a write op with
// omap_set(entries), creates a completion notifier and returns the result of
// aio_operate().
// NOTE(review): the `r < 0` test and its return are missing from this extract.
269 int RGWRadosSetOmapKeysCR::send_request(const DoutPrefixProvider
*dpp
)
271 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
273 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
277 set_status() << "sending request";
279 librados::ObjectWriteOperation op
;
280 op
.omap_set(entries
);
282 cn
= stack
->create_completion_notifier();
283 return ref
.pool
.ioctx().aio_operate(ref
.obj
.oid
, cn
->completion(), &op
);
// Reads the AIO completion's return value into r and records it in the
// coroutine status.
// NOTE(review): the return statement is missing from this extract.
286 int RGWRadosSetOmapKeysCR::request_complete()
288 int r
= cn
->completion()->get_return_value();
290 set_status() << "request complete; ret=" << r
;
// Constructs the "get omap keys" coroutine.
// Records store, obj, marker and max_entries, and takes the result holder by
// move. The holder must be non-null (asserted). The description names the
// destination object and marker.
// NOTE(review): the parameter lines declaring _max_entries and _result
// (embedded lines 298-299) are missing from this extract.
295 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(rgw::sal::RGWRadosStore
*_store
,
296 const rgw_raw_obj
& _obj
,
297 const string
& _marker
,
300 : RGWSimpleCoroutine(_store
->ctx()), store(_store
), obj(_obj
),
301 marker(_marker
), max_entries(_max_entries
),
302 result(std::move(_result
))
304 ceph_assert(result
); // must be allocated
305 set_description() << "get omap keys dest=" << obj
<< " marker=" << marker
;
// Issues an asynchronous omap_get_keys2 read on the target object.
// Resolves obj into result->ref (logging an error that includes r on the
// failure path), sets the status to "send request", and builds a read op that
// fills result->entries and result->more starting at `marker`, limited by
// max_entries. The completion notifier is created with `result`, and the
// function returns the result of aio_operate(). The last aio_operate argument
// is a null pointer, spelled nullptr to match the omap_get_keys2 call above.
// NOTE(review): the `r < 0` test and its return are missing from this extract.
308 int RGWRadosGetOmapKeysCR::send_request(const DoutPrefixProvider
*dpp
) {
309 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &result
->ref
);
311 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
315 set_status() << "send request";
317 librados::ObjectReadOperation op
;
318 op
.omap_get_keys2(marker
, max_entries
, &result
->entries
, &result
->more
, nullptr);
320 cn
= stack
->create_completion_notifier(result
);
321 return result
->ref
.pool
.ioctx().aio_operate(result
->ref
.obj
.oid
, cn
->completion(), &op
, nullptr
);
// Reads the AIO completion's return value into r and records it in the
// coroutine status.
// NOTE(review): the return statement is missing from this extract.
324 int RGWRadosGetOmapKeysCR::request_complete()
326 int r
= cn
->completion()->get_return_value();
328 set_status() << "request complete; ret=" << r
;
// Constructs the "get omap vals" coroutine.
// Records store, obj, marker and max_entries, and takes the result holder by
// move. The holder must be non-null (asserted). The description says "vals" so
// that it can be told apart from RGWRadosGetOmapKeysCR, whose description text
// is otherwise the same.
// NOTE(review): the parameter lines declaring _max_entries and _result
// (embedded lines 336-337) are missing from this extract.
333 RGWRadosGetOmapValsCR::RGWRadosGetOmapValsCR(rgw::sal::RGWRadosStore
*_store
,
334 const rgw_raw_obj
& _obj
,
335 const string
& _marker
,
338 : RGWSimpleCoroutine(_store
->ctx()), store(_store
), obj(_obj
),
339 marker(_marker
), max_entries(_max_entries
),
340 result(std::move(_result
))
342 ceph_assert(result
); // must be allocated
343 set_description() << "get omap vals dest=" << obj
<< " marker=" << marker
;
// Issues an asynchronous omap_get_vals2 read on the target object.
// Resolves obj into result->ref (logging an error that includes r on the
// failure path), sets the status to "send request", and builds a read op that
// fills result->entries and result->more starting at `marker`, limited by
// max_entries. The completion notifier is created with `result`, and the
// function returns the result of aio_operate(). The last aio_operate argument
// is a null pointer, spelled nullptr to match the omap_get_vals2 call above.
// NOTE(review): the `r < 0` test and its return are missing from this extract.
346 int RGWRadosGetOmapValsCR::send_request(const DoutPrefixProvider
*dpp
) {
347 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &result
->ref
);
349 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
353 set_status() << "send request";
355 librados::ObjectReadOperation op
;
356 op
.omap_get_vals2(marker
, max_entries
, &result
->entries
, &result
->more
, nullptr);
358 cn
= stack
->create_completion_notifier(result
);
359 return result
->ref
.pool
.ioctx().aio_operate(result
->ref
.obj
.oid
, cn
->completion(), &op
, nullptr
);
// Reads the AIO completion's return value into r and records it in the
// coroutine status.
// NOTE(review): the return statement is missing from this extract.
362 int RGWRadosGetOmapValsCR::request_complete()
364 int r
= cn
->completion()->get_return_value();
366 set_status() << "request complete; ret=" << r
;
// Constructs the "remove omap keys" coroutine and sets a description that
// names the destination object and the key set.
// NOTE(review): the member initializers after the base-class call (embedded
// lines 374-376) are missing from this extract.
371 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(rgw::sal::RGWRadosStore
*_store
,
372 const rgw_raw_obj
& _obj
,
373 const set
<string
>& _keys
) : RGWSimpleCoroutine(_store
->ctx()),
378 set_description() << "remove omap keys dest=" << obj
<< " keys=" << keys
;
// Issues an asynchronous omap_rm_keys of `keys` on the target object.
// Resolves obj to a rados ref (logging an error that includes r on the failure
// path), sets the status to "send request", builds the write op, creates a
// completion notifier and returns the result of aio_operate().
// NOTE(review): the `r < 0` test and its return are missing from this extract.
381 int RGWRadosRemoveOmapKeysCR::send_request(const DoutPrefixProvider
*dpp
) {
382 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
384 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
388 set_status() << "send request";
390 librados::ObjectWriteOperation op
;
391 op
.omap_rm_keys(keys
);
393 cn
= stack
->create_completion_notifier();
394 return ref
.pool
.ioctx().aio_operate(ref
.obj
.oid
, cn
->completion(), &op
);
// Reads the AIO completion's return value into r and records it in the
// coroutine status.
// NOTE(review): the return statement is missing from this extract.
397 int RGWRadosRemoveOmapKeysCR::request_complete()
399 int r
= cn
->completion()->get_return_value();
401 set_status() << "request complete; ret=" << r
;
// Constructs the "remove object" coroutine.
// Records store, obj and the optional objv_tracker pointer, and sets a
// description naming the destination object.
// The parameters share names with the members they initialize.
406 RGWRadosRemoveCR::RGWRadosRemoveCR(rgw::sal::RGWRadosStore
*store
, const rgw_raw_obj
& obj
,
407 RGWObjVersionTracker
* objv_tracker
)
408 : RGWSimpleCoroutine(store
->ctx()),
409 store(store
), obj(obj
), objv_tracker(objv_tracker
)
411 set_description() << "remove dest=" << obj
;
// Issues an asynchronous write op against the object to be removed.
// Opens an ioctx on obj.pool.name through the rados handle (logging an error
// that includes r on the failure path), sets the locator key from obj.loc, and
// sets the status to "send request". objv_tracker->prepare_op_for_write(&op)
// is applied to the op, then a completion notifier is created and the result
// of aio_operate() on obj.oid is returned.
// NOTE(review): the `r < 0` test, the guard around the objv_tracker call and
// the statement that adds the remove to the op (embedded lines 427-431) are
// missing from this extract.
414 int RGWRadosRemoveCR::send_request(const DoutPrefixProvider
*dpp
)
416 auto rados
= store
->getRados()->get_rados_handle();
417 int r
= rados
->ioctx_create(obj
.pool
.name
.c_str(), ioctx
);
419 lderr(cct
) << "ERROR: failed to open pool (" << obj
.pool
.name
<< ") ret=" << r
<< dendl
;
422 ioctx
.locator_set_key(obj
.loc
);
424 set_status() << "send request";
426 librados::ObjectWriteOperation op
;
428 objv_tracker
->prepare_op_for_write(&op
);
432 cn
= stack
->create_completion_notifier();
433 return ioctx
.aio_operate(obj
.oid
, cn
->completion(), &op
);
// Reads the AIO completion's return value into r and records it in the
// coroutine status.
// NOTE(review): the return statement is missing from this extract.
436 int RGWRadosRemoveCR::request_complete()
438 int r
= cn
->completion()->get_return_value();
440 set_status() << "request complete; ret=" << r
;
// Constructs the lock coroutine and sets a description listing the destination
// object, lock name, cookie and duration.
// NOTE(review): only the async_rados and lock_name initializers are visible;
// the others (embedded lines 451 and 453-456) are missing from this extract.
445 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
446 const rgw_raw_obj
& _obj
,
447 const string
& _lock_name
,
448 const string
& _cookie
,
449 uint32_t _duration
) : RGWSimpleCoroutine(_store
->ctx()),
450 async_rados(_async_rados
),
452 lock_name(_lock_name
),
458 set_description() << "rados lock dest=" << obj
<< " lock=" << lock_name
<< " cookie=" << cookie
<< " duration=" << duration
;
// Cleanup hook for the lock coroutine's in-flight request.
// NOTE(review): only the signature survives in this extract; the body
// (embedded lines 462-467) is missing.
461 void RGWSimpleRadosLockCR::request_cleanup()
// Sets the status to "sending request", allocates an RGWAsyncLockSystemObj
// for (obj, lock_name, cookie, duration) with a fresh completion notifier, and
// queues it on async_rados. No version tracker is supplied: that pointer
// argument is null, spelled nullptr as in the rest of the file.
// NOTE(review): the return statement is missing from this extract.
469 int RGWSimpleRadosLockCR::send_request(const DoutPrefixProvider
*dpp
)
471 set_status() << "sending request";
472 req
= new RGWAsyncLockSystemObj(this, stack
->create_completion_notifier(),
473 store
, nullptr
, obj
, lock_name
, cookie
, duration
);
474 async_rados
->queue(req
);
// Records the finished request's status code in the coroutine status and
// returns that same code.
478 int RGWSimpleRadosLockCR::request_complete()
480 set_status() << "request complete; ret=" << req
->get_ret_status();
481 return req
->get_ret_status();
// Constructs the unlock coroutine and sets a description listing the
// destination object, lock name and cookie.
// NOTE(review): only the async_rados and lock_name initializers are visible;
// the others (embedded lines 489 and 491-493) are missing from this extract.
484 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RGWRadosStore
*_store
,
485 const rgw_raw_obj
& _obj
,
486 const string
& _lock_name
,
487 const string
& _cookie
) : RGWSimpleCoroutine(_store
->ctx()),
488 async_rados(_async_rados
),
490 lock_name(_lock_name
),
495 set_description() << "rados unlock dest=" << obj
<< " lock=" << lock_name
<< " cookie=" << cookie
;
// Cleanup hook for the unlock coroutine's in-flight request.
// NOTE(review): only the signature survives in this extract; the body
// (embedded lines 499-504) is missing.
498 void RGWSimpleRadosUnlockCR::request_cleanup()
// Sets the status to "sending request", allocates an RGWAsyncUnlockSystemObj
// for (obj, lock_name, cookie) with a fresh completion notifier, and queues it
// on async_rados. No version tracker is supplied: that pointer argument is
// null, spelled nullptr as in the rest of the file.
// NOTE(review): the return statement is missing from this extract.
506 int RGWSimpleRadosUnlockCR::send_request(const DoutPrefixProvider
*dpp
)
508 set_status() << "sending request";
510 req
= new RGWAsyncUnlockSystemObj(this, stack
->create_completion_notifier(),
511 store
, nullptr
, obj
, lock_name
, cookie
);
512 async_rados
->queue(req
);
// Records the finished request's status code in the coroutine status and
// returns that same code.
516 int RGWSimpleRadosUnlockCR::request_complete()
518 set_status() << "request complete; ret=" << req
->get_ret_status();
519 return req
->get_ret_status();
// Coroutine body that drains produced entries into the object's omap.
// Visible flow: when there is no product and going_down is set, the status
// becomes "going down"; otherwise it waits for product. Each consumed entry is
// added to `entries` as a key with an empty bufferlist. Once entries reaches
// window_size, or going_down is set, RGWRadosSetOmapKeysCR is called to flush
// them. A negative return status is logged and moves the coroutine to the
// error state; the last visible statement moves it to the done state.
// NOTE(review): the reenter/loop scaffolding, the break statements and the
// reset of `entries` after a flush are missing from this extract.
522 int RGWOmapAppend::operate(const DoutPrefixProvider
*dpp
) {
525 if (!has_product() && going_down
) {
526 set_status() << "going down";
529 set_status() << "waiting for product";
530 yield
wait_for_product();
533 while (consume(&entry
)) {
534 set_status() << "adding entry: " << entry
;
535 entries
[entry
] = bufferlist();
536 if (entries
.size() >= window_size
) {
540 if (entries
.size() >= window_size
|| going_down
) {
541 set_status() << "flushing to omap";
542 call(new RGWRadosSetOmapKeysCR(store
, obj
, entries
));
546 if (get_ret_status() < 0) {
547 ldout(cct
, 0) << "ERROR: failed to store entries in omap" << dendl
;
548 return set_state(RGWCoroutine_Error
);
551 /* done with coroutine */
552 return set_state(RGWCoroutine_Done
);
// Passes pending_entries to receive() and resets the pending counter to 0.
// NOTE(review): whether receive() empties pending_entries is not visible here.
557 void RGWOmapAppend::flush_pending() {
558 receive(pending_entries
);
559 num_pending_entries
= 0;
// Buffers one entry for a later flush.
// Appends s to pending_entries and increments num_pending_entries; the branch
// below is taken once that counter reaches window_size. window_size is
// narrowed with a named cast so the comparison against the counter stays in
// int, as before.
// NOTE(review): the statements before the push_back (embedded lines 563-566),
// the branch body and the return are missing from this extract.
562 bool RGWOmapAppend::append(const string
& s
) {
567 pending_entries
.push_back(s
);
568 if (++num_pending_entries
>= static_cast<int>(window_size
)) {
// Finishes the appender.
// NOTE(review): only the signature survives in this extract; the body
// (embedded lines 575-579) is missing.
574 bool RGWOmapAppend::finish() {
// Loads bucket info and attrs for `bucket`.
// When bucket.bucket_id is non-empty the info is read through
// get_bucket_instance_info() using a fresh sysobj context. The second call,
// read_bucket_info() via the bucket ctl, is presumably the else branch.
// An error is logged on the failure path.
// NOTE(review): the declaration of r, the else line, the failure test, the
// rest of the log statement and the return are missing from this extract.
581 int RGWAsyncGetBucketInstanceInfo::_send_request(const DoutPrefixProvider
*dpp
)
584 if (!bucket
.bucket_id
.empty()) {
585 RGWSysObjectCtx obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
586 r
= store
->getRados()->get_bucket_instance_info(obj_ctx
, bucket
, bucket_info
, nullptr, &attrs
, null_yield
, dpp
);
588 r
= store
->ctl()->bucket
->read_bucket_info(bucket
, &bucket_info
, null_yield
, dpp
,
589 RGWBucketCtl::BucketInstance::GetParams().set_attrs(&attrs
));
592 ldpp_dout(dpp
, 0) << "ERROR: failed to get bucket instance info for "
// Constructs the bucket-index-log trim coroutine.
// Builds the bucket shard `bs` from the store's rados handle. Both markers are
// stored after passing through BucketIndexShardsManager::get_shard_marker().
// The shard is then initialized for bucket_info's current index layout and
// shard_id.
// NOTE(review): the parameter line declaring shard_id (embedded line 603) is
// missing from this extract, and the return value of bs.init() is not used in
// the visible line.
600 RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(const DoutPrefixProvider
*dpp
,
601 rgw::sal::RGWRadosStore
*store
,
602 const RGWBucketInfo
& bucket_info
,
604 const std::string
& start_marker
,
605 const std::string
& end_marker
)
606 : RGWSimpleCoroutine(store
->ctx()), bs(store
->getRados()),
607 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker
)),
608 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker
))
610 bs
.init(dpp
, bucket_info
, bucket_info
.layout
.current_index
, shard_id
);
// Sends the bi-log trim class call to the bucket index shard object.
// Moves start_marker and end_marker into a cls_rgw_bi_log_trim_op, so both
// members are left in a moved-from state afterwards. A write op then runs
// RGW_CLASS / RGW_BI_LOG_TRIM with the buffer `in`, and the function returns
// the result of aio_operate() on bs.bucket_obj.
// NOTE(review): the declaration of `in` and the encoding of `call` into it
// (embedded lines 614-620) are missing from this extract.
613 int RGWRadosBILogTrimCR::send_request(const DoutPrefixProvider
*dpp
)
616 cls_rgw_bi_log_trim_op call
;
617 call
.start_marker
= std::move(start_marker
);
618 call
.end_marker
= std::move(end_marker
);
621 librados::ObjectWriteOperation op
;
622 op
.exec(RGW_CLASS
, RGW_BI_LOG_TRIM
, in
);
624 cn
= stack
->create_completion_notifier();
625 return bs
.bucket_obj
.aio_operate(cn
->completion(), &op
);
// Reads the AIO completion's return value into r and records it in the
// coroutine status.
// NOTE(review): the return statement is missing from this extract.
628 int RGWRadosBILogTrimCR::request_complete()
630 int r
= cn
->completion()->get_return_value();
631 set_status() << "request complete; ret=" << r
;
// Fetches an object from a remote source into the destination bucket.
// Builds SAL bucket/object wrappers for the source and destination; the
// destination key falls back to the source key when dest_key is unset. Then
// calls RGWRados::fetch_remote_obj(). On failure the return code is logged and
// l_fetch_err is incremented. On success the counters record either the bytes
// transferred (l_fetch) or l_fetch_not_modified when no byte count was
// reported.
// Unused pointer arguments are spelled nullptr throughout; the original mixed
// NULL and nullptr within this one argument list.
// NOTE(review): several argument lines, the declaration of buf, the `r < 0`
// test and the return are missing from this extract.
635 int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider
*dpp
)
637 RGWObjectCtx
obj_ctx(store
);
// buf receives "." followed by the rados instance id.
640 snprintf(buf
, sizeof(buf
), ".%lld", (long long)store
->getRados()->instance_id());
641 rgw::sal::RGWAttrs attrs
;
643 rgw::sal::RGWRadosBucket
bucket(store
, src_bucket
);
644 rgw::sal::RGWRadosObject
src_obj(store
, key
, &bucket
);
645 rgw::sal::RGWRadosBucket
dest_bucket(store
, dest_bucket_info
);
646 rgw::sal::RGWRadosObject
dest_obj(store
, dest_key
.value_or(key
), &dest_bucket
);
648 std::optional
<uint64_t> bytes_transferred
;
649 int r
= store
->getRados()->fetch_remote_obj(obj_ctx
,
650 user_id
.value_or(rgw_user()),
655 &dest_bucket
, /* dest */
656 nullptr, /* source */
658 nullptr
, /* real_time* src_mtime, */
659 nullptr
, /* real_time* mtime, */
660 nullptr
, /* const real_time* mod_ptr, */
661 nullptr
, /* const real_time* unmod_ptr, */
662 false, /* high precision time */
663 nullptr
, /* const char *if_match, */
664 nullptr
, /* const char *if_nomatch, */
665 RGWRados::ATTRSMOD_NONE
,
668 RGWObjCategory::Main
,
670 real_time(), /* delete_at */
671 nullptr
, /* string *ptag, */
672 nullptr
, /* string *petag, */
673 nullptr
, /* void (*progress_cb)(off_t, void *), */
674 nullptr
, /* void *progress_data*); */
681 ldpp_dout(dpp
, 0) << "store->fetch_remote_obj() returned r=" << r
<< dendl
;
683 counters
->inc(sync_counters::l_fetch_err
, 1);
685 } else if (counters
) {
686 if (bytes_transferred
) {
687 counters
->inc(sync_counters::l_fetch
, *bytes_transferred
);
689 counters
->inc(sync_counters::l_fetch_not_modified
);
// Stats an object on a remote source.
// Builds SAL wrappers for the source bucket and object and calls
// RGWRados::stat_remote_obj(), which reports through pmtime, psize and petag.
// The failure log names stat_remote_obj(), the function actually called; the
// original message said fetch_remote_obj().
// NOTE(review): several argument lines, the declaration of buf, the `r < 0`
// test and the return are missing from this extract.
695 int RGWAsyncStatRemoteObj::_send_request(const DoutPrefixProvider
*dpp
)
697 RGWObjectCtx
obj_ctx(store
);
// buf receives "." followed by the rados instance id.
701 snprintf(buf
, sizeof(buf
), ".%lld", (long long)store
->getRados()->instance_id());
703 rgw::sal::RGWRadosBucket
bucket(store
, src_bucket
);
704 rgw::sal::RGWRadosObject
src_obj(store
, key
, &bucket
);
706 int r
= store
->getRados()->stat_remote_obj(dpp
,
709 nullptr, /* req_info */
712 nullptr, /* source */
713 pmtime
, /* real_time* src_mtime, */
714 psize
, /* uint64_t * */
715 nullptr, /* const real_time* mod_ptr, */
716 nullptr, /* const real_time* unmod_ptr, */
717 true, /* high precision time */
718 nullptr, /* const char *if_match, */
719 nullptr, /* const char *if_nomatch, */
723 nullptr, /* string *ptag, */
724 petag
); /* string *petag, */
727 ldpp_dout(dpp
, 0) << "store->stat_remote_obj() returned r=" << r
<< dendl
;
// Deletes an object on behalf of the sync machinery.
// Visible flow:
//   1. build the rgw_obj from bucket_info.bucket and key, log it, and mark it
//      atomic in the object context;
//   2. load the object state; a level-20 log reports the get_obj_state()
//      return code;
//   3. when del_if_older is set and the object's mtime is newer than
//      `timestamp`, log that removal is skipped;
//   4. decode the ACL attribute, if present, into `policy`; a decode error is
//      logged;
//   5. fill in the Delete op parameters and run delete_obj(); a level-20 log
//      reports its return code.
// NOTE(review): the declaration of `state`, the error tests, the early returns
// and the `try` line are missing from this extract.
733 int RGWAsyncRemoveObj::_send_request(const DoutPrefixProvider
*dpp
)
735 RGWObjectCtx
obj_ctx(store
);
737 rgw_obj
obj(bucket_info
.bucket
, key
);
739 ldpp_dout(dpp
, 0) << __func__
<< "(): deleting obj=" << obj
<< dendl
;
741 obj_ctx
.set_atomic(obj
);
745 int ret
= store
->getRados()->get_obj_state(dpp
, &obj_ctx
, bucket_info
, obj
, &state
, null_yield
);
747 ldpp_dout(dpp
, 20) << __func__
<< "(): get_obj_state() obj=" << obj
<< " returned ret=" << ret
<< dendl
;
751 /* has there been any racing object write? */
752 if (del_if_older
&& (state
->mtime
> timestamp
)) {
753 ldpp_dout(dpp
, 20) << __func__
<< "(): skipping object removal obj=" << obj
<< " (obj mtime=" << state
->mtime
<< ", request timestamp=" << timestamp
<< ")" << dendl
;
757 RGWAccessControlPolicy policy
;
760 map
<string
, bufferlist
>::iterator iter
= state
->attrset
.find(RGW_ATTR_ACL
);
761 if (iter
!= state
->attrset
.end()) {
762 auto bliter
= iter
->second
.cbegin();
764 policy
.decode(bliter
);
765 } catch (buffer::error
& err
) {
766 ldpp_dout(dpp
, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl
;
771 RGWRados::Object
del_target(store
->getRados(), bucket_info
, obj_ctx
, obj
);
772 RGWRados::Object::Delete
del_op(&del_target
);
774 del_op
.params
.bucket_owner
= bucket_info
.owner
;
// obj_owner starts as the owner from the decoded ACL policy. Its id and name
// are set again further down from `owner` / `owner_display_name`.
// NOTE(review): the conditions on the missing lines decide which value wins.
775 del_op
.params
.obj_owner
= policy
.get_owner();
777 del_op
.params
.unmod_since
= timestamp
;
780 del_op
.params
.versioning_status
= BUCKET_VERSIONED
;
782 del_op
.params
.olh_epoch
= versioned_epoch
;
783 del_op
.params
.marker_version_id
= marker_version_id
;
784 del_op
.params
.obj_owner
.set_id(rgw_user(owner
));
785 del_op
.params
.obj_owner
.set_name(owner_display_name
);
786 del_op
.params
.mtime
= timestamp
;
787 del_op
.params
.high_precision_time
= true;
788 del_op
.params
.zones_trace
= &zones_trace
;
790 ret
= del_op
.delete_obj(null_yield
, dpp
);
792 ldpp_dout(dpp
, 20) << __func__
<< "(): delete_obj() obj=" << obj
<< " returned ret=" << ret
<< dendl
;
// Coroutine body that holds a rados lock by re-acquiring it repeatedly.
// While going_down is false it calls RGWSimpleRadosLockCR for `interval`
// seconds and clears the caller's sleeping flag. On the error path it logs
// retcode and moves to the error state carrying retcode. Between attempts it
// waits interval / 2 seconds. After the loop it clears the locked flag,
// calls RGWSimpleRadosUnlockCR and moves to the done state.
// NOTE(review): the reenter scaffolding, the condition guarding the first
// early exit, the `retcode < 0` test and the set_locked(true) line are missing
// from this extract.
797 int RGWContinuousLeaseCR::operate(const DoutPrefixProvider
*dpp
)
800 caller
->set_sleeping(false);
801 return set_cr_done();
804 while (!going_down
) {
805 yield
call(new RGWSimpleRadosLockCR(async_rados
, store
, obj
, lock_name
, cookie
, interval
));
807 caller
->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
810 ldout(store
->ctx(), 20) << *this << ": couldn't lock " << obj
<< ":" << lock_name
<< ": retcode=" << retcode
<< dendl
;
811 return set_state(RGWCoroutine_Error
, retcode
);
814 yield
wait(utime_t(interval
/ 2, 0));
816 set_locked(false); /* moot at this point anyway */
817 yield
call(new RGWSimpleRadosUnlockCR(async_rados
, store
, obj
, lock_name
, cookie
));
818 return set_state(RGWCoroutine_Done
);
// Constructs the "timelog add" coroutine for a single entry.
// Sets a description listing the target oid and the entry's id, section and
// name, then stores the entry in `entries`. A space now separates the oid from
// "entry="; the original text ran the two together.
// NOTE(review): the member initializers after the base-class call (embedded
// lines 825-827) are missing from this extract.
823 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(const DoutPrefixProvider
*_dpp
, rgw::sal::RGWRadosStore
*_store
, const string
& _oid
,
824 const cls_log_entry
& entry
) : RGWSimpleCoroutine(_store
->ctx()),
829 stringstream
& s
= set_description();
830 s
<< "timelog add entry oid=" << oid
<< " entry={id=" << entry
.id
<< ", section=" << entry
.section
<< ", name=" << entry
.name
<< "}";
831 entries
.push_back(entry
);
// Sets the status to "sending request", creates a completion notifier and
// returns the result of the cls timelog add() call for `entries` on `oid`.
// The completion is passed to add(), followed by `true` and null_yield.
// NOTE(review): the meaning of the `true` argument is not visible here.
834 int RGWRadosTimelogAddCR::send_request(const DoutPrefixProvider
*dpp
)
836 set_status() << "sending request";
838 cn
= stack
->create_completion_notifier();
839 return store
->svc()->cls
->timelog
.add(dpp
, oid
, entries
, cn
->completion(), true, null_yield
);
// Reads the AIO completion's return value into r and records it in the
// coroutine status.
// NOTE(review): the return statement is missing from this extract.
842 int RGWRadosTimelogAddCR::request_complete()
844 int r
= cn
->completion()->get_return_value();
846 set_status() << "request complete; ret=" << r
;
// Constructs the "timelog trim" coroutine.
// Records dpp, store, oid, the time range and the marker range, and sets a
// description listing all of them.
// The parameters share names with the members they initialize.
851 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(const DoutPrefixProvider
*dpp
,
852 rgw::sal::RGWRadosStore
*store
,
853 const std::string
& oid
,
854 const real_time
& start_time
,
855 const real_time
& end_time
,
856 const std::string
& from_marker
,
857 const std::string
& to_marker
)
858 : RGWSimpleCoroutine(store
->ctx()), dpp(dpp
), store(store
), oid(oid
),
859 start_time(start_time
), end_time(end_time
),
860 from_marker(from_marker
), to_marker(to_marker
)
862 set_description() << "timelog trim oid=" << oid
863 << " start_time=" << start_time
<< " end_time=" << end_time
864 << " from_marker=" << from_marker
<< " to_marker=" << to_marker
;
// Sets the status to "sending request", creates a completion notifier and
// returns the result of the cls timelog trim() call for the stored time and
// marker ranges on `oid`.
// NOTE(review): the trailing arguments of trim() (embedded line 874) are
// missing from this extract.
867 int RGWRadosTimelogTrimCR::send_request(const DoutPrefixProvider
*dpp
)
869 set_status() << "sending request";
871 cn
= stack
->create_completion_notifier();
872 return store
->svc()->cls
->timelog
.trim(dpp
, oid
, start_time
, end_time
, from_marker
,
873 to_marker
, cn
->completion(),
// Reads the AIO completion's return value into r and records it in the
// coroutine status.
// NOTE(review): the return statement is missing from this extract.
877 int RGWRadosTimelogTrimCR::request_complete()
879 int r
= cn
->completion()->get_return_value();
881 set_status() << "request complete; ret=" << r
;
// Constructs a sync-log trim coroutine on top of RGWRadosTimelogTrimCR.
// The base is given default-constructed start and end times and an empty
// from_marker, so only to_marker carries a real value. Also records cct and
// the caller's last_trim_marker pointer.
887 RGWSyncLogTrimCR::RGWSyncLogTrimCR(const DoutPrefixProvider
*dpp
,
888 rgw::sal::RGWRadosStore
*store
, const std::string
& oid
,
889 const std::string
& to_marker
,
890 std::string
*last_trim_marker
)
891 : RGWRadosTimelogTrimCR(dpp
, store
, oid
, real_time
{}, real_time
{},
892 std::string
{}, to_marker
),
893 cct(store
->ctx()), last_trim_marker(last_trim_marker
)
// Runs the base-class completion and then advances *last_trim_marker to
// to_marker, but only when the stored marker is older and to_marker is not
// the max_marker sentinel.
// NOTE(review): the test on r between these two steps (embedded lines 900-902)
// and the return are missing from this extract.
897 int RGWSyncLogTrimCR::request_complete()
899 int r
= RGWRadosTimelogTrimCR::request_complete();
903 // nothing left to trim, update last_trim_marker
904 if (*last_trim_marker
< to_marker
&& to_marker
!= max_marker
) {
905 *last_trim_marker
= to_marker
;
// Stats an object at the raw rados level.
// Translates obj to raw_obj using the bucket's placement rule, then returns
// the result of raw_obj_stat(), which reports through psize, pmtime, pepoch
// and objv_tracker.
// NOTE(review): the declaration of raw_obj (embedded lines 912-913) is missing
// from this extract.
911 int RGWAsyncStatObj::_send_request(const DoutPrefixProvider
*dpp
)
914 store
->getRados()->obj_to_raw(bucket_info
.placement_rule
, obj
, &raw_obj
);
915 return store
->getRados()->raw_obj_stat(dpp
, raw_obj
, psize
, pmtime
, pepoch
,
916 nullptr, nullptr, objv_tracker
, null_yield
);
// Constructs the stat coroutine.
// Records dpp, store, async_rados, bucket_info, obj and the caller-owned
// output pointers (psize, pmtime, pepoch, objv_tracker).
919 RGWStatObjCR::RGWStatObjCR(const DoutPrefixProvider
*dpp
,
920 RGWAsyncRadosProcessor
*async_rados
, rgw::sal::RGWRadosStore
*store
,
921 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
,
922 real_time
* pmtime
, uint64_t *pepoch
,
923 RGWObjVersionTracker
*objv_tracker
)
924 : RGWSimpleCoroutine(store
->ctx()), dpp(dpp
), store(store
), async_rados(async_rados
),
925 bucket_info(_bucket_info
), obj(obj
), psize(psize
), pmtime(pmtime
), pepoch(pepoch
),
926 objv_tracker(objv_tracker
)
// Cleanup hook for the stat coroutine's in-flight request.
// NOTE(review): only the signature survives in this extract; the body
// (embedded lines 931-936) is missing.
930 void RGWStatObjCR::request_cleanup()
// Allocates an RGWAsyncStatObj carrying the stored output pointers, with a
// fresh completion notifier, and queues it on async_rados.
// NOTE(review): the return statement is missing from this extract.
938 int RGWStatObjCR::send_request(const DoutPrefixProvider
*dpp
)
940 req
= new RGWAsyncStatObj(dpp
, this, stack
->create_completion_notifier(),
941 store
, bucket_info
, obj
, psize
, pmtime
, pepoch
, objv_tracker
);
942 async_rados
->queue(req
);
// Returns the finished request's status code.
946 int RGWStatObjCR::request_complete()
948 return req
->get_ret_status();
// Constructs the notify coroutine.
// Records store, obj, the request payload, the timeout in milliseconds and the
// optional response buffer, and sets a description naming the destination.
// NOTE(review): `request` arrives as a non-const reference; whether the member
// copies it or binds a reference depends on the member declaration, which is
// not visible here.
951 RGWRadosNotifyCR::RGWRadosNotifyCR(rgw::sal::RGWRadosStore
*store
, const rgw_raw_obj
& obj
,
952 bufferlist
& request
, uint64_t timeout_ms
,
953 bufferlist
*response
)
954 : RGWSimpleCoroutine(store
->ctx()), store(store
), obj(obj
),
955 request(request
), timeout_ms(timeout_ms
), response(response
)
957 set_description() << "notify dest=" << obj
;
// Sends an asynchronous notify to the target object.
// Resolves obj to a rados ref (logging an error that includes r on the failure
// path), sets the status to "sending request", creates a completion notifier
// and returns the result of aio_notify() with the request payload, timeout_ms
// and the response buffer.
// NOTE(review): the `r < 0` test and its return are missing from this extract.
960 int RGWRadosNotifyCR::send_request(const DoutPrefixProvider
*dpp
)
962 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
964 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
968 set_status() << "sending request";
970 cn
= stack
->create_completion_notifier();
971 return ref
.pool
.ioctx().aio_notify(ref
.obj
.oid
, cn
->completion(), request
,
972 timeout_ms
, response
);
// Reads the AIO completion's return value into r and records it in the
// coroutine status.
// NOTE(review): the return statement is missing from this extract.
975 int RGWRadosNotifyCR::request_complete()
977 int r
= cn
->completion()->get_return_value();
979 set_status() << "request complete; ret=" << r
;