]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_lc.cc
31822d862c60577802cccf9576638c165197ae33
[ceph.git] / ceph / src / rgw / rgw_lc.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <string.h>
5 #include <iostream>
6 #include <map>
7 #include <algorithm>
8 #include <tuple>
9 #include <functional>
10
11 #include <boost/algorithm/string/split.hpp>
12 #include <boost/algorithm/string.hpp>
13 #include <boost/algorithm/string/predicate.hpp>
14 #include <boost/variant.hpp>
15
16 #include "include/scope_guard.h"
17 #include "common/Formatter.h"
18 #include "common/containers.h"
19 #include <common/errno.h>
20 #include "include/random.h"
21 #include "cls/lock/cls_lock_client.h"
22 #include "rgw_perf_counters.h"
23 #include "rgw_common.h"
24 #include "rgw_bucket.h"
25 #include "rgw_lc.h"
26 #include "rgw_zone.h"
27 #include "rgw_string.h"
28 #include "rgw_multi.h"
29 #include "rgw_sal_rados.h"
30 #include "rgw_rados.h"
31 #include "rgw_lc_tier.h"
32 #include "rgw_notify.h"
33
34 // this seems safe to use, at least for now--arguably, we should
35 // prefer header-only fmt, in general
36 #undef FMT_HEADER_ONLY
37 #define FMT_HEADER_ONLY 1
38 #include "fmt/format.h"
39
40 #include "services/svc_sys_obj.h"
41 #include "services/svc_zone.h"
42 #include "services/svc_tier_rados.h"
43
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_rgw
46
47 using namespace std;
48
/* human-readable names for per-bucket lifecycle entry states; indexed by
 * the numeric status stored in the lc shard entries -- NOTE(review):
 * ordering assumed to match the lc status constants declared in rgw_lc.h
 * (cf. lc_uninitial used below); confirm before reordering */
const char* LC_STATUS[] = {
      "UNINITIAL",
      "PROCESSING",
      "FAILED",
      "COMPLETE"
};
55
56 using namespace librados;
57
/* Validate one lifecycle rule:
 *  - the rule id must not exceed MAX_ID_LEN;
 *  - at least one action must be present (expiration, noncurrent
 *    expiration, multipart expiration, delete-marker expiration, or a
 *    transition);
 *  - each expiration action present must itself be valid;
 *  - expiration and all transitions must consistently use either days or
 *    dates — mixing the two within a rule is rejected. */
bool LCRule::valid() const
{
  if (id.length() > MAX_ID_LEN) {
    return false;
  }
  else if(expiration.empty() && noncur_expiration.empty() &&
	  mp_expiration.empty() && !dm_expiration &&
	  transitions.empty() && noncur_transitions.empty()) {
    return false;
  }
  else if (!expiration.valid() || !noncur_expiration.valid() ||
	   !mp_expiration.valid()) {
    return false;
  }
  if (!transitions.empty()) {
    /* seed from expiration so a days-based expiration plus a date-based
     * transition (or vice versa) is caught, not just mixed transitions */
    bool using_days = expiration.has_days();
    bool using_date = expiration.has_date();
    for (const auto& elem : transitions) {
      if (!elem.second.valid()) {
        return false;
      }
      /* these accumulate across iterations on purpose: any days-usage
       * combined with any date-usage anywhere in the rule is invalid */
      using_days = using_days || elem.second.has_days();
      using_date = using_date || elem.second.has_date();
      if (using_days && using_date) {
        return false;
      }
    }
  }
  for (const auto& elem : noncur_transitions) {
    if (!elem.second.valid()) {
      return false;
    }
  }

  return true;
}
94
95 void LCRule::init_simple_days_rule(std::string_view _id,
96 std::string_view _prefix, int num_days)
97 {
98 id = _id;
99 prefix = _prefix;
100 char buf[32];
101 snprintf(buf, sizeof(buf), "%d", num_days);
102 expiration.set_days(buf);
103 set_enabled(true);
104 }
105
/* Append a rule to rule_map without any validation; contrast with
 * check_and_add_rule(), which validates and rejects duplicate ids. */
void RGWLifecycleConfiguration::add_rule(const LCRule& rule)
{
  auto& id = rule.get_id(); // id may be empty for some rules; that's ok, we never search for those entries
  rule_map.insert(pair<string, LCRule>(id, rule));
}
111
/* Flatten an LCRule into the lc_op form used at processing time and
 * register it in prefix_map, keyed by the rule's (filter) prefix.
 * Always returns true (kept for interface symmetry with callers that
 * check the result). */
bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule)
{
  lc_op op(rule.get_id());
  op.status = rule.is_enabled();
  if (rule.get_expiration().has_days()) {
    op.expiration = rule.get_expiration().get_days();
  }
  if (rule.get_expiration().has_date()) {
    op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date());
  }
  if (rule.get_noncur_expiration().has_days()) {
    op.noncur_expiration = rule.get_noncur_expiration().get_days();
  }
  if (rule.get_mp_expiration().has_days()) {
    op.mp_expiration = rule.get_mp_expiration().get_days();
  }
  op.dm_expiration = rule.get_dm_expiration();
  for (const auto &elem : rule.get_transitions()) {
    transition_action action;
    /* per-rule validation guarantees days and date are mutually exclusive */
    if (elem.second.has_days()) {
      action.days = elem.second.get_days();
    } else {
      action.date = ceph::from_iso_8601(elem.second.get_date());
    }
    /* storage class names are canonicalized for current-version transitions
     * but, notably, not for the noncurrent transitions below */
    action.storage_class
      = rgw_placement_rule::get_canonical_storage_class(elem.first);
    op.transitions.emplace(elem.first, std::move(action));
  }
  for (const auto &elem : rule.get_noncur_transitions()) {
    transition_action action;
    action.days = elem.second.get_days();
    action.date = ceph::from_iso_8601(elem.second.get_date());
    action.storage_class = elem.first;
    op.noncur_transitions.emplace(elem.first, std::move(action));
  }
  std::string prefix;
  /* a Filter prefix takes precedence over the legacy top-level Prefix */
  if (rule.get_filter().has_prefix()){
    prefix = rule.get_filter().get_prefix();
  } else {
    prefix = rule.get_prefix();
  }

  if (rule.get_filter().has_tags()){
    op.obj_tags = rule.get_filter().get_tags();
  }
  prefix_map.emplace(std::move(prefix), std::move(op));
  return true;
}
160
/* Validate a rule and add it to both rule_map and prefix_map.
 * Returns 0 on success; -EINVAL for an invalid or duplicate-id rule;
 * -ERR_INVALID_REQUEST for unsupported combinations (tag filters may not
 * be combined with delete-marker or multipart expiration). */
int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule)
{
  if (!rule.valid()) {
    return -EINVAL;
  }
  auto& id = rule.get_id();
  if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
    return -EINVAL;
  }
  if (rule.get_filter().has_tags() && (rule.get_dm_expiration() ||
				       !rule.get_mp_expiration().empty())) {
    return -ERR_INVALID_REQUEST;
  }
  rule_map.insert(pair<string, LCRule>(id, rule));

  if (!_add_rule(rule)) {
    return -ERR_INVALID_REQUEST;
  }
  return 0;
}
181
/* Return true when two flattened ops specify the same class of action
 * (both expire current versions, both expire noncurrent versions, both
 * expire multipart uploads, or both transition to a common storage
 * class).  Note the else-if chain: once both ops have e.g. non-empty
 * transitions, the noncurrent-transition overlap is deliberately not
 * consulted if no common transition target is found. */
bool RGWLifecycleConfiguration::has_same_action(const lc_op& first,
						const lc_op& second) {
  if ((first.expiration > 0 || first.expiration_date != boost::none) &&
      (second.expiration > 0 || second.expiration_date != boost::none)) {
    return true;
  } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
    return true;
  } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
    return true;
  } else if (!first.transitions.empty() && !second.transitions.empty()) {
    for (auto &elem : first.transitions) {
      if (second.transitions.find(elem.first) != second.transitions.end()) {
        return true;
      }
    }
  } else if (!first.noncur_transitions.empty() &&
	     !second.noncur_transitions.empty()) {
    for (auto &elem : first.noncur_transitions) {
      if (second.noncur_transitions.find(elem.first) !=
	  second.noncur_transitions.end()) {
        return true;
      }
    }
  }
  return false;
}
208
/* Formerly, this method checked for duplicate rules using an invalid
 * method (prefix uniqueness); per-rule validation now happens in
 * check_and_add_rule(), so whole-configuration validation is a no-op. */
bool RGWLifecycleConfiguration::valid()
{
  return true;
}
215
/* Lifecycle worker thread main loop: run one full lifecycle pass when
 * inside the configured work window, then sleep until the next scheduled
 * start time (or until woken for shutdown).  Returns NULL at shutdown. */
void *RGWLC::LCWorker::entry() {
  do {
    std::unique_ptr<rgw::sal::Bucket> all_buckets; // empty restriction
    utime_t start = ceph_clock_now();
    if (should_work(start)) {
      ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
      int r = lc->process(this, all_buckets, false /* once */);
      if (r < 0) {
        ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
			  << r << dendl;
      }
      ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
      cloud_targets.clear(); // clear cloud targets
    }
    if (lc->going_down())
      break;

    utime_t end = ceph_clock_now();
    int secs = schedule_next_start_time(start, end);
    utime_t next;
    next.set_from_double(end + secs);

    ldpp_dout(dpp, 5) << "schedule life cycle next start time: "
		      << rgw_to_asctime(next) << dendl;

    /* interruptible sleep: cond is notified on shutdown, and the loop
     * condition re-checks going_down() */
    std::unique_lock l{lock};
    cond.wait_for(l, std::chrono::seconds(secs));
  } while (!lc->going_down());

  return NULL;
}
247
/* One-time setup: bind the store, compute the lc shard object names
 * ("<prefix>.0" .. "<prefix>.N-1", N capped at HASH_PRIME), and generate
 * the random cookie used for shard lock ownership.  Paired with
 * finalize(), which releases obj_names. */
void RGWLC::initialize(CephContext *_cct, rgw::sal::Store* _store) {
  cct = _cct;
  store = _store;
  sal_lc = store->get_lifecycle();
  max_objs = cct->_conf->rgw_lc_max_objs;
  if (max_objs > HASH_PRIME)
    max_objs = HASH_PRIME;

  obj_names = new string[max_objs];

  for (int i = 0; i < max_objs; i++) {
    obj_names[i] = lc_oid_prefix;
    char buf[32];
    snprintf(buf, 32, ".%d", i);
    obj_names[i].append(buf);
  }

#define COOKIE_LEN 16
  char cookie_buf[COOKIE_LEN + 1];
  /* leave room for the NUL terminator */
  gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
  cookie = cookie_buf;
}
270
/* Release the shard-name array allocated in initialize(). */
void RGWLC::finalize()
{
  delete[] obj_names;
}
275
/* Return true if a lifecycle run already happened in the current "day".
 * In debug mode (rgw_lc_debug_interval > 0) a "day" is that many seconds;
 * otherwise the check is against the local-time start of the calendar day
 * containing start_date. */
