]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_lc.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rgw / rgw_lc.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <string.h>
5 #include <iostream>
6 #include <map>
7
8 #include <boost/algorithm/string/split.hpp>
9 #include <boost/algorithm/string.hpp>
10
11 #include "common/Formatter.h"
12 #include <common/errno.h>
13 #include "include/random.h"
14 #include "cls/rgw/cls_rgw_client.h"
15 #include "cls/lock/cls_lock_client.h"
16 #include "rgw_common.h"
17 #include "rgw_bucket.h"
18 #include "rgw_lc.h"
19 #include "rgw_string.h"
20
21 #include "services/svc_sys_obj.h"
22
23 #define dout_context g_ceph_context
24 #define dout_subsys ceph_subsys_rgw
25
26 const char* LC_STATUS[] = {
27 "UNINITIAL",
28 "PROCESSING",
29 "FAILED",
30 "COMPLETE"
31 };
32
33 using namespace librados;
34
35 bool LCRule::valid() const
36 {
37 if (id.length() > MAX_ID_LEN) {
38 return false;
39 }
40 else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration &&
41 transitions.empty() && noncur_transitions.empty()) {
42 return false;
43 }
44 else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
45 return false;
46 }
47 if (!transitions.empty()) {
48 bool using_days = expiration.has_days();
49 bool using_date = expiration.has_date();
50 for (const auto& elem : transitions) {
51 if (!elem.second.valid()) {
52 return false;
53 }
54 using_days = using_days || elem.second.has_days();
55 using_date = using_date || elem.second.has_date();
56 if (using_days && using_date) {
57 return false;
58 }
59 }
60 }
61 for (const auto& elem : noncur_transitions) {
62 if (!elem.second.valid()) {
63 return false;
64 }
65 }
66
67 return true;
68 }
69
70 void LCRule::init_simple_days_rule(std::string_view _id, std::string_view _prefix, int num_days)
71 {
72 id = _id;
73 prefix = _prefix;
74 char buf[32];
75 snprintf(buf, sizeof(buf), "%d", num_days);
76 expiration.set_days(buf);
77 set_enabled(true);
78 }
79
80 void RGWLifecycleConfiguration::add_rule(const LCRule& rule)
81 {
82 auto& id = rule.get_id(); // note that this will return false for groups, but that's ok, we won't search groups
83 rule_map.insert(pair<string, LCRule>(id, rule));
84 }
85
86 bool RGWLifecycleConfiguration::_add_rule(const LCRule& rule)
87 {
88 lc_op op(rule.get_id());
89 op.status = rule.is_enabled();
90 if (rule.get_expiration().has_days()) {
91 op.expiration = rule.get_expiration().get_days();
92 }
93 if (rule.get_expiration().has_date()) {
94 op.expiration_date = ceph::from_iso_8601(rule.get_expiration().get_date());
95 }
96 if (rule.get_noncur_expiration().has_days()) {
97 op.noncur_expiration = rule.get_noncur_expiration().get_days();
98 }
99 if (rule.get_mp_expiration().has_days()) {
100 op.mp_expiration = rule.get_mp_expiration().get_days();
101 }
102 op.dm_expiration = rule.get_dm_expiration();
103 for (const auto &elem : rule.get_transitions()) {
104 transition_action action;
105 if (elem.second.has_days()) {
106 action.days = elem.second.get_days();
107 } else {
108 action.date = ceph::from_iso_8601(elem.second.get_date());
109 }
110 action.storage_class = rgw_placement_rule::get_canonical_storage_class(elem.first);
111 op.transitions.emplace(elem.first, std::move(action));
112 }
113 for (const auto &elem : rule.get_noncur_transitions()) {
114 transition_action action;
115 action.days = elem.second.get_days();
116 action.date = ceph::from_iso_8601(elem.second.get_date());
117 action.storage_class = elem.first;
118 op.noncur_transitions.emplace(elem.first, std::move(action));
119 }
120 std::string prefix;
121 if (rule.get_filter().has_prefix()){
122 prefix = rule.get_filter().get_prefix();
123 } else {
124 prefix = rule.get_prefix();
125 }
126
127 if (rule.get_filter().has_tags()){
128 op.obj_tags = rule.get_filter().get_tags();
129 }
130 prefix_map.emplace(std::move(prefix), std::move(op));
131 return true;
132 }
133
134 int RGWLifecycleConfiguration::check_and_add_rule(const LCRule& rule)
135 {
136 if (!rule.valid()) {
137 return -EINVAL;
138 }
139 auto& id = rule.get_id();
140 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
141 return -EINVAL;
142 }
143 rule_map.insert(pair<string, LCRule>(id, rule));
144
145 if (!_add_rule(rule)) {
146 return -ERR_INVALID_REQUEST;
147 }
148 return 0;
149 }
150
151 bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
152 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
153 (second.expiration > 0 || second.expiration_date != boost::none)) {
154 return true;
155 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
156 return true;
157 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
158 return true;
159 } else if (!first.transitions.empty() && !second.transitions.empty()) {
160 for (auto &elem : first.transitions) {
161 if (second.transitions.find(elem.first) != second.transitions.end()) {
162 return true;
163 }
164 }
165 } else if (!first.noncur_transitions.empty() && !second.noncur_transitions.empty()) {
166 for (auto &elem : first.noncur_transitions) {
167 if (second.noncur_transitions.find(elem.first) != second.noncur_transitions.end()) {
168 return true;
169 }
170 }
171 }
172 return false;
173 }
174
175 /* Formerly, this method checked for duplicate rules using an invalid
176 * method (prefix uniqueness). */
177 bool RGWLifecycleConfiguration::valid()
178 {
179 return true;
180 }
181
182 void *RGWLC::LCWorker::entry() {
183 do {
184 utime_t start = ceph_clock_now();
185 if (should_work(start)) {
186 ldpp_dout(dpp, 2) << "life cycle: start" << dendl;
187 int r = lc->process();
188 if (r < 0) {
189 ldpp_dout(dpp, 0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
190 }
191 ldpp_dout(dpp, 2) << "life cycle: stop" << dendl;
192 }
193 if (lc->going_down())
194 break;
195
196 utime_t end = ceph_clock_now();
197 int secs = schedule_next_start_time(start, end);
198 utime_t next;
199 next.set_from_double(end + secs);
200
201 ldpp_dout(dpp, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next) << dendl;
202
203 lock.Lock();
204 cond.WaitInterval(lock, utime_t(secs, 0));
205 lock.Unlock();
206 } while (!lc->going_down());
207
208 return NULL;
209 }
210
211 void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {
212 cct = _cct;
213 store = _store;
214 max_objs = cct->_conf->rgw_lc_max_objs;
215 if (max_objs > HASH_PRIME)
216 max_objs = HASH_PRIME;
217
218 obj_names = new string[max_objs];
219
220 for (int i = 0; i < max_objs; i++) {
221 obj_names[i] = lc_oid_prefix;
222 char buf[32];
223 snprintf(buf, 32, ".%d", i);
224 obj_names[i].append(buf);
225 }
226
227 #define COOKIE_LEN 16
228 char cookie_buf[COOKIE_LEN + 1];
229 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
230 cookie = cookie_buf;
231 }
232
233 void RGWLC::finalize()
234 {
235 delete[] obj_names;
236 }
237
238 bool RGWLC::if_already_run_today(time_t& start_date)
239 {
240 struct tm bdt;
241 time_t begin_of_day;
242 utime_t now = ceph_clock_now();
243 localtime_r(&start_date, &bdt);
244
245 if (cct->_conf->rgw_lc_debug_interval > 0) {
246 if (now - start_date < cct->_conf->rgw_lc_debug_interval)
247 return true;
248 else
249 return false;
250 }
251
252 bdt.tm_hour = 0;
253 bdt.tm_min = 0;
254 bdt.tm_sec = 0;
255 begin_of_day = mktime(&bdt);
256 if (now - begin_of_day < 24*60*60)
257 return true;
258 else
259 return false;
260 }
261
262 int RGWLC::bucket_lc_prepare(int index)
263 {
264 map<string, int > entries;
265
266 string marker;
267
268 #define MAX_LC_LIST_ENTRIES 100
269 do {
270 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
271 if (ret < 0)
272 return ret;
273 map<string, int>::iterator iter;
274 for (iter = entries.begin(); iter != entries.end(); ++iter) {
275 pair<string, int > entry(iter->first, lc_uninitial);
276 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
277 if (ret < 0) {
278 ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on "
279 << obj_names[index] << dendl;
280 return ret;
281 }
282 }
283
284 if (!entries.empty()) {
285 marker = std::move(entries.rbegin()->first);
286 }
287 } while (!entries.empty());
288
289 return 0;
290 }
291
292 static bool obj_has_expired(CephContext *cct, ceph::real_time mtime, int days, ceph::real_time *expire_time = nullptr)
293 {
294 double timediff, cmp;
295 utime_t base_time;
296 if (cct->_conf->rgw_lc_debug_interval <= 0) {
297 /* Normal case, run properly */
298 cmp = days*24*60*60;
299 base_time = ceph_clock_now().round_to_day();
300 } else {
301 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
302 cmp = days*cct->_conf->rgw_lc_debug_interval;
303 base_time = ceph_clock_now();
304 }
305 timediff = base_time - ceph::real_clock::to_time_t(mtime);
306
307 if (expire_time) {
308 *expire_time = mtime + make_timespan(cmp);
309 }
310 ldout(cct, 20) << __func__ << "(): mtime=" << mtime << " days=" << days << " base_time=" << base_time << " timediff=" << timediff << " cmp=" << cmp << dendl;
311
312 return (timediff >= cmp);
313 }
314
315 int RGWLC::handle_multipart_expiration(
316 RGWRados::Bucket *target, const multimap<string, lc_op>& prefix_map)
317 {
318 MultipartMetaFilter mp_filter;
319 vector<rgw_bucket_dir_entry> objs;
320 RGWMPObj mp_obj;
321 bool is_truncated;
322 int ret;
323 RGWBucketInfo& bucket_info = target->get_bucket_info();
324 RGWRados::Bucket::List list_op(target);
325 auto delay_ms = cct->_conf.get_val<int64_t>("rgw_lc_thread_delay");
326 list_op.params.list_versions = false;
327 /* lifecycle processing does not depend on total order, so can
328 * take advantage of unorderd listing optimizations--such as
329 * operating on one shard at a time */
330 list_op.params.allow_unordered = true;
331 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
332 list_op.params.filter = &mp_filter;
333 for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
334 if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
335 continue;
336 }
337 list_op.params.prefix = prefix_iter->first;
338 do {
339 objs.clear();
340 list_op.params.marker = list_op.get_next_marker();
341 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
342 if (ret < 0) {
343 if (ret == (-ENOENT))
344 return 0;
345 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
346 return ret;
347 }
348
349 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
350 if (obj_has_expired(cct, obj_iter->meta.mtime, prefix_iter->second.mp_expiration)) {
351 rgw_obj_key key(obj_iter->key);
352 if (!mp_obj.from_meta(key.name)) {
353 continue;
354 }
355 RGWObjectCtx rctx(store);
356 ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
357 if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
358 ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
359 } else if (ret == -ERR_NO_SUCH_UPLOAD) {
360 ldpp_dout(this, 5) << "ERROR: abort_multipart_upload failed, ret=" << ret << ", meta:" << obj_iter->key << dendl;
361 }
362 if (going_down())
363 return 0;
364 }
365 } /* for objs */
366 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
367 } while(is_truncated);
368 }
369 return 0;
370 }
371
372 static int read_obj_tags(RGWRados *store, RGWBucketInfo& bucket_info, rgw_obj& obj, RGWObjectCtx& ctx, bufferlist& tags_bl)
373 {
374 RGWRados::Object op_target(store, bucket_info, ctx, obj);
375 RGWRados::Object::Read read_op(&op_target);
376
377 return read_op.get_attr(RGW_ATTR_TAGS, tags_bl);
378 }
379
380 static bool is_valid_op(const lc_op& op)
381 {
382 return (op.status &&
383 (op.expiration > 0
384 || op.expiration_date != boost::none
385 || op.noncur_expiration > 0
386 || op.dm_expiration
387 || !op.transitions.empty()
388 || !op.noncur_transitions.empty()));
389 }
390
391 static inline bool has_all_tags(const lc_op& rule_action,
392 const RGWObjTags& object_tags)
393 {
394 for (const auto& tag : object_tags.get_tags()) {
395
396 if (! rule_action.obj_tags)
397 return false;
398
399 const auto& rule_tags = rule_action.obj_tags->get_tags();
400 const auto& iter = rule_tags.find(tag.first);
401
402 if ((iter == rule_tags.end()) ||
403 (iter->second != tag.second))
404 return false;
405 }
406 /* all tags matched */
407 return true;
408 }
409
410 class LCObjsLister {
411 RGWRados *store;
412 RGWBucketInfo& bucket_info;
413 RGWRados::Bucket target;
414 RGWRados::Bucket::List list_op;
415 bool is_truncated{false};
416 rgw_obj_key next_marker;
417 string prefix;
418 vector<rgw_bucket_dir_entry> objs;
419 vector<rgw_bucket_dir_entry>::iterator obj_iter;
420 rgw_bucket_dir_entry pre_obj;
421 int64_t delay_ms;
422
423 public:
424 LCObjsLister(RGWRados *_store, RGWBucketInfo& _bucket_info) :
425 store(_store), bucket_info(_bucket_info),
426 target(store, bucket_info), list_op(&target) {
427 list_op.params.list_versions = bucket_info.versioned();
428 list_op.params.allow_unordered = true;
429 delay_ms = store->ctx()->_conf.get_val<int64_t>("rgw_lc_thread_delay");
430 }
431
432 void set_prefix(const string& p) {
433 prefix = p;
434 list_op.params.prefix = prefix;
435 }
436
437 int init() {
438 return fetch();
439 }
440
441 int fetch() {
442 int ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
443 if (ret < 0) {
444 return ret;
445 }
446
447 obj_iter = objs.begin();
448
449 return 0;
450 }
451
452 void delay() {
453 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
454 }
455
456 bool get_obj(rgw_bucket_dir_entry *obj) {
457 if (obj_iter == objs.end()) {
458 delay();
459 return false;
460 }
461 if (is_truncated && (obj_iter + 1)==objs.end()) {
462 list_op.params.marker = obj_iter->key;
463
464 int ret = fetch();
465 if (ret < 0) {
466 ldout(store->ctx(), 0) << "ERROR: list_op returned ret=" << ret << dendl;
467 return ret;
468 } else {
469 obj_iter = objs.begin();
470 }
471 delay();
472 }
473 *obj = *obj_iter;
474 return true;
475 }
476
477 rgw_bucket_dir_entry get_prev_obj() {
478 return pre_obj;
479 }
480
481 void next() {
482 pre_obj = *obj_iter;
483 ++obj_iter;
484 }
485
486 bool next_has_same_name()
487 {
488 if ((obj_iter + 1) == objs.end()) {
489 /* this should have been called after get_obj() was called, so this should
490 * only happen if is_truncated is false */
491 return false;
492 }
493 return (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0);
494 }
495 };
496
497
498 struct op_env {
499 lc_op& op;
500 RGWRados *store;
501 RGWLC *lc;
502 RGWBucketInfo& bucket_info;
503 LCObjsLister& ol;
504
505 op_env(lc_op& _op, RGWRados *_store, RGWLC *_lc, RGWBucketInfo& _bucket_info,
506 LCObjsLister& _ol) : op(_op), store(_store), lc(_lc), bucket_info(_bucket_info), ol(_ol) {}
507 };
508
509 class LCRuleOp;
510
511 struct lc_op_ctx {
512 CephContext *cct;
513 op_env& env;
514 rgw_bucket_dir_entry& o;
515
516 RGWRados *store;
517 RGWBucketInfo& bucket_info;
518 lc_op& op;
519 LCObjsLister& ol;
520
521 rgw_obj obj;
522 RGWObjectCtx rctx;
523
524 lc_op_ctx(op_env& _env, rgw_bucket_dir_entry& _o) : cct(_env.store->ctx()), env(_env), o(_o),
525 store(env.store), bucket_info(env.bucket_info), op(env.op), ol(env.ol),
526 obj(env.bucket_info.bucket, o.key), rctx(env.store) {}
527 };
528
529 static int remove_expired_obj(lc_op_ctx& oc, bool remove_indeed)
530 {
531 auto& store = oc.store;
532 auto& bucket_info = oc.bucket_info;
533 auto& o = oc.o;
534 auto obj_key = o.key;
535 auto& meta = o.meta;
536
537 if (!remove_indeed) {
538 obj_key.instance.clear();
539 } else if (obj_key.instance.empty()) {
540 obj_key.instance = "null";
541 }
542
543 rgw_obj obj(bucket_info.bucket, obj_key);
544 ACLOwner obj_owner;
545 obj_owner.set_id(rgw_user {meta.owner});
546 obj_owner.set_name(meta.owner_display_name);
547
548 RGWRados::Object del_target(store, bucket_info, oc.rctx, obj);
549 RGWRados::Object::Delete del_op(&del_target);
550
551 del_op.params.bucket_owner = bucket_info.owner;
552 del_op.params.versioning_status = bucket_info.versioning_status();
553 del_op.params.obj_owner = obj_owner;
554 del_op.params.unmod_since = meta.mtime;
555
556 return del_op.delete_obj();
557 }
558
559 class LCOpAction {
560 public:
561 virtual ~LCOpAction() {}
562
563 virtual bool check(lc_op_ctx& oc, ceph::real_time *exp_time) {
564 return false;
565 };
566
567 /* called after check(). Check should tell us whether this action
568 * is applicable. If there are multiple actions, we'll end up executing
569 * the latest applicable action
570 * For example:
571 * one action after 10 days, another after 20, third after 40.
572 * After 10 days, the latest applicable action would be the first one,
573 * after 20 days it will be the second one. After 21 days it will still be the
574 * second one. So check() should return true for the second action at that point,
575 * but should_process() if the action has already been applied. In object removal
576 * it doesn't matter, but in object transition it does.
577 */
578 virtual bool should_process() {
579 return true;
580 }
581
582 virtual int process(lc_op_ctx& oc) {
583 return 0;
584 }
585 };
586
587 class LCOpFilter {
588 public:
589 virtual ~LCOpFilter() {}
590 virtual bool check(lc_op_ctx& oc) {
591 return false;
592 }
593 };
594
595 class LCOpRule {
596 friend class LCOpAction;
597
598 op_env& env;
599
600 std::vector<unique_ptr<LCOpFilter> > filters;
601 std::vector<unique_ptr<LCOpAction> > actions;
602
603 public:
604 LCOpRule(op_env& _env) : env(_env) {}
605
606 void build();
607 int process(rgw_bucket_dir_entry& o);
608 };
609
610 static int check_tags(lc_op_ctx& oc, bool *skip)
611 {
612 auto& op = oc.op;
613
614 if (op.obj_tags != boost::none) {
615 *skip = true;
616
617 bufferlist tags_bl;
618 int ret = read_obj_tags(oc.store, oc.bucket_info, oc.obj, oc.rctx, tags_bl);
619 if (ret < 0) {
620 if (ret != -ENODATA) {
621 ldout(oc.cct, 5) << "ERROR: read_obj_tags returned r=" << ret << dendl;
622 }
623 return 0;
624 }
625 RGWObjTags dest_obj_tags;
626 try {
627 auto iter = tags_bl.cbegin();
628 dest_obj_tags.decode(iter);
629 } catch (buffer::error& err) {
630 ldout(oc.cct,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl;
631 return -EIO;
632 }
633
634 if (! has_all_tags(op, dest_obj_tags)) {
635 ldout(oc.cct, 20) << __func__ << "() skipping obj " << oc.obj << " as tags do not match" << dendl;
636 return 0;
637 }
638 }
639 *skip = false;
640 return 0;
641 }
642
643 class LCOpFilter_Tags : public LCOpFilter {
644 public:
645 bool check(lc_op_ctx& oc) override {
646 auto& o = oc.o;
647
648 if (o.is_delete_marker()) {
649 return true;
650 }
651
652 bool skip;
653
654 int ret = check_tags(oc, &skip);
655 if (ret < 0) {
656 if (ret == -ENOENT) {
657 return false;
658 }
659 ldout(oc.cct, 0) << "ERROR: check_tags on obj=" << oc.obj << " returned ret=" << ret << dendl;
660 return false;
661 }
662
663 return !skip;
664 };
665 };
666
667 class LCOpAction_CurrentExpiration : public LCOpAction {
668 public:
669 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
670 auto& o = oc.o;
671 if (!o.is_current()) {
672 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not current, skipping" << dendl;
673 return false;
674 }
675
676 auto& mtime = o.meta.mtime;
677 bool is_expired;
678 auto& op = oc.op;
679 if (op.expiration <= 0) {
680 if (op.expiration_date == boost::none) {
681 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no expiration set in rule, skipping" << dendl;
682 return false;
683 }
684 is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*op.expiration_date);
685 *exp_time = *op.expiration_date;
686 } else {
687 is_expired = obj_has_expired(oc.cct, mtime, op.expiration, exp_time);
688 }
689
690 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << (int)is_expired << dendl;
691 return is_expired;
692 }
693
694 int process(lc_op_ctx& oc) {
695 auto& o = oc.o;
696 int r = remove_expired_obj(oc, !oc.bucket_info.versioned());
697 if (r < 0) {
698 ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl;
699 return r;
700 }
701 ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << dendl;
702 return 0;
703 }
704 };
705
706 class LCOpAction_NonCurrentExpiration : public LCOpAction {
707 public:
708 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
709 auto& o = oc.o;
710 if (o.is_current()) {
711 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": current version, skipping" << dendl;
712 return false;
713 }
714
715 auto mtime = oc.ol.get_prev_obj().meta.mtime;
716 int expiration = oc.op.noncur_expiration;
717 bool is_expired = obj_has_expired(oc.cct, mtime, expiration, exp_time);
718
719 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
720 return is_expired;
721 }
722
723 int process(lc_op_ctx& oc) {
724 auto& o = oc.o;
725 int r = remove_expired_obj(oc, true);
726 if (r < 0) {
727 ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl;
728 return r;
729 }
730 ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (non-current expiration)" << dendl;
731 return 0;
732 }
733 };
734
735 class LCOpAction_DMExpiration : public LCOpAction {
736 public:
737 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
738 auto& o = oc.o;
739 if (!o.is_delete_marker()) {
740 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": not a delete marker, skipping" << dendl;
741 return false;
742 }
743
744 if (oc.ol.next_has_same_name()) {
745 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": next is same object, skipping" << dendl;
746 return false;
747 }
748
749 *exp_time = real_clock::now();
750
751 return true;
752 }
753
754 int process(lc_op_ctx& oc) {
755 auto& o = oc.o;
756 int r = remove_expired_obj(oc, true);
757 if (r < 0) {
758 ldout(oc.cct, 0) << "ERROR: remove_expired_obj " << dendl;
759 return r;
760 }
761 ldout(oc.cct, 2) << "DELETED:" << oc.bucket_info.bucket << ":" << o.key << " (delete marker expiration)" << dendl;
762 return 0;
763 }
764 };
765
766 class LCOpAction_Transition : public LCOpAction {
767 const transition_action& transition;
768 bool need_to_process{false};
769
770 protected:
771 virtual bool check_current_state(bool is_current) = 0;
772 virtual ceph::real_time get_effective_mtime(lc_op_ctx& oc) = 0;
773 public:
774 LCOpAction_Transition(const transition_action& _transition) : transition(_transition) {}
775
776 bool check(lc_op_ctx& oc, ceph::real_time *exp_time) override {
777 auto& o = oc.o;
778
779 if (o.is_delete_marker()) {
780 return false;
781 }
782
783 if (!check_current_state(o.is_current())) {
784 return false;
785 }
786
787 auto mtime = get_effective_mtime(oc);
788 bool is_expired;
789 if (transition.days <= 0) {
790 if (transition.date == boost::none) {
791 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": no transition day/date set in rule, skipping" << dendl;
792 return false;
793 }
794 is_expired = ceph_clock_now() >= ceph::real_clock::to_time_t(*transition.date);
795 *exp_time = *transition.date;
796 } else {
797 is_expired = obj_has_expired(oc.cct, mtime, transition.days, exp_time);
798 }
799
800 ldout(oc.cct, 20) << __func__ << "(): key=" << o.key << ": is_expired=" << is_expired << dendl;
801
802 need_to_process = (rgw_placement_rule::get_canonical_storage_class(o.meta.storage_class) != transition.storage_class);
803
804 return is_expired;
805 }
806
807 bool should_process() override {
808 return need_to_process;
809 }
810
811 int process(lc_op_ctx& oc) {
812 auto& o = oc.o;
813
814 rgw_placement_rule target_placement;
815 target_placement.inherit_from(oc.bucket_info.placement_rule);
816 target_placement.storage_class = transition.storage_class;
817
818 int r = oc.store->transition_obj(oc.rctx, oc.bucket_info, oc.obj,
819 target_placement, o.meta.mtime, o.versioned_epoch);
820 if (r < 0) {
821 ldout(oc.cct, 0) << "ERROR: failed to transition obj (r=" << r << ")" << dendl;
822 return r;
823 }
824 ldout(oc.cct, 2) << "TRANSITIONED:" << oc.bucket_info.bucket << ":" << o.key << " -> " << transition.storage_class << dendl;
825 return 0;
826 }
827 };
828
829 class LCOpAction_CurrentTransition : public LCOpAction_Transition {
830 protected:
831 bool check_current_state(bool is_current) override {
832 return is_current;
833 }
834
835 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
836 return oc.o.meta.mtime;
837 }
838 public:
839 LCOpAction_CurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {}
840 };
841
842 class LCOpAction_NonCurrentTransition : public LCOpAction_Transition {
843 protected:
844 bool check_current_state(bool is_current) override {
845 return !is_current;
846 }
847
848 ceph::real_time get_effective_mtime(lc_op_ctx& oc) override {
849 return oc.ol.get_prev_obj().meta.mtime;
850 }
851 public:
852 LCOpAction_NonCurrentTransition(const transition_action& _transition) : LCOpAction_Transition(_transition) {}
853 };
854
855 void LCOpRule::build()
856 {
857 filters.emplace_back(new LCOpFilter_Tags);
858
859 auto& op = env.op;
860
861 if (op.expiration > 0 ||
862 op.expiration_date != boost::none) {
863 actions.emplace_back(new LCOpAction_CurrentExpiration);
864 }
865
866 if (op.dm_expiration) {
867 actions.emplace_back(new LCOpAction_DMExpiration);
868 }
869
870 if (op.noncur_expiration > 0) {
871 actions.emplace_back(new LCOpAction_NonCurrentExpiration);
872 }
873
874 for (auto& iter : op.transitions) {
875 actions.emplace_back(new LCOpAction_CurrentTransition(iter.second));
876 }
877
878 for (auto& iter : op.noncur_transitions) {
879 actions.emplace_back(new LCOpAction_NonCurrentTransition(iter.second));
880 }
881 }
882
883 int LCOpRule::process(rgw_bucket_dir_entry& o)
884 {
885 lc_op_ctx ctx(env, o);
886
887 unique_ptr<LCOpAction> *selected = nullptr;
888 real_time exp;
889
890 for (auto& a : actions) {
891 real_time action_exp;
892
893 if (a->check(ctx, &action_exp)) {
894 if (action_exp > exp) {
895 exp = action_exp;
896 selected = &a;
897 }
898 }
899 }
900
901 if (selected &&
902 (*selected)->should_process()) {
903
904 /*
905 * Calling filter checks after action checks because
906 * all action checks (as they are implemented now) do
907 * not access the objects themselves, but return result
908 * from info from bucket index listing. The current tags filter
909 * check does access the objects, so we avoid unnecessary rados calls
910 * having filters check later in the process.
911 */
912
913 bool cont = false;
914 for (auto& f : filters) {
915 if (f->check(ctx)) {
916 cont = true;
917 break;
918 }
919 }
920
921 if (!cont) {
922 ldout(env.store->ctx(), 20) << __func__ << "(): key=" << o.key << ": no rule match, skipping" << dendl;
923 return 0;
924 }
925
926 int r = (*selected)->process(ctx);
927 if (r < 0) {
928 ldout(ctx.cct, 0) << "ERROR: remove_expired_obj " << dendl;
929 return r;
930 }
931 ldout(ctx.cct, 20) << "processed:" << env.bucket_info.bucket << ":" << o.key << dendl;
932 }
933
934 return 0;
935
936 }
937
938 int RGWLC::bucket_lc_process(string& shard_id)
939 {
940 RGWLifecycleConfiguration config(cct);
941 RGWBucketInfo bucket_info;
942 map<string, bufferlist> bucket_attrs;
943 string no_ns, list_versions;
944 vector<rgw_bucket_dir_entry> objs;
945 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
946 vector<std::string> result;
947 boost::split(result, shard_id, boost::is_any_of(":"));
948 string bucket_tenant = result[0];
949 string bucket_name = result[1];
950 string bucket_marker = result[2];
951 int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);
952 if (ret < 0) {
953 ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name << " failed" << dendl;
954 return ret;
955 }
956
957 if (bucket_info.bucket.marker != bucket_marker) {
958 ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant
959 << ":" << bucket_name << " cur_marker=" << bucket_info.bucket.marker
960 << " orig_marker=" << bucket_marker << dendl;
961 return -ENOENT;
962 }
963
964 RGWRados::Bucket target(store, bucket_info);
965
966 map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
967 if (aiter == bucket_attrs.end())
968 return 0;
969
970 bufferlist::const_iterator iter{&aiter->second};
971 try {
972 config.decode(iter);
973 } catch (const buffer::error& e) {
974 ldpp_dout(this, 0) << __func__ << "() decode life cycle config failed" << dendl;
975 return -1;
976 }
977
978 multimap<string, lc_op>& prefix_map = config.get_prefix_map();
979
980 ldpp_dout(this, 10) << __func__ << "() prefix_map size="
981 << prefix_map.size()
982 << dendl;
983
984 rgw_obj_key pre_marker;
985 rgw_obj_key next_marker;
986 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
987 auto& op = prefix_iter->second;
988 if (!is_valid_op(op)) {
989 continue;
990 }
991 ldpp_dout(this, 20) << __func__ << "(): prefix=" << prefix_iter->first << dendl;
992 if (prefix_iter != prefix_map.begin() &&
993 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
994 next_marker = pre_marker;
995 } else {
996 pre_marker = next_marker;
997 }
998
999 LCObjsLister ol(store, bucket_info);
1000 ol.set_prefix(prefix_iter->first);
1001
1002 ret = ol.init();
1003
1004 if (ret < 0) {
1005 if (ret == (-ENOENT))
1006 return 0;
1007 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl;
1008 return ret;
1009 }
1010
1011 op_env oenv(op, store, this, bucket_info, ol);
1012
1013 LCOpRule orule(oenv);
1014
1015 orule.build();
1016
1017 ceph::real_time mtime;
1018 rgw_bucket_dir_entry o;
1019 for (; ol.get_obj(&o); ol.next()) {
1020 ldpp_dout(this, 20) << __func__ << "(): key=" << o.key << dendl;
1021 int ret = orule.process(o);
1022 if (ret < 0) {
1023 ldpp_dout(this, 20) << "ERROR: orule.process() returned ret="
1024 << ret
1025 << dendl;
1026 }
1027
1028 if (going_down()) {
1029 return 0;
1030 }
1031 }
1032 }
1033
1034 ret = handle_multipart_expiration(&target, prefix_map);
1035
1036 return ret;
1037 }
1038
1039 int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
1040 {
1041 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
1042
1043 rados::cls::lock::Lock l(lc_index_lock_name);
1044 l.set_cookie(cookie);
1045 l.set_duration(lock_duration);
1046
1047 do {
1048 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
1049 if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
1050 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
1051 << obj_names[index] << ", sleep 5, try again" << dendl;
1052 sleep(5);
1053 continue;
1054 }
1055 if (ret < 0)
1056 return 0;
1057 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names[index] << dendl;
1058 if (result == -ENOENT) {
1059 ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry);
1060 if (ret < 0) {
1061 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
1062 << obj_names[index] << dendl;
1063 }
1064 goto clean;
1065 } else if (result < 0) {
1066 entry.second = lc_failed;
1067 } else {
1068 entry.second = lc_complete;
1069 }
1070
1071 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
1072 if (ret < 0) {
1073 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
1074 << obj_names[index] << dendl;
1075 }
1076 clean:
1077 l.unlock(&store->lc_pool_ctx, obj_names[index]);
1078 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names[index] << dendl;
1079 return 0;
1080 } while (true);
1081 }
1082
1083 int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
1084 {
1085 int index = 0;
1086 progress_map->clear();
1087 for(; index <max_objs; index++) {
1088 map<string, int > entries;
1089 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
1090 if (ret < 0) {
1091 if (ret == -ENOENT) {
1092 ldpp_dout(this, 10) << __func__ << "() ignoring unfound lc object="
1093 << obj_names[index] << dendl;
1094 continue;
1095 } else {
1096 return ret;
1097 }
1098 }
1099 map<string, int>::iterator iter;
1100 for (iter = entries.begin(); iter != entries.end(); ++iter) {
1101 progress_map->insert(*iter);
1102 }
1103 }
1104 return 0;
1105 }
1106
1107 int RGWLC::process()
1108 {
1109 int max_secs = cct->_conf->rgw_lc_lock_max_time;
1110
1111 const int start = ceph::util::generate_random_number(0, max_objs - 1);
1112
1113 for (int i = 0; i < max_objs; i++) {
1114 int index = (i + start) % max_objs;
1115 int ret = process(index, max_secs);
1116 if (ret < 0)
1117 return ret;
1118 }
1119
1120 return 0;
1121 }
1122
1123 int RGWLC::process(int index, int max_lock_secs)
1124 {
1125 rados::cls::lock::Lock l(lc_index_lock_name);
1126 do {
1127 utime_t now = ceph_clock_now();
1128 pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
1129 if (max_lock_secs <= 0)
1130 return -EAGAIN;
1131
1132 utime_t time(max_lock_secs, 0);
1133 l.set_duration(time);
1134
1135 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
1136 if (ret == -EBUSY || ret == -EEXIST) { /* already locked by another lc processor */
1137 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
1138 << obj_names[index] << ", sleep 5, try again" << dendl;
1139 sleep(5);
1140 continue;
1141 }
1142 if (ret < 0)
1143 return 0;
1144
1145 cls_rgw_lc_obj_head head;
1146 ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
1147 if (ret < 0) {
1148 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
1149 << obj_names[index] << ", ret=" << ret << dendl;
1150 goto exit;
1151 }
1152
1153 if(!if_already_run_today(head.start_date)) {
1154 head.start_date = now;
1155 head.marker.clear();
1156 ret = bucket_lc_prepare(index);
1157 if (ret < 0) {
1158 ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
1159 << obj_names[index] << ", ret=" << ret << dendl;
1160 goto exit;
1161 }
1162 }
1163
1164 ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
1165 if (ret < 0) {
1166 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
1167 << obj_names[index] << dendl;
1168 goto exit;
1169 }
1170
1171 if (entry.first.empty())
1172 goto exit;
1173
1174 entry.second = lc_processing;
1175 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
1176 if (ret < 0) {
1177 ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " << obj_names[index]
1178 << " (" << entry.first << "," << entry.second << ")" << dendl;
1179 goto exit;
1180 }
1181
1182 head.marker = entry.first;
1183 ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);
1184 if (ret < 0) {
1185 ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
1186 goto exit;
1187 }
1188 l.unlock(&store->lc_pool_ctx, obj_names[index]);
1189 ret = bucket_lc_process(entry.first);
1190 bucket_lc_post(index, max_lock_secs, entry, ret);
1191 }while(1);
1192
1193 exit:
1194 l.unlock(&store->lc_pool_ctx, obj_names[index]);
1195 return 0;
1196 }
1197
1198 void RGWLC::start_processor()
1199 {
1200 worker = new LCWorker(this, cct, this);
1201 worker->create("lifecycle_thr");
1202 }
1203
1204 void RGWLC::stop_processor()
1205 {
1206 down_flag = true;
1207 if (worker) {
1208 worker->stop();
1209 worker->join();
1210 }
1211 delete worker;
1212 worker = NULL;
1213 }
1214
1215
1216 unsigned RGWLC::get_subsys() const
1217 {
1218 return dout_subsys;
1219 }
1220
1221 std::ostream& RGWLC::gen_prefix(std::ostream& out) const
1222 {
1223 return out << "lifecycle: ";
1224 }
1225
1226 void RGWLC::LCWorker::stop()
1227 {
1228 Mutex::Locker l(lock);
1229 cond.Signal();
1230 }
1231
1232 bool RGWLC::going_down()
1233 {
1234 return down_flag;
1235 }
1236
1237 bool RGWLC::LCWorker::should_work(utime_t& now)
1238 {
1239 int start_hour;
1240 int start_minute;
1241 int end_hour;
1242 int end_minute;
1243 string worktime = cct->_conf->rgw_lifecycle_work_time;
1244 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
1245 struct tm bdt;
1246 time_t tt = now.sec();
1247 localtime_r(&tt, &bdt);
1248
1249 if (cct->_conf->rgw_lc_debug_interval > 0) {
1250 /* We're debugging, so say we can run */
1251 return true;
1252 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
1253 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
1254 return true;
1255 } else {
1256 return false;
1257 }
1258
1259 }
1260
1261 int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
1262 {
1263 int secs;
1264
1265 if (cct->_conf->rgw_lc_debug_interval > 0) {
1266 secs = start + cct->_conf->rgw_lc_debug_interval - now;
1267 if (secs < 0)
1268 secs = 0;
1269 return (secs);
1270 }
1271
1272 int start_hour;
1273 int start_minute;
1274 int end_hour;
1275 int end_minute;
1276 string worktime = cct->_conf->rgw_lifecycle_work_time;
1277 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
1278 struct tm bdt;
1279 time_t tt = now.sec();
1280 time_t nt;
1281 localtime_r(&tt, &bdt);
1282 bdt.tm_hour = start_hour;
1283 bdt.tm_min = start_minute;
1284 bdt.tm_sec = 0;
1285 nt = mktime(&bdt);
1286 secs = nt - tt;
1287
1288 return secs>0 ? secs : secs+24*60*60;
1289 }
1290
1291 void RGWLifecycleConfiguration::generate_test_instances(list<RGWLifecycleConfiguration*>& o)
1292 {
1293 o.push_back(new RGWLifecycleConfiguration);
1294 }
1295
1296 void get_lc_oid(CephContext *cct, const string& shard_id, string *oid)
1297 {
1298 int max_objs = (cct->_conf->rgw_lc_max_objs > HASH_PRIME ? HASH_PRIME : cct->_conf->rgw_lc_max_objs);
1299 int index = ceph_str_hash_linux(shard_id.c_str(), shard_id.size()) % HASH_PRIME % max_objs;
1300 *oid = lc_oid_prefix;
1301 char buf[32];
1302 snprintf(buf, 32, ".%d", index);
1303 oid->append(buf);
1304 return;
1305 }
1306
1307
1308
1309 static std::string get_lc_shard_name(const rgw_bucket& bucket){
1310 return string_join_reserve(':', bucket.tenant, bucket.name, bucket.marker);
1311 }
1312
1313 template<typename F>
1314 static int guard_lc_modify(RGWRados* store, const rgw_bucket& bucket, const string& cookie, const F& f) {
1315 CephContext *cct = store->ctx();
1316
1317 string shard_id = get_lc_shard_name(bucket);
1318
1319 string oid;
1320 get_lc_oid(cct, shard_id, &oid);
1321
1322 pair<string, int> entry(shard_id, lc_uninitial);
1323 int max_lock_secs = cct->_conf->rgw_lc_lock_max_time;
1324
1325 rados::cls::lock::Lock l(lc_index_lock_name);
1326 utime_t time(max_lock_secs, 0);
1327 l.set_duration(time);
1328 l.set_cookie(cookie);
1329
1330 librados::IoCtx *ctx = store->get_lc_pool_ctx();
1331 int ret;
1332
1333 do {
1334 ret = l.lock_exclusive(ctx, oid);
1335 if (ret == -EBUSY || ret == -EEXIST) {
1336 ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
1337 << oid << ", sleep 5, try again" << dendl;
1338 sleep(5); // XXX: return retryable error
1339 continue;
1340 }
1341 if (ret < 0) {
1342 ldout(cct, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
1343 << oid << ", ret=" << ret << dendl;
1344 break;
1345 }
1346 ret = f(ctx, oid, entry);
1347 if (ret < 0) {
1348 ldout(cct, 0) << "RGWLC::RGWPutLC() failed to set entry on "
1349 << oid << ", ret=" << ret << dendl;
1350 }
1351 break;
1352 } while(true);
1353 l.unlock(ctx, oid);
1354 return ret;
1355 }
1356
1357 int RGWLC::set_bucket_config(RGWBucketInfo& bucket_info,
1358 const map<string, bufferlist>& bucket_attrs,
1359 RGWLifecycleConfiguration *config)
1360 {
1361 map<string, bufferlist> attrs = bucket_attrs;
1362 bufferlist lc_bl;
1363 config->encode(lc_bl);
1364
1365 attrs[RGW_ATTR_LC] = std::move(lc_bl);
1366
1367 int ret = rgw_bucket_set_attrs(store, bucket_info, attrs, &bucket_info.objv_tracker);
1368 if (ret < 0)
1369 return ret;
1370
1371 rgw_bucket& bucket = bucket_info.bucket;
1372
1373
1374 ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid,
1375 const pair<string, int>& entry) {
1376 return cls_rgw_lc_set_entry(*ctx, oid, entry);
1377 });
1378
1379 return ret;
1380 }
1381
1382 int RGWLC::remove_bucket_config(RGWBucketInfo& bucket_info,
1383 const map<string, bufferlist>& bucket_attrs)
1384 {
1385 map<string, bufferlist> attrs = bucket_attrs;
1386 attrs.erase(RGW_ATTR_LC);
1387 int ret = rgw_bucket_set_attrs(store, bucket_info, attrs,
1388 &bucket_info.objv_tracker);
1389
1390 rgw_bucket& bucket = bucket_info.bucket;
1391
1392 if (ret < 0) {
1393 ldout(cct, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
1394 << bucket.name << " returned err=" << ret << dendl;
1395 return ret;
1396 }
1397
1398
1399 ret = guard_lc_modify(store, bucket, cookie, [&](librados::IoCtx *ctx, const string& oid,
1400 const pair<string, int>& entry) {
1401 return cls_rgw_lc_rm_entry(*ctx, oid, entry);
1402 });
1403
1404 return ret;
1405 }
1406
1407 namespace rgw::lc {
1408
1409 int fix_lc_shard_entry(RGWRados* store, const RGWBucketInfo& bucket_info,
1410 const map<std::string,bufferlist>& battrs)
1411 {
1412 if (auto aiter = battrs.find(RGW_ATTR_LC);
1413 aiter == battrs.end()) {
1414 return 0; // No entry, nothing to fix
1415 }
1416
1417 auto shard_name = get_lc_shard_name(bucket_info.bucket);
1418 std::string lc_oid;
1419 get_lc_oid(store->ctx(), shard_name, &lc_oid);
1420
1421 rgw_lc_entry_t entry;
1422 // There are multiple cases we need to encounter here
1423 // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
1424 // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
1425 // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
1426 // We are not dropping the old marker here as that would be caught by the next LC process update
1427 auto lc_pool_ctx = store->get_lc_pool_ctx();
1428 int ret = cls_rgw_lc_get_entry(*lc_pool_ctx,
1429 lc_oid, shard_name, entry);
1430 if (ret == 0) {
1431 ldout(store->ctx(), 5) << "Entry already exists, nothing to do" << dendl;
1432 return ret; // entry is already existing correctly set to marker
1433 }
1434 ldout(store->ctx(), 5) << "cls_rgw_lc_get_entry errored ret code=" << ret << dendl;
1435 if (ret == -ENOENT) {
1436 ldout(store->ctx(), 1) << "No entry for bucket=" << bucket_info.bucket.name
1437 << " creating " << dendl;
1438 // TODO: we have too many ppl making cookies like this!
1439 char cookie_buf[COOKIE_LEN + 1];
1440 gen_rand_alphanumeric(store->ctx(), cookie_buf, sizeof(cookie_buf) - 1);
1441 std::string cookie = cookie_buf;
1442
1443 ret = guard_lc_modify(store, bucket_info.bucket, cookie,
1444 [&lc_pool_ctx, &lc_oid](librados::IoCtx *ctx, const string& oid,
1445 const pair<string, int>& entry) {
1446 return cls_rgw_lc_set_entry(*lc_pool_ctx,
1447 lc_oid, entry);
1448 });
1449
1450 }
1451
1452 return ret;
1453 }
1454
1455 }