5 #include <boost/algorithm/string/split.hpp>
6 #include <boost/algorithm/string.hpp>
8 #include "common/Formatter.h"
9 #include <common/errno.h>
10 #include "auth/Crypto.h"
11 #include "cls/rgw/cls_rgw_client.h"
12 #include "cls/lock/cls_lock_client.h"
13 #include "rgw_common.h"
14 #include "rgw_bucket.h"
// Logging configuration macros: dout_context is defined as g_ceph_context and
// dout_subsys as ceph_subsys_rgw. Presumably these are consumed by the
// dout()/ldout() logging calls used in this file — confirm against the
// logging headers.
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw
// Array of C strings named LC_STATUS. NOTE(review): the initializer entries
// and the closing brace are not visible in this listing.
20 const char* LC_STATUS
[] = {
// File-scope using-directive for the librados namespace.
28 using namespace librados
;
// LCRule::validate: sanity-checks a single lifecycle rule.
// The visible conditions test, in order:
//   - the rule id is longer than MAX_ID_LEN;
//   - no action is configured at all (expiration, noncurrent expiration and
//     multipart expiration are all empty, and dm_expiration is not set);
//   - a configured expiration / noncurrent expiration / multipart expiration
//     has a day count <= 0.
// NOTE(review): the statements executed inside each branch and the final
// return are not visible in this listing; presumably each branch rejects the
// rule — confirm.
30 bool LCRule::validate()
// Id too long.
32 if (id
.length() > MAX_ID_LEN
) {
// Rule carries no action of any kind.
35 else if(expiration
.empty() && noncur_expiration
.empty() && mp_expiration
.empty() && !dm_expiration
) {
// Current-version expiration present but with a non-positive day count.
38 else if (!expiration
.empty() && expiration
.get_days() <= 0) {
// Noncurrent-version expiration present but with a non-positive day count.
41 else if (!noncur_expiration
.empty() && noncur_expiration
.get_days() <=0) {
// Multipart expiration present but with a non-positive day count.
44 else if (!mp_expiration
.empty() && mp_expiration
.get_days() <= 0) {
// RGWLifecycleConfiguration::add_rule: stores a copy of *rule in rule_map,
// keyed by the id obtained through rule->get_id(id).
// NOTE(review): the declaration of `id` is not visible in this listing, and
// no validation of the rule is visible in this function.
50 void RGWLifecycleConfiguration::add_rule(LCRule
*rule
)
53 rule
->get_id(id
); // note that this will return false for groups, but that's ok, we won't search groups
// The rule is copied (dereferenced) into the map entry.
54 rule_map
.insert(pair
<string
, LCRule
>(id
, *rule
));
// RGWLifecycleConfiguration::_add_rule: translates a rule into an lc_op and
// inserts it into prefix_map keyed by rule->get_prefix().
// The day counts of the expiration, noncurrent expiration and multipart
// expiration are copied only when the respective field is non-empty;
// dm_expiration is copied unconditionally.
// NOTE(review): the declaration of `op`, the body of the "Enabled" branch and
// the use made of `ret` (the insert() result) are not visible here.
// Presumably the bool result reports whether the prefix was newly inserted —
// confirm.
57 bool RGWLifecycleConfiguration::_add_rule(LCRule
*rule
)
// Rule status is compared against the literal "Enabled".
60 if (rule
->get_status().compare("Enabled") == 0) {
63 if (!rule
->get_expiration().empty()) {
64 op
.expiration
= rule
->get_expiration().get_days();
66 if (!rule
->get_noncur_expiration().empty()) {
67 op
.noncur_expiration
= rule
->get_noncur_expiration().get_days();
69 if (!rule
->get_mp_expiration().empty()) {
70 op
.mp_expiration
= rule
->get_mp_expiration().get_days();
72 op
.dm_expiration
= rule
->get_dm_expiration();
// One lc_op per prefix: the prefix string is the map key.
73 auto ret
= prefix_map
.insert(pair
<string
, lc_op
>(rule
->get_prefix(), op
));
// RGWLifecycleConfiguration::check_and_add_rule: validates the rule, checks
// whether its id is already present in rule_map (per the inline comment, ids
// must be unique), then records the rule in rule_map and passes it to
// _add_rule().
// Returns -ERR_INVALID_REQUEST when _add_rule() returns false.
// NOTE(review): the declaration/assignment of `id` and the return values of
// the other branches (including the success path) are not visible in this
// listing.
77 int RGWLifecycleConfiguration::check_and_add_rule(LCRule
*rule
)
79 if (!rule
->validate()) {
84 if (rule_map
.find(id
) != rule_map
.end()) { //id shouldn't be the same
87 rule_map
.insert(pair
<string
, LCRule
>(id
, *rule
));
89 if (!_add_rule(rule
)) {
90 return -ERR_INVALID_REQUEST
;
// RGWLifecycleConfiguration::validate: pairwise scan of prefix_map looking
// for rules whose prefixes overlap. For a pair (cur, next) where next's
// prefix starts with cur's prefix, the visible test checks whether both rules
// define the same kind of action: both expiration > 0, or both
// noncur_expiration > 0, or both mp_expiration > 0.
// A map with fewer than two prefixes is handled by the early size check.
// NOTE(review): the continuation of the original comment below, the iterator
// advancement and all return statements are not visible in this listing.
95 //Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules
97 bool RGWLifecycleConfiguration::validate()
99 if (prefix_map
.size() < 2) {
102 auto cur_iter
= prefix_map
.begin();
103 while (cur_iter
!= prefix_map
.end()) {
104 auto next_iter
= cur_iter
;
106 while (next_iter
!= prefix_map
.end()) {
// Both prefixes are copied into local strings on every inner iteration.
107 string c_pre
= cur_iter
->first
;
108 string n_pre
= next_iter
->first
;
// True when n_pre begins with c_pre.
109 if (n_pre
.compare(0, c_pre
.length(), c_pre
) == 0) {
110 if ((cur_iter
->second
.expiration
> 0 && next_iter
->second
.expiration
> 0) ||
111 (cur_iter
->second
.noncur_expiration
> 0 && next_iter
->second
.noncur_expiration
> 0) ||
112 (cur_iter
->second
.mp_expiration
> 0 && next_iter
->second
.mp_expiration
> 0)) {
// RGWLC::LCWorker::entry: main loop of the lifecycle worker (presumably the
// thread entry point — confirm). Each iteration, as far as visible:
//   - takes the current time and, if should_work() allows it, runs
//     lc->process(), with log lines before and after and an error log that
//     includes the return code r;
//   - checks lc->going_down();
//   - computes the delay until the next run via schedule_next_start_time(),
//     logs the resulting wall-clock time, and waits on `cond` for that many
//     seconds.
// The loop repeats while lc->going_down() is false.
// NOTE(review): the `do {`, the condition guarding the error log, the action
// taken when going_down() is true, the declaration of `buf` and the locking
// around WaitInterval() are not visible in this listing.
126 void *RGWLC::LCWorker::entry() {
128 utime_t start
= ceph_clock_now();
129 if (should_work(start
)) {
130 dout(5) << "life cycle: start" << dendl
;
131 int r
= lc
->process();
133 dout(0) << "ERROR: do life cycle process() returned error r=" << r
<< dendl
;
135 dout(5) << "life cycle: stop" << dendl
;
137 if (lc
->going_down())
// `end` is sampled after the processing pass; the sleep is computed from it.
140 utime_t end
= ceph_clock_now();
141 int secs
= schedule_next_start_time(start
, end
);
142 time_t next_time
= end
+ secs
;
// ctime_r() formats next_time into `buf` for the log line only.
144 char *nt
= ctime_r(&next_time
, buf
);
145 dout(5) << "schedule life cycle next start time: " << nt
<<dendl
;
148 cond
.WaitInterval(lock
, utime_t(secs
, 0));
150 } while (!lc
->going_down());
// RGWLC::initialize: sets up the lifecycle shard object names and a random
// cookie buffer.
//   - max_objs is read from the rgw_lc_max_objs config value and capped at
//     HASH_PRIME;
//   - obj_names is allocated with new[] and each element is set to
//     lc_oid_prefix followed by "." and the element index;
//   - gen_rand_alphanumeric() is called on cookie_buf with length COOKIE_LEN.
// NOTE(review): the assignments of cct/store from the parameters, the
// declaration of `buf` and what is done with cookie_buf afterwards are not
// visible in this listing. obj_names is a raw new[] allocation; presumably it
// is released in finalize() — confirm.
155 void RGWLC::initialize(CephContext
*_cct
, RGWRados
*_store
) {
158 max_objs
= cct
->_conf
->rgw_lc_max_objs
;
159 if (max_objs
> HASH_PRIME
)
160 max_objs
= HASH_PRIME
;
162 obj_names
= new string
[max_objs
];
164 for (int i
= 0; i
< max_objs
; i
++) {
165 obj_names
[i
] = lc_oid_prefix
;
// Suffix is ".<i>", formatted with a 32-byte limit.
167 snprintf(buf
, 32, ".%d", i
);
168 obj_names
[i
].append(buf
);
171 #define COOKIE_LEN 16
// One extra byte beyond COOKIE_LEN; presumably for a terminating NUL — confirm.
172 char cookie_buf
[COOKIE_LEN
+ 1];
173 gen_rand_alphanumeric(cct
, cookie_buf
, sizeof(cookie_buf
) - 1);
// RGWLC::finalize: NOTE(review): only the signature is visible in this
// listing; the body is not shown, so its behavior is not documented here.
177 void RGWLC::finalize()
// RGWLC::if_already_run_today: decides whether the run recorded in
// start_date still counts as having happened "today".
// start_date is broken down into local time with localtime_r(). When
// rgw_lc_debug_interval > 0 a debug shortcut applies (see the inline
// comment). Otherwise begin_of_day is rebuilt from `bdt` with mktime() and
// compared against the current time using a 24-hour window.
// NOTE(review): the declarations of `bdt`/`begin_of_day`, any adjustments
// made to `bdt` before mktime(), and all return statements are not visible
// in this listing.
182 bool RGWLC::if_already_run_today(time_t& start_date
)
186 utime_t now
= ceph_clock_now();
187 localtime_r(&start_date
, &bdt
);
189 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
190 /* We're debugging, so say we can run */
197 begin_of_day
= mktime(&bdt
);
// 24*60*60 = number of seconds in one day.
198 if (now
- begin_of_day
< 24*60*60)
// RGWLC::bucket_lc_prepare: walks the entries of the lifecycle shard object
// obj_names[index] in pages of at most MAX_LC_LIST_ENTRIES and rewrites each
// entry with status lc_uninitial via cls_rgw_lc_set_entry().
// `marker` is assigned the key of each entry processed, so the following
// cls_rgw_lc_list() call continues after it; the loop runs while the listing
// is non-empty.
// NOTE(review): the declaration of `marker`, the `do {`, the checks on `ret`
// guarding the error log, and the return statements are not visible in this
// listing.
204 int RGWLC::bucket_lc_prepare(int index
)
206 map
<string
, int > entries
;
210 #define MAX_LC_LIST_ENTRIES 100
212 int ret
= cls_rgw_lc_list(store
->lc_pool_ctx
, obj_names
[index
], marker
, MAX_LC_LIST_ENTRIES
, entries
);
215 map
<string
, int>::iterator iter
;
216 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
// Same key, status reset to lc_uninitial.
217 pair
<string
, int > entry(iter
->first
, lc_uninitial
);
218 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
220 dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names
[index
] << dendl
;
223 marker
= iter
->first
;
225 } while (!entries
.empty());
// RGWLC::obj_has_expired: returns true when `timediff` is at least the
// threshold `cmp` derived from `days`.
// In debug mode (rgw_lc_debug_interval > 0) each rgw_lc_debug_interval
// seconds counts as one day, so cmp = days * rgw_lc_debug_interval.
// NOTE(review): the declaration of `cmp` and its computation in the normal
// (non-debug) branch are not visible in this listing. timediff is presumably
// in seconds, as the callers pass a difference of two times — confirm.
230 bool RGWLC::obj_has_expired(double timediff
, int days
)
233 if (cct
->_conf
->rgw_lc_debug_interval
<= 0) {
234 /* Normal case, run properly */
237 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
238 cmp
= days
*cct
->_conf
->rgw_lc_debug_interval
;
241 return (timediff
>= cmp
);
// RGWLC::remove_expired_obj: deletes one object from the given bucket and
// returns the result of the delete call.
// Two paths are visible:
//   - rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
//   - clear obj_key.instance, then store->delete_obj(...) passing the
//     bucket's versioning status.
// NOTE(review): the condition selecting between the two paths is not visible
// in this listing; presumably it is `remove_indeed` — confirm.
// obj_key is taken by value, so clearing its instance does not modify the
// caller's key.
244 int RGWLC::remove_expired_obj(RGWBucketInfo
& bucket_info
, rgw_obj_key obj_key
, bool remove_indeed
)
247 return rgw_remove_object(store
, bucket_info
, bucket_info
.bucket
, obj_key
);
249 obj_key
.instance
.clear();
250 RGWObjectCtx
rctx(store
);
251 rgw_obj
obj(bucket_info
.bucket
, obj_key
);
252 return store
->delete_obj(rctx
, bucket_info
, obj
, bucket_info
.versioning_status());
// RGWLC::handle_multipart_expiration: aborts multipart uploads that have
// outlived a rule's mp_expiration.
// For each entry of prefix_map the visible condition singles out rules whose
// status is unset or whose mp_expiration is <= 0 (presumably these are
// skipped — confirm). For the others, the bucket's multipart namespace
// (RGW_OBJ_NS_MULTIPART, filtered through MultipartMetaFilter, versions not
// listed) is listed under the rule's prefix in pages of up to 1000 entries.
// Every entry for which obj_has_expired() is true on (now - mtime) and
// mp_expiration is parsed with mp_obj.from_meta() and passed to
// abort_multipart_upload().
// A negative result other than -ERR_NO_SUCH_UPLOAD is logged as an error.
// NOTE(review): the declarations of ret/is_truncated/mp_obj, the `do {`, and
// the statements inside the skip/error branches (including returns) are not
// visible in this listing.
256 int RGWLC::handle_multipart_expiration(RGWRados::Bucket
*target
, const map
<string
, lc_op
>& prefix_map
)
258 MultipartMetaFilter mp_filter
;
259 vector
<rgw_bucket_dir_entry
> objs
;
263 RGWBucketInfo
& bucket_info
= target
->get_bucket_info();
264 RGWRados::Bucket::List
list_op(target
);
265 list_op
.params
.list_versions
= false;
266 list_op
.params
.ns
= RGW_OBJ_NS_MULTIPART
;
267 list_op
.params
.filter
= &mp_filter
;
268 for (auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
269 if (!prefix_iter
->second
.status
|| prefix_iter
->second
.mp_expiration
<= 0) {
272 list_op
.params
.prefix
= prefix_iter
->first
;
// Resume each page from where the previous listing stopped.
275 list_op
.params
.marker
= list_op
.get_next_marker();
276 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
278 if (ret
== (-ENOENT
))
280 ldout(cct
, 0) << "ERROR: store->list_objects():" <<dendl
;
284 utime_t now
= ceph_clock_now();
285 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
286 if (obj_has_expired(now
- ceph::real_clock::to_time_t(obj_iter
->meta
.mtime
), prefix_iter
->second
.mp_expiration
)) {
287 rgw_obj_key
key(obj_iter
->key
);
288 if (!mp_obj
.from_meta(key
.name
)) {
291 RGWObjectCtx
rctx(store
);
292 ret
= abort_multipart_upload(store
, cct
, &rctx
, bucket_info
, mp_obj
);
// -ERR_NO_SUCH_UPLOAD is excluded from the error condition.
293 if (ret
< 0 && ret
!= -ERR_NO_SUCH_UPLOAD
) {
294 ldout(cct
, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret
<<dendl
;
299 } while(is_truncated
);
// RGWLC::bucket_lc_process: applies a bucket's lifecycle configuration to
// the objects of that bucket.
// shard_id is split on ':' and the first three fields are used as tenant,
// bucket name and bucket id.
// Steps visible in this listing:
//   1. load the bucket info and attributes, then compare the stored bucket
//      id with the one taken from shard_id;
//   2. look up the RGW_ATTR_LC attribute; a buffer::error during decoding of
//      the lifecycle config is caught and logged;
//   3. non-versioned bucket: for each prefix rule, list objects under the
//      prefix in pages of up to 1000; for each object that obj_has_expired()
//      reports as expired, re-read its state, compare mtimes, and call
//      remove_expired_obj(..., true);
//   4. versioned bucket: for each prefix rule, walk the listed versions,
//      choosing between `expiration` (current versions / delete markers) and
//      `noncur_expiration` (other versions), and call remove_expired_obj()
//      with the computed remove_indeed flag;
//   5. finally run handle_multipart_expiration() on the same prefix map.
// NOTE(review): many lines of this function are missing from this listing
// (`do {`, try, error checks on `ret`, continue/break/return statements and
// several declarations such as is_truncated, state, expiration), so the
// exact control flow cannot be confirmed from here.
// NOTE(review): no check that the split produced at least three fields is
// visible before result[0..2] are read — confirm one exists.
304 int RGWLC::bucket_lc_process(string
& shard_id
)
306 RGWLifecycleConfiguration
config(cct
);
307 RGWBucketInfo bucket_info
;
308 map
<string
, bufferlist
> bucket_attrs
;
309 string next_marker
, no_ns
, list_versions
;
311 vector
<rgw_bucket_dir_entry
> objs
;
312 RGWObjectCtx
obj_ctx(store
);
313 vector
<std::string
> result
;
// shard_id fields are separated by ':'.
314 boost::split(result
, shard_id
, boost::is_any_of(":"));
315 string bucket_tenant
= result
[0];
316 string bucket_name
= result
[1];
317 string bucket_id
= result
[2];
318 int ret
= store
->get_bucket_info(obj_ctx
, bucket_tenant
, bucket_name
, bucket_info
, NULL
, &bucket_attrs
);
320 ldout(cct
, 0) << "LC:get_bucket_info failed" << bucket_name
<<dendl
;
// compare() yields 0 when the stored bucket id equals the id from shard_id.
324 ret
= bucket_info
.bucket
.bucket_id
.compare(bucket_id
) ;
326 ldout(cct
, 0) << "LC:old bucket id find, should be delete" << bucket_name
<<dendl
;
330 RGWRados::Bucket
target(store
, bucket_info
);
331 RGWRados::Bucket::List
list_op(&target
);
// The lifecycle configuration is stored in the RGW_ATTR_LC bucket attribute.
333 map
<string
, bufferlist
>::iterator aiter
= bucket_attrs
.find(RGW_ATTR_LC
);
334 if (aiter
== bucket_attrs
.end())
337 bufferlist::iterator
iter(&aiter
->second
);
340 } catch (const buffer::error
& e
) {
341 ldout(cct
, 0) << __func__
<< "decode life cycle config failed" << dendl
;
345 map
<string
, lc_op
>& prefix_map
= config
.get_prefix_map();
346 list_op
.params
.list_versions
= bucket_info
.versioned();
// ---- Non-versioned bucket path ----
347 if (!bucket_info
.versioned()) {
348 for(auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
349 if (!prefix_iter
->second
.status
|| prefix_iter
->second
.expiration
<=0) {
352 list_op
.params
.prefix
= prefix_iter
->first
;
355 list_op
.params
.marker
= list_op
.get_next_marker();
356 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
359 if (ret
== (-ENOENT
))
361 ldout(cct
, 0) << "ERROR: store->list_objects():" <<dendl
;
365 utime_t now
= ceph_clock_now();
367 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
368 rgw_obj_key
key(obj_iter
->key
);
// Entries with a non-empty namespace are singled out here.
370 if (!key
.ns
.empty()) {
374 if (obj_has_expired(now
- ceph::real_clock::to_time_t(obj_iter
->meta
.mtime
), prefix_iter
->second
.expiration
)) {
375 RGWObjectCtx
rctx(store
);
376 rgw_obj
obj(bucket_info
.bucket
, key
);
// NOTE(review): this `ret` is declared in the inner scope and shadows the
// function-level `ret`.
378 int ret
= store
->get_obj_state(&rctx
, bucket_info
, obj
, &state
, false);
382 if (state
->mtime
!= obj_iter
->meta
.mtime
)//Check mtime again to avoid delete a recently update object as much as possible
384 ret
= remove_expired_obj(bucket_info
, obj_iter
->key
, true);
386 ldout(cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
388 ldout(cct
, 10) << "DELETED:" << bucket_name
<< ":" << key
<< dendl
;
392 } while (is_truncated
);
// ---- Versioned bucket path ----
395 //bucket versioning is enabled or suspended
396 rgw_obj_key pre_marker
;
397 for(auto prefix_iter
= prefix_map
.begin(); prefix_iter
!= prefix_map
.end(); ++prefix_iter
) {
398 if (!prefix_iter
->second
.status
|| (prefix_iter
->second
.expiration
<= 0
399 && prefix_iter
->second
.noncur_expiration
<= 0 && !prefix_iter
->second
.dm_expiration
)) {
// When this prefix starts with the previous map entry's prefix, the listing
// marker is restored from pre_marker.
402 if (prefix_iter
!= prefix_map
.begin() &&
403 (prefix_iter
->first
.compare(0, prev(prefix_iter
)->first
.length(), prev(prefix_iter
)->first
) == 0)) {
404 list_op
.next_marker
= pre_marker
;
406 pre_marker
= list_op
.get_next_marker();
408 list_op
.params
.prefix
= prefix_iter
->first
;
// pre_obj keeps the last entry of the previous page; it is used below as the
// predecessor of the first entry of the next page.
409 rgw_bucket_dir_entry pre_obj
;
412 pre_obj
= objs
.back();
415 list_op
.params
.marker
= list_op
.get_next_marker();
416 ret
= list_op
.list_objects(1000, &objs
, NULL
, &is_truncated
);
419 if (ret
== (-ENOENT
))
421 ldout(cct
, 0) << "ERROR: store->list_objects():" <<dendl
;
425 utime_t now
= ceph_clock_now();
426 ceph::real_time mtime
;
427 bool remove_indeed
= true;
429 bool skip_expiration
;
430 for (auto obj_iter
= objs
.begin(); obj_iter
!= objs
.end(); ++obj_iter
) {
431 skip_expiration
= false;
432 if (obj_iter
->is_current()) {
433 if (prefix_iter
->second
.expiration
<= 0 && !prefix_iter
->second
.dm_expiration
) {
436 if (obj_iter
->is_delete_marker()) {
437 if ((obj_iter
+ 1)==objs
.end()) {
439 //deal with it in next round because we can't judge whether this marker is the only version
440 list_op
.next_marker
= obj_iter
->key
;
443 } else if (obj_iter
->key
.name
.compare((obj_iter
+ 1)->key
.name
) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
446 skip_expiration
= prefix_iter
->second
.dm_expiration
;
447 remove_indeed
= true; //we should remove the delete marker if it's the only version
449 remove_indeed
= false;
451 mtime
= obj_iter
->meta
.mtime
;
452 expiration
= prefix_iter
->second
.expiration
;
453 if (!skip_expiration
&& expiration
<= 0) {
457 if (prefix_iter
->second
.noncur_expiration
<=0) {
460 remove_indeed
= true;
// The mtime used here is that of the preceding list entry (pre_obj for the
// first entry of a page), not the entry's own mtime.
461 mtime
= (obj_iter
== objs
.begin())?pre_obj
.meta
.mtime
:(obj_iter
- 1)->meta
.mtime
;
462 expiration
= prefix_iter
->second
.noncur_expiration
;
464 if (skip_expiration
|| obj_has_expired(now
- ceph::real_clock::to_time_t(mtime
), expiration
)) {
465 if (obj_iter
->is_visible()) {
466 RGWObjectCtx
rctx(store
);
467 rgw_obj
obj(bucket_info
.bucket
, obj_iter
->key
);
469 int ret
= store
->get_obj_state(&rctx
, bucket_info
, obj
, &state
, false);
473 if (state
->mtime
!= obj_iter
->meta
.mtime
)//Check mtime again to avoid delete a recently update object as much as possible
476 ret
= remove_expired_obj(bucket_info
, obj_iter
->key
, remove_indeed
);
478 ldout(cct
, 0) << "ERROR: remove_expired_obj " << dendl
;
480 ldout(cct
, 10) << "DELETED:" << bucket_name
<< ":" << obj_iter
->key
<< dendl
;
484 } while (is_truncated
);
// ---- Multipart uploads ----
488 ret
= handle_multipart_expiration(&target
, prefix_map
);
// RGWLC::bucket_lc_post: records the outcome of processing one bucket entry
// in the shard object obj_names[index].
// An exclusive cls lock named lc_index_lock_name is taken on the shard
// object, using `cookie` and a duration of rgw_lc_lock_max_time seconds.
// Then, depending on `result`:
//   - result == -ENOENT: the entry is removed via cls_rgw_lc_rm_entry();
//   - result < 0: entry.second is set to lc_failed;
//   - entry.second is set to lc_complete on the remaining path (presumably
//     the else branch — confirm);
// and the entry is written back with cls_rgw_lc_set_entry() before the lock
// is released.
// NOTE(review): max_lock_sec is not referenced in any visible line. The
// retry loop suggested by the -EBUSY log message, the checks on `ret`
// guarding the error logs, and the return statements are not visible in this
// listing.
493 int RGWLC::bucket_lc_post(int index
, int max_lock_sec
, pair
<string
, int >& entry
, int& result
)
495 utime_t
lock_duration(cct
->_conf
->rgw_lc_lock_max_time
, 0);
497 rados::cls::lock::Lock
l(lc_index_lock_name
);
498 l
.set_cookie(cookie
);
499 l
.set_duration(lock_duration
);
502 int ret
= l
.lock_exclusive(&store
->lc_pool_ctx
, obj_names
[index
]);
503 if (ret
== -EBUSY
) { /* already locked by another lc processor */
504 dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names
[index
] << dendl
;
510 dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names
[index
] << dendl
;
511 if (result
== -ENOENT
) {
512 ret
= cls_rgw_lc_rm_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
514 dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names
[index
] << dendl
;
517 } else if (result
< 0) {
518 entry
.second
= lc_failed
;
520 entry
.second
= lc_complete
;
523 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
525 dout(0) << "RGWLC::process() failed to set entry " << obj_names
[index
] << dendl
;
528 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
529 dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names
[index
] << dendl
;
// RGWLC::list_lc_progress: collects lifecycle entries from the shard objects
// into *progress_map.
// progress_map is cleared first. For each shard from `index` up to max_objs,
// cls_rgw_lc_list() is called with the caller's marker and max_entries, and
// every returned (key, status) pair is inserted into the map.
// A -ENOENT result is logged at level 10 as an unfound lc object.
// NOTE(review): the declaration/initial value of `index`, the handling of
// other error codes and the return statements are not visible in this
// listing. max_entries is passed to each per-shard call; no total cap across
// shards is visible here.
534 int RGWLC::list_lc_progress(const string
& marker
, uint32_t max_entries
, map
<string
, int> *progress_map
)
537 progress_map
->clear();
538 for(; index
<max_objs
; index
++) {
539 map
<string
, int > entries
;
540 int ret
= cls_rgw_lc_list(store
->lc_pool_ctx
, obj_names
[index
], marker
, max_entries
, entries
);
542 if (ret
== -ENOENT
) {
543 dout(10) << __func__
<< " ignoring unfound lc object="
544 << obj_names
[index
] << dendl
;
550 map
<string
, int>::iterator iter
;
551 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
552 progress_map
->insert(*iter
);
// NOTE(review): the signature line of this function is missing from this
// listing. Judging by the body and by the lc->process() call in the worker,
// it is presumably the no-argument RGWLC::process() driver — confirm.
// Visible behavior: max_secs is read from rgw_lc_lock_max_time, a random
// `start` value is obtained with get_random_bytes(), and process(index,
// max_secs) is called once for each of the max_objs shards, visiting them in
// the order (i + start) % max_objs.
// The declaration of `start`, the checks on `ret` and the return statements
// are not visible here.
560 int max_secs
= cct
->_conf
->rgw_lc_lock_max_time
;
563 int ret
= get_random_bytes((char *)&start
, sizeof(start
));
567 for (int i
= 0; i
< max_objs
; i
++) {
// Rotate the starting shard by the random offset.
568 int index
= (i
+ start
) % max_objs
;
569 ret
= process(index
, max_secs
);
// RGWLC::process(index, max_lock_secs): processes an entry of one lifecycle
// shard object, obj_names[index].
// Sequence visible in this listing:
//   1. take the exclusive cls lock lc_index_lock_name with a duration of
//      max_lock_secs seconds (-EBUSY means another lc processor holds it);
//   2. read the shard head; when if_already_run_today(head.start_date) is
//      false, set head.start_date to now and call bucket_lc_prepare(index);
//   3. fetch the next entry after head.marker, set its status to
//      lc_processing, write it back, and store its key as the new
//      head.marker via cls_rgw_lc_put_head();
//   4. release the lock, run bucket_lc_process() on the entry key, and pass
//      the result to bucket_lc_post().
// NOTE(review): the enclosing loop (if any), the handling of
// max_lock_secs <= 0, the checks on `ret` guarding each error log, the
// action taken when the entry key is empty, and the return statements are
// not visible in this listing. A second unlock() appears at the end;
// presumably it belongs to an exit path — confirm.
577 int RGWLC::process(int index
, int max_lock_secs
)
579 rados::cls::lock::Lock
l(lc_index_lock_name
);
581 utime_t now
= ceph_clock_now();
582 pair
<string
, int > entry
;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
583 if (max_lock_secs
<= 0)
586 utime_t
time(max_lock_secs
, 0);
587 l
.set_duration(time
);
589 int ret
= l
.lock_exclusive(&store
->lc_pool_ctx
, obj_names
[index
]);
590 if (ret
== -EBUSY
) { /* already locked by another lc processor */
591 dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names
[index
] << dendl
;
599 cls_rgw_lc_obj_head head
;
600 ret
= cls_rgw_lc_get_head(store
->lc_pool_ctx
, obj_names
[index
], head
);
602 dout(0) << "RGWLC::process() failed to get obj head " << obj_names
[index
] << ret
<< dendl
;
// First pass of the day for this shard: stamp the head and reset entries.
606 if(!if_already_run_today(head
.start_date
)) {
607 head
.start_date
= now
;
609 ret
= bucket_lc_prepare(index
);
611 dout(0) << "RGWLC::process() failed to update lc object " << obj_names
[index
] << ret
<< dendl
;
616 ret
= cls_rgw_lc_get_next_entry(store
->lc_pool_ctx
, obj_names
[index
], head
.marker
, entry
);
618 dout(0) << "RGWLC::process() failed to get obj entry " << obj_names
[index
] << dendl
;
622 if (entry
.first
.empty())
625 entry
.second
= lc_processing
;
626 ret
= cls_rgw_lc_set_entry(store
->lc_pool_ctx
, obj_names
[index
], entry
);
628 dout(0) << "RGWLC::process() failed to set obj entry " << obj_names
[index
] << entry
.first
<< entry
.second
<< dendl
;
632 head
.marker
= entry
.first
;
633 ret
= cls_rgw_lc_put_head(store
->lc_pool_ctx
, obj_names
[index
], head
);
635 dout(0) << "RGWLC::process() failed to put head " << obj_names
[index
] << dendl
;
// The shard lock is released before the bucket itself is processed.
638 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
639 ret
= bucket_lc_process(entry
.first
);
640 bucket_lc_post(index
, max_lock_secs
, entry
, ret
);
643 l
.unlock(&store
->lc_pool_ctx
, obj_names
[index
]);
// RGWLC::start_processor: allocates an LCWorker bound to this RGWLC instance
// and calls create("lifecycle_thr") on it (presumably starting the worker
// thread under that name — confirm).
// NOTE(review): `worker` is a raw new allocation; its release is not visible
// in this listing.
650 void RGWLC::start_processor()
652 worker
= new LCWorker(cct
, this);
653 worker
->create("lifecycle_thr");
// RGWLC::stop_processor: NOTE(review): only the signature is visible in this
// listing; the body is not shown, so its behavior is not documented here.
656 void RGWLC::stop_processor()
// RGWLC::LCWorker::stop: acquires `lock` through a scoped Mutex::Locker.
// NOTE(review): the statements executed while the lock is held are not
// visible in this listing; presumably the worker's condition variable is
// signalled — confirm.
667 void RGWLC::LCWorker::stop()
669 Mutex::Locker
l(lock
);
// RGWLC::going_down: NOTE(review): only the signature is visible in this
// listing; the body is not shown, so its behavior is not documented here.
673 bool RGWLC::going_down()
// RGWLC::LCWorker::should_work: decides whether lifecycle processing may run
// at time `now`.
// The rgw_lifecycle_work_time config string is parsed with the pattern
// "%d:%d-%d:%d" into start/end hour and minute, and `now` is converted to
// local time. When rgw_lc_debug_interval > 0 the debug branch applies (see
// the inline comment). Otherwise the current minute-of-day must lie within
// [start, end], both bounds inclusive.
// NOTE(review): no check of the sscanf() result is visible, so a malformed
// work-time string may leave some of the four values unset — confirm.
// NOTE(review): with this comparison a window that wraps past midnight
// (start later than end) never matches.
// The declarations of the hour/minute variables and `bdt`, and the return
// statements, are not visible in this listing.
678 bool RGWLC::LCWorker::should_work(utime_t
& now
)
684 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
685 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
, &end_minute
);
687 time_t tt
= now
.sec();
688 localtime_r(&tt
, &bdt
);
690 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
691 /* We're debugging, so say we can run */
// Both sides are expressed in minutes since local midnight.
693 } else if ((bdt
.tm_hour
*60 + bdt
.tm_min
>= start_hour
*60 + start_minute
) &&
694 (bdt
.tm_hour
*60 + bdt
.tm_min
<= end_hour
*60 + end_minute
)) {
// RGWLC::LCWorker::schedule_next_start_time: computes a delay in seconds
// until the next lifecycle run.
// Debug mode (rgw_lc_debug_interval > 0): secs is computed as
// start + rgw_lc_debug_interval - now.
// Otherwise: the rgw_lifecycle_work_time string is parsed with the pattern
// "%d:%d-%d:%d", `now` is converted to local time, the hour and minute of
// the broken-down time are replaced by the configured start time, and the
// function returns (nt + 24*60*60 - tt), where tt is now in seconds.
// NOTE(review): the computation of `nt` is not visible in this listing;
// presumably it is mktime(&bdt) — confirm. What is done with `secs` in the
// debug branch, the declarations of the hour/minute variables and `bdt`, and
// any other field resets (e.g. seconds) are also not visible. No check of
// the sscanf() result is visible.
702 int RGWLC::LCWorker::schedule_next_start_time(utime_t
&start
, utime_t
& now
)
704 if (cct
->_conf
->rgw_lc_debug_interval
> 0) {
705 int secs
= start
+ cct
->_conf
->rgw_lc_debug_interval
- now
;
715 string worktime
= cct
->_conf
->rgw_lifecycle_work_time
;
716 sscanf(worktime
.c_str(),"%d:%d-%d:%d",&start_hour
, &start_minute
, &end_hour
, &end_minute
);
718 time_t tt
= now
.sec();
720 localtime_r(&tt
, &bdt
);
721 bdt
.tm_hour
= start_hour
;
722 bdt
.tm_min
= start_minute
;
// 24*60*60 = one day in seconds.
726 return (nt
+24*60*60 - tt
);