]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "rgw_gc.h" | |
5 | #include "include/rados/librados.hpp" | |
6 | #include "cls/rgw/cls_rgw_client.h" | |
7 | #include "cls/refcount/cls_refcount_client.h" | |
8 | #include "cls/lock/cls_lock_client.h" | |
11fdf7f2 | 9 | #include "include/random.h" |
7c673cae FG |
10 | |
11 | #include <list> | |
11fdf7f2 | 12 | #include <sstream> |
7c673cae FG |
13 | |
14 | #define dout_context g_ceph_context | |
15 | #define dout_subsys ceph_subsys_rgw | |
16 | ||
7c673cae FG |
17 | using namespace librados; |
18 | ||
19 | static string gc_oid_prefix = "gc"; | |
20 | static string gc_index_lock_name = "gc_process"; | |
21 | ||
22 | ||
7c673cae FG |
23 | void RGWGC::initialize(CephContext *_cct, RGWRados *_store) { |
24 | cct = _cct; | |
25 | store = _store; | |
26 | ||
c07f9fc5 | 27 | max_objs = min(static_cast<int>(cct->_conf->rgw_gc_max_objs), rgw_shards_max()); |
7c673cae FG |
28 | |
29 | obj_names = new string[max_objs]; | |
30 | ||
31 | for (int i = 0; i < max_objs; i++) { | |
32 | obj_names[i] = gc_oid_prefix; | |
33 | char buf[32]; | |
34 | snprintf(buf, 32, ".%d", i); | |
35 | obj_names[i].append(buf); | |
36 | } | |
37 | } | |
38 | ||
39 | void RGWGC::finalize() | |
40 | { | |
41 | delete[] obj_names; | |
42 | } | |
43 | ||
44 | int RGWGC::tag_index(const string& tag) | |
45 | { | |
1adf2230 | 46 | return rgw_shard_id(tag, max_objs); |
7c673cae FG |
47 | } |
48 | ||
49 | void RGWGC::add_chain(ObjectWriteOperation& op, cls_rgw_obj_chain& chain, const string& tag) | |
50 | { | |
51 | cls_rgw_gc_obj_info info; | |
52 | info.chain = chain; | |
53 | info.tag = tag; | |
54 | ||
55 | cls_rgw_gc_set_entry(op, cct->_conf->rgw_gc_obj_min_wait, info); | |
56 | } | |
57 | ||
58 | int RGWGC::send_chain(cls_rgw_obj_chain& chain, const string& tag, bool sync) | |
59 | { | |
60 | ObjectWriteOperation op; | |
61 | add_chain(op, chain, tag); | |
62 | ||
63 | int i = tag_index(tag); | |
64 | ||
65 | if (sync) | |
66 | return store->gc_operate(obj_names[i], &op); | |
67 | ||
68 | return store->gc_aio_operate(obj_names[i], &op); | |
69 | } | |
70 | ||
71 | int RGWGC::defer_chain(const string& tag, bool sync) | |
72 | { | |
73 | ObjectWriteOperation op; | |
74 | cls_rgw_gc_defer_entry(op, cct->_conf->rgw_gc_obj_min_wait, tag); | |
75 | ||
76 | int i = tag_index(tag); | |
77 | ||
78 | if (sync) | |
79 | return store->gc_operate(obj_names[i], &op); | |
80 | ||
81 | return store->gc_aio_operate(obj_names[i], &op); | |
82 | } | |
83 | ||
11fdf7f2 | 84 | int RGWGC::remove(int index, const std::vector<string>& tags, AioCompletion **pc) |
7c673cae FG |
85 | { |
86 | ObjectWriteOperation op; | |
87 | cls_rgw_gc_remove(op, tags); | |
11fdf7f2 | 88 | return store->gc_aio_operate(obj_names[index], &op, pc); |
7c673cae FG |
89 | } |
90 | ||
91 | int RGWGC::list(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated) | |
92 | { | |
93 | result.clear(); | |
31f18b77 | 94 | string next_marker; |
7c673cae FG |
95 | |
96 | for (; *index < max_objs && result.size() < max; (*index)++, marker.clear()) { | |
97 | std::list<cls_rgw_gc_obj_info> entries; | |
31f18b77 | 98 | int ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[*index], marker, max - result.size(), expired_only, entries, truncated, next_marker); |
7c673cae FG |
99 | if (ret == -ENOENT) |
100 | continue; | |
101 | if (ret < 0) | |
102 | return ret; | |
103 | ||
104 | std::list<cls_rgw_gc_obj_info>::iterator iter; | |
105 | for (iter = entries.begin(); iter != entries.end(); ++iter) { | |
106 | result.push_back(*iter); | |
107 | } | |
108 | ||
31f18b77 FG |
109 | marker = next_marker; |
110 | ||
7c673cae FG |
111 | if (*index == max_objs - 1) { |
112 | /* we cut short here, truncated will hold the correct value */ | |
113 | return 0; | |
114 | } | |
115 | ||
116 | if (result.size() == max) { | |
117 | /* close approximation, it might be that the next of the objects don't hold | |
118 | * anything, in this case truncated should have been false, but we can find | |
119 | * that out on the next iteration | |
120 | */ | |
121 | *truncated = true; | |
122 | return 0; | |
123 | } | |
124 | ||
125 | } | |
126 | *truncated = false; | |
127 | ||
128 | return 0; | |
129 | } | |
130 | ||
11fdf7f2 TL |
131 | class RGWGCIOManager { |
132 | const DoutPrefixProvider* dpp; | |
133 | CephContext *cct; | |
134 | RGWGC *gc; | |
135 | ||
136 | struct IO { | |
137 | enum Type { | |
138 | UnknownIO = 0, | |
139 | TailIO = 1, | |
140 | IndexIO = 2, | |
141 | } type{UnknownIO}; | |
142 | librados::AioCompletion *c{nullptr}; | |
143 | string oid; | |
144 | int index{-1}; | |
145 | string tag; | |
146 | }; | |
147 | ||
148 | deque<IO> ios; | |
149 | vector<std::vector<string> > remove_tags; | |
150 | ||
151 | #define MAX_AIO_DEFAULT 10 | |
152 | size_t max_aio{MAX_AIO_DEFAULT}; | |
153 | ||
154 | public: | |
155 | RGWGCIOManager(const DoutPrefixProvider* _dpp, CephContext *_cct, RGWGC *_gc) : dpp(_dpp), | |
156 | cct(_cct), | |
157 | gc(_gc), | |
158 | remove_tags(cct->_conf->rgw_gc_max_objs) { | |
159 | max_aio = cct->_conf->rgw_gc_max_concurrent_io; | |
160 | } | |
161 | ||
162 | ~RGWGCIOManager() { | |
163 | for (auto io : ios) { | |
164 | io.c->release(); | |
165 | } | |
166 | } | |
167 | ||
168 | int schedule_io(IoCtx *ioctx, const string& oid, ObjectWriteOperation *op, | |
169 | int index, const string& tag) { | |
170 | while (ios.size() > max_aio) { | |
171 | if (gc->going_down()) { | |
172 | return 0; | |
173 | } | |
174 | handle_next_completion(); | |
175 | } | |
176 | ||
177 | AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); | |
178 | int ret = ioctx->aio_operate(oid, c, op); | |
179 | if (ret < 0) { | |
180 | return ret; | |
181 | } | |
182 | ios.push_back(IO{IO::TailIO, c, oid, index, tag}); | |
183 | ||
184 | return 0; | |
185 | } | |
186 | ||
187 | void handle_next_completion() { | |
188 | ceph_assert(!ios.empty()); | |
189 | IO& io = ios.front(); | |
190 | io.c->wait_for_safe(); | |
191 | int ret = io.c->get_return_value(); | |
192 | io.c->release(); | |
193 | ||
194 | if (ret == -ENOENT) { | |
195 | ret = 0; | |
196 | } | |
197 | ||
198 | if (io.type == IO::IndexIO) { | |
199 | if (ret < 0) { | |
200 | ldpp_dout(dpp, 0) << "WARNING: gc cleanup of tags on gc shard index=" << | |
201 | io.index << " returned error, ret=" << ret << dendl; | |
202 | } | |
203 | goto done; | |
204 | } | |
205 | ||
206 | if (ret < 0) { | |
207 | ldpp_dout(dpp, 0) << "WARNING: gc could not remove oid=" << io.oid << | |
208 | ", ret=" << ret << dendl; | |
209 | goto done; | |
210 | } | |
211 | ||
212 | schedule_tag_removal(io.index, io.tag); | |
213 | ||
214 | done: | |
215 | ios.pop_front(); | |
216 | } | |
217 | ||
218 | void schedule_tag_removal(int index, string tag) { | |
219 | auto& rt = remove_tags[index]; | |
220 | ||
221 | // since every element of a chain tries to add the same tag, and | |
222 | // since chains are handled sequentially, check to make sure it's | |
223 | // not already on the list | |
224 | if (rt.empty() || rt.back() != tag) { | |
225 | rt.push_back(tag); | |
226 | if (rt.size() >= (size_t)cct->_conf->rgw_gc_max_trim_chunk) { | |
227 | flush_remove_tags(index, rt); | |
228 | } | |
229 | } | |
230 | } | |
231 | ||
232 | void drain_ios() { | |
233 | while (!ios.empty()) { | |
234 | if (gc->going_down()) { | |
235 | return; | |
236 | } | |
237 | handle_next_completion(); | |
238 | } | |
239 | } | |
240 | ||
241 | void drain() { | |
242 | drain_ios(); | |
243 | flush_remove_tags(); | |
244 | /* the tags draining might have generated more ios, drain those too */ | |
245 | drain_ios(); | |
246 | } | |
247 | ||
248 | void flush_remove_tags(int index, vector<string>& rt) { | |
249 | IO index_io; | |
250 | index_io.type = IO::IndexIO; | |
251 | index_io.index = index; | |
252 | ||
11fdf7f2 TL |
253 | ldpp_dout(dpp, 20) << __func__ << |
254 | " removing entries from gc log shard index=" << index << ", size=" << | |
81eedcae | 255 | rt.size() << ", entries=" << rt << dendl; |
11fdf7f2 TL |
256 | |
257 | int ret = gc->remove(index, rt, &index_io.c); | |
258 | rt.clear(); | |
259 | if (ret < 0) { | |
260 | /* we already cleared list of tags, this prevents us from | |
261 | * ballooning in case of a persistent problem | |
262 | */ | |
263 | ldpp_dout(dpp, 0) << "WARNING: failed to remove tags on gc shard index=" << | |
264 | index << " ret=" << ret << dendl; | |
265 | return; | |
266 | } | |
267 | ||
268 | ios.push_back(index_io); | |
269 | } | |
270 | ||
271 | void flush_remove_tags() { | |
272 | int index = 0; | |
273 | for (auto& rt : remove_tags) { | |
274 | flush_remove_tags(index, rt); | |
275 | ++index; | |
276 | } | |
277 | } | |
278 | }; // class RGWGCIOManger | |
279 | ||
280 | int RGWGC::process(int index, int max_secs, bool expired_only, | |
281 | RGWGCIOManager& io_manager) | |
7c673cae | 282 | { |
11fdf7f2 TL |
283 | ldpp_dout(this, 20) << "RGWGC::process entered with GC index_shard=" << |
284 | index << ", max_secs=" << max_secs << ", expired_only=" << | |
285 | expired_only << dendl; | |
286 | ||
7c673cae FG |
287 | rados::cls::lock::Lock l(gc_index_lock_name); |
288 | utime_t end = ceph_clock_now(); | |
7c673cae FG |
289 | |
290 | /* max_secs should be greater than zero. We don't want a zero max_secs | |
291 | * to be translated as no timeout, since we'd then need to break the | |
292 | * lock and that would require a manual intervention. In this case | |
293 | * we can just wait it out. */ | |
294 | if (max_secs <= 0) | |
295 | return -EAGAIN; | |
296 | ||
297 | end += max_secs; | |
298 | utime_t time(max_secs, 0); | |
299 | l.set_duration(time); | |
300 | ||
301 | int ret = l.lock_exclusive(&store->gc_pool_ctx, obj_names[index]); | |
302 | if (ret == -EBUSY) { /* already locked by another gc processor */ | |
11fdf7f2 TL |
303 | ldpp_dout(this, 10) << "RGWGC::process failed to acquire lock on " << |
304 | obj_names[index] << dendl; | |
7c673cae FG |
305 | return 0; |
306 | } | |
307 | if (ret < 0) | |
308 | return ret; | |
309 | ||
310 | string marker; | |
31f18b77 | 311 | string next_marker; |
7c673cae FG |
312 | bool truncated; |
313 | IoCtx *ctx = new IoCtx; | |
314 | do { | |
315 | int max = 100; | |
316 | std::list<cls_rgw_gc_obj_info> entries; | |
11fdf7f2 TL |
317 | |
318 | ret = cls_rgw_gc_list(store->gc_pool_ctx, obj_names[index], marker, max, | |
319 | expired_only, entries, &truncated, next_marker); | |
320 | ldpp_dout(this, 20) << | |
321 | "RGWGC::process cls_rgw_gc_list returned with returned:" << ret << | |
322 | ", entries.size=" << entries.size() << ", truncated=" << truncated << | |
323 | ", next_marker='" << next_marker << "'" << dendl; | |
324 | ||
7c673cae FG |
325 | if (ret == -ENOENT) { |
326 | ret = 0; | |
327 | goto done; | |
328 | } | |
329 | if (ret < 0) | |
330 | goto done; | |
331 | ||
11fdf7f2 TL |
332 | marker = next_marker; |
333 | ||
7c673cae FG |
334 | string last_pool; |
335 | std::list<cls_rgw_gc_obj_info>::iterator iter; | |
336 | for (iter = entries.begin(); iter != entries.end(); ++iter) { | |
7c673cae | 337 | cls_rgw_gc_obj_info& info = *iter; |
11fdf7f2 TL |
338 | |
339 | ldpp_dout(this, 20) << "RGWGC::process iterating over entry tag='" << | |
340 | info.tag << "', time=" << info.time << ", chain.objs.size()=" << | |
341 | info.chain.objs.size() << dendl; | |
342 | ||
7c673cae FG |
343 | std::list<cls_rgw_obj>::iterator liter; |
344 | cls_rgw_obj_chain& chain = info.chain; | |
345 | ||
346 | utime_t now = ceph_clock_now(); | |
11fdf7f2 | 347 | if (now >= end) { |
7c673cae | 348 | goto done; |
11fdf7f2 TL |
349 | } |
350 | ||
351 | if (chain.objs.empty()) { | |
352 | io_manager.schedule_tag_removal(index, info.tag); | |
353 | } else { | |
354 | for (liter = chain.objs.begin(); liter != chain.objs.end(); ++liter) { | |
355 | cls_rgw_obj& obj = *liter; | |
356 | ||
357 | if (obj.pool != last_pool) { | |
358 | delete ctx; | |
359 | ctx = new IoCtx; | |
360 | ret = rgw_init_ioctx(store->get_rados_handle(), obj.pool, *ctx); | |
361 | if (ret < 0) { | |
362 | last_pool = ""; | |
363 | ldpp_dout(this, 0) << "ERROR: failed to create ioctx pool=" << | |
364 | obj.pool << dendl; | |
365 | continue; | |
366 | } | |
367 | last_pool = obj.pool; | |
368 | } | |
369 | ||
370 | ctx->locator_set_key(obj.loc); | |
371 | ||
372 | const string& oid = obj.key.name; /* just stored raw oid there */ | |
7c673cae | 373 | |
11fdf7f2 TL |
374 | ldpp_dout(this, 5) << "RGWGC::process removing " << obj.pool << |
375 | ":" << obj.key.name << dendl; | |
376 | ObjectWriteOperation op; | |
377 | cls_refcount_put(op, info.tag, true); | |
7c673cae | 378 | |
11fdf7f2 | 379 | ret = io_manager.schedule_io(ctx, oid, &op, index, info.tag); |
7c673cae | 380 | if (ret < 0) { |
11fdf7f2 TL |
381 | ldpp_dout(this, 0) << |
382 | "WARNING: failed to schedule deletion for oid=" << oid << dendl; | |
7c673cae | 383 | } |
11fdf7f2 TL |
384 | if (going_down()) { |
385 | // leave early, even if tag isn't removed, it's ok since it | |
386 | // will be picked up next time around | |
387 | goto done; | |
388 | } | |
389 | } // chains loop | |
390 | } // else -- chains not empty | |
391 | } // entries loop | |
7c673cae FG |
392 | } while (truncated); |
393 | ||
394 | done: | |
11fdf7f2 TL |
395 | /* we don't drain here, because if we're going down we don't want to |
396 | * hold the system if backend is unresponsive | |
397 | */ | |
7c673cae FG |
398 | l.unlock(&store->gc_pool_ctx, obj_names[index]); |
399 | delete ctx; | |
11fdf7f2 | 400 | |
7c673cae FG |
401 | return 0; |
402 | } | |
403 | ||
11fdf7f2 | 404 | int RGWGC::process(bool expired_only) |
7c673cae FG |
405 | { |
406 | int max_secs = cct->_conf->rgw_gc_processor_max_time; | |
407 | ||
11fdf7f2 TL |
408 | const int start = ceph::util::generate_random_number(0, max_objs - 1); |
409 | ||
410 | RGWGCIOManager io_manager(this, store->ctx(), this); | |
7c673cae FG |
411 | |
412 | for (int i = 0; i < max_objs; i++) { | |
413 | int index = (i + start) % max_objs; | |
11fdf7f2 | 414 | int ret = process(index, max_secs, expired_only, io_manager); |
7c673cae FG |
415 | if (ret < 0) |
416 | return ret; | |
417 | } | |
11fdf7f2 TL |
418 | if (!going_down()) { |
419 | io_manager.drain(); | |
420 | } | |
7c673cae FG |
421 | |
422 | return 0; | |
423 | } | |
424 | ||
425 | bool RGWGC::going_down() | |
426 | { | |
427 | return down_flag; | |
428 | } | |
429 | ||
430 | void RGWGC::start_processor() | |
431 | { | |
11fdf7f2 | 432 | worker = new GCWorker(this, cct, this); |
7c673cae FG |
433 | worker->create("rgw_gc"); |
434 | } | |
435 | ||
436 | void RGWGC::stop_processor() | |
437 | { | |
438 | down_flag = true; | |
439 | if (worker) { | |
440 | worker->stop(); | |
441 | worker->join(); | |
442 | } | |
443 | delete worker; | |
444 | worker = NULL; | |
445 | } | |
446 | ||
11fdf7f2 TL |
447 | unsigned RGWGC::get_subsys() const |
448 | { | |
449 | return dout_subsys; | |
450 | } | |
451 | ||
452 | std::ostream& RGWGC::gen_prefix(std::ostream& out) const | |
453 | { | |
454 | return out << "garbage collection: "; | |
455 | } | |
456 | ||
7c673cae FG |
457 | void *RGWGC::GCWorker::entry() { |
458 | do { | |
459 | utime_t start = ceph_clock_now(); | |
11fdf7f2 TL |
460 | ldpp_dout(dpp, 2) << "garbage collection: start" << dendl; |
461 | int r = gc->process(true); | |
7c673cae | 462 | if (r < 0) { |
11fdf7f2 | 463 | ldpp_dout(dpp, 0) << "ERROR: garbage collection process() returned error r=" << r << dendl; |
7c673cae | 464 | } |
11fdf7f2 | 465 | ldpp_dout(dpp, 2) << "garbage collection: stop" << dendl; |
7c673cae FG |
466 | |
467 | if (gc->going_down()) | |
468 | break; | |
469 | ||
470 | utime_t end = ceph_clock_now(); | |
471 | end -= start; | |
472 | int secs = cct->_conf->rgw_gc_processor_period; | |
473 | ||
474 | if (secs <= end.sec()) | |
475 | continue; // next round | |
476 | ||
477 | secs -= end.sec(); | |
478 | ||
479 | lock.Lock(); | |
480 | cond.WaitInterval(lock, utime_t(secs, 0)); | |
481 | lock.Unlock(); | |
482 | } while (!gc->going_down()); | |
483 | ||
484 | return NULL; | |
485 | } | |
486 | ||
487 | void RGWGC::GCWorker::stop() | |
488 | { | |
489 | Mutex::Locker l(lock); | |
490 | cond.Signal(); | |
491 | } |