1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
7 #include "include/scope_guard.h"
8 #include "include/rados/librados.hpp"
9 #include "cls/rgw/cls_rgw_client.h"
10 #include "cls/rgw_gc/cls_rgw_gc_client.h"
11 #include "cls/refcount/cls_refcount_client.h"
12 #include "cls/version/cls_version_client.h"
13 #include "rgw_perf_counters.h"
14 #include "cls/lock/cls_lock_client.h"
15 #include "include/random.h"
16 #include "rgw_gc_log.h"
18 #include <list> // XXX
// NOTE(review): this file is a lossy extraction -- original source line
// numbers are embedded in the text and many lines are missing. Comments
// below describe only what is visible.
// Logging configuration: route dout() to the global CephContext under the
// rgw debug subsystem.
21 #define dout_context g_ceph_context
22 #define dout_subsys ceph_subsys_rgw
24 using namespace librados
;
// Prefix for the per-shard GC rados object names ("gc.<N>") and the name of
// the cls_lock used to serialize processing of a GC shard.
26 static string gc_oid_prefix
= "gc";
27 static string gc_index_lock_name
= "gc_process";
// Initialize GC state: compute the shard count, build the per-shard object
// names ("gc.<i>"), seed the per-shard transitioned flag to false, and write
// a cls_rgw_gc queue-init op to each shard object.
// NOTE(review): several original lines are missing from this extraction
// (e.g. the declaration of 'buf' used below, and the closing braces).
29 void RGWGC::initialize(CephContext
*_cct
, RGWRados
*_store
) {
// shard count is rgw_gc_max_objs, capped at rgw_shards_max()
33 max_objs
= min(static_cast<int>(cct
->_conf
->rgw_gc_max_objs
), rgw_shards_max());
35 obj_names
= new string
[max_objs
];
37 for (int i
= 0; i
< max_objs
; i
++) {
38 obj_names
[i
] = gc_oid_prefix
;
// append ".<i>"; 'buf' is declared on a line not visible in this extraction
40 snprintf(buf
, 32, ".%d", i
);
41 obj_names
[i
].append(buf
);
// transitioned_objects_cache[i] starts false: shard not yet known to use
// the cls_rgw_gc queue
43 auto it
= transitioned_objects_cache
.begin() + i
;
44 transitioned_objects_cache
.insert(it
, false);
46 //version = 0 -> not ready for transition
47 //version = 1 -> marked ready for transition
48 librados::ObjectWriteOperation op
;
// queue geometry from rgw_gc_max_queue_size / rgw_gc_max_deferred
50 const uint64_t queue_size
= cct
->_conf
->rgw_gc_max_queue_size
, num_deferred_entries
= cct
->_conf
->rgw_gc_max_deferred
;
51 gc_log_init2(op
, queue_size
, num_deferred_entries
);
// synchronous write of the init op to the shard object; return value is
// not checked on the visible lines
52 store
->gc_operate(obj_names
[i
], &op
);
// Tear down state allocated by initialize(). The body (original lines
// 57-59) is missing from this extraction -- presumably it frees obj_names;
// NOTE(review): confirm against the full source.
56 void RGWGC::finalize()
61 int RGWGC::tag_index(const string
& tag
)
63 return rgw_shard_id(tag
, max_objs
);
// Enqueue a GC chain under the given tag. First tries the cls_rgw_gc queue
// (gc_log_enqueue2); if that is rejected with ECANCELED/EPERM, falls back
// to the legacy omap entry (cls_rgw_gc_set_entry).
// NOTE(review): lines populating 'info' (original 70-71) and the early
// return inside the if-branch (original ~80) are missing from this
// extraction.
66 int RGWGC::send_chain(cls_rgw_obj_chain
& chain
, const string
& tag
)
68 ObjectWriteOperation op
;
69 cls_rgw_gc_obj_info info
;
72 gc_log_enqueue2(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
74 int i
= tag_index(tag
);
// NOTE(review): the log message lacks a space before "tag is:"
76 ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names
[i
] << "tag is: " << tag
<< dendl
;
78 auto ret
= store
->gc_operate(obj_names
[i
], &op
);
79 if (ret
!= -ECANCELED
&& ret
!= -EPERM
) {
// fallback path: write the entry to omap with the legacy cls_rgw call
82 ObjectWriteOperation set_entry_op
;
83 cls_rgw_gc_set_entry(set_entry_op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
84 return store
->gc_operate(obj_names
[i
], &set_entry_op
);
// Per-call heap state for an async defer: owns the AIO completion and the
// gc entry info needed by async_defer_callback. The destructor releases
// the completion.
// NOTE(review): original line 91 (likely the RGWGC* member dereferenced as
// state->gc in async_defer_callback) and a null-check around release()
// (original ~95) are missing from this extraction.
87 struct defer_chain_state
{
88 librados::AioCompletion
* completion
= nullptr;
89 // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
90 // RGWGC destructs before this completion fires
92 cls_rgw_gc_obj_info info
;
94 ~defer_chain_state() {
96 completion
->release();
// AIO callback for async_defer_chain: reclaims ownership of the heap
// defer_chain_state (released with state.release() by the caller), and if
// the defer op failed with ECANCELED -- meaning the shard has transitioned
// to the cls_rgw_gc queue -- re-drives the defer via on_defer_canceled.
101 static void async_defer_callback(librados::completion_t
, void* arg
)
103 std::unique_ptr
<defer_chain_state
> state
{static_cast<defer_chain_state
*>(arg
)};
104 if (state
->completion
->get_return_value() == -ECANCELED
) {
105 state
->gc
->on_defer_canceled(state
->info
);
// Called when a cls_rgw (omap) defer was rejected with ECANCELED: mark the
// shard as transitioned, then re-issue the defer against the cls_rgw_gc
// queue and remove the stale omap tag in a single fire-and-forget aio op.
// NOTE(review): the completion is created with no callback; the line that
// releases it (original ~123) is missing from this extraction.
109 void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info
& info
)
111 const std::string
& tag
= info
.tag
;
112 const int i
= tag_index(tag
);
114 // ECANCELED from cls_version_check() tells us that we've transitioned
115 transitioned_objects_cache
[i
] = true;
117 ObjectWriteOperation op
;
118 cls_rgw_gc_queue_defer_entry(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
119 cls_rgw_gc_remove(op
, {tag
});
121 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
122 store
->gc_aio_operate(obj_names
[i
], c
, &op
);
// Defer (postpone) GC of a chain identified by 'tag'. Two paths:
//  - shard already transitioned: push a queue defer entry and remove any
//    leftover omap tag, via fire-and-forget aio;
//  - not yet transitioned: write the defer to omap with gc_log_defer1
//    (which embeds a version check), and register async_defer_callback to
//    detect a concurrent transition via ECANCELED.
// NOTE(review): lines populating 'info' before the first branch (original
// 130-131), the returns after each aio call, and completion releases are
// missing from this extraction.
126 int RGWGC::async_defer_chain(const string
& tag
, const cls_rgw_obj_chain
& chain
)
128 const int i
= tag_index(tag
);
129 cls_rgw_gc_obj_info info
;
133 // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
134 if (transitioned_objects_cache
[i
]) {
135 ObjectWriteOperation op
;
136 cls_rgw_gc_queue_defer_entry(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
138 // this tag may still be present in omap, so remove it once the cls_rgw_gc
140 cls_rgw_gc_remove(op
, {tag
});
142 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
143 int ret
= store
->gc_aio_operate(obj_names
[i
], c
, &op
);
148 // if we haven't seen the transition yet, write the defer to omap with cls_rgw
149 ObjectWriteOperation op
;
151 // assert that we haven't initialized cls_rgw_gc queue. this prevents us
152 // from writing new entries to omap after the transition
153 gc_log_defer1(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
155 // prepare a callback to detect the transition via ECANCELED from cls_version_check()
156 auto state
= std::make_unique
<defer_chain_state
>();
158 state
->info
.chain
= chain
;
159 state
->info
.tag
= tag
;
160 state
->completion
= librados::Rados::aio_create_completion(
161 state
.get(), async_defer_callback
);
163 int ret
= store
->gc_aio_operate(obj_names
[i
], state
->completion
, &op
);
165 state
.release(); // release ownership until async_defer_callback()
// Asynchronously remove a batch of omap GC tags from the given shard
// object.
// NOTE(review): the lines after the aio call (original 177-183) are missing
// here -- presumably they hand the completion back via *pc on success and
// release it on error; confirm against the full source.
170 int RGWGC::remove(int index
, const std::vector
<string
>& tags
, AioCompletion
**pc
)
172 ObjectWriteOperation op
;
173 cls_rgw_gc_remove(op
, tags
);
175 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
176 int ret
= store
->gc_aio_operate(obj_names
[index
], c
, &op
);
185 int RGWGC::remove(int index
, int num_entries
)
187 ObjectWriteOperation op
;
188 cls_rgw_gc_queue_remove_entries(op
, num_entries
);
190 return store
->gc_operate(obj_names
[index
], &op
);
// List GC entries across shard objects starting at *index, merging entries
// from the legacy omap log (cls_rgw_gc_list) and, once a shard has
// transitioned, from the cls_rgw_gc queue (cls_rgw_gc_queue_list_entries),
// until 'max' entries are collected. Updates *index/marker/processing_queue
// so the caller can resume.
// NOTE(review): declarations of 'ret', 'next_marker', 'objv' and many
// control-flow lines (error returns, closing braces) are missing from this
// extraction.
193 int RGWGC::list(int *index
, string
& marker
, uint32_t max
, bool expired_only
, std::list
<cls_rgw_gc_obj_info
>& result
, bool *truncated
, bool& processing_queue
)
197 bool check_queue
= false;
199 for (; *index
< max_objs
&& result
.size() < max
; (*index
)++, marker
.clear(), check_queue
= false) {
200 std::list
<cls_rgw_gc_obj_info
> entries
, queue_entries
;
203 //processing_queue is set to true from previous iteration if the queue was under process and probably has more elements in it.
204 if (! transitioned_objects_cache
[*index
] && ! check_queue
&& ! processing_queue
) {
// legacy omap listing path
205 ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[*index
], marker
, max
- result
.size(), expired_only
, entries
, truncated
, next_marker
);
206 if (ret
!= -ENOENT
&& ret
< 0) {
// read the shard's version object to detect queue transition
210 cls_version_read(store
->gc_pool_ctx
, obj_names
[*index
], &objv
);
211 if (ret
== -ENOENT
) {
215 if (! expired_only
) {
216 transitioned_objects_cache
[*index
] = true;
// expired_only listing came back empty; probe for any non-expired entry
// before declaring the shard fully transitioned
219 std::list
<cls_rgw_gc_obj_info
> non_expired_entries
;
220 ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[*index
], marker
, 1, false, non_expired_entries
, truncated
, next_marker
);
221 if (non_expired_entries
.size() == 0) {
222 transitioned_objects_cache
[*index
] = true;
// version 1 means the queue was initialized; if omap did not fill the
// request, also consult the queue
228 if ((objv
.ver
== 1) && (entries
.size() < max
- result
.size())) {
233 if (transitioned_objects_cache
[*index
] || check_queue
|| processing_queue
) {
234 processing_queue
= false;
235 ret
= cls_rgw_gc_queue_list_entries(store
->gc_pool_ctx
, obj_names
[*index
], marker
, (max
- result
.size()) - entries
.size(), expired_only
, queue_entries
, truncated
, next_marker
);
240 if (entries
.size() == 0 && queue_entries
.size() == 0)
// merge omap entries then queue entries into the result list
243 std::list
<cls_rgw_gc_obj_info
>::iterator iter
;
244 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
245 result
.push_back(*iter
);
248 for (iter
= queue_entries
.begin(); iter
!= queue_entries
.end(); ++iter
) {
249 result
.push_back(*iter
);
252 marker
= next_marker
;
254 if (*index
== max_objs
- 1) {
255 /* we cut short here, truncated will hold the correct value */
259 if (result
.size() == max
) {
// remember whether the current shard's queue still has items so the
// next call resumes it instead of advancing
260 if (queue_entries
.size() > 0 && *truncated
) {
261 processing_queue
= true;
263 processing_queue
= false;
264 *index
+= 1; //move to next gc object
267 /* close approximation, it might be that the next of the objects don't hold
268 * anything, in this case truncated should have been false, but we can find
269 * that out on the next iteration
276 processing_queue
= false;
// Manages in-flight async removals for GC processing: caps concurrency at
// max_aio, tracks per-tag shadow-object counts so a tag is only removed
// after all its shadow objects are gone, and batches tag removals per shard
// (flushed at rgw_gc_max_trim_chunk).
// NOTE(review): this extraction is missing the IO struct definition, the
// ios container declaration, the cct/gc members, the destructor body, and
// many control-flow lines (returns, braces).
281 class RGWGCIOManager
{
282 const DoutPrefixProvider
* dpp
;
292 librados::AioCompletion
*c
{nullptr};
// per-shard batches of omap tags pending removal
299 vector
<std::vector
<string
> > remove_tags
;
300 /* tracks the number of remaining shadow objects for a given tag in order to
301 * only remove the tag once all shadow objects have themselves been removed
303 vector
<map
<string
, size_t> > tag_io_size
;
305 #define MAX_AIO_DEFAULT 10
306 size_t max_aio
{MAX_AIO_DEFAULT
};
// constructor: sizes the per-shard vectors from rgw_gc_max_objs and reads
// the concurrency cap from rgw_gc_max_concurrent_io
309 RGWGCIOManager(const DoutPrefixProvider
* _dpp
, CephContext
*_cct
, RGWGC
*_gc
) : dpp(_dpp
),
312 remove_tags(cct
->_conf
->rgw_gc_max_objs
),
313 tag_io_size(cct
->_conf
->rgw_gc_max_objs
) {
314 max_aio
= cct
->_conf
->rgw_gc_max_concurrent_io
;
318 for (auto io
: ios
) {
// Schedule one async shadow-object deletion; blocks (reaping completions)
// while the in-flight count exceeds max_aio.
323 int schedule_io(IoCtx
*ioctx
, const string
& oid
, ObjectWriteOperation
*op
,
324 int index
, const string
& tag
) {
325 while (ios
.size() > max_aio
) {
326 if (gc
->going_down()) {
329 auto ret
= handle_next_completion();
330 //Return error if we are using queue, else ignore it
331 if (gc
->transitioned_objects_cache
[index
] && ret
< 0) {
336 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
337 int ret
= ioctx
->aio_operate(oid
, c
, op
);
341 ios
.push_back(IO
{IO::TailIO
, c
, oid
, index
, tag
});
// Wait for the oldest in-flight op, log failures, and on success of a
// shadow-object delete (pre-transition) schedule the tag's removal.
346 int handle_next_completion() {
347 ceph_assert(!ios
.empty());
348 IO
& io
= ios
.front();
349 io
.c
->wait_for_complete();
350 int ret
= io
.c
->get_return_value();
353 if (ret
== -ENOENT
) {
357 if (io
.type
== IO::IndexIO
&& ! gc
->transitioned_objects_cache
[io
.index
]) {
359 ldpp_dout(dpp
, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
360 io
.index
<< " returned error, ret=" << ret
<< dendl
;
366 ldpp_dout(dpp
, 0) << "WARNING: gc could not remove oid=" << io
.oid
<<
367 ", ret=" << ret
<< dendl
;
371 if (! gc
->transitioned_objects_cache
[io
.index
]) {
372 schedule_tag_removal(io
.index
, io
.tag
);
380 /* This is a request to schedule a tag removal. It will be called once when
381 * there are no shadow objects. But it will also be called for every shadow
382 * object when there are any. Since we do not want the tag to be removed
383 * until all shadow objects have been successfully removed, the scheduling
384 * will not happen until the shadow object count goes down to zero
386 void schedule_tag_removal(int index
, string tag
) {
387 auto& ts
= tag_io_size
[index
];
388 auto ts_it
= ts
.find(tag
);
389 if (ts_it
!= ts
.end()) {
390 auto& size
= ts_it
->second
;
392 // wait all shadow obj delete return
399 auto& rt
= remove_tags
[index
];
// flush the batch once it reaches rgw_gc_max_trim_chunk
402 if (rt
.size() >= (size_t)cct
->_conf
->rgw_gc_max_trim_chunk
) {
403 flush_remove_tags(index
, rt
);
// Record how many shadow objects a tag has before its removal may be
// scheduled.
407 void add_tag_io_size(int index
, string tag
, size_t size
) {
408 auto& ts
= tag_io_size
[index
];
409 ts
.emplace(tag
, size
);
// drain_ios (name per callers): reap all in-flight completions, bailing
// out if the GC is shutting down.
414 while (!ios
.empty()) {
415 if (gc
->going_down()) {
418 auto ret
= handle_next_completion();
429 /* the tags draining might have generated more ios, drain those too */
// Issue one async cls_rgw_gc_remove for a shard's accumulated tag batch,
// clearing the batch regardless of outcome (see scope guard).
433 void flush_remove_tags(int index
, vector
<string
>& rt
) {
435 index_io
.type
= IO::IndexIO
;
436 index_io
.index
= index
;
438 ldpp_dout(dpp
, 20) << __func__
<<
439 " removing entries from gc log shard index=" << index
<< ", size=" <<
440 rt
.size() << ", entries=" << rt
<< dendl
;
442 auto rt_guard
= make_scope_guard(
449 int ret
= gc
->remove(index
, rt
, &index_io
.c
);
451 /* we already cleared list of tags, this prevents us from
452 * ballooning in case of a persistent problem
454 ldpp_dout(dpp
, 0) << "WARNING: failed to remove tags on gc shard index=" <<
455 index
<< " ret=" << ret
<< dendl
;
459 /* log the count of tags retired for rate estimation */
460 perfcounter
->inc(l_rgw_gc_retire
, rt
.size());
462 ios
.push_back(index_io
);
// Flush every shard's pending tag batch (skipping transitioned shards).
465 void flush_remove_tags() {
467 for (auto& rt
: remove_tags
) {
468 if (! gc
->transitioned_objects_cache
[index
]) {
469 flush_remove_tags(index
, rt
);
// Remove processed entries from a transitioned shard's cls_rgw_gc queue.
475 int remove_queue_entries(int index
, int num_entries
) {
476 int ret
= gc
->remove(index
, num_entries
);
478 ldpp_dout(dpp
, 0) << "ERROR: failed to remove queue entries on index=" <<
479 index
<< " ret=" << ret
<< dendl
;
484 }; // class RGWGCIOManager
// Process one GC shard: take the shard's cls_lock for at most max_secs,
// list entries (omap pre-transition, cls_rgw_gc queue post-transition),
// schedule refcount-put deletions for each chain object through io_manager,
// and finally trim processed queue entries / schedule tag removals.
// NOTE(review): declarations of 'marker', 'max', 'truncated', 'next_marker',
// 'objv', 'last_pool', the do/while loop framing, and many error/return
// lines are missing from this extraction. Also note 'ctx' is allocated with
// raw new; its delete is not visible here.
486 int RGWGC::process(int index
, int max_secs
, bool expired_only
,
487 RGWGCIOManager
& io_manager
)
489 ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
490 index
<< ", max_secs=" << max_secs
<< ", expired_only=" <<
491 expired_only
<< dendl
;
493 rados::cls::lock::Lock
l(gc_index_lock_name
);
494 utime_t end
= ceph_clock_now();
496 /* max_secs should be greater than zero. We don't want a zero max_secs
497 * to be translated as no timeout, since we'd then need to break the
498 * lock and that would require a manual intervention. In this case
499 * we can just wait it out. */
504 utime_t
time(max_secs
, 0);
505 l
.set_duration(time
);
507 int ret
= l
.lock_exclusive(&store
->gc_pool_ctx
, obj_names
[index
]);
508 if (ret
== -EBUSY
) { /* already locked by another gc processor */
509 ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
510 obj_names
[index
] << dendl
;
519 IoCtx
*ctx
= new IoCtx
;
522 std::list
<cls_rgw_gc_obj_info
> entries
;
// pre-transition: list from the omap gc log
526 if (! transitioned_objects_cache
[index
]) {
527 ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[index
], marker
, max
, expired_only
, entries
, &truncated
, next_marker
);
528 ldpp_dout(this, 20) <<
529 "RGWGC::process cls_rgw_gc_list returned with returned:" << ret
<<
530 ", entries.size=" << entries
.size() << ", truncated=" << truncated
<<
531 ", next_marker='" << next_marker
<< "'" << dendl
;
// detect queue transition via the shard's version object
533 cls_version_read(store
->gc_pool_ctx
, obj_names
[index
], &objv
);
534 if ((objv
.ver
== 1) && entries
.size() == 0) {
535 std::list
<cls_rgw_gc_obj_info
> non_expired_entries
;
536 ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[index
], marker
, 1, false, non_expired_entries
, &truncated
, next_marker
);
537 if (non_expired_entries
.size() == 0) {
538 transitioned_objects_cache
[index
] = true;
540 ldpp_dout(this, 20) << "RGWGC::process cls_rgw_gc_list returned ENOENT for non expired entries, so setting cache entry to TRUE" << dendl
;
546 if ((objv
.ver
== 0) && (ret
== -ENOENT
)) {
// post-transition: list from the cls_rgw_gc queue
552 if (transitioned_objects_cache
[index
]) {
553 ret
= cls_rgw_gc_queue_list_entries(store
->gc_pool_ctx
, obj_names
[index
], marker
, max
, expired_only
, entries
, &truncated
, next_marker
);
554 ldpp_dout(this, 20) <<
555 "RGWGC::process cls_rgw_gc_queue_list_entries returned with return value:" << ret
<<
556 ", entries.size=" << entries
.size() << ", truncated=" << truncated
<<
557 ", next_marker='" << next_marker
<< "'" << dendl
;
558 if (entries
.size() == 0) {
567 marker
= next_marker
;
570 std::list
<cls_rgw_gc_obj_info
>::iterator iter
;
571 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
572 cls_rgw_gc_obj_info
& info
= *iter
;
574 ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
575 info
.tag
<< "', time=" << info
.time
<< ", chain.objs.size()=" <<
576 info
.chain
.objs
.size() << dendl
;
578 std::list
<cls_rgw_obj
>::iterator liter
;
579 cls_rgw_obj_chain
& chain
= info
.chain
;
581 utime_t now
= ceph_clock_now();
// pre-transition bookkeeping: empty chain -> remove tag now; otherwise
// record the shadow-object count so the tag is removed only when all of
// its objects have been deleted
585 if (! transitioned_objects_cache
[index
]) {
586 if (chain
.objs
.empty()) {
587 io_manager
.schedule_tag_removal(index
, info
.tag
);
589 io_manager
.add_tag_io_size(index
, info
.tag
, chain
.objs
.size());
592 if (! chain
.objs
.empty()) {
593 for (liter
= chain
.objs
.begin(); liter
!= chain
.objs
.end(); ++liter
) {
594 cls_rgw_obj
& obj
= *liter
;
// switch IoCtx only when the object's pool changes
596 if (obj
.pool
!= last_pool
) {
599 ret
= rgw_init_ioctx(store
->get_rados_handle(), obj
.pool
, *ctx
);
601 if (transitioned_objects_cache
[index
]) {
605 ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
609 last_pool
= obj
.pool
;
612 ctx
->locator_set_key(obj
.loc
);
614 const string
& oid
= obj
.key
.name
; /* just stored raw oid there */
616 ldpp_dout(this, 5) << "RGWGC::process removing " << obj
.pool
<<
617 ":" << obj
.key
.name
<< dendl
;
// drop our refcount on the shadow object; the OSD deletes it when the
// count reaches zero
618 ObjectWriteOperation op
;
619 cls_refcount_put(op
, info
.tag
, true);
621 ret
= io_manager
.schedule_io(ctx
, oid
, &op
, index
, info
.tag
);
623 ldpp_dout(this, 0) <<
624 "WARNING: failed to schedule deletion for oid=" << oid
<< dendl
;
625 if (transitioned_objects_cache
[index
]) {
626 //If deleting oid failed for any of them, we will not delete queue entries
631 // leave early, even if tag isn't removed, it's ok since it
632 // will be picked up next time around
636 } // else -- chains not empty
// post-transition: wait for all deletions, then trim the queue
638 if (transitioned_objects_cache
[index
] && entries
.size() > 0) {
639 ret
= io_manager
.drain_ios();
643 //Remove the entries from the queue
644 ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker
<< dendl
;
645 ret
= io_manager
.remove_queue_entries(index
, entries
.size());
647 ldpp_dout(this, 0) <<
648 "WARNING: failed to remove queue entries" << dendl
;
655 /* we don't drain here, because if we're going down we don't want to
656 * hold the system if backend is unresponsive
658 l
.unlock(&store
->gc_pool_ctx
, obj_names
[index
]);
// Process all GC shards once, starting at a random shard so concurrent gc
// processors spread their work.
// NOTE(review): the per-iteration error handling and the function's return
// (original ~675-684) are missing from this extraction.
664 int RGWGC::process(bool expired_only
)
666 int max_secs
= cct
->_conf
->rgw_gc_processor_max_time
;
// random start index in [0, max_objs-1]
668 const int start
= ceph::util::generate_random_number(0, max_objs
- 1);
670 RGWGCIOManager
io_manager(this, store
->ctx(), this);
672 for (int i
= 0; i
< max_objs
; i
++) {
673 int index
= (i
+ start
) % max_objs
;
674 int ret
= process(index
, max_secs
, expired_only
, io_manager
);
// Shutdown check polled by worker/io loops; the body (original lines
// 686-688) is not visible in this extraction.
685 bool RGWGC::going_down()
690 void RGWGC::start_processor()
692 worker
= new GCWorker(this, cct
, this);
693 worker
->create("rgw_gc");
// Stops and reaps the background GC worker; the body (original lines
// 697-705) is not visible in this extraction.
696 void RGWGC::stop_processor()
// DoutPrefixProvider hook returning the logging subsystem id; the body
// (original lines 708-710) is not visible in this extraction.
707 unsigned RGWGC::get_subsys() const
712 std::ostream
& RGWGC::gen_prefix(std::ostream
& out
) const
714 return out
<< "garbage collection: ";
// GC worker thread loop: run one expired-only GC pass, then sleep on the
// condvar for the remainder of rgw_gc_processor_period, repeating until
// going_down().
// NOTE(review): the do-loop opening, the break after going_down(), the
// elapsed-time computation feeding 'end', and the return statement are
// missing from this extraction.
717 void *RGWGC::GCWorker::entry() {
719 utime_t start
= ceph_clock_now();
720 ldpp_dout(dpp
, 2) << "garbage collection: start" << dendl
;
721 int r
= gc
->process(true);
723 ldpp_dout(dpp
, 0) << "ERROR: garbage collection process() returned error r=" << r
<< dendl
;
725 ldpp_dout(dpp
, 2) << "garbage collection: stop" << dendl
;
727 if (gc
->going_down())
730 utime_t end
= ceph_clock_now();
732 int secs
= cct
->_conf
->rgw_gc_processor_period
;
// if the pass already took a full period, start the next round immediately
734 if (secs
<= end
.sec())
735 continue; // next round
739 std::unique_lock locker
{lock
};
740 cond
.wait_for(locker
, std::chrono::seconds(secs
));
741 } while (!gc
->going_down());
// Wake the worker so it can observe shutdown; only the lock acquisition is
// visible here -- the notify call (original ~749) is missing from this
// extraction.
746 void RGWGC::GCWorker::stop()
748 std::lock_guard l
{lock
};