1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "include/compat.h"
7 #include "rgw_coroutine.h"
8 #include "rgw_cr_rados.h"
9 #include "rgw_sync_counters.h"
10 #include "rgw_bucket.h"
11 #include "rgw_datalog_notify.h"
12 #include "rgw_cr_rest.h"
13 #include "rgw_rest_conn.h"
14 #include "rgw_rados.h"
16 #include "services/svc_zone.h"
17 #include "services/svc_zone_utils.h"
18 #include "services/svc_sys_obj.h"
19 #include "services/svc_cls.h"
21 #include "cls/lock/cls_lock_client.h"
22 #include "cls/rgw/cls_rgw_client.h"
24 #include <boost/asio/yield.hpp>
25 #include <boost/container/flat_set.hpp>
27 #define dout_context g_ceph_context
28 #define dout_subsys ceph_subsys_rgw
// Work-queue hook: add a request to the processor's pending queue.
// The processor's is_going_down() state is checked first; otherwise `req` is
// appended to processor->m_req_queue and the pointer is logged at level 20.
// NOTE(review): this listing omits lines 34-36 and 39-42, so the early-exit
// body and the return values are not visible here.
32 bool RGWAsyncRadosProcessor::RGWWQ::_enqueue(RGWAsyncRadosRequest
*req
) {
33 if (processor
->is_going_down()) {
37 processor
->m_req_queue
.push_back(req
);
38 dout(20) << "enqueued request req=" << hex
<< req
<< dec
<< dendl
;
// Work-queue hook: true when the processor's pending request queue is empty.
43 bool RGWAsyncRadosProcessor::RGWWQ::_empty() {
44 return processor
->m_req_queue
.empty();
// Work-queue hook: take the request at the front of processor->m_req_queue.
// The queue is tested for emptiness first; otherwise the front element is
// read, popped, and its pointer logged at level 20.
// NOTE(review): the empty-queue result and the final return statement are on
// lines missing from this listing.
47 RGWAsyncRadosRequest
*RGWAsyncRadosProcessor::RGWWQ::_dequeue() {
48 if (processor
->m_req_queue
.empty())
50 RGWAsyncRadosRequest
*req
= processor
->m_req_queue
.front();
51 processor
->m_req_queue
.pop_front();
52 dout(20) << "dequeued request req=" << hex
<< req
<< dec
<< dendl
;
// Work-queue hook: execute one dequeued request.
// Hands `req` to processor->handle_request() (this work queue is passed as the
// first argument) and then returns one unit to processor->req_throttle.
// `handle` is not used in the visible lines.
57 void RGWAsyncRadosProcessor::RGWWQ::_process(RGWAsyncRadosRequest
*req
, ThreadPool::TPHandle
& handle
) {
58 processor
->handle_request(this, req
);
59 processor
->req_throttle
.put(1);
// Debug helper: log the contents of the pending request queue at level 20.
// The rgw subsystem's should_gather<..., 20>() is consulted first. An empty
// queue logs "RGWWQ: empty"; otherwise "RGWWQ:" is followed by one "req: "
// line per queued request pointer.
// NOTE(review): the bodies of the two guards (listing lines 64-65, 69-70) are
// not visible here.
62 void RGWAsyncRadosProcessor::RGWWQ::_dump_queue() {
63 if (!g_conf()->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
66 deque
<RGWAsyncRadosRequest
*>::iterator iter
;
67 if (processor
->m_req_queue
.empty()) {
68 dout(20) << "RGWWQ: empty" << dendl
;
71 dout(20) << "RGWWQ:" << dendl
;
72 for (iter
= processor
->m_req_queue
.begin(); iter
!= processor
->m_req_queue
.end(); ++iter
) {
73 dout(20) << "req: " << hex
<< *iter
<< dec
<< dendl
;
// Construct the async RADOS processor.
//   _cct        - Ceph context stored in `cct` and passed to the thread pool
//                 and throttle.
//   num_threads - size of the thread pool `m_tp`; the request throttle
//                 "rgw_async_rados_ops" is sized at num_threads * 2.
// The two make_timespan() values come from rgw_op_thread_timeout and
// rgw_op_thread_suicide_timeout.
// NOTE(review): the member those timespans initialise (listing line 80) and
// the constructor body are not visible in this listing.
77 RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(CephContext
*_cct
, int num_threads
)
78 : cct(_cct
), m_tp(cct
, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads
),
79 req_throttle(_cct
, "rgw_async_rados_ops", num_threads
* 2),
81 ceph::make_timespan(g_conf()->rgw_op_thread_timeout
),
82 ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout
),
// NOTE(review): only the signature survives in this listing; the body
// (listing lines 87-88) is not visible. Presumably starts the worker pool —
// confirm against the full file.
86 void RGWAsyncRadosProcessor::start() {
// Shut the processor down.
// The visible part walks every entry of m_req_queue.
// NOTE(review): the statements before the loop (listing lines 91-93) and the
// per-entry action (line 95) are missing from this listing.
90 void RGWAsyncRadosProcessor::stop() {
94 for (auto iter
= m_req_queue
.begin(); iter
!= m_req_queue
.end(); ++iter
) {
// Execute a single request by calling req->send_request(dpp).
// NOTE(review): listing line 101 (after the call) is not visible here.
99 void RGWAsyncRadosProcessor::handle_request(const DoutPrefixProvider
*dpp
, RGWAsyncRadosRequest
*req
) {
100 req
->send_request(dpp
);
// Submit `req` to the processor.
// NOTE(review): only the signature survives in this listing; the body
// (listing lines 105-106) is not visible.
104 void RGWAsyncRadosProcessor::queue(RGWAsyncRadosRequest
*req
) {
// Read the system object `obj` through svc_sysobj into `bl`.
// `pattrs` points at `attrs` only when want_attrs is set, otherwise nullptr.
// The read op is configured with the object version tracker and the
// raw_attrs flag, and is issued with null_yield.
// NOTE(review): listing lines 114 and 116 of the call chain are not visible
// (116 presumably passes pattrs — confirm).
109 int RGWAsyncGetSystemObj::_send_request(const DoutPrefixProvider
*dpp
)
111 map
<string
, bufferlist
> *pattrs
= want_attrs
? &attrs
: nullptr;
113 auto sysobj
= svc_sysobj
->get_obj(obj
);
115 .set_objv_tracker(&objv_tracker
)
117 .set_raw_attrs(raw_attrs
)
118 .read(dpp
, &bl
, null_yield
);
// Build an async "get system object" request.
//   _dpp          - log prefix provider, stored in `dpp`.
//   caller, cn    - forwarded to the RGWAsyncRadosRequest base.
//   _svc          - sysobj service, stored in `svc_sysobj`.
//   _objv_tracker - its pointee is copied into the member `objv_tracker`.
//   _obj          - raw object to read.
//   want_attrs, raw_attrs - stored in the members of the same name.
// NOTE(review): `*_objv_tracker` is dereferenced on listing line 128; any
// null check would be on line 127, which is missing from this listing.
121 RGWAsyncGetSystemObj::RGWAsyncGetSystemObj(const DoutPrefixProvider
*_dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, RGWSI_SysObj
*_svc
,
122 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
123 bool want_attrs
, bool raw_attrs
)
124 : RGWAsyncRadosRequest(caller
, cn
), dpp(_dpp
), svc_sysobj(_svc
),
125 obj(_obj
), want_attrs(want_attrs
), raw_attrs(raw_attrs
)
128 objv_tracker
= *_objv_tracker
;
// Issue an asynchronous xattr read on `obj`.
// Steps visible here: resolve `obj` to `ref`, log an error mentioning the
// return code, set the status text, build an ObjectReadOperation (with
// objv_tracker->prepare_op_for_read()), and request xattrs. When raw_attrs
// and pattrs are both set the xattrs go straight into pattrs; the other
// visible getxattrs call targets `unfiltered_attrs`. The operation is
// submitted with aio_operate() using a new completion notifier.
// NOTE(review): the `if`/`else` framing, the error return and the tail of the
// aio_operate() call are on lines missing from this listing.
132 int RGWSimpleRadosReadAttrsCR::send_request(const DoutPrefixProvider
*dpp
)
134 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
136 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret="
141 set_status() << "sending request";
143 librados::ObjectReadOperation op
;
145 objv_tracker
->prepare_op_for_read(&op
);
148 if (raw_attrs
&& pattrs
) {
149 op
.getxattrs(pattrs
, nullptr);
151 op
.getxattrs(&unfiltered_attrs
, nullptr);
154 cn
= stack
->create_completion_notifier();
155 return ref
.pool
.ioctx().aio_operate(ref
.obj
.oid
, cn
->completion(), &op
,
// Collect the result of the xattr read.
// Reads the completion's return value, records it in the status text, and —
// when raw_attrs is false and pattrs is set — filters `unfiltered_attrs` by
// RGW_ATTR_PREFIX into pattrs.
// NOTE(review): the final return statement is not visible in this listing.
159 int RGWSimpleRadosReadAttrsCR::request_complete()
161 int ret
= cn
->completion()->get_return_value();
162 set_status() << "request complete; ret=" << ret
;
163 if (!raw_attrs
&& pattrs
) {
164 rgw_filter_attrset(unfiltered_attrs
, RGW_ATTR_PREFIX
, pattrs
);
// Write `bl` to the system object `obj` through `svc`.
// The write op carries the object version tracker and the `exclusive` flag
// and is issued with null_yield.
// NOTE(review): listing line 172 (start of the call chain) is not visible.
169 int RGWAsyncPutSystemObj::_send_request(const DoutPrefixProvider
*dpp
)
171 auto sysobj
= svc
->get_obj(obj
);
173 .set_objv_tracker(&objv_tracker
)
174 .set_exclusive(exclusive
)
175 .write_data(dpp
, bl
, null_yield
);
// Build an async "put system object" request.
//   _dpp          - log prefix provider, stored in `dpp`.
//   caller, cn    - forwarded to the RGWAsyncRadosRequest base.
//   _objv_tracker - its pointee is copied into the member `objv_tracker`.
//   _obj          - raw object to write.
//   _exclusive    - stored in `exclusive`.
//   _bl           - payload, taken by value and moved into `bl`.
// NOTE(review): the `_svc` parameter (listing line 181) and any null check
// before `*_objv_tracker` (line 187) are missing from this listing.
178 RGWAsyncPutSystemObj::RGWAsyncPutSystemObj(const DoutPrefixProvider
*_dpp
,
179 RGWCoroutine
*caller
,
180 RGWAioCompletionNotifier
*cn
,
182 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
183 bool _exclusive
, bufferlist _bl
)
184 : RGWAsyncRadosRequest(caller
, cn
), dpp(_dpp
), svc(_svc
),
185 obj(_obj
), exclusive(_exclusive
), bl(std::move(_bl
))
188 objv_tracker
= *_objv_tracker
;
// Write attributes to the system object `obj` through `svc`.
// The write op carries the object version tracker and the `exclusive` flag
// and ends in write_attrs() with null_yield.
// NOTE(review): listing lines 195 and 198 of the call chain are not visible
// (198 presumably supplies `attrs` — confirm).
192 int RGWAsyncPutSystemObjAttrs::_send_request(const DoutPrefixProvider
*dpp
)
194 auto sysobj
= svc
->get_obj(obj
);
196 .set_objv_tracker(&objv_tracker
)
197 .set_exclusive(exclusive
)
199 .write_attrs(dpp
, null_yield
);
// Build an async "put system object attrs" request.
//   _dpp          - log prefix provider, stored in `dpp`.
//   caller, cn    - forwarded to the RGWAsyncRadosRequest base.
//   _objv_tracker - its pointee is copied into the member `objv_tracker`.
//   _obj          - raw object whose attributes are written.
//   _attrs        - attribute map, taken by value and moved into `attrs`.
//   exclusive     - stored in the member of the same name.
// NOTE(review): the `_svc` parameter (listing line 203) and any null check
// before `*_objv_tracker` (line 209) are missing from this listing.
202 RGWAsyncPutSystemObjAttrs::RGWAsyncPutSystemObjAttrs(const DoutPrefixProvider
*_dpp
, RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
,
204 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
205 map
<string
, bufferlist
> _attrs
, bool exclusive
)
206 : RGWAsyncRadosRequest(caller
, cn
), dpp(_dpp
), svc(_svc
),
207 obj(_obj
), attrs(std::move(_attrs
)), exclusive(exclusive
)
210 objv_tracker
= *_objv_tracker
;
// Construct an omap-append consumer coroutine.
//   _async_rados - stored in `async_rados`.
//   _store       - RADOS store; its ctx() is passed to the RGWConsumerCR base.
//   _obj         - target raw object.
//   _window_size - stored in `window_size`.
// Counters start at zero and `going_down` starts false.
215 RGWOmapAppend::RGWOmapAppend(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
, const rgw_raw_obj
& _obj
,
216 uint64_t _window_size
)
217 : RGWConsumerCR
<string
>(_store
->ctx()), async_rados(_async_rados
),
218 store(_store
), obj(_obj
), going_down(false), num_pending_entries(0), window_size(_window_size
), total_entries(0)
// Take an exclusive cls lock on `obj`.
// Resolves `obj` to `ref` (logging an error with the return code), then
// configures a rados::cls::lock::Lock named `lock_name` with a duration of
// `duration_secs` seconds, the caller's cookie and may_renew = true, and
// returns the result of lock_exclusive() on the object's ioctx/oid.
// NOTE(review): the `if (r < 0)` framing and error return are on lines
// missing from this listing.
222 int RGWAsyncLockSystemObj::_send_request(const DoutPrefixProvider
*dpp
)
225 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
227 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
231 rados::cls::lock::Lock
l(lock_name
);
232 utime_t
duration(duration_secs
, 0);
233 l
.set_duration(duration
);
234 l
.set_cookie(cookie
);
235 l
.set_may_renew(true);
237 return l
.lock_exclusive(&ref
.pool
.ioctx(), ref
.obj
.oid
);
// Build an async lock request.
//   caller, cn     - forwarded to the RGWAsyncRadosRequest base.
//   _store         - stored in `store`.
//   _objv_tracker  - not referenced in the visible lines.
//   _obj, _name, _cookie - object, lock name and cookie.
//   _duration_secs - stored in `duration_secs`.
// NOTE(review): the initialisers on listing lines 243-245 are missing from
// this listing.
240 RGWAsyncLockSystemObj::RGWAsyncLockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
241 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
242 const string
& _name
, const string
& _cookie
, uint32_t _duration_secs
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
246 duration_secs(_duration_secs
)
// Release the cls lock `lock_name` on `obj`.
// Resolves `obj` to `ref` (logging an error with the return code), sets the
// cookie on a rados::cls::lock::Lock and returns the result of unlock() on
// the object's ioctx/oid.
// NOTE(review): the `if (r < 0)` framing and error return are on lines
// missing from this listing.
250 int RGWAsyncUnlockSystemObj::_send_request(const DoutPrefixProvider
*dpp
)
253 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
255 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
259 rados::cls::lock::Lock
l(lock_name
);
261 l
.set_cookie(cookie
);
263 return l
.unlock(&ref
.pool
.ioctx(), ref
.obj
.oid
);
// Build an async unlock request.
//   caller, cn    - forwarded to the RGWAsyncRadosRequest base.
//   _store        - stored in `store`.
//   _objv_tracker - not referenced in the visible lines.
//   _obj          - object to unlock.
//   _name, _cookie - stored in `lock_name` and `cookie`.
// NOTE(review): the initialiser on listing line 269 is missing from this
// listing.
266 RGWAsyncUnlockSystemObj::RGWAsyncUnlockSystemObj(RGWCoroutine
*caller
, RGWAioCompletionNotifier
*cn
, rgw::sal::RadosStore
* _store
,
267 RGWObjVersionTracker
*_objv_tracker
, const rgw_raw_obj
& _obj
,
268 const string
& _name
, const string
& _cookie
) : RGWAsyncRadosRequest(caller
, cn
), store(_store
),
270 lock_name(_name
), cookie(_cookie
)
// Construct a coroutine that sets omap keys on `_obj`, and build its
// human-readable description from the object and the entry keys.
// NOTE(review): `s.str()` is streamed into `s` itself, so the description
// text written so far is embedded again before "]". This looks unintended —
// confirm.
// NOTE(review): the member initialisers (listing lines 277-279) and the loop
// body that appends each key are missing from this listing.
274 RGWRadosSetOmapKeysCR::RGWRadosSetOmapKeysCR(rgw::sal::RadosStore
* _store
,
275 const rgw_raw_obj
& _obj
,
276 map
<string
, bufferlist
>& _entries
) : RGWSimpleCoroutine(_store
->ctx()),
281 stringstream
& s
= set_description();
282 s
<< "set omap keys dest=" << obj
<< " keys=[" << s
.str() << "]";
283 for (auto i
= entries
.begin(); i
!= entries
.end(); ++i
) {
284 if (i
!= entries
.begin()) {
// Submit an asynchronous omap_set of `entries` on `obj`.
// Resolves `obj` to `ref` (logging an error with the return code), sets the
// status text, builds an ObjectWriteOperation and submits it with
// aio_operate() using a new completion notifier.
// NOTE(review): the `if (r < 0)` framing and error return are on lines
// missing from this listing.
292 int RGWRadosSetOmapKeysCR::send_request(const DoutPrefixProvider
*dpp
)
294 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
296 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
300 set_status() << "sending request";
302 librados::ObjectWriteOperation op
;
303 op
.omap_set(entries
);
305 cn
= stack
->create_completion_notifier();
306 return ref
.pool
.ioctx().aio_operate(ref
.obj
.oid
, cn
->completion(), &op
);
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
309 int RGWRadosSetOmapKeysCR::request_complete()
311 int r
= cn
->completion()->get_return_value();
313 set_status() << "request complete; ret=" << r
;
// Construct a coroutine that lists omap keys of `_obj` starting at `_marker`.
// `_result` is moved into `result` and asserted to be non-null; the
// description records the object and marker.
// NOTE(review): the declarations of `_max_entries` and `_result` (listing
// lines 321-322) are missing from this listing.
318 RGWRadosGetOmapKeysCR::RGWRadosGetOmapKeysCR(rgw::sal::RadosStore
* _store
,
319 const rgw_raw_obj
& _obj
,
320 const string
& _marker
,
323 : RGWSimpleCoroutine(_store
->ctx()), store(_store
), obj(_obj
),
324 marker(_marker
), max_entries(_max_entries
),
325 result(std::move(_result
))
327 ceph_assert(result
); // must be allocated
328 set_description() << "get omap keys dest=" << obj
<< " marker=" << marker
;
// Submit an asynchronous omap_get_keys2 read.
// Resolves `obj` into result->ref (logging an error with the return code),
// then asks for up to `max_entries` keys after `marker`, writing into
// result->entries and result->more. `result` is passed to
// create_completion_notifier() along with the request.
// NOTE(review): the `if (r < 0)` framing and error return are on lines
// missing from this listing.
331 int RGWRadosGetOmapKeysCR::send_request(const DoutPrefixProvider
*dpp
) {
332 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &result
->ref
);
334 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
338 set_status() << "send request";
340 librados::ObjectReadOperation op
;
341 op
.omap_get_keys2(marker
, max_entries
, &result
->entries
, &result
->more
, nullptr);
343 cn
= stack
->create_completion_notifier(result
);
344 return result
->ref
.pool
.ioctx().aio_operate(result
->ref
.obj
.oid
, cn
->completion(), &op
, NULL
);
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
347 int RGWRadosGetOmapKeysCR::request_complete()
349 int r
= cn
->completion()->get_return_value();
351 set_status() << "request complete; ret=" << r
;
// Construct a coroutine that reads omap key/value pairs of `_obj` starting at
// `_marker`. `_result` is moved into `result` and asserted to be non-null.
// NOTE(review): the description text says "get omap keys" although this is
// the *vals* coroutine — likely copied from RGWRadosGetOmapKeysCR; confirm
// whether "get omap vals" was intended.
// NOTE(review): the declarations of `_max_entries` and `_result` (listing
// lines 359-360) are missing from this listing.
356 RGWRadosGetOmapValsCR::RGWRadosGetOmapValsCR(rgw::sal::RadosStore
* _store
,
357 const rgw_raw_obj
& _obj
,
358 const string
& _marker
,
361 : RGWSimpleCoroutine(_store
->ctx()), store(_store
), obj(_obj
),
362 marker(_marker
), max_entries(_max_entries
),
363 result(std::move(_result
))
365 ceph_assert(result
); // must be allocated
366 set_description() << "get omap keys dest=" << obj
<< " marker=" << marker
;
// Submit an asynchronous omap_get_vals2 read.
// Resolves `obj` into result->ref (logging an error with the return code),
// then asks for up to `max_entries` entries after `marker`, writing into
// result->entries and result->more. `result` is passed to
// create_completion_notifier() along with the request.
// NOTE(review): the `if (r < 0)` framing and error return are on lines
// missing from this listing.
369 int RGWRadosGetOmapValsCR::send_request(const DoutPrefixProvider
*dpp
) {
370 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &result
->ref
);
372 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
376 set_status() << "send request";
378 librados::ObjectReadOperation op
;
379 op
.omap_get_vals2(marker
, max_entries
, &result
->entries
, &result
->more
, nullptr);
381 cn
= stack
->create_completion_notifier(result
);
382 return result
->ref
.pool
.ioctx().aio_operate(result
->ref
.obj
.oid
, cn
->completion(), &op
, NULL
);
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
385 int RGWRadosGetOmapValsCR::request_complete()
387 int r
= cn
->completion()->get_return_value();
389 set_status() << "request complete; ret=" << r
;
// Construct a coroutine that removes the omap keys `_keys` from `_obj`; the
// description records the object and the key set.
// NOTE(review): the member initialisers (listing lines 397-399) are missing
// from this listing.
394 RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(rgw::sal::RadosStore
* _store
,
395 const rgw_raw_obj
& _obj
,
396 const set
<string
>& _keys
) : RGWSimpleCoroutine(_store
->ctx()),
401 set_description() << "remove omap keys dest=" << obj
<< " keys=" << keys
;
// Submit an asynchronous omap_rm_keys of `keys` on `obj`.
// Resolves `obj` to `ref` (logging an error with the return code), sets the
// status text, builds an ObjectWriteOperation and submits it with
// aio_operate() using a new completion notifier.
// NOTE(review): the `if (r < 0)` framing and error return are on lines
// missing from this listing.
404 int RGWRadosRemoveOmapKeysCR::send_request(const DoutPrefixProvider
*dpp
) {
405 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
407 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
411 set_status() << "send request";
413 librados::ObjectWriteOperation op
;
414 op
.omap_rm_keys(keys
);
416 cn
= stack
->create_completion_notifier();
417 return ref
.pool
.ioctx().aio_operate(ref
.obj
.oid
, cn
->completion(), &op
);
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
420 int RGWRadosRemoveOmapKeysCR::request_complete()
422 int r
= cn
->completion()->get_return_value();
424 set_status() << "request complete; ret=" << r
;
// Construct a coroutine that removes the raw object `obj`.
//   store        - RADOS store; its ctx() is passed to the base coroutine.
//   obj          - object to remove.
//   objv_tracker - stored as given; used by send_request().
// The parameters share names with the members they initialise.
429 RGWRadosRemoveCR::RGWRadosRemoveCR(rgw::sal::RadosStore
* store
, const rgw_raw_obj
& obj
,
430 RGWObjVersionTracker
* objv_tracker
)
431 : RGWSimpleCoroutine(store
->ctx()),
432 store(store
), obj(obj
), objv_tracker(objv_tracker
)
434 set_description() << "remove dest=" << obj
;
// Submit an asynchronous removal of `obj`.
// Opens an ioctx on the object's pool by name (logging an error naming the
// pool and return code), sets the locator key from obj.loc, builds an
// ObjectWriteOperation (with objv_tracker->prepare_op_for_write()) and
// submits it with aio_operate() using a new completion notifier.
// NOTE(review): the `if` guards, the error return and the line that adds the
// remove step to `op` (listing lines 452-454) are missing from this listing.
437 int RGWRadosRemoveCR::send_request(const DoutPrefixProvider
*dpp
)
439 auto rados
= store
->getRados()->get_rados_handle();
440 int r
= rados
->ioctx_create(obj
.pool
.name
.c_str(), ioctx
);
442 lderr(cct
) << "ERROR: failed to open pool (" << obj
.pool
.name
<< ") ret=" << r
<< dendl
;
445 ioctx
.locator_set_key(obj
.loc
);
447 set_status() << "send request";
449 librados::ObjectWriteOperation op
;
451 objv_tracker
->prepare_op_for_write(&op
);
455 cn
= stack
->create_completion_notifier();
456 return ioctx
.aio_operate(obj
.oid
, cn
->completion(), &op
);
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
459 int RGWRadosRemoveCR::request_complete()
461 int r
= cn
->completion()->get_return_value();
463 set_status() << "request complete; ret=" << r
;
// Construct a remove-by-oid coroutine from an rvalue IoCtx.
// The ioctx is moved into the member, `oid` is copied from the string_view
// into a std::string, and the tracker pointer is stored as given.
468 RGWRadosRemoveOidCR::RGWRadosRemoveOidCR(rgw::sal::RadosStore
* store
,
469 librados::IoCtx
&& ioctx
,
470 std::string_view oid
,
471 RGWObjVersionTracker
* objv_tracker
)
472 : RGWSimpleCoroutine(store
->ctx()), ioctx(std::move(ioctx
)),
473 oid(std::string(oid
)), objv_tracker(objv_tracker
)
475 set_description() << "remove dest=" << oid
;
// Construct a remove-by-oid coroutine from an lvalue RGWSI_RADOS::Obj.
// The ioctx and oid are copied out of obj.get_ref(); `obj` is not modified in
// the visible lines.
478 RGWRadosRemoveOidCR::RGWRadosRemoveOidCR(rgw::sal::RadosStore
* store
,
479 RGWSI_RADOS::Obj
& obj
,
480 RGWObjVersionTracker
* objv_tracker
)
481 : RGWSimpleCoroutine(store
->ctx()),
482 ioctx(librados::IoCtx(obj
.get_ref().pool
.ioctx())),
483 oid(obj
.get_ref().obj
.oid
),
484 objv_tracker(objv_tracker
)
486 set_description() << "remove dest=" << oid
;
// Construct a remove-by-oid coroutine from an rvalue RGWSI_RADOS::Obj.
// The ioctx and oid are moved out of obj.get_ref(), leaving `obj` in a
// moved-from state.
489 RGWRadosRemoveOidCR::RGWRadosRemoveOidCR(rgw::sal::RadosStore
* store
,
490 RGWSI_RADOS::Obj
&& obj
,
491 RGWObjVersionTracker
* objv_tracker
)
492 : RGWSimpleCoroutine(store
->ctx()),
493 ioctx(std::move(obj
.get_ref().pool
.ioctx())),
494 oid(std::move(obj
.get_ref().obj
.oid
)),
495 objv_tracker(objv_tracker
)
497 set_description() << "remove dest=" << oid
;
// Submit an asynchronous removal of `oid` on the stored ioctx.
// Builds an ObjectWriteOperation (with objv_tracker->prepare_op_for_write())
// and submits it with aio_operate() using a new completion notifier.
// NOTE(review): the guard around the tracker call and the line that adds the
// remove step to `op` (listing lines 503, 505-507) are missing from this
// listing.
500 int RGWRadosRemoveOidCR::send_request(const DoutPrefixProvider
*dpp
)
502 librados::ObjectWriteOperation op
;
504 objv_tracker
->prepare_op_for_write(&op
);
508 cn
= stack
->create_completion_notifier();
509 return ioctx
.aio_operate(oid
, cn
->completion(), &op
);
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
512 int RGWRadosRemoveOidCR::request_complete()
514 int r
= cn
->completion()->get_return_value();
516 set_status() << "request complete; ret=" << r
;
// Construct a coroutine that takes a lock on `_obj` via the async processor.
// The description records the object, lock name, cookie and duration.
// NOTE(review): several member initialisers (listing lines 527, 529-532) are
// missing from this listing.
521 RGWSimpleRadosLockCR::RGWSimpleRadosLockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
522 const rgw_raw_obj
& _obj
,
523 const string
& _lock_name
,
524 const string
& _cookie
,
525 uint32_t _duration
) : RGWSimpleCoroutine(_store
->ctx()),
526 async_rados(_async_rados
),
528 lock_name(_lock_name
),
534 set_description() << "rados lock dest=" << obj
<< " lock=" << lock_name
<< " cookie=" << cookie
<< " duration=" << duration
;
// NOTE(review): only the signature survives in this listing; the body
// (listing lines 538-543) is not visible. Presumably releases `req` — confirm
// against the full file.
537 void RGWSimpleRadosLockCR::request_cleanup()
// Create an RGWAsyncLockSystemObj for this coroutine and queue it on the
// async processor. No objv tracker is passed (NULL).
// NOTE(review): the return statement is not visible in this listing.
545 int RGWSimpleRadosLockCR::send_request(const DoutPrefixProvider
*dpp
)
547 set_status() << "sending request";
548 req
= new RGWAsyncLockSystemObj(this, stack
->create_completion_notifier(),
549 store
, NULL
, obj
, lock_name
, cookie
, duration
);
550 async_rados
->queue(req
);
// Record the async request's return status in the status text and return it.
554 int RGWSimpleRadosLockCR::request_complete()
556 set_status() << "request complete; ret=" << req
->get_ret_status();
557 return req
->get_ret_status();
// Construct a coroutine that releases a lock on `_obj` via the async
// processor. The description records the object, lock name and cookie.
// NOTE(review): several member initialisers (listing lines 565, 567-569) are
// missing from this listing.
560 RGWSimpleRadosUnlockCR::RGWSimpleRadosUnlockCR(RGWAsyncRadosProcessor
*_async_rados
, rgw::sal::RadosStore
* _store
,
561 const rgw_raw_obj
& _obj
,
562 const string
& _lock_name
,
563 const string
& _cookie
) : RGWSimpleCoroutine(_store
->ctx()),
564 async_rados(_async_rados
),
566 lock_name(_lock_name
),
571 set_description() << "rados unlock dest=" << obj
<< " lock=" << lock_name
<< " cookie=" << cookie
;
// NOTE(review): only the signature survives in this listing; the body
// (listing lines 575-580) is not visible. Presumably releases `req` — confirm
// against the full file.
574 void RGWSimpleRadosUnlockCR::request_cleanup()
// Create an RGWAsyncUnlockSystemObj for this coroutine and queue it on the
// async processor. No objv tracker is passed (NULL).
// NOTE(review): the return statement is not visible in this listing.
582 int RGWSimpleRadosUnlockCR::send_request(const DoutPrefixProvider
*dpp
)
584 set_status() << "sending request";
586 req
= new RGWAsyncUnlockSystemObj(this, stack
->create_completion_notifier(),
587 store
, NULL
, obj
, lock_name
, cookie
);
588 async_rados
->queue(req
);
// Record the async request's return status in the status text and return it.
592 int RGWSimpleRadosUnlockCR::request_complete()
594 set_status() << "request complete; ret=" << req
->get_ret_status();
595 return req
->get_ret_status();
// Coroutine body: batch consumed entries and flush them to the object's omap.
// Visible flow:
//   - with no product and going_down set, status becomes "going down";
//   - otherwise the coroutine yields in wait_for_product();
//   - each consumed entry is added to `entries` with an empty bufferlist;
//   - once entries.size() reaches window_size, or going_down is set, an
//     RGWRadosSetOmapKeysCR is spawned to write the batch;
//   - a negative return status logs an error and moves to RGWCoroutine_Error;
//   - otherwise the coroutine ends in RGWCoroutine_Done.
// NOTE(review): the reenter/loop scaffolding, the `break`s and the clearing
// of `entries` are on lines missing from this listing.
598 int RGWOmapAppend::operate(const DoutPrefixProvider
*dpp
) {
601 if (!has_product() && going_down
) {
602 set_status() << "going down";
605 set_status() << "waiting for product";
606 yield
wait_for_product();
609 while (consume(&entry
)) {
610 set_status() << "adding entry: " << entry
;
611 entries
[entry
] = bufferlist();
612 if (entries
.size() >= window_size
) {
616 if (entries
.size() >= window_size
|| going_down
) {
617 set_status() << "flushing to omap";
618 call(new RGWRadosSetOmapKeysCR(store
, obj
, entries
));
622 if (get_ret_status() < 0) {
623 ldout(cct
, 0) << "ERROR: failed to store entries in omap" << dendl
;
624 return set_state(RGWCoroutine_Error
);
627 /* done with coroutine */
628 return set_state(RGWCoroutine_Done
);
// Hand the locally buffered `pending_entries` to receive() and reset the
// pending counter to zero.
633 void RGWOmapAppend::flush_pending() {
634 receive(pending_entries
);
635 num_pending_entries
= 0;
// Buffer one entry; once the pending count reaches window_size the branch on
// listing line 644 is taken (its body is not visible here).
// NOTE(review): the constructor initialises window_size from a uint64_t, yet
// it is cast to int for this comparison. The C-style cast would narrow large
// values if the member is also uint64_t — confirm the types.
// NOTE(review): the guard at the top (listing lines 639-642) and the return
// value are missing from this listing.
638 bool RGWOmapAppend::append(const string
& s
) {
643 pending_entries
.push_back(s
);
644 if (++num_pending_entries
>= (int)window_size
) {
// NOTE(review): only the signature survives in this listing; the body
// (listing lines 651-655) is not visible.
650 bool RGWOmapAppend::finish() {
// Load bucket info and attributes for `bucket`.
// When bucket.bucket_id is non-empty the instance info is read through
// RGWRados::get_bucket_instance_info(); the other visible path goes through
// the bucket ctl's read_bucket_info() with attrs requested. Both use
// null_yield. A failure is logged at level 0.
// NOTE(review): the declaration of `r`, the `else`, the error guard and the
// return statements are on lines missing from this listing.
657 int RGWAsyncGetBucketInstanceInfo::_send_request(const DoutPrefixProvider
*dpp
)
660 if (!bucket
.bucket_id
.empty()) {
661 r
= store
->getRados()->get_bucket_instance_info(bucket
, bucket_info
, nullptr, &attrs
, null_yield
, dpp
);
663 r
= store
->ctl()->bucket
->read_bucket_info(bucket
, &bucket_info
, null_yield
, dpp
,
664 RGWBucketCtl::BucketInstance::GetParams().set_attrs(&attrs
));
667 ldpp_dout(dpp
, 0) << "ERROR: failed to get bucket instance info for "
// Store `bucket_info` through RGWRados::put_bucket_instance_info(), passing
// the exclusive flag, mtime and attrs, with null_yield. A failure is logged
// at level 0 together with the bucket.
// NOTE(review): the error guard and return statements are on lines missing
// from this listing.
675 int RGWAsyncPutBucketInstanceInfo::_send_request(const DoutPrefixProvider
*dpp
)
677 auto r
= store
->getRados()->put_bucket_instance_info(bucket_info
, exclusive
,
678 mtime
, attrs
, dpp
, null_yield
);
680 ldpp_dout(dpp
, 0) << "ERROR: failed to put bucket instance info for "
681 << bucket_info
.bucket
<< dendl
;
// Construct a coroutine that trims a bucket index log shard.
//   dpp          - not referenced in the visible lines.
//   store        - supplies ctx() for the base and getRados() for `bs`.
//   bucket_info  - bucket whose index log is trimmed.
//   generation   - index layout generation.
//   start_marker, end_marker - each is passed through
//                  BucketIndexShardsManager::get_shard_marker() before being
//                  stored.
// NOTE(review): the `shard_id` parameter declaration (listing line 692) is
// missing from this listing.
688 RGWRadosBILogTrimCR::RGWRadosBILogTrimCR(
689 const DoutPrefixProvider
*dpp
,
690 rgw::sal::RadosStore
* store
,
691 const RGWBucketInfo
& bucket_info
,
693 const rgw::bucket_index_layout_generation
& generation
,
694 const std::string
& start_marker
,
695 const std::string
& end_marker
)
696 : RGWSimpleCoroutine(store
->ctx()), bucket_info(bucket_info
),
697 shard_id(shard_id
), generation(generation
), bs(store
->getRados()),
698 start_marker(BucketIndexShardsManager::get_shard_marker(start_marker
)),
699 end_marker(BucketIndexShardsManager::get_shard_marker(end_marker
))
// Submit an asynchronous bucket-index-log trim.
// Initialises the bucket shard `bs` (logging an error with the return code),
// fills a cls_rgw_bi_log_trim_op with the start and end markers, wraps it in
// an RGW_CLASS / RGW_BI_LOG_TRIM exec on an ObjectWriteOperation and submits
// it on bs.bucket_obj with a new completion notifier.
// NOTE(review): `start_marker` and `end_marker` are moved from here, so they
// are left in a moved-from state after the first call — confirm this is never
// re-invoked.
// NOTE(review): the error guard/return and the encoding of `call` into `in`
// are on lines missing from this listing.
703 int RGWRadosBILogTrimCR::send_request(const DoutPrefixProvider
*dpp
)
705 int r
= bs
.init(dpp
, bucket_info
, generation
, shard_id
);
707 ldpp_dout(dpp
, -1) << "ERROR: bucket shard init failed ret=" << r
<< dendl
;
712 cls_rgw_bi_log_trim_op call
;
713 call
.start_marker
= std::move(start_marker
);
714 call
.end_marker
= std::move(end_marker
);
717 librados::ObjectWriteOperation op
;
718 op
.exec(RGW_CLASS
, RGW_BI_LOG_TRIM
, in
);
720 cn
= stack
->create_completion_notifier();
721 return bs
.bucket_obj
.aio_operate(cn
->completion(), &op
);
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
724 int RGWRadosBILogTrimCR::request_complete()
726 int r
= cn
->completion()->get_return_value();
727 set_status() << "request complete; ret=" << r
;
// Fetch an object from a remote zone into the local destination bucket.
// Visible flow:
//   - formats ".<instance_id>" into `buf`;
//   - builds the source rgw_obj and the destination RadosBucket/RadosObject
//     (the destination key falls back to `key` when dest_key is unset);
//   - calls RGWRados::fetch_remote_obj();
//   - logs the return code and bumps the l_fetch_err counter;
//   - when bytes_transferred is set, decodes any RGW_ATTR_TAGS attribute and
//     reserves and commits an ObjectSyncedCreate notification — failures of
//     either step are only logged;
//   - finally bumps l_fetch (by the bytes transferred) or
//     l_fetch_not_modified.
// NOTE(review): many argument lines of the fetch call, the `if`/`try`
// framing and the return statements are missing from this listing.
731 int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider
*dpp
)
733 RGWObjectCtx
obj_ctx(store
);
736 snprintf(buf
, sizeof(buf
), ".%lld", (long long)store
->getRados()->instance_id());
737 rgw::sal::Attrs attrs
;
739 rgw_obj
src_obj(src_bucket
, key
);
741 rgw::sal::RadosBucket
dest_bucket(store
, dest_bucket_info
);
742 rgw::sal::RadosObject
dest_obj(store
, dest_key
.value_or(key
), &dest_bucket
);
746 std::optional
<uint64_t> bytes_transferred
;
747 int r
= store
->getRados()->fetch_remote_obj(obj_ctx
,
748 user_id
.value_or(rgw_user()),
753 dest_bucket_info
, /* dest */
754 nullptr, /* source */
756 nullptr, /* real_time* src_mtime, */
757 NULL
, /* real_time* mtime, */
758 NULL
, /* const real_time* mod_ptr, */
759 NULL
, /* const real_time* unmod_ptr, */
760 false, /* high precision time */
761 NULL
, /* const char *if_match, */
762 NULL
, /* const char *if_nomatch, */
763 RGWRados::ATTRSMOD_NONE
,
766 RGWObjCategory::Main
,
768 real_time(), /* delete_at */
769 NULL
, /* string *ptag, */
770 &etag
, /* string *petag, */
771 NULL
, /* void (*progress_cb)(off_t, void *), */
772 NULL
, /* void *progress_data*); */
780 ldpp_dout(dpp
, 0) << "store->fetch_remote_obj() returned r=" << r
<< dendl
;
782 counters
->inc(sync_counters::l_fetch_err
, 1);
786 if (bytes_transferred
) {
787 // send notification that object was successfully synced
// NOTE(review): this local `user_id` shadows the `user_id` used in the
// fetch_remote_obj() call above — confirm the shadowing is intentional.
788 std::string user_id
= "rgw sync";
789 std::string req_id
= "0";
792 auto iter
= attrs
.find(RGW_ATTR_TAGS
);
793 if (iter
!= attrs
.end()) {
795 auto it
= iter
->second
.cbegin();
797 } catch (buffer::error
&err
) {
798 ldpp_dout(dpp
, 1) << "ERROR: " << __func__
<< ": caught buffer::error couldn't decode TagSet " << dendl
;
802 // NOTE: we create a mutable copy of bucket.get_tenant as the get_notification function expects a std::string&, not const
803 std::string
tenant(dest_bucket
.get_tenant());
805 std::unique_ptr
<rgw::sal::Notification
> notify
806 = store
->get_notification(dpp
, &dest_obj
, nullptr, rgw::notify::ObjectSyncedCreate
,
807 &dest_bucket
, user_id
,
811 auto notify_res
= static_cast<rgw::sal::RadosNotification
*>(notify
.get())->get_reservation();
812 int ret
= rgw::notify::publish_reserve(dpp
, rgw::notify::ObjectSyncedCreate
, notify_res
, &obj_tags
);
814 ldpp_dout(dpp
, 1) << "ERROR: reserving notification failed, with error: " << ret
<< dendl
;
815 // no need to return, the sync already happened
817 ret
= rgw::notify::publish_commit(&dest_obj
, dest_obj
.get_obj_size(), ceph::real_clock::now(), etag
, dest_obj
.get_instance(), rgw::notify::ObjectSyncedCreate
, notify_res
, dpp
);
819 ldpp_dout(dpp
, 1) << "ERROR: publishing notification failed, with error: " << ret
<< dendl
;
825 if (bytes_transferred
) {
826 counters
->inc(sync_counters::l_fetch
, *bytes_transferred
);
828 counters
->inc(sync_counters::l_fetch_not_modified
);
// Stat an object in a remote zone.
// Formats ".<instance_id>" into `buf`, builds the source rgw_obj and calls
// RGWRados::stat_remote_obj(); mtime, size and etag are written through
// pmtime, psize and petag. High-precision time is requested. The return code
// is logged at level 0.
// NOTE(review): several argument lines, the error guard and the return
// statements are missing from this listing.
835 int RGWAsyncStatRemoteObj::_send_request(const DoutPrefixProvider
*dpp
)
837 RGWObjectCtx
obj_ctx(store
);
841 snprintf(buf
, sizeof(buf
), ".%lld", (long long)store
->getRados()->instance_id());
844 rgw_obj
src_obj(src_bucket
, key
);
846 int r
= store
->getRados()->stat_remote_obj(dpp
,
849 nullptr, /* req_info */
852 nullptr, /* source */
853 pmtime
, /* real_time* src_mtime, */
854 psize
, /* uint64_t * */
855 nullptr, /* const real_time* mod_ptr, */
856 nullptr, /* const real_time* unmod_ptr, */
857 true, /* high precision time */
858 nullptr, /* const char *if_match, */
859 nullptr, /* const char *if_nomatch, */
863 nullptr, /* string *ptag, */
864 petag
); /* string *petag, */
867 ldpp_dout(dpp
, 0) << "store->stat_remote_obj() returned r=" << r
<< dendl
;
// Delete `obj` locally as part of sync.
// Visible flow:
//   - logs the deletion at level 0 and loads the object state;
//   - with del_if_older set, the removal is skipped when the object's mtime
//     is newer than the request timestamp (a racing write);
//   - decodes the ACL attribute, if present, to obtain the object owner;
//   - fills a DeleteOp with bucket owner, object owner, timestamps,
//     versioning data and zones_trace, then calls delete_obj() with
//     null_yield and logs the result.
// NOTE(review): obj_owner is first set from the decoded policy (listing line
// 910) and then has its id and name overwritten from `owner` and
// `owner_display_name` (lines 919-920).
// NOTE(review): guards around several assignments, the `try`, and the return
// statements are on lines missing from this listing.
873 int RGWAsyncRemoveObj::_send_request(const DoutPrefixProvider
*dpp
)
875 ldpp_dout(dpp
, 0) << __func__
<< "(): deleting obj=" << obj
<< dendl
;
881 int ret
= obj
->get_obj_state(dpp
, &state
, null_yield
);
883 ldpp_dout(dpp
, 20) << __func__
<< "(): get_obj_state() obj=" << obj
<< " returned ret=" << ret
<< dendl
;
887 /* has there been any racing object write? */
888 if (del_if_older
&& (state
->mtime
> timestamp
)) {
889 ldpp_dout(dpp
, 20) << __func__
<< "(): skipping object removal obj=" << obj
<< " (obj mtime=" << state
->mtime
<< ", request timestamp=" << timestamp
<< ")" << dendl
;
893 RGWAccessControlPolicy policy
;
896 map
<string
, bufferlist
>::iterator iter
= state
->attrset
.find(RGW_ATTR_ACL
);
897 if (iter
!= state
->attrset
.end()) {
898 auto bliter
= iter
->second
.cbegin();
900 policy
.decode(bliter
);
901 } catch (buffer::error
& err
) {
902 ldpp_dout(dpp
, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl
;
907 std::unique_ptr
<rgw::sal::Object::DeleteOp
> del_op
= obj
->get_delete_op();
909 del_op
->params
.bucket_owner
= bucket
->get_info().owner
;
910 del_op
->params
.obj_owner
= policy
.get_owner();
912 del_op
->params
.unmod_since
= timestamp
;
915 del_op
->params
.versioning_status
= BUCKET_VERSIONED
;
917 del_op
->params
.olh_epoch
= versioned_epoch
;
918 del_op
->params
.marker_version_id
= marker_version_id
;
919 del_op
->params
.obj_owner
.set_id(rgw_user(owner
));
920 del_op
->params
.obj_owner
.set_name(owner_display_name
);
921 del_op
->params
.mtime
= timestamp
;
922 del_op
->params
.high_precision_time
= true;
923 del_op
->params
.zones_trace
= &zones_trace
;
925 ret
= del_op
->delete_obj(dpp
, null_yield
);
927 ldpp_dout(dpp
, 20) << __func__
<< "(): delete_obj() obj=" << obj
<< " returned ret=" << ret
<< dendl
;
// Coroutine body: hold a lease on `obj` by re-taking the lock periodically.
// Visible flow:
//   - an early path wakes the caller and finishes the coroutine;
//   - while not going_down: call RGWSimpleRadosLockCR with `interval` as the
//     duration, record the call latency, warn when the time since the
//     previous attempt exceeds interval_tolerance, wake the caller, and on
//     failure log and move to RGWCoroutine_Error with `retcode`;
//   - on success log it and sleep for interval / 2;
//   - after the loop: mark unlocked, call RGWSimpleRadosUnlockCR, record its
//     latency and finish with RGWCoroutine_Done.
// NOTE(review): "90\%" in the warning string uses `\%`, which is not a
// standard C++ escape sequence (compilers typically warn and emit '%') —
// plain `%` is probably intended; confirm.
// NOTE(review): the reenter scaffolding and the `if` guards are on lines
// missing from this listing.
932 int RGWContinuousLeaseCR::operate(const DoutPrefixProvider
*dpp
)
935 caller
->set_sleeping(false);
936 return set_cr_done();
939 last_renew_try_time
= ceph::coarse_mono_clock::now();
940 while (!going_down
) {
941 current_time
= ceph::coarse_mono_clock::now();
942 yield
call(new RGWSimpleRadosLockCR(async_rados
, store
, obj
, lock_name
, cookie
, interval
));
944 latency
->add_latency(ceph::coarse_mono_clock::now() - current_time
);
946 current_time
= ceph::coarse_mono_clock::now();
947 if (current_time
- last_renew_try_time
> interval_tolerance
) {
948 // renewal should happen between 50%-90% of interval
949 ldout(store
->ctx(), 1) << *this << ": WARNING: did not renew lock " << obj
<< ":" << lock_name
<< ": within 90\% of interval. " <<
950 (current_time
- last_renew_try_time
) << " > " << interval_tolerance
<< dendl
;
952 last_renew_try_time
= current_time
;
954 caller
->set_sleeping(false); /* will only be relevant when we return, that's why we can do it early */
957 ldout(store
->ctx(), 20) << *this << ": couldn't lock " << obj
<< ":" << lock_name
<< ": retcode=" << retcode
<< dendl
;
958 return set_state(RGWCoroutine_Error
, retcode
);
960 ldout(store
->ctx(), 20) << *this << ": successfully locked " << obj
<< ":" << lock_name
<< dendl
;
962 yield
wait(utime_t(interval
/ 2, 0));
964 set_locked(false); /* moot at this point anyway */
965 current_time
= ceph::coarse_mono_clock::now();
966 yield
call(new RGWSimpleRadosUnlockCR(async_rados
, store
, obj
, lock_name
, cookie
));
968 latency
->add_latency(ceph::coarse_mono_clock::now() - current_time
);
970 return set_state(RGWCoroutine_Done
);
// Construct a coroutine that appends one entry to the timelog object `_oid`.
// The description records the oid and the entry's id, section and name, and
// the entry is pushed onto `entries`.
// NOTE(review): the description has no separator between the oid value and
// "entry={" — a leading space looks intended; confirm.
// NOTE(review): the member initialisers (listing lines 977-979) are missing
// from this listing.
975 RGWRadosTimelogAddCR::RGWRadosTimelogAddCR(const DoutPrefixProvider
*_dpp
, rgw::sal::RadosStore
* _store
, const string
& _oid
,
976 const cls_log_entry
& entry
) : RGWSimpleCoroutine(_store
->ctx()),
981 stringstream
& s
= set_description();
982 s
<< "timelog add entry oid=" << oid
<< "entry={id=" << entry
.id
<< ", section=" << entry
.section
<< ", name=" << entry
.name
<< "}";
983 entries
.push_back(entry
);
// Submit the buffered `entries` to the cls timelog service's add() for `oid`,
// using a new completion notifier and null_yield.
986 int RGWRadosTimelogAddCR::send_request(const DoutPrefixProvider
*dpp
)
988 set_status() << "sending request";
990 cn
= stack
->create_completion_notifier();
991 return store
->svc()->cls
->timelog
.add(dpp
, oid
, entries
, cn
->completion(), true, null_yield
);
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
994 int RGWRadosTimelogAddCR::request_complete()
996 int r
= cn
->completion()->get_return_value();
998 set_status() << "request complete; ret=" << r
;
// Construct a coroutine that trims the timelog object `oid`.
//   start_time, end_time   - time bounds, stored as given.
//   from_marker, to_marker - marker bounds, stored as given.
// All arguments are copied into members of the same name and echoed in the
// description.
1003 RGWRadosTimelogTrimCR::RGWRadosTimelogTrimCR(const DoutPrefixProvider
*dpp
,
1004 rgw::sal::RadosStore
* store
,
1005 const std::string
& oid
,
1006 const real_time
& start_time
,
1007 const real_time
& end_time
,
1008 const std::string
& from_marker
,
1009 const std::string
& to_marker
)
1010 : RGWSimpleCoroutine(store
->ctx()), dpp(dpp
), store(store
), oid(oid
),
1011 start_time(start_time
), end_time(end_time
),
1012 from_marker(from_marker
), to_marker(to_marker
)
1014 set_description() << "timelog trim oid=" << oid
1015 << " start_time=" << start_time
<< " end_time=" << end_time
1016 << " from_marker=" << from_marker
<< " to_marker=" << to_marker
;
// Submit the trim to the cls timelog service's trim() with the stored time
// and marker bounds and a new completion notifier.
// NOTE(review): the final argument(s) of the trim() call (listing line 1026)
// are missing from this listing.
1019 int RGWRadosTimelogTrimCR::send_request(const DoutPrefixProvider
*dpp
)
1021 set_status() << "sending request";
1023 cn
= stack
->create_completion_notifier();
1024 return store
->svc()->cls
->timelog
.trim(dpp
, oid
, start_time
, end_time
, from_marker
,
1025 to_marker
, cn
->completion(),
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
1029 int RGWRadosTimelogTrimCR::request_complete()
1031 int r
= cn
->completion()->get_return_value();
1033 set_status() << "request complete; ret=" << r
;
// Construct a sync-log trim coroutine on top of RGWRadosTimelogTrimCR.
// The base is given default-constructed start and end times and an empty
// from_marker, so only `to_marker` bounds the trim. `last_trim_marker` is
// stored as a pointer and updated by request_complete().
1039 RGWSyncLogTrimCR::RGWSyncLogTrimCR(const DoutPrefixProvider
*dpp
,
1040 rgw::sal::RadosStore
* store
, const std::string
& oid
,
1041 const std::string
& to_marker
,
1042 std::string
*last_trim_marker
)
1043 : RGWRadosTimelogTrimCR(dpp
, store
, oid
, real_time
{}, real_time
{},
1044 std::string
{}, to_marker
),
1045 cct(store
->ctx()), last_trim_marker(last_trim_marker
)
// Interpret the base trim result.
// Any result other than -ENODATA takes the branch on listing line 1052 (its
// body is not visible here). On -ENODATA nothing was left to trim, so
// *last_trim_marker is advanced to to_marker when it is behind it and
// to_marker is not max_marker.
// NOTE(review): the return statements are on lines missing from this listing.
1049 int RGWSyncLogTrimCR::request_complete()
1051 int r
= RGWRadosTimelogTrimCR::request_complete();
1052 if (r
!= -ENODATA
) {
1055 // nothing left to trim, update last_trim_marker
1056 if (*last_trim_marker
< to_marker
&& to_marker
!= max_marker
) {
1057 *last_trim_marker
= to_marker
;
// Stat a local object: map `obj` to its raw object using the bucket's
// placement rule, then return the result of raw_obj_stat(), which fills
// psize, pmtime and pepoch and uses objv_tracker, with null_yield.
1063 int RGWAsyncStatObj::_send_request(const DoutPrefixProvider
*dpp
)
1065 rgw_raw_obj raw_obj
;
1066 store
->getRados()->obj_to_raw(bucket_info
.placement_rule
, obj
, &raw_obj
);
1067 return store
->getRados()->raw_obj_stat(dpp
, raw_obj
, psize
, pmtime
, pepoch
,
1068 nullptr, nullptr, objv_tracker
, null_yield
);
// Construct a coroutine that stats `obj` via the async processor.
// All arguments are stored in members; psize, pmtime, pepoch and objv_tracker
// are output pointers forwarded later to RGWAsyncStatObj.
1071 RGWStatObjCR::RGWStatObjCR(const DoutPrefixProvider
*dpp
,
1072 RGWAsyncRadosProcessor
*async_rados
, rgw::sal::RadosStore
* store
,
1073 const RGWBucketInfo
& _bucket_info
, const rgw_obj
& obj
, uint64_t *psize
,
1074 real_time
* pmtime
, uint64_t *pepoch
,
1075 RGWObjVersionTracker
*objv_tracker
)
1076 : RGWSimpleCoroutine(store
->ctx()), dpp(dpp
), store(store
), async_rados(async_rados
),
1077 bucket_info(_bucket_info
), obj(obj
), psize(psize
), pmtime(pmtime
), pepoch(pepoch
),
1078 objv_tracker(objv_tracker
)
// NOTE(review): only the signature survives in this listing; the body
// (listing lines 1083-1088) is not visible. Presumably releases `req` —
// confirm against the full file.
1082 void RGWStatObjCR::request_cleanup()
// Create an RGWAsyncStatObj carrying the stored output pointers and queue it
// on the async processor.
// NOTE(review): the return statement is not visible in this listing.
1090 int RGWStatObjCR::send_request(const DoutPrefixProvider
*dpp
)
1092 req
= new RGWAsyncStatObj(dpp
, this, stack
->create_completion_notifier(),
1093 store
, bucket_info
, obj
, psize
, pmtime
, pepoch
, objv_tracker
);
1094 async_rados
->queue(req
);
// Return the async request's status.
1098 int RGWStatObjCR::request_complete()
1100 return req
->get_ret_status();
// Construct a coroutine that sends a notify on `obj`.
//   request    - notify payload, stored in `request`.
//   timeout_ms - stored as given and passed to aio_notify().
//   response   - output bufferlist pointer passed to aio_notify().
1103 RGWRadosNotifyCR::RGWRadosNotifyCR(rgw::sal::RadosStore
* store
, const rgw_raw_obj
& obj
,
1104 bufferlist
& request
, uint64_t timeout_ms
,
1105 bufferlist
*response
)
1106 : RGWSimpleCoroutine(store
->ctx()), store(store
), obj(obj
),
1107 request(request
), timeout_ms(timeout_ms
), response(response
)
1109 set_description() << "notify dest=" << obj
;
// Submit an asynchronous notify on `obj`.
// Resolves `obj` to `ref` (logging an error with the return code), sets the
// status text and calls aio_notify() with the request payload, timeout and
// response buffer, using a new completion notifier.
// NOTE(review): the `if (r < 0)` framing and error return are on lines
// missing from this listing.
1112 int RGWRadosNotifyCR::send_request(const DoutPrefixProvider
*dpp
)
1114 int r
= store
->getRados()->get_raw_obj_ref(dpp
, obj
, &ref
);
1116 ldpp_dout(dpp
, -1) << "ERROR: failed to get ref for (" << obj
<< ") ret=" << r
<< dendl
;
1120 set_status() << "sending request";
1122 cn
= stack
->create_completion_notifier();
1123 return ref
.pool
.ioctx().aio_notify(ref
.obj
.oid
, cn
->completion(), request
,
1124 timeout_ms
, response
);
// Read the completion's return value and record it in the status text.
// NOTE(review): the return statement is not visible in this listing.
1127 int RGWRadosNotifyCR::request_complete()
1129 int r
= cn
->completion()->get_return_value();
1131 set_status() << "request complete; ret=" << r
;
// Coroutine body: POST a data-change notification to a peer's /admin/log.
// Visible flow:
//   - first attempt uses the "notify2" parameter and sends `shards` as a
//     flat_map<int, flat_set<rgw_data_notify_entry>>;
//   - when retcode is -ERR_METHOD_NOT_ALLOWED, a second POST is made with
//     `shards` wrapped in rgw_data_notify_v1_encoder;
//   - the coroutine ends with set_cr_error(retcode) or set_cr_done().
// NOTE(review): the reenter/yield scaffolding, the second parameter of the
// fallback `pairs` array (listing line 1152) and the error guard are on
// lines missing from this listing.
1137 int RGWDataPostNotifyCR::operate(const DoutPrefixProvider
* dpp
)
1140 using PostNotify2
= RGWPostRESTResourceCR
<bc::flat_map
<int, bc::flat_set
<rgw_data_notify_entry
>>, int>;
1142 rgw_http_param_pair pairs
[] = { { "type", "data" },
1143 { "notify2", NULL
},
1144 { "source-zone", source_zone
},
1146 call(new PostNotify2(store
->ctx(), conn
, &http_manager
, "/admin/log", pairs
, shards
, nullptr));
1148 if (retcode
== -ERR_METHOD_NOT_ALLOWED
) {
1149 using PostNotify1
= RGWPostRESTResourceCR
<rgw_data_notify_v1_encoder
, int>;
1151 rgw_http_param_pair pairs
[] = { { "type", "data" },
1153 { "source-zone", source_zone
},
1155 auto encoder
= rgw_data_notify_v1_encoder
{shards
};
1156 call(new PostNotify1(store
->ctx(), conn
, &http_manager
, "/admin/log", pairs
, encoder
, nullptr));
1160 return set_cr_error(retcode
);
1162 return set_cr_done();