// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <errno.h>
#include <iostream>
#include <sstream>
#include <string>


#include "auth/Crypto.h"

#include "common/armor.h"
#include "common/ceph_json.h"
#include "common/config.h"
#include "common/ceph_argparse.h"
#include "common/Formatter.h"
#include "common/errno.h"

#include "global/global_init.h"

#include "include/utime.h"
#include "include/str_list.h"

#include "rgw_user.h"
#include "rgw_bucket.h"
#include "rgw_rados.h"
#include "rgw_acl.h"
#include "rgw_acl_s3.h"
#include "rgw_log.h"
#include "rgw_formats.h"
#include "rgw_usage.h"
#include "rgw_object_expirer_core.h"

#include "services/svc_sys_obj.h"

#include "cls/lock/cls_lock_client.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw

static string objexp_lock_name = "gc_process";

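/* Look up the bucket referenced by an expiration hint and fill in its
 * RGWBucketInfo via the system-object service. Returns 0 on success or a
 * negative error code (e.g. -ENOENT when the bucket no longer exists). */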
int RGWObjectExpirer::init_bucket_info(const string& tenant_name,
                                       const string& bucket_name,
                                       const string& bucket_id,
                                       RGWBucketInfo& bucket_info)
{
  auto obj_ctx = store->svc.sysobj->init_obj_ctx();

  /*
   * XXX Here's where it gets tricky. We went to all the trouble of
   * punching the tenant through the objexp_hint_entry, but now we
   * find that our instances do not actually have tenants. They are
   * unique thanks to IDs. So the tenant string is not needed...
   *
   * XXX reloaded: it turns out tenants were needed after all since bucket ids
   * are ephemeral, good call encoding tenant info!
   */

  return store->get_bucket_info(obj_ctx, tenant_name, bucket_name,
                                bucket_info, nullptr, nullptr);
}

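/* Delete the single object described by an expiration hint. Returns
 * -ERR_PRECONDITION_FAILED when the hint is stale (its bucket is already
 * gone), or a negative error code on other failures. */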
int RGWObjectExpirer::garbage_single_object(objexp_hint_entry& hint)
{
  RGWBucketInfo bucket_info;

  int ret = init_bucket_info(hint.tenant, hint.bucket_name,
                             hint.bucket_id, bucket_info);
  if (-ENOENT == ret) {
    ldout(store->ctx(), 15) << "NOTICE: cannot find bucket = "
        << hint.bucket_name << ". The object must already be removed" << dendl;
    return -ERR_PRECONDITION_FAILED;
  } else if (ret < 0) {
    ldout(store->ctx(), 1) << "ERROR: could not init bucket = "
        << hint.bucket_name << " due to ret = " << ret << dendl;
    return ret;
  }

  RGWObjectCtx rctx(store);

  rgw_obj_key key = hint.obj_key;
  if (key.instance.empty()) {
    key.instance = "null";
  }

  rgw_obj obj(bucket_info.bucket, key);
  store->set_atomic(&rctx, obj);
  ret = store->delete_obj(rctx, bucket_info, obj,
                          bucket_info.versioning_status(), 0, hint.exp_time);

  return ret;
}

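/* Walk one chunk of timeindex entries, parse each removal hint and try to
 * delete the referenced object. Sets need_trim once at least one hint has
 * been processed, so the caller knows the range can be trimmed. */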
void RGWObjectExpirer::garbage_chunk(list<cls_timeindex_entry>& entries, /* in  */
                                     bool& need_trim)                    /* out */
{
  need_trim = false;

  for (list<cls_timeindex_entry>::iterator iter = entries.begin();
       iter != entries.end();
       ++iter)
  {
    objexp_hint_entry hint;
    ldout(store->ctx(), 15) << "got removal hint for: " << iter->key_ts.sec()
        << " - " << iter->key_ext << dendl;

    int ret = store->objexp_hint_parse(*iter, hint);
    if (ret < 0) {
      ldout(store->ctx(), 1) << "cannot parse removal hint for " << hint.obj_key << dendl;
      continue;
    }

    /* PRECOND_FAILED simply means that our hint is not valid.
     * We can silently ignore that and move forward. */
    ret = garbage_single_object(hint);
    if (ret == -ERR_PRECONDITION_FAILED) {
      ldout(store->ctx(), 15) << "not actual hint for object: " << hint.obj_key << dendl;
    } else if (ret < 0) {
      ldout(store->ctx(), 1) << "cannot remove expired object: " << hint.obj_key << dendl;
    }

    need_trim = true;
  }

  return;
}

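/* Trim the [from, to] range of already-processed removal hints, bounded by
 * the given markers, from one shard of the expiration log. */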
void RGWObjectExpirer::trim_chunk(const string& shard,
                                  const utime_t& from,
                                  const utime_t& to,
                                  const string& from_marker,
                                  const string& to_marker)
{
  ldout(store->ctx(), 20) << "trying to trim removal hints to=" << to
                          << ", to_marker=" << to_marker << dendl;

  real_time rt_from = from.to_real_time();
  real_time rt_to = to.to_real_time();

  int ret = store->objexp_hint_trim(shard, rt_from, rt_to,
                                    from_marker, to_marker);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR during trim: " << ret << dendl;
  }

  return;
}

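/* Process the removal hints of one shard under an exclusive cls lock.
 * Returns true when the shard was fully drained within the configured
 * rgw_objexp_gc_interval, false when time ran out or the lock is already
 * held by another expirer instance. */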
bool RGWObjectExpirer::process_single_shard(const string& shard,
                                            const utime_t& last_run,
                                            const utime_t& round_start)
{
  string marker;
  string out_marker;
  bool truncated = false;
  bool done = true;

  CephContext *cct = store->ctx();
  int num_entries = cct->_conf->rgw_objexp_chunk_size;

  int max_secs = cct->_conf->rgw_objexp_gc_interval;
  utime_t end = ceph_clock_now();
  end += max_secs;

  rados::cls::lock::Lock l(objexp_lock_name);

  utime_t time(max_secs, 0);
  l.set_duration(time);

  int ret = l.lock_exclusive(&store->objexp_pool_ctx, shard);
  if (ret == -EBUSY) { /* already locked by another processor */
    dout(5) << __func__ << "(): failed to acquire lock on " << shard << dendl;
    return false;
  }

  do {
    real_time rt_last = last_run.to_real_time();
    real_time rt_start = round_start.to_real_time();

    list<cls_timeindex_entry> entries;
    ret = store->objexp_hint_list(shard, rt_last, rt_start,
                                  num_entries, marker, entries,
                                  &out_marker, &truncated);
    if (ret < 0) {
      ldout(cct, 10) << "cannot get removal hints from shard: " << shard
                     << dendl;
      continue;
    }

    bool need_trim;
    garbage_chunk(entries, need_trim);

    if (need_trim) {
      trim_chunk(shard, last_run, round_start, marker, out_marker);
    }

    utime_t now = ceph_clock_now();
    if (now >= end) {
      done = false;
      break;
    }

    marker = out_marker;
  } while (truncated);

  l.unlock(&store->objexp_pool_ctx, shard);
  return done;
}

/* Returns true if all shards have been processed successfully. */
bool RGWObjectExpirer::inspect_all_shards(const utime_t& last_run,
                                          const utime_t& round_start)
{
  CephContext * const cct = store->ctx();
  int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
  bool all_done = true;

  for (int i = 0; i < num_shards; i++) {
    string shard;
    store->objexp_get_shard(i, shard);

    ldout(store->ctx(), 20) << "processing shard = " << shard << dendl;

    if (! process_single_shard(shard, last_run, round_start)) {
      all_done = false;
    }
  }

  return all_done;
}

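/* True once stop_processor() has requested shutdown of the worker. */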
bool RGWObjectExpirer::going_down()
{
  return down_flag;
}

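/* Spawn the background worker thread that drives object expiration. */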
void RGWObjectExpirer::start_processor()
{
  worker = new OEWorker(store->ctx(), this);
  worker->create("rgw_obj_expirer");
}

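/* Signal the worker to shut down, wait for it to exit, and release it. */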
void RGWObjectExpirer::stop_processor()
{
  down_flag = true;
  if (worker) {
    worker->stop();
    worker->join();
  }
  delete worker;
  worker = NULL;
}

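/* Worker main loop: run one expiration round over all shards, then sleep
 * for the remainder of rgw_objexp_gc_interval (interruptible via stop())
 * and repeat until shutdown is requested. */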
void *RGWObjectExpirer::OEWorker::entry() {
  utime_t last_run;
  do {
    utime_t start = ceph_clock_now();
    ldout(cct, 2) << "object expiration: start" << dendl;
    if (oe->inspect_all_shards(last_run, start)) {
      /* All shards have been processed properly. Next time we can start
       * from this moment. */
      last_run = start;
    }
    ldout(cct, 2) << "object expiration: stop" << dendl;

    if (oe->going_down())
      break;

    utime_t end = ceph_clock_now();
    end -= start;
    int secs = cct->_conf->rgw_objexp_gc_interval;

    if (secs <= end.sec())
      continue; // next round

    secs -= end.sec();

    lock.Lock();
    cond.WaitInterval(lock, utime_t(secs, 0));
    lock.Unlock();
  } while (!oe->going_down());

  return NULL;
}

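/* Wake the worker from its inter-round sleep so it can observe down_flag
 * and exit promptly. */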
void RGWObjectExpirer::OEWorker::stop()
{
  Mutex::Locker l(lock);
  cond.Signal();
}