]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_lc.cc
update sources to v12.2.3
[ceph.git] / ceph / src / rgw / rgw_lc.cc
CommitLineData
7c673cae
FG
1#include <string.h>
2#include <iostream>
3#include <map>
4
5#include <boost/algorithm/string/split.hpp>
6#include <boost/algorithm/string.hpp>
7
8#include "common/Formatter.h"
9#include <common/errno.h>
10#include "auth/Crypto.h"
11#include "cls/rgw/cls_rgw_client.h"
12#include "cls/lock/cls_lock_client.h"
13#include "rgw_common.h"
14#include "rgw_bucket.h"
15#include "rgw_lc.h"
16
17#define dout_context g_ceph_context
18#define dout_subsys ceph_subsys_rgw
19
// Printable names of the lifecycle entry states.
// NOTE(review): presumably indexed by the lc_uninitial / lc_processing /
// lc_failed / lc_complete status values used below -- confirm the enum order
// against rgw_lc.h before relying on it.
const char* LC_STATUS[] = {
  "UNINITIAL", "PROCESSING", "FAILED", "COMPLETE"
};
26
27using namespace std;
28using namespace librados;
29
224ce89b 30bool LCRule::valid()
7c673cae
FG
31{
32 if (id.length() > MAX_ID_LEN) {
33 return false;
34 }
31f18b77 35 else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration) {
7c673cae
FG
36 return false;
37 }
224ce89b 38 else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
7c673cae
FG
39 return false;
40 }
41 return true;
42}
43
44void RGWLifecycleConfiguration::add_rule(LCRule *rule)
45{
46 string id;
47 rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups
48 rule_map.insert(pair<string, LCRule>(id, *rule));
49}
50
51bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
52{
53 lc_op op;
54 if (rule->get_status().compare("Enabled") == 0) {
55 op.status = true;
56 }
224ce89b 57 if (rule->get_expiration().has_days()) {
7c673cae
FG
58 op.expiration = rule->get_expiration().get_days();
59 }
224ce89b
WB
60 if (rule->get_expiration().has_date()) {
61 op.expiration_date = ceph::from_iso_8601(rule->get_expiration().get_date());
62 }
63 if (rule->get_noncur_expiration().has_days()) {
7c673cae
FG
64 op.noncur_expiration = rule->get_noncur_expiration().get_days();
65 }
224ce89b 66 if (rule->get_mp_expiration().has_days()) {
7c673cae
FG
67 op.mp_expiration = rule->get_mp_expiration().get_days();
68 }
31f18b77 69 op.dm_expiration = rule->get_dm_expiration();
181888fb
FG
70
71 std::string prefix;
72 if (rule->get_filter().has_prefix()){
73 prefix = rule->get_filter().get_prefix();
74 } else {
75 prefix = rule->get_prefix();
76 }
77 auto ret = prefix_map.emplace(std::move(prefix), std::move(op));
7c673cae
FG
78 return ret.second;
79}
80
81int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
82{
224ce89b 83 if (!rule->valid()) {
7c673cae
FG
84 return -EINVAL;
85 }
86 string id;
87 rule->get_id(id);
88 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
89 return -EINVAL;
90 }
91 rule_map.insert(pair<string, LCRule>(id, *rule));
92
93 if (!_add_rule(rule)) {
94 return -ERR_INVALID_REQUEST;
95 }
96 return 0;
97}
98
224ce89b
WB
99bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
100 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
101 (second.expiration > 0 || second.expiration_date != boost::none)) {
102 return true;
103 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
104 return true;
105 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
106 return true;
107 } else {
108 return false;
109 }
110}
111
7c673cae
FG
112//Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules
113//define same action.
224ce89b 114bool RGWLifecycleConfiguration::valid()
7c673cae
FG
115{
116 if (prefix_map.size() < 2) {
117 return true;
118 }
119 auto cur_iter = prefix_map.begin();
120 while (cur_iter != prefix_map.end()) {
121 auto next_iter = cur_iter;
122 ++next_iter;
123 while (next_iter != prefix_map.end()) {
124 string c_pre = cur_iter->first;
125 string n_pre = next_iter->first;
126 if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
224ce89b 127 if (has_same_action(cur_iter->second, next_iter->second)) {
7c673cae
FG
128 return false;
129 } else {
130 ++next_iter;
131 }
132 } else {
133 break;
134 }
135 }
136 ++cur_iter;
137 }
138 return true;
139}
140
141void *RGWLC::LCWorker::entry() {
142 do {
143 utime_t start = ceph_clock_now();
144 if (should_work(start)) {
145 dout(5) << "life cycle: start" << dendl;
146 int r = lc->process();
147 if (r < 0) {
148 dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
149 }
150 dout(5) << "life cycle: stop" << dendl;
151 }
152 if (lc->going_down())
153 break;
154
155 utime_t end = ceph_clock_now();
156 int secs = schedule_next_start_time(start, end);
c07f9fc5
FG
157 utime_t next;
158 next.set_from_double(end + secs);
159
160 dout(5) << "schedule life cycle next start time: " << rgw_to_asctime(next) <<dendl;
7c673cae
FG
161
162 lock.Lock();
163 cond.WaitInterval(lock, utime_t(secs, 0));
164 lock.Unlock();
165 } while (!lc->going_down());
166
167 return NULL;
168}
169
170void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {
171 cct = _cct;
172 store = _store;
173 max_objs = cct->_conf->rgw_lc_max_objs;
174 if (max_objs > HASH_PRIME)
175 max_objs = HASH_PRIME;
176
177 obj_names = new string[max_objs];
178
179 for (int i = 0; i < max_objs; i++) {
180 obj_names[i] = lc_oid_prefix;
181 char buf[32];
182 snprintf(buf, 32, ".%d", i);
183 obj_names[i].append(buf);
184 }
185
186#define COOKIE_LEN 16
187 char cookie_buf[COOKIE_LEN + 1];
188 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
189 cookie = cookie_buf;
190}
191
192void RGWLC::finalize()
193{
194 delete[] obj_names;
195}
196
197bool RGWLC::if_already_run_today(time_t& start_date)
198{
199 struct tm bdt;
200 time_t begin_of_day;
201 utime_t now = ceph_clock_now();
202 localtime_r(&start_date, &bdt);
203
204 if (cct->_conf->rgw_lc_debug_interval > 0) {
3efd9988
FG
205 if (now - start_date < cct->_conf->rgw_lc_debug_interval)
206 return true;
207 else
208 return false;
7c673cae
FG
209 }
210
211 bdt.tm_hour = 0;
212 bdt.tm_min = 0;
213 bdt.tm_sec = 0;
214 begin_of_day = mktime(&bdt);
215 if (now - begin_of_day < 24*60*60)
216 return true;
217 else
218 return false;
219}
220
221int RGWLC::bucket_lc_prepare(int index)
222{
223 map<string, int > entries;
224
225 string marker;
226
227#define MAX_LC_LIST_ENTRIES 100
228 do {
229 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
230 if (ret < 0)
231 return ret;
232 map<string, int>::iterator iter;
233 for (iter = entries.begin(); iter != entries.end(); ++iter) {
234 pair<string, int > entry(iter->first, lc_uninitial);
235 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
236 if (ret < 0) {
237 dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl;
238 break;
239 }
240 marker = iter->first;
241 }
242 } while (!entries.empty());
243
244 return 0;
245}
246
247bool RGWLC::obj_has_expired(double timediff, int days)
248{
31f18b77
FG
249 double cmp;
250 if (cct->_conf->rgw_lc_debug_interval <= 0) {
251 /* Normal case, run properly */
252 cmp = days*24*60*60;
253 } else {
254 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
255 cmp = days*cct->_conf->rgw_lc_debug_interval;
256 }
7c673cae 257
31f18b77 258 return (timediff >= cmp);
7c673cae
FG
259}
260
261int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed)
262{
263 if (remove_indeed) {
264 return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
265 } else {
266 obj_key.instance.clear();
267 RGWObjectCtx rctx(store);
268 rgw_obj obj(bucket_info.bucket, obj_key);
269 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
270 }
271}
272
273int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map)
274{
275 MultipartMetaFilter mp_filter;
276 vector<rgw_bucket_dir_entry> objs;
277 RGWMPObj mp_obj;
278 bool is_truncated;
279 int ret;
280 RGWBucketInfo& bucket_info = target->get_bucket_info();
281 RGWRados::Bucket::List list_op(target);
282 list_op.params.list_versions = false;
283 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
284 list_op.params.filter = &mp_filter;
285 for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
286 if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
287 continue;
288 }
289 list_op.params.prefix = prefix_iter->first;
290 do {
291 objs.clear();
292 list_op.params.marker = list_op.get_next_marker();
293 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
294 if (ret < 0) {
295 if (ret == (-ENOENT))
296 return 0;
297 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
298 return ret;
299 }
300
301 utime_t now = ceph_clock_now();
302 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
303 if (obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.mp_expiration)) {
304 rgw_obj_key key(obj_iter->key);
305 if (!mp_obj.from_meta(key.name)) {
306 continue;
307 }
308 RGWObjectCtx rctx(store);
309 ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
310 if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
311 ldout(cct, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret <<dendl;
312 return ret;
313 }
b32b8144
FG
314 if (going_down())
315 return 0;
7c673cae
FG
316 }
317 }
318 } while(is_truncated);
319 }
320 return 0;
321}
322
323int RGWLC::bucket_lc_process(string& shard_id)
324{
325 RGWLifecycleConfiguration config(cct);
326 RGWBucketInfo bucket_info;
327 map<string, bufferlist> bucket_attrs;
328 string next_marker, no_ns, list_versions;
329 bool is_truncated;
330 vector<rgw_bucket_dir_entry> objs;
331 RGWObjectCtx obj_ctx(store);
332 vector<std::string> result;
333 boost::split(result, shard_id, boost::is_any_of(":"));
334 string bucket_tenant = result[0];
335 string bucket_name = result[1];
336 string bucket_id = result[2];
337 int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);
338 if (ret < 0) {
339 ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl;
340 return ret;
341 }
342
343 ret = bucket_info.bucket.bucket_id.compare(bucket_id) ;
344 if (ret !=0) {
345 ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl;
346 return -ENOENT;
347 }
348
349 RGWRados::Bucket target(store, bucket_info);
350 RGWRados::Bucket::List list_op(&target);
351
352 map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
353 if (aiter == bucket_attrs.end())
354 return 0;
355
356 bufferlist::iterator iter(&aiter->second);
357 try {
358 config.decode(iter);
359 } catch (const buffer::error& e) {
360 ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl;
361 return -1;
362 }
363
364 map<string, lc_op>& prefix_map = config.get_prefix_map();
365 list_op.params.list_versions = bucket_info.versioned();
366 if (!bucket_info.versioned()) {
367 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
224ce89b
WB
368 if (!prefix_iter->second.status ||
369 (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) {
370 continue;
371 }
372 if (prefix_iter->second.expiration_date != boost::none &&
373 ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) {
7c673cae
FG
374 continue;
375 }
376 list_op.params.prefix = prefix_iter->first;
377 do {
378 objs.clear();
379 list_op.params.marker = list_op.get_next_marker();
380 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
381
382 if (ret < 0) {
383 if (ret == (-ENOENT))
384 return 0;
385 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
386 return ret;
387 }
224ce89b 388
7c673cae 389 utime_t now = ceph_clock_now();
224ce89b 390 bool is_expired;
7c673cae
FG
391 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
392 rgw_obj_key key(obj_iter->key);
393
394 if (!key.ns.empty()) {
395 continue;
396 }
224ce89b
WB
397 if (prefix_iter->second.expiration_date != boost::none) {
398 //we have checked it before
399 is_expired = true;
400 } else {
401 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.expiration);
402 }
403 if (is_expired) {
7c673cae
FG
404 RGWObjectCtx rctx(store);
405 rgw_obj obj(bucket_info.bucket, key);
406 RGWObjState *state;
407 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
408 if (ret < 0) {
409 return ret;
410 }
411 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
412 continue;
413 ret = remove_expired_obj(bucket_info, obj_iter->key, true);
414 if (ret < 0) {
415 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
416 } else {
417 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << key << dendl;
418 }
b32b8144
FG
419
420 if (going_down())
421 return 0;
7c673cae
FG
422 }
423 }
424 } while (is_truncated);
425 }
426 } else {
427 //bucket versioning is enabled or suspended
428 rgw_obj_key pre_marker;
429 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
31f18b77 430 if (!prefix_iter->second.status || (prefix_iter->second.expiration <= 0
224ce89b 431 && prefix_iter->second.expiration_date == boost::none
31f18b77 432 && prefix_iter->second.noncur_expiration <= 0 && !prefix_iter->second.dm_expiration)) {
7c673cae
FG
433 continue;
434 }
435 if (prefix_iter != prefix_map.begin() &&
436 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
437 list_op.next_marker = pre_marker;
438 } else {
439 pre_marker = list_op.get_next_marker();
440 }
441 list_op.params.prefix = prefix_iter->first;
442 rgw_bucket_dir_entry pre_obj;
443 do {
444 if (!objs.empty()) {
445 pre_obj = objs.back();
446 }
447 objs.clear();
448 list_op.params.marker = list_op.get_next_marker();
449 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
450
451 if (ret < 0) {
452 if (ret == (-ENOENT))
453 return 0;
454 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
455 return ret;
456 }
457
458 utime_t now = ceph_clock_now();
459 ceph::real_time mtime;
460 bool remove_indeed = true;
461 int expiration;
31f18b77 462 bool skip_expiration;
224ce89b 463 bool is_expired;
7c673cae 464 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
31f18b77 465 skip_expiration = false;
224ce89b 466 is_expired = false;
7c673cae 467 if (obj_iter->is_current()) {
224ce89b
WB
468 if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none
469 && !prefix_iter->second.dm_expiration) {
7c673cae
FG
470 continue;
471 }
472 if (obj_iter->is_delete_marker()) {
473 if ((obj_iter + 1)==objs.end()) {
474 if (is_truncated) {
475 //deal with it in next round because we can't judge whether this marker is the only version
476 list_op.next_marker = obj_iter->key;
477 break;
478 }
479 } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
480 continue;
481 }
31f18b77 482 skip_expiration = prefix_iter->second.dm_expiration;
7c673cae
FG
483 remove_indeed = true; //we should remove the delete marker if it's the only version
484 } else {
485 remove_indeed = false;
486 }
487 mtime = obj_iter->meta.mtime;
488 expiration = prefix_iter->second.expiration;
224ce89b 489 if (!skip_expiration && expiration <= 0 && prefix_iter->second.expiration_date == boost::none) {
31f18b77 490 continue;
224ce89b
WB
491 } else if (!skip_expiration) {
492 if (expiration > 0) {
493 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
494 } else {
495 is_expired = now >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date);
496 }
31f18b77 497 }
7c673cae
FG
498 } else {
499 if (prefix_iter->second.noncur_expiration <=0) {
500 continue;
501 }
502 remove_indeed = true;
503 mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime;
504 expiration = prefix_iter->second.noncur_expiration;
224ce89b 505 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
7c673cae 506 }
224ce89b 507 if (skip_expiration || is_expired) {
7c673cae
FG
508 if (obj_iter->is_visible()) {
509 RGWObjectCtx rctx(store);
510 rgw_obj obj(bucket_info.bucket, obj_iter->key);
511 RGWObjState *state;
512 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
513 if (ret < 0) {
514 return ret;
515 }
516 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
517 continue;
518 }
519 ret = remove_expired_obj(bucket_info, obj_iter->key, remove_indeed);
520 if (ret < 0) {
521 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
522 } else {
523 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl;
524 }
b32b8144
FG
525
526 if (going_down())
527 return 0;
7c673cae
FG
528 }
529 }
530 } while (is_truncated);
531 }
532 }
533
534 ret = handle_multipart_expiration(&target, prefix_map);
535
536 return ret;
537}
538
539int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
540{
541 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
542
543 rados::cls::lock::Lock l(lc_index_lock_name);
544 l.set_cookie(cookie);
545 l.set_duration(lock_duration);
546
547 do {
548 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
549 if (ret == -EBUSY) { /* already locked by another lc processor */
550 dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
551 sleep(5);
552 continue;
553 }
554 if (ret < 0)
555 return 0;
556 dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl;
557 if (result == -ENOENT) {
558 ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry);
559 if (ret < 0) {
560 dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl;
7c673cae 561 }
31f18b77 562 goto clean;
7c673cae
FG
563 } else if (result < 0) {
564 entry.second = lc_failed;
565 } else {
566 entry.second = lc_complete;
567 }
568
569 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
570 if (ret < 0) {
571 dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl;
572 }
573clean:
574 l.unlock(&store->lc_pool_ctx, obj_names[index]);
575 dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl;
576 return 0;
577 } while (true);
578}
579
580int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
581{
582 int index = 0;
583 progress_map->clear();
584 for(; index <max_objs; index++) {
585 map<string, int > entries;
586 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
31f18b77
FG
587 if (ret < 0) {
588 if (ret == -ENOENT) {
589 dout(10) << __func__ << " ignoring unfound lc object="
590 << obj_names[index] << dendl;
591 continue;
592 } else {
593 return ret;
594 }
595 }
7c673cae
FG
596 map<string, int>::iterator iter;
597 for (iter = entries.begin(); iter != entries.end(); ++iter) {
598 progress_map->insert(*iter);
599 }
600 }
601 return 0;
602}
603
604int RGWLC::process()
605{
606 int max_secs = cct->_conf->rgw_lc_lock_max_time;
607
608 unsigned start;
609 int ret = get_random_bytes((char *)&start, sizeof(start));
610 if (ret < 0)
611 return ret;
612
613 for (int i = 0; i < max_objs; i++) {
614 int index = (i + start) % max_objs;
615 ret = process(index, max_secs);
616 if (ret < 0)
617 return ret;
618 }
619
620 return 0;
621}
622
623int RGWLC::process(int index, int max_lock_secs)
624{
625 rados::cls::lock::Lock l(lc_index_lock_name);
626 do {
627 utime_t now = ceph_clock_now();
628 pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
629 if (max_lock_secs <= 0)
630 return -EAGAIN;
631
632 utime_t time(max_lock_secs, 0);
633 l.set_duration(time);
634
635 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
636 if (ret == -EBUSY) { /* already locked by another lc processor */
637 dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
638 sleep(5);
639 continue;
640 }
641 if (ret < 0)
642 return 0;
643
644 string marker;
645 cls_rgw_lc_obj_head head;
646 ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
647 if (ret < 0) {
648 dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;
649 goto exit;
650 }
651
652 if(!if_already_run_today(head.start_date)) {
653 head.start_date = now;
654 head.marker.clear();
655 ret = bucket_lc_prepare(index);
656 if (ret < 0) {
657 dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;
658 goto exit;
659 }
660 }
661
662 ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
663 if (ret < 0) {
664 dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;
665 goto exit;
666 }
667
668 if (entry.first.empty())
669 goto exit;
670
671 entry.second = lc_processing;
672 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
673 if (ret < 0) {
674 dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;
675 goto exit;
676 }
677
678 head.marker = entry.first;
679 ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);
680 if (ret < 0) {
681 dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
682 goto exit;
683 }
684 l.unlock(&store->lc_pool_ctx, obj_names[index]);
685 ret = bucket_lc_process(entry.first);
686 bucket_lc_post(index, max_lock_secs, entry, ret);
3efd9988
FG
687 }while(1);
688
7c673cae
FG
689exit:
690 l.unlock(&store->lc_pool_ctx, obj_names[index]);
691 return 0;
7c673cae
FG
692}
693
694void RGWLC::start_processor()
695{
696 worker = new LCWorker(cct, this);
697 worker->create("lifecycle_thr");
698}
699
700void RGWLC::stop_processor()
701{
702 down_flag = true;
703 if (worker) {
704 worker->stop();
705 worker->join();
706 }
707 delete worker;
708 worker = NULL;
709}
710
711void RGWLC::LCWorker::stop()
712{
713 Mutex::Locker l(lock);
714 cond.Signal();
715}
716
717bool RGWLC::going_down()
718{
719 return down_flag;
720}
721
722bool RGWLC::LCWorker::should_work(utime_t& now)
723{
724 int start_hour;
725 int start_minute;
726 int end_hour;
727 int end_minute;
728 string worktime = cct->_conf->rgw_lifecycle_work_time;
729 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
730 struct tm bdt;
731 time_t tt = now.sec();
732 localtime_r(&tt, &bdt);
733
734 if (cct->_conf->rgw_lc_debug_interval > 0) {
735 /* We're debugging, so say we can run */
736 return true;
737 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
738 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
739 return true;
740 } else {
741 return false;
742 }
743
744}
745
746int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
747{
748 if (cct->_conf->rgw_lc_debug_interval > 0) {
749 int secs = start + cct->_conf->rgw_lc_debug_interval - now;
750 if (secs < 0)
751 secs = 0;
752 return (secs);
753 }
754
755 int start_hour;
756 int start_minute;
757 int end_hour;
758 int end_minute;
759 string worktime = cct->_conf->rgw_lifecycle_work_time;
760 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
761 struct tm bdt;
762 time_t tt = now.sec();
763 time_t nt;
764 localtime_r(&tt, &bdt);
765 bdt.tm_hour = start_hour;
766 bdt.tm_min = start_minute;
767 bdt.tm_sec = 0;
768 nt = mktime(&bdt);
769
770 return (nt+24*60*60 - tt);
771}
772