]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_lc.cc
import 15.2.5
[ceph.git] / ceph / src / rgw / rgw_lc.cc
CommitLineData
1adf2230 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
1adf2230 3
7c673cae
FG
4#include <string.h>
5#include <iostream>
6#include <map>
e306af50
TL
7#include <algorithm>
8#include <tuple>
9#include <functional>
7c673cae
FG
10
11#include <boost/algorithm/string/split.hpp>
12#include <boost/algorithm/string.hpp>
9f95a23c 13#include <boost/algorithm/string/predicate.hpp>
e306af50 14#include <boost/variant.hpp>
7c673cae 15
f6b5b4d7 16#include "include/scope_guard.h"
7c673cae 17#include "common/Formatter.h"
e306af50 18#include "common/containers.h"
7c673cae 19#include <common/errno.h>
11fdf7f2 20#include "include/random.h"
7c673cae
FG
21#include "cls/rgw/cls_rgw_client.h"
22#include "cls/lock/cls_lock_client.h"
e306af50 23#include "rgw_perf_counters.h"
7c673cae
FG
24#include "rgw_common.h"
25#include "rgw_bucket.h"
26#include "rgw_lc.h"
eafe8130 27#include "rgw_zone.h"
11fdf7f2 28#include "rgw_string.h"
9f95a23c 29#include "rgw_multi.h"
f6b5b4d7 30#include "rgw_sal.h"
9f95a23c
TL
31
32// this seems safe to use, at least for now--arguably, we should
33// prefer header-only fmt, in general
34#undef FMT_HEADER_ONLY
35#define FMT_HEADER_ONLY 1
36#include "fmt/format.h"
11fdf7f2
TL
37
38#include "services/svc_sys_obj.h"
9f95a23c
TL
39#include "services/svc_zone.h"
40#include "services/svc_tier_rados.h"
7c673cae
FG
41
42#define dout_context g_ceph_context
43#define dout_subsys ceph_subsys_rgw
44
// Printable names for lifecycle shard-entry states.
// NOTE(review): presumably indexed by the lc status values declared in
// rgw_lc.h (lc_uninitial, ...) -- keep the order in sync with that enum.
const char* LC_STATUS[] = {
  "UNINITIAL", "PROCESSING", "FAILED", "COMPLETE"
};
51
7c673cae
FG
52using namespace librados;
53
11fdf7f2 54bool LCRule::valid() const
7c673cae
FG
55{
56 if (id.length() > MAX_ID_LEN) {
57 return false;
58 }
e306af50
TL
59 else if(expiration.empty() && noncur_expiration.empty() &&
60 mp_expiration.empty() && !dm_expiration &&
11fdf7f2 61 transitions.empty() && noncur_transitions.empty()) {
7c673cae
FG
62 return false;
63 }
e306af50
TL
64 else if (!expiration.valid() || !noncur_expiration.valid() ||
65 !mp_expiration.valid()) {
7c673cae
FG
66 return false;
67 }
11fdf7f2
TL
68 if (!transitions.empty()) {
69 bool using_days = expiration.has_days();
70 bool using_date = expiration.has_date();
71 for (const auto& elem : transitions) {
72 if (!elem.second.valid()) {
73 return false;
74 }
75 using_days = using_days || elem.second.has_days();
76 using_date = using_date || elem.second.has_date();
77 if (using_days && using_date) {
78 return false;
79 }
80 }
81 }
82 for (const auto& elem : noncur_transitions) {
83 if (!elem.second.valid()) {
84 return false;
85 }
86 }
87
7c673cae
FG
88 return true;
89}
90
e306af50
TL
91void LCRule::init_simple_days_rule(std::string_view _id,
92 std::string_view _prefix, int num_days)
11fdf7f2
TL
93{
94 id = _id;
95 prefix = _prefix;
96 char buf[32];
97 snprintf(buf, sizeof(buf), "%d", num_days);
98 expiration.set_days(buf);
99 set_enabled(true);
100}
101
102void RGWLifecycleConfiguration::add_rule(const LCRule& rule)
7c673cae 103{
11fdf7f2
TL
104 auto& id = rule.get_id(); // note that this will return false for groups, but that's ok, we won't search groups
105 rule_map.insert(pair<string, LCRule>(id, rule));
7c673cae
FG
106}
107
11fdf7f2 108bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule)
7c673cae 109{
494da23a 110 lc_op op(rule.get_id());
11fdf7f2
TL
111 op.status = rule.is_enabled();
112 if (rule.get_expiration().has_days()) {
113 op.expiration = rule.get_expiration().get_days();
7c673cae 114 }
11fdf7f2
TL
115 if (rule.get_expiration().has_date()) {
116 op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date());
7c673cae 117 }
11fdf7f2
TL
118 if (rule.get_noncur_expiration().has_days()) {
119 op.noncur_expiration = rule.get_noncur_expiration().get_days();
224ce89b 120 }
11fdf7f2
TL
121 if (rule.get_mp_expiration().has_days()) {
122 op.mp_expiration = rule.get_mp_expiration().get_days();
7c673cae 123 }
11fdf7f2
TL
124 op.dm_expiration = rule.get_dm_expiration();
125 for (const auto &elem : rule.get_transitions()) {
126 transition_action action;
127 if (elem.second.has_days()) {
128 action.days = elem.second.get_days();
129 } else {
130 action.date = ceph::from_iso_8601(elem.second.get_date());
131 }
e306af50
TL
132 action.storage_class
133 = rgw_placement_rule::get_canonical_storage_class(elem.first);
11fdf7f2
TL
134 op.transitions.emplace(elem.first, std::move(action));
135 }
136 for (const auto &elem : rule.get_noncur_transitions()) {
137 transition_action action;
138 action.days = elem.second.get_days();
139 action.date = ceph::from_iso_8601(elem.second.get_date());
140 action.storage_class = elem.first;
141 op.noncur_transitions.emplace(elem.first, std::move(action));
7c673cae 142 }
181888fb 143 std::string prefix;
11fdf7f2
TL
144 if (rule.get_filter().has_prefix()){
145 prefix = rule.get_filter().get_prefix();
181888fb 146 } else {
11fdf7f2
TL
147 prefix = rule.get_prefix();
148 }
149
150 if (rule.get_filter().has_tags()){
151 op.obj_tags = rule.get_filter().get_tags();
181888fb 152 }
494da23a
TL
153 prefix_map.emplace(std::move(prefix), std::move(op));
154 return true;
7c673cae
FG
155}
156
11fdf7f2 157int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule)
7c673cae 158{
11fdf7f2 159 if (!rule.valid()) {
7c673cae
FG
160 return -EINVAL;
161 }
11fdf7f2 162 auto& id = rule.get_id();
7c673cae
FG
163 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
164 return -EINVAL;
165 }
e306af50
TL
166 if (rule.get_filter().has_tags() && (rule.get_dm_expiration() ||
167 !rule.get_mp_expiration().empty())) {
9f95a23c
TL
168 return -ERR_INVALID_REQUEST;
169 }
11fdf7f2 170 rule_map.insert(pair<string, LCRule>(id, rule));
7c673cae
FG
171
172 if (!_add_rule(rule)) {
173 return -ERR_INVALID_REQUEST;
174 }
175 return 0;
176}
177
e306af50
TL
178bool RGWLifecycleConfiguration::has_same_action(const lc_op& first,
179 const lc_op& second) {
224ce89b
WB
180 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
181 (second.expiration > 0 || second.expiration_date != boost::none)) {
182 return true;
183 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
184 return true;
185 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
186 return true;
11fdf7f2
TL
187 } else if (!first.transitions.empty() && !second.transitions.empty()) {
188 for (auto &elem : first.transitions) {
189 if (second.transitions.find(elem.first) != second.transitions.end()) {
190 return true;
191 }
192 }
e306af50
TL
193 } else if (!first.noncur_transitions.empty() &&
194 !second.noncur_transitions.empty()) {
11fdf7f2 195 for (auto &elem : first.noncur_transitions) {
e306af50
TL
196 if (second.noncur_transitions.find(elem.first) !=
197 second.noncur_transitions.end()) {
11fdf7f2
TL
198 return true;
199 }
200 }
224ce89b 201 }
11fdf7f2 202 return false;
224ce89b
WB
203}
204
494da23a
TL
205/* Formerly, this method checked for duplicate rules using an invalid
206 * method (prefix uniqueness). */
224ce89b 207bool RGWLifecycleConfiguration::valid()
7c673cae 208{
7c673cae
FG
209 return true;
210}
211
212void *RGWLC::LCWorker::entry() {
213 do {
214 utime_t start = ceph_clock_now();
215 if (should_work(start)) {
11fdf7f2 216 ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
f6b5b4d7 217 int r = lc->process(this, false /* once */);
7c673cae 218 if (r < 0) {
f6b5b4d7
TL
219 ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
220 << r << dendl;
7c673cae 221 }
11fdf7f2 222 ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
7c673cae
FG
223 }
224 if (lc->going_down())
225 break;
226
227 utime_t end = ceph_clock_now();
228 int secs = schedule_next_start_time(start, end);
c07f9fc5
FG
229 utime_t next;
230 next.set_from_double(end + secs);
231
f6b5b4d7
TL
232 ldpp_dout(dpp, 5) << "schedule life cycle next start time: "
233 << rgw_to_asctime(next) << dendl;
7c673cae 234
9f95a23c
TL
235 std::unique_lock l{lock};
236 cond.wait_for(l, std::chrono::seconds(secs));
7c673cae
FG
237 } while (!lc->going_down());
238
239 return NULL;
240}
241
9f95a23c 242void RGWLC::initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store) {
7c673cae
FG
243 cct = _cct;
244 store = _store;
245 max_objs = cct->_conf->rgw_lc_max_objs;
246 if (max_objs > HASH_PRIME)
247 max_objs = HASH_PRIME;
248
249 obj_names = new string[max_objs];
250
251 for (int i = 0; i < max_objs; i++) {
252 obj_names[i] = lc_oid_prefix;
253 char buf[32];
254 snprintf(buf, 32, ".%d", i);
255 obj_names[i].append(buf);
256 }
257
258#define COOKIE_LEN 16
259 char cookie_buf[COOKIE_LEN + 1];
260 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
261 cookie = cookie_buf;
262}
263
264void RGWLC::finalize()
265{
266 delete[] obj_names;
267}
268
f6b5b4d7 269bool RGWLC::if_already_run_today(time_t start_date)
7c673cae
FG
270{
271 struct tm bdt;
272 time_t begin_of_day;
273 utime_t now = ceph_clock_now();
274 localtime_r(&start_date, &bdt);
275
276 if (cct->_conf->rgw_lc_debug_interval > 0) {
3efd9988
FG
277 if (now - start_date < cct->_conf->rgw_lc_debug_interval)
278 return true;
279 else
280 return false;
7c673cae
FG
281 }
282
283 bdt.tm_hour = 0;
284 bdt.tm_min = 0;
285 bdt.tm_sec = 0;
286 begin_of_day = mktime(&bdt);
287 if (now - begin_of_day < 24*60*60)
288 return true;
289 else
290 return false;
291}
292
f6b5b4d7
TL
293static inline std::ostream& operator<<(std::ostream &os, cls_rgw_lc_entry& ent) {
294 os << "<ent: bucket=";
295 os << ent.bucket;
296 os << "; start_time=";
297 os << rgw_to_asctime(utime_t(time_t(ent.start_time), 0));
298 os << "; status=";
299 os << ent.status;
300 os << ">";
301 return os;
302}
303
e306af50 304int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
7c673cae 305{
f6b5b4d7 306 vector<cls_rgw_lc_entry> entries;
7c673cae
FG
307 string marker;
308
f6b5b4d7
TL
309 dout(5) << "RGWLC::bucket_lc_prepare(): PREPARE "
310 << "index: " << index << " worker ix: " << worker->ix
311 << dendl;
312
7c673cae
FG
313#define MAX_LC_LIST_ENTRIES 100
314 do {
e306af50
TL
315 int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index],
316 marker, MAX_LC_LIST_ENTRIES, entries);
7c673cae
FG
317 if (ret < 0)
318 return ret;
f6b5b4d7
TL
319
320 for (auto& entry : entries) {
321 entry.start_time = ceph_clock_now();
322 entry.status = lc_uninitial; // lc_uninitial? really?
e306af50 323 ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
f6b5b4d7 324 obj_names[index], entry);
7c673cae 325 if (ret < 0) {
f6b5b4d7
TL
326 ldpp_dout(this, 0)
327 << "RGWLC::bucket_lc_prepare() failed to set entry on "
328 << obj_names[index] << dendl;
11fdf7f2 329 return ret;
7c673cae 330 }
11fdf7f2
TL
331 }
332
f6b5b4d7
TL
333 if (! entries.empty()) {
334 marker = std::move(entries.back().bucket);
7c673cae
FG
335 }
336 } while (!entries.empty());
337
338 return 0;
339}
340
e306af50
TL
341static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days,
342 ceph::real_time *expire_time = nullptr)
7c673cae 343{
11fdf7f2
TL
344 double timediff, cmp;
345 utime_t base_time;
31f18b77
FG
346 if (cct->_conf->rgw_lc_debug_interval <= 0) {
347 /* Normal case, run properly */
348 cmp = days*24*60*60;
11fdf7f2 349 base_time = ceph_clock_now().round_to_day();
31f18b77
FG
350 } else {
351 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
352 cmp = days*cct->_conf->rgw_lc_debug_interval;
11fdf7f2 353 base_time = ceph_clock_now();
31f18b77 354 }
f6b5b4d7
TL
355 auto tt_mtime = ceph::real_clock::to_time_t(mtime);
356 timediff = base_time - tt_mtime;
7c673cae 357
11fdf7f2
TL
358 if (expire_time) {
359 *expire_time = mtime + make_timespan(cmp);
7c673cae 360 }
f6b5b4d7
TL
361
362 ldout(cct, 20) << __func__ << __func__
363 << "(): mtime=" << mtime << " days=" << days
364 << " base_time=" << base_time << " timediff=" << timediff
365 << " cmp=" << cmp
366 << " is_expired=" << (timediff >= cmp)
367 << dendl;
11fdf7f2
TL
368
369 return (timediff >= cmp);
7c673cae
FG
370}
371
e306af50
TL
372static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info,
373 rgw_obj& obj, RGWObjectCtx& ctx)
eafe8130
TL
374{
375 if (!bucket_info.obj_lock_enabled()) {
376 return true;
377 }
378 RGWRados::Object op_target(store, bucket_info, ctx, obj);
379 RGWRados::Object::Read read_op(&op_target);
380 map<string, bufferlist> attrs;
381 read_op.params.attrs = &attrs;
9f95a23c 382 int ret = read_op.prepare(null_yield);
eafe8130
TL
383 if (ret < 0) {
384 if (ret == -ENOENT) {
385 return true;
386 } else {
387 return false;
388 }
389 } else {
390 auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
391 if (iter != attrs.end()) {
392 RGWObjectRetention retention;
393 try {
394 decode(retention, iter->second);
395 } catch (buffer::error& err) {
f6b5b4d7
TL
396 ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention"
397 << dendl;
eafe8130
TL
398 return false;
399 }
e306af50
TL
400 if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
401 ceph_clock_now()) {
eafe8130
TL
402 return false;
403 }
404 }
405 iter = attrs.find(RGW_ATTR_OBJECT_LEGAL_HOLD);
406 if (iter != attrs.end()) {
407 RGWObjectLegalHold obj_legal_hold;
408 try {
409 decode(obj_legal_hold, iter->second);
410 } catch (buffer::error& err) {
f6b5b4d7
TL
411 ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold"
412 << dendl;
eafe8130
TL
413 return false;
414 }
415 if (obj_legal_hold.is_enabled()) {
416 return false;
417 }
418 }
419 return true;
420 }
421}
422
11fdf7f2 423class LCObjsLister {
9f95a23c 424 rgw::sal::RGWRadosStore *store;
11fdf7f2
TL
425 RGWBucketInfo& bucket_info;
426 RGWRados::Bucket target;
427 RGWRados::Bucket::List list_op;
428 bool is_truncated{false};
429 rgw_obj_key next_marker;
430 string prefix;
431 vector<rgw_bucket_dir_entry> objs;
432 vector<rgw_bucket_dir_entry>::iterator obj_iter;
433 rgw_bucket_dir_entry pre_obj;
434 int64_t delay_ms;
435
436public:
9f95a23c 437 LCObjsLister(rgw::sal::RGWRadosStore *_store, RGWBucketInfo& _bucket_info) :
11fdf7f2 438 store(_store), bucket_info(_bucket_info),
9f95a23c 439 target(store->getRados(), bucket_info), list_op(&target) {
11fdf7f2
TL
440 list_op.params.list_versions = bucket_info.versioned();
441 list_op.params.allow_unordered = true;
442 delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
443 }
444
445 void set_prefix(const string& p) {
446 prefix = p;
447 list_op.params.prefix = prefix;
448 }
449
450 int init() {
451 return fetch();
452 }
453
454 int fetch() {
e306af50
TL
455 int ret = list_op.list_objects(
456 1000, &objs, NULL, &is_truncated, null_yield);
11fdf7f2
TL
457 if (ret < 0) {
458 return ret;
459 }
460
461 obj_iter = objs.begin();
462
463 return 0;
464 }
465
466 void delay() {
467 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
468 }
469
e306af50
TL
470 bool get_obj(rgw_bucket_dir_entry **obj,
471 std::function<void(void)> fetch_barrier
472 = []() { /* nada */}) {
11fdf7f2 473 if (obj_iter == objs.end()) {
9f95a23c
TL
474 if (!is_truncated) {
475 delay();
476 return false;
11fdf7f2 477 } else {
e306af50 478 fetch_barrier();
9f95a23c 479 list_op.params.marker = pre_obj.key;
9f95a23c
TL
480 int ret = fetch();
481 if (ret < 0) {
e306af50
TL
482 ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret
483 << dendl;
9f95a23c
TL
484 return ret;
485 }
11fdf7f2
TL
486 }
487 delay();
488 }
e306af50
TL
489 /* returning address of entry in objs */
490 *obj = &(*obj_iter);
9f95a23c 491 return obj_iter != objs.end();
11fdf7f2
TL
492 }
493
494 rgw_bucket_dir_entry get_prev_obj() {
495 return pre_obj;
496 }
497
498 void next() {
499 pre_obj = *obj_iter;
500 ++obj_iter;
501 }
502
f6b5b4d7
TL
503 boost::optional<std::string> next_key_name() {
504 if (obj_iter == objs.end() ||
505 (obj_iter + 1) == objs.end()) {
11fdf7f2
TL
506 /* this should have been called after get_obj() was called, so this should
507 * only happen if is_truncated is false */
f6b5b4d7 508 return boost::none;
11fdf7f2 509 }
f6b5b4d7
TL
510
511 return ((obj_iter + 1)->key.name);
11fdf7f2 512 }
f6b5b4d7 513
e306af50 514}; /* LCObjsLister */
11fdf7f2
TL
515
516struct op_env {
e306af50
TL
517
518 using LCWorker = RGWLC::LCWorker;
519
f6b5b4d7 520 lc_op op;
9f95a23c 521 rgw::sal::RGWRadosStore *store;
e306af50 522 LCWorker* worker;
11fdf7f2
TL
523 RGWBucketInfo& bucket_info;
524 LCObjsLister& ol;
525
e306af50
TL
526 op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, LCWorker* _worker,
527 RGWBucketInfo& _bucket_info, LCObjsLister& _ol)
528 : op(_op), store(_store), worker(_worker), bucket_info(_bucket_info),
529 ol(_ol) {}
530}; /* op_env */
11fdf7f2
TL
531
532class LCRuleOp;
f6b5b4d7 533class WorkQ;
11fdf7f2
TL
534
535struct lc_op_ctx {
536 CephContext *cct;
f6b5b4d7
TL
537 op_env env;
538 rgw_bucket_dir_entry o;
539 boost::optional<std::string> next_key_name;
540 ceph::real_time effective_mtime;
11fdf7f2 541
9f95a23c 542 rgw::sal::RGWRadosStore *store;
11fdf7f2 543 RGWBucketInfo& bucket_info;
f6b5b4d7 544 lc_op& op; // ok--refers to expanded env.op
11fdf7f2
TL
545 LCObjsLister& ol;
546
547 rgw_obj obj;
548 RGWObjectCtx rctx;
9f95a23c 549 const DoutPrefixProvider *dpp;
f6b5b4d7
TL
550 WorkQ* wq;
551
552 lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
553 boost::optional<std::string> next_key_name,
554 ceph::real_time effective_mtime,
555 const DoutPrefixProvider *dpp, WorkQ* wq)
556 : cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name),
557 effective_mtime(effective_mtime),
e306af50 558 store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
f6b5b4d7
TL
559 obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(dpp), wq(wq)
560 {}
561
562 bool next_has_same_name(const std::string& key_name) {
563 return (next_key_name && key_name.compare(
564 boost::get<std::string>(next_key_name)) == 0);
565 }
566
e306af50 567}; /* lc_op_ctx */
11fdf7f2
TL
568
569static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
570{
571 auto& store = oc.store;
572 auto& bucket_info = oc.bucket_info;
573 auto& o = oc.o;
574 auto obj_key = o.key;
575 auto& meta = o.meta;
576
577 if (!remove_indeed) {
578 obj_key.instance.clear();
579 } else if (obj_key.instance.empty()) {
580 obj_key.instance = "null";
581 }
582
583 rgw_obj obj(bucket_info.bucket, obj_key);
584 ACLOwner obj_owner;
585 obj_owner.set_id(rgw_user {meta.owner});
586 obj_owner.set_name(meta.owner_display_name);
587
9f95a23c 588 RGWRados::Object del_target(store->getRados(), bucket_info, oc.rctx, obj);
11fdf7f2
TL
589 RGWRados::Object::Delete del_op(&del_target);
590
591 del_op.params.bucket_owner = bucket_info.owner;
592 del_op.params.versioning_status = bucket_info.versioning_status();
593 del_op.params.obj_owner = obj_owner;
594 del_op.params.unmod_since = meta.mtime;
595
9f95a23c 596 return del_op.delete_obj(null_yield);
e306af50 597} /* remove_expired_obj */
11fdf7f2
TL
598
599class LCOpAction {
600public:
601 virtual ~LCOpAction() {}
602
603 virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) {
604 return false;
f6b5b4d7 605 }
11fdf7f2
TL
606
607 /* called after check(). Check should tell us whether this action
608 * is applicable. If there are multiple actions, we'll end up executing
609 * the latest applicable action
610 * For example:
611 * one action after 10 days, another after 20, third after 40.
612 * After 10 days, the latest applicable action would be the first one,
613 * after 20 days it will be the second one. After 21 days it will still be the
614 * second one. So check() should return true for the second action at that point,
615 * but should_process() if the action has already been applied. In object removal
616 * it doesn't matter, but in object transition it does.
617 */
618 virtual bool should_process() {
619 return true;
620 }
621
622 virtual int process(lc_op_ctx& oc) {
623 return 0;
624 }
f6b5b4d7
TL
625
626 friend class LCOpRule;
e306af50 627}; /* LCOpAction */
11fdf7f2
TL
628
629class LCOpFilter {
630public:
631virtual ~LCOpFilter() {}
632 virtual bool check(lc_op_ctx& oc) {
633 return false;
634 }
e306af50 635}; /* LCOpFilter */
11fdf7f2
TL
636
637class LCOpRule {
638 friend class LCOpAction;
639
f6b5b4d7
TL
640 op_env env;
641 boost::optional<std::string> next_key_name;
642 ceph::real_time effective_mtime;
11fdf7f2 643
f6b5b4d7
TL
644 std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd
645 std::vector<shared_ptr<LCOpAction> > actions;
11fdf7f2
TL
646
647public:
648 LCOpRule(op_env& _env) : env(_env) {}
649
f6b5b4d7
TL
650 boost::optional<std::string> get_next_key_name() {
651 return next_key_name;
652 }
653
654 std::vector<shared_ptr<LCOpAction>>& get_actions() {
655 return actions;
656 }
657
11fdf7f2 658 void build();
f6b5b4d7
TL
659 void update();
660 int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
661 WorkQ* wq);
e306af50
TL
662}; /* LCOpRule */
663
/*
 * A unit of work handed to WorkQ threads.  Consumers extract the expected
 * alternative with boost::get, so the order of alternatives (and thus
 * which()) must not change.
 *
 * NOTE(review): the leading void* alternative is presumably only the
 * default-constructed placeholder (boost::variant default-constructs its
 * first type) -- confirm no producer enqueues it.
 */
