// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "rgw_gc.h"

#include "rgw_tools.h"
#include "include/scope_guard.h"
#include "include/rados/librados.hpp"
#include "cls/rgw/cls_rgw_client.h"
#include "cls/rgw_gc/cls_rgw_gc_client.h"
#include "cls/refcount/cls_refcount_client.h"
#include "cls/version/cls_version_client.h"
#include "rgw_perf_counters.h"
#include "cls/lock/cls_lock_client.h"
#include "include/random.h"
#include "rgw_gc_log.h"

#include <list> // XXX
#include <sstream>
#include "xxhash.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

using namespace std;
using namespace librados;

static string gc_oid_prefix = "gc";
static string gc_index_lock_name = "gc_process";

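// Create and initialize one GC log shard object ("gc.N") per configured
// shard. gc_log_init2() sets up the cls_rgw_gc queue parameters; the shard's
// cls_version (0 = not yet transitioned, 1 = transitioned) is consulted
// elsewhere to decide between the old omap log and the new queue.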
void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
  cct = _cct;
  store = _store;

  max_objs = min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max());

  obj_names = new string[max_objs];

  for (int i = 0; i < max_objs; i++) {
    obj_names[i] = gc_oid_prefix;
    char buf[32];
    snprintf(buf, 32, ".%d", i);
    obj_names[i].append(buf);

    auto it = transitioned_objects_cache.begin() + i;
    transitioned_objects_cache.insert(it, false);

    //version = 0 -> not ready for transition
    //version = 1 -> marked ready for transition
    librados::ObjectWriteOperation op;
    op.create(false);
    const uint64_t queue_size = cct->_conf->rgw_gc_max_queue_size, num_deferred_entries = cct->_conf->rgw_gc_max_deferred;
    gc_log_init2(op, queue_size, num_deferred_entries);
    store->gc_operate(this, obj_names[i], &op);
  }
}

void RGWGC::finalize()
{
  delete[] obj_names;
}

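// Map a GC tag to its shard index by hashing the tag with xxHash and reducing
// modulo the number of GC shard objects.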
int RGWGC::tag_index(const string& tag)
{
  return rgw_shards_mod(XXH64(tag.c_str(), tag.size(), seed), max_objs);
}

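// Enqueue a chain of tail objects for later deletion. The entry is first
// pushed with gc_log_enqueue2(); if the shard object rejects that write with
// ECANCELED or EPERM, fall back to writing the entry into omap with
// cls_rgw_gc_set_entry.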
int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag)
{
  ObjectWriteOperation op;
  cls_rgw_gc_obj_info info;
  info.chain = chain;
  info.tag = tag;
  gc_log_enqueue2(op, cct->_conf->rgw_gc_obj_min_wait, info);

  int i = tag_index(tag);

  ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names[i] << ", tag is: " << tag << dendl;

  auto ret = store->gc_operate(this, obj_names[i], &op);
  if (ret != -ECANCELED && ret != -EPERM) {
    return ret;
  }
  ObjectWriteOperation set_entry_op;
  cls_rgw_gc_set_entry(set_entry_op, cct->_conf->rgw_gc_obj_min_wait, info);
  return store->gc_operate(this, obj_names[i], &set_entry_op);
}

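// State carried across an asynchronous defer request. The completion callback
// uses it to detect, via ECANCELED from the cls_version check, that the shard
// has transitioned to the cls_rgw_gc queue and to re-issue the defer there.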
struct defer_chain_state {
  librados::AioCompletion* completion = nullptr;
  // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
  // RGWGC destructs before this completion fires
  RGWGC* gc = nullptr;
  cls_rgw_gc_obj_info info;

  ~defer_chain_state() {
    if (completion) {
      completion->release();
    }
  }
};

static void async_defer_callback(librados::completion_t, void* arg)
{
  std::unique_ptr<defer_chain_state> state{static_cast<defer_chain_state*>(arg)};
  if (state->completion->get_return_value() == -ECANCELED) {
    state->gc->on_defer_canceled(state->info);
  }
}

void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info& info)
{
  const std::string& tag = info.tag;
  const int i = tag_index(tag);

  // ECANCELED from cls_version_check() tells us that we've transitioned
  transitioned_objects_cache[i] = true;

  ObjectWriteOperation op;
  cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
  cls_rgw_gc_remove(op, {tag});

  auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
  store->gc_aio_operate(obj_names[i], c, &op);
  c->release();
}

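// Push back the expiration of a pending GC entry so its tail objects are not
// reclaimed yet. Uses the cls_rgw_gc queue if the shard has transitioned;
// otherwise defers via omap and registers a callback that retries on the
// queue if the write races with a transition.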
int RGWGC::async_defer_chain(const string& tag, const cls_rgw_obj_chain& chain)
{
  const int i = tag_index(tag);
  cls_rgw_gc_obj_info info;
  info.chain = chain;
  info.tag = tag;

  // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
  if (transitioned_objects_cache[i]) {
    ObjectWriteOperation op;
    cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);

    // this tag may still be present in omap, so remove it once the cls_rgw_gc
    // enqueue succeeds
    cls_rgw_gc_remove(op, {tag});

    auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
    int ret = store->gc_aio_operate(obj_names[i], c, &op);
    c->release();
    return ret;
  }

  // if we haven't seen the transition yet, write the defer to omap with cls_rgw
  ObjectWriteOperation op;

  // assert that we haven't initialized cls_rgw_gc queue. this prevents us
  // from writing new entries to omap after the transition
  gc_log_defer1(op, cct->_conf->rgw_gc_obj_min_wait, info);

  // prepare a callback to detect the transition via ECANCELED from cls_version_check()
  auto state = std::make_unique<defer_chain_state>();
  state->gc = this;
  state->info.chain = chain;
  state->info.tag = tag;
  state->completion = librados::Rados::aio_create_completion(
      state.get(), async_defer_callback);

  int ret = store->gc_aio_operate(obj_names[i], state->completion, &op);
  if (ret == 0) {
    state.release(); // release ownership until async_defer_callback()
  }
  return ret;
}

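// Asynchronously remove a batch of finished tags from the omap-based GC log
// of one shard; on success the caller receives the AioCompletion to wait on.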
int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc)
{
  ObjectWriteOperation op;
  cls_rgw_gc_remove(op, tags);

  auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
  int ret = store->gc_aio_operate(obj_names[index], c, &op);
  if (ret < 0) {
    c->release();
  } else {
    *pc = c;
  }
  return ret;
}

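// Trim the first num_entries entries from the cls_rgw_gc queue of one shard,
// used once the corresponding tail objects have been deleted.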
int RGWGC::remove(int index, int num_entries)
{
  ObjectWriteOperation op;
  cls_rgw_gc_queue_remove_entries(op, num_entries);

  return store->gc_operate(this, obj_names[index], &op);
}

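// List pending GC entries across the shard objects, starting at *index and
// marker. Depending on whether a shard has transitioned, entries are read
// from the omap log, the cls_rgw_gc queue, or both; processing_queue lets the
// caller resume a partially listed queue on the next call.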
int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue)
{
  result.clear();
  string next_marker;
  bool check_queue = false;

  for (; *index < max_objs && result.size() < max; (*index)++, marker.clear(), check_queue = false) {
    std::list<cls_rgw_gc_obj_info> entries, queue_entries;
    int ret = 0;

    // processing_queue is set to true by the previous iteration if the queue
    // was being processed and probably has more elements in it
    if (! transitioned_objects_cache[*index] && ! check_queue && ! processing_queue) {
      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
      if (ret != -ENOENT && ret < 0) {
        return ret;
      }
      obj_version objv;
      cls_version_read(store->gc_pool_ctx, obj_names[*index], &objv);
      if (ret == -ENOENT || entries.size() == 0) {
        if (objv.ver == 0) {
          continue;
        } else {
          if (! expired_only) {
            transitioned_objects_cache[*index] = true;
            marker.clear();
          } else {
            std::list<cls_rgw_gc_obj_info> non_expired_entries;
            ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, 1, false, non_expired_entries, truncated, next_marker);
            if (non_expired_entries.size() == 0) {
              transitioned_objects_cache[*index] = true;
              marker.clear();
            }
          }
        }
      }
      if ((objv.ver == 1) && (entries.size() < max - result.size())) {
        check_queue = true;
        marker.clear();
      }
    }
    if (transitioned_objects_cache[*index] || check_queue || processing_queue) {
      processing_queue = false;
      ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[*index], marker, (max - result.size()) - entries.size(), expired_only, queue_entries, truncated, next_marker);
      if (ret < 0) {
        return ret;
      }
    }
    if (entries.size() == 0 && queue_entries.size() == 0)
      continue;

    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      result.push_back(*iter);
    }

    for (iter = queue_entries.begin(); iter != queue_entries.end(); ++iter) {
      result.push_back(*iter);
    }

    marker = next_marker;

    if (*index == max_objs - 1) {
      if (queue_entries.size() > 0 && *truncated) {
        processing_queue = true;
      } else {
        processing_queue = false;
      }
      /* we cut short here, truncated will hold the correct value */
      return 0;
    }

    if (result.size() == max) {
      if (queue_entries.size() > 0 && *truncated) {
        processing_queue = true;
      } else {
        processing_queue = false;
        *index += 1; //move to next gc object
      }

      /* close approximation: it might be that the remaining shard objects don't
       * hold anything, in which case truncated should have been false, but we
       * can find that out on the next iteration
       */
      *truncated = true;
      return 0;
    }
  }
  *truncated = false;
  processing_queue = false;

  return 0;
}

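// RGWGCIOManager throttles and tracks the asynchronous operations issued by
// RGWGC::process(): tail-object deletions (TailIO) and tag removals on the GC
// shard objects (IndexIO), keeping at most max_aio operations in flight.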
class RGWGCIOManager {
  const DoutPrefixProvider* dpp;
  CephContext *cct;
  RGWGC *gc;

  struct IO {
    enum Type {
      UnknownIO = 0,
      TailIO = 1,
      IndexIO = 2,
    } type{UnknownIO};
    librados::AioCompletion *c{nullptr};
    string oid;
    int index{-1};
    string tag;
  };

  deque<IO> ios;
  vector<std::vector<string> > remove_tags;
  /* tracks the number of remaining shadow objects for a given tag in order to
   * only remove the tag once all shadow objects have themselves been removed
   */
  vector<map<string, size_t> > tag_io_size;

#define MAX_AIO_DEFAULT 10
  size_t max_aio{MAX_AIO_DEFAULT};

public:
  RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp),
    cct(_cct),
    gc(_gc) {
    max_aio = cct->_conf->rgw_gc_max_concurrent_io;
    remove_tags.resize(min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max()));
    tag_io_size.resize(min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max()));
  }

  ~RGWGCIOManager() {
    for (auto io : ios) {
      io.c->release();
    }
  }

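  // Queue an asynchronous tail-object delete; if too many IOs are already in
  // flight, wait for completions first. Completion errors are only propagated
  // when the shard uses the cls_rgw_gc queue.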
  int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op,
                  int index, const string& tag) {
    while (ios.size() > max_aio) {
      if (gc->going_down()) {
        return 0;
      }
      auto ret = handle_next_completion();
      // return the error if we are using the queue, else ignore it
      if (gc->transitioned_objects_cache[index] && ret < 0) {
        return ret;
      }
    }

    auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
    int ret = ioctx->aio_operate(oid, c, op);
    if (ret < 0) {
      return ret;
    }
    ios.push_back(IO{IO::TailIO, c, oid, index, tag});

    return 0;
  }

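  // Wait for the oldest in-flight IO. ENOENT is treated as success; when a
  // tail delete for a not-yet-transitioned shard succeeds, its tag becomes a
  // candidate for removal from that shard's omap log.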
  int handle_next_completion() {
    ceph_assert(!ios.empty());
    IO& io = ios.front();
    io.c->wait_for_complete();
    int ret = io.c->get_return_value();
    io.c->release();

    if (ret == -ENOENT) {
      ret = 0;
    }

    if (io.type == IO::IndexIO && ! gc->transitioned_objects_cache[io.index]) {
      if (ret < 0) {
        ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
          io.index << " returned error, ret=" << ret << dendl;
      }
      goto done;
    }

    if (ret < 0) {
      ldpp_dout(dpp, 0) << "WARNING: gc could not remove oid=" << io.oid <<
        ", ret=" << ret << dendl;
      goto done;
    }

    if (! gc->transitioned_objects_cache[io.index]) {
      schedule_tag_removal(io.index, io.tag);
    }

  done:
    ios.pop_front();
    return ret;
  }

  /* This is a request to schedule a tag removal. It will be called once when
   * there are no shadow objects. But it will also be called for every shadow
   * object when there are any. Since we do not want the tag to be removed
   * until all shadow objects have been successfully removed, the scheduling
   * will not happen until the shadow object count goes down to zero
   */
  void schedule_tag_removal(int index, string tag) {
    auto& ts = tag_io_size[index];
    auto ts_it = ts.find(tag);
    if (ts_it != ts.end()) {
      auto& size = ts_it->second;
      --size;
      // wait until all shadow object deletes have returned
      if (size != 0)
        return;

      ts.erase(ts_it);
    }

    auto& rt = remove_tags[index];

    rt.push_back(tag);
    if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
      flush_remove_tags(index, rt);
    }
  }

  void add_tag_io_size(int index, string tag, size_t size) {
    auto& ts = tag_io_size[index];
    ts.emplace(tag, size);
  }

  int drain_ios() {
    int ret_val = 0;
    while (!ios.empty()) {
      if (gc->going_down()) {
        return -EAGAIN;
      }
      auto ret = handle_next_completion();
      if (ret < 0) {
        ret_val = ret;
      }
    }
    return ret_val;
  }

  void drain() {
    drain_ios();
    flush_remove_tags();
    /* the tags draining might have generated more ios, drain those too */
    drain_ios();
  }

  void flush_remove_tags(int index, vector<string>& rt) {
    IO index_io;
    index_io.type = IO::IndexIO;
    index_io.index = index;

    ldpp_dout(dpp, 20) << __func__ <<
      " removing entries from gc log shard index=" << index << ", size=" <<
      rt.size() << ", entries=" << rt << dendl;

    auto rt_guard = make_scope_guard(
      [&]
      {
        rt.clear();
      }
      );

    int ret = gc->remove(index, rt, &index_io.c);
    if (ret < 0) {
      /* we already cleared list of tags, this prevents us from
       * ballooning in case of a persistent problem
       */
      ldpp_dout(dpp, 0) << "WARNING: failed to remove tags on gc shard index=" <<
        index << " ret=" << ret << dendl;
      return;
    }
    if (perfcounter) {
      /* log the count of tags retired for rate estimation */
      perfcounter->inc(l_rgw_gc_retire, rt.size());
    }
    ios.push_back(index_io);
  }

  void flush_remove_tags() {
    int index = 0;
    for (auto& rt : remove_tags) {
      if (! gc->transitioned_objects_cache[index]) {
        flush_remove_tags(index, rt);
      }
      ++index;
    }
  }

  int remove_queue_entries(int index, int num_entries) {
    int ret = gc->remove(index, num_entries);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to remove queue entries on index=" <<
        index << " ret=" << ret << dendl;
      return ret;
    }
    if (perfcounter) {
      /* log the count of tags retired for rate estimation */
      perfcounter->inc(l_rgw_gc_retire, num_entries);
    }
    return 0;
  }
}; // class RGWGCIOManager

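// Process a single GC shard: take the shard's exclusive lock, list pending
// chains (from omap or the queue, depending on the transition state), delete
// their tail objects through the IO manager, and trim the processed entries.
// Returns 0 without doing work if another GC worker already holds the lock.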
int RGWGC::process(int index, int max_secs, bool expired_only,
                   RGWGCIOManager& io_manager)
{
  ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
    index << ", max_secs=" << max_secs << ", expired_only=" <<
    expired_only << dendl;

  rados::cls::lock::Lock l(gc_index_lock_name);
  utime_t end = ceph_clock_now();

  /* max_secs should be greater than zero. We don't want a zero max_secs
   * to be translated as no timeout, since we'd then need to break the
   * lock and that would require a manual intervention. In this case
   * we can just wait it out. */
  if (max_secs <= 0)
    return -EAGAIN;

  end += max_secs;
  utime_t time(max_secs, 0);
  l.set_duration(time);

  int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
  if (ret == -EBUSY) { /* already locked by another gc processor */
    ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
      obj_names[index] << dendl;
    return 0;
  }
  if (ret < 0)
    return ret;

  string marker;
  string next_marker;
  bool truncated;
  IoCtx *ctx = new IoCtx;
  do {
    int max = 100;
    std::list<cls_rgw_gc_obj_info> entries;

    int ret = 0;

    if (! transitioned_objects_cache[index]) {
      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
      ldpp_dout(this, 20) <<
        "RGWGC::process cls_rgw_gc_list returned with return value:" << ret <<
        ", entries.size=" << entries.size() << ", truncated=" << truncated <<
        ", next_marker='" << next_marker << "'" << dendl;
      obj_version objv;
      cls_version_read(store->gc_pool_ctx, obj_names[index], &objv);
      if ((objv.ver == 1) && entries.size() == 0) {
        std::list<cls_rgw_gc_obj_info> non_expired_entries;
        ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, 1, false, non_expired_entries, &truncated, next_marker);
        if (non_expired_entries.size() == 0) {
          transitioned_objects_cache[index] = true;
          marker.clear();
          ldpp_dout(this, 20) << "RGWGC::process cls_rgw_gc_list returned NO non expired entries, so setting cache entry to TRUE" << dendl;
        } else {
          ret = 0;
          goto done;
        }
      }
      if ((objv.ver == 0) && (ret == -ENOENT || entries.size() == 0)) {
        ret = 0;
        goto done;
      }
    }

    if (transitioned_objects_cache[index]) {
      ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
      ldpp_dout(this, 20) <<
        "RGWGC::process cls_rgw_gc_queue_list_entries returned with return value:" << ret <<
        ", entries.size=" << entries.size() << ", truncated=" << truncated <<
        ", next_marker='" << next_marker << "'" << dendl;
      if (entries.size() == 0) {
        ret = 0;
        goto done;
      }
    }

    if (ret < 0)
      goto done;

    marker = next_marker;

    string last_pool;
    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      cls_rgw_gc_obj_info& info = *iter;

      ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
        info.tag << "', time=" << info.time << ", chain.objs.size()=" <<
        info.chain.objs.size() << dendl;

      std::list<cls_rgw_obj>::iterator liter;
      cls_rgw_obj_chain& chain = info.chain;

      utime_t now = ceph_clock_now();
      if (now >= end) {
        goto done;
      }
      if (! transitioned_objects_cache[index]) {
        if (chain.objs.empty()) {
          io_manager.schedule_tag_removal(index, info.tag);
        } else {
          io_manager.add_tag_io_size(index, info.tag, chain.objs.size());
        }
      }
      if (! chain.objs.empty()) {
        for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
          cls_rgw_obj& obj = *liter;

          if (obj.pool != last_pool) {
            delete ctx;
            ctx = new IoCtx;
            ret = rgw_init_ioctx(this, store->get_rados_handle(), obj.pool, *ctx);
            if (ret < 0) {
              if (transitioned_objects_cache[index]) {
                goto done;
              }
              last_pool = "";
              ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
                obj.pool << dendl;
              continue;
            }
            last_pool = obj.pool;
          }

          ctx->locator_set_key(obj.loc);

          const string& oid = obj.key.name; /* just stored raw oid there */

          ldpp_dout(this, 5) << "RGWGC::process removing " << obj.pool <<
            ":" << obj.key.name << dendl;
          ObjectWriteOperation op;
          cls_refcount_put(op, info.tag, true);

          ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
          if (ret < 0) {
            ldpp_dout(this, 0) <<
              "WARNING: failed to schedule deletion for oid=" << oid << dendl;
            if (transitioned_objects_cache[index]) {
              // if deleting oid failed for any of them, we will not delete queue entries
              goto done;
            }
          }
          if (going_down()) {
            // leave early, even if tag isn't removed, it's ok since it
            // will be picked up next time around
            goto done;
          }
        } // chains loop
      } // else -- chains not empty
    } // entries loop
    if (transitioned_objects_cache[index] && entries.size() > 0) {
      ret = io_manager.drain_ios();
      if (ret < 0) {
        goto done;
      }
      // remove the entries from the queue
      ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl;
      ret = io_manager.remove_queue_entries(index, entries.size());
      if (ret < 0) {
        ldpp_dout(this, 0) <<
          "WARNING: failed to remove queue entries" << dendl;
        goto done;
      }
    }
  } while (truncated);

done:
  /* we don't drain here, because if we're going down we don't want to
   * hold the system if backend is unresponsive
   */
  l.unlock(&store->gc_pool_ctx, obj_names[index]);
  delete ctx;

  return 0;
}

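// Top-level GC pass: walk all shard objects starting from a random index
// (presumably so concurrent GC workers tend to start on different shards),
// then drain outstanding IO unless we are shutting down.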
int RGWGC::process(bool expired_only)
{
  int max_secs = cct->_conf->rgw_gc_processor_max_time;

  const int start = ceph::util::generate_random_number(0, max_objs - 1);

  RGWGCIOManager io_manager(this, store->ctx(), this);

  for (int i = 0; i < max_objs; i++) {
    int index = (i + start) % max_objs;
    int ret = process(index, max_secs, expired_only, io_manager);
    if (ret < 0)
      return ret;
  }
  if (!going_down()) {
    io_manager.drain();
  }

  return 0;
}

bool RGWGC::going_down()
{
  return down_flag;
}

void RGWGC::start_processor()
{
  worker = new GCWorker(this, cct, this);
  worker->create("rgw_gc");
}

void RGWGC::stop_processor()
{
  down_flag = true;
  if (worker) {
    worker->stop();
    worker->join();
  }
  delete worker;
  worker = NULL;
}

unsigned RGWGC::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWGC::gen_prefix(std::ostream& out) const
{
  return out << "garbage collection: ";
}

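// Background worker thread: run a full GC pass over all shards (expired
// entries only), then sleep for the remainder of rgw_gc_processor_period
// before the next pass, waking early when stop() is called.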
void *RGWGC::GCWorker::entry() {
  do {
    utime_t start = ceph_clock_now();
    ldpp_dout(dpp, 2) << "garbage collection: start" << dendl;
    int r = gc->process(true);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
    }
    ldpp_dout(dpp, 2) << "garbage collection: stop" << dendl;

    if (gc->going_down())
      break;

    utime_t end = ceph_clock_now();
    end -= start;
    int secs = cct->_conf->rgw_gc_processor_period;

    if (secs <= end.sec())
      continue; // next round

    secs -= end.sec();

    std::unique_lock locker{lock};
    cond.wait_for(locker, std::chrono::seconds(secs));
  } while (!gc->going_down());

  return NULL;
}

void RGWGC::GCWorker::stop()
{
  std::lock_guard l{lock};
  cond.notify_all();
}