// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "rgw_gc.h"

#include "rgw_tools.h"
#include "include/scope_guard.h"
#include "include/rados/librados.hpp"
#include "cls/rgw/cls_rgw_client.h"
#include "cls/rgw_gc/cls_rgw_gc_client.h"
#include "cls/refcount/cls_refcount_client.h"
#include "cls/version/cls_version_client.h"
#include "rgw_perf_counters.h"
#include "cls/lock/cls_lock_client.h"
#include "include/random.h"
#include "rgw_gc_log.h"

#include <list> // XXX
#include <sstream>
#include "xxhash.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

using namespace librados;

static string gc_oid_prefix = "gc";
static string gc_index_lock_name = "gc_process";
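// One gc log object is maintained per shard; the shard count is
// rgw_gc_max_objs capped at rgw_shards_max(). Each shard object is created if
// missing and its queue is initialized for the omap -> queue transition.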
void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
  cct = _cct;
  store = _store;

  max_objs = min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max());

  obj_names = new string[max_objs];

  for (int i = 0; i < max_objs; i++) {
    obj_names[i] = gc_oid_prefix;
    char buf[32];
    snprintf(buf, 32, ".%d", i);
    obj_names[i].append(buf);

    auto it = transitioned_objects_cache.begin() + i;
    transitioned_objects_cache.insert(it, false);

    //version = 0 -> not ready for transition
    //version = 1 -> marked ready for transition
    librados::ObjectWriteOperation op;
    op.create(false);
    const uint64_t queue_size = cct->_conf->rgw_gc_max_queue_size, num_deferred_entries = cct->_conf->rgw_gc_max_deferred;
    gc_log_init2(op, queue_size, num_deferred_entries);
    store->gc_operate(this, obj_names[i], &op);
  }
}

void RGWGC::finalize()
{
  delete[] obj_names;
}
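// Map a tag to one of the gc shard objects by hashing it with xxHash and
// reducing modulo the shard count, so all operations for a given tag land on
// the same shard.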
int RGWGC::tag_index(const string& tag)
{
  return rgw_shards_mod(XXH64(tag.c_str(), tag.size(), seed), max_objs);
}
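// Enqueue a chain of tail objects for deferred deletion on the shard derived
// from the tag. If the queue-based enqueue is rejected with -ECANCELED or
// -EPERM (the shard has not transitioned to the cls_rgw_gc queue), fall back
// to writing the entry to omap with cls_rgw_gc_set_entry.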
int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag)
{
  ObjectWriteOperation op;
  cls_rgw_gc_obj_info info;
  info.chain = chain;
  info.tag = tag;
  gc_log_enqueue2(op, cct->_conf->rgw_gc_obj_min_wait, info);

  int i = tag_index(tag);

  ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names[i] << " tag is: " << tag << dendl;

  auto ret = store->gc_operate(this, obj_names[i], &op);
  if (ret != -ECANCELED && ret != -EPERM) {
    return ret;
  }
  ObjectWriteOperation set_entry_op;
  cls_rgw_gc_set_entry(set_entry_op, cct->_conf->rgw_gc_obj_min_wait, info);
  return store->gc_operate(this, obj_names[i], &set_entry_op);
}
struct defer_chain_state {
  librados::AioCompletion* completion = nullptr;
  // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
  // RGWGC destructs before this completion fires
  RGWGC* gc = nullptr;
  cls_rgw_gc_obj_info info;

  ~defer_chain_state() {
    if (completion) {
      completion->release();
    }
  }
};

static void async_defer_callback(librados::completion_t, void* arg)
{
  std::unique_ptr<defer_chain_state> state{static_cast<defer_chain_state*>(arg)};
  if (state->completion->get_return_value() == -ECANCELED) {
    state->gc->on_defer_canceled(state->info);
  }
}

void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info& info)
{
  const std::string& tag = info.tag;
  const int i = tag_index(tag);

  // ECANCELED from cls_version_check() tells us that we've transitioned
  transitioned_objects_cache[i] = true;

  ObjectWriteOperation op;
  cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
  cls_rgw_gc_remove(op, {tag});

  auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
  store->gc_aio_operate(obj_names[i], c, &op);
  c->release();
}
int RGWGC::async_defer_chain(const string& tag, const cls_rgw_obj_chain& chain)
{
  const int i = tag_index(tag);
  cls_rgw_gc_obj_info info;
  info.chain = chain;
  info.tag = tag;

  // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
  if (transitioned_objects_cache[i]) {
    ObjectWriteOperation op;
    cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);

    // this tag may still be present in omap, so remove it once the cls_rgw_gc
    // enqueue succeeds
    cls_rgw_gc_remove(op, {tag});

    auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
    int ret = store->gc_aio_operate(obj_names[i], c, &op);
    c->release();
    return ret;
  }

  // if we haven't seen the transition yet, write the defer to omap with cls_rgw
  ObjectWriteOperation op;

  // assert that we haven't initialized cls_rgw_gc queue. this prevents us
  // from writing new entries to omap after the transition
  gc_log_defer1(op, cct->_conf->rgw_gc_obj_min_wait, info);

  // prepare a callback to detect the transition via ECANCELED from cls_version_check()
  auto state = std::make_unique<defer_chain_state>();
  state->gc = this;
  state->info.chain = chain;
  state->info.tag = tag;
  state->completion = librados::Rados::aio_create_completion(
      state.get(), async_defer_callback);

  int ret = store->gc_aio_operate(obj_names[i], state->completion, &op);
  if (ret == 0) {
    state.release(); // release ownership until async_defer_callback()
  }
  return ret;
}
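// Remove a batch of processed tags from a shard's omap; on success the caller
// receives the AioCompletion. The (index, num_entries) overload below instead
// trims entries from a transitioned shard's cls_rgw_gc queue.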
int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc)
{
  ObjectWriteOperation op;
  cls_rgw_gc_remove(op, tags);

  auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
  int ret = store->gc_aio_operate(obj_names[index], c, &op);
  if (ret < 0) {
    c->release();
  } else {
    *pc = c;
  }
  return ret;
}

int RGWGC::remove(int index, int num_entries)
{
  ObjectWriteOperation op;
  cls_rgw_gc_queue_remove_entries(op, num_entries);

  return store->gc_operate(this, obj_names[index], &op);
}
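// List pending GC entries across the gc shard objects, starting at *index.
// Shards that have not transitioned are read from omap via cls_rgw_gc_list;
// transitioned shards, and shards found to be mid-transition, are read from
// the cls_rgw_gc queue. *truncated is set when there is more to list, and
// processing_queue carries queue-continuation state between calls.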
int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue)
{
  result.clear();
  string next_marker;
  bool check_queue = false;

  for (; *index < max_objs && result.size() < max; (*index)++, marker.clear(), check_queue = false) {
    std::list<cls_rgw_gc_obj_info> entries, queue_entries;
    int ret = 0;

    //processing_queue is set to true by the previous iteration if the queue was being processed and probably has more elements in it.
    if (! transitioned_objects_cache[*index] && ! check_queue && ! processing_queue) {
      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
      if (ret != -ENOENT && ret < 0) {
        return ret;
      }
      obj_version objv;
      cls_version_read(store->gc_pool_ctx, obj_names[*index], &objv);
      if (ret == -ENOENT || entries.size() == 0) {
        if (objv.ver == 0) {
          continue;
        } else {
          if (! expired_only) {
            transitioned_objects_cache[*index] = true;
            marker.clear();
          } else {
            std::list<cls_rgw_gc_obj_info> non_expired_entries;
            ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, 1, false, non_expired_entries, truncated, next_marker);
            if (non_expired_entries.size() == 0) {
              transitioned_objects_cache[*index] = true;
              marker.clear();
            }
          }
        }
      }
      if ((objv.ver == 1) && (entries.size() < max - result.size())) {
        check_queue = true;
        marker.clear();
      }
    }
    if (transitioned_objects_cache[*index] || check_queue || processing_queue) {
      processing_queue = false;
      ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[*index], marker, (max - result.size()) - entries.size(), expired_only, queue_entries, truncated, next_marker);
      if (ret < 0) {
        return ret;
      }
    }
    if (entries.size() == 0 && queue_entries.size() == 0)
      continue;

    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      result.push_back(*iter);
    }

    for (iter = queue_entries.begin(); iter != queue_entries.end(); ++iter) {
      result.push_back(*iter);
    }

    marker = next_marker;

    if (*index == max_objs - 1) {
      if (queue_entries.size() > 0 && *truncated) {
        processing_queue = true;
      } else {
        processing_queue = false;
      }
      /* we cut short here, truncated will hold the correct value */
      return 0;
    }

    if (result.size() == max) {
      if (queue_entries.size() > 0 && *truncated) {
        processing_queue = true;
      } else {
        processing_queue = false;
        *index += 1; //move to next gc object
      }

      /* close approximation: it might be that the next objects don't hold
       * anything, in which case truncated should have been false, but we can
       * find that out on the next iteration
       */
      *truncated = true;
      return 0;
    }
  }
  *truncated = false;
  processing_queue = false;

  return 0;
}
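// Helper that batches and throttles the librados AIO issued while processing
// gc entries: tail-object deletions, tag removals from omap, and queue trims.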
class RGWGCIOManager {
  const DoutPrefixProvider* dpp;
  CephContext *cct;
  RGWGC *gc;

  struct IO {
    enum Type {
      UnknownIO = 0,
      TailIO = 1,
      IndexIO = 2,
    } type{UnknownIO};
    librados::AioCompletion *c{nullptr};
    string oid;
    int index{-1};
    string tag;
  };

  deque<IO> ios;
  vector<std::vector<string> > remove_tags;
  /* tracks the number of remaining shadow objects for a given tag in order to
   * only remove the tag once all shadow objects have themselves been removed
   */
  vector<map<string, size_t> > tag_io_size;

#define MAX_AIO_DEFAULT 10
  size_t max_aio{MAX_AIO_DEFAULT};

public:
  RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp),
                                                                                  cct(_cct),
                                                                                  gc(_gc) {
    max_aio = cct->_conf->rgw_gc_max_concurrent_io;
    remove_tags.resize(min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max()));
    tag_io_size.resize(min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max()));
  }

  ~RGWGCIOManager() {
    for (auto io : ios) {
      io.c->release();
    }
  }
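  // Submit one tail-object removal. When the number of in-flight completions
  // exceeds max_aio (rgw_gc_max_concurrent_io), block on the oldest one first.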
  int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op,
                  int index, const string& tag) {
    while (ios.size() > max_aio) {
      if (gc->going_down()) {
        return 0;
      }
      auto ret = handle_next_completion();
      //Return error if we are using queue, else ignore it
      if (gc->transitioned_objects_cache[index] && ret < 0) {
        return ret;
      }
    }

    auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
    int ret = ioctx->aio_operate(oid, c, op);
    if (ret < 0) {
      return ret;
    }
    ios.push_back(IO{IO::TailIO, c, oid, index, tag});

    return 0;
  }
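  // Wait for the oldest in-flight IO and reap its result. -ENOENT counts as
  // success; for a successful tail IO on a non-transitioned shard, the tag
  // moves one step closer to removal via schedule_tag_removal().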
  int handle_next_completion() {
    ceph_assert(!ios.empty());
    IO& io = ios.front();
    io.c->wait_for_complete();
    int ret = io.c->get_return_value();
    io.c->release();

    if (ret == -ENOENT) {
      ret = 0;
    }

    if (io.type == IO::IndexIO && ! gc->transitioned_objects_cache[io.index]) {
      if (ret < 0) {
        ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
          io.index << " returned error, ret=" << ret << dendl;
      }
      goto done;
    }

    if (ret < 0) {
      ldpp_dout(dpp, 0) << "WARNING: gc could not remove oid=" << io.oid <<
        ", ret=" << ret << dendl;
      goto done;
    }

    if (! gc->transitioned_objects_cache[io.index]) {
      schedule_tag_removal(io.index, io.tag);
    }

  done:
    ios.pop_front();
    return ret;
  }

  /* This is a request to schedule a tag removal. It will be called once when
   * there are no shadow objects. But it will also be called for every shadow
   * object when there are any. Since we do not want the tag to be removed
   * until all shadow objects have been successfully removed, the scheduling
   * will not happen until the shadow object count goes down to zero
   */
  void schedule_tag_removal(int index, string tag) {
    auto& ts = tag_io_size[index];
    auto ts_it = ts.find(tag);
    if (ts_it != ts.end()) {
      auto& size = ts_it->second;
      --size;
      // wait for all shadow object deletions to return
      if (size != 0)
        return;

      ts.erase(ts_it);
    }

    auto& rt = remove_tags[index];

    rt.push_back(tag);
    if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
      flush_remove_tags(index, rt);
    }
  }
  void add_tag_io_size(int index, string tag, size_t size) {
    auto& ts = tag_io_size[index];
    ts.emplace(tag, size);
  }

  int drain_ios() {
    int ret_val = 0;
    while (!ios.empty()) {
      if (gc->going_down()) {
        return -EAGAIN;
      }
      auto ret = handle_next_completion();
      if (ret < 0) {
        ret_val = ret;
      }
    }
    return ret_val;
  }

  void drain() {
    drain_ios();
    flush_remove_tags();
    /* the tags draining might have generated more ios, drain those too */
    drain_ios();
  }
  void flush_remove_tags(int index, vector<string>& rt) {
    IO index_io;
    index_io.type = IO::IndexIO;
    index_io.index = index;

    ldpp_dout(dpp, 20) << __func__ <<
      " removing entries from gc log shard index=" << index << ", size=" <<
      rt.size() << ", entries=" << rt << dendl;

    auto rt_guard = make_scope_guard(
      [&]
      {
        rt.clear();
      }
      );

    int ret = gc->remove(index, rt, &index_io.c);
    if (ret < 0) {
      /* we already cleared list of tags, this prevents us from
       * ballooning in case of a persistent problem
       */
      ldpp_dout(dpp, 0) << "WARNING: failed to remove tags on gc shard index=" <<
        index << " ret=" << ret << dendl;
      return;
    }
    if (perfcounter) {
      /* log the count of tags retired for rate estimation */
      perfcounter->inc(l_rgw_gc_retire, rt.size());
    }
    ios.push_back(index_io);
  }

  void flush_remove_tags() {
    int index = 0;
    for (auto& rt : remove_tags) {
      if (! gc->transitioned_objects_cache[index]) {
        flush_remove_tags(index, rt);
      }
      ++index;
    }
  }

  int remove_queue_entries(int index, int num_entries) {
    int ret = gc->remove(index, num_entries);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to remove queue entries on index=" <<
        index << " ret=" << ret << dendl;
      return ret;
    }
    if (perfcounter) {
      /* log the count of tags retired for rate estimation */
      perfcounter->inc(l_rgw_gc_retire, num_entries);
    }
    return 0;
  }
}; // class RGWGCIOManager
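// Process a single gc shard: take the shard's exclusive lock, list pending
// entries (from omap or from the queue, depending on the shard's transition
// state), schedule removal of the chained tail objects through io_manager,
// and then retire the processed tags or queue entries. max_secs bounds how
// long the lock is held.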
int RGWGC::process(int index, int max_secs, bool expired_only,
                   RGWGCIOManager& io_manager)
{
  ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
    index << ", max_secs=" << max_secs << ", expired_only=" <<
    expired_only << dendl;

  rados::cls::lock::Lock l(gc_index_lock_name);
  utime_t end = ceph_clock_now();

  /* max_secs should be greater than zero. We don't want a zero max_secs
   * to be translated as no timeout, since we'd then need to break the
   * lock and that would require a manual intervention. In this case
   * we can just wait it out. */
  if (max_secs <= 0)
    return -EAGAIN;

  end += max_secs;
  utime_t time(max_secs, 0);
  l.set_duration(time);

  int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
  if (ret == -EBUSY) { /* already locked by another gc processor */
    ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
      obj_names[index] << dendl;
    return 0;
  }
  if (ret < 0)
    return ret;

  string marker;
  string next_marker;
  bool truncated;
  IoCtx *ctx = new IoCtx;
  do {
    int max = 100;
    std::list<cls_rgw_gc_obj_info> entries;

    int ret = 0;

    if (! transitioned_objects_cache[index]) {
      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
      ldpp_dout(this, 20) <<
        "RGWGC::process cls_rgw_gc_list returned with return value:" << ret <<
        ", entries.size=" << entries.size() << ", truncated=" << truncated <<
        ", next_marker='" << next_marker << "'" << dendl;
      obj_version objv;
      cls_version_read(store->gc_pool_ctx, obj_names[index], &objv);
      if ((objv.ver == 1) && entries.size() == 0) {
        std::list<cls_rgw_gc_obj_info> non_expired_entries;
        ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, 1, false, non_expired_entries, &truncated, next_marker);
        if (non_expired_entries.size() == 0) {
          transitioned_objects_cache[index] = true;
          marker.clear();
          ldpp_dout(this, 20) << "RGWGC::process cls_rgw_gc_list returned NO non expired entries, so setting cache entry to TRUE" << dendl;
        } else {
          ret = 0;
          goto done;
        }
      }
      if ((objv.ver == 0) && (ret == -ENOENT || entries.size() == 0)) {
        ret = 0;
        goto done;
      }
    }

    if (transitioned_objects_cache[index]) {
      ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
      ldpp_dout(this, 20) <<
        "RGWGC::process cls_rgw_gc_queue_list_entries returned with return value:" << ret <<
        ", entries.size=" << entries.size() << ", truncated=" << truncated <<
        ", next_marker='" << next_marker << "'" << dendl;
      if (entries.size() == 0) {
        ret = 0;
        goto done;
      }
    }

    if (ret < 0)
      goto done;
    marker = next_marker;

    string last_pool;
    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      cls_rgw_gc_obj_info& info = *iter;

      ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
        info.tag << "', time=" << info.time << ", chain.objs.size()=" <<
        info.chain.objs.size() << dendl;

      std::list<cls_rgw_obj>::iterator liter;
      cls_rgw_obj_chain& chain = info.chain;

      utime_t now = ceph_clock_now();
      if (now >= end) {
        goto done;
      }
      if (! transitioned_objects_cache[index]) {
        if (chain.objs.empty()) {
          io_manager.schedule_tag_removal(index, info.tag);
        } else {
          io_manager.add_tag_io_size(index, info.tag, chain.objs.size());
        }
      }
      if (! chain.objs.empty()) {
        for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
          cls_rgw_obj& obj = *liter;

          if (obj.pool != last_pool) {
            delete ctx;
            ctx = new IoCtx;
            ret = rgw_init_ioctx(this, store->get_rados_handle(), obj.pool, *ctx);
            if (ret < 0) {
              if (transitioned_objects_cache[index]) {
                goto done;
              }
              last_pool = "";
              ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
                obj.pool << dendl;
              continue;
            }
            last_pool = obj.pool;
          }

          ctx->locator_set_key(obj.loc);

          const string& oid = obj.key.name; /* just stored raw oid there */

          ldpp_dout(this, 5) << "RGWGC::process removing " << obj.pool <<
            ":" << obj.key.name << dendl;
          ObjectWriteOperation op;
          cls_refcount_put(op, info.tag, true);

          ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
          if (ret < 0) {
            ldpp_dout(this, 0) <<
              "WARNING: failed to schedule deletion for oid=" << oid << dendl;
            if (transitioned_objects_cache[index]) {
              //If deleting oid failed for any of them, we will not delete queue entries
              goto done;
            }
          }
          if (going_down()) {
            // leave early, even if tag isn't removed, it's ok since it
            // will be picked up next time around
            goto done;
          }
        } // chains loop
      } // else -- chains not empty
    } // entries loop
    if (transitioned_objects_cache[index] && entries.size() > 0) {
      ret = io_manager.drain_ios();
      if (ret < 0) {
        goto done;
      }
      //Remove the entries from the queue
      ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl;
      ret = io_manager.remove_queue_entries(index, entries.size());
      if (ret < 0) {
        ldpp_dout(this, 0) <<
          "WARNING: failed to remove queue entries" << dendl;
        goto done;
      }
    }
  } while (truncated);

done:
  /* we don't drain here, because if we're going down we don't want to
   * hold the system if backend is unresponsive
   */
  l.unlock(&store->gc_pool_ctx, obj_names[index]);
  delete ctx;

  return 0;
}
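// Run one full GC pass: start from a random shard index, process each shard
// once, then drain any outstanding removals unless we are shutting down.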
int RGWGC::process(bool expired_only)
{
  int max_secs = cct->_conf->rgw_gc_processor_max_time;

  const int start = ceph::util::generate_random_number(0, max_objs - 1);

  RGWGCIOManager io_manager(this, store->ctx(), this);

  for (int i = 0; i < max_objs; i++) {
    int index = (i + start) % max_objs;
    int ret = process(index, max_secs, expired_only, io_manager);
    if (ret < 0)
      return ret;
  }
  if (!going_down()) {
    io_manager.drain();
  }

  return 0;
}

bool RGWGC::going_down()
{
  return down_flag;
}

void RGWGC::start_processor()
{
  worker = new GCWorker(this, cct, this);
  worker->create("rgw_gc");
}

void RGWGC::stop_processor()
{
  down_flag = true;
  if (worker) {
    worker->stop();
    worker->join();
  }
  delete worker;
  worker = NULL;
}

unsigned RGWGC::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWGC::gen_prefix(std::ostream& out) const
{
  return out << "garbage collection: ";
}

void *RGWGC::GCWorker::entry() {
  do {
    utime_t start = ceph_clock_now();
    ldpp_dout(dpp, 2) << "garbage collection: start" << dendl;
    int r = gc->process(true);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
    }
    ldpp_dout(dpp, 2) << "garbage collection: stop" << dendl;

    if (gc->going_down())
      break;

    utime_t end = ceph_clock_now();
    end -= start;
    int secs = cct->_conf->rgw_gc_processor_period;

    if (secs <= end.sec())
      continue; // next round

    secs -= end.sec();

    std::unique_lock locker{lock};
    cond.wait_for(locker, std::chrono::seconds(secs));
  } while (!gc->going_down());

  return NULL;
}

void RGWGC::GCWorker::stop()
{
  std::lock_guard l{lock};
  cond.notify_all();
}