]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_lc.cc
update sources to v12.2.1
[ceph.git] / ceph / src / rgw / rgw_lc.cc
1 #include <string.h>
2 #include <iostream>
3 #include <map>
4
5 #include <boost/algorithm/string/split.hpp>
6 #include <boost/algorithm/string.hpp>
7
8 #include "common/Formatter.h"
9 #include <common/errno.h>
10 #include "auth/Crypto.h"
11 #include "cls/rgw/cls_rgw_client.h"
12 #include "cls/lock/cls_lock_client.h"
13 #include "rgw_common.h"
14 #include "rgw_bucket.h"
15 #include "rgw_lc.h"
16
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw
19
20 const char* LC_STATUS[] = {
21 "UNINITIAL",
22 "PROCESSING",
23 "FAILED",
24 "COMPLETE"
25 };
26
27 using namespace std;
28 using namespace librados;
29
30 bool LCRule::valid()
31 {
32 if (id.length() > MAX_ID_LEN) {
33 return false;
34 }
35 else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration) {
36 return false;
37 }
38 else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
39 return false;
40 }
41 return true;
42 }
43
44 void RGWLifecycleConfiguration::add_rule(LCRule *rule)
45 {
46 string id;
47 rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups
48 rule_map.insert(pair<string, LCRule>(id, *rule));
49 }
50
51 bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
52 {
53 lc_op op;
54 if (rule->get_status().compare("Enabled") == 0) {
55 op.status = true;
56 }
57 if (rule->get_expiration().has_days()) {
58 op.expiration = rule->get_expiration().get_days();
59 }
60 if (rule->get_expiration().has_date()) {
61 op.expiration_date = ceph::from_iso_8601(rule->get_expiration().get_date());
62 }
63 if (rule->get_noncur_expiration().has_days()) {
64 op.noncur_expiration = rule->get_noncur_expiration().get_days();
65 }
66 if (rule->get_mp_expiration().has_days()) {
67 op.mp_expiration = rule->get_mp_expiration().get_days();
68 }
69 op.dm_expiration = rule->get_dm_expiration();
70
71 std::string prefix;
72 if (rule->get_filter().has_prefix()){
73 prefix = rule->get_filter().get_prefix();
74 } else {
75 prefix = rule->get_prefix();
76 }
77 auto ret = prefix_map.emplace(std::move(prefix), std::move(op));
78 return ret.second;
79 }
80
81 int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
82 {
83 if (!rule->valid()) {
84 return -EINVAL;
85 }
86 string id;
87 rule->get_id(id);
88 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
89 return -EINVAL;
90 }
91 rule_map.insert(pair<string, LCRule>(id, *rule));
92
93 if (!_add_rule(rule)) {
94 return -ERR_INVALID_REQUEST;
95 }
96 return 0;
97 }
98
99 bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
100 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
101 (second.expiration > 0 || second.expiration_date != boost::none)) {
102 return true;
103 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
104 return true;
105 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
106 return true;
107 } else {
108 return false;
109 }
110 }
111
112 //Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules
113 //define same action.
114 bool RGWLifecycleConfiguration::valid()
115 {
116 if (prefix_map.size() < 2) {
117 return true;
118 }
119 auto cur_iter = prefix_map.begin();
120 while (cur_iter != prefix_map.end()) {
121 auto next_iter = cur_iter;
122 ++next_iter;
123 while (next_iter != prefix_map.end()) {
124 string c_pre = cur_iter->first;
125 string n_pre = next_iter->first;
126 if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
127 if (has_same_action(cur_iter->second, next_iter->second)) {
128 return false;
129 } else {
130 ++next_iter;
131 }
132 } else {
133 break;
134 }
135 }
136 ++cur_iter;
137 }
138 return true;
139 }
140
141 void *RGWLC::LCWorker::entry() {
142 do {
143 utime_t start = ceph_clock_now();
144 if (should_work(start)) {
145 dout(5) << "life cycle: start" << dendl;
146 int r = lc->process();
147 if (r < 0) {
148 dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
149 }
150 dout(5) << "life cycle: stop" << dendl;
151 }
152 if (lc->going_down())
153 break;
154
155 utime_t end = ceph_clock_now();
156 int secs = schedule_next_start_time(start, end);
157 utime_t next;
158 next.set_from_double(end + secs);
159
160 dout(5) << "schedule life cycle next start time: " << rgw_to_asctime(next) <<dendl;
161
162 lock.Lock();
163 cond.WaitInterval(lock, utime_t(secs, 0));
164 lock.Unlock();
165 } while (!lc->going_down());
166
167 return NULL;
168 }
169
170 void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {
171 cct = _cct;
172 store = _store;
173 max_objs = cct->_conf->rgw_lc_max_objs;
174 if (max_objs > HASH_PRIME)
175 max_objs = HASH_PRIME;
176
177 obj_names = new string[max_objs];
178
179 for (int i = 0; i < max_objs; i++) {
180 obj_names[i] = lc_oid_prefix;
181 char buf[32];
182 snprintf(buf, 32, ".%d", i);
183 obj_names[i].append(buf);
184 }
185
186 #define COOKIE_LEN 16
187 char cookie_buf[COOKIE_LEN + 1];
188 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
189 cookie = cookie_buf;
190 }
191
192 void RGWLC::finalize()
193 {
194 delete[] obj_names;
195 }
196
197 bool RGWLC::if_already_run_today(time_t& start_date)
198 {
199 struct tm bdt;
200 time_t begin_of_day;
201 utime_t now = ceph_clock_now();
202 localtime_r(&start_date, &bdt);
203
204 if (cct->_conf->rgw_lc_debug_interval > 0) {
205 /* We're debugging, so say we can run */
206 return false;
207 }
208
209 bdt.tm_hour = 0;
210 bdt.tm_min = 0;
211 bdt.tm_sec = 0;
212 begin_of_day = mktime(&bdt);
213 if (now - begin_of_day < 24*60*60)
214 return true;
215 else
216 return false;
217 }
218
219 int RGWLC::bucket_lc_prepare(int index)
220 {
221 map<string, int > entries;
222
223 string marker;
224
225 #define MAX_LC_LIST_ENTRIES 100
226 do {
227 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
228 if (ret < 0)
229 return ret;
230 map<string, int>::iterator iter;
231 for (iter = entries.begin(); iter != entries.end(); ++iter) {
232 pair<string, int > entry(iter->first, lc_uninitial);
233 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
234 if (ret < 0) {
235 dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl;
236 break;
237 }
238 marker = iter->first;
239 }
240 } while (!entries.empty());
241
242 return 0;
243 }
244
245 bool RGWLC::obj_has_expired(double timediff, int days)
246 {
247 double cmp;
248 if (cct->_conf->rgw_lc_debug_interval <= 0) {
249 /* Normal case, run properly */
250 cmp = days*24*60*60;
251 } else {
252 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
253 cmp = days*cct->_conf->rgw_lc_debug_interval;
254 }
255
256 return (timediff >= cmp);
257 }
258
259 int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed)
260 {
261 if (remove_indeed) {
262 return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
263 } else {
264 obj_key.instance.clear();
265 RGWObjectCtx rctx(store);
266 rgw_obj obj(bucket_info.bucket, obj_key);
267 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
268 }
269 }
270
271 int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map)
272 {
273 MultipartMetaFilter mp_filter;
274 vector<rgw_bucket_dir_entry> objs;
275 RGWMPObj mp_obj;
276 bool is_truncated;
277 int ret;
278 RGWBucketInfo& bucket_info = target->get_bucket_info();
279 RGWRados::Bucket::List list_op(target);
280 list_op.params.list_versions = false;
281 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
282 list_op.params.filter = &mp_filter;
283 for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
284 if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
285 continue;
286 }
287 list_op.params.prefix = prefix_iter->first;
288 do {
289 objs.clear();
290 list_op.params.marker = list_op.get_next_marker();
291 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
292 if (ret < 0) {
293 if (ret == (-ENOENT))
294 return 0;
295 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
296 return ret;
297 }
298
299 utime_t now = ceph_clock_now();
300 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
301 if (obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.mp_expiration)) {
302 rgw_obj_key key(obj_iter->key);
303 if (!mp_obj.from_meta(key.name)) {
304 continue;
305 }
306 RGWObjectCtx rctx(store);
307 ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
308 if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
309 ldout(cct, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret <<dendl;
310 return ret;
311 }
312 }
313 }
314 } while(is_truncated);
315 }
316 return 0;
317 }
318
319 int RGWLC::bucket_lc_process(string& shard_id)
320 {
321 RGWLifecycleConfiguration config(cct);
322 RGWBucketInfo bucket_info;
323 map<string, bufferlist> bucket_attrs;
324 string next_marker, no_ns, list_versions;
325 bool is_truncated;
326 vector<rgw_bucket_dir_entry> objs;
327 RGWObjectCtx obj_ctx(store);
328 vector<std::string> result;
329 boost::split(result, shard_id, boost::is_any_of(":"));
330 string bucket_tenant = result[0];
331 string bucket_name = result[1];
332 string bucket_id = result[2];
333 int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);
334 if (ret < 0) {
335 ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl;
336 return ret;
337 }
338
339 ret = bucket_info.bucket.bucket_id.compare(bucket_id) ;
340 if (ret !=0) {
341 ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl;
342 return -ENOENT;
343 }
344
345 RGWRados::Bucket target(store, bucket_info);
346 RGWRados::Bucket::List list_op(&target);
347
348 map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
349 if (aiter == bucket_attrs.end())
350 return 0;
351
352 bufferlist::iterator iter(&aiter->second);
353 try {
354 config.decode(iter);
355 } catch (const buffer::error& e) {
356 ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl;
357 return -1;
358 }
359
360 map<string, lc_op>& prefix_map = config.get_prefix_map();
361 list_op.params.list_versions = bucket_info.versioned();
362 if (!bucket_info.versioned()) {
363 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
364 if (!prefix_iter->second.status ||
365 (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) {
366 continue;
367 }
368 if (prefix_iter->second.expiration_date != boost::none &&
369 ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) {
370 continue;
371 }
372 list_op.params.prefix = prefix_iter->first;
373 do {
374 objs.clear();
375 list_op.params.marker = list_op.get_next_marker();
376 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
377
378 if (ret < 0) {
379 if (ret == (-ENOENT))
380 return 0;
381 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
382 return ret;
383 }
384
385 utime_t now = ceph_clock_now();
386 bool is_expired;
387 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
388 rgw_obj_key key(obj_iter->key);
389
390 if (!key.ns.empty()) {
391 continue;
392 }
393 if (prefix_iter->second.expiration_date != boost::none) {
394 //we have checked it before
395 is_expired = true;
396 } else {
397 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.expiration);
398 }
399 if (is_expired) {
400 RGWObjectCtx rctx(store);
401 rgw_obj obj(bucket_info.bucket, key);
402 RGWObjState *state;
403 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
404 if (ret < 0) {
405 return ret;
406 }
407 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
408 continue;
409 ret = remove_expired_obj(bucket_info, obj_iter->key, true);
410 if (ret < 0) {
411 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
412 } else {
413 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << key << dendl;
414 }
415 }
416 }
417 } while (is_truncated);
418 }
419 } else {
420 //bucket versioning is enabled or suspended
421 rgw_obj_key pre_marker;
422 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
423 if (!prefix_iter->second.status || (prefix_iter->second.expiration <= 0
424 && prefix_iter->second.expiration_date == boost::none
425 && prefix_iter->second.noncur_expiration <= 0 && !prefix_iter->second.dm_expiration)) {
426 continue;
427 }
428 if (prefix_iter != prefix_map.begin() &&
429 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
430 list_op.next_marker = pre_marker;
431 } else {
432 pre_marker = list_op.get_next_marker();
433 }
434 list_op.params.prefix = prefix_iter->first;
435 rgw_bucket_dir_entry pre_obj;
436 do {
437 if (!objs.empty()) {
438 pre_obj = objs.back();
439 }
440 objs.clear();
441 list_op.params.marker = list_op.get_next_marker();
442 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
443
444 if (ret < 0) {
445 if (ret == (-ENOENT))
446 return 0;
447 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
448 return ret;
449 }
450
451 utime_t now = ceph_clock_now();
452 ceph::real_time mtime;
453 bool remove_indeed = true;
454 int expiration;
455 bool skip_expiration;
456 bool is_expired;
457 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
458 skip_expiration = false;
459 is_expired = false;
460 if (obj_iter->is_current()) {
461 if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none
462 && !prefix_iter->second.dm_expiration) {
463 continue;
464 }
465 if (obj_iter->is_delete_marker()) {
466 if ((obj_iter + 1)==objs.end()) {
467 if (is_truncated) {
468 //deal with it in next round because we can't judge whether this marker is the only version
469 list_op.next_marker = obj_iter->key;
470 break;
471 }
472 } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
473 continue;
474 }
475 skip_expiration = prefix_iter->second.dm_expiration;
476 remove_indeed = true; //we should remove the delete marker if it's the only version
477 } else {
478 remove_indeed = false;
479 }
480 mtime = obj_iter->meta.mtime;
481 expiration = prefix_iter->second.expiration;
482 if (!skip_expiration && expiration <= 0 && prefix_iter->second.expiration_date == boost::none) {
483 continue;
484 } else if (!skip_expiration) {
485 if (expiration > 0) {
486 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
487 } else {
488 is_expired = now >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date);
489 }
490 }
491 } else {
492 if (prefix_iter->second.noncur_expiration <=0) {
493 continue;
494 }
495 remove_indeed = true;
496 mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime;
497 expiration = prefix_iter->second.noncur_expiration;
498 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
499 }
500 if (skip_expiration || is_expired) {
501 if (obj_iter->is_visible()) {
502 RGWObjectCtx rctx(store);
503 rgw_obj obj(bucket_info.bucket, obj_iter->key);
504 RGWObjState *state;
505 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
506 if (ret < 0) {
507 return ret;
508 }
509 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
510 continue;
511 }
512 ret = remove_expired_obj(bucket_info, obj_iter->key, remove_indeed);
513 if (ret < 0) {
514 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
515 } else {
516 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl;
517 }
518 }
519 }
520 } while (is_truncated);
521 }
522 }
523
524 ret = handle_multipart_expiration(&target, prefix_map);
525
526 return ret;
527 }
528
529 int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
530 {
531 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
532
533 rados::cls::lock::Lock l(lc_index_lock_name);
534 l.set_cookie(cookie);
535 l.set_duration(lock_duration);
536
537 do {
538 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
539 if (ret == -EBUSY) { /* already locked by another lc processor */
540 dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
541 sleep(5);
542 continue;
543 }
544 if (ret < 0)
545 return 0;
546 dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl;
547 if (result == -ENOENT) {
548 ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry);
549 if (ret < 0) {
550 dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl;
551 }
552 goto clean;
553 } else if (result < 0) {
554 entry.second = lc_failed;
555 } else {
556 entry.second = lc_complete;
557 }
558
559 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
560 if (ret < 0) {
561 dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl;
562 }
563 clean:
564 l.unlock(&store->lc_pool_ctx, obj_names[index]);
565 dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl;
566 return 0;
567 } while (true);
568 }
569
570 int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
571 {
572 int index = 0;
573 progress_map->clear();
574 for(; index <max_objs; index++) {
575 map<string, int > entries;
576 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
577 if (ret < 0) {
578 if (ret == -ENOENT) {
579 dout(10) << __func__ << " ignoring unfound lc object="
580 << obj_names[index] << dendl;
581 continue;
582 } else {
583 return ret;
584 }
585 }
586 map<string, int>::iterator iter;
587 for (iter = entries.begin(); iter != entries.end(); ++iter) {
588 progress_map->insert(*iter);
589 }
590 }
591 return 0;
592 }
593
594 int RGWLC::process()
595 {
596 int max_secs = cct->_conf->rgw_lc_lock_max_time;
597
598 unsigned start;
599 int ret = get_random_bytes((char *)&start, sizeof(start));
600 if (ret < 0)
601 return ret;
602
603 for (int i = 0; i < max_objs; i++) {
604 int index = (i + start) % max_objs;
605 ret = process(index, max_secs);
606 if (ret < 0)
607 return ret;
608 }
609
610 return 0;
611 }
612
613 int RGWLC::process(int index, int max_lock_secs)
614 {
615 rados::cls::lock::Lock l(lc_index_lock_name);
616 do {
617 utime_t now = ceph_clock_now();
618 pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
619 if (max_lock_secs <= 0)
620 return -EAGAIN;
621
622 utime_t time(max_lock_secs, 0);
623 l.set_duration(time);
624
625 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
626 if (ret == -EBUSY) { /* already locked by another lc processor */
627 dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
628 sleep(5);
629 continue;
630 }
631 if (ret < 0)
632 return 0;
633
634 string marker;
635 cls_rgw_lc_obj_head head;
636 ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
637 if (ret < 0) {
638 dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;
639 goto exit;
640 }
641
642 if(!if_already_run_today(head.start_date)) {
643 head.start_date = now;
644 head.marker.clear();
645 ret = bucket_lc_prepare(index);
646 if (ret < 0) {
647 dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;
648 goto exit;
649 }
650 }
651
652 ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
653 if (ret < 0) {
654 dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;
655 goto exit;
656 }
657
658 if (entry.first.empty())
659 goto exit;
660
661 entry.second = lc_processing;
662 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
663 if (ret < 0) {
664 dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;
665 goto exit;
666 }
667
668 head.marker = entry.first;
669 ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);
670 if (ret < 0) {
671 dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
672 goto exit;
673 }
674 l.unlock(&store->lc_pool_ctx, obj_names[index]);
675 ret = bucket_lc_process(entry.first);
676 bucket_lc_post(index, max_lock_secs, entry, ret);
677 return 0;
678 exit:
679 l.unlock(&store->lc_pool_ctx, obj_names[index]);
680 return 0;
681
682 }while(1);
683
684 }
685
686 void RGWLC::start_processor()
687 {
688 worker = new LCWorker(cct, this);
689 worker->create("lifecycle_thr");
690 }
691
692 void RGWLC::stop_processor()
693 {
694 down_flag = true;
695 if (worker) {
696 worker->stop();
697 worker->join();
698 }
699 delete worker;
700 worker = NULL;
701 }
702
703 void RGWLC::LCWorker::stop()
704 {
705 Mutex::Locker l(lock);
706 cond.Signal();
707 }
708
709 bool RGWLC::going_down()
710 {
711 return down_flag;
712 }
713
714 bool RGWLC::LCWorker::should_work(utime_t& now)
715 {
716 int start_hour;
717 int start_minute;
718 int end_hour;
719 int end_minute;
720 string worktime = cct->_conf->rgw_lifecycle_work_time;
721 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
722 struct tm bdt;
723 time_t tt = now.sec();
724 localtime_r(&tt, &bdt);
725
726 if (cct->_conf->rgw_lc_debug_interval > 0) {
727 /* We're debugging, so say we can run */
728 return true;
729 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
730 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
731 return true;
732 } else {
733 return false;
734 }
735
736 }
737
738 int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
739 {
740 if (cct->_conf->rgw_lc_debug_interval > 0) {
741 int secs = start + cct->_conf->rgw_lc_debug_interval - now;
742 if (secs < 0)
743 secs = 0;
744 return (secs);
745 }
746
747 int start_hour;
748 int start_minute;
749 int end_hour;
750 int end_minute;
751 string worktime = cct->_conf->rgw_lifecycle_work_time;
752 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
753 struct tm bdt;
754 time_t tt = now.sec();
755 time_t nt;
756 localtime_r(&tt, &bdt);
757 bdt.tm_hour = start_hour;
758 bdt.tm_min = start_minute;
759 bdt.tm_sec = 0;
760 nt = mktime(&bdt);
761
762 return (nt+24*60*60 - tt);
763 }
764