1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
7 #include "include/scope_guard.h"
8 #include "include/rados/librados.hpp"
9 #include "cls/rgw/cls_rgw_client.h"
10 #include "cls/rgw_gc/cls_rgw_gc_client.h"
11 #include "cls/refcount/cls_refcount_client.h"
12 #include "cls/version/cls_version_client.h"
13 #include "rgw_perf_counters.h"
14 #include "cls/lock/cls_lock_client.h"
15 #include "include/random.h"
16 #include "rgw_gc_log.h"
18 #include <list> // XXX
// Logging context/subsystem used by the dout/ldpp_dout macros in this file.
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

using namespace librados;

// Prefix of the per-shard GC objects; initialize() appends ".<shard>"
// (e.g. "gc.0", "gc.1", ...).
static string gc_oid_prefix = "gc";
// Name of the cls_lock taken in RGWGC::process() to serialize GC
// processing of a shard across radosgw instances.
static string gc_index_lock_name = "gc_process";
29 void RGWGC::initialize(CephContext
*_cct
, RGWRados
*_store
) {
33 max_objs
= min(static_cast<int>(cct
->_conf
->rgw_gc_max_objs
), rgw_shards_max());
35 obj_names
= new string
[max_objs
];
37 for (int i
= 0; i
< max_objs
; i
++) {
38 obj_names
[i
] = gc_oid_prefix
;
40 snprintf(buf
, 32, ".%d", i
);
41 obj_names
[i
].append(buf
);
43 auto it
= transitioned_objects_cache
.begin() + i
;
44 transitioned_objects_cache
.insert(it
, false);
46 //version = 0 -> not ready for transition
47 //version = 1 -> marked ready for transition
48 librados::ObjectWriteOperation op
;
50 const uint64_t queue_size
= cct
->_conf
->rgw_gc_max_queue_size
, num_deferred_entries
= cct
->_conf
->rgw_gc_max_deferred
;
51 gc_log_init2(op
, queue_size
, num_deferred_entries
);
52 store
->gc_operate(obj_names
[i
], &op
);
56 void RGWGC::finalize()
61 int RGWGC::tag_index(const string
& tag
)
63 return rgw_shard_id(tag
, max_objs
);
66 int RGWGC::send_chain(cls_rgw_obj_chain
& chain
, const string
& tag
)
68 ObjectWriteOperation op
;
69 cls_rgw_gc_obj_info info
;
72 gc_log_enqueue2(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
74 int i
= tag_index(tag
);
76 ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names
[i
] << "tag is: " << tag
<< dendl
;
78 auto ret
= store
->gc_operate(obj_names
[i
], &op
);
79 if (ret
!= -ECANCELED
&& ret
!= -EPERM
) {
82 ObjectWriteOperation set_entry_op
;
83 cls_rgw_gc_set_entry(set_entry_op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
84 return store
->gc_operate(obj_names
[i
], &set_entry_op
);
87 struct defer_chain_state
{
88 librados::AioCompletion
* completion
= nullptr;
89 // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
90 // RGWGC destructs before this completion fires
92 cls_rgw_gc_obj_info info
;
94 ~defer_chain_state() {
96 completion
->release();
101 static void async_defer_callback(librados::completion_t
, void* arg
)
103 std::unique_ptr
<defer_chain_state
> state
{static_cast<defer_chain_state
*>(arg
)};
104 if (state
->completion
->get_return_value() == -ECANCELED
) {
105 state
->gc
->on_defer_canceled(state
->info
);
109 void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info
& info
)
111 const std::string
& tag
= info
.tag
;
112 const int i
= tag_index(tag
);
114 // ECANCELED from cls_version_check() tells us that we've transitioned
115 transitioned_objects_cache
[i
] = true;
117 ObjectWriteOperation op
;
118 cls_rgw_gc_queue_defer_entry(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
119 cls_rgw_gc_remove(op
, {tag
});
121 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
122 store
->gc_aio_operate(obj_names
[i
], c
, &op
);
126 int RGWGC::async_defer_chain(const string
& tag
, const cls_rgw_obj_chain
& chain
)
128 const int i
= tag_index(tag
);
129 cls_rgw_gc_obj_info info
;
133 // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
134 if (transitioned_objects_cache
[i
]) {
135 ObjectWriteOperation op
;
136 cls_rgw_gc_queue_defer_entry(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
138 // this tag may still be present in omap, so remove it once the cls_rgw_gc
140 cls_rgw_gc_remove(op
, {tag
});
142 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
143 int ret
= store
->gc_aio_operate(obj_names
[i
], c
, &op
);
148 // if we haven't seen the transition yet, write the defer to omap with cls_rgw
149 ObjectWriteOperation op
;
151 // assert that we haven't initialized cls_rgw_gc queue. this prevents us
152 // from writing new entries to omap after the transition
153 gc_log_defer1(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
155 // prepare a callback to detect the transition via ECANCELED from cls_version_check()
156 auto state
= std::make_unique
<defer_chain_state
>();
158 state
->info
.chain
= chain
;
159 state
->info
.tag
= tag
;
160 state
->completion
= librados::Rados::aio_create_completion(
161 state
.get(), async_defer_callback
);
163 int ret
= store
->gc_aio_operate(obj_names
[i
], state
->completion
, &op
);
165 state
.release(); // release ownership until async_defer_callback()
170 int RGWGC::remove(int index
, const std::vector
<string
>& tags
, AioCompletion
**pc
)
172 ObjectWriteOperation op
;
173 cls_rgw_gc_remove(op
, tags
);
175 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
176 int ret
= store
->gc_aio_operate(obj_names
[index
], c
, &op
);
185 int RGWGC::remove(int index
, int num_entries
)
187 ObjectWriteOperation op
;
188 cls_rgw_gc_queue_remove_entries(op
, num_entries
);
190 return store
->gc_operate(obj_names
[index
], &op
);
// List up to `max` GC entries, walking shard objects starting at *index and
// advancing *index/marker as each shard is exhausted. Pre-transition shards
// are read from the cls_rgw omap log; transitioned shards (and shards whose
// version marks them ready) are read from the cls_rgw_gc queue.
// NOTE(review): this extraction is missing lines — declarations of ret/objv/
// next_marker, error returns, and closing braces. Only comments were added
// here; reconstruct the dropped lines from upstream before compiling.
int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated, bool& processing_queue)
  bool check_queue = false;
  // marker and check_queue reset at every new shard.
  for (; *index < max_objs && result.size() < max; (*index)++, marker.clear(), check_queue = false) {
    std::list<cls_rgw_gc_obj_info> entries, queue_entries;
    //processing_queue is set to true from previous iteration if the queue was under process and probably has more elements in it.
    if (! transitioned_objects_cache[*index] && ! check_queue && ! processing_queue) {
      // Pre-transition shard: read entries from the cls_rgw omap log.
      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
      if (ret != -ENOENT && ret < 0) {
      // Shard object version 1 means it was marked ready to transition to
      // the cls_rgw_gc queue (see initialize()).
      cls_version_read(store->gc_pool_ctx, obj_names[*index], &objv);
      if (ret == -ENOENT || entries.size() == 0) {
        if (! expired_only) {
          transitioned_objects_cache[*index] = true;
        // Expired-only listing came back empty: probe for ANY entry before
        // declaring the shard's omap log drained.
        std::list<cls_rgw_gc_obj_info> non_expired_entries;
        ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, 1, false, non_expired_entries, truncated, next_marker);
        if (non_expired_entries.size() == 0) {
          transitioned_objects_cache[*index] = true;
      // Omap did not fill the request and the shard is marked for
      // transition: also consult the queue this round.
      if ((objv.ver == 1) && (entries.size() < max - result.size())) {
    if (transitioned_objects_cache[*index] || check_queue || processing_queue) {
      processing_queue = false;
      ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[*index], marker, (max - result.size()) - entries.size(), expired_only, queue_entries, truncated, next_marker);
    if (entries.size() == 0 && queue_entries.size() == 0)
    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      result.push_back(*iter);
    for (iter = queue_entries.begin(); iter != queue_entries.end(); ++iter) {
      result.push_back(*iter);
    marker = next_marker;
    // Last shard: report to the caller whether the queue has more pending.
    if (*index == max_objs - 1) {
      if (queue_entries.size() > 0 && *truncated) {
        processing_queue = true;
        processing_queue = false;
      /* we cut short here, truncated will hold the correct value */
    if (result.size() == max) {
      if (queue_entries.size() > 0 && *truncated) {
        processing_queue = true;
        processing_queue = false;
      *index += 1; //move to next gc object
  /* close approximation, it might be that the next of the objects don't hold
   * anything, in this case truncated should have been false, but we can find
   * that out on the next iteration */
  processing_queue = false;
// Batches and throttles the AIO deletions issued by RGWGC::process(), and
// defers tag/queue-entry trimming until a tag's shadow objects are all gone.
// NOTE(review): this extraction is missing lines — the IO struct, the `ios`
// container, the gc/cct members, returns, and closing braces. Only comments
// were added here; reconstruct the dropped lines from upstream before
// compiling.
class RGWGCIOManager {
  const DoutPrefixProvider* dpp;
  librados::AioCompletion *c{nullptr};
  // Per-shard batches of chain tags whose shadow objects are all deleted
  // and which can therefore be trimmed from the shard's omap log.
  vector<std::vector<string> > remove_tags;
  /* tracks the number of remaining shadow objects for a given tag in order to
   * only remove the tag once all shadow objects have themselves been removed */
  vector<map<string, size_t> > tag_io_size;

#define MAX_AIO_DEFAULT 10
  // Cap on in-flight AIOs; overridden from rgw_gc_max_concurrent_io below.
  size_t max_aio{MAX_AIO_DEFAULT};

  RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp),
    remove_tags(cct->_conf->rgw_gc_max_objs),
    tag_io_size(cct->_conf->rgw_gc_max_objs) {
    max_aio = cct->_conf->rgw_gc_max_concurrent_io;

  // presumably the destructor: releases any still-pending ios — TODO
  // confirm against upstream; the surrounding signature was lost.
  for (auto io : ios) {

  // Submit one object deletion, first draining completions while the
  // in-flight count exceeds max_aio.
  int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op,
                  int index, const string& tag) {
    while (ios.size() > max_aio) {
      if (gc->going_down()) {
      auto ret = handle_next_completion();
      //Return error if we are using queue, else ignore it
      if (gc->transitioned_objects_cache[index] && ret < 0) {
    auto c = librados::Rados::aio_create_completion(nullptr, nullptr);
    int ret = ioctx->aio_operate(oid, c, op);
    ios.push_back(IO{IO::TailIO, c, oid, index, tag});

  // Block on the oldest in-flight IO; on success of a shadow-object (tail)
  // IO, schedule its tag for removal.
  int handle_next_completion() {
    ceph_assert(!ios.empty());
    IO& io = ios.front();
    io.c->wait_for_complete();
    int ret = io.c->get_return_value();
    // Object already gone: treat as success.
    if (ret == -ENOENT) {
    if (io.type == IO::IndexIO && ! gc->transitioned_objects_cache[io.index]) {
      ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
        io.index << " returned error, ret=" << ret << dendl;
    ldpp_dout(dpp, 0) << "WARNING: gc could not remove oid=" << io.oid <<
      ", ret=" << ret << dendl;
    if (! gc->transitioned_objects_cache[io.index]) {
      schedule_tag_removal(io.index, io.tag);

  /* This is a request to schedule a tag removal. It will be called once when
   * there are no shadow objects. But it will also be called for every shadow
   * object when there are any. Since we do not want the tag to be removed
   * until all shadow objects have been successfully removed, the scheduling
   * will not happen until the shadow object count goes down to zero */
  void schedule_tag_removal(int index, string tag) {
    auto& ts = tag_io_size[index];
    auto ts_it = ts.find(tag);
    if (ts_it != ts.end()) {
      auto& size = ts_it->second;
      // wait all shadow obj delete return
    auto& rt = remove_tags[index];
    // Flush a full chunk of tags eagerly rather than accumulating them all.
    if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
      flush_remove_tags(index, rt);

  // Record how many shadow objects must be deleted before `tag` may be
  // trimmed (see schedule_tag_removal above).
  void add_tag_io_size(int index, string tag, size_t size) {
    auto& ts = tag_io_size[index];
    ts.emplace(tag, size);

  // presumably drain()/drain_ios(): wait for every in-flight IO — TODO
  // confirm; the signature was lost in extraction.
    while (!ios.empty()) {
      if (gc->going_down()) {
      auto ret = handle_next_completion();
    /* the tags draining might have generated more ios, drain those too */

  // Issue one cls_rgw_gc_remove for a shard's accumulated tags and track
  // it in `ios` as an IndexIO.
  void flush_remove_tags(int index, vector<string>& rt) {
    index_io.type = IO::IndexIO;
    index_io.index = index;
    ldpp_dout(dpp, 20) << __func__ <<
      " removing entries from gc log shard index=" << index << ", size=" <<
      rt.size() << ", entries=" << rt << dendl;
    // clears rt on scope exit regardless of outcome (see comment below)
    auto rt_guard = make_scope_guard(
    int ret = gc->remove(index, rt, &index_io.c);
    /* we already cleared list of tags, this prevents us from
     * ballooning in case of a persistent problem */
    ldpp_dout(dpp, 0) << "WARNING: failed to remove tags on gc shard index=" <<
      index << " ret=" << ret << dendl;
    /* log the count of tags retired for rate estimation */
    perfcounter->inc(l_rgw_gc_retire, rt.size());
    ios.push_back(index_io);

  // Flush pending tag batches for every not-yet-transitioned shard.
  void flush_remove_tags() {
    for (auto& rt : remove_tags) {
      if (! gc->transitioned_objects_cache[index]) {
        flush_remove_tags(index, rt);

  // Post-transition path: trim processed entries off a shard's queue.
  int remove_queue_entries(int index, int num_entries) {
    int ret = gc->remove(index, num_entries);
    ldpp_dout(dpp, 0) << "ERROR: failed to remove queue entries on index=" <<
      index << " ret=" << ret << dendl;
    /* log the count of tags retired for rate estimation */
    perfcounter->inc(l_rgw_gc_retire, num_entries);
}; // class RGWGCIOManger
// Process one GC shard: take the shard's cls_lock, list entries (omap or
// queue depending on transition state), delete each chain's shadow objects
// through io_manager, then trim the processed entries/tags.
// NOTE(review): this extraction is missing lines — declarations of marker/
// max/truncated/next_marker/objv/last_pool, error returns, and closing
// braces. Only comments were added here; reconstruct the dropped lines from
// upstream before compiling.
int RGWGC::process(int index, int max_secs, bool expired_only,
                   RGWGCIOManager& io_manager)
  ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
    index << ", max_secs=" << max_secs << ", expired_only=" <<
    expired_only << dendl;

  rados::cls::lock::Lock l(gc_index_lock_name);
  utime_t end = ceph_clock_now();

  /* max_secs should be greater than zero. We don't want a zero max_secs
   * to be translated as no timeout, since we'd then need to break the
   * lock and that would require a manual intervention. In this case
   * we can just wait it out. */
  utime_t time(max_secs, 0);
  l.set_duration(time);

  int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
  if (ret == -EBUSY) { /* already locked by another gc processor */
    ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
      obj_names[index] << dendl;

  // NOTE(review): raw owning pointer; presumably freed on every exit path
  // in the dropped lines — confirm, otherwise this leaks.
  IoCtx *ctx = new IoCtx;
  std::list<cls_rgw_gc_obj_info> entries;

  if (! transitioned_objects_cache[index]) {
    // Pre-transition shard: list from the cls_rgw omap log.
    ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
    ldpp_dout(this, 20) <<
      "RGWGC::process cls_rgw_gc_list returned with returned:" << ret <<
      ", entries.size=" << entries.size() << ", truncated=" << truncated <<
      ", next_marker='" << next_marker << "'" << dendl;
    cls_version_read(store->gc_pool_ctx, obj_names[index], &objv);
    // Version 1 plus an empty listing: probe for ANY entry before marking
    // the shard as transitioned to the queue.
    if ((objv.ver == 1) && entries.size() == 0) {
      std::list<cls_rgw_gc_obj_info> non_expired_entries;
      ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, 1, false, non_expired_entries, &truncated, next_marker);
      if (non_expired_entries.size() == 0) {
        transitioned_objects_cache[index] = true;
        ldpp_dout(this, 20) << "RGWGC::process cls_rgw_gc_list returned NO non expired entries, so setting cache entry to TRUE" << dendl;
    if ((objv.ver == 0) && (ret == -ENOENT || entries.size() == 0)) {

  if (transitioned_objects_cache[index]) {
    // Post-transition shard: list from the cls_rgw_gc queue instead.
    ret = cls_rgw_gc_queue_list_entries(store->gc_pool_ctx, obj_names[index], marker, max, expired_only, entries, &truncated, next_marker);
    ldpp_dout(this, 20) <<
      "RGWGC::process cls_rgw_gc_queue_list_entries returned with return value:" << ret <<
      ", entries.size=" << entries.size() << ", truncated=" << truncated <<
      ", next_marker='" << next_marker << "'" << dendl;
    if (entries.size() == 0) {

  marker = next_marker;

  std::list<cls_rgw_gc_obj_info>::iterator iter;
  for (iter = entries.begin(); iter != entries.end(); ++iter) {
    cls_rgw_gc_obj_info& info = *iter;

    ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
      info.tag << "', time=" << info.time << ", chain.objs.size()=" <<
      info.chain.objs.size() << dendl;

    std::list<cls_rgw_obj>::iterator liter;
    cls_rgw_obj_chain& chain = info.chain;

    utime_t now = ceph_clock_now();

    // Tag bookkeeping only applies on the omap path; the queue path trims
    // entries by count instead (see remove_queue_entries below).
    if (! transitioned_objects_cache[index]) {
      if (chain.objs.empty()) {
        io_manager.schedule_tag_removal(index, info.tag);
        io_manager.add_tag_io_size(index, info.tag, chain.objs.size());
    if (! chain.objs.empty()) {
      for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
        cls_rgw_obj& obj = *liter;

        // Re-open the ioctx only when the pool changes between objects.
        if (obj.pool != last_pool) {
          ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx);
          if (transitioned_objects_cache[index]) {
          ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
          last_pool = obj.pool;

        ctx->locator_set_key(obj.loc);

        const string& oid = obj.key.name; /* just stored raw oid there */

        ldpp_dout(this, 5) << "RGWGC::process removing " << obj.pool <<
          ":" << obj.key.name << dendl;
        ObjectWriteOperation op;
        // Drop this chain's refcount on the shadow object; the object is
        // deleted once the last tag reference is put.
        cls_refcount_put(op, info.tag, true);

        ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
        ldpp_dout(this, 0) <<
          "WARNING: failed to schedule deletion for oid=" << oid << dendl;
        if (transitioned_objects_cache[index]) {
          //If deleting oid failed for any of them, we will not delete queue entries
        // leave early, even if tag isn't removed, it's ok since it
        // will be picked up next time around
    } // else -- chains not empty

  if (transitioned_objects_cache[index] && entries.size() > 0) {
    ret = io_manager.drain_ios();

    //Remove the entries from the queue
    ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker << dendl;
    ret = io_manager.remove_queue_entries(index, entries.size());
    ldpp_dout(this, 0) <<
      "WARNING: failed to remove queue entries" << dendl;

  /* we don't drain here, because if we're going down we don't want to
   * hold the system if backend is unresponsive */
  l.unlock(&store->gc_pool_ctx, obj_names[index]);
673 int RGWGC::process(bool expired_only
)
675 int max_secs
= cct
->_conf
->rgw_gc_processor_max_time
;
677 const int start
= ceph::util::generate_random_number(0, max_objs
- 1);
679 RGWGCIOManager
io_manager(this, store
->ctx(), this);
681 for (int i
= 0; i
< max_objs
; i
++) {
682 int index
= (i
+ start
) % max_objs
;
683 int ret
= process(index
, max_secs
, expired_only
, io_manager
);
694 bool RGWGC::going_down()
699 void RGWGC::start_processor()
701 worker
= new GCWorker(this, cct
, this);
702 worker
->create("rgw_gc");
705 void RGWGC::stop_processor()
716 unsigned RGWGC::get_subsys() const
721 std::ostream
& RGWGC::gen_prefix(std::ostream
& out
) const
723 return out
<< "garbage collection: ";
726 void *RGWGC::GCWorker::entry() {
728 utime_t start
= ceph_clock_now();
729 ldpp_dout(dpp
, 2) << "garbage collection: start" << dendl
;
730 int r
= gc
->process(true);
732 ldpp_dout(dpp
, 0) << "ERROR: garbage collection process() returned error r=" << r
<< dendl
;
734 ldpp_dout(dpp
, 2) << "garbage collection: stop" << dendl
;
736 if (gc
->going_down())
739 utime_t end
= ceph_clock_now();
741 int secs
= cct
->_conf
->rgw_gc_processor_period
;
743 if (secs
<= end
.sec())
744 continue; // next round
748 std::unique_lock locker
{lock
};
749 cond
.wait_for(locker
, std::chrono::seconds(secs
));
750 } while (!gc
->going_down());
755 void RGWGC::GCWorker::stop()
757 std::lock_guard l
{lock
};