// ceph/src/rgw/rgw_lc.cc — RGW object lifecycle processing (ceph quincy 17.2.6)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
#include <string.h>
#include <iostream>
#include <map>
#include <string>
#include <algorithm>
#include <tuple>
#include <functional>

#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/variant.hpp>

#include "include/scope_guard.h"
#include "common/Formatter.h"
#include "common/containers.h"
#include "common/errno.h"
#include "include/random.h"
#include "cls/lock/cls_lock_client.h"
#include "rgw_perf_counters.h"
#include "rgw_common.h"
#include "rgw_bucket.h"
#include "rgw_lc.h"
#include "rgw_zone.h"
#include "rgw_string.h"
#include "rgw_multi.h"
#include "rgw_sal_rados.h"
#include "rgw_rados.h"
#include "rgw_lc_tier.h"
#include "rgw_notify.h"
33
34 // this seems safe to use, at least for now--arguably, we should
35 // prefer header-only fmt, in general
36 #undef FMT_HEADER_ONLY
37 #define FMT_HEADER_ONLY 1
38 #include "fmt/format.h"
39
40 #include "services/svc_sys_obj.h"
41 #include "services/svc_zone.h"
42 #include "services/svc_tier_rados.h"
43
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_rgw
46
47 using namespace std;
48
// printable names for per-bucket lifecycle processing states,
// indexed by the lc_* status codes (see set_entry/bucket_lc_prepare)
const char* LC_STATUS[] = {
      "UNINITIAL",
      "PROCESSING",
      "FAILED",
      "COMPLETE"
};
55
56 using namespace librados;
57
58 bool LCRule::valid() const
59 {
60 if (id.length() > MAX_ID_LEN) {
61 return false;
62 }
63 else if(expiration.empty() && noncur_expiration.empty() &&
64 mp_expiration.empty() && !dm_expiration &&
65 transitions.empty() && noncur_transitions.empty()) {
66 return false;
67 }
68 else if (!expiration.valid() || !noncur_expiration.valid() ||
69 !mp_expiration.valid()) {
70 return false;
71 }
72 if (!transitions.empty()) {
73 bool using_days = expiration.has_days();
74 bool using_date = expiration.has_date();
75 for (const auto& elem : transitions) {
76 if (!elem.second.valid()) {
77 return false;
78 }
79 using_days = using_days || elem.second.has_days();
80 using_date = using_date || elem.second.has_date();
81 if (using_days && using_date) {
82 return false;
83 }
84 }
85 }
86 for (const auto& elem : noncur_transitions) {
87 if (!elem.second.valid()) {
88 return false;
89 }
90 }
91
92 return true;
93 }
94
95 void LCRule::init_simple_days_rule(std::string_view _id,
96 std::string_view _prefix, int num_days)
97 {
98 id = _id;
99 prefix = _prefix;
100 char buf[32];
101 snprintf(buf, sizeof(buf), "%d", num_days);
102 expiration.set_days(buf);
103 set_enabled(true);
104 }
105
106 void RGWLifecycleConfiguration::add_rule(const LCRule& rule)
107 {
108 auto& id = rule.get_id(); // note that this will return false for groups, but that's ok, we won't search groups
109 rule_map.insert(pair<string, LCRule>(id, rule));
110 }
111
112 bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule)
113 {
114 lc_op op(rule.get_id());
115 op.status = rule.is_enabled();
116 if (rule.get_expiration().has_days()) {
117 op.expiration = rule.get_expiration().get_days();
118 }
119 if (rule.get_expiration().has_date()) {
120 op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date());
121 }
122 if (rule.get_noncur_expiration().has_days()) {
123 op.noncur_expiration = rule.get_noncur_expiration().get_days();
124 }
125 if (rule.get_mp_expiration().has_days()) {
126 op.mp_expiration = rule.get_mp_expiration().get_days();
127 }
128 op.dm_expiration = rule.get_dm_expiration();
129 for (const auto &elem : rule.get_transitions()) {
130 transition_action action;
131 if (elem.second.has_days()) {
132 action.days = elem.second.get_days();
133 } else {
134 action.date = ceph::from_iso_8601(elem.second.get_date());
135 }
136 action.storage_class
137 = rgw_placement_rule::get_canonical_storage_class(elem.first);
138 op.transitions.emplace(elem.first, std::move(action));
139 }
140 for (const auto &elem : rule.get_noncur_transitions()) {
141 transition_action action;
142 action.days = elem.second.get_days();
143 action.date = ceph::from_iso_8601(elem.second.get_date());
144 action.storage_class = elem.first;
145 op.noncur_transitions.emplace(elem.first, std::move(action));
146 }
147 std::string prefix;
148 if (rule.get_filter().has_prefix()){
149 prefix = rule.get_filter().get_prefix();
150 } else {
151 prefix = rule.get_prefix();
152 }
153
154 if (rule.get_filter().has_tags()){
155 op.obj_tags = rule.get_filter().get_tags();
156 }
157 prefix_map.emplace(std::move(prefix), std::move(op));
158 return true;
159 }
160
161 int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule)
162 {
163 if (!rule.valid()) {
164 return -EINVAL;
165 }
166 auto& id = rule.get_id();
167 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
168 return -EINVAL;
169 }
170 if (rule.get_filter().has_tags() && (rule.get_dm_expiration() ||
171 !rule.get_mp_expiration().empty())) {
172 return -ERR_INVALID_REQUEST;
173 }
174 rule_map.insert(pair<string, LCRule>(id, rule));
175
176 if (!_add_rule(rule)) {
177 return -ERR_INVALID_REQUEST;
178 }
179 return 0;
180 }
181
182 bool RGWLifecycleConfiguration::has_same_action(const lc_op& first,
183 const lc_op& second) {
184 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
185 (second.expiration > 0 || second.expiration_date != boost::none)) {
186 return true;
187 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
188 return true;
189 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
190 return true;
191 } else if (!first.transitions.empty() && !second.transitions.empty()) {
192 for (auto &elem : first.transitions) {
193 if (second.transitions.find(elem.first) != second.transitions.end()) {
194 return true;
195 }
196 }
197 } else if (!first.noncur_transitions.empty() &&
198 !second.noncur_transitions.empty()) {
199 for (auto &elem : first.noncur_transitions) {
200 if (second.noncur_transitions.find(elem.first) !=
201 second.noncur_transitions.end()) {
202 return true;
203 }
204 }
205 }
206 return false;
207 }
208
/* Formerly, this method checked for duplicate rules using an invalid
 * method (prefix uniqueness). */
// Per-rule validation now happens in check_and_add_rule(); the whole
// configuration is therefore always considered valid here.
bool RGWLifecycleConfiguration::valid()
{
  return true;
}
215
// Lifecycle worker thread body: run one processing pass whenever the
// schedule window permits, then sleep (interruptibly) until the next
// computed start time.  Exits when the parent RGWLC is shutting down.
void *RGWLC::LCWorker::entry() {
  do {
    std::unique_ptr<rgw::sal::Bucket> all_buckets; // empty restriction
    utime_t start = ceph_clock_now();
    if (should_work(start)) {
      ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
      int r = lc->process(this, all_buckets, false /* once */);
      if (r < 0) {
        ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r="
			  << r << dendl;
      }
      ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
      cloud_targets.clear(); // clear cloud targets
    }
    if (lc->going_down())
      break;

    utime_t end = ceph_clock_now();
    int secs = schedule_next_start_time(start, end);
    utime_t next;
    next.set_from_double(end + secs);

    ldpp_dout(dpp, 5) << "schedule life cycle next start time: "
		      << rgw_to_asctime(next) << dendl;

    // wait_for rather than sleep so shutdown can wake us early
    std::unique_lock l{lock};
    cond.wait_for(l, std::chrono::seconds(secs));
  } while (!lc->going_down());

  return NULL;
}
247
248 void RGWLC::initialize(CephContext *_cct, rgw::sal::Store* _store) {
249 cct = _cct;
250 store = _store;
251 sal_lc = store->get_lifecycle();
252 max_objs = cct->_conf->rgw_lc_max_objs;
253 if (max_objs > HASH_PRIME)
254 max_objs = HASH_PRIME;
255
256 obj_names = new string[max_objs];
257
258 for (int i = 0; i < max_objs; i++) {
259 obj_names[i] = lc_oid_prefix;
260 char buf[32];
261 snprintf(buf, 32, ".%d", i);
262 obj_names[i].append(buf);
263 }
264
265 #define COOKIE_LEN 16
266 char cookie_buf[COOKIE_LEN + 1];
267 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
268 cookie = cookie_buf;
269 }
270
// Tear-down counterpart of initialize(): release the per-shard oid array.
void RGWLC::finalize()
{
  delete[] obj_names;
}
275
// Return true when a lifecycle pass already started within the current
// scheduling window: within rgw_lc_debug_interval seconds when the
// debug interval is set, otherwise within the current local calendar day.
bool RGWLC::if_already_run_today(time_t start_date)
{
  struct tm bdt;
  time_t begin_of_day;
  utime_t now = ceph_clock_now();
  localtime_r(&start_date, &bdt);

  if (cct->_conf->rgw_lc_debug_interval > 0) {
    // debug mode: a "day" lasts rgw_lc_debug_interval seconds
    if (now - start_date < cct->_conf->rgw_lc_debug_interval)
      return true;
    else
      return false;
  }

  // truncate the recorded start time to local midnight, then compare
  // against a 24h window
  bdt.tm_hour = 0;
  bdt.tm_min = 0;
  bdt.tm_sec = 0;
  begin_of_day = mktime(&bdt);
  if (now - begin_of_day < 24*60*60)
    return true;
  else
    return false;
}
299
300 static inline std::ostream& operator<<(std::ostream &os, rgw::sal::Lifecycle::LCEntry& ent) {
301 os << "<ent: bucket=";
302 os << ent.bucket;
303 os << "; start_time=";
304 os << rgw_to_asctime(utime_t(time_t(ent.start_time), 0));
305 os << "; status=";
306 os << ent.status;
307 os << ">";
308 return os;
309 }
310
// Reset every entry in the given lc shard to lc_uninitial with a fresh
// start time, paging through the shard MAX_LC_LIST_ENTRIES at a time.
// @param index  shard index into obj_names
// @return 0 on success, negative error code from list/set otherwise.
int RGWLC::bucket_lc_prepare(int index, LCWorker* worker)
{
  vector<rgw::sal::Lifecycle::LCEntry> entries;
  string marker;

  ldpp_dout(this, 5) << "RGWLC::bucket_lc_prepare(): PREPARE "
		     << "index: " << index << " worker ix: " << worker->ix
		     << dendl;

#define MAX_LC_LIST_ENTRIES 100
  do {
    int ret = sal_lc->list_entries(obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
    if (ret < 0)
      return ret;

    for (auto& entry : entries) {
      entry.start_time = ceph_clock_now();
      entry.status = lc_uninitial; // lc_uninitial? really?
      ret = sal_lc->set_entry(obj_names[index], entry);
      if (ret < 0) {
	ldpp_dout(this, 0)
	  << "RGWLC::bucket_lc_prepare() failed to set entry on "
	  << obj_names[index] << dendl;
	return ret;
      }
    }

    // continue listing after the last bucket we saw
    if (! entries.empty()) {
      marker = std::move(entries.back().bucket);
    }
  } while (!entries.empty());

  return 0;
}
345
// Decide whether an object whose mtime is `mtime` has aged past `days`.
// In normal operation a day is 24h and "now" is rounded down to the day
// boundary; when rgw_lc_debug_interval is set, each interval of that many
// seconds counts as one day (for testing).  Optionally reports the
// absolute expiration time through *expire_time.
static bool obj_has_expired(const DoutPrefixProvider *dpp, CephContext *cct, ceph::real_time mtime, int days,
			    ceph::real_time *expire_time = nullptr)
{
  double timediff, cmp;
  utime_t base_time;
  if (cct->_conf->rgw_lc_debug_interval <= 0) {
    /* Normal case, run properly */
    cmp = double(days)*24*60*60;
    base_time = ceph_clock_now().round_to_day();
  } else {
    /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
    cmp = double(days)*cct->_conf->rgw_lc_debug_interval;
    base_time = ceph_clock_now();
  }
  auto tt_mtime = ceph::real_clock::to_time_t(mtime);
  timediff = base_time - tt_mtime;

  if (expire_time) {
    *expire_time = mtime + make_timespan(cmp);
  }

  ldpp_dout(dpp, 20) << __func__
		     << "(): mtime=" << mtime << " days=" << days
		     << " base_time=" << base_time << " timediff=" << timediff
		     << " cmp=" << cmp
		     << " is_expired=" << (timediff >= cmp)
		     << dendl;

  return (timediff >= cmp);
}
376
// Check S3 object-lock constraints before lifecycle delete.  Returns
// true when deletion may proceed: bucket has no object lock, the object
// is gone, or neither an unexpired retention period nor an enabled
// legal hold is present.  Returns false on read/decode errors (fail safe).
static bool pass_object_lock_check(rgw::sal::Store* store, rgw::sal::Object* obj, RGWObjectCtx& ctx, const DoutPrefixProvider *dpp)
{
  if (!obj->get_bucket()->get_info().obj_lock_enabled()) {
    return true;
  }
  // prepare() loads the object attrs consulted below
  std::unique_ptr<rgw::sal::Object::ReadOp> read_op = obj->get_read_op(&ctx);
  int ret = read_op->prepare(null_yield, dpp);
  if (ret < 0) {
    if (ret == -ENOENT) {
      // object already gone: nothing to protect
      return true;
    } else {
      return false;
    }
  } else {
    auto iter = obj->get_attrs().find(RGW_ATTR_OBJECT_RETENTION);
    if (iter != obj->get_attrs().end()) {
      RGWObjectRetention retention;
      try {
        decode(retention, iter->second);
      } catch (buffer::error& err) {
        ldpp_dout(dpp, 0) << "ERROR: failed to decode RGWObjectRetention"
			  << dendl;
        return false;
      }
      // retention still in force?
      if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) >
	  ceph_clock_now()) {
        return false;
      }
    }
    iter = obj->get_attrs().find(RGW_ATTR_OBJECT_LEGAL_HOLD);
    if (iter != obj->get_attrs().end()) {
      RGWObjectLegalHold obj_legal_hold;
      try {
        decode(obj_legal_hold, iter->second);
      } catch (buffer::error& err) {
        ldpp_dout(dpp, 0) << "ERROR: failed to decode RGWObjectLegalHold"
			  << dendl;
        return false;
      }
      if (obj_legal_hold.is_enabled()) {
        return false;
      }
    }
    return true;
  }
}
423
424 class LCObjsLister {
425 rgw::sal::Store* store;
426 rgw::sal::Bucket* bucket;
427 rgw::sal::Bucket::ListParams list_params;
428 rgw::sal::Bucket::ListResults list_results;
429 string prefix;
430 vector<rgw_bucket_dir_entry>::iterator obj_iter;
431 rgw_bucket_dir_entry pre_obj;
432 int64_t delay_ms;
433
434 public:
435 LCObjsLister(rgw::sal::Store* _store, rgw::sal::Bucket* _bucket) :
436 store(_store), bucket(_bucket) {
437 list_params.list_versions = bucket->versioned();
438 list_params.allow_unordered = true;
439 delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
440 }
441
442 void set_prefix(const string& p) {
443 prefix = p;
444 list_params.prefix = prefix;
445 }
446
447 int init(const DoutPrefixProvider *dpp) {
448 return fetch(dpp);
449 }
450
451 int fetch(const DoutPrefixProvider *dpp) {
452 int ret = bucket->list(dpp, list_params, 1000, list_results, null_yield);
453 if (ret < 0) {
454 return ret;
455 }
456
457 obj_iter = list_results.objs.begin();
458
459 return 0;
460 }
461
462 void delay() {
463 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
464 }
465
466 bool get_obj(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry **obj,
467 std::function<void(void)> fetch_barrier
468 = []() { /* nada */}) {
469 if (obj_iter == list_results.objs.end()) {
470 if (!list_results.is_truncated) {
471 delay();
472 return false;
473 } else {
474 fetch_barrier();
475 list_params.marker = pre_obj.key;
476 int ret = fetch(dpp);
477 if (ret < 0) {
478 ldpp_dout(dpp, 0) << "ERROR: list_op returned ret=" << ret
479 << dendl;
480 return false;
481 }
482 }
483 delay();
484 }
485 /* returning address of entry in objs */
486 *obj = &(*obj_iter);
487 return obj_iter != list_results.objs.end();
488 }
489
490 rgw_bucket_dir_entry get_prev_obj() {
491 return pre_obj;
492 }
493
494 void next() {
495 pre_obj = *obj_iter;
496 ++obj_iter;
497 }
498
499 boost::optional<std::string> next_key_name() {
500 if (obj_iter == list_results.objs.end() ||
501 (obj_iter + 1) == list_results.objs.end()) {
502 /* this should have been called after get_obj() was called, so this should
503 * only happen if is_truncated is false */
504 return boost::none;
505 }
506
507 return ((obj_iter + 1)->key.name);
508 }
509
510 }; /* LCObjsLister */
511
/* Immutable bundle of everything needed to evaluate one lc_op against
 * one bucket's listing; shared by the per-object lc_op_ctx instances. */
