1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include "auth/Crypto.h"
12 #include "common/armor.h"
13 #include "common/ceph_json.h"
14 #include "common/config.h"
15 #include "common/ceph_argparse.h"
16 #include "common/Formatter.h"
17 #include "common/errno.h"
19 #include "global/global_init.h"
21 #include "include/utime.h"
22 #include "include/str_list.h"
25 #include "rgw_bucket.h"
26 #include "rgw_rados.h"
28 #include "rgw_acl_s3.h"
30 #include "rgw_formats.h"
31 #include "rgw_usage.h"
32 #include "rgw_object_expirer_core.h"
34 #include "services/svc_sys_obj.h"
36 #include "cls/lock/cls_lock_client.h"
38 #define dout_context g_ceph_context
39 #define dout_subsys ceph_subsys_rgw
/* Name of the cls lock that process_single_shard() takes on a hints shard
 * before working on it. It is only ever read, hence const. */
static const string objexp_lock_name = "gc_process";
43 int RGWObjectExpirer::init_bucket_info(const string
& tenant_name
,
44 const string
& bucket_name
,
45 const string
& bucket_id
,
46 RGWBucketInfo
& bucket_info
)
48 auto obj_ctx
= store
->svc
.sysobj
->init_obj_ctx();
51 * XXX Here's where it gets tricky. We went to all the trouble of
52 * punching the tenant through the objexp_hint_entry, but now we
53 * find that our instances do not actually have tenants. They are
54 * unique thanks to IDs. So the tenant string is not needed...
56 const string bucket_instance_id
= bucket_name
+ ":" + bucket_id
;
57 int ret
= store
->get_bucket_instance_info(obj_ctx
, bucket_instance_id
,
58 bucket_info
, NULL
, NULL
);
62 int RGWObjectExpirer::garbage_single_object(objexp_hint_entry
& hint
)
64 RGWBucketInfo bucket_info
;
66 int ret
= init_bucket_info(hint
.tenant
, hint
.bucket_name
,
67 hint
.bucket_id
, bucket_info
);
69 ldout(store
->ctx(), 15) << "NOTICE: cannot find bucket = " \
70 << hint
.bucket_name
<< ". The object must be already removed" << dendl
;
71 return -ERR_PRECONDITION_FAILED
;
73 ldout(store
->ctx(), 1) << "ERROR: could not init bucket = " \
74 << hint
.bucket_name
<< "due to ret = " << ret
<< dendl
;
78 RGWObjectCtx
rctx(store
);
80 rgw_obj_key key
= hint
.obj_key
;
81 if (key
.instance
.empty()) {
82 key
.instance
= "null";
85 rgw_obj
obj(bucket_info
.bucket
, key
);
86 store
->set_atomic(&rctx
, obj
);
87 ret
= store
->delete_obj(rctx
, bucket_info
, obj
,
88 bucket_info
.versioning_status(), 0, hint
.exp_time
);
// Walks the removal hints in `entries`: each entry is parsed with
// store->objexp_hint_parse() and the resulting hint is handed to
// garbage_single_object().
// `need_trim` is the output flag (see the /* out */ marker below).
// NOTE(review): the statements assigning need_trim, the loop increment and
// the guard around the parse-failure log are not visible in this excerpt.
93 void RGWObjectExpirer::garbage_chunk(list
<cls_timeindex_entry
>& entries
, /* in */
94 bool& need_trim
) /* out */
98 for (list
<cls_timeindex_entry
>::iterator iter
= entries
.begin();
99 iter
!= entries
.end();
102 objexp_hint_entry hint
;
// Debug trace (level 15) of every hint: key_ts seconds and key_ext.
103 ldout(store
->ctx(), 15) << "got removal hint for: " << iter
->key_ts
.sec() \
104 << " - " << iter
->key_ext
<< dendl
;
106 int ret
= store
->objexp_hint_parse(*iter
, hint
);
// NOTE(review): this message prints hint.obj_key, i.e. a field of the hint
// that was being parsed -- confirm it holds anything useful when parsing
// failed.
108 ldout(store
->ctx(), 1) << "cannot parse removal hint for " << hint
.obj_key
<< dendl
;
112 /* PRECOND_FAILED simply means that our hint is not valid.
113 * We can silently ignore that and move forward. */
114 ret
= garbage_single_object(hint
);
// -ERR_PRECONDITION_FAILED is logged at level 15 only; any other negative
// result is logged at level 1.
115 if (ret
== -ERR_PRECONDITION_FAILED
) {
116 ldout(store
->ctx(), 15) << "not actual hint for object: " << hint
.obj_key
<< dendl
;
117 } else if (ret
< 0) {
118 ldout(store
->ctx(), 1) << "cannot remove expired object: " << hint
.obj_key
<< dendl
;
127 void RGWObjectExpirer::trim_chunk(const string
& shard
,
130 const string
& from_marker
,
131 const string
& to_marker
)
133 ldout(store
->ctx(), 20) << "trying to trim removal hints to=" << to
134 << ", to_marker=" << to_marker
<< dendl
;
136 real_time rt_from
= from
.to_real_time();
137 real_time rt_to
= to
.to_real_time();
139 int ret
= store
->objexp_hint_trim(shard
, rt_from
, rt_to
,
140 from_marker
, to_marker
);
142 ldout(store
->ctx(), 0) << "ERROR during trim: " << ret
<< dendl
;
// Processes the removal hints of one shard for the window given by
// `last_run` and `round_start`.
// Visible flow: take an exclusive cls lock (objexp_lock_name) on `shard` in
// store->objexp_pool_ctx, list hints with store->objexp_hint_list() passing
// rgw_objexp_chunk_size as the entry count, hand the listed entries to
// garbage_chunk() and trim_chunk(), then unlock the shard.
// NOTE(review): the loop construct, the declarations of marker / out_marker /
// need_trim and every return statement are not visible in this excerpt.
148 bool RGWObjectExpirer::process_single_shard(const string
& shard
,
149 const utime_t
& last_run
,
150 const utime_t
& round_start
)
154 bool truncated
= false;
157 CephContext
*cct
= store
->ctx();
158 int num_entries
= cct
->_conf
->rgw_objexp_chunk_size
;
// rgw_objexp_gc_interval is used below as the lock duration, in seconds.
160 int max_secs
= cct
->_conf
->rgw_objexp_gc_interval
;
161 utime_t end
= ceph_clock_now();
164 rados::cls::lock::Lock
l(objexp_lock_name
);
166 utime_t
time(max_secs
, 0);
167 l
.set_duration(time
);
// NOTE(review): only -EBUSY is tested in the visible code; confirm how other
// negative results of lock_exclusive() are handled.
169 int ret
= l
.lock_exclusive(&store
->objexp_pool_ctx
, shard
);
170 if (ret
== -EBUSY
) { /* already locked by another processor */
171 dout(5) << __func__
<< "(): failed to acquire lock on " << shard
<< dendl
;
176 real_time rt_last
= last_run
.to_real_time();
177 real_time rt_start
= round_start
.to_real_time();
// out_marker and truncated are passed by pointer (presumably outputs of the
// listing call -- confirm against objexp_hint_list()).
179 list
<cls_timeindex_entry
> entries
;
180 ret
= store
->objexp_hint_list(shard
, rt_last
, rt_start
,
181 num_entries
, marker
, entries
,
182 &out_marker
, &truncated
);
184 ldout(cct
, 10) << "cannot get removal hints from shard: " << shard
190 garbage_chunk(entries
, need_trim
);
193 trim_chunk(shard
, last_run
, round_start
, marker
, out_marker
);
// NOTE(review): `now` and `end` are both taken from ceph_clock_now(); the
// comparison that uses them is not visible in this excerpt.
196 utime_t now
= ceph_clock_now();
205 l
.unlock(&store
->objexp_pool_ctx
, shard
);
209 /* Returns true if all shards have been processed successfully. */
210 bool RGWObjectExpirer::inspect_all_shards(const utime_t
& last_run
,
211 const utime_t
& round_start
)
213 CephContext
* const cct
= store
->ctx();
214 int num_shards
= cct
->_conf
->rgw_objexp_hints_num_shards
;
215 bool all_done
= true;
217 for (int i
= 0; i
< num_shards
; i
++) {
219 store
->objexp_get_shard(i
, shard
);
221 ldout(store
->ctx(), 20) << "proceeding shard = " << shard
<< dendl
;
223 if (! process_single_shard(shard
, last_run
, round_start
)) {
231 bool RGWObjectExpirer::going_down()
236 void RGWObjectExpirer::start_processor()
238 worker
= new OEWorker(store
->ctx(), this);
239 worker
->create("rgw_obj_expirer");
242 void RGWObjectExpirer::stop_processor()
// Worker body. Each round records its start time, runs
// oe->inspect_all_shards(last_run, start) bracketed by level-2 "start"/"stop"
// log lines, and then waits on `cond` (WaitInterval) before the next round;
// rounds repeat while oe->going_down() is false.
// NOTE(review): the loop opening, the declaration/update of last_run, the
// locking around the wait and the return statement are not visible in this
// excerpt.
253 void *RGWObjectExpirer::OEWorker::entry() {
256 utime_t start
= ceph_clock_now();
257 ldout(cct
, 2) << "object expiration: start" << dendl
;
258 if (oe
->inspect_all_shards(last_run
, start
)) {
259 /* All shards have been processed properly. Next time we can start
260 * from this moment. */
263 ldout(cct
, 2) << "object expiration: stop" << dendl
;
// Checked right after a round so shutdown is noticed before any waiting.
266 if (oe
->going_down())
269 utime_t end
= ceph_clock_now();
271 int secs
= cct
->_conf
->rgw_objexp_gc_interval
;
// NOTE(review): `end` is assigned the current clock above; comparing the
// configured interval with end.sec() only makes sense if `end` has been
// turned into an elapsed time first -- that step is not visible here, confirm.
273 if (secs
<= end
.sec())
274 continue; // next round
// Wait on the condition variable for at most `secs` seconds.
279 cond
.WaitInterval(lock
, utime_t(secs
, 0));
281 } while (!oe
->going_down());
286 void RGWObjectExpirer::OEWorker::stop()
288 Mutex::Locker
l(lock
);