]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_lc.cc
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / rgw / rgw_lc.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
#include <string.h>
#include <algorithm>
#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <tuple>

#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/variant.hpp>

#include "include/scope_guard.h"
#include "include/function2.hpp"
#include "common/Formatter.h"
#include "common/containers.h"
#include "common/split.h"
#include "common/errno.h"
#include "include/random.h"
#include "cls/lock/cls_lock_client.h"
#include "rgw_perf_counters.h"
#include "rgw_common.h"
#include "rgw_bucket.h"
#include "rgw_lc.h"
#include "rgw_zone.h"
#include "rgw_string.h"
#include "rgw_multi.h"
#include "rgw_sal.h"
#include "rgw_lc_tier.h"
#include "rgw_notify.h"

#include "fmt/format.h"

#include "services/svc_sys_obj.h"
#include "services/svc_zone.h"
#include "services/svc_tier_rados.h"
40
41 #define dout_context g_ceph_context
42 #define dout_subsys ceph_subsys_rgw
43
44 using namespace std;
45
/* Human-readable names for lifecycle shard-entry states; indexed by
 * the numeric status value stored in an LCEntry, so the order here
 * must match those status values. */
const char* LC_STATUS[] = {
      "UNINITIAL",
      "PROCESSING",
      "FAILED",
      "COMPLETE"
};
52
53 using namespace librados;
54
55 bool LCRule::valid() const
56 {
57 if (id.length() > MAX_ID_LEN) {
58 return false;
59 }
60 else if(expiration.empty() && noncur_expiration.empty() &&
61 mp_expiration.empty() && !dm_expiration &&
62 transitions.empty() && noncur_transitions.empty()) {
63 return false;
64 }
65 else if (!expiration.valid() || !noncur_expiration.valid() ||
66 !mp_expiration.valid()) {
67 return false;
68 }
69 if (!transitions.empty()) {
70 bool using_days = expiration.has_days();
71 bool using_date = expiration.has_date();
72 for (const auto& elem : transitions) {
73 if (!elem.second.valid()) {
74 return false;
75 }
76 using_days = using_days || elem.second.has_days();
77 using_date = using_date || elem.second.has_date();
78 if (using_days && using_date) {
79 return false;
80 }
81 }
82 }
83 for (const auto& elem : noncur_transitions) {
84 if (!elem.second.valid()) {
85 return false;
86 }
87 }
88
89 return true;
90 }
91
92 void LCRule::init_simple_days_rule(std::string_view _id,
93 std::string_view _prefix, int num_days)
94 {
95 id = _id;
96 prefix = _prefix;
97 char buf[32];
98 snprintf(buf, sizeof(buf), "%d", num_days);
99 expiration.set_days(buf);
100 set_enabled(true);
101 }
102
103 void RGWLifecycleConfiguration::add_rule(const LCRule& rule)
104 {
105 auto& id = rule.get_id(); // note that this will return false for groups, but that's ok, we won't search groups
106 rule_map.insert(pair<string, LCRule>(id, rule));
107 }
108
109 bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule)
110 {
111 lc_op op(rule.get_id());
112 op.status = rule.is_enabled();
113 if (rule.get_expiration().has_days()) {
114 op.expiration = rule.get_expiration().get_days();
115 }
116 if (rule.get_expiration().has_date()) {
117 op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date());
118 }
119 if (rule.get_noncur_expiration().has_days()) {
120 op.noncur_expiration = rule.get_noncur_expiration().get_days();
121 }
122 if (rule.get_mp_expiration().has_days()) {
123 op.mp_expiration = rule.get_mp_expiration().get_days();
124 }
125 op.dm_expiration = rule.get_dm_expiration();
126 for (const auto &elem : rule.get_transitions()) {
127 transition_action action;
128 if (elem.second.has_days()) {
129 action.days = elem.second.get_days();
130 } else {
131 action.date = ceph::from_iso_8601(elem.second.get_date());
132 }
133 action.storage_class
134 = rgw_placement_rule::get_canonical_storage_class(elem.first);
135 op.transitions.emplace(elem.first, std::move(action));
136 }
137 for (const auto &elem : rule.get_noncur_transitions()) {
138 transition_action action;
139 action.days = elem.second.get_days();
140 action.date = ceph::from_iso_8601(elem.second.get_date());
141 action.storage_class
142 = rgw_placement_rule::get_canonical_storage_class(elem.first);
143 op.noncur_transitions.emplace(elem.first, std::move(action));
144 }
145 std::string prefix;
146 if (rule.get_filter().has_prefix()){
147 prefix = rule.get_filter().get_prefix();
148 } else {
149 prefix = rule.get_prefix();
150 }
151 if (rule.get_filter().has_tags()){
152 op.obj_tags = rule.get_filter().get_tags();
153 }
154 op.rule_flags = rule.get_filter().get_flags();
155 prefix_map.emplace(std::move(prefix), std::move(op));
156 return true;
157 }
158
159 int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule)
160 {
161 if (!rule.valid()) {
162 return -EINVAL;
163 }
164 auto& id = rule.get_id();
165 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
166 return -EINVAL;
167 }
168 if (rule.get_filter().has_tags() && (rule.get_dm_expiration() ||
169 !rule.get_mp_expiration().empty())) {
170 return -ERR_INVALID_REQUEST;
171 }
172 rule_map.insert(pair<string, LCRule>(id, rule));
173
174 if (!_add_rule(rule)) {
175 return -ERR_INVALID_REQUEST;
176 }
177 return 0;
178 }
179
180 bool RGWLifecycleConfiguration::has_same_action(const lc_op& first,
181 const lc_op& second) {
182 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
183 (second.expiration > 0 || second.expiration_date != boost::none)) {
184 return true;
185 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
186 return true;
187 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
188 return true;
189 } else if (!first.transitions.empty() && !second.transitions.empty()) {
190 for (auto &elem : first.transitions) {
191 if (second.transitions.find(elem.first) != second.transitions.end()) {
192 return true;
193 }
194 }
195 } else if (!first.noncur_transitions.empty() &&
196 !second.noncur_transitions.empty()) {
197 for (auto &elem : first.noncur_transitions) {
198 if (second.noncur_transitions.find(elem.first) !=
199 second.noncur_transitions.end()) {
200 return true;
201 }
202 }
203 }
204 return false;
205 }
206
/* Formerly, this method checked for duplicate rules using an invalid
 * method (prefix uniqueness). */
bool RGWLifecycleConfiguration::valid()
{
  // structural validation now happens per-rule in check_and_add_rule()
  return true;
}
213
/* Background thread loop: whenever should_work() permits, run a full
 * lifecycle pass over all buckets, then sleep until the next
 * scheduled start time or until shutdown is requested. */
void *RGWLC::LCWorker::entry() {
  do {
    std::unique_ptr<rgw::sal::Bucket> all_buckets; // empty restriction
    utime_t start = ceph_clock_now();
    if (should_work(start)) {
      ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
      int r = lc->process(this, all_buckets, false /* once */);
      if (r < 0) {
	ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
			  << r << dendl;
      }
      ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
      cloud_targets.clear(); // clear cloud targets
    }
    if (lc->going_down())
      break;

    /* compute the delay until the next pass and wait it out (the
     * condvar lets shutdown interrupt the sleep) */
    utime_t end = ceph_clock_now();
    int secs = schedule_next_start_time(start, end);
    utime_t next;
    next.set_from_double(end + secs);

    ldpp_dout(dpp, 5) << "schedule life cycle next start time: "
		      << rgw_to_asctime(next) << dendl;

    std::unique_lock l{lock};
    cond.wait_for(l, std::chrono::seconds(secs));
  } while (!lc->going_down());

  return NULL;
}
245
246 void RGWLC::initialize(CephContext *_cct, rgw::sal::Driver* _driver) {
247 cct = _cct;
248 driver = _driver;
249 sal_lc = driver->get_lifecycle();
250 max_objs = cct->_conf->rgw_lc_max_objs;
251 if (max_objs > HASH_PRIME)
252 max_objs = HASH_PRIME;
253
254 obj_names = new string[max_objs];
255
256 for (int i = 0; i < max_objs; i++) {
257 obj_names[i] = lc_oid_prefix;
258 char buf[32];
259 snprintf(buf, 32, ".%d", i);
260 obj_names[i].append(buf);
261 }
262
263 #define COOKIE_LEN 16
264 char cookie_buf[COOKIE_LEN + 1];
265 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
266 cookie = cookie_buf;
267 }
268
/* Release the per-shard oid array allocated by initialize(). */
void RGWLC::finalize()
{
  delete[] obj_names;
}
273
274 static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) {
275 os << "<ent: bucket=";
276 os << ent.get_bucket();
277 os << "; start_time=";
278 os << rgw_to_asctime(utime_t(time_t(ent.get_start_time()), 0));
279 os << "; status=";
280 os << LC_STATUS[ent.get_status()];
281 os << ">";
282 return os;
283 }
284
/* Return true when an object with modification time @mtime has passed
 * its @days retention window; optionally reports the computed expiry
 * timestamp via @expire_time.  With rgw_lc_debug_interval set, each
 * interval-second period is treated as one day for testing. */
static bool obj_has_expired(const DoutPrefixProvider *dpp, CephContext *cct, ceph::real_time mtime, int days,
			    ceph::real_time *expire_time = nullptr)
{
  double timediff, cmp;
  utime_t base_time;
  if (cct->_conf->rgw_lc_debug_interval <= 0) {
    /* Normal case, run properly */
    cmp = double(days)*24*60*60;
    // rounding to day boundary matches S3's midnight-UTC expiration
    base_time = ceph_clock_now().round_to_day();
  } else {
    /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
    cmp = double(days)*cct->_conf->rgw_lc_debug_interval;
    base_time = ceph_clock_now();
  }
  auto tt_mtime = ceph::real_clock::to_time_t(mtime);
  timediff = base_time - tt_mtime;

  if (expire_time) {
    *expire_time = mtime + make_timespan(cmp);
  }

  ldpp_dout(dpp, 20) << __func__
		     << "(): mtime=" << mtime << " days=" << days
		     << " base_time=" << base_time << " timediff=" << timediff
		     << " cmp=" << cmp
		     << " is_expired=" << (timediff >= cmp)
		     << dendl;

  return (timediff >= cmp);
}
315
316 static bool pass_object_lock_check(rgw::sal::Driver* driver, rgw::sal::Object* obj, const DoutPrefixProvider *dpp)
317 {
318 if (!obj->get_bucket()->get_info().obj_lock_enabled()) {
319 return true;
320 }
321 std::unique_ptr<rgw::sal::Object::ReadOp> read_op = obj->get_read_op();
322 int ret = read_op->prepare(null_yield, dpp);
323 if (ret < 0) {
324 if (ret == -ENOENT) {
325 return true;
326 } else {
327 return false;
328 }
329 } else {
330 auto iter = obj->get_attrs().find(RGW_ATTR_OBJECT_RETENTION);
331 if (iter != obj->get_attrs().end()) {
332 RGWObjectRetention retention;
333 try {
334 decode(retention, iter->second);
335 } catch (buffer::error& err) {
336 ldpp_dout(dpp, 0) << "ERROR: failed to decode RGWObjectRetention"
337 << dendl;
338 return false;
339 }
340 if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
341 ceph_clock_now()) {
342 return false;
343 }
344 }
345 iter = obj->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD);
346 if (iter != obj->get_attrs().end()) {
347 RGWObjectLegalHold obj_legal_hold;
348 try {
349 decode(obj_legal_hold, iter->second);
350 } catch (buffer::error& err) {
351 ldpp_dout(dpp, 0) << "ERROR: failed to decode RGWObjectLegalHold"
352 << dendl;
353 return false;
354 }
355 if (obj_legal_hold.is_enabled()) {
356 return false;
357 }
358 }
359 return true;
360 }
361 }
362
363 class LCObjsLister {
364 rgw::sal::Driver* driver;
365 rgw::sal::Bucket* bucket;
366 rgw::sal::Bucket::ListParams list_params;
367 rgw::sal::Bucket::ListResults list_results;
368 string prefix;
369 vector<rgw_bucket_dir_entry>::iterator obj_iter;
370 rgw_bucket_dir_entry pre_obj;
371 int64_t delay_ms;
372
373 public:
374 LCObjsLister(rgw::sal::Driver* _driver, rgw::sal::Bucket* _bucket) :
375 driver(_driver), bucket(_bucket) {
376 list_params.list_versions = bucket->versioned();
377 list_params.allow_unordered = true;
378 delay_ms = driver->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
379 }
380
381 void set_prefix(const string& p) {
382 prefix = p;
383 list_params.prefix = prefix;
384 }
385
386 int init(const DoutPrefixProvider *dpp) {
387 return fetch(dpp);
388 }
389
390 int fetch(const DoutPrefixProvider *dpp) {
391 int ret = bucket->list(dpp, list_params, 1000, list_results, null_yield);
392 if (ret < 0) {
393 return ret;
394 }
395
396 obj_iter = list_results.objs.begin();
397
398 return 0;
399 }
400
401 void delay() {
402 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
403 }
404
405 bool get_obj(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry **obj,
406 std::function<void(void)> fetch_barrier
407 = []() { /* nada */}) {
408 if (obj_iter == list_results.objs.end()) {
409 if (!list_results.is_truncated) {
410 delay();
411 return false;
412 } else {
413 fetch_barrier();
414 list_params.marker = pre_obj.key;
415 int ret = fetch(dpp);
416 if (ret < 0) {
417 ldpp_dout(dpp, 0) << "ERROR: list_op returned ret=" << ret
418 << dendl;
419 return false;
420 }
421 }
422 delay();
423 }
424 /* returning address of entry in objs */
425 *obj = &(*obj_iter);
426 return obj_iter != list_results.objs.end();
427 }
428
429 rgw_bucket_dir_entry get_prev_obj() {
430 return pre_obj;
431 }
432
433 void next() {
434 pre_obj = *obj_iter;
435 ++obj_iter;
436 }
437
438 boost::optional<std::string> next_key_name() {
439 if (obj_iter == list_results.objs.end() ||
440 (obj_iter + 1) == list_results.objs.end()) {
441 /* this should have been called after get_obj() was called, so this should
442 * only happen if is_truncated is false */
443 return boost::none;
444 }
445
446 return ((obj_iter + 1)->key.name);
447 }
448
449 }; /* LCObjsLister */
450
/* Shared environment for all work items generated while applying one
 * flattened rule (lc_op) to one bucket during a lifecycle pass. */
