]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_object_expirer_core.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_object_expirer_core.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <errno.h>
5 #include <iostream>
6 #include <sstream>
7 #include <string>
8
9
10 #include "auth/Crypto.h"
11
12 #include "common/armor.h"
13 #include "common/ceph_json.h"
14 #include "common/config.h"
15 #include "common/ceph_argparse.h"
16 #include "common/Formatter.h"
17 #include "common/errno.h"
18
19 #include "global/global_init.h"
20
21 #include "include/utime.h"
22 #include "include/str_list.h"
23
24 #include "rgw_user.h"
25 #include "rgw_bucket.h"
26 #include "rgw_rados.h"
27 #include "rgw_acl.h"
28 #include "rgw_acl_s3.h"
29 #include "rgw_log.h"
30 #include "rgw_formats.h"
31 #include "rgw_usage.h"
32 #include "rgw_object_expirer_core.h"
33 #include "rgw_zone.h"
34
35 #include "services/svc_rados.h"
36 #include "services/svc_zone.h"
37 #include "services/svc_sys_obj.h"
38 #include "services/svc_bi_rados.h"
39
40 #include "cls/lock/cls_lock_client.h"
41 #include "cls/timeindex/cls_timeindex_client.h"
42
43 #define dout_context g_ceph_context
44 #define dout_subsys ceph_subsys_rgw
45
46 static string objexp_lock_name = "gc_process";
47
/*
 * Build the object name used for the given hint shard:
 * "obj_delete_at_hint." followed by the shard number, zero-padded to at
 * least ten digits.
 */
