]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_gc.cc
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / rgw / rgw_gc.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_gc.h"
5 #include "include/rados/librados.hpp"
6 #include "cls/rgw/cls_rgw_client.h"
7 #include "cls/refcount/cls_refcount_client.h"
8 #include "cls/lock/cls_lock_client.h"
9 #include "include/random.h"
10
11 #include <list>
12 #include <sstream>
13
14 #define dout_context g_ceph_context
15 #define dout_subsys ceph_subsys_rgw
16
17 using namespace librados;
18
19 static string gc_oid_prefix = "gc";
20 static string gc_index_lock_name = "gc_process";
21
22
23 void RGWGC::initialize(CephContext *_cct, RGWRados *_store) {
24 cct = _cct;
25 store = _store;
26
27 max_objs = min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max());
28
29 obj_names = new string[max_objs];
30
31 for (int i = 0; i < max_objs; i++) {
32 obj_names[i] = gc_oid_prefix;
33 char buf[32];
34 snprintf(buf, 32, ".%d", i);
35 obj_names[i].append(buf);
36 }
37 }
38
39 void RGWGC::finalize()
40 {
41 delete[] obj_names;
42 }
43
// Map a GC tag to the shard index of the GC log object that holds it,
// using the common RGW shard-placement hash.
int RGWGC::tag_index(const string& tag)
{
  return rgw_shard_id(tag, max_objs);
}
48
// Append a "set GC entry" step to the given write op for this chain/tag.
// The entry becomes eligible for processing only after
// rgw_gc_obj_min_wait seconds (grace period before tail objects may be
// reclaimed).
void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag)
{
  cls_rgw_gc_obj_info info;
  info.chain = chain;
  info.tag = tag;

  cls_rgw_gc_set_entry(op, cct->_conf->rgw_gc_obj_min_wait, info);
}
57
58 int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync)
59 {
60 ObjectWriteOperation op;
61 add_chain(op, chain, tag);
62
63 int i = tag_index(tag);
64
65 if (sync)
66 return store->gc_operate(obj_names[i], &op);
67
68 return store->gc_aio_operate(obj_names[i], &op);
69 }
70
71 int RGWGC::defer_chain(const string& tag, bool sync)
72 {
73 ObjectWriteOperation op;
74 cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag);
75
76 int i = tag_index(tag);
77
78 if (sync)
79 return store->gc_operate(obj_names[i], &op);
80
81 return store->gc_aio_operate(obj_names[i], &op);
82 }
83
// Asynchronously trim the given tags from GC log shard 'index'.
// On success *pc holds the AioCompletion tracking the operation; the
// caller owns it and must release() it.
int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc)
{
  ObjectWriteOperation op;
  cls_rgw_gc_remove(op, tags);
  return store->gc_aio_operate(obj_names[index], &op, pc);
}
90
// List up to 'max' GC entries, scanning shards from *index onward and
// resuming within a shard from 'marker'. On return, *index and 'marker'
// encode the resume position for the next call, and *truncated reports
// whether more entries may remain. With expired_only set, only entries
// past their grace period are returned.
int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
{
  result.clear();
  string next_marker;

  // Advance shard by shard; the marker only applies to the first shard
  // visited, so it is cleared on each increment.
  for (; *index < max_objs && result.size() < max; (*index)++, marker.clear()) {
    std::list<cls_rgw_gc_obj_info> entries;
    int ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
    if (ret == -ENOENT)   // shard object not created yet: nothing to list there
      continue;
    if (ret < 0)
      return ret;

    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      result.push_back(*iter);
    }

    marker = next_marker;

    if (*index == max_objs - 1) {
      /* we cut short here, truncated will hold the correct value */
      return 0;
    }

    if (result.size() == max) {
      /* close approximation, it might be that the next of the objects don't hold
       * anything, in this case truncated should have been false, but we can find
       * that out on the next iteration
       */
      *truncated = true;
      return 0;
    }

  }
  // Ran off the end of the shard list without filling 'max' entries.
  *truncated = false;

  return 0;
}
130
// Bookkeeper for the asynchronous I/O issued while processing GC entries:
// deletions of chain tail objects (TailIO) and trims of completed tags
// from the GC log shards (IndexIO). Caps the number of in-flight AIOs and
// batches tag trims per shard up to rgw_gc_max_trim_chunk.
class RGWGCIOManager {
  const DoutPrefixProvider* dpp;
  CephContext *cct;
  RGWGC *gc;