struct op_env {

  using LCWorker = RGWLC::LCWorker;

  lc_op op;                 // the flattened rule being applied
  rgw::sal::Driver* driver;
  LCWorker* worker;
  rgw::sal::Bucket* bucket;
  LCObjsLister& ol;         // shared lister driving the bucket scan

  op_env(lc_op& _op, rgw::sal::Driver* _driver, LCWorker* _worker,
	 rgw::sal::Bucket* _bucket, LCObjsLister& _ol)
    : op(_op), driver(_driver), worker(_worker), bucket(_bucket),
      ol(_ol) {}
}; /* op_env */
466
467 class LCRuleOp;
468 class WorkQ;
469
470 struct lc_op_ctx {
471 CephContext *cct;
472 op_env env;
473 rgw_bucket_dir_entry o;
474 boost::optional<std::string> next_key_name;
475 ceph::real_time effective_mtime;
476
477 rgw::sal::Driver* driver;
478 rgw::sal::Bucket* bucket;
479 lc_op& op; // ok--refers to expanded env.op
480 LCObjsLister& ol;
481
482 std::unique_ptr<rgw::sal::Object> obj;
483 RGWObjectCtx rctx;
484 const DoutPrefixProvider *dpp;
485 WorkQ* wq;
486
487 std::unique_ptr<rgw::sal::PlacementTier> tier;
488
489 lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
490 boost::optional<std::string> next_key_name,
491 ceph::real_time effective_mtime,
492 const DoutPrefixProvider *dpp, WorkQ* wq)
493 : cct(env.driver->ctx()), env(env), o(o), next_key_name(next_key_name),
494 effective_mtime(effective_mtime),
495 driver(env.driver), bucket(env.bucket), op(env.op), ol(env.ol),
496 rctx(env.driver), dpp(dpp), wq(wq)
497 {
498 obj = bucket->get_object(o.key);
499 }
500
501 bool next_has_same_name(const std::string& key_name) {
502 return (next_key_name && key_name.compare(
503 boost::get<std::string>(next_key_name)) == 0);
504 }
505
506 }; /* lc_op_ctx */
507
508
/* fixed sender id and request id used when publishing lifecycle
 * bucket notifications (see remove_expired_obj) */
static std::string lc_id = "rgw lifecycle";
static std::string lc_req_id = "0";
511
512 static int remove_expired_obj(
513 const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed,
514 rgw::notify::EventType event_type)
515 {
516 auto& driver = oc.driver;
517 auto& bucket_info = oc.bucket->get_info();
518 auto& o = oc.o;
519 auto obj_key = o.key;
520 auto& meta = o.meta;
521 int ret;
522 std::string version_id;
523 std::unique_ptr<rgw::sal::Notification> notify;
524
525 if (!remove_indeed) {
526 obj_key.instance.clear();
527 } else if (obj_key.instance.empty()) {
528 obj_key.instance = "null";
529 }
530
531 std::unique_ptr<rgw::sal::Bucket> bucket;
532 std::unique_ptr<rgw::sal::Object> obj;
533
534 ret = driver->get_bucket(nullptr, bucket_info, &bucket);
535 if (ret < 0) {
536 return ret;
537 }
538
539 // XXXX currently, rgw::sal::Bucket.owner is always null here
540 std::unique_ptr<rgw::sal::User> user;
541 if (! bucket->get_owner()) {
542 auto& bucket_info = bucket->get_info();
543 user = driver->get_user(bucket_info.owner);
544 // forgive me, lord
545 if (user) {
546 bucket->set_owner(user.get());
547 }
548 }
549
550 obj = bucket->get_object(obj_key);
551
552 RGWObjState* obj_state{nullptr};
553 ret = obj->get_obj_state(dpp, &obj_state, null_yield, true);
554 if (ret < 0) {
555 return ret;
556 }
557
558 std::unique_ptr<rgw::sal::Object::DeleteOp> del_op
559 = obj->get_delete_op();
560 del_op->params.versioning_status
561 = obj->get_bucket()->get_info().versioning_status();
562 del_op->params.obj_owner.set_id(rgw_user {meta.owner});
563 del_op->params.obj_owner.set_name(meta.owner_display_name);
564 del_op->params.bucket_owner.set_id(bucket_info.owner);
565 del_op->params.unmod_since = meta.mtime;
566 del_op->params.marker_version_id = version_id;
567
568 // notification supported only for RADOS driver for now
569 notify = driver->get_notification(dpp, obj.get(), nullptr, event_type,
570 bucket.get(), lc_id,
571 const_cast<std::string&>(oc.bucket->get_tenant()),
572 lc_req_id, null_yield);
573
574 ret = notify->publish_reserve(dpp, nullptr);
575 if ( ret < 0) {
576 ldpp_dout(dpp, 1)
577 << "ERROR: notify reservation failed, deferring delete of object k="
578 << o.key
579 << dendl;
580 return ret;
581 }
582 ret = del_op->delete_obj(dpp, null_yield);
583 if (ret < 0) {
584 ldpp_dout(dpp, 1) <<
585 "ERROR: publishing notification failed, with error: " << ret << dendl;
586 } else {
587 // send request to notification manager
588 (void) notify->publish_commit(dpp, obj_state->size,
589 ceph::real_clock::now(),
590 obj_state->attrset[RGW_ATTR_ETAG].to_str(),
591 version_id);
592 }
593
594 return ret;
595
596 } /* remove_expired_obj */
597
/* Base class for lifecycle actions (expiration, transition, ...). */
class LCOpAction {
public:
  virtual ~LCOpAction() {}

  /* Return true if this action applies to the object in @oc; when it
   * does, *exp_time receives the computed expiration timestamp. */
  virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) {
    return false;
  }

  /* called after check(). Check should tell us whether this action
   * is applicable. If there are multiple actions, we'll end up executing
   * the latest applicable action
   * For example:
   * one action after 10 days, another after 20, third after 40.
   * After 10 days, the latest applicable action would be the first one,
   * after 20 days it will be the second one. After 21 days it will still be the
   * second one. So check() should return true for the second action at that
   * point, but should_process() can return false if the action has already
   * been applied. In object removal it doesn't matter, but in object
   * transition it does.
   */
  virtual bool should_process() {
    return true;
  }

  /* Apply the action to the object in @oc; returns 0 on success. */
  virtual int process(lc_op_ctx& oc) {
    return 0;
  }

  friend class LCOpRule;
}; /* LCOpAction */
627
/* Predicate interface: all of a rule's filters must accept an object
 * before the rule's actions are evaluated. */
class LCOpFilter {
public:
  virtual ~LCOpFilter() {}
  /* Return true when the object in @oc passes this filter. */
  virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) {
    return false;
  }
}; /* LCOpFilter */
635
/* Aggregates the filter and action objects for one rule while
 * scanning a bucket; build()/update()/process() are defined later in
 * this file. */
class LCOpRule {
  friend class LCOpAction;

  op_env env;
  boost::optional<std::string> next_key_name; // successor key in listing, if any
  ceph::real_time effective_mtime;

  std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd
  std::vector<shared_ptr<LCOpAction> > actions;

public:
  LCOpRule(op_env& _env) : env(_env) {}

  boost::optional<std::string> get_next_key_name() {
    return next_key_name;
  }

  std::vector<shared_ptr<LCOpAction>>& get_actions() {
    return actions;
  }

  void build();
  void update();
  int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
	      WorkQ* wq);
}; /* LCOpRule */
662
/* A queue item for the worker pool: the void* alternative acts as a
 * sentinel/default, the tuples carry the rule context plus the entry
 * to act on. */
using WorkItem =
  boost::variant<void*,
		 /* out-of-line delete */
		 std::tuple<LCOpRule, rgw_bucket_dir_entry>,
		 /* uncompleted MPU expiration */
		 std::tuple<lc_op, rgw_bucket_dir_entry>,
		 rgw_bucket_dir_entry>;
670
671 class WorkQ : public Thread
672 {
673 public:
674 using unique_lock = std::unique_lock<std::mutex>;
675 using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
676 using dequeue_result = boost::variant<void*, WorkItem>;
677
678 static constexpr uint32_t FLAG_NONE = 0x0000;
679 static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001;
680 static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002;
681 static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
682
683 private:
684 const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
685 RGWLC::LCWorker* wk;
686 uint32_t qmax;
687 int ix;
688 std::mutex mtx;
689 std::condition_variable cv;
690 uint32_t flags;
691 vector<WorkItem> items;
692 work_f f;
693
694 public:
695 WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
696 : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
697 {
698 create(thr_name().c_str());
699 }
700
701 std::string thr_name() {
702 return std::string{"wp_thrd: "}
703 + std::to_string(wk->ix) + ", " + std::to_string(ix);
704 }
705
706 void setf(work_f _f) {
707 f = _f;
708 }
709
710 void enqueue(WorkItem&& item) {
711 unique_lock uniq(mtx);
712 while ((!wk->get_lc()->going_down()) &&
713 (items.size() > qmax)) {
714 flags |= FLAG_EWAIT_SYNC;
715 cv.wait_for(uniq, 200ms);
716 }
717 items.push_back(item);
718 if (flags & FLAG_DWAIT_SYNC) {
719 flags &= ~FLAG_DWAIT_SYNC;
720 cv.notify_one();
721 }
722 }
723
724 void drain() {
725 unique_lock uniq(mtx);
726 flags |= FLAG_EDRAIN_SYNC;
727 while (flags & FLAG_EDRAIN_SYNC) {
728 cv.wait_for(uniq, 200ms);
729 }
730 }
731
732 private:
733 dequeue_result dequeue() {
734 unique_lock uniq(mtx);
735 while ((!wk->get_lc()->going_down()) &&
736 (items.size() == 0)) {
737 /* clear drain state, as we are NOT doing work and qlen==0 */
738 if (flags & FLAG_EDRAIN_SYNC) {
739 flags &= ~FLAG_EDRAIN_SYNC;
740 }
741 flags |= FLAG_DWAIT_SYNC;
742 cv.wait_for(uniq, 200ms);
743 }
744 if (items.size() > 0) {
745 auto item = items.back();
746 items.pop_back();
747 if (flags & FLAG_EWAIT_SYNC) {
748 flags &= ~FLAG_EWAIT_SYNC;
749 cv.notify_one();
750 }
751 return {item};
752 }
753 return nullptr;
754 }
755
756 void* entry() override {
757 while (!wk->get_lc()->going_down()) {
758 auto item = dequeue();
759 if (item.which() == 0) {
760 /* going down */
761 break;
762 }
763 f(wk, this, boost::get<WorkItem>(item));
764 }
765 return nullptr;
766 }
767 }; /* WorkQ */
768
769 class RGWLC::WorkPool
770 {
771 using TVector = ceph::containers::tiny_vector<WorkQ, 3>;
772 TVector wqs;
773 uint64_t ix;
774
775 public:
776 WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax)
777 : wqs(TVector{
778 n_threads,
779 [&](const size_t ix, auto emplacer) {
780 emplacer.emplace(wk, ix, qmax);
781 }}),
782 ix(0)
783 {}
784
785 ~WorkPool() {
786 for (auto& wq : wqs) {
787 wq.join();
788 }
789 }
790
791 void setf(WorkQ::work_f _f) {
792 for (auto& wq : wqs) {
793 wq.setf(_f);
794 }
795 }
796
797 void enqueue(WorkItem item) {
798 const auto tix = ix;
799 ix = (ix+1) % wqs.size();
800 (wqs[tix]).enqueue(std::move(item));
801 }
802
803 void drain() {
804 for (auto& wq : wqs) {
805 wq.drain();
806 }
807 }
808 }; /* WorkPool */
809
/* Construct one lifecycle worker thread context; the worker-pool
 * thread count comes from rgw_lc_max_wp_worker, queue depth is fixed. */
RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
			  RGWLC *lc, int ix)
  : dpp(dpp), cct(cct), lc(lc), ix(ix)
{
  auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
  workpool = new WorkPool(this, wpw, 512);
}
817
/* A one-shot ("once") run is never cut short; scheduled runs stop as
 * soon as the interval budget deadline @stop_at has passed. */
static inline bool worker_should_stop(time_t stop_at, bool once)
{
  if (once) {
    return false;
  }
  return time(nullptr) > stop_at;
}
822
/*
 * Expire uncompleted multipart uploads in @target per the bucket's
 * prefix rules: list the multipart namespace under each rule prefix
 * and queue every meta entry to the worker pool, whose handler aborts
 * the upload once it is older than the rule's mp_expiration.
 * Returns 0 on success or scheduled early exit, <0 on listing error.
 */
int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
				       const multimap<string, lc_op>& prefix_map,
				       LCWorker* worker, time_t stop_at, bool once)
{
  MultipartMetaFilter mp_filter;
  int ret;
  rgw::sal::Bucket::ListParams params;
  rgw::sal::Bucket::ListResults results;
  auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
  params.list_versions = false;
  /* lifecycle processing does not depend on total order, so can
   * take advantage of unordered listing optimizations--such as
   * operating on one shard at a time */
  params.allow_unordered = true;
  params.ns = RGW_OBJ_NS_MULTIPART;
  params.access_list_filter = &mp_filter;

  /* worker-pool handler: abort the MPU behind the queued entry if it
   * has passed the rule's mp_expiration window */
  auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
    auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
    auto& [rule, obj] = wt;
    if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
      rgw_obj_key key(obj.key);
      std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name);
      int ret = mpu->abort(this, cct);
      if (ret == 0) {
        if (perfcounter) {
          perfcounter->inc(l_rgw_lc_abort_mpu, 1);
        }
      } else {
	/* NO_SUCH_UPLOAD is logged at a quieter level than other errors */
        if (ret == -ERR_NO_SUCH_UPLOAD) {
          ldpp_dout(wk->get_lc(), 5)
	    << "ERROR: abort_multipart_upload failed, ret=" << ret
	    << ", thread:" << wq->thr_name()
	    << ", meta:" << obj.key
	    << dendl;
        } else {
          ldpp_dout(wk->get_lc(), 0)
	    << "ERROR: abort_multipart_upload failed, ret=" << ret
	    << ", thread:" << wq->thr_name()
	    << ", meta:" << obj.key
	    << dendl;
        }
      } /* abort failed */
    } /* expired */
  };

  worker->workpool->setf(pf);

  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
       ++prefix_iter) {

    if (worker_should_stop(stop_at, once)) {
      ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker "
			 << worker->ix
			 << dendl;
      return 0;
    }

    /* only enabled rules with a multipart expiration apply */
    if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
      continue;
    }
    params.prefix = prefix_iter->first;
    do {
      auto offset = 0;
      results.objs.clear();
      ret = target->list(this, params, 1000, results, null_yield);
      if (ret < 0) {
        if (ret == (-ENOENT))
          return 0;
        ldpp_dout(this, 0) << "ERROR: driver->list_objects():" <<dendl;
        return ret;
      }

      for (auto obj_iter = results.objs.begin(); obj_iter != results.objs.end(); ++obj_iter, ++offset) {
	std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
	  {prefix_iter->second, *obj_iter};
	worker->workpool->enqueue(WorkItem{t1});
	if (going_down()) {
	  return 0;
	}
      } /* for objs */

      /* NOTE(review): offset resets every batch, so this budget
       * re-check only fires when a batch size is a multiple of 100 */
      if ((offset % 100) == 0) {
	if (worker_should_stop(stop_at, once)) {
	  ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker "
			     << worker->ix
			     << dendl;
	  return 0;
	}
      }

      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
    } while(results.is_truncated);
  } /* for prefix_map */

  worker->workpool->drain();
  return 0;
} /* RGWLC::handle_multipart_expiration */
921
922 static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, bufferlist& tags_bl)
923 {
924 std::unique_ptr<rgw::sal::Object::ReadOp> rop = obj->get_read_op();
925
926 return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, null_yield);
927 }
928
929 static bool is_valid_op(const lc_op& op)
930 {
931 return (op.status &&
932 (op.expiration > 0
933 || op.expiration_date != boost::none
934 || op.noncur_expiration > 0
935 || op.dm_expiration
936 || !op.transitions.empty()
937 || !op.noncur_transitions.empty()));
938 }
939
940 static bool zone_check(const lc_op& op, rgw::sal::Zone* zone)
941 {
942
943 if (zone->get_tier_type() == "archive") {
944 return (op.rule_flags & uint32_t(LCFlagType::ArchiveZone));
945 } else {
946 return (! (op.rule_flags & uint32_t(LCFlagType::ArchiveZone)));
947 }
948 }
949
950 static inline bool has_all_tags(const lc_op& rule_action,
951 const RGWObjTags& object_tags)
952 {
953 if(! rule_action.obj_tags)
954 return false;
955 if(object_tags.count() < rule_action.obj_tags->count())
956 return false;
957 size_t tag_count = 0;
958 for (const auto& tag : object_tags.get_tags()) {
959 const auto& rule_tags = rule_action.obj_tags->get_tags();
960 const auto& iter = rule_tags.find(tag.first);
961 if(iter == rule_tags.end())
962 continue;
963 if(iter->second == tag.second)
964 {
965 tag_count++;
966 }
967 /* all tags in the rule appear in obj tags */
968 }
969 return tag_count == rule_action.obj_tags->count();
970 }
971
972 static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip)
973 {
974 auto& op = oc.op;
975
976 if (op.obj_tags != boost::none) {
977 *skip = true;
978
979 bufferlist tags_bl;
980 int ret = read_obj_tags(dpp, oc.obj.get(), tags_bl);
981 if (ret < 0) {
982 if (ret != -ENODATA) {
983 ldpp_dout(oc.dpp, 5) << "ERROR: read_obj_tags returned r="
984 << ret << " " << oc.wq->thr_name() << dendl;
985 }
986 return 0;
987 }
988 RGWObjTags dest_obj_tags;
989 try {
990 auto iter = tags_bl.cbegin();
991 dest_obj_tags.decode(iter);
992 } catch (buffer::error& err) {
993 ldpp_dout(oc.dpp,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
994 << oc.wq->thr_name() << dendl;
995 return -EIO;
996 }
997
998 if (! has_all_tags(op, dest_obj_tags)) {
999 ldpp_dout(oc.dpp, 20) << __func__ << "() skipping obj " << oc.obj
1000 << " as tags do not match in rule: "
1001 << op.id << " "
1002 << oc.wq->thr_name() << dendl;
1003 return 0;
1004 }
1005 }
1006 *skip = false;
1007 return 0;
1008 }
1009
1010 class LCOpFilter_Tags : public LCOpFilter {
1011 public:
1012 bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) override {
1013 auto& o = oc.o;
1014
1015 if (o.is_delete_marker()) {
1016 return true;
1017 }
1018
1019 bool skip;
1020
1021 int ret = check_tags(dpp, oc, &skip);
1022 if (ret < 0) {
1023 if (ret == -ENOENT) {
1024 return false;
1025 }
1026 ldpp_dout(oc.dpp, 0) << "ERROR: check_tags on obj=" << oc.obj
1027 << " returned ret=" << ret << " "
1028 << oc.wq->thr_name() << dendl;
1029 return false;
1030 }
1031
1032 return !skip;
1033 };
1034 };
1035
/* Expiration of the current object version (including orphaned
 * delete-marker cleanup) per the rule's Expiration clause. */
class LCOpAction_CurrentExpiration : public LCOpAction {
public:
  LCOpAction_CurrentExpiration(op_env& env) {}

  /* Applicable when @oc.o is the current version and the rule's days-
   * or date-based expiration has elapsed; a delete marker qualifies
   * only when no other version of its key follows in the listing. */
  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
    auto& o = oc.o;
    if (!o.is_current()) {
      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			 << ": not current, skipping "
			 << oc.wq->thr_name() << dendl;
      return false;
    }
    if (o.is_delete_marker()) {
      if (oc.next_key_name) {
	std::string nkn = *oc.next_key_name;
	if (oc.next_has_same_name(o.key.name)) {
	  /* another version of this key follows: keep the marker */
	  ldpp_dout(dpp, 7) << __func__ << "(): dm-check SAME: key=" << o.key
			    << " next_key_name: %%" << nkn << "%% "
			    << oc.wq->thr_name() << dendl;
	  return false;
	} else {
	  /* no sibling version visible: the marker can expire now */
	  ldpp_dout(dpp, 7) << __func__ << "(): dm-check DELE: key=" << o.key
			    << " next_key_name: %%" << nkn << "%% "
			    << oc.wq->thr_name() << dendl;
	  *exp_time = real_clock::now();
	  return true;
	}
      }
      return false;
    }