struct op_env {

  using LCWorker = RGWLC::LCWorker;

  lc_op op;                 // the flattened rule being applied
  rgw::sal::Store* store;
  LCWorker* worker;
  rgw::sal::Bucket* bucket;
  LCObjsLister& ol;         // shared listing cursor for the bucket

  op_env(lc_op& _op, rgw::sal::Store* _store, LCWorker* _worker,
	 rgw::sal::Bucket* _bucket, LCObjsLister& _ol)
    : op(_op), store(_store), worker(_worker), bucket(_bucket),
      ol(_ol) {}
}; /* op_env */
527
528 class LCRuleOp;
529 class WorkQ;
530
531 struct lc_op_ctx {
532 CephContext *cct;
533 op_env env;
534 rgw_bucket_dir_entry o;
535 boost::optional<std::string> next_key_name;
536 ceph::real_time effective_mtime;
537
538 rgw::sal::Store* store;
539 rgw::sal::Bucket* bucket;
540 lc_op& op; // ok--refers to expanded env.op
541 LCObjsLister& ol;
542
543 std::unique_ptr<rgw::sal::Object> obj;
544 RGWObjectCtx rctx;
545 const DoutPrefixProvider *dpp;
546 WorkQ* wq;
547
548 RGWZoneGroupPlacementTier tier = {};
549
550 lc_op_ctx(op_env& env, rgw_bucket_dir_entry& o,
551 boost::optional<std::string> next_key_name,
552 ceph::real_time effective_mtime,
553 const DoutPrefixProvider *dpp, WorkQ* wq)
554 : cct(env.store->ctx()), env(env), o(o), next_key_name(next_key_name),
555 effective_mtime(effective_mtime),
556 store(env.store), bucket(env.bucket), op(env.op), ol(env.ol),
557 rctx(env.store), dpp(dpp), wq(wq)
558 {
559 obj = bucket->get_object(o.key);
560 }
561
562 bool next_has_same_name(const std::string& key_name) {
563 return (next_key_name && key_name.compare(
564 boost::get<std::string>(next_key_name)) == 0);
565 }
566
567 }; /* lc_op_ctx */
568
569
// fixed identifiers used when publishing bucket-notification events
// on behalf of lifecycle (no client request is associated)
static std::string lc_id = "rgw lifecycle";
static std::string lc_req_id = "0";
572
573 static int remove_expired_obj(
574 const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed,
575 rgw::notify::EventType event_type)
576 {
577 auto& store = oc.store;
578 auto& bucket_info = oc.bucket->get_info();
579 auto& o = oc.o;
580 auto obj_key = o.key;
581 auto& meta = o.meta;
582 int ret;
583 std::string version_id;
584
585 if (!remove_indeed) {
586 obj_key.instance.clear();
587 } else if (obj_key.instance.empty()) {
588 obj_key.instance = "null";
589 }
590
591 std::unique_ptr<rgw::sal::Bucket> bucket;
592 std::unique_ptr<rgw::sal::Object> obj;
593
594 ret = store->get_bucket(nullptr, bucket_info, &bucket);
595 if (ret < 0) {
596 return ret;
597 }
598
599 // XXXX currently, rgw::sal::Bucket.owner is always null here
600 std::unique_ptr<rgw::sal::User> user;
601 if (! bucket->get_owner()) {
602 auto& bucket_info = bucket->get_info();
603 user = store->get_user(bucket_info.owner);
604 // forgive me, lord
605 if (user) {
606 bucket->set_owner(user.get());
607 }
608 }
609
610 obj = bucket->get_object(obj_key);
611 std::unique_ptr<rgw::sal::Object::DeleteOp> del_op
612 = obj->get_delete_op(&oc.rctx);
613 del_op->params.versioning_status
614 = obj->get_bucket()->get_info().versioning_status();
615 del_op->params.obj_owner.set_id(rgw_user {meta.owner});
616 del_op->params.obj_owner.set_name(meta.owner_display_name);
617 del_op->params.bucket_owner.set_id(bucket_info.owner);
618 del_op->params.unmod_since = meta.mtime;
619 del_op->params.marker_version_id = version_id;
620
621 std::unique_ptr<rgw::sal::Notification> notify
622 = store->get_notification(dpp, obj.get(), nullptr, &oc.rctx, event_type,
623 bucket.get(), lc_id,
624 const_cast<std::string&>(oc.bucket->get_tenant()),
625 lc_req_id, null_yield);
626
627 /* can eliminate cast when reservation is lifted into Notification */
628 auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
629
630 ret = rgw::notify::publish_reserve(dpp, event_type, notify_res, nullptr);
631 if (ret < 0) {
632 ldpp_dout(dpp, 1)
633 << "ERROR: notify reservation failed, deferring delete of object k="
634 << o.key
635 << dendl;
636 return ret;
637 }
638
639 ret = del_op->delete_obj(dpp, null_yield);
640 if (ret < 0) {
641 ldpp_dout(dpp, 1) <<
642 "ERROR: publishing notification failed, with error: " << ret << dendl;
643 } else {
644 // send request to notification manager
645 (void) rgw::notify::publish_commit(
646 obj.get(), obj->get_obj_size(), ceph::real_clock::now(),
647 obj->get_attrs()[RGW_ATTR_ETAG].to_str(), version_id, event_type,
648 notify_res, dpp);
649 }
650
651 return ret;
652
653 } /* remove_expired_obj */
654
// Base class for lifecycle actions (expiration, transition, ...).
// check() decides applicability for one object; process() performs it.
class LCOpAction {
public:
  virtual ~LCOpAction() {}

  // true when this action applies to oc.o; on true, *exp_time receives
  // the computed expiration time
  virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) {
    return false;
  }

  /* called after check(). Check should tell us whether this action
   * is applicable. If there are multiple actions, we'll end up executing
   * the latest applicable action
   * For example:
   * one action after 10 days, another after 20, third after 40.
   * After 10 days, the latest applicable action would be the first one,
   * after 20 days it will be the second one. After 21 days it will still be the
   * second one. So check() should return true for the second action at that point,
   * but should_process() if the action has already been applied. In object removal
   * it doesn't matter, but in object transition it does.
   */
  virtual bool should_process() {
    return true;
  }

  // apply the action; returns 0 on success, negative error code otherwise
  virtual int process(lc_op_ctx& oc) {
    return 0;
  }

  friend class LCOpRule;
}; /* LCOpAction */
684
// Base class for per-object filters (e.g. tag matching); check()
// returns true when the object passes and actions may be evaluated.
class LCOpFilter {
public:
  virtual ~LCOpFilter() {}
  virtual bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) {
    return false;
  }
}; /* LCOpFilter */
692
// One executable lifecycle rule: an op_env plus the filter and action
// objects built from it (see build()/update()/process() definitions
// elsewhere in this file).
class LCOpRule {
  friend class LCOpAction;

  op_env env;
  boost::optional<std::string> next_key_name; // lookahead from the lister
  ceph::real_time effective_mtime;

  std::vector<shared_ptr<LCOpFilter> > filters; // n.b., sharing ovhd
  std::vector<shared_ptr<LCOpAction> > actions;

public:
  LCOpRule(op_env& _env) : env(_env) {}

  boost::optional<std::string> get_next_key_name() {
    return next_key_name;
  }

  std::vector<shared_ptr<LCOpAction>>& get_actions() {
    return actions;
  }

  void build();
  void update();
  int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp,
	      WorkQ* wq);
}; /* LCOpRule */
719
// Unit of work exchanged with the worker pool; the void* alternative
// (index 0) doubles as the "queue empty / shutting down" sentinel.
using WorkItem =
  boost::variant<void*,
		 /* out-of-line delete */
		 std::tuple<LCOpRule, rgw_bucket_dir_entry>,
		 /* uncompleted MPU expiration */
		 std::tuple<lc_op, rgw_bucket_dir_entry>,
		 rgw_bucket_dir_entry>;
