5 #include <boost/algorithm/string/split.hpp>
6 #include <boost/algorithm/string.hpp>
8 #include "common/Formatter.h"
9 #include <common/errno.h>
10 #include "auth/Crypto.h"
11 #include "cls/rgw/cls_rgw_client.h"
12 #include "cls/lock/cls_lock_client.h"
13 #include "rgw_common.h"
14 #include "rgw_bucket.h"
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw
20 const char* LC_STATUS
[] = {
28 using namespace librados
;
32 if (id
.length() > MAX_ID_LEN
) {
35 else if(expiration
.empty() && noncur_expiration
.empty() && mp_expiration
.empty() && !dm_expiration
) {
38 else if (!expiration
.valid() || !noncur_expiration
.valid() || !mp_expiration
.valid()) {
44 void RGWLifecycleConfiguration::add_rule(LCRule
*rule
)
47 rule
->get_id(id
); // not that this will return false for groups, but that's ok, we won't search groups
48 rule_map
.insert(pair
<string
, LCRule
>(id
, *rule
));
51 bool RGWLifecycleConfiguration::_add_rule(LCRule
*rule
)
54 if (rule
->get_status().compare("Enabled") == 0) {
57 if (rule
->get_expiration().has_days()) {
58 op
.expiration
= rule
->get_expiration().get_days();
60 if (rule
->get_expiration().has_date()) {
61 op
.expiration_date
= ceph::from_iso_8601(rule
->get_expiration().get_date());
63 if (rule
->get_noncur_expiration().has_days()) {
64 op
.noncur_expiration
= rule
->get_noncur_expiration().get_days();
66 if (rule
->get_mp_expiration().has_days()) {
67 op
.mp_expiration
= rule
->get_mp_expiration().get_days();
69 op
.dm_expiration
= rule
->get_dm_expiration();
70 auto ret
= prefix_map
.insert(pair
<string
, lc_op
>(rule
->get_prefix(), op
));
74 int RGWLifecycleConfiguration::check_and_add_rule(LCRule
*rule
)
81 if (rule_map
.find(id
) != rule_map
.end()) { //id shouldn't be the same
84 rule_map
.insert(pair
<string
, LCRule
>(id
, *rule
));
86 if (!_add_rule(rule
)) {
87 return -ERR_INVALID_REQUEST
;
92 bool RGWLifecycleConfiguration::has_same_action(const lc_op
& first
, const lc_op
& second
) {
93 if ((first
.expiration
> 0 || first
.expiration_date
!= boost::none
) &&
94 (second
.expiration
> 0 || second
.expiration_date
!= boost::none
)) {
96 } else if (first
.noncur_expiration
> 0 && second
.noncur_expiration
> 0) {
98 } else if (first
.mp_expiration
> 0 && second
.mp_expiration
> 0) {
105 //Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules
106 //define same action.
107 bool RGWLifecycleConfiguration::valid()
109 if (prefix_map
.size() < 2) {
112 auto cur_iter
= prefix_map
.begin();
113 while (cur_iter
!= prefix_map
.end()) {
114 auto next_iter
= cur_iter
;
116 while (next_iter
!= prefix_map
.end()) {
117 string c_pre
= cur_iter
->first
;
118 string n_pre
= next_iter
->first
;
119 if (n_pre
.compare(0, c_pre
.length(), c_pre
) == 0) {
120 if (has_same_action(cur_iter
->second
, next_iter
->second
)) {
134 void *RGWLC::LCWorker::entry() {
136 utime_t start
= ceph_clock_now();
137 if (should_work(start
)) {
138 dout(5) << "life cycle: start" << dendl
;
139 int r
= lc
->process();
141 dout(0) << "ERROR: do life cycle process() returned error r=" << r
<< dendl
;
143 dout(5) << "life cycle: stop" << dendl
;
145 if (lc
->going_down())
148 utime_t end
= ceph_clock_now();
149 int secs
= schedule_next_start_time(start
, end
);
150 time_t next_time
= end
+ secs
;
152 char *nt
= ctime_r(&next_time
, buf
);
153 dout(5) << "schedule life cycle next start time: " << nt
<<dendl
;
156 cond
.WaitInterval(lock
, utime_t(secs
, 0));
158 } while (!lc
->going_down());
163 void RGWLC::initialize(CephContext
*_cct
, RGWRados
*_store
) {
166 max_objs
= cct
->_conf
->rgw_lc_max_objs
;
167 if (max_objs
> HASH_PRIME
)
168 max_objs
= HASH_PRIME
;
170 obj_names
= new string
[max_objs
];
172 for (int i
= 0; i
< max_objs
; i
++) {
173 obj_names
[i
] = lc_oid_prefix
;
175 snprintf(buf
, 32, ".%d", i
);
176 obj_names
[i
].append(buf
);
179 #define COOKIE_LEN 16
180 char cookie_buf
[COOKIE_LEN
+ 1];
181 gen_rand_alphanumeric(cct
, cookie_buf
, sizeof(cookie_buf
) - 1);
185 void RGWLC::finalize()
190 bool RGWLC::if_already_run_today(time_t& start_date
)
194 utime_t now
= ceph_clock_now();
195 localtime_r(&start_date
, &bdt
);
197 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
198 /* We're debugging, so say we can run */
205 begin_of_day
= mktime(&bdt
);
206 if (now
- begin_of_day
< 24*60*60)
212 int RGWLC::bucket_lc_prepare(int index
)
214 map
<string
, int > entries
;
218 #define MAX_LC_LIST_ENTRIES 100
220 int ret
= cls_rgw_lc_list(store
->lc_pool_ctx
, obj_names
[index
], marker
, MAX_LC_LIST_ENTRIES
, entries
);
223 map
<string
, int>::iterator iter
;
224 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
225 pair
<string
, int > entry(iter
->first
, lc_uninitial
);
226 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
228 dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names
[index
] << dendl
;
231 marker
= iter
->first
;
233 } while (!entries
.empty());
238 bool RGWLC::obj_has_expired(double timediff
, int days
)
241 if (cct
->_conf
->rgw_lc_debug_interval
<= 0) {
242 /* Normal case, run properly */
245 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
246 cmp
= days
*cct
->_conf
->rgw_lc_debug_interval
;
249 return (timediff
>= cmp
);
252 int RGWLC::remove_expired_obj(RGWBucketInfo
& bucket_info
, rgw_obj_key obj_key
, bool remove_indeed
)
255 return rgw_remove_object(store
, bucket_info
, bucket_info
.bucket
, obj_key
);
257 obj_key
.instance
.clear();
258 RGWObjectCtx
rctx(store
);
259 rgw_obj
obj(bucket_info
.bucket
, obj_key
);
260 return store
->delete_obj(rctx
, bucket_info
, obj
, bucket_info
.versioning_status());
264 int RGWLC::handle_multipart_expiration(RGWRados::Bucket
*target
, const map
<string
, lc_op
>& prefix_map
)
266 MultipartMetaFilter mp_filter
;
267 vector
<rgw_bucket_dir_entry
> objs
;
271 RGWBucketInfo
& bucket_info
= target
->get_bucket_info();
272 RGWRados::Bucket::List
list_op(target
);
273 list_op
.params
.list_versions
= false;
274 list_op
.params
.ns
= RGW_OBJ_NS_MULTIPART
;
275 list_op
.params
.filter
= &mp_filter
;
276 for (auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
277 if (!prefix_iter
->second
.status
|| prefix_iter
->second
.mp_expiration
<= 0) {
280 list_op
.params
.prefix
= prefix_iter
->first
;
283 list_op
.params
.marker
= list_op
.get_next_marker();
284 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
286 if (ret
== (-ENOENT
))
288 ldout(cct
, 0) << "ERROR: store->list_objects():" <<dendl
;
292 utime_t now
= ceph_clock_now();
293 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
294 if (obj_has_expired(now
- ceph::real_clock::to_time_t(obj_iter
->meta
.mtime
), prefix_iter
->second
.mp_expiration
)) {
295 rgw_obj_key
key(obj_iter
->key
);
296 if (!mp_obj
.from_meta(key
.name
)) {
299 RGWObjectCtx
rctx(store
);
300 ret
= abort_multipart_upload(store
, cct
, &rctx
, bucket_info
, mp_obj
);
301 if (ret
< 0 && ret
!= -ERR_NO_SUCH_UPLOAD
) {
302 ldout(cct
, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret
<<dendl
;
307 } while(is_truncated
);
312 int RGWLC::bucket_lc_process(string
& shard_id
)
314 RGWLifecycleConfiguration
config(cct
);
315 RGWBucketInfo bucket_info
;
316 map
<string
, bufferlist
> bucket_attrs
;
317 string next_marker
, no_ns
, list_versions
;
319 vector
<rgw_bucket_dir_entry
> objs
;
320 RGWObjectCtx
obj_ctx(store
);
321 vector
<std::string
> result
;
322 boost::split(result
, shard_id
, boost::is_any_of(":"));
323 string bucket_tenant
= result
[0];
324 string bucket_name
= result
[1];
325 string bucket_id
= result
[2];
326 int ret
= store
->get_bucket_info(obj_ctx
, bucket_tenant
, bucket_name
, bucket_info
, NULL
, &bucket_attrs
);
328 ldout(cct
, 0) << "LC:get_bucket_info failed" << bucket_name
<<dendl
;
332 ret
= bucket_info
.bucket
.bucket_id
.compare(bucket_id
) ;
334 ldout(cct
, 0) << "LC:old bucket id find, should be delete" << bucket_name
<<dendl
;
338 RGWRados::Bucket
target(store
, bucket_info
);
339 RGWRados::Bucket::List
list_op(&target
);
341 map
<string
, bufferlist
>::iterator aiter
= bucket_attrs
.find(RGW_ATTR_LC
);
342 if (aiter
== bucket_attrs
.end())
345 bufferlist::iterator
iter(&aiter
->second
);
348 } catch (const buffer::error
& e
) {
349 ldout(cct
, 0) << __func__
<< "decode life cycle config failed" << dendl
;
353 map
<string
, lc_op
>& prefix_map
= config
.get_prefix_map();
354 list_op
.params
.list_versions
= bucket_info
.versioned();
355 if (!bucket_info
.versioned()) {
356 for(auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
357 if (!prefix_iter
->second
.status
||
358 (prefix_iter
->second
.expiration
<=0 && prefix_iter
->second
.expiration_date
== boost::none
)) {
361 if (prefix_iter
->second
.expiration_date
!= boost::none
&&
362 ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter
->second
.expiration_date
)) {
365 list_op
.params
.prefix
= prefix_iter
->first
;
368 list_op
.params
.marker
= list_op
.get_next_marker();
369 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
372 if (ret
== (-ENOENT
))
374 ldout(cct
, 0) << "ERROR: store->list_objects():" <<dendl
;
378 utime_t now
= ceph_clock_now();
380 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
381 rgw_obj_key
key(obj_iter
->key
);
383 if (!key
.ns
.empty()) {
386 if (prefix_iter
->second
.expiration_date
!= boost::none
) {
387 //we have checked it before
390 is_expired
= obj_has_expired(now
- ceph::real_clock::to_time_t(obj_iter
->meta
.mtime
), prefix_iter
->second
.expiration
);
393 RGWObjectCtx
rctx(store
);
394 rgw_obj
obj(bucket_info
.bucket
, key
);
396 int ret
= store
->get_obj_state(&rctx
, bucket_info
, obj
, &state
, false);
400 if (state
->mtime
!= obj_iter
->meta
.mtime
)//Check mtime again to avoid delete a recently update object as much as possible
402 ret
= remove_expired_obj(bucket_info
, obj_iter
->key
, true);
404 ldout(cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
406 ldout(cct
, 10) << "DELETED:" << bucket_name
<< ":" << key
<< dendl
;
410 } while (is_truncated
);
413 //bucket versioning is enabled or suspended
414 rgw_obj_key pre_marker
;
415 for(auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
416 if (!prefix_iter
->second
.status
|| (prefix_iter
->second
.expiration
<= 0
417 && prefix_iter
->second
.expiration_date
== boost::none
418 && prefix_iter
->second
.noncur_expiration
<= 0 && !prefix_iter
->second
.dm_expiration
)) {
421 if (prefix_iter
!= prefix_map
.begin() &&
422 (prefix_iter
->first
.compare(0, prev(prefix_iter
)->first
.length(), prev(prefix_iter
)->first
) == 0)) {
423 list_op
.next_marker
= pre_marker
;
425 pre_marker
= list_op
.get_next_marker();
427 list_op
.params
.prefix
= prefix_iter
->first
;
428 rgw_bucket_dir_entry pre_obj
;
431 pre_obj
= objs
.back();
434 list_op
.params
.marker
= list_op
.get_next_marker();
435 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
438 if (ret
== (-ENOENT
))
440 ldout(cct
, 0) << "ERROR: store->list_objects():" <<dendl
;
444 utime_t now
= ceph_clock_now();
445 ceph::real_time mtime
;
446 bool remove_indeed
= true;
448 bool skip_expiration
;
450 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
451 skip_expiration
= false;
453 if (obj_iter
->is_current()) {
454 if (prefix_iter
->second
.expiration
<= 0 && prefix_iter
->second
.expiration_date
== boost::none
455 && !prefix_iter
->second
.dm_expiration
) {
458 if (obj_iter
->is_delete_marker()) {
459 if ((obj_iter
+ 1)==objs
.end()) {
461 //deal with it in next round because we can't judge whether this marker is the only version
462 list_op
.next_marker
= obj_iter
->key
;
465 } else if (obj_iter
->key
.name
.compare((obj_iter
+ 1)->key
.name
) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
468 skip_expiration
= prefix_iter
->second
.dm_expiration
;
469 remove_indeed
= true; //we should remove the delete marker if it's the only version
471 remove_indeed
= false;
473 mtime
= obj_iter
->meta
.mtime
;
474 expiration
= prefix_iter
->second
.expiration
;
475 if (!skip_expiration
&& expiration
<= 0 && prefix_iter
->second
.expiration_date
== boost::none
) {
477 } else if (!skip_expiration
) {
478 if (expiration
> 0) {
479 is_expired
= obj_has_expired(now
- ceph::real_clock::to_time_t(mtime
), expiration
);
481 is_expired
= now
>= ceph::real_clock::to_time_t(*prefix_iter
->second
.expiration_date
);
485 if (prefix_iter
->second
.noncur_expiration
<=0) {
488 remove_indeed
= true;
489 mtime
= (obj_iter
== objs
.begin())?pre_obj
.meta
.mtime
:(obj_iter
- 1)->meta
.mtime
;
490 expiration
= prefix_iter
->second
.noncur_expiration
;
491 is_expired
= obj_has_expired(now
- ceph::real_clock::to_time_t(mtime
), expiration
);
493 if (skip_expiration
|| is_expired
) {
494 if (obj_iter
->is_visible()) {
495 RGWObjectCtx
rctx(store
);
496 rgw_obj
obj(bucket_info
.bucket
, obj_iter
->key
);
498 int ret
= store
->get_obj_state(&rctx
, bucket_info
, obj
, &state
, false);
502 if (state
->mtime
!= obj_iter
->meta
.mtime
)//Check mtime again to avoid delete a recently update object as much as possible
505 ret
= remove_expired_obj(bucket_info
, obj_iter
->key
, remove_indeed
);
507 ldout(cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
509 ldout(cct
, 10) << "DELETED:" << bucket_name
<< ":" << obj_iter
->key
<< dendl
;
513 } while (is_truncated
);
517 ret
= handle_multipart_expiration(&target
, prefix_map
);
522 int RGWLC::bucket_lc_post(int index
, int max_lock_sec
, pair
<string
, int >& entry
, int& result
)
524 utime_t
lock_duration(cct
->_conf
->rgw_lc_lock_max_time
, 0);
526 rados::cls::lock::Lock
l(lc_index_lock_name
);
527 l
.set_cookie(cookie
);
528 l
.set_duration(lock_duration
);
531 int ret
= l
.lock_exclusive(&store
->lc_pool_ctx
, obj_names
[index
]);
532 if (ret
== -EBUSY
) { /* already locked by another lc processor */
533 dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names
[index
] << dendl
;
539 dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names
[index
] << dendl
;
540 if (result
== -ENOENT
) {
541 ret
= cls_rgw_lc_rm_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
543 dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names
[index
] << dendl
;
546 } else if (result
< 0) {
547 entry
.second
= lc_failed
;
549 entry
.second
= lc_complete
;
552 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
554 dout(0) << "RGWLC::process() failed to set entry " << obj_names
[index
] << dendl
;
557 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
558 dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names
[index
] << dendl
;
563 int RGWLC::list_lc_progress(const string
& marker
, uint32_t max_entries
, map
<string
, int> *progress_map
)
566 progress_map
->clear();
567 for(; index
<max_objs
; index
++) {
568 map
<string
, int > entries
;
569 int ret
= cls_rgw_lc_list(store
->lc_pool_ctx
, obj_names
[index
], marker
, max_entries
, entries
);
571 if (ret
== -ENOENT
) {
572 dout(10) << __func__
<< " ignoring unfound lc object="
573 << obj_names
[index
] << dendl
;
579 map
<string
, int>::iterator iter
;
580 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
581 progress_map
->insert(*iter
);
589 int max_secs
= cct
->_conf
->rgw_lc_lock_max_time
;
592 int ret
= get_random_bytes((char *)&start
, sizeof(start
));
596 for (int i
= 0; i
< max_objs
; i
++) {
597 int index
= (i
+ start
) % max_objs
;
598 ret
= process(index
, max_secs
);
606 int RGWLC::process(int index
, int max_lock_secs
)
608 rados::cls::lock::Lock
l(lc_index_lock_name
);
610 utime_t now
= ceph_clock_now();
611 pair
<string
, int > entry
;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
612 if (max_lock_secs
<= 0)
615 utime_t
time(max_lock_secs
, 0);
616 l
.set_duration(time
);
618 int ret
= l
.lock_exclusive(&store
->lc_pool_ctx
, obj_names
[index
]);
619 if (ret
== -EBUSY
) { /* already locked by another lc processor */
620 dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names
[index
] << dendl
;
628 cls_rgw_lc_obj_head head
;
629 ret
= cls_rgw_lc_get_head(store
->lc_pool_ctx
, obj_names
[index
], head
);
631 dout(0) << "RGWLC::process() failed to get obj head " << obj_names
[index
] << ret
<< dendl
;
635 if(!if_already_run_today(head
.start_date
)) {
636 head
.start_date
= now
;
638 ret
= bucket_lc_prepare(index
);
640 dout(0) << "RGWLC::process() failed to update lc object " << obj_names
[index
] << ret
<< dendl
;
645 ret
= cls_rgw_lc_get_next_entry(store
->lc_pool_ctx
, obj_names
[index
], head
.marker
, entry
);
647 dout(0) << "RGWLC::process() failed to get obj entry " << obj_names
[index
] << dendl
;
651 if (entry
.first
.empty())
654 entry
.second
= lc_processing
;
655 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
657 dout(0) << "RGWLC::process() failed to set obj entry " << obj_names
[index
] << entry
.first
<< entry
.second
<< dendl
;
661 head
.marker
= entry
.first
;
662 ret
= cls_rgw_lc_put_head(store
->lc_pool_ctx
, obj_names
[index
], head
);
664 dout(0) << "RGWLC::process() failed to put head " << obj_names
[index
] << dendl
;
667 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
668 ret
= bucket_lc_process(entry
.first
);
669 bucket_lc_post(index
, max_lock_secs
, entry
, ret
);
672 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
679 void RGWLC::start_processor()
681 worker
= new LCWorker(cct
, this);
682 worker
->create("lifecycle_thr");
685 void RGWLC::stop_processor()
696 void RGWLC::LCWorker::stop()
698 Mutex::Locker
l(lock
);
702 bool RGWLC::going_down()
707 bool RGWLC::LCWorker::should_work(utime_t
& now
)
713 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
714 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
, &end_minute
);
716 time_t tt
= now
.sec();
717 localtime_r(&tt
, &bdt
);
719 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
720 /* We're debugging, so say we can run */
722 } else if ((bdt
.tm_hour
*60 + bdt
.tm_min
>= start_hour
*60 + start_minute
) &&
723 (bdt
.tm_hour
*60 + bdt
.tm_min
<= end_hour
*60 + end_minute
)) {
731 int RGWLC::LCWorker::schedule_next_start_time(utime_t
&start
, utime_t
& now
)
733 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
734 int secs
= start
+ cct
->_conf
->rgw_lc_debug_interval
- now
;
744 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
745 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
, &end_minute
);
747 time_t tt
= now
.sec();
749 localtime_r(&tt
, &bdt
);
750 bdt
.tm_hour
= start_hour
;
751 bdt
.tm_min
= start_minute
;
755 return (nt
+24*60*60 - tt
);