  // One in-flight AIO and the context needed to act on its completion.
  struct IO {
    enum Type {
      UnknownIO = 0,
      TailIO = 1,   // deletion of a chain tail rados object
      IndexIO = 2,  // trim of tags from a GC log shard
    } type{UnknownIO};
    librados::AioCompletion *c{nullptr};
    string oid;
    int index{-1};  // GC log shard the entry came from
    string tag;
  };

  deque<IO> ios;                            // in-flight AIOs, reaped in FIFO order
  vector<std::vector<string> > remove_tags; // per-shard tags awaiting trim

#define MAX_AIO_DEFAULT 10
  size_t max_aio{MAX_AIO_DEFAULT};

public:
  RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp),
                                                                                  cct(_cct),
                                                                                  gc(_gc),
                                                                                  remove_tags(cct->_conf->rgw_gc_max_objs) {
    max_aio = cct->_conf->rgw_gc_max_concurrent_io;
  }

  // Release any completions still pending; their results are dropped.
  ~RGWGCIOManager() {
    for (auto io : ios) {
      io.c->release();
    }
  }

  // Issue an async deletion of 'oid', first reaping completed I/Os until
  // we are under the concurrency cap.
  // NOTE(review): the '>' comparison admits max_aio + 1 in-flight ops;
  // presumably '>=' was intended — confirm before changing.
  int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op,
                  int index, const string& tag) {
    while (ios.size() > max_aio) {
      if (gc->going_down()) {
        // Shutting down: don't block waiting for completions; the entry
        // stays in the GC log and is retried on a later pass.
        return 0;
      }
      handle_next_completion();
    }

    AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
    int ret = ioctx->aio_operate(oid, c, op);
    if (ret < 0) {
      return ret;
    }
    ios.push_back(IO{IO::TailIO, c, oid, index, tag});

    return 0;
  }

  // Block on the oldest in-flight AIO, then act on its result:
  // a successful tail deletion queues the entry's tag for trimming;
  // failures are logged and the entry is left for a future GC pass.
  void handle_next_completion() {
    ceph_assert(!ios.empty());
    IO& io = ios.front();
    io.c->wait_for_safe();
    int ret = io.c->get_return_value();
    io.c->release();

    // A missing tail object means it was already deleted: count as success.
    if (ret == -ENOENT) {
      ret = 0;
    }

    if (io.type == IO::IndexIO) {
      if (ret < 0) {
        ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" <<
          io.index << " returned error, ret=" << ret << dendl;
      }
      goto done;
    }

    if (ret < 0) {
      ldpp_dout(dpp, 0) << "WARNING: gc could not remove oid=" << io.oid <<
        ", ret=" << ret << dendl;
      goto done;
    }

    schedule_tag_removal(io.index, io.tag);

  done:
    ios.pop_front();
  }

  // Queue 'tag' for trimming from shard 'index', flushing the batch once
  // it reaches rgw_gc_max_trim_chunk entries.
  void schedule_tag_removal(int index, string tag) {
    auto& rt = remove_tags[index];

    // since every element of a chain tries to add the same tag, and
    // since chains are handled sequentially, check to make sure it's
    // not already on the list
    if (rt.empty() || rt.back() != tag) {
      rt.push_back(tag);
      if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) {
        flush_remove_tags(index, rt);
      }
    }
  }

  // Reap all in-flight AIOs, bailing out early on shutdown.
  void drain_ios() {
    while (!ios.empty()) {
      if (gc->going_down()) {
        return;
      }
      handle_next_completion();
    }
  }

  // Finish everything: reap pending I/Os, flush all queued tag trims,
  // then reap the trim I/Os those flushes generated.
  void drain() {
    drain_ios();
    flush_remove_tags();
    /* the tags draining might have generated more ios, drain those too */
    drain_ios();
  }

  // Issue one async trim of the accumulated tags for shard 'index'.
  // NOTE(review): callers may pass an empty 'rt' (see the no-arg
  // overload), which still issues a trim op — harmless but wasteful;
  // confirm before adding an early return.
  void flush_remove_tags(int index, vector<string>& rt) {
    IO index_io;
    index_io.type = IO::IndexIO;
    index_io.index = index;

    ldpp_dout(dpp, 20) << __func__ <<
      " removing entries from gc log shard index=" << index << ", size=" <<
      rt.size() << ", entries=" << rt << dendl;

    int ret = gc->remove(index, rt, &index_io.c);
    // Clear before checking ret on purpose:
    rt.clear();
    if (ret < 0) {
      /* we already cleared list of tags, this prevents us from
       * ballooning in case of a persistent problem
       */
      ldpp_dout(dpp, 0) << "WARNING: failed to remove tags on gc shard index=" <<
        index << " ret=" << ret << dendl;
      return;
    }

    ios.push_back(index_io);
  }

  // Flush the pending tag batch of every shard.
  void flush_remove_tags() {
    int index = 0;
    for (auto& rt : remove_tags) {
      flush_remove_tags(index, rt);
      ++index;
    }
  }
}; // class RGWGCIOManger
279
// Process one GC log shard: take the shard's exclusive lock, iterate its
// entries, delete each chain's tail objects via io_manager, and queue the
// tags of fully-deleted chains for trimming. Runs for at most max_secs
// before releasing the lock. Returns 0 if the shard was skipped because
// another processor holds the lock.
int RGWGC::process(int index, int max_secs, bool expired_only,
                   RGWGCIOManager& io_manager)
{
  ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
    index << ", max_secs=" << max_secs << ", expired_only=" <<
    expired_only << dendl;

  rados::cls::lock::Lock l(gc_index_lock_name);
  utime_t end = ceph_clock_now();

  /* max_secs should be greater than zero. We don't want a zero max_secs
   * to be translated as no timeout, since we'd then need to break the
   * lock and that would require a manual intervention. In this case
   * we can just wait it out. */
  if (max_secs <= 0)
    return -EAGAIN;

  end += max_secs;
  utime_t time(max_secs, 0);
  // Lock auto-expires after max_secs, so a crashed processor cannot
  // wedge the shard forever.
  l.set_duration(time);

  int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
  if (ret == -EBUSY) { /* already locked by another gc processor */
    ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
      obj_names[index] << dendl;
    return 0;
  }
  if (ret < 0)
    return ret;

  string marker;
  string next_marker;
  bool truncated;
  // IoCtx for the pool of the tail objects; recreated whenever the chain
  // switches pools (see last_pool below). Freed at 'done'.
  IoCtx *ctx = new IoCtx;
  do {
    int max = 100;  // entries fetched per listing round
    std::list<cls_rgw_gc_obj_info> entries;

    ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max,
                          expired_only, entries, &truncated, next_marker);
    ldpp_dout(this, 20) <<
      "RGWGC::process cls_rgw_gc_list returned with returned:" << ret <<
      ", entries.size=" << entries.size() << ", truncated=" << truncated <<
      ", next_marker='" << next_marker << "'" << dendl;

    if (ret == -ENOENT) {
      // Shard object doesn't exist yet: nothing to collect.
      ret = 0;
      goto done;
    }
    if (ret < 0)
      goto done;

    marker = next_marker;

    string last_pool;
    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      cls_rgw_gc_obj_info& info = *iter;

      ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
        info.tag << "', time=" << info.time << ", chain.objs.size()=" <<
        info.chain.objs.size() << dendl;

      std::list<cls_rgw_obj>::iterator liter;
      cls_rgw_obj_chain& chain = info.chain;

      // Stop before the lock's duration runs out.
      utime_t now = ceph_clock_now();
      if (now >= end) {
        goto done;
      }

      if (chain.objs.empty()) {
        // Nothing left to delete; the entry itself can be trimmed.
        io_manager.schedule_tag_removal(index, info.tag);
      } else {
        for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
          cls_rgw_obj& obj = *liter;

          // Rebuild the IoCtx only when the pool changes between
          // consecutive objects.
          if (obj.pool != last_pool) {
            delete ctx;
            ctx = new IoCtx;
            ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx);
            if (ret < 0) {
              last_pool = "";
              ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
                obj.pool << dendl;
              continue;
            }
            last_pool = obj.pool;
          }

          ctx->locator_set_key(obj.loc);

          const string& oid = obj.key.name; /* just stored raw oid there */

          ldpp_dout(this, 5) << "RGWGC::process removing " << obj.pool <<
            ":" << obj.key.name << dendl;
          ObjectWriteOperation op;
          // Drop this tag's refcount; the object is deleted once the last
          // reference goes away.
          cls_refcount_put(op, info.tag, true);

          ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
          if (ret < 0) {
            ldpp_dout(this, 0) <<
              "WARNING: failed to schedule deletion for oid=" << oid << dendl;
          }
          if (going_down()) {
            // leave early, even if tag isn't removed, it's ok since it
            // will be picked up next time around
            goto done;
          }
        } // chains loop
      } // else -- chains not empty
    } // entries loop
  } while (truncated);