727
728 class WorkQ : public Thread
729 {
730 public:
731 using unique_lock = std::unique_lock<std::mutex>;
732 using work_f = std::function<void(RGWLC::LCWorker*, WorkQ*, WorkItem&)>;
733 using dequeue_result = boost::variant<void*, WorkItem>;
734
735 static constexpr uint32_t FLAG_NONE = 0x0000;
736 static constexpr uint32_t FLAG_EWAIT_SYNC = 0x0001;
737 static constexpr uint32_t FLAG_DWAIT_SYNC = 0x0002;
738 static constexpr uint32_t FLAG_EDRAIN_SYNC = 0x0004;
739
740 private:
741 const work_f bsf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {};
742 RGWLC::LCWorker* wk;
743 uint32_t qmax;
744 int ix;
745 std::mutex mtx;
746 std::condition_variable cv;
747 uint32_t flags;
748 vector<WorkItem> items;
749 work_f f;
750
751 public:
752 WorkQ(RGWLC::LCWorker* wk, uint32_t ix, uint32_t qmax)
753 : wk(wk), qmax(qmax), ix(ix), flags(FLAG_NONE), f(bsf)
754 {
755 create(thr_name().c_str());
756 }
757
758 std::string thr_name() {
759 return std::string{"wp_thrd: "}
760 + std::to_string(wk->ix) + ", " + std::to_string(ix);
761 }
762
763 void setf(work_f _f) {
764 f = _f;
765 }
766
767 void enqueue(WorkItem&& item) {
768 unique_lock uniq(mtx);
769 while ((!wk->get_lc()->going_down()) &&
770 (items.size() > qmax)) {
771 flags |= FLAG_EWAIT_SYNC;
772 cv.wait_for(uniq, 200ms);
773 }
774 items.push_back(item);
775 if (flags & FLAG_DWAIT_SYNC) {
776 flags &= ~FLAG_DWAIT_SYNC;
777 cv.notify_one();
778 }
779 }
780
781 void drain() {
782 unique_lock uniq(mtx);
783 flags |= FLAG_EDRAIN_SYNC;
784 while (flags & FLAG_EDRAIN_SYNC) {
785 cv.wait_for(uniq, 200ms);
786 }
787 }
788
789 private:
790 dequeue_result dequeue() {
791 unique_lock uniq(mtx);
792 while ((!wk->get_lc()->going_down()) &&
793 (items.size() == 0)) {
794 /* clear drain state, as we are NOT doing work and qlen==0 */
795 if (flags & FLAG_EDRAIN_SYNC) {
796 flags &= ~FLAG_EDRAIN_SYNC;
797 }
798 flags |= FLAG_DWAIT_SYNC;
799 cv.wait_for(uniq, 200ms);
800 }
801 if (items.size() > 0) {
802 auto item = items.back();
803 items.pop_back();
804 if (flags & FLAG_EWAIT_SYNC) {
805 flags &= ~FLAG_EWAIT_SYNC;
806 cv.notify_one();
807 }
808 return {item};
809 }
810 return nullptr;
811 }
812
813 void* entry() override {
814 while (!wk->get_lc()->going_down()) {
815 auto item = dequeue();
816 if (item.which() == 0) {
817 /* going down */
818 break;
819 }
820 f(wk, this, boost::get<WorkItem>(item));
821 }
822 return nullptr;
823 }
824 }; /* WorkQ */
825
// Fixed-size pool of WorkQ threads; enqueue() distributes items
// round-robin across the queues.
class RGWLC::WorkPool
{
  using TVector = ceph::containers::tiny_vector<WorkQ, 3>;
  TVector wqs;
  uint64_t ix; // round-robin cursor (single-producer use)

public:
  WorkPool(RGWLC::LCWorker* wk, uint16_t n_threads, uint32_t qmax)
    : wqs(TVector{
	n_threads,
	[&](const size_t ix, auto emplacer) {
	  emplacer.emplace(wk, ix, qmax);
	}}),
      ix(0)
    {}

  ~WorkPool() {
    // WorkQ threads exit once RGWLC reports going_down()
    for (auto& wq : wqs) {
      wq.join();
    }
  }

  // set the same work function on every queue
  void setf(WorkQ::work_f _f) {
    for (auto& wq : wqs) {
      wq.setf(_f);
    }
  }

  void enqueue(WorkItem item) {
    const auto tix = ix;
    ix = (ix+1) % wqs.size();
    (wqs[tix]).enqueue(std::move(item));
  }

  // wait for every queue to empty
  void drain() {
    for (auto& wq : wqs) {
      wq.drain();
    }
  }
}; /* WorkPool */
866
// Construct one lifecycle worker and its private work pool, sized by
// rgw_lc_max_wp_worker with a per-queue depth of 512.
// NOTE(review): workpool is raw-owned here; presumably released in the
// LCWorker destructor (not visible in this chunk) — confirm.
RGWLC::LCWorker::LCWorker(const DoutPrefixProvider* dpp, CephContext *cct,
			  RGWLC *lc, int ix)
  : dpp(dpp), cct(cct), lc(lc), ix(ix)
{
  auto wpw = cct->_conf.get_val<int64_t>("rgw_lc_max_wp_worker");
  workpool = new WorkPool(this, wpw, 512);
}
874
/* A scheduled (non-"once") worker must stop when its interval budget
 * has elapsed; a one-shot run is never time-bounded. */
static inline bool worker_should_stop(time_t stop_at, bool once)
{
  if (once) {
    return false;
  }
  return stop_at < time(nullptr);
}
879
// Abort incomplete multipart uploads that have exceeded the rule's
// mp_expiration age.  Lists the bucket's multipart-meta namespace per
// rule prefix and farms the abort work out to the worker pool.
// @param stop_at  wall-clock budget; ignored when once is true
// @return 0 on success/stop, negative listing error otherwise.
int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
				       const multimap<string, lc_op>& prefix_map,
				       LCWorker* worker, time_t stop_at, bool once)
{
  MultipartMetaFilter mp_filter;
  int ret;
  rgw::sal::Bucket::ListParams params;
  rgw::sal::Bucket::ListResults results;
  auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
  params.list_versions = false;
  /* lifecycle processing does not depend on total order, so can
   * take advantage of unordered listing optimizations--such as
   * operating on one shard at a time */
  params.allow_unordered = true;
  params.ns = RGW_OBJ_NS_MULTIPART;
  params.access_list_filter = &mp_filter;

  // worker function: abort one expired multipart upload
  auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
    auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
    auto& [rule, obj] = wt;
    if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
      rgw_obj_key key(obj.key);
      std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name);
      RGWObjectCtx rctx(store);
      int ret = mpu->abort(this, cct, &rctx);
      if (ret == 0) {
        if (perfcounter) {
          perfcounter->inc(l_rgw_lc_abort_mpu, 1);
        }
      } else {
	// already-gone uploads are logged at a lower level
	if (ret == -ERR_NO_SUCH_UPLOAD) {
	  ldpp_dout(wk->get_lc(), 5)
	    << "ERROR: abort_multipart_upload failed, ret=" << ret
	    << ", thread:" << wq->thr_name()
	    << ", meta:" << obj.key
	    << dendl;
	} else {
	  ldpp_dout(wk->get_lc(), 0)
	    << "ERROR: abort_multipart_upload failed, ret=" << ret
	    << ", thread:" << wq->thr_name()
	    << ", meta:" << obj.key
	    << dendl;
	}
      } /* abort failed */
    } /* expired */
  };

  worker->workpool->setf(pf);

  for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
       ++prefix_iter) {

    if (worker_should_stop(stop_at, once)) {
      ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker "
			 << worker->ix
			 << dendl;
      return 0;
    }

    // skip disabled rules and rules without multipart expiration
    if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
      continue;
    }
    params.prefix = prefix_iter->first;
    do {
      results.objs.clear();
      ret = target->list(this, params, 1000, results, null_yield);
      if (ret < 0) {
	  if (ret == (-ENOENT))
	    return 0;
	  ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
	  return ret;
      }

      for (auto obj_iter = results.objs.begin(); obj_iter != results.objs.end(); ++obj_iter) {
	std::tuple<lc_op, rgw_bucket_dir_entry> t1 =
	  {prefix_iter->second, *obj_iter};
	worker->workpool->enqueue(WorkItem{t1});
	if (going_down()) {
	  return 0;
	}
      } /* for objs */

      // throttle between listing pages
      std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
    } while(results.is_truncated);
  } /* for prefix_map */

  // wait for all queued aborts to finish before returning
  worker->workpool->drain();
  return 0;
}
969
// Fetch the raw (encoded) tag set attribute of an object into tags_bl.
// Returns 0 on success; -ENODATA when the object has no tags.
static int read_obj_tags(const DoutPrefixProvider *dpp, rgw::sal::Object* obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
{
  std::unique_ptr<rgw::sal::Object::ReadOp> rop = obj->get_read_op(&ctx);

  return rop->get_attr(dpp, RGW_ATTR_TAGS, tags_bl, null_yield);
}
976
977 static bool is_valid_op(const lc_op& op)
978 {
979 return (op.status &&
980 (op.expiration > 0
981 || op.expiration_date != boost::none
982 || op.noncur_expiration > 0
983 || op.dm_expiration
984 || !op.transitions.empty()
985 || !op.noncur_transitions.empty()));
986 }
987
988 static inline bool has_all_tags(const lc_op& rule_action,
989 const RGWObjTags& object_tags)
990 {
991 if(! rule_action.obj_tags)
992 return false;
993 if(object_tags.count() < rule_action.obj_tags->count())
994 return false;
995 size_t tag_count = 0;
996 for (const auto& tag : object_tags.get_tags()) {
997 const auto& rule_tags = rule_action.obj_tags->get_tags();
998 const auto& iter = rule_tags.find(tag.first);
999 if(iter == rule_tags.end())
1000 continue;
1001 if(iter->second == tag.second)
1002 {
1003 tag_count++;
1004 }
1005 /* all tags in the rule appear in obj tags */
1006 }
1007 return tag_count == rule_action.obj_tags->count();
1008 }
1009
// Evaluate the op's tag filter against the object's stored tags.
// On return, *skip is true when the object must be skipped (tags
// required but missing/unmatched).  Returns 0 normally; -EIO when the
// stored tag set cannot be decoded.
static int check_tags(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool *skip)
{
  auto& op = oc.op;

  if (op.obj_tags != boost::none) {
    *skip = true; // pessimistic default until a full match is proven

    bufferlist tags_bl;
    int ret = read_obj_tags(dpp, oc.obj.get(), oc.rctx, tags_bl);
    if (ret < 0) {
      // -ENODATA (no tags on object) is expected and not logged
      if (ret != -ENODATA) {
        ldpp_dout(oc.dpp, 5) << "ERROR: read_obj_tags returned r="
			     << ret << " " << oc.wq->thr_name() << dendl;
      }
      return 0;
    }
    RGWObjTags dest_obj_tags;
    try {
      auto iter = tags_bl.cbegin();
      dest_obj_tags.decode(iter);
    } catch (buffer::error& err) {
      ldpp_dout(oc.dpp,0) << "ERROR: caught buffer::error, couldn't decode TagSet "
			  << oc.wq->thr_name() << dendl;
      return -EIO;
    }

    if (! has_all_tags(op, dest_obj_tags)) {
      ldpp_dout(oc.dpp, 20) << __func__ << "() skipping obj " << oc.obj
			    << " as tags do not match in rule: "
			    << op.id << " "
			    << oc.wq->thr_name() << dendl;
      return 0;
    }
  }
  *skip = false;
  return 0;
}
1047
// Filter that admits objects whose tags satisfy the rule's tag filter.
// Delete markers always pass (they carry no tags to compare).
class LCOpFilter_Tags : public LCOpFilter {
public:
  bool check(const DoutPrefixProvider *dpp, lc_op_ctx& oc) override {
    auto& o = oc.o;

    if (o.is_delete_marker()) {
      return true;
    }

    bool skip;

    int ret = check_tags(dpp, oc, &skip);
    if (ret < 0) {
      if (ret == -ENOENT) {
        return false;
      }
      ldpp_dout(oc.dpp, 0) << "ERROR: check_tags on obj=" << oc.obj
			   << " returned ret=" << ret << " "
			   << oc.wq->thr_name() << dendl;
      return false;
    }

    return !skip;
  };
};
1073
/* Expiration action for the current version of an object; also handles
 * orphaned delete markers encountered while scanning current versions. */
class LCOpAction_CurrentExpiration : public LCOpAction {
public:
  LCOpAction_CurrentExpiration(op_env& env) {}

  /* Decide whether the listed entry is due for expiration. On a true
   * return, *exp_time holds the effective expiration timestamp, which
   * LCOpRule::process() uses to select among competing actions. */
  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
    auto& o = oc.o;
    if (!o.is_current()) {
      /* this action applies only to the current version */
      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			 << ": not current, skipping "
			 << oc.wq->thr_name() << dendl;
      return false;
    }
    if (o.is_delete_marker()) {
      std::string nkn;
      if (oc.next_key_name) nkn = *oc.next_key_name;
      if (oc.next_has_same_name(o.key.name)) {
	/* another version of the same object follows in the listing,
	 * so this delete marker is not orphaned--leave it alone */
	ldpp_dout(dpp, 7) << __func__ << "(): dm-check SAME: key=" << o.key
			  << " next_key_name: %%" << nkn << "%% "
			  << oc.wq->thr_name() << dendl;
	return false;
      } else {
	/* orphaned delete marker: expire immediately */
	ldpp_dout(dpp, 7) << __func__ << "(): dm-check DELE: key=" << o.key
			  << " next_key_name: %%" << nkn << "%% "
			  << oc.wq->thr_name() << dendl;
	*exp_time = real_clock::now();
	return true;
      }
    }

    auto& mtime = o.meta.mtime;
    bool is_expired;
    auto& op = oc.op;
    if (op.expiration <= 0) {
      /* no Days-based rule; fall back to an absolute Date rule, if any */
      if (op.expiration_date == boost::none) {
	ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			   << ": no expiration set in rule, skipping "
			   << oc.wq->thr_name() << dendl;
	return false;
      }
      is_expired = ceph_clock_now() >=
	ceph::real_clock::to_time_t(*op.expiration_date);
      *exp_time = *op.expiration_date;
    } else {
      /* Days-based rule, measured from the object's mtime */
      is_expired = obj_has_expired(dpp, oc.cct, mtime, op.expiration, exp_time);
    }

    ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired="
		       << (int)is_expired << " "
		       << oc.wq->thr_name() << dendl;
    return is_expired;
  }

  /* Remove the expired current version. An orphaned delete marker is
   * deleted outright (its own notification type); deleting the current
   * version of a versioned bucket leaves a new delete marker behind
   * (remove_expired_obj is told not to remove fully). */
  int process(lc_op_ctx& oc) {
    auto& o = oc.o;
    int r;
    if (o.is_delete_marker()) {
      r = remove_expired_obj(oc.dpp, oc, true,
			     rgw::notify::ObjectExpirationDeleteMarker);
      if (r < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj "
			     << oc.bucket << ":" << o.key
			     << " " << cpp_strerror(r) << " "
			     << oc.wq->thr_name() << dendl;
	return r;
      }
      ldpp_dout(oc.dpp, 2) << "DELETED: current is-dm "
			   << oc.bucket << ":" << o.key
			   << " " << oc.wq->thr_name() << dendl;
    } else {
      /* ! o.is_delete_marker() */
      r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(),
			     rgw::notify::ObjectExpirationCurrent);
      if (r < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj "
			     << oc.bucket << ":" << o.key
			     << " " << cpp_strerror(r) << " "
			     << oc.wq->thr_name() << dendl;
	return r;
      }
      if (perfcounter) {
	perfcounter->inc(l_rgw_lc_expire_current, 1);
      }
      ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key
			   << " " << oc.wq->thr_name() << dendl;
    }
    return 0;
  }
};
1162
/* Expiration action for non-current (superseded) object versions,
 * implementing the NoncurrentVersionExpiration rule element. */
