1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
10 #include "auth/Crypto.h"
12 #include "common/armor.h"
13 #include "common/ceph_json.h"
14 #include "common/config.h"
15 #include "common/ceph_argparse.h"
16 #include "common/Formatter.h"
17 #include "common/errno.h"
19 #include "global/global_init.h"
21 #include "include/utime.h"
22 #include "include/str_list.h"
25 #include "rgw_bucket.h"
27 #include "rgw_acl_s3.h"
29 #include "rgw_formats.h"
30 #include "rgw_usage.h"
31 #include "rgw_object_expirer_core.h"
33 #include "rgw_sal_rados.h"
35 #include "services/svc_rados.h"
36 #include "services/svc_zone.h"
37 #include "services/svc_sys_obj.h"
38 #include "services/svc_bi_rados.h"
40 #include "cls/lock/cls_lock_client.h"
41 #include "cls/timeindex/cls_timeindex_client.h"
// Logging configuration for this file: dout_context / dout_subsys select the
// CephContext (g_ceph_context) and subsystem (ceph_subsys_rgw) used by the
// dout-family macros (presumably including the ldpp_dout calls below).
// objexp_lock_name is the cls lock name that process_single_shard() passes to
// rados::cls::lock::Lock when locking a hint shard object.
// NOTE(review): this copy of the file is damaged. Every line carries a stray
// original line number, statements are split across lines, and lines are
// missing (e.g. 44 -> 48). Restore it from version control before building.
43 #define dout_context g_ceph_context
44 #define dout_subsys ceph_subsys_rgw
48 static string objexp_lock_name
= "gc_process";
// Builds the RADOS object name of expiration-hint shard `shard_num`:
// "obj_delete_at_hint." followed by the shard number. The number is cast to
// unsigned and printed as a zero-padded 10-digit decimal
// (e.g. shard 7 -> "obj_delete_at_hint.0000000007").
// NOTE(review): original lines 51-52 and 54-56 are missing from this copy.
// That includes the braces, the declaration of `buf` and the return
// statement, so the returned value cannot be confirmed here. Presumably it is
// `buf` converted to a string.
50 static string
objexp_hint_get_shardname(int shard_num
)
53 snprintf(buf
, sizeof(buf
), "obj_delete_at_hint.%010u", (unsigned)shard_num
);
// Chooses the hint shard for an object key. It concatenates key.name and
// key.instance and returns RGWSI_BucketIndex_RADOS::bucket_shard_index() of
// that string for `num_shards` shards. objexp_hint_add() passes
// rgw_objexp_hints_num_shards as `num_shards`.
// NOTE(review): the function's braces (original lines 58 and 61) are missing
// from this copy.
57 static int objexp_key_shard(const rgw_obj_index_key
& key
, int num_shards
)
59 string obj_key
= key
.name
+ key
.instance
;
60 return RGWSI_BucketIndex_RADOS::bucket_shard_index(obj_key
, num_shards
);
// Builds the key extension stored with each time-index hint entry:
//   "<tenant>:<bucket_name>:<bucket_id>:<object name>:<object instance>"
// When tenant_name is empty, both the tenant and its ':' separator are
// omitted. objexp_hint_add() passes the result to cls_timeindex_add() as the
// key extension.
// NOTE(review): the closing brace (original line 69) is missing from this
// copy.
63 static string
objexp_hint_get_keyext(const string
& tenant_name
,
64 const string
& bucket_name
,
65 const string
& bucket_id
,
66 const rgw_obj_key
& obj_key
) {
67 return tenant_name
+ (tenant_name
.empty() ? "" : ":") + bucket_name
+ ":" + bucket_id
+
68 ":" + obj_key
.name
+ ":" + obj_key
.instance
;
// Writes the name of hint shard `shard_num`, as produced by
// objexp_hint_get_shardname(), to the output parameter `shard`.
// NOTE(review): original lines 72-73 and 75 are missing from this copy. They
// include the declaration of the `shard` parameter and the braces.
// inspect_all_shards() calls this as objexp_get_shard(i, &shard).
71 static void objexp_get_shard(int shard_num
,
74 *shard
= objexp_hint_get_shardname(shard_num
);
// Decodes the objexp_hint_entry payload stored in a time-index entry
// (ti_entry.value) into *hint_entry. A buffer::error thrown during decoding
// is caught and logged at level 0.
// `cct` is not referenced in the lines visible here.
// NOTE(review): the log text says "avail_pools", which does not describe the
// hint entry being decoded. It looks like a copy-paste leftover; consider
// rewording it.
// NOTE(review): original lines 79-80 (presumably including the `try {`) and
// 85-89 (presumably including the return statements) are missing from this
// copy. The success and failure return codes cannot be confirmed here.
77 static int objexp_hint_parse(const DoutPrefixProvider
*dpp
, CephContext
*cct
, cls_timeindex_entry
&ti_entry
,
78 objexp_hint_entry
*hint_entry
)
81 auto iter
= ti_entry
.value
.cbegin();
82 decode(*hint_entry
, iter
);
83 } catch (buffer::error
& err
) {
84 ldpp_dout(dpp
, 0) << "ERROR: couldn't decode avail_pools" << dendl
;
// Records an object-expiration hint. It:
// - builds an objexp_hint_entry (tenant, bucket_name, bucket_id,
//   exp_time = delete_at);
// - adds it with cls_timeindex_add() at time `delete_at`, using the key
//   extension from objexp_hint_get_keyext();
// - writes the operation to the hint shard chosen by
//   objexp_key_shard(obj_key, rgw_objexp_hints_num_shards) in the zone's
//   log_pool.
// Returns the result of obj.operate(). A failure of obj.open() is logged at
// level 0.
// NOTE(review): original lines 96, 98, 103, 105-106, 109, 113 and 115-116 are
// missing from this copy. Presumably they contain:
// - the remaining objexp_hint_get_keyext() arguments;
// - the .obj_key initializer;
// - the definition and encoding of `hebl`;
// - the open-error check and its return.
90 int RGWObjExpStore::objexp_hint_add(const DoutPrefixProvider
*dpp
,
91 const ceph::real_time
& delete_at
,
92 const string
& tenant_name
,
93 const string
& bucket_name
,
94 const string
& bucket_id
,
95 const rgw_obj_index_key
& obj_key
)
97 const string keyext
= objexp_hint_get_keyext(tenant_name
, bucket_name
,
99 objexp_hint_entry he
= {
100 .tenant
= tenant_name
,
101 .bucket_name
= bucket_name
,
102 .bucket_id
= bucket_id
,
104 .exp_time
= delete_at
};
107 librados::ObjectWriteOperation op
;
108 cls_timeindex_add(op
, utime_t(delete_at
), keyext
, hebl
);
110 string shard_name
= objexp_hint_get_shardname(objexp_key_shard(obj_key
, cct
->_conf
->rgw_objexp_hints_num_shards
));
111 auto obj
= rados_svc
->obj(rgw_raw_obj(driver
->svc()->zone
->get_zone_params().log_pool
, shard_name
));
112 int r
= obj
.open(dpp
);
114 ldpp_dout(dpp
, 0) << "ERROR: " << __func__
<< "(): failed to open obj=" << obj
<< " (r=" << r
<< ")" << dendl
;
117 return obj
.operate(dpp
, &op
, null_yield
);
// Lists expiration hints from hint shard object `oid` in the zone's log_pool.
// It runs cls_timeindex_list() for the time range start_time..end_time,
// resuming from `marker` and passing `max_entries` as the limit. Results go to
// `entries`; the continuation marker and truncation flag are reported through
// out_marker and truncated.
// A failure of obj.open() is logged at level 0. An operate() error other than
// -ENOENT is handled as a failure. -ENOENT with a non-null `truncated` takes a
// separate branch. Both branch bodies are missing here.
// NOTE(review): original line 121 is missing from this copy. It is presumably
// the `oid` parameter, since process_single_shard() passes the shard name
// second. Also missing: the declaration of `obl` and the bodies of the error
// branches (lines 129, 133, 136, 138-140, 142, 144-146, 148 on).
120 int RGWObjExpStore::objexp_hint_list(const DoutPrefixProvider
*dpp
,
122 const ceph::real_time
& start_time
,
123 const ceph::real_time
& end_time
,
124 const int max_entries
,
125 const string
& marker
,
126 list
<cls_timeindex_entry
>& entries
, /* out */
127 string
*out_marker
, /* out */
128 bool *truncated
) /* out */
130 librados::ObjectReadOperation op
;
131 cls_timeindex_list(op
, utime_t(start_time
), utime_t(end_time
), marker
, max_entries
, entries
,
132 out_marker
, truncated
);
134 auto obj
= rados_svc
->obj(rgw_raw_obj(driver
->svc()->zone
->get_zone_params().log_pool
, oid
));
135 int r
= obj
.open(dpp
);
137 ldpp_dout(dpp
, 0) << "ERROR: " << __func__
<< "(): failed to open obj=" << obj
<< " (r=" << r
<< ")" << dendl
;
141 int ret
= obj
.operate(dpp
, &op
, &obl
, null_yield
);
143 if ((ret
< 0 ) && (ret
!= -ENOENT
)) {
147 if ((ret
== -ENOENT
) && truncated
) {
// Trims time-index entries in the range from_time..to_time and
// from_marker..to_marker from object `oid`. It builds a cls_timeindex_trim()
// write op and executes it with rgw_rados_operate() on ref.pool.ioctx().
// NOTE(review): original lines 155-156 are missing from this copy. They are
// presumably the `ref` and `oid` parameters, since objexp_hint_trim() calls
// this as (dpp, ref, oid, ...). Lines 161-163 and 167-175 are also missing.
// The name suggests the op is repeated until nothing is left to trim, but
// that loop is not visible here.
154 static int cls_timeindex_trim_repeat(const DoutPrefixProvider
*dpp
,
157 const utime_t
& from_time
,
158 const utime_t
& to_time
,
159 const string
& from_marker
,
160 const string
& to_marker
)
164 librados::ObjectWriteOperation op
;
165 cls_timeindex_trim(op
, from_time
, to_time
, from_marker
, to_marker
);
166 int r
= rgw_rados_operate(dpp
, ref
.pool
.ioctx(), oid
, &op
, null_yield
);
// Trims expiration hints from hint shard object `oid` in the zone's log_pool.
// The time range is start_time..end_time and the marker range is
// from_marker..to_marker; the trim is done by cls_timeindex_trim_repeat().
// A failure of obj.open() is logged at level 0. A trim error other than
// -ENOENT is handled as a failure; that branch's body is missing here.
// NOTE(review): original line 177 is missing from this copy. It is presumably
// the `oid` parameter, since trim_chunk() passes the shard name second.
// Lines 182, 185, 187-188 and 193 on are also missing.
176 int RGWObjExpStore::objexp_hint_trim(const DoutPrefixProvider
*dpp
,
178 const ceph::real_time
& start_time
,
179 const ceph::real_time
& end_time
,
180 const string
& from_marker
,
181 const string
& to_marker
)
183 auto obj
= rados_svc
->obj(rgw_raw_obj(driver
->svc()->zone
->get_zone_params().log_pool
, oid
));
184 int r
= obj
.open(dpp
);
186 ldpp_dout(dpp
, 0) << "ERROR: " << __func__
<< "(): failed to open obj=" << obj
<< " (r=" << r
<< ")" << dendl
;
189 auto& ref
= obj
.get_ref();
190 int ret
= cls_timeindex_trim_repeat(dpp
, ref
, oid
, utime_t(start_time
), utime_t(end_time
),
191 from_marker
, to_marker
);
192 if ((ret
< 0 ) && (ret
!= -ENOENT
)) {
// Deletes the object named by an expiration hint.
// It looks up the bucket (hint.tenant, hint.bucket_name, hint.bucket_id):
// - If the bucket does not exist, the hint is treated as stale. This is logged
//   at level 15 and -ERR_PRECONDITION_FAILED is returned.
// - Other lookup errors are logged at level 1.
// An empty key instance is replaced with "null". The object is then fetched
// from the bucket and delete_object() is called on it.
// `bucket_info` is not referenced in the lines visible here.
// NOTE(review): the level-1 log text "due to ret = " has no leading space, so
// it runs straight into the bucket name.
// NOTE(review): original lines 200, 203, 212-214, 218-219, 221 and 223 on are
// missing from this copy. They include the return paths after the lookup
// error and after delete_object().
199 int RGWObjectExpirer::garbage_single_object(const DoutPrefixProvider
*dpp
, objexp_hint_entry
& hint
)
201 RGWBucketInfo bucket_info
;
202 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
204 int ret
= driver
->get_bucket(dpp
, nullptr, rgw_bucket(hint
.tenant
, hint
.bucket_name
, hint
.bucket_id
), &bucket
, null_yield
);
205 if (-ENOENT
== ret
) {
206 ldpp_dout(dpp
, 15) << "NOTICE: cannot find bucket = " \
207 << hint
.bucket_name
<< ". The object must be already removed" << dendl
;
208 return -ERR_PRECONDITION_FAILED
;
209 } else if (ret
< 0) {
210 ldpp_dout(dpp
, 1) << "ERROR: could not init bucket = " \
211 << hint
.bucket_name
<< "due to ret = " << ret
<< dendl
;
215 rgw_obj_key key
= hint
.obj_key
;
216 if (key
.instance
.empty()) {
217 key
.instance
= "null";
220 std::unique_ptr
<rgw::sal::Object
> obj
= bucket
->get_object(key
);
222 ret
= obj
->delete_object(dpp
, null_yield
);
// Processes one chunk of listed hints. For each entry it:
// - logs the entry's timestamp (seconds) and key extension at level 15;
// - decodes the entry with objexp_hint_parse();
// - tries to delete the hinted object with garbage_single_object().
// A stale hint (-ERR_PRECONDITION_FAILED) is logged at level 15. Any other
// negative result is logged at level 1. `need_trim` is an output parameter.
// NOTE(review): on parse failure the log prints hint.obj_key, which may not
// have been decoded. Consider logging iter->key_ext instead.
// NOTE(review): original lines 230-232, 235-236, 240, 242, 244-246 and 254-261
// are missing from this copy. They include the loop increment, the
// parse-error condition and the place where need_trim is set.
227 void RGWObjectExpirer::garbage_chunk(const DoutPrefixProvider
*dpp
,
228 list
<cls_timeindex_entry
>& entries
, /* in */
229 bool& need_trim
) /* out */
233 for (list
<cls_timeindex_entry
>::iterator iter
= entries
.begin();
234 iter
!= entries
.end();
237 objexp_hint_entry hint
;
238 ldpp_dout(dpp
, 15) << "got removal hint for: " << iter
->key_ts
.sec() \
239 << " - " << iter
->key_ext
<< dendl
;
241 int ret
= objexp_hint_parse(dpp
, driver
->ctx(), *iter
, &hint
);
243 ldpp_dout(dpp
, 1) << "cannot parse removal hint for " << hint
.obj_key
<< dendl
;
247 /* PRECOND_FAILED simply means that our hint is not valid.
248 * We can silently ignore that and move forward. */
249 ret
= garbage_single_object(dpp
, hint
);
250 if (ret
== -ERR_PRECONDITION_FAILED
) {
251 ldpp_dout(dpp
, 15) << "not actual hint for object: " << hint
.obj_key
<< dendl
;
252 } else if (ret
< 0) {
253 ldpp_dout(dpp
, 1) << "cannot remove expired object: " << hint
.obj_key
<< dendl
;
// Trims processed hints from one shard. It converts the `from` and `to` times
// to real_time and calls exp_store.objexp_hint_trim() with the shard,
// from_marker and to_marker. The attempt is logged at level 20 and a trim
// error at level 0.
// NOTE(review): original lines 263-265 are missing from this copy. They are
// presumably the `shard`, `from` and `to` parameters, since
// process_single_shard() calls this as
// (dpp, shard, last_run, round_start, marker, out_marker). Lines 268, 271,
// 274, 277 and 279-283 are also missing.
262 void RGWObjectExpirer::trim_chunk(const DoutPrefixProvider
*dpp
,
266 const string
& from_marker
,
267 const string
& to_marker
)
269 ldpp_dout(dpp
, 20) << "trying to trim removal hints to=" << to
270 << ", to_marker=" << to_marker
<< dendl
;
272 real_time rt_from
= from
.to_real_time();
273 real_time rt_to
= to
.to_real_time();
275 int ret
= exp_store
.objexp_hint_trim(dpp
, shard
, rt_from
, rt_to
,
276 from_marker
, to_marker
);
278 ldpp_dout(dpp
, 0) << "ERROR during trim: " << ret
<< dendl
;
// Processes the expiration hints of a single shard for the window
// last_run..round_start.
// It takes an exclusive cls lock named objexp_lock_name on the shard object,
// with a duration of rgw_objexp_gc_interval seconds. If another processor
// already holds the lock (-EBUSY), this is logged at level 5. It then:
// - lists hints, passing rgw_objexp_chunk_size as the entry limit;
// - deletes the hinted objects (garbage_chunk());
// - trims from `marker` to `out_marker` (trim_chunk());
// - releases the lock.
// NOTE(review): the lock is taken through objexp_pool_ctx, while
// RGWObjExpStore addresses the shard objects in the zone's log_pool. Confirm
// both refer to the same pool.
// NOTE(review): many original lines are missing from this copy (numbering
// gaps throughout, e.g. 284 -> 286, 298 -> 301, 334-341). Presumably they
// include the `shard` parameter, the declarations of marker / out_marker /
// need_trim, the looping logic and the return statements.
284 bool RGWObjectExpirer::process_single_shard(const DoutPrefixProvider
*dpp
,
286 const utime_t
& last_run
,
287 const utime_t
& round_start
)
291 bool truncated
= false;
294 CephContext
*cct
= driver
->ctx();
295 int num_entries
= cct
->_conf
->rgw_objexp_chunk_size
;
297 int max_secs
= cct
->_conf
->rgw_objexp_gc_interval
;
298 utime_t end
= ceph_clock_now();
301 rados::cls::lock::Lock
l(objexp_lock_name
);
303 utime_t
time(max_secs
, 0);
304 l
.set_duration(time
);
306 int ret
= l
.lock_exclusive(&static_cast<rgw::sal::RadosStore
*>(driver
)->getRados()->objexp_pool_ctx
, shard
);
307 if (ret
== -EBUSY
) { /* already locked by another processor */
308 ldpp_dout(dpp
, 5) << __func__
<< "(): failed to acquire lock on " << shard
<< dendl
;
313 real_time rt_last
= last_run
.to_real_time();
314 real_time rt_start
= round_start
.to_real_time();
316 list
<cls_timeindex_entry
> entries
;
317 ret
= exp_store
.objexp_hint_list(dpp
, shard
, rt_last
, rt_start
,
318 num_entries
, marker
, entries
,
319 &out_marker
, &truncated
);
321 ldpp_dout(dpp
, 10) << "cannot get removal hints from shard: " << shard
327 garbage_chunk(dpp
, entries
, need_trim
);
330 trim_chunk(dpp
, shard
, last_run
, round_start
, marker
, out_marker
);
333 utime_t now
= ceph_clock_now();
342 l
.unlock(&static_cast<rgw::sal::RadosStore
*>(driver
)->getRados()->objexp_pool_ctx
, shard
);
// Walks every hint shard 0..rgw_objexp_hints_num_shards-1. For each shard it
// resolves the object name with objexp_get_shard() and passes it to
// process_single_shard() for the window last_run..round_start. `all_done`
// starts out true; what happens when a shard reports failure is not visible
// in this copy.
// NOTE(review): original lines 350, 354, 356, 358, 360 and 362-368 are missing
// from this copy. They include the declaration of `shard` and the return
// statement.
346 /* Returns true if all shards have been processed successfully. */
347 bool RGWObjectExpirer::inspect_all_shards(const DoutPrefixProvider
*dpp
,
348 const utime_t
& last_run
,
349 const utime_t
& round_start
)
351 CephContext
* const cct
= driver
->ctx();
352 int num_shards
= cct
->_conf
->rgw_objexp_hints_num_shards
;
353 bool all_done
= true;
355 for (int i
= 0; i
< num_shards
; i
++) {
357 objexp_get_shard(i
, &shard
);
359 ldpp_dout(dpp
, 20) << "processing shard = " << shard
<< dendl
;
361 if (! process_single_shard(dpp
, shard
, last_run
, round_start
)) {
// Presumably reports whether the expirer is shutting down.
// OEWorker::entry() keeps looping while this returns false.
// NOTE(review): the body (original lines 370-373) is missing from this copy.
369 bool RGWObjectExpirer::going_down()
// Starts the background expiration worker. It allocates an OEWorker bound to
// this expirer and calls create("rgw_obj_expirer") on it (presumably starting
// the worker thread under that name).
// NOTE(review): `worker` is a raw owning pointer allocated with `new`. It is
// presumably released in stop_processor(), whose body is not visible here.
// NOTE(review): original lines 375 and 378-379 are missing from this copy.
374 void RGWObjectExpirer::start_processor()
376 worker
= new OEWorker(driver
->ctx(), this);
377 worker
->create("rgw_obj_expirer");
// Presumably stops the background worker started by start_processor().
// NOTE(review): the body (original lines 381-390) is missing from this copy.
380 void RGWObjectExpirer::stop_processor()
// Worker thread main loop. Each round:
// - records the start time and logs "object expiration: start" at level 2;
// - runs inspect_all_shards() over last_run..start. The comment below says
//   last_run may advance when all shards succeed; the assignment itself is
//   not visible.
// - reads `secs` from rgw_objexp_gc_interval. If `secs <= end.sec()` it skips
//   straight to the next round; otherwise it waits on `cond` (under `lock`)
//   for `secs` seconds.
// The loop ends once oe->going_down() returns true.
// NOTE(review): as visible here, `end` holds the absolute clock time, so
// `secs <= end.sec()` would always be true. Original line 408 is missing and
// presumably turned `end` into the elapsed round time; confirm. Lines
// 392-393, 399-400, 402-403, 405-406, 410 and 413-415 are also missing.
391 void *RGWObjectExpirer::OEWorker::entry() {
394 utime_t start
= ceph_clock_now();
395 ldpp_dout(this, 2) << "object expiration: start" << dendl
;
396 if (oe
->inspect_all_shards(this, last_run
, start
)) {
397 /* All shards have been processed properly. Next time we can start
398 * from this moment. */
401 ldpp_dout(this, 2) << "object expiration: stop" << dendl
;
404 if (oe
->going_down())
407 utime_t end
= ceph_clock_now();
409 int secs
= cct
->_conf
->rgw_objexp_gc_interval
;
411 if (secs
<= end
.sec())
412 continue; // next round
416 std::unique_lock l
{lock
};
417 cond
.wait_for(l
, std::chrono::seconds(secs
));
418 } while (!oe
->going_down());
// Presumably wakes the worker thread so it can observe shutdown. It takes
// `lock`, the mutex that entry() holds while waiting on `cond`.
// NOTE(review): the rest of the body (original lines 424 and 426-428,
// presumably a notify on `cond`) is missing from this copy.
423 void RGWObjectExpirer::OEWorker::stop()
425 std::lock_guard l
{lock
};
// Presumably the DoutPrefixProvider override that supplies the CephContext
// used when this worker logs via ldpp_dout(this, ...).
// NOTE(review): the body (original lines 430-433) is missing from this copy.
429 CephContext
*RGWObjectExpirer::OEWorker::get_cct() const
// Presumably the DoutPrefixProvider override that reports the logging
// subsystem for this worker's ldpp_dout(this, ...) output.
// NOTE(review): the body (original lines 435-438) is missing from this copy.
434 unsigned RGWObjectExpirer::OEWorker::get_subsys() const
// Writes the log prefix "rgw object expirer Worker thread: " to `out` and
// returns `out`. This is presumably the prefix for messages this worker emits
// through ldpp_dout(this, ...).
// NOTE(review): the braces (original lines 440 and 442) are missing from this
// copy.
439 std::ostream
& RGWObjectExpirer::OEWorker::gen_prefix(std::ostream
& out
) const
441 return out
<< "rgw object expirer Worker thread: ";