1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "include/rados/librados.hpp"
6 #include "cls/rgw/cls_rgw_client.h"
7 #include "cls/refcount/cls_refcount_client.h"
8 #include "cls/lock/cls_lock_client.h"
9 #include "include/random.h"
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rgw
// Pull librados names (IoCtx, AioCompletion, ObjectWriteOperation, ...)
// into scope for this translation unit.
17 using namespace librados
;
// Prefix for the per-shard GC log objects; initialize() appends ".<n>"
// to build one oid per shard.
19 static string gc_oid_prefix
= "gc";
// Name of the cls_lock taken in RGWGC::process() so only one GC worker
// processes a given shard at a time.
20 static string gc_index_lock_name
= "gc_process";
// Set up the GC: size the shard set from rgw_gc_max_objs (capped at
// rgw_shards_max()) and precompute the per-shard oid strings
// "gc.0" .. "gc.<max_objs-1>".
// NOTE(review): this chunk elides the assignments of _cct/_store into the
// cct/store members and the local `char buf[...]` declaration used by
// snprintf below — confirm against the full file.
23 void RGWGC::initialize(CephContext
*_cct
, RGWRados
*_store
) {
// Clamp the configured shard count to the global shard maximum.
27 max_objs
= min(static_cast<int>(cct
->_conf
->rgw_gc_max_objs
), rgw_shards_max());
// One oid string per shard; raw new[] — presumably released in
// finalize() (body not visible here), candidate for std::vector.
29 obj_names
= new string
[max_objs
];
31 for (int i
= 0; i
< max_objs
; i
++) {
// Shard oid = "<gc_oid_prefix>.<i>".
32 obj_names
[i
] = gc_oid_prefix
;
34 snprintf(buf
, 32, ".%d", i
);
35 obj_names
[i
].append(buf
);
// Tear down state built by initialize(); body elided in this chunk
// (presumably releases the obj_names array — confirm upstream).
39 void RGWGC::finalize()
44 int RGWGC::tag_index(const string
& tag
)
46 return rgw_shard_id(tag
, max_objs
);
// Append a "set gc entry" step to `op`: record `chain` under `tag` in the
// GC log with the configured minimum wait (rgw_gc_obj_min_wait) before it
// becomes eligible for processing.
// NOTE(review): the lines populating `info` from `chain`/`tag` are elided
// in this chunk — confirm against the full file.
49 void RGWGC::add_chain(ObjectWriteOperation
& op
, cls_rgw_obj_chain
& chain
, const string
& tag
)
51 cls_rgw_gc_obj_info info
;
55 cls_rgw_gc_set_entry(op
, cct
->_conf
->rgw_gc_obj_min_wait
, info
);
// Record a deletion chain in the GC log shard selected by the tag's hash.
// `sync` chooses between the synchronous (gc_operate) and asynchronous
// (gc_aio_operate) submission paths.
// NOTE(review): the `if (sync)` guard separating the two return statements
// is elided in this chunk — confirm upstream.
58 int RGWGC::send_chain(cls_rgw_obj_chain
& chain
, const string
& tag
, bool sync
)
60 ObjectWriteOperation op
;
61 add_chain(op
, chain
, tag
);
// Shard choice is deterministic per tag, so later defer/remove of the
// same tag lands on the same gc log object.
63 int i
= tag_index(tag
);
66 return store
->gc_operate(obj_names
[i
], &op
);
68 return store
->gc_aio_operate(obj_names
[i
], &op
);
// Push back the expiration of an existing GC log entry (identified by
// `tag`) by rgw_gc_obj_min_wait, e.g. because the tail objects are still
// being read. `sync` selects sync vs async submission.
// NOTE(review): the `if (sync)` guard separating the two return statements
// is elided in this chunk — confirm upstream.
71 int RGWGC::defer_chain(const string
& tag
, bool sync
)
73 ObjectWriteOperation op
;
74 cls_rgw_gc_defer_entry(op
, cct
->_conf
->rgw_gc_obj_min_wait
, tag
);
// Same tag -> same shard as the original send_chain().
76 int i
= tag_index(tag
);
79 return store
->gc_operate(obj_names
[i
], &op
);
81 return store
->gc_aio_operate(obj_names
[i
], &op
);
84 int RGWGC::remove(int index
, const std::vector
<string
>& tags
, AioCompletion
**pc
)
86 ObjectWriteOperation op
;
87 cls_rgw_gc_remove(op
, tags
);
88 return store
->gc_aio_operate(obj_names
[index
], &op
, pc
);
// List up to `max` GC log entries starting at shard *index / `marker`,
// appending into `result`. Walks shards in order, clearing the marker when
// advancing to the next shard; *index and `marker` are updated so the
// caller can resume. *truncated reports whether more entries remain.
// NOTE(review): this chunk elides the declaration of `next_marker`, the
// ret/-ENOENT handling after cls_rgw_gc_list, and the tail returns —
// confirm against the full file.
91 int RGWGC::list(int *index
, string
& marker
, uint32_t max
, bool expired_only
, std::list
<cls_rgw_gc_obj_info
>& result
, bool *truncated
)
// Iterate shards until either all shards are exhausted or `max` entries
// have been accumulated; the marker only applies to the current shard.
96 for (; *index
< max_objs
&& result
.size() < max
; (*index
)++, marker
.clear()) {
97 std::list
<cls_rgw_gc_obj_info
> entries
;
// Ask the cls_rgw backend for the remaining quota from this shard.
98 int ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[*index
], marker
, max
- result
.size(), expired_only
, entries
, truncated
, next_marker
);
104 std::list
<cls_rgw_gc_obj_info
>::iterator iter
;
105 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
106 result
.push_back(*iter
);
// Remember where to resume within this shard.
109 marker
= next_marker
;
111 if (*index
== max_objs
- 1) {
112 /* we cut short here, truncated will hold the correct value */
116 if (result
.size() == max
) {
117 /* close approximation, it might be that the next of the objects don't hold
118 * anything, in this case truncated should have been false, but we can find
119 * that out on the next iteration
// Manages the in-flight async I/O issued while processing GC chains:
// caps concurrency at max_aio, harvests completions in FIFO order, and
// batches per-shard tag removals so the GC log is trimmed in chunks.
// NOTE(review): this chunk elides the nested IO struct (type/c/oid/
// index/tag fields, TailIO/IndexIO kinds), the `ios` queue member, the
// gc/cct members, and the destructor/drain bodies — confirm upstream.
131 class RGWGCIOManager
{
// Log-prefix provider for ldpp_dout below.
132 const DoutPrefixProvider
* dpp
;
142 librados::AioCompletion
*c
{nullptr};
// Per-shard batches of chain tags whose deletions completed and whose
// GC log entries should therefore be removed.
149 vector
<std::vector
<string
> > remove_tags
;
151 #define MAX_AIO_DEFAULT 10
// Concurrency cap; overridden from rgw_gc_max_concurrent_io in the ctor.
152 size_t max_aio
{MAX_AIO_DEFAULT
};
155 RGWGCIOManager(const DoutPrefixProvider
* _dpp
, CephContext
*_cct
, RGWGC
*_gc
) : dpp(_dpp
),
// One removal batch per GC shard.
158 remove_tags(cct
->_conf
->rgw_gc_max_objs
) {
159 max_aio
= cct
->_conf
->rgw_gc_max_concurrent_io
;
// Destructor-side cleanup of any still-queued completions.
163 for (auto io
: ios
) {
// Submit one async tail-object deletion; blocks (harvesting completions)
// while the in-flight count exceeds max_aio. Returns the aio_operate rc.
168 int schedule_io(IoCtx
*ioctx
, const string
& oid
, ObjectWriteOperation
*op
,
169 int index
, const string
& tag
) {
170 while (ios
.size() > max_aio
) {
// Bail out promptly on shutdown rather than draining.
171 if (gc
->going_down()) {
174 handle_next_completion();
177 AioCompletion
*c
= librados::Rados::aio_create_completion(NULL
, NULL
, NULL
);
178 int ret
= ioctx
->aio_operate(oid
, c
, op
);
// Track the submitted op so its completion can be reaped in order.
182 ios
.push_back(IO
{IO::TailIO
, c
, oid
, index
, tag
});
// Reap the oldest in-flight op: wait for it, log failures, and on success
// of a tail deletion queue its tag for removal from the GC log.
187 void handle_next_completion() {
188 ceph_assert(!ios
.empty());
189 IO
& io
= ios
.front();
190 io
.c
->wait_for_safe();
191 int ret
= io
.c
->get_return_value();
// Already-gone objects are fine — treat -ENOENT as success.
194 if (ret
== -ENOENT
) {
// Failed GC-log trim (IndexIO): warn only; entries stay for next pass.
198 if (io
.type
== IO::IndexIO
) {
200 ldpp_dout(dpp
, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
201 io
.index
<< " returned error, ret=" << ret
<< dendl
;
207 ldpp_dout(dpp
, 0) << "WARNING: gc could not remove oid=" << io
.oid
<<
208 ", ret=" << ret
<< dendl
;
// Tail object removed — its tag may now be trimmed from the GC log.
212 schedule_tag_removal(io
.index
, io
.tag
);
// Queue a tag for batched removal from shard `index`, flushing the batch
// once it reaches rgw_gc_max_trim_chunk.
218 void schedule_tag_removal(int index
, string tag
) {
219 auto& rt
= remove_tags
[index
];
221 // since every element of a chain tries to add the same tag, and
222 // since chains are handled sequentially, check to make sure it's
223 // not already on the list
224 if (rt
.empty() || rt
.back() != tag
) {
226 if (rt
.size() >= (size_t)cct
->_conf
->rgw_gc_max_trim_chunk
) {
227 flush_remove_tags(index
, rt
);
// Drain: reap every outstanding completion unless shutting down.
233 while (!ios
.empty()) {
234 if (gc
->going_down()) {
237 handle_next_completion();
244 /* the tags draining might have generated more ios, drain those too */
// Flush one shard's pending tag batch: issue an async GC-log trim and
// track it as an IndexIO so its completion is reaped like any other op.
248 void flush_remove_tags(int index
, vector
<string
>& rt
) {
250 index_io
.type
= IO::IndexIO
;
251 index_io
.index
= index
;
253 ldpp_dout(dpp
, 20) << __func__
<<
254 " removing entries from gc log shard index=" << index
<< ", size=" <<
255 rt
.size() << ", entries=" << rt
<< dendl
;
257 int ret
= gc
->remove(index
, rt
, &index_io
.c
);
260 /* we already cleared list of tags, this prevents us from
261 * ballooning in case of a persistent problem
263 ldpp_dout(dpp
, 0) << "WARNING: failed to remove tags on gc shard index=" <<
264 index
<< " ret=" << ret
<< dendl
;
268 ios
.push_back(index_io
);
// Flush the pending tag batches of all shards.
271 void flush_remove_tags() {
273 for (auto& rt
: remove_tags
) {
274 flush_remove_tags(index
, rt
);
278 }; // class RGWGCIOManager
// Process one GC log shard for up to max_secs seconds: take the shard's
// cls_lock, list entries, delete each chain's tail objects through
// io_manager, and schedule the tags of finished chains for removal.
// NOTE(review): this chunk elides the loop shell around the listing, the
// marker/max/truncated declarations, the `done:` cleanup path, and the
// `delete ctx` — confirm against the full file.
280 int RGWGC::process(int index
, int max_secs
, bool expired_only
,
281 RGWGCIOManager
& io_manager
)
283 ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
284 index
<< ", max_secs=" << max_secs
<< ", expired_only=" <<
285 expired_only
<< dendl
;
// Shard-exclusive lock so concurrent GC workers don't double-process.
287 rados::cls::lock::Lock
l(gc_index_lock_name
);
288 utime_t end
= ceph_clock_now();
290 /* max_secs should be greater than zero. We don't want a zero max_secs
291 * to be translated as no timeout, since we'd then need to break the
292 * lock and that would require a manual intervention. In this case
293 * we can just wait it out. */
// Bound the lock lifetime so a crashed worker can't wedge the shard.
298 utime_t
time(max_secs
, 0);
299 l
.set_duration(time
);
301 int ret
= l
.lock_exclusive(&store
->gc_pool_ctx
, obj_names
[index
]);
302 if (ret
== -EBUSY
) { /* already locked by another gc processor */
303 ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
304 obj_names
[index
] << dendl
;
// Raw new; the matching delete is in the elided cleanup path —
// candidate for std::unique_ptr or a stack IoCtx. Confirm upstream.
313 IoCtx
*ctx
= new IoCtx
;
316 std::list
<cls_rgw_gc_obj_info
> entries
;
318 ret
= cls_rgw_gc_list(store
->gc_pool_ctx
, obj_names
[index
], marker
, max
,
319 expired_only
, entries
, &truncated
, next_marker
);
320 ldpp_dout(this, 20) <<
321 "RGWGC::process cls_rgw_gc_list returned with returned:" << ret
<<
322 ", entries.size=" << entries
.size() << ", truncated=" << truncated
<<
323 ", next_marker='" << next_marker
<< "'" << dendl
;
// Empty shard object is not an error; done with this shard.
325 if (ret
== -ENOENT
) {
332 marker
= next_marker
;
335 std::list
<cls_rgw_gc_obj_info
>::iterator iter
;
336 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
337 cls_rgw_gc_obj_info
& info
= *iter
;
339 ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
340 info
.tag
<< "', time=" << info
.time
<< ", chain.objs.size()=" <<
341 info
.chain
.objs
.size() << dendl
;
343 std::list
<cls_rgw_obj
>::iterator liter
;
344 cls_rgw_obj_chain
& chain
= info
.chain
;
346 utime_t now
= ceph_clock_now();
// Empty chain: nothing to delete, just trim the log entry's tag.
351 if (chain
.objs
.empty()) {
352 io_manager
.schedule_tag_removal(index
, info
.tag
);
// Non-empty chain: delete every tail object it references.
354 for (liter
= chain
.objs
.begin(); liter
!= chain
.objs
.end(); ++liter
) {
355 cls_rgw_obj
& obj
= *liter
;
// Re-open the ioctx only when the object's pool changes.
357 if (obj
.pool
!= last_pool
) {
360 ret
= rgw_init_ioctx(store
->get_rados_handle(), obj
.pool
, *ctx
);
363 ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
367 last_pool
= obj
.pool
;
370 ctx
->locator_set_key(obj
.loc
);
372 const string
& oid
= obj
.key
.name
; /* just stored raw oid there */
374 ldpp_dout(this, 5) << "RGWGC::process removing " << obj
.pool
<<
375 ":" << obj
.key
.name
<< dendl
;
// Drop this chain's refcount; the object is deleted when it hits zero.
376 ObjectWriteOperation op
;
377 cls_refcount_put(op
, info
.tag
, true);
379 ret
= io_manager
.schedule_io(ctx
, oid
, &op
, index
, info
.tag
);
381 ldpp_dout(this, 0) <<
382 "WARNING: failed to schedule deletion for oid=" << oid
<< dendl
;
385 // leave early, even if tag isn't removed, it's ok since it
386 // will be picked up next time around
390 } // else -- chains not empty
395 /* we don't drain here, because if we're going down we don't want to
396 * hold the system if backend is unresponsive
398 l
.unlock(&store
->gc_pool_ctx
, obj_names
[index
]);
// Run one full GC pass over all shards, starting at a random shard so
// concurrent workers spread their load, with a per-shard time budget of
// rgw_gc_processor_max_time.
// NOTE(review): the per-shard error handling and the function's return
// are elided in this chunk — confirm against the full file.
404 int RGWGC::process(bool expired_only
)
406 int max_secs
= cct
->_conf
->rgw_gc_processor_max_time
;
// Random start index de-synchronizes multiple radosgw instances.
408 const int start
= ceph::util::generate_random_number(0, max_objs
- 1);
// Shared I/O manager caps concurrency and batches tag trims per shard.
410 RGWGCIOManager
io_manager(this, store
->ctx(), this);
412 for (int i
= 0; i
< max_objs
; i
++) {
// Visit every shard exactly once, wrapping around from `start`.
413 int index
= (i
+ start
) % max_objs
;
414 int ret
= process(index
, max_secs
, expired_only
, io_manager
);
// Shutdown flag poll used by the worker and the I/O manager to abort
// long-running loops; body elided in this chunk.
425 bool RGWGC::going_down()
430 void RGWGC::start_processor()
432 worker
= new GCWorker(this, cct
, this);
433 worker
->create("rgw_gc");
// Signal the GC worker to stop and join it; body elided in this chunk.
436 void RGWGC::stop_processor()
// DoutPrefixProvider hook: debug subsystem id for GC log lines; body
// elided in this chunk (presumably returns dout_subsys — confirm).
447 unsigned RGWGC::get_subsys() const
452 std::ostream
& RGWGC::gen_prefix(std::ostream
& out
) const
454 return out
<< "garbage collection: ";
// GC worker thread main loop: run a full expired-only pass, then sleep
// out the remainder of rgw_gc_processor_period before the next round,
// waking early on shutdown.
// NOTE(review): this chunk elides the `do {` opener, the lock
// acquire/release around the wait, and the final return — confirm
// against the full file.
457 void *RGWGC::GCWorker::entry() {
459 utime_t start
= ceph_clock_now();
460 ldpp_dout(dpp
, 2) << "garbage collection: start" << dendl
;
// Expired-only pass: only entries past their min-wait are collected.
461 int r
= gc
->process(true);
463 ldpp_dout(dpp
, 0) << "ERROR: garbage collection process() returned error r=" << r
<< dendl
;
465 ldpp_dout(dpp
, 2) << "garbage collection: stop" << dendl
;
467 if (gc
->going_down())
470 utime_t end
= ceph_clock_now();
472 int secs
= cct
->_conf
->rgw_gc_processor_period
;
// If the pass already consumed the whole period, start the next round
// immediately instead of sleeping.
474 if (secs
<= end
.sec())
475 continue; // next round
// Interruptible sleep: stop() signals the condvar to wake us early.
480 cond
.WaitInterval(lock
, utime_t(secs
, 0));
482 } while (!gc
->going_down());
// Wake the worker out of its inter-round wait so it can observe shutdown.
// NOTE(review): the condvar signal under the lock is elided in this
// chunk — confirm against the full file.
487 void RGWGC::GCWorker::stop()
// RAII lock guard serializing with the worker's WaitInterval above.
489 Mutex::Locker
l(lock
);