class LCOpAction_NonCurrentExpiration : public LCOpAction {
protected:
public:
  LCOpAction_NonCurrentExpiration(op_env& env)
  {}

  /* True when a non-current version has aged past noncur_expiration
   * days -- measured from effective_mtime, i.e. the mtime of the next
   * newer version (when this one became non-current) -- and is not
   * protected by object lock. */
  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
    auto& o = oc.o;
    if (o.is_current()) {
      /* this action applies only to non-current versions */
      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			 << ": current version, skipping "
			 << oc.wq->thr_name() << dendl;
      return false;
    }

    int expiration = oc.op.noncur_expiration;
    bool is_expired = obj_has_expired(dpp, oc.cct, oc.effective_mtime, expiration,
				      exp_time);

    ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired="
		       << is_expired << " "
		       << oc.wq->thr_name() << dendl;

    return is_expired &&
      pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx, dpp);
  }

  /* Remove the expired non-current version (no delete marker needed). */
  int process(lc_op_ctx& oc) {
    auto& o = oc.o;
    int r = remove_expired_obj(oc.dpp, oc, true,
			       rgw::notify::ObjectExpirationNoncurrent);
    if (r < 0) {
      ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) "
			   << oc.bucket << ":" << o.key
			   << " " << cpp_strerror(r)
			   << " " << oc.wq->thr_name() << dendl;
      return r;
    }
    if (perfcounter) {
      perfcounter->inc(l_rgw_lc_expire_noncurrent, 1);
    }
    ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key
			 << " (non-current expiration) "
			 << oc.wq->thr_name() << dendl;
    return 0;
  }
};
1210
/* Expiration of orphaned delete markers, implementing the
 * ExpiredObjectDeleteMarker rule element. */
class LCOpAction_DMExpiration : public LCOpAction {
public:
  LCOpAction_DMExpiration(op_env& env) {}

  /* True when the entry is a delete marker with no surviving versions
   * of the same key behind it (i.e. the marker is orphaned); such a
   * marker expires immediately. */
  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
    auto& o = oc.o;
    if (!o.is_delete_marker()) {
      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			 << ": not a delete marker, skipping "
			 << oc.wq->thr_name() << dendl;
      return false;
    }
    if (oc.next_has_same_name(o.key.name)) {
      /* another version of this object follows, so the marker is
       * not orphaned */
      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			 << ": next is same object, skipping "
			 << oc.wq->thr_name() << dendl;
      return false;
    }

    *exp_time = real_clock::now();

    return true;
  }

  /* Remove the orphaned delete marker. */
  int process(lc_op_ctx& oc) {
    auto& o = oc.o;
    int r = remove_expired_obj(oc.dpp, oc, true,
			       rgw::notify::ObjectExpirationDeleteMarker);
    if (r < 0) {
      ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
			   << oc.bucket << ":" << o.key
			   << " " << cpp_strerror(r)
			   << " " << oc.wq->thr_name()
			   << dendl;
      return r;
    }
    if (perfcounter) {
      perfcounter->inc(l_rgw_lc_expire_dm, 1);
    }
    ldpp_dout(oc.dpp, 2) << "DELETED:" << oc.bucket << ":" << o.key
			 << " (delete marker expiration) "
			 << oc.wq->thr_name() << dendl;
    return 0;
  }
};
1256
/* Storage-class transition action, covering both in-cluster placement
 * moves and transitions to a remote cloud-s3 tier. Subclasses select
 * whether the action applies to current or non-current versions. */
class LCOpAction_Transition : public LCOpAction {
  const transition_action& transition;
  bool need_to_process{false};

protected:
  /* subclass predicate: does this action apply to (non-)current versions? */
  virtual bool check_current_state(bool is_current) = 0;
  /* mtime baseline for the Days computation: the object's own mtime for
   * current versions, the time of supersession for non-current ones */
  virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0;
public:
  LCOpAction_Transition(const transition_action& _transition)
    : transition(_transition) {}

  /* True when the object is due for transition, by Days or by absolute
   * Date. Also records whether the object is not already in the target
   * storage class; should_process() reports that, so a due-but-already
   * -transitioned object is skipped. */
  bool check(lc_op_ctx& oc, ceph::real_time *exp_time, const DoutPrefixProvider *dpp) override {
    auto& o = oc.o;

    if (o.is_delete_marker()) {
      return false;
    }

    if (!check_current_state(o.is_current())) {
      return false;
    }

    auto mtime = get_effective_mtime(oc);
    bool is_expired;
    if (transition.days < 0) {
      /* no Days-based rule; fall back to an absolute Date rule, if any */
      if (transition.date == boost::none) {
	ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			   << ": no transition day/date set in rule, skipping "
			   << oc.wq->thr_name() << dendl;
	return false;
      }
      is_expired = ceph_clock_now() >=
	ceph::real_clock::to_time_t(*transition.date);
      *exp_time = *transition.date;
    } else {
      is_expired = obj_has_expired(dpp, oc.cct, mtime, transition.days, exp_time);
    }

    ldpp_dout(oc.dpp, 20) << __func__ << "(): key=" << o.key << ": is_expired="
			  << is_expired << " "
			  << oc.wq->thr_name() << dendl;

    /* transitioning into the class the object already occupies is a no-op */
    need_to_process =
      (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) !=
       transition.storage_class);