bool RGWLC::if_already_run_today(time_t start_date)
{
  struct tm bdt;
  time_t begin_of_day;
  utime_t now = ceph_clock_now();
  localtime_r(&start_date, &bdt);

  if (cct->_conf->rgw_lc_debug_interval > 0) {
    if (now - start_date < cct->_conf->rgw_lc_debug_interval)
      return true;
    else
      return false;
  }

  /* truncate start_date to local midnight, then compare against a 24h window */
  bdt.tm_hour = 0;
  bdt.tm_min = 0;
  bdt.tm_sec = 0;
  begin_of_day = mktime(&bdt);
  if (now - begin_of_day < 24*60*60)
    return true;
  else
    return false;
}
299
300 static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) {
301 os << "<ent: bucket=";
302 os << ent.bucket;
303 os << "; start_time=";
304 os << rgw_to_asctime(utime_t(time_t(ent.start_time), 0));
305 os << "; status=";
306 os << ent.status;
307 os << ">";
308 return os;
309 }
310
311 int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
312 {
313 vector<rgw::sal::Lifecycle::LCEntry> entries;
314 string marker;
315
316 ldpp_dout(this, 5) << "RGWLC::bucket_lc_prepare(): PREPARE "
317 << "index: " << index << " worker ix: " << worker->ix
318 << dendl;
319
320 #define MAX_LC_LIST_ENTRIES 100
321 do {
322 int ret = sal_lc->list_entries(obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
323 if (ret < 0)
324 return ret;
325
326 for (auto& entry : entries) {
327 entry.start_time = ceph_clock_now();
328 entry.status = lc_uninitial; // lc_uninitial? really?
329 ret = sal_lc->set_entry(obj_names[index], entry);
330 if (ret < 0) {
331 ldpp_dout(this, 0)
332 << "RGWLC::bucket_lc_prepare() failed to set entry on "
333 << obj_names[index] << dendl;
334 return ret;
335 }
336 }
337
338 if (! entries.empty()) {
339 marker = std::move(entries.back().bucket);
340 }
341 } while (!entries.empty());
342
343 return 0;
344 }
345
/* Return true if an object whose mtime is `mtime` has aged past `days`
 * days.  Optionally reports the computed expiration time via expire_time.
 * In debug mode (rgw_lc_debug_interval > 0), each interval-seconds span
 * counts as one day and "now" is not rounded to a day boundary. */
static bool obj_has_expired(const DoutPrefixProvider *dpp, CephContext *cct, ceph::real_time mtime, int days,
			    ceph::real_time *expire_time = nullptr)
{
  double timediff, cmp;
  utime_t base_time;
  if (cct->_conf->rgw_lc_debug_interval <= 0) {
    /* Normal case, run properly */
    cmp = double(days)*24*60*60;
    base_time = ceph_clock_now().round_to_day();
  } else {
    /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
    cmp = double(days)*cct->_conf->rgw_lc_debug_interval;
    base_time = ceph_clock_now();
  }
  auto tt_mtime = ceph::real_clock::to_time_t(mtime);
  timediff = base_time - tt_mtime;

  if (expire_time) {
    *expire_time = mtime + make_timespan(cmp);
  }

  ldpp_dout(dpp, 20) << __func__
		     << "(): mtime=" << mtime << " days=" << days
		     << " base_time=" << base_time << " timediff=" << timediff
		     << " cmp=" << cmp
		     << " is_expired=" << (timediff >= cmp)
		     << dendl;

  return (timediff >= cmp);
}
376
/* Return true if object-lock rules permit deleting `obj`:
 *  - trivially true if the bucket has no object lock;
 *  - true if the object no longer exists (ENOENT);
 *  - false while a retention date lies in the future, while a legal hold
 *    is enabled, on any other read error, or if lock attrs fail to decode
 *    (fail safe: keep the object). */
static bool pass_object_lock_check(rgw::sal::Store* store, rgw::sal::Object* obj, RGWObjectCtx& ctx, const DoutPrefixProvider *dpp)
{
  if (!obj->get_bucket()->get_info().obj_lock_enabled()) {
    return true;
  }
  std::unique_ptr<rgw::sal::Object::ReadOp> read_op = obj->get_read_op(&ctx);
  int ret = read_op->prepare(null_yield, dpp);
  if (ret < 0) {
    if (ret == -ENOENT) {
      return true;
    } else {
      return false;
    }
  } else {
    auto iter = obj->get_attrs().find(RGW_ATTR_OBJECT_RETENTION);
    if (iter != obj->get_attrs().end()) {
      RGWObjectRetention retention;
      try {
        decode(retention, iter->second);
      } catch (buffer::error& err) {
        ldpp_dout(dpp, 0) << "ERROR: failed to decode RGWObjectRetention"
			  << dendl;
        return false;
      }
      if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
	  ceph_clock_now()) {
        return false;
      }
    }
    iter = obj->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD);
    if (iter != obj->get_attrs().end()) {
      RGWObjectLegalHold obj_legal_hold;
      try {
        decode(obj_legal_hold, iter->second);
      } catch (buffer::error& err) {
        ldpp_dout(dpp, 0) << "ERROR: failed to decode RGWObjectLegalHold"
			  << dendl;
        return false;
      }
      if (obj_legal_hold.is_enabled()) {
        return false;
      }
    }
    return true;
  }
}
423
/* Incremental, prefix-filtered listing of a bucket's objects for
 * lifecycle processing.  Wraps paged Bucket::list() calls (unordered,
 * versions included when the bucket is versioned) behind a simple
 * get_obj()/next() cursor, remembering the previously returned entry
 * (pre_obj) so the next page can resume from its key.  A configurable
 * delay (rgw_lc_thread_delay) throttles page fetches. */
class LCObjsLister {
  rgw::sal::Store* store;
  rgw::sal::Bucket* bucket;
  rgw::sal::Bucket::ListParams list_params;
  rgw::sal::Bucket::ListResults list_results;
  string prefix;
  vector<rgw_bucket_dir_entry>::iterator obj_iter;
  rgw_bucket_dir_entry pre_obj;   // last entry handed out via next()
  int64_t delay_ms;               // inter-fetch throttle

public:
  LCObjsLister(rgw::sal::Store* _store, rgw::sal::Bucket* _bucket) :
      store(_store), bucket(_bucket) {
    list_params.list_versions = bucket->versioned();
    list_params.allow_unordered = true;
    delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
  }

  /* restrict listing to keys under p (set before init()) */
  void set_prefix(const string& p) {
    prefix = p;
    list_params.prefix = prefix;
  }

  int init(const DoutPrefixProvider *dpp) {
    return fetch(dpp);
  }

  /* fetch the next page (up to 1000 entries) and reset the cursor */
  int fetch(const DoutPrefixProvider *dpp) {
    int ret = bucket->list(dpp, list_params, 1000, list_results, null_yield);
    if (ret < 0) {
      return ret;
    }

    obj_iter = list_results.objs.begin();

    return 0;
  }

  void delay() {
    std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
  }

  /* Point *obj at the current entry; returns false when the listing is
   * exhausted or a re-fetch fails.  fetch_barrier runs before each page
   * fetch (used by callers to drain in-flight work first). */
  bool get_obj(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry **obj,
	       std::function<void(void)> fetch_barrier
	       = []() { /* nada */}) {
    if (obj_iter == list_results.objs.end()) {
      if (!list_results.is_truncated) {
        delay();
        return false;
      } else {
        fetch_barrier();
        /* resume listing after the last entry we returned */
        list_params.marker = pre_obj.key;
        int ret = fetch(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: list_op returned ret=" << ret
			    << dendl;
          return false;
        }
      }
      delay();
    }
    /* returning address of entry in objs */
    *obj = &(*obj_iter);
    return obj_iter != list_results.objs.end();
  }

  rgw_bucket_dir_entry get_prev_obj() {
    return pre_obj;
  }

  /* advance the cursor, remembering the entry just consumed */
  void next() {
    pre_obj = *obj_iter;
    ++obj_iter;
  }

  /* Key name of the entry after the current one, or none at the end of
   * the loaded page (also none when is_truncated would require a fetch). */
  boost::optional<std::string> next_key_name() {
    if (obj_iter == list_results.objs.end() ||
	(obj_iter + 1) == list_results.objs.end()) {
      /* this should have been called after get_obj() was called, so this should
       * only happen if is_truncated is false */
      return boost::none;
    }

    return ((obj_iter + 1)->key.name);
  }

}; /* LCObjsLister */
511
/* Bundle of everything needed to process one lc_op against one bucket:
 * the flattened rule, the store, the owning worker, the bucket, and the
 * shared object lister.  Note `op` is held by value (a copy of the
 * caller's rule). */
struct op_env {

  using LCWorker = RGWLC::LCWorker;

  lc_op op;
  rgw::sal::Store* store;
  LCWorker* worker;
  rgw::sal::Bucket* bucket;
  LCObjsLister& ol;

  op_env(lc_op& _op, rgw::sal::Store* _store, LCWorker* _worker,
	 rgw::sal::Bucket* _bucket, LCObjsLister& _ol)
    : op(_op), store(_store), worker(_worker), bucket(_bucket),
      ol(_ol) {}
}; /* op_env */
527
528 class LCRuleOp;
529 class WorkQ;
530
/* Per-object processing context: one of these is built for each bucket
 * index entry handed to a rule/action, carrying the entry itself, the
 * enclosing op_env (by value), handles into the sal layer, and the work
 * queue doing the processing. */
struct lc_op_ctx {
  CephContext *cct;
  op_env env;
  rgw_bucket_dir_entry o;                    // the index entry being processed
  boost::optional<std::string> next_key_name; // key of the following entry, if known
  ceph::real_time effective_mtime;

  rgw::sal::Store* store;
  rgw::sal::Bucket* bucket;
  lc_op& op; // ok--refers to expanded env.op
  LCObjsLister& ol;

  std::unique_ptr<rgw::sal::Object> obj;
  RGWObjectCtx rctx;
  const DoutPrefixProvider *dpp;
  WorkQ* wq;

  RGWZoneGroupPlacementTier tier = {};

  lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
	    boost::optional<std::string> next_key_name,
	    ceph::real_time effective_mtime,
	    const DoutPrefixProvider *dpp, WorkQ* wq)
    : cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name),
      effective_mtime(effective_mtime),
      store(env.store), bucket(env.bucket), op(env.op), ol(env.ol),
      rctx(env.store), dpp(dpp), wq(wq)
    {
      obj = bucket->get_object(o.key);
    }

  /* true when the next listed entry shares this entry's key name, i.e.
   * key_name has other (older) versions following it in the listing */
  bool next_has_same_name(const std::string& key_name) {
    return (next_key_name && key_name.compare(
	      boost::get<std::string>(next_key_name)) == 0);
  }

}; /* lc_op_ctx */
568
569
/* fixed identifiers used when publishing bucket-notification events for
 * lifecycle-driven operations (there is no client request id here) */
static std::string lc_id = "rgw lifecycle";
static std::string lc_req_id = "0";
572
/* Delete the object referred to by oc.o on behalf of a lifecycle rule,
 * publishing a bucket notification of `event_type` around the delete.
 *
 * remove_indeed selects the delete mode: when false the instance id is
 * cleared so a versioned bucket gets a delete marker; when true the
 * specific version is removed (an empty instance is normalized to
 * "null").  Returns 0 on success or a negative error; a failed
 * notification reservation defers the delete. */
static int remove_expired_obj(
  const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed,
  rgw::notify::EventType event_type)
{
  auto& store = oc.store;
  auto& bucket_info = oc.bucket->get_info();
  auto& o = oc.o;
  auto obj_key = o.key;
  auto& meta = o.meta;
  int ret;
  std::string version_id;

  if (!remove_indeed) {
    obj_key.instance.clear();
  } else if (obj_key.instance.empty()) {
    obj_key.instance = "null";
  }

  std::unique_ptr<rgw::sal::Bucket> bucket;
  std::unique_ptr<rgw::sal::Object> obj;

  ret = store->get_bucket(nullptr, bucket_info, &bucket);
  if (ret < 0) {
    return ret;
  }

  // XXXX currently, rgw::sal::Bucket.owner is always null here
  std::unique_ptr<rgw::sal::User> user;
  if (! bucket->get_owner()) {
    auto& bucket_info = bucket->get_info();
    user = store->get_user(bucket_info.owner);
    // forgive me, lord
    if (user) {
      bucket->set_owner(user.get());
    }
  }

  obj = bucket->get_object(obj_key);
  std::unique_ptr<rgw::sal::Object::DeleteOp> del_op
    = obj->get_delete_op(&oc.rctx);
  del_op->params.versioning_status
    = obj->get_bucket()->get_info().versioning_status();
  del_op->params.obj_owner.set_id(rgw_user {meta.owner});
  del_op->params.obj_owner.set_name(meta.owner_display_name);
  del_op->params.bucket_owner.set_id(bucket_info.owner);
  del_op->params.unmod_since = meta.mtime;
  del_op->params.marker_version_id = version_id;

  std::unique_ptr<rgw::sal::Notification> notify
    = store->get_notification(dpp, obj.get(), nullptr, &oc.rctx, event_type,
			      bucket.get(), lc_id,
			      const_cast<std::string&>(oc.bucket->get_tenant()),
			      lc_req_id, null_yield);

  /* can eliminate cast when reservation is lifted into Notification */
  auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();

  /* reserve the notification before deleting: if we cannot promise to
   * notify, we do not delete */
  ret = rgw::notify::publish_reserve(dpp, event_type, notify_res, nullptr);
  if (ret < 0) {
    ldpp_dout(dpp, 1)
      << "ERROR: notify reservation failed, deferring delete of object k="
      << o.key
      << dendl;
    return ret;
  }

  ret = del_op->delete_obj(dpp, null_yield);
  if (ret < 0) {
    ldpp_dout(dpp, 1) <<
      "ERROR: publishing notification failed, with error: " << ret << dendl;
  } else {
    // send request to notification manager
    (void) rgw::notify::publish_commit(
      obj.get(), obj->get_obj_size(), ceph::real_clock::now(),
      obj->get_attrs()[RGW_ATTR_ETAG].to_str(), version_id, event_type,
      notify_res, dpp);
  }

  return ret;

} /* remove_expired_obj */
654
/* Base class for lifecycle actions (expiration, transition, ...).
 * check() decides applicability for the current object; process() runs
 * the side effect.  The default implementations are inert (never
 * applicable, no-op). */
