]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_lc.cc
update sources to v12.2.3
[ceph.git] / ceph / src / rgw / rgw_lc.cc
1 #include <string.h>
2 #include <iostream>
3 #include <map>
4
5 #include <boost/algorithm/string/split.hpp>
6 #include <boost/algorithm/string.hpp>
7
8 #include "common/Formatter.h"
9 #include <common/errno.h>
10 #include "auth/Crypto.h"
11 #include "cls/rgw/cls_rgw_client.h"
12 #include "cls/lock/cls_lock_client.h"
13 #include "rgw_common.h"
14 #include "rgw_bucket.h"
15 #include "rgw_lc.h"
16
17 #define dout_context g_ceph_context
18 #define dout_subsys ceph_subsys_rgw
19
// Display names for lifecycle entry states.
// NOTE(review): presumably indexed by the lc_* status values used in this
// file (lc_uninitial, lc_processing, lc_failed, lc_complete) -- confirm the
// enum order in rgw_lc.h before relying on it.
const char* LC_STATUS[] = {
  "UNINITIAL", "PROCESSING", "FAILED", "COMPLETE"
};
26
27 using namespace std;
28 using namespace librados;
29
30 bool LCRule::valid()
31 {
32 if (id.length() > MAX_ID_LEN) {
33 return false;
34 }
35 else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration) {
36 return false;
37 }
38 else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
39 return false;
40 }
41 return true;
42 }
43
44 void RGWLifecycleConfiguration::add_rule(LCRule *rule)
45 {
46 string id;
47 rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups
48 rule_map.insert(pair<string, LCRule>(id, *rule));
49 }
50
51 bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
52 {
53 lc_op op;
54 if (rule->get_status().compare("Enabled") == 0) {
55 op.status = true;
56 }
57 if (rule->get_expiration().has_days()) {
58 op.expiration = rule->get_expiration().get_days();
59 }
60 if (rule->get_expiration().has_date()) {
61 op.expiration_date = ceph::from_iso_8601(rule->get_expiration().get_date());
62 }
63 if (rule->get_noncur_expiration().has_days()) {
64 op.noncur_expiration = rule->get_noncur_expiration().get_days();
65 }
66 if (rule->get_mp_expiration().has_days()) {
67 op.mp_expiration = rule->get_mp_expiration().get_days();
68 }
69 op.dm_expiration = rule->get_dm_expiration();
70
71 std::string prefix;
72 if (rule->get_filter().has_prefix()){
73 prefix = rule->get_filter().get_prefix();
74 } else {
75 prefix = rule->get_prefix();
76 }
77 auto ret = prefix_map.emplace(std::move(prefix), std::move(op));
78 return ret.second;
79 }
80
81 int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
82 {
83 if (!rule->valid()) {
84 return -EINVAL;
85 }
86 string id;
87 rule->get_id(id);
88 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
89 return -EINVAL;
90 }
91 rule_map.insert(pair<string, LCRule>(id, *rule));
92
93 if (!_add_rule(rule)) {
94 return -ERR_INVALID_REQUEST;
95 }
96 return 0;
97 }
98
99 bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
100 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
101 (second.expiration > 0 || second.expiration_date != boost::none)) {
102 return true;
103 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
104 return true;
105 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
106 return true;
107 } else {
108 return false;
109 }
110 }
111
112 //Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules
113 //define same action.
114 bool RGWLifecycleConfiguration::valid()
115 {
116 if (prefix_map.size() < 2) {
117 return true;
118 }
119 auto cur_iter = prefix_map.begin();
120 while (cur_iter != prefix_map.end()) {
121 auto next_iter = cur_iter;
122 ++next_iter;
123 while (next_iter != prefix_map.end()) {
124 string c_pre = cur_iter->first;
125 string n_pre = next_iter->first;
126 if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
127 if (has_same_action(cur_iter->second, next_iter->second)) {
128 return false;
129 } else {
130 ++next_iter;
131 }
132 } else {
133 break;
134 }
135 }
136 ++cur_iter;
137 }
138 return true;
139 }
140
141 void *RGWLC::LCWorker::entry() {
142 do {
143 utime_t start = ceph_clock_now();
144 if (should_work(start)) {
145 dout(5) << "life cycle: start" << dendl;
146 int r = lc->process();
147 if (r < 0) {
148 dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
149 }
150 dout(5) << "life cycle: stop" << dendl;
151 }
152 if (lc->going_down())
153 break;
154
155 utime_t end = ceph_clock_now();
156 int secs = schedule_next_start_time(start, end);
157 utime_t next;
158 next.set_from_double(end + secs);
159
160 dout(5) << "schedule life cycle next start time: " << rgw_to_asctime(next) <<dendl;
161
162 lock.Lock();
163 cond.WaitInterval(lock, utime_t(secs, 0));
164 lock.Unlock();
165 } while (!lc->going_down());
166
167 return NULL;
168 }
169
170 void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {
171 cct = _cct;
172 store = _store;
173 max_objs = cct->_conf->rgw_lc_max_objs;
174 if (max_objs > HASH_PRIME)
175 max_objs = HASH_PRIME;
176
177 obj_names = new string[max_objs];
178
179 for (int i = 0; i < max_objs; i++) {
180 obj_names[i] = lc_oid_prefix;
181 char buf[32];
182 snprintf(buf, 32, ".%d", i);
183 obj_names[i].append(buf);
184 }
185
186 #define COOKIE_LEN 16
187 char cookie_buf[COOKIE_LEN + 1];
188 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
189 cookie = cookie_buf;
190 }
191
192 void RGWLC::finalize()
193 {
194 delete[] obj_names;
195 }
196
197 bool RGWLC::if_already_run_today(time_t& start_date)
198 {
199 struct tm bdt;
200 time_t begin_of_day;
201 utime_t now = ceph_clock_now();
202 localtime_r(&start_date, &bdt);
203
204 if (cct->_conf->rgw_lc_debug_interval > 0) {
205 if (now - start_date < cct->_conf->rgw_lc_debug_interval)
206 return true;
207 else
208 return false;
209 }
210
211 bdt.tm_hour = 0;
212 bdt.tm_min = 0;
213 bdt.tm_sec = 0;
214 begin_of_day = mktime(&bdt);
215 if (now - begin_of_day < 24*60*60)
216 return true;
217 else
218 return false;
219 }
220
221 int RGWLC::bucket_lc_prepare(int index)
222 {
223 map<string, int > entries;
224
225 string marker;
226
227 #define MAX_LC_LIST_ENTRIES 100
228 do {
229 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
230 if (ret < 0)
231 return ret;
232 map<string, int>::iterator iter;
233 for (iter = entries.begin(); iter != entries.end(); ++iter) {
234 pair<string, int > entry(iter->first, lc_uninitial);
235 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
236 if (ret < 0) {
237 dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl;
238 break;
239 }
240 marker = iter->first;
241 }
242 } while (!entries.empty());
243
244 return 0;
245 }
246
247 bool RGWLC::obj_has_expired(double timediff, int days)
248 {
249 double cmp;
250 if (cct->_conf->rgw_lc_debug_interval <= 0) {
251 /* Normal case, run properly */
252 cmp = days*24*60*60;
253 } else {
254 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
255 cmp = days*cct->_conf->rgw_lc_debug_interval;
256 }
257
258 return (timediff >= cmp);
259 }
260
261 int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed)
262 {
263 if (remove_indeed) {
264 return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
265 } else {
266 obj_key.instance.clear();
267 RGWObjectCtx rctx(store);
268 rgw_obj obj(bucket_info.bucket, obj_key);
269 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
270 }
271 }
272
273 int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map)
274 {
275 MultipartMetaFilter mp_filter;
276 vector<rgw_bucket_dir_entry> objs;
277 RGWMPObj mp_obj;
278 bool is_truncated;
279 int ret;
280 RGWBucketInfo& bucket_info = target->get_bucket_info();
281 RGWRados::Bucket::List list_op(target);
282 list_op.params.list_versions = false;
283 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
284 list_op.params.filter = &mp_filter;
285 for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
286 if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
287 continue;
288 }
289 list_op.params.prefix = prefix_iter->first;
290 do {
291 objs.clear();
292 list_op.params.marker = list_op.get_next_marker();
293 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
294 if (ret < 0) {
295 if (ret == (-ENOENT))
296 return 0;
297 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
298 return ret;
299 }
300
301 utime_t now = ceph_clock_now();
302 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
303 if (obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.mp_expiration)) {
304 rgw_obj_key key(obj_iter->key);
305 if (!mp_obj.from_meta(key.name)) {
306 continue;
307 }
308 RGWObjectCtx rctx(store);
309 ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
310 if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
311 ldout(cct, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret <<dendl;
312 return ret;
313 }
314 if (going_down())
315 return 0;
316 }
317 }
318 } while(is_truncated);
319 }
320 return 0;
321 }
322
323 int RGWLC::bucket_lc_process(string& shard_id)
324 {
325 RGWLifecycleConfiguration config(cct);
326 RGWBucketInfo bucket_info;
327 map<string, bufferlist> bucket_attrs;
328 string next_marker, no_ns, list_versions;
329 bool is_truncated;
330 vector<rgw_bucket_dir_entry> objs;
331 RGWObjectCtx obj_ctx(store);
332 vector<std::string> result;
333 boost::split(result, shard_id, boost::is_any_of(":"));
334 string bucket_tenant = result[0];
335 string bucket_name = result[1];
336 string bucket_id = result[2];
337 int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);
338 if (ret < 0) {
339 ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl;
340 return ret;
341 }
342
343 ret = bucket_info.bucket.bucket_id.compare(bucket_id) ;
344 if (ret !=0) {
345 ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl;
346 return -ENOENT;
347 }
348
349 RGWRados::Bucket target(store, bucket_info);
350 RGWRados::Bucket::List list_op(&target);
351
352 map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
353 if (aiter == bucket_attrs.end())
354 return 0;
355
356 bufferlist::iterator iter(&aiter->second);
357 try {
358 config.decode(iter);
359 } catch (const buffer::error& e) {
360 ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl;
361 return -1;
362 }
363
364 map<string, lc_op>& prefix_map = config.get_prefix_map();
365 list_op.params.list_versions = bucket_info.versioned();
366 if (!bucket_info.versioned()) {
367 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
368 if (!prefix_iter->second.status ||
369 (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) {
370 continue;
371 }
372 if (prefix_iter->second.expiration_date != boost::none &&
373 ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) {
374 continue;
375 }
376 list_op.params.prefix = prefix_iter->first;
377 do {
378 objs.clear();
379 list_op.params.marker = list_op.get_next_marker();
380 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
381
382 if (ret < 0) {
383 if (ret == (-ENOENT))
384 return 0;
385 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
386 return ret;
387 }
388
389 utime_t now = ceph_clock_now();
390 bool is_expired;
391 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
392 rgw_obj_key key(obj_iter->key);
393
394 if (!key.ns.empty()) {
395 continue;
396 }
397 if (prefix_iter->second.expiration_date != boost::none) {
398 //we have checked it before
399 is_expired = true;
400 } else {
401 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.expiration);
402 }
403 if (is_expired) {
404 RGWObjectCtx rctx(store);
405 rgw_obj obj(bucket_info.bucket, key);
406 RGWObjState *state;
407 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
408 if (ret < 0) {
409 return ret;
410 }
411 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
412 continue;
413 ret = remove_expired_obj(bucket_info, obj_iter->key, true);
414 if (ret < 0) {
415 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
416 } else {
417 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << key << dendl;
418 }
419
420 if (going_down())
421 return 0;
422 }
423 }
424 } while (is_truncated);
425 }
426 } else {
427 //bucket versioning is enabled or suspended
428 rgw_obj_key pre_marker;
429 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
430 if (!prefix_iter->second.status || (prefix_iter->second.expiration <= 0
431 && prefix_iter->second.expiration_date == boost::none
432 && prefix_iter->second.noncur_expiration <= 0 && !prefix_iter->second.dm_expiration)) {
433 continue;
434 }
435 if (prefix_iter != prefix_map.begin() &&
436 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
437 list_op.next_marker = pre_marker;
438 } else {
439 pre_marker = list_op.get_next_marker();
440 }
441 list_op.params.prefix = prefix_iter->first;
442 rgw_bucket_dir_entry pre_obj;
443 do {
444 if (!objs.empty()) {
445 pre_obj = objs.back();
446 }
447 objs.clear();
448 list_op.params.marker = list_op.get_next_marker();
449 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
450
451 if (ret < 0) {
452 if (ret == (-ENOENT))
453 return 0;
454 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
455 return ret;
456 }
457
458 utime_t now = ceph_clock_now();
459 ceph::real_time mtime;
460 bool remove_indeed = true;
461 int expiration;
462 bool skip_expiration;
463 bool is_expired;
464 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
465 skip_expiration = false;
466 is_expired = false;
467 if (obj_iter->is_current()) {
468 if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none
469 && !prefix_iter->second.dm_expiration) {
470 continue;
471 }
472 if (obj_iter->is_delete_marker()) {
473 if ((obj_iter + 1)==objs.end()) {
474 if (is_truncated) {
475 //deal with it in next round because we can't judge whether this marker is the only version
476 list_op.next_marker = obj_iter->key;
477 break;
478 }
479 } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
480 continue;
481 }
482 skip_expiration = prefix_iter->second.dm_expiration;
483 remove_indeed = true; //we should remove the delete marker if it's the only version
484 } else {
485 remove_indeed = false;
486 }
487 mtime = obj_iter->meta.mtime;
488 expiration = prefix_iter->second.expiration;
489 if (!skip_expiration && expiration <= 0 && prefix_iter->second.expiration_date == boost::none) {
490 continue;
491 } else if (!skip_expiration) {
492 if (expiration > 0) {
493 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
494 } else {
495 is_expired = now >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date);
496 }
497 }
498 } else {
499 if (prefix_iter->second.noncur_expiration <=0) {
500 continue;
501 }
502 remove_indeed = true;
503 mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime;
504 expiration = prefix_iter->second.noncur_expiration;
505 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
506 }
507 if (skip_expiration || is_expired) {
508 if (obj_iter->is_visible()) {
509 RGWObjectCtx rctx(store);
510 rgw_obj obj(bucket_info.bucket, obj_iter->key);
511 RGWObjState *state;
512 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
513 if (ret < 0) {
514 return ret;
515 }
516 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
517 continue;
518 }
519 ret = remove_expired_obj(bucket_info, obj_iter->key, remove_indeed);
520 if (ret < 0) {
521 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
522 } else {
523 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl;
524 }
525
526 if (going_down())
527 return 0;
528 }
529 }
530 } while (is_truncated);
531 }
532 }
533
534 ret = handle_multipart_expiration(&target, prefix_map);
535
536 return ret;
537 }
538
539 int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
540 {
541 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
542
543 rados::cls::lock::Lock l(lc_index_lock_name);
544 l.set_cookie(cookie);
545 l.set_duration(lock_duration);
546
547 do {
548 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
549 if (ret == -EBUSY) { /* already locked by another lc processor */
550 dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
551 sleep(5);
552 continue;
553 }
554 if (ret < 0)
555 return 0;
556 dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl;
557 if (result == -ENOENT) {
558 ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry);
559 if (ret < 0) {
560 dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl;
561 }
562 goto clean;
563 } else if (result < 0) {
564 entry.second = lc_failed;
565 } else {
566 entry.second = lc_complete;
567 }
568
569 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
570 if (ret < 0) {
571 dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl;
572 }
573 clean:
574 l.unlock(&store->lc_pool_ctx, obj_names[index]);
575 dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl;
576 return 0;
577 } while (true);
578 }
579
580 int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
581 {
582 int index = 0;
583 progress_map->clear();
584 for(; index <max_objs; index++) {
585 map<string, int > entries;
586 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
587 if (ret < 0) {
588 if (ret == -ENOENT) {
589 dout(10) << __func__ << " ignoring unfound lc object="
590 << obj_names[index] << dendl;
591 continue;
592 } else {
593 return ret;
594 }
595 }
596 map<string, int>::iterator iter;
597 for (iter = entries.begin(); iter != entries.end(); ++iter) {
598 progress_map->insert(*iter);
599 }
600 }
601 return 0;
602 }
603
604 int RGWLC::process()
605 {
606 int max_secs = cct->_conf->rgw_lc_lock_max_time;
607
608 unsigned start;
609 int ret = get_random_bytes((char *)&start, sizeof(start));
610 if (ret < 0)
611 return ret;
612
613 for (int i = 0; i < max_objs; i++) {
614 int index = (i + start) % max_objs;
615 ret = process(index, max_secs);
616 if (ret < 0)
617 return ret;
618 }
619
620 return 0;
621 }
622
623 int RGWLC::process(int index, int max_lock_secs)
624 {
625 rados::cls::lock::Lock l(lc_index_lock_name);
626 do {
627 utime_t now = ceph_clock_now();
628 pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
629 if (max_lock_secs <= 0)
630 return -EAGAIN;
631
632 utime_t time(max_lock_secs, 0);
633 l.set_duration(time);
634
635 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
636 if (ret == -EBUSY) { /* already locked by another lc processor */
637 dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
638 sleep(5);
639 continue;
640 }
641 if (ret < 0)
642 return 0;
643
644 string marker;
645 cls_rgw_lc_obj_head head;
646 ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
647 if (ret < 0) {
648 dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;
649 goto exit;
650 }
651
652 if(!if_already_run_today(head.start_date)) {
653 head.start_date = now;
654 head.marker.clear();
655 ret = bucket_lc_prepare(index);
656 if (ret < 0) {
657 dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;
658 goto exit;
659 }
660 }
661
662 ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
663 if (ret < 0) {
664 dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;
665 goto exit;
666 }
667
668 if (entry.first.empty())
669 goto exit;
670
671 entry.second = lc_processing;
672 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
673 if (ret < 0) {
674 dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;
675 goto exit;
676 }
677
678 head.marker = entry.first;
679 ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);
680 if (ret < 0) {
681 dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
682 goto exit;
683 }
684 l.unlock(&store->lc_pool_ctx, obj_names[index]);
685 ret = bucket_lc_process(entry.first);
686 bucket_lc_post(index, max_lock_secs, entry, ret);
687 }while(1);
688
689 exit:
690 l.unlock(&store->lc_pool_ctx, obj_names[index]);
691 return 0;
692 }
693
694 void RGWLC::start_processor()
695 {
696 worker = new LCWorker(cct, this);
697 worker->create("lifecycle_thr");
698 }
699
700 void RGWLC::stop_processor()
701 {
702 down_flag = true;
703 if (worker) {
704 worker->stop();
705 worker->join();
706 }
707 delete worker;
708 worker = NULL;
709 }
710
711 void RGWLC::LCWorker::stop()
712 {
713 Mutex::Locker l(lock);
714 cond.Signal();
715 }
716
717 bool RGWLC::going_down()
718 {
719 return down_flag;
720 }
721
722 bool RGWLC::LCWorker::should_work(utime_t& now)
723 {
724 int start_hour;
725 int start_minute;
726 int end_hour;
727 int end_minute;
728 string worktime = cct->_conf->rgw_lifecycle_work_time;
729 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
730 struct tm bdt;
731 time_t tt = now.sec();
732 localtime_r(&tt, &bdt);
733
734 if (cct->_conf->rgw_lc_debug_interval > 0) {
735 /* We're debugging, so say we can run */
736 return true;
737 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
738 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
739 return true;
740 } else {
741 return false;
742 }
743
744 }
745
746 int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
747 {
748 if (cct->_conf->rgw_lc_debug_interval > 0) {
749 int secs = start + cct->_conf->rgw_lc_debug_interval - now;
750 if (secs < 0)
751 secs = 0;
752 return (secs);
753 }
754
755 int start_hour;
756 int start_minute;
757 int end_hour;
758 int end_minute;
759 string worktime = cct->_conf->rgw_lifecycle_work_time;
760 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
761 struct tm bdt;
762 time_t tt = now.sec();
763 time_t nt;
764 localtime_r(&tt, &bdt);
765 bdt.tm_hour = start_hour;
766 bdt.tm_min = start_minute;
767 bdt.tm_sec = 0;
768 nt = mktime(&bdt);
769
770 return (nt+24*60*60 - tt);
771 }
772