]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_lc.cc
update sources to 12.2.8
[ceph.git] / ceph / src / rgw / rgw_lc.cc
CommitLineData
1adf2230
AA
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
7c673cae
FG
4#include <string.h>
5#include <iostream>
6#include <map>
7
8#include <boost/algorithm/string/split.hpp>
9#include <boost/algorithm/string.hpp>
10
11#include "common/Formatter.h"
12#include <common/errno.h>
13#include "auth/Crypto.h"
14#include "cls/rgw/cls_rgw_client.h"
15#include "cls/lock/cls_lock_client.h"
16#include "rgw_common.h"
17#include "rgw_bucket.h"
18#include "rgw_lc.h"
19
20#define dout_context g_ceph_context
21#define dout_subsys ceph_subsys_rgw
22
// Printable names for the lifecycle processing states.
// NOTE(review): presumably indexed by the numeric lc_* status value
// (lc_uninitial, lc_processing, lc_failed, lc_complete) -- confirm the
// ordering against the enum declaration before reordering entries.
const char* LC_STATUS[] = {
  "UNINITIAL", "PROCESSING", "FAILED", "COMPLETE",
};
29
30using namespace std;
31using namespace librados;
32
224ce89b 33bool LCRule::valid()
7c673cae
FG
34{
35 if (id.length() > MAX_ID_LEN) {
36 return false;
37 }
31f18b77 38 else if(expiration.empty() && noncur_expiration.empty() && mp_expiration.empty() && !dm_expiration) {
7c673cae
FG
39 return false;
40 }
224ce89b 41 else if (!expiration.valid() || !noncur_expiration.valid() || !mp_expiration.valid()) {
7c673cae
FG
42 return false;
43 }
44 return true;
45}
46
47void RGWLifecycleConfiguration::add_rule(LCRule *rule)
48{
49 string id;
50 rule->get_id(id); // not that this will return false for groups, but that's ok, we won't search groups
51 rule_map.insert(pair<string, LCRule>(id, *rule));
52}
53
54bool RGWLifecycleConfiguration::_add_rule(LCRule *rule)
55{
56 lc_op op;
57 if (rule->get_status().compare("Enabled") == 0) {
58 op.status = true;
59 }
224ce89b 60 if (rule->get_expiration().has_days()) {
7c673cae
FG
61 op.expiration = rule->get_expiration().get_days();
62 }
224ce89b
WB
63 if (rule->get_expiration().has_date()) {
64 op.expiration_date = ceph::from_iso_8601(rule->get_expiration().get_date());
65 }
66 if (rule->get_noncur_expiration().has_days()) {
7c673cae
FG
67 op.noncur_expiration = rule->get_noncur_expiration().get_days();
68 }
224ce89b 69 if (rule->get_mp_expiration().has_days()) {
7c673cae
FG
70 op.mp_expiration = rule->get_mp_expiration().get_days();
71 }
31f18b77 72 op.dm_expiration = rule->get_dm_expiration();
181888fb
FG
73
74 std::string prefix;
75 if (rule->get_filter().has_prefix()){
76 prefix = rule->get_filter().get_prefix();
77 } else {
78 prefix = rule->get_prefix();
79 }
80 auto ret = prefix_map.emplace(std::move(prefix), std::move(op));
7c673cae
FG
81 return ret.second;
82}
83
84int RGWLifecycleConfiguration::check_and_add_rule(LCRule *rule)
85{
224ce89b 86 if (!rule->valid()) {
7c673cae
FG
87 return -EINVAL;
88 }
89 string id;
90 rule->get_id(id);
91 if (rule_map.find(id) != rule_map.end()) { //id shouldn't be the same
92 return -EINVAL;
93 }
94 rule_map.insert(pair<string, LCRule>(id, *rule));
95
96 if (!_add_rule(rule)) {
97 return -ERR_INVALID_REQUEST;
98 }
99 return 0;
100}
101
224ce89b
WB
102bool RGWLifecycleConfiguration::has_same_action(const lc_op& first, const lc_op& second) {
103 if ((first.expiration > 0 || first.expiration_date != boost::none) &&
104 (second.expiration > 0 || second.expiration_date != boost::none)) {
105 return true;
106 } else if (first.noncur_expiration > 0 && second.noncur_expiration > 0) {
107 return true;
108 } else if (first.mp_expiration > 0 && second.mp_expiration > 0) {
109 return true;
110 } else {
111 return false;
112 }
113}
114
7c673cae
FG
115//Rules are conflicted: if one rule's prefix starts with other rule's prefix, and these two rules
116//define same action.
224ce89b 117bool RGWLifecycleConfiguration::valid()
7c673cae
FG
118{
119 if (prefix_map.size() < 2) {
120 return true;
121 }
122 auto cur_iter = prefix_map.begin();
123 while (cur_iter != prefix_map.end()) {
124 auto next_iter = cur_iter;
125 ++next_iter;
126 while (next_iter != prefix_map.end()) {
127 string c_pre = cur_iter->first;
128 string n_pre = next_iter->first;
129 if (n_pre.compare(0, c_pre.length(), c_pre) == 0) {
224ce89b 130 if (has_same_action(cur_iter->second, next_iter->second)) {
7c673cae
FG
131 return false;
132 } else {
133 ++next_iter;
134 }
135 } else {
136 break;
137 }
138 }
139 ++cur_iter;
140 }
141 return true;
142}
143
144void *RGWLC::LCWorker::entry() {
145 do {
146 utime_t start = ceph_clock_now();
147 if (should_work(start)) {
148 dout(5) << "life cycle: start" << dendl;
149 int r = lc->process();
150 if (r < 0) {
151 dout(0) << "ERROR: do life cycle process() returned error r=" << r << dendl;
152 }
153 dout(5) << "life cycle: stop" << dendl;
154 }
155 if (lc->going_down())
156 break;
157
158 utime_t end = ceph_clock_now();
159 int secs = schedule_next_start_time(start, end);
c07f9fc5
FG
160 utime_t next;
161 next.set_from_double(end + secs);
162
163 dout(5) << "schedule life cycle next start time: " << rgw_to_asctime(next) <<dendl;
7c673cae
FG
164
165 lock.Lock();
166 cond.WaitInterval(lock, utime_t(secs, 0));
167 lock.Unlock();
168 } while (!lc->going_down());
169
170 return NULL;
171}
172
173void RGWLC::initialize(CephContext *_cct, RGWRados *_store) {
174 cct = _cct;
175 store = _store;
176 max_objs = cct->_conf->rgw_lc_max_objs;
177 if (max_objs > HASH_PRIME)
178 max_objs = HASH_PRIME;
179
180 obj_names = new string[max_objs];
181
182 for (int i = 0; i < max_objs; i++) {
183 obj_names[i] = lc_oid_prefix;
184 char buf[32];
185 snprintf(buf, 32, ".%d", i);
186 obj_names[i].append(buf);
187 }
188
189#define COOKIE_LEN 16
190 char cookie_buf[COOKIE_LEN + 1];
191 gen_rand_alphanumeric(cct, cookie_buf, sizeof(cookie_buf) - 1);
192 cookie = cookie_buf;
193}
194
195void RGWLC::finalize()
196{
197 delete[] obj_names;
198}
199
200bool RGWLC::if_already_run_today(time_t& start_date)
201{
202 struct tm bdt;
203 time_t begin_of_day;
204 utime_t now = ceph_clock_now();
205 localtime_r(&start_date, &bdt);
206
207 if (cct->_conf->rgw_lc_debug_interval > 0) {
3efd9988
FG
208 if (now - start_date < cct->_conf->rgw_lc_debug_interval)
209 return true;
210 else
211 return false;
7c673cae
FG
212 }
213
214 bdt.tm_hour = 0;
215 bdt.tm_min = 0;
216 bdt.tm_sec = 0;
217 begin_of_day = mktime(&bdt);
218 if (now - begin_of_day < 24*60*60)
219 return true;
220 else
221 return false;
222}
223
224int RGWLC::bucket_lc_prepare(int index)
225{
226 map<string, int > entries;
227
228 string marker;
229
230#define MAX_LC_LIST_ENTRIES 100
231 do {
232 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, MAX_LC_LIST_ENTRIES, entries);
233 if (ret < 0)
234 return ret;
235 map<string, int>::iterator iter;
236 for (iter = entries.begin(); iter != entries.end(); ++iter) {
237 pair<string, int > entry(iter->first, lc_uninitial);
238 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
239 if (ret < 0) {
240 dout(0) << "RGWLC::bucket_lc_prepare() failed to set entry " << obj_names[index] << dendl;
241 break;
242 }
243 marker = iter->first;
244 }
245 } while (!entries.empty());
246
247 return 0;
248}
249
250bool RGWLC::obj_has_expired(double timediff, int days)
251{
31f18b77
FG
252 double cmp;
253 if (cct->_conf->rgw_lc_debug_interval <= 0) {
254 /* Normal case, run properly */
255 cmp = days*24*60*60;
256 } else {
257 /* We're in debug mode; Treat each rgw_lc_debug_interval seconds as a day */
258 cmp = days*cct->_conf->rgw_lc_debug_interval;
259 }
7c673cae 260
31f18b77 261 return (timediff >= cmp);
7c673cae
FG
262}
263
264int RGWLC::remove_expired_obj(RGWBucketInfo& bucket_info, rgw_obj_key obj_key, bool remove_indeed)
265{
266 if (remove_indeed) {
267 return rgw_remove_object(store, bucket_info, bucket_info.bucket, obj_key);
268 } else {
269 obj_key.instance.clear();
270 RGWObjectCtx rctx(store);
271 rgw_obj obj(bucket_info.bucket, obj_key);
272 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
273 }
274}
275
276int RGWLC::handle_multipart_expiration(RGWRados::Bucket *target, const map<string, lc_op>& prefix_map)
277{
278 MultipartMetaFilter mp_filter;
279 vector<rgw_bucket_dir_entry> objs;
280 RGWMPObj mp_obj;
281 bool is_truncated;
282 int ret;
283 RGWBucketInfo& bucket_info = target->get_bucket_info();
284 RGWRados::Bucket::List list_op(target);
285 list_op.params.list_versions = false;
286 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
287 list_op.params.filter = &mp_filter;
288 for (auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
289 if (!prefix_iter->second.status || prefix_iter->second.mp_expiration <= 0) {
290 continue;
291 }
292 list_op.params.prefix = prefix_iter->first;
293 do {
294 objs.clear();
295 list_op.params.marker = list_op.get_next_marker();
296 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
297 if (ret < 0) {
298 if (ret == (-ENOENT))
299 return 0;
300 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
301 return ret;
302 }
303
304 utime_t now = ceph_clock_now();
305 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
306 if (obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.mp_expiration)) {
307 rgw_obj_key key(obj_iter->key);
308 if (!mp_obj.from_meta(key.name)) {
309 continue;
310 }
311 RGWObjectCtx rctx(store);
312 ret = abort_multipart_upload(store, cct, &rctx, bucket_info, mp_obj);
313 if (ret < 0 && ret != -ERR_NO_SUCH_UPLOAD) {
314 ldout(cct, 0) << "ERROR: abort_multipart_upload failed, ret=" << ret <<dendl;
315 return ret;
316 }
b32b8144
FG
317 if (going_down())
318 return 0;
7c673cae
FG
319 }
320 }
321 } while(is_truncated);
322 }
323 return 0;
324}
325
326int RGWLC::bucket_lc_process(string& shard_id)
327{
328 RGWLifecycleConfiguration config(cct);
329 RGWBucketInfo bucket_info;
330 map<string, bufferlist> bucket_attrs;
331 string next_marker, no_ns, list_versions;
332 bool is_truncated;
333 vector<rgw_bucket_dir_entry> objs;
334 RGWObjectCtx obj_ctx(store);
335 vector<std::string> result;
336 boost::split(result, shard_id, boost::is_any_of(":"));
337 string bucket_tenant = result[0];
338 string bucket_name = result[1];
339 string bucket_id = result[2];
340 int ret = store->get_bucket_info(obj_ctx, bucket_tenant, bucket_name, bucket_info, NULL, &bucket_attrs);
341 if (ret < 0) {
342 ldout(cct, 0) << "LC:get_bucket_info failed" << bucket_name <<dendl;
343 return ret;
344 }
345
346 ret = bucket_info.bucket.bucket_id.compare(bucket_id) ;
347 if (ret !=0) {
348 ldout(cct, 0) << "LC:old bucket id find, should be delete" << bucket_name <<dendl;
349 return -ENOENT;
350 }
351
352 RGWRados::Bucket target(store, bucket_info);
353 RGWRados::Bucket::List list_op(&target);
354
355 map<string, bufferlist>::iterator aiter = bucket_attrs.find(RGW_ATTR_LC);
356 if (aiter == bucket_attrs.end())
357 return 0;
358
359 bufferlist::iterator iter(&aiter->second);
360 try {
361 config.decode(iter);
362 } catch (const buffer::error& e) {
363 ldout(cct, 0) << __func__ << "decode life cycle config failed" << dendl;
364 return -1;
365 }
366
367 map<string, lc_op>& prefix_map = config.get_prefix_map();
368 list_op.params.list_versions = bucket_info.versioned();
369 if (!bucket_info.versioned()) {
370 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
224ce89b
WB
371 if (!prefix_iter->second.status ||
372 (prefix_iter->second.expiration <=0 && prefix_iter->second.expiration_date == boost::none)) {
373 continue;
374 }
375 if (prefix_iter->second.expiration_date != boost::none &&
376 ceph_clock_now() < ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date)) {
7c673cae
FG
377 continue;
378 }
379 list_op.params.prefix = prefix_iter->first;
380 do {
381 objs.clear();
382 list_op.params.marker = list_op.get_next_marker();
383 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
384
385 if (ret < 0) {
386 if (ret == (-ENOENT))
387 return 0;
388 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
389 return ret;
390 }
224ce89b 391
7c673cae 392 utime_t now = ceph_clock_now();
224ce89b 393 bool is_expired;
7c673cae
FG
394 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
395 rgw_obj_key key(obj_iter->key);
396
397 if (!key.ns.empty()) {
398 continue;
399 }
224ce89b
WB
400 if (prefix_iter->second.expiration_date != boost::none) {
401 //we have checked it before
402 is_expired = true;
403 } else {
404 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(obj_iter->meta.mtime), prefix_iter->second.expiration);
405 }
406 if (is_expired) {
7c673cae
FG
407 RGWObjectCtx rctx(store);
408 rgw_obj obj(bucket_info.bucket, key);
409 RGWObjState *state;
410 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
411 if (ret < 0) {
412 return ret;
413 }
414 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
415 continue;
416 ret = remove_expired_obj(bucket_info, obj_iter->key, true);
417 if (ret < 0) {
418 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
419 } else {
420 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << key << dendl;
421 }
b32b8144
FG
422
423 if (going_down())
424 return 0;
7c673cae
FG
425 }
426 }
427 } while (is_truncated);
428 }
429 } else {
430 //bucket versioning is enabled or suspended
431 rgw_obj_key pre_marker;
432 for(auto prefix_iter = prefix_map.begin(); prefix_iter != prefix_map.end(); ++prefix_iter) {
31f18b77 433 if (!prefix_iter->second.status || (prefix_iter->second.expiration <= 0
224ce89b 434 && prefix_iter->second.expiration_date == boost::none
31f18b77 435 && prefix_iter->second.noncur_expiration <= 0 && !prefix_iter->second.dm_expiration)) {
7c673cae
FG
436 continue;
437 }
438 if (prefix_iter != prefix_map.begin() &&
1adf2230
AA
439 (prefix_iter->first.compare(0, prev(prefix_iter)->first.length(), prev(prefix_iter)->first) == 0)) {
440 list_op.get_next_marker() = pre_marker;
7c673cae 441 } else {
1adf2230 442 pre_marker = list_op.get_next_marker();
7c673cae
FG
443 }
444 list_op.params.prefix = prefix_iter->first;
445 rgw_bucket_dir_entry pre_obj;
446 do {
447 if (!objs.empty()) {
448 pre_obj = objs.back();
449 }
450 objs.clear();
451 list_op.params.marker = list_op.get_next_marker();
452 ret = list_op.list_objects(1000, &objs, NULL, &is_truncated);
453
454 if (ret < 0) {
455 if (ret == (-ENOENT))
456 return 0;
457 ldout(cct, 0) << "ERROR: store->list_objects():" <<dendl;
458 return ret;
459 }
460
461 utime_t now = ceph_clock_now();
462 ceph::real_time mtime;
463 bool remove_indeed = true;
464 int expiration;
31f18b77 465 bool skip_expiration;
224ce89b 466 bool is_expired;
7c673cae 467 for (auto obj_iter = objs.begin(); obj_iter != objs.end(); ++obj_iter) {
31f18b77 468 skip_expiration = false;
224ce89b 469 is_expired = false;
7c673cae 470 if (obj_iter->is_current()) {
224ce89b
WB
471 if (prefix_iter->second.expiration <= 0 && prefix_iter->second.expiration_date == boost::none
472 && !prefix_iter->second.dm_expiration) {
7c673cae
FG
473 continue;
474 }
475 if (obj_iter->is_delete_marker()) {
476 if ((obj_iter + 1)==objs.end()) {
477 if (is_truncated) {
478 //deal with it in next round because we can't judge whether this marker is the only version
1adf2230 479 list_op.get_next_marker() = obj_iter->key;
7c673cae
FG
480 break;
481 }
482 } else if (obj_iter->key.name.compare((obj_iter + 1)->key.name) == 0) { //*obj_iter is delete marker and isn't the only version, do nothing.
483 continue;
484 }
31f18b77 485 skip_expiration = prefix_iter->second.dm_expiration;
7c673cae
FG
486 remove_indeed = true; //we should remove the delete marker if it's the only version
487 } else {
488 remove_indeed = false;
489 }
490 mtime = obj_iter->meta.mtime;
491 expiration = prefix_iter->second.expiration;
224ce89b 492 if (!skip_expiration && expiration <= 0 && prefix_iter->second.expiration_date == boost::none) {
31f18b77 493 continue;
224ce89b
WB
494 } else if (!skip_expiration) {
495 if (expiration > 0) {
496 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
497 } else {
498 is_expired = now >= ceph::real_clock::to_time_t(*prefix_iter->second.expiration_date);
499 }
31f18b77 500 }
7c673cae
FG
501 } else {
502 if (prefix_iter->second.noncur_expiration <=0) {
503 continue;
504 }
505 remove_indeed = true;
506 mtime = (obj_iter == objs.begin())?pre_obj.meta.mtime:(obj_iter - 1)->meta.mtime;
507 expiration = prefix_iter->second.noncur_expiration;
224ce89b 508 is_expired = obj_has_expired(now - ceph::real_clock::to_time_t(mtime), expiration);
7c673cae 509 }
224ce89b 510 if (skip_expiration || is_expired) {
7c673cae
FG
511 if (obj_iter->is_visible()) {
512 RGWObjectCtx rctx(store);
513 rgw_obj obj(bucket_info.bucket, obj_iter->key);
514 RGWObjState *state;
515 int ret = store->get_obj_state(&rctx, bucket_info, obj, &state, false);
516 if (ret < 0) {
517 return ret;
518 }
519 if (state->mtime != obj_iter->meta.mtime)//Check mtime again to avoid delete a recently update object as much as possible
520 continue;
521 }
522 ret = remove_expired_obj(bucket_info, obj_iter->key, remove_indeed);
523 if (ret < 0) {
524 ldout(cct, 0) << "ERROR: remove_expired_obj " << dendl;
525 } else {
526 ldout(cct, 10) << "DELETED:" << bucket_name << ":" << obj_iter->key << dendl;
527 }
b32b8144
FG
528
529 if (going_down())
530 return 0;
7c673cae
FG
531 }
532 }
533 } while (is_truncated);
534 }
535 }
536
537 ret = handle_multipart_expiration(&target, prefix_map);
538
539 return ret;
540}
541
542int RGWLC::bucket_lc_post(int index, int max_lock_sec, pair<string, int >& entry, int& result)
543{
544 utime_t lock_duration(cct->_conf->rgw_lc_lock_max_time, 0);
545
546 rados::cls::lock::Lock l(lc_index_lock_name);
547 l.set_cookie(cookie);
548 l.set_duration(lock_duration);
549
550 do {
551 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
552 if (ret == -EBUSY) { /* already locked by another lc processor */
553 dout(0) << "RGWLC::bucket_lc_post() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
554 sleep(5);
555 continue;
556 }
557 if (ret < 0)
558 return 0;
559 dout(20) << "RGWLC::bucket_lc_post() get lock" << obj_names[index] << dendl;
560 if (result == -ENOENT) {
561 ret = cls_rgw_lc_rm_entry(store->lc_pool_ctx, obj_names[index], entry);
562 if (ret < 0) {
563 dout(0) << "RGWLC::bucket_lc_post() failed to remove entry " << obj_names[index] << dendl;
7c673cae 564 }
31f18b77 565 goto clean;
7c673cae
FG
566 } else if (result < 0) {
567 entry.second = lc_failed;
568 } else {
569 entry.second = lc_complete;
570 }
571
572 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
573 if (ret < 0) {
574 dout(0) << "RGWLC::process() failed to set entry " << obj_names[index] << dendl;
575 }
576clean:
577 l.unlock(&store->lc_pool_ctx, obj_names[index]);
578 dout(20) << "RGWLC::bucket_lc_post() unlock" << obj_names[index] << dendl;
579 return 0;
580 } while (true);
581}
582
583int RGWLC::list_lc_progress(const string& marker, uint32_t max_entries, map<string, int> *progress_map)
584{
585 int index = 0;
586 progress_map->clear();
587 for(; index <max_objs; index++) {
588 map<string, int > entries;
589 int ret = cls_rgw_lc_list(store->lc_pool_ctx, obj_names[index], marker, max_entries, entries);
31f18b77
FG
590 if (ret < 0) {
591 if (ret == -ENOENT) {
592 dout(10) << __func__ << " ignoring unfound lc object="
593 << obj_names[index] << dendl;
594 continue;
595 } else {
596 return ret;
597 }
598 }
7c673cae
FG
599 map<string, int>::iterator iter;
600 for (iter = entries.begin(); iter != entries.end(); ++iter) {
601 progress_map->insert(*iter);
602 }
603 }
604 return 0;
605}
606
607int RGWLC::process()
608{
609 int max_secs = cct->_conf->rgw_lc_lock_max_time;
610
611 unsigned start;
612 int ret = get_random_bytes((char *)&start, sizeof(start));
613 if (ret < 0)
614 return ret;
615
616 for (int i = 0; i < max_objs; i++) {
617 int index = (i + start) % max_objs;
618 ret = process(index, max_secs);
619 if (ret < 0)
620 return ret;
621 }
622
623 return 0;
624}
625
626int RGWLC::process(int index, int max_lock_secs)
627{
628 rados::cls::lock::Lock l(lc_index_lock_name);
629 do {
630 utime_t now = ceph_clock_now();
631 pair<string, int > entry;//string = bucket_name:bucket_id ,int = LC_BUCKET_STATUS
632 if (max_lock_secs <= 0)
633 return -EAGAIN;
634
635 utime_t time(max_lock_secs, 0);
636 l.set_duration(time);
637
638 int ret = l.lock_exclusive(&store->lc_pool_ctx, obj_names[index]);
639 if (ret == -EBUSY) { /* already locked by another lc processor */
640 dout(0) << "RGWLC::process() failed to acquire lock on, sleep 5, try again" << obj_names[index] << dendl;
641 sleep(5);
642 continue;
643 }
644 if (ret < 0)
645 return 0;
646
647 string marker;
648 cls_rgw_lc_obj_head head;
649 ret = cls_rgw_lc_get_head(store->lc_pool_ctx, obj_names[index], head);
650 if (ret < 0) {
651 dout(0) << "RGWLC::process() failed to get obj head " << obj_names[index] << ret << dendl;
652 goto exit;
653 }
654
655 if(!if_already_run_today(head.start_date)) {
656 head.start_date = now;
657 head.marker.clear();
658 ret = bucket_lc_prepare(index);
659 if (ret < 0) {
660 dout(0) << "RGWLC::process() failed to update lc object " << obj_names[index] << ret << dendl;
661 goto exit;
662 }
663 }
664
665 ret = cls_rgw_lc_get_next_entry(store->lc_pool_ctx, obj_names[index], head.marker, entry);
666 if (ret < 0) {
667 dout(0) << "RGWLC::process() failed to get obj entry " << obj_names[index] << dendl;
668 goto exit;
669 }
670
671 if (entry.first.empty())
672 goto exit;
673
674 entry.second = lc_processing;
675 ret = cls_rgw_lc_set_entry(store->lc_pool_ctx, obj_names[index], entry);
676 if (ret < 0) {
677 dout(0) << "RGWLC::process() failed to set obj entry " << obj_names[index] << entry.first << entry.second << dendl;
678 goto exit;
679 }
680
681 head.marker = entry.first;
682 ret = cls_rgw_lc_put_head(store->lc_pool_ctx, obj_names[index], head);
683 if (ret < 0) {
684 dout(0) << "RGWLC::process() failed to put head " << obj_names[index] << dendl;
685 goto exit;
686 }
687 l.unlock(&store->lc_pool_ctx, obj_names[index]);
688 ret = bucket_lc_process(entry.first);
689 bucket_lc_post(index, max_lock_secs, entry, ret);
3efd9988
FG
690 }while(1);
691
7c673cae
FG
692exit:
693 l.unlock(&store->lc_pool_ctx, obj_names[index]);
694 return 0;
7c673cae
FG
695}
696
697void RGWLC::start_processor()
698{
699 worker = new LCWorker(cct, this);
700 worker->create("lifecycle_thr");
701}
702
703void RGWLC::stop_processor()
704{
705 down_flag = true;
706 if (worker) {
707 worker->stop();
708 worker->join();
709 }
710 delete worker;
711 worker = NULL;
712}
713
714void RGWLC::LCWorker::stop()
715{
716 Mutex::Locker l(lock);
717 cond.Signal();
718}
719
720bool RGWLC::going_down()
721{
722 return down_flag;
723}
724
725bool RGWLC::LCWorker::should_work(utime_t& now)
726{
727 int start_hour;
728 int start_minute;
729 int end_hour;
730 int end_minute;
731 string worktime = cct->_conf->rgw_lifecycle_work_time;
732 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
733 struct tm bdt;
734 time_t tt = now.sec();
735 localtime_r(&tt, &bdt);
736
737 if (cct->_conf->rgw_lc_debug_interval > 0) {
738 /* We're debugging, so say we can run */
739 return true;
740 } else if ((bdt.tm_hour*60 + bdt.tm_min >= start_hour*60 + start_minute) &&
741 (bdt.tm_hour*60 + bdt.tm_min <= end_hour*60 + end_minute)) {
742 return true;
743 } else {
744 return false;
745 }
746
747}
748
749int RGWLC::LCWorker::schedule_next_start_time(utime_t &start, utime_t& now)
750{
751 if (cct->_conf->rgw_lc_debug_interval > 0) {
752 int secs = start + cct->_conf->rgw_lc_debug_interval - now;
753 if (secs < 0)
754 secs = 0;
755 return (secs);
756 }
757
758 int start_hour;
759 int start_minute;
760 int end_hour;
761 int end_minute;
762 string worktime = cct->_conf->rgw_lifecycle_work_time;
763 sscanf(worktime.c_str(),"%d:%d-%d:%d",&start_hour, &start_minute, &end_hour, &end_minute);
764 struct tm bdt;
765 time_t tt = now.sec();
766 time_t nt;
767 localtime_r(&tt, &bdt);
768 bdt.tm_hour = start_hour;
769 bdt.tm_min = start_minute;
770 bdt.tm_sec = 0;
771 nt = mktime(&bdt);
772
773 return (nt+24*60*60 - tt);
774}
775