class LCOpAction {
public:
  virtual ~LCOpAction() {}

  /* return true if this action applies to the object in oc now, and set
   * *exp_time to the computed expiration time */
  virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) {
    return false;
  }

  /* called after check(). Check should tell us whether this action
   * is applicable. If there are multiple actions, we'll end up executing
   * the latest applicable action
   * For example:
   * one action after 10 days, another after 20, third after 40.
   * After 10 days, the latest applicable action would be the first one,
   * after 20 days it will be the second one. After 21 days it will still be the
   * second one. So check() should return true for the second action at that point,
   * but should_process() if the action has already been applied. In object removal
   * it doesn't matter, but in object transition it does.
   */
  virtual bool should_process() {
    return true;
  }

  /* perform the action; returns 0 on success or a negative error */
  virtual int process(lc_op_ctx& oc) {
    return 0;
  }

  friend class LCOpRule;
}; /* LCOpAction */
684
/* Base class for per-object rule filters (e.g. tag matching); check()
 * returns true when the object passes the filter.  Default: reject. */
class LCOpFilter {
public:
  virtual ~LCOpFilter() {}
  virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) {
    return false;
  }
}; /* LCOpFilter */
692
/* One executable lifecycle rule: a set of filters gating a set of
 * candidate actions for each object.  build()/update()/process() are
 * defined elsewhere in this file (not visible in this chunk). */
class LCOpRule {
  friend class LCOpAction;

  op_env env;
  boost::optional<std::string> next_key_name;
  ceph::real_time effective_mtime;

  std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd
  std::vector<shared_ptr<LCOpAction> > actions;

public:
  LCOpRule(op_env& _env) : env(_env) {}

  boost::optional<std::string> get_next_key_name() {
    return next_key_name;
  }

  std::vector<shared_ptr<LCOpAction>>& get_actions() {
    return actions;
  }

  void build();
  void update();
  int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
	      WorkQ* wq);
}; /* LCOpRule */
719
/* Unit of work queued to the lifecycle worker pool.  The void*
 * alternative (index 0) doubles as the "shutting down" sentinel in
 * WorkQ::entry(). */
using WorkItem =
  boost::variant<void*,
		 /* out-of-line delete */
		 std::tuple<LCOpRule, rgw_bucket_dir_entry>,
		 /* uncompleted MPU expiration */
		 std::tuple<lc_op, rgw_bucket_dir_entry>,
		 rgw_bucket_dir_entry>;
727
728 class WorkQ : public Thread
729 {
730 public:
731 using unique_lock = std::unique_lock<std::mutex>;
732 using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
733 using dequeue_result = boost::variant<void*, WorkItem>;
734
735 static constexpr uint32_t FLAG_NONE = 0x0000;
736 static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001;
737 static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002;
738 static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
739
740 private:
741 const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
742 RGWLC::LCWorker* wk;
743 uint32_t qmax;
744 int ix;
745 std::mutex mtx;
746 std::condition_variable cv;
747 uint32_t flags;
748 vector<WorkItem> items;
749 work_f f;
750
751 public:
752 WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
753 : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
754 {
755 create(thr_name().c_str());
756 }
757
758 std::string thr_name() {
759 return std::string{"wp_thrd: "}
760 + std::to_string(wk->ix) + ", " + std::to_string(ix);
761 }
762
763 void setf(work_f _f) {
764 f = _f;
765 }
766
767 void enqueue(WorkItem&& item) {
768 unique_lock uniq(mtx);
769 while ((!wk->get_lc()->going_down()) &&
770 (items.size() > qmax)) {
771 flags |= FLAG_EWAIT_SYNC;
772 cv.wait_for(uniq, 200ms);
773 }
774 items.push_back(item);
775 if (flags & FLAG_DWAIT_SYNC) {
776 flags &= ~FLAG_DWAIT_SYNC;
777 cv.notify_one();
778 }
779 }
780
781 void drain() {
782 unique_lock uniq(mtx);
783 flags |= FLAG_EDRAIN_SYNC;
784 while (flags & FLAG_EDRAIN_SYNC) {
785 cv.wait_for(uniq, 200ms);
786 }
787 }
788
789 private:
790 dequeue_result dequeue() {
791 unique_lock uniq(mtx);
792 while ((!wk->get_lc()->going_down()) &&
793 (items.size() == 0)) {
794 /* clear drain state, as we are NOT doing work and qlen==0 */
795 if (flags & FLAG_EDRAIN_SYNC) {
796 flags &= ~FLAG_EDRAIN_SYNC;
797 }
798 flags |= FLAG_DWAIT_SYNC;
799 cv.wait_for(uniq, 200ms);
800 }
801 if (items.size() > 0) {
802 auto item = items.back();
803 items.pop_back();
804 if (flags & FLAG_EWAIT_SYNC) {
805 flags &= ~FLAG_EWAIT_SYNC;
806 cv.notify_one();
807 }
808 return {item};
809 }
810 return nullptr;
811 }
812
813 void* entry() override {
814 while (!wk->get_lc()->going_down()) {
815 auto item = dequeue();
816 if (item.which() == 0) {
817 /* going down */
818 break;
819 }
820 f(wk, this, boost::get<WorkItem>(item));
821 }
822 return nullptr;
823 }
824 }; /* WorkQ */
825
/* Fixed-size pool of WorkQ threads.  enqueue() distributes items
 * round-robin across queues; drain() waits for every queue to empty;
 * the destructor joins all threads. */
class RGWLC::WorkPool
{
  using TVector = ceph::containers::tiny_vector<WorkQ, 3>;
  TVector wqs;
  uint64_t ix; // next queue for round-robin dispatch

public:
  WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax)
    : wqs(TVector{
	n_threads,
	[&](const size_t ix, auto emplacer) {
	  emplacer.emplace(wk, ix, qmax);
	}}),
      ix(0)
    {}

  ~WorkPool() {
    for (auto& wq : wqs) {
      wq.join();
    }
  }

  /* install the same work function on every queue */
  void setf(WorkQ::work_f _f) {
    for (auto& wq : wqs) {
      wq.setf(_f);
    }
  }

  void enqueue(WorkItem item) {
    const auto tix = ix;
    ix = (ix+1) % wqs.size();
    (wqs[tix]).enqueue(std::move(item));
  }

  void drain() {
    for (auto& wq : wqs) {
      wq.drain();
    }
  }
}; /* WorkPool */
866
/* Construct one lifecycle worker; spins up its private work pool sized
 * by rgw_lc_max_wp_worker with a per-queue depth of 512.  NOTE(review):
 * workpool is a raw owning pointer — presumably released in ~LCWorker
 * (not visible in this chunk); confirm. */
RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
			  RGWLC *lc, int ix)
  : dpp(dpp), cct(cct), lc(lc), ix(ix)
{
  auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
  workpool = new WorkPool(this, wpw, 512);
}
874
/* Whether a scheduled worker should yield its time budget: a one-shot
 * ("once") run never stops early; otherwise stop when the deadline has
 * passed. */
static inline bool worker_should_stop(time_t stop_at, bool once)
{
  if (once) {
    return false;
  }
  return stop_at < time(nullptr);
}
879
/* Abort incomplete multipart uploads in `target` that have outlived the
 * per-prefix mp_expiration deadline.  Listing walks the multipart
 * namespace per rule prefix; each matching meta entry is handed to the
 * worker pool, whose work function aborts the upload when expired.
 * Stops early (returning 0) when the per-shard time budget expires or
 * the process is going down.  Returns 0 on success or a negative
 * listing error. */
int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
				       const multimap<string, lc_op>& prefix_map,
				       LCWorker* worker, time_t stop_at, bool once)
{
  MultipartMetaFilter mp_filter;
  int ret;
  rgw::sal::Bucket::ListParams params;
  rgw::sal::Bucket::ListResults results;
  auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
  params.list_versions = false;
  /* lifecycle processing does not depend on total order, so can
   * take advantage of unordered listing optimizations--such as
   * operating on one shard at a time */
  params.allow_unordered = true;
  params.ns = RGW_OBJ_NS_MULTIPART;
  params.access_list_filter = &mp_filter;

  /* work function: abort the upload behind one multipart meta entry if
   * it has passed the rule's mp_expiration age */
  auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
    auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
    auto& [rule, obj] = wt;
    if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
      rgw_obj_key key(obj.key);
      std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name);
      RGWObjectCtx rctx(store);
      int ret = mpu->abort(this, cct, &rctx);
      if (ret == 0) {
        if (perfcounter) {
          perfcounter->inc(l_rgw_lc_abort_mpu, 1);
        }
      } else {
	/* ERR_NO_SUCH_UPLOAD (already gone) is logged quietly; anything
	 * else at error level */
	if (ret == -ERR_NO_SUCH_UPLOAD) {
	  ldpp_dout(wk->get_lc(), 5)
	    << "ERROR: abort_multipart_upload failed, ret=" << ret
	    << ", thread:" << wq->thr_name()
	    << ", meta:" << obj.key
	    << dendl;
	} else {
	  ldpp_dout(wk->get_lc(), 0)
	    << "ERROR: abort_multipart_upload failed, ret=" << ret
	    << ", thread:" << wq->thr_name()
	    << ", meta:" << obj.key
	    << dendl;
	}
      } /* abort failed */
    } /* expired */
  };

  worker->workpool->setf(pf);

  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
       ++prefix_iter) {

    if (worker_should_stop(stop_at, once)) {
      ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker "
			 << worker->ix
			 << dendl;
      return 0;
    }

    if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
      continue;
    }
    params.prefix = prefix_iter->first;
    do {
      results.objs.clear();
      ret = target->list(this, params, 1000, results, null_yield);
      if (ret < 0) {
	  if (ret == (-ENOENT))
	    return 0;
          ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
          return ret;
      }

      for (auto obj_iter = results.objs.begin(); obj_iter != results.objs.end(); ++obj_iter) {
	std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
	  {prefix_iter->second, *obj_iter};
	worker->workpool->enqueue(WorkItem{t1});
	if (going_down()) {
	  return 0;
	}
      } /* for objs */

      /* throttle between listing pages */
      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
    } while(results.is_truncated);
  } /* for prefix_map */

  /* wait for all queued aborts to finish before returning */
  worker->workpool->drain();
  return 0;
}
969
/* Read the raw (still-encoded) RGW_ATTR_TAGS attribute of obj into
 * tags_bl; returns the sal layer's result (negative on error, e.g.
 * -ENODATA when the object has no tags). */
static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
{
  std::unique_ptr<rgw::sal::Object::ReadOp> rop = obj->get_read_op(&ctx);

  return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, null_yield);
}
976
977 static bool is_valid_op(const lc_op& op)
978 {
979 return (op.status &&
980 (op.expiration > 0
981 || op.expiration_date != boost::none
982 || op.noncur_expiration > 0
983 || op.dm_expiration
984 || !op.transitions.empty()
985 || !op.noncur_transitions.empty()));
986 }
987
/* Return true iff every tag (key and value) required by the rule's tag
 * filter is present on the object.  Extra object tags are allowed; a
 * rule with no tag filter, or more tags than the object has, fails
 * immediately. */
