ceph.git / ceph / src / rgw / rgw_gc.cc (Import ceph 15.2.8)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "rgw_gc.h"
5
6 #include "rgw_tools.h"
7 #include "include/scope_guard.h"
8 #include "include/rados/librados.hpp"
9 #include "cls/rgw/cls_rgw_client.h"
10 #include "cls/rgw_gc/cls_rgw_gc_client.h"
11 #include "cls/refcount/cls_refcount_client.h"
12 #include "cls/version/cls_version_client.h"
13 #include "rgw_perf_counters.h"
14 #include "cls/lock/cls_lock_client.h"
15 #include "include/random.h"
16 #include "rgw_gc_log.h"
17
18 #include <list> // XXX
19 #include <sstream>
20
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
23
24 using namespace librados;
25
26 static string gc_oid_prefix = "gc";
27 static string gc_index_lock_name = "gc_process";
28
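// Initialization creates one GC log object per shard ("gc.0" .. "gc.<max_objs-1>"),
// where max_objs is bounded by rgw_gc_max_objs and rgw_shards_max(). Each shard
// object is created if missing and initialized with a cls_rgw_gc queue sized by
// rgw_gc_max_queue_size and rgw_gc_max_deferred; its entry in
// transitioned_objects_cache starts out false, i.e. the shard is assumed to still
// use the legacy omap-based gc log until proven otherwise.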
29 void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
30 cct = _cct;
31 store = _store;
32
33 max_objs = min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max());
34
35 obj_names = new string[max_objs];
36
37 for (int i = 0; i < max_objs; i++) {
38 obj_names[i] = gc_oid_prefix;
39 char buf[32];
40 snprintf(buf, 32, ".%d", i);
41 obj_names[i].append(buf);
42
43 auto it = transitioned_objects_cache.begin() + i;
44 transitioned_objects_cache.insert(it, false);
45
46 //version = 0 -> not ready for transition
47 //version = 1 -> marked ready for transition
48 librados::ObjectWriteOperation op;
49 op.create(false);
50 const uint64_t queue_size = cct->_conf->rgw_gc_max_queue_size, num_deferred_entries = cct->_conf->rgw_gc_max_deferred;
51 gc_log_init2(op, queue_size, num_deferred_entries);
52 store->gc_operate(obj_names[i], &op);
53 }
54 }
55
56 void RGWGC::finalize()
57 {
58 delete[] obj_names;
59 }
60
61 int RGWGC::tag_index(const string& tag)
62 {
63 return rgw_shard_id(tag, max_objs);
64 }
65
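// send_chain enqueues a gc entry (tag + object chain) on the shard selected by the
// tag. gc_log_enqueue2 writes to the cls_rgw_gc queue with a delay of
// rgw_gc_obj_min_wait; if the shard rejects the queue op (ret is -ECANCELED or
// -EPERM), fall back to writing the legacy omap entry via cls_rgw_gc_set_entry.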
66 int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag)
67 {
68 ObjectWriteOperation op;
69 cls_rgw_gc_obj_info info;
70 info.chain = chain;
71 info.tag = tag;
72 gc_log_enqueue2(op, cct->_conf->rgw_gc_obj_min_wait, info);
73
74 int i = tag_index(tag);
75
76 ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names[i] << ", tag is: " << tag << dendl;
77
78 auto ret = store->gc_operate(obj_names[i], &op);
79 if (ret != -ECANCELED && ret != -EPERM) {
80 return ret;
81 }
82 ObjectWriteOperation set_entry_op;
83 cls_rgw_gc_set_entry(set_entry_op, cct->_conf->rgw_gc_obj_min_wait, info);
84 return store->gc_operate(obj_names[i], &set_entry_op);
85 }
86
87 struct defer_chain_state {
88 librados::AioCompletion* completion = nullptr;
89 // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
90 // RGWGC destructs before this completion fires
91 RGWGC* gc = nullptr;
92 cls_rgw_gc_obj_info info;
93
94 ~defer_chain_state() {
95 if (completion) {
96 completion->release();
97 }
98 }
99 };
100
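// Completion callback for the omap-based defer issued by async_defer_chain().
// Ownership of the defer_chain_state passes back to this callback, which frees it;
// if the write came back -ECANCELED (the version check failed because the shard has
// since transitioned to the queue), the defer is replayed against the queue via
// on_defer_canceled().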
101 static void async_defer_callback(librados::completion_t, void* arg)
102 {
103 std::unique_ptr<defer_chain_state> state{static_cast<defer_chain_state*>(arg)};
104 if (state->completion->get_return_value() == -ECANCELED) {
105 state->gc->on_defer_canceled(state->info);
106 }
107 }
108
109 void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info& info)
110 {
111 const std::string& tag = info.tag;
112 const int i = tag_index(tag);
113
114 // ECANCELED from cls_version_check() tells us that we've transitioned
115 transitioned_objects_cache[i] = true;
116
117 ObjectWriteOperation op;
118 cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
119 cls_rgw_gc_remove(op, {tag});
120
121 auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
122 store->gc_aio_operate(obj_names[i], c, &op);
123 c->release();
124 }
125
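// async_defer_chain pushes an object's gc deadline out by rgw_gc_obj_min_wait. For a
// transitioned shard the defer goes straight to the cls_rgw_gc queue (and any stale
// omap entry for the tag is removed in the same op). Otherwise the defer is written
// to omap with gc_log_defer1, whose version assertion lets async_defer_callback()
// detect a concurrent transition and redo the defer against the queue.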
126 int RGWGC::async_defer_chain(const string& tag, const cls_rgw_obj_chain& chain)
127 {
128 const int i = tag_index(tag);
129 cls_rgw_gc_obj_info info;
130 info.chain = chain;
131 info.tag = tag;
132
133 // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
134 if (transitioned_objects_cache[i]) {
135 ObjectWriteOperation op;
136 cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
137
138 // this tag may still be present in omap, so remove it once the cls_rgw_gc
139 // enqueue succeeds
140 cls_rgw_gc_remove(op, {tag});
141
142 auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
143 int ret = store->gc_aio_operate(obj_names[i], c, &op);
144 c->release();
145 return ret;
146 }
147
148 // if we haven't seen the transition yet, write the defer to omap with cls_rgw
149 ObjectWriteOperation op;
150
151 // assert that we haven't initialized cls_rgw_gc queue. this prevents us
152 // from writing new entries to omap after the transition
153 gc_log_defer1(op, cct->_conf->rgw_gc_obj_min_wait, info);
154
155 // prepare a callback to detect the transition via ECANCELED from cls_version_check()
156 auto state = std::make_unique<defer_chain_state>();
157 state->gc = this;
158 state->info.chain = chain;
159 state->info.tag = tag;
160 state->completion = librados::Rados::aio_create_completion(
161 state.get(), async_defer_callback);
162
163 int ret = store->gc_aio_operate(obj_names[i], state->completion, &op);
164 if (ret == 0) {
165 state.release(); // release ownership until async_defer_callback()
166 }
167 return ret;
168 }
169
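// Removes a batch of tags from a shard's omap-based gc log asynchronously; on
// success the caller receives the AioCompletion through *pc and is responsible for
// releasing it. The overload that follows instead trims num_entries from the shard's
// cls_rgw_gc queue synchronously.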
170 int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc)
171 {
172 ObjectWriteOperation op;
173 cls_rgw_gc_remove(op, tags);
174
175 auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
176 int ret = store->gc_aio_operate(obj_names[index], c, &op);
177 if (ret < 0) {
178 c->release();
179 } else {
180 *pc = c;
181 }
182 return ret;
183 }
184
185 int RGWGC::remove(int index, int num_entries)
186 {
187 ObjectWriteOperation op;
188 cls_rgw_gc_queue_remove_entries(op, num_entries);
189
190 return store->gc_operate(obj_names[index], &op);
191 }
192
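// Lists gc entries across shards, starting at *index and resuming from marker.
// Shards that have not transitioned are read from omap via cls_rgw_gc_list; the
// shard's cls_version is consulted (ver 0 = omap only, ver 1 = queue initialized) to
// decide when to switch over and cache the transition. Transitioned shards (and the
// remainder of a partially filled result) are read from the queue via
// cls_rgw_gc_queue_list_entries. processing_queue tells the caller whether the last
// shard's queue still has entries to resume from, and *truncated whether more
// entries remain overall.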
193 int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue)
194 {
195 result.clear();
196 string next_marker;
197 bool check_queue = false;
198
199 for (; *index < max_objs && result.size() < max; (*index)++, marker.clear(), check_queue = false) {
200 std::list<cls_rgw_gc_obj_info> entries, queue_entries;
201 int ret = 0;
202
203 // processing_queue is set to true by the previous iteration if the queue was still being processed and probably has more elements in it.
204 if (! transitioned_objects_cache[*index] && ! check_queue && ! processing_queue) {
205 ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
206 if (ret != -ENOENT && ret < 0) {
207 return ret;
208 }
209 obj_version objv;
210 cls_version_read(store->gc_pool_ctx, obj_names[*index], &objv);
211 if (ret == -ENOENT || entries.size() == 0) {
212 if (objv.ver == 0) {
213 continue;
214 } else {
215 if (! expired_only) {
216 transitioned_objects_cache[*index] = true;
217 marker.clear();
218 } else {
219 std::list<cls_rgw_gc_obj_info> non_expired_entries;
220 ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, 1, false, non_expired_entries, truncated, next_marker);
221 if (non_expired_entries.size() == 0) {
222 transitioned_objects_cache[*index] = true;
223 marker.clear();
224 }
225 }
226 }
227 }
228 if ((objv.ver == 1) && (entries.size() < max - result.size())) {
229 check_queue = true;
230 marker.clear();
231 }
232 }
233 if (transitioned_objects_cache[*index] || check_queue || processing_queue) {
234 processing_queue = false;
235 ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[*index], marker, (max - result.size()) - entries.size(), expired_only, queue_entries, truncated, next_marker);
236 if (ret < 0) {
237 return ret;
238 }
239 }
240 if (entries.size() == 0 && queue_entries.size() == 0)
241 continue;
242
243 std::list<cls_rgw_gc_obj_info>::iterator iter;
244 for (iter = entries.begin(); iter != entries.end(); ++iter) {
245 result.push_back(*iter);
246 }
247
248 for (iter = queue_entries.begin(); iter != queue_entries.end(); ++iter) {
249 result.push_back(*iter);
250 }
251
252 marker = next_marker;
253
254 if (*index == max_objs - 1) {
255 if (queue_entries.size() > 0 && *truncated) {
256 processing_queue = true;
257 } else {
258 processing_queue = false;
259 }
260 /* we cut short here, truncated will hold the correct value */
261 return 0;
262 }
263
264 if (result.size() == max) {
265 if (queue_entries.size() > 0 && *truncated) {
266 processing_queue = true;
267 } else {
268 processing_queue = false;
269 *index += 1; //move to next gc object
270 }
271
272 /* close approximation: it might be that the next objects don't hold
273 * anything, in which case truncated should have been false, but we can find
274 * that out on the next iteration
275 */
276 *truncated = true;
277 return 0;
278 }
279 }
280 *truncated = false;
281 processing_queue = false;
282
283 return 0;
284 }
285
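// RGWGCIOManager throttles the asynchronous deletion of tail (shadow) objects during
// gc processing: at most rgw_gc_max_concurrent_io IOs are kept in flight, per-tag
// shadow-object counts are tracked so a tag is only retired once all of its objects
// are gone, and tag removals against the omap-based log are batched in chunks of
// rgw_gc_max_trim_chunk.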
286 class RGWGCIOManager {
287 const DoutPrefixProvider* dpp;
288 CephContext *cct;
289 RGWGC *gc;
290
291 struct IO {
292 enum Type {
293 UnknownIO = 0,
294 TailIO = 1,
295 IndexIO = 2,
296 } type{UnknownIO};
297 librados::AioCompletion *c{nullptr};
298 string oid;
299 int index{-1};
300 string tag;
301 };
302
303 deque<IO> ios;
304 vector<std::vector<string> > remove_tags;
305 /* tracks the number of remaining shadow objects for a given tag in order to
306 * only remove the tag once all shadow objects have themselves been removed
307 */
308 vector<map<string, size_t> > tag_io_size;
309
310 #define MAX_AIO_DEFAULT 10
311 size_t max_aio{MAX_AIO_DEFAULT};
312
313 public:
314 RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp),
315 cct(_cct),
316 gc(_gc),
317 remove_tags(cct->_conf->rgw_gc_max_objs),
318 tag_io_size(cct->_conf->rgw_gc_max_objs) {
319 max_aio = cct->_conf->rgw_gc_max_concurrent_io;
320 }
321
322 ~RGWGCIOManager() {
323 for (auto io : ios) {
324 io.c->release();
325 }
326 }
327
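// Queues an asynchronous delete of one tail object. If more than max_aio IOs are
// already in flight, completions are reaped first; in queue mode a reaping error is
// returned to the caller, in omap mode it is ignored here.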
328 int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op,
329 int index, const string& tag) {
330 while (ios.size() > max_aio) {
331 if (gc->going_down()) {
332 return 0;
333 }
334 auto ret = handle_next_completion();
335 // Return an error if we are using the queue, else ignore it
336 if (gc->transitioned_objects_cache[index] && ret < 0) {
337 return ret;
338 }
339 }
340
341 auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
342 int ret = ioctx->aio_operate(oid, c, op);
343 if (ret < 0) {
344 return ret;
345 }
346 ios.push_back(IO{IO::TailIO, c, oid, index, tag});
347
348 return 0;
349 }
350
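// Waits for the oldest outstanding IO and reaps it. -ENOENT counts as success. For a
// successful tail-object delete on a non-transitioned shard, the tag's pending count
// is decremented and, once it reaches zero, the tag itself is scheduled for removal.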
351 int handle_next_completion() {
352 ceph_assert(!ios.empty());
353 IO& io = ios.front();
354 io.c->wait_for_complete();
355 int ret = io.c->get_return_value();
356 io.c->release();
357
358 if (ret == -ENOENT) {
359 ret = 0;
360 }
361
362 if (io.type == IO::IndexIO && ! gc->transitioned_objects_cache[io.index]) {
363 if (ret < 0) {
364 ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
365 io.index << " returned error, ret=" << ret << dendl;
366 }
367 goto done;
368 }
369
370 if (ret < 0) {
371 ldpp_dout(dpp, 0) << "WARNING: gc could not remove oid=" << io.oid <<
372 ", ret=" << ret << dendl;
373 goto done;
374 }
375
376 if (! gc->transitioned_objects_cache[io.index]) {
377 schedule_tag_removal(io.index, io.tag);
378 }
379
380 done:
381 ios.pop_front();
382 return ret;
383 }
384
385 /* This is a request to schedule a tag removal. It will be called once when
386 * there are no shadow objects. But it will also be called for every shadow
387 * object when there are any. Since we do not want the tag to be removed
388 * until all shadow objects have been successfully removed, the scheduling
389 * will not happen until the shadow object count goes down to zero
390 */
391 void schedule_tag_removal(int index, string tag) {
392 auto& ts = tag_io_size[index];
393 auto ts_it = ts.find(tag);
394 if (ts_it != ts.end()) {
395 auto& size = ts_it->second;
396 --size;
397 // wait for all shadow object deletions to complete
398 if (size != 0)
399 return;
400
401 ts.erase(ts_it);
402 }
403
404 auto& rt = remove_tags[index];
405
406 rt.push_back(tag);
407 if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
408 flush_remove_tags(index, rt);
409 }
410 }
411
412 void add_tag_io_size(int index, string tag, size_t size) {
413 auto& ts = tag_io_size[index];
414 ts.emplace(tag, size);
415 }
416
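// drain_ios() reaps every outstanding completion (returning -EAGAIN if we are
// shutting down); drain() below additionally flushes any pending per-shard tag
// removals and then drains again, since flushing can queue new index IOs.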
417 int drain_ios() {
418 int ret_val = 0;
419 while (!ios.empty()) {
420 if (gc->going_down()) {
421 return -EAGAIN;
422 }
423 auto ret = handle_next_completion();
424 if (ret < 0) {
425 ret_val = ret;
426 }
427 }
428 return ret_val;
429 }
430
431 void drain() {
432 drain_ios();
433 flush_remove_tags();
434 /* the tags draining might have generated more ios, drain those too */
435 drain_ios();
436 }
437
438 void flush_remove_tags(int index, vector<string>& rt) {
439 IO index_io;
440 index_io.type = IO::IndexIO;
441 index_io.index = index;
442
443 ldpp_dout(dpp, 20) << __func__ <<
444 " removing entries from gc log shard index=" << index << ", size=" <<
445 rt.size() << ", entries=" << rt << dendl;
446
447 auto rt_guard = make_scope_guard(
448 [&]
449 {
450 rt.clear();
451 }
452 );
453
454 int ret = gc->remove(index, rt, &index_io.c);
455 if (ret < 0) {
456 /* we already cleared the list of tags; this prevents it from
457 * ballooning in case of a persistent problem
458 */
459 ldpp_dout(dpp, 0) << "WARNING: failed to remove tags on gc shard index=" <<
460 index << " ret=" << ret << dendl;
461 return;
462 }
463 if (perfcounter) {
464 /* log the count of tags retired for rate estimation */
465 perfcounter->inc(l_rgw_gc_retire, rt.size());
466 }
467 ios.push_back(index_io);
468 }
469
470 void flush_remove_tags() {
471 int index = 0;
472 for (auto& rt : remove_tags) {
473 if (! gc->transitioned_objects_cache[index]) {
474 flush_remove_tags(index, rt);
475 }
476 ++index;
477 }
478 }
479
480 int remove_queue_entries(int index, int num_entries) {
481 int ret = gc->remove(index, num_entries);
482 if (ret < 0) {
483 ldpp_dout(dpp, 0) << "ERROR: failed to remove queue entries on index=" <<
484 index << " ret=" << ret << dendl;
485 return ret;
486 }
487 if (perfcounter) {
488 /* log the count of tags retired for rate estimation */
489 perfcounter->inc(l_rgw_gc_retire, num_entries);
490 }
491 return 0;
492 }
493 }; // class RGWGCIOManager
494
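// Processes a single gc shard for at most max_secs seconds. The shard is guarded by
// an exclusive cls_lock ("gc_process") whose duration matches max_secs. Entries are
// listed from omap or from the queue depending on the shard's transition state; for
// each entry every chain object gets a cls_refcount_put scheduled through
// io_manager, after which the entry is retired (tag removal in omap mode, queue trim
// in queue mode).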
495 int RGWGC::process(int index, int max_secs, bool expired_only,
496 RGWGCIOManager& io_manager)
497 {
498 ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
499 index << ", max_secs=" << max_secs << ", expired_only=" <<
500 expired_only << dendl;
501
502 rados::cls::lock::Lock l(gc_index_lock_name);
503 utime_t end = ceph_clock_now();
504
505 /* max_secs should be greater than zero. We don't want a zero max_secs
506 * to be translated as no timeout, since we'd then need to break the
507 * lock and that would require a manual intervention. In this case
508 * we can just wait it out. */
509 if (max_secs <= 0)
510 return -EAGAIN;
511
512 end += max_secs;
513 utime_t time(max_secs, 0);
514 l.set_duration(time);
515
516 int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
517 if (ret == -EBUSY) { /* already locked by another gc processor */
518 ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
519 obj_names[index] << dendl;
520 return 0;
521 }
522 if (ret < 0)
523 return ret;
524
525 string marker;
526 string next_marker;
527 bool truncated;
528 IoCtx *ctx = new IoCtx;
529 do {
530 int max = 100;
531 std::list<cls_rgw_gc_obj_info> entries;
532
533 int ret = 0;
534
535 if (! transitioned_objects_cache[index]) {
536 ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
537 ldpp_dout(this, 20) <<
538 "RGWGC::process cls_rgw_gc_list returned with returned:" << ret <<
539 ", entries.size=" << entries.size() << ", truncated=" << truncated <<
540 ", next_marker='" << next_marker << "'" << dendl;
541 obj_version objv;
542 cls_version_read(store->gc_pool_ctx, obj_names[index], &objv);
543 if ((objv.ver == 1) && entries.size() == 0) {
544 std::list<cls_rgw_gc_obj_info> non_expired_entries;
545 ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, 1, false, non_expired_entries, &truncated, next_marker);
546 if (non_expired_entries.size() == 0) {
547 transitioned_objects_cache[index] = true;
548 marker.clear();
549 ldpp_dout(this, 20) << "RGWGC::process cls_rgw_gc_list returned no non-expired entries, so setting cache entry to TRUE" << dendl;
550 } else {
551 ret = 0;
552 goto done;
553 }
554 }
555 if ((objv.ver == 0) && (ret == -ENOENT || entries.size() == 0)) {
556 ret = 0;
557 goto done;
558 }
559 }
560
561 if (transitioned_objects_cache[index]) {
562 ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
563 ldpp_dout(this, 20) <<
564 "RGWGC::process cls_rgw_gc_queue_list_entries returned with return value:" << ret <<
565 ", entries.size=" << entries.size() << ", truncated=" << truncated <<
566 ", next_marker='" << next_marker << "'" << dendl;
567 if (entries.size() == 0) {
568 ret = 0;
569 goto done;
570 }
571 }
572
573 if (ret < 0)
574 goto done;
575
576 marker = next_marker;
577
578 string last_pool;
579 std::list<cls_rgw_gc_obj_info>::iterator iter;
580 for (iter = entries.begin(); iter != entries.end(); ++iter) {
581 cls_rgw_gc_obj_info& info = *iter;
582
583 ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
584 info.tag << "', time=" << info.time << ", chain.objs.size()=" <<
585 info.chain.objs.size() << dendl;
586
587 std::list<cls_rgw_obj>::iterator liter;
588 cls_rgw_obj_chain& chain = info.chain;
589
590 utime_t now = ceph_clock_now();
591 if (now >= end) {
592 goto done;
593 }
594 if (! transitioned_objects_cache[index]) {
595 if (chain.objs.empty()) {
596 io_manager.schedule_tag_removal(index, info.tag);
597 } else {
598 io_manager.add_tag_io_size(index, info.tag, chain.objs.size());
599 }
600 }
601 if (! chain.objs.empty()) {
602 for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
603 cls_rgw_obj& obj = *liter;
604
605 if (obj.pool != last_pool) {
606 delete ctx;
607 ctx = new IoCtx;
608 ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx);
609 if (ret < 0) {
610 if (transitioned_objects_cache[index]) {
611 goto done;
612 }
613 last_pool = "";
614 ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
615 obj.pool << dendl;
616 continue;
617 }
618 last_pool = obj.pool;
619 }
620
621 ctx->locator_set_key(obj.loc);
622
623 const string& oid = obj.key.name; /* just stored raw oid there */
624
625 ldpp_dout(this, 5) << "RGWGC::process removing " << obj.pool <<
626 ":" << obj.key.name << dendl;
627 ObjectWriteOperation op;
628 cls_refcount_put(op, info.tag, true);
629
630 ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
631 if (ret < 0) {
632 ldpp_dout(this, 0) <<
633 "WARNING: failed to schedule deletion for oid=" << oid << dendl;
634 if (transitioned_objects_cache[index]) {
635 // If deleting any oid failed, we will not delete the queue entries
636 goto done;
637 }
638 }
639 if (going_down()) {
640 // leave early, even if tag isn't removed, it's ok since it
641 // will be picked up next time around
642 goto done;
643 }
644 } // chains loop
645 } // else -- chains not empty
646 } // entries loop
647 if (transitioned_objects_cache[index] && entries.size() > 0) {
648 ret = io_manager.drain_ios();
649 if (ret < 0) {
650 goto done;
651 }
652 //Remove the entries from the queue
653 ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl;
654 ret = io_manager.remove_queue_entries(index, entries.size());
655 if (ret < 0) {
656 ldpp_dout(this, 0) <<
657 "WARNING: failed to remove queue entries" << dendl;
658 goto done;
659 }
660 }
661 } while (truncated);
662
663 done:
664 /* we don't drain here, because if we're going down we don't want to
665 * hold up the system if the backend is unresponsive
666 */
667 l.unlock(&store->gc_pool_ctx, obj_names[index]);
668 delete ctx;
669
670 return 0;
671 }
672
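// Runs one gc pass over all shards, starting at a random index so that concurrent
// gc processors tend to start on different shards, with each shard bounded by
// rgw_gc_processor_max_time.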
673 int RGWGC::process(bool expired_only)
674 {
675 int max_secs = cct->_conf->rgw_gc_processor_max_time;
676
677 const int start = ceph::util::generate_random_number(0, max_objs - 1);
678
679 RGWGCIOManager io_manager(this, store->ctx(), this);
680
681 for (int i = 0; i < max_objs; i++) {
682 int index = (i + start) % max_objs;
683 int ret = process(index, max_secs, expired_only, io_manager);
684 if (ret < 0)
685 return ret;
686 }
687 if (!going_down()) {
688 io_manager.drain();
689 }
690
691 return 0;
692 }
693
694 bool RGWGC::going_down()
695 {
696 return down_flag;
697 }
698
699 void RGWGC::start_processor()
700 {
701 worker = new GCWorker(this, cct, this);
702 worker->create("rgw_gc");
703 }
704
705 void RGWGC::stop_processor()
706 {
707 down_flag = true;
708 if (worker) {
709 worker->stop();
710 worker->join();
711 }
712 delete worker;
713 worker = NULL;
714 }
715
716 unsigned RGWGC::get_subsys() const
717 {
718 return dout_subsys;
719 }
720
721 std::ostream& RGWGC::gen_prefix(std::ostream& out) const
722 {
723 return out << "garbage collection: ";
724 }
725
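// Worker loop: run an expired-only gc pass, then sleep for whatever remains of
// rgw_gc_processor_period after subtracting the time the pass took (skipping the
// sleep entirely if the pass ran longer). stop() wakes the wait so shutdown is not
// delayed by a full period.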
726 void *RGWGC::GCWorker::entry() {
727 do {
728 utime_t start = ceph_clock_now();
729 ldpp_dout(dpp, 2) << "garbage collection: start" << dendl;
730 int r = gc->process(true);
731 if (r < 0) {
732 ldpp_dout(dpp, 0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
733 }
734 ldpp_dout(dpp, 2) << "garbage collection: stop" << dendl;
735
736 if (gc->going_down())
737 break;
738
739 utime_t end = ceph_clock_now();
740 end -= start;
741 int secs = cct->_conf->rgw_gc_processor_period;
742
743 if (secs <= end.sec())
744 continue; // next round
745
746 secs -= end.sec();
747
748 std::unique_lock locker{lock};
749 cond.wait_for(locker, std::chrono::seconds(secs));
750 } while (!gc->going_down());
751
752 return NULL;
753 }
754
755 void RGWGC::GCWorker::stop()
756 {
757 std::lock_guard l{lock};
758 cond.notify_all();
759 }