static std::string objexp_hint_get_shardname(int shard_num)
{
  char name[64];
  const unsigned shard = static_cast<unsigned>(shard_num);
  snprintf(name, sizeof(name), "obj_delete_at_hint.%010u", shard);
  return std::string(name);
}
54
55 static int objexp_key_shard(const rgw_obj_index_key& key, int num_shards)
56 {
57 string obj_key = key.name + key.instance;
58 return RGWSI_BucketIndex_RADOS::bucket_shard_index(obj_key, num_shards);
59 }
60
61 static string objexp_hint_get_keyext(const string& tenant_name,
62 const string& bucket_name,
63 const string& bucket_id,
64 const rgw_obj_key& obj_key) {
65 return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id +
66 ":" + obj_key.name + ":" + obj_key.instance;
67 }
68
69 static void objexp_get_shard(int shard_num,
70 string *shard)
71 {
72 *shard = objexp_hint_get_shardname(shard_num);
73 }
74
75 static int objexp_hint_parse(CephContext *cct, cls_timeindex_entry &ti_entry,
76 objexp_hint_entry *hint_entry)
77 {
78 try {
79 auto iter = ti_entry.value.cbegin();
80 decode(*hint_entry, iter);
81 } catch (buffer::error& err) {
82 ldout(cct, 0) << "ERROR: couldn't decode avail_pools" << dendl;
83 }
84
85 return 0;
86 }
87
88 int RGWObjExpStore::objexp_hint_add(const ceph::real_time& delete_at,
89 const string& tenant_name,
90 const string& bucket_name,
91 const string& bucket_id,
92 const rgw_obj_index_key& obj_key)
93 {
94 const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name,
95 bucket_id, obj_key);
96 objexp_hint_entry he = {
97 .tenant = tenant_name,
98 .bucket_name = bucket_name,
99 .bucket_id = bucket_id,
100 .obj_key = obj_key,
101 .exp_time = delete_at };
102 bufferlist hebl;
103 encode(he, hebl);
104 librados::ObjectWriteOperation op;
105 cls_timeindex_add(op, utime_t(delete_at), keyext, hebl);
106
107 string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key, cct->_conf->rgw_objexp_hints_num_shards));
108 auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, shard_name));
109 int r = obj.open();
110 if (r < 0) {
111 ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
112 return r;
113 }
114 return obj.operate(&op, null_yield);
115 }
116
117 int RGWObjExpStore::objexp_hint_list(const string& oid,
118 const ceph::real_time& start_time,
119 const ceph::real_time& end_time,
120 const int max_entries,
121 const string& marker,
122 list<cls_timeindex_entry>& entries, /* out */
123 string *out_marker, /* out */
124 bool *truncated) /* out */
125 {
126 librados::ObjectReadOperation op;
127 cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries,
128 out_marker, truncated);
129
130 auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, oid));
131 int r = obj.open();
132 if (r < 0) {
133 ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
134 return r;
135 }
136 bufferlist obl;
137 int ret = obj.operate(&op, &obl, null_yield);
138
139 if ((ret < 0 ) && (ret != -ENOENT)) {
140 return ret;
141 }
142
143 if ((ret == -ENOENT) && truncated) {
144 *truncated = false;
145 }
146
147 return 0;
148 }
149
150 static int cls_timeindex_trim_repeat(rgw_rados_ref ref,
151 const string& oid,
152 const utime_t& from_time,
153 const utime_t& to_time,
154 const string& from_marker,
155 const string& to_marker)
156 {
157 bool done = false;
158 do {
159 librados::ObjectWriteOperation op;
160 cls_timeindex_trim(op, from_time, to_time, from_marker, to_marker);
161 int r = rgw_rados_operate(ref.pool.ioctx(), oid, &op, null_yield);
162 if (r == -ENODATA)
163 done = true;
164 else if (r < 0)
165 return r;
166 } while (!done);
167
168 return 0;
169 }
170
171 int RGWObjExpStore::objexp_hint_trim(const string& oid,
172 const ceph::real_time& start_time,
173 const ceph::real_time& end_time,
174 const string& from_marker,
175 const string& to_marker)
176 {
177 auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_zone_params().log_pool, oid));
178 int r = obj.open();
179 if (r < 0) {
180 ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
181 return r;
182 }
183 auto& ref = obj.get_ref();
184 int ret = cls_timeindex_trim_repeat(ref, oid, utime_t(start_time), utime_t(end_time),
185 from_marker, to_marker);
186 if ((ret < 0 ) && (ret != -ENOENT)) {
187 return ret;
188 }
189
190 return 0;
191 }
192
193 int RGWObjectExpirer::init_bucket_info(const string& tenant_name,
194 const string& bucket_name,
195 const string& bucket_id,
196 RGWBucketInfo& bucket_info)
197 {
198 /*
199 * XXX Here's where it gets tricky. We went to all the trouble of
200 * punching the tenant through the objexp_hint_entry, but now we
201 * find that our instances do not actually have tenants. They are
202 * unique thanks to IDs. So the tenant string is not needed...
203
204 * XXX reloaded: it turns out tenants were needed after all since bucket ids
205 * are ephemeral, good call encoding tenant info!
206 */
207
208 return store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name,
209 bucket_info, nullptr, null_yield, nullptr);
210
211 }
212
213 int RGWObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
214 {
215 RGWBucketInfo bucket_info;
216
217 int ret = init_bucket_info(hint.tenant, hint.bucket_name,
218 hint.bucket_id, bucket_info);
219 if (-ENOENT == ret) {
220 ldout(store->ctx(), 15) << "NOTICE: cannot find bucket = " \
221 << hint.bucket_name << ". The object must be already removed" << dendl;
222 return -ERR_PRECONDITION_FAILED;
223 } else if (ret < 0) {
224 ldout(store->ctx(), 1) << "ERROR: could not init bucket = " \
225 << hint.bucket_name << "due to ret = " << ret << dendl;
226 return ret;
227 }
228
229 RGWObjectCtx rctx(store);
230
231 rgw_obj_key key = hint.obj_key;
232 if (key.instance.empty()) {
233 key.instance = "null";
234 }
235
236 rgw_obj obj(bucket_info.bucket, key);
237 store->getRados()->set_atomic(&rctx, obj);
238 ret = store->getRados()->delete_obj(rctx, bucket_info, obj,
239 bucket_info.versioning_status(), 0, hint.exp_time);
240
241 return ret;
242 }
243
244 void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries, /* in */
245 bool& need_trim) /* out */
246 {
247 need_trim = false;
248
249 for (list<cls_timeindex_entry>::iterator iter = entries.begin();
250 iter != entries.end();
251 ++iter)
252 {
253 objexp_hint_entry hint;
254 ldout(store->ctx(), 15) << "got removal hint for: " << iter->key_ts.sec() \
255 << " - " << iter->key_ext << dendl;
256
257 int ret = objexp_hint_parse(store->getRados()->ctx(), *iter, &hint);
258 if (ret < 0) {
259 ldout(store->ctx(), 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
260 continue;
261 }
262
263 /* PRECOND_FAILED simply means that our hint is not valid.
264 * We can silently ignore that and move forward. */
265 ret = garbage_single_object(hint);
266 if (ret == -ERR_PRECONDITION_FAILED) {
267 ldout(store->ctx(), 15) << "not actual hint for object: " << hint.obj_key << dendl;
268 } else if (ret < 0) {
269 ldout(store->ctx(), 1) << "cannot remove expired object: " << hint.obj_key << dendl;
270 }
271
272 need_trim = true;
273 }
274
275 return;
276 }
277
278 void RGWObjectExpirer::trim_chunk(const string& shard,
279 const utime_t& from,
280 const utime_t& to,
281 const string& from_marker,
282 const string& to_marker)
283 {
284 ldout(store->ctx(), 20) << "trying to trim removal hints to=" << to
285 << ", to_marker=" << to_marker << dendl;
286
287 real_time rt_from = from.to_real_time();
288 real_time rt_to = to.to_real_time();
289
290 int ret = exp_store.objexp_hint_trim(shard, rt_from, rt_to,
291 from_marker, to_marker);
292 if (ret < 0) {
293 ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl;
294 }
295
296 return;
297 }
298
299 bool RGWObjectExpirer::process_single_shard(const string& shard,
300 const utime_t& last_run,
301 const utime_t& round_start)
302 {
303 string marker;
304 string out_marker;
305 bool truncated = false;
306 bool done = true;
307
308 CephContext *cct = store->ctx();
309 int num_entries = cct->_conf->rgw_objexp_chunk_size;
310
311 int max_secs = cct->_conf->rgw_objexp_gc_interval;
312 utime_t end = ceph_clock_now();
313 end += max_secs;
314
315 rados::cls::lock::Lock l(objexp_lock_name);
316
317 utime_t time(max_secs, 0);
318 l.set_duration(time);
319
320 int ret = l.lock_exclusive(&store->getRados()->objexp_pool_ctx, shard);
321 if (ret == -EBUSY) { /* already locked by another processor */
322 dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
323 return false;
324 }
325
326 do {
327 real_time rt_last = last_run.to_real_time();
328 real_time rt_start = round_start.to_real_time();
329
330 list<cls_timeindex_entry> entries;
331 ret = exp_store.objexp_hint_list(shard, rt_last, rt_start,
332 num_entries, marker, entries,
333 &out_marker, &truncated);
334 if (ret < 0) {
335 ldout(cct, 10) << "cannot get removal hints from shard: " << shard
336 << dendl;
337 continue;
338 }
339
340 bool need_trim;
341 garbage_chunk(entries, need_trim);
342
343 if (need_trim) {
344 trim_chunk(shard, last_run, round_start, marker, out_marker);
345 }
346
347 utime_t now = ceph_clock_now();
348 if (now >= end) {
349 done = false;
350 break;
351 }
352
353 marker = out_marker;
354 } while (truncated);
355
356 l.unlock(&store->getRados()->objexp_pool_ctx, shard);
357 return done;
358 }
359
360 /* Returns true if all shards have been processed successfully. */
361 bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
362 const utime_t& round_start)
363 {
364 CephContext * const cct = store->ctx();
365 int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
366 bool all_done = true;
367
368 for (int i = 0; i < num_shards; i++) {
369 string shard;
370 objexp_get_shard(i, &shard);
371
372 ldout(store->ctx(), 20) << "processing shard = " << shard << dendl;
373
374 if (! process_single_shard(shard, last_run, round_start)) {
375 all_done = false;
376 }
377 }
378
379 return all_done;
380 }
381
382 bool RGWObjectExpirer::going_down()
383 {
384 return down_flag;
385 }
386
387 void RGWObjectExpirer::start_processor()
388 {
389 worker = new OEWorker(store->ctx(), this);
390 worker->create("rgw_obj_expirer");
391 }
392
393 void RGWObjectExpirer::stop_processor()
394 {
395 down_flag = true;
396 if (worker) {
397 worker->stop();
398 worker->join();
399 }
400 delete worker;
401 worker = NULL;
402 }
403
404 void *RGWObjectExpirer::OEWorker::entry() {
405 utime_t last_run;
406 do {
407 utime_t start = ceph_clock_now();
408 ldout(cct, 2) << "object expiration: start" << dendl;
409 if (oe->inspect_all_shards(last_run, start)) {
410 /* All shards have been processed properly. Next time we can start
411 * from this moment. */
412 last_run = start;
413 }
414 ldout(cct, 2) << "object expiration: stop" << dendl;
415
416
417 if (oe->going_down())
418 break;
419
420 utime_t end = ceph_clock_now();
421 end -= start;
422 int secs = cct->_conf->rgw_objexp_gc_interval;
423
424 if (secs <= end.sec())
425 continue; // next round
426
427 secs -= end.sec();
428
429 std::unique_lock l{lock};
430 cond.wait_for(l, std::chrono::seconds(secs));
431 } while (!oe->going_down());
432
433 return NULL;
434 }
435
436 void RGWObjectExpirer::OEWorker::stop()
437 {
438 std::lock_guard l{lock};
439 cond.notify_all();
440 }
441