static inline bool has_all_tags(const lc_op& rule_action,
				const RGWObjTags& object_tags)
{
  if(! rule_action.obj_tags)
    return false;
  if(object_tags.count() < rule_action.obj_tags->count())
    return false;
  size_t tag_count = 0;
  for (const auto& tag : object_tags.get_tags()) {
    const auto& rule_tags = rule_action.obj_tags->get_tags();
    const auto& iter = rule_tags.find(tag.first);
    if(iter == rule_tags.end())
      continue;
    if(iter->second == tag.second)
    {
      tag_count++;
    }
  /* all tags in the rule appear in obj tags */
  }
  /* matched-count equality means every rule tag was found on the object */
  return tag_count == rule_action.obj_tags->count();
}
1009
/* Evaluate the op's tag filter against the object's stored tags.
 * Sets *skip = true when the object should be skipped (tag filter
 * present but unmatched, or tags unreadable); *skip = false when no tag
 * filter exists or all filter tags match.  Returns 0 in all cases
 * except a tag-decode failure (-EIO). */
static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip)
{
  auto& op = oc.op;

  if (op.obj_tags != boost::none) {
    *skip = true;

    bufferlist tags_bl;
    int ret = read_obj_tags(dpp, oc.obj.get(), oc.rctx, tags_bl);
    if (ret < 0) {
      /* -ENODATA (no tags on object) is expected; other errors are logged */
      if (ret != -ENODATA) {
        ldpp_dout(oc.dpp, 5) << "ERROR: read_obj_tags returned r="
			     << ret << " " << oc.wq->thr_name() << dendl;
      }
      return 0;
    }
    RGWObjTags dest_obj_tags;
    try {
      auto iter = tags_bl.cbegin();
      dest_obj_tags.decode(iter);
    } catch (buffer::error& err) {
      ldpp_dout(oc.dpp,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
			  << oc.wq->thr_name() << dendl;
      return -EIO;
    }

    if (! has_all_tags(op, dest_obj_tags)) {
      ldpp_dout(oc.dpp, 20) << __func__ << "() skipping obj " << oc.obj
			    << " as tags do not match in rule: "
			    << op.id << " "
			    << oc.wq->thr_name() << dendl;
      return 0;
    }
  }
  *skip = false;
  return 0;
}
1047
/* Filter that admits an object only if it satisfies the rule's tag
 * filter (via check_tags).  Delete markers pass unconditionally since
 * they carry no tags. */
class LCOpFilter_Tags : public LCOpFilter {
public:
  bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) override {
    auto& o = oc.o;

    if (o.is_delete_marker()) {
      return true;
    }

    bool skip;

    int ret = check_tags(dpp, oc, &skip);
    if (ret < 0) {
      if (ret == -ENOENT) {
        return false;
      }
      ldpp_dout(oc.dpp, 0) << "ERROR: check_tags on obj=" << oc.obj
			   << " returned ret=" << ret << " "
			   << oc.wq->thr_name() << dendl;
      return false;
    }

    return !skip;
  };
};
1073
/* Expiration of the current object version.  check() applies only to
 * current entries: a delete marker is expirable once it is the sole
 * remaining entry for its key; a live object expires by rule days or by
 * an absolute expiration date.  process() removes the object — as a
 * real delete on unversioned buckets, as a delete marker otherwise —
 * and publishes the matching notification event. */
class LCOpAction_CurrentExpiration : public LCOpAction {
public:
  LCOpAction_CurrentExpiration(op_env& env) {}

  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
    auto& o = oc.o;
    if (!o.is_current()) {
      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			 << ": not current, skipping "
			 << oc.wq->thr_name() << dendl;
      return false;
    }
    if (o.is_delete_marker()) {
      std::string nkn;
      if (oc.next_key_name) nkn = *oc.next_key_name;
      /* a delete marker followed by another entry with the same key name
       * still shades older versions; only an isolated marker is removed */
      if (oc.next_has_same_name(o.key.name)) {
	ldpp_dout(dpp, 7) << __func__ << "(): dm-check SAME: key=" << o.key
			  << " next_key_name: %%" << nkn << "%% "
			  << oc.wq->thr_name() << dendl;
	return false;
      } else {
	ldpp_dout(dpp, 7) << __func__ << "(): dm-check DELE: key=" << o.key
			  << " next_key_name: %%" << nkn << "%% "
			  << oc.wq->thr_name() << dendl;
        *exp_time = real_clock::now();
        return true;
      }
    }