    return is_expired;
  }

  bool should_process() override {
    return need_to_process;
  }

  /* find out if the storage class is remote cloud: look up the rule's
   * placement target in the zonegroup and, if the storage class names a
   * tier target there, fill in tier; -ENOENT otherwise */
  int get_tier_target(const RGWZoneGroup &zonegroup, const rgw_placement_rule& rule,
		      RGWZoneGroupPlacementTier &tier) {
    std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer;
    titer = zonegroup.placement_targets.find(rule.name);
    if (titer == zonegroup.placement_targets.end()) {
      return -ENOENT;
    }

    const auto& target_rule = titer->second;
    std::map<std::string, RGWZoneGroupPlacementTier>::const_iterator ttier;
    ttier = target_rule.tier_targets.find(rule.storage_class);
    if (ttier != target_rule.tier_targets.end()) {
      tier = ttier->second;
    } else { // not found
      return -ENOENT;
    }
    return 0;
  }

  /* Remove the local object after its data has been copied to the
   * cloud tier (the !retain_head_object case, or a current version in
   * a versioned bucket, which gets a delete marker instead). */
  int delete_tier_obj(lc_op_ctx& oc) {
    int ret = 0;

    /* If bucket is versioned, create delete_marker for current version
     */
    if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
      ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
      ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
    } else {
      ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
      ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
    }
    return ret;
  }

  /* Rewrite the head object in place as a zero-size stub whose manifest
   * points at the cloud tier (the retain_head_object case). Returns
   * -ECANCELED if the object's mtime changed since it was listed (raced
   * with a concurrent write). */
  int update_tier_obj(lc_op_ctx& oc, RGWLCCloudTierCtx& tier_ctx) {

    map<string, bufferlist> attrs;
    int r = 0;

    real_time read_mtime;

    std::unique_ptr<rgw::sal::Object::ReadOp> read_op(oc.obj->get_read_op(&oc.rctx));

    read_op->params.lastmod = &read_mtime;

    r = read_op->prepare(null_yield, oc.dpp);
    if (r < 0) {
      return r;
    }

    if (read_mtime != tier_ctx.o.meta.mtime) {
      /* raced */
      return -ECANCELED;
    }

    attrs = oc.obj->get_attrs();

    rgw::sal::RadosStore *rados = static_cast<rgw::sal::RadosStore*>(oc.store);
    RGWRados::Object op_target(rados->getRados(), oc.bucket->get_info(), oc.rctx, oc.obj->get_obj());
    RGWRados::Object::Write obj_op(&op_target);

    obj_op.meta.modify_tail = true;
    obj_op.meta.flags = PUT_OBJ_CREATE;
    obj_op.meta.category = RGWObjCategory::CloudTiered;
    obj_op.meta.delete_at = real_time();
    bufferlist blo; /* empty data: the stub head has no payload */
    obj_op.meta.data = &blo;
    obj_op.meta.if_match = NULL;
    obj_op.meta.if_nomatch = NULL;
    obj_op.meta.user_data = NULL;
    obj_op.meta.zones_trace = NULL;
    obj_op.meta.delete_at = real_time(); /* NOTE(review): duplicate of the assignment above; harmless */
    obj_op.meta.olh_epoch = tier_ctx.o.versioned_epoch;

    RGWObjManifest *pmanifest;
    RGWObjManifest manifest;

    pmanifest = &manifest;
    RGWObjTier tier_config;
    tier_config.name = oc.tier.storage_class;
    tier_config.tier_placement = oc.tier;
    tier_config.is_multipart_upload = tier_ctx.is_multipart_upload;

    /* mark the manifest as cloud-tiered so reads know where the data lives */
    pmanifest->set_tier_type("cloud-s3");
    pmanifest->set_tier_config(tier_config);

    /* check if its necessary */
    rgw_placement_rule target_placement;
    target_placement.inherit_from(tier_ctx.bucket_info.placement_rule);
    target_placement.storage_class = oc.tier.storage_class;
    pmanifest->set_head(target_placement, tier_ctx.obj->get_obj(), 0);

    pmanifest->set_tail_placement(target_placement, tier_ctx.obj->get_obj().bucket);

    pmanifest->set_obj_size(0);

    obj_op.meta.manifest = pmanifest;

    /* update storage class */
    bufferlist bl;
    bl.append(oc.tier.storage_class);
    attrs[RGW_ATTR_STORAGE_CLASS] = bl;

    /* drop the old tags so the rewritten head gets fresh ones */
    attrs.erase(RGW_ATTR_ID_TAG);
    attrs.erase(RGW_ATTR_TAIL_TAG);

    r = obj_op.write_meta(oc.dpp, 0, 0, attrs, null_yield);
    if (r < 0) {
      return r;
    }

    return 0;
  }

  /* Copy the object out to the configured cloud-s3 endpoint, then
   * either delete it locally or replace it with a stub head object,
   * depending on retain_head_object and versioning state. */
  int transition_obj_to_cloud(lc_op_ctx& oc) {
    /* init */
    string id = "cloudid";
    string endpoint = oc.tier.t.s3.endpoint;
    RGWAccessKey key = oc.tier.t.s3.key;
    string region = oc.tier.t.s3.region;
    HostStyle host_style = oc.tier.t.s3.host_style;
    string bucket_name = oc.tier.t.s3.target_path;
    const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup();
    bool delete_object;

    /* If CurrentVersion object, remove it & create delete marker */
    delete_object = (!oc.tier.retain_head_object ||
		     (oc.o.is_current() && oc.bucket->versioned()));

    if (bucket_name.empty()) {
      /* no explicit target_path: derive a default per-zonegroup bucket name */
      bucket_name = "rgwx-" + zonegroup.get_name() + "-" + oc.tier.storage_class +
	"-cloud-bucket";
      boost::algorithm::to_lower(bucket_name);
    }

    /* Create RGW REST connection */
    S3RESTConn conn(oc.cct, oc.store, id, { endpoint }, key, region, host_style);

    RGWLCCloudTierCtx tier_ctx(oc.cct, oc.dpp, oc.o, oc.store, oc.bucket->get_info(),
			       oc.obj.get(), oc.rctx, conn, bucket_name,
			       oc.tier.t.s3.target_storage_class);
    tier_ctx.acl_mappings = oc.tier.t.s3.acl_mappings;
    tier_ctx.multipart_min_part_size = oc.tier.t.s3.multipart_min_part_size;
    tier_ctx.multipart_sync_threshold = oc.tier.t.s3.multipart_sync_threshold;
    tier_ctx.storage_class = oc.tier.storage_class;

    std::set<std::string>& cloud_targets = oc.env.worker->get_cloud_targets();

    ldpp_dout(oc.dpp, 0) << "Transitioning object(" << oc.o.key << ") to the cloud endpoint(" << endpoint << ")" << dendl;

    /* Transition object to cloud end point */
    int ret = rgw_cloud_tier_transfer_object(tier_ctx, cloud_targets);

    if (ret < 0) {
      ldpp_dout(oc.dpp, 0) << "ERROR: failed to transfer object(" << oc.o.key << ") to the cloud endpoint(" << endpoint << ") ret=" << ret << dendl;
      return ret;
    }

    if (delete_object) {
      ret = delete_tier_obj(oc);
      if (ret < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: Deleting tier object(" << oc.o.key << ") failed ret=" << ret << dendl;
	return ret;
      }
    } else {
      ret = update_tier_obj(oc, tier_ctx);
      if (ret < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: Updating tier object(" << oc.o.key << ") failed ret=" << ret << dendl;
	return ret;
      }
    }

    return 0;
  }

  /* Dispatch the transition: a cloud-s3 tier target goes through
   * transition_obj_to_cloud() (RadosStore only); anything else is an
   * ordinary in-cluster placement transition. */
  int process(lc_op_ctx& oc) {
    auto& o = oc.o;
    int r;

    if (oc.o.meta.category == RGWObjCategory::CloudTiered) {
      /* Skip objects which are already cloud tiered. */
      ldpp_dout(oc.dpp, 30) << "Object(key:" << oc.o.key << ") is already cloud tiered to cloud-s3 tier: " << oc.o.meta.storage_class << dendl;
      return 0;
    }

    std::string tier_type = "";
    const RGWZoneGroup& zonegroup = oc.store->get_zone()->get_zonegroup();

    rgw_placement_rule target_placement;
    target_placement.inherit_from(oc.bucket->get_placement_rule());
    target_placement.storage_class = transition.storage_class;

    r = get_tier_target(zonegroup, target_placement, oc.tier);

    if (!r && oc.tier.tier_type == "cloud-s3") {
      ldpp_dout(oc.dpp, 30) << "Found cloud s3 tier: " << target_placement.storage_class << dendl;
      if (!oc.o.is_current() &&
	  !pass_object_lock_check(oc.store, oc.obj.get(), oc.rctx, oc.dpp)) {
	/* Skip objects which has object lock enabled. */
	ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is locked. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
	return 0;
      }

      /* Allow transition for only RadosStore */
      rgw::sal::RadosStore *rados = dynamic_cast<rgw::sal::RadosStore*>(oc.store);

      if (!rados) {
	ldpp_dout(oc.dpp, 10) << "Object(key:" << oc.o.key << ") is not on RadosStore. Skipping transition to cloud-s3 tier: " << target_placement.storage_class << dendl;
	return -1;
      }

      r = transition_obj_to_cloud(oc);
      if (r < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj(key:" << oc.o.key << ") to cloud (r=" << r << ")"
			     << dendl;
	return r;
      }
    } else {
      /* in-cluster transition: the target placement must exist locally */
      if (!oc.store->get_zone()->get_params().
	  valid_placement(target_placement)) {
	ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: "
			     << target_placement
			     << " bucket="<< oc.bucket
			     << " rule_id=" << oc.op.id
			     << " " << oc.wq->thr_name() << dendl;
	return -EINVAL;
      }

      /* NOTE(review): inner r shadows the outer one; intentional here
       * since the outer value is no longer needed */
      int r = oc.obj->transition(oc.rctx, oc.bucket, target_placement, o.meta.mtime,
				 o.versioned_epoch, oc.dpp, null_yield);
      if (r < 0) {
	ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
			     << oc.bucket << ":" << o.key
			     << " -> " << transition.storage_class
			     << " " << cpp_strerror(r)
			     << " " << oc.wq->thr_name() << dendl;
	return r;
      }
    }
    ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket
			 << ":" << o.key << " -> "
			 << transition.storage_class
			 << " " << oc.wq->thr_name() << dendl;
    return 0;
  }
};
1557
1558 class LCOpAction_CurrentTransition : public LCOpAction_Transition {
1559 protected:
1560 bool check_current_state(bool is_current) override {
1561 return is_current;
1562 }
1563
1564 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
1565 return oc.o.meta.mtime;
1566 }
1567 public:
1568 LCOpAction_CurrentTransition(const transition_action& _transition)
1569 : LCOpAction_Transition(_transition) {}
1570 int process(lc_op_ctx& oc) {
1571 int r = LCOpAction_Transition::process(oc);
1572 if (r == 0) {
1573 if (perfcounter) {
1574 perfcounter->inc(l_rgw_lc_transition_current, 1);
1575 }
1576 }
1577 return r;
1578 }
1579 };
1580
1581 class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
1582 protected:
1583 bool check_current_state(bool is_current) override {
1584 return !is_current;
1585 }
1586
1587 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
1588 return oc.effective_mtime;
1589 }
1590 public:
1591 LCOpAction_NonCurrentTransition(op_env& env,
1592 const transition_action& _transition)
1593 : LCOpAction_Transition(_transition)
1594 {}
1595 int process(lc_op_ctx& oc) {
1596 int r = LCOpAction_Transition::process(oc);
1597 if (r == 0) {
1598 if (perfcounter) {
1599 perfcounter->inc(l_rgw_lc_transition_noncurrent, 1);
1600 }
1601 }
1602 return r;
1603 }
1604 };
1605
1606 void LCOpRule::build()
1607 {
1608 filters.emplace_back(new LCOpFilter_Tags);
1609
1610 auto& op = env.op;
1611
1612 if (op.expiration > 0 ||
1613 op.expiration_date != boost::none) {
1614 actions.emplace_back(new LCOpAction_CurrentExpiration(env));
1615 }
1616
1617 if (op.dm_expiration) {
1618 actions.emplace_back(new LCOpAction_DMExpiration(env));
1619 }
1620
1621 if (op.noncur_expiration > 0) {
1622 actions.emplace_back(new LCOpAction_NonCurrentExpiration(env));
1623 }
1624
1625 for (auto& iter : op.transitions) {
1626 actions.emplace_back(new LCOpAction_CurrentTransition(iter.second));
1627 }
1628
1629 for (auto& iter : op.noncur_transitions) {
1630 actions.emplace_back(new LCOpAction_NonCurrentTransition(env, iter.second));
1631 }
1632 }
1633
1634 void LCOpRule::update()
1635 {
1636 next_key_name = env.ol.next_key_name();
1637 effective_mtime = env.ol.get_prev_obj().meta.mtime;
1638 }
1639
/* Apply this rule to one listed entry: run every action's check(),
 * keep the action with the latest expiration time, then (if filters
 * match) run that action's process(). Returns 0 when nothing applies
 * or the selected action succeeds. */
int LCOpRule::process(rgw_bucket_dir_entry& o,
		      const DoutPrefixProvider *dpp,
		      WorkQ* wq)
{
  lc_op_ctx ctx(env, o, next_key_name, effective_mtime, dpp, wq);
  shared_ptr<LCOpAction> *selected = nullptr; // n.b., req'd by sharing
  real_time exp;

  /* pick the action whose expiration time is latest among those due */
  for (auto& a : actions) {
    real_time action_exp;

    if (a->check(ctx, &action_exp, dpp)) {
      if (action_exp > exp) {
	exp = action_exp;
	selected = &a;
      }
    }
  }

  if (selected &&
      (*selected)->should_process()) {

    /*
     * Calling filter checks after action checks because
     * all action checks (as they are implemented now) do
     * not access the objects themselves, but return result
     * from info from bucket index listing. The current tags filter
     * check does access the objects, so we avoid unnecessary rados calls
     * having filters check later in the process.
     */

    bool cont = false;
    for (auto& f : filters) {
      if (f->check(dpp, ctx)) {
	cont = true;
	break;
      }
    }

    if (!cont) {
      ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key
			 << ": no rule match, skipping "
			 << wq->thr_name() << dendl;
      return 0;
    }

    int r = (*selected)->process(ctx);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
			<< env.bucket << ":" << o.key
			<< " " << cpp_strerror(r)
			<< " " << wq->thr_name() << dendl;
      return r;
    }
    ldpp_dout(dpp, 20) << "processed:" << env.bucket << ":"
		       << o.key << " " << wq->thr_name() << dendl;
  }

  return 0;

}
1701
/* Run all lifecycle rules of one bucket: decode the bucket's LC
 * configuration from its attrs, list objects per rule prefix, and fan
 * per-object processing out to the worker pool. Stops early (returning
 * 0) when the interval budget expires; returns -ENOENT for a stale
 * entry whose bucket was recreated. */
