]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_object_expirer_core.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_object_expirer_core.cc
CommitLineData
7c673cae 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#include <errno.h>
5#include <iostream>
6#include <sstream>
7#include <string>
8
7c673cae
FG
9
10#include "auth/Crypto.h"
11
12#include "common/armor.h"
13#include "common/ceph_json.h"
14#include "common/config.h"
15#include "common/ceph_argparse.h"
16#include "common/Formatter.h"
17#include "common/errno.h"
18
19#include "global/global_init.h"
20
21#include "include/utime.h"
22#include "include/str_list.h"
23
24#include "rgw_user.h"
25#include "rgw_bucket.h"
7c673cae
FG
26#include "rgw_acl.h"
27#include "rgw_acl_s3.h"
28#include "rgw_log.h"
29#include "rgw_formats.h"
30#include "rgw_usage.h"
7c673cae 31#include "rgw_object_expirer_core.h"
9f95a23c 32#include "rgw_zone.h"
f67539c2 33#include "rgw_sal_rados.h"
7c673cae 34
9f95a23c
TL
35#include "services/svc_rados.h"
36#include "services/svc_zone.h"
11fdf7f2 37#include "services/svc_sys_obj.h"
9f95a23c 38#include "services/svc_bi_rados.h"
11fdf7f2 39
7c673cae 40#include "cls/lock/cls_lock_client.h"
9f95a23c 41#include "cls/timeindex/cls_timeindex_client.h"
7c673cae
FG
42
43#define dout_context g_ceph_context
44#define dout_subsys ceph_subsys_rgw
45
20effc67
TL
46using namespace std;
47
7c673cae
FG
48static string objexp_lock_name = "gc_process";
49
9f95a23c
TL
/* Build the name of a hint shard object: the fixed prefix
 * "obj_delete_at_hint." followed by the shard number (read as unsigned)
 * left-padded with zeros to at least ten decimal digits. */