    auto& mtime = o.meta.mtime;
    bool is_expired;
    auto& op = oc.op;
    if (op.expiration <= 0) {
      if (op.expiration_date == boost::none) {
	ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			   << ": no expiration set in rule, skipping "
			   << oc.wq->thr_name() << dendl;
	return false;
      }
      /* date-based expiration */
      is_expired = ceph_clock_now() >=
	ceph::real_clock::to_time_t(*op.expiration_date);
      *exp_time = *op.expiration_date;
    } else {
      /* days-based expiration, measured from mtime */
      is_expired = obj_has_expired(dpp, oc.cct, mtime, op.expiration, exp_time);
    }

    ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired="
		       << (int)is_expired << " "
		       << oc.wq->thr_name() << dendl;
    return is_expired;
  }

  /* Delete the object (or remove the orphaned delete marker). */
  int process(lc_op_ctx& oc) {
    auto& o = oc.o;
    int r;
    if (o.is_delete_marker()) {
      r = remove_expired_obj(oc.dpp, oc, true,
			     rgw::notify::ObjectExpirationDeleteMarker);
      if (r < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj "
			     << oc.bucket << ":" << o.key
			     << " " << cpp_strerror(r) << " "
			     << oc.wq->thr_name() << dendl;
	return r;
      }
      ldpp_dout(oc.dpp, 2) << "DELETED: current is-dm "
			   << oc.bucket << ":" << o.key
			   << " " << oc.wq->thr_name() << dendl;
    } else {
      /* ! o.is_delete_marker() */
      /* on a versioned bucket remove_indeed is false, so this lays
       * down a delete marker rather than removing data */
      r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(),
			     rgw::notify::ObjectExpirationCurrent);
      if (r < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj "
			     << oc.bucket << ":" << o.key
			     << " " << cpp_strerror(r) << " "
			     << oc.wq->thr_name() << dendl;
	return r;
      }
      if (perfcounter) {
	perfcounter->inc(l_rgw_lc_expire_current, 1);
      }
      ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key
			   << " " << oc.wq->thr_name() << dendl;
    }
    return 0;
  }
};
1126
1127 class LCOpAction_NonCurrentExpiration : public LCOpAction {
1128 protected:
1129 public:
1130 LCOpAction_NonCurrentExpiration(op_env& env)
1131 {}
1132
1133 bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1134 auto& o = oc.o;
1135 if (o.is_current()) {
1136 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
1137 << ": current version, skipping "
1138 << oc.wq->thr_name() << dendl;
1139 return false;
1140 }
1141
1142 int expiration = oc.op.noncur_expiration;
1143 bool is_expired = obj_has_expired(dpp, oc.cct, oc.effective_mtime, expiration,
1144 exp_time);
1145
1146 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired="
1147 << is_expired << " "
1148 << oc.wq->thr_name() << dendl;
1149
1150 return is_expired &&
1151 pass_object_lock_check(oc.driver, oc.obj.get(), dpp);
1152 }
1153
1154 int process(lc_op_ctx& oc) {
1155 auto& o = oc.o;
1156 int r = remove_expired_obj(oc.dpp, oc, true,
1157 rgw::notify::ObjectExpirationNoncurrent);
1158 if (r < 0) {
1159 ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) "
1160 << oc.bucket << ":" << o.key
1161 << " " << cpp_strerror(r)
1162 << " " << oc.wq->thr_name() << dendl;
1163 return r;
1164 }
1165 if (perfcounter) {
1166 perfcounter->inc(l_rgw_lc_expire_noncurrent, 1);
1167 }
1168 ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key
1169 << " (non-current expiration) "
1170 << oc.wq->thr_name() << dendl;
1171 return 0;
1172 }
1173 };
1174
1175 class LCOpAction_DMExpiration : public LCOpAction {
1176 public:
1177 LCOpAction_DMExpiration(op_env& env) {}
1178
1179 bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1180 auto& o = oc.o;
1181 if (!o.is_delete_marker()) {
1182 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
1183 << ": not a delete marker, skipping "
1184 << oc.wq->thr_name() << dendl;
1185 return false;
1186 }
1187 if (oc.next_has_same_name(o.key.name)) {
1188 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
1189 << ": next is same object, skipping "
1190 << oc.wq->thr_name() << dendl;
1191 return false;
1192 }
1193
1194 *exp_time = real_clock::now();
1195
1196 return true;
1197 }
1198
1199 int process(lc_op_ctx& oc) {
1200 auto& o = oc.o;
1201 int r = remove_expired_obj(oc.dpp, oc, true,
1202 rgw::notify::ObjectExpirationDeleteMarker);
1203 if (r < 0) {
1204 ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
1205 << oc.bucket << ":" << o.key
1206 << " " << cpp_strerror(r)
1207 << " " << oc.wq->thr_name()
1208 << dendl;
1209 return r;
1210 }
1211 if (perfcounter) {
1212 perfcounter->inc(l_rgw_lc_expire_dm, 1);
1213 }
1214 ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key
1215 << " (delete marker expiration) "
1216 << oc.wq->thr_name() << dendl;
1217 return 0;
1218 }
1219 };
1220
1221 class LCOpAction_Transition : public LCOpAction {
1222 const transition_action& transition;
1223 bool need_to_process{false};
1224
1225 protected:
1226 virtual bool check_current_state(bool is_current) = 0;
1227 virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0;
1228 public:
1229 LCOpAction_Transition(const transition_action& _transition)
1230 : transition(_transition) {}
1231
1232 bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1233 auto& o = oc.o;
1234
1235 if (o.is_delete_marker()) {
1236 return false;
1237 }
1238
1239 if (!check_current_state(o.is_current())) {
1240 return false;
1241 }
1242
1243 auto mtime = get_effective_mtime(oc);
1244 bool is_expired;
1245 if (transition.days < 0) {
1246 if (transition.date == boost::none) {
1247 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
1248 << ": no transition day/date set in rule, skipping "
1249 << oc.wq->thr_name() << dendl;
1250 return false;
1251 }
1252 is_expired = ceph_clock_now() >=
1253 ceph::real_clock::to_time_t(*transition.date);
1254 *exp_time = *transition.date;
1255 } else {
1256 is_expired = obj_has_expired(dpp, oc.cct, mtime, transition.days, exp_time);
1257 }
1258
1259 ldpp_dout(oc.dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired="
1260 << is_expired << " "
1261 << oc.wq->thr_name() << dendl;
1262
1263 need_to_process =
1264 (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
1265 transition.storage_class);
1266
1267 return is_expired;
1268 }
1269
1270 bool should_process() override {
1271 return need_to_process;
1272 }
1273
1274 int delete_tier_obj(lc_op_ctx& oc) {
1275 int ret = 0;
1276
1277 /* If bucket is versioned, create delete_marker for current version
1278 */
1279 if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
1280 ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
1281 ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
1282 } else {
1283 ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
1284 ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
1285 }
1286 return ret;
1287 }
1288
1289 int transition_obj_to_cloud(lc_op_ctx& oc) {
1290 /* If CurrentVersion object, remove it & create delete marker */
1291 bool delete_object = (!oc.tier->retain_head_object() ||
1292 (oc.o.is_current() && oc.bucket->versioned()));
1293
1294 int ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
1295 oc.env.worker->get_cloud_targets(), oc.cct,
1296 !delete_object, oc.dpp, null_yield);
1297 if (ret < 0) {
1298 return ret;
1299 }
1300
1301 if (delete_object) {
1302 ret = delete_tier_obj(oc);
1303 if (ret < 0) {
1304 ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl;
1305 return ret;
1306 }
1307 }
1308
1309 return 0;
1310 }
1311
1312 int process(lc_op_ctx& oc) {
1313 auto& o = oc.o;
1314 int r;
1315
1316 if (oc.o.meta.category == RGWObjCategory::CloudTiered) {
1317 /* Skip objects which are already cloud tiered. */
1318 ldpp_dout(oc.dpp, 30) << "Object(key:" << oc.o.key << ") is already cloud tiered to cloud-s3 tier: " << oc.o.meta.storage_class << dendl;
1319 return 0;
1320 }
1321
1322 std::string tier_type = "";
1323 rgw::sal::ZoneGroup& zonegroup = oc.driver->get_zone()->get_zonegroup();
1324
1325 rgw_placement_rule target_placement;
1326 target_placement.inherit_from(oc.bucket->get_placement_rule());
1327 target_placement.storage_class = transition.storage_class;
1328
1329 r = zonegroup.get_placement_tier(target_placement, &oc.tier);
1330
1331 if (!r && oc.tier->get_tier_type() == "cloud-s3") {
1332 ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl;
1333 if (!oc.o.is_current() &&
1334 !pass_object_lock_check(oc.driver, oc.obj.get(), oc.dpp)) {
1335 /* Skip objects which has object lock enabled. */
1336 ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is locked. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
1337 return 0;
1338 }
1339
1340 r = transition_obj_to_cloud(oc);
1341 if (r < 0) {
1342 ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")"
1343 << dendl;
1344 return r;
1345 }
1346 } else {
1347 if (!oc.driver->valid_placement(target_placement)) {
1348 ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
1349 << target_placement
1350 << " bucket="<< oc.bucket
1351 << " rule_id=" << oc.op.id
1352 << " " << oc.wq->thr_name() << dendl;
1353 return -EINVAL;
1354 }
1355
1356 int r = oc.obj->transition(oc.bucket, target_placement, o.meta.mtime,
1357 o.versioned_epoch, oc.dpp, null_yield);
1358 if (r < 0) {
1359 ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
1360 << oc.bucket << ":" << o.key
1361 << " -> " << transition.storage_class
1362 << " " << cpp_strerror(r)
1363 << " " << oc.wq->thr_name() << dendl;
1364 return r;
1365 }
1366 }
1367 ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket
1368 << ":" << o.key << " -> "
1369 << transition.storage_class
1370 << " " << oc.wq->thr_name() << dendl;
1371 return 0;
1372 }
1373 };
1374
1375 class LCOpAction_CurrentTransition : public LCOpAction_Transition {
1376 protected:
1377 bool check_current_state(bool is_current) override {
1378 return is_current;
1379 }
1380
1381 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
1382 return oc.o.meta.mtime;
1383 }
1384 public:
1385 LCOpAction_CurrentTransition(const transition_action& _transition)
1386 : LCOpAction_Transition(_transition) {}
1387 int process(lc_op_ctx& oc) {
1388 int r = LCOpAction_Transition::process(oc);
1389 if (r == 0) {
1390 if (perfcounter) {
1391 perfcounter->inc(l_rgw_lc_transition_current, 1);
1392 }
1393 }
1394 return r;
1395 }
1396 };
1397
1398 class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
1399 protected:
1400 bool check_current_state(bool is_current) override {
1401 return !is_current;
1402 }
1403
1404 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
1405 return oc.effective_mtime;
1406 }
1407 public:
1408 LCOpAction_NonCurrentTransition(op_env& env,
1409 const transition_action& _transition)
1410 : LCOpAction_Transition(_transition)
1411 {}
1412 int process(lc_op_ctx& oc) {
1413 int r = LCOpAction_Transition::process(oc);
1414 if (r == 0) {
1415 if (perfcounter) {
1416 perfcounter->inc(l_rgw_lc_transition_noncurrent, 1);
1417 }
1418 }
1419 return r;
1420 }
1421 };
1422
1423 void LCOpRule::build()
1424 {
1425 filters.emplace_back(new LCOpFilter_Tags);
1426
1427 auto& op = env.op;
1428
1429 if (op.expiration > 0 ||
1430 op.expiration_date != boost::none) {
1431 actions.emplace_back(new LCOpAction_CurrentExpiration(env));
1432 }
1433
1434 if (op.dm_expiration) {
1435 actions.emplace_back(new LCOpAction_DMExpiration(env));
1436 }
1437
1438 if (op.noncur_expiration > 0) {
1439 actions.emplace_back(new LCOpAction_NonCurrentExpiration(env));
1440 }
1441
1442 for (auto& iter : op.transitions) {
1443 actions.emplace_back(new LCOpAction_CurrentTransition(iter.second));
1444 }
1445
1446 for (auto& iter : op.noncur_transitions) {
1447 actions.emplace_back(new LCOpAction_NonCurrentTransition(env, iter.second));
1448 }
1449 }
1450
1451 void LCOpRule::update()
1452 {
1453 next_key_name = env.ol.next_key_name();
1454 effective_mtime = env.ol.get_prev_obj().meta.mtime;
1455 }
1456
/* Evaluate this rule against one bucket-index entry: run every action's
 * check(), select the action with the latest expiration time, then --
 * only for the selected action -- run the filters and process().
 * Returns 0 on success or skip, else the action's negative error. */
int LCOpRule::process(rgw_bucket_dir_entry& o,
		      const DoutPrefixProvider *dpp,
		      WorkQ* wq)
{
  lc_op_ctx ctx(env, o, next_key_name, effective_mtime, dpp, wq);
  shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
  real_time exp;

  /* pick the action yielding the max exp time; strict > keeps the
   * earlier action on ties (build() order) */
  for (auto& a : actions) {
    real_time action_exp;

    if (a->check(ctx, &action_exp, dpp)) {
      if (action_exp > exp) {
	exp = action_exp;
	selected = &a;
      }
    }
  }

  if (selected &&
      (*selected)->should_process()) {

    /*
     * Calling filter checks after action checks because
     * all action checks (as they are implemented now) do
     * not access the objects themselves, but return result
     * from info from bucket index listing. The current tags filter
     * check does access the objects, so we avoid unnecessary rados calls
     * having filters check later in the process.
     */

    bool cont = false;
    for (auto& f : filters) {
      if (f->check(dpp, ctx)) {
	cont = true;
	break;
      }
    }

    if (!cont) {
      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			 << ": no rule match, skipping "
			 << wq->thr_name() << dendl;
      return 0;
    }

    int r = (*selected)->process(ctx);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
			<< env.bucket << ":" << o.key
			<< " " << cpp_strerror(r)
			<< " " << wq->thr_name() << dendl;
      return r;
    }
    ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":"
		       << o.key << " " << wq->thr_name() << dendl;
  }

  return 0;

}
1518
/* Apply each valid rule of a bucket's lifecycle configuration to the
 * bucket's objects, fanning work out to the worker's thread pool.
 * shard_id is "tenant:name:marker" (see get_bucket_lc_key()).
 * Returns -ENOENT for a stale entry (bucket marker changed), 0 on
 * success or when the time budget expires, else a negative error. */