using WorkItem =
  boost::variant<void*,
		 /* out-of-line delete */
		 std::tuple<LCOpRule, rgw_bucket_dir_entry>,
		 /* uncompleted MPU expiration */
		 std::tuple<lc_op, rgw_bucket_dir_entry>,
		 rgw_bucket_dir_entry>;
671
672class WorkQ : public Thread
673{
674public:
675 using unique_lock = std::unique_lock<std::mutex>;
f6b5b4d7 676 using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
e306af50
TL
677 using dequeue_result = boost::variant<void*, WorkItem>;
678
f6b5b4d7
TL
679 static constexpr uint32_t FLAG_NONE = 0x0000;
680 static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001;
681 static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002;
682 static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
683
e306af50 684private:
f6b5b4d7 685 const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
e306af50
TL
686 RGWLC::LCWorker* wk;
687 uint32_t qmax;
f6b5b4d7 688 int ix;
e306af50
TL
689 std::mutex mtx;
690 std::condition_variable cv;
f6b5b4d7 691 uint32_t flags;
e306af50
TL
692 vector<WorkItem> items;
693 work_f f;
694
695public:
696 WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
f6b5b4d7 697 : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
e306af50 698 {
f6b5b4d7 699 create(thr_name().c_str());
e306af50
TL
700 }
701
f6b5b4d7
TL
702 std::string thr_name() {
703 return std::string{"wp_thrd: "}
704 + std::to_string(wk->ix) + ", " + std::to_string(ix);
705 }
706
e306af50
TL
707 void setf(work_f _f) {
708 f = _f;
709 }
710
711 void enqueue(WorkItem&& item) {
712 unique_lock uniq(mtx);
713 while ((!wk->get_lc()->going_down()) &&
714 (items.size() > qmax)) {
f6b5b4d7 715 flags |= FLAG_EWAIT_SYNC;
e306af50
TL
716 cv.wait_for(uniq, 200ms);
717 }
718 items.push_back(item);
f6b5b4d7
TL
719 if (flags & FLAG_DWAIT_SYNC) {
720 flags &= ~FLAG_DWAIT_SYNC;
721 cv.notify_one();
722 }
e306af50
TL
723 }
724
725 void drain() {
726 unique_lock uniq(mtx);
f6b5b4d7
TL
727 flags |= FLAG_EDRAIN_SYNC;
728 while (flags & FLAG_EDRAIN_SYNC) {
e306af50
TL
729 cv.wait_for(uniq, 200ms);
730 }
731 }
732
733private:
734 dequeue_result dequeue() {
735 unique_lock uniq(mtx);
736 while ((!wk->get_lc()->going_down()) &&
737 (items.size() == 0)) {
f6b5b4d7
TL
738 /* clear drain state, as we are NOT doing work and qlen==0 */
739 if (flags & FLAG_EDRAIN_SYNC) {
740 flags &= ~FLAG_EDRAIN_SYNC;
741 }
742 flags |= FLAG_DWAIT_SYNC;
e306af50
TL
743 cv.wait_for(uniq, 200ms);
744 }
745 if (items.size() > 0) {
746 auto item = items.back();
747 items.pop_back();
f6b5b4d7
TL
748 if (flags & FLAG_EWAIT_SYNC) {
749 flags &= ~FLAG_EWAIT_SYNC;
750 cv.notify_one();
751 }
e306af50
TL
752 return {item};
753 }
754 return nullptr;
755 }
756
757 void* entry() override {
758 while (!wk->get_lc()->going_down()) {
759 auto item = dequeue();
760 if (item.which() == 0) {
761 /* going down */
762 break;
763 }
f6b5b4d7 764 f(wk, this, boost::get<WorkItem>(item));
e306af50
TL
765 }
766 return nullptr;
767 }
768}; /* WorkQ */
769
770class RGWLC::WorkPool
771{
772 using TVector = ceph::containers::tiny_vector<WorkQ, 3>;
773 TVector wqs;
774 uint64_t ix;
775
776public:
777 WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax)
778 : wqs(TVector{
779 n_threads,
780 [&](const size_t ix, auto emplacer) {
781 emplacer.emplace(wk, ix, qmax);
782 }}),
783 ix(0)
784 {}
785
f6b5b4d7
TL
786 ~WorkPool() {
787 for (auto& wq : wqs) {
788 wq.join();
789 }
790 }
791
e306af50
TL
792 void setf(WorkQ::work_f _f) {
793 for (auto& wq : wqs) {
794 wq.setf(_f);
795 }
796 }
797
798 void enqueue(WorkItem item) {
799 const auto tix = ix;
800 ix = (ix+1) % wqs.size();
801 (wqs[tix]).enqueue(std::move(item));
802 }
803
804 void drain() {
805 for (auto& wq : wqs) {
806 wq.drain();
807 }
808 }
809}; /* WorkPool */
810
f6b5b4d7
TL
811RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
812 RGWLC *lc, int ix)
813 : dpp(dpp), cct(cct), lc(lc), ix(ix)
e306af50
TL
814{
815 auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
816 workpool = new WorkPool(this, wpw, 512);
817}
818
f6b5b4d7
TL
/* True when a budgeted (non-`once`) run has passed its stop_at deadline.
 * Single-shot runs never stop early. */
