]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_lc.cc
update sources to v12.1.1
[ceph.git] / ceph / src / rgw / rgw_lc.cc
CommitLineData
7c673cae
FG
1#include <string.h>
2#include <iostream>
3#include <map>
4
5#include <boost/algorithm/string/split.hpp>
6#include <boost/algorithm/string.hpp>
7
8#include "common/Formatter.h"
9#include <common/errno.h>
10#include "auth/Crypto.h"
11#include "cls/rgw/cls_rgw_client.h"
12#include "cls/lock/cls_lock_client.h"
13#include "rgw_common.h"
14#include "rgw_bucket.h"
15#include "rgw_lc.h"
16
17#define dout_context g_ceph_context
18#define dout_subsys ceph_subsys_rgw
19
// Printable names for lifecycle processing states.
// NOTE(review): the order presumably mirrors the lc_uninitial / lc_processing /
// lc_failed / lc_complete status values used later in this file -- confirm
// against the enum declaration before indexing with a status value.
20const char* LC_STATUS[] = {
21 "UNINITIAL",
22 "PROCESSING",
23 "FAILED",
24 "COMPLETE"
25};
26
27using namespace std;
28using namespace librados;
29
224ce89b 30bool LCRule::valid()
7c673cae
FG
31{
32 if (id.length() > MAX_ID_LEN) {
33 return false;
34 }
31f18b77 35 else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration) {
7c673cae
FG
36 return false;
37 }
224ce89b 38 else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
7c673cae
FG
39 return false;
40 }
41 return true;
42}
43
44void RGWLifecycleConfiguration::add_rule(LCRule *rule)
45{
46 string id;
47 rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups
48 rule_map.insert(pair<string, LCRule>(id, *rule));
49}
50
51bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
52{
53 lc_op op;
54 if (rule->get_status().compare("Enabled") == 0) {
55 op.status = true;
56 }
224ce89b 57 if (rule->get_expiration().has_days()) {
7c673cae
FG
58 op.expiration = rule->get_expiration().get_days();
59 }
224ce89b
WB
60 if (rule->get_expiration().has_date()) {
61 op.expiration_date = ceph::from_iso_8601(rule->get_expiration().get_date());
62 }
63 if (rule->get_noncur_expiration().has_days()) {
7c673cae
FG
64 op.noncur_expiration = rule->get_noncur_expiration().get_days();
65 }
224ce89b 66 if (rule->get_mp_expiration().has_days()) {
7c673cae
FG
67 op.mp_expiration = rule->get_mp_expiration().get_days();
68 }
31f18b77 69 op.dm_expiration = rule->get_dm_expiration();
7c673cae
FG
70 auto ret = prefix_map.insert(pair<string, lc_op>(rule->get_prefix(), op));
71 return ret.second;
72}
73
74int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
75{
224ce89b 76 if (!rule->valid()) {
7c673cae
FG
77 return -EINVAL;
78 }
79 string id;
80 rule->get_id(id);
81 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
82 return -EINVAL;
83 }
84 rule_map.insert(pair<string, LCRule>(id, *rule));
85
86 if (!_add_rule(rule)) {
87 return -ERR_INVALID_REQUEST;
88 }
89 return 0;
90}
91
224ce89b
WB
92bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
93 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
94 (second.expiration > 0 || second.expiration_date != boost::none)) {
95 return true;
96 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
97 return true;
98 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
99 return true;
100 } else {
101 return false;
102 }
103}
104
7c673cae
FG
105//Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules
106//define same action.
224ce89b 107bool RGWLifecycleConfiguration::valid()
7c673cae
FG
108{
109 if (prefix_map.size() < 2) {
110 return true;
111 }
112 auto cur_iter = prefix_map.begin();
113 while (cur_iter != prefix_map.end()) {
114 auto next_iter = cur_iter;
115 ++next_iter;
116 while (next_iter != prefix_map.end()) {
117 string c_pre = cur_iter->first;
118 string n_pre = next_iter->first;
119 if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
224ce89b 120 if (has_same_action(cur_iter->second, next_iter->second)) {
7c673cae
FG
121 return false;
122 } else {
123 ++next_iter;
124 }
125 } else {
126 break;
127 }
128 }
129 ++cur_iter;
130 }
131 return true;
132}
133
134void *RGWLC::LCWorker::entry() {
135 do {
136 utime_t start = ceph_clock_now();
137 if (should_work(start)) {
138 dout(5) << "life cycle: start" << dendl;
139 int r = lc->process();
140 if (r < 0) {
141 dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
142 }
143 dout(5) << "life cycle: stop" << dendl;
144 }
145 if (lc->going_down())
146 break;
147
148 utime_t end = ceph_clock_now();
149 int secs = schedule_next_start_time(start, end);
150 time_t next_time = end + secs;
151 char buf[30];
152 char *nt = ctime_r(&next_time, buf);
153 dout(5) << "schedule life cycle next start time: " << nt <<dendl;
154
155 lock.Lock();
156 cond.WaitInterval(lock, utime_t(secs, 0));
157 lock.Unlock();
158 } while (!lc->going_down());
159
160 return NULL;
161}
162
163void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {
164 cct = _cct;
165 store = _store;
166 max_objs = cct->_conf->rgw_lc_max_objs;
167 if (max_objs > HASH_PRIME)
168 max_objs = HASH_PRIME;
169
170 obj_names = new string[max_objs];
171
172 for (int i = 0; i < max_objs; i++) {
173 obj_names[i] = lc_oid_prefix;
174 char buf[32];
175 snprintf(buf, 32, ".%d", i);
176 obj_names[i].append(buf);
177 }
178
179#define COOKIE_LEN 16
180 char cookie_buf[COOKIE_LEN + 1];
181 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
182 cookie = cookie_buf;
183}
184
185void RGWLC::finalize()
186{
187 delete[] obj_names;
188}
189
190bool RGWLC::if_already_run_today(time_t& start_date)
191{
192 struct tm bdt;
193 time_t begin_of_day;
194 utime_t now = ceph_clock_now();
195 localtime_r(&start_date, &bdt);
196
197 if (cct->_conf->rgw_lc_debug_interval > 0) {
198 /* We're debugging, so say we can run */
199 return false;
200 }
201
202 bdt.tm_hour = 0;
203 bdt.tm_min = 0;
204 bdt.tm_sec = 0;
205 begin_of_day = mktime(&bdt);
206 if (now - begin_of_day < 24*60*60)
207 return true;
208 else
209 return false;
210}
211
212int RGWLC::bucket_lc_prepare(int index)
213{
214 map<string, int > entries;
215
216 string marker;
217
218#define MAX_LC_LIST_ENTRIES 100
219 do {
220 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
221 if (ret < 0)
222 return ret;
223 map<string, int>::iterator iter;
224 for (iter = entries.begin(); iter != entries.end(); ++iter) {
225 pair<string, int > entry(iter->first, lc_uninitial);
226 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
227 if (ret < 0) {
228 dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl;
229 break;
230 }
231 marker = iter->first;
232 }
233 } while (!entries.empty());
234
235 return 0;
236}
237
238bool RGWLC::obj_has_expired(double timediff, int days)
239{
31f18b77
FG
240 double cmp;
241 if (cct->_conf->rgw_lc_debug_interval <= 0) {
242 /* Normal case, run properly */
243 cmp = days*24*60*60;
244 } else {
245 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
246 cmp = days*cct->_conf->rgw_lc_debug_interval;
247 }
7c673cae 248
31f18b77 249 return (timediff >= cmp);
7c673cae
FG
250}
251
252int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed)
253{
254 if (remove_indeed) {
255 return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
256 } else {
257 obj_key.instance.clear();
258 RGWObjectCtx rctx(store);
259 rgw_obj obj(bucket_info.bucket, obj_key);
260 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
261 }
262}
263
264int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map)
265{
266 MultipartMetaFilter mp_filter;
267 vector<rgw_bucket_dir_entry> objs;
268 RGWMPObj mp_obj;
269 bool is_truncated;
270 int ret;
271 RGWBucketInfo& bucket_info = target->get_bucket_info();
272 RGWRados::Bucket::List list_op(target);
273 list_op.params.list_versions = false;
274 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
275 list_op.params.filter = &mp_filter;
276 for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
277 if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
278 continue;
279 }
280 list_op.params.prefix = prefix_iter->first;
281 do {
282 objs.clear();
283 list_op.params.marker = list_op.get_next_marker();
284 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
285 if (ret < 0) {
286 if (ret == (-ENOENT))
287 return 0;
288 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
289 return ret;
290 }
291
292 utime_t now = ceph_clock_now();
293 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
294 if (obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.mp_expiration)) {
295 rgw_obj_key key(obj_iter->key);
296 if (!mp_obj.from_meta(key.name)) {
297 continue;
298 }
299 RGWObjectCtx rctx(store);
300 ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
301 if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
302 ldout(cct, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret <<dendl;
303 return ret;
304 }
305 }
306 }
307 } while(is_truncated);
308 }
309 return 0;
310}
311
312int RGWLC::bucket_lc_process(string& shard_id)
313{
314 RGWLifecycleConfiguration config(cct);
315 RGWBucketInfo bucket_info;
316 map<string, bufferlist> bucket_attrs;
317 string next_marker, no_ns, list_versions;
318 bool is_truncated;
319 vector<rgw_bucket_dir_entry> objs;
320 RGWObjectCtx obj_ctx(store);
321 vector<std::string> result;
322 boost::split(result, shard_id, boost::is_any_of(":"));
323 string bucket_tenant = result[0];
324 string bucket_name = result[1];
325 string bucket_id = result[2];
326 int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);
327 if (ret < 0) {
328 ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl;
329 return ret;
330 }
331
332 ret = bucket_info.bucket.bucket_id.compare(bucket_id) ;
333 if (ret !=0) {
334 ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl;
335 return -ENOENT;
336 }
337
338 RGWRados::Bucket target(store, bucket_info);
339 RGWRados::Bucket::List list_op(&target);
340
341 map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
342 if (aiter == bucket_attrs.end())
343 return 0;
344
345 bufferlist::iterator iter(&aiter->second);
346 try {
347 config.decode(iter);
348 } catch (const buffer::error& e) {
349 ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl;
350 return -1;
351 }
352
353 map<string, lc_op>& prefix_map = config.get_prefix_map();
354 list_op.params.list_versions = bucket_info.versioned();
355 if (!bucket_info.versioned()) {
356 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
224ce89b
WB
357 if (!prefix_iter->second.status ||
358 (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) {
359 continue;
360 }
361 if (prefix_iter->second.expiration_date != boost::none &&
362 ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) {
7c673cae
FG
363 continue;
364 }
365 list_op.params.prefix = prefix_iter->first;
366 do {
367 objs.clear();
368 list_op.params.marker = list_op.get_next_marker();
369 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
370
371 if (ret < 0) {
372 if (ret == (-ENOENT))
373 return 0;
374 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
375 return ret;
376 }
224ce89b 377
7c673cae 378 utime_t now = ceph_clock_now();
224ce89b 379 bool is_expired;
7c673cae
FG
380 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
381 rgw_obj_key key(obj_iter->key);
382
383 if (!key.ns.empty()) {
384 continue;
385 }
224ce89b
WB
386 if (prefix_iter->second.expiration_date != boost::none) {
387 //we have checked it before
388 is_expired = true;
389 } else {
390 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.expiration);
391 }
392 if (is_expired) {
7c673cae
FG
393 RGWObjectCtx rctx(store);
394 rgw_obj obj(bucket_info.bucket, key);
395 RGWObjState *state;
396 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
397 if (ret < 0) {
398 return ret;
399 }
400 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
401 continue;
402 ret = remove_expired_obj(bucket_info, obj_iter->key, true);
403 if (ret < 0) {
404 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
405 } else {
406 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << key << dendl;
407 }
408 }
409 }
410 } while (is_truncated);
411 }
412 } else {
413 //bucket versioning is enabled or suspended
414 rgw_obj_key pre_marker;
415 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
31f18b77 416 if (!prefix_iter->second.status || (prefix_iter->second.expiration <= 0
224ce89b 417 && prefix_iter->second.expiration_date == boost::none
31f18b77 418 && prefix_iter->second.noncur_expiration <= 0 && !prefix_iter->second.dm_expiration)) {
7c673cae
FG
419 continue;
420 }
421 if (prefix_iter != prefix_map.begin() &&
422 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
423 list_op.next_marker = pre_marker;
424 } else {
425 pre_marker = list_op.get_next_marker();
426 }
427 list_op.params.prefix = prefix_iter->first;
428 rgw_bucket_dir_entry pre_obj;
429 do {
430 if (!objs.empty()) {
431 pre_obj = objs.back();
432 }
433 objs.clear();
434 list_op.params.marker = list_op.get_next_marker();
435 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
436
437 if (ret < 0) {
438 if (ret == (-ENOENT))
439 return 0;
440 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
441 return ret;
442 }
443
444 utime_t now = ceph_clock_now();
445 ceph::real_time mtime;
446 bool remove_indeed = true;
447 int expiration;
31f18b77 448 bool skip_expiration;
224ce89b 449 bool is_expired;
7c673cae 450 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
31f18b77 451 skip_expiration = false;
224ce89b 452 is_expired = false;
7c673cae 453 if (obj_iter->is_current()) {
224ce89b
WB
454 if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none
455 && !prefix_iter->second.dm_expiration) {
7c673cae
FG
456 continue;
457 }
458 if (obj_iter->is_delete_marker()) {
459 if ((obj_iter + 1)==objs.end()) {
460 if (is_truncated) {
461 //deal with it in next round because we can't judge whether this marker is the only version
462 list_op.next_marker = obj_iter->key;
463 break;
464 }
465 } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
466 continue;
467 }
31f18b77 468 skip_expiration = prefix_iter->second.dm_expiration;
7c673cae
FG
469 remove_indeed = true; //we should remove the delete marker if it's the only version
470 } else {
471 remove_indeed = false;
472 }
473 mtime = obj_iter->meta.mtime;
474 expiration = prefix_iter->second.expiration;
224ce89b 475 if (!skip_expiration && expiration <= 0 && prefix_iter->second.expiration_date == boost::none) {
31f18b77 476 continue;
224ce89b
WB
477 } else if (!skip_expiration) {
478 if (expiration > 0) {
479 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
480 } else {
481 is_expired = now >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date);
482 }
31f18b77 483 }
7c673cae
FG
484 } else {
485 if (prefix_iter->second.noncur_expiration <=0) {
486 continue;
487 }
488 remove_indeed = true;
489 mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime;
490 expiration = prefix_iter->second.noncur_expiration;
224ce89b 491 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
7c673cae 492 }
224ce89b 493 if (skip_expiration || is_expired) {
7c673cae
FG
494 if (obj_iter->is_visible()) {
495 RGWObjectCtx rctx(store);
496 rgw_obj obj(bucket_info.bucket, obj_iter->key);
497 RGWObjState *state;
498 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
499 if (ret < 0) {
500 return ret;
501 }
502 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
503 continue;
504 }
505 ret = remove_expired_obj(bucket_info, obj_iter->key, remove_indeed);
506 if (ret < 0) {
507 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
508 } else {
509 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl;
510 }
511 }
512 }
513 } while (is_truncated);
514 }
515 }
516
517 ret = handle_multipart_expiration(&target, prefix_map);
518
519 return ret;
520}
521
522int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
523{
524 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
525
526 rados::cls::lock::Lock l(lc_index_lock_name);
527 l.set_cookie(cookie);
528 l.set_duration(lock_duration);
529
530 do {
531 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
532 if (ret == -EBUSY) { /* already locked by another lc processor */
533 dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
534 sleep(5);
535 continue;
536 }
537 if (ret < 0)
538 return 0;
539 dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl;
540 if (result == -ENOENT) {
541 ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry);
542 if (ret < 0) {
543 dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl;
7c673cae 544 }
31f18b77 545 goto clean;
7c673cae
FG
546 } else if (result < 0) {
547 entry.second = lc_failed;
548 } else {
549 entry.second = lc_complete;
550 }
551
552 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
553 if (ret < 0) {
554 dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl;
555 }
556clean:
557 l.unlock(&store->lc_pool_ctx, obj_names[index]);
558 dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl;
559 return 0;
560 } while (true);
561}
562
563int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
564{
565 int index = 0;
566 progress_map->clear();
567 for(; index <max_objs; index++) {
568 map<string, int > entries;
569 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
31f18b77
FG
570 if (ret < 0) {
571 if (ret == -ENOENT) {
572 dout(10) << __func__ << " ignoring unfound lc object="
573 << obj_names[index] << dendl;
574 continue;
575 } else {
576 return ret;
577 }
578 }
7c673cae
FG
579 map<string, int>::iterator iter;
580 for (iter = entries.begin(); iter != entries.end(); ++iter) {
581 progress_map->insert(*iter);
582 }
583 }
584 return 0;
585}
586
587int RGWLC::process()
588{
589 int max_secs = cct->_conf->rgw_lc_lock_max_time;
590
591 unsigned start;
592 int ret = get_random_bytes((char *)&start, sizeof(start));
593 if (ret < 0)
594 return ret;
595
596 for (int i = 0; i < max_objs; i++) {
597 int index = (i + start) % max_objs;
598 ret = process(index, max_secs);
599 if (ret < 0)
600 return ret;
601 }
602
603 return 0;
604}
605
606int RGWLC::process(int index, int max_lock_secs)
607{
608 rados::cls::lock::Lock l(lc_index_lock_name);
609 do {
610 utime_t now = ceph_clock_now();
611 pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
612 if (max_lock_secs <= 0)
613 return -EAGAIN;
614
615 utime_t time(max_lock_secs, 0);
616 l.set_duration(time);
617
618 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
619 if (ret == -EBUSY) { /* already locked by another lc processor */
620 dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
621 sleep(5);
622 continue;
623 }
624 if (ret < 0)
625 return 0;
626
627 string marker;
628 cls_rgw_lc_obj_head head;
629 ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
630 if (ret < 0) {
631 dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;
632 goto exit;
633 }
634
635 if(!if_already_run_today(head.start_date)) {
636 head.start_date = now;
637 head.marker.clear();
638 ret = bucket_lc_prepare(index);
639 if (ret < 0) {
640 dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;
641 goto exit;
642 }
643 }
644
645 ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
646 if (ret < 0) {
647 dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;
648 goto exit;
649 }
650
651 if (entry.first.empty())
652 goto exit;
653
654 entry.second = lc_processing;
655 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
656 if (ret < 0) {
657 dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;
658 goto exit;
659 }
660
661 head.marker = entry.first;
662 ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);
663 if (ret < 0) {
664 dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
665 goto exit;
666 }
667 l.unlock(&store->lc_pool_ctx, obj_names[index]);
668 ret = bucket_lc_process(entry.first);
669 bucket_lc_post(index, max_lock_secs, entry, ret);
670 return 0;
671exit:
672 l.unlock(&store->lc_pool_ctx, obj_names[index]);
673 return 0;
674
675 }while(1);
676
677}
678
679void RGWLC::start_processor()
680{
681 worker = new LCWorker(cct, this);
682 worker->create("lifecycle_thr");
683}
684
685void RGWLC::stop_processor()
686{
687 down_flag = true;
688 if (worker) {
689 worker->stop();
690 worker->join();
691 }
692 delete worker;
693 worker = NULL;
694}
695
696void RGWLC::LCWorker::stop()
697{
698 Mutex::Locker l(lock);
699 cond.Signal();
700}
701
702bool RGWLC::going_down()
703{
704 return down_flag;
705}
706
707bool RGWLC::LCWorker::should_work(utime_t& now)
708{
709 int start_hour;
710 int start_minute;
711 int end_hour;
712 int end_minute;
713 string worktime = cct->_conf->rgw_lifecycle_work_time;
714 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
715 struct tm bdt;
716 time_t tt = now.sec();
717 localtime_r(&tt, &bdt);
718
719 if (cct->_conf->rgw_lc_debug_interval > 0) {
720 /* We're debugging, so say we can run */
721 return true;
722 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
723 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
724 return true;
725 } else {
726 return false;
727 }
728
729}
730
731int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
732{
733 if (cct->_conf->rgw_lc_debug_interval > 0) {
734 int secs = start + cct->_conf->rgw_lc_debug_interval - now;
735 if (secs < 0)
736 secs = 0;
737 return (secs);
738 }
739
740 int start_hour;
741 int start_minute;
742 int end_hour;
743 int end_minute;
744 string worktime = cct->_conf->rgw_lifecycle_work_time;
745 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
746 struct tm bdt;
747 time_t tt = now.sec();
748 time_t nt;
749 localtime_r(&tt, &bdt);
750 bdt.tm_hour = start_hour;
751 bdt.tm_min = start_minute;
752 bdt.tm_sec = 0;
753 nt = mktime(&bdt);
754
755 return (nt+24*60*60 - tt);
756}
757