int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
			     time_t stop_at, bool once)
{
  RGWLifecycleConfiguration config(cct);
  std::unique_ptr<rgw::sal::Bucket> bucket;
  string no_ns, list_versions; /* NOTE(review): appear unused in this function */
  vector<rgw_bucket_dir_entry> objs;
  vector<std::string> result;
  boost::split(result, shard_id, boost::is_any_of(":"));
  string bucket_tenant = result[0];
  string bucket_name = result[1];
  string bucket_marker = result[2];

  ldpp_dout(this, 5) << "RGWLC::bucket_lc_process ENTER " << bucket_name << dendl;
  if (unlikely(cct->_conf->rgwlc_skip_bucket_step)) {
    return 0;
  }

  int ret = driver->get_bucket(this, nullptr, bucket_tenant, bucket_name, &bucket, null_yield);
  if (ret < 0) {
    ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
		       << " failed" << dendl;
    return ret;
  }

  ret = bucket->load_bucket(this, null_yield);
  if (ret < 0) {
    ldpp_dout(this, 0) << "LC:load_bucket for " << bucket_name
		       << " failed" << dendl;
    return ret;
  }

  /* drain outstanding work items on every exit path */
  auto stack_guard = make_scope_guard(
    [&worker]
      {
	worker->workpool->drain();
      }
    );

  /* stale entry: the bucket was deleted/recreated since being queued */
  if (bucket->get_marker() != bucket_marker) {
    ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
		       << bucket_tenant << ":" << bucket_name
		       << " cur_marker=" << bucket->get_marker()
		       << " orig_marker=" << bucket_marker << dendl;
    return -ENOENT;
  }

  map<string, bufferlist>::iterator aiter
    = bucket->get_attrs().find(RGW_ATTR_LC);
  if (aiter == bucket->get_attrs().end()) {
    ldpp_dout(this, 0) << "WARNING: bucket_attrs.find(RGW_ATTR_LC) failed for "
		       << bucket_name << " (terminates bucket_lc_process(...))"
		       << dendl;
    return 0;
  }

  bufferlist::const_iterator iter{&aiter->second};
  try {
    config.decode(iter);
  } catch (const buffer::error& e) {
    ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed"
		       << dendl;
    return -1;
  }

  /* fetch information for zone checks */
  rgw::sal::Zone* zone = driver->get_zone();

  /* pool work handler: run the captured rule against one listed entry */
  auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
    auto wt =
      boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
    auto& [op_rule, o] = wt;

    ldpp_dout(wk->get_lc(), 20)
      << __func__ << "(): key=" << o.key << wq->thr_name()
      << dendl;
    int ret = op_rule.process(o, wk->dpp, wq);
    if (ret < 0) {
      ldpp_dout(wk->get_lc(), 20)
	<< "ERROR: orule.process() returned ret=" << ret
	<< "thread:" << wq->thr_name()
	<< dendl;
    }
  };
  worker->workpool->setf(pf);

  multimap<string, lc_op>& prefix_map = config.get_prefix_map();
  ldpp_dout(this, 10) << __func__ << "() prefix_map size="
		      << prefix_map.size()
		      << dendl;

  rgw_obj_key pre_marker;
  rgw_obj_key next_marker;
  for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
      ++prefix_iter) {

    if (worker_should_stop(stop_at, once)) {
      ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker "
			 << worker->ix
			 << dendl;
      return 0;
    }

    auto& op = prefix_iter->second;
    if (!is_valid_op(op)) {
      continue;
    }
    ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first
			<< dendl;
    /* if this prefix is an extension of the previous one, resume the
     * listing from the position saved for that previous prefix */
    if (prefix_iter != prefix_map.begin() &&
	(prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
				    prev(prefix_iter)->first) == 0)) {
      next_marker = pre_marker;
    } else {
      pre_marker = next_marker;
    }

    LCObjsLister ol(driver, bucket.get());
    ol.set_prefix(prefix_iter->first);

    /* skip rules not executable in this zone's tier type */
    if (! zone_check(op, zone)) {
      ldpp_dout(this, 7) << "LC rule not executable in " << zone->get_tier_type()
			 << " zone, skipping" << dendl;
      continue;
    }

    ret = ol.init(this);
    if (ret < 0) {
      if (ret == (-ENOENT))
	return 0;
      ldpp_dout(this, 0) << "ERROR: driver->list_objects():" << dendl;
      return ret;
    }

    op_env oenv(op, driver, worker, bucket.get(), ol);
    LCOpRule orule(oenv);
    orule.build(); // why can't ctor do it?
    rgw_bucket_dir_entry* o{nullptr};
    for (auto offset = 0; ol.get_obj(this, &o /* , fetch_barrier */); ++offset, ol.next()) {
      orule.update();
      std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
      worker->workpool->enqueue(WorkItem{t1});
      /* re-check the time budget every 100 objects */
      if ((offset % 100) == 0) {
	if (worker_should_stop(stop_at, once)) {
	  ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker "
			     << worker->ix
			     << dendl;
	  return 0;
	}
      }
    }
    worker->workpool->drain();
  }

  ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
  return ret;
}
1676
/* Bounded retry helper: wait_backoff() polls a barrier predicate up to
 * max_retries times, sleeping between attempts.  n.b.: the sleep grows
 * linearly (sleep_ms * 2 * retry_index), and the first attempt is
 * preceded by a zero-length sleep. */
class SimpleBackoff
{
  const int max_retries;
  std::chrono::milliseconds sleep_ms;
  int retries{0};
public:
  SimpleBackoff(int max_retries, std::chrono::milliseconds initial_sleep_ms)
    : max_retries(max_retries), sleep_ms(initial_sleep_ms)
    {}
  SimpleBackoff(const SimpleBackoff&) = delete;
  SimpleBackoff& operator=(const SimpleBackoff&) = delete;

  /* attempts made so far by the current/last wait_backoff() call */
  int get_retries() const {
    return retries;
  }

  void reset() {
    retries = 0;
  }

  /* re-evaluate barrier until it returns true or max_retries is
   * exhausted; returns the final barrier result (false on exhaustion) */
  bool wait_backoff(const fu2::unique_function<bool(void) const>& barrier) {
    reset();
    while (retries < max_retries) {
      auto r = barrier();
      if (r) {
	return r;
      }
      /* sleeps 0, 2*sleep_ms, 4*sleep_ms, ... between attempts */
      std::this_thread::sleep_for(sleep_ms * 2 * retries++);
    }
    return false;
  }
};
1709
/* Finalize a bucket entry after processing: under the shard lock,
 * remove the entry when the bucket no longer exists (result == -ENOENT)
 * or persist its final status (lc_failed / lc_complete).  Retries the
 * lock indefinitely with a 5s sleep; returns 0 once done. */
int RGWLC::bucket_lc_post(int index, int max_lock_sec,
			  rgw::sal::Lifecycle::LCEntry& entry, int& result,
			  LCWorker* worker)
{
  utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);

  std::unique_ptr<rgw::sal::LCSerializer> lock =
    sal_lc->get_serializer(lc_index_lock_name, obj_names[index], cookie);

  ldpp_dout(this, 5) << "RGWLC::bucket_lc_post(): POST " << entry
		     << " index: " << index << " worker ix: " << worker->ix
		     << dendl;

  do {
    int ret = lock->try_lock(this, lock_duration, null_yield);
    if (ret == -EBUSY || ret == -EEXIST) {
      /* already locked by another lc processor */
      ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
			 << obj_names[index] << ", sleep 5, try again " << dendl;
      sleep(5);
      continue;
    }

    /* any other lock failure: give up quietly */
    if (ret < 0)
      return 0;
    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
			<< dendl;

    if (result == -ENOENT) {
      /* XXXX are we SURE the only way result could == ENOENT is when
       * there is no such bucket? It is currently the value returned
       * from bucket_lc_process(...) */
      ret = sal_lc->rm_entry(obj_names[index], entry);
      if (ret < 0) {
	ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
			   << obj_names[index] << dendl;
      }
      goto clean;
    } else if (result < 0) {
      entry.set_status(lc_failed);
    } else {
      entry.set_status(lc_complete);
    }

    ret = sal_lc->set_entry(obj_names[index], entry);
    if (ret < 0) {
      ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
			 << obj_names[index] << dendl;
    }
clean:
    lock->unlock();
    ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
			<< obj_names[index] << dendl;
    return 0;
  } while (true);
} /* RGWLC::bucket_lc_post */
1766
/* Collect up to max_entries lc entries across shards, starting at
 * (index, marker); both are updated in place so the caller can page
 * through results.  Returns 0, or a negative error from list_entries()
 * (missing shard objects are skipped). */
int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
			    vector<std::unique_ptr<rgw::sal::Lifecycle::LCEntry>>& progress_map,
			    int& index)
{
  progress_map.clear();
  /* marker applies only to the first shard; reset for subsequent ones */
  for(; index < max_objs; index++, marker="") {
    vector<std::unique_ptr<rgw::sal::Lifecycle::LCEntry>> entries;
    int ret = sal_lc->list_entries(obj_names[index], marker, max_entries, entries);
    if (ret < 0) {
      if (ret == -ENOENT) {
	ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
			    << obj_names[index] << dendl;
	continue;
      } else {
	return ret;
      }
    }
    progress_map.reserve(progress_map.size() + entries.size());
    std::move(begin(entries), end(entries), std::back_inserter(progress_map));
    //progress_map.insert(progress_map.end(), entries.begin(), entries.end());

    /* update index, marker tuple */
    if (progress_map.size() > 0)
      marker = progress_map.back()->get_bucket();

    if (progress_map.size() >= max_entries)
      break;
  }
  return 0;
}
1797
/* Return a uniformly shuffled permutation of the integers [0, n). */
static inline std::vector<int> random_sequence(uint32_t n)
{
  std::vector<int> seq;
  seq.reserve(n);
  for (uint32_t i = 0; i < n; ++i) {
    seq.push_back(static_cast<int>(i));
  }
  std::random_device rd;
  std::default_random_engine rng{rd()};
  std::shuffle(seq.begin(), seq.end(), rng);
  return seq;
}
1810
1811 static inline int get_lc_index(CephContext *cct,
1812 const std::string& shard_id)
1813 {
1814 int max_objs =
1815 (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
1816 cct->_conf->rgw_lc_max_objs);
1817 /* n.b. review hash algo */
1818 int index = ceph_str_hash_linux(shard_id.c_str(),
1819 shard_id.size()) % HASH_PRIME % max_objs;
1820 return index;
1821 }
1822
1823 static inline void get_lc_oid(CephContext *cct,
1824 const std::string& shard_id, string *oid)
1825 {
1826 /* n.b. review hash algo */
1827 int index = get_lc_index(cct, shard_id);
1828 *oid = lc_oid_prefix;
1829 char buf[32];
1830 snprintf(buf, 32, ".%d", index);
1831 oid->append(buf);
1832 return;
1833 }
1834
/* Compose the lc shard-entry key for a bucket: "tenant:name:marker"
 * (parsed back apart in bucket_lc_process()). */
