1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
8 #include <boost/algorithm/string/split.hpp>
9 #include <boost/algorithm/string.hpp>
11 #include "common/Formatter.h"
12 #include <common/errno.h>
13 #include "include/random.h"
14 #include "cls/rgw/cls_rgw_client.h"
15 #include "cls/lock/cls_lock_client.h"
16 #include "rgw_common.h"
17 #include "rgw_bucket.h"
19 #include "rgw_string.h"
21 #include "services/svc_sys_obj.h"
23 #define dout_context g_ceph_context
24 #define dout_subsys ceph_subsys_rgw
26 const char* LC_STATUS
[] = {
33 using namespace librados
;
35 bool LCRule::valid() const
37 if (id
.length() > MAX_ID_LEN
) {
40 else if(expiration
.empty() && noncur_expiration
.empty() && mp_expiration
.empty() && !dm_expiration
&&
41 transitions
.empty() && noncur_transitions
.empty()) {
44 else if (!expiration
.valid() || !noncur_expiration
.valid() || !mp_expiration
.valid()) {
47 if (!transitions
.empty()) {
48 bool using_days
= expiration
.has_days();
49 bool using_date
= expiration
.has_date();
50 for (const auto& elem
: transitions
) {
51 if (!elem
.second
.valid()) {
54 using_days
= using_days
|| elem
.second
.has_days();
55 using_date
= using_date
|| elem
.second
.has_date();
56 if (using_days
&& using_date
) {
61 for (const auto& elem
: noncur_transitions
) {
62 if (!elem
.second
.valid()) {
70 void LCRule::init_simple_days_rule(std::string_view _id
, std::string_view _prefix
, int num_days
)
75 snprintf(buf
, sizeof(buf
), "%d", num_days
);
76 expiration
.set_days(buf
);
80 void RGWLifecycleConfiguration::add_rule(const LCRule
& rule
)
82 auto& id
= rule
.get_id(); // note that this will return false for groups, but that's ok, we won't search groups
83 rule_map
.insert(pair
<string
, LCRule
>(id
, rule
));
86 bool RGWLifecycleConfiguration::_add_rule(const LCRule
& rule
)
88 lc_op
op(rule
.get_id());
89 op
.status
= rule
.is_enabled();
90 if (rule
.get_expiration().has_days()) {
91 op
.expiration
= rule
.get_expiration().get_days();
93 if (rule
.get_expiration().has_date()) {
94 op
.expiration_date
= ceph::from_iso_8601(rule
.get_expiration().get_date());
96 if (rule
.get_noncur_expiration().has_days()) {
97 op
.noncur_expiration
= rule
.get_noncur_expiration().get_days();
99 if (rule
.get_mp_expiration().has_days()) {
100 op
.mp_expiration
= rule
.get_mp_expiration().get_days();
102 op
.dm_expiration
= rule
.get_dm_expiration();
103 for (const auto &elem
: rule
.get_transitions()) {
104 transition_action action
;
105 if (elem
.second
.has_days()) {
106 action
.days
= elem
.second
.get_days();
108 action
.date
= ceph::from_iso_8601(elem
.second
.get_date());
110 action
.storage_class
= rgw_placement_rule::get_canonical_storage_class(elem
.first
);
111 op
.transitions
.emplace(elem
.first
, std::move(action
));
113 for (const auto &elem
: rule
.get_noncur_transitions()) {
114 transition_action action
;
115 action
.days
= elem
.second
.get_days();
116 action
.date
= ceph::from_iso_8601(elem
.second
.get_date());
117 action
.storage_class
= elem
.first
;
118 op
.noncur_transitions
.emplace(elem
.first
, std::move(action
));
121 if (rule
.get_filter().has_prefix()){
122 prefix
= rule
.get_filter().get_prefix();
124 prefix
= rule
.get_prefix();
127 if (rule
.get_filter().has_tags()){
128 op
.obj_tags
= rule
.get_filter().get_tags();
130 prefix_map
.emplace(std::move(prefix
), std::move(op
));
134 int RGWLifecycleConfiguration::check_and_add_rule(const LCRule
& rule
)
139 auto& id
= rule
.get_id();
140 if (rule_map
.find(id
) != rule_map
.end()) { //id shouldn't be the same
143 rule_map
.insert(pair
<string
, LCRule
>(id
, rule
));
145 if (!_add_rule(rule
)) {
146 return -ERR_INVALID_REQUEST
;
151 bool RGWLifecycleConfiguration::has_same_action(const lc_op
& first
, const lc_op
& second
) {
152 if ((first
.expiration
> 0 || first
.expiration_date
!= boost::none
) &&
153 (second
.expiration
> 0 || second
.expiration_date
!= boost::none
)) {
155 } else if (first
.noncur_expiration
> 0 && second
.noncur_expiration
> 0) {
157 } else if (first
.mp_expiration
> 0 && second
.mp_expiration
> 0) {
159 } else if (!first
.transitions
.empty() && !second
.transitions
.empty()) {
160 for (auto &elem
: first
.transitions
) {
161 if (second
.transitions
.find(elem
.first
) != second
.transitions
.end()) {
165 } else if (!first
.noncur_transitions
.empty() && !second
.noncur_transitions
.empty()) {
166 for (auto &elem
: first
.noncur_transitions
) {
167 if (second
.noncur_transitions
.find(elem
.first
) != second
.noncur_transitions
.end()) {
175 /* Formerly, this method checked for duplicate rules using an invalid
176 * method (prefix uniqueness). */
177 bool RGWLifecycleConfiguration::valid()
182 void *RGWLC::LCWorker::entry() {
184 utime_t start
= ceph_clock_now();
185 if (should_work(start
)) {
186 ldpp_dout(dpp
, 2) << "life cycle: start" << dendl
;
187 int r
= lc
->process();
189 ldpp_dout(dpp
, 0) << "ERROR: do life cycle process() returned error r=" << r
<< dendl
;
191 ldpp_dout(dpp
, 2) << "life cycle: stop" << dendl
;
193 if (lc
->going_down())
196 utime_t end
= ceph_clock_now();
197 int secs
= schedule_next_start_time(start
, end
);
199 next
.set_from_double(end
+ secs
);
201 ldpp_dout(dpp
, 5) << "schedule life cycle next start time: " << rgw_to_asctime(next
) << dendl
;
204 cond
.WaitInterval(lock
, utime_t(secs
, 0));
206 } while (!lc
->going_down());
211 void RGWLC::initialize(CephContext
*_cct
, RGWRados
*_store
) {
214 max_objs
= cct
->_conf
->rgw_lc_max_objs
;
215 if (max_objs
> HASH_PRIME
)
216 max_objs
= HASH_PRIME
;
218 obj_names
= new string
[max_objs
];
220 for (int i
= 0; i
< max_objs
; i
++) {
221 obj_names
[i
] = lc_oid_prefix
;
223 snprintf(buf
, 32, ".%d", i
);
224 obj_names
[i
].append(buf
);
227 #define COOKIE_LEN 16
228 char cookie_buf
[COOKIE_LEN
+ 1];
229 gen_rand_alphanumeric(cct
, cookie_buf
, sizeof(cookie_buf
) - 1);
233 void RGWLC::finalize()
238 bool RGWLC::if_already_run_today(time_t& start_date
)
242 utime_t now
= ceph_clock_now();
243 localtime_r(&start_date
, &bdt
);
245 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
246 if (now
- start_date
< cct
->_conf
->rgw_lc_debug_interval
)
255 begin_of_day
= mktime(&bdt
);
256 if (now
- begin_of_day
< 24*60*60)
262 int RGWLC::bucket_lc_prepare(int index
)
264 map
<string
, int > entries
;
268 #define MAX_LC_LIST_ENTRIES 100
270 int ret
= cls_rgw_lc_list(store
->lc_pool_ctx
, obj_names
[index
], marker
, MAX_LC_LIST_ENTRIES
, entries
);
273 map
<string
, int>::iterator iter
;
274 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
275 pair
<string
, int > entry(iter
->first
, lc_uninitial
);
276 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
278 ldpp_dout(this, 0) << "RGWLC::bucket_lc_prepare() failed to set entry on "
279 << obj_names
[index
] << dendl
;
284 if (!entries
.empty()) {
285 marker
= std::move(entries
.rbegin()->first
);
287 } while (!entries
.empty());
292 static bool obj_has_expired(CephContext
*cct
, ceph::real_time mtime
, int days
, ceph::real_time
*expire_time
= nullptr)
294 double timediff
, cmp
;
296 if (cct
->_conf
->rgw_lc_debug_interval
<= 0) {
297 /* Normal case, run properly */
299 base_time
= ceph_clock_now().round_to_day();
301 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
302 cmp
= days
*cct
->_conf
->rgw_lc_debug_interval
;
303 base_time
= ceph_clock_now();
305 timediff
= base_time
- ceph::real_clock::to_time_t(mtime
);
308 *expire_time
= mtime
+ make_timespan(cmp
);
310 ldout(cct
, 20) << __func__
<< "(): mtime=" << mtime
<< " days=" << days
<< " base_time=" << base_time
<< " timediff=" << timediff
<< " cmp=" << cmp
<< dendl
;
312 return (timediff
>= cmp
);
315 int RGWLC::handle_multipart_expiration(
316 RGWRados::Bucket
*target
, const multimap
<string
, lc_op
>& prefix_map
)
318 MultipartMetaFilter mp_filter
;
319 vector
<rgw_bucket_dir_entry
> objs
;
323 RGWBucketInfo
& bucket_info
= target
->get_bucket_info();
324 RGWRados::Bucket::List
list_op(target
);
325 auto delay_ms
= cct
->_conf
.get_val
<int64_t>("rgw_lc_thread_delay");
326 list_op
.params
.list_versions
= false;
327 /* lifecycle processing does not depend on total order, so can
328 * take advantage of unordered listing optimizations--such as
329 * operating on one shard at a time */
330 list_op
.params
.allow_unordered
= true;
331 list_op
.params
.ns
= RGW_OBJ_NS_MULTIPART
;
332 list_op
.params
.filter
= &mp_filter
;
333 for (auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
334 if (!prefix_iter
->second
.status
|| prefix_iter
->second
.mp_expiration
<= 0) {
337 list_op
.params
.prefix
= prefix_iter
->first
;
340 list_op
.params
.marker
= list_op
.get_next_marker();
341 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
343 if (ret
== (-ENOENT
))
345 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl
;
349 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
350 if (obj_has_expired(cct
, obj_iter
->meta
.mtime
, prefix_iter
->second
.mp_expiration
)) {
351 rgw_obj_key
key(obj_iter
->key
);
352 if (!mp_obj
.from_meta(key
.name
)) {
355 RGWObjectCtx
rctx(store
);
356 ret
= abort_multipart_upload(store
, cct
, &rctx
, bucket_info
, mp_obj
);
357 if (ret
< 0 && ret
!= -ERR_NO_SUCH_UPLOAD
) {
358 ldpp_dout(this, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret
<< ", meta:" << obj_iter
->key
<< dendl
;
359 } else if (ret
== -ERR_NO_SUCH_UPLOAD
) {
360 ldpp_dout(this, 5) << "ERROR: abort_multipart_upload failed, ret=" << ret
<< ", meta:" << obj_iter
->key
<< dendl
;
366 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms
));
367 } while(is_truncated
);
372 static int read_obj_tags(RGWRados
*store
, RGWBucketInfo
& bucket_info
, rgw_obj
& obj
, RGWObjectCtx
& ctx
, bufferlist
& tags_bl
)
374 RGWRados::Object
op_target(store
, bucket_info
, ctx
, obj
);
375 RGWRados::Object::Read
read_op(&op_target
);
377 return read_op
.get_attr(RGW_ATTR_TAGS
, tags_bl
);
380 static bool is_valid_op(const lc_op
& op
)
384 || op
.expiration_date
!= boost::none
385 || op
.noncur_expiration
> 0
387 || !op
.transitions
.empty()
388 || !op
.noncur_transitions
.empty()));
391 static inline bool has_all_tags(const lc_op
& rule_action
,
392 const RGWObjTags
& object_tags
)
394 for (const auto& tag
: object_tags
.get_tags()) {
396 if (! rule_action
.obj_tags
)
399 const auto& rule_tags
= rule_action
.obj_tags
->get_tags();
400 const auto& iter
= rule_tags
.find(tag
.first
);
402 if ((iter
== rule_tags
.end()) ||
403 (iter
->second
!= tag
.second
))
406 /* all tags matched */
412 RGWBucketInfo
& bucket_info
;
413 RGWRados::Bucket target
;
414 RGWRados::Bucket::List list_op
;
415 bool is_truncated
{false};
416 rgw_obj_key next_marker
;
418 vector
<rgw_bucket_dir_entry
> objs
;
419 vector
<rgw_bucket_dir_entry
>::iterator obj_iter
;
420 rgw_bucket_dir_entry pre_obj
;
424 LCObjsLister(RGWRados
*_store
, RGWBucketInfo
& _bucket_info
) :
425 store(_store
), bucket_info(_bucket_info
),
426 target(store
, bucket_info
), list_op(&target
) {
427 list_op
.params
.list_versions
= bucket_info
.versioned();
428 list_op
.params
.allow_unordered
= true;
429 delay_ms
= store
->ctx()->_conf
.get_val
<int64_t>("rgw_lc_thread_delay");
432 void set_prefix(const string
& p
) {
434 list_op
.params
.prefix
= prefix
;
442 int ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
447 obj_iter
= objs
.begin();
453 std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms
));
456 bool get_obj(rgw_bucket_dir_entry
*obj
) {
457 if (obj_iter
== objs
.end()) {
461 if (is_truncated
&& (obj_iter
+ 1)==objs
.end()) {
462 list_op
.params
.marker
= obj_iter
->key
;
466 ldout(store
->ctx(), 0) << "ERROR: list_op returned ret=" << ret
<< dendl
;
469 obj_iter
= objs
.begin();
477 rgw_bucket_dir_entry
get_prev_obj() {
486 bool next_has_same_name()
488 if ((obj_iter
+ 1) == objs
.end()) {
489 /* this should have been called after get_obj() was called, so this should
490 * only happen if is_truncated is false */
493 return (obj_iter
->key
.name
.compare((obj_iter
+ 1)->key
.name
) == 0);
502 RGWBucketInfo
& bucket_info
;
505 op_env(lc_op
& _op
, RGWRados
*_store
, RGWLC
*_lc
, RGWBucketInfo
& _bucket_info
,
506 LCObjsLister
& _ol
) : op(_op
), store(_store
), lc(_lc
), bucket_info(_bucket_info
), ol(_ol
) {}
514 rgw_bucket_dir_entry
& o
;
517 RGWBucketInfo
& bucket_info
;
524 lc_op_ctx(op_env
& _env
, rgw_bucket_dir_entry
& _o
) : cct(_env
.store
->ctx()), env(_env
), o(_o
),
525 store(env
.store
), bucket_info(env
.bucket_info
), op(env
.op
), ol(env
.ol
),
526 obj(env
.bucket_info
.bucket
, o
.key
), rctx(env
.store
) {}
529 static int remove_expired_obj(lc_op_ctx
& oc
, bool remove_indeed
)
531 auto& store
= oc
.store
;
532 auto& bucket_info
= oc
.bucket_info
;
534 auto obj_key
= o
.key
;
537 if (!remove_indeed
) {
538 obj_key
.instance
.clear();
539 } else if (obj_key
.instance
.empty()) {
540 obj_key
.instance
= "null";
543 rgw_obj
obj(bucket_info
.bucket
, obj_key
);
545 obj_owner
.set_id(rgw_user
{meta
.owner
});
546 obj_owner
.set_name(meta
.owner_display_name
);
548 RGWRados::Object
del_target(store
, bucket_info
, oc
.rctx
, obj
);
549 RGWRados::Object::Delete
del_op(&del_target
);
551 del_op
.params
.bucket_owner
= bucket_info
.owner
;
552 del_op
.params
.versioning_status
= bucket_info
.versioning_status();
553 del_op
.params
.obj_owner
= obj_owner
;
554 del_op
.params
.unmod_since
= meta
.mtime
;
556 return del_op
.delete_obj();
561 virtual ~LCOpAction() {}
563 virtual bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
) {
567 /* called after check(). Check should tell us whether this action
568 * is applicable. If there are multiple actions, we'll end up executing
569 * the latest applicable action
571 * one action after 10 days, another after 20, third after 40.
572 * After 10 days, the latest applicable action would be the first one,
573 * after 20 days it will be the second one. After 21 days it will still be the
574 * second one. So check() should return true for the second action at that point,
575 * but should_process() should return false if the action has already been applied. In object removal
576 * it doesn't matter, but in object transition it does.
578 virtual bool should_process() {
582 virtual int process(lc_op_ctx
& oc
) {
589 virtual ~LCOpFilter() {}
590 virtual bool check(lc_op_ctx
& oc
) {
596 friend class LCOpAction
;
600 std::vector
<unique_ptr
<LCOpFilter
> > filters
;
601 std::vector
<unique_ptr
<LCOpAction
> > actions
;
604 LCOpRule(op_env
& _env
) : env(_env
) {}
607 int process(rgw_bucket_dir_entry
& o
);
610 static int check_tags(lc_op_ctx
& oc
, bool *skip
)
614 if (op
.obj_tags
!= boost::none
) {
618 int ret
= read_obj_tags(oc
.store
, oc
.bucket_info
, oc
.obj
, oc
.rctx
, tags_bl
);
620 if (ret
!= -ENODATA
) {
621 ldout(oc
.cct
, 5) << "ERROR: read_obj_tags returned r=" << ret
<< dendl
;
625 RGWObjTags dest_obj_tags
;
627 auto iter
= tags_bl
.cbegin();
628 dest_obj_tags
.decode(iter
);
629 } catch (buffer::error
& err
) {
630 ldout(oc
.cct
,0) << "ERROR: caught buffer::error, couldn't decode TagSet" << dendl
;
634 if (! has_all_tags(op
, dest_obj_tags
)) {
635 ldout(oc
.cct
, 20) << __func__
<< "() skipping obj " << oc
.obj
<< " as tags do not match" << dendl
;
643 class LCOpFilter_Tags
: public LCOpFilter
{
645 bool check(lc_op_ctx
& oc
) override
{
648 if (o
.is_delete_marker()) {
654 int ret
= check_tags(oc
, &skip
);
656 if (ret
== -ENOENT
) {
659 ldout(oc
.cct
, 0) << "ERROR: check_tags on obj=" << oc
.obj
<< " returned ret=" << ret
<< dendl
;
667 class LCOpAction_CurrentExpiration
: public LCOpAction
{
669 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
) override
{
671 if (!o
.is_current()) {
672 ldout(oc
.cct
, 20) << __func__
<< "(): key=" << o
.key
<< ": not current, skipping" << dendl
;
676 auto& mtime
= o
.meta
.mtime
;
679 if (op
.expiration
<= 0) {
680 if (op
.expiration_date
== boost::none
) {
681 ldout(oc
.cct
, 20) << __func__
<< "(): key=" << o
.key
<< ": no expiration set in rule, skipping" << dendl
;
684 is_expired
= ceph_clock_now() >= ceph::real_clock::to_time_t(*op
.expiration_date
);
685 *exp_time
= *op
.expiration_date
;
687 is_expired
= obj_has_expired(oc
.cct
, mtime
, op
.expiration
, exp_time
);
690 ldout(oc
.cct
, 20) << __func__
<< "(): key=" << o
.key
<< ": is_expired=" << (int)is_expired
<< dendl
;
694 int process(lc_op_ctx
& oc
) {
696 int r
= remove_expired_obj(oc
, !oc
.bucket_info
.versioned());
698 ldout(oc
.cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
701 ldout(oc
.cct
, 2) << "DELETED:" << oc
.bucket_info
.bucket
<< ":" << o
.key
<< dendl
;
706 class LCOpAction_NonCurrentExpiration
: public LCOpAction
{
708 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
) override
{
710 if (o
.is_current()) {
711 ldout(oc
.cct
, 20) << __func__
<< "(): key=" << o
.key
<< ": current version, skipping" << dendl
;
715 auto mtime
= oc
.ol
.get_prev_obj().meta
.mtime
;
716 int expiration
= oc
.op
.noncur_expiration
;
717 bool is_expired
= obj_has_expired(oc
.cct
, mtime
, expiration
, exp_time
);
719 ldout(oc
.cct
, 20) << __func__
<< "(): key=" << o
.key
<< ": is_expired=" << is_expired
<< dendl
;
723 int process(lc_op_ctx
& oc
) {
725 int r
= remove_expired_obj(oc
, true);
727 ldout(oc
.cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
730 ldout(oc
.cct
, 2) << "DELETED:" << oc
.bucket_info
.bucket
<< ":" << o
.key
<< " (non-current expiration)" << dendl
;
735 class LCOpAction_DMExpiration
: public LCOpAction
{
737 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
) override
{
739 if (!o
.is_delete_marker()) {
740 ldout(oc
.cct
, 20) << __func__
<< "(): key=" << o
.key
<< ": not a delete marker, skipping" << dendl
;
744 if (oc
.ol
.next_has_same_name()) {
745 ldout(oc
.cct
, 20) << __func__
<< "(): key=" << o
.key
<< ": next is same object, skipping" << dendl
;
749 *exp_time
= real_clock::now();
754 int process(lc_op_ctx
& oc
) {
756 int r
= remove_expired_obj(oc
, true);
758 ldout(oc
.cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
761 ldout(oc
.cct
, 2) << "DELETED:" << oc
.bucket_info
.bucket
<< ":" << o
.key
<< " (delete marker expiration)" << dendl
;
766 class LCOpAction_Transition
: public LCOpAction
{
767 const transition_action
& transition
;
768 bool need_to_process
{false};
771 virtual bool check_current_state(bool is_current
) = 0;
772 virtual ceph::real_time
get_effective_mtime(lc_op_ctx
& oc
) = 0;
774 LCOpAction_Transition(const transition_action
& _transition
) : transition(_transition
) {}
776 bool check(lc_op_ctx
& oc
, ceph::real_time
*exp_time
) override
{
779 if (o
.is_delete_marker()) {
783 if (!check_current_state(o
.is_current())) {
787 auto mtime
= get_effective_mtime(oc
);
789 if (transition
.days
<= 0) {
790 if (transition
.date
== boost::none
) {
791 ldout(oc
.cct
, 20) << __func__
<< "(): key=" << o
.key
<< ": no transition day/date set in rule, skipping" << dendl
;
794 is_expired
= ceph_clock_now() >= ceph::real_clock::to_time_t(*transition
.date
);
795 *exp_time
= *transition
.date
;
797 is_expired
= obj_has_expired(oc
.cct
, mtime
, transition
.days
, exp_time
);
800 ldout(oc
.cct
, 20) << __func__
<< "(): key=" << o
.key
<< ": is_expired=" << is_expired
<< dendl
;
802 need_to_process
= (rgw_placement_rule::get_canonical_storage_class(o
.meta
.storage_class
) != transition
.storage_class
);
807 bool should_process() override
{
808 return need_to_process
;
811 int process(lc_op_ctx
& oc
) {
814 rgw_placement_rule target_placement
;
815 target_placement
.inherit_from(oc
.bucket_info
.placement_rule
);
816 target_placement
.storage_class
= transition
.storage_class
;
818 int r
= oc
.store
->transition_obj(oc
.rctx
, oc
.bucket_info
, oc
.obj
,
819 target_placement
, o
.meta
.mtime
, o
.versioned_epoch
);
821 ldout(oc
.cct
, 0) << "ERROR: failed to transition obj (r=" << r
<< ")" << dendl
;
824 ldout(oc
.cct
, 2) << "TRANSITIONED:" << oc
.bucket_info
.bucket
<< ":" << o
.key
<< " -> " << transition
.storage_class
<< dendl
;
829 class LCOpAction_CurrentTransition
: public LCOpAction_Transition
{
831 bool check_current_state(bool is_current
) override
{
835 ceph::real_time
get_effective_mtime(lc_op_ctx
& oc
) override
{
836 return oc
.o
.meta
.mtime
;
839 LCOpAction_CurrentTransition(const transition_action
& _transition
) : LCOpAction_Transition(_transition
) {}
842 class LCOpAction_NonCurrentTransition
: public LCOpAction_Transition
{
844 bool check_current_state(bool is_current
) override
{
848 ceph::real_time
get_effective_mtime(lc_op_ctx
& oc
) override
{
849 return oc
.ol
.get_prev_obj().meta
.mtime
;
852 LCOpAction_NonCurrentTransition(const transition_action
& _transition
) : LCOpAction_Transition(_transition
) {}
855 void LCOpRule::build()
857 filters
.emplace_back(new LCOpFilter_Tags
);
861 if (op
.expiration
> 0 ||
862 op
.expiration_date
!= boost::none
) {
863 actions
.emplace_back(new LCOpAction_CurrentExpiration
);
866 if (op
.dm_expiration
) {
867 actions
.emplace_back(new LCOpAction_DMExpiration
);
870 if (op
.noncur_expiration
> 0) {
871 actions
.emplace_back(new LCOpAction_NonCurrentExpiration
);
874 for (auto& iter
: op
.transitions
) {
875 actions
.emplace_back(new LCOpAction_CurrentTransition(iter
.second
));
878 for (auto& iter
: op
.noncur_transitions
) {
879 actions
.emplace_back(new LCOpAction_NonCurrentTransition(iter
.second
));
883 int LCOpRule::process(rgw_bucket_dir_entry
& o
)
885 lc_op_ctx
ctx(env
, o
);
887 unique_ptr
<LCOpAction
> *selected
= nullptr;
890 for (auto& a
: actions
) {
891 real_time action_exp
;
893 if (a
->check(ctx
, &action_exp
)) {
894 if (action_exp
> exp
) {
902 (*selected
)->should_process()) {
905 * Calling filter checks after action checks because
906 * all action checks (as they are implemented now) do
907 * not access the objects themselves, but return results
908 * based on info from the bucket index listing. The current tags filter
909 * check does access the objects, so we avoid unnecessary rados calls
910 * by running the filter checks later in the process.
914 for (auto& f
: filters
) {
922 ldout(env
.store
->ctx(), 20) << __func__
<< "(): key=" << o
.key
<< ": no rule match, skipping" << dendl
;
926 int r
= (*selected
)->process(ctx
);
928 ldout(ctx
.cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
931 ldout(ctx
.cct
, 20) << "processed:" << env
.bucket_info
.bucket
<< ":" << o
.key
<< dendl
;
938 int RGWLC::bucket_lc_process(string
& shard_id
)
940 RGWLifecycleConfiguration
config(cct
);
941 RGWBucketInfo bucket_info
;
942 map
<string
, bufferlist
> bucket_attrs
;
943 string no_ns
, list_versions
;
944 vector
<rgw_bucket_dir_entry
> objs
;
945 auto obj_ctx
= store
->svc
.sysobj
->init_obj_ctx();
946 vector
<std::string
> result
;
947 boost::split(result
, shard_id
, boost::is_any_of(":"));
948 string bucket_tenant
= result
[0];
949 string bucket_name
= result
[1];
950 string bucket_marker
= result
[2];
951 int ret
= store
->get_bucket_info(obj_ctx
, bucket_tenant
, bucket_name
, bucket_info
, NULL
, &bucket_attrs
);
953 ldpp_dout(this, 0) << "LC:get_bucket_info for " << bucket_name
<< " failed" << dendl
;
957 if (bucket_info
.bucket
.marker
!= bucket_marker
) {
958 ldpp_dout(this, 1) << "LC: deleting stale entry found for bucket=" << bucket_tenant
959 << ":" << bucket_name
<< " cur_marker=" << bucket_info
.bucket
.marker
960 << " orig_marker=" << bucket_marker
<< dendl
;
964 RGWRados::Bucket
target(store
, bucket_info
);
966 map
<string
, bufferlist
>::iterator aiter
= bucket_attrs
.find(RGW_ATTR_LC
);
967 if (aiter
== bucket_attrs
.end())
970 bufferlist::const_iterator iter
{&aiter
->second
};
973 } catch (const buffer::error
& e
) {
974 ldpp_dout(this, 0) << __func__
<< "() decode life cycle config failed" << dendl
;
978 multimap
<string
, lc_op
>& prefix_map
= config
.get_prefix_map();
980 ldpp_dout(this, 10) << __func__
<< "() prefix_map size="
984 rgw_obj_key pre_marker
;
985 rgw_obj_key next_marker
;
986 for(auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
987 auto& op
= prefix_iter
->second
;
988 if (!is_valid_op(op
)) {
991 ldpp_dout(this, 20) << __func__
<< "(): prefix=" << prefix_iter
->first
<< dendl
;
992 if (prefix_iter
!= prefix_map
.begin() &&
993 (prefix_iter
->first
.compare(0, prev(prefix_iter
)->first
.length(), prev(prefix_iter
)->first
) == 0)) {
994 next_marker
= pre_marker
;
996 pre_marker
= next_marker
;
999 LCObjsLister
ol(store
, bucket_info
);
1000 ol
.set_prefix(prefix_iter
->first
);
1005 if (ret
== (-ENOENT
))
1007 ldpp_dout(this, 0) << "ERROR: store->list_objects():" <<dendl
;
1011 op_env
oenv(op
, store
, this, bucket_info
, ol
);
1013 LCOpRule
orule(oenv
);
1017 ceph::real_time mtime
;
1018 rgw_bucket_dir_entry o
;
1019 for (; ol
.get_obj(&o
); ol
.next()) {
1020 ldpp_dout(this, 20) << __func__
<< "(): key=" << o
.key
<< dendl
;
1021 int ret
= orule
.process(o
);
1023 ldpp_dout(this, 20) << "ERROR: orule.process() returned ret="
1034 ret
= handle_multipart_expiration(&target
, prefix_map
);
1039 int RGWLC::bucket_lc_post(int index
, int max_lock_sec
, pair
<string
, int >& entry
, int& result
)
1041 utime_t
lock_duration(cct
->_conf
->rgw_lc_lock_max_time
, 0);
1043 rados::cls::lock::Lock
l(lc_index_lock_name
);
1044 l
.set_cookie(cookie
);
1045 l
.set_duration(lock_duration
);
1048 int ret
= l
.lock_exclusive(&store
->lc_pool_ctx
, obj_names
[index
]);
1049 if (ret
== -EBUSY
|| ret
== -EEXIST
) { /* already locked by another lc processor */
1050 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to acquire lock on "
1051 << obj_names
[index
] << ", sleep 5, try again" << dendl
;
1057 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() lock " << obj_names
[index
] << dendl
;
1058 if (result
== -ENOENT
) {
1059 ret
= cls_rgw_lc_rm_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
1061 ldpp_dout(this, 0) << "RGWLC::bucket_lc_post() failed to remove entry "
1062 << obj_names
[index
] << dendl
;
1065 } else if (result
< 0) {
1066 entry
.second
= lc_failed
;
1068 entry
.second
= lc_complete
;
1071 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
1073 ldpp_dout(this, 0) << "RGWLC::process() failed to set entry on "
1074 << obj_names
[index
] << dendl
;
1077 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
1078 ldpp_dout(this, 20) << "RGWLC::bucket_lc_post() unlock " << obj_names
[index
] << dendl
;
1083 int RGWLC::list_lc_progress(const string
& marker
, uint32_t max_entries
, map
<string
, int> *progress_map
)
1086 progress_map
->clear();
1087 for(; index
<max_objs
; index
++) {
1088 map
<string
, int > entries
;
1089 int ret
= cls_rgw_lc_list(store
->lc_pool_ctx
, obj_names
[index
], marker
, max_entries
, entries
);
1091 if (ret
== -ENOENT
) {
1092 ldpp_dout(this, 10) << __func__
<< "() ignoring unfound lc object="
1093 << obj_names
[index
] << dendl
;
1099 map
<string
, int>::iterator iter
;
1100 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
1101 progress_map
->insert(*iter
);
1107 int RGWLC::process()
1109 int max_secs
= cct
->_conf
->rgw_lc_lock_max_time
;
1111 const int start
= ceph::util::generate_random_number(0, max_objs
- 1);
1113 for (int i
= 0; i
< max_objs
; i
++) {
1114 int index
= (i
+ start
) % max_objs
;
1115 int ret
= process(index
, max_secs
);
1123 int RGWLC::process(int index
, int max_lock_secs
)
1125 rados::cls::lock::Lock
l(lc_index_lock_name
);
1127 utime_t now
= ceph_clock_now();
1128 pair
<string
, int > entry
;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
1129 if (max_lock_secs
<= 0)
1132 utime_t
time(max_lock_secs
, 0);
1133 l
.set_duration(time
);
1135 int ret
= l
.lock_exclusive(&store
->lc_pool_ctx
, obj_names
[index
]);
1136 if (ret
== -EBUSY
|| ret
== -EEXIST
) { /* already locked by another lc processor */
1137 ldpp_dout(this, 0) << "RGWLC::process() failed to acquire lock on "
1138 << obj_names
[index
] << ", sleep 5, try again" << dendl
;
1145 cls_rgw_lc_obj_head head
;
1146 ret
= cls_rgw_lc_get_head(store
->lc_pool_ctx
, obj_names
[index
], head
);
1148 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj head "
1149 << obj_names
[index
] << ", ret=" << ret
<< dendl
;
1153 if(!if_already_run_today(head
.start_date
)) {
1154 head
.start_date
= now
;
1155 head
.marker
.clear();
1156 ret
= bucket_lc_prepare(index
);
1158 ldpp_dout(this, 0) << "RGWLC::process() failed to update lc object "
1159 << obj_names
[index
] << ", ret=" << ret
<< dendl
;
1164 ret
= cls_rgw_lc_get_next_entry(store
->lc_pool_ctx
, obj_names
[index
], head
.marker
, entry
);
1166 ldpp_dout(this, 0) << "RGWLC::process() failed to get obj entry "
1167 << obj_names
[index
] << dendl
;
1171 if (entry
.first
.empty())
1174 entry
.second
= lc_processing
;
1175 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
1177 ldpp_dout(this, 0) << "RGWLC::process() failed to set obj entry " << obj_names
[index
]
1178 << " (" << entry
.first
<< "," << entry
.second
<< ")" << dendl
;
1182 head
.marker
= entry
.first
;
1183 ret
= cls_rgw_lc_put_head(store
->lc_pool_ctx
, obj_names
[index
], head
);
1185 ldpp_dout(this, 0) << "RGWLC::process() failed to put head " << obj_names
[index
] << dendl
;
1188 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
1189 ret
= bucket_lc_process(entry
.first
);
1190 bucket_lc_post(index
, max_lock_secs
, entry
, ret
);
1194 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
1198 void RGWLC::start_processor()
1200 worker
= new LCWorker(this, cct
, this);
1201 worker
->create("lifecycle_thr");
1204 void RGWLC::stop_processor()
1216 unsigned RGWLC::get_subsys() const
1221 std::ostream
& RGWLC::gen_prefix(std::ostream
& out
) const
1223 return out
<< "lifecycle: ";
1226 void RGWLC::LCWorker::stop()
1228 Mutex::Locker
l(lock
);
1232 bool RGWLC::going_down()
1237 bool RGWLC::LCWorker::should_work(utime_t
& now
)
1243 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
1244 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
, &end_minute
);
1246 time_t tt
= now
.sec();
1247 localtime_r(&tt
, &bdt
);
1249 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
1250 /* We're debugging, so say we can run */
1252 } else if ((bdt
.tm_hour
*60 + bdt
.tm_min
>= start_hour
*60 + start_minute
) &&
1253 (bdt
.tm_hour
*60 + bdt
.tm_min
<= end_hour
*60 + end_minute
)) {
1261 int RGWLC::LCWorker::schedule_next_start_time(utime_t
&start
, utime_t
& now
)
1265 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
1266 secs
= start
+ cct
->_conf
->rgw_lc_debug_interval
- now
;
1276 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
1277 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
, &end_minute
);
1279 time_t tt
= now
.sec();
1281 localtime_r(&tt
, &bdt
);
1282 bdt
.tm_hour
= start_hour
;
1283 bdt
.tm_min
= start_minute
;
1288 return secs
>0 ? secs
: secs
+24*60*60;
1291 void RGWLifecycleConfiguration::generate_test_instances(list
<RGWLifecycleConfiguration
*>& o
)
1293 o
.push_back(new RGWLifecycleConfiguration
);
1296 void get_lc_oid(CephContext
*cct
, const string
& shard_id
, string
*oid
)
1298 int max_objs
= (cct
->_conf
->rgw_lc_max_objs
> HASH_PRIME
? HASH_PRIME
: cct
->_conf
->rgw_lc_max_objs
);
1299 int index
= ceph_str_hash_linux(shard_id
.c_str(), shard_id
.size()) % HASH_PRIME
% max_objs
;
1300 *oid
= lc_oid_prefix
;
1302 snprintf(buf
, 32, ".%d", index
);
1309 static std::string
get_lc_shard_name(const rgw_bucket
& bucket
){
1310 return string_join_reserve(':', bucket
.tenant
, bucket
.name
, bucket
.marker
);
1313 template<typename F
>
1314 static int guard_lc_modify(RGWRados
* store
, const rgw_bucket
& bucket
, const string
& cookie
, const F
& f
) {
1315 CephContext
*cct
= store
->ctx();
1317 string shard_id
= get_lc_shard_name(bucket
);
1320 get_lc_oid(cct
, shard_id
, &oid
);
1322 pair
<string
, int> entry(shard_id
, lc_uninitial
);
1323 int max_lock_secs
= cct
->_conf
->rgw_lc_lock_max_time
;
1325 rados::cls::lock::Lock
l(lc_index_lock_name
);
1326 utime_t
time(max_lock_secs
, 0);
1327 l
.set_duration(time
);
1328 l
.set_cookie(cookie
);
1330 librados::IoCtx
*ctx
= store
->get_lc_pool_ctx();
1334 ret
= l
.lock_exclusive(ctx
, oid
);
1335 if (ret
== -EBUSY
|| ret
== -EEXIST
) {
1336 ldout(cct
, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
1337 << oid
<< ", sleep 5, try again" << dendl
;
1338 sleep(5); // XXX: return retryable error
1342 ldout(cct
, 0) << "RGWLC::RGWPutLC() failed to acquire lock on "
1343 << oid
<< ", ret=" << ret
<< dendl
;
1346 ret
= f(ctx
, oid
, entry
);
1348 ldout(cct
, 0) << "RGWLC::RGWPutLC() failed to set entry on "
1349 << oid
<< ", ret=" << ret
<< dendl
;
1357 int RGWLC::set_bucket_config(RGWBucketInfo
& bucket_info
,
1358 const map
<string
, bufferlist
>& bucket_attrs
,
1359 RGWLifecycleConfiguration
*config
)
1361 map
<string
, bufferlist
> attrs
= bucket_attrs
;
1363 config
->encode(lc_bl
);
1365 attrs
[RGW_ATTR_LC
] = std::move(lc_bl
);
1367 int ret
= rgw_bucket_set_attrs(store
, bucket_info
, attrs
, &bucket_info
.objv_tracker
);
1371 rgw_bucket
& bucket
= bucket_info
.bucket
;
1374 ret
= guard_lc_modify(store
, bucket
, cookie
, [&](librados::IoCtx
*ctx
, const string
& oid
,
1375 const pair
<string
, int>& entry
) {
1376 return cls_rgw_lc_set_entry(*ctx
, oid
, entry
);
1382 int RGWLC::remove_bucket_config(RGWBucketInfo
& bucket_info
,
1383 const map
<string
, bufferlist
>& bucket_attrs
)
1385 map
<string
, bufferlist
> attrs
= bucket_attrs
;
1386 attrs
.erase(RGW_ATTR_LC
);
1387 int ret
= rgw_bucket_set_attrs(store
, bucket_info
, attrs
,
1388 &bucket_info
.objv_tracker
);
1390 rgw_bucket
& bucket
= bucket_info
.bucket
;
1393 ldout(cct
, 0) << "RGWLC::RGWDeleteLC() failed to set attrs on bucket="
1394 << bucket
.name
<< " returned err=" << ret
<< dendl
;
1399 ret
= guard_lc_modify(store
, bucket
, cookie
, [&](librados::IoCtx
*ctx
, const string
& oid
,
1400 const pair
<string
, int>& entry
) {
1401 return cls_rgw_lc_rm_entry(*ctx
, oid
, entry
);
1409 int fix_lc_shard_entry(RGWRados
* store
, const RGWBucketInfo
& bucket_info
,
1410 const map
<std::string
,bufferlist
>& battrs
)
1412 if (auto aiter
= battrs
.find(RGW_ATTR_LC
);
1413 aiter
== battrs
.end()) {
1414 return 0; // No entry, nothing to fix
1417 auto shard_name
= get_lc_shard_name(bucket_info
.bucket
);
1419 get_lc_oid(store
->ctx(), shard_name
, &lc_oid
);
1421 rgw_lc_entry_t entry
;
1422 // There are multiple cases we need to account for here
1423 // 1. entry exists and is already set to marker, happens in plain buckets & newly resharded buckets
1424 // 2. entry doesn't exist, which usually happens when reshard has happened prior to update and next LC process has already dropped the update
1425 // 3. entry exists matching the current bucket id which was after a reshard (needs to be updated to the marker)
1426 // We are not dropping the old marker here as that would be caught by the next LC process update
1427 auto lc_pool_ctx
= store
->get_lc_pool_ctx();
1428 int ret
= cls_rgw_lc_get_entry(*lc_pool_ctx
,
1429 lc_oid
, shard_name
, entry
);
1431 ldout(store
->ctx(), 5) << "Entry already exists, nothing to do" << dendl
;
1432 return ret
; // entry is already existing correctly set to marker
1434 ldout(store
->ctx(), 5) << "cls_rgw_lc_get_entry errored ret code=" << ret
<< dendl
;
1435 if (ret
== -ENOENT
) {
1436 ldout(store
->ctx(), 1) << "No entry for bucket=" << bucket_info
.bucket
.name
1437 << " creating " << dendl
;
1438 // TODO: we have too many people making cookies like this!
1439 char cookie_buf
[COOKIE_LEN
+ 1];
1440 gen_rand_alphanumeric(store
->ctx(), cookie_buf
, sizeof(cookie_buf
) - 1);
1441 std::string cookie
= cookie_buf
;
1443 ret
= guard_lc_modify(store
, bucket_info
.bucket
, cookie
,
1444 [&lc_pool_ctx
, &lc_oid
](librados::IoCtx
*ctx
, const string
& oid
,
1445 const pair
<string
, int>& entry
) {
1446 return cls_rgw_lc_set_entry(*lc_pool_ctx
,