    auto& mtime = o.meta.mtime;
    bool is_expired;
    auto& op = oc.op;
    if (op.expiration <= 0) {
      if (op.expiration_date == boost::none) {
        ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			   << ": no expiration set in rule, skipping "
			   << oc.wq->thr_name() << dendl;
        return false;
      }
      /* absolute-date expiration */
      is_expired = ceph_clock_now() >=
	ceph::real_clock::to_time_t(*op.expiration_date);
      *exp_time = *op.expiration_date;
    } else {
      is_expired = obj_has_expired(dpp, oc.cct, mtime, op.expiration, exp_time);
    }

    ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired="
		       << (int)is_expired << " "
		       << oc.wq->thr_name() << dendl;
    return is_expired;
  }

  int process(lc_op_ctx& oc) {
    auto& o = oc.o;
    int r;
    if (o.is_delete_marker()) {
      /* remove the now-orphaned delete marker itself */
      r = remove_expired_obj(oc.dpp, oc, true,
			     rgw::notify::ObjectExpirationDeleteMarker);
      if (r < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj "
			     << oc.bucket << ":" << o.key
			     << " " << cpp_strerror(r) << " "
			     << oc.wq->thr_name() << dendl;
	return r;
      }
      ldpp_dout(oc.dpp, 2) << "DELETED: current is-dm "
			   << oc.bucket << ":" << o.key
			   << " " << oc.wq->thr_name() << dendl;
    } else {
      /* ! o.is_delete_marker() */
      /* versioned buckets get a delete marker; unversioned, a real delete */
      r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(),
			     rgw::notify::ObjectExpirationCurrent);
      if (r < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj "
			     << oc.bucket << ":" << o.key
			     << " " << cpp_strerror(r) << " "
			     << oc.wq->thr_name() << dendl;
	return r;
      }
      if (perfcounter) {
        perfcounter->inc(l_rgw_lc_expire_current, 1);
      }
      ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key
			   << " " << oc.wq->thr_name() << dendl;
    }
    return 0;
  }
};
1162
1163 class LCOpAction_NonCurrentExpiration : public LCOpAction {
1164 protected:
1165 public:
1166 LCOpAction_NonCurrentExpiration(op_env& env)
1167 {}
1168
1169 bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1170 auto& o = oc.o;
1171 if (o.is_current()) {
1172 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
1173 << ": current version, skipping "
1174 << oc.wq->thr_name() << dendl;
1175 return false;
1176 }
1177
1178 int expiration = oc.op.noncur_expiration;
1179 bool is_expired = obj_has_expired(dpp, oc.cct, oc.effective_mtime, expiration,
1180 exp_time);
1181
1182 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired="
1183 << is_expired << " "
1184 << oc.wq->thr_name() << dendl;
1185
1186 return is_expired &&
1187 pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx, dpp);
1188 }
1189
1190 int process(lc_op_ctx& oc) {
1191 auto& o = oc.o;
1192 int r = remove_expired_obj(oc.dpp, oc, true,
1193 rgw::notify::ObjectExpirationNoncurrent);
1194 if (r < 0) {
1195 ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) "
1196 << oc.bucket << ":" << o.key
1197 << " " << cpp_strerror(r)
1198 << " " << oc.wq->thr_name() << dendl;
1199 return r;
1200 }
1201 if (perfcounter) {
1202 perfcounter->inc(l_rgw_lc_expire_noncurrent, 1);
1203 }
1204 ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key
1205 << " (non-current expiration) "
1206 << oc.wq->thr_name() << dendl;
1207 return 0;
1208 }
1209 };
1210
1211 class LCOpAction_DMExpiration : public LCOpAction {
1212 public:
1213 LCOpAction_DMExpiration(op_env& env) {}
1214
1215 bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1216 auto& o = oc.o;
1217 if (!o.is_delete_marker()) {
1218 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
1219 << ": not a delete marker, skipping "
1220 << oc.wq->thr_name() << dendl;
1221 return false;
1222 }
1223 if (oc.next_has_same_name(o.key.name)) {
1224 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
1225 << ": next is same object, skipping "
1226 << oc.wq->thr_name() << dendl;
1227 return false;
1228 }
1229
1230 *exp_time = real_clock::now();
1231
1232 return true;
1233 }
1234
1235 int process(lc_op_ctx& oc) {
1236 auto& o = oc.o;
1237 int r = remove_expired_obj(oc.dpp, oc, true,
1238 rgw::notify::ObjectExpirationDeleteMarker);
1239 if (r < 0) {
1240 ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
1241 << oc.bucket << ":" << o.key
1242 << " " << cpp_strerror(r)
1243 << " " << oc.wq->thr_name()
1244 << dendl;
1245 return r;
1246 }
1247 if (perfcounter) {
1248 perfcounter->inc(l_rgw_lc_expire_dm, 1);
1249 }
1250 ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key
1251 << " (delete marker expiration) "
1252 << oc.wq->thr_name() << dendl;
1253 return 0;
1254 }
1255 };
1256
1257 class LCOpAction_Transition : public LCOpAction {
1258 const transition_action& transition;
1259 bool need_to_process{false};
1260
1261 protected:
1262 virtual bool check_current_state(bool is_current) = 0;
1263 virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0;
1264 public:
1265 LCOpAction_Transition(const transition_action& _transition)
1266 : transition(_transition) {}
1267
1268 bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
1269 auto& o = oc.o;
1270
1271 if (o.is_delete_marker()) {
1272 return false;
1273 }
1274
1275 if (!check_current_state(o.is_current())) {
1276 return false;
1277 }
1278
1279 auto mtime = get_effective_mtime(oc);
1280 bool is_expired;
1281 if (transition.days < 0) {
1282 if (transition.date == boost::none) {
1283 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
1284 << ": no transition day/date set in rule, skipping "
1285 << oc.wq->thr_name() << dendl;
1286 return false;
1287 }
1288 is_expired = ceph_clock_now() >=
1289 ceph::real_clock::to_time_t(*transition.date);
1290 *exp_time = *transition.date;
1291 } else {
1292 is_expired = obj_has_expired(dpp, oc.cct, mtime, transition.days, exp_time);
1293 }
1294
1295 ldpp_dout(oc.dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired="
1296 << is_expired << " "
1297 << oc.wq->thr_name() << dendl;
1298
1299 need_to_process =
1300 (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
1301 transition.storage_class);
1302
1303 return is_expired;
1304 }
1305
1306 bool should_process() override {
1307 return need_to_process;
1308 }
1309
1310 /* find out if the the storage class is remote cloud */
1311 int get_tier_target(const RGWZoneGroup &zonegroup, const rgw_placement_rule& rule,
1312 RGWZoneGroupPlacementTier &tier) {
1313 std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer;
1314 titer = zonegroup.placement_targets.find(rule.name);
1315 if (titer == zonegroup.placement_targets.end()) {
1316 return -ENOENT;
1317 }
1318
1319 const auto& target_rule = titer->second;
1320 std::map<std::string, RGWZoneGroupPlacementTier>::const_iterator ttier;
1321 ttier = target_rule.tier_targets.find(rule.storage_class);
1322 if (ttier != target_rule.tier_targets.end()) {
1323 tier = ttier->second;
1324 } else { // not found
1325 return -ENOENT;
1326 }
1327 return 0;
1328 }
1329
1330 int delete_tier_obj(lc_op_ctx& oc) {
1331 int ret = 0;
1332
1333 /* If bucket is versioned, create delete_marker for current version
1334 */
1335 if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
1336 ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
1337 ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
1338 } else {
1339 ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
1340 ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
1341 }
1342 return ret;
1343 }
1344
1345 int update_tier_obj(lc_op_ctx& oc, RGWLCCloudTierCtx& tier_ctx) {
1346
1347 map<string, bufferlist> attrs;
1348 int r = 0;
1349
1350 real_time read_mtime;
1351
1352 std::unique_ptr<rgw::sal::Object::ReadOp> read_op(oc.obj->get_read_op(&oc.rctx));
1353
1354 read_op->params.lastmod = &read_mtime;
1355
1356 r = read_op->prepare(null_yield, oc.dpp);
1357 if (r < 0) {
1358 return r;
1359 }
1360
1361 if (read_mtime != tier_ctx.o.meta.mtime) {
1362 /* raced */
1363 return -ECANCELED;
1364 }
1365
1366 attrs = oc.obj->get_attrs();
1367
1368 rgw::sal::RadosStore *rados = static_cast<rgw::sal::RadosStore*>(oc.store);
1369 RGWRados::Object op_target(rados->getRados(), oc.bucket->get_info(), oc.rctx, oc.obj->get_obj());
1370 RGWRados::Object::Write obj_op(&op_target);
1371
1372 obj_op.meta.modify_tail = true;
1373 obj_op.meta.flags = PUT_OBJ_CREATE;
1374 obj_op.meta.category = RGWObjCategory::CloudTiered;
1375 obj_op.meta.delete_at = real_time();
1376 bufferlist blo;
1377 obj_op.meta.data = &blo;
1378 obj_op.meta.if_match = NULL;
1379 obj_op.meta.if_nomatch = NULL;
1380 obj_op.meta.user_data = NULL;
1381 obj_op.meta.zones_trace = NULL;
1382 obj_op.meta.delete_at = real_time();
1383 obj_op.meta.olh_epoch = tier_ctx.o.versioned_epoch;
1384
1385 RGWObjManifest *pmanifest;
1386 RGWObjManifest manifest;
1387
1388 pmanifest = &manifest;
1389 RGWObjTier tier_config;
1390 tier_config.name = oc.tier.storage_class;
1391 tier_config.tier_placement = oc.tier;
1392 tier_config.is_multipart_upload = tier_ctx.is_multipart_upload;
1393
1394 pmanifest->set_tier_type("cloud-s3");
1395 pmanifest->set_tier_config(tier_config);
1396
1397 /* check if its necessary */
1398 rgw_placement_rule target_placement;
1399 target_placement.inherit_from(tier_ctx.bucket_info.placement_rule);
1400 target_placement.storage_class = oc.tier.storage_class;
1401 pmanifest->set_head(target_placement, tier_ctx.obj->get_obj(), 0);
1402
1403 pmanifest->set_tail_placement(target_placement, tier_ctx.obj->get_obj().bucket);
1404
1405 pmanifest->set_obj_size(0);
1406
1407 obj_op.meta.manifest = pmanifest;
1408
1409 /* update storage class */
1410 bufferlist bl;
1411 bl.append(oc.tier.storage_class);
1412 attrs[RGW_ATTR_STORAGE_CLASS] = bl;
1413
1414 attrs.erase(RGW_ATTR_ID_TAG);
1415 attrs.erase(RGW_ATTR_TAIL_TAG);
1416
1417 r = obj_op.write_meta(oc.dpp, 0, 0, attrs, null_yield);
1418 if (r < 0) {
1419 return r;
1420 }
1421
1422 return 0;
1423 }
1424
1425 int transition_obj_to_cloud(lc_op_ctx& oc) {
1426 /* init */
1427 string id = "cloudid";
1428 string endpoint = oc.tier.t.s3.endpoint;
1429 RGWAccessKey key = oc.tier.t.s3.key;
1430 string region = oc.tier.t.s3.region;
1431 HostStyle host_style = oc.tier.t.s3.host_style;
1432 string bucket_name = oc.tier.t.s3.target_path;
1433 const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup();
1434 bool delete_object;
1435
1436 /* If CurrentVersion object, remove it & create delete marker */
1437 delete_object = (!oc.tier.retain_head_object ||
1438 (oc.o.is_current() && oc.bucket->versioned()));
1439
1440 if (bucket_name.empty()) {
1441 bucket_name = "rgwx-" + zonegroup.get_name() + "-" + oc.tier.storage_class +
1442 "-cloud-bucket";
1443 boost::algorithm::to_lower(bucket_name);
1444 }
1445
1446 /* Create RGW REST connection */
1447 S3RESTConn conn(oc.cct, oc.store, id, { endpoint }, key, region, host_style);
1448
1449 RGWLCCloudTierCtx tier_ctx(oc.cct, oc.dpp, oc.o, oc.store, oc.bucket->get_info(),
1450 oc.obj.get(), oc.rctx, conn, bucket_name,
1451 oc.tier.t.s3.target_storage_class);
1452 tier_ctx.acl_mappings = oc.tier.t.s3.acl_mappings;
1453 tier_ctx.multipart_min_part_size = oc.tier.t.s3.multipart_min_part_size;
1454 tier_ctx.multipart_sync_threshold = oc.tier.t.s3.multipart_sync_threshold;
1455 tier_ctx.storage_class = oc.tier.storage_class;
1456
1457 // check if target_path is already created
1458 std::set<std::string>& cloud_targets = oc.env.worker->get_cloud_targets();
1459 std::pair<std::set<std::string>::iterator, bool> it;
1460
1461 it = cloud_targets.insert(bucket_name);
1462 tier_ctx.target_bucket_created = !(it.second);
1463
1464 ldpp_dout(oc.dpp, 0) << "Transitioning object(" << oc.o.key << ") to the cloud endpoint(" << endpoint << ")" << dendl;
1465
1466 /* Transition object to cloud end point */
1467 int ret = rgw_cloud_tier_transfer_object(tier_ctx);
1468
1469 if (ret < 0) {
1470 ldpp_dout(oc.dpp, 0) << "ERROR: failed to transfer object(" << oc.o.key << ") to the cloud endpoint(" << endpoint << ") ret=" << ret << dendl;
1471 return ret;
1472
1473 if (!tier_ctx.target_bucket_created) {
1474 cloud_targets.erase(it.first);
1475 }
1476 }
1477
1478 if (delete_object) {
1479 ret = delete_tier_obj(oc);
1480 if (ret < 0) {
1481 ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl;
1482 return ret;
1483 }
1484 } else {
1485 ret = update_tier_obj(oc, tier_ctx);
1486 if (ret < 0) {
1487 ldpp_dout(oc.dpp, 0) << "ERROR: Updating tier object(" << oc.o.key << ") failed ret=" << ret << dendl;
1488 return ret;
1489 }
1490 }
1491
1492 return 0;
1493 }
1494
1495 int process(lc_op_ctx& oc) {
1496 auto& o = oc.o;
1497 int r;
1498
1499 if (oc.o.meta.category == RGWObjCategory::CloudTiered) {
1500 /* Skip objects which are already cloud tiered. */
1501 ldpp_dout(oc.dpp, 30) << "Object(key:" << oc.o.key << ") is already cloud tiered to cloud-s3 tier: " << oc.o.meta.storage_class << dendl;
1502 return 0;
1503 }
1504
1505 std::string tier_type = "";
1506 const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup();
1507
1508 rgw_placement_rule target_placement;
1509 target_placement.inherit_from(oc.bucket->get_placement_rule());
1510 target_placement.storage_class = transition.storage_class;
1511
1512 r = get_tier_target(zonegroup, target_placement, oc.tier);
1513
1514 if (!r && oc.tier.tier_type == "cloud-s3") {
1515 ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl;
1516 if (!oc.o.is_current() &&
1517 !pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx, oc.dpp)) {
1518 /* Skip objects which has object lock enabled. */
1519 ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is locked. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
1520 return 0;
1521 }
1522
1523 /* Allow transition for only RadosStore */
1524 rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(oc.store);
1525
1526 if (!rados) {
1527 ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is not on RadosStore. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
1528 return -1;
1529 }
1530
1531 r = transition_obj_to_cloud(oc);
1532 if (r < 0) {
1533 ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")"
1534 << dendl;
1535 return r;
1536 }
1537 } else {
1538 if (!oc.store->get_zone()->get_params().
1539 valid_placement(target_placement)) {
1540 ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
1541 << target_placement
1542 << " bucket="<< oc.bucket
1543 << " rule_id=" << oc.op.id
1544 << " " << oc.wq->thr_name() << dendl;
1545 return -EINVAL;
1546 }
1547
1548 int r = oc.obj->transition(oc.rctx, oc.bucket, target_placement, o.meta.mtime,
1549 o.versioned_epoch, oc.dpp, null_yield);
1550 if (r < 0) {
1551 ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
1552 << oc.bucket << ":" << o.key
1553 << " -> " << transition.storage_class
1554 << " " << cpp_strerror(r)
1555 << " " << oc.wq->thr_name() << dendl;
1556 return r;
1557 }
1558 }
1559 ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket
1560 << ":" << o.key << " -> "
1561 << transition.storage_class
1562 << " " << oc.wq->thr_name() << dendl;
1563 return 0;
1564 }
1565 };
1566
1567 class LCOpAction_CurrentTransition : public LCOpAction_Transition {
1568 protected:
1569 bool check_current_state(bool is_current) override {
1570 return is_current;
1571 }
1572
1573 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
1574 return oc.o.meta.mtime;
1575 }
1576 public:
1577 LCOpAction_CurrentTransition(const transition_action& _transition)
1578 : LCOpAction_Transition(_transition) {}
1579 int process(lc_op_ctx& oc) {
1580 int r = LCOpAction_Transition::process(oc);
1581 if (r == 0) {
1582 if (perfcounter) {
1583 perfcounter->inc(l_rgw_lc_transition_current, 1);
1584 }
1585 }
1586 return r;
1587 }
1588 };
1589
1590 class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
1591 protected:
1592 bool check_current_state(bool is_current) override {
1593 return !is_current;
1594 }
1595
1596 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
1597 return oc.effective_mtime;
1598 }
1599 public:
1600 LCOpAction_NonCurrentTransition(op_env& env,
1601 const transition_action& _transition)
1602 : LCOpAction_Transition(_transition)
1603 {}
1604 int process(lc_op_ctx& oc) {
1605 int r = LCOpAction_Transition::process(oc);
1606 if (r == 0) {
1607 if (perfcounter) {
1608 perfcounter->inc(l_rgw_lc_transition_noncurrent, 1);
1609 }
1610 }
1611 return r;
1612 }
1613 };
1614
1615 void LCOpRule::build()
1616 {
1617 filters.emplace_back(new LCOpFilter_Tags);
1618
1619 auto& op = env.op;
1620
1621 if (op.expiration > 0 ||
1622 op.expiration_date != boost::none) {
1623 actions.emplace_back(new LCOpAction_CurrentExpiration(env));
1624 }
1625
1626 if (op.dm_expiration) {
1627 actions.emplace_back(new LCOpAction_DMExpiration(env));
1628 }
1629
1630 if (op.noncur_expiration > 0) {
1631 actions.emplace_back(new LCOpAction_NonCurrentExpiration(env));
1632 }
1633
1634 for (auto& iter : op.transitions) {
1635 actions.emplace_back(new LCOpAction_CurrentTransition(iter.second));
1636 }
1637
1638 for (auto& iter : op.noncur_transitions) {
1639 actions.emplace_back(new LCOpAction_NonCurrentTransition(env, iter.second));
1640 }
1641 }
1642
1643 void LCOpRule::update()
1644 {
1645 next_key_name = env.ol.next_key_name();
1646 effective_mtime = env.ol.get_prev_obj().meta.mtime;
1647 }
1648
/* Apply this rule to a single bucket-index entry: pick the applicable
 * action with the latest expiration time, verify the rule's filters
 * match, then execute the action.  Returns 0 on success or skip, else
 * the action's error. */