done:
  /* we don't drain here, because if we're going down we don't want to
   * hold the system if backend is unresponsive
   */
  l.unlock(&store->gc_pool_ctx, obj_names[index]);
  delete ctx;

  return 0;
}
403
404 int RGWGC::process(bool expired_only)
405 {
406 int max_secs = cct->_conf->rgw_gc_processor_max_time;
407
408 const int start = ceph::util::generate_random_number(0, max_objs - 1);
409
410 RGWGCIOManager io_manager(this, store->ctx(), this);
411
412 for (int i = 0; i < max_objs; i++) {
413 int index = (i + start) % max_objs;
414 int ret = process(index, max_secs, expired_only, io_manager);
415 if (ret < 0)
416 return ret;
417 }
418 if (!going_down()) {
419 io_manager.drain();
420 }
421
422 return 0;
423 }
424
// True once stop_processor() has initiated shutdown.
// NOTE(review): thread-safety depends on down_flag's declared type in the
// header (atomic vs plain bool) — confirm there.
bool RGWGC::going_down()
{
  return down_flag;
}
429
// Spawn the background GC worker thread ("rgw_gc").
void RGWGC::start_processor()
{
  worker = new GCWorker(this, cct, this);
  worker->create("rgw_gc");
}
435
436 void RGWGC::stop_processor()
437 {
438 down_flag = true;
439 if (worker) {
440 worker->stop();
441 worker->join();
442 }
443 delete worker;
444 worker = NULL;
445 }
446
// Dout subsystem used for this object's log prefix (ceph_subsys_rgw).
unsigned RGWGC::get_subsys() const
{
  return dout_subsys;
}
451
452 std::ostream& RGWGC::gen_prefix(std::ostream& out) const
453 {
454 return out << "garbage collection: ";
455 }
456
// Worker thread main loop: run a GC pass (expired entries only), then
// sleep out the remainder of rgw_gc_processor_period before the next
// pass. Wakes early when stop() signals the condvar or going_down() is
// set.
void *RGWGC::GCWorker::entry() {
  do {
    utime_t start = ceph_clock_now();
    ldpp_dout(dpp, 2) << "garbage collection: start" << dendl;
    int r = gc->process(true);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "ERROR: garbage collection process() returned error r=" << r << dendl;
    }
    ldpp_dout(dpp, 2) << "garbage collection: stop" << dendl;

    if (gc->going_down())
      break;

    // Sleep only for whatever is left of the period after the pass.
    utime_t end = ceph_clock_now();
    end -= start;
    int secs = cct->_conf->rgw_gc_processor_period;

    if (secs <= end.sec())
      continue; // next round

    secs -= end.sec();

    lock.Lock();
    cond.WaitInterval(lock, utime_t(secs, 0));
    lock.Unlock();
  } while (!gc->going_down());

  return NULL;
}
486
// Wake the worker out of its inter-pass sleep so it can observe the
// down flag and exit promptly.
void RGWGC::GCWorker::stop()
{
  Mutex::Locker l(lock);
  cond.Signal();
}