// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "rgw_gc.h"

#include "rgw_tools.h"
#include "include/scope_guard.h"
#include "include/rados/librados.hpp"
#include "cls/rgw/cls_rgw_client.h"
#include "cls/rgw_gc/cls_rgw_gc_client.h"
#include "cls/refcount/cls_refcount_client.h"
#include "cls/version/cls_version_client.h"
#include "rgw_perf_counters.h"
#include "cls/lock/cls_lock_client.h"
#include "include/random.h"
#include "rgw_gc_log.h"

#include <list> // XXX
#include <sstream>
#include "xxhash.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

using namespace std;
using namespace librados;

static string gc_oid_prefix = "gc";
static string gc_index_lock_name = "gc_process";

void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
  cct = _cct;
  store = _store;

  max_objs = min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max());

  obj_names = new string[max_objs];

  for (int i = 0; i < max_objs; i++) {
    obj_names[i] = gc_oid_prefix;
    char buf[32];
    snprintf(buf, 32, ".%d", i);
    obj_names[i].append(buf);

    auto it = transitioned_objects_cache.begin() + i;
    transitioned_objects_cache.insert(it, false);

    // version = 0 -> not ready for transition
    // version = 1 -> marked ready for transition
    librados::ObjectWriteOperation op;
    op.create(false);
    const uint64_t queue_size = cct->_conf->rgw_gc_max_queue_size, num_deferred_entries = cct->_conf->rgw_gc_max_deferred;
    gc_log_init2(op, queue_size, num_deferred_entries);
    store->gc_operate(this, obj_names[i], &op);
  }
}
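
// A worked example for initialize() above (a sketch, assuming the default
// rgw_gc_max_objs = 32 and that rgw_shards_max() does not cap it lower):
// the loop creates the shard objects "gc.0" .. "gc.31" in the gc pool, each
// carrying both the legacy omap gc log and, once gc_log_init2() has run, a
// cls_rgw_gc queue sized by rgw_gc_max_queue_size.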

void RGWGC::finalize()
{
  delete[] obj_names;
}

int RGWGC::tag_index(const string& tag)
{
  return rgw_shards_mod(XXH64(tag.c_str(), tag.size(), seed), max_objs);
}
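
// Tags are distributed across shards by hashing: XXH64 produces a stable
// 64-bit hash of the tag, and rgw_shards_mod() folds it into the range
// [0, max_objs). The mapping is deterministic, so a given tag always lands
// on the same shard object; that is what lets send_chain() and
// async_defer_chain() address the entry they wrote earlier.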

std::tuple<int, std::optional<cls_rgw_obj_chain>> RGWGC::send_split_chain(const cls_rgw_obj_chain& chain, const std::string& tag)
{
  ldpp_dout(this, 20) << "RGWGC::send_split_chain - tag is: " << tag << dendl;

  if (cct->_conf->rgw_max_chunk_size) {
    cls_rgw_obj_chain broken_chain;
    ldpp_dout(this, 20) << "RGWGC::send_split_chain - rgw_max_chunk_size is: " << cct->_conf->rgw_max_chunk_size << dendl;

    for (auto it = chain.objs.begin(); it != chain.objs.end(); it++) {
      ldpp_dout(this, 20) << "RGWGC::send_split_chain - adding obj with name: " << it->key << dendl;
      broken_chain.objs.emplace_back(*it);
      cls_rgw_gc_obj_info info;
      info.tag = tag;
      info.chain = broken_chain;
      cls_rgw_gc_set_entry_op op;
      op.info = info;
      size_t total_encoded_size = op.estimate_encoded_size();
      ldpp_dout(this, 20) << "RGWGC::send_split_chain - total_encoded_size is: " << total_encoded_size << dendl;

      if (total_encoded_size > cct->_conf->rgw_max_chunk_size) { // don't add to this chain; send what we have to gc
        broken_chain.objs.pop_back();
        --it;
        ldpp_dout(this, 20) << "RGWGC::send_split_chain - limit exceeded, not adding to broken chain; sending chain" << dendl;
        auto ret = send_chain(broken_chain, tag);
        if (ret < 0) {
          broken_chain.objs.insert(broken_chain.objs.end(), it, chain.objs.end()); // add all the remaining objs to the list to be deleted inline
          ldpp_dout(this, 0) << "RGWGC::send_split_chain - send chain returned error: " << ret << dendl;
          return {ret, {broken_chain}};
        }
        broken_chain.objs.clear();
      }
    }
    if (!broken_chain.objs.empty()) { // the remainder is smaller than or equal to rgw_max_chunk_size
      ldpp_dout(this, 20) << "RGWGC::send_split_chain - sending leftover objects" << dendl;
      auto ret = send_chain(broken_chain, tag);
      if (ret < 0) {
        ldpp_dout(this, 0) << "RGWGC::send_split_chain - send chain returned error: " << ret << dendl;
        return {ret, {broken_chain}};
      }
    }
  } else {
    auto ret = send_chain(chain, tag);
    if (ret < 0) {
      ldpp_dout(this, 0) << "RGWGC::send_split_chain - send chain returned error: " << ret << dendl;
      return {ret, {std::move(chain)}};
    }
  }
  return {0, {}};
}
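
// In short: send_split_chain() above greedily packs objects from the
// original chain into sub-chains whose encoded cls_rgw_gc_set_entry_op
// size stays within rgw_max_chunk_size, flushing each sub-chain via
// send_chain() as it fills up. On failure it returns the objects that were
// never queued so the caller can delete them inline rather than leak tail
// objects.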

int RGWGC::send_chain(const cls_rgw_obj_chain& chain, const string& tag)
{
  ObjectWriteOperation op;
  cls_rgw_gc_obj_info info;
  info.chain = chain;
  info.tag = tag;
  gc_log_enqueue2(op, cct->_conf->rgw_gc_obj_min_wait, info);

  int i = tag_index(tag);

  ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names[i] << ", tag is: " << tag << dendl;

  auto ret = store->gc_operate(this, obj_names[i], &op);
  if (ret != -ECANCELED && ret != -EPERM) {
    return ret;
  }
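  // -ECANCELED (or -EPERM) here is expected to come from the version check
  // guarding the queue write in gc_log_enqueue2(): this shard has not yet
  // transitioned to the cls_rgw_gc queue format, so fall back to recording
  // the entry in the legacy omap-based gc log.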
  ObjectWriteOperation set_entry_op;
  cls_rgw_gc_set_entry(set_entry_op, cct->_conf->rgw_gc_obj_min_wait, info);
  return store->gc_operate(this, obj_names[i], &set_entry_op);
}

struct defer_chain_state {
  librados::AioCompletion* completion = nullptr;
  // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
  // RGWGC destructs before this completion fires
  RGWGC* gc = nullptr;
  cls_rgw_gc_obj_info info;

  ~defer_chain_state() {
    if (completion) {
      completion->release();
    }
  }
};

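// Completion callback for the defer written by async_defer_chain() below.
// Taking the raw pointer back into a unique_ptr reclaims the ownership that
// async_defer_chain() released when it submitted the aio.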
static void async_defer_callback(librados::completion_t, void* arg)
{
  std::unique_ptr<defer_chain_state> state{static_cast<defer_chain_state*>(arg)};
  if (state->completion->get_return_value() == -ECANCELED) {
    state->gc->on_defer_canceled(state->info);
  }
}

void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info& info)
{
  const std::string& tag = info.tag;
  const int i = tag_index(tag);

  // ECANCELED from cls_version_check() tells us that we've transitioned
  transitioned_objects_cache[i] = true;

  ObjectWriteOperation op;
  cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
  cls_rgw_gc_remove(op, {tag});

  auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
  store->gc_aio_operate(obj_names[i], c, &op);
  c->release();
}

int RGWGC::async_defer_chain(const string& tag, const cls_rgw_obj_chain& chain)
{
  const int i = tag_index(tag);
  cls_rgw_gc_obj_info info;
  info.chain = chain;
  info.tag = tag;

  // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
  if (transitioned_objects_cache[i]) {
    ObjectWriteOperation op;
    cls_rgw_gc_queue_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);

    // this tag may still be present in omap, so remove it once the cls_rgw_gc
    // enqueue succeeds
    cls_rgw_gc_remove(op, {tag});

    auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
    int ret = store->gc_aio_operate(obj_names[i], c, &op);
    c->release();
    return ret;
  }

  // if we haven't seen the transition yet, write the defer to omap with cls_rgw
  ObjectWriteOperation op;

  // assert that we haven't initialized the cls_rgw_gc queue. this prevents us
  // from writing new entries to omap after the transition
  gc_log_defer1(op, cct->_conf->rgw_gc_obj_min_wait, info);

  // prepare a callback to detect the transition via ECANCELED from cls_version_check()
  auto state = std::make_unique<defer_chain_state>();
  state->gc = this;
  state->info.chain = chain;
  state->info.tag = tag;
  state->completion = librados::Rados::aio_create_completion(
      state.get(), async_defer_callback);

  int ret = store->gc_aio_operate(obj_names[i], state->completion, &op);
  if (ret == 0) {
    state.release(); // release ownership until async_defer_callback()
  }
  return ret;
}

int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc)
{
  ObjectWriteOperation op;
  cls_rgw_gc_remove(op, tags);

  auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
  int ret = store->gc_aio_operate(obj_names[index], c, &op);
  if (ret < 0) {
    c->release();
  } else {
    *pc = c;
  }
  return ret;
}

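// Queue-based counterpart to the omap-based remove() above: instead of
// deleting specific tags, it trims the first num_entries entries from the
// shard's cls_rgw_gc queue.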
int RGWGC::remove(int index, int num_entries)
{
  ObjectWriteOperation op;
  cls_rgw_gc_queue_remove_entries(op, num_entries);

  return store->gc_operate(this, obj_names[index], &op);
}

int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue)
{
  result.clear();
  string next_marker;
  bool check_queue = false;

  for (; *index < max_objs && result.size() < max; (*index)++, marker.clear(), check_queue = false) {
    std::list<cls_rgw_gc_obj_info> entries, queue_entries;
    int ret = 0;

    // processing_queue is set to true by the previous iteration if the queue
    // was being processed and probably has more elements in it
    if (! transitioned_objects_cache[*index] && ! check_queue && ! processing_queue) {
      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
      if (ret != -ENOENT && ret < 0) {
        return ret;
      }
      obj_version objv;
      cls_version_read(store->gc_pool_ctx, obj_names[*index], &objv);
      if (ret == -ENOENT || entries.size() == 0) {
        if (objv.ver == 0) {
          continue;
        } else {
          if (! expired_only) {
            transitioned_objects_cache[*index] = true;
            marker.clear();
          } else {
            std::list<cls_rgw_gc_obj_info> non_expired_entries;
            ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, 1, false, non_expired_entries, truncated, next_marker);
            if (non_expired_entries.size() == 0) {
              transitioned_objects_cache[*index] = true;
              marker.clear();
            }
          }
        }
      }
      if ((objv.ver == 1) && (entries.size() < max - result.size())) {
        check_queue = true;
        marker.clear();
      }
    }
    if (transitioned_objects_cache[*index] || check_queue || processing_queue) {
      processing_queue = false;
      ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[*index], marker, (max - result.size()) - entries.size(), expired_only, queue_entries, truncated, next_marker);
      if (ret < 0) {
        return ret;
      }
    }
    if (entries.size() == 0 && queue_entries.size() == 0)
      continue;

    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      result.push_back(*iter);
    }

    for (iter = queue_entries.begin(); iter != queue_entries.end(); ++iter) {
      result.push_back(*iter);
    }

    marker = next_marker;

    if (*index == max_objs - 1) {
      if (queue_entries.size() > 0 && *truncated) {
        processing_queue = true;
      } else {
        processing_queue = false;
      }
      /* we cut short here, truncated will hold the correct value */
      return 0;
    }

    if (result.size() == max) {
      if (queue_entries.size() > 0 && *truncated) {
        processing_queue = true;
      } else {
        processing_queue = false;
        *index += 1; // move to next gc object
      }

      /* close approximation: it might be that the next objects don't hold
       * anything, in which case truncated should have been false, but we
       * can find that out on the next iteration
       */
      *truncated = true;
      return 0;
    }
  }
  *truncated = false;
  processing_queue = false;

  return 0;
}
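
/* A minimal pagination sketch for callers of list() above (hypothetical
 * caller code; the loop shape is assumed, not taken from this file):
 *
 *   int index = 0;
 *   string marker;
 *   bool truncated = true;
 *   bool processing_queue = false;
 *   std::list<cls_rgw_gc_obj_info> batch;
 *   while (truncated) {
 *     int r = gc->list(&index, marker, 1000, false, batch, &truncated,
 *                      processing_queue);
 *     if (r < 0)
 *       break;
 *     // consume batch; index/marker/processing_queue carry the cursor
 *   }
 */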

class RGWGCIOManager {
  const DoutPrefixProvider* dpp;
  CephContext *cct;
  RGWGC *gc;

  struct IO {
    enum Type {
      UnknownIO = 0,
      TailIO = 1,
      IndexIO = 2,
    } type{UnknownIO};
    librados::AioCompletion *c{nullptr};
    string oid;
    int index{-1};
    string tag;
  };

  deque<IO> ios;
  vector<std::vector<string> > remove_tags;
  /* tracks the number of remaining shadow objects for a given tag in order to
   * only remove the tag once all shadow objects have themselves been removed
   */
  vector<map<string, size_t> > tag_io_size;

#define MAX_AIO_DEFAULT 10
  size_t max_aio{MAX_AIO_DEFAULT};

public:
  RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc)
    : dpp(_dpp), cct(_cct), gc(_gc) {
    max_aio = cct->_conf->rgw_gc_max_concurrent_io;
    remove_tags.resize(min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max()));
    tag_io_size.resize(min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max()));
  }

  ~RGWGCIOManager() {
    for (auto io : ios) {
      io.c->release();
    }
  }
  int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op,
                  int index, const string& tag) {
    while (ios.size() > max_aio) {
      if (gc->going_down()) {
        return 0;
      }
      auto ret = handle_next_completion();
      // return an error if we are using the queue; otherwise ignore it
      if (gc->transitioned_objects_cache[index] && ret < 0) {
        return ret;
      }
    }

    auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
    int ret = ioctx->aio_operate(oid, c, op);
    if (ret < 0) {
      return ret;
    }
    ios.push_back(IO{IO::TailIO, c, oid, index, tag});

    return 0;
  }

  int handle_next_completion() {
    ceph_assert(!ios.empty());
    IO& io = ios.front();
    io.c->wait_for_complete();
    int ret = io.c->get_return_value();
    io.c->release();

    if (ret == -ENOENT) {
      ret = 0;
    }

    if (io.type == IO::IndexIO && ! gc->transitioned_objects_cache[io.index]) {
      if (ret < 0) {
        ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
          io.index << " returned error, ret=" << ret << dendl;
      }
      goto done;
    }

    if (ret < 0) {
      ldpp_dout(dpp, 0) << "WARNING: gc could not remove oid=" << io.oid <<
        ", ret=" << ret << dendl;
      goto done;
    }

    if (! gc->transitioned_objects_cache[io.index]) {
      schedule_tag_removal(io.index, io.tag);
    }

  done:
    ios.pop_front();
    return ret;
  }

  /* This is a request to schedule a tag removal. It will be called once when
   * there are no shadow objects. But it will also be called for every shadow
   * object when there are any. Since we do not want the tag to be removed
   * until all shadow objects have been successfully removed, the scheduling
   * will not happen until the shadow object count goes down to zero
   */
  void schedule_tag_removal(int index, string tag) {
    auto& ts = tag_io_size[index];
    auto ts_it = ts.find(tag);
    if (ts_it != ts.end()) {
      auto& size = ts_it->second;
      --size;
      // wait until all shadow object deletions have returned
      if (size != 0)
        return;

      ts.erase(ts_it);
    }

    auto& rt = remove_tags[index];

    rt.push_back(tag);
    if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
      flush_remove_tags(index, rt);
    }
  }

  void add_tag_io_size(int index, string tag, size_t size) {
    auto& ts = tag_io_size[index];
    ts.emplace(tag, size);
  }

  int drain_ios() {
    int ret_val = 0;
    while (!ios.empty()) {
      if (gc->going_down()) {
        return -EAGAIN;
      }
      auto ret = handle_next_completion();
      if (ret < 0) {
        ret_val = ret;
      }
    }
    return ret_val;
  }

  void drain() {
    drain_ios();
    flush_remove_tags();
    /* the tags draining might have generated more ios, drain those too */
    drain_ios();
  }

  void flush_remove_tags(int index, vector<string>& rt) {
    IO index_io;
    index_io.type = IO::IndexIO;
    index_io.index = index;

    ldpp_dout(dpp, 20) << __func__ <<
      " removing entries from gc log shard index=" << index << ", size=" <<
      rt.size() << ", entries=" << rt << dendl;

    auto rt_guard = make_scope_guard(
      [&]
      {
        rt.clear();
      }
    );

    int ret = gc->remove(index, rt, &index_io.c);
    if (ret < 0) {
      /* the scope guard clears the list of tags; this prevents the list
       * from ballooning in case of a persistent problem
       */
      ldpp_dout(dpp, 0) << "WARNING: failed to remove tags on gc shard index=" <<
        index << " ret=" << ret << dendl;
      return;
    }
    if (perfcounter) {
      /* log the count of tags retired for rate estimation */
      perfcounter->inc(l_rgw_gc_retire, rt.size());
    }
    ios.push_back(index_io);
  }

  void flush_remove_tags() {
    int index = 0;
    for (auto& rt : remove_tags) {
      if (! gc->transitioned_objects_cache[index]) {
        flush_remove_tags(index, rt);
      }
      ++index;
    }
  }

  int remove_queue_entries(int index, int num_entries) {
    int ret = gc->remove(index, num_entries);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to remove queue entries on index=" <<
        index << " ret=" << ret << dendl;
      return ret;
    }
    if (perfcounter) {
      /* log the count of tags retired for rate estimation */
      perfcounter->inc(l_rgw_gc_retire, num_entries);
    }
    return 0;
  }
}; // class RGWGCIOManager
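
// Typical flow through the manager during RGWGC::process() below (a summary
// of the code above, not an authoritative spec): schedule_io() submits each
// tail-object delete with bounded concurrency; handle_next_completion()
// reaps results and, on the omap path, decrements the per-tag shadow-object
// count via schedule_tag_removal(); once a tag's count reaches zero it is
// batched into remove_tags and flushed with flush_remove_tags(); drain()
// settles any in-flight IO and leftover batches at the end.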

int RGWGC::process(int index, int max_secs, bool expired_only,
                   RGWGCIOManager& io_manager)
{
  ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
    index << ", max_secs=" << max_secs << ", expired_only=" <<
    expired_only << dendl;

  rados::cls::lock::Lock l(gc_index_lock_name);
  utime_t end = ceph_clock_now();

  /* max_secs should be greater than zero. We don't want a zero max_secs
   * to be translated as no timeout, since we'd then need to break the
   * lock and that would require a manual intervention. In this case
   * we can just wait it out. */
  if (max_secs <= 0)
    return -EAGAIN;

  end += max_secs;
  utime_t time(max_secs, 0);
  l.set_duration(time);

  int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
  if (ret == -EBUSY) { /* already locked by another gc processor */
    ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
      obj_names[index] << dendl;
    return 0;
  }
  if (ret < 0)
    return ret;

  string marker;
  string next_marker;
  bool truncated;
  IoCtx *ctx = new IoCtx;
  do {
    int max = 100;
    std::list<cls_rgw_gc_obj_info> entries;

    int ret = 0;

    if (! transitioned_objects_cache[index]) {
      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
      ldpp_dout(this, 20) <<
        "RGWGC::process cls_rgw_gc_list returned with return value:" << ret <<
        ", entries.size=" << entries.size() << ", truncated=" << truncated <<
        ", next_marker='" << next_marker << "'" << dendl;
      obj_version objv;
      cls_version_read(store->gc_pool_ctx, obj_names[index], &objv);
      if ((objv.ver == 1) && entries.size() == 0) {
        std::list<cls_rgw_gc_obj_info> non_expired_entries;
        ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, 1, false, non_expired_entries, &truncated, next_marker);
        if (non_expired_entries.size() == 0) {
          transitioned_objects_cache[index] = true;
          marker.clear();
          ldpp_dout(this, 20) << "RGWGC::process cls_rgw_gc_list returned NO non expired entries, so setting cache entry to TRUE" << dendl;
        } else {
          ret = 0;
          goto done;
        }
      }
      if ((objv.ver == 0) && (ret == -ENOENT || entries.size() == 0)) {
        ret = 0;
        goto done;
      }
    }

    if (transitioned_objects_cache[index]) {
      ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
      ldpp_dout(this, 20) <<
        "RGWGC::process cls_rgw_gc_queue_list_entries returned with return value:" << ret <<
        ", entries.size=" << entries.size() << ", truncated=" << truncated <<
        ", next_marker='" << next_marker << "'" << dendl;
      if (entries.size() == 0) {
        ret = 0;
        goto done;
      }
    }

    if (ret < 0)
      goto done;

    marker = next_marker;

    string last_pool;
    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      cls_rgw_gc_obj_info& info = *iter;

      ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
        info.tag << "', time=" << info.time << ", chain.objs.size()=" <<
        info.chain.objs.size() << dendl;

      std::list<cls_rgw_obj>::iterator liter;
      cls_rgw_obj_chain& chain = info.chain;

      utime_t now = ceph_clock_now();
      if (now >= end) {
        goto done;
      }
      if (! transitioned_objects_cache[index]) {
        if (chain.objs.empty()) {
          io_manager.schedule_tag_removal(index, info.tag);
        } else {
          io_manager.add_tag_io_size(index, info.tag, chain.objs.size());
        }
      }
      if (! chain.objs.empty()) {
        for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
          cls_rgw_obj& obj = *liter;

          if (obj.pool != last_pool) {
            delete ctx;
            ctx = new IoCtx;
            ret = rgw_init_ioctx(this, store->get_rados_handle(), obj.pool, *ctx);
            if (ret < 0) {
              if (transitioned_objects_cache[index]) {
                goto done;
              }
              last_pool = "";
              ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
                obj.pool << dendl;
              continue;
            }
            last_pool = obj.pool;
          }

          ctx->locator_set_key(obj.loc);

          const string& oid = obj.key.name; /* just stored raw oid there */

          ldpp_dout(this, 5) << "RGWGC::process removing " << obj.pool <<
            ":" << obj.key.name << dendl;
          ObjectWriteOperation op;
          cls_refcount_put(op, info.tag, true);

          ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
          if (ret < 0) {
            ldpp_dout(this, 0) <<
              "WARNING: failed to schedule deletion for oid=" << oid << dendl;
            if (transitioned_objects_cache[index]) {
              // if deleting the oid failed for any entry, we will not delete the queue entries
              goto done;
            }
          }
          if (going_down()) {
            // leave early; even if the tag isn't removed, it's ok since it
            // will be picked up next time around
            goto done;
          }
        } // chains loop
      } // else -- chains not empty
    } // entries loop
    if (transitioned_objects_cache[index] && entries.size() > 0) {
      ret = io_manager.drain_ios();
      if (ret < 0) {
        goto done;
      }
      // remove the entries from the queue
      ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl;
      ret = io_manager.remove_queue_entries(index, entries.size());
      if (ret < 0) {
        ldpp_dout(this, 0) <<
          "WARNING: failed to remove queue entries" << dendl;
        goto done;
      }
    }
  } while (truncated);

done:
  /* we don't drain here, because if we're going down we don't want to
   * hold the system if backend is unresponsive
   */
  l.unlock(&store->gc_pool_ctx, obj_names[index]);
  delete ctx;

  return 0;
}
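
// A note on the shard pass above: it takes the shard's exclusive cls_lock
// with a duration of max_secs, pages through entries (the omap log or,
// after the transition, the cls_rgw_gc queue), issues a cls_refcount_put
// for every tail object in each chain, and stops when time runs out or the
// process is going down. Because the lock has a timed duration, an
// interrupted run simply expires rather than requiring manual cleanup.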

int RGWGC::process(bool expired_only)
{
  int max_secs = cct->_conf->rgw_gc_processor_max_time;

  const int start = ceph::util::generate_random_number(0, max_objs - 1);

  RGWGCIOManager io_manager(this, store->ctx(), this);

  for (int i = 0; i < max_objs; i++) {
    int index = (i + start) % max_objs;
    int ret = process(index, max_secs, expired_only, io_manager);
    if (ret < 0)
      return ret;
  }
  if (!going_down()) {
    io_manager.drain();
  }

  return 0;
}
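
// The random starting shard above spreads concurrent radosgw instances
// across the shard objects, so they contend less on the per-shard
// gc_process lock than if every instance always began at gc.0.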

bool RGWGC::going_down()
{
  return down_flag;
}

void RGWGC::start_processor()
{
  worker = new GCWorker(this, cct, this);
  worker->create("rgw_gc");
}

void RGWGC::stop_processor()
{
  down_flag = true;
  if (worker) {
    worker->stop();
    worker->join();
  }
  delete worker;
  worker = NULL;
}

unsigned RGWGC::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWGC::gen_prefix(std::ostream& out) const
{
  return out << "garbage collection: ";
}

void *RGWGC::GCWorker::entry() {
  do {
    utime_t start = ceph_clock_now();
    ldpp_dout(dpp, 2) << "garbage collection: start" << dendl;
    int r = gc->process(true);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
    }
    ldpp_dout(dpp, 2) << "garbage collection: stop" << dendl;

    if (gc->going_down())
      break;

    utime_t end = ceph_clock_now();
    end -= start;
    int secs = cct->_conf->rgw_gc_processor_period;

    if (secs <= end.sec())
      continue; // next round

    secs -= end.sec();

    std::unique_lock locker{lock};
    cond.wait_for(locker, std::chrono::seconds(secs));
  } while (!gc->going_down());

  return NULL;
}
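
// Scheduling example for the loop above (a sketch, assuming the default
// rgw_gc_processor_period of 3600 seconds): if a pass takes 200s, the
// worker sleeps 3400s so passes start on a fixed cadence; if a pass takes
// longer than the period, the next one begins immediately.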

void RGWGC::GCWorker::stop()
{
  std::lock_guard l{lock};
  cond.notify_all();
}