1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
10 #include "auth/Crypto.h"
12 #include "common/armor.h"
13 #include "common/ceph_json.h"
14 #include "common/config.h"
15 #include "common/ceph_argparse.h"
16 #include "common/Formatter.h"
17 #include "common/errno.h"
19 #include "global/global_init.h"
21 #include "include/utime.h"
22 #include "include/str_list.h"
25 #include "rgw_bucket.h"
26 #include "rgw_rados.h"
28 #include "rgw_acl_s3.h"
30 #include "rgw_formats.h"
31 #include "rgw_usage.h"
32 #include "rgw_object_expirer_core.h"
35 #include "services/svc_rados.h"
36 #include "services/svc_zone.h"
37 #include "services/svc_sys_obj.h"
38 #include "services/svc_bi_rados.h"
40 #include "cls/lock/cls_lock_client.h"
41 #include "cls/timeindex/cls_timeindex_client.h"
43 #define dout_context g_ceph_context
44 #define dout_subsys ceph_subsys_rgw
// Name of the rados lock taken on each hint shard while it is being
// processed, so concurrent radosgw instances don't work the same shard.
// NOTE(review): the value is "gc_process" — shared with/copied from the GC
// subsystem; lock scope is still per-shard because the lock cookie is the
// shard oid (see lock_exclusive(..., shard) below).
46 static string objexp_lock_name
= "gc_process";
static std::string
objexp_hint_get_shardname(int shard_num)
{
  // Format the shard object name: a fixed prefix followed by the shard
  // number zero-padded to 10 digits, e.g. "obj_delete_at_hint.0000000005".
  char buf[64];
  snprintf(buf, sizeof(buf), "obj_delete_at_hint.%010u", (unsigned)shard_num);
  return std::string(buf);
}
55 static int objexp_key_shard(const rgw_obj_index_key
& key
, int num_shards
)
57 string obj_key
= key
.name
+ key
.instance
;
58 return RGWSI_BucketIndex_RADOS::bucket_shard_index(obj_key
, num_shards
);
61 static string
objexp_hint_get_keyext(const string
& tenant_name
,
62 const string
& bucket_name
,
63 const string
& bucket_id
,
64 const rgw_obj_key
& obj_key
) {
65 return tenant_name
+ (tenant_name
.empty() ? "" : ":") + bucket_name
+ ":" + bucket_id
+
66 ":" + obj_key
.name
+ ":" + obj_key
.instance
;
69 static void objexp_get_shard(int shard_num
,
72 *shard
= objexp_hint_get_shardname(shard_num
);
// Decode the serialized payload of a timeindex entry into an
// objexp_hint_entry.  The decode runs inside a try/catch (the "try" line
// is not visible in this chunk) because ceph::decode throws
// buffer::error on malformed input.
75 static int objexp_hint_parse(CephContext
*cct
, cls_timeindex_entry
&ti_entry
,
76 objexp_hint_entry
*hint_entry
)
// Iterate over ti_entry.value (the encoded hint) from the beginning.
79 auto iter
= ti_entry
.value
.cbegin();
80 decode(*hint_entry
, iter
);
81 } catch (buffer::error
& err
) {
// NOTE(review): the message says "avail_pools" but this decodes an
// expiration hint — looks like a copy/paste from elsewhere; worth fixing.
82 ldout(cct
, 0) << "ERROR: couldn't decode avail_pools" << dendl
;
// Register an expiration hint: record in the appropriate timeindex shard
// object (in the zone's log pool) that the given object should be
// removed at 'delete_at'.
88 int RGWObjExpStore::objexp_hint_add(const ceph::real_time
& delete_at
,
89 const string
& tenant_name
,
90 const string
& bucket_name
,
91 const string
& bucket_id
,
92 const rgw_obj_index_key
& obj_key
)
// Composite timeindex key: tenant/bucket/bucket-id/object (see
// objexp_hint_get_keyext).
94 const string keyext
= objexp_hint_get_keyext(tenant_name
, bucket_name
,
// Build the hint payload; it is decoded again by objexp_hint_parse()
// on the consumer side.
96 objexp_hint_entry he
= {
97 .tenant
= tenant_name
,
98 .bucket_name
= bucket_name
,
99 .bucket_id
= bucket_id
,
101 .exp_time
= delete_at
};
// Queue a cls_timeindex add of the encoded hint.  NOTE(review): 'hebl'
// (the encoded bufferlist) is declared on lines not visible in this
// chunk — presumably encode(he, hebl) precedes this; confirm upstream.
104 librados::ObjectWriteOperation op
;
105 cls_timeindex_add(op
, utime_t(delete_at
), keyext
, hebl
);
// Choose the shard by hashing the object key, modulo
// rgw_objexp_hints_num_shards.
107 string shard_name
= objexp_hint_get_shardname(objexp_key_shard(obj_key
, cct
->_conf
->rgw_objexp_hints_num_shards
));
108 auto obj
= rados_svc
->obj(rgw_raw_obj(zone_svc
->get_zone_params().log_pool
, shard_name
));
// NOTE(review): 'r' is presumably the result of opening the rados obj
// handle (the open call is on lines not visible here).
111 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): failed to open obj=" << obj
<< " (r=" << r
<< ")" << dendl
;
// Execute the write synchronously (null_yield: no coroutine context).
114 return obj
.operate(&op
, null_yield
);
// List expiration hints from one shard object ('oid') whose timestamps
// fall in [start_time, end_time), starting after 'marker', up to
// max_entries.  Fills 'entries', 'out_marker' and 'truncated'.
117 int RGWObjExpStore::objexp_hint_list(const string
& oid
,
118 const ceph::real_time
& start_time
,
119 const ceph::real_time
& end_time
,
120 const int max_entries
,
121 const string
& marker
,
122 list
<cls_timeindex_entry
>& entries
, /* out */
123 string
*out_marker
, /* out */
124 bool *truncated
) /* out */
// Build the cls_timeindex list read op.
126 librados::ObjectReadOperation op
;
127 cls_timeindex_list(op
, utime_t(start_time
), utime_t(end_time
), marker
, max_entries
, entries
,
128 out_marker
, truncated
);
129 auto obj
= rados_svc
->obj(rgw_raw_obj(zone_svc
->get_zone_params().log_pool
, oid
));
// NOTE(review): 'r' is presumably the result of opening the rados obj
// handle (the open call is on lines not visible in this chunk).
133 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): failed to open obj=" << obj
<< " (r=" << r
<< ")" << dendl
;
// Execute the read; 'obl' (output bufferlist) is declared on lines not
// visible here.
137 int ret
= obj
.operate(&op
, &obl
, null_yield
);
// Real errors propagate; a missing shard object is not an error.
139 if ((ret
< 0 ) && (ret
!= -ENOENT
)) {
// An absent shard simply has no hints — report "not truncated" so the
// caller stops paging this shard.
143 if ((ret
== -ENOENT
) && truncated
) {
// Trim timeindex entries in [from_time, to_time] / (from_marker,
// to_marker] from a shard object, repeating the trim op until the class
// reports nothing left (the surrounding loop and the 'oid' parameter are
// on lines not visible in this chunk).
150 static int cls_timeindex_trim_repeat(rgw_rados_ref ref
,
152 const utime_t
& from_time
,
153 const utime_t
& to_time
,
154 const string
& from_marker
,
155 const string
& to_marker
)
156 librados::ObjectWriteOperation op
;
160 cls_timeindex_trim(op
, from_time
, to_time
, from_marker
, to_marker
);
// Synchronous write against the shard object; each pass removes one
// batch of entries.
161 int r
= rgw_rados_operate(ref
.pool
.ioctx(), oid
, &op
, null_yield
);
// Remove already-processed hints from one shard object, bounded by the
// given time window and markers.
171 int RGWObjExpStore::objexp_hint_trim(const string
& oid
,
172 const ceph::real_time
& start_time
,
173 const ceph::real_time
& end_time
,
174 const string
& from_marker
,
175 const string
& to_marker
)
177 auto obj
= rados_svc
->obj(rgw_raw_obj(zone_svc
->get_zone_params().log_pool
, oid
));
// NOTE(review): 'r' is presumably the result of opening the rados obj
// handle (the open call is on lines not visible in this chunk).
180 ldout(cct
, 0) << "ERROR: " << __func__
<< "(): failed to open obj=" << obj
<< " (r=" << r
<< ")" << dendl
;
// Repeatedly trim until the shard's window is empty.
183 auto& ref
= obj
.get_ref();
184 int ret
= cls_timeindex_trim_repeat(ref
, oid
, utime_t(start_time
), utime_t(end_time
),
185 from_marker
, to_marker
);
// A missing shard object is fine — nothing to trim.
186 if ((ret
< 0 ) && (ret
!= -ENOENT
)) {
193 int RGWObjectExpirer::init_bucket_info(const string
& tenant_name
,
194 const string
& bucket_name
,
195 const string
& bucket_id
,
196 RGWBucketInfo
& bucket_info
)
199 * XXX Here's where it gets tricky. We went to all the trouble of
200 * punching the tenant through the objexp_hint_entry, but now we
201 * find that our instances do not actually have tenants. They are
202 * unique thanks to IDs. So the tenant string is not needed...
204 * XXX reloaded: it turns out tenants were needed after all since bucket ids
205 * are ephemeral, good call encoding tenant info!
208 return store
->getRados()->get_bucket_info(store
->svc(), tenant_name
, bucket_name
,
209 bucket_info
, nullptr, null_yield
, nullptr);
// Delete one expired object described by a hint.  Returns
// -ERR_PRECONDITION_FAILED when the hint no longer applies (bucket gone),
// so callers can silently skip it.
213 int RGWObjectExpirer::garbage_single_object(objexp_hint_entry
& hint
)
215 RGWBucketInfo bucket_info
;
// Resolve the bucket the hint refers to.
217 int ret
= init_bucket_info(hint
.tenant
, hint
.bucket_name
,
218 hint
.bucket_id
, bucket_info
);
// Bucket already removed: the object is necessarily gone too; treat the
// hint as stale rather than as an error.
219 if (-ENOENT
== ret
) {
220 ldout(store
->ctx(), 15) << "NOTICE: cannot find bucket = " \
221 << hint
.bucket_name
<< ". The object must be already removed" << dendl
;
222 return -ERR_PRECONDITION_FAILED
;
223 } else if (ret
< 0) {
// NOTE(review): message lacks a space before "due to".
224 ldout(store
->ctx(), 1) << "ERROR: could not init bucket = " \
225 << hint
.bucket_name
<< "due to ret = " << ret
<< dendl
;
229 RGWObjectCtx
rctx(store
);
// An empty instance means the null version; normalize so delete_obj
// targets the right version.
231 rgw_obj_key key
= hint
.obj_key
;
232 if (key
.instance
.empty()) {
233 key
.instance
= "null";
// Delete atomically; passing hint.exp_time makes the delete conditional
// on the recorded expiration time (guards against re-written objects).
236 rgw_obj
obj(bucket_info
.bucket
, key
);
237 store
->getRados()->set_atomic(&rctx
, obj
);
238 ret
= store
->getRados()->delete_obj(rctx
, bucket_info
, obj
,
239 bucket_info
.versioning_status(), 0, hint
.exp_time
);
// Process one listed chunk of hints: parse each entry and delete the
// referenced object.  'need_trim' is an out-flag (its assignment is on
// lines not visible in this chunk) telling the caller the processed
// entries can be trimmed from the shard.
244 void RGWObjectExpirer::garbage_chunk(list
<cls_timeindex_entry
>& entries
, /* in */
245 bool& need_trim
) /* out */
249 for (list
<cls_timeindex_entry
>::iterator iter
= entries
.begin();
250 iter
!= entries
.end();
253 objexp_hint_entry hint
;
254 ldout(store
->ctx(), 15) << "got removal hint for: " << iter
->key_ts
.sec() \
255 << " - " << iter
->key_ext
<< dendl
;
// Decode the entry payload into 'hint'; skip entries we can't parse.
257 int ret
= objexp_hint_parse(store
->getRados()->ctx(), *iter
, &hint
);
259 ldout(store
->ctx(), 1) << "cannot parse removal hint for " << hint
.obj_key
<< dendl
;
263 /* PRECOND_FAILED simply means that our hint is not valid.
264 * We can silently ignore that and move forward. */
265 ret
= garbage_single_object(hint
);
266 if (ret
== -ERR_PRECONDITION_FAILED
) {
267 ldout(store
->ctx(), 15) << "not actual hint for object: " << hint
.obj_key
<< dendl
;
268 } else if (ret
< 0) {
// Real deletion failure: logged but not fatal for the chunk.
269 ldout(store
->ctx(), 1) << "cannot remove expired object: " << hint
.obj_key
<< dendl
;
// Trim processed hints from a shard.  NOTE(review): the 'from' and 'to'
// utime_t parameters are declared on lines not visible in this chunk;
// they are converted to real_time below.
278 void RGWObjectExpirer::trim_chunk(const string
& shard
,
281 const string
& from_marker
,
282 const string
& to_marker
)
284 ldout(store
->ctx(), 20) << "trying to trim removal hints to=" << to
285 << ", to_marker=" << to_marker
<< dendl
;
// cls_timeindex works on real_time; convert the utime_t bounds.
287 real_time rt_from
= from
.to_real_time();
288 real_time rt_to
= to
.to_real_time();
290 int ret
= exp_store
.objexp_hint_trim(shard
, rt_from
, rt_to
,
291 from_marker
, to_marker
);
// Trim failure is logged but non-fatal; entries will be retried later.
293 ldout(store
->ctx(), 0) << "ERROR during trim: " << ret
<< dendl
;
// Work through one hint shard: take the shard's rados lock, page hints
// in [last_run, round_start), delete the objects and trim processed
// entries.  The time budget is rgw_objexp_gc_interval seconds.
299 bool RGWObjectExpirer::process_single_shard(const string
& shard
,
300 const utime_t
& last_run
,
301 const utime_t
& round_start
)
305 bool truncated
= false;
308 CephContext
*cct
= store
->ctx();
// Page size for each hint-list call.
309 int num_entries
= cct
->_conf
->rgw_objexp_chunk_size
;
// Lock duration / processing deadline, in seconds.
311 int max_secs
= cct
->_conf
->rgw_objexp_gc_interval
;
312 utime_t end
= ceph_clock_now();
// Per-shard exclusive lock so only one radosgw processes this shard.
315 rados::cls::lock::Lock
l(objexp_lock_name
);
// The lock auto-expires after max_secs; we must finish (or renew —
// renewal lines not visible here) within that window.
317 utime_t
time(max_secs
, 0);
318 l
.set_duration(time
);
320 int ret
= l
.lock_exclusive(&store
->getRados()->objexp_pool_ctx
, shard
);
321 if (ret
== -EBUSY
) { /* already locked by another processor */
322 dout(5) << __func__
<< "(): failed to acquire lock on " << shard
<< dendl
;
// Convert the scan window bounds for the hint-list API.
327 real_time rt_last
= last_run
.to_real_time();
328 real_time rt_start
= round_start
.to_real_time();
330 list
<cls_timeindex_entry
> entries
;
// List one page of hints; 'marker'/'out_marker' (declared on lines not
// visible here) carry pagination state across iterations.
331 ret
= exp_store
.objexp_hint_list(shard
, rt_last
, rt_start
,
332 num_entries
, marker
, entries
,
333 &out_marker
, &truncated
);
335 ldout(cct
, 10) << "cannot get removal hints from shard: " << shard
// Delete the listed objects; sets need_trim when entries were consumed.
341 garbage_chunk(entries
, need_trim
);
// Remove the processed range [marker, out_marker] from the shard.
344 trim_chunk(shard
, last_run
, round_start
, marker
, out_marker
);
347 utime_t now
= ceph_clock_now();
// Release the shard lock before returning.
356 l
.unlock(&store
->getRados()->objexp_pool_ctx
, shard
);
360 /* Returns true if all shards have been processed successfully. */
361 bool RGWObjectExpirer::inspect_all_shards(const utime_t
& last_run
,
362 const utime_t
& round_start
)
364 CephContext
* const cct
= store
->ctx();
// Total number of hint shards, from configuration.
365 int num_shards
= cct
->_conf
->rgw_objexp_hints_num_shards
;
// Cleared as soon as any shard fails or can't be finished this round.
366 bool all_done
= true;
368 for (int i
= 0; i
< num_shards
; i
++) {
// Resolve shard index -> shard object name ('shard' is declared on a
// line not visible in this chunk).
370 objexp_get_shard(i
, &shard
);
372 ldout(store
->ctx(), 20) << "processing shard = " << shard
<< dendl
;
374 if (! process_single_shard(shard
, last_run
, round_start
)) {
// Reports whether shutdown has been requested (body not visible in this
// chunk; presumably reads a down flag set by stop_processor — confirm).
382 bool RGWObjectExpirer::going_down()
387 void RGWObjectExpirer::start_processor()
389 worker
= new OEWorker(store
->ctx(), this);
390 worker
->create("rgw_obj_expirer");
// Stops and reclaims the background worker thread (body not visible in
// this chunk; presumably signals the worker and joins it — confirm).
393 void RGWObjectExpirer::stop_processor()
// Background thread main loop: run one expiration round over all shards,
// then sleep until the next rgw_objexp_gc_interval boundary, until the
// expirer is shut down.
404 void *RGWObjectExpirer::OEWorker::entry() {
407 utime_t start
= ceph_clock_now();
408 ldout(cct
, 2) << "object expiration: start" << dendl
;
// Only advance 'last_run' when every shard succeeded; otherwise the next
// round re-scans from the old point so nothing is skipped.
409 if (oe
->inspect_all_shards(last_run
, start
)) {
410 /* All shards have been processed properly. Next time we can start
411 * from this moment. */
414 ldout(cct
, 2) << "object expiration: stop" << dendl
;
417 if (oe
->going_down())
420 utime_t end
= ceph_clock_now();
// Interval between rounds; adjusted below by the time this round took
// (the 'secs -= ...' line is not visible in this chunk).
422 int secs
= cct
->_conf
->rgw_objexp_gc_interval
;
424 if (secs
<= end
.sec())
425 continue; // next round
// Interruptible sleep: stop() can wake us early via 'cond'.
429 std::unique_lock l
{lock
};
430 cond
.wait_for(l
, std::chrono::seconds(secs
));
431 } while (!oe
->going_down());
// Wake the worker out of its inter-round sleep so it can observe
// shutdown promptly (the cond notify presumably follows on lines not
// visible in this chunk).
436 void RGWObjectExpirer::OEWorker::stop()
438 std::lock_guard l
{lock
};