1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
7 #include "include/scope_guard.h"
8 #include "include/rados/librados.hpp"
9 #include "cls/rgw/cls_rgw_client.h"
10 #include "cls/rgw_gc/cls_rgw_gc_client.h"
11 #include "cls/refcount/cls_refcount_client.h"
12 #include "cls/version/cls_version_client.h"
13 #include "rgw_perf_counters.h"
14 #include "cls/lock/cls_lock_client.h"
15 #include "include/random.h"
16 #include "rgw_gc_log.h"
18 #include <list> // XXX
22 #define dout_context g_ceph_context
23 #define dout_subsys ceph_subsys_rgw
25 using namespace librados
;
// Prefix for the per-shard GC object names; shard i is named "gc.<i>".
27 static string gc_oid_prefix
= "gc";
// Name of the cls_lock lock taken on a shard object while it is processed.
28 static string gc_index_lock_name
= "gc_process";
// Set up the GC: size the shard set, build per-shard object names, seed the
// transitioned-objects cache, and initialize the cls_rgw_gc queue on each
// shard object. (Several lines of the original body are elided in this view.)
30 void RGWGC::initialize(CephContext
*_cct
, RGWRados
*_store
) {
// Number of GC shard objects, capped by the cluster-wide shard maximum.
34 max_objs
= min(static_cast<int>(cct
->_conf
->rgw_gc_max_objs
), rgw_shards_max());
36 obj_names
= new string
[max_objs
];
38 for (int i
= 0; i
< max_objs
; i
++) {
// Shard object name is the prefix plus ".<index>".
39 obj_names
[i
] = gc_oid_prefix
;
41 snprintf(buf
, 32, ".%d", i
);
42 obj_names
[i
].append(buf
);
// Start each shard as not-yet-transitioned to the cls_rgw_gc queue.
44 auto it
= transitioned_objects_cache
.begin() + i
;
45 transitioned_objects_cache
.insert(it
, false);
47 //version = 0 -> not ready for transition
48 //version = 1 -> marked ready for transition
49 librados::ObjectWriteOperation op
;
// Queue sizing comes from configuration; both values feed gc_log_init2.
51 const uint64_t queue_size
= cct
->_conf
->rgw_gc_max_queue_size
, num_deferred_entries
= cct
->_conf
->rgw_gc_max_deferred
;
52 gc_log_init2(op
, queue_size
, num_deferred_entries
);
// Apply the init op to the shard object (return value handling elided here).
53 store
->gc_operate(this, obj_names
[i
], &op
);
// Release resources allocated by initialize() (body elided in this view).
57 void RGWGC::finalize()
62 int RGWGC::tag_index(const string
& tag
)
64 return rgw_shards_mod(XXH64(tag
.c_str(), tag
.size(), seed
), max_objs
);
// Enqueue a GC chain (objects to delete later) under the given tag on the
// shard object selected by tag_index(). First tries the cls_rgw_gc queue via
// gc_log_enqueue2; on ECANCELED/EPERM it appears to fall back to the legacy
// omap path via cls_rgw_gc_set_entry (intervening lines elided in this view).
67 int RGWGC::send_chain(cls_rgw_obj_chain
& chain
, const string
& tag
)
69 ObjectWriteOperation op
;
70 cls_rgw_gc_obj_info info
;
// Queue-based enqueue; entry becomes eligible after rgw_gc_obj_min_wait.
73 gc_log_enqueue2(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
75 int i
= tag_index(tag
);
77 ldpp_dout(this, 20) << "RGWGC::send_chain - on object name: " << obj_names
[i
] << "tag is: " << tag
<< dendl
;
79 auto ret
= store
->gc_operate(this, obj_names
[i
], &op
);
// Any error other than ECANCELED/EPERM is handled here (elided lines).
80 if (ret
!= -ECANCELED
&& ret
!= -EPERM
) {
// Fallback: write the entry into the shard's omap with cls_rgw.
83 ObjectWriteOperation set_entry_op
;
84 cls_rgw_gc_set_entry(set_entry_op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
85 return store
->gc_operate(this, obj_names
[i
], &set_entry_op
);
// Heap-allocated state handed to async_defer_callback(); owns its librados
// completion and carries the GC entry being deferred. (The original also
// appears to hold a back-pointer to the RGWGC — see async_defer_callback's
// state->gc usage — but that member is elided in this view.)
88 struct defer_chain_state
{
89 librados::AioCompletion
* completion
= nullptr;
90 // TODO: hold a reference on the state in RGWGC to avoid use-after-free if
91 // RGWGC destructs before this completion fires
93 cls_rgw_gc_obj_info info
;
// Destructor releases the completion (null-check elided in this view).
95 ~defer_chain_state() {
97 completion
->release();
// Completion callback for async_defer_chain(): reclaims ownership of the
// defer_chain_state and, if the defer was rejected with ECANCELED (shard has
// transitioned to the cls_rgw_gc queue), re-drives it via on_defer_canceled.
102 static void async_defer_callback(librados::completion_t
, void* arg
)
// unique_ptr takes back the ownership released in async_defer_chain().
104 std::unique_ptr
<defer_chain_state
> state
{static_cast<defer_chain_state
*>(arg
)};
105 if (state
->completion
->get_return_value() == -ECANCELED
) {
106 state
->gc
->on_defer_canceled(state
->info
);
// Called when an omap-path defer was rejected with ECANCELED: record that the
// shard has transitioned to the cls_rgw_gc queue, then redo the defer through
// the queue and drop the stale omap entry, fire-and-forget via aio.
110 void RGWGC::on_defer_canceled(const cls_rgw_gc_obj_info
& info
)
112 const std::string
& tag
= info
.tag
;
113 const int i
= tag_index(tag
);
115 // ECANCELED from cls_version_check() tells us that we've transitioned
116 transitioned_objects_cache
[i
] = true;
// Re-defer through the queue and remove the tag's omap entry in one op.
118 ObjectWriteOperation op
;
119 cls_rgw_gc_queue_defer_entry(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
120 cls_rgw_gc_remove(op
, {tag
});
// Fire-and-forget: no callback is attached to this completion.
122 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
123 store
->gc_aio_operate(obj_names
[i
], c
, &op
);
// Asynchronously push back (defer) the expiration of a pending GC chain so
// its objects are not collected while still referenced. Uses the cls_rgw_gc
// queue when the shard has transitioned; otherwise writes the defer to omap
// with a callback that detects a concurrent transition. (Several lines,
// including returns, are elided in this view.)
127 int RGWGC::async_defer_chain(const string
& tag
, const cls_rgw_obj_chain
& chain
)
129 const int i
= tag_index(tag
);
130 cls_rgw_gc_obj_info info
;
134 // if we've transitioned this shard object, we can rely on the cls_rgw_gc queue
135 if (transitioned_objects_cache
[i
]) {
136 ObjectWriteOperation op
;
137 cls_rgw_gc_queue_defer_entry(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
139 // this tag may still be present in omap, so remove it once the cls_rgw_gc
141 cls_rgw_gc_remove(op
, {tag
});
// Fire-and-forget completion for the queue-path defer.
143 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
144 int ret
= store
->gc_aio_operate(obj_names
[i
], c
, &op
);
149 // if we haven't seen the transition yet, write the defer to omap with cls_rgw
150 ObjectWriteOperation op
;
152 // assert that we haven't initialized cls_rgw_gc queue. this prevents us
153 // from writing new entries to omap after the transition
154 gc_log_defer1(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
156 // prepare a callback to detect the transition via ECANCELED from cls_version_check()
157 auto state
= std::make_unique
<defer_chain_state
>();
159 state
->info
.chain
= chain
;
160 state
->info
.tag
= tag
;
// The raw state pointer rides along as the completion's callback argument.
161 state
->completion
= librados::Rados::aio_create_completion(
162 state
.get(), async_defer_callback
);
164 int ret
= store
->gc_aio_operate(obj_names
[i
], state
->completion
, &op
);
// Ownership passes to async_defer_callback(), which re-wraps it.
166 state
.release(); // release ownership until async_defer_callback()
// Asynchronously remove a batch of finished tags from the omap of shard
// object `index`. The completion is handed back to the caller via *pc
// (that hand-off and the return are elided in this view).
171 int RGWGC::remove(int index
, const std::vector
<string
>& tags
, AioCompletion
**pc
)
173 ObjectWriteOperation op
;
174 cls_rgw_gc_remove(op
, tags
);
176 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
177 int ret
= store
->gc_aio_operate(obj_names
[index
], c
, &op
);
186 int RGWGC::remove(int index
, int num_entries
)
188 ObjectWriteOperation op
;
189 cls_rgw_gc_queue_remove_entries(op
, num_entries
);
191 return store
->gc_operate(this, obj_names
[index
], &op
);
// Paginated listing of pending GC entries across shard objects, merging the
// legacy omap entries (cls_rgw_gc_list) with cls_rgw_gc queue entries.
// *index/marker are the resume cursor; processing_queue tells the caller (and
// the next call) that the current shard's queue still has elements. Several
// branch bodies and closing braces are elided in this view.
194 int RGWGC::list(int *index
, string
& marker
, uint32_t max
, bool expired_only
, std::list
<cls_rgw_gc_obj_info
>& result
, bool *truncated
, bool& processing_queue
)
198 bool check_queue
= false;
// Walk shards until `max` entries are gathered; marker/check_queue reset
// when advancing to the next shard.
200 for (; *index
< max_objs
&& result
.size() < max
; (*index
)++, marker
.clear(), check_queue
= false) {
201 std::list
<cls_rgw_gc_obj_info
> entries
, queue_entries
;
204 //processing_queue is set to true from previous iteration if the queue was under process and probably has more elements in it.
// Legacy omap path: only when this shard hasn't transitioned and we're not
// mid-queue.
205 if (! transitioned_objects_cache
[*index
] && ! check_queue
&& ! processing_queue
) {
206 ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[*index
], marker
, max
- result
.size(), expired_only
, entries
, truncated
, next_marker
);
207 if (ret
!= -ENOENT
&& ret
< 0) {
// Shard object version: ver==1 means it was marked ready for transition.
211 cls_version_read(store
->gc_pool_ctx
, obj_names
[*index
], &objv
);
212 if (ret
== -ENOENT
|| entries
.size() == 0) {
216 if (! expired_only
) {
// Listing everything and finding nothing in omap => shard has transitioned.
217 transitioned_objects_cache
[*index
] = true;
// Expired-only listing: probe for ANY (even unexpired) omap entry before
// declaring the shard transitioned.
220 std::list
<cls_rgw_gc_obj_info
> non_expired_entries
;
221 ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[*index
], marker
, 1, false, non_expired_entries
, truncated
, next_marker
);
222 if (non_expired_entries
.size() == 0) {
223 transitioned_objects_cache
[*index
] = true;
// Marked-for-transition shard with room to spare: also look at the queue.
229 if ((objv
.ver
== 1) && (entries
.size() < max
- result
.size())) {
// Queue path: list cls_rgw_gc queue entries for this shard.
234 if (transitioned_objects_cache
[*index
] || check_queue
|| processing_queue
) {
235 processing_queue
= false;
236 ret
= cls_rgw_gc_queue_list_entries(store
->gc_pool_ctx
, obj_names
[*index
], marker
, (max
- result
.size()) - entries
.size(), expired_only
, queue_entries
, truncated
, next_marker
);
241 if (entries
.size() == 0 && queue_entries
.size() == 0)
// Merge omap entries then queue entries into the result list.
244 std::list
<cls_rgw_gc_obj_info
>::iterator iter
;
245 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
246 result
.push_back(*iter
);
249 for (iter
= queue_entries
.begin(); iter
!= queue_entries
.end(); ++iter
) {
250 result
.push_back(*iter
);
253 marker
= next_marker
;
// Last shard: remember whether its queue still has elements to process.
255 if (*index
== max_objs
- 1) {
256 if (queue_entries
.size() > 0 && *truncated
) {
257 processing_queue
= true;
259 processing_queue
= false;
261 /* we cut short here, truncated will hold the correct value */
// Page full: record queue progress and advance the cursor for next call.
265 if (result
.size() == max
) {
266 if (queue_entries
.size() > 0 && *truncated
) {
267 processing_queue
= true;
269 processing_queue
= false;
270 *index
+= 1; //move to next gc object
273 /* close approximation, it might be that the next of the objects don't hold
274 * anything, in this case truncated should have been false, but we can find
275 * that out on the next iteration
282 processing_queue
= false;
// Bounds and tracks the in-flight async deletions issued while processing GC
// entries: a FIFO of pending AIOs (capped at max_aio), per-shard batches of
// tags awaiting removal, and per-tag counts of shadow objects still to be
// deleted before the tag itself may be removed. Many members, control-flow
// lines and closing braces are elided in this view.
287 class RGWGCIOManager
{
288 const DoutPrefixProvider
* dpp
;
// Per-IO completion handle (part of an elided IO record in this view).
298 librados::AioCompletion
*c
{nullptr};
// One vector of to-be-removed tags per GC shard.
305 vector
<std::vector
<string
> > remove_tags
;
306 /* tracks the number of remaining shadow objects for a given tag in order to
307 * only remove the tag once all shadow objects have themselves been removed
309 vector
<map
<string
, size_t> > tag_io_size
;
311 #define MAX_AIO_DEFAULT 10
// Cap on concurrently outstanding AIOs; overridden from config in the ctor.
312 size_t max_aio
{MAX_AIO_DEFAULT
};
315 RGWGCIOManager(const DoutPrefixProvider
* _dpp
, CephContext
*_cct
, RGWGC
*_gc
) : dpp(_dpp
),
318 max_aio
= cct
->_conf
->rgw_gc_max_concurrent_io
;
// Size the per-shard structures the same way RGWGC sizes its shards.
319 remove_tags
.resize(min(static_cast<int>(cct
->_conf
->rgw_gc_max_objs
), rgw_shards_max()));
320 tag_io_size
.resize(min(static_cast<int>(cct
->_conf
->rgw_gc_max_objs
), rgw_shards_max()));
// (Appears to be destructor cleanup over outstanding ios — body elided.)
324 for (auto io
: ios
) {
// Queue one async delete; throttles by completing the oldest AIOs while
// over max_aio, then records the new IO in the FIFO.
329 int schedule_io(IoCtx
*ioctx
, const string
& oid
, ObjectWriteOperation
*op
,
330 int index
, const string
& tag
) {
331 while (ios
.size() > max_aio
) {
332 if (gc
->going_down()) {
335 auto ret
= handle_next_completion();
336 //Return error if we are using queue, else ignore it
337 if (gc
->transitioned_objects_cache
[index
] && ret
< 0) {
342 auto c
= librados::Rados::aio_create_completion(nullptr, nullptr);
343 int ret
= ioctx
->aio_operate(oid
, c
, op
);
347 ios
.push_back(IO
{IO::TailIO
, c
, oid
, index
, tag
});
// Wait for the oldest outstanding IO, log failures, and on success of a
// tail (shadow-object) IO on a non-transitioned shard, count down toward
// removing its tag.
352 int handle_next_completion() {
353 ceph_assert(!ios
.empty());
354 IO
& io
= ios
.front();
355 io
.c
->wait_for_complete();
356 int ret
= io
.c
->get_return_value();
// Missing object is not an error: treated as already collected (elided).
359 if (ret
== -ENOENT
) {
363 if (io
.type
== IO::IndexIO
&& ! gc
->transitioned_objects_cache
[io
.index
]) {
365 ldpp_dout(dpp
, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
366 io
.index
<< " returned error, ret=" << ret
<< dendl
;
372 ldpp_dout(dpp
, 0) << "WARNING: gc could not remove oid=" << io
.oid
<<
373 ", ret=" << ret
<< dendl
;
377 if (! gc
->transitioned_objects_cache
[io
.index
]) {
378 schedule_tag_removal(io
.index
, io
.tag
);
386 /* This is a request to schedule a tag removal. It will be called once when
387 * there are no shadow objects. But it will also be called for every shadow
388 * object when there are any. Since we do not want the tag to be removed
389 * until all shadow objects have been successfully removed, the scheduling
390 * will not happen until the shadow object count goes down to zero
392 void schedule_tag_removal(int index
, string tag
) {
393 auto& ts
= tag_io_size
[index
];
394 auto ts_it
= ts
.find(tag
);
395 if (ts_it
!= ts
.end()) {
396 auto& size
= ts_it
->second
;
398 // wait all shadow obj delete return
// Tag has no remaining shadow objects: queue it for batch removal; flush
// the batch once it reaches rgw_gc_max_trim_chunk.
405 auto& rt
= remove_tags
[index
];
408 if (rt
.size() >= (size_t)cct
->_conf
->rgw_gc_max_trim_chunk
) {
409 flush_remove_tags(index
, rt
);
// Record how many shadow objects a tag owns so schedule_tag_removal can
// count them down.
413 void add_tag_io_size(int index
, string tag
, size_t size
) {
414 auto& ts
= tag_io_size
[index
];
415 ts
.emplace(tag
, size
);
// (drain_ios: wait out all outstanding IOs unless shutting down — partly
// elided in this view.)
420 while (!ios
.empty()) {
421 if (gc
->going_down()) {
424 auto ret
= handle_next_completion();
435 /* the tags draining might have generated more ios, drain those too */
// Issue one async index op removing a batch of tags from the shard's omap.
439 void flush_remove_tags(int index
, vector
<string
>& rt
) {
441 index_io
.type
= IO::IndexIO
;
442 index_io
.index
= index
;
444 ldpp_dout(dpp
, 20) << __func__
<<
445 " removing entries from gc log shard index=" << index
<< ", size=" <<
446 rt
.size() << ", entries=" << rt
<< dendl
;
// Scope guard clears the batch regardless of outcome (body elided).
448 auto rt_guard
= make_scope_guard(
455 int ret
= gc
->remove(index
, rt
, &index_io
.c
);
457 /* we already cleared list of tags, this prevents us from
458 * ballooning in case of a persistent problem
460 ldpp_dout(dpp
, 0) << "WARNING: failed to remove tags on gc shard index=" <<
461 index
<< " ret=" << ret
<< dendl
;
465 /* log the count of tags retired for rate estimation */
466 perfcounter
->inc(l_rgw_gc_retire
, rt
.size());
468 ios
.push_back(index_io
);
// Flush every non-transitioned shard's pending tag batch.
471 void flush_remove_tags() {
473 for (auto& rt
: remove_tags
) {
474 if (! gc
->transitioned_objects_cache
[index
]) {
475 flush_remove_tags(index
, rt
);
// Synchronously trim processed entries from a shard's cls_rgw_gc queue.
481 int remove_queue_entries(int index
, int num_entries
) {
482 int ret
= gc
->remove(index
, num_entries
);
484 ldpp_dout(dpp
, 0) << "ERROR: failed to remove queue entries on index=" <<
485 index
<< " ret=" << ret
<< dendl
;
489 /* log the count of tags retired for rate estimation */
490 perfcounter
->inc(l_rgw_gc_retire
, num_entries
);
494 }; // class RGWGCIOManger
// Process one GC shard: take the shard's cls_lock for up to max_secs, list
// pending entries (omap or queue depending on transition state), delete each
// chain's objects through io_manager, retire finished tags/queue entries, and
// release the lock. Many control-flow lines and closing braces are elided in
// this view.
496 int RGWGC::process(int index
, int max_secs
, bool expired_only
,
497 RGWGCIOManager
& io_manager
)
499 ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
500 index
<< ", max_secs=" << max_secs
<< ", expired_only=" <<
501 expired_only
<< dendl
;
503 rados::cls::lock::Lock
l(gc_index_lock_name
);
504 utime_t end
= ceph_clock_now();
506 /* max_secs should be greater than zero. We don't want a zero max_secs
507 * to be translated as no timeout, since we'd then need to break the
508 * lock and that would require a manual intervention. In this case
509 * we can just wait it out. */
// Lock auto-expires after max_secs so a crashed processor can't wedge the shard.
514 utime_t
time(max_secs
, 0);
515 l
.set_duration(time
);
517 int ret
= l
.lock_exclusive(&store
->gc_pool_ctx
, obj_names
[index
]);
518 if (ret
== -EBUSY
) { /* already locked by another gc processor */
519 ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
520 obj_names
[index
] << dendl
;
// IoCtx reused across chain objects; re-targeted when the pool changes.
529 IoCtx
*ctx
= new IoCtx
;
532 std::list
<cls_rgw_gc_obj_info
> entries
;
// Legacy omap path for a not-yet-transitioned shard.
536 if (! transitioned_objects_cache
[index
]) {
537 ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[index
], marker
, max
, expired_only
, entries
, &truncated
, next_marker
);
538 ldpp_dout(this, 20) <<
539 "RGWGC::process cls_rgw_gc_list returned with returned:" << ret
<<
540 ", entries.size=" << entries
.size() << ", truncated=" << truncated
<<
541 ", next_marker='" << next_marker
<< "'" << dendl
;
// ver==1 with an empty expired listing: probe for any entry at all before
// declaring the shard transitioned to the queue.
543 cls_version_read(store
->gc_pool_ctx
, obj_names
[index
], &objv
);
544 if ((objv
.ver
== 1) && entries
.size() == 0) {
545 std::list
<cls_rgw_gc_obj_info
> non_expired_entries
;
546 ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[index
], marker
, 1, false, non_expired_entries
, &truncated
, next_marker
);
547 if (non_expired_entries
.size() == 0) {
548 transitioned_objects_cache
[index
] = true;
550 ldpp_dout(this, 20) << "RGWGC::process cls_rgw_gc_list returned NO non expired entries, so setting cache entry to TRUE" << dendl
;
// ver==0 and nothing listed: shard untouched (handling elided).
556 if ((objv
.ver
== 0) && (ret
== -ENOENT
|| entries
.size() == 0)) {
// Queue path for a transitioned shard.
562 if (transitioned_objects_cache
[index
]) {
563 ret
= cls_rgw_gc_queue_list_entries(store
->gc_pool_ctx
, obj_names
[index
], marker
, max
, expired_only
, entries
, &truncated
, next_marker
);
564 ldpp_dout(this, 20) <<
565 "RGWGC::process cls_rgw_gc_queue_list_entries returned with return value:" << ret
<<
566 ", entries.size=" << entries
.size() << ", truncated=" << truncated
<<
567 ", next_marker='" << next_marker
<< "'" << dendl
;
568 if (entries
.size() == 0) {
577 marker
= next_marker
;
// Walk each pending entry and delete its chain's objects.
580 std::list
<cls_rgw_gc_obj_info
>::iterator iter
;
581 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
582 cls_rgw_gc_obj_info
& info
= *iter
;
584 ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
585 info
.tag
<< "', time=" << info
.time
<< ", chain.objs.size()=" <<
586 info
.chain
.objs
.size() << dendl
;
588 std::list
<cls_rgw_obj
>::iterator liter
;
589 cls_rgw_obj_chain
& chain
= info
.chain
;
591 utime_t now
= ceph_clock_now();
// Omap path: register tag bookkeeping with the IO manager — an empty chain
// can have its tag removed immediately, otherwise track the per-tag count.
595 if (! transitioned_objects_cache
[index
]) {
596 if (chain
.objs
.empty()) {
597 io_manager
.schedule_tag_removal(index
, info
.tag
);
599 io_manager
.add_tag_io_size(index
, info
.tag
, chain
.objs
.size());
602 if (! chain
.objs
.empty()) {
603 for (liter
= chain
.objs
.begin(); liter
!= chain
.objs
.end(); ++liter
) {
604 cls_rgw_obj
& obj
= *liter
;
// Re-point the IoCtx when the chain crosses into a different pool.
606 if (obj
.pool
!= last_pool
) {
609 ret
= rgw_init_ioctx(this, store
->get_rados_handle(), obj
.pool
, *ctx
);
611 if (transitioned_objects_cache
[index
]) {
615 ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
619 last_pool
= obj
.pool
;
622 ctx
->locator_set_key(obj
.loc
);
624 const string
& oid
= obj
.key
.name
; /* just stored raw oid there */
626 ldpp_dout(this, 5) << "RGWGC::process removing " << obj
.pool
<<
627 ":" << obj
.key
.name
<< dendl
;
// Drop one refcount on the shadow object; deletes it when count hits zero.
628 ObjectWriteOperation op
;
629 cls_refcount_put(op
, info
.tag
, true);
631 ret
= io_manager
.schedule_io(ctx
, oid
, &op
, index
, info
.tag
);
633 ldpp_dout(this, 0) <<
634 "WARNING: failed to schedule deletion for oid=" << oid
<< dendl
;
635 if (transitioned_objects_cache
[index
]) {
636 //If deleting oid failed for any of them, we will not delete queue entries
641 // leave early, even if tag isn't removed, it's ok since it
642 // will be picked up next time around
646 } // else -- chains not empty
// Queue path: wait for all deletions, then trim the processed queue entries.
648 if (transitioned_objects_cache
[index
] && entries
.size() > 0) {
649 ret
= io_manager
.drain_ios();
653 //Remove the entries from the queue
654 ldpp_dout(this, 5) << "RGWGC::process removing entries, marker: " << marker
<< dendl
;
655 ret
= io_manager
.remove_queue_entries(index
, entries
.size());
657 ldpp_dout(this, 0) <<
658 "WARNING: failed to remove queue entries" << dendl
;
665 /* we don't drain here, because if we're going down we don't want to
666 * hold the system if backend is unresponsive
668 l
.unlock(&store
->gc_pool_ctx
, obj_names
[index
]);
// Process every GC shard once, starting at a random shard so concurrent
// gateways spread their lock contention. (Loop tail and return are elided
// in this view.)
674 int RGWGC::process(bool expired_only
)
676 int max_secs
= cct
->_conf
->rgw_gc_processor_max_time
;
// Random start index in [0, max_objs) decorrelates multiple processors.
678 const int start
= ceph::util::generate_random_number(0, max_objs
- 1);
680 RGWGCIOManager
io_manager(this, store
->ctx(), this);
682 for (int i
= 0; i
< max_objs
; i
++) {
683 int index
= (i
+ start
) % max_objs
;
684 int ret
= process(index
, max_secs
, expired_only
, io_manager
);
// Whether the GC has been asked to shut down (body elided in this view).
695 bool RGWGC::going_down()
700 void RGWGC::start_processor()
702 worker
= new GCWorker(this, cct
, this);
703 worker
->create("rgw_gc");
// Stop and reap the background GC worker thread (body elided in this view).
706 void RGWGC::stop_processor()
// Dout subsystem id used by the log prefix provider (body elided in this view).
717 unsigned RGWGC::get_subsys() const
722 std::ostream
& RGWGC::gen_prefix(std::ostream
& out
) const
724 return out
<< "garbage collection: ";
// Worker thread main loop: run a full expired-only GC pass, then sleep until
// the next rgw_gc_processor_period boundary (or until woken for shutdown),
// repeating until going_down(). Parts of the loop body are elided in this view.
727 void *RGWGC::GCWorker::entry() {
729 utime_t start
= ceph_clock_now();
730 ldpp_dout(dpp
, 2) << "garbage collection: start" << dendl
;
// expired_only pass over all shards.
731 int r
= gc
->process(true);
733 ldpp_dout(dpp
, 0) << "ERROR: garbage collection process() returned error r=" << r
<< dendl
;
735 ldpp_dout(dpp
, 2) << "garbage collection: stop" << dendl
;
737 if (gc
->going_down())
740 utime_t end
= ceph_clock_now();
742 int secs
= cct
->_conf
->rgw_gc_processor_period
;
// If the pass already consumed a whole period, start the next round now.
744 if (secs
<= end
.sec())
745 continue; // next round
// Sleep the remainder of the period; stop() can wake us early via cond.
749 std::unique_lock locker
{lock
};
750 cond
.wait_for(locker
, std::chrono::seconds(secs
));
751 } while (!gc
->going_down());
756 void RGWGC::GCWorker::stop()
758 std::lock_guard l
{lock
};