]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae FG |
3 | |
4 | #include <errno.h> | |
5 | #include <iostream> | |
6 | #include <sstream> | |
7 | #include <string> | |
8 | ||
7c673cae FG |
9 | |
10 | #include "auth/Crypto.h" | |
11 | ||
12 | #include "common/armor.h" | |
13 | #include "common/ceph_json.h" | |
14 | #include "common/config.h" | |
15 | #include "common/ceph_argparse.h" | |
16 | #include "common/Formatter.h" | |
17 | #include "common/errno.h" | |
18 | ||
19 | #include "global/global_init.h" | |
20 | ||
21 | #include "include/utime.h" | |
22 | #include "include/str_list.h" | |
23 | ||
24 | #include "rgw_user.h" | |
25 | #include "rgw_bucket.h" | |
7c673cae FG |
26 | #include "rgw_acl.h" |
27 | #include "rgw_acl_s3.h" | |
28 | #include "rgw_log.h" | |
29 | #include "rgw_formats.h" | |
30 | #include "rgw_usage.h" | |
7c673cae | 31 | #include "rgw_object_expirer_core.h" |
9f95a23c | 32 | #include "rgw_zone.h" |
f67539c2 | 33 | #include "rgw_sal_rados.h" |
7c673cae | 34 | |
9f95a23c TL |
35 | #include "services/svc_rados.h" |
36 | #include "services/svc_zone.h" | |
11fdf7f2 | 37 | #include "services/svc_sys_obj.h" |
9f95a23c | 38 | #include "services/svc_bi_rados.h" |
11fdf7f2 | 39 | |
7c673cae | 40 | #include "cls/lock/cls_lock_client.h" |
9f95a23c | 41 | #include "cls/timeindex/cls_timeindex_client.h" |
7c673cae FG |
42 | |
43 | #define dout_context g_ceph_context | |
44 | #define dout_subsys ceph_subsys_rgw | |
45 | ||
20effc67 TL |
46 | using namespace std; |
47 | ||
7c673cae FG |
// Name of the cls_lock lock taken on each hint shard object while it is
// being processed (see process_single_shard()).
static string objexp_lock_name("gc_process");
49 | ||
9f95a23c TL |
/* Build the name of the hint shard object for `shard_num`:
 * "obj_delete_at_hint." followed by the shard index printed as a
 * ten-digit, zero-padded unsigned number. */