int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
			     time_t stop_at, bool once)
{
  RGWLifecycleConfiguration config(cct);
  std::unique_ptr<rgw::sal::Bucket> bucket;
  string no_ns, list_versions; /* NOTE(review): unused in this function */
  vector<rgw_bucket_dir_entry> objs;
  vector<std::string> result;
  /* assumes shard_id is "tenant:name:marker" as built by
   * get_lc_shard_name() -- TODO confirm no caller passes fewer fields */
  boost::split(result, shard_id, boost::is_any_of(":"));
  string bucket_tenant = result[0];
  string bucket_name = result[1];
  string bucket_marker = result[2];
  int ret = store->get_bucket(this, nullptr, bucket_tenant, bucket_name, &bucket, null_yield);
  if (ret < 0) {
    ldpp_dout(this, 0) << "LC:get_bucket for " << bucket_name
		       << " failed" << dendl;
    return ret;
  }

  ret = bucket->load_bucket(this, null_yield);
  if (ret < 0) {
    ldpp_dout(this, 0) << "LC:load_bucket for " << bucket_name
		       << " failed" << dendl;
    return ret;
  }

  /* ensure in-flight work items finish before we unwind, on any path */
  auto stack_guard = make_scope_guard(
    [&worker]
      {
	worker->workpool->drain();
      }
    );

  /* a marker mismatch means the bucket was deleted and recreated;
   * -ENOENT tells bucket_lc_post() to drop the stale entry */
  if (bucket->get_marker() != bucket_marker) {
    ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket="
		       << bucket_tenant << ":" << bucket_name
		       << " cur_marker=" << bucket->get_marker()
		       << " orig_marker=" << bucket_marker << dendl;
    return -ENOENT;
  }

  map<string, bufferlist>::iterator aiter
    = bucket->get_attrs().find(RGW_ATTR_LC);
  if (aiter == bucket->get_attrs().end()) {
    ldpp_dout(this, 0) << "WARNING: bucket_attrs.find(RGW_ATTR_LC) failed for "
		       << bucket_name << " (terminates bucket_lc_process(...))"
		       << dendl;
    return 0;
  }

  bufferlist::const_iterator iter{&aiter->second};
  try {
    config.decode(iter);
  } catch (const buffer::error& e) {
    ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed"
		       << dendl;
    return -1;
  }

  /* work-item handler run by pool threads: apply the rule to one entry */
  auto pf = [](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
    auto wt =
      boost::get<std::tuple<LCOpRule, rgw_bucket_dir_entry>>(wi);
    auto& [op_rule, o] = wt;

    ldpp_dout(wk->get_lc(), 20)
      << __func__ << "(): key=" << o.key << wq->thr_name()
      << dendl;
    int ret = op_rule.process(o, wk->dpp, wq);
    if (ret < 0) {
      /* per-object failures are logged but do not stop the bucket scan */
      ldpp_dout(wk->get_lc(), 20)
	<< "ERROR: orule.process() returned ret=" << ret
	<< "thread:" << wq->thr_name()
	<< dendl;
    }
  };
  worker->workpool->setf(pf);

  multimap<string, lc_op>& prefix_map = config.get_prefix_map();
  ldpp_dout(this, 10) << __func__ << "() prefix_map size="
		      << prefix_map.size()
		      << dendl;

  rgw_obj_key pre_marker;
  rgw_obj_key next_marker;
  for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end();
      ++prefix_iter) {

    if (worker_should_stop(stop_at, once)) {
      ldpp_dout(this, 5) << __func__ << " interval budget EXPIRED worker "
			 << worker->ix
			 << dendl;
      return 0;
    }

    auto& op = prefix_iter->second;
    if (!is_valid_op(op)) {
      continue;
    }
    ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first
			<< dendl;
    /* when this prefix extends the previous one, resume listing from
     * the previous prefix's saved marker instead of starting over */
    if (prefix_iter != prefix_map.begin() &&
	(prefix_iter->first.compare(0, prev(prefix_iter)->first.length(),
				    prev(prefix_iter)->first) == 0)) {
      next_marker = pre_marker;
    } else {
      pre_marker = next_marker;
    }

    LCObjsLister ol(store, bucket.get());
    ol.set_prefix(prefix_iter->first);

    ret = ol.init(this);
    if (ret < 0) {
      if (ret == (-ENOENT))
	return 0;
      ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
      return ret;
    }

    op_env oenv(op, store, worker, bucket.get(), ol);
    LCOpRule orule(oenv);
    orule.build(); // why can't ctor do it?
    rgw_bucket_dir_entry* o{nullptr};
    for (; ol.get_obj(this, &o /* , fetch_barrier */); ol.next()) {
      orule.update();
      /* rule is copied into the work item along with the entry */
      std::tuple<LCOpRule, rgw_bucket_dir_entry> t1 = {orule, *o};
      worker->workpool->enqueue(WorkItem{t1});
    }
    /* finish this prefix before starting the next */
    worker->workpool->drain();
  }

  ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
  return ret;
}
1836
1837 int RGWLC::bucket_lc_post(int index, int max_lock_sec,
1838 rgw::sal::Lifecycle::LCEntry& entry, int& result,
1839 LCWorker* worker)
1840 {
1841 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
1842
1843 rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
1844 obj_names[index],
1845 cookie);
1846
1847 ldpp_dout(this, 5) << "RGWLC::bucket_lc_post(): POST " << entry
1848 << " index: " << index << " worker ix: " << worker->ix
1849 << dendl;
1850
1851 do {
1852 int ret = lock->try_lock(this, lock_duration, null_yield);
1853 if (ret == -EBUSY || ret == -EEXIST) {
1854 /* already locked by another lc processor */
1855 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
1856 << obj_names[index] << ", sleep 5, try again " << dendl;
1857 sleep(5);
1858 continue;
1859 }
1860
1861 if (ret < 0)
1862 return 0;
1863 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index]
1864 << dendl;
1865 if (result == -ENOENT) {
1866 /* XXXX are we SURE the only way result could == ENOENT is when
1867 * there is no such bucket? It is currently the value returned
1868 * from bucket_lc_process(...) */
1869 ret = sal_lc->rm_entry(obj_names[index], entry);
1870 if (ret < 0) {
1871 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
1872 << obj_names[index] << dendl;
1873 }
1874 goto clean;
1875 } else if (result < 0) {
1876 entry.status = lc_failed;
1877 } else {
1878 entry.status = lc_complete;
1879 }
1880
1881 ret = sal_lc->set_entry(obj_names[index], entry);
1882 if (ret < 0) {
1883 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
1884 << obj_names[index] << dendl;
1885 }
1886 clean:
1887 lock->unlock();
1888 delete lock;
1889 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock "
1890 << obj_names[index] << dendl;
1891 return 0;
1892 } while (true);
1893 }
1894
1895 int RGWLC::list_lc_progress(string& marker, uint32_t max_entries,
1896 vector<rgw::sal::Lifecycle::LCEntry>& progress_map,
1897 int& index)
1898 {
1899 progress_map.clear();
1900 for(; index < max_objs; index++, marker="") {
1901 vector<rgw::sal::Lifecycle::LCEntry> entries;
1902 int ret = sal_lc->list_entries(obj_names[index], marker, max_entries, entries);
1903 if (ret < 0) {
1904 if (ret == -ENOENT) {
1905 ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
1906 << obj_names[index] << dendl;
1907 continue;
1908 } else {
1909 return ret;
1910 }
1911 }
1912 progress_map.reserve(progress_map.size() + entries.size());
1913 progress_map.insert(progress_map.end(), entries.begin(), entries.end());
1914
1915 /* update index, marker tuple */
1916 if (progress_map.size() > 0)
1917 marker = progress_map.back().bucket;
1918
1919 if (progress_map.size() >= max_entries)
1920 break;
1921 }
1922 return 0;
1923 }
1924
/* Return a uniformly shuffled permutation of [0, n).
 *
 * Fix: the original seeded a default_random_engine from the
 * random_device but then passed the random_device itself to
 * std::shuffle, leaving the engine unused (dead code) and drawing
 * fresh OS entropy for every swap. Shuffle with the seeded engine. */
static inline std::vector<int> random_sequence(uint32_t n)
{
  std::vector<int> v(n, 0);
  std::generate(v.begin(), v.end(),
		[ix = 0]() mutable {
		  return ix++;
		});
  std::random_device rd;
  std::default_random_engine rng{rd()};
  std::shuffle(v.begin(), v.end(), rng);
  return v;
}
1937
1938 static inline int get_lc_index(CephContext *cct,
1939 const std::string& shard_id)
1940 {
1941 int max_objs =
1942 (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME :
1943 cct->_conf->rgw_lc_max_objs);
1944 /* n.b. review hash algo */
1945 int index = ceph_str_hash_linux(shard_id.c_str(),
1946 shard_id.size()) % HASH_PRIME % max_objs;
1947 return index;
1948 }
1949
1950 static inline void get_lc_oid(CephContext *cct,
1951 const std::string& shard_id, string *oid)
1952 {
1953 /* n.b. review hash algo */
1954 int index = get_lc_index(cct, shard_id);
1955 *oid = lc_oid_prefix;
1956 char buf[32];
1957 snprintf(buf, 32, ".%d", index);
1958 oid->append(buf);
1959 return;
1960 }
1961
1962 static std::string get_lc_shard_name(const rgw_bucket& bucket){
1963 return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
1964 }
1965
1966 int RGWLC::process(LCWorker* worker,
1967 const std::unique_ptr<rgw::sal::Bucket>& optional_bucket,
1968 bool once = false)
1969 {
1970 int ret = 0;
1971 int max_secs = cct->_conf->rgw_lc_lock_max_time;
1972
1973 if (optional_bucket) {
1974 /* if a bucket is provided, this is a single-bucket run, and
1975 * can be processed without traversing any state entries (we
1976 * do need the entry {pro,epi}logue which update the state entry
1977 * for this bucket) */
1978 auto bucket_entry_marker = get_lc_shard_name(optional_bucket->get_key());
1979 auto index = get_lc_index(store->ctx(), bucket_entry_marker);
1980 ret = process_bucket(index, max_secs, worker, bucket_entry_marker, once);
1981 return ret;
1982 } else {
1983 /* generate an index-shard sequence unrelated to any other
1984 * that might be running in parallel */
1985 std::string all_buckets{""};
1986 vector<int> shard_seq = random_sequence(max_objs);
1987 for (auto index : shard_seq) {
1988 ret = process(index, max_secs, worker, once);
1989 if (ret < 0)
1990 return ret;
1991 }
1992 }
1993
1994 return 0;
1995 }
1996
1997 bool RGWLC::expired_session(time_t started)
1998 {
1999 time_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
2000 ? cct->_conf->rgw_lc_debug_interval
2001 : 24*60*60;
2002
2003 auto now = time(nullptr);
2004
2005 ldpp_dout(this, 16) << "RGWLC::expired_session"
2006 << " started: " << started
2007 << " interval: " << interval << "(*2==" << 2*interval << ")"
2008 << " now: " << now
2009 << dendl;
2010
2011 return (started + 2*interval < now);
2012 }
2013
2014 time_t RGWLC::thread_stop_at()
2015 {
2016 uint64_t interval = (cct->_conf->rgw_lc_debug_interval > 0)
2017 ? cct->_conf->rgw_lc_debug_interval
2018 : 24*60*60;
2019
2020 return time(nullptr) + interval;
2021 }
2022
/* Single-bucket LC pass: under the shard lock, fetch this bucket's
 * state entry, skip it if another processor has an unexpired session
 * on it, mark it lc_processing, then run bucket_lc_process() and the
 * bucket_lc_post() epilogue outside the lock. */
