]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_lc.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_lc.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
#include <string.h>
#include <iostream>
#include <map>
#include <memory>
7
8 #include <boost/algorithm/string/split.hpp>
9 #include <boost/algorithm/string.hpp>
10 #include <boost/algorithm/string/predicate.hpp>
11
12 #include "common/Formatter.h"
13 #include <common/errno.h>
14 #include "include/random.h"
15 #include "cls/rgw/cls_rgw_client.h"
16 #include "cls/lock/cls_lock_client.h"
17 #include "rgw_common.h"
18 #include "rgw_bucket.h"
19 #include "rgw_lc.h"
20 #include "rgw_zone.h"
21 #include "rgw_string.h"
22 #include "rgw_multi.h"
23
24 // this seems safe to use, at least for now--arguably, we should
25 // prefer header-only fmt, in general
26 #undef FMT_HEADER_ONLY
27 #define FMT_HEADER_ONLY 1
28 #include "fmt/format.h"
29
30 #include "services/svc_sys_obj.h"
31 #include "services/svc_zone.h"
32 #include "services/svc_tier_rados.h"
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_rgw
36
// Printable names for the lifecycle shard-entry status values, indexed by the
// integer status stored with each entry. bucket_lc_prepare() resets entries to
// lc_uninitial; the other indices presumably follow the status enum's
// declaration order (TODO confirm against rgw_lc.h).
const char* LC_STATUS[] = {
  "UNINITIAL", "PROCESSING", "FAILED", "COMPLETE"
};
43
44 using namespace librados;
45
46 bool LCRule::valid() const
47 {
48 if (id.length() > MAX_ID_LEN) {
49 return false;
50 }
51 else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration &&
52 transitions.empty() && noncur_transitions.empty()) {
53 return false;
54 }
55 else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
56 return false;
57 }
58 if (!transitions.empty()) {
59 bool using_days = expiration.has_days();
60 bool using_date = expiration.has_date();
61 for (const auto& elem : transitions) {
62 if (!elem.second.valid()) {
63 return false;
64 }
65 using_days = using_days || elem.second.has_days();
66 using_date = using_date || elem.second.has_date();
67 if (using_days && using_date) {
68 return false;
69 }
70 }
71 }
72 for (const auto& elem : noncur_transitions) {
73 if (!elem.second.valid()) {
74 return false;
75 }
76 }
77
78 return true;
79 }
80
81 void LCRule::init_simple_days_rule(std::string_view _id, std::string_view _prefix, int num_days)
82 {
83 id = _id;
84 prefix = _prefix;
85 char buf[32];
86 snprintf(buf, sizeof(buf), "%d", num_days);
87 expiration.set_days(buf);
88 set_enabled(true);
89 }
90
91 void RGWLifecycleConfiguration::add_rule(const LCRule& rule)
92 {
93 auto& id = rule.get_id(); // note that this will return false for groups, but that's ok, we won't search groups
94 rule_map.insert(pair<string, LCRule>(id, rule));
95 }
96
97 bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule)
98 {
99 lc_op op(rule.get_id());
100 op.status = rule.is_enabled();
101 if (rule.get_expiration().has_days()) {
102 op.expiration = rule.get_expiration().get_days();
103 }
104 if (rule.get_expiration().has_date()) {
105 op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date());
106 }
107 if (rule.get_noncur_expiration().has_days()) {
108 op.noncur_expiration = rule.get_noncur_expiration().get_days();
109 }
110 if (rule.get_mp_expiration().has_days()) {
111 op.mp_expiration = rule.get_mp_expiration().get_days();
112 }
113 op.dm_expiration = rule.get_dm_expiration();
114 for (const auto &elem : rule.get_transitions()) {
115 transition_action action;
116 if (elem.second.has_days()) {
117 action.days = elem.second.get_days();
118 } else {
119 action.date = ceph::from_iso_8601(elem.second.get_date());
120 }
121 action.storage_class = rgw_placement_rule::get_canonical_storage_class(elem.first);
122 op.transitions.emplace(elem.first, std::move(action));
123 }
124 for (const auto &elem : rule.get_noncur_transitions()) {
125 transition_action action;
126 action.days = elem.second.get_days();
127 action.date = ceph::from_iso_8601(elem.second.get_date());
128 action.storage_class = elem.first;
129 op.noncur_transitions.emplace(elem.first, std::move(action));
130 }
131 std::string prefix;
132 if (rule.get_filter().has_prefix()){
133 prefix = rule.get_filter().get_prefix();
134 } else {
135 prefix = rule.get_prefix();
136 }
137
138 if (rule.get_filter().has_tags()){
139 op.obj_tags = rule.get_filter().get_tags();
140 }
141 prefix_map.emplace(std::move(prefix), std::move(op));
142 return true;
143 }
144
145 int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule)
146 {
147 if (!rule.valid()) {
148 return -EINVAL;
149 }
150 auto& id = rule.get_id();
151 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
152 return -EINVAL;
153 }
154 if (rule.get_filter().has_tags() && (rule.get_dm_expiration() || !rule.get_mp_expiration().empty())) {
155 return -ERR_INVALID_REQUEST;
156 }
157 rule_map.insert(pair<string, LCRule>(id, rule));
158
159 if (!_add_rule(rule)) {
160 return -ERR_INVALID_REQUEST;
161 }
162 return 0;
163 }
164
165 bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
166 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
167 (second.expiration > 0 || second.expiration_date != boost::none)) {
168 return true;
169 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
170 return true;
171 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
172 return true;
173 } else if (!first.transitions.empty() && !second.transitions.empty()) {
174 for (auto &elem : first.transitions) {
175 if (second.transitions.find(elem.first) != second.transitions.end()) {
176 return true;
177 }
178 }
179 } else if (!first.noncur_transitions.empty() && !second.noncur_transitions.empty()) {
180 for (auto &elem : first.noncur_transitions) {
181 if (second.noncur_transitions.find(elem.first) != second.noncur_transitions.end()) {
182 return true;
183 }
184 }
185 }
186 return false;
187 }
188
189 /* Formerly, this method checked for duplicate rules using an invalid
190 * method (prefix uniqueness). */
191 bool RGWLifecycleConfiguration::valid()
192 {
193 return true;
194 }
195
196 void *RGWLC::LCWorker::entry() {
197 do {
198 utime_t start = ceph_clock_now();
199 if (should_work(start)) {
200 ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
201 int r = lc->process();
202 if (r < 0) {
203 ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
204 }
205 ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
206 }
207 if (lc->going_down())
208 break;
209
210 utime_t end = ceph_clock_now();
211 int secs = schedule_next_start_time(start, end);
212 utime_t next;
213 next.set_from_double(end + secs);
214
215 ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl;
216
217 std::unique_lock l{lock};
218 cond.wait_for(l, std::chrono::seconds(secs));
219 } while (!lc->going_down());
220
221 return NULL;
222 }
223
224 void RGWLC::initialize(CephContext *_cct, rgw::sal::RGWRadosStore *_store) {
225 cct = _cct;
226 store = _store;
227 max_objs = cct->_conf->rgw_lc_max_objs;
228 if (max_objs > HASH_PRIME)
229 max_objs = HASH_PRIME;
230
231 obj_names = new string[max_objs];
232
233 for (int i = 0; i < max_objs; i++) {
234 obj_names[i] = lc_oid_prefix;
235 char buf[32];
236 snprintf(buf, 32, ".%d", i);
237 obj_names[i].append(buf);
238 }
239
240 #define COOKIE_LEN 16
241 char cookie_buf[COOKIE_LEN + 1];
242 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
243 cookie = cookie_buf;
244 }
245
246 void RGWLC::finalize()
247 {
248 delete[] obj_names;
249 }
250
251 bool RGWLC::if_already_run_today(time_t& start_date)
252 {
253 struct tm bdt;
254 time_t begin_of_day;
255 utime_t now = ceph_clock_now();
256 localtime_r(&start_date, &bdt);
257
258 if (cct->_conf->rgw_lc_debug_interval > 0) {
259 if (now - start_date < cct->_conf->rgw_lc_debug_interval)
260 return true;
261 else
262 return false;
263 }
264
265 bdt.tm_hour = 0;
266 bdt.tm_min = 0;
267 bdt.tm_sec = 0;
268 begin_of_day = mktime(&bdt);
269 if (now - begin_of_day < 24*60*60)
270 return true;
271 else
272 return false;
273 }
274
275 int RGWLC::bucket_lc_prepare(int index)
276 {
277 map<string, int > entries;
278
279 string marker;
280
281 #define MAX_LC_LIST_ENTRIES 100
282 do {
283 int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
284 if (ret < 0)
285 return ret;
286 map<string, int>::iterator iter;
287 for (iter = entries.begin(); iter != entries.end(); ++iter) {
288 pair<string, int > entry(iter->first, lc_uninitial);
289 ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, obj_names[index], entry);
290 if (ret < 0) {
291 ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on "
292 << obj_names[index] << dendl;
293 return ret;
294 }
295 }
296
297 if (!entries.empty()) {
298 marker = std::move(entries.rbegin()->first);
299 }
300 } while (!entries.empty());
301
302 return 0;
303 }
304
305 static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, ceph::real_time *expire_time = nullptr)
306 {
307 double timediff, cmp;
308 utime_t base_time;
309 if (cct->_conf->rgw_lc_debug_interval <= 0) {
310 /* Normal case, run properly */
311 cmp = days*24*60*60;
312 base_time = ceph_clock_now().round_to_day();
313 } else {
314 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
315 cmp = days*cct->_conf->rgw_lc_debug_interval;
316 base_time = ceph_clock_now();
317 }
318 timediff = base_time - ceph::real_clock::to_time_t(mtime);
319
320 if (expire_time) {
321 *expire_time = mtime + make_timespan(cmp);
322 }
323 ldout(cct, 20) << __func__ << "(): mtime=" << mtime << " days=" << days << " base_time=" << base_time << " timediff=" << timediff << " cmp=" << cmp << dendl;
324
325 return (timediff >= cmp);
326 }
327
328 static bool pass_object_lock_check(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, RGWObjectCtx& ctx)
329 {
330 if (!bucket_info.obj_lock_enabled()) {
331 return true;
332 }
333 RGWRados::Object op_target(store, bucket_info, ctx, obj);
334 RGWRados::Object::Read read_op(&op_target);
335 map<string, bufferlist> attrs;
336 read_op.params.attrs = &attrs;
337 int ret = read_op.prepare(null_yield);
338 if (ret < 0) {
339 if (ret == -ENOENT) {
340 return true;
341 } else {
342 return false;
343 }
344 } else {
345 auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
346 if (iter != attrs.end()) {
347 RGWObjectRetention retention;
348 try {
349 decode(retention, iter->second);
350 } catch (buffer::error& err) {
351 ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectRetention" << dendl;
352 return false;
353 }
354 if (ceph::real_clock::to_time_t(retention.get_retain_until_date()) > ceph_clock_now()) {
355 return false;
356 }
357 }
358 iter = attrs.find(RGW_ATTR_OBJECT_LEGAL_HOLD);
359 if (iter != attrs.end()) {
360 RGWObjectLegalHold obj_legal_hold;
361 try {
362 decode(obj_legal_hold, iter->second);
363 } catch (buffer::error& err) {
364 ldout(store->ctx(), 0) << "ERROR: failed to decode RGWObjectLegalHold" << dendl;
365 return false;
366 }
367 if (obj_legal_hold.is_enabled()) {
368 return false;
369 }
370 }
371 return true;
372 }
373 }
374
375 int RGWLC::handle_multipart_expiration(
376 RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map)
377 {
378 MultipartMetaFilter mp_filter;
379 vector<rgw_bucket_dir_entry> objs;
380 RGWMPObj mp_obj;
381 bool is_truncated;
382 int ret;
383 RGWBucketInfo& bucket_info = target->get_bucket_info();
384 RGWRados::Bucket::List list_op(target);
385 auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
386 list_op.params.list_versions = false;
387 /* lifecycle processing does not depend on total order, so can
388 * take advantage of unorderd listing optimizations--such as
389 * operating on one shard at a time */
390 list_op.params.allow_unordered = true;
391 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
392 list_op.params.filter = &mp_filter;
393 for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
394 if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
395 continue;
396 }
397 list_op.params.prefix = prefix_iter->first;
398 do {
399 objs.clear();
400 list_op.params.marker = list_op.get_next_marker();
401 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
402 if (ret < 0) {
403 if (ret == (-ENOENT))
404 return 0;
405 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
406 return ret;
407 }
408
409 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
410 if (obj_has_expired(cct, obj_iter->meta.mtime, prefix_iter->second.mp_expiration)) {
411 rgw_obj_key key(obj_iter->key);
412 if (!mp_obj.from_meta(key.name)) {
413 continue;
414 }
415 RGWObjectCtx rctx(store);
416 ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
417 if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
418 ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
419 } else if (ret == -ERR_NO_SUCH_UPLOAD) {
420 ldpp_dout(this, 5) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
421 }
422 if (going_down())
423 return 0;
424 }
425 } /* for objs */
426 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
427 } while(is_truncated);
428 }
429 return 0;
430 }
431
432 static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
433 {
434 RGWRados::Object op_target(store, bucket_info, ctx, obj);
435 RGWRados::Object::Read read_op(&op_target);
436
437 return read_op.get_attr(RGW_ATTR_TAGS, tags_bl, null_yield);
438 }
439
440 static bool is_valid_op(const lc_op& op)
441 {
442 return (op.status &&
443 (op.expiration > 0
444 || op.expiration_date != boost::none
445 || op.noncur_expiration > 0
446 || op.dm_expiration
447 || !op.transitions.empty()
448 || !op.noncur_transitions.empty()));
449 }
450
451 static inline bool has_all_tags(const lc_op& rule_action,
452 const RGWObjTags& object_tags)
453 {
454 if(! rule_action.obj_tags)
455 return false;
456 if(object_tags.count() < rule_action.obj_tags->count())
457 return false;
458 size_t tag_count = 0;
459 for (const auto& tag : object_tags.get_tags()) {
460 const auto& rule_tags = rule_action.obj_tags->get_tags();
461 const auto& iter = rule_tags.find(tag.first);
462 if(iter->second == tag.second)
463 {
464 tag_count++;
465 }
466 /* all tags in the rule appear in obj tags */
467 }
468 return tag_count == rule_action.obj_tags->count();
469 }
470
471 class LCObjsLister {
472 rgw::sal::RGWRadosStore *store;
473 RGWBucketInfo& bucket_info;
474 RGWRados::Bucket target;
475 RGWRados::Bucket::List list_op;
476 bool is_truncated{false};
477 rgw_obj_key next_marker;
478 string prefix;
479 vector<rgw_bucket_dir_entry> objs;
480 vector<rgw_bucket_dir_entry>::iterator obj_iter;
481 rgw_bucket_dir_entry pre_obj;
482 int64_t delay_ms;
483
484 public:
485 LCObjsLister(rgw::sal::RGWRadosStore *_store, RGWBucketInfo& _bucket_info) :
486 store(_store), bucket_info(_bucket_info),
487 target(store->getRados(), bucket_info), list_op(&target) {
488 list_op.params.list_versions = bucket_info.versioned();
489 list_op.params.allow_unordered = true;
490 delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
491 }
492
493 void set_prefix(const string& p) {
494 prefix = p;
495 list_op.params.prefix = prefix;
496 }
497
498 int init() {
499 return fetch();
500 }
501
502 int fetch() {
503 int ret = list_op.list_objects(1000, &objs, NULL, &is_truncated, null_yield);
504 if (ret < 0) {
505 return ret;
506 }
507
508 obj_iter = objs.begin();
509
510 return 0;
511 }
512
513 void delay() {
514 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
515 }
516
517 bool get_obj(rgw_bucket_dir_entry *obj) {
518 if (obj_iter == objs.end()) {
519 if (!is_truncated) {
520 delay();
521 return false;
522 } else {
523 list_op.params.marker = pre_obj.key;
524
525 int ret = fetch();
526 if (ret < 0) {
527 ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret << dendl;
528 return ret;
529 }
530 }
531 delay();
532 }
533 *obj = *obj_iter;
534 return obj_iter != objs.end();
535 }
536
537 rgw_bucket_dir_entry get_prev_obj() {
538 return pre_obj;
539 }
540
541 void next() {
542 pre_obj = *obj_iter;
543 ++obj_iter;
544 }
545
546 bool next_has_same_name()
547 {
548 if ((obj_iter + 1) == objs.end()) {
549 /* this should have been called after get_obj() was called, so this should
550 * only happen if is_truncated is false */
551 return false;
552 }
553 return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0);
554 }
555 };
556
557
558 struct op_env {
559 lc_op& op;
560 rgw::sal::RGWRadosStore *store;
561 RGWLC *lc;
562 RGWBucketInfo& bucket_info;
563 LCObjsLister& ol;
564
565 op_env(lc_op& _op, rgw::sal::RGWRadosStore *_store, RGWLC *_lc, RGWBucketInfo& _bucket_info,
566 LCObjsLister& _ol) : op(_op), store(_store), lc(_lc), bucket_info(_bucket_info), ol(_ol) {}
567 };
568
569 class LCRuleOp;
570
571 struct lc_op_ctx {
572 CephContext *cct;
573 op_env& env;
574 rgw_bucket_dir_entry& o;
575
576 rgw::sal::RGWRadosStore *store;
577 RGWBucketInfo& bucket_info;
578 lc_op& op;
579 LCObjsLister& ol;
580
581 rgw_obj obj;
582 RGWObjectCtx rctx;
583 const DoutPrefixProvider *dpp;
584
585 lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o, const DoutPrefixProvider *_dpp) : cct(_env.store->ctx()), env(_env), o(_o),
586 store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
587 obj(env.bucket_info.bucket, o.key), rctx(env.store), dpp(_dpp) {}
588 };
589
590 static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
591 {
592 auto& store = oc.store;
593 auto& bucket_info = oc.bucket_info;
594 auto& o = oc.o;
595 auto obj_key = o.key;
596 auto& meta = o.meta;
597
598 if (!remove_indeed) {
599 obj_key.instance.clear();
600 } else if (obj_key.instance.empty()) {
601 obj_key.instance = "null";
602 }
603
604 rgw_obj obj(bucket_info.bucket, obj_key);
605 ACLOwner obj_owner;
606 obj_owner.set_id(rgw_user {meta.owner});
607 obj_owner.set_name(meta.owner_display_name);
608
609 RGWRados::Object del_target(store->getRados(), bucket_info, oc.rctx, obj);
610 RGWRados::Object::Delete del_op(&del_target);
611
612 del_op.params.bucket_owner = bucket_info.owner;
613 del_op.params.versioning_status = bucket_info.versioning_status();
614 del_op.params.obj_owner = obj_owner;
615 del_op.params.unmod_since = meta.mtime;
616
617 return del_op.delete_obj(null_yield);
618 }
619
620 class LCOpAction {
621 public:
622 virtual ~LCOpAction() {}
623
624 virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) {
625 return false;
626 };
627
628 /* called after check(). Check should tell us whether this action
629 * is applicable. If there are multiple actions, we'll end up executing
630 * the latest applicable action
631 * For example:
632 * one action after 10 days, another after 20, third after 40.
633 * After 10 days, the latest applicable action would be the first one,
634 * after 20 days it will be the second one. After 21 days it will still be the
635 * second one. So check() should return true for the second action at that point,
636 * but should_process() if the action has already been applied. In object removal
637 * it doesn't matter, but in object transition it does.
638 */
639 virtual bool should_process() {
640 return true;
641 }
642
643 virtual int process(lc_op_ctx& oc) {
644 return 0;
645 }
646 };
647
648 class LCOpFilter {
649 public:
650 virtual ~LCOpFilter() {}
651 virtual bool check(lc_op_ctx& oc) {
652 return false;
653 }
654 };
655
656 class LCOpRule {
657 friend class LCOpAction;
658
659 op_env& env;
660
661 std::vector<unique_ptr<LCOpFilter> > filters;
662 std::vector<unique_ptr<LCOpAction> > actions;
663
664 public:
665 LCOpRule(op_env& _env) : env(_env) {}
666
667 void build();
668 int process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp);
669 };
670
671 static int check_tags(lc_op_ctx& oc, bool *skip)
672 {
673 auto& op = oc.op;
674
675 if (op.obj_tags != boost::none) {
676 *skip = true;
677
678 bufferlist tags_bl;
679 int ret = read_obj_tags(oc.store->getRados(), oc.bucket_info, oc.obj, oc.rctx, tags_bl);
680 if (ret < 0) {
681 if (ret != -ENODATA) {
682 ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
683 }
684 return 0;
685 }
686 RGWObjTags dest_obj_tags;
687 try {
688 auto iter = tags_bl.cbegin();
689 dest_obj_tags.decode(iter);
690 } catch (buffer::error& err) {
691 ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
692 return -EIO;
693 }
694
695 if (! has_all_tags(op, dest_obj_tags)) {
696 ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match in rule: " << op.id << dendl;
697 return 0;
698 }
699 }
700 *skip = false;
701 return 0;
702 }
703
704 class LCOpFilter_Tags : public LCOpFilter {
705 public:
706 bool check(lc_op_ctx& oc) override {
707 auto& o = oc.o;
708
709 if (o.is_delete_marker()) {
710 return true;
711 }
712
713 bool skip;
714
715 int ret = check_tags(oc, &skip);
716 if (ret < 0) {
717 if (ret == -ENOENT) {
718 return false;
719 }
720 ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl;
721 return false;
722 }
723
724 return !skip;
725 };
726 };
727
728 class LCOpAction_CurrentExpiration : public LCOpAction {
729 public:
730 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
731 auto& o = oc.o;
732 if (!o.is_current()) {
733 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not current, skipping" << dendl;
734 return false;
735 }
736 if (o.is_delete_marker()) {
737 if (oc.ol.next_has_same_name()) {
738 return false;
739 } else {
740 *exp_time = real_clock::now();
741 return true;
742 }
743 }
744
745 auto& mtime = o.meta.mtime;
746 bool is_expired;
747 auto& op = oc.op;
748 if (op.expiration <= 0) {
749 if (op.expiration_date == boost::none) {
750 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl;
751 return false;
752 }
753 is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
754 *exp_time = *op.expiration_date;
755 } else {
756 is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
757 }
758
759 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << dendl;
760 return is_expired;
761 }
762
763 int process(lc_op_ctx& oc) {
764 auto& o = oc.o;
765 int r;
766 if (o.is_delete_marker()) {
767 r = remove_expired_obj(oc, true);
768 } else {
769 r = remove_expired_obj(oc, !oc.bucket_info.versioned());
770 }
771 if (r < 0) {
772 ldout(oc.cct, 0) << "ERROR: remove_expired_obj "
773 << oc.bucket_info.bucket << ":" << o.key
774 << " " << cpp_strerror(r) << dendl;
775 return r;
776 }
777 ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
778 return 0;
779 }
780 };
781
782 class LCOpAction_NonCurrentExpiration : public LCOpAction {
783 public:
784 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
785 auto& o = oc.o;
786 if (o.is_current()) {
787 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": current version, skipping" << dendl;
788 return false;
789 }
790
791 auto mtime = oc.ol.get_prev_obj().meta.mtime;
792 int expiration = oc.op.noncur_expiration;
793 bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
794
795 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
796 return is_expired && pass_object_lock_check(oc.store->getRados(), oc.bucket_info, oc.obj, oc.rctx);
797 }
798
799 int process(lc_op_ctx& oc) {
800 auto& o = oc.o;
801 int r = remove_expired_obj(oc, true);
802 if (r < 0) {
803 ldout(oc.cct, 0) << "ERROR: remove_expired_obj (non-current expiration) "
804 << oc.bucket_info.bucket << ":" << o.key
805 << " " << cpp_strerror(r) << dendl;
806 return r;
807 }
808 ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (non-current expiration)" << dendl;
809 return 0;
810 }
811 };
812
813 class LCOpAction_DMExpiration : public LCOpAction {
814 public:
815 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
816 auto& o = oc.o;
817 if (!o.is_delete_marker()) {
818 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not a delete marker, skipping" << dendl;
819 return false;
820 }
821
822 if (oc.ol.next_has_same_name()) {
823 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping" << dendl;
824 return false;
825 }
826
827 *exp_time = real_clock::now();
828
829 return true;
830 }
831
832 int process(lc_op_ctx& oc) {
833 auto& o = oc.o;
834 int r = remove_expired_obj(oc, true);
835 if (r < 0) {
836 ldout(oc.cct, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
837 << oc.bucket_info.bucket << ":" << o.key
838 << " " << cpp_strerror(r) << dendl;
839 return r;
840 }
841 ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (delete marker expiration)" << dendl;
842 return 0;
843 }
844 };
845
846 class LCOpAction_Transition : public LCOpAction {
847 const transition_action& transition;
848 bool need_to_process{false};
849
850 protected:
851 virtual bool check_current_state(bool is_current) = 0;
852 virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0;
853 public:
854 LCOpAction_Transition(const transition_action& _transition) : transition(_transition) {}
855
856 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
857 auto& o = oc.o;
858
859 if (o.is_delete_marker()) {
860 return false;
861 }
862
863 if (!check_current_state(o.is_current())) {
864 return false;
865 }
866
867 auto mtime = get_effective_mtime(oc);
868 bool is_expired;
869 if (transition.days < 0) {
870 if (transition.date == boost::none) {
871 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl;
872 return false;
873 }
874 is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*transition.date);
875 *exp_time = *transition.date;
876 } else {
877 is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time);
878 }
879
880 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
881
882 need_to_process = (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) != transition.storage_class);
883
884 return is_expired;
885 }
886
887 bool should_process() override {
888 return need_to_process;
889 }
890
891 int process(lc_op_ctx& oc) {
892 auto& o = oc.o;
893
894 rgw_placement_rule target_placement;
895 target_placement.inherit_from(oc.bucket_info.placement_rule);
896 target_placement.storage_class = transition.storage_class;
897
898 if (!oc.store->svc()->zone->get_zone_params().valid_placement(target_placement)) {
899 ldpp_dout(oc.dpp, 0) << "ERROR: non existent dest placement: " << target_placement
900 << " bucket="<< oc.bucket_info.bucket
901 << " rule_id=" << oc.op.id << dendl;
902 return -EINVAL;
903 }
904
905 int r = oc.store->getRados()->transition_obj(oc.rctx, oc.bucket_info, oc.obj,
906 target_placement, o.meta.mtime, o.versioned_epoch, oc.dpp, null_yield);
907 if (r < 0) {
908 ldpp_dout(oc.dpp, 0) << "ERROR: failed to transition obj "
909 << oc.bucket_info.bucket << ":" << o.key
910 << " -> " << transition.storage_class
911 << " " << cpp_strerror(r) << dendl;
912 return r;
913 }
914 ldpp_dout(oc.dpp, 2) << "TRANSITIONED:" << oc.bucket_info.bucket << ":" << o.key << " -> " << transition.storage_class << dendl;
915 return 0;
916 }
917 };
918
919 class LCOpAction_CurrentTransition : public LCOpAction_Transition {
920 protected:
921 bool check_current_state(bool is_current) override {
922 return is_current;
923 }
924
925 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
926 return oc.o.meta.mtime;
927 }
928 public:
929 LCOpAction_CurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {}
930 };
931
932 class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
933 protected:
934 bool check_current_state(bool is_current) override {
935 return !is_current;
936 }
937
938 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
939 return oc.ol.get_prev_obj().meta.mtime;
940 }
941 public:
942 LCOpAction_NonCurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {}
943 };
944
945 void LCOpRule::build()
946 {
947 filters.emplace_back(new LCOpFilter_Tags);
948
949 auto& op = env.op;
950
951 if (op.expiration > 0 ||
952 op.expiration_date != boost::none) {
953 actions.emplace_back(new LCOpAction_CurrentExpiration);
954 }
955
956 if (op.dm_expiration) {
957 actions.emplace_back(new LCOpAction_DMExpiration);
958 }
959
960 if (op.noncur_expiration > 0) {
961 actions.emplace_back(new LCOpAction_NonCurrentExpiration);
962 }
963
964 for (auto& iter : op.transitions) {
965 actions.emplace_back(new LCOpAction_CurrentTransition(iter.second));
966 }
967
968 for (auto& iter : op.noncur_transitions) {
969 actions.emplace_back(new LCOpAction_NonCurrentTransition(iter.second));
970 }
971 }
972
973 int LCOpRule::process(rgw_bucket_dir_entry& o, const DoutPrefixProvider *dpp)
974 {
975 lc_op_ctx ctx(env, o, dpp);
976
977 unique_ptr<LCOpAction> *selected = nullptr;
978 real_time exp;
979
980 for (auto& a : actions) {
981 real_time action_exp;
982
983 if (a->check(ctx, &action_exp)) {
984 if (action_exp > exp) {
985 exp = action_exp;
986 selected = &a;
987 }
988 }
989 }
990
991 if (selected &&
992 (*selected)->should_process()) {
993
994 /*
995 * Calling filter checks after action checks because
996 * all action checks (as they are implemented now) do
997 * not access the objects themselves, but return result
998 * from info from bucket index listing. The current tags filter
999 * check does access the objects, so we avoid unnecessary rados calls
1000 * having filters check later in the process.
1001 */
1002
1003 bool cont = false;
1004 for (auto& f : filters) {
1005 if (f->check(ctx)) {
1006 cont = true;
1007 break;
1008 }
1009 }
1010
1011 if (!cont) {
1012 ldpp_dout(dpp, 20) << __func__ << "(): key=" << o.key << ": no rule match, skipping" << dendl;
1013 return 0;
1014 }
1015
1016 int r = (*selected)->process(ctx);
1017 if (r < 0) {
1018 ldpp_dout(dpp, 0) << "ERROR: remove_expired_obj "
1019 << env.bucket_info.bucket << ":" << o.key
1020 << " " << cpp_strerror(r) << dendl;
1021 return r;
1022 }
1023 ldpp_dout(dpp, 20) << "processed:" << env.bucket_info.bucket << ":" << o.key << dendl;
1024 }
1025
1026 return 0;
1027
1028 }
1029
1030 int RGWLC::bucket_lc_process(string& shard_id)
1031 {
1032 RGWLifecycleConfiguration config(cct);
1033 RGWBucketInfo bucket_info;
1034 map<string, bufferlist> bucket_attrs;
1035 string no_ns, list_versions;
1036 vector<rgw_bucket_dir_entry> objs;
1037 vector<std::string> result;
1038 boost::split(result, shard_id, boost::is_any_of(":"));
1039 string bucket_tenant = result[0];
1040 string bucket_name = result[1];
1041 string bucket_marker = result[2];
1042 int ret = store->getRados()->get_bucket_info(store->svc(), bucket_tenant, bucket_name, bucket_info, NULL, null_yield, &bucket_attrs);
1043 if (ret < 0) {
1044 ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl;
1045 return ret;
1046 }
1047
1048 if (bucket_info.bucket.marker != bucket_marker) {
1049 ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant
1050 << ":" << bucket_name << " cur_marker=" << bucket_info.bucket.marker
1051 << " orig_marker=" << bucket_marker << dendl;
1052 return -ENOENT;
1053 }
1054
1055 RGWRados::Bucket target(store->getRados(), bucket_info);
1056
1057 map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
1058 if (aiter == bucket_attrs.end())
1059 return 0;
1060
1061 bufferlist::const_iterator iter{&aiter->second};
1062 try {
1063 config.decode(iter);
1064 } catch (const buffer::error& e) {
1065 ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" << dendl;
1066 return -1;
1067 }
1068
1069 multimap<string, lc_op>& prefix_map = config.get_prefix_map();
1070
1071 ldpp_dout(this, 10) << __func__ << "() prefix_map size="
1072 << prefix_map.size()
1073 << dendl;
1074
1075 rgw_obj_key pre_marker;
1076 rgw_obj_key next_marker;
1077 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
1078 auto& op = prefix_iter->second;
1079 if (!is_valid_op(op)) {
1080 continue;
1081 }
1082 ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl;
1083 if (prefix_iter != prefix_map.begin() &&
1084 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
1085 next_marker = pre_marker;
1086 } else {
1087 pre_marker = next_marker;
1088 }
1089
1090 LCObjsLister ol(store, bucket_info);
1091 ol.set_prefix(prefix_iter->first);
1092
1093 ret = ol.init();
1094
1095 if (ret < 0) {
1096 if (ret == (-ENOENT))
1097 return 0;
1098 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
1099 return ret;
1100 }
1101
1102 op_env oenv(op, store, this, bucket_info, ol);
1103
1104 LCOpRule orule(oenv);
1105
1106 orule.build();
1107
1108 rgw_bucket_dir_entry o;
1109 for (; ol.get_obj(&o); ol.next()) {
1110 ldpp_dout(this, 20) << __func__ << "(): key=" << o.key << dendl;
1111 int ret = orule.process(o, this);
1112 if (ret < 0) {
1113 ldpp_dout(this, 20) << "ERROR: orule.process() returned ret="
1114 << ret
1115 << dendl;
1116 }
1117
1118 if (going_down()) {
1119 return 0;
1120 }
1121 }
1122 }
1123
1124 ret = handle_multipart_expiration(&target, prefix_map);
1125
1126 return ret;
1127 }
1128
1129 int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
1130 {
1131 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
1132
1133 rados::cls::lock::Lock l(lc_index_lock_name);
1134 l.set_cookie(cookie);
1135 l.set_duration(lock_duration);
1136
1137 do {
1138 int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx, obj_names[index]);
1139 if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
1140 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
1141 << obj_names[index] << ", sleep 5, try again" << dendl;
1142 sleep(5);
1143 continue;
1144 }
1145 if (ret < 0)
1146 return 0;
1147 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl;
1148 if (result == -ENOENT) {
1149 ret = cls_rgw_lc_rm_entry(store->getRados()->lc_pool_ctx, obj_names[index], entry);
1150 if (ret < 0) {
1151 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
1152 << obj_names[index] << dendl;
1153 }
1154 goto clean;
1155 } else if (result < 0) {
1156 entry.second = lc_failed;
1157 } else {
1158 entry.second = lc_complete;
1159 }
1160
1161 ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, obj_names[index], entry);
1162 if (ret < 0) {
1163 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
1164 << obj_names[index] << dendl;
1165 }
1166 clean:
1167 l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
1168 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl;
1169 return 0;
1170 } while (true);
1171 }
1172
1173 int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
1174 {
1175 int index = 0;
1176 progress_map->clear();
1177 for(; index <max_objs; index++) {
1178 map<string, int > entries;
1179 int ret = cls_rgw_lc_list(store->getRados()->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
1180 if (ret < 0) {
1181 if (ret == -ENOENT) {
1182 ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
1183 << obj_names[index] << dendl;
1184 continue;
1185 } else {
1186 return ret;
1187 }
1188 }
1189 map<string, int>::iterator iter;
1190 for (iter = entries.begin(); iter != entries.end(); ++iter) {
1191 progress_map->insert(*iter);
1192 }
1193 }
1194 return 0;
1195 }
1196
1197 int RGWLC::process()
1198 {
1199 int max_secs = cct->_conf->rgw_lc_lock_max_time;
1200
1201 const int start = ceph::util::generate_random_number(0, max_objs - 1);
1202
1203 for (int i = 0; i < max_objs; i++) {
1204 int index = (i + start) % max_objs;
1205 int ret = process(index, max_secs);
1206 if (ret < 0)
1207 return ret;
1208 }
1209
1210 return 0;
1211 }
1212
1213 int RGWLC::process(int index, int max_lock_secs)
1214 {
1215 rados::cls::lock::Lock l(lc_index_lock_name);
1216 do {
1217 utime_t now = ceph_clock_now();
1218 pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
1219 if (max_lock_secs <= 0)
1220 return -EAGAIN;
1221
1222 utime_t time(max_lock_secs, 0);
1223 l.set_duration(time);
1224
1225 int ret = l.lock_exclusive(&store->getRados()->lc_pool_ctx, obj_names[index]);
1226 if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
1227 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
1228 << obj_names[index] << ", sleep 5, try again" << dendl;
1229 sleep(5);
1230 continue;
1231 }
1232 if (ret < 0)
1233 return 0;
1234
1235 cls_rgw_lc_obj_head head;
1236 ret = cls_rgw_lc_get_head(store->getRados()->lc_pool_ctx, obj_names[index], head);
1237 if (ret < 0) {
1238 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
1239 << obj_names[index] << ", ret=" << ret << dendl;
1240 goto exit;
1241 }
1242
1243 if(!if_already_run_today(head.start_date)) {
1244 head.start_date = now;
1245 head.marker.clear();
1246 ret = bucket_lc_prepare(index);
1247 if (ret < 0) {
1248 ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
1249 << obj_names[index] << ", ret=" << ret << dendl;
1250 goto exit;
1251 }
1252 }
1253
1254 ret = cls_rgw_lc_get_next_entry(store->getRados()->lc_pool_ctx, obj_names[index], head.marker, entry);
1255 if (ret < 0) {
1256 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
1257 << obj_names[index] << dendl;
1258 goto exit;
1259 }
1260
1261 if (entry.first.empty())
1262 goto exit;
1263
1264 entry.second = lc_processing;
1265 ret = cls_rgw_lc_set_entry(store->getRados()->lc_pool_ctx, obj_names[index], entry);
1266 if (ret < 0) {
1267 ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " << obj_names[index]
1268 << " (" << entry.first << "," << entry.second << ")" << dendl;
1269 goto exit;
1270 }
1271
1272 head.marker = entry.first;
1273 ret = cls_rgw_lc_put_head(store->getRados()->lc_pool_ctx, obj_names[index], head);
1274 if (ret < 0) {
1275 ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
1276 goto exit;
1277 }
1278 l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
1279 ret = bucket_lc_process(entry.first);
1280 bucket_lc_post(index, max_lock_secs, entry, ret);
1281 }while(1);
1282
1283 exit:
1284 l.unlock(&store->getRados()->lc_pool_ctx, obj_names[index]);
1285 return 0;
1286 }
1287
1288 void RGWLC::start_processor()
1289 {
1290 worker = new LCWorker(this, cct, this);
1291 worker->create("lifecycle_thr");
1292 }
1293
1294 void RGWLC::stop_processor()
1295 {
1296 down_flag = true;
1297 if (worker) {
1298 worker->stop();
1299 worker->join();
1300 }
1301 delete worker;
1302 worker = NULL;
1303 }
1304
1305
1306 unsigned RGWLC::get_subsys() const
1307 {
1308 return dout_subsys;
1309 }
1310
1311 std::ostream& RGWLC::gen_prefix(std::ostream& out) const
1312 {
1313 return out << "lifecycle: ";
1314 }
1315
1316 void RGWLC::LCWorker::stop()
1317 {
1318 std::lock_guard l{lock};
1319 cond.notify_all();
1320 }
1321
1322 bool RGWLC::going_down()
1323 {
1324 return down_flag;
1325 }
1326
1327 bool RGWLC::LCWorker::should_work(utime_t& now)
1328 {
1329 int start_hour;
1330 int start_minute;
1331 int end_hour;
1332 int end_minute;
1333 string worktime = cct->_conf->rgw_lifecycle_work_time;
1334 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
1335 struct tm bdt;
1336 time_t tt = now.sec();
1337 localtime_r(&tt, &bdt);
1338
1339 if (cct->_conf->rgw_lc_debug_interval > 0) {
1340 /* We're debugging, so say we can run */
1341 return true;
1342 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
1343 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
1344 return true;
1345 } else {
1346 return false;
1347 }
1348
1349 }
1350
1351 int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
1352 {
1353 int secs;
1354
1355 if (cct->_conf->rgw_lc_debug_interval > 0) {
1356 secs = start + cct->_conf->rgw_lc_debug_interval - now;
1357 if (secs < 0)
1358 secs = 0;
1359 return (secs);
1360 }
1361
1362 int start_hour;
1363 int start_minute;
1364 int end_hour;
1365 int end_minute;
1366 string worktime = cct->_conf->rgw_lifecycle_work_time;
1367 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
1368 struct tm bdt;
1369 time_t tt = now.sec();
1370 time_t nt;
1371 localtime_r(&tt, &bdt);
1372 bdt.tm_hour = start_hour;
1373 bdt.tm_min = start_minute;
1374 bdt.tm_sec = 0;
1375 nt = mktime(&bdt);
1376 secs = nt - tt;
1377
1378 return secs>0 ? secs : secs+24*60*60;
1379 }
1380
1381 void RGWLifecycleConfiguration::generate_test_instances(list<RGWLifecycleConfiguration*>& o)
1382 {
1383 o.push_back(new RGWLifecycleConfiguration);
1384 }
1385
1386 void get_lc_oid(CephContext *cct, const string& shard_id, string *oid)
1387 {
1388 int max_objs = (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : cct->_conf->rgw_lc_max_objs);
1389 int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs;
1390 *oid = lc_oid_prefix;
1391 char buf[32];
1392 snprintf(buf, 32, ".%d", index);
1393 oid->append(buf);
1394 return;
1395 }
1396
1397
1398
1399 static std::string get_lc_shard_name(const rgw_bucket& bucket){
1400 return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
1401 }
1402
1403 template<typename F>
1404 static int guard_lc_modify(rgw::sal::RGWRadosStore* store, const rgw_bucket& bucket, const string& cookie, const F& f) {
1405 CephContext *cct = store->ctx();
1406
1407 string shard_id = get_lc_shard_name(bucket);
1408
1409 string oid;
1410 get_lc_oid(cct, shard_id, &oid);
1411
1412 pair<string, int> entry(shard_id, lc_uninitial);
1413 int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
1414
1415 rados::cls::lock::Lock l(lc_index_lock_name);
1416 utime_t time(max_lock_secs, 0);
1417 l.set_duration(time);
1418 l.set_cookie(cookie);
1419
1420 librados::IoCtx *ctx = store->getRados()->get_lc_pool_ctx();
1421 int ret;
1422
1423 do {
1424 ret = l.lock_exclusive(ctx, oid);
1425 if (ret == -EBUSY || ret == -EEXIST) {
1426 ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
1427 << oid << ", sleep 5, try again" << dendl;
1428 sleep(5); // XXX: return retryable error
1429 continue;
1430 }
1431 if (ret < 0) {
1432 ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
1433 << oid << ", ret=" << ret << dendl;
1434 break;
1435 }
1436 ret = f(ctx, oid, entry);
1437 if (ret < 0) {
1438 ldout(cct, 0) << "RGWLC::RGWPutLC() failed to set entry on "
1439 << oid << ", ret=" << ret << dendl;
1440 }
1441 break;
1442 } while(true);
1443 l.unlock(ctx, oid);
1444 return ret;
1445 }
1446
1447 int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info,
1448 const map<string, bufferlist>& bucket_attrs,
1449 RGWLifecycleConfiguration *config)
1450 {
1451 map<string, bufferlist> attrs = bucket_attrs;
1452 bufferlist lc_bl;
1453 config->encode(lc_bl);
1454
1455 attrs[RGW_ATTR_LC] = std::move(lc_bl);
1456
1457 int ret = store->ctl()->bucket->set_bucket_instance_attrs(bucket_info, attrs,
1458 &bucket_info.objv_tracker,
1459 null_yield);
1460 if (ret < 0)
1461 return ret;
1462
1463 rgw_bucket& bucket = bucket_info.bucket;
1464
1465
1466 ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid,
1467 const pair<string, int>& entry) {
1468 return cls_rgw_lc_set_entry(*ctx, oid, entry);
1469 });
1470
1471 return ret;
1472 }
1473
1474 int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
1475 const map<string, bufferlist>& bucket_attrs)
1476 {
1477 map<string, bufferlist> attrs = bucket_attrs;
1478 attrs.erase(RGW_ATTR_LC);
1479 int ret = store->ctl()->bucket->set_bucket_instance_attrs(bucket_info, attrs,
1480 &bucket_info.objv_tracker,
1481 null_yield);
1482
1483 rgw_bucket& bucket = bucket_info.bucket;
1484
1485 if (ret < 0) {
1486 ldout(cct, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
1487 << bucket.name << " returned err=" << ret << dendl;
1488 return ret;
1489 }
1490
1491
1492 ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid,
1493 const pair<string, int>& entry) {
1494 return cls_rgw_lc_rm_entry(*ctx, oid, entry);
1495 });
1496
1497 return ret;
1498 }
1499
1500 namespace rgw::lc {
1501
1502 int fix_lc_shard_entry(rgw::sal::RGWRadosStore* store, const RGWBucketInfo& bucket_info,
1503 const map<std::string,bufferlist>& battrs)
1504 {
1505 if (auto aiter = battrs.find(RGW_ATTR_LC);
1506 aiter == battrs.end()) {
1507 return 0; // No entry, nothing to fix
1508 }
1509
1510 auto shard_name = get_lc_shard_name(bucket_info.bucket);
1511 std::string lc_oid;
1512 get_lc_oid(store->ctx(), shard_name, &lc_oid);
1513
1514 rgw_lc_entry_t entry;
1515 // There are multiple cases we need to encounter here
1516 // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
1517 // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
1518 // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
1519 // We are not dropping the old marker here as that would be caught by the next LC process update
1520 auto lc_pool_ctx = store->getRados()->get_lc_pool_ctx();
1521 int ret = cls_rgw_lc_get_entry(*lc_pool_ctx,
1522 lc_oid, shard_name, entry);
1523 if (ret == 0) {
1524 ldout(store->ctx(), 5) << "Entry already exists, nothing to do" << dendl;
1525 return ret; // entry is already existing correctly set to marker
1526 }
1527 ldout(store->ctx(), 5) << "cls_rgw_lc_get_entry errored ret code=" << ret << dendl;
1528 if (ret == -ENOENT) {
1529 ldout(store->ctx(), 1) << "No entry for bucket=" << bucket_info.bucket.name
1530 << " creating " << dendl;
1531 // TODO: we have too many ppl making cookies like this!
1532 char cookie_buf[COOKIE_LEN + 1];
1533 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
1534 std::string cookie = cookie_buf;
1535
1536 ret = guard_lc_modify(store, bucket_info.bucket, cookie,
1537 [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid,
1538 const pair<string, int>& entry) {
1539 return cls_rgw_lc_set_entry(*lc_pool_ctx,
1540 lc_oid, entry);
1541 });
1542
1543 }
1544
1545 return ret;
1546 }
1547
1548 std::string s3_expiration_header(
1549 DoutPrefixProvider* dpp,
1550 const rgw_obj_key& obj_key,
1551 const RGWObjTags& obj_tagset,
1552 const ceph::real_time& mtime,
1553 const std::map<std::string, buffer::list>& bucket_attrs)
1554 {
1555 CephContext* cct = dpp->get_cct();
1556 RGWLifecycleConfiguration config(cct);
1557 std::string hdr{""};
1558
1559 const auto& aiter = bucket_attrs.find(RGW_ATTR_LC);
1560 if (aiter == bucket_attrs.end())
1561 return hdr;
1562
1563 bufferlist::const_iterator iter{&aiter->second};
1564 try {
1565 config.decode(iter);
1566 } catch (const buffer::error& e) {
1567 ldpp_dout(dpp, 0) << __func__
1568 << "() decode life cycle config failed"
1569 << dendl;
1570 return hdr;
1571 } /* catch */
1572
1573 /* dump tags at debug level 16 */
1574 RGWObjTags::tag_map_t obj_tag_map = obj_tagset.get_tags();
1575 if (cct->_conf->subsys.should_gather(ceph_subsys_rgw, 16)) {
1576 for (const auto& elt : obj_tag_map) {
1577 ldout(cct, 16) << __func__
1578 << "() key=" << elt.first << " val=" << elt.second
1579 << dendl;
1580 }
1581 }
1582
1583 boost::optional<ceph::real_time> expiration_date;
1584 boost::optional<std::string> rule_id;
1585
1586 const auto& rule_map = config.get_rule_map();
1587 for (const auto& ri : rule_map) {
1588 const auto& rule = ri.second;
1589 auto& id = rule.get_id();
1590 auto& prefix = rule.get_prefix();
1591 auto& filter = rule.get_filter();
1592 auto& expiration = rule.get_expiration();
1593 auto& noncur_expiration = rule.get_noncur_expiration();
1594
1595 ldpp_dout(dpp, 10) << "rule: " << ri.first
1596 << " prefix: " << prefix
1597 << " expiration: "
1598 << " date: " << expiration.get_date()
1599 << " days: " << expiration.get_days()
1600 << " noncur_expiration: "
1601 << " date: " << noncur_expiration.get_date()
1602 << " days: " << noncur_expiration.get_days()
1603 << dendl;
1604
1605 /* skip if rule !enabled
1606 * if rule has prefix, skip iff object !match prefix
1607 * if rule has tags, skip iff object !match tags
1608 * note if object is current or non-current, compare accordingly
1609 * if rule has days, construct date expression and save iff older
1610 * than last saved
1611 * if rule has date, convert date expression and save iff older
1612 * than last saved
1613 * if the date accum has a value, format it into hdr
1614 */
1615
1616 if (! rule.is_enabled())
1617 continue;
1618
1619 if(! prefix.empty()) {
1620 if (! boost::starts_with(obj_key.name, prefix))
1621 continue;
1622 }
1623
1624 if (filter.has_tags()) {
1625 bool tag_match = false;
1626 const RGWObjTags& rule_tagset = filter.get_tags();
1627 for (auto& tag : rule_tagset.get_tags()) {
1628 /* remember, S3 tags are {key,value} tuples */
1629 auto ma1 = obj_tag_map.find(tag.first);
1630 if ( ma1 != obj_tag_map.end()) {
1631 if (tag.second == ma1->second) {
1632 ldpp_dout(dpp, 10) << "tag match obj_key=" << obj_key
1633 << " rule_id=" << id
1634 << " tag=" << tag
1635 << " (ma=" << *ma1 << ")"
1636 << dendl;
1637 tag_match = true;
1638 break;
1639 }
1640 }
1641 }
1642 if (! tag_match)
1643 continue;
1644 }
1645
1646 // compute a uniform expiration date
1647 boost::optional<ceph::real_time> rule_expiration_date;
1648 const LCExpiration& rule_expiration =
1649 (obj_key.instance.empty()) ? expiration : noncur_expiration;
1650
1651 if (rule_expiration.has_date()) {
1652 rule_expiration_date =
1653 boost::optional<ceph::real_time>(
1654 ceph::from_iso_8601(rule.get_expiration().get_date()));
1655 } else {
1656 if (rule_expiration.has_days()) {
1657 rule_expiration_date =
1658 boost::optional<ceph::real_time>(
1659 mtime + make_timespan(rule_expiration.get_days()*24*60*60 - ceph::real_clock::to_time_t(mtime)%(24*60*60) + 24*60*60));
1660 }
1661 }
1662
1663 // update earliest expiration
1664 if (rule_expiration_date) {
1665 if ((! expiration_date) ||
1666 (*expiration_date > *rule_expiration_date)) {
1667 expiration_date =
1668 boost::optional<ceph::real_time>(rule_expiration_date);
1669 rule_id = boost::optional<std::string>(id);
1670 }
1671 }
1672 }
1673
1674 // cond format header
1675 if (expiration_date && rule_id) {
1676 // Fri, 23 Dec 2012 00:00:00 GMT
1677 char exp_buf[100];
1678 time_t exp = ceph::real_clock::to_time_t(*expiration_date);
1679 if (std::strftime(exp_buf, sizeof(exp_buf),
1680 "%a, %d %b %Y %T %Z", std::gmtime(&exp))) {
1681 hdr = fmt::format("expiry-date=\"{0}\", rule-id=\"{1}\"", exp_buf,
1682 *rule_id);
1683 } else {
1684 ldpp_dout(dpp, 0) << __func__ <<
1685 "() strftime of life cycle expiration header failed"
1686 << dendl;
1687 }
1688 }
1689
1690 return hdr;
1691
1692 } /* rgwlc_s3_expiration_header */
1693
1694 } /* namespace rgw::lc */