int LCOpRule::process(rgw_bucket_dir_entry& o,
                      const DoutPrefixProvider *dpp,
                      WorkQ* wq)
{
  lc_op_ctx ctx(env, o, next_key_name, effective_mtime, dpp, wq);
  /* pointer into the actions vector, so the shared_ptr itself can be
   * used below */
  shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
  real_time exp;

  /* of all actions whose check() fires, keep the one with the latest
   * expiration time */
  for (auto& a : actions) {
    real_time action_exp;

    if (a->check(ctx, &action_exp, dpp)) {
      if (action_exp > exp) {
        exp = action_exp;
        selected = &a;
      }
    }
  }

  if (selected &&
      (*selected)->should_process()) {

    /*
     * Calling filter checks after action checks because
     * all action checks (as they are implemented now) do
     * not access the objects themselves, but return result
     * from info from bucket index listing. The current tags filter
     * check does access the objects, so we avoid unnecessary rados calls
     * having filters check later in the process.
     */

    bool cont = false;
    for (auto& f : filters) {
      if (f->check(dpp, ctx)) {
        cont = true;
        break;
      }
    }

    if (!cont) {
      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
                         << ": no rule match, skipping "
                         << wq->thr_name() << dendl;
      return 0;
    }

    int r = (*selected)->process(ctx);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
                        << env.bucket << ":" << o.key
                        << " " << cpp_strerror(r)
                        << " " << wq->thr_name() << dendl;
      return r;
    }
    ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":"
                       << o.key << " " << wq->thr_name() << dendl;
  }

  return 0;

}
1710
1711 int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
1712 time_t stop_at, bool once)
1713 {
1714 RGWLifecycleConfiguration config(cct);
1715 std::unique_ptr<rgw::sal::Bucket> bucket;
1716 string no_ns, list_versions;
1717 vector<rgw_bucket_dir_entry> objs;
1718 vector<std::string> result;
1719 boost::split(result, shard_id, boost::is_any_of(":"));
1720 string bucket_tenant = result[0];
1721 string bucket_name = result[1];
1722 string bucket_marker = result[2];
1723 int ret = store->get_bucket(this, nullptr, bucket_tenant, bucket_name, &bucket, null_yield);
1724 if (ret < 0) {
1725 ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
1726 << " failed" << dendl;
1727 return ret;
1728 }
1729
1730 ret = bucket->load_bucket(this, null_yield);
1731 if (ret < 0) {
1732 ldpp_dout(this, 0) << "LC:load_bucket for " << bucket_name
1733 << " failed" << dendl;
1734 return ret;
1735 }
1736
1737 auto stack_guard = make_scope_guard(
1738 [&worker]
1739 {
1740 worker->workpool->drain();
1741 }
1742 );
1743
1744 if (bucket->get_marker() != bucket_marker) {
1745 ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
1746 << bucket_tenant << ":" << bucket_name
1747 << " cur_marker=" << bucket->get_marker()
1748 << " orig_marker=" << bucket_marker << dendl;
1749 return -ENOENT;
1750 }
1751
1752 map<string, bufferlist>::iterator aiter
1753 = bucket->get_attrs().find(RGW_ATTR_LC);
1754 if (aiter == bucket->get_attrs().end()) {
1755 ldpp_dout(this, 0) << "WARNING: bucket_attrs.find(RGW_ATTR_LC) failed for "
1756 << bucket_name << " (terminates bucket_lc_process(...))"
1757 << dendl;
1758 return 0;
1759 }
1760
1761 bufferlist::const_iterator iter{&aiter->second};
1762 try {
1763 config.decode(iter);
1764 } catch (const buffer::error& e) {
1765 ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed"
1766 << dendl;
1767 return -1;
1768 }
1769
1770 auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
1771 auto wt =
1772 boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
1773 auto& [op_rule, o] = wt;
1774
1775 ldpp_dout(wk->get_lc(), 20)
1776 << __func__ << "(): key=" << o.key << wq->thr_name()
1777 << dendl;
1778 int ret = op_rule.process(o, wk->dpp, wq);
1779 if (ret < 0) {
1780 ldpp_dout(wk->get_lc(), 20)
1781 << "ERROR: orule.process() returned ret=" << ret
1782 << "thread:" << wq->thr_name()
1783 << dendl;
1784 }
1785 };
1786 worker->workpool->setf(pf);
1787
1788 multimap<string, lc_op>& prefix_map = config.get_prefix_map();
1789 ldpp_dout(this, 10) << __func__ << "() prefix_map size="
1790 << prefix_map.size()
1791 << dendl;
1792
1793 rgw_obj_key pre_marker;
1794 rgw_obj_key next_marker;
1795 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
1796 ++prefix_iter) {
1797
1798 if (worker_should_stop(stop_at, once)) {
1799 ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker "
1800 << worker->ix
1801 << dendl;
1802 return 0;
1803 }
1804
1805 auto& op = prefix_iter->second;
1806 if (!is_valid_op(op)) {
1807 continue;
1808 }
1809 ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first
1810 << dendl;
1811 if (prefix_iter != prefix_map.begin() &&
1812 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
1813 prev(prefix_iter)->first) == 0)) {
1814 next_marker = pre_marker;
1815 } else {
1816 pre_marker = next_marker;
1817 }
1818
1819 LCObjsLister ol(store, bucket.get());
1820 ol.set_prefix(prefix_iter->first);
1821
1822 ret = ol.init(this);
1823 if (ret < 0) {
1824 if (ret == (-ENOENT))
1825 return 0;
1826 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
1827 return ret;
1828 }
1829
1830 op_env oenv(op, store, worker, bucket.get(), ol);
1831 LCOpRule orule(oenv);
1832 orule.build(); // why can't ctor do it?
1833 rgw_bucket_dir_entry* o{nullptr};
1834 for (; ol.get_obj(this, &o /* , fetch_barrier */); ol.next()) {
1835 orule.update();
1836 std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
1837 worker->workpool->enqueue(WorkItem{t1});
1838 }
1839 worker->workpool->drain();
1840 }
1841
1842 ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
1843 return ret;
1844 }
1845
1846 int RGWLC::bucket_lc_post(int index, int max_lock_sec,
1847 rgw::sal::Lifecycle::LCEntry& entry, int& result,
1848 LCWorker* worker)
1849 {
1850 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
1851
1852 rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
1853 obj_names[index],
1854 cookie);
1855
1856 ldpp_dout(this, 5) << "RGWLC::bucket_lc_post(): POST " << entry
1857 << " index: " << index << " worker ix: " << worker->ix
1858 << dendl;
1859
1860 do {
1861 int ret = lock->try_lock(this, lock_duration, null_yield);
1862 if (ret == -EBUSY || ret == -EEXIST) {
1863 /* already locked by another lc processor */
1864 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
1865 << obj_names[index] << ", sleep 5, try again " << dendl;
1866 sleep(5);
1867 continue;
1868 }
1869
1870 if (ret < 0)
1871 return 0;
1872 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
1873 << dendl;
1874 if (result == -ENOENT) {
1875 /* XXXX are we SURE the only way result could == ENOENT is when
1876 * there is no such bucket? It is currently the value returned
1877 * from bucket_lc_process(...) */
1878 ret = sal_lc->rm_entry(obj_names[index], entry);
1879 if (ret < 0) {
1880 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
1881 << obj_names[index] << dendl;
1882 }
1883 goto clean;
1884 } else if (result < 0) {
1885 entry.status = lc_failed;
1886 } else {
1887 entry.status = lc_complete;
1888 }
1889
1890 ret = sal_lc->set_entry(obj_names[index], entry);
1891 if (ret < 0) {
1892 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
1893 << obj_names[index] << dendl;
1894 }
1895 clean:
1896 lock->unlock();
1897 delete lock;
1898 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
1899 << obj_names[index] << dendl;
1900 return 0;
1901 } while (true);
1902 }
1903
1904 int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
1905 vector<rgw::sal::Lifecycle::LCEntry>& progress_map,
1906 int& index)
1907 {
1908 progress_map.clear();
1909 for(; index < max_objs; index++, marker="") {
1910 vector<rgw::sal::Lifecycle::LCEntry> entries;
1911 int ret = sal_lc->list_entries(obj_names[index], marker, max_entries, entries);
1912 if (ret < 0) {
1913 if (ret == -ENOENT) {
1914 ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
1915 << obj_names[index] << dendl;
1916 continue;
1917 } else {
1918 return ret;
1919 }
1920 }
1921 progress_map.reserve(progress_map.size() + entries.size());
1922 progress_map.insert(progress_map.end(), entries.begin(), entries.end());
1923
1924 /* update index, marker tuple */
1925 if (progress_map.size() > 0)
1926 marker = progress_map.back().bucket;
1927
1928 if (progress_map.size() >= max_entries)
1929 break;
1930 }
1931 return 0;
1932 }
1933
/* Return a uniformly shuffled permutation of [0, n).  Fix: shuffle
 * with the seeded engine `rng`; previously the engine was constructed
 * but std::shuffle was handed the raw std::random_device `rd`. */
static inline std::vector<int> random_sequence(uint32_t n)
{
  std::vector<int> v(n, 0);
  std::generate(v.begin(), v.end(),
                [ix = 0]() mutable {
                  return ix++;
                });
  std::random_device rd;
  std::default_random_engine rng{rd()};
  std::shuffle(v.begin(), v.end(), rng);
  return v;
}
1946
1947 static inline int get_lc_index(CephContext *cct,
1948 const std::string& shard_id)
1949 {
1950 int max_objs =
1951 (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
1952 cct->_conf->rgw_lc_max_objs);
1953 /* n.b. review hash algo */
1954 int index = ceph_str_hash_linux(shard_id.c_str(),
1955 shard_id.size()) % HASH_PRIME % max_objs;
1956 return index;
1957 }
1958
1959 static inline void get_lc_oid(CephContext *cct,
1960 const std::string& shard_id, string *oid)
1961 {
1962 /* n.b. review hash algo */
1963 int index = get_lc_index(cct, shard_id);
1964 *oid = lc_oid_prefix;
1965 char buf[32];
1966 snprintf(buf, 32, ".%d", index);
1967 oid->append(buf);
1968 return;
1969 }
1970
/* Compose a bucket's lifecycle shard entry name from its tenant, name
 * and marker, joined with ':' (bucket_lc_process() splits on ':' to
 * recover the three fields). */
static std::string get_lc_shard_name(const rgw_bucket& bucket){
  return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
}
1974
1975 int RGWLC::process(LCWorker* worker,
1976 const std::unique_ptr<rgw::sal::Bucket>& optional_bucket,
1977 bool once = false)
1978 {
1979 int ret = 0;
1980 int max_secs = cct->_conf->rgw_lc_lock_max_time;
1981
1982 if (optional_bucket) {
1983 /* if a bucket is provided, this is a single-bucket run, and
1984 * can be processed without traversing any state entries (we
1985 * do need the entry {pro,epi}logue which update the state entry
1986 * for this bucket) */
1987 auto bucket_entry_marker = get_lc_shard_name(optional_bucket->get_key());
1988 auto index = get_lc_index(store->ctx(), bucket_entry_marker);
1989 ret = process_bucket(index, max_secs, worker, bucket_entry_marker, once);
1990 return ret;
1991 } else {
1992 /* generate an index-shard sequence unrelated to any other
1993 * that might be running in parallel */
1994 std::string all_buckets{""};
1995 vector<int> shard_seq = random_sequence(max_objs);
1996 for (auto index : shard_seq) {
1997 ret = process(index, max_secs, worker, once);
1998 if (ret < 0)
1999 return ret;
2000 }
2001 }
2002
2003 return 0;
2004 }
2005
2006 bool RGWLC::expired_session(time_t started)
2007 {
2008 time_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
2009 ? cct->_conf->rgw_lc_debug_interval
2010 : 24*60*60;
2011
2012 auto now = time(nullptr);
2013
2014 ldpp_dout(this, 16) << "RGWLC::expired_session"
2015 << " started: " << started
2016 << " interval: " << interval << "(*2==" << 2*interval << ")"
2017 << " now: " << now
2018 << dendl;
2019
2020 return (started + 2*interval < now);
2021 }
2022
2023 time_t RGWLC::thread_stop_at()
2024 {
2025 uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
2026 ? cct->_conf->rgw_lc_debug_interval
2027 : 24*60*60;
2028
2029 return time(nullptr) + interval;
2030 }
2031
/* Single-bucket lifecycle run: under the shard lock, validate the
 * bucket's state entry (skipping an entry another processor is
 * actively working on), mark it lc_processing, then run
 * bucket_lc_process() with the lock released and record the result via
 * bucket_lc_post(). */