int RGWLC::process_bucket(int index, int max_lock_secs, LCWorker* worker,
			  const std::string& bucket_entry_marker,
			  bool once = false)
{
  ldpp_dout(this, 5) << "RGWLC::process_bucket(): ENTER: "
	  << "index: " << index << " worker ix: " << worker->ix
	  << dendl;

  int ret = 0;
  std::unique_ptr<rgw::sal::LCSerializer> serializer(
    sal_lc->get_serializer(lc_index_lock_name, obj_names[index],
			   std::string()));

  rgw::sal::Lifecycle::LCEntry entry;
  if (max_lock_secs <= 0) {
    return -EAGAIN;
  }

  utime_t time(max_lock_secs, 0);
  ret = serializer->try_lock(this, time, null_yield);
  if (ret == -EBUSY || ret == -EEXIST) {
    /* already locked by another lc processor */
    ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
		       << obj_names[index] << dendl;
    return -EBUSY;
  }
  if (ret < 0)
    return 0;

  /* adopt the acquired lock; unlocks automatically on every return path */
  std::unique_lock<rgw::sal::LCSerializer> lock(
    *(serializer.get()), std::adopt_lock);

  ret = sal_lc->get_entry(obj_names[index], bucket_entry_marker, entry);
  if (ret >= 0) {
    if (entry.status == lc_processing) {
      if (expired_session(entry.start_time)) {
	/* stale session from a dead/stuck processor: take it over */
	ldpp_dout(this, 5) << "RGWLC::process_bucket(): STALE lc session found for: " << entry
			   << " index: " << index << " worker ix: " << worker->ix
			   << " (clearing)"
			   << dendl;
      } else {
	/* live session elsewhere: leave the bucket alone */
	ldpp_dout(this, 5) << "RGWLC::process_bucket(): ACTIVE entry: "
			   << entry
			   << " index: " << index
			   << " worker ix: " << worker->ix
			   << dendl;
	return ret;
      }
    }
  }

  /* do nothing if no bucket */
  if (entry.bucket.empty()) {
    return ret;
  }

  ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 1: " << entry
	  << " index: " << index << " worker ix: " << worker->ix
	  << dendl;

  /* claim the entry before releasing the shard lock */
  entry.status = lc_processing;
  ret = sal_lc->set_entry(obj_names[index], entry);
  if (ret < 0) {
    ldpp_dout(this, 0) << "RGWLC::process_bucket() failed to set obj entry "
		       << obj_names[index] << entry.bucket << entry.status
		       << dendl;
    return ret;
  }

  ldpp_dout(this, 5) << "RGWLC::process_bucket(): START entry 2: " << entry
		     << " index: " << index << " worker ix: " << worker->ix
		     << dendl;

  /* the (possibly long) bucket scan runs without holding the shard lock */
  lock.unlock();
  ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
  bucket_lc_post(index, max_lock_secs, entry, ret, worker);

  return ret;
} /* RGWLC::process_bucket */
2102
2103 int RGWLC::process(int index, int max_lock_secs, LCWorker* worker,
2104 bool once = false)
2105 {
2106 ldpp_dout(this, 5) << "RGWLC::process(): ENTER: "
2107 << "index: " << index << " worker ix: " << worker->ix
2108 << dendl;
2109
2110 int ret = 0;
2111 rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
2112 obj_names[index],
2113 std::string());
2114 do {
2115 utime_t now = ceph_clock_now();
2116 //string = bucket_name:bucket_id, start_time, int = LC_BUCKET_STATUS
2117 rgw::sal::Lifecycle::LCEntry entry;
2118 if (max_lock_secs <= 0)
2119 return -EAGAIN;
2120
2121 utime_t time(max_lock_secs, 0);
2122 ret = lock->try_lock(this, time, null_yield);
2123 if (ret == -EBUSY || ret == -EEXIST) {
2124 /* already locked by another lc processor */
2125 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
2126 << obj_names[index] << ", sleep 5, try again" << dendl;
2127 sleep(5);
2128 continue; // XXXX really retry forever?
2129 }
2130 if (ret < 0)
2131 return 0;
2132
2133 rgw::sal::Lifecycle::LCHead head;
2134 ret = sal_lc->get_head(obj_names[index], head);
2135 if (ret < 0) {
2136 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
2137 << obj_names[index] << ", ret=" << ret << dendl;
2138 goto exit;
2139 }
2140
2141 ret = sal_lc->get_entry(obj_names[index], head.marker, entry);
2142 if (ret >= 0) {
2143 if (entry.status == lc_processing) {
2144 if (expired_session(entry.start_time)) {
2145 ldpp_dout(this, 5) << "RGWLC::process(): STALE lc session found for: " << entry
2146 << " index: " << index << " worker ix: " << worker->ix
2147 << " (clearing)"
2148 << dendl;
2149 } else {
2150 ldpp_dout(this, 5) << "RGWLC::process(): ACTIVE entry: " << entry
2151 << " index: " << index << " worker ix: " << worker->ix
2152 << dendl;
2153 goto exit;
2154 }
2155 }
2156 }
2157
2158 if(!if_already_run_today(head.start_date) ||
2159 once) {
2160 head.start_date = now;
2161 head.marker.clear();
2162 ret = bucket_lc_prepare(index, worker);
2163 if (ret < 0) {
2164 ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
2165 << obj_names[index]
2166 << ", ret=" << ret
2167 << dendl;
2168 goto exit;
2169 }
2170 }
2171
2172 ret = sal_lc->get_next_entry(obj_names[index], head.marker, entry);
2173 if (ret < 0) {
2174 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
2175 << obj_names[index] << dendl;
2176 goto exit;
2177 }
2178
2179 /* termination condition (eof) */
2180 if (entry.bucket.empty())
2181 goto exit;
2182
2183 ldpp_dout(this, 5) << "RGWLC::process(): START entry 1: " << entry
2184 << " index: " << index << " worker ix: " << worker->ix
2185 << dendl;
2186
2187 entry.status = lc_processing;
2188 ret = sal_lc->set_entry(obj_names[index], entry);
2189 if (ret < 0) {
2190 ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry "
2191 << obj_names[index] << entry.bucket << entry.status << dendl;
2192 goto exit;
2193 }
2194
2195 head.marker = entry.bucket;
2196 ret = sal_lc->put_head(obj_names[index], head);
2197 if (ret < 0) {
2198 ldpp_dout(this, 0) << "RGWLC::process() failed to put head "
2199 << obj_names[index]
2200 << dendl;
2201 goto exit;
2202 }
2203
2204 ldpp_dout(this, 5) << "RGWLC::process(): START entry 2: " << entry
2205 << " index: " << index << " worker ix: " << worker->ix
2206 << dendl;
2207
2208 lock->unlock();
2209 ret = bucket_lc_process(entry.bucket, worker, thread_stop_at(), once);
2210 bucket_lc_post(index, max_lock_secs, entry, ret, worker);
2211 } while(1 && !once);
2212
2213 delete lock;
2214 return 0;
2215
2216 exit:
2217 lock->unlock();
2218 delete lock;
2219 return 0;
2220 }
2221
2222 void RGWLC::start_processor()
2223 {
2224 auto maxw = cct->_conf->rgw_lc_max_worker;
2225 workers.reserve(maxw);
2226 for (int ix = 0; ix < maxw; ++ix) {
2227 auto worker =
2228 std::make_unique<RGWLC::LCWorker>(this /* dpp */, cct, this, ix);
2229 worker->create((string{"lifecycle_thr_"} + to_string(ix)).c_str());
2230 workers.emplace_back(std::move(worker));
2231 }
2232 }
2233
2234 void RGWLC::stop_processor()
2235 {
2236 down_flag = true;
2237 for (auto& worker : workers) {
2238 worker->stop();
2239 worker->join();
2240 }
2241 workers.clear();
2242 }
2243
/* DoutPrefixProvider hook: lifecycle logging goes to the rgw subsystem
 * (dout_subsys is defined as ceph_subsys_rgw at the top of this file). */
unsigned RGWLC::get_subsys() const
{
  return dout_subsys;
}
2248
/* DoutPrefixProvider hook: prefix every RGWLC log line with "lifecycle: ". */
std::ostream& RGWLC::gen_prefix(std::ostream& out) const
{
  return out << "lifecycle: ";
}
2253
/* Wake the worker from its timed wait on `cond`; presumably the run
 * loop then observes RGWLC::going_down() and exits -- confirm against
 * LCWorker::entry (not visible in this chunk). */
void RGWLC::LCWorker::stop()
{
  std::lock_guard l{lock};
  cond.notify_all();
}
2259
/* True once stop_processor() has set down_flag, i.e. shutdown has begun. */
bool RGWLC::going_down()
{
  return down_flag;
}
2264
2265 bool RGWLC::LCWorker::should_work(utime_t& now)
2266 {
2267 int start_hour;
2268 int start_minute;
2269 int end_hour;
2270 int end_minute;
2271 string worktime = cct->_conf->rgw_lifecycle_work_time;
2272 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute,
2273 &end_hour, &end_minute);
2274 struct tm bdt;
2275 time_t tt = now.sec();
2276 localtime_r(&tt, &bdt);
2277
2278 if (cct->_conf->rgw_lc_debug_interval > 0) {
2279 /* We're debugging, so say we can run */
2280 return true;
2281 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
2282 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
2283 return true;
2284 } else {
2285 return false;
2286 }
2287
2288 }
2289
2290 int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
2291 {
2292 int secs;
2293
2294 if (cct->_conf->rgw_lc_debug_interval > 0) {
2295 secs = start + cct->_conf->rgw_lc_debug_interval - now;
2296 if (secs < 0)
2297 secs = 0;
2298 return (secs);
2299 }
2300
2301 int start_hour;
2302 int start_minute;
2303 int end_hour;
2304 int end_minute;
2305 string worktime = cct->_conf->rgw_lifecycle_work_time;
2306 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour,
2307 &end_minute);
2308 struct tm bdt;
2309 time_t tt = now.sec();
2310 time_t nt;
2311 localtime_r(&tt, &bdt);
2312 bdt.tm_hour = start_hour;
2313 bdt.tm_min = start_minute;
2314 bdt.tm_sec = 0;
2315 nt = mktime(&bdt);
2316 secs = nt - tt;
2317
2318 return secs>0 ? secs : secs+24*60*60;
2319 }
2320
RGWLC::LCWorker::~LCWorker()
{
  /* workpool is an owned raw pointer; it is allocated elsewhere (not
   * visible in this chunk) and released here */
  delete workpool;
} /* ~LCWorker */
2325
/* Provide instances for Ceph's encode/decode round-trip test harness;
 * a single default-constructed configuration is emitted (ownership of
 * the raw pointer passes to the caller, per harness convention). */
