+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#ifndef CEPH_RGW_CR_RADOS_H
#define CEPH_RGW_CR_RADOS_H
#include <boost/intrusive_ptr.hpp>
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "rgw_coroutine.h"
#include "rgw_rados.h"
#include "common/WorkQueue.h"
#include <atomic>
+#include "services/svc_sys_obj.h"
+
class RGWAsyncRadosRequest : public RefCountedObject {
RGWCoroutine *caller;
RGWAioCompletionNotifier *notifier;
void _process(RGWAsyncRadosRequest *req, ThreadPool::TPHandle& handle) override;
void _dump_queue();
void _clear() override {
- assert(processor->m_req_queue.empty());
+ ceph_assert(processor->m_req_queue.empty());
}
} req_wq;
}
};
+template <class P>
+class RGWSimpleWriteOnlyAsyncCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
-class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
+ P params;
+
+ class Request : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ P params;
+ protected:
+ int _send_request() override;
+ public:
+ Request(RGWCoroutine *caller,
+ RGWAioCompletionNotifier *cn,
+ RGWRados *store,
+ const P& _params) : RGWAsyncRadosRequest(caller, cn),
+ store(store),
+ params(_params) {}
+ } *req{nullptr};
+
+ public:
+ RGWSimpleWriteOnlyAsyncCR(RGWAsyncRadosProcessor *_async_rados,
+ RGWRados *_store,
+ const P& _params) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ params(_params) {}
+
+ ~RGWSimpleWriteOnlyAsyncCR() override {
+ request_cleanup();
+ }
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new Request(this,
+ stack->create_completion_notifier(),
+ store,
+ params);
+
+ async_rados->queue(req);
+ return 0;
+ }
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+};
+
+
+template <class P, class R>
+class RGWSimpleAsyncCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
- RGWObjectCtx *obj_ctx;
- RGWRados::SystemObject::Read::GetObjState read_state;
- RGWObjVersionTracker *objv_tracker;
+
+ P params;
+ std::shared_ptr<R> result;
+
+ class Request : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ P params;
+ std::shared_ptr<R> result;
+ protected:
+ int _send_request() override;
+ public:
+ Request(RGWCoroutine *caller,
+ RGWAioCompletionNotifier *cn,
+ RGWRados *_store,
+ const P& _params,
+ std::shared_ptr<R>& _result) : RGWAsyncRadosRequest(caller, cn),
+ store(_store),
+ params(_params),
+ result(_result) {}
+ } *req{nullptr};
+
+ public:
+ RGWSimpleAsyncCR(RGWAsyncRadosProcessor *_async_rados,
+ RGWRados *_store,
+ const P& _params,
+ std::shared_ptr<R>& _result) : RGWSimpleCoroutine(_store->ctx()),
+ async_rados(_async_rados),
+ store(_store),
+ params(_params),
+ result(_result) {}
+
+ ~RGWSimpleAsyncCR() override {
+ request_cleanup();
+ }
+ void request_cleanup() override {
+ if (req) {
+ req->finish();
+ req = NULL;
+ }
+ }
+
+ int send_request() override {
+ req = new Request(this,
+ stack->create_completion_notifier(),
+ store,
+ params,
+ result);
+
+ async_rados->queue(req);
+ return 0;
+ }
+ int request_complete() override {
+ return req->get_ret_status();
+ }
+};
+
+
+class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
+ RGWSysObjectCtx obj_ctx;
+ RGWObjVersionTracker objv_tracker;
rgw_raw_obj obj;
- bufferlist *pbl;
- map<string, bufferlist> *pattrs;
- off_t ofs;
- off_t end;
+ const bool want_attrs;
+ const bool raw_attrs;
protected:
int _send_request() override;
public:
- RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store, RGWObjectCtx *_obj_ctx,
+ RGWAsyncGetSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
- bufferlist *_pbl, off_t _ofs, off_t _end);
- void set_read_attrs(map<string, bufferlist> *_pattrs) { pattrs = _pattrs; }
+ bool want_attrs, bool raw_attrs);
+
+ bufferlist bl;
+ map<string, bufferlist> attrs;
};
class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
- RGWRados *store;
- RGWObjVersionTracker *objv_tracker;
+ RGWSI_SysObj *svc;
rgw_raw_obj obj;
bool exclusive;
bufferlist bl;
protected:
int _send_request() override;
public:
- RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
- RGWObjVersionTracker *_objv_tracker, rgw_raw_obj& _obj,
- bool _exclusive, bufferlist& _bl);
+ RGWAsyncPutSystemObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
+ RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
+ bool _exclusive, bufferlist _bl);
+
+ RGWObjVersionTracker objv_tracker;
};
class RGWAsyncPutSystemObjAttrs : public RGWAsyncRadosRequest {
- RGWRados *store;
- RGWObjVersionTracker *objv_tracker;
+ RGWSI_SysObj *svc;
rgw_raw_obj obj;
- map<string, bufferlist> *attrs;
+ map<string, bufferlist> attrs;
protected:
int _send_request() override;
public:
- RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
+ RGWAsyncPutSystemObjAttrs(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWSI_SysObj *_svc,
RGWObjVersionTracker *_objv_tracker, const rgw_raw_obj& _obj,
- map<string, bufferlist> *_attrs);
+ map<string, bufferlist> _attrs);
+
+ RGWObjVersionTracker objv_tracker;
};
class RGWAsyncLockSystemObj : public RGWAsyncRadosRequest {
template <class T>
class RGWSimpleRadosReadCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
- RGWObjectCtx obj_ctx;
- bufferlist bl;
+ RGWSI_SysObj *svc;
rgw_raw_obj obj;
-
- map<string, bufferlist> *pattrs{nullptr};
-
T *result;
/// on ENOENT, call handle_data() with an empty object instead of failing
const bool empty_on_enoent;
RGWObjVersionTracker *objv_tracker;
-
RGWAsyncGetSystemObj *req{nullptr};
public:
- RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWSimpleRadosReadCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
const rgw_raw_obj& _obj,
T *_result, bool empty_on_enoent = true,
RGWObjVersionTracker *objv_tracker = nullptr)
- : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
- obj_ctx(store), obj(_obj), result(_result),
+ : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados), svc(_svc),
+ obj(_obj), result(_result),
empty_on_enoent(empty_on_enoent), objv_tracker(objv_tracker) {}
~RGWSimpleRadosReadCR() override {
request_cleanup();
template <class T>
int RGWSimpleRadosReadCR<T>::send_request()
{
- req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(),
- store, &obj_ctx, objv_tracker,
- obj,
- &bl, 0, -1);
- if (pattrs) {
- req->set_read_attrs(pattrs);
- }
+ req = new RGWAsyncGetSystemObj(this, stack->create_completion_notifier(), svc,
+ objv_tracker, obj, false, false);
async_rados->queue(req);
return 0;
}
return ret;
}
try {
- bufferlist::iterator iter = bl.begin();
+ auto iter = req->bl.cbegin();
if (iter.end()) {
// allow successful reads with empty buffers. ReadSyncStatus coroutines
// depend on this to be able to read without locking, because the
- // cls lock from InitSyncStatus will create an empty object if it didnt
+ // cls lock from InitSyncStatus will create an empty object if it didn't
// exist
*result = T();
} else {
- ::decode(*result, iter);
+ decode(*result, iter);
}
} catch (buffer::error& err) {
return -EIO;
class RGWSimpleRadosReadAttrsCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
- RGWObjectCtx obj_ctx;
- bufferlist bl;
+ RGWSI_SysObj *svc;
rgw_raw_obj obj;
-
map<string, bufferlist> *pattrs;
-
+ bool raw_attrs;
RGWAsyncGetSystemObj *req;
public:
- RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
const rgw_raw_obj& _obj,
- map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados), store(_store),
- obj_ctx(store),
+ map<string, bufferlist> *_pattrs, bool _raw_attrs) : RGWSimpleCoroutine(_svc->ctx()),
+ async_rados(_async_rados), svc(_svc),
obj(_obj),
pattrs(_pattrs),
- req(NULL) { }
+ raw_attrs(_raw_attrs),
+ req(NULL) {}
~RGWSimpleRadosReadAttrsCR() override {
request_cleanup();
}
template <class T>
class RGWSimpleRadosWriteCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
+ RGWSI_SysObj *svc;
bufferlist bl;
-
rgw_raw_obj obj;
RGWObjVersionTracker *objv_tracker;
-
RGWAsyncPutSystemObj *req{nullptr};
public:
- RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWSimpleRadosWriteCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
const rgw_raw_obj& _obj,
const T& _data, RGWObjVersionTracker *objv_tracker = nullptr)
- : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados),
- store(_store), obj(_obj), objv_tracker(objv_tracker) {
- ::encode(_data, bl);
+ : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados),
+ svc(_svc), obj(_obj), objv_tracker(objv_tracker) {
+ encode(_data, bl);
}
~RGWSimpleRadosWriteCR() override {
int send_request() override {
req = new RGWAsyncPutSystemObj(this, stack->create_completion_notifier(),
- store, objv_tracker, obj, false, bl);
+ svc, objv_tracker, obj, false, std::move(bl));
async_rados->queue(req);
return 0;
}
int request_complete() override {
+ if (objv_tracker) { // copy the updated version
+ *objv_tracker = req->objv_tracker;
+ }
return req->get_ret_status();
}
};
class RGWSimpleRadosWriteAttrsCR : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
+ RGWSI_SysObj *svc;
+ RGWObjVersionTracker *objv_tracker;
rgw_raw_obj obj;
-
map<string, bufferlist> attrs;
-
- RGWAsyncPutSystemObjAttrs *req;
+ RGWAsyncPutSystemObjAttrs *req = nullptr;
public:
- RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
- const rgw_raw_obj& _obj,
- map<string, bufferlist>& _attrs) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados),
- store(_store),
- obj(_obj),
- attrs(_attrs), req(NULL) {
+ RGWSimpleRadosWriteAttrsCR(RGWAsyncRadosProcessor *_async_rados,
+ RGWSI_SysObj *_svc, const rgw_raw_obj& _obj,
+ map<string, bufferlist> _attrs,
+ RGWObjVersionTracker *objv_tracker = nullptr)
+ : RGWSimpleCoroutine(_svc->ctx()), async_rados(_async_rados),
+ svc(_svc), objv_tracker(objv_tracker), obj(_obj),
+ attrs(std::move(_attrs)) {
}
~RGWSimpleRadosWriteAttrsCR() override {
request_cleanup();
int send_request() override {
req = new RGWAsyncPutSystemObjAttrs(this, stack->create_completion_notifier(),
- store, NULL, obj, &attrs);
+ svc, objv_tracker, obj, std::move(attrs));
async_rados->queue(req);
return 0;
}
int request_complete() override {
+ if (objv_tracker) { // copy the updated version
+ *objv_tracker = req->objv_tracker;
+ }
return req->get_ret_status();
}
};
};
class RGWRadosGetOmapKeysCR : public RGWSimpleCoroutine {
- RGWRados *store;
+ public:
+ struct Result {
+ rgw_rados_ref ref;
+ std::set<std::string> entries;
+ bool more = false;
+ };
+ using ResultPtr = std::shared_ptr<Result>;
- string marker;
- std::set<std::string> *entries;
- int max_entries;
+ RGWRadosGetOmapKeysCR(RGWRados *_store, const rgw_raw_obj& _obj,
+ const string& _marker, int _max_entries,
+ ResultPtr result);
- rgw_rados_ref ref;
+ int send_request() override;
+ int request_complete() override;
+ private:
+ RGWRados *store;
rgw_raw_obj obj;
-
+ string marker;
+ int max_entries;
+ ResultPtr result;
boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
-
-public:
- RGWRadosGetOmapKeysCR(RGWRados *_store,
- const rgw_raw_obj& _obj,
- const string& _marker,
- std::set<std::string> *_entries, int _max_entries);
-
- int send_request() override;
- int request_complete() override;
};
class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
public:
RGWRadosRemoveCR(RGWRados *store, const rgw_raw_obj& obj);
- int send_request();
- int request_complete();
+ int send_request() override;
+ int request_complete() override;
};
class RGWSimpleRadosLockCR : public RGWSimpleCoroutine {
class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
RGWRados *store;
const std::string oid;
- RGWBucketInfo *bucket_info;
protected:
int _send_request() override;
public:
RGWAsyncGetBucketInstanceInfo(RGWCoroutine *caller, RGWAioCompletionNotifier *cn,
- RGWRados *_store, const std::string& oid,
- RGWBucketInfo *_bucket_info)
- : RGWAsyncRadosRequest(caller, cn), store(_store),
- oid(oid), bucket_info(_bucket_info) {}
+ RGWRados *_store, const std::string& oid)
+ : RGWAsyncRadosRequest(caller, cn), store(_store), oid(oid) {}
+
+ RGWBucketInfo bucket_info;
};
class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
}
int send_request() override {
- req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid, bucket_info);
+ req = new RGWAsyncGetBucketInstanceInfo(this, stack->create_completion_notifier(), store, oid);
async_rados->queue(req);
return 0;
}
int request_complete() override {
+ if (bucket_info) {
+ *bucket_info = std::move(req->bucket_info);
+ }
return req->get_ret_status();
}
};
string source_zone;
RGWBucketInfo bucket_info;
+ std::optional<rgw_placement_rule> dest_placement_rule;
rgw_obj_key key;
- uint64_t versioned_epoch;
+ std::optional<rgw_obj_key> dest_key;
+ std::optional<uint64_t> versioned_epoch;
real_time src_mtime;
bool copy_if_newer;
- rgw_zone_set *zones_trace;
+ rgw_zone_set zones_trace;
+ PerfCounters* counters;
protected:
int _send_request() override;
RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
const string& _source_zone,
RGWBucketInfo& _bucket_info,
+ std::optional<rgw_placement_rule> _dest_placement_rule,
const rgw_obj_key& _key,
- uint64_t _versioned_epoch,
- bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
- source_zone(_source_zone),
- bucket_info(_bucket_info),
- key(_key),
- versioned_epoch(_versioned_epoch),
- copy_if_newer(_if_newer), zones_trace(_zones_trace) {}
+ const std::optional<rgw_obj_key>& _dest_key,
+ std::optional<uint64_t> _versioned_epoch,
+ bool _if_newer, rgw_zone_set *_zones_trace,
+ PerfCounters* counters)
+ : RGWAsyncRadosRequest(caller, cn), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ dest_placement_rule(_dest_placement_rule),
+ key(_key),
+ dest_key(_dest_key),
+ versioned_epoch(_versioned_epoch),
+ copy_if_newer(_if_newer), counters(counters)
+ {
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ }
+ }
};
class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
string source_zone;
RGWBucketInfo bucket_info;
+ std::optional<rgw_placement_rule> dest_placement_rule;
rgw_obj_key key;
- uint64_t versioned_epoch;
+ std::optional<rgw_obj_key> dest_key;
+ std::optional<uint64_t> versioned_epoch;
real_time src_mtime;
RGWAsyncFetchRemoteObj *req;
rgw_zone_set *zones_trace;
+ PerfCounters* counters;
public:
RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
const string& _source_zone,
RGWBucketInfo& _bucket_info,
+ std::optional<rgw_placement_rule> _dest_placement_rule,
const rgw_obj_key& _key,
- uint64_t _versioned_epoch,
- bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
- async_rados(_async_rados), store(_store),
- source_zone(_source_zone),
- bucket_info(_bucket_info),
- key(_key),
- versioned_epoch(_versioned_epoch),
- copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
+ const std::optional<rgw_obj_key>& _dest_key,
+ std::optional<uint64_t> _versioned_epoch,
+ bool _if_newer, rgw_zone_set *_zones_trace,
+ PerfCounters* counters)
+ : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ async_rados(_async_rados), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ dest_placement_rule(_dest_placement_rule),
+ key(_key),
+ dest_key(_dest_key),
+ versioned_epoch(_versioned_epoch),
+ copy_if_newer(_if_newer), req(NULL),
+ zones_trace(_zones_trace), counters(counters) {}
~RGWFetchRemoteObjCR() override {
}
int send_request() override {
- req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
- key, versioned_epoch, copy_if_newer, zones_trace);
+ req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
+ source_zone, bucket_info, dest_placement_rule,
+ key, dest_key, versioned_epoch, copy_if_newer,
+ zones_trace, counters);
async_rados->queue(req);
return 0;
}
ceph::real_time *pmtime;
uint64_t *psize;
+ string *petag;
map<string, bufferlist> *pattrs;
+ map<string, string> *pheaders;
protected:
int _send_request() override;
const rgw_obj_key& _key,
ceph::real_time *_pmtime,
uint64_t *_psize,
- map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
+ string *_petag,
+ map<string, bufferlist> *_pattrs,
+ map<string, string> *_pheaders) : RGWAsyncRadosRequest(caller, cn), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
key(_key),
pmtime(_pmtime),
psize(_psize),
- pattrs(_pattrs) {}
+ petag(_petag),
+ pattrs(_pattrs),
+ pheaders(_pheaders) {}
};
class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
ceph::real_time *pmtime;
uint64_t *psize;
+ string *petag;
map<string, bufferlist> *pattrs;
+ map<string, string> *pheaders;
RGWAsyncStatRemoteObj *req;
const rgw_obj_key& _key,
ceph::real_time *_pmtime,
uint64_t *_psize,
- map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ string *_petag,
+ map<string, bufferlist> *_pattrs,
+ map<string, string> *_pheaders) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
async_rados(_async_rados), store(_store),
source_zone(_source_zone),
bucket_info(_bucket_info),
key(_key),
pmtime(_pmtime),
psize(_psize),
+ petag(_petag),
pattrs(_pattrs),
+ pheaders(_pheaders),
req(NULL) {}
int send_request() override {
req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
- bucket_info, key, pmtime, psize, pattrs);
+ bucket_info, key, pmtime, psize, petag, pattrs, pheaders);
async_rados->queue(req);
return 0;
}
bool del_if_older;
ceph::real_time timestamp;
- rgw_zone_set *zones_trace;
+ rgw_zone_set zones_trace;
protected:
int _send_request() override;
versioned(_versioned),
versioned_epoch(_versioned_epoch),
del_if_older(_if_older),
- timestamp(_timestamp), zones_trace(_zones_trace) {
+ timestamp(_timestamp) {
if (_delete_marker) {
marker_version_id = key.instance;
}
+
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ }
}
};
CephContext *cct;
std::string *last_trim_marker;
public:
+ // a marker that compares greater than any timestamp-based index
+ static constexpr const char* max_marker = "99999999";
+
RGWSyncLogTrimCR(RGWRados *store, const std::string& oid,
const std::string& to_marker, std::string *last_trim_marker);
int request_complete() override;