]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/driver/rados/rgw_object_expirer_core.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_object_expirer_core.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <errno.h>
5 #include <iostream>
6 #include <sstream>
7 #include <string>
8
9
10 #include "auth/Crypto.h"
11
12 #include "common/armor.h"
13 #include "common/ceph_json.h"
14 #include "common/config.h"
15 #include "common/ceph_argparse.h"
16 #include "common/Formatter.h"
17 #include "common/errno.h"
18
19 #include "global/global_init.h"
20
21 #include "include/utime.h"
22 #include "include/str_list.h"
23
24 #include "rgw_user.h"
25 #include "rgw_bucket.h"
26 #include "rgw_acl.h"
27 #include "rgw_acl_s3.h"
28 #include "rgw_log.h"
29 #include "rgw_formats.h"
30 #include "rgw_usage.h"
31 #include "rgw_object_expirer_core.h"
32 #include "rgw_zone.h"
33 #include "rgw_sal_rados.h"
34
35 #include "services/svc_rados.h"
36 #include "services/svc_zone.h"
37 #include "services/svc_sys_obj.h"
38 #include "services/svc_bi_rados.h"
39
40 #include "cls/lock/cls_lock_client.h"
41 #include "cls/timeindex/cls_timeindex_client.h"
42
43 #define dout_context g_ceph_context
44 #define dout_subsys ceph_subsys_rgw
45
46 using namespace std;
47
48 static string objexp_lock_name = "gc_process";
49
// Build the RADOS object name for a hint shard: a fixed prefix plus the
// shard number zero-padded to 10 digits (enough for any 32-bit value).
static string objexp_hint_get_shardname(int shard_num)
{
  string num = std::to_string((unsigned)shard_num);
  return "obj_delete_at_hint." + string(10 - num.length(), '0') + num;
}
56
57 static int objexp_key_shard(const rgw_obj_index_key& key, int num_shards)
58 {
59 string obj_key = key.name + key.instance;
60 return RGWSI_BucketIndex_RADOS::bucket_shard_index(obj_key, num_shards);
61 }
62
63 static string objexp_hint_get_keyext(const string& tenant_name,
64 const string& bucket_name,
65 const string& bucket_id,
66 const rgw_obj_key& obj_key) {
67 return tenant_name + (tenant_name.empty() ? "" : ":") + bucket_name + ":" + bucket_id +
68 ":" + obj_key.name + ":" + obj_key.instance;
69 }
70
// Out-parameter wrapper around objexp_hint_get_shardname(), kept for
// call-site symmetry with the other objexp_* helpers.
static void objexp_get_shard(int shard_num,
                             string *shard)
{
  *shard = objexp_hint_get_shardname(shard_num);
}
76
77 static int objexp_hint_parse(const DoutPrefixProvider *dpp, CephContext *cct, cls_timeindex_entry &ti_entry,
78 objexp_hint_entry *hint_entry)
79 {
80 try {
81 auto iter = ti_entry.value.cbegin();
82 decode(*hint_entry, iter);
83 } catch (buffer::error& err) {
84 ldpp_dout(dpp, 0) << "ERROR: couldn't decode avail_pools" << dendl;
85 }
86
87 return 0;
88 }
89
// Record a deferred-deletion hint: encode the object's coordinates plus its
// expiry time and append them, indexed by delete_at, to the timeindex shard
// object (in the zone's log pool) that this key hashes to.
// Returns 0 on success or a negative error from opening/operating on the
// shard object.
int RGWObjExpStore::objexp_hint_add(const DoutPrefixProvider *dpp,
                                    const ceph::real_time& delete_at,
                                    const string& tenant_name,
                                    const string& bucket_name,
                                    const string& bucket_id,
                                    const rgw_obj_index_key& obj_key)
{
  const string keyext = objexp_hint_get_keyext(tenant_name, bucket_name,
                                               bucket_id, obj_key);
  objexp_hint_entry he = {
      .tenant = tenant_name,
      .bucket_name = bucket_name,
      .bucket_id = bucket_id,
      .obj_key = obj_key,
      .exp_time = delete_at };
  bufferlist hebl;
  encode(he, hebl);
  librados::ObjectWriteOperation op;
  // Keyed by the delete-at timestamp so time-range listings return only
  // hints that are already due.
  cls_timeindex_add(op, utime_t(delete_at), keyext, hebl);

  // Hints are spread over rgw_objexp_hints_num_shards RADOS objects to
  // avoid a single hot object.
  string shard_name = objexp_hint_get_shardname(objexp_key_shard(obj_key, cct->_conf->rgw_objexp_hints_num_shards));
  auto obj = rados_svc->obj(rgw_raw_obj(driver->svc()->zone->get_zone_params().log_pool, shard_name));
  int r = obj.open(dpp);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
    return r;
  }
  return obj.operate(dpp, &op, null_yield);
}
119
// List up to max_entries removal hints from shard `oid` whose timestamps
// fall between start_time and end_time, resuming from `marker`.
// Outputs the entries, a continuation marker, and a truncation flag.
// A missing shard object (-ENOENT) is treated as an empty, non-truncated
// listing rather than an error.
int RGWObjExpStore::objexp_hint_list(const DoutPrefixProvider *dpp,
                                     const string& oid,
                                     const ceph::real_time& start_time,
                                     const ceph::real_time& end_time,
                                     const int max_entries,
                                     const string& marker,
                                     list<cls_timeindex_entry>& entries, /* out */
                                     string *out_marker, /* out */
                                     bool *truncated) /* out */
{
  librados::ObjectReadOperation op;
  cls_timeindex_list(op, utime_t(start_time), utime_t(end_time), marker, max_entries, entries,
                     out_marker, truncated);

  auto obj = rados_svc->obj(rgw_raw_obj(driver->svc()->zone->get_zone_params().log_pool, oid));
  int r = obj.open(dpp);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
    return r;
  }
  bufferlist obl;
  int ret = obj.operate(dpp, &op, &obl, null_yield);

  if ((ret < 0 ) && (ret != -ENOENT)) {
    return ret;
  }

  // Shard object was never created: no hints yet, nothing more to read.
  if ((ret == -ENOENT) && truncated) {
    *truncated = false;
  }

  return 0;
}
153
154 static int cls_timeindex_trim_repeat(const DoutPrefixProvider *dpp,
155 rgw_rados_ref ref,
156 const string& oid,
157 const utime_t& from_time,
158 const utime_t& to_time,
159 const string& from_marker,
160 const string& to_marker)
161 {
162 bool done = false;
163 do {
164 librados::ObjectWriteOperation op;
165 cls_timeindex_trim(op, from_time, to_time, from_marker, to_marker);
166 int r = rgw_rados_operate(dpp, ref.pool.ioctx(), oid, &op, null_yield);
167 if (r == -ENODATA)
168 done = true;
169 else if (r < 0)
170 return r;
171 } while (!done);
172
173 return 0;
174 }
175
// Remove already-processed hints from shard `oid` within the given time and
// marker window, looping until the OSD reports the range empty.
// -ENOENT (shard object never created) is not treated as an error.
int RGWObjExpStore::objexp_hint_trim(const DoutPrefixProvider *dpp,
                                     const string& oid,
                                     const ceph::real_time& start_time,
                                     const ceph::real_time& end_time,
                                     const string& from_marker,
                                     const string& to_marker)
{
  auto obj = rados_svc->obj(rgw_raw_obj(driver->svc()->zone->get_zone_params().log_pool, oid));
  int r = obj.open(dpp);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): failed to open obj=" << obj << " (r=" << r << ")" << dendl;
    return r;
  }
  auto& ref = obj.get_ref();
  int ret = cls_timeindex_trim_repeat(dpp, ref, oid, utime_t(start_time), utime_t(end_time),
                                      from_marker, to_marker);
  if ((ret < 0 ) && (ret != -ENOENT)) {
    return ret;
  }

  return 0;
}
198
199 int RGWObjectExpirer::garbage_single_object(const DoutPrefixProvider *dpp, objexp_hint_entry& hint)
200 {
201 RGWBucketInfo bucket_info;
202 std::unique_ptr<rgw::sal::Bucket> bucket;
203
204 int ret = driver->get_bucket(dpp, nullptr, rgw_bucket(hint.tenant, hint.bucket_name, hint.bucket_id), &bucket, null_yield);
205 if (-ENOENT == ret) {
206 ldpp_dout(dpp, 15) << "NOTICE: cannot find bucket = " \
207 << hint.bucket_name << ". The object must be already removed" << dendl;
208 return -ERR_PRECONDITION_FAILED;
209 } else if (ret < 0) {
210 ldpp_dout(dpp, 1) << "ERROR: could not init bucket = " \
211 << hint.bucket_name << "due to ret = " << ret << dendl;
212 return ret;
213 }
214
215 rgw_obj_key key = hint.obj_key;
216 if (key.instance.empty()) {
217 key.instance = "null";
218 }
219
220 std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(key);
221 obj->set_atomic();
222 ret = obj->delete_object(dpp, null_yield);
223
224 return ret;
225 }
226
227 void RGWObjectExpirer::garbage_chunk(const DoutPrefixProvider *dpp,
228 list<cls_timeindex_entry>& entries, /* in */
229 bool& need_trim) /* out */
230 {
231 need_trim = false;
232
233 for (list<cls_timeindex_entry>::iterator iter = entries.begin();
234 iter != entries.end();
235 ++iter)
236 {
237 objexp_hint_entry hint;
238 ldpp_dout(dpp, 15) << "got removal hint for: " << iter->key_ts.sec() \
239 << " - " << iter->key_ext << dendl;
240
241 int ret = objexp_hint_parse(dpp, driver->ctx(), *iter, &hint);
242 if (ret < 0) {
243 ldpp_dout(dpp, 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
244 continue;
245 }
246
247 /* PRECOND_FAILED simply means that our hint is not valid.
248 * We can silently ignore that and move forward. */
249 ret = garbage_single_object(dpp, hint);
250 if (ret == -ERR_PRECONDITION_FAILED) {
251 ldpp_dout(dpp, 15) << "not actual hint for object: " << hint.obj_key << dendl;
252 } else if (ret < 0) {
253 ldpp_dout(dpp, 1) << "cannot remove expired object: " << hint.obj_key << dendl;
254 }
255
256 need_trim = true;
257 }
258
259 return;
260 }
261
262 void RGWObjectExpirer::trim_chunk(const DoutPrefixProvider *dpp,
263 const string& shard,
264 const utime_t& from,
265 const utime_t& to,
266 const string& from_marker,
267 const string& to_marker)
268 {
269 ldpp_dout(dpp, 20) << "trying to trim removal hints to=" << to
270 << ", to_marker=" << to_marker << dendl;
271
272 real_time rt_from = from.to_real_time();
273 real_time rt_to = to.to_real_time();
274
275 int ret = exp_store.objexp_hint_trim(dpp, shard, rt_from, rt_to,
276 from_marker, to_marker);
277 if (ret < 0) {
278 ldpp_dout(dpp, 0) << "ERROR during trim: " << ret << dendl;
279 }
280
281 return;
282 }
283
// Drain removal hints from one shard: list hints due in (last_run,
// round_start], delete the referenced objects, and trim the consumed range.
// Returns true when the shard was fully processed within the time budget;
// false when the shard lock was held elsewhere or the budget ran out.
bool RGWObjectExpirer::process_single_shard(const DoutPrefixProvider *dpp,
                                            const string& shard,
                                            const utime_t& last_run,
                                            const utime_t& round_start)
{
  string marker;
  string out_marker;
  bool truncated = false;
  bool done = true;

  CephContext *cct = driver->ctx();
  int num_entries = cct->_conf->rgw_objexp_chunk_size;

  // Work for at most one GC interval so other radosgw instances get a turn.
  int max_secs = cct->_conf->rgw_objexp_gc_interval;
  utime_t end = ceph_clock_now();
  end += max_secs;

  // Cross-process exclusive lock on the shard; its duration equals the time
  // budget, so a crashed holder's lock expires by the next round.
  rados::cls::lock::Lock l(objexp_lock_name);

  utime_t time(max_secs, 0);
  l.set_duration(time);

  int ret = l.lock_exclusive(&static_cast<rgw::sal::RadosStore*>(driver)->getRados()->objexp_pool_ctx, shard);
  if (ret == -EBUSY) { /* already locked by another processor */
    ldpp_dout(dpp, 5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
    return false;
  }

  do {
    real_time rt_last = last_run.to_real_time();
    real_time rt_start = round_start.to_real_time();

    list<cls_timeindex_entry> entries;
    ret = exp_store.objexp_hint_list(dpp, shard, rt_last, rt_start,
                                     num_entries, marker, entries,
                                     &out_marker, &truncated);
    if (ret < 0) {
      ldpp_dout(dpp, 10) << "cannot get removal hints from shard: " << shard
                         << dendl;
      // NOTE(review): `continue` re-tests `truncated` from the PREVIOUS
      // iteration (false on a first-iteration failure, so the loop exits).
      // Verify a persistent listing error after a truncated batch cannot
      // spin here without advancing `marker`.
      continue;
    }

    bool need_trim;
    garbage_chunk(dpp, entries, need_trim);

    if (need_trim) {
      trim_chunk(dpp, shard, last_run, round_start, marker, out_marker);
    }

    // Time budget exhausted: stop early and report the shard unfinished so
    // the caller does not advance its starting point.
    utime_t now = ceph_clock_now();
    if (now >= end) {
      done = false;
      break;
    }

    marker = out_marker;
  } while (truncated);

  l.unlock(&static_cast<rgw::sal::RadosStore*>(driver)->getRados()->objexp_pool_ctx, shard);
  return done;
}
345
346 /* Returns true if all shards have been processed successfully. */
347 bool RGWObjectExpirer::inspect_all_shards(const DoutPrefixProvider *dpp,
348 const utime_t& last_run,
349 const utime_t& round_start)
350 {
351 CephContext * const cct = driver->ctx();
352 int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
353 bool all_done = true;
354
355 for (int i = 0; i < num_shards; i++) {
356 string shard;
357 objexp_get_shard(i, &shard);
358
359 ldpp_dout(dpp, 20) << "processing shard = " << shard << dendl;
360
361 if (! process_single_shard(dpp, shard, last_run, round_start)) {
362 all_done = false;
363 }
364 }
365
366 return all_done;
367 }
368
// True once stop_processor() has been called; polled by the worker loop.
bool RGWObjectExpirer::going_down()
{
  return down_flag;
}
373
// Spawn the background expirer thread. The raw pointer is owned by this
// object: stop_processor() joins and deletes it.
void RGWObjectExpirer::start_processor()
{
  worker = new OEWorker(driver->ctx(), this);
  worker->create("rgw_obj_expirer");
}
379
380 void RGWObjectExpirer::stop_processor()
381 {
382 down_flag = true;
383 if (worker) {
384 worker->stop();
385 worker->join();
386 }
387 delete worker;
388 worker = NULL;
389 }
390
// Worker thread main loop: every rgw_objexp_gc_interval seconds, sweep all
// hint shards. `last_run` advances only when a sweep covered every shard,
// so unfinished shards are retried from the same starting point next round.
void *RGWObjectExpirer::OEWorker::entry() {
  utime_t last_run;
  do {
    utime_t start = ceph_clock_now();
    ldpp_dout(this, 2) << "object expiration: start" << dendl;
    if (oe->inspect_all_shards(this, last_run, start)) {
      /* All shards have been processed properly. Next time we can start
       * from this moment. */
      last_run = start;
    }
    ldpp_dout(this, 2) << "object expiration: stop" << dendl;


    if (oe->going_down())
      break;

    // Sleep only for the remainder of the interval; if the sweep itself took
    // longer than the interval, start the next round immediately.
    utime_t end = ceph_clock_now();
    end -= start;
    int secs = cct->_conf->rgw_objexp_gc_interval;

    if (secs <= end.sec())
      continue; // next round

    secs -= end.sec();

    // Interruptible sleep: stop() notifies `cond` to cut the wait short,
    // after which the loop condition observes going_down().
    std::unique_lock l{lock};
    cond.wait_for(l, std::chrono::seconds(secs));
  } while (!oe->going_down());

  return NULL;
}
422
// Wake the worker out of its inter-round sleep so it re-checks going_down().
// Holding the lock orders the wakeup with the worker's wait_for().
void RGWObjectExpirer::OEWorker::stop()
{
  std::lock_guard l{lock};
  cond.notify_all();
}
428
// Logging context accessor used by the ldpp_dout machinery.
CephContext *RGWObjectExpirer::OEWorker::get_cct() const
{
  return cct;
}
433
// Debug subsystem for this worker's log lines (ceph_subsys_rgw).
unsigned RGWObjectExpirer::OEWorker::get_subsys() const
{
  return dout_subsys;
}
438
// Prefix prepended to every log line emitted via this worker's dpp.
std::ostream& RGWObjectExpirer::OEWorker::gen_prefix(std::ostream& out) const
{
  return out << "rgw object expirer Worker thread: ";
}