static std::string get_bucket_lc_key(const rgw_bucket& bucket){
  return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
}
1838
1839 int RGWLC::process(LCWorker* worker,
1840 const std::unique_ptr<rgw::sal::Bucket>& optional_bucket,
1841 bool once = false)
1842 {
1843 int ret = 0;
1844 int max_secs = cct->_conf->rgw_lc_lock_max_time;
1845
1846 if (optional_bucket) {
1847 /* if a bucket is provided, this is a single-bucket run, and
1848 * can be processed without traversing any state entries (we
1849 * do need the entry {pro,epi}logue which update the state entry
1850 * for this bucket) */
1851 auto bucket_lc_key = get_bucket_lc_key(optional_bucket->get_key());
1852 auto index = get_lc_index(driver->ctx(), bucket_lc_key);
1853 ret = process_bucket(index, max_secs, worker, bucket_lc_key, once);
1854 return ret;
1855 } else {
1856 /* generate an index-shard sequence unrelated to any other
1857 * that might be running in parallel */
1858 std::string all_buckets{""};
1859 vector<int> shard_seq = random_sequence(max_objs);
1860 for (auto index : shard_seq) {
1861 ret = process(index, max_secs, worker, once);
1862 if (ret < 0)
1863 return ret;
1864 }
1865 }
1866
1867 return 0;
1868 }
1869
1870 bool RGWLC::expired_session(time_t started)
1871 {
1872 if (! cct->_conf->rgwlc_auto_session_clear) {
1873 return false;
1874 }
1875
1876 time_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
1877 ? cct->_conf->rgw_lc_debug_interval
1878 : 24*60*60;
1879
1880 auto now = time(nullptr);
1881
1882 ldpp_dout(this, 16) << "RGWLC::expired_session"
1883 << " started: " << started
1884 << " interval: " << interval << "(*2==" << 2*interval << ")"
1885 << " now: " << now
1886 << dendl;
1887
1888 return (started + 2*interval < now);
1889 }
1890
1891 time_t RGWLC::thread_stop_at()
1892 {
1893 uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
1894 ? cct->_conf->rgw_lc_debug_interval
1895 : 24*60*60;
1896
1897 return time(nullptr) + interval;
1898 }
1899
1900 int RGWLC::process_bucket(int index, int max_lock_secs, LCWorker* worker,
1901 const std::string& bucket_entry_marker,
1902 bool once = false)
1903 {
1904 ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: "
1905 << "index: " << index << " worker ix: " << worker->ix
1906 << dendl;
1907
1908 int ret = 0;
1909 std::unique_ptr<rgw::sal::LCSerializer> serializer =
1910 sal_lc->get_serializer(lc_index_lock_name, obj_names[index],
1911 worker->thr_name());
1912 std::unique_ptr<rgw::sal::Lifecycle::LCEntry> entry;
1913 if (max_lock_secs <= 0) {
1914 return -EAGAIN;
1915 }
1916
1917 utime_t time(max_lock_secs, 0);
1918 ret = serializer->try_lock(this, time, null_yield);
1919 if (ret == -EBUSY || ret == -EEXIST) {
1920 /* already locked by another lc processor */
1921 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
1922 << obj_names[index] << dendl;
1923 return -EBUSY;
1924 }
1925 if (ret < 0)
1926 return 0;
1927
1928 std::unique_lock<rgw::sal::LCSerializer> lock(
1929 *(serializer.get()), std::adopt_lock);
1930
1931 ret = sal_lc->get_entry(obj_names[index], bucket_entry_marker, &entry);
1932 if (ret >= 0) {
1933 if (entry->get_status() == lc_processing) {
1934 if (expired_session(entry->get_start_time())) {
1935 ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry
1936 << " index: " << index << " worker ix: " << worker->ix
1937 << " (clearing)"
1938 << dendl;
1939 } else {
1940 ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: "
1941 << entry
1942 << " index: " << index
1943 << " worker ix: " << worker->ix
1944 << dendl;
1945 return ret;
1946 }
1947 }
1948 }
1949
1950 /* do nothing if no bucket */
1951 if (entry->get_bucket().empty()) {
1952 return ret;
1953 }
1954
1955 ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry
1956 << " index: " << index << " worker ix: " << worker->ix
1957 << dendl;
1958
1959 entry->set_status(lc_processing);
1960 ret = sal_lc->set_entry(obj_names[index], *entry);
1961 if (ret < 0) {
1962 ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry "
1963 << obj_names[index] << entry->get_bucket() << entry->get_status()
1964 << dendl;
1965 return ret;
1966 }
1967
1968 ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry
1969 << " index: " << index << " worker ix: " << worker->ix
1970 << dendl;
1971
1972 lock.unlock();
1973 ret = bucket_lc_process(entry->get_bucket(), worker, thread_stop_at(), once);
1974 bucket_lc_post(index, max_lock_secs, *entry, ret, worker);
1975
1976 return ret;
1977 } /* RGWLC::process_bucket */
1978
1979 static inline bool allow_shard_rollover(CephContext* cct, time_t now, time_t shard_rollover_date)
1980 {
1981 /* return true iff:
1982 * - non-debug scheduling is in effect, and
1983 * - the current shard has not rolled over in the last 24 hours
1984 */
1985 if (((shard_rollover_date < now) &&
1986 (now - shard_rollover_date > 24*60*60)) ||
1987 (! shard_rollover_date /* no rollover date stored */) ||
1988 (cct->_conf->rgw_lc_debug_interval > 0 /* defaults to -1 == disabled */)) {
1989 return true;
1990 }
1991 return false;
1992 } /* allow_shard_rollover */
1993
/* Return true when a run stamped start_date already counts for the
 * current scheduling window: within rgw_lc_debug_interval seconds in
 * debug mode, else when less than 24h have passed since the local
 * midnight of start_date's day. */
static inline bool already_run_today(CephContext* cct, time_t start_date)
{
  struct tm bdt;
  time_t begin_of_day;
  utime_t now = ceph_clock_now();
  localtime_r(&start_date, &bdt);

  if (cct->_conf->rgw_lc_debug_interval > 0) {
    if (now - start_date < cct->_conf->rgw_lc_debug_interval)
      return true;
    else
      return false;
  }

  /* truncate start_date to its local midnight */
  bdt.tm_hour = 0;
  bdt.tm_min = 0;
  bdt.tm_sec = 0;
  begin_of_day = mktime(&bdt);
  if (now - begin_of_day < 24*60*60)
    return true;
  else
    return false;
} /* already_run_today */
2017
2018 inline int RGWLC::advance_head(const std::string& lc_shard,
2019 rgw::sal::Lifecycle::LCHead& head,
2020 rgw::sal::Lifecycle::LCEntry& entry,
2021 time_t start_date)
2022 {
2023 int ret{0};
2024 std::unique_ptr<rgw::sal::Lifecycle::LCEntry> next_entry;
2025
2026 ret = sal_lc->get_next_entry(lc_shard, entry.get_bucket(), &next_entry);
2027 if (ret < 0) {
2028 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
2029 << lc_shard << dendl;
2030 goto exit;
2031 }
2032
2033 /* save the next position */
2034 head.set_marker(next_entry->get_bucket());
2035 head.set_start_date(start_date);
2036
2037 ret = sal_lc->put_head(lc_shard, head);
2038 if (ret < 0) {
2039 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2040 << lc_shard
2041 << dendl;
2042 goto exit;
2043 }
2044 exit:
2045 return ret;
2046 } /* advance head */
2047
2048 int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
2049 bool once = false)
2050 {
2051 int ret{0};
2052 const auto& lc_shard = obj_names[index];
2053
2054 std::unique_ptr<rgw::sal::Lifecycle::LCHead> head;
2055 std::unique_ptr<rgw::sal::Lifecycle::LCEntry> entry; //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
2056
2057 ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
2058 << "index: " << index << " worker ix: " << worker->ix
2059 << dendl;
2060
2061 std::unique_ptr<rgw::sal::LCSerializer> lock =
2062 sal_lc->get_serializer(lc_index_lock_name, lc_shard, worker->thr_name());
2063
2064 utime_t lock_for_s(max_lock_secs, 0);
2065 const auto& lock_lambda = [&]() {
2066 ret = lock->try_lock(this, lock_for_s, null_yield);
2067 if (ret == 0) {
2068 return true;
2069 }
2070 if (ret == -EBUSY || ret == -EEXIST) {
2071 /* already locked by another lc processor */
2072 return false;
2073 }
2074 return false;
2075 };
2076
2077 SimpleBackoff shard_lock(5 /* max retries */, 50ms);
2078 if (! shard_lock.wait_backoff(lock_lambda)) {
2079 ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on "
2080 << lc_shard << " after " << shard_lock.get_retries()
2081 << dendl;
2082 return 0;
2083 }
2084
2085 do {
2086 utime_t now = ceph_clock_now();
2087
2088 /* preamble: find an inital bucket/marker */
2089 ret = sal_lc->get_head(lc_shard, &head);
2090 if (ret < 0) {
2091 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
2092 << lc_shard << ", ret=" << ret << dendl;
2093 goto exit;
2094 }
2095
2096 /* if there is nothing at head, try to reinitialize head.marker with the
2097 * first entry in the queue */
2098 if (head->get_marker().empty() &&
2099 allow_shard_rollover(cct, now, head->get_shard_rollover_date()) /* prevent multiple passes by diff.
2100 * rgws,in same cycle */) {
2101
2102 ldpp_dout(this, 5) << "RGWLC::process() process shard rollover lc_shard=" << lc_shard
2103 << " head.marker=" << head->get_marker()
2104 << " head.shard_rollover_date=" << head->get_shard_rollover_date()
2105 << dendl;
2106
2107 vector<std::unique_ptr<rgw::sal::Lifecycle::LCEntry>> entries;
2108 int ret = sal_lc->list_entries(lc_shard, head->get_marker(), 1, entries);
2109 if (ret < 0) {
2110 ldpp_dout(this, 0) << "RGWLC::process() sal_lc->list_entries(lc_shard, head.marker, 1, "
2111 << "entries) returned error ret==" << ret << dendl;
2112 goto exit;
2113 }
2114 if (entries.size() > 0) {
2115 entry = std::move(entries.front());
2116 head->set_marker(entry->get_bucket());
2117 head->set_start_date(now);
2118 head->set_shard_rollover_date(0);
2119 }
2120 } else {
2121 ldpp_dout(this, 0) << "RGWLC::process() head.marker !empty() at START for shard=="
2122 << lc_shard << " head last stored at "
2123 << rgw_to_asctime(utime_t(time_t(head->get_start_date()), 0))
2124 << dendl;
2125
2126 /* fetches the entry pointed to by head.bucket */
2127 ret = sal_lc->get_entry(lc_shard, head->get_marker(), &entry);
2128 if (ret < 0) {
2129 ldpp_dout(this, 0) << "RGWLC::process() sal_lc->get_entry(lc_shard, head.marker, entry) "
2130 << "returned error ret==" << ret << dendl;
2131 goto exit;
2132 }
2133 }
2134
2135 if (entry && !entry->get_bucket().empty()) {
2136 if (entry->get_status() == lc_processing) {
2137 if (expired_session(entry->get_start_time())) {
2138 ldpp_dout(this, 5)
2139 << "RGWLC::process(): STALE lc session found for: " << entry
2140 << " index: " << index << " worker ix: " << worker->ix
2141 << " (clearing)" << dendl;
2142 } else {
2143 ldpp_dout(this, 5)
2144 << "RGWLC::process(): ACTIVE entry: " << entry
2145 << " index: " << index << " worker ix: " << worker->ix << dendl;
2146 /* skip to next entry */
2147 if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
2148 goto exit;
2149 }
2150 /* done with this shard */
2151 if (head->get_marker().empty()) {
2152 ldpp_dout(this, 5) <<
2153 "RGWLC::process() cycle finished lc_shard="
2154 << lc_shard
2155 << dendl;
2156 head->set_shard_rollover_date(ceph_clock_now());
2157 ret = sal_lc->put_head(lc_shard, *head.get());
2158 if (ret < 0) {
2159 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2160 << lc_shard
2161 << dendl;
2162 }
2163 goto exit;
2164 }
2165 continue;
2166 }
2167 } else {
2168 if ((entry->get_status() == lc_complete) &&
2169 already_run_today(cct, entry->get_start_time())) {
2170 /* skip to next entry */
2171 if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
2172 goto exit;
2173 }
2174 ldpp_dout(this, 5) << "RGWLC::process() worker ix; " << worker->ix
2175 << " SKIP processing for already-processed bucket " << entry->get_bucket()
2176 << dendl;
2177 /* done with this shard */
2178 if (head->get_marker().empty()) {
2179 ldpp_dout(this, 5) <<
2180 "RGWLC::process() cycle finished lc_shard="
2181 << lc_shard
2182 << dendl;
2183 head->set_shard_rollover_date(ceph_clock_now());
2184 ret = sal_lc->put_head(lc_shard, *head.get());
2185 if (ret < 0) {
2186 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2187 << lc_shard
2188 << dendl;
2189 }
2190 goto exit;
2191 }
2192 continue;
2193 }
2194 }
2195 } else {
2196 ldpp_dout(this, 5) << "RGWLC::process() entry.bucket.empty() == true at START 1"
2197 << " (this is possible mainly before any lc policy has been stored"
2198 << " or after removal of an lc_shard object)"
2199 << dendl;
2200 goto exit;
2201 }
2202
2203 /* When there are no more entries to process, entry will be
2204 * equivalent to an empty marker and so the following resets the
2205 * processing for the shard automatically when processing is
2206 * finished for the shard */
2207 ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
2208 << " index: " << index << " worker ix: " << worker->ix
2209 << dendl;
2210
2211 entry->set_status(lc_processing);
2212 entry->set_start_time(now);
2213
2214 ret = sal_lc->set_entry(lc_shard, *entry);
2215 if (ret < 0) {
2216 ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
2217 << lc_shard << entry->get_bucket() << entry->get_status() << dendl;
2218 goto exit;
2219 }
2220
2221 /* advance head for next waiter, then process */
2222 if (advance_head(lc_shard, *head.get(), *entry.get(), now) < 0) {
2223 goto exit;
2224 }
2225
2226 ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
2227 << " index: " << index << " worker ix: " << worker->ix
2228 << dendl;
2229
2230 /* drop lock so other instances can make progress while this
2231 * bucket is being processed */
2232 lock->unlock();
2233 ret = bucket_lc_process(entry->get_bucket(), worker, thread_stop_at(), once);
2234
2235 /* postamble */
2236 //bucket_lc_post(index, max_lock_secs, entry, ret, worker);
2237 if (! shard_lock.wait_backoff(lock_lambda)) {
2238 ldpp_dout(this, 0) << "RGWLC::process(): failed to aquire lock on "
2239 << lc_shard << " after " << shard_lock.get_retries()
2240 << dendl;
2241 return 0;
2242 }
2243
2244 if (ret == -ENOENT) {
2245 /* XXXX are we SURE the only way result could == ENOENT is when
2246 * there is no such bucket? It is currently the value returned
2247 * from bucket_lc_process(...) */
2248 ret = sal_lc->rm_entry(lc_shard, *entry);
2249 if (ret < 0) {
2250 ldpp_dout(this, 0) << "RGWLC::process() failed to remove entry "
2251 << lc_shard << " (nonfatal)"
2252 << dendl;
2253 /* not fatal, could result from a race */
2254 }
2255 } else {
2256 if (ret < 0) {
2257 entry->set_status(lc_failed);
2258 } else {
2259 entry->set_status(lc_complete);
2260 }
2261 ret = sal_lc->set_entry(lc_shard, *entry);
2262 if (ret < 0) {
2263 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
2264 << lc_shard
2265 << dendl;
2266 /* fatal, locked */
2267 goto exit;
2268 }
2269 }
2270
2271 /* done with this shard */
2272 if (head->get_marker().empty()) {
2273 ldpp_dout(this, 5) <<
2274 "RGWLC::process() cycle finished lc_shard="
2275 << lc_shard
2276 << dendl;
2277 head->set_shard_rollover_date(ceph_clock_now());
2278 ret = sal_lc->put_head(lc_shard, *head.get());
2279 if (ret < 0) {
2280 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2281 << lc_shard
2282 << dendl;
2283 }
2284 goto exit;
2285 }
2286 } while(1 && !once && !going_down());
2287
2288 exit:
2289 lock->unlock();
2290 return 0;
2291 }
2292
2293 void RGWLC::start_processor()
2294 {
2295 auto maxw = cct->_conf->rgw_lc_max_worker;
2296 workers.reserve(maxw);
2297 for (int ix = 0; ix < maxw; ++ix) {
2298 auto worker =
2299 std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix);
2300 worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
2301 workers.emplace_back(std::move(worker));
2302 }
2303 }
2304
2305 void RGWLC::stop_processor()
2306 {
2307 down_flag = true;
2308 for (auto& worker : workers) {
2309 worker->stop();
2310 worker->join();
2311 }
2312 workers.clear();
2313 }
2314
// Log subsystem used to tag this provider's output (ceph_subsys_rgw,
// via the dout_subsys macro defined at the top of this file).
unsigned RGWLC::get_subsys() const
{
  return dout_subsys;
}
2319
2320 std::ostream& RGWLC::gen_prefix(std::ostream& out) const
2321 {
2322 return out << "lifecycle: ";
2323 }
2324
2325 void RGWLC::LCWorker::stop()
2326 {
2327 std::lock_guard l{lock};
2328 cond.notify_all();
2329 }
2330
// True once stop_processor() has set down_flag, i.e. shutdown has been
// requested and worker loops should exit.
bool RGWLC::going_down()
{
  return down_flag;
}
2335
2336 bool RGWLC::LCWorker::should_work(utime_t& now)
2337 {
2338 int start_hour;
2339 int start_minute;
2340 int end_hour;
2341 int end_minute;
2342 string worktime = cct->_conf->rgw_lifecycle_work_time;
2343 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute,
2344 &end_hour, &end_minute);
2345 struct tm bdt;
2346 time_t tt = now.sec();
2347 localtime_r(&tt, &bdt);
2348
2349 if (cct->_conf->rgw_lc_debug_interval > 0) {
2350 /* We're debugging, so say we can run */
2351 return true;
2352 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
2353 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
2354 return true;
2355 } else {
2356 return false;
2357 }
2358
2359 }
2360
2361 int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
2362 {
2363 int secs;
2364
2365 if (cct->_conf->rgw_lc_debug_interval > 0) {
2366 secs = start + cct->_conf->rgw_lc_debug_interval - now;
2367 if (secs < 0)
2368 secs = 0;
2369 return (secs);
2370 }
2371
2372 int start_hour;
2373 int start_minute;
2374 int end_hour;
2375 int end_minute;
2376 string worktime = cct->_conf->rgw_lifecycle_work_time;
2377 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour,
2378 &end_minute);
2379 struct tm bdt;
2380 time_t tt = now.sec();
2381 time_t nt;
2382 localtime_r(&tt, &bdt);
2383 bdt.tm_hour = start_hour;
2384 bdt.tm_min = start_minute;
2385 bdt.tm_sec = 0;
2386 nt = mktime(&bdt);
2387 secs = nt - tt;
2388
2389 return secs>0 ? secs : secs+24*60*60;
2390 }
2391
/* Worker teardown: release the work pool.
 * NOTE(review): `workpool` is a raw owning pointer; a unique_ptr
 * member would express ownership in the type — the declaration is
 * outside this file view, so it is left as-is here. */
