]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "rgw_gc.h" | |
5 | #include "include/rados/librados.hpp" | |
6 | #include "cls/rgw/cls_rgw_client.h" | |
7 | #include "cls/refcount/cls_refcount_client.h" | |
8 | #include "cls/lock/cls_lock_client.h" | |
11fdf7f2 | 9 | #include "include/random.h" |
7c673cae FG |
10 | |
11 | #include <list> | |
11fdf7f2 | 12 | #include <sstream> |
7c673cae FG |
13 | |
14 | #define dout_context g_ceph_context | |
15 | #define dout_subsys ceph_subsys_rgw | |
16 | ||
7c673cae FG |
using namespace librados;

// Prefix for the per-shard GC index object names ("gc.0", "gc.1", ...).
static string gc_oid_prefix = "gc";
// Name of the rados cls lock taken on a shard while it is being processed.
static string gc_index_lock_name = "gc_process";
21 | ||
22 | ||
7c673cae FG |
23 | void RGWGC::initialize(CephContext *_cct, RGWRados *_store) { |
24 | cct = _cct; | |
25 | store = _store; | |
26 | ||
c07f9fc5 | 27 | max_objs = min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max()); |
7c673cae FG |
28 | |
29 | obj_names = new string[max_objs]; | |
30 | ||
31 | for (int i = 0; i < max_objs; i++) { | |
32 | obj_names[i] = gc_oid_prefix; | |
33 | char buf[32]; | |
34 | snprintf(buf, 32, ".%d", i); | |
35 | obj_names[i].append(buf); | |
36 | } | |
37 | } | |
38 | ||
void RGWGC::finalize()
{
  // Release the per-shard object-name array allocated in initialize().
  delete[] obj_names;
}
43 | ||
// Map a GC tag onto one of the max_objs index shards (stable hash).
int RGWGC::tag_index(const string& tag)
{
  return rgw_shard_id(tag, max_objs);
}
48 | ||
49 | void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag) | |
50 | { | |
51 | cls_rgw_gc_obj_info info; | |
52 | info.chain = chain; | |
53 | info.tag = tag; | |
54 | ||
55 | cls_rgw_gc_set_entry(op, cct->_conf->rgw_gc_obj_min_wait, info); | |
56 | } | |
57 | ||
58 | int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync) | |
59 | { | |
60 | ObjectWriteOperation op; | |
61 | add_chain(op, chain, tag); | |
62 | ||
63 | int i = tag_index(tag); | |
64 | ||
65 | if (sync) | |
66 | return store->gc_operate(obj_names[i], &op); | |
67 | ||
68 | return store->gc_aio_operate(obj_names[i], &op); | |
69 | } | |
70 | ||
71 | int RGWGC::defer_chain(const string& tag, bool sync) | |
72 | { | |
73 | ObjectWriteOperation op; | |
74 | cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag); | |
75 | ||
76 | int i = tag_index(tag); | |
77 | ||
78 | if (sync) | |
79 | return store->gc_operate(obj_names[i], &op); | |
80 | ||
81 | return store->gc_aio_operate(obj_names[i], &op); | |
82 | } | |
83 | ||
11fdf7f2 | 84 | int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc) |
7c673cae FG |
85 | { |
86 | ObjectWriteOperation op; | |
87 | cls_rgw_gc_remove(op, tags); | |
11fdf7f2 | 88 | return store->gc_aio_operate(obj_names[index], &op, pc); |
7c673cae FG |
89 | } |
90 | ||
// List up to 'max' GC log entries starting from shard *index at position
// 'marker'. Acts as a resumable cursor: *index and marker are updated so
// a subsequent call continues where this one stopped. *truncated is set
// when more entries may remain.
int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated)
{
  result.clear();
  string next_marker;

  // Walk shards until we have 'max' entries or run out of shards.
  // 'marker' is only meaningful within the first shard visited; it is
  // cleared whenever we advance to the next shard (loop increment).
  for (; *index < max_objs && result.size() < max; (*index)++, marker.clear()) {
    std::list<cls_rgw_gc_obj_info> entries;
    int ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker);
    if (ret == -ENOENT)
      continue;  // shard object not created yet; nothing logged there
    if (ret < 0)
      return ret;

    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      result.push_back(*iter);
    }

    // Remember where this shard's listing stopped for the next call.
    marker = next_marker;

    if (*index == max_objs - 1) {
      /* we cut short here, truncated will hold the correct value */
      return 0;
    }

    if (result.size() == max) {
      /* close approximation, it might be that the next of the objects don't hold
       * anything, in this case truncated should have been false, but we can find
       * that out on the next iteration
       */
      *truncated = true;
      return 0;
    }

  }
  *truncated = false;

  return 0;
}
130 | ||
11fdf7f2 TL |
131 | class RGWGCIOManager { |
132 | const DoutPrefixProvider* dpp; | |
133 | CephContext *cct; | |
134 | RGWGC *gc; | |
135 | ||
136 | struct IO { | |
137 | enum Type { | |
138 | UnknownIO = 0, | |
139 | TailIO = 1, | |
140 | IndexIO = 2, | |
141 | } type{UnknownIO}; | |
142 | librados::AioCompletion *c{nullptr}; | |
143 | string oid; | |
144 | int index{-1}; | |
145 | string tag; | |
146 | }; | |
147 | ||
148 | deque<IO> ios; | |
149 | vector<std::vector<string> > remove_tags; | |
150 | ||
151 | #define MAX_AIO_DEFAULT 10 | |
152 | size_t max_aio{MAX_AIO_DEFAULT}; | |
153 | ||
154 | public: | |
155 | RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp), | |
156 | cct(_cct), | |
157 | gc(_gc), | |
158 | remove_tags(cct->_conf->rgw_gc_max_objs) { | |
159 | max_aio = cct->_conf->rgw_gc_max_concurrent_io; | |
160 | } | |
161 | ||
162 | ~RGWGCIOManager() { | |
163 | for (auto io : ios) { | |
164 | io.c->release(); | |
165 | } | |
166 | } | |
167 | ||
168 | int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op, | |
169 | int index, const string& tag) { | |
170 | while (ios.size() > max_aio) { | |
171 | if (gc->going_down()) { | |
172 | return 0; | |
173 | } | |
174 | handle_next_completion(); | |
175 | } | |
176 | ||
177 | AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); | |
178 | int ret = ioctx->aio_operate(oid, c, op); | |
179 | if (ret < 0) { | |
180 | return ret; | |
181 | } | |
182 | ios.push_back(IO{IO::TailIO, c, oid, index, tag}); | |
183 | ||
184 | return 0; | |
185 | } | |
186 | ||
187 | void handle_next_completion() { | |
188 | ceph_assert(!ios.empty()); | |
189 | IO& io = ios.front(); | |
190 | io.c->wait_for_safe(); | |
191 | int ret = io.c->get_return_value(); | |
192 | io.c->release(); | |
193 | ||
194 | if (ret == -ENOENT) { | |
195 | ret = 0; | |
196 | } | |
197 | ||
198 | if (io.type == IO::IndexIO) { | |
199 | if (ret < 0) { | |
200 | ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" << | |
201 | io.index << " returned error, ret=" << ret << dendl; | |
202 | } | |
203 | goto done; | |
204 | } | |
205 | ||
206 | if (ret < 0) { | |
207 | ldpp_dout(dpp, 0) << "WARNING: gc could not remove oid=" << io.oid << | |
208 | ", ret=" << ret << dendl; | |
209 | goto done; | |
210 | } | |
211 | ||
212 | schedule_tag_removal(io.index, io.tag); | |
213 | ||
214 | done: | |
215 | ios.pop_front(); | |
216 | } | |
217 | ||
218 | void schedule_tag_removal(int index, string tag) { | |
219 | auto& rt = remove_tags[index]; | |
220 | ||
221 | // since every element of a chain tries to add the same tag, and | |
222 | // since chains are handled sequentially, check to make sure it's | |
223 | // not already on the list | |
224 | if (rt.empty() || rt.back() != tag) { | |
225 | rt.push_back(tag); | |
226 | if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) { | |
227 | flush_remove_tags(index, rt); | |
228 | } | |
229 | } | |
230 | } | |
231 | ||
232 | void drain_ios() { | |
233 | while (!ios.empty()) { | |
234 | if (gc->going_down()) { | |
235 | return; | |
236 | } | |
237 | handle_next_completion(); | |
238 | } | |
239 | } | |
240 | ||
241 | void drain() { | |
242 | drain_ios(); | |
243 | flush_remove_tags(); | |
244 | /* the tags draining might have generated more ios, drain those too */ | |
245 | drain_ios(); | |
246 | } | |
247 | ||
248 | void flush_remove_tags(int index, vector<string>& rt) { | |
249 | IO index_io; | |
250 | index_io.type = IO::IndexIO; | |
251 | index_io.index = index; | |
252 | ||
253 | // use lambda to assemble list, so it will only get executed if | |
254 | // we're at the appropirate logging level | |
255 | auto lister = [&rt]() -> std::string { | |
256 | std::stringstream out; | |
257 | bool first = true; | |
258 | ||
259 | for (const auto& s : rt) { | |
260 | if (first) { | |
261 | first = false; | |
262 | } else { | |
263 | out << ", "; | |
264 | } | |
265 | out << s; | |
266 | } | |
267 | ||
268 | return out.str(); | |
269 | }; | |
270 | ||
271 | ldpp_dout(dpp, 20) << __func__ << | |
272 | " removing entries from gc log shard index=" << index << ", size=" << | |
273 | rt.size() << ", entries=[" << lister() << "]" << dendl; | |
274 | ||
275 | int ret = gc->remove(index, rt, &index_io.c); | |
276 | rt.clear(); | |
277 | if (ret < 0) { | |
278 | /* we already cleared list of tags, this prevents us from | |
279 | * ballooning in case of a persistent problem | |
280 | */ | |
281 | ldpp_dout(dpp, 0) << "WARNING: failed to remove tags on gc shard index=" << | |
282 | index << " ret=" << ret << dendl; | |
283 | return; | |
284 | } | |
285 | ||
286 | ios.push_back(index_io); | |
287 | } | |
288 | ||
289 | void flush_remove_tags() { | |
290 | int index = 0; | |
291 | for (auto& rt : remove_tags) { | |
292 | flush_remove_tags(index, rt); | |
293 | ++index; | |
294 | } | |
295 | } | |
296 | }; // class RGWGCIOManger | |
297 | ||
// Process a single GC index shard: take the shard's cls lock (for at most
// max_secs seconds), list its entries, schedule deletion of each chain's
// tail objects through io_manager, and queue tags for index trimming.
// Returns 0 when the shard is already locked by another processor.
int RGWGC::process(int index, int max_secs, bool expired_only,
                   RGWGCIOManager& io_manager)
{
  ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" <<
    index << ", max_secs=" << max_secs << ", expired_only=" <<
    expired_only << dendl;

  rados::cls::lock::Lock l(gc_index_lock_name);
  utime_t end = ceph_clock_now();

  /* max_secs should be greater than zero. We don't want a zero max_secs
   * to be translated as no timeout, since we'd then need to break the
   * lock and that would require a manual intervention. In this case
   * we can just wait it out. */
  if (max_secs <= 0)
    return -EAGAIN;

  end += max_secs;
  utime_t time(max_secs, 0);
  l.set_duration(time);

  int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]);
  if (ret == -EBUSY) { /* already locked by another gc processor */
    ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " <<
      obj_names[index] << dendl;
    return 0;
  }
  if (ret < 0)
    return ret;

  string marker;
  string next_marker;
  bool truncated;
  // Heap-allocated so the per-pool reset below can destroy/recreate it;
  // freed on every exit path via the 'done' label.
  IoCtx *ctx = new IoCtx;
  do {
    int max = 100;  // entries fetched per cls list call
    std::list<cls_rgw_gc_obj_info> entries;

    ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max,
			  expired_only, entries, &truncated, next_marker);
    ldpp_dout(this, 20) <<
      "RGWGC::process cls_rgw_gc_list returned with returned:" << ret <<
      ", entries.size=" << entries.size() << ", truncated=" << truncated <<
      ", next_marker='" << next_marker << "'" << dendl;

    if (ret == -ENOENT) {
      // shard object doesn't exist yet: nothing to collect
      ret = 0;
      goto done;
    }
    if (ret < 0)
      goto done;

    marker = next_marker;

    string last_pool;  // cache: reuse ctx while consecutive objs share a pool
    std::list<cls_rgw_gc_obj_info>::iterator iter;
    for (iter = entries.begin(); iter != entries.end(); ++iter) {
      cls_rgw_gc_obj_info& info = *iter;

      ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" <<
	info.tag << "', time=" << info.time << ", chain.objs.size()=" <<
	info.chain.objs.size() << dendl;

      std::list<cls_rgw_obj>::iterator liter;
      cls_rgw_obj_chain& chain = info.chain;

      utime_t now = ceph_clock_now();
      // Stop before our lock lease (max_secs) runs out.
      if (now >= end) {
	goto done;
      }

      if (chain.objs.empty()) {
	// no tail objects to delete; the tag itself can be trimmed
	io_manager.schedule_tag_removal(index, info.tag);
      } else {
	for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) {
	  cls_rgw_obj& obj = *liter;

	  if (obj.pool != last_pool) {
	    // switch the io context to this object's pool
	    delete ctx;
	    ctx = new IoCtx;
	    ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx);
	    if (ret < 0) {
	      last_pool = "";
	      ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" <<
		obj.pool << dendl;
	      continue;
	    }
	    last_pool = obj.pool;
	  }

	  ctx->locator_set_key(obj.loc);

	  const string& oid = obj.key.name; /* just stored raw oid there */

	  ldpp_dout(this, 5) << "RGWGC::process removing " << obj.pool <<
	    ":" << obj.key.name << dendl;
	  ObjectWriteOperation op;
	  // drop our refcount; deletes the tail object when it hits zero
	  cls_refcount_put(op, info.tag, true);

	  ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag);
	  if (ret < 0) {
	    ldpp_dout(this, 0) <<
	      "WARNING: failed to schedule deletion for oid=" << oid << dendl;
	  }
	  if (going_down()) {
	    // leave early, even if tag isn't removed, it's ok since it
	    // will be picked up next time around
	    goto done;
	  }
	} // chains loop
      } // else -- chains not empty
    } // entries loop
  } while (truncated);

