RGWAsyncRadosProcessor::RGWAsyncRadosProcessor(CephContext *_cct, int num_threads)
: cct(_cct), m_tp(cct, "RGWAsyncRadosProcessor::m_tp", "rados_async", num_threads),
req_throttle(_cct, "rgw_async_rados_ops", num_threads * 2),
- req_wq(this, g_conf()->rgw_op_thread_timeout,
- g_conf()->rgw_op_thread_suicide_timeout, &m_tp) {
+ req_wq(this,
+ ceph::make_timespan(g_conf()->rgw_op_thread_timeout),
+ ceph::make_timespan(g_conf()->rgw_op_thread_suicide_timeout),
+ &m_tp) {
}
void RGWAsyncRadosProcessor::start() {
int RGWSimpleRadosReadAttrsCR::send_request()
{
req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
- svc, nullptr, obj, true, raw_attrs);
+ svc, objv_tracker, obj, true, raw_attrs);
async_rados->queue(req);
return 0;
}
if (pattrs) {
*pattrs = std::move(req->attrs);
}
+ if (objv_tracker) {
+ *objv_tracker = req->objv_tracker;
+ }
return req->get_ret_status();
}
return r;
}
+RGWRadosGetOmapValsCR::RGWRadosGetOmapValsCR(rgw::sal::RGWRadosStore *_store,
+ const rgw_raw_obj& _obj,
+ const string& _marker,
+ int _max_entries,
+ ResultPtr _result)
+ : RGWSimpleCoroutine(_store->ctx()), store(_store), obj(_obj),
+ marker(_marker), max_entries(_max_entries),
+ result(std::move(_result))
+{
+ ceph_assert(result); // must be allocated
+ set_description() << "get omap keys dest=" << obj << " marker=" << marker;
+}
+
+int RGWRadosGetOmapValsCR::send_request() {
+ int r = store->getRados()->get_raw_obj_ref(obj, &result->ref);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to get ref for (" << obj << ") ret=" << r << dendl;
+ return r;
+ }
+
+ set_status() << "send request";
+
+ librados::ObjectReadOperation op;
+ op.omap_get_vals2(marker, max_entries, &result->entries, &result->more, nullptr);
+
+ cn = stack->create_completion_notifier(result);
+ return result->ref.pool.ioctx().aio_operate(result->ref.obj.oid, cn->completion(), &op, NULL);
+}
+
+int RGWRadosGetOmapValsCR::request_complete()
+{
+ int r = cn->completion()->get_return_value();
+
+ set_status() << "request complete; ret=" << r;
+
+ return r;
+}
+
RGWRadosRemoveOmapKeysCR::RGWRadosRemoveOmapKeysCR(rgw::sal::RGWRadosStore *_store,
const rgw_raw_obj& _obj,
const set<string>& _keys) : RGWSimpleCoroutine(_store->ctx()),
return r;
}
-RGWRadosRemoveCR::RGWRadosRemoveCR(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj)
- : RGWSimpleCoroutine(store->ctx()), store(store), obj(obj)
+RGWRadosRemoveCR::RGWRadosRemoveCR(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj,
+ RGWObjVersionTracker* objv_tracker)
+ : RGWSimpleCoroutine(store->ctx()),
+ store(store), obj(obj), objv_tracker(objv_tracker)
{
set_description() << "remove dest=" << obj;
}
set_status() << "send request";
librados::ObjectWriteOperation op;
+ if (objv_tracker) {
+ objv_tracker->prepare_op_for_write(&op);
+ }
op.remove();
cn = stack->create_completion_notifier();
start_marker(BucketIndexShardsManager::get_shard_marker(start_marker)),
end_marker(BucketIndexShardsManager::get_shard_marker(end_marker))
{
- bs.init(bucket_info, shard_id);
+ bs.init(bucket_info, bucket_info.layout.current_index, shard_id);
}
int RGWRadosBILogTrimCR::send_request()
char buf[16];
snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
- map<string, bufferlist> attrs;
-
- rgw_obj src_obj(src_bucket, key);
+ rgw::sal::RGWAttrs attrs;
- rgw_obj dest_obj(dest_bucket_info.bucket, dest_key.value_or(key));
+ rgw::sal::RGWRadosBucket bucket(store, src_bucket);
+ rgw::sal::RGWRadosObject src_obj(store, key, &bucket);
+ rgw::sal::RGWRadosBucket dest_bucket(store, dest_bucket_info);
+ rgw::sal::RGWRadosObject dest_obj(store, dest_key.value_or(key), &dest_bucket);
std::optional<uint64_t> bytes_transferred;
int r = store->getRados()->fetch_remote_obj(obj_ctx,
user_id.value_or(rgw_user()),
NULL, /* req_info */
source_zone,
- dest_obj,
- src_obj,
- dest_bucket_info, /* dest */
+ &dest_obj,
+ &src_obj,
+ &dest_bucket, /* dest */
nullptr, /* source */
dest_placement_rule,
NULL, /* real_time* src_mtime, */
char buf[16];
snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
- rgw_obj src_obj(src_bucket, key);
+ rgw::sal::RGWRadosBucket bucket(store, src_bucket);
+ rgw::sal::RGWRadosObject src_obj(store, key, &bucket);
int r = store->getRados()->stat_remote_obj(obj_ctx,
rgw_user(user_id),
nullptr, /* req_info */
source_zone,
- src_obj,
+ &src_obj,
nullptr, /* source */
pmtime, /* real_time* src_mtime, */
psize, /* uint64_t * */