RGWLC::LCWorker::~LCWorker()
{
  delete workpool;
} /* ~LCWorker */
2396
// Produce a minimal instance for encode/decode round-trip testing
// (ceph-dencoder).  Ownership of the raw pointer transfers to the
// caller, per the generate_test_instances convention.
void RGWLifecycleConfiguration::generate_test_instances(
  list<RGWLifecycleConfiguration*>& o)
{
  o.push_back(new RGWLifecycleConfiguration);
}
2402
/* Run functor `f` (a set/remove of this bucket's lc shard entry)
 * under the lc index lock of the shard that owns `bucket`.
 *
 * A fresh LCEntry keyed by the bucket's lc key, in lc_uninitial
 * state, is passed to `f` along with the shard oid.  The lock is
 * retried for up to ~50s (500 retries x 100ms sleeps) because the
 * typical S3 client times out at 60s and saving a lifecycle policy
 * has been reported to starve against the lc processing threads.
 *
 * Returns the result of f(), or the negative lock error if the lock
 * could never be acquired.  The lock is always released on exit.
 */
template<typename F>
static int guard_lc_modify(const DoutPrefixProvider *dpp,
                           rgw::sal::Driver* driver,
                           rgw::sal::Lifecycle* sal_lc,
                           const rgw_bucket& bucket, const string& cookie,
                           const F& f) {
  CephContext *cct = driver->ctx();

  /* map the bucket onto its lc shard object name */
  auto bucket_lc_key = get_bucket_lc_key(bucket);
  string oid;
  get_lc_oid(cct, bucket_lc_key, &oid);

  /* XXX it makes sense to take shard_id for a bucket_id? */
  std::unique_ptr<rgw::sal::Lifecycle::LCEntry> entry = sal_lc->get_entry();
  entry->set_bucket(bucket_lc_key);
  entry->set_status(lc_uninitial);
  int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;

  std::unique_ptr<rgw::sal::LCSerializer> lock =
    sal_lc->get_serializer(lc_index_lock_name, oid, cookie);
  utime_t time(max_lock_secs, 0);

  int ret;
  uint16_t retries{0};

  // due to reports of starvation trying to save lifecycle policy, try hard
  do {
    ret = lock->try_lock(dpp, time, null_yield);
    if (ret == -EBUSY || ret == -EEXIST) {
      /* contended: back off briefly, then retry unless the budget is
       * exhausted (in which case we fall through to the error path) */
      ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
          << oid << ", retry in 100ms, ret=" << ret << dendl;
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
      // the typical S3 client will time out in 60s
      if(retries++ < 500) {
        continue;
      }
    }
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
          << oid << ", ret=" << ret << dendl;
      break;
    }
    /* lock held: apply the caller's mutation exactly once */
    ret = f(sal_lc, oid, *entry.get());
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to set entry on "
          << oid << ", ret=" << ret << dendl;
    }
    break;
  } while(true);
  lock->unlock();
  return ret;
}
2455
2456 int RGWLC::set_bucket_config(rgw::sal::Bucket* bucket,
2457 const rgw::sal::Attrs& bucket_attrs,
2458 RGWLifecycleConfiguration *config)
2459 {
2460 int ret{0};
2461 rgw::sal::Attrs attrs = bucket_attrs;
2462 if (config) {
2463 /* if no RGWLifecycleconfiguration provided, it means
2464 * RGW_ATTR_LC is already valid and present */
2465 bufferlist lc_bl;
2466 config->encode(lc_bl);
2467 attrs[RGW_ATTR_LC] = std::move(lc_bl);
2468
2469 ret =
2470 bucket->merge_and_store_attrs(this, attrs, null_yield);
2471 if (ret < 0) {
2472 return ret;
2473 }
2474 }
2475
2476 rgw_bucket& b = bucket->get_key();
2477
2478
2479 ret = guard_lc_modify(this, driver, sal_lc.get(), b, cookie,
2480 [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
2481 rgw::sal::Lifecycle::LCEntry& entry) {
2482 return sal_lc->set_entry(oid, entry);
2483 });
2484
2485 return ret;
2486 }
2487
2488 int RGWLC::remove_bucket_config(rgw::sal::Bucket* bucket,
2489 const rgw::sal::Attrs& bucket_attrs,
2490 bool merge_attrs)
2491 {
2492 rgw::sal::Attrs attrs = bucket_attrs;
2493 rgw_bucket& b = bucket->get_key();
2494 int ret{0};
2495
2496 if (merge_attrs) {
2497 attrs.erase(RGW_ATTR_LC);
2498 ret = bucket->merge_and_store_attrs(this, attrs, null_yield);
2499
2500 if (ret < 0) {
2501 ldpp_dout(this, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
2502 << b.name << " returned err=" << ret << dendl;
2503 return ret;
2504 }
2505 }
2506
2507 ret = guard_lc_modify(this, driver, sal_lc.get(), b, cookie,
2508 [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
2509 rgw::sal::Lifecycle::LCEntry& entry) {
2510 return sal_lc->rm_entry(oid, entry);
2511 });
2512
2513 return ret;
2514 } /* RGWLC::remove_bucket_config */
2515
/* Shut down: stop and join all worker threads, then release LC
 * state via finalize(). */
RGWLC::~RGWLC()
{
  stop_processor();
  finalize();
} /* ~RGWLC() */
2521
2522 namespace rgw::lc {
2523
/* Ensure the lc shard index has an entry for `bucket` when the bucket
 * carries an RGW_ATTR_LC attribute — the entry can go missing after a
 * reshard.  No-op when the bucket has no lifecycle attr or the entry
 * already exists; recreates the entry (under the shard lock) when
 * get_entry returns -ENOENT.  Returns 0 on success or no-op, <0 on
 * error. */
int fix_lc_shard_entry(const DoutPrefixProvider *dpp,
                       rgw::sal::Driver* driver,
                       rgw::sal::Lifecycle* sal_lc,
                       rgw::sal::Bucket* bucket)
{
  if (auto aiter = bucket->get_attrs().find(RGW_ATTR_LC);
      aiter == bucket->get_attrs().end()) {
    return 0;  // No entry, nothing to fix
  }

  auto bucket_lc_key = get_bucket_lc_key(bucket->get_key());
  std::string lc_oid;
  get_lc_oid(driver->ctx(), bucket_lc_key, &lc_oid);

  std::unique_ptr<rgw::sal::Lifecycle::LCEntry> entry;
  // There are multiple cases we need to encounter here
  // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
  // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
  // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
  // We are not dropping the old marker here as that would be caught by the next LC process update
  int ret = sal_lc->get_entry(lc_oid, bucket_lc_key, &entry);
  if (ret == 0) {
    ldpp_dout(dpp, 5) << "Entry already exists, nothing to do" << dendl;
    return ret;  // entry is already existing correctly set to marker
  }
  ldpp_dout(dpp, 5) << "lc_get_entry errored ret code=" << ret << dendl;
  if (ret == -ENOENT) {
    // NOTE(review): `bucket` is a pointer here; this log line streams
    // it directly — verify rgw::sal::Bucket has a stream operator,
    // otherwise this prints an address.
    ldpp_dout(dpp, 1) << "No entry for bucket=" << bucket
                      << " creating " << dendl;
    // TODO: we have too many ppl making cookies like this!
    char cookie_buf[COOKIE_LEN + 1];
    gen_rand_alphanumeric(driver->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
    std::string cookie = cookie_buf;

    // recreate the shard entry under the shard's index lock; the
    // lambda ignores guard_lc_modify's oid argument and writes to the
    // lc_oid computed above (they are derived from the same key)
    ret = guard_lc_modify(dpp,
                          driver, sal_lc, bucket->get_key(), cookie,
                          [&lc_oid](rgw::sal::Lifecycle* slc,
                                    const string& oid,
                                    rgw::sal::Lifecycle::LCEntry& entry) {
                            return slc->set_entry(lc_oid, entry);
                          });

  }

  return ret;
}
2570
2571 std::string s3_expiration_header(
2572 DoutPrefixProvider* dpp,
2573 const rgw_obj_key& obj_key,
2574 const RGWObjTags& obj_tagset,
2575 const ceph::real_time& mtime,
2576 const std::map<std::string, buffer::list>& bucket_attrs)
2577 {
2578 CephContext* cct = dpp->get_cct();
2579 RGWLifecycleConfiguration config(cct);
2580 std::string hdr{""};
2581
2582 const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
2583 if (aiter == bucket_attrs.end())
2584 return hdr;
2585
2586 bufferlist::const_iterator iter{&aiter->second};
2587 try {
2588 config.decode(iter);
2589 } catch (const buffer::error& e) {
2590 ldpp_dout(dpp, 0) << __func__
2591 << "() decode life cycle config failed"
2592 << dendl;
2593 return hdr;
2594 } /* catch */
2595
2596 /* dump tags at debug level 16 */
2597 RGWObjTags::tag_map_t obj_tag_map = obj_tagset.get_tags();
2598 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 16)) {
2599 for (const auto& elt : obj_tag_map) {
2600 ldpp_dout(dpp, 16) << __func__
2601 << "() key=" << elt.first << " val=" << elt.second
2602 << dendl;
2603 }
2604 }
2605
2606 boost::optional<ceph::real_time> expiration_date;
2607 boost::optional<std::string> rule_id;
2608
2609 const auto& rule_map = config.get_rule_map();
2610 for (const auto& ri : rule_map) {
2611 const auto& rule = ri.second;
2612 auto& id = rule.get_id();
2613 auto& filter = rule.get_filter();
2614 auto& prefix = filter.has_prefix() ? filter.get_prefix(): rule.get_prefix();
2615 auto& expiration = rule.get_expiration();
2616 auto& noncur_expiration = rule.get_noncur_expiration();
2617
2618 ldpp_dout(dpp, 10) << "rule: " << ri.first
2619 << " prefix: " << prefix
2620 << " expiration: "
2621 << " date: " << expiration.get_date()
2622 << " days: " << expiration.get_days()
2623 << " noncur_expiration: "
2624 << " date: " << noncur_expiration.get_date()
2625 << " days: " << noncur_expiration.get_days()
2626 << dendl;
2627
2628 /* skip if rule !enabled
2629 * if rule has prefix, skip iff object !match prefix
2630 * if rule has tags, skip iff object !match tags
2631 * note if object is current or non-current, compare accordingly
2632 * if rule has days, construct date expression and save iff older
2633 * than last saved
2634 * if rule has date, convert date expression and save iff older
2635 * than last saved
2636 * if the date accum has a value, format it into hdr
2637 */
2638
2639 if (! rule.is_enabled())
2640 continue;
2641
2642 if(! prefix.empty()) {
2643 if (! boost::starts_with(obj_key.name, prefix))
2644 continue;
2645 }
2646
2647 if (filter.has_tags()) {
2648 bool tag_match = false;
2649 const RGWObjTags& rule_tagset = filter.get_tags();
2650 for (auto& tag : rule_tagset.get_tags()) {
2651 /* remember, S3 tags are {key,value} tuples */
2652 tag_match = true;
2653 auto obj_tag = obj_tag_map.find(tag.first);
2654 if (obj_tag == obj_tag_map.end() || obj_tag->second != tag.second) {
2655 ldpp_dout(dpp, 10) << "tag does not match obj_key=" << obj_key
2656 << " rule_id=" << id
2657 << " tag=" << tag
2658 << dendl;
2659 tag_match = false;
2660 break;
2661 }
2662 }
2663 if (! tag_match)
2664 continue;
2665 }
2666
2667 // compute a uniform expiration date
2668 boost::optional<ceph::real_time> rule_expiration_date;
2669 const LCExpiration& rule_expiration =
2670 (obj_key.instance.empty()) ? expiration : noncur_expiration;
2671
2672 if (rule_expiration.has_date()) {
2673 rule_expiration_date =
2674 boost::optional<ceph::real_time>(
2675 ceph::from_iso_8601(rule.get_expiration().get_date()));
2676 } else {
2677 if (rule_expiration.has_days()) {
2678 rule_expiration_date =
2679 boost::optional<ceph::real_time>(
2680 mtime + make_timespan(double(rule_expiration.get_days())*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
2681 }
2682 }
2683
2684 // update earliest expiration
2685 if (rule_expiration_date) {
2686 if ((! expiration_date) ||
2687 (*expiration_date > *rule_expiration_date)) {
2688 expiration_date =
2689 boost::optional<ceph::real_time>(rule_expiration_date);
2690 rule_id = boost::optional<std::string>(id);
2691 }
2692 }
2693 }
2694
2695 // cond format header
2696 if (expiration_date && rule_id) {
2697 // Fri, 23 Dec 2012 00:00:00 GMT
2698 char exp_buf[100];
2699 time_t exp = ceph::real_clock::to_time_t(*expiration_date);
2700 if (std::strftime(exp_buf, sizeof(exp_buf),
2701 "%a, %d %b %Y %T %Z", std::gmtime(&exp))) {
2702 hdr = fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf,
2703 *rule_id);
2704 } else {
2705 ldpp_dout(dpp, 0) << __func__ <<
2706 "() strftime of life cycle expiration header failed"
2707 << dendl;
2708 }
2709 }
2710
2711 return hdr;
2712
2713 } /* rgwlc_s3_expiration_header */
2714
2715 bool s3_multipart_abort_header(
2716 DoutPrefixProvider* dpp,
2717 const rgw_obj_key& obj_key,
2718 const ceph::real_time& mtime,
2719 const std::map<std::string, buffer::list>& bucket_attrs,
2720 ceph::real_time& abort_date,
2721 std::string& rule_id)
2722 {
2723 CephContext* cct = dpp->get_cct();
2724 RGWLifecycleConfiguration config(cct);
2725
2726 const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
2727 if (aiter == bucket_attrs.end())
2728 return false;
2729
2730 bufferlist::const_iterator iter{&aiter->second};
2731 try {
2732 config.decode(iter);
2733 } catch (const buffer::error& e) {
2734 ldpp_dout(dpp, 0) << __func__
2735 << "() decode life cycle config failed"
2736 << dendl;
2737 return false;
2738 } /* catch */
2739
2740 std::optional<ceph::real_time> abort_date_tmp;
2741 std::optional<std::string_view> rule_id_tmp;
2742 const auto& rule_map = config.get_rule_map();
2743 for (const auto& ri : rule_map) {
2744 const auto& rule = ri.second;
2745 const auto& id = rule.get_id();
2746 const auto& filter = rule.get_filter();
2747 const auto& prefix = filter.has_prefix()?filter.get_prefix():rule.get_prefix();
2748 const auto& mp_expiration = rule.get_mp_expiration();
2749 if (!rule.is_enabled()) {
2750 continue;
2751 }
2752 if(!prefix.empty() && !boost::starts_with(obj_key.name, prefix)) {
2753 continue;
2754 }
2755
2756 std::optional<ceph::real_time> rule_abort_date;
2757 if (mp_expiration.has_days()) {
2758 rule_abort_date = std::optional<ceph::real_time>(
2759 mtime + make_timespan(mp_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
2760 }
2761
2762 // update earliest abort date
2763 if (rule_abort_date) {
2764 if ((! abort_date_tmp) ||
2765 (*abort_date_tmp > *rule_abort_date)) {
2766 abort_date_tmp =
2767 std::optional<ceph::real_time>(rule_abort_date);
2768 rule_id_tmp = std::optional<std::string_view>(id);
2769 }
2770 }
2771 }
2772 if (abort_date_tmp && rule_id_tmp) {
2773 abort_date = *abort_date_tmp;
2774 rule_id = *rule_id_tmp;
2775 return true;
2776 } else {
2777 return false;
2778 }
2779 }
2780
2781 } /* namespace rgw::lc */
2782
2783 void lc_op::dump(Formatter *f) const
2784 {
2785 f->dump_bool("status", status);
2786 f->dump_bool("dm_expiration", dm_expiration);
2787
2788 f->dump_int("expiration", expiration);
2789 f->dump_int("noncur_expiration", noncur_expiration);
2790 f->dump_int("mp_expiration", mp_expiration);
2791 if (expiration_date) {
2792 utime_t ut(*expiration_date);
2793 f->dump_stream("expiration_date") << ut;
2794 }
2795 if (obj_tags) {
2796 f->dump_object("obj_tags", *obj_tags);
2797 }
2798 f->open_object_section("transitions");
2799 for(auto& [storage_class, transition] : transitions) {
2800 f->dump_object(storage_class, transition);
2801 }
2802 f->close_section();
2803
2804 f->open_object_section("noncur_transitions");
2805 for (auto& [storage_class, transition] : noncur_transitions) {
2806 f->dump_object(storage_class, transition);
2807 }
2808 f->close_section();
2809 }
2810
// Dump the filter criteria (prefix, tags, flags).  The archivezone
// flag is emitted as an empty-valued key only when set.
void LCFilter::dump(Formatter *f) const
{
  f->dump_string("prefix", prefix);
  f->dump_object("obj_tags", obj_tags);
  if (have_flag(LCFlagType::ArchiveZone)) {
    f->dump_string("archivezone", "");
  }
}
2819
// Dump the expiration as stored: days and date are kept as strings,
// and both keys are emitted even when empty.
void LCExpiration::dump(Formatter *f) const
{
  f->dump_string("days", days);
  f->dump_string("date", date);
}
2825
2826 void LCRule::dump(Formatter *f) const
2827 {
2828 f->dump_string("id", id);
2829 f->dump_string("prefix", prefix);
2830 f->dump_string("status", status);
2831 f->dump_object("expiration", expiration);
2832 f->dump_object("noncur_expiration", noncur_expiration);
2833 f->dump_object("mp_expiration", mp_expiration);
2834 f->dump_object("filter", filter);
2835 f->open_object_section("transitions");
2836 for (auto& [storage_class, transition] : transitions) {
2837 f->dump_object(storage_class, transition);
2838 }
2839 f->close_section();
2840
2841 f->open_object_section("noncur_transitions");
2842 for (auto& [storage_class, transition] : noncur_transitions) {
2843 f->dump_object(storage_class, transition);
2844 }
2845 f->close_section();
2846 f->dump_bool("dm_expiration", dm_expiration);
2847 }
2848
2849
2850 void RGWLifecycleConfiguration::dump(Formatter *f) const
2851 {
2852 f->open_object_section("prefix_map");
2853 for (auto& prefix : prefix_map) {
2854 f->dump_object(prefix.first.c_str(), prefix.second);
2855 }
2856 f->close_section();
2857
2858 f->open_array_section("rule_map");
2859 for (auto& rule : rule_map) {
2860 f->open_object_section("entry");
2861 f->dump_string("id", rule.first);
2862 f->open_object_section("rule");
2863 rule.second.dump(f);
2864 f->close_section();
2865 f->close_section();
2866 }
2867 f->close_section();
2868 }
2869