static string objexp_hint_get_shardname(int shard_num)
{
  const unsigned idx = static_cast<unsigned>(shard_num);
  char name[64];
  snprintf(name, sizeof(name), "obj_delete_at_hint.%010u", idx);
  return string(name);
}
56 | ||
57 | static int objexp_key_shard(const rgw_obj_index_key& key, int num_shards) | |
58 | { | |
59 | string obj_key = key.name + key.instance; | |
60 | return RGWSI_BucketIndex_RADOS::bucket_shard_index(obj_key, num_shards); | |
61 | } | |
62 | ||
63 | static string objexp_hint_get_keyext(const string& tenant_name, | |
64 | const string& bucket_name, | |
65 | const string& bucket_id, | |
66 | const rgw_obj_key& obj_key) { | |
67 | return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id + | |
68 | ":" + obj_key.name + ":" + obj_key.instance; | |
69 | } | |
70 | ||
71 | static void objexp_get_shard(int shard_num, | |
72 | string *shard) | |
73 | { | |
74 | *shard = objexp_hint_get_shardname(shard_num); | |
75 | } | |
76 | ||
20effc67 | 77 | static int objexp_hint_parse(const DoutPrefixProvider *dpp, CephContext *cct, cls_timeindex_entry &ti_entry, |
9f95a23c TL |
78 | objexp_hint_entry *hint_entry) |
79 | { | |
80 | try { | |
81 | auto iter = ti_entry.value.cbegin(); | |
82 | decode(*hint_entry, iter); | |
83 | } catch (buffer::error& err) { | |
20effc67 | 84 | ldpp_dout(dpp, 0) << "ERROR: couldn't decode avail_pools" << dendl; |
9f95a23c TL |
85 | } |
86 | ||
87 | return 0; | |
88 | } | |
89 | ||
b3b6e05e TL |
90 | int RGWObjExpStore::objexp_hint_add(const DoutPrefixProvider *dpp, |
91 | const ceph::real_time& delete_at, | |
9f95a23c TL |
92 | const string& tenant_name, |
93 | const string& bucket_name, | |
94 | const string& bucket_id, | |
95 | const rgw_obj_index_key& obj_key) | |
96 | { | |
97 | const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name, | |
98 | bucket_id, obj_key); | |
99 | objexp_hint_entry he = { | |
100 | .tenant = tenant_name, | |
101 | .bucket_name = bucket_name, | |
102 | .bucket_id = bucket_id, | |
103 | .obj_key = obj_key, | |
104 | .exp_time = delete_at }; | |
105 | bufferlist hebl; | |
106 | encode(he, hebl); | |
107 | librados::ObjectWriteOperation op; | |
108 | cls_timeindex_add(op, utime_t(delete_at), keyext, hebl); | |
109 | ||
110 | string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key, cct->_conf->rgw_objexp_hints_num_shards)); | |
20effc67 | 111 | auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_params().log_pool, shard_name)); |
b3b6e05e | 112 | int r = obj.open(dpp); |
9f95a23c | 113 | if (r < 0) { |
b3b6e05e | 114 | ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl; |
9f95a23c TL |
115 | return r; |
116 | } | |
b3b6e05e | 117 | return obj.operate(dpp, &op, null_yield); |
9f95a23c TL |
118 | } |
119 | ||
b3b6e05e TL |
120 | int RGWObjExpStore::objexp_hint_list(const DoutPrefixProvider *dpp, |
121 | const string& oid, | |
9f95a23c TL |
122 | const ceph::real_time& start_time, |
123 | const ceph::real_time& end_time, | |
124 | const int max_entries, | |
125 | const string& marker, | |
126 | list<cls_timeindex_entry>& entries, /* out */ | |
127 | string *out_marker, /* out */ | |
128 | bool *truncated) /* out */ | |
129 | { | |
130 | librados::ObjectReadOperation op; | |
131 | cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries, | |
132 | out_marker, truncated); | |
133 | ||
20effc67 | 134 | auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_params().log_pool, oid)); |
b3b6e05e | 135 | int r = obj.open(dpp); |
9f95a23c | 136 | if (r < 0) { |
b3b6e05e | 137 | ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl; |
9f95a23c TL |
138 | return r; |
139 | } | |
140 | bufferlist obl; | |
b3b6e05e | 141 | int ret = obj.operate(dpp, &op, &obl, null_yield); |
9f95a23c TL |
142 | |
143 | if ((ret < 0 ) && (ret != -ENOENT)) { | |
144 | return ret; | |
145 | } | |
146 | ||
147 | if ((ret == -ENOENT) && truncated) { | |
148 | *truncated = false; | |
149 | } | |
150 | ||
151 | return 0; | |
152 | } | |
153 | ||
b3b6e05e TL |
154 | static int cls_timeindex_trim_repeat(const DoutPrefixProvider *dpp, |
155 | rgw_rados_ref ref, | |
9f95a23c TL |
156 | const string& oid, |
157 | const utime_t& from_time, | |
158 | const utime_t& to_time, | |
159 | const string& from_marker, | |
160 | const string& to_marker) | |
161 | { | |
162 | bool done = false; | |
163 | do { | |
164 | librados::ObjectWriteOperation op; | |
165 | cls_timeindex_trim(op, from_time, to_time, from_marker, to_marker); | |
b3b6e05e | 166 | int r = rgw_rados_operate(dpp, ref.pool.ioctx(), oid, &op, null_yield); |
9f95a23c TL |
167 | if (r == -ENODATA) |
168 | done = true; | |
169 | else if (r < 0) | |
170 | return r; | |
171 | } while (!done); | |
172 | ||
173 | return 0; | |
174 | } | |
175 | ||
b3b6e05e TL |
176 | int RGWObjExpStore::objexp_hint_trim(const DoutPrefixProvider *dpp, |
177 | const string& oid, | |
9f95a23c TL |
178 | const ceph::real_time& start_time, |
179 | const ceph::real_time& end_time, | |
180 | const string& from_marker, | |
181 | const string& to_marker) | |
182 | { | |
20effc67 | 183 | auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_params().log_pool, oid)); |
b3b6e05e | 184 | int r = obj.open(dpp); |
9f95a23c | 185 | if (r < 0) { |
b3b6e05e | 186 | ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl; |
9f95a23c TL |
187 | return r; |
188 | } | |
189 | auto& ref = obj.get_ref(); | |
b3b6e05e | 190 | int ret = cls_timeindex_trim_repeat(dpp, ref, oid, utime_t(start_time), utime_t(end_time), |
9f95a23c TL |
191 | from_marker, to_marker); |
192 | if ((ret < 0 ) && (ret != -ENOENT)) { | |
193 | return ret; | |
194 | } | |
195 | ||
196 | return 0; | |
197 | } | |
198 | ||
b3b6e05e | 199 | int RGWObjectExpirer::garbage_single_object(const DoutPrefixProvider *dpp, objexp_hint_entry& hint) |
7c673cae FG |
200 | { |
201 | RGWBucketInfo bucket_info; | |
20effc67 | 202 | std::unique_ptr<rgw::sal::Bucket> bucket; |
7c673cae | 203 | |
20effc67 | 204 | int ret = store->get_bucket(dpp, nullptr, rgw_bucket(hint.tenant, hint.bucket_name, hint.bucket_id), &bucket, null_yield); |
7c673cae | 205 | if (-ENOENT == ret) { |
b3b6e05e | 206 | ldpp_dout(dpp, 15) << "NOTICE: cannot find bucket = " \ |
7c673cae FG |
207 | << hint.bucket_name << ". The object must be already removed" << dendl; |
208 | return -ERR_PRECONDITION_FAILED; | |
209 | } else if (ret < 0) { | |
b3b6e05e | 210 | ldpp_dout(dpp, 1) << "ERROR: could not init bucket = " \ |
7c673cae FG |
211 | << hint.bucket_name << "due to ret = " << ret << dendl; |
212 | return ret; | |
213 | } | |
214 | ||
215 | RGWObjectCtx rctx(store); | |
216 | ||
217 | rgw_obj_key key = hint.obj_key; | |
218 | if (key.instance.empty()) { | |
219 | key.instance = "null"; | |
220 | } | |
221 | ||
20effc67 TL |
222 | std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(key); |
223 | obj->set_atomic(&rctx); | |
224 | ret = obj->delete_object(dpp, &rctx, null_yield); | |
7c673cae FG |
225 | |
226 | return ret; | |
227 | } | |
228 | ||
b3b6e05e TL |
229 | void RGWObjectExpirer::garbage_chunk(const DoutPrefixProvider *dpp, |
230 | list<cls_timeindex_entry>& entries, /* in */ | |
7c673cae FG |
231 | bool& need_trim) /* out */ |
232 | { | |
233 | need_trim = false; | |
234 | ||
235 | for (list<cls_timeindex_entry>::iterator iter = entries.begin(); | |
236 | iter != entries.end(); | |
237 | ++iter) | |
238 | { | |
239 | objexp_hint_entry hint; | |
b3b6e05e | 240 | ldpp_dout(dpp, 15) << "got removal hint for: " << iter->key_ts.sec() \ |
7c673cae FG |
241 | << " - " << iter->key_ext << dendl; |
242 | ||
20effc67 | 243 | int ret = objexp_hint_parse(dpp, store->ctx(), *iter, &hint); |
7c673cae | 244 | if (ret < 0) { |
b3b6e05e | 245 | ldpp_dout(dpp, 1) << "cannot parse removal hint for " << hint.obj_key << dendl; |
7c673cae FG |
246 | continue; |
247 | } | |
248 | ||
249 | /* PRECOND_FAILED simply means that our hint is not valid. | |
250 | * We can silently ignore that and move forward. */ | |
b3b6e05e | 251 | ret = garbage_single_object(dpp, hint); |
7c673cae | 252 | if (ret == -ERR_PRECONDITION_FAILED) { |
b3b6e05e | 253 | ldpp_dout(dpp, 15) << "not actual hint for object: " << hint.obj_key << dendl; |
7c673cae | 254 | } else if (ret < 0) { |
b3b6e05e | 255 | ldpp_dout(dpp, 1) << "cannot remove expired object: " << hint.obj_key << dendl; |
7c673cae FG |
256 | } |
257 | ||
258 | need_trim = true; | |
259 | } | |
260 | ||
261 | return; | |
262 | } | |
263 | ||
b3b6e05e TL |
264 | void RGWObjectExpirer::trim_chunk(const DoutPrefixProvider *dpp, |
265 | const string& shard, | |
7c673cae FG |
266 | const utime_t& from, |
267 | const utime_t& to, | |
268 | const string& from_marker, | |
269 | const string& to_marker) | |
270 | { | |
b3b6e05e | 271 | ldpp_dout(dpp, 20) << "trying to trim removal hints to=" << to |
7c673cae FG |
272 | << ", to_marker=" << to_marker << dendl; |
273 | ||
274 | real_time rt_from = from.to_real_time(); | |
275 | real_time rt_to = to.to_real_time(); | |
276 | ||
b3b6e05e | 277 | int ret = exp_store.objexp_hint_trim(dpp, shard, rt_from, rt_to, |
9f95a23c | 278 | from_marker, to_marker); |
7c673cae | 279 | if (ret < 0) { |
b3b6e05e | 280 | ldpp_dout(dpp, 0) << "ERROR during trim: " << ret << dendl; |
7c673cae FG |
281 | } |
282 | ||
283 | return; | |
284 | } | |
285 | ||
b3b6e05e TL |
286 | bool RGWObjectExpirer::process_single_shard(const DoutPrefixProvider *dpp, |
287 | const string& shard, | |
7c673cae FG |
288 | const utime_t& last_run, |
289 | const utime_t& round_start) | |
290 | { | |
291 | string marker; | |
292 | string out_marker; | |
293 | bool truncated = false; | |
294 | bool done = true; | |
295 | ||
296 | CephContext *cct = store->ctx(); | |
297 | int num_entries = cct->_conf->rgw_objexp_chunk_size; | |
298 | ||
299 | int max_secs = cct->_conf->rgw_objexp_gc_interval; | |
300 | utime_t end = ceph_clock_now(); | |
301 | end += max_secs; | |
302 | ||
303 | rados::cls::lock::Lock l(objexp_lock_name); | |
304 | ||
305 | utime_t time(max_secs, 0); | |
306 | l.set_duration(time); | |
307 | ||
20effc67 | 308 | int ret = l.lock_exclusive(&static_cast<rgw::sal::RadosStore*>(store)->getRados()->objexp_pool_ctx, shard); |
7c673cae | 309 | if (ret == -EBUSY) { /* already locked by another processor */ |
b3b6e05e | 310 | ldpp_dout(dpp, 5) << __func__ << "(): failed to acquire lock on " << shard << dendl; |
7c673cae FG |
311 | return false; |
312 | } | |
313 | ||
314 | do { | |
315 | real_time rt_last = last_run.to_real_time(); | |
316 | real_time rt_start = round_start.to_real_time(); | |
317 | ||
318 | list<cls_timeindex_entry> entries; | |
b3b6e05e | 319 | ret = exp_store.objexp_hint_list(dpp, shard, rt_last, rt_start, |
9f95a23c TL |
320 | num_entries, marker, entries, |
321 | &out_marker, &truncated); | |
7c673cae | 322 | if (ret < 0) { |
b3b6e05e | 323 | ldpp_dout(dpp, 10) << "cannot get removal hints from shard: " << shard |
7c673cae FG |
324 | << dendl; |
325 | continue; | |
326 | } | |
327 | ||
328 | bool need_trim; | |
b3b6e05e | 329 | garbage_chunk(dpp, entries, need_trim); |
7c673cae FG |
330 | |
331 | if (need_trim) { | |
b3b6e05e | 332 | trim_chunk(dpp, shard, last_run, round_start, marker, out_marker); |
7c673cae FG |
333 | } |
334 | ||
335 | utime_t now = ceph_clock_now(); | |
336 | if (now >= end) { | |
337 | done = false; | |
338 | break; | |
339 | } | |
340 | ||
341 | marker = out_marker; | |
342 | } while (truncated); | |
343 | ||
20effc67 | 344 | l.unlock(&static_cast<rgw::sal::RadosStore*>(store)->getRados()->objexp_pool_ctx, shard); |
7c673cae FG |
345 | return done; |
346 | } | |
347 | ||
348 | /* Returns true if all shards have been processed successfully. */ | |
b3b6e05e TL |
349 | bool RGWObjectExpirer::inspect_all_shards(const DoutPrefixProvider *dpp, |
350 | const utime_t& last_run, | |
7c673cae FG |
351 | const utime_t& round_start) |
352 | { | |
353 | CephContext * const cct = store->ctx(); | |
354 | int num_shards = cct->_conf->rgw_objexp_hints_num_shards; | |
355 | bool all_done = true; | |
356 | ||
357 | for (int i = 0; i < num_shards; i++) { | |
358 | string shard; | |
9f95a23c | 359 | objexp_get_shard(i, &shard); |
7c673cae | 360 | |
b3b6e05e | 361 | ldpp_dout(dpp, 20) << "processing shard = " << shard << dendl; |
7c673cae | 362 | |
b3b6e05e | 363 | if (! process_single_shard(dpp, shard, last_run, round_start)) { |
7c673cae FG |
364 | all_done = false; |
365 | } | |
366 | } | |
367 | ||
368 | return all_done; | |
369 | } | |
370 | ||
371 | bool RGWObjectExpirer::going_down() | |
372 | { | |
373 | return down_flag; | |
374 | } | |
375 | ||
376 | void RGWObjectExpirer::start_processor() | |
377 | { | |
378 | worker = new OEWorker(store->ctx(), this); | |
379 | worker->create("rgw_obj_expirer"); | |
380 | } | |
381 | ||
382 | void RGWObjectExpirer::stop_processor() | |
383 | { | |
384 | down_flag = true; | |
385 | if (worker) { | |
386 | worker->stop(); | |
387 | worker->join(); | |
388 | } | |
389 | delete worker; | |
390 | worker = NULL; | |
391 | } | |
392 | ||
393 | void *RGWObjectExpirer::OEWorker::entry() { | |
394 | utime_t last_run; | |
395 | do { | |
396 | utime_t start = ceph_clock_now(); | |
20effc67 | 397 | ldpp_dout(this, 2) << "object expiration: start" << dendl; |
b3b6e05e | 398 | if (oe->inspect_all_shards(this, last_run, start)) { |
7c673cae FG |
399 | /* All shards have been processed properly. Next time we can start |
400 | * from this moment. */ | |
401 | last_run = start; | |
402 | } | |
20effc67 | 403 | ldpp_dout(this, 2) << "object expiration: stop" << dendl; |
7c673cae FG |
404 | |
405 | ||
406 | if (oe->going_down()) | |
407 | break; | |
408 | ||
409 | utime_t end = ceph_clock_now(); | |
410 | end -= start; | |
411 | int secs = cct->_conf->rgw_objexp_gc_interval; | |
412 | ||
413 | if (secs <= end.sec()) | |
414 | continue; // next round | |
415 | ||
416 | secs -= end.sec(); | |
417 | ||
9f95a23c TL |
418 | std::unique_lock l{lock}; |
419 | cond.wait_for(l, std::chrono::seconds(secs)); | |
7c673cae FG |
420 | } while (!oe->going_down()); |
421 | ||
422 | return NULL; | |
423 | } | |
424 | ||
425 | void RGWObjectExpirer::OEWorker::stop() | |
426 | { | |
9f95a23c TL |
427 | std::lock_guard l{lock}; |
428 | cond.notify_all(); | |
7c673cae FG |
429 | } |
430 | ||
b3b6e05e TL |
431 | CephContext *RGWObjectExpirer::OEWorker::get_cct() const |
432 | { | |
433 | return cct; | |
434 | } | |
435 | ||
436 | unsigned RGWObjectExpirer::OEWorker::get_subsys() const | |
437 | { | |
438 | return dout_subsys; | |
439 | } | |
440 | ||
441 | std::ostream& RGWObjectExpirer::OEWorker::gen_prefix(std::ostream& out) const | |
442 | { | |
443 | return out << "rgw object expirer Worker thread: "; | |
444 | } |