done:
  /* we don't drain here, because if we're going down we don't want to
   * hold the system if backend is unresponsive
   */
  l.unlock(&store->gc_pool_ctx, obj_names[index]);
  delete ctx;

  return 0;
}
421 | ||
11fdf7f2 | 422 | int RGWGC::process(bool expired_only) |
7c673cae FG |
423 | { |
424 | int max_secs = cct->_conf->rgw_gc_processor_max_time; | |
425 | ||
11fdf7f2 TL |
426 | const int start = ceph::util::generate_random_number(0, max_objs - 1); |
427 | ||
428 | RGWGCIOManager io_manager(this, store->ctx(), this); | |
7c673cae FG |
429 | |
430 | for (int i = 0; i < max_objs; i++) { | |
431 | int index = (i + start) % max_objs; | |
11fdf7f2 | 432 | int ret = process(index, max_secs, expired_only, io_manager); |
7c673cae FG |
433 | if (ret < 0) |
434 | return ret; | |
435 | } | |
11fdf7f2 TL |
436 | if (!going_down()) { |
437 | io_manager.drain(); | |
438 | } | |
7c673cae FG |
439 | |
440 | return 0; | |
441 | } | |
442 | ||
// True once stop_processor() has requested shutdown.
bool RGWGC::going_down()
{
  return down_flag;
}
447 | ||
// Spawn the background GC worker thread ("rgw_gc").
void RGWGC::start_processor()
{
  worker = new GCWorker(this, cct, this);
  worker->create("rgw_gc");
}
453 | ||
454 | void RGWGC::stop_processor() | |
455 | { | |
456 | down_flag = true; | |
457 | if (worker) { | |
458 | worker->stop(); | |
459 | worker->join(); | |
460 | } | |
461 | delete worker; | |
462 | worker = NULL; | |
463 | } | |
464 | ||
11fdf7f2 TL |
465 | unsigned RGWGC::get_subsys() const |
466 | { | |
467 | return dout_subsys; | |
468 | } | |
469 | ||
// DoutPrefixProvider hook: prefix emitted before every GC log line.
std::ostream& RGWGC::gen_prefix(std::ostream& out) const
{
  return out << "garbage collection: ";
}
474 | ||
7c673cae FG |
475 | void *RGWGC::GCWorker::entry() { |
476 | do { | |
477 | utime_t start = ceph_clock_now(); | |
11fdf7f2 TL |
478 | ldpp_dout(dpp, 2) << "garbage collection: start" << dendl; |
479 | int r = gc->process(true); | |
7c673cae | 480 | if (r < 0) { |
11fdf7f2 | 481 | ldpp_dout(dpp, 0) << "ERROR: garbage collection process() returned error r=" << r << dendl; |
7c673cae | 482 | } |
11fdf7f2 | 483 | ldpp_dout(dpp, 2) << "garbage collection: stop" << dendl; |
7c673cae FG |
484 | |
485 | if (gc->going_down()) | |
486 | break; | |
487 | ||
488 | utime_t end = ceph_clock_now(); | |
489 | end -= start; | |
490 | int secs = cct->_conf->rgw_gc_processor_period; | |
491 | ||
492 | if (secs <= end.sec()) | |
493 | continue; // next round | |
494 | ||
495 | secs -= end.sec(); | |
496 | ||
497 | lock.Lock(); | |
498 | cond.WaitInterval(lock, utime_t(secs, 0)); | |
499 | lock.Unlock(); | |
500 | } while (!gc->going_down()); | |
501 | ||
502 | return NULL; | |
503 | } | |
504 | ||
// Wake the worker out of its inter-pass sleep so it can notice shutdown.
void RGWGC::GCWorker::stop()
{
  Mutex::Locker l(lock);
  cond.Signal();
}