#include <boost/algorithm/string/predicate.hpp>
#include <boost/variant.hpp>
+#include "include/scope_guard.h"
#include "common/Formatter.h"
#include "common/containers.h"
#include <common/errno.h>
#include "rgw_zone.h"
#include "rgw_string.h"
#include "rgw_multi.h"
+#include "rgw_sal.h"
// this seems safe to use, at least for now--arguably, we should
// prefer header-only fmt, in general
utime_t start = ceph_clock_now();
if (should_work(start)) {
ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
- int r = lc->process(this);
+ int r = lc->process(this, false /* once */);
if (r < 0) {
- ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
+ << r << dendl;
}
ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
}
utime_t next;
next.set_from_double(end + secs);
- ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl;
+ ldpp_dout(dpp, 5) << "schedule life cycle next start time: "
+ << rgw_to_asctime(next) << dendl;
std::unique_lock l{lock};
cond.wait_for(l, std::chrono::seconds(secs));
delete[] obj_names;
}
-bool RGWLC::if_already_run_today(time_t& start_date)
+bool RGWLC::if_already_run_today(time_t start_date)
{
struct tm bdt;
time_t begin_of_day;
return false;
}
+static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) {
+ os << "<ent: bucket=";
+ os << ent.bucket;
+ os << "; start_time=";
+ os << rgw_to_asctime(utime_t(time_t(ent.start_time), 0));
+ os << "; status=";
+ os << ent.status;
+ os << ">";
+ return os;
+}
+
int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
{
- map<string, int > entries;
-
+ vector<cls_rgw_lc_entry> entries;
string marker;
+ dout(5) << "RGWLC::bucket_lc_prepare(): PREPARE "
+ << "index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
#define MAX_LC_LIST_ENTRIES 100
do {
int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index],
marker, MAX_LC_LIST_ENTRIES, entries);
if (ret < 0)
return ret;
- map<string, int>::iterator iter;
- for (iter = entries.begin(); iter != entries.end(); ++iter) {
- pair<string, int > entry(iter->first, lc_uninitial);
+
+ for (auto& entry : entries) {
+ entry.start_time = ceph_clock_now();
+ entry.status = lc_uninitial; // lc_uninitial? really?
ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
- obj_names[index], entry);
+ obj_names[index], entry);
if (ret < 0) {
- ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on "
- << obj_names[index] << dendl;
+ ldpp_dout(this, 0)
+ << "RGWLC::bucket_lc_prepare() failed to set entry on "
+ << obj_names[index] << dendl;
return ret;
}
}
- if (!entries.empty()) {
- marker = std::move(entries.rbegin()->first);
+ if (! entries.empty()) {
+ marker = std::move(entries.back().bucket);
}
} while (!entries.empty());
cmp = days*cct->_conf->rgw_lc_debug_interval;
base_time = ceph_clock_now();
}
- timediff = base_time - ceph::real_clock::to_time_t(mtime);
+ auto tt_mtime = ceph::real_clock::to_time_t(mtime);
+ timediff = base_time - tt_mtime;
if (expire_time) {
*expire_time = mtime + make_timespan(cmp);
}
- ldout(cct, 20) << __func__ << "(): mtime=" << mtime << " days=" << days << " base_time=" << base_time << " timediff=" << timediff << " cmp=" << cmp << dendl;
+
+ ldout(cct, 20) << __func__ << __func__
+ << "(): mtime=" << mtime << " days=" << days
+ << " base_time=" << base_time << " timediff=" << timediff
+ << " cmp=" << cmp
+ << " is_expired=" << (timediff >= cmp)
+ << dendl;
return (timediff >= cmp);
}
try {
decode(retention, iter->second);
} catch (buffer::error& err) {
- ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention"
+ << dendl;
return false;
}
if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
try {
decode(obj_legal_hold, iter->second);
} catch (buffer::error& err) {
- ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold" << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold"
+ << dendl;
return false;
}
if (obj_legal_hold.is_enabled()) {
++obj_iter;
}
- bool next_has_same_name()
- {
- if ((obj_iter + 1) == objs.end()) {
+ boost::optional<std::string> next_key_name() {
+ if (obj_iter == objs.end() ||
+ (obj_iter + 1) == objs.end()) {
/* this should have been called after get_obj() was called, so this should
* only happen if is_truncated is false */
- return false;
+ return boost::none;
}
- return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0);
+
+ return ((obj_iter + 1)->key.name);
}
+
}; /* LCObjsLister */
struct op_env {
using LCWorker = RGWLC::LCWorker;
- lc_op& op;
+ lc_op op;
rgw::sal::RGWRadosStore *store;
LCWorker* worker;
RGWBucketInfo& bucket_info;
}; /* op_env */
class LCRuleOp;
+class WorkQ;
struct lc_op_ctx {
CephContext *cct;
- op_env& env;
- rgw_bucket_dir_entry& o;
+ op_env env;
+ rgw_bucket_dir_entry o;
+ boost::optional<std::string> next_key_name;
+ ceph::real_time effective_mtime;
rgw::sal::RGWRadosStore *store;
RGWBucketInfo& bucket_info;
- lc_op& op;
+ lc_op& op; // ok--refers to expanded env.op
LCObjsLister& ol;
rgw_obj obj;
RGWObjectCtx rctx;
const DoutPrefixProvider *dpp;
-
- lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o,
- const DoutPrefixProvider *_dpp)
- : cct(_env.store->ctx()), env(_env), o(_o),
+ WorkQ* wq;
+
+ lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
+ boost::optional<std::string> next_key_name,
+ ceph::real_time effective_mtime,
+ const DoutPrefixProvider *dpp, WorkQ* wq)
+ : cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name),
+ effective_mtime(effective_mtime),
store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
- obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {}
+ obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq)
+ {}
+
+ bool next_has_same_name(const std::string& key_name) {
+ return (next_key_name && key_name.compare(
+ boost::get<std::string>(next_key_name)) == 0);
+ }
+
}; /* lc_op_ctx */
static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
del_op.params.obj_owner = obj_owner;
del_op.params.unmod_since = meta.mtime;
- if (perfcounter) {
- perfcounter->inc(l_rgw_lc_remove_expired, 1);
- }
-
return del_op.delete_obj(null_yield);
} /* remove_expired_obj */
virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) {
return false;
- };
+ }
/* called after check(). Check should tell us whether this action
* is applicable. If there are multiple actions, we'll end up executing
virtual int process(lc_op_ctx& oc) {
return 0;
}
+
+ friend class LCOpRule;
}; /* LCOpAction */
class LCOpFilter {
class LCOpRule {
friend class LCOpAction;
- op_env& env;
+ op_env env;
+ boost::optional<std::string> next_key_name;
+ ceph::real_time effective_mtime;
- std::vector<unique_ptr<LCOpFilter> > filters;
- std::vector<unique_ptr<LCOpAction> > actions;
+ std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd
+ std::vector<shared_ptr<LCOpAction> > actions;
public:
LCOpRule(op_env& _env) : env(_env) {}
+ boost::optional<std::string> get_next_key_name() {
+ return next_key_name;
+ }
+
+ std::vector<shared_ptr<LCOpAction>>& get_actions() {
+ return actions;
+ }
+
void build();
- int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp);
+ void update();
+ int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
+ WorkQ* wq);
}; /* LCOpRule */
using WorkItem =
boost::variant<void*,
/* out-of-line delete */
- std::tuple<LCOpRule&, rgw_bucket_dir_entry>,
+ std::tuple<LCOpRule, rgw_bucket_dir_entry>,
/* uncompleted MPU expiration */
- std::tuple<const lc_op&, rgw_bucket_dir_entry>,
+ std::tuple<lc_op, rgw_bucket_dir_entry>,
rgw_bucket_dir_entry>;
class WorkQ : public Thread
{
public:
using unique_lock = std::unique_lock<std::mutex>;
- using work_f = std::function<void(RGWLC::LCWorker*, WorkItem&)>;
+ using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
using dequeue_result = boost::variant<void*, WorkItem>;
+ static constexpr uint32_t FLAG_NONE = 0x0000;
+ static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001;
+ static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002;
+ static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
+
private:
- const work_f bsf = [](RGWLC::LCWorker* wk, WorkItem& wi) {};
+ const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
RGWLC::LCWorker* wk;
uint32_t qmax;
+ int ix;
std::mutex mtx;
std::condition_variable cv;
+ uint32_t flags;
vector<WorkItem> items;
work_f f;
public:
WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
- : wk(wk), qmax(qmax), f(bsf)
+ : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
{
- create((string{"workpool_thr_"} + to_string(ix)).c_str());
+ create(thr_name().c_str());
}
+ std::string thr_name() {
+ return std::string{"wp_thrd: "}
+ + std::to_string(wk->ix) + ", " + std::to_string(ix);
+ }
+
void setf(work_f _f) {
f = _f;
}
unique_lock uniq(mtx);
while ((!wk->get_lc()->going_down()) &&
(items.size() > qmax)) {
+ flags |= FLAG_EWAIT_SYNC;
cv.wait_for(uniq, 200ms);
}
items.push_back(item);
+ if (flags & FLAG_DWAIT_SYNC) {
+ flags &= ~FLAG_DWAIT_SYNC;
+ cv.notify_one();
+ }
}
void drain() {
unique_lock uniq(mtx);
- while ((!wk->get_lc()->going_down()) &&
- (items.size() > 0)) {
+ flags |= FLAG_EDRAIN_SYNC;
+ while (flags & FLAG_EDRAIN_SYNC) {
cv.wait_for(uniq, 200ms);
}
}
unique_lock uniq(mtx);
while ((!wk->get_lc()->going_down()) &&
(items.size() == 0)) {
+ /* clear drain state, as we are NOT doing work and qlen==0 */
+ if (flags & FLAG_EDRAIN_SYNC) {
+ flags &= ~FLAG_EDRAIN_SYNC;
+ }
+ flags |= FLAG_DWAIT_SYNC;
cv.wait_for(uniq, 200ms);
}
if (items.size() > 0) {
auto item = items.back();
items.pop_back();
+ if (flags & FLAG_EWAIT_SYNC) {
+ flags &= ~FLAG_EWAIT_SYNC;
+ cv.notify_one();
+ }
return {item};
}
return nullptr;
/* going down */
break;
}
- f(wk, boost::get<WorkItem>(item));
+ f(wk, this, boost::get<WorkItem>(item));
}
return nullptr;
}
ix(0)
{}
+ ~WorkPool() {
+ for (auto& wq : wqs) {
+ wq.join();
+ }
+ }
+
void setf(WorkQ::work_f _f) {
for (auto& wq : wqs) {
wq.setf(_f);
}
}; /* WorkPool */
-RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
- RGWLC *_lc)
- : dpp(_dpp), cct(_cct), lc(_lc)
+RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
+ RGWLC *lc, int ix)
+ : dpp(dpp), cct(cct), lc(lc), ix(ix)
{
auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
workpool = new WorkPool(this, wpw, 512);
}
// Decide whether an LC worker has exhausted its interval budget.
// A "once" (single-pass, e.g. admin-triggered) run is never budget-limited;
// otherwise stop as soon as the wall clock passes stop_at.
static inline bool worker_should_stop(time_t stop_at, bool once)
{
  if (once) {
    return false;
  }
  return stop_at < time(nullptr);
}
+
int RGWLC::handle_multipart_expiration(
RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map,
- LCWorker* worker)
+ LCWorker* worker, time_t stop_at, bool once)
{
MultipartMetaFilter mp_filter;
vector<rgw_bucket_dir_entry> objs;
list_op.params.ns = RGW_OBJ_NS_MULTIPART;
list_op.params.filter = &mp_filter;
- auto pf = [&](RGWLC::LCWorker* wk, WorkItem& wi) {
- auto wt = boost::get<std::tuple<const lc_op&, rgw_bucket_dir_entry>>(wi);
+ auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
+ auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
auto& [rule, obj] = wt;
RGWMPObj mp_obj;
if (obj_has_expired(cct, obj.meta.mtime, rule.mp_expiration)) {
return;
}
RGWObjectCtx rctx(store);
- ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
- if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
- ldpp_dout(wk->get_lc(), 0)
- << "ERROR: abort_multipart_upload failed, ret=" << ret
- << ", meta:" << obj.key
- << dendl;
- } else if (ret == -ERR_NO_SUCH_UPLOAD) {
- ldpp_dout(wk->get_lc(), 5)
- << "ERROR: abort_multipart_upload failed, ret=" << ret
- << ", meta:" << obj.key
- << dendl;
- }
+ int ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
+ if (ret == 0) {
+ if (perfcounter) {
+ perfcounter->inc(l_rgw_lc_abort_mpu, 1);
+ }
+ } else {
+ if (ret == -ERR_NO_SUCH_UPLOAD) {
+ ldpp_dout(wk->get_lc(), 5)
+ << "ERROR: abort_multipart_upload failed, ret=" << ret
+ << wq->thr_name()
+ << ", meta:" << obj.key
+ << dendl;
+ } else {
+ ldpp_dout(wk->get_lc(), 0)
+ << "ERROR: abort_multipart_upload failed, ret=" << ret
+ << wq->thr_name()
+ << ", meta:" << obj.key
+ << dendl;
+ }
+ } /* abort failed */
} /* expired */
};
for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
++prefix_iter) {
+
+ if (worker_should_stop(stop_at, once)) {
+ ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
+ << worker->ix
+ << dendl;
+ return 0;
+ }
+
if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
continue;
}
}
for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
- std::tuple<const lc_op&, rgw_bucket_dir_entry> t1 =
+ std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
{prefix_iter->second, *obj_iter};
worker->workpool->enqueue(WorkItem{t1});
if (going_down()) {
- worker->workpool->drain();
return 0;
}
} /* for objs */
for (const auto& tag : object_tags.get_tags()) {
const auto& rule_tags = rule_action.obj_tags->get_tags();
const auto& iter = rule_tags.find(tag.first);
+ if(iter == rule_tags.end())
+ continue;
if(iter->second == tag.second)
{
tag_count++;
oc.rctx, tags_bl);
if (ret < 0) {
if (ret != -ENODATA) {
- ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
+ ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r="
+ << ret << " " << oc.wq->thr_name() << dendl;
}
return 0;
}
auto iter = tags_bl.cbegin();
dest_obj_tags.decode(iter);
} catch (buffer::error& err) {
- ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
+ ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
+ << oc.wq->thr_name() << dendl;
return -EIO;
}
if (! has_all_tags(op, dest_obj_tags)) {
- ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match in rule: " << op.id << dendl;
+ ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj
+ << " as tags do not match in rule: "
+ << op.id << " "
+ << oc.wq->thr_name() << dendl;
return 0;
}
}
if (ret == -ENOENT) {
return false;
}
- ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl;
+ ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj
+ << " returned ret=" << ret << " "
+ << oc.wq->thr_name() << dendl;
return false;
}
class LCOpAction_CurrentExpiration : public LCOpAction {
public:
+ LCOpAction_CurrentExpiration(op_env& env) {}
+
bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
auto& o = oc.o;
if (!o.is_current()) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not current, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": not current, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
if (o.is_delete_marker()) {
- if (oc.ol.next_has_same_name()) {
- return false;
+ std::string nkn;
+ if (oc.next_key_name) nkn = *oc.next_key_name;
+ if (oc.next_has_same_name(o.key.name)) {
+ ldout(oc.cct, 7) << __func__ << "(): dm-check SAME: key=" << o.key
+ << " next_key_name: %%" << nkn << "%% "
+ << oc.wq->thr_name() << dendl;
+ return false;
} else {
+ ldout(oc.cct, 7) << __func__ << "(): dm-check DELE: key=" << o.key
+ << " next_key_name: %%" << nkn << "%% "
+ << oc.wq->thr_name() << dendl;
*exp_time = real_clock::now();
return true;
}
auto& op = oc.op;
if (op.expiration <= 0) {
if (op.expiration_date == boost::none) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": no expiration set in rule, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
is_expired = ceph_clock_now() >=
is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
}
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+ << (int)is_expired << " "
+ << oc.wq->thr_name() << dendl;
return is_expired;
}
int r;
if (o.is_delete_marker()) {
r = remove_expired_obj(oc, true);
+ if (r < 0) {
+ ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj "
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r) << " "
+ << oc.wq->thr_name() << dendl;
+ return r;
+ }
+ ldout(oc.cct, 2) << "DELETED: current is-dm "
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << oc.wq->thr_name() << dendl;
} else {
+ /* ! o.is_delete_marker() */
r = remove_expired_obj(oc, !oc.bucket_info.versioned());
+ if (r < 0) {
+ ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r) << " "
+ << oc.wq->thr_name() << dendl;
+ return r;
+ }
+ if (perfcounter) {
+ perfcounter->inc(l_rgw_lc_expire_current, 1);
+ }
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ << " " << oc.wq->thr_name() << dendl;
}
- if (r < 0) {
- ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
- << oc.bucket_info.bucket << ":" << o.key
- << " " << cpp_strerror(r) << dendl;
- return r;
- }
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
return 0;
}
};
class LCOpAction_NonCurrentExpiration : public LCOpAction {
+protected:
public:
+ LCOpAction_NonCurrentExpiration(op_env& env)
+ {}
+
bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
auto& o = oc.o;
if (o.is_current()) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": current version, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": current version, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
- auto mtime = oc.ol.get_prev_obj().meta.mtime;
int expiration = oc.op.noncur_expiration;
- bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
+ bool is_expired = obj_has_expired(oc.cct, oc.effective_mtime, expiration,
+ exp_time);
+
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+ << is_expired << " "
+ << oc.wq->thr_name() << dendl;
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
return is_expired &&
pass_object_lock_check(oc.store->getRados(),
oc.bucket_info, oc.obj, oc.rctx);
int r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) "
- << oc.bucket_info.bucket << ":" << o.key
- << " " << cpp_strerror(r) << dendl;
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r)
+ << " " << oc.wq->thr_name() << dendl;
return r;
}
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (non-current expiration)" << dendl;
+ if (perfcounter) {
+ perfcounter->inc(l_rgw_lc_expire_noncurrent, 1);
+ }
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ << " (non-current expiration) "
+ << oc.wq->thr_name() << dendl;
return 0;
}
};
class LCOpAction_DMExpiration : public LCOpAction {
public:
+ LCOpAction_DMExpiration(op_env& env) {}
+
bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
auto& o = oc.o;
if (!o.is_delete_marker()) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not a delete marker, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": not a delete marker, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
-
- if (oc.ol.next_has_same_name()) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping" << dendl;
+ if (oc.next_has_same_name(o.key.name)) {
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": next is same object, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
int r = remove_expired_obj(oc, true);
if (r < 0) {
ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
- << oc.bucket_info.bucket << ":" << o.key
- << " " << cpp_strerror(r) << dendl;
+ << oc.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r)
+ << " " << oc.wq->thr_name()
+ << dendl;
return r;
}
- ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (delete marker expiration)" << dendl;
+ if (perfcounter) {
+ perfcounter->inc(l_rgw_lc_expire_dm, 1);
+ }
+ ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
+ << " (delete marker expiration) "
+ << oc.wq->thr_name() << dendl;
return 0;
}
};
bool is_expired;
if (transition.days < 0) {
if (transition.date == boost::none) {
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
+ << ": no transition day/date set in rule, skipping "
+ << oc.wq->thr_name() << dendl;
return false;
}
is_expired = ceph_clock_now() >=
is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time);
}
- ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
+ ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
+ << is_expired << " "
+ << oc.wq->thr_name() << dendl;
need_to_process =
(rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
if (!oc.store->svc()->zone->get_zone_params().
valid_placement(target_placement)) {
- ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement
+ ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
+ << target_placement
<< " bucket="<< oc.bucket_info.bucket
- << " rule_id=" << oc.op.id << dendl;
+ << " rule_id=" << oc.op.id
+ << " " << oc.wq->thr_name() << dendl;
return -EINVAL;
}
o.versioned_epoch, oc.dpp, null_yield);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
- << oc.bucket_info.bucket << ":" << o.key
- << " -> " << transition.storage_class
- << " " << cpp_strerror(r) << dendl;
+ << oc.bucket_info.bucket << ":" << o.key
+ << " -> " << transition.storage_class
+ << " " << cpp_strerror(r)
+ << " " << oc.wq->thr_name() << dendl;
return r;
}
- ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket << ":" << o.key << " -> " << transition.storage_class << dendl;
+ ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket
+ << ":" << o.key << " -> "
+ << transition.storage_class
+ << " " << oc.wq->thr_name() << dendl;
return 0;
}
};
public:
LCOpAction_CurrentTransition(const transition_action& _transition)
: LCOpAction_Transition(_transition) {}
+ int process(lc_op_ctx& oc) {
+ int r = LCOpAction_Transition::process(oc);
+ if (r == 0) {
+ if (perfcounter) {
+ perfcounter->inc(l_rgw_lc_transition_current, 1);
+ }
+ }
+ return r;
+ }
};
class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
}
ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
- return oc.ol.get_prev_obj().meta.mtime;
+ return oc.effective_mtime;
}
public:
- LCOpAction_NonCurrentTransition(const transition_action& _transition)
- : LCOpAction_Transition(_transition) {}
+ LCOpAction_NonCurrentTransition(op_env& env,
+ const transition_action& _transition)
+ : LCOpAction_Transition(_transition)
+ {}
+ int process(lc_op_ctx& oc) {
+ int r = LCOpAction_Transition::process(oc);
+ if (r == 0) {
+ if (perfcounter) {
+ perfcounter->inc(l_rgw_lc_transition_noncurrent, 1);
+ }
+ }
+ return r;
+ }
};
void LCOpRule::build()
if (op.expiration > 0 ||
op.expiration_date != boost::none) {
- actions.emplace_back(new LCOpAction_CurrentExpiration);
+ actions.emplace_back(new LCOpAction_CurrentExpiration(env));
}
if (op.dm_expiration) {
- actions.emplace_back(new LCOpAction_DMExpiration);
+ actions.emplace_back(new LCOpAction_DMExpiration(env));
}
if (op.noncur_expiration > 0) {
- actions.emplace_back(new LCOpAction_NonCurrentExpiration);
+ actions.emplace_back(new LCOpAction_NonCurrentExpiration(env));
}
for (auto& iter : op.transitions) {
}
for (auto& iter : op.noncur_transitions) {
- actions.emplace_back(new LCOpAction_NonCurrentTransition(iter.second));
+ actions.emplace_back(new LCOpAction_NonCurrentTransition(env, iter.second));
}
}
-int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp)
+void LCOpRule::update()
{
- lc_op_ctx ctx(env, o, dpp);
+ next_key_name = env.ol.next_key_name();
+ effective_mtime = env.ol.get_prev_obj().meta.mtime;
+}
- unique_ptr<LCOpAction> *selected = nullptr;
+int LCOpRule::process(rgw_bucket_dir_entry& o,
+ const DoutPrefixProvider *dpp,
+ WorkQ* wq)
+{
+ lc_op_ctx ctx(env, o, next_key_name, effective_mtime, dpp, wq);
+ shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
real_time exp;
for (auto& a : actions) {
}
if (!cont) {
- ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": no rule match, skipping" << dendl;
+ ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
+ << ": no rule match, skipping "
+ << " " << wq->thr_name() << dendl;
return 0;
}
int r = (*selected)->process(ctx);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
- << env.bucket_info.bucket << ":" << o.key
- << " " << cpp_strerror(r) << dendl;
+ << env.bucket_info.bucket << ":" << o.key
+ << " " << cpp_strerror(r)
+ << " " << wq->thr_name() << dendl;
return r;
}
- ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":" << o.key << dendl;
+ ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":"
+ << o.key << " " << wq->thr_name() << dendl;
}
return 0;
}
-int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker)
+int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
+ time_t stop_at, bool once)
{
RGWLifecycleConfiguration config(cct);
RGWBucketInfo bucket_info;
store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield,
&bucket_attrs);
if (ret < 0) {
- ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl;
+ ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name
+ << " failed" << dendl;
return ret;
}
+ auto stack_guard = make_scope_guard(
+ [&worker, &bucket_info]
+ {
+ worker->workpool->drain();
+ }
+ );
+
if (bucket_info.bucket.marker != bucket_marker) {
- ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant
- << ":" << bucket_name << " cur_marker=" << bucket_info.bucket.marker
+ ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
+ << bucket_tenant << ":" << bucket_name
+ << " cur_marker=" << bucket_info.bucket.marker
<< " orig_marker=" << bucket_marker << dendl;
return -ENOENT;
}
try {
config.decode(iter);
} catch (const buffer::error& e) {
- ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" << dendl;
+ ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed"
+ << dendl;
return -1;
}
- auto pf = [](RGWLC::LCWorker* wk, WorkItem& wi) {
+ auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
auto wt =
- boost::get<std::tuple<LCOpRule&, rgw_bucket_dir_entry>>(wi);
+ boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
auto& [op_rule, o] = wt;
+
ldpp_dout(wk->get_lc(), 20)
- << __func__ << "(): key=" << o.key << dendl;
- std::cout << "KEY2: " << o.key << std::endl;
- int ret = op_rule.process(o, wk->dpp);
+ << __func__ << "(): key=" << o.key << wq->thr_name()
+ << dendl;
+ int ret = op_rule.process(o, wk->dpp, wq);
if (ret < 0) {
ldpp_dout(wk->get_lc(), 20)
<< "ERROR: orule.process() returned ret=" << ret
+ << wq->thr_name()
<< dendl;
}
};
rgw_obj_key next_marker;
for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
++prefix_iter) {
+
+ if (worker_should_stop(stop_at, once)) {
+ ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
+ << worker->ix
+ << dendl;
+ return 0;
+ }
+
auto& op = prefix_iter->second;
if (!is_valid_op(op)) {
continue;
}
- ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl;
+ ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first
+ << dendl;
if (prefix_iter != prefix_map.begin() &&
(prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
prev(prefix_iter)->first) == 0)) {
op_env oenv(op, store, worker, bucket_info, ol);
LCOpRule orule(oenv);
orule.build(); // why can't ctor do it?
-#if 0
- /* would permit passing o by reference, removes fetch overlap */
- auto fetch_barrier = [&worker]()
- { worker->workpool->drain(); };
-#endif
rgw_bucket_dir_entry* o{nullptr};
for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) {
- std::tuple<LCOpRule&, rgw_bucket_dir_entry> t1 = {orule, *o};
+ orule.update();
+ std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
worker->workpool->enqueue(WorkItem{t1});
}
worker->workpool->drain();
}
- ret = handle_multipart_expiration(&target, prefix_map, worker);
+ ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at, once);
return ret;
}
int RGWLC::bucket_lc_post(int index, int max_lock_sec,
- pair<string, int>& entry, int& result,
+ cls_rgw_lc_entry& entry, int& result,
LCWorker* worker)
{
utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
l.set_cookie(cookie);
l.set_duration(lock_duration);
+ dout(5) << "RGWLC::bucket_lc_post(): POST " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
do {
int ret = l.lock_exclusive(
&store->getRados()->lc_pool_ctx, obj_names[index]);
- if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
+ if (ret == -EBUSY || ret == -EEXIST) {
+ /* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
- << obj_names[index] << ", sleep 5, try again" << dendl;
+ << obj_names[index] << ", sleep 5, try again " << dendl;
sleep(5);
continue;
}
if (ret < 0)
return 0;
- ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl;
+ ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
+ << dendl;
if (result == -ENOENT) {
ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx,
obj_names[index], entry);
}
goto clean;
} else if (result < 0) {
- entry.second = lc_failed;
+ entry.status = lc_failed;
} else {
- entry.second = lc_complete;
+ entry.status = lc_complete;
}
ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
}
clean:
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
- ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl;
+ ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
+ << obj_names[index] << dendl;
return 0;
} while (true);
}
-int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries,
- map<string, int>* progress_map)
+int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
+ vector<cls_rgw_lc_entry>& progress_map,
+ int& index)
{
- int index = 0;
- progress_map->clear();
- for(; index <max_objs; index++) {
- map<string, int > entries;
+ progress_map.clear();
+ for(; index < max_objs; index++, marker="") {
+ vector<cls_rgw_lc_entry> entries;
int ret =
cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker,
max_entries, entries);
return ret;
}
}
- map<string, int>::iterator iter;
- for (iter = entries.begin(); iter != entries.end(); ++iter) {
- progress_map->insert(*iter);
- }
+ progress_map.reserve(progress_map.size() + entries.size());
+ progress_map.insert(progress_map.end(), entries.begin(), entries.end());
+
+ /* update index, marker tuple */
+ if (progress_map.size() > 0)
+ marker = progress_map.back().bucket;
+
+ if (progress_map.size() >= max_entries)
+ break;
}
return 0;
}
return v;
}
-int RGWLC::process(LCWorker* worker)
+// Run one lifecycle pass for this worker, visiting every shard in a
+// random order (to spread contention with parallel lc threads).  When
+// 'once' is true it is forwarded to the per-shard overload, which then
+// processes a single entry per shard instead of looping.
+// NOTE(review): a default argument on an out-of-line member definition
+// is ill-formed if the in-class declaration in rgw_lc.h also supplies
+// one -- confirm only one of the two carries '= false'.
+int RGWLC::process(LCWorker* worker, bool once = false)
{
int max_secs = cct->_conf->rgw_lc_lock_max_time;
* that might be running in parallel */
vector<int> shard_seq = random_sequence(max_objs);
for (auto index : shard_seq) {
- int ret = process(index, max_secs, worker);
+ int ret = process(index, max_secs, worker, once);
if (ret < 0)
return ret;
}
return 0;
}
-int RGWLC::process(int index, int max_lock_secs, LCWorker* worker)
+// Return true if an lc processing session that began at 'started' is
+// stale: more than two scheduling intervals have elapsed since it
+// started.  The interval mirrors the debug override used elsewhere in
+// this file (rgw_lc_debug_interval if set, otherwise one day).
+bool RGWLC::expired_session(time_t started)
+{
+ time_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
+ ? cct->_conf->rgw_lc_debug_interval
+ : 24*60*60;
+
+ auto now = time(nullptr);
+
+ dout(16) << "RGWLC::expired_session"
+ << " started: " << started
+ << " interval: " << interval << "(*2==" << 2*interval << ")"
+ << " now: " << now
+ << dendl;
+
+ return (started + 2*interval < now);
+}
+
+// Deadline for the current worker pass: one scheduling interval from
+// now (rgw_lc_debug_interval if set, otherwise one day).  Passed to
+// bucket_lc_process() so long-running shard work can stop in time for
+// the next scheduled run.
+time_t RGWLC::thread_stop_at()
+{
+ uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
+ ? cct->_conf->rgw_lc_debug_interval
+ : 24*60*60;
+
+ return time(nullptr) + interval;
+}
+
+// Process a single lc shard: take the shard's exclusive lock, detect
+// stale/active sessions, (re)prepare today's entries if needed, then
+// mark and process entries one at a time until the shard drains (or
+// after one entry when 'once' is set).
+// NOTE(review): the default argument '= false' repeated on this
+// out-of-line definition will not compile if the declaration in
+// rgw_lc.h also specifies one -- confirm.
+int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
+ bool once = false)
+{
+ dout(5) << "RGWLC::process(): ENTER: "
+ << "index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
rados::cls::lock::Lock l(lc_index_lock_name);
do {
utime_t now = ceph_clock_now();
- pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
+ //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
+ cls_rgw_lc_entry entry;
if (max_lock_secs <= 0)
return -EAGAIN;
int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx,
obj_names[index]);
- if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
+ if (ret == -EBUSY || ret == -EEXIST) {
+ /* already locked by another lc processor */
ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
<< obj_names[index] << ", sleep 5, try again" << dendl;
sleep(5);
goto exit;
}
- if(!if_already_run_today(head.start_date)) {
+ // NOTE(review): 9969 is a magic sentinel value of rgw_lc_lock_max_time
+ // that disables the stale/active-session check (apparently a test
+ // hook) -- confirm intent and replace with a named constant or an
+ // explicit config option.
+ if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
+ ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx,
+ obj_names[index], head.marker, entry);
+ if (ret >= 0) {
+ if (entry.status == lc_processing) {
+ if (expired_session(entry.start_time)) {
+ // NOTE(review): the log says "(clearing)" but nothing in this
+ // hunk clears the stale entry -- confirm it is reset on the
+ // fall-through path below.
+ dout(5) << "RGWLC::process(): STALE lc session found for: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << " (clearing)"
+ << dendl;
+ } else {
+ // another worker owns a live session on this shard; leave it
+ dout(5) << "RGWLC::process(): ACTIVE entry: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << dendl;
+ goto exit;
+ }
+ }
+ }
+ }
+
+ // 'once' forces a fresh prepare even when a run already happened today
+ if(!if_already_run_today(head.start_date) ||
+ once) {
head.start_date = now;
head.marker.clear();
ret = bucket_lc_prepare(index, worker);
}
/* termination condition (eof) */
- if (entry.first.empty())
+ if (entry.bucket.empty())
goto exit;
- entry.second = lc_processing;
+ ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
+ entry.status = lc_processing;
ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
- obj_names[index], entry);
+ obj_names[index], entry);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
- << obj_names[index]
- << " (" << entry.first << ","
- << entry.second << ")"
- << dendl;
+ << obj_names[index] << entry.bucket << entry.status << dendl;
goto exit;
}
- head.marker = entry.first;
- ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, obj_names[index],
- head);
+ head.marker = entry.bucket;
+ ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx,
+ obj_names[index], head);
if (ret < 0) {
ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
<< obj_names[index]
- << dendl;
+ << dendl;
goto exit;
}
+
+ ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
+ << " index: " << index << " worker ix: " << worker->ix
+ << dendl;
+
+ // drop the shard lock before the (long) per-bucket work
l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
- ret = bucket_lc_process(entry.first, worker);
+ ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
bucket_lc_post(index, max_lock_secs, entry, ret, worker);
- } while(1);
+ // NOTE(review): '1 &&' is redundant; 'while (!once)' is equivalent.
+ } while(1 && !once);
+
+ return 0;
exit:
- l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
- return 0;
+ l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
+ return 0;
}
void RGWLC::start_processor()
workers.reserve(maxw);
for (int ix = 0; ix < maxw; ++ix) {
auto worker =
- std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this);
+ std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix);
worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
workers.emplace_back(std::move(worker));
}
RGWLC::LCWorker::~LCWorker()
{
- workpool->drain();
+ // NOTE(review): drain() was removed here, so work still queued at
+ // destruction time is no longer waited for -- confirm that pending
+ // queued items are safe to discard when the workpool is deleted.
delete workpool;
} /* ~LCWorker */
o.push_back(new RGWLifecycleConfiguration);
}
-void get_lc_oid(CephContext *cct, const string& shard_id, string *oid)
+// Map a bucket's lc shard name to the lc shard object (oid) that tracks
+// it, by hashing shard_id into [0, rgw_lc_max_objs) (capped at
+// HASH_PRIME).
+// NOTE(review): making this 'static inline' removes external linkage --
+// confirm no other translation unit (or a declaration in rgw_lc.h)
+// still references get_lc_oid.
+static inline void get_lc_oid(CephContext *cct,
+ const string& shard_id, string *oid)
{
int max_objs =
(cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
cct->_conf->rgw_lc_max_objs);
- /* XXXX oh noes!!! */
+ /* n.b. review hash algo */
int index = ceph_str_hash_linux(shard_id.c_str(),
shard_id.size()) % HASH_PRIME % max_objs;
*oid = lc_oid_prefix;
}
template<typename F>
-static int guard_lc_modify(
- rgw::sal::RGWRadosStore* store, const rgw_bucket& bucket,
- const string& cookie, const F& f) {
+static int guard_lc_modify(rgw::sal::RGWRadosStore* store,
+ const rgw_bucket& bucket, const string& cookie,
+ const F& f) {
CephContext *cct = store->ctx();
string shard_id = get_lc_shard_name(bucket);
string oid;
get_lc_oid(cct, shard_id, &oid);
- pair<string, int> entry(shard_id, lc_uninitial);
+ /* XXX it makes sense to take shard_id for a bucket_id? */
+ cls_rgw_lc_entry entry;
+ entry.bucket = shard_id;
+ entry.status = lc_uninitial;
int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
rados::cls::lock::Lock l(lc_index_lock_name);
rgw_bucket& bucket = bucket_info.bucket;
+
ret = guard_lc_modify(store, bucket, cookie,
[&](librados::IoCtx *ctx, const string& oid,
- const pair<string, int>& entry) {
+ const cls_rgw_lc_entry& entry) {
return cls_rgw_lc_set_entry(*ctx, oid, entry);
});
ret = guard_lc_modify(store, bucket, cookie,
[&](librados::IoCtx *ctx, const string& oid,
- const pair<string, int>& entry) {
+ const cls_rgw_lc_entry& entry) {
return cls_rgw_lc_rm_entry(*ctx, oid, entry);
});
std::string lc_oid;
get_lc_oid(store->ctx(), shard_name, &lc_oid);
- rgw_lc_entry_t entry;
+ cls_rgw_lc_entry entry;
// There are multiple cases we need to encounter here
// 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
// 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
ret = guard_lc_modify(
store, bucket_info.bucket, cookie,
- [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid,
- const pair<string, int>& entry) {
+ [&lc_pool_ctx, &lc_oid](librados::IoCtx* ctx,
+ const string& oid,
+ const cls_rgw_lc_entry& entry) {
return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry);
});
} /* rgwlc_s3_expiration_header */
+// Compute the S3 abort headers (x-amz-abort-date / x-amz-abort-rule-id)
+// for an in-progress multipart upload: scan the bucket's lifecycle
+// configuration for enabled rules with an AbortIncompleteMultipartUpload
+// action whose prefix matches obj_key, and report the earliest abort
+// date together with the id of the rule that produced it.
+// Returns true (and sets abort_date/rule_id) only when at least one
+// matching rule applies; returns false on missing/undecodable lc config
+// or when no rule matches.
+bool s3_multipart_abort_header(
+ DoutPrefixProvider* dpp,
+ const rgw_obj_key& obj_key,
+ const ceph::real_time& mtime,
+ const std::map<std::string, buffer::list>& bucket_attrs,
+ ceph::real_time& abort_date,
+ std::string& rule_id)
+{
+ CephContext* cct = dpp->get_cct();
+ RGWLifecycleConfiguration config(cct);
+
+ // no lifecycle configuration on this bucket -> no abort headers
+ const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
+ if (aiter == bucket_attrs.end())
+ return false;
+
+ bufferlist::const_iterator iter{&aiter->second};
+ try {
+ config.decode(iter);
+ } catch (const buffer::error& e) {
+ ldpp_dout(dpp, 0) << __func__
+ << "() decode life cycle config failed"
+ << dendl;
+ return false;
+ } /* catch */
+
+ std::optional<ceph::real_time> abort_date_tmp;
+ std::optional<std::string_view> rule_id_tmp;
+ const auto& rule_map = config.get_rule_map();
+ for (const auto& ri : rule_map) {
+ const auto& rule = ri.second;
+ const auto& id = rule.get_id();
+ const auto& filter = rule.get_filter();
+ // a rule's Filter prefix (newer form) takes precedence over the
+ // legacy top-level Prefix
+ const auto& prefix = filter.has_prefix()?filter.get_prefix():rule.get_prefix();
+ const auto& mp_expiration = rule.get_mp_expiration();
+ if (!rule.is_enabled()) {
+ continue;
+ }
+ if(!prefix.empty() && !boost::starts_with(obj_key.name, prefix)) {
+ continue;
+ }
+
+ std::optional<ceph::real_time> rule_abort_date;
+ if (mp_expiration.has_days()) {
+ // rounds out to the midnight following (mtime's day + days):
+ // mtime - (mtime % 86400) cancels the time-of-day component, then
+ // one extra day is added -- presumably a UTC day boundary; confirm
+ rule_abort_date = std::optional<ceph::real_time>(
+ mtime + make_timespan(mp_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
+ }
+
+ // update earliest abort date
+ if (rule_abort_date) {
+ if ((! abort_date_tmp) ||
+ (*abort_date_tmp > *rule_abort_date)) {
+ abort_date_tmp =
+ std::optional<ceph::real_time>(rule_abort_date);
+ rule_id_tmp = std::optional<std::string_view>(id);
+ }
+ }
+ }
+ if (abort_date_tmp && rule_id_tmp) {
+ abort_date = *abort_date_tmp;
+ rule_id = *rule_id_tmp;
+ return true;
+ } else {
+ return false;
+ }
+}
+
} /* namespace rgw::lc */