static inline bool worker_should_stop(time_t stop_at, bool once)
{
  if (once) {
    return false;
  }
  return time(nullptr) > stop_at;
}
823
e306af50
TL
824int RGWLC::handle_multipart_expiration(
825 RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map,
f6b5b4d7 826 LCWorker* worker, time_t stop_at, bool once)
e306af50
TL
827{
828 MultipartMetaFilter mp_filter;
829 vector<rgw_bucket_dir_entry> objs;
830 bool is_truncated;
831 int ret;
832 RGWBucketInfo& bucket_info = target->get_bucket_info();
833 RGWRados::Bucket::List list_op(target);
834 auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
835 list_op.params.list_versions = false;
836 /* lifecycle processing does not depend on total order, so can
837 * take advantage of unordered listing optimizations--such as
838 * operating on one shard at a time */
839 list_op.params.allow_unordered = true;
840 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
841 list_op.params.filter = &mp_filter;
842
f6b5b4d7
TL
843 auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
844 auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
e306af50
TL
845 auto& [rule, obj] = wt;
846 RGWMPObj mp_obj;
847 if (obj_has_expired(cct, obj.meta.mtime, rule.mp_expiration)) {
848 rgw_obj_key key(obj.key);
849 if (!mp_obj.from_meta(key.name)) {
850 return;
851 }
852 RGWObjectCtx rctx(store);
f6b5b4d7
TL
853 int ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
854 if (ret == 0) {
855 if (perfcounter) {
856 perfcounter->inc(l_rgw_lc_abort_mpu, 1);
857 }
858 } else {
859 if (ret == -ERR_NO_SUCH_UPLOAD) {
860 ldpp_dout(wk->get_lc(), 5)
861 << "ERROR: abort_multipart_upload failed, ret=" << ret
862 << wq->thr_name()
863 << ", meta:" << obj.key
864 << dendl;
865 } else {
866 ldpp_dout(wk->get_lc(), 0)
867 << "ERROR: abort_multipart_upload failed, ret=" << ret
868 << wq->thr_name()
869 << ", meta:" << obj.key
870 << dendl;
871 }
872 } /* abort failed */
e306af50
TL
873 } /* expired */
874 };
875
876 worker->workpool->setf(pf);
877
878 for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
879 ++prefix_iter) {
f6b5b4d7
TL
880
881 if (worker_should_stop(stop_at, once)) {
882 ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
883 << worker->ix
884 << dendl;
885 return 0;
886 }
887
e306af50
TL
888 if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
889 continue;
890 }
891 list_op.params.prefix = prefix_iter->first;
892 do {
893 objs.clear();
894 list_op.params.marker = list_op.get_next_marker();
895 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
896 if (ret < 0) {
897 if (ret == (-ENOENT))
898 return 0;
899 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
900 return ret;
901 }
902
903 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
f6b5b4d7 904 std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
e306af50
TL
905 {prefix_iter->second, *obj_iter};
906 worker->workpool->enqueue(WorkItem{t1});
907 if (going_down()) {
e306af50
TL
908 return 0;
909 }
910 } /* for objs */
911
912 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
913 } while(is_truncated);
914 } /* for prefix_map */
915
916 worker->workpool->drain();
917 return 0;
918}
919
920static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info,
921 rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
922{
923 RGWRados::Object op_target(store, bucket_info, ctx, obj);
924 RGWRados::Object::Read read_op(&op_target);
925
926 return read_op.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
927}
928
929static bool is_valid_op(const lc_op& op)
930{
931 return (op.status &&
932 (op.expiration > 0
933 || op.expiration_date != boost::none
934 || op.noncur_expiration > 0
935 || op.dm_expiration
936 || !op.transitions.empty()
937 || !op.noncur_transitions.empty()));
938}
939
940static inline bool has_all_tags(const lc_op& rule_action,
941 const RGWObjTags& object_tags)
942{
943 if(! rule_action.obj_tags)
944 return false;
945 if(object_tags.count() < rule_action.obj_tags->count())
946 return false;
947 size_t tag_count = 0;
948 for (const auto& tag : object_tags.get_tags()) {
949 const auto& rule_tags = rule_action.obj_tags->get_tags();
950 const auto& iter = rule_tags.find(tag.first);
f6b5b4d7
TL
951 if(iter == rule_tags.end())
952 continue;
e306af50
TL
953 if(iter->second == tag.second)
954 {
955 tag_count++;
956 }
957 /* all tags in the rule appear in obj tags */
958 }
959 return tag_count == rule_action.obj_tags->count();
960}
11fdf7f2
TL
961
962static int check_tags(lc_op_ctx& oc, bool *skip)
963{
964 auto& op = oc.op;
965
966 if (op.obj_tags != boost::none) {
967 *skip = true;
968
969 bufferlist tags_bl;
e306af50
TL
970 int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj,
971 oc.rctx, tags_bl);
11fdf7f2
TL
972 if (ret < 0) {
973 if (ret != -ENODATA) {
f6b5b4d7
TL
974 ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r="
975 << ret << " " << oc.wq->thr_name() << dendl;
11fdf7f2
TL
976 }
977 return 0;
978 }
979 RGWObjTags dest_obj_tags;
980 try {
981 auto iter = tags_bl.cbegin();
982 dest_obj_tags.decode(iter);
983 } catch (buffer::error& err) {
f6b5b4d7
TL
984 ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
985 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
986 return -EIO;
987 }
988
494da23a 989 if (! has_all_tags(op, dest_obj_tags)) {
f6b5b4d7
TL
990 ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj
991 << " as tags do not match in rule: "
992 << op.id << " "
993 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
994 return 0;
995 }
996 }
997 *skip = false;
998 return 0;
999}
1000
1001class LCOpFilter_Tags : public LCOpFilter {
1002public:
1003 bool check(lc_op_ctx& oc) override {
1004 auto& o = oc.o;
1005
1006 if (o.is_delete_marker()) {
1007 return true;
1008 }
1009
1010 bool skip;
1011
1012 int ret = check_tags(oc, &skip);
1013 if (ret < 0) {
1014 if (ret == -ENOENT) {
1015 return false;
1016 }
f6b5b4d7
TL
1017 ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj
1018 << " returned ret=" << ret << " "
1019 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1020 return false;
1021 }
1022
1023 return !skip;
1024 };
1025};
1026
1027class LCOpAction_CurrentExpiration : public LCOpAction {
1028public:
f6b5b4d7
TL
1029 LCOpAction_CurrentExpiration(op_env& env) {}
1030
11fdf7f2
TL
1031 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
1032 auto& o = oc.o;
1033 if (!o.is_current()) {
f6b5b4d7
TL
1034 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
1035 << ": not current, skipping "
1036 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1037 return false;
1038 }
eafe8130 1039 if (o.is_delete_marker()) {
f6b5b4d7
TL
1040 std::string nkn;
1041 if (oc.next_key_name) nkn = *oc.next_key_name;
1042 if (oc.next_has_same_name(o.key.name)) {
1043 ldout(oc.cct, 7) << __func__ << "(): dm-check SAME: key=" << o.key
1044 << " next_key_name: %%" << nkn << "%% "
1045 << oc.wq->thr_name() << dendl;
1046 return false;
eafe8130 1047 } else {
f6b5b4d7
TL
1048 ldout(oc.cct, 7) << __func__ << "(): dm-check DELE: key=" << o.key
1049 << " next_key_name: %%" << nkn << "%% "
1050 << oc.wq->thr_name() << dendl;
eafe8130
TL
1051 *exp_time = real_clock::now();
1052 return true;
1053 }
1054 }
11fdf7f2
TL
1055
1056 auto& mtime = o.meta.mtime;
1057 bool is_expired;
1058 auto& op = oc.op;
1059 if (op.expiration <= 0) {
1060 if (op.expiration_date == boost::none) {
f6b5b4d7
TL
1061 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
1062 << ": no expiration set in rule, skipping "
1063 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1064 return false;
1065 }
e306af50
TL
1066 is_expired = ceph_clock_now() >=
1067 ceph::real_clock::to_time_t(*op.expiration_date);
11fdf7f2
TL
1068 *exp_time = *op.expiration_date;
1069 } else {
1070 is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
1071 }
1072
f6b5b4d7
TL
1073 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
1074 << (int)is_expired << " "
1075 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1076 return is_expired;
1077 }
1078
1079 int process(lc_op_ctx& oc) {
1080 auto& o = oc.o;
eafe8130
TL
1081 int r;
1082 if (o.is_delete_marker()) {
1083 r = remove_expired_obj(oc, true);
f6b5b4d7
TL
1084 if (r < 0) {
1085 ldout(oc.cct, 0) << "ERROR: current is-dm remove_expired_obj "
1086 << oc.bucket_info.bucket << ":" << o.key
1087 << " " << cpp_strerror(r) << " "
1088 << oc.wq->thr_name() << dendl;
1089 return r;
1090 }
1091 ldout(oc.cct, 2) << "DELETED: current is-dm "
1092 << oc.bucket_info.bucket << ":" << o.key
1093 << " " << oc.wq->thr_name() << dendl;
eafe8130 1094 } else {
f6b5b4d7 1095 /* ! o.is_delete_marker() */
eafe8130 1096 r = remove_expired_obj(oc, !oc.bucket_info.versioned());
f6b5b4d7
TL
1097 if (r < 0) {
1098 ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
1099 << oc.bucket_info.bucket << ":" << o.key
1100 << " " << cpp_strerror(r) << " "
1101 << oc.wq->thr_name() << dendl;
1102 return r;
1103 }
1104 if (perfcounter) {
1105 perfcounter->inc(l_rgw_lc_expire_current, 1);
1106 }
1107 ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
1108 << " " << oc.wq->thr_name() << dendl;
eafe8130 1109 }
11fdf7f2
TL
1110 return 0;
1111 }
1112};
1113
1114class LCOpAction_NonCurrentExpiration : public LCOpAction {
f6b5b4d7 1115protected:
11fdf7f2 1116public:
f6b5b4d7
TL
1117 LCOpAction_NonCurrentExpiration(op_env& env)
1118 {}
1119
11fdf7f2
TL
1120 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
1121 auto& o = oc.o;
1122 if (o.is_current()) {
f6b5b4d7
TL
1123 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
1124 << ": current version, skipping "
1125 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1126 return false;
1127 }
1128
11fdf7f2 1129 int expiration = oc.op.noncur_expiration;
f6b5b4d7
TL
1130 bool is_expired = obj_has_expired(oc.cct, oc.effective_mtime, expiration,
1131 exp_time);
1132
1133 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
1134 << is_expired << " "
1135 << oc.wq->thr_name() << dendl;
11fdf7f2 1136
e306af50
TL
1137 return is_expired &&
1138 pass_object_lock_check(oc.store->getRados(),
1139 oc.bucket_info, oc.obj, oc.rctx);
11fdf7f2
TL
1140 }
1141
1142 int process(lc_op_ctx& oc) {
1143 auto& o = oc.o;
1144 int r = remove_expired_obj(oc, true);
1145 if (r < 0) {
9f95a23c 1146 ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) "
f6b5b4d7
TL
1147 << oc.bucket_info.bucket << ":" << o.key
1148 << " " << cpp_strerror(r)
1149 << " " << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1150 return r;
1151 }
f6b5b4d7
TL
1152 if (perfcounter) {
1153 perfcounter->inc(l_rgw_lc_expire_noncurrent, 1);
1154 }
1155 ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
1156 << " (non-current expiration) "
1157 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1158 return 0;
1159 }
1160};
1161
1162class LCOpAction_DMExpiration : public LCOpAction {
1163public:
f6b5b4d7
TL
1164 LCOpAction_DMExpiration(op_env& env) {}
1165
11fdf7f2
TL
1166 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
1167 auto& o = oc.o;
1168 if (!o.is_delete_marker()) {
f6b5b4d7
TL
1169 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
1170 << ": not a delete marker, skipping "
1171 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1172 return false;
1173 }
f6b5b4d7
TL
1174 if (oc.next_has_same_name(o.key.name)) {
1175 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
1176 << ": next is same object, skipping "
1177 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1178 return false;
1179 }
1180
1181 *exp_time = real_clock::now();
1182
1183 return true;
1184 }
1185
1186 int process(lc_op_ctx& oc) {
1187 auto& o = oc.o;
1188 int r = remove_expired_obj(oc, true);
1189 if (r < 0) {
9f95a23c 1190 ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
f6b5b4d7
TL
1191 << oc.bucket_info.bucket << ":" << o.key
1192 << " " << cpp_strerror(r)
1193 << " " << oc.wq->thr_name()
1194 << dendl;
11fdf7f2
TL
1195 return r;
1196 }
f6b5b4d7
TL
1197 if (perfcounter) {
1198 perfcounter->inc(l_rgw_lc_expire_dm, 1);
1199 }
1200 ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key
1201 << " (delete marker expiration) "
1202 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1203 return 0;
1204 }
1205};
1206
1207class LCOpAction_Transition : public LCOpAction {
1208 const transition_action& transition;
1209 bool need_to_process{false};
1210
1211protected:
1212 virtual bool check_current_state(bool is_current) = 0;
1213 virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0;
1214public:
e306af50
TL
1215 LCOpAction_Transition(const transition_action& _transition)
1216 : transition(_transition) {}
11fdf7f2
TL
1217
1218 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
1219 auto& o = oc.o;
1220
1221 if (o.is_delete_marker()) {
1222 return false;
1223 }
1224
1225 if (!check_current_state(o.is_current())) {
1226 return false;
1227 }
1228
1229 auto mtime = get_effective_mtime(oc);
1230 bool is_expired;
9f95a23c 1231 if (transition.days < 0) {
11fdf7f2 1232 if (transition.date == boost::none) {
f6b5b4d7
TL
1233 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key
1234 << ": no transition day/date set in rule, skipping "
1235 << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1236 return false;
1237 }
e306af50
TL
1238 is_expired = ceph_clock_now() >=
1239 ceph::real_clock::to_time_t(*transition.date);
11fdf7f2
TL
1240 *exp_time = *transition.date;
1241 } else {
1242 is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time);
1243 }
1244
f6b5b4d7
TL
1245 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired="
1246 << is_expired << " "
1247 << oc.wq->thr_name() << dendl;
11fdf7f2 1248
e306af50
TL
1249 need_to_process =
1250 (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
1251 transition.storage_class);
11fdf7f2
TL
1252
1253 return is_expired;
1254 }
1255
1256 bool should_process() override {
1257 return need_to_process;
1258 }
1259
1260 int process(lc_op_ctx& oc) {
1261 auto& o = oc.o;
1262
1263 rgw_placement_rule target_placement;
1264 target_placement.inherit_from(oc.bucket_info.placement_rule);
1265 target_placement.storage_class = transition.storage_class;
1266
e306af50
TL
1267 if (!oc.store->svc()->zone->get_zone_params().
1268 valid_placement(target_placement)) {
f6b5b4d7
TL
1269 ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
1270 << target_placement
9f95a23c 1271 << " bucket="<< oc.bucket_info.bucket
f6b5b4d7
TL
1272 << " rule_id=" << oc.op.id
1273 << " " << oc.wq->thr_name() << dendl;
eafe8130
TL
1274 return -EINVAL;
1275 }
1276
e306af50
TL
1277 int r = oc.store->getRados()->transition_obj(
1278 oc.rctx, oc.bucket_info, oc.obj, target_placement, o.meta.mtime,
1279 o.versioned_epoch, oc.dpp, null_yield);
11fdf7f2 1280 if (r < 0) {
9f95a23c 1281 ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
f6b5b4d7
TL
1282 << oc.bucket_info.bucket << ":" << o.key
1283 << " -> " << transition.storage_class
1284 << " " << cpp_strerror(r)
1285 << " " << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1286 return r;
1287 }
f6b5b4d7
TL
1288 ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket
1289 << ":" << o.key << " -> "
1290 << transition.storage_class
1291 << " " << oc.wq->thr_name() << dendl;
11fdf7f2
TL
1292 return 0;
1293 }
1294};
1295
1296class LCOpAction_CurrentTransition : public LCOpAction_Transition {
1297protected:
1298 bool check_current_state(bool is_current) override {
1299 return is_current;
1300 }
1301
1302 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
1303 return oc.o.meta.mtime;
1304 }
1305public:
e306af50
TL
1306 LCOpAction_CurrentTransition(const transition_action& _transition)
1307 : LCOpAction_Transition(_transition) {}
f6b5b4d7
TL
1308 int process(lc_op_ctx& oc) {
1309 int r = LCOpAction_Transition::process(oc);
1310 if (r == 0) {
1311 if (perfcounter) {
1312 perfcounter->inc(l_rgw_lc_transition_current, 1);
1313 }
1314 }
1315 return r;
1316 }
11fdf7f2
TL
1317};
1318
1319class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
1320protected:
1321 bool check_current_state(bool is_current) override {
1322 return !is_current;
1323 }
1324
1325 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
f6b5b4d7 1326 return oc.effective_mtime;
11fdf7f2
TL
1327 }
1328public:
f6b5b4d7
TL
1329 LCOpAction_NonCurrentTransition(op_env& env,
1330 const transition_action& _transition)
1331 : LCOpAction_Transition(_transition)
1332 {}
1333 int process(lc_op_ctx& oc) {
1334 int r = LCOpAction_Transition::process(oc);
1335 if (r == 0) {
1336 if (perfcounter) {
1337 perfcounter->inc(l_rgw_lc_transition_noncurrent, 1);
1338 }
1339 }
1340 return r;
1341 }
11fdf7f2
TL
1342};
1343
1344void LCOpRule::build()
1345{
1346 filters.emplace_back(new LCOpFilter_Tags);
1347
1348 auto& op = env.op;
1349
1350 if (op.expiration > 0 ||
1351 op.expiration_date != boost::none) {
f6b5b4d7 1352 actions.emplace_back(new LCOpAction_CurrentExpiration(env));
11fdf7f2
TL
1353 }
1354
1355 if (op.dm_expiration) {
f6b5b4d7 1356 actions.emplace_back(new LCOpAction_DMExpiration(env));
11fdf7f2
TL
1357 }
1358
1359 if (op.noncur_expiration > 0) {
f6b5b4d7 1360 actions.emplace_back(new LCOpAction_NonCurrentExpiration(env));
11fdf7f2
TL
1361 }
1362
1363 for (auto& iter : op.transitions) {
1364 actions.emplace_back(new LCOpAction_CurrentTransition(iter.second));
1365 }
1366
1367 for (auto& iter : op.noncur_transitions) {
f6b5b4d7 1368 actions.emplace_back(new LCOpAction_NonCurrentTransition(env, iter.second));
11fdf7f2
TL
1369 }
1370}
1371
f6b5b4d7 1372void LCOpRule::update()
11fdf7f2 1373{
f6b5b4d7
TL
1374 next_key_name = env.ol.next_key_name();
1375 effective_mtime = env.ol.get_prev_obj().meta.mtime;
1376}
11fdf7f2 1377
f6b5b4d7
TL
1378int LCOpRule::process(rgw_bucket_dir_entry& o,
1379 const DoutPrefixProvider *dpp,
1380 WorkQ* wq)
1381{
1382 lc_op_ctx ctx(env, o, next_key_name, effective_mtime, dpp, wq);
1383 shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
11fdf7f2
TL
1384 real_time exp;
1385
1386 for (auto& a : actions) {
1387 real_time action_exp;
1388
1389 if (a->check(ctx, &action_exp)) {
1390 if (action_exp > exp) {
1391 exp = action_exp;
1392 selected = &a;
1393 }
1394 }
1395 }
1396
1397 if (selected &&
1398 (*selected)->should_process()) {
1399
1400 /*
1401 * Calling filter checks after action checks because
1402 * all action checks (as they are implemented now) do
1403 * not access the objects themselves, but return result
1404 * from info from bucket index listing. The current tags filter
1405 * check does access the objects, so we avoid unnecessary rados calls
1406 * having filters check later in the process.
1407 */
1408
1409 bool cont = false;
1410 for (auto& f : filters) {
1411 if (f->check(ctx)) {
1412 cont = true;
1413 break;
1414 }
1415 }
1416
1417 if (!cont) {
f6b5b4d7
TL
1418 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
1419 << ": no rule match, skipping "
1420 << " " << wq->thr_name() << dendl;
11fdf7f2
TL
1421 return 0;
1422 }
1423
1424 int r = (*selected)->process(ctx);
1425 if (r < 0) {
9f95a23c 1426 ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
f6b5b4d7
TL
1427 << env.bucket_info.bucket << ":" << o.key
1428 << " " << cpp_strerror(r)
1429 << " " << wq->thr_name() << dendl;
11fdf7f2
TL
1430 return r;
1431 }
f6b5b4d7
TL
1432 ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":"
1433 << o.key << " " << wq->thr_name() << dendl;
11fdf7f2
TL
1434 }
1435
1436 return 0;
1437
1438}
1439
f6b5b4d7
TL
1440int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
1441 time_t stop_at, bool once)
7c673cae
FG
1442{
1443 RGWLifecycleConfiguration config(cct);
1444 RGWBucketInfo bucket_info;
1445 map<string, bufferlist> bucket_attrs;
11fdf7f2 1446 string no_ns, list_versions;
7c673cae 1447 vector<rgw_bucket_dir_entry> objs;
7c673cae
FG
1448 vector<std::string> result;
1449 boost::split(result, shard_id, boost::is_any_of(":"));
1450 string bucket_tenant = result[0];
1451 string bucket_name = result[1];
11fdf7f2 1452 string bucket_marker = result[2];
e306af50
TL
1453 int ret = store->getRados()->get_bucket_info(
1454 store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield,
1455 &bucket_attrs);
7c673cae 1456 if (ret < 0) {
f6b5b4d7
TL
1457 ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name
1458 << " failed" << dendl;
7c673cae
FG
1459 return ret;
1460 }
1461
f6b5b4d7
TL
1462 auto stack_guard = make_scope_guard(
1463 [&worker, &bucket_info]
1464 {
1465 worker->workpool->drain();
1466 }
1467 );
1468
11fdf7f2 1469 if (bucket_info.bucket.marker != bucket_marker) {
f6b5b4d7
TL
1470 ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
1471 << bucket_tenant << ":" << bucket_name
1472 << " cur_marker=" << bucket_info.bucket.marker
11fdf7f2 1473 << " orig_marker=" << bucket_marker << dendl;
7c673cae
FG
1474 return -ENOENT;
1475 }
1476
9f95a23c 1477 RGWRados::Bucket target(store->getRados(), bucket_info);
7c673cae
FG
1478
1479 map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
1480 if (aiter == bucket_attrs.end())
1481 return 0;
1482
11fdf7f2 1483 bufferlist::const_iterator iter{&aiter->second};
7c673cae
FG
1484 try {
1485 config.decode(iter);
1486 } catch (const buffer::error& e) {
f6b5b4d7
TL
1487 ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed"
1488 << dendl;
7c673cae
FG
1489 return -1;
1490 }
1491
f6b5b4d7 1492 auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
e306af50 1493 auto wt =
f6b5b4d7 1494 boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
e306af50 1495 auto& [op_rule, o] = wt;
f6b5b4d7 1496
e306af50 1497 ldpp_dout(wk->get_lc(), 20)
f6b5b4d7
TL
1498 << __func__ << "(): key=" << o.key << wq->thr_name()
1499 << dendl;
1500 int ret = op_rule.process(o, wk->dpp, wq);
e306af50
TL
1501 if (ret < 0) {
1502 ldpp_dout(wk->get_lc(), 20)
1503 << "ERROR: orule.process() returned ret=" << ret
f6b5b4d7 1504 << wq->thr_name()
e306af50
TL
1505 << dendl;
1506 }
1507 };
1508 worker->workpool->setf(pf);
494da23a 1509
e306af50 1510 multimap<string, lc_op>& prefix_map = config.get_prefix_map();
494da23a
TL
1511 ldpp_dout(this, 10) << __func__ << "() prefix_map size="
1512 << prefix_map.size()
1513 << dendl;
1514
11fdf7f2
TL
1515 rgw_obj_key pre_marker;
1516 rgw_obj_key next_marker;
e306af50
TL
1517 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
1518 ++prefix_iter) {
f6b5b4d7
TL
1519
1520 if (worker_should_stop(stop_at, once)) {
1521 ldout(cct, 5) << __func__ << " interval budget EXPIRED worker "
1522 << worker->ix
1523 << dendl;
1524 return 0;
1525 }
1526
11fdf7f2
TL
1527 auto& op = prefix_iter->second;
1528 if (!is_valid_op(op)) {
1529 continue;
1530 }
f6b5b4d7
TL
1531 ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first
1532 << dendl;
11fdf7f2 1533 if (prefix_iter != prefix_map.begin() &&
e306af50
TL
1534 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
1535 prev(prefix_iter)->first) == 0)) {
11fdf7f2
TL
1536 next_marker = pre_marker;
1537 } else {
1538 pre_marker = next_marker;
1539 }
494da23a
TL
1540
1541 LCObjsLister ol(store, bucket_info);
11fdf7f2 1542 ol.set_prefix(prefix_iter->first);
7c673cae 1543
11fdf7f2 1544 ret = ol.init();
11fdf7f2
TL
1545 if (ret < 0) {
1546 if (ret == (-ENOENT))
1547 return 0;
1548 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
1549 return ret;
7c673cae 1550 }
7c673cae 1551
e306af50 1552 op_env oenv(op, store, worker, bucket_info, ol);
11fdf7f2 1553 LCOpRule orule(oenv);
e306af50 1554 orule.build(); // why can't ctor do it?
e306af50
TL
1555 rgw_bucket_dir_entry* o{nullptr};
1556 for (; ol.get_obj(&o /* , fetch_barrier */); ol.next()) {
f6b5b4d7
TL
1557 orule.update();
1558 std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
e306af50 1559 worker->workpool->enqueue(WorkItem{t1});
7c673cae 1560 }
e306af50 1561 worker->workpool->drain();
7c673cae
FG
1562 }
1563
f6b5b4d7 1564 ret = handle_multipart_expiration(&target, prefix_map, worker, stop_at, once);
7c673cae
FG
1565 return ret;
1566}
1567
e306af50 1568int RGWLC::bucket_lc_post(int index, int max_lock_sec,
f6b5b4d7 1569 cls_rgw_lc_entry& entry, int& result,
e306af50 1570 LCWorker* worker)
7c673cae
FG
1571{
1572 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
1573
1574 rados::cls::lock::Lock l(lc_index_lock_name);
1575 l.set_cookie(cookie);
1576 l.set_duration(lock_duration);
1577
f6b5b4d7
TL
1578 dout(5) << "RGWLC::bucket_lc_post(): POST " << entry
1579 << " index: " << index << " worker ix: " << worker->ix
1580 << dendl;
1581
7c673cae 1582 do {
e306af50
TL
1583 int ret = l.lock_exclusive(
1584 &store->getRados()->lc_pool_ctx, obj_names[index]);
f6b5b4d7
TL
1585 if (ret == -EBUSY || ret == -EEXIST) {
1586 /* already locked by another lc processor */
11fdf7f2 1587 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
f6b5b4d7 1588 << obj_names[index] << ", sleep 5, try again " << dendl;
7c673cae
FG
1589 sleep(5);
1590 continue;
1591 }
1592 if (ret < 0)
1593 return 0;
f6b5b4d7
TL
1594 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
1595 << dendl;
7c673cae 1596 if (result == -ENOENT) {
e306af50
TL
1597 ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx,
1598 obj_names[index], entry);
7c673cae 1599 if (ret < 0) {
11fdf7f2
TL
1600 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
1601 << obj_names[index] << dendl;
7c673cae 1602 }
31f18b77 1603 goto clean;
7c673cae 1604 } else if (result < 0) {
f6b5b4d7 1605 entry.status = lc_failed;
7c673cae 1606 } else {
f6b5b4d7 1607 entry.status = lc_complete;
7c673cae
FG
1608 }
1609
e306af50
TL
1610 ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
1611 obj_names[index], entry);
7c673cae 1612 if (ret < 0) {
11fdf7f2
TL
1613 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
1614 << obj_names[index] << dendl;
7c673cae
FG
1615 }
1616clean:
9f95a23c 1617 l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
f6b5b4d7
TL
1618 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
1619 << obj_names[index] << dendl;
7c673cae
FG
1620 return 0;
1621 } while (true);
1622}
1623
f6b5b4d7
TL
1624int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
1625 vector<cls_rgw_lc_entry>& progress_map,
1626 int& index)
7c673cae 1627{
f6b5b4d7
TL
1628 progress_map.clear();
1629 for(; index < max_objs; index++, marker="") {
1630 vector<cls_rgw_lc_entry> entries;
e306af50
TL
1631 int ret =
1632 cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker,
1633 max_entries, entries);
31f18b77
FG
1634 if (ret < 0) {
1635 if (ret == -ENOENT) {
11fdf7f2 1636 ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
31f18b77
FG
1637 << obj_names[index] << dendl;
1638 continue;
1639 } else {
1640 return ret;
1641 }
1642 }
f6b5b4d7
TL
1643 progress_map.reserve(progress_map.size() + entries.size());
1644 progress_map.insert(progress_map.end(), entries.begin(), entries.end());
1645
1646 /* update index, marker tuple */
1647 if (progress_map.size() > 0)
1648 marker = progress_map.back().bucket;
1649
1650 if (progress_map.size() >= max_entries)
1651 break;
7c673cae
FG
1652 }
1653 return 0;
1654}
1655
/*
 * Return the integers [0, n) in random order.
 *
 * RGWLC::process() uses this to visit the lc index shards in a sequence
 * unrelated to any other processor that may be running in parallel.  The
 * engine is therefore seeded from std::random_device; std::random_shuffle
 * used the unseeded std::rand() state, and is removed in C++17.
 *
 * The previous version sized the vector as n-1, which skipped shard n-1
 * entirely and underflowed for n == 0.
 */
