5 #include <boost/algorithm/string/split.hpp>
6 #include <boost/algorithm/string.hpp>
8 #include "common/Formatter.h"
9 #include <common/errno.h>
10 #include "auth/Crypto.h"
11 #include "cls/rgw/cls_rgw_client.h"
12 #include "cls/lock/cls_lock_client.h"
13 #include "rgw_common.h"
14 #include "rgw_bucket.h"
// Logging configuration macros; presumably consumed by the dout()/ldout()
// calls used throughout this file.
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw
// Table of C-string status names. Its initializer entries are not visible in
// this extract.
20 const char* LC_STATUS
[] = {
// Pulls the librados namespace into file scope.
28 using namespace librados
;
// Fragment of a rule validity check. The enclosing function header and the
// bodies of the three branches are not visible in this extract.
// Branch 1: the id is longer than MAX_ID_LEN.
32 if (id
.length() > MAX_ID_LEN
) {
// Branch 2: no action is configured at all (all three expiration objects are
// empty and dm_expiration is unset).
35 else if(expiration
.empty() && noncur_expiration
.empty() && mp_expiration
.empty() && !dm_expiration
) {
// Branch 3: at least one of the expiration objects fails its own valid() check.
38 else if (!expiration
.valid() || !noncur_expiration
.valid() || !mp_expiration
.valid()) {
// Stores a copy of *rule in rule_map, keyed by the id reported by
// rule->get_id(). The declaration of `id` is not visible in this extract.
44 void RGWLifecycleConfiguration::add_rule(LCRule
*rule
)
47 rule
->get_id(id
); // note that this will return false for groups, but that's ok, we won't search groups
// The LCRule is copied by value into the map.
48 rule_map
.insert(pair
<string
, LCRule
>(id
, *rule
));
// Translates an LCRule into an operation record `op` and emplaces it into
// prefix_map under the rule's prefix. The declarations of `op` and `prefix`,
// and the final return, are not visible in this extract; `op` is presumably an
// lc_op, matching the map<string, lc_op> prefix maps used elsewhere in this
// file (TODO confirm).
51 bool RGWLifecycleConfiguration::_add_rule(LCRule
*rule
)
// The rule status is compared against the literal "Enabled"; what is recorded
// for an enabled rule is on a line not visible here.
54 if (rule
->get_status().compare("Enabled") == 0) {
// Current-version expiration expressed as a number of days.
57 if (rule
->get_expiration().has_days()) {
58 op
.expiration
= rule
->get_expiration().get_days();
// Current-version expiration expressed as an ISO 8601 date.
60 if (rule
->get_expiration().has_date()) {
61 op
.expiration_date
= ceph::from_iso_8601(rule
->get_expiration().get_date());
// Noncurrent-version expiration in days.
63 if (rule
->get_noncur_expiration().has_days()) {
64 op
.noncur_expiration
= rule
->get_noncur_expiration().get_days();
// Multipart-upload expiration in days.
66 if (rule
->get_mp_expiration().has_days()) {
67 op
.mp_expiration
= rule
->get_mp_expiration().get_days();
// The dm_expiration value is copied unconditionally.
69 op
.dm_expiration
= rule
->get_dm_expiration();
// Prefix selection: the filter's prefix is used when the filter has one. The
// second assignment is presumably the else branch (the `else` line is not
// visible) and falls back to the rule-level prefix.
72 if (rule
->get_filter().has_prefix()){
73 prefix
= rule
->get_filter().get_prefix();
75 prefix
= rule
->get_prefix();
// Both key and value are moved into the map. How `ret` is used afterwards is
// not visible in this extract.
77 auto ret
= prefix_map
.emplace(std::move(prefix
), std::move(op
));
// Adds a rule after checking it for a duplicate id: the rule is copied into
// rule_map and then converted by _add_rule(). Returns -ERR_INVALID_REQUEST
// when _add_rule() reports failure. Any earlier checks, the body of the
// duplicate-id branch, and the success return are not visible in this extract.
81 int RGWLifecycleConfiguration::check_and_add_rule(LCRule
*rule
)
88 if (rule_map
.find(id
) != rule_map
.end()) { //id shouldn't be the same
91 rule_map
.insert(pair
<string
, LCRule
>(id
, *rule
));
// NOTE(review): the rule is already in rule_map at this point; if _add_rule()
// fails, no removal from rule_map is visible here -- confirm whether callers
// discard the whole configuration on error.
93 if (!_add_rule(rule
)) {
94 return -ERR_INVALID_REQUEST
;
// Tests whether two operation records configure the same kind of action.
// Three kinds are checked in order; the return statements of each branch are
// not visible in this extract.
99 bool RGWLifecycleConfiguration::has_same_action(const lc_op
& first
, const lc_op
& second
) {
// Kind 1: both have a current-version expiration, either as a positive day
// count or as a set date.
100 if ((first
.expiration
> 0 || first
.expiration_date
!= boost::none
) &&
101 (second
.expiration
> 0 || second
.expiration_date
!= boost::none
)) {
// Kind 2: both have a positive noncurrent-version expiration.
103 } else if (first
.noncur_expiration
> 0 && second
.noncur_expiration
> 0) {
// Kind 3: both have a positive multipart-upload expiration.
105 } else if (first
.mp_expiration
> 0 && second
.mp_expiration
> 0) {
112 //Rules conflict if one rule's prefix starts with the other rule's prefix and the two rules
113 //define the same action.
// Walks every pair of entries in prefix_map looking for such a conflict. The
// return statements and the iterator increments are not visible in this
// extract.
114 bool RGWLifecycleConfiguration::valid()
// With fewer than two prefixes there is no pair to compare.
116 if (prefix_map
.size() < 2) {
119 auto cur_iter
= prefix_map
.begin();
120 while (cur_iter
!= prefix_map
.end()) {
// The inner iterator starts from the outer one; it is presumably advanced
// past it on a line not visible here.
121 auto next_iter
= cur_iter
;
123 while (next_iter
!= prefix_map
.end()) {
// NOTE(review): both keys are copied into fresh strings on every inner
// iteration; const references would avoid the copies.
124 string c_pre
= cur_iter
->first
;
125 string n_pre
= next_iter
->first
;
// compare(0, len, s) == 0 holds exactly when n_pre begins with c_pre.
126 if (n_pre
.compare(0, c_pre
.length(), c_pre
) == 0) {
127 if (has_same_action(cur_iter
->second
, next_iter
->second
)) {
// Worker loop: when should_work() allows it, runs lc->process(), then computes
// the delay until the next start and waits on `cond` for that long. The loop
// repeats until lc->going_down() is true. The `do {`, the error-check
// condition, and the final return are not visible in this extract.
141 void *RGWLC::LCWorker::entry() {
143 utime_t start
= ceph_clock_now();
144 if (should_work(start
)) {
145 dout(5) << "life cycle: start" << dendl
;
146 int r
= lc
->process();
// Error log for process(); the guarding condition is not visible here.
148 dout(0) << "ERROR: do life cycle process() returned error r=" << r
<< dendl
;
150 dout(5) << "life cycle: stop" << dendl
;
// Shutdown check before scheduling; the branch body is not visible.
152 if (lc
->going_down())
155 utime_t end
= ceph_clock_now();
156 int secs
= schedule_next_start_time(start
, end
);
// `next` is only used for the log line below in the visible code.
158 next
.set_from_double(end
+ secs
);
160 dout(5) << "schedule life cycle next start time: " << rgw_to_asctime(next
) <<dendl
;
// Waits up to `secs` seconds on the condition variable.
163 cond
.WaitInterval(lock
, utime_t(secs
, 0));
165 } while (!lc
->going_down());
// Sets up the lifecycle processor: reads the shard count from configuration,
// builds the per-shard object names, and generates a random cookie. The
// assignments that store _cct/_store and the declaration of `buf` are not
// visible in this extract.
170 void RGWLC::initialize(CephContext
*_cct
, RGWRados
*_store
) {
// Shard count comes from rgw_lc_max_objs and is capped at HASH_PRIME.
173 max_objs
= cct
->_conf
->rgw_lc_max_objs
;
174 if (max_objs
> HASH_PRIME
)
175 max_objs
= HASH_PRIME
;
// NOTE(review): raw new[]; the matching delete[] is not visible in this
// extract -- confirm it is released (presumably in finalize()).
177 obj_names
= new string
[max_objs
];
// Each shard object is named lc_oid_prefix followed by "." and the index.
179 for (int i
= 0; i
< max_objs
; i
++) {
180 obj_names
[i
] = lc_oid_prefix
;
// NOTE(review): the size literal 32 must match the declared size of `buf`,
// which is not visible here.
182 snprintf(buf
, 32, ".%d", i
);
183 obj_names
[i
].append(buf
);
// Random alphanumeric cookie; the buffer has one extra byte beyond
// COOKIE_LEN, presumably for a terminating NUL.
186 #define COOKIE_LEN 16
187 char cookie_buf
[COOKIE_LEN
+ 1];
188 gen_rand_alphanumeric(cct
, cookie_buf
, sizeof(cookie_buf
) - 1);
// Teardown hook; its body is not visible in this extract.
// NOTE(review): presumably releases obj_names allocated in initialize() -- confirm.
192 void RGWLC::finalize()
// Decides whether lifecycle processing already ran in the day that contains
// start_date. The declarations of `bdt` and `begin_of_day`, any adjustment of
// `bdt` between localtime_r() and mktime(), and all return statements are not
// visible in this extract.
197 bool RGWLC::if_already_run_today(time_t& start_date
)
201 utime_t now
= ceph_clock_now();
// Break start_date down into local calendar time.
202 localtime_r(&start_date
, &bdt
);
204 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
205 /* We're debugging, so say we can run */
// Convert the broken-down time back to an epoch value and test whether
// `now` is less than 24 hours after it.
212 begin_of_day
= mktime(&bdt
);
213 if (now
- begin_of_day
< 24*60*60)
// Resets every entry of one lifecycle shard object to lc_uninitial. Entries
// are listed in pages of MAX_LC_LIST_ENTRIES and each is rewritten with the
// new status; the loop ends when a listing returns no entries. The `do {`, the
// declaration of `marker`, the error checks, and the return statements are not
// visible in this extract.
219 int RGWLC::bucket_lc_prepare(int index
)
221 map
<string
, int > entries
;
225 #define MAX_LC_LIST_ENTRIES 100
// List the next page of entries starting after `marker`.
227 int ret
= cls_rgw_lc_list(store
->lc_pool_ctx
, obj_names
[index
], marker
, MAX_LC_LIST_ENTRIES
, entries
);
230 map
<string
, int>::iterator iter
;
231 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
// Same key, status replaced by lc_uninitial.
232 pair
<string
, int > entry(iter
->first
, lc_uninitial
);
233 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
// Failure log; the guarding condition is not visible here.
235 dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names
[index
] << dendl
;
// Advance the marker to the key just handled so the next listing resumes
// after it.
238 marker
= iter
->first
;
240 } while (!entries
.empty());
// Returns true when `timediff` has reached the threshold `cmp` derived from
// `days`. The declaration of `cmp` and its assignment in the normal
// (non-debug) case are not visible in this extract.
245 bool RGWLC::obj_has_expired(double timediff
, int days
)
248 if (cct
->_conf
->rgw_lc_debug_interval
<= 0) {
249 /* Normal case, run properly */
252 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
253 cmp
= days
*cct
->_conf
->rgw_lc_debug_interval
;
256 return (timediff
>= cmp
);
// Removes one expired object via one of two paths. The condition choosing
// between them is not visible in this extract; presumably it is the
// remove_indeed flag (TODO confirm).
// obj_key is taken by value, so clearing its instance below does not affect
// the caller's key.
259 int RGWLC::remove_expired_obj(RGWBucketInfo
& bucket_info
, rgw_obj_key obj_key
, bool remove_indeed
)
// Path 1: remove the object named by the key exactly as given.
262 return rgw_remove_object(store
, bucket_info
, bucket_info
.bucket
, obj_key
);
// Path 2: drop the version instance from the key and issue a delete that
// passes the bucket's versioning status.
264 obj_key
.instance
.clear();
265 RGWObjectCtx
rctx(store
);
266 rgw_obj
obj(bucket_info
.bucket
, obj_key
);
267 return store
->delete_obj(rctx
, bucket_info
, obj
, bucket_info
.versioning_status());
// Aborts expired multipart uploads in a bucket. For every rule in prefix_map
// with a positive mp_expiration, the multipart namespace is listed under the
// rule's prefix and each upload whose meta object is old enough is aborted.
// The declarations of `ret`, `is_truncated` and `mp_obj`, the `do {`, several
// branch bodies, and the return statements are not visible in this extract.
271 int RGWLC::handle_multipart_expiration(RGWRados::Bucket
*target
, const map
<string
, lc_op
>& prefix_map
)
273 MultipartMetaFilter mp_filter
;
274 vector
<rgw_bucket_dir_entry
> objs
;
278 RGWBucketInfo
& bucket_info
= target
->get_bucket_info();
// Listing is restricted to the multipart namespace, without versions, and
// filtered through mp_filter.
279 RGWRados::Bucket::List
list_op(target
);
280 list_op
.params
.list_versions
= false;
281 list_op
.params
.ns
= RGW_OBJ_NS_MULTIPART
;
282 list_op
.params
.filter
= &mp_filter
;
283 for (auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
// Rules that are disabled or have no multipart expiration are tested here;
// the branch body is not visible (presumably it skips the rule).
284 if (!prefix_iter
->second
.status
|| prefix_iter
->second
.mp_expiration
<= 0) {
287 list_op
.params
.prefix
= prefix_iter
->first
;
// Paged listing, up to 1000 entries per call, resuming from the marker
// reported by the previous call.
290 list_op
.params
.marker
= list_op
.get_next_marker();
291 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
// -ENOENT is tested separately from other errors; branch bodies not visible.
293 if (ret
== (-ENOENT
))
295 ldout(cct
, 0) << "ERROR: store->list_objects():" <<dendl
;
299 utime_t now
= ceph_clock_now();
300 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
// The age is measured from the listed entry's mtime.
301 if (obj_has_expired(now
- ceph::real_clock::to_time_t(obj_iter
->meta
.mtime
), prefix_iter
->second
.mp_expiration
)) {
302 rgw_obj_key
key(obj_iter
->key
);
// Parse the upload identity out of the meta object name; the body of the
// failure branch is not visible.
303 if (!mp_obj
.from_meta(key
.name
)) {
306 RGWObjectCtx
rctx(store
);
307 ret
= abort_multipart_upload(store
, cct
, &rctx
, bucket_info
, mp_obj
);
// -ERR_NO_SUCH_UPLOAD is excluded from the error case.
308 if (ret
< 0 && ret
!= -ERR_NO_SUCH_UPLOAD
) {
309 ldout(cct
, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret
<<dendl
;
314 } while(is_truncated
);
// Runs lifecycle processing for one bucket. shard_id is split on ':' into
// tenant, bucket name and bucket id. The bucket's stored lifecycle
// configuration is read from its attributes, and each rule in the resulting
// prefix map is applied: one code path for unversioned buckets, another for
// buckets with versioning enabled or suspended. Multipart expiration is
// handled at the end.
// Many lines (declarations of is_truncated/is_expired/state/expiration, `do {`
// lines, `else` lines, error checks, continue/return statements) are not
// visible in this extract, so the comments below describe only what the
// visible statements show.
319 int RGWLC::bucket_lc_process(string
& shard_id
)
321 RGWLifecycleConfiguration
config(cct
);
322 RGWBucketInfo bucket_info
;
323 map
<string
, bufferlist
> bucket_attrs
;
324 string next_marker
, no_ns
, list_versions
;
326 vector
<rgw_bucket_dir_entry
> objs
;
327 RGWObjectCtx
obj_ctx(store
);
328 vector
<std::string
> result
;
// NOTE(review): result[0], result[1] and result[2] are indexed below with no
// visible check that the split produced at least three fields; a shard_id with
// fewer than two ':' characters would index past the end -- confirm a guard
// exists or add one.
329 boost::split(result
, shard_id
, boost::is_any_of(":"));
330 string bucket_tenant
= result
[0];
331 string bucket_name
= result
[1];
332 string bucket_id
= result
[2];
// Load bucket info and attributes by tenant and name.
333 int ret
= store
->get_bucket_info(obj_ctx
, bucket_tenant
, bucket_name
, bucket_info
, NULL
, &bucket_attrs
);
// Failure log; the guarding condition is not visible here.
335 ldout(cct
, 0) << "LC:get_bucket_info failed" << bucket_name
<<dendl
;
// The bucket id in the lifecycle entry is compared with the current bucket's
// id; compare() yields non-zero when they differ. The guarding condition for
// the log line is not visible here.
339 ret
= bucket_info
.bucket
.bucket_id
.compare(bucket_id
) ;
341 ldout(cct
, 0) << "LC:old bucket id find, should be delete" << bucket_name
<<dendl
;
345 RGWRados::Bucket
target(store
, bucket_info
);
346 RGWRados::Bucket::List
list_op(&target
);
// Look up the stored lifecycle attribute; the body for the not-found case is
// not visible.
348 map
<string
, bufferlist
>::iterator aiter
= bucket_attrs
.find(RGW_ATTR_LC
);
349 if (aiter
== bucket_attrs
.end())
// Iterator over the attribute buffer; the decode call itself (presumably
// inside a try block) is not visible.
352 bufferlist::iterator
iter(&aiter
->second
);
355 } catch (const buffer::error
& e
) {
356 ldout(cct
, 0) << __func__
<< "decode life cycle config failed" << dendl
;
360 map
<string
, lc_op
>& prefix_map
= config
.get_prefix_map();
361 list_op
.params
.list_versions
= bucket_info
.versioned();
// ---- Unversioned bucket path ----
362 if (!bucket_info
.versioned()) {
363 for(auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
// Tests for a disabled rule or a rule with neither a day count nor a date;
// the branch body is not visible (presumably it skips the rule).
364 if (!prefix_iter
->second
.status
||
365 (prefix_iter
->second
.expiration
<=0 && prefix_iter
->second
.expiration_date
== boost::none
)) {
// Tests for a date-based rule whose date is still in the future; the branch
// body is not visible.
368 if (prefix_iter
->second
.expiration_date
!= boost::none
&&
369 ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter
->second
.expiration_date
)) {
372 list_op
.params
.prefix
= prefix_iter
->first
;
// Paged listing, up to 1000 entries per call.
375 list_op
.params
.marker
= list_op
.get_next_marker();
376 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
379 if (ret
== (-ENOENT
))
381 ldout(cct
, 0) << "ERROR: store->list_objects():" <<dendl
;
385 utime_t now
= ceph_clock_now();
387 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
388 rgw_obj_key
key(obj_iter
->key
);
// Tests for a key with a non-empty namespace; the branch body is not visible.
390 if (!key
.ns
.empty()) {
393 if (prefix_iter
->second
.expiration_date
!= boost::none
) {
394 //we have checked it before
// Day-count rule: age is measured from the listed entry's mtime.
397 is_expired
= obj_has_expired(now
- ceph::real_clock::to_time_t(obj_iter
->meta
.mtime
), prefix_iter
->second
.expiration
);
400 RGWObjectCtx
rctx(store
);
401 rgw_obj
obj(bucket_info
.bucket
, key
);
// NOTE(review): this `int ret` shadows the function-level `ret` declared above.
403 int ret
= store
->get_obj_state(&rctx
, bucket_info
, obj
, &state
, false);
407 if (state
->mtime
!= obj_iter
->meta
.mtime
)//Check mtime again to avoid delete a recently update object as much as possible
409 ret
= remove_expired_obj(bucket_info
, obj_iter
->key
, true);
// The error and success logs below are presumably the two arms of a check on
// `ret` that is not visible here.
411 ldout(cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
413 ldout(cct
, 10) << "DELETED:" << bucket_name
<< ":" << key
<< dendl
;
417 } while (is_truncated
);
// ---- Versioned bucket path ----
420 //bucket versioning is enabled or suspended
421 rgw_obj_key pre_marker
;
422 for(auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
// Tests for a disabled rule or a rule with no current expiration, no
// noncurrent expiration and no delete-marker expiration; the branch body is
// not visible.
423 if (!prefix_iter
->second
.status
|| (prefix_iter
->second
.expiration
<= 0
424 && prefix_iter
->second
.expiration_date
== boost::none
425 && prefix_iter
->second
.noncur_expiration
<= 0 && !prefix_iter
->second
.dm_expiration
)) {
// When this prefix begins with the previous map entry's prefix, listing is
// rewound to pre_marker. The pre_marker assignment that follows is presumably
// the else arm (the `else` line is not visible).
428 if (prefix_iter
!= prefix_map
.begin() &&
429 (prefix_iter
->first
.compare(0, prev(prefix_iter
)->first
.length(), prev(prefix_iter
)->first
) == 0)) {
430 list_op
.next_marker
= pre_marker
;
432 pre_marker
= list_op
.get_next_marker();
434 list_op
.params
.prefix
= prefix_iter
->first
;
// pre_obj holds the last entry of the previous page, so the first entry of
// the next page can still look at its predecessor; the guard around the
// assignment is not visible.
435 rgw_bucket_dir_entry pre_obj
;
438 pre_obj
= objs
.back();
441 list_op
.params
.marker
= list_op
.get_next_marker();
442 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
445 if (ret
== (-ENOENT
))
447 ldout(cct
, 0) << "ERROR: store->list_objects():" <<dendl
;
451 utime_t now
= ceph_clock_now();
452 ceph::real_time mtime
;
453 bool remove_indeed
= true;
455 bool skip_expiration
;
457 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
458 skip_expiration
= false;
// Current version handling.
460 if (obj_iter
->is_current()) {
// Tests for a rule with nothing configured for current versions; the branch
// body is not visible.
461 if (prefix_iter
->second
.expiration
<= 0 && prefix_iter
->second
.expiration_date
== boost::none
462 && !prefix_iter
->second
.dm_expiration
) {
465 if (obj_iter
->is_delete_marker()) {
// A delete marker in the last slot of the page cannot be judged without
// seeing the entry after it.
466 if ((obj_iter
+ 1)==objs
.end()) {
468 //deal with it in next round because we can't judge whether this marker is the only version
469 list_op
.next_marker
= obj_iter
->key
;
472 } else if (obj_iter
->key
.name
.compare((obj_iter
+ 1)->key
.name
) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
// Otherwise the marker is the only version of its object: whether it bypasses
// the age test is taken from the rule's dm_expiration flag.
475 skip_expiration
= prefix_iter
->second
.dm_expiration
;
476 remove_indeed
= true; //we should remove the delete marker if it's the only version
// The assignment below is presumably the arm for a current version that is
// not a delete marker (the `else` line is not visible).
478 remove_indeed
= false;
480 mtime
= obj_iter
->meta
.mtime
;
481 expiration
= prefix_iter
->second
.expiration
;
// Tests for the case where neither a day count nor a date applies and the age
// test is not bypassed; the branch body is not visible.
482 if (!skip_expiration
&& expiration
<= 0 && prefix_iter
->second
.expiration_date
== boost::none
) {
484 } else if (!skip_expiration
) {
// Day-count rule: compare the age against the day count. The date-based
// comparison that follows is presumably the else arm.
485 if (expiration
> 0) {
486 is_expired
= obj_has_expired(now
- ceph::real_clock::to_time_t(mtime
), expiration
);
488 is_expired
= now
>= ceph::real_clock::to_time_t(*prefix_iter
->second
.expiration_date
);
// Noncurrent version handling (presumably the else arm of is_current()).
492 if (prefix_iter
->second
.noncur_expiration
<=0) {
495 remove_indeed
= true;
// The age of a noncurrent version is measured from the mtime of the entry
// listed just before it (pre_obj when it is the first entry of the page).
496 mtime
= (obj_iter
== objs
.begin())?pre_obj
.meta
.mtime
:(obj_iter
- 1)->meta
.mtime
;
497 expiration
= prefix_iter
->second
.noncur_expiration
;
498 is_expired
= obj_has_expired(now
- ceph::real_clock::to_time_t(mtime
), expiration
);
500 if (skip_expiration
|| is_expired
) {
// For visible entries the object state is re-read and its mtime compared with
// the listed mtime before removal.
501 if (obj_iter
->is_visible()) {
502 RGWObjectCtx
rctx(store
);
503 rgw_obj
obj(bucket_info
.bucket
, obj_iter
->key
);
// NOTE(review): this `int ret` shadows the function-level `ret` declared above.
505 int ret
= store
->get_obj_state(&rctx
, bucket_info
, obj
, &state
, false);
509 if (state
->mtime
!= obj_iter
->meta
.mtime
)//Check mtime again to avoid delete a recently update object as much as possible
512 ret
= remove_expired_obj(bucket_info
, obj_iter
->key
, remove_indeed
);
514 ldout(cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
516 ldout(cct
, 10) << "DELETED:" << bucket_name
<< ":" << obj_iter
->key
<< dendl
;
520 } while (is_truncated
);
// Multipart uploads are handled after the object passes.
524 ret
= handle_multipart_expiration(&target
, prefix_map
);
// Records the outcome of processing one bucket in its lifecycle shard object.
// Takes an exclusive lock on the shard, then either removes the entry (result
// == -ENOENT) or rewrites it with lc_failed / lc_complete, and unlocks. The
// retry loop, the error-check conditions, and the return statements are not
// visible in this extract.
// NOTE(review): max_lock_sec is not referenced in the visible lines; the lock
// duration comes from rgw_lc_lock_max_time instead -- confirm this is intended.
529 int RGWLC::bucket_lc_post(int index
, int max_lock_sec
, pair
<string
, int >& entry
, int& result
)
531 utime_t
lock_duration(cct
->_conf
->rgw_lc_lock_max_time
, 0);
533 rados::cls::lock::Lock
l(lc_index_lock_name
);
534 l
.set_cookie(cookie
);
535 l
.set_duration(lock_duration
);
538 int ret
= l
.lock_exclusive(&store
->lc_pool_ctx
, obj_names
[index
]);
// The log text mentions a 5 second sleep and a retry; neither is visible in
// this extract.
539 if (ret
== -EBUSY
) { /* already locked by another lc processor */
540 dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names
[index
] << dendl
;
546 dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names
[index
] << dendl
;
// -ENOENT from processing: the entry is removed from the shard.
547 if (result
== -ENOENT
) {
548 ret
= cls_rgw_lc_rm_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
550 dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names
[index
] << dendl
;
// Any other negative result marks the entry failed; the lc_complete
// assignment is presumably the else arm (the `else` line is not visible).
553 } else if (result
< 0) {
554 entry
.second
= lc_failed
;
556 entry
.second
= lc_complete
;
559 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
561 dout(0) << "RGWLC::process() failed to set entry " << obj_names
[index
] << dendl
;
// NOTE(review): the return value of unlock() is discarded.
564 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
565 dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names
[index
] << dendl
;
// Collects lifecycle entries from every shard object into *progress_map. The
// declaration and initial value of `index`, the handling of other errors, and
// the return statements are not visible in this extract.
// NOTE(review): the same `marker` and `max_entries` are passed to each shard's
// listing, so the combined map can hold more than max_entries items -- confirm
// this is the intended contract.
570 int RGWLC::list_lc_progress(const string
& marker
, uint32_t max_entries
, map
<string
, int> *progress_map
)
// Any previous contents of the output map are discarded.
573 progress_map
->clear();
574 for(; index
<max_objs
; index
++) {
575 map
<string
, int > entries
;
576 int ret
= cls_rgw_lc_list(store
->lc_pool_ctx
, obj_names
[index
], marker
, max_entries
, entries
);
// A missing shard object is logged at level 10; the rest of this branch is
// not visible.
578 if (ret
== -ENOENT
) {
579 dout(10) << __func__
<< " ignoring unfound lc object="
580 << obj_names
[index
] << dendl
;
586 map
<string
, int>::iterator iter
;
// map::insert keeps an existing key's value if the same key appears twice.
587 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
588 progress_map
->insert(*iter
);
// Fragment of a routine that visits every lifecycle shard once, starting at a
// random offset. The function header, the declaration of `start`, the error
// checks, and the return statements are not visible in this extract.
596 int max_secs
= cct
->_conf
->rgw_lc_lock_max_time
;
// `start` is filled with random bytes to pick the first shard.
599 int ret
= get_random_bytes((char *)&start
, sizeof(start
));
// NOTE(review): if `start` is a signed type, (i + start) % max_objs can be
// negative and would index obj_names out of range -- confirm `start` is
// unsigned.
603 for (int i
= 0; i
< max_objs
; i
++) {
604 int index
= (i
+ start
) % max_objs
;
605 ret
= process(index
, max_secs
);
// Processes one lifecycle shard. Under an exclusive lock on the shard object
// it reads the shard head, resets the shard's entries if no run has started
// today, fetches the next entry after the head marker, marks it
// lc_processing, advances the head marker, and releases the lock. It then runs
// bucket_lc_process() on that entry and records the outcome via
// bucket_lc_post(). The enclosing loop, error-check conditions, goto/return
// statements, and labels are not visible in this extract.
613 int RGWLC::process(int index
, int max_lock_secs
)
615 rados::cls::lock::Lock
l(lc_index_lock_name
);
617 utime_t now
= ceph_clock_now();
618 pair
<string
, int > entry
;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
// Non-positive lock durations are tested here; the branch body is not visible.
619 if (max_lock_secs
<= 0)
622 utime_t
time(max_lock_secs
, 0);
623 l
.set_duration(time
);
625 int ret
= l
.lock_exclusive(&store
->lc_pool_ctx
, obj_names
[index
]);
// The log text mentions a 5 second sleep and a retry; neither is visible in
// this extract.
626 if (ret
== -EBUSY
) { /* already locked by another lc processor */
627 dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names
[index
] << dendl
;
635 cls_rgw_lc_obj_head head
;
636 ret
= cls_rgw_lc_get_head(store
->lc_pool_ctx
, obj_names
[index
], head
);
// Failure log; the guarding condition is not visible here.
638 dout(0) << "RGWLC::process() failed to get obj head " << obj_names
[index
] << ret
<< dendl
;
// First run of the day for this shard: stamp the head with the current time
// and reset the shard's entries.
642 if(!if_already_run_today(head
.start_date
)) {
643 head
.start_date
= now
;
645 ret
= bucket_lc_prepare(index
);
647 dout(0) << "RGWLC::process() failed to update lc object " << obj_names
[index
] << ret
<< dendl
;
// Fetch the entry that follows the head marker.
652 ret
= cls_rgw_lc_get_next_entry(store
->lc_pool_ctx
, obj_names
[index
], head
.marker
, entry
);
654 dout(0) << "RGWLC::process() failed to get obj entry " << obj_names
[index
] << dendl
;
// An empty key is tested here; the branch body is not visible (presumably no
// further entries remain).
658 if (entry
.first
.empty())
661 entry
.second
= lc_processing
;
662 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
664 dout(0) << "RGWLC::process() failed to set obj entry " << obj_names
[index
] << entry
.first
<< entry
.second
<< dendl
;
// Persist progress so the next fetch starts after this entry.
668 head
.marker
= entry
.first
;
669 ret
= cls_rgw_lc_put_head(store
->lc_pool_ctx
, obj_names
[index
], head
);
671 dout(0) << "RGWLC::process() failed to put head " << obj_names
[index
] << dendl
;
// The shard lock is released before the bucket itself is processed.
674 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
675 ret
= bucket_lc_process(entry
.first
);
676 bucket_lc_post(index
, max_lock_secs
, entry
, ret
);
// Second unlock; presumably on a shared exit path reached by the error
// branches (the label is not visible).
679 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
// Creates the lifecycle worker and starts it under the name "lifecycle_thr".
// NOTE(review): raw new; the matching delete is not visible in this extract
// (presumably in stop_processor()) -- confirm.
686 void RGWLC::start_processor()
688 worker
= new LCWorker(cct
, this);
689 worker
->create("lifecycle_thr");
// Counterpart of start_processor(); its body is not visible in this extract.
692 void RGWLC::stop_processor()
// Takes the worker's `lock` through a scoped Mutex::Locker. The statement
// executed while the lock is held is not visible in this extract (presumably
// it signals `cond` to wake entry() -- confirm).
703 void RGWLC::LCWorker::stop()
705 Mutex::Locker
l(lock
);
// Shutdown predicate polled by the worker loop and by entry(); its body is not
// visible in this extract.
709 bool RGWLC::going_down()
// Decides whether `now` lies inside the work window configured in
// rgw_lifecycle_work_time, given in "H:M-H:M" form. The declarations of the
// hour/minute variables and `bdt`, and the return statements, are not visible
// in this extract.
714 bool RGWLC::LCWorker::should_work(utime_t
& now
)
720 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
// NOTE(review): the sscanf result is discarded; if the setting does not match
// the format, some of the four fields keep whatever value they had before the
// call -- confirm they are initialised or check for a return value of 4.
721 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
, &end_minute
);
// Break the current time down into local calendar time.
723 time_t tt
= now
.sec();
724 localtime_r(&tt
, &bdt
);
726 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
727 /* We're debugging, so say we can run */
// Minute-of-day comparison against both ends of the window, inclusive.
// NOTE(review): a window that wraps past midnight (start later than end) can
// never satisfy both comparisons -- confirm such settings are not expected.
729 } else if ((bdt
.tm_hour
*60 + bdt
.tm_min
>= start_hour
*60 + start_minute
) &&
730 (bdt
.tm_hour
*60 + bdt
.tm_min
<= end_hour
*60 + end_minute
)) {
// Computes how many seconds to wait before the next lifecycle run. The
// declarations of the hour/minute variables, `bdt` and `nt`, the assignment of
// `nt`, and the debug-mode return are not visible in this extract.
738 int RGWLC::LCWorker::schedule_next_start_time(utime_t
&start
, utime_t
& now
)
// Debug mode: the next run is rgw_lc_debug_interval seconds after `start`,
// measured from `now`.
740 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
741 int secs
= start
+ cct
->_conf
->rgw_lc_debug_interval
- now
;
751 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
// NOTE(review): the sscanf result is discarded here as well; a malformed
// setting leaves some fields unassigned by this call.
752 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
, &end_minute
);
754 time_t tt
= now
.sec();
// Take today's local date and overwrite the hour and minute with the
// configured start of the window.
756 localtime_r(&tt
, &bdt
);
757 bdt
.tm_hour
= start_hour
;
758 bdt
.tm_min
= start_minute
;
// `nt` is presumably the epoch time of today's window start (its assignment
// is not visible); the result is the distance from `tt` to that time plus 24
// hours.
762 return (nt
+24*60*60 - tt
);