static std::string objexp_hint_get_shardname(int shard_num)
{
  const std::string::size_type min_digits = 10;
  const std::string digits = std::to_string(static_cast<unsigned>(shard_num));

  std::string name("obj_delete_at_hint.");
  if (digits.size() < min_digits) {
    name.append(min_digits - digits.size(), '0');
  }
  name += digits;
  return name;
}
56
57static int objexp_key_shard(const rgw_obj_index_key& key, int num_shards)
58{
59 string obj_key = key.name + key.instance;
60 return RGWSI_BucketIndex_RADOS::bucket_shard_index(obj_key, num_shards);
61}
62
63static string objexp_hint_get_keyext(const string& tenant_name,
64 const string& bucket_name,
65 const string& bucket_id,
66 const rgw_obj_key& obj_key) {
67 return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id +
68 ":" + obj_key.name + ":" + obj_key.instance;
69}
70
71static void objexp_get_shard(int shard_num,
72 string *shard)
73{
74 *shard = objexp_hint_get_shardname(shard_num);
75}
76
20effc67 77static int objexp_hint_parse(const DoutPrefixProvider *dpp, CephContext *cct, cls_timeindex_entry &ti_entry,
9f95a23c
TL
78 objexp_hint_entry *hint_entry)
79{
80 try {
81 auto iter = ti_entry.value.cbegin();
82 decode(*hint_entry, iter);
83 } catch (buffer::error& err) {
20effc67 84 ldpp_dout(dpp, 0) << "ERROR: couldn't decode avail_pools" << dendl;
9f95a23c
TL
85 }
86
87 return 0;
88}
89
b3b6e05e
TL
90int RGWObjExpStore::objexp_hint_add(const DoutPrefixProvider *dpp,
91 const ceph::real_time& delete_at,
9f95a23c
TL
92 const string& tenant_name,
93 const string& bucket_name,
94 const string& bucket_id,
95 const rgw_obj_index_key& obj_key)
96{
97 const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name,
98 bucket_id, obj_key);
99 objexp_hint_entry he = {
100 .tenant = tenant_name,
101 .bucket_name = bucket_name,
102 .bucket_id = bucket_id,
103 .obj_key = obj_key,
104 .exp_time = delete_at };
105 bufferlist hebl;
106 encode(he, hebl);
107 librados::ObjectWriteOperation op;
108 cls_timeindex_add(op, utime_t(delete_at), keyext, hebl);
109
110 string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key, cct->_conf->rgw_objexp_hints_num_shards));
20effc67 111 auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_params().log_pool, shard_name));
b3b6e05e 112 int r = obj.open(dpp);
9f95a23c 113 if (r < 0) {
b3b6e05e 114 ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
9f95a23c
TL
115 return r;
116 }
b3b6e05e 117 return obj.operate(dpp, &op, null_yield);
9f95a23c
TL
118}
119
b3b6e05e
TL
120int RGWObjExpStore::objexp_hint_list(const DoutPrefixProvider *dpp,
121 const string& oid,
9f95a23c
TL
122 const ceph::real_time& start_time,
123 const ceph::real_time& end_time,
124 const int max_entries,
125 const string& marker,
126 list<cls_timeindex_entry>& entries, /* out */
127 string *out_marker, /* out */
128 bool *truncated) /* out */
129{
130 librados::ObjectReadOperation op;
131 cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries,
132 out_marker, truncated);
133
20effc67 134 auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_params().log_pool, oid));
b3b6e05e 135 int r = obj.open(dpp);
9f95a23c 136 if (r < 0) {
b3b6e05e 137 ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
9f95a23c
TL
138 return r;
139 }
140 bufferlist obl;
b3b6e05e 141 int ret = obj.operate(dpp, &op, &obl, null_yield);
9f95a23c
TL
142
143 if ((ret < 0 ) && (ret != -ENOENT)) {
144 return ret;
145 }
146
147 if ((ret == -ENOENT) && truncated) {
148 *truncated = false;
149 }
150
151 return 0;
152}
153
b3b6e05e
TL
154static int cls_timeindex_trim_repeat(const DoutPrefixProvider *dpp,
155 rgw_rados_ref ref,
9f95a23c
TL
156 const string& oid,
157 const utime_t& from_time,
158 const utime_t& to_time,
159 const string& from_marker,
160 const string& to_marker)
161{
162 bool done = false;
163 do {
164 librados::ObjectWriteOperation op;
165 cls_timeindex_trim(op, from_time, to_time, from_marker, to_marker);
b3b6e05e 166 int r = rgw_rados_operate(dpp, ref.pool.ioctx(), oid, &op, null_yield);
9f95a23c
TL
167 if (r == -ENODATA)
168 done = true;
169 else if (r < 0)
170 return r;
171 } while (!done);
172
173 return 0;
174}
175
b3b6e05e
TL
176int RGWObjExpStore::objexp_hint_trim(const DoutPrefixProvider *dpp,
177 const string& oid,
9f95a23c
TL
178 const ceph::real_time& start_time,
179 const ceph::real_time& end_time,
180 const string& from_marker,
181 const string& to_marker)
182{
20effc67 183 auto obj = rados_svc->obj(rgw_raw_obj(zone_svc->get_params().log_pool, oid));
b3b6e05e 184 int r = obj.open(dpp);
9f95a23c 185 if (r < 0) {
b3b6e05e 186 ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
9f95a23c
TL
187 return r;
188 }
189 auto& ref = obj.get_ref();
b3b6e05e 190 int ret = cls_timeindex_trim_repeat(dpp, ref, oid, utime_t(start_time), utime_t(end_time),
9f95a23c
TL
191 from_marker, to_marker);
192 if ((ret < 0 ) && (ret != -ENOENT)) {
193 return ret;
194 }
195
196 return 0;
197}
198
b3b6e05e 199int RGWObjectExpirer::garbage_single_object(const DoutPrefixProvider *dpp, objexp_hint_entry& hint)
7c673cae
FG
200{
201 RGWBucketInfo bucket_info;
20effc67 202 std::unique_ptr<rgw::sal::Bucket> bucket;
7c673cae 203
20effc67 204 int ret = store->get_bucket(dpp, nullptr, rgw_bucket(hint.tenant, hint.bucket_name, hint.bucket_id), &bucket, null_yield);
7c673cae 205 if (-ENOENT == ret) {
b3b6e05e 206 ldpp_dout(dpp, 15) << "NOTICE: cannot find bucket = " \
7c673cae
FG
207 << hint.bucket_name << ". The object must be already removed" << dendl;
208 return -ERR_PRECONDITION_FAILED;
209 } else if (ret < 0) {
b3b6e05e 210 ldpp_dout(dpp, 1) << "ERROR: could not init bucket = " \
7c673cae
FG
211 << hint.bucket_name << "due to ret = " << ret << dendl;
212 return ret;
213 }
214
215 RGWObjectCtx rctx(store);
216
217 rgw_obj_key key = hint.obj_key;
218 if (key.instance.empty()) {
219 key.instance = "null";
220 }
221
20effc67
TL
222 std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(key);
223 obj->set_atomic(&rctx);
224 ret = obj->delete_object(dpp, &rctx, null_yield);
7c673cae
FG
225
226 return ret;
227}
228
b3b6e05e
TL
229void RGWObjectExpirer::garbage_chunk(const DoutPrefixProvider *dpp,
230 list<cls_timeindex_entry>& entries, /* in */
7c673cae
FG
231 bool& need_trim) /* out */
232{
233 need_trim = false;
234
235 for (list<cls_timeindex_entry>::iterator iter = entries.begin();
236 iter != entries.end();
237 ++iter)
238 {
239 objexp_hint_entry hint;
b3b6e05e 240 ldpp_dout(dpp, 15) << "got removal hint for: " << iter->key_ts.sec() \
7c673cae
FG
241 << " - " << iter->key_ext << dendl;
242
20effc67 243 int ret = objexp_hint_parse(dpp, store->ctx(), *iter, &hint);
7c673cae 244 if (ret < 0) {
b3b6e05e 245 ldpp_dout(dpp, 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
7c673cae
FG
246 continue;
247 }
248
249 /* PRECOND_FAILED simply means that our hint is not valid.
250 * We can silently ignore that and move forward. */
b3b6e05e 251 ret = garbage_single_object(dpp, hint);
7c673cae 252 if (ret == -ERR_PRECONDITION_FAILED) {
b3b6e05e 253 ldpp_dout(dpp, 15) << "not actual hint for object: " << hint.obj_key << dendl;
7c673cae 254 } else if (ret < 0) {
b3b6e05e 255 ldpp_dout(dpp, 1) << "cannot remove expired object: " << hint.obj_key << dendl;
7c673cae
FG
256 }
257
258 need_trim = true;
259 }
260
261 return;
262}
263
b3b6e05e
TL
264void RGWObjectExpirer::trim_chunk(const DoutPrefixProvider *dpp,
265 const string& shard,
7c673cae
FG
266 const utime_t& from,
267 const utime_t& to,
268 const string& from_marker,
269 const string& to_marker)
270{
b3b6e05e 271 ldpp_dout(dpp, 20) << "trying to trim removal hints to=" << to
7c673cae
FG
272 << ", to_marker=" << to_marker << dendl;
273
274 real_time rt_from = from.to_real_time();
275 real_time rt_to = to.to_real_time();
276
b3b6e05e 277 int ret = exp_store.objexp_hint_trim(dpp, shard, rt_from, rt_to,
9f95a23c 278 from_marker, to_marker);
7c673cae 279 if (ret < 0) {
b3b6e05e 280 ldpp_dout(dpp, 0) << "ERROR during trim: " << ret << dendl;
7c673cae
FG
281 }
282
283 return;
284}
285
b3b6e05e
TL
286bool RGWObjectExpirer::process_single_shard(const DoutPrefixProvider *dpp,
287 const string& shard,
7c673cae
FG
288 const utime_t& last_run,
289 const utime_t& round_start)
290{
291 string marker;
292 string out_marker;
293 bool truncated = false;
294 bool done = true;
295
296 CephContext *cct = store->ctx();
297 int num_entries = cct->_conf->rgw_objexp_chunk_size;
298
299 int max_secs = cct->_conf->rgw_objexp_gc_interval;
300 utime_t end = ceph_clock_now();
301 end += max_secs;
302
303 rados::cls::lock::Lock l(objexp_lock_name);
304
305 utime_t time(max_secs, 0);
306 l.set_duration(time);
307
20effc67 308 int ret = l.lock_exclusive(&static_cast<rgw::sal::RadosStore*>(store)->getRados()->objexp_pool_ctx, shard);
7c673cae 309 if (ret == -EBUSY) { /* already locked by another processor */
b3b6e05e 310 ldpp_dout(dpp, 5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
7c673cae
FG
311 return false;
312 }
313
314 do {
315 real_time rt_last = last_run.to_real_time();
316 real_time rt_start = round_start.to_real_time();
317
318 list<cls_timeindex_entry> entries;
b3b6e05e 319 ret = exp_store.objexp_hint_list(dpp, shard, rt_last, rt_start,
9f95a23c
TL
320 num_entries, marker, entries,
321 &out_marker, &truncated);
7c673cae 322 if (ret < 0) {
b3b6e05e 323 ldpp_dout(dpp, 10) << "cannot get removal hints from shard: " << shard
7c673cae
FG
324 << dendl;
325 continue;
326 }
327
328 bool need_trim;
b3b6e05e 329 garbage_chunk(dpp, entries, need_trim);
7c673cae
FG
330
331 if (need_trim) {
b3b6e05e 332 trim_chunk(dpp, shard, last_run, round_start, marker, out_marker);
7c673cae
FG
333 }
334
335 utime_t now = ceph_clock_now();
336 if (now >= end) {
337 done = false;
338 break;
339 }
340
341 marker = out_marker;
342 } while (truncated);
343
20effc67 344 l.unlock(&static_cast<rgw::sal::RadosStore*>(store)->getRados()->objexp_pool_ctx, shard);
7c673cae
FG
345 return done;
346}
347
348/* Returns true if all shards have been processed successfully. */
b3b6e05e
TL
349bool RGWObjectExpirer::inspect_all_shards(const DoutPrefixProvider *dpp,
350 const utime_t& last_run,
7c673cae
FG
351 const utime_t& round_start)
352{
353 CephContext * const cct = store->ctx();
354 int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
355 bool all_done = true;
356
357 for (int i = 0; i < num_shards; i++) {
358 string shard;
9f95a23c 359 objexp_get_shard(i, &shard);
7c673cae 360
b3b6e05e 361 ldpp_dout(dpp, 20) << "processing shard = " << shard << dendl;
7c673cae 362
b3b6e05e 363 if (! process_single_shard(dpp, shard, last_run, round_start)) {
7c673cae
FG
364 all_done = false;
365 }
366 }
367
368 return all_done;
369}
370
371bool RGWObjectExpirer::going_down()
372{
373 return down_flag;
374}
375
376void RGWObjectExpirer::start_processor()
377{
378 worker = new OEWorker(store->ctx(), this);
379 worker->create("rgw_obj_expirer");
380}
381
382void RGWObjectExpirer::stop_processor()
383{
384 down_flag = true;
385 if (worker) {
386 worker->stop();
387 worker->join();
388 }
389 delete worker;
390 worker = NULL;
391}
392
393void *RGWObjectExpirer::OEWorker::entry() {
394 utime_t last_run;
395 do {
396 utime_t start = ceph_clock_now();
20effc67 397 ldpp_dout(this, 2) << "object expiration: start" << dendl;
b3b6e05e 398 if (oe->inspect_all_shards(this, last_run, start)) {
7c673cae
FG
399 /* All shards have been processed properly. Next time we can start
400 * from this moment. */
401 last_run = start;
402 }
20effc67 403 ldpp_dout(this, 2) << "object expiration: stop" << dendl;
7c673cae
FG
404
405
406 if (oe->going_down())
407 break;
408
409 utime_t end = ceph_clock_now();
410 end -= start;
411 int secs = cct->_conf->rgw_objexp_gc_interval;
412
413 if (secs <= end.sec())
414 continue; // next round
415
416 secs -= end.sec();
417
9f95a23c
TL
418 std::unique_lock l{lock};
419 cond.wait_for(l, std::chrono::seconds(secs));
7c673cae
FG
420 } while (!oe->going_down());
421
422 return NULL;
423}
424
425void RGWObjectExpirer::OEWorker::stop()
426{
9f95a23c
TL
427 std::lock_guard l{lock};
428 cond.notify_all();
7c673cae
FG
429}
430
b3b6e05e
TL
431CephContext *RGWObjectExpirer::OEWorker::get_cct() const
432{
433 return cct;
434}
435
436unsigned RGWObjectExpirer::OEWorker::get_subsys() const
437{
438 return dout_subsys;
439}
440
441std::ostream& RGWObjectExpirer::OEWorker::gen_prefix(std::ostream& out) const
442{
443 return out << "rgw object expirer Worker thread: ";
444}