static inline std::vector<int> random_sequence(uint32_t n)
{
  std::vector<int> v(n, 0);
  std::generate(v.begin(), v.end(),
                [ix = 0]() mutable {
                  return ix++;
                });
  std::random_device rd;
  std::default_random_engine rng{rd()};
  std::shuffle(v.begin(), v.end(), rng);
  return v;
}
7c673cae 1666
f6b5b4d7 1667int RGWLC::process(LCWorker* worker, bool once = false)
e306af50
TL
1668{
1669 int max_secs = cct->_conf->rgw_lc_lock_max_time;
7c673cae 1670
e306af50
TL
1671 /* generate an index-shard sequence unrelated to any other
1672 * that might be running in parallel */
1673 vector<int> shard_seq = random_sequence(max_objs);
1674 for (auto index : shard_seq) {
f6b5b4d7 1675 int ret = process(index, max_secs, worker, once);
7c673cae
FG
1676 if (ret < 0)
1677 return ret;
1678 }
1679
1680 return 0;
1681}
1682
f6b5b4d7 1683bool RGWLC::expired_session(time_t started)
7c673cae 1684{
f6b5b4d7
TL
1685 time_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
1686 ? cct->_conf->rgw_lc_debug_interval
1687 : 24*60*60;
1688
1689 auto now = time(nullptr);
1690
1691 dout(16) << "RGWLC::expired_session"
1692 << " started: " << started
1693 << " interval: " << interval << "(*2==" << 2*interval << ")"
1694 << " now: " << now
1695 << dendl;
1696
1697 return (started + 2*interval < now);
1698}
1699
1700time_t RGWLC::thread_stop_at()
1701{
1702 uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
1703 ? cct->_conf->rgw_lc_debug_interval
1704 : 24*60*60;
1705
1706 return time(nullptr) + interval;
1707}
1708
1709int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
1710 bool once = false)
1711{
1712 dout(5) << "RGWLC::process(): ENTER: "
1713 << "index: " << index << " worker ix: " << worker->ix
1714 << dendl;
1715
7c673cae
FG
1716 rados::cls::lock::Lock l(lc_index_lock_name);
1717 do {
1718 utime_t now = ceph_clock_now();
f6b5b4d7
TL
1719 //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
1720 cls_rgw_lc_entry entry;
7c673cae
FG
1721 if (max_lock_secs <= 0)
1722 return -EAGAIN;
1723
1724 utime_t time(max_lock_secs, 0);
1725 l.set_duration(time);
1726
e306af50
TL
1727 int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx,
1728 obj_names[index]);
f6b5b4d7
TL
1729 if (ret == -EBUSY || ret == -EEXIST) {
1730 /* already locked by another lc processor */
11fdf7f2
TL
1731 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
1732 << obj_names[index] << ", sleep 5, try again" << dendl;
7c673cae
FG
1733 sleep(5);
1734 continue;
1735 }
1736 if (ret < 0)
1737 return 0;
1738
7c673cae 1739 cls_rgw_lc_obj_head head;
e306af50
TL
1740 ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index],
1741 head);
7c673cae 1742 if (ret < 0) {
11fdf7f2
TL
1743 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
1744 << obj_names[index] << ", ret=" << ret << dendl;
7c673cae
FG
1745 goto exit;
1746 }
1747
f6b5b4d7
TL
1748 if (! (cct->_conf->rgw_lc_lock_max_time == 9969)) {
1749 ret = cls_rgw_lc_get_entry(store->getRados()->lc_pool_ctx,
1750 obj_names[index], head.marker, entry);
1751 if (ret >= 0) {
1752 if (entry.status == lc_processing) {
1753 if (expired_session(entry.start_time)) {
1754 dout(5) << "RGWLC::process(): STALE lc session found for: " << entry
1755 << " index: " << index << " worker ix: " << worker->ix
1756 << " (clearing)"
1757 << dendl;
1758 } else {
1759 dout(5) << "RGWLC::process(): ACTIVE entry: " << entry
1760 << " index: " << index << " worker ix: " << worker->ix
1761 << dendl;
1762 goto exit;
1763 }
1764 }
1765 }
1766 }
1767
1768 if(!if_already_run_today(head.start_date) ||
1769 once) {
7c673cae
FG
1770 head.start_date = now;
1771 head.marker.clear();
e306af50 1772 ret = bucket_lc_prepare(index, worker);
7c673cae 1773 if (ret < 0) {
11fdf7f2 1774 ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
e306af50
TL
1775 << obj_names[index]
1776 << ", ret=" << ret
1777 << dendl;
7c673cae
FG
1778 goto exit;
1779 }
1780 }
1781
e306af50
TL
1782 ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx,
1783 obj_names[index], head.marker, entry);
7c673cae 1784 if (ret < 0) {
11fdf7f2
TL
1785 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
1786 << obj_names[index] << dendl;
7c673cae
FG
1787 goto exit;
1788 }
1789
e306af50 1790 /* termination condition (eof) */
f6b5b4d7 1791 if (entry.bucket.empty())
7c673cae
FG
1792 goto exit;
1793
f6b5b4d7
TL
1794 ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
1795 << " index: " << index << " worker ix: " << worker->ix
1796 << dendl;
1797
1798 entry.status = lc_processing;
e306af50 1799 ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx,
f6b5b4d7 1800 obj_names[index], entry);
7c673cae 1801 if (ret < 0) {
e306af50 1802 ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
f6b5b4d7 1803 << obj_names[index] << entry.bucket << entry.status << dendl;
7c673cae
FG
1804 goto exit;
1805 }
1806
f6b5b4d7
TL
1807 head.marker = entry.bucket;
1808 ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx,
1809 obj_names[index], head);
7c673cae 1810 if (ret < 0) {
e306af50
TL
1811 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
1812 << obj_names[index]
f6b5b4d7 1813 << dendl;
7c673cae
FG
1814 goto exit;
1815 }
f6b5b4d7
TL
1816
1817 ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
1818 << " index: " << index << " worker ix: " << worker->ix
1819 << dendl;
1820
9f95a23c 1821 l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
f6b5b4d7 1822 ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
e306af50 1823 bucket_lc_post(index, max_lock_secs, entry, ret, worker);
f6b5b4d7
TL
1824 } while(1 && !once);
1825
1826 return 0;
3efd9988 1827
7c673cae 1828exit:
f6b5b4d7
TL
1829 l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
1830 return 0;
7c673cae
FG
1831}
1832
1833void RGWLC::start_processor()
1834{
e306af50
TL
1835 auto maxw = cct->_conf->rgw_lc_max_worker;
1836 workers.reserve(maxw);
1837 for (int ix = 0; ix < maxw; ++ix) {
1838 auto worker =
f6b5b4d7 1839 std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix);
e306af50
TL
1840 worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
1841 workers.emplace_back(std::move(worker));
1842 }
7c673cae
FG
1843}
1844
1845void RGWLC::stop_processor()
1846{
1847 down_flag = true;
e306af50 1848 for (auto& worker : workers) {
7c673cae
FG
1849 worker->stop();
1850 worker->join();
1851 }
e306af50 1852 workers.clear();
7c673cae
FG
1853}
1854
11fdf7f2
TL
1855unsigned RGWLC::get_subsys() const
1856{
1857 return dout_subsys;
1858}
1859
1860std::ostream& RGWLC::gen_prefix(std::ostream& out) const
1861{
1862 return out << "lifecycle: ";
1863}
1864
7c673cae
FG
1865void RGWLC::LCWorker::stop()
1866{
9f95a23c
TL
1867 std::lock_guard l{lock};
1868 cond.notify_all();
7c673cae
FG
1869}
1870
1871bool RGWLC::going_down()
1872{
1873 return down_flag;
1874}
1875
1876bool RGWLC::LCWorker::should_work(utime_t& now)
1877{
1878 int start_hour;
1879 int start_minute;
1880 int end_hour;
1881 int end_minute;
1882 string worktime = cct->_conf->rgw_lifecycle_work_time;
e306af50
TL
1883 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute,
1884 &end_hour, &end_minute);
7c673cae
FG
1885 struct tm bdt;
1886 time_t tt = now.sec();
1887 localtime_r(&tt, &bdt);
1888
1889 if (cct->_conf->rgw_lc_debug_interval > 0) {
1890 /* We're debugging, so say we can run */
1891 return true;
1892 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
1893 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
1894 return true;
1895 } else {
1896 return false;
1897 }
1898
1899}
1900
1901int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
1902{
11fdf7f2
TL
1903 int secs;
1904
7c673cae 1905 if (cct->_conf->rgw_lc_debug_interval > 0) {
11fdf7f2 1906 secs = start + cct->_conf->rgw_lc_debug_interval - now;
7c673cae
FG
1907 if (secs < 0)
1908 secs = 0;
1909 return (secs);
1910 }
1911
1912 int start_hour;
1913 int start_minute;
1914 int end_hour;
1915 int end_minute;
1916 string worktime = cct->_conf->rgw_lifecycle_work_time;
e306af50
TL
1917 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour,
1918 &end_minute);
7c673cae
FG
1919 struct tm bdt;
1920 time_t tt = now.sec();
1921 time_t nt;
1922 localtime_r(&tt, &bdt);
1923 bdt.tm_hour = start_hour;
1924 bdt.tm_min = start_minute;
1925 bdt.tm_sec = 0;
1926 nt = mktime(&bdt);
11fdf7f2
TL
1927 secs = nt - tt;
1928
1929 return secs>0 ? secs : secs+24*60*60;
1930}
1931
e306af50
TL
1932RGWLC::LCWorker::~LCWorker()
1933{
e306af50
TL
1934 delete workpool;
1935} /* ~LCWorker */
1936
1937void RGWLifecycleConfiguration::generate_test_instances(
1938 list<RGWLifecycleConfiguration*>& o)
11fdf7f2
TL
1939{
1940 o.push_back(new RGWLifecycleConfiguration);
1941}
1942
f6b5b4d7
TL
1943static inline void get_lc_oid(CephContext *cct,
1944 const string& shard_id, string *oid)
11fdf7f2 1945{
e306af50
TL
1946 int max_objs =
1947 (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
1948 cct->_conf->rgw_lc_max_objs);
f6b5b4d7 1949 /* n.b. review hash algo */
e306af50
TL
1950 int index = ceph_str_hash_linux(shard_id.c_str(),
1951 shard_id.size()) % HASH_PRIME % max_objs;
11fdf7f2
TL
1952 *oid = lc_oid_prefix;
1953 char buf[32];
1954 snprintf(buf, 32, ".%d", index);
1955 oid->append(buf);
1956 return;
1957}
1958
11fdf7f2
TL
1959static std::string get_lc_shard_name(const rgw_bucket& bucket){
1960 return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
1961}
1962
1963template<typename F>
f6b5b4d7
TL
1964static int guard_lc_modify(rgw::sal::RGWRadosStore* store,
1965 const rgw_bucket& bucket, const string& cookie,
1966 const F& f) {
11fdf7f2
TL
1967 CephContext *cct = store->ctx();
1968
1969 string shard_id = get_lc_shard_name(bucket);
1970
1971 string oid;
1972 get_lc_oid(cct, shard_id, &oid);
1973
f6b5b4d7
TL
1974 /* XXX it makes sense to take shard_id for a bucket_id? */
1975 cls_rgw_lc_entry entry;
1976 entry.bucket = shard_id;
1977 entry.status = lc_uninitial;
11fdf7f2 1978 int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
7c673cae 1979
11fdf7f2
TL
1980 rados::cls::lock::Lock l(lc_index_lock_name);
1981 utime_t time(max_lock_secs, 0);
1982 l.set_duration(time);
1983 l.set_cookie(cookie);
1984
9f95a23c 1985 librados::IoCtx *ctx = store->getRados()->get_lc_pool_ctx();
11fdf7f2
TL
1986 int ret;
1987
1988 do {
1989 ret = l.lock_exclusive(ctx, oid);
1990 if (ret == -EBUSY || ret == -EEXIST) {
1991 ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
1992 << oid << ", sleep 5, try again" << dendl;
1993 sleep(5); // XXX: return retryable error
1994 continue;
1995 }
1996 if (ret < 0) {
1997 ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
1998 << oid << ", ret=" << ret << dendl;
1999 break;
2000 }
2001 ret = f(ctx, oid, entry);
2002 if (ret < 0) {
2003 ldout(cct, 0) << "RGWLC::RGWPutLC() failed to set entry on "
2004 << oid << ", ret=" << ret << dendl;
2005 }
2006 break;
2007 } while(true);
2008 l.unlock(ctx, oid);
2009 return ret;
2010}
2011
2012int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info,
2013 const map<string, bufferlist>& bucket_attrs,
2014 RGWLifecycleConfiguration *config)
2015{
2016 map<string, bufferlist> attrs = bucket_attrs;
494da23a
TL
2017 bufferlist lc_bl;
2018 config->encode(lc_bl);
2019
2020 attrs[RGW_ATTR_LC] = std::move(lc_bl);
2021
e306af50
TL
2022 int ret =
2023 store->ctl()->bucket->set_bucket_instance_attrs(
2024 bucket_info, attrs, &bucket_info.objv_tracker, null_yield);
11fdf7f2
TL
2025 if (ret < 0)
2026 return ret;
2027
2028 rgw_bucket& bucket = bucket_info.bucket;
2029
f6b5b4d7 2030
e306af50
TL
2031 ret = guard_lc_modify(store, bucket, cookie,
2032 [&](librados::IoCtx *ctx, const string& oid,
f6b5b4d7 2033 const cls_rgw_lc_entry& entry) {
11fdf7f2
TL
2034 return cls_rgw_lc_set_entry(*ctx, oid, entry);
2035 });
2036
2037 return ret;
7c673cae
FG
2038}
2039
11fdf7f2
TL
2040int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
2041 const map<string, bufferlist>& bucket_attrs)
2042{
2043 map<string, bufferlist> attrs = bucket_attrs;
2044 attrs.erase(RGW_ATTR_LC);
e306af50
TL
2045 int ret =
2046 store->ctl()->bucket->set_bucket_instance_attrs(
2047 bucket_info, attrs, &bucket_info.objv_tracker, null_yield);
11fdf7f2
TL
2048
2049 rgw_bucket& bucket = bucket_info.bucket;
2050
2051 if (ret < 0) {
2052 ldout(cct, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
2053 << bucket.name << " returned err=" << ret << dendl;
2054 return ret;
2055 }
2056
2057
e306af50
TL
2058 ret = guard_lc_modify(store, bucket, cookie,
2059 [&](librados::IoCtx *ctx, const string& oid,
f6b5b4d7 2060 const cls_rgw_lc_entry& entry) {
11fdf7f2
TL
2061 return cls_rgw_lc_rm_entry(*ctx, oid, entry);
2062 });
2063
2064 return ret;
e306af50
TL
2065} /* RGWLC::remove_bucket_config */
2066
2067RGWLC::~RGWLC()
2068{
2069 stop_processor();
2070 finalize();
2071} /* ~RGWLC() */
11fdf7f2
TL
2072
2073namespace rgw::lc {
2074
e306af50
TL
2075int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store,
2076 const RGWBucketInfo& bucket_info,
11fdf7f2
TL
2077 const map<std::string,bufferlist>& battrs)
2078{
2079 if (auto aiter = battrs.find(RGW_ATTR_LC);
2080 aiter == battrs.end()) {
2081 return 0; // No entry, nothing to fix
2082 }
2083
2084 auto shard_name = get_lc_shard_name(bucket_info.bucket);
2085 std::string lc_oid;
2086 get_lc_oid(store->ctx(), shard_name, &lc_oid);
2087
f6b5b4d7 2088 cls_rgw_lc_entry entry;
11fdf7f2
TL
2089 // There are multiple cases we need to encounter here
2090 // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
2091 // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
2092 // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
2093 // We are not dropping the old marker here as that would be caught by the next LC process update
9f95a23c 2094 auto lc_pool_ctx = store->getRados()->get_lc_pool_ctx();
11fdf7f2
TL
2095 int ret = cls_rgw_lc_get_entry(*lc_pool_ctx,
2096 lc_oid, shard_name, entry);
2097 if (ret == 0) {
2098 ldout(store->ctx(), 5) << "Entry already exists, nothing to do" << dendl;
2099 return ret; // entry is already existing correctly set to marker
2100 }
2101 ldout(store->ctx(), 5) << "cls_rgw_lc_get_entry errored ret code=" << ret << dendl;
2102 if (ret == -ENOENT) {
2103 ldout(store->ctx(), 1) << "No entry for bucket=" << bucket_info.bucket.name
2104 << " creating " << dendl;
2105 // TODO: we have too many ppl making cookies like this!
2106 char cookie_buf[COOKIE_LEN + 1];
2107 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
2108 std::string cookie = cookie_buf;
2109
e306af50
TL
2110 ret = guard_lc_modify(
2111 store, bucket_info.bucket, cookie,
f6b5b4d7
TL
2112 [&lc_pool_ctx, &lc_oid](librados::IoCtx* ctx,
2113 const string& oid,
2114 const cls_rgw_lc_entry& entry) {
e306af50
TL
2115 return cls_rgw_lc_set_entry(*lc_pool_ctx, lc_oid, entry);
2116 });
11fdf7f2
TL
2117
2118 }
2119
2120 return ret;
2121}
2122
9f95a23c
TL
2123std::string s3_expiration_header(
2124 DoutPrefixProvider* dpp,
2125 const rgw_obj_key& obj_key,
2126 const RGWObjTags& obj_tagset,
2127 const ceph::real_time& mtime,
2128 const std::map<std::string, buffer::list>& bucket_attrs)
2129{
2130 CephContext* cct = dpp->get_cct();
2131 RGWLifecycleConfiguration config(cct);
2132 std::string hdr{""};
2133
2134 const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
2135 if (aiter == bucket_attrs.end())
2136 return hdr;
2137
2138 bufferlist::const_iterator iter{&aiter->second};
2139 try {
2140 config.decode(iter);
2141 } catch (const buffer::error& e) {
2142 ldpp_dout(dpp, 0) << __func__
2143 << "() decode life cycle config failed"
2144 << dendl;
2145 return hdr;
2146 } /* catch */
2147
2148 /* dump tags at debug level 16 */
2149 RGWObjTags::tag_map_t obj_tag_map = obj_tagset.get_tags();
2150 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 16)) {
2151 for (const auto& elt : obj_tag_map) {
2152 ldout(cct, 16) << __func__
2153 << "() key=" << elt.first << " val=" << elt.second
2154 << dendl;
2155 }
2156 }
2157
2158 boost::optional<ceph::real_time> expiration_date;
2159 boost::optional<std::string> rule_id;
2160
2161 const auto& rule_map = config.get_rule_map();
2162 for (const auto& ri : rule_map) {
2163 const auto& rule = ri.second;
2164 auto& id = rule.get_id();
2165 auto& prefix = rule.get_prefix();
2166 auto& filter = rule.get_filter();
2167 auto& expiration = rule.get_expiration();
2168 auto& noncur_expiration = rule.get_noncur_expiration();
2169
2170 ldpp_dout(dpp, 10) << "rule: " << ri.first
2171 << " prefix: " << prefix
2172 << " expiration: "
2173 << " date: " << expiration.get_date()
2174 << " days: " << expiration.get_days()
2175 << " noncur_expiration: "
2176 << " date: " << noncur_expiration.get_date()
2177 << " days: " << noncur_expiration.get_days()
2178 << dendl;
2179
2180 /* skip if rule !enabled
2181 * if rule has prefix, skip iff object !match prefix
2182 * if rule has tags, skip iff object !match tags
2183 * note if object is current or non-current, compare accordingly
2184 * if rule has days, construct date expression and save iff older
2185 * than last saved
2186 * if rule has date, convert date expression and save iff older
2187 * than last saved
2188 * if the date accum has a value, format it into hdr
2189 */
2190
2191 if (! rule.is_enabled())
2192 continue;
2193
2194 if(! prefix.empty()) {
2195 if (! boost::starts_with(obj_key.name, prefix))
2196 continue;
2197 }
2198
2199 if (filter.has_tags()) {
2200 bool tag_match = false;
2201 const RGWObjTags& rule_tagset = filter.get_tags();
2202 for (auto& tag : rule_tagset.get_tags()) {
2203 /* remember, S3 tags are {key,value} tuples */
2204 auto ma1 = obj_tag_map.find(tag.first);
2205 if ( ma1 != obj_tag_map.end()) {
2206 if (tag.second == ma1->second) {
2207 ldpp_dout(dpp, 10) << "tag match obj_key=" << obj_key
2208 << " rule_id=" << id
2209 << " tag=" << tag
2210 << " (ma=" << *ma1 << ")"
2211 << dendl;
2212 tag_match = true;
2213 break;
2214 }
2215 }
2216 }
2217 if (! tag_match)
2218 continue;
2219 }
2220
2221 // compute a uniform expiration date
2222 boost::optional<ceph::real_time> rule_expiration_date;
2223 const LCExpiration& rule_expiration =
2224 (obj_key.instance.empty()) ? expiration : noncur_expiration;
2225
2226 if (rule_expiration.has_date()) {
2227 rule_expiration_date =
2228 boost::optional<ceph::real_time>(
2229 ceph::from_iso_8601(rule.get_expiration().get_date()));
2230 } else {
2231 if (rule_expiration.has_days()) {
2232 rule_expiration_date =
2233 boost::optional<ceph::real_time>(
2234 mtime + make_timespan(rule_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
2235 }
2236 }
2237
2238 // update earliest expiration
2239 if (rule_expiration_date) {
2240 if ((! expiration_date) ||
2241 (*expiration_date > *rule_expiration_date)) {
2242 expiration_date =
2243 boost::optional<ceph::real_time>(rule_expiration_date);
2244 rule_id = boost::optional<std::string>(id);
2245 }
2246 }
2247 }
2248
2249 // cond format header
2250 if (expiration_date && rule_id) {
2251 // Fri, 23 Dec 2012 00:00:00 GMT
2252 char exp_buf[100];
2253 time_t exp = ceph::real_clock::to_time_t(*expiration_date);
2254 if (std::strftime(exp_buf, sizeof(exp_buf),
2255 "%a, %d %b %Y %T %Z", std::gmtime(&exp))) {
2256 hdr = fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf,
2257 *rule_id);
2258 } else {
2259 ldpp_dout(dpp, 0) << __func__ <<
2260 "() strftime of life cycle expiration header failed"
2261 << dendl;
2262 }
2263 }
2264
2265 return hdr;
2266
2267} /* rgwlc_s3_expiration_header */
2268
f6b5b4d7
TL
2269bool s3_multipart_abort_header(
2270 DoutPrefixProvider* dpp,
2271 const rgw_obj_key& obj_key,
2272 const ceph::real_time& mtime,
2273 const std::map<std::string, buffer::list>& bucket_attrs,
2274 ceph::real_time& abort_date,
2275 std::string& rule_id)
2276{
2277 CephContext* cct = dpp->get_cct();
2278 RGWLifecycleConfiguration config(cct);
2279
2280 const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
2281 if (aiter == bucket_attrs.end())
2282 return false;
2283
2284 bufferlist::const_iterator iter{&aiter->second};
2285 try {
2286 config.decode(iter);
2287 } catch (const buffer::error& e) {
2288 ldpp_dout(dpp, 0) << __func__
2289 << "() decode life cycle config failed"
2290 << dendl;
2291 return false;
2292 } /* catch */
2293
2294 std::optional<ceph::real_time> abort_date_tmp;
2295 std::optional<std::string_view> rule_id_tmp;
2296 const auto& rule_map = config.get_rule_map();
2297 for (const auto& ri : rule_map) {
2298 const auto& rule = ri.second;
2299 const auto& id = rule.get_id();
2300 const auto& filter = rule.get_filter();
2301 const auto& prefix = filter.has_prefix()?filter.get_prefix():rule.get_prefix();
2302 const auto& mp_expiration = rule.get_mp_expiration();
2303 if (!rule.is_enabled()) {
2304 continue;
2305 }
2306 if(!prefix.empty() && !boost::starts_with(obj_key.name, prefix)) {
2307 continue;
2308 }
2309
2310 std::optional<ceph::real_time> rule_abort_date;
2311 if (mp_expiration.has_days()) {
2312 rule_abort_date = std::optional<ceph::real_time>(
2313 mtime + make_timespan(mp_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
2314 }
2315
2316 // update earliest abort date
2317 if (rule_abort_date) {
2318 if ((! abort_date_tmp) ||
2319 (*abort_date_tmp > *rule_abort_date)) {
2320 abort_date_tmp =
2321 std::optional<ceph::real_time>(rule_abort_date);
2322 rule_id_tmp = std::optional<std::string_view>(id);
2323 }
2324 }
2325 }
2326 if (abort_date_tmp && rule_id_tmp) {
2327 abort_date = *abort_date_tmp;
2328 rule_id = *rule_id_tmp;
2329 return true;
2330 } else {
2331 return false;
2332 }
2333}
2334
9f95a23c 2335} /* namespace rgw::lc */