#include "include/ceph_assert.h"
#include "rgw_coroutine.h"
#include "rgw_sal.h"
+#include "rgw_sal_rados.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
struct RGWWQ : public ThreadPool::WorkQueue<RGWAsyncRadosRequest> {
RGWAsyncRadosProcessor *processor;
- RGWWQ(RGWAsyncRadosProcessor *p, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
+ RGWWQ(RGWAsyncRadosProcessor *p,
+ ceph::timespan timeout, ceph::timespan suicide_timeout,
+ ThreadPool *tp)
: ThreadPool::WorkQueue<RGWAsyncRadosRequest>("RGWWQ", timeout, suicide_timeout, tp), processor(p) {}
bool _enqueue(RGWAsyncRadosRequest *req) override;
class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
RGWSysObjectCtx obj_ctx;
- RGWObjVersionTracker objv_tracker;
rgw_raw_obj obj;
const bool want_attrs;
const bool raw_attrs;
bufferlist bl;
map<string, bufferlist> attrs;
+ RGWObjVersionTracker objv_tracker;
};
class RGWAsyncPutSystemObj : public RGWAsyncRadosRequest {
rgw_raw_obj obj;
map<string, bufferlist> *pattrs;
bool raw_attrs;
- RGWAsyncGetSystemObj *req;
+ RGWObjVersionTracker* objv_tracker;
+ RGWAsyncGetSystemObj *req = nullptr;
public:
RGWSimpleRadosReadAttrsCR(RGWAsyncRadosProcessor *_async_rados, RGWSI_SysObj *_svc,
- const rgw_raw_obj& _obj,
- map<string, bufferlist> *_pattrs, bool _raw_attrs) : RGWSimpleCoroutine(_svc->ctx()),
- async_rados(_async_rados), svc(_svc),
- obj(_obj),
- pattrs(_pattrs),
- raw_attrs(_raw_attrs),
- req(NULL) {}
+ const rgw_raw_obj& _obj, map<string, bufferlist> *_pattrs,
+ bool _raw_attrs, RGWObjVersionTracker* objv_tracker = nullptr)
+ : RGWSimpleCoroutine(_svc->ctx()),
+ async_rados(_async_rados), svc(_svc),
+ obj(_obj),
+ pattrs(_pattrs),
+ raw_attrs(_raw_attrs),
+ objv_tracker(objv_tracker)
+ {}
~RGWSimpleRadosReadAttrsCR() override {
request_cleanup();
}
boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
};
+class RGWRadosGetOmapValsCR : public RGWSimpleCoroutine {
+ public:
+ struct Result {
+ rgw_rados_ref ref;
+ std::map<std::string, bufferlist> entries;
+ bool more = false;
+ };
+ using ResultPtr = std::shared_ptr<Result>;
+
+ RGWRadosGetOmapValsCR(rgw::sal::RGWRadosStore *_store, const rgw_raw_obj& _obj,
+ const string& _marker, int _max_entries,
+ ResultPtr result);
+
+ int send_request() override;
+ int request_complete() override;
+
+ private:
+ rgw::sal::RGWRadosStore *store;
+ rgw_raw_obj obj;
+ string marker;
+ int max_entries;
+ ResultPtr result;
+ boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+};
+
class RGWRadosRemoveOmapKeysCR : public RGWSimpleCoroutine {
rgw::sal::RGWRadosStore *store;
rgw::sal::RGWRadosStore *store;
librados::IoCtx ioctx;
const rgw_raw_obj obj;
+ RGWObjVersionTracker* objv_tracker;
boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
public:
- RGWRadosRemoveCR(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj);
+ RGWRadosRemoveCR(rgw::sal::RGWRadosStore *store, const rgw_raw_obj& obj,
+ RGWObjVersionTracker* objv_tracker = nullptr);
int send_request() override;
int request_complete() override;
}
};
-class RGWAsyncWait : public RGWAsyncRadosRequest {
- CephContext *cct;
- ceph::mutex *lock;
- ceph::condition_variable *cond;
- std::chrono::seconds interval;
-protected:
- int _send_request() override {
- std::unique_lock l{*lock};
- return (cond->wait_for(l, interval) == std::cv_status::timeout ?
- ETIMEDOUT : 0);
- }
-public:
- RGWAsyncWait(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, CephContext *_cct,
- ceph::mutex *_lock, ceph::condition_variable *_cond, int _secs)
- : RGWAsyncRadosRequest(caller, cn),
- cct(_cct),
- lock(_lock), cond(_cond), interval(_secs) {}
-
- void wakeup() {
- std::lock_guard l{*lock};
- cond->notify_all();
- }
-};
-
-class RGWWaitCR : public RGWSimpleCoroutine {
- CephContext *cct;
- RGWAsyncRadosProcessor *async_rados;
- ceph::mutex *lock;
- ceph::condition_variable *cond;
- int secs;
-
- RGWAsyncWait *req;
-
-public:
- RGWWaitCR(RGWAsyncRadosProcessor *_async_rados, CephContext *_cct,
- ceph::mutex *_lock, ceph::condition_variable *_cond,
- int _secs) : RGWSimpleCoroutine(_cct), cct(_cct),
- async_rados(_async_rados), lock(_lock), cond(_cond), secs(_secs), req(NULL) {
- }
- ~RGWWaitCR() override {
- request_cleanup();
- }
-
- void request_cleanup() override {
- if (req) {
- wakeup();
- req->finish();
- req = NULL;
- }
- }
-
- int send_request() override {
- req = new RGWAsyncWait(this, stack->create_completion_notifier(), cct, lock, cond, secs);
- async_rados->queue(req);
- return 0;
- }
-
- int request_complete() override {
- return req->get_ret_status();
- }
-
- void wakeup() {
- req->wakeup();
- }
-};
-
class RGWShardedOmapCRManager {
RGWAsyncRadosProcessor *async_rados;
rgw::sal::RGWRadosStore *store;
const string cookie;
int interval;
-
- ceph::mutex lock = ceph::make_mutex("RGWContinuousLeaseCR");
- std::atomic<bool> going_down = { false };
+ bool going_down{ false };
bool locked{false};
RGWCoroutine *caller;
int operate() override;
- bool is_locked() {
- std::lock_guard l{lock};
+ bool is_locked() const {
return locked;
}
void set_locked(bool status) {
- std::lock_guard l{lock};
locked = status;
}
CephContext *cct;
std::string *last_trim_marker;
public:
- // a marker that compares greater than any timestamp-based index
static constexpr const char* max_marker = "99999999";
RGWSyncLogTrimCR(rgw::sal::RGWRadosStore *store, const std::string& oid,