int RGWLC::process_bucket(int index, int max_lock_secs, LCWorker* worker,
                          const std::string& bucket_entry_marker,
                          bool once = false)
{
  ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: "
                     << "index: " << index << " worker ix: " << worker->ix
                     << dendl;

  int ret = 0;
  std::unique_ptr<rgw::sal::LCSerializer> serializer(
    sal_lc->get_serializer(lc_index_lock_name, obj_names[index],
                           std::string()));

  rgw::sal::Lifecycle::LCEntry entry;
  if (max_lock_secs <= 0) {
    return -EAGAIN;
  }

  utime_t time(max_lock_secs, 0);
  ret = serializer->try_lock(this, time, null_yield);
  if (ret == -EBUSY || ret == -EEXIST) {
    /* already locked by another lc processor */
    ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
                       << obj_names[index] << dendl;
    return -EBUSY;
  }
  if (ret < 0)
    return 0;

  /* adopt the acquired lock; released automatically on every return */
  std::unique_lock<rgw::sal::LCSerializer> lock(
    *(serializer.get()), std::adopt_lock);

  ret = sal_lc->get_entry(obj_names[index], bucket_entry_marker, entry);
  if (ret >= 0) {
    if (entry.status == lc_processing) {
      /* an lc_processing entry is either a live session (leave it
       * alone) or a stale one from a crashed/stalled processor (take
       * it over) */
      if (expired_session(entry.start_time)) {
        ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry
                           << " index: " << index << " worker ix: " << worker->ix
                           << " (clearing)"
                           << dendl;
      } else {
        ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: "
                           << entry
                           << " index: " << index
                           << " worker ix: " << worker->ix
                           << dendl;
        return ret;
      }
    }
  }

  /* do nothing if no bucket */
  if (entry.bucket.empty()) {
    return ret;
  }

  ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry
                     << " index: " << index << " worker ix: " << worker->ix
                     << dendl;

  entry.status = lc_processing;
  ret = sal_lc->set_entry(obj_names[index], entry);
  if (ret < 0) {
    ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry "
                       << obj_names[index] << entry.bucket << entry.status
                       << dendl;
    return ret;
  }

  ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry
                     << " index: " << index << " worker ix: " << worker->ix
                     << dendl;

  /* release the shard lock before the (potentially long) bucket run */
  lock.unlock();
  ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
  bucket_lc_post(index, max_lock_secs, entry, ret, worker);

  return ret;
} /* RGWLC::process_bucket */
2111
2112 int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
2113 bool once = false)
2114 {
2115 ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
2116 << "index: " << index << " worker ix: " << worker->ix
2117 << dendl;
2118
2119 int ret = 0;
2120 rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
2121 obj_names[index],
2122 std::string());
2123 do {
2124 utime_t now = ceph_clock_now();
2125 //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
2126 rgw::sal::Lifecycle::LCEntry entry;
2127 if (max_lock_secs <= 0)
2128 return -EAGAIN;
2129
2130 utime_t time(max_lock_secs, 0);
2131 ret = lock->try_lock(this, time, null_yield);
2132 if (ret == -EBUSY || ret == -EEXIST) {
2133 /* already locked by another lc processor */
2134 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
2135 << obj_names[index] << ", sleep 5, try again" << dendl;
2136 sleep(5);
2137 continue; // XXXX really retry forever?
2138 }
2139 if (ret < 0)
2140 return 0;
2141
2142 rgw::sal::Lifecycle::LCHead head;
2143 ret = sal_lc->get_head(obj_names[index], head);
2144 if (ret < 0) {
2145 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
2146 << obj_names[index] << ", ret=" << ret << dendl;
2147 goto exit;
2148 }
2149
2150 ret = sal_lc->get_entry(obj_names[index], head.marker, entry);
2151 if (ret >= 0) {
2152 if (entry.status == lc_processing) {
2153 if (expired_session(entry.start_time)) {
2154 ldpp_dout(this, 5) << "RGWLC::process(): STALE lc session found for: " << entry
2155 << " index: " << index << " worker ix: " << worker->ix
2156 << " (clearing)"
2157 << dendl;
2158 } else {
2159 ldpp_dout(this, 5) << "RGWLC::process(): ACTIVE entry: " << entry
2160 << " index: " << index << " worker ix: " << worker->ix
2161 << dendl;
2162 goto exit;
2163 }
2164 }
2165 }
2166
2167 if(!if_already_run_today(head.start_date) ||
2168 once) {
2169 head.start_date = now;
2170 head.marker.clear();
2171 ret = bucket_lc_prepare(index, worker);
2172 if (ret < 0) {
2173 ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
2174 << obj_names[index]
2175 << ", ret=" << ret
2176 << dendl;
2177 goto exit;
2178 }
2179 }
2180
2181 ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry);
2182 if (ret < 0) {
2183 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
2184 << obj_names[index] << dendl;
2185 goto exit;
2186 }
2187
2188 /* termination condition (eof) */
2189 if (entry.bucket.empty())
2190 goto exit;
2191
2192 ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
2193 << " index: " << index << " worker ix: " << worker->ix
2194 << dendl;
2195
2196 entry.status = lc_processing;
2197 ret = sal_lc->set_entry(obj_names[index], entry);
2198 if (ret < 0) {
2199 ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
2200 << obj_names[index] << entry.bucket << entry.status << dendl;
2201 goto exit;
2202 }
2203
2204 head.marker = entry.bucket;
2205 ret = sal_lc->put_head(obj_names[index], head);
2206 if (ret < 0) {
2207 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2208 << obj_names[index]
2209 << dendl;
2210 goto exit;
2211 }
2212
2213 ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
2214 << " index: " << index << " worker ix: " << worker->ix
2215 << dendl;
2216
2217 lock->unlock();
2218 ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
2219 bucket_lc_post(index, max_lock_secs, entry, ret, worker);
2220 } while(1 && !once);
2221
2222 delete lock;
2223 return 0;
2224
2225 exit:
2226 lock->unlock();
2227 delete lock;
2228 return 0;
2229 }
2230
2231 void RGWLC::start_processor()
2232 {
2233 auto maxw = cct->_conf->rgw_lc_max_worker;
2234 workers.reserve(maxw);
2235 for (int ix = 0; ix < maxw; ++ix) {
2236 auto worker =
2237 std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix);
2238 worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
2239 workers.emplace_back(std::move(worker));
2240 }
2241 }
2242
2243 void RGWLC::stop_processor()
2244 {
2245 down_flag = true;
2246 for (auto& worker : workers) {
2247 worker->stop();
2248 worker->join();
2249 }
2250 workers.clear();
2251 }
2252
2253 unsigned RGWLC::get_subsys() const
2254 {
2255 return dout_subsys;
2256 }
2257
2258 std::ostream& RGWLC::gen_prefix(std::ostream& out) const
2259 {
2260 return out << "lifecycle: ";
2261 }
2262
2263 void RGWLC::LCWorker::stop()
2264 {
2265 std::lock_guard l{lock};
2266 cond.notify_all();
2267 }
2268
// True once stop_processor() has set down_flag, i.e. shutdown requested.
bool RGWLC::going_down()
{
  return down_flag;
}
2273
2274 bool RGWLC::LCWorker::should_work(utime_t& now)
2275 {
2276 int start_hour;
2277 int start_minute;
2278 int end_hour;
2279 int end_minute;
2280 string worktime = cct->_conf->rgw_lifecycle_work_time;
2281 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute,
2282 &end_hour, &end_minute);
2283 struct tm bdt;
2284 time_t tt = now.sec();
2285 localtime_r(&tt, &bdt);
2286
2287 if (cct->_conf->rgw_lc_debug_interval > 0) {
2288 /* We're debugging, so say we can run */
2289 return true;
2290 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
2291 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
2292 return true;
2293 } else {
2294 return false;
2295 }
2296
2297 }
2298
2299 int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
2300 {
2301 int secs;
2302
2303 if (cct->_conf->rgw_lc_debug_interval > 0) {
2304 secs = start + cct->_conf->rgw_lc_debug_interval - now;
2305 if (secs < 0)
2306 secs = 0;
2307 return (secs);
2308 }
2309
2310 int start_hour;
2311 int start_minute;
2312 int end_hour;
2313 int end_minute;
2314 string worktime = cct->_conf->rgw_lifecycle_work_time;
2315 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour,
2316 &end_minute);
2317 struct tm bdt;
2318 time_t tt = now.sec();
2319 time_t nt;
2320 localtime_r(&tt, &bdt);
2321 bdt.tm_hour = start_hour;
2322 bdt.tm_min = start_minute;
2323 bdt.tm_sec = 0;
2324 nt = mktime(&bdt);
2325 secs = nt - tt;
2326
2327 return secs>0 ? secs : secs+24*60*60;
2328 }
2329
RGWLC::LCWorker::~LCWorker()
{
  // the worker owns its work pool; release it on teardown
  delete workpool;
} /* ~LCWorker */
2334
2335 void RGWLifecycleConfiguration::generate_test_instances(
2336 list<RGWLifecycleConfiguration*>& o)
2337 {
2338 o.push_back(new RGWLifecycleConfiguration);
2339 }
2340
2341 template<typename F>
2342 static int guard_lc_modify(const DoutPrefixProvider *dpp,
2343 rgw::sal::Store* store,
2344 rgw::sal::Lifecycle* sal_lc,
2345 const rgw_bucket& bucket, const string& cookie,
2346 const F& f) {
2347 CephContext *cct = store->ctx();
2348
2349 string shard_id = get_lc_shard_name(bucket);
2350
2351 string oid;
2352 get_lc_oid(cct, shard_id, &oid);
2353
2354 /* XXX it makes sense to take shard_id for a bucket_id? */
2355 rgw::sal::Lifecycle::LCEntry entry;
2356 entry.bucket = shard_id;
2357 entry.status = lc_uninitial;
2358 int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
2359
2360 rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
2361 oid,
2362 cookie);
2363 utime_t time(max_lock_secs, 0);
2364
2365 int ret;
2366
2367 do {
2368 ret = lock->try_lock(dpp, time, null_yield);
2369 if (ret == -EBUSY || ret == -EEXIST) {
2370 ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
2371 << oid << ", sleep 5, try again" << dendl;
2372 sleep(5); // XXX: return retryable error
2373 continue;
2374 }
2375 if (ret < 0) {
2376 ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
2377 << oid << ", ret=" << ret << dendl;
2378 break;
2379 }
2380 ret = f(sal_lc, oid, entry);
2381 if (ret < 0) {
2382 ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to set entry on "
2383 << oid << ", ret=" << ret << dendl;
2384 }
2385 break;
2386 } while(true);
2387 lock->unlock();
2388 delete lock;
2389 return ret;
2390 }
2391
2392 int RGWLC::set_bucket_config(rgw::sal::Bucket* bucket,
2393 const rgw::sal::Attrs& bucket_attrs,
2394 RGWLifecycleConfiguration *config)
2395 {
2396 rgw::sal::Attrs attrs = bucket_attrs;
2397 bufferlist lc_bl;
2398 config->encode(lc_bl);
2399
2400 attrs[RGW_ATTR_LC] = std::move(lc_bl);
2401
2402 int ret =
2403 bucket->merge_and_store_attrs(this, attrs, null_yield);
2404 if (ret < 0)
2405 return ret;
2406
2407 rgw_bucket& b = bucket->get_key();
2408
2409
2410 ret = guard_lc_modify(this, store, sal_lc.get(), b, cookie,
2411 [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
2412 const rgw::sal::Lifecycle::LCEntry& entry) {
2413 return sal_lc->set_entry(oid, entry);
2414 });
2415
2416 return ret;
2417 }
2418
2419 int RGWLC::remove_bucket_config(rgw::sal::Bucket* bucket,
2420 const rgw::sal::Attrs& bucket_attrs)
2421 {
2422 rgw::sal::Attrs attrs = bucket_attrs;
2423 attrs.erase(RGW_ATTR_LC);
2424 int ret = bucket->merge_and_store_attrs(this, attrs, null_yield);
2425
2426 rgw_bucket& b = bucket->get_key();
2427
2428 if (ret < 0) {
2429 ldpp_dout(this, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
2430 << b.name << " returned err=" << ret << dendl;
2431 return ret;
2432 }
2433
2434
2435 ret = guard_lc_modify(this, store, sal_lc.get(), b, cookie,
2436 [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
2437 const rgw::sal::Lifecycle::LCEntry& entry) {
2438 return sal_lc->rm_entry(oid, entry);
2439 });
2440
2441 return ret;
2442 } /* RGWLC::remove_bucket_config */
2443
RGWLC::~RGWLC()
{
  // stop and join worker threads first, then tear down remaining state
  stop_processor();
  finalize();
} /* ~RGWLC() */
2449
2450 namespace rgw::lc {
2451
2452 int fix_lc_shard_entry(const DoutPrefixProvider *dpp,
2453 rgw::sal::Store* store,
2454 rgw::sal::Lifecycle* sal_lc,
2455 rgw::sal::Bucket* bucket)
2456 {
2457 if (auto aiter = bucket->get_attrs().find(RGW_ATTR_LC);
2458 aiter == bucket->get_attrs().end()) {
2459 return 0; // No entry, nothing to fix
2460 }
2461
2462 auto shard_name = get_lc_shard_name(bucket->get_key());
2463 std::string lc_oid;
2464 get_lc_oid(store->ctx(), shard_name, &lc_oid);
2465
2466 rgw::sal::Lifecycle::LCEntry entry;
2467 // There are multiple cases we need to encounter here
2468 // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
2469 // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
2470 // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
2471 // We are not dropping the old marker here as that would be caught by the next LC process update
2472 int ret = sal_lc->get_entry(lc_oid, shard_name, entry);
2473 if (ret == 0) {
2474 ldpp_dout(dpp, 5) << "Entry already exists, nothing to do" << dendl;
2475 return ret; // entry is already existing correctly set to marker
2476 }
2477 ldpp_dout(dpp, 5) << "lc_get_entry errored ret code=" << ret << dendl;
2478 if (ret == -ENOENT) {
2479 ldpp_dout(dpp, 1) << "No entry for bucket=" << bucket
2480 << " creating " << dendl;
2481 // TODO: we have too many ppl making cookies like this!
2482 char cookie_buf[COOKIE_LEN + 1];
2483 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
2484 std::string cookie = cookie_buf;
2485
2486 ret = guard_lc_modify(dpp,
2487 store, sal_lc, bucket->get_key(), cookie,
2488 [&lc_oid](rgw::sal::Lifecycle* slc,
2489 const string& oid,
2490 const rgw::sal::Lifecycle::LCEntry& entry) {
2491 return slc->set_entry(lc_oid, entry);
2492 });
2493
2494 }
2495
2496 return ret;
2497 }
2498
2499 std::string s3_expiration_header(
2500 DoutPrefixProvider* dpp,
2501 const rgw_obj_key& obj_key,
2502 const RGWObjTags& obj_tagset,
2503 const ceph::real_time& mtime,
2504 const std::map<std::string, buffer::list>& bucket_attrs)
2505 {
2506 CephContext* cct = dpp->get_cct();
2507 RGWLifecycleConfiguration config(cct);
2508 std::string hdr{""};
2509
2510 const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
2511 if (aiter == bucket_attrs.end())
2512 return hdr;
2513
2514 bufferlist::const_iterator iter{&aiter->second};
2515 try {
2516 config.decode(iter);
2517 } catch (const buffer::error& e) {
2518 ldpp_dout(dpp, 0) << __func__
2519 << "() decode life cycle config failed"
2520 << dendl;
2521 return hdr;
2522 } /* catch */
2523
2524 /* dump tags at debug level 16 */
2525 RGWObjTags::tag_map_t obj_tag_map = obj_tagset.get_tags();
2526 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 16)) {
2527 for (const auto& elt : obj_tag_map) {
2528 ldpp_dout(dpp, 16) << __func__
2529 << "() key=" << elt.first << " val=" << elt.second
2530 << dendl;
2531 }
2532 }
2533
2534 boost::optional<ceph::real_time> expiration_date;
2535 boost::optional<std::string> rule_id;
2536
2537 const auto& rule_map = config.get_rule_map();
2538 for (const auto& ri : rule_map) {
2539 const auto& rule = ri.second;
2540 auto& id = rule.get_id();
2541 auto& filter = rule.get_filter();
2542 auto& prefix = filter.has_prefix() ? filter.get_prefix(): rule.get_prefix();
2543 auto& expiration = rule.get_expiration();
2544 auto& noncur_expiration = rule.get_noncur_expiration();
2545
2546 ldpp_dout(dpp, 10) << "rule: " << ri.first
2547 << " prefix: " << prefix
2548 << " expiration: "
2549 << " date: " << expiration.get_date()
2550 << " days: " << expiration.get_days()
2551 << " noncur_expiration: "
2552 << " date: " << noncur_expiration.get_date()
2553 << " days: " << noncur_expiration.get_days()
2554 << dendl;
2555
2556 /* skip if rule !enabled
2557 * if rule has prefix, skip iff object !match prefix
2558 * if rule has tags, skip iff object !match tags
2559 * note if object is current or non-current, compare accordingly
2560 * if rule has days, construct date expression and save iff older
2561 * than last saved
2562 * if rule has date, convert date expression and save iff older
2563 * than last saved
2564 * if the date accum has a value, format it into hdr
2565 */
2566
2567 if (! rule.is_enabled())
2568 continue;
2569
2570 if(! prefix.empty()) {
2571 if (! boost::starts_with(obj_key.name, prefix))
2572 continue;
2573 }
2574
2575 if (filter.has_tags()) {
2576 bool tag_match = false;
2577 const RGWObjTags& rule_tagset = filter.get_tags();
2578 for (auto& tag : rule_tagset.get_tags()) {
2579 /* remember, S3 tags are {key,value} tuples */
2580 tag_match = true;
2581 auto obj_tag = obj_tag_map.find(tag.first);
2582 if (obj_tag == obj_tag_map.end() || obj_tag->second != tag.second) {
2583 ldpp_dout(dpp, 10) << "tag does not match obj_key=" << obj_key
2584 << " rule_id=" << id
2585 << " tag=" << tag
2586 << dendl;
2587 tag_match = false;
2588 break;
2589 }
2590 }
2591 if (! tag_match)
2592 continue;
2593 }
2594
2595 // compute a uniform expiration date
2596 boost::optional<ceph::real_time> rule_expiration_date;
2597 const LCExpiration& rule_expiration =
2598 (obj_key.instance.empty()) ? expiration : noncur_expiration;
2599
2600 if (rule_expiration.has_date()) {
2601 rule_expiration_date =
2602 boost::optional<ceph::real_time>(
2603 ceph::from_iso_8601(rule.get_expiration().get_date()));
2604 } else {
2605 if (rule_expiration.has_days()) {
2606 rule_expiration_date =
2607 boost::optional<ceph::real_time>(
2608 mtime + make_timespan(double(rule_expiration.get_days())*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
2609 }
2610 }
2611
2612 // update earliest expiration
2613 if (rule_expiration_date) {
2614 if ((! expiration_date) ||
2615 (*expiration_date > *rule_expiration_date)) {
2616 expiration_date =
2617 boost::optional<ceph::real_time>(rule_expiration_date);
2618 rule_id = boost::optional<std::string>(id);
2619 }
2620 }
2621 }
2622
2623 // cond format header
2624 if (expiration_date && rule_id) {
2625 // Fri, 23 Dec 2012 00:00:00 GMT
2626 char exp_buf[100];
2627 time_t exp = ceph::real_clock::to_time_t(*expiration_date);
2628 if (std::strftime(exp_buf, sizeof(exp_buf),
2629 "%a, %d %b %Y %T %Z", std::gmtime(&exp))) {
2630 hdr = fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf,
2631 *rule_id);
2632 } else {
2633 ldpp_dout(dpp, 0) << __func__ <<
2634 "() strftime of life cycle expiration header failed"
2635 << dendl;
2636 }
2637 }
2638
2639 return hdr;
2640
2641 } /* rgwlc_s3_expiration_header */
2642
2643 bool s3_multipart_abort_header(
2644 DoutPrefixProvider* dpp,
2645 const rgw_obj_key& obj_key,
2646 const ceph::real_time& mtime,
2647 const std::map<std::string, buffer::list>& bucket_attrs,
2648 ceph::real_time& abort_date,
2649 std::string& rule_id)
2650 {
2651 CephContext* cct = dpp->get_cct();
2652 RGWLifecycleConfiguration config(cct);
2653
2654 const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
2655 if (aiter == bucket_attrs.end())
2656 return false;
2657
2658 bufferlist::const_iterator iter{&aiter->second};
2659 try {
2660 config.decode(iter);
2661 } catch (const buffer::error& e) {
2662 ldpp_dout(dpp, 0) << __func__
2663 << "() decode life cycle config failed"
2664 << dendl;
2665 return false;
2666 } /* catch */
2667
2668 std::optional<ceph::real_time> abort_date_tmp;
2669 std::optional<std::string_view> rule_id_tmp;
2670 const auto& rule_map = config.get_rule_map();
2671 for (const auto& ri : rule_map) {
2672 const auto& rule = ri.second;
2673 const auto& id = rule.get_id();
2674 const auto& filter = rule.get_filter();
2675 const auto& prefix = filter.has_prefix()?filter.get_prefix():rule.get_prefix();
2676 const auto& mp_expiration = rule.get_mp_expiration();
2677 if (!rule.is_enabled()) {
2678 continue;
2679 }
2680 if(!prefix.empty() && !boost::starts_with(obj_key.name, prefix)) {
2681 continue;
2682 }
2683
2684 std::optional<ceph::real_time> rule_abort_date;
2685 if (mp_expiration.has_days()) {
2686 rule_abort_date = std::optional<ceph::real_time>(
2687 mtime + make_timespan(mp_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
2688 }
2689
2690 // update earliest abort date
2691 if (rule_abort_date) {
2692 if ((! abort_date_tmp) ||
2693 (*abort_date_tmp > *rule_abort_date)) {
2694 abort_date_tmp =
2695 std::optional<ceph::real_time>(rule_abort_date);
2696 rule_id_tmp = std::optional<std::string_view>(id);
2697 }
2698 }
2699 }
2700 if (abort_date_tmp && rule_id_tmp) {
2701 abort_date = *abort_date_tmp;
2702 rule_id = *rule_id_tmp;
2703 return true;
2704 } else {
2705 return false;
2706 }
2707 }
2708
2709 } /* namespace rgw::lc */
2710
2711 void lc_op::dump(Formatter *f) const
2712 {
2713 f->dump_bool("status", status);
2714 f->dump_bool("dm_expiration", dm_expiration);
2715
2716 f->dump_int("expiration", expiration);
2717 f->dump_int("noncur_expiration", noncur_expiration);
2718 f->dump_int("mp_expiration", mp_expiration);
2719 if (expiration_date) {
2720 utime_t ut(*expiration_date);
2721 f->dump_stream("expiration_date") << ut;
2722 }
2723 if (obj_tags) {
2724 f->dump_object("obj_tags", *obj_tags);
2725 }
2726 f->open_object_section("transitions");
2727 for(auto& [storage_class, transition] : transitions) {
2728 f->dump_object(storage_class, transition);
2729 }
2730 f->close_section();
2731
2732 f->open_object_section("noncur_transitions");
2733 for (auto& [storage_class, transition] : noncur_transitions) {
2734 f->dump_object(storage_class, transition);
2735 }
2736 f->close_section();
2737 }
2738
// Dump the filter (prefix and tag set) to the formatter.
void LCFilter::dump(Formatter *f) const
{
  f->dump_string("prefix", prefix);
  f->dump_object("obj_tags", obj_tags);
}
2744
// Dump the expiration's days/date fields (stored as strings) to the
// formatter.
void LCExpiration::dump(Formatter *f) const
{
  f->dump_string("days", days);
  f->dump_string("date", date);
}
2750
2751 void LCRule::dump(Formatter *f) const
2752 {
2753 f->dump_string("id", id);
2754 f->dump_string("prefix", prefix);
2755 f->dump_string("status", status);
2756 f->dump_object("expiration", expiration);
2757 f->dump_object("noncur_expiration", noncur_expiration);
2758 f->dump_object("mp_expiration", mp_expiration);
2759 f->dump_object("filter", filter);
2760 f->open_object_section("transitions");
2761 for (auto& [storage_class, transition] : transitions) {
2762 f->dump_object(storage_class, transition);
2763 }
2764 f->close_section();
2765
2766 f->open_object_section("noncur_transitions");
2767 for (auto& [storage_class, transition] : noncur_transitions) {
2768 f->dump_object(storage_class, transition);
2769 }
2770 f->close_section();
2771 f->dump_bool("dm_expiration", dm_expiration);
2772 }
2773
2774
2775 void RGWLifecycleConfiguration::dump(Formatter *f) const
2776 {
2777 f->open_object_section("prefix_map");
2778 for (auto& prefix : prefix_map) {
2779 f->dump_object(prefix.first.c_str(), prefix.second);
2780 }
2781 f->close_section();
2782
2783 f->open_array_section("rule_map");
2784 for (auto& rule : rule_map) {
2785 f->open_object_section("entry");
2786 f->dump_string("id", rule.first);
2787 f->open_object_section("rule");
2788 rule.second.dump(f);
2789 f->close_section();
2790 f->close_section();
2791 }
2792 f->close_section();
2793 }
2794