void RGWLifecycleConfiguration::generate_test_instances(
  list<RGWLifecycleConfiguration*>& o)
{
  o.push_back(new RGWLifecycleConfiguration);
}
2331
2332 template<typename F>
2333 static int guard_lc_modify(const DoutPrefixProvider *dpp,
2334 rgw::sal::Store* store,
2335 rgw::sal::Lifecycle* sal_lc,
2336 const rgw_bucket& bucket, const string& cookie,
2337 const F& f) {
2338 CephContext *cct = store->ctx();
2339
2340 string shard_id = get_lc_shard_name(bucket);
2341
2342 string oid;
2343 get_lc_oid(cct, shard_id, &oid);
2344
2345 /* XXX it makes sense to take shard_id for a bucket_id? */
2346 rgw::sal::Lifecycle::LCEntry entry;
2347 entry.bucket = shard_id;
2348 entry.status = lc_uninitial;
2349 int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
2350
2351 rgw::sal::LCSerializer* lock = sal_lc->get_serializer(lc_index_lock_name,
2352 oid,
2353 cookie);
2354 utime_t time(max_lock_secs, 0);
2355
2356 int ret;
2357
2358 do {
2359 ret = lock->try_lock(dpp, time, null_yield);
2360 if (ret == -EBUSY || ret == -EEXIST) {
2361 ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
2362 << oid << ", sleep 5, try again" << dendl;
2363 sleep(5); // XXX: return retryable error
2364 continue;
2365 }
2366 if (ret < 0) {
2367 ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
2368 << oid << ", ret=" << ret << dendl;
2369 break;
2370 }
2371 ret = f(sal_lc, oid, entry);
2372 if (ret < 0) {
2373 ldpp_dout(dpp, 0) << "RGWLC::RGWPutLC() failed to set entry on "
2374 << oid << ", ret=" << ret << dendl;
2375 }
2376 break;
2377 } while(true);
2378 lock->unlock();
2379 delete lock;
2380 return ret;
2381 }
2382
2383 int RGWLC::set_bucket_config(rgw::sal::Bucket* bucket,
2384 const rgw::sal::Attrs& bucket_attrs,
2385 RGWLifecycleConfiguration *config)
2386 {
2387 rgw::sal::Attrs attrs = bucket_attrs;
2388 bufferlist lc_bl;
2389 config->encode(lc_bl);
2390
2391 attrs[RGW_ATTR_LC] = std::move(lc_bl);
2392
2393 int ret =
2394 bucket->merge_and_store_attrs(this, attrs, null_yield);
2395 if (ret < 0)
2396 return ret;
2397
2398 rgw_bucket& b = bucket->get_key();
2399
2400
2401 ret = guard_lc_modify(this, store, sal_lc.get(), b, cookie,
2402 [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
2403 const rgw::sal::Lifecycle::LCEntry& entry) {
2404 return sal_lc->set_entry(oid, entry);
2405 });
2406
2407 return ret;
2408 }
2409
2410 int RGWLC::remove_bucket_config(rgw::sal::Bucket* bucket,
2411 const rgw::sal::Attrs& bucket_attrs)
2412 {
2413 rgw::sal::Attrs attrs = bucket_attrs;
2414 attrs.erase(RGW_ATTR_LC);
2415 int ret = bucket->merge_and_store_attrs(this, attrs, null_yield);
2416
2417 rgw_bucket& b = bucket->get_key();
2418
2419 if (ret < 0) {
2420 ldpp_dout(this, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
2421 << b.name << " returned err=" << ret << dendl;
2422 return ret;
2423 }
2424
2425
2426 ret = guard_lc_modify(this, store, sal_lc.get(), b, cookie,
2427 [&](rgw::sal::Lifecycle* sal_lc, const string& oid,
2428 const rgw::sal::Lifecycle::LCEntry& entry) {
2429 return sal_lc->rm_entry(oid, entry);
2430 });
2431
2432 return ret;
2433 } /* RGWLC::remove_bucket_config */
2434
RGWLC::~RGWLC()
{
  stop_processor(); // stop and join all lifecycle worker threads
  finalize();       // release remaining RGWLC state (defined elsewhere)
} /* ~RGWLC() */
2440
2441 namespace rgw::lc {
2442
/* Ensure the lc shard entry for @bucket exists when the bucket carries a
 * lifecycle configuration (RGW_ATTR_LC); recreates a missing entry, e.g.
 * after a reshard dropped it.  Returns 0 when nothing needs fixing or the
 * entry was recreated, otherwise the underlying error. */
int fix_lc_shard_entry(const DoutPrefixProvider *dpp,
		       rgw::sal::Store* store,
		       rgw::sal::Lifecycle* sal_lc,
		       rgw::sal::Bucket* bucket)
{
  if (auto aiter = bucket->get_attrs().find(RGW_ATTR_LC);
      aiter == bucket->get_attrs().end()) {
    return 0; // No entry, nothing to fix
  }

  auto shard_name = get_lc_shard_name(bucket->get_key());
  std::string lc_oid;
  get_lc_oid(store->ctx(), shard_name, &lc_oid);

  rgw::sal::Lifecycle::LCEntry entry;
  // There are multiple cases we need to encounter here
  // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
  // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
  // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
  // We are not dropping the old marker here as that would be caught by the next LC process update
  int ret = sal_lc->get_entry(lc_oid, shard_name, entry);
  if (ret == 0) {
    ldpp_dout(dpp, 5) << "Entry already exists, nothing to do" << dendl;
    return ret; // entry is already existing correctly set to marker
  }
  ldpp_dout(dpp, 5) << "lc_get_entry errored ret code=" << ret << dendl;
  if (ret == -ENOENT) {
    // NOTE(review): `bucket` is a Bucket*, so this likely streams a
    // pointer value rather than the bucket name -- confirm whether an
    // operator<< for the pointee is intended here
    ldpp_dout(dpp, 1) << "No entry for bucket=" << bucket
		      << " creating " << dendl;
    // TODO: we have too many ppl making cookies like this!
    char cookie_buf[COOKIE_LEN + 1];
    gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
    std::string cookie = cookie_buf;

    // the lambda ignores the `oid` guard_lc_modify passes in and uses
    // the captured lc_oid instead; both derive from bucket->get_key()
    // via get_lc_shard_name()/get_lc_oid(), so they are the same oid
    ret = guard_lc_modify(dpp,
			  store, sal_lc, bucket->get_key(), cookie,
			  [&lc_oid](rgw::sal::Lifecycle* slc,
				    const string& oid,
				    const rgw::sal::Lifecycle::LCEntry& entry) {
			    return slc->set_entry(lc_oid, entry);
			  });

  }

  return ret;
}
2489
2490 std::string s3_expiration_header(
2491 DoutPrefixProvider* dpp,
2492 const rgw_obj_key& obj_key,
2493 const RGWObjTags& obj_tagset,
2494 const ceph::real_time& mtime,
2495 const std::map<std::string, buffer::list>& bucket_attrs)
2496 {
2497 CephContext* cct = dpp->get_cct();
2498 RGWLifecycleConfiguration config(cct);
2499 std::string hdr{""};
2500
2501 const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
2502 if (aiter == bucket_attrs.end())
2503 return hdr;
2504
2505 bufferlist::const_iterator iter{&aiter->second};
2506 try {
2507 config.decode(iter);
2508 } catch (const buffer::error& e) {
2509 ldpp_dout(dpp, 0) << __func__
2510 << "() decode life cycle config failed"
2511 << dendl;
2512 return hdr;
2513 } /* catch */
2514
2515 /* dump tags at debug level 16 */
2516 RGWObjTags::tag_map_t obj_tag_map = obj_tagset.get_tags();
2517 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 16)) {
2518 for (const auto& elt : obj_tag_map) {
2519 ldpp_dout(dpp, 16) << __func__
2520 << "() key=" << elt.first << " val=" << elt.second
2521 << dendl;
2522 }
2523 }
2524
2525 boost::optional<ceph::real_time> expiration_date;
2526 boost::optional<std::string> rule_id;
2527
2528 const auto& rule_map = config.get_rule_map();
2529 for (const auto& ri : rule_map) {
2530 const auto& rule = ri.second;
2531 auto& id = rule.get_id();
2532 auto& filter = rule.get_filter();
2533 auto& prefix = filter.has_prefix() ? filter.get_prefix(): rule.get_prefix();
2534 auto& expiration = rule.get_expiration();
2535 auto& noncur_expiration = rule.get_noncur_expiration();
2536
2537 ldpp_dout(dpp, 10) << "rule: " << ri.first
2538 << " prefix: " << prefix
2539 << " expiration: "
2540 << " date: " << expiration.get_date()
2541 << " days: " << expiration.get_days()
2542 << " noncur_expiration: "
2543 << " date: " << noncur_expiration.get_date()
2544 << " days: " << noncur_expiration.get_days()
2545 << dendl;
2546
2547 /* skip if rule !enabled
2548 * if rule has prefix, skip iff object !match prefix
2549 * if rule has tags, skip iff object !match tags
2550 * note if object is current or non-current, compare accordingly
2551 * if rule has days, construct date expression and save iff older
2552 * than last saved
2553 * if rule has date, convert date expression and save iff older
2554 * than last saved
2555 * if the date accum has a value, format it into hdr
2556 */
2557
2558 if (! rule.is_enabled())
2559 continue;
2560
2561 if(! prefix.empty()) {
2562 if (! boost::starts_with(obj_key.name, prefix))
2563 continue;
2564 }
2565
2566 if (filter.has_tags()) {
2567 bool tag_match = false;
2568 const RGWObjTags& rule_tagset = filter.get_tags();
2569 for (auto& tag : rule_tagset.get_tags()) {
2570 /* remember, S3 tags are {key,value} tuples */
2571 tag_match = true;
2572 auto obj_tag = obj_tag_map.find(tag.first);
2573 if (obj_tag == obj_tag_map.end() || obj_tag->second != tag.second) {
2574 ldpp_dout(dpp, 10) << "tag does not match obj_key=" << obj_key
2575 << " rule_id=" << id
2576 << " tag=" << tag
2577 << dendl;
2578 tag_match = false;
2579 break;
2580 }
2581 }
2582 if (! tag_match)
2583 continue;
2584 }
2585
2586 // compute a uniform expiration date
2587 boost::optional<ceph::real_time> rule_expiration_date;
2588 const LCExpiration& rule_expiration =
2589 (obj_key.instance.empty()) ? expiration : noncur_expiration;
2590
2591 if (rule_expiration.has_date()) {
2592 rule_expiration_date =
2593 boost::optional<ceph::real_time>(
2594 ceph::from_iso_8601(rule.get_expiration().get_date()));
2595 } else {
2596 if (rule_expiration.has_days()) {
2597 rule_expiration_date =
2598 boost::optional<ceph::real_time>(
2599 mtime + make_timespan(double(rule_expiration.get_days())*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
2600 }
2601 }
2602
2603 // update earliest expiration
2604 if (rule_expiration_date) {
2605 if ((! expiration_date) ||
2606 (*expiration_date > *rule_expiration_date)) {
2607 expiration_date =
2608 boost::optional<ceph::real_time>(rule_expiration_date);
2609 rule_id = boost::optional<std::string>(id);
2610 }
2611 }
2612 }
2613
2614 // cond format header
2615 if (expiration_date && rule_id) {
2616 // Fri, 23 Dec 2012 00:00:00 GMT
2617 char exp_buf[100];
2618 time_t exp = ceph::real_clock::to_time_t(*expiration_date);
2619 if (std::strftime(exp_buf, sizeof(exp_buf),
2620 "%a, %d %b %Y %T %Z", std::gmtime(&exp))) {
2621 hdr = fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf,
2622 *rule_id);
2623 } else {
2624 ldpp_dout(dpp, 0) << __func__ <<
2625 "() strftime of life cycle expiration header failed"
2626 << dendl;
2627 }
2628 }
2629
2630 return hdr;
2631
2632 } /* rgwlc_s3_expiration_header */
2633
2634 bool s3_multipart_abort_header(
2635 DoutPrefixProvider* dpp,
2636 const rgw_obj_key& obj_key,
2637 const ceph::real_time& mtime,
2638 const std::map<std::string, buffer::list>& bucket_attrs,
2639 ceph::real_time& abort_date,
2640 std::string& rule_id)
2641 {
2642 CephContext* cct = dpp->get_cct();
2643 RGWLifecycleConfiguration config(cct);
2644
2645 const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
2646 if (aiter == bucket_attrs.end())
2647 return false;
2648
2649 bufferlist::const_iterator iter{&aiter->second};
2650 try {
2651 config.decode(iter);
2652 } catch (const buffer::error& e) {
2653 ldpp_dout(dpp, 0) << __func__
2654 << "() decode life cycle config failed"
2655 << dendl;
2656 return false;
2657 } /* catch */
2658
2659 std::optional<ceph::real_time> abort_date_tmp;
2660 std::optional<std::string_view> rule_id_tmp;
2661 const auto& rule_map = config.get_rule_map();
2662 for (const auto& ri : rule_map) {
2663 const auto& rule = ri.second;
2664 const auto& id = rule.get_id();
2665 const auto& filter = rule.get_filter();
2666 const auto& prefix = filter.has_prefix()?filter.get_prefix():rule.get_prefix();
2667 const auto& mp_expiration = rule.get_mp_expiration();
2668 if (!rule.is_enabled()) {
2669 continue;
2670 }
2671 if(!prefix.empty() && !boost::starts_with(obj_key.name, prefix)) {
2672 continue;
2673 }
2674
2675 std::optional<ceph::real_time> rule_abort_date;
2676 if (mp_expiration.has_days()) {
2677 rule_abort_date = std::optional<ceph::real_time>(
2678 mtime + make_timespan(mp_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
2679 }
2680
2681 // update earliest abort date
2682 if (rule_abort_date) {
2683 if ((! abort_date_tmp) ||
2684 (*abort_date_tmp > *rule_abort_date)) {
2685 abort_date_tmp =
2686 std::optional<ceph::real_time>(rule_abort_date);
2687 rule_id_tmp = std::optional<std::string_view>(id);
2688 }
2689 }
2690 }
2691 if (abort_date_tmp && rule_id_tmp) {
2692 abort_date = *abort_date_tmp;
2693 rule_id = *rule_id_tmp;
2694 return true;
2695 } else {
2696 return false;
2697 }
2698 }
2699
2700 } /* namespace rgw::lc */
2701
2702 void lc_op::dump(Formatter *f) const
2703 {
2704 f->dump_bool("status", status);
2705 f->dump_bool("dm_expiration", dm_expiration);
2706
2707 f->dump_int("expiration", expiration);
2708 f->dump_int("noncur_expiration", noncur_expiration);
2709 f->dump_int("mp_expiration", mp_expiration);
2710 if (expiration_date) {
2711 utime_t ut(*expiration_date);
2712 f->dump_stream("expiration_date") << ut;
2713 }
2714 if (obj_tags) {
2715 f->dump_object("obj_tags", *obj_tags);
2716 }
2717 f->open_object_section("transitions");
2718 for(auto& [storage_class, transition] : transitions) {
2719 f->dump_object(storage_class, transition);
2720 }
2721 f->close_section();
2722
2723 f->open_object_section("noncur_transitions");
2724 for (auto& [storage_class, transition] : noncur_transitions) {
2725 f->dump_object(storage_class, transition);
2726 }
2727 f->close_section();
2728 }
2729
/* Emit this filter's prefix and tag set through the Formatter. */
void LCFilter::dump(Formatter *f) const
{
  f->dump_string("prefix", prefix);
  f->dump_object("obj_tags", obj_tags);
}
2735
/* Emit this expiration's days/date fields (stored as strings; at most
 * one is expected to be set per the S3 schema -- confirm in callers). */
void LCExpiration::dump(Formatter *f) const
{
  f->dump_string("days", days);
  f->dump_string("date", date);
}
2741
2742 void LCRule::dump(Formatter *f) const
2743 {
2744 f->dump_string("id", id);
2745 f->dump_string("prefix", prefix);
2746 f->dump_string("status", status);
2747 f->dump_object("expiration", expiration);
2748 f->dump_object("noncur_expiration", noncur_expiration);
2749 f->dump_object("mp_expiration", mp_expiration);
2750 f->dump_object("filter", filter);
2751 f->open_object_section("transitions");
2752 for (auto& [storage_class, transition] : transitions) {
2753 f->dump_object(storage_class, transition);
2754 }
2755 f->close_section();
2756
2757 f->open_object_section("noncur_transitions");
2758 for (auto& [storage_class, transition] : noncur_transitions) {
2759 f->dump_object(storage_class, transition);
2760 }
2761 f->close_section();
2762 f->dump_bool("dm_expiration", dm_expiration);
2763 }
2764
2765
2766 void RGWLifecycleConfiguration::dump(Formatter *f) const
2767 {
2768 f->open_object_section("prefix_map");
2769 for (auto& prefix : prefix_map) {
2770 f->dump_object(prefix.first.c_str(), prefix.second);
2771 }
2772 f->close_section();
2773
2774 f->open_array_section("rule_map");
2775 for (auto& rule : rule_map) {
2776 f->open_object_section("entry");
2777 f->dump_string("id", rule.first);
2778 f->open_object_section("rule");
2779 rule.second.dump(f);
2780 f->close_section();
2781 f->close_section();
2782 }
2783 f->close_section();
2784 }
2785