1 #ifndef CEPH_CLS_RGW_CLIENT_H
2 #define CEPH_CLS_RGW_CLIENT_H
4 #include "include/str_list.h"
5 #include "include/rados/librados.hpp"
6 #include "cls_rgw_ops.h"
7 #include "cls_rgw_const.h"
8 #include "common/RefCountedObj.h"
9 #include "include/compat.h"
10 #include "common/ceph_time.h"
12 // Forward declaration
13 class BucketIndexAioManager
;
15 * Bucket index AIO request argument; this is used to pass an argument
18 struct BucketIndexAioArg
: public RefCountedObject
{
19 BucketIndexAioArg(int _id
, BucketIndexAioManager
* _manager
) :
20 id(_id
), manager(_manager
) {}
22 BucketIndexAioManager
* manager
;
26 * This class manages AIO completions. This class is not completely thread-safe:
27 * methods like *get_next* are not thread-safe and are expected to be called from
30 class BucketIndexAioManager
{
32 map
<int, librados::AioCompletion
*> pendings
;
33 map
<int, librados::AioCompletion
*> completions
;
34 map
<int, string
> pending_objs
;
35 map
<int, string
> completion_objs
;
40 * Callback implementation for AIO request.
42 static void bucket_index_op_completion_cb(void* cb
, void* arg
) {
43 BucketIndexAioArg
* cb_arg
= (BucketIndexAioArg
*) arg
;
44 cb_arg
->manager
->do_completion(cb_arg
->id
);
49 * Get next request ID. This method is not thread-safe.
51 * Return next request ID.
// Post-increments the `next` counter and returns the value it held before
// the increment.
53 int get_next() { return next
++; }
56 * Add a new pending AIO completion instance.
58 * @param id - the request ID.
59 * @param completion - the AIO completion instance.
60 * @param oid - the object id associated with the object, if it is NULL, we don't
61 * track the object id per callback.
63 void add_pending(int id
, librados::AioCompletion
* completion
, const string
& oid
) {
64 pendings
[id
] = completion
;
65 pending_objs
[id
] = oid
;
69 * Create a new instance.
71 BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {}
75 * Do completion for the given AIO request.
77 void do_completion(int id
);
80 * Wait for AIO completions.
82 * valid_ret_code - valid AIO return code.
83 * num_completions - number of completions.
84 * ret_code - return code of failed AIO.
85 * objs - the objects for which the AIO has finished.
87 * Return false if there is no pending AIO, true otherwise.
89 bool wait_for_completions(int valid_ret_code
, int *num_completions
, int *ret_code
,
90 map
<int, string
> *objs
);
93 * Do aio read operation.
95 bool aio_operate(librados::IoCtx
& io_ctx
, const string
& oid
, librados::ObjectReadOperation
*op
) {
96 Mutex::Locker
l(lock
);
97 BucketIndexAioArg
*arg
= new BucketIndexAioArg(get_next(), this);
98 librados::AioCompletion
*c
= librados::Rados::aio_create_completion((void*)arg
, NULL
, bucket_index_op_completion_cb
);
99 int r
= io_ctx
.aio_operate(oid
, c
, (librados::ObjectReadOperation
*)op
, NULL
);
101 add_pending(arg
->id
, c
, oid
);
109 * Do aio write operation.
111 bool aio_operate(librados::IoCtx
& io_ctx
, const string
& oid
, librados::ObjectWriteOperation
*op
) {
112 Mutex::Locker
l(lock
);
113 BucketIndexAioArg
*arg
= new BucketIndexAioArg(get_next(), this);
114 librados::AioCompletion
*c
= librados::Rados::aio_create_completion((void*)arg
, NULL
, bucket_index_op_completion_cb
);
115 int r
= io_ctx
.aio_operate(oid
, c
, (librados::ObjectWriteOperation
*)op
);
117 add_pending(arg
->id
, c
, oid
);
125 class RGWGetDirHeader_CB
: public RefCountedObject
{
127 ~RGWGetDirHeader_CB() override
{}
128 virtual void handle_response(int r
, rgw_bucket_dir_header
& header
) = 0;
131 class BucketIndexShardsManager
{
133 // Per shard setting manager, for example, marker.
134 map
<int, string
> value_by_shards
;
136 const static string KEY_VALUE_SEPARATOR
;
137 const static string SHARDS_SEPARATOR
;
139 void add(int shard
, const string
& value
) {
140 value_by_shards
[shard
] = value
;
143 const string
& get(int shard
, const string
& default_value
) {
144 map
<int, string
>::iterator iter
= value_by_shards
.find(shard
);
145 return (iter
== value_by_shards
.end() ? default_value
: iter
->second
);
148 map
<int, string
>& get() {
149 return value_by_shards
;
153 return value_by_shards
.empty();
156 void to_string(string
*out
) const {
161 map
<int, string
>::const_iterator iter
= value_by_shards
.begin();
162 for (; iter
!= value_by_shards
.end(); ++iter
) {
164 // Not the first item, append a separator first
165 out
->append(SHARDS_SEPARATOR
);
168 snprintf(buf
, sizeof(buf
), "%d", iter
->first
);
170 out
->append(KEY_VALUE_SEPARATOR
);
171 out
->append(iter
->second
);
175 static bool is_shards_marker(const string
& marker
) {
176 return marker
.find(KEY_VALUE_SEPARATOR
) != string::npos
;
180 * Convert from string. There are two options for what the string looks like:
182 * 1. Single shard, no shard id specified, e.g. 000001.23.1
184 * for this case, if passed shard_id >= 0, use this shard id, otherwise assume that it's a
185 * bucket with no shards.
187 * 2. One or more shards, shard id specified for each shard, e.g., 0#00002.12,1#00003.23.2
190 int from_string(const string
& composed_marker
, int shard_id
) {
191 value_by_shards
.clear();
192 vector
<string
> shards
;
193 get_str_vec(composed_marker
, SHARDS_SEPARATOR
.c_str(), shards
);
194 if (shards
.size() > 1 && shard_id
>= 0) {
197 vector
<string
>::const_iterator iter
= shards
.begin();
198 for (; iter
!= shards
.end(); ++iter
) {
199 size_t pos
= iter
->find(KEY_VALUE_SEPARATOR
);
200 if (pos
== string::npos
) {
201 if (!value_by_shards
.empty()) {
207 add(shard_id
, *iter
);
211 string shard_str
= iter
->substr(0, pos
);
213 int shard
= (int)strict_strtol(shard_str
.c_str(), 10, &err
);
217 add(shard
, iter
->substr(pos
+ 1));
222 // trim the '<shard-id>#' prefix from a single shard marker if present
223 static std::string
get_shard_marker(const std::string
& marker
) {
224 auto p
= marker
.find(KEY_VALUE_SEPARATOR
);
225 if (p
== marker
.npos
) {
228 return marker
.substr(p
+ 1);
233 void cls_rgw_bucket_init(librados::ObjectWriteOperation
& o
);
235 class CLSRGWConcurrentIO
{
237 librados::IoCtx
& io_ctx
;
238 map
<int, string
>& objs_container
;
239 map
<int, string
>::iterator iter
;
241 BucketIndexAioManager manager
;
243 virtual int issue_op(int shard_id
, const string
& oid
) = 0;
245 virtual void cleanup() {}
// Value passed to manager.wait_for_completions() as the valid AIO return
// code; the default is 0.
246 virtual int valid_ret_code() { return 0; }
247 // Return true if multiple rounds of OPs might be needed; this happens when
248 // an OP needs to be re-sent until a certain code is returned.
249 virtual bool need_multiple_rounds() { return false; }
250 // Add a new object to the end of the container.
251 virtual void add_object(int shard
, const string
& oid
) {}
252 virtual void reset_container(map
<int, string
>& objs
) {}
255 CLSRGWConcurrentIO(librados::IoCtx
& ioc
, map
<int, string
>& _objs_container
,
256 uint32_t _max_aio
) : io_ctx(ioc
), objs_container(_objs_container
), max_aio(_max_aio
) {}
257 virtual ~CLSRGWConcurrentIO() {}
261 iter
= objs_container
.begin();
262 for (; iter
!= objs_container
.end() && max_aio
-- > 0; ++iter
) {
263 ret
= issue_op(iter
->first
, iter
->second
);
268 int num_completions
, r
= 0;
269 map
<int, string
> objs
;
270 map
<int, string
> *pobjs
= (need_multiple_rounds() ? &objs
: NULL
);
271 while (manager
.wait_for_completions(valid_ret_code(), &num_completions
, &r
, pobjs
)) {
272 if (r
>= 0 && ret
>= 0) {
273 for(int i
= 0; i
< num_completions
&& iter
!= objs_container
.end(); ++i
, ++iter
) {
274 int issue_ret
= issue_op(iter
->first
, iter
->second
);
280 } else if (ret
>= 0) {
283 if (need_multiple_rounds() && iter
== objs_container
.end() && !objs
.empty()) {
284 // For those objects which need another round, use them to reset
286 reset_container(objs
);
297 class CLSRGWIssueBucketIndexInit
: public CLSRGWConcurrentIO
{
299 int issue_op(int shard_id
, const string
& oid
) override
;
// Use -EEXIST as the valid AIO return code for bucket index init.
// NOTE(review): presumably so that an index object which already exists is
// not treated as a failure — confirm against the implementation.
300 int valid_ret_code() override
{ return -EEXIST
; }
301 void cleanup() override
;
303 CLSRGWIssueBucketIndexInit(librados::IoCtx
& ioc
, map
<int, string
>& _bucket_objs
,
305 CLSRGWConcurrentIO(ioc
, _bucket_objs
, _max_aio
) {}
308 class CLSRGWIssueSetTagTimeout
: public CLSRGWConcurrentIO
{
309 uint64_t tag_timeout
;
311 int issue_op(int shard_id
, const string
& oid
) override
;
313 CLSRGWIssueSetTagTimeout(librados::IoCtx
& ioc
, map
<int, string
>& _bucket_objs
,
314 uint32_t _max_aio
, uint64_t _tag_timeout
) :
315 CLSRGWConcurrentIO(ioc
, _bucket_objs
, _max_aio
), tag_timeout(_tag_timeout
) {}
318 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation
& o
, bool absolute
,
319 const map
<uint8_t, rgw_bucket_category_stats
>& stats
);
321 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation
& o
, RGWModifyOp op
, string
& tag
,
322 const cls_rgw_obj_key
& key
, const string
& locator
, bool log_op
,
323 uint16_t bilog_op
, rgw_zone_set
& zones_trace
);
325 void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation
& o
, RGWModifyOp op
, string
& tag
,
326 rgw_bucket_entry_ver
& ver
,
327 const cls_rgw_obj_key
& key
,
328 rgw_bucket_dir_entry_meta
& dir_meta
,
329 list
<cls_rgw_obj_key
> *remove_objs
, bool log_op
,
330 uint16_t bilog_op
, rgw_zone_set
*zones_trace
);
332 void cls_rgw_remove_obj(librados::ObjectWriteOperation
& o
, list
<string
>& keep_attr_prefixes
);
333 void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation
& o
, const string
& attr
);
334 void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation
& o
, const string
& prefix
, bool fail_if_exist
);
335 void cls_rgw_obj_check_mtime(librados::ObjectOperation
& o
, const ceph::real_time
& mtime
, bool high_precision_time
, RGWCheckMTimeType type
);
337 int cls_rgw_bi_get(librados::IoCtx
& io_ctx
, const string oid
,
338 BIIndexType index_type
, cls_rgw_obj_key
& key
,
339 rgw_cls_bi_entry
*entry
);
340 int cls_rgw_bi_put(librados::IoCtx
& io_ctx
, const string oid
, rgw_cls_bi_entry
& entry
);
341 void cls_rgw_bi_put(librados::ObjectWriteOperation
& op
, const string oid
, rgw_cls_bi_entry
& entry
);
342 int cls_rgw_bi_list(librados::IoCtx
& io_ctx
, const string oid
,
343 const string
& name
, const string
& marker
, uint32_t max
,
344 list
<rgw_cls_bi_entry
> *entries
, bool *is_truncated
);
347 int cls_rgw_bucket_link_olh(librados::IoCtx
& io_ctx
, librados::ObjectWriteOperation
& op
,
348 const string
& oid
, const cls_rgw_obj_key
& key
, bufferlist
& olh_tag
,
349 bool delete_marker
, const string
& op_tag
, struct rgw_bucket_dir_entry_meta
*meta
,
350 uint64_t olh_epoch
, ceph::real_time unmod_since
, bool high_precision_time
, bool log_op
, rgw_zone_set
& zones_trace
);
351 int cls_rgw_bucket_unlink_instance(librados::IoCtx
& io_ctx
, librados::ObjectWriteOperation
& op
,
352 const string
& oid
, const cls_rgw_obj_key
& key
, const string
& op_tag
,
353 const string
& olh_tag
, uint64_t olh_epoch
, bool log_op
, rgw_zone_set
& zones_trace
);
354 int cls_rgw_get_olh_log(librados::IoCtx
& io_ctx
, string
& oid
, librados::ObjectReadOperation
& op
, const cls_rgw_obj_key
& olh
, uint64_t ver_marker
,
355 const string
& olh_tag
,
356 map
<uint64_t, vector
<struct rgw_bucket_olh_log_entry
> > *log
, bool *is_truncated
);
357 void cls_rgw_trim_olh_log(librados::ObjectWriteOperation
& op
, const cls_rgw_obj_key
& olh
, uint64_t ver
, const string
& olh_tag
);
358 int cls_rgw_clear_olh(librados::IoCtx
& io_ctx
, librados::ObjectWriteOperation
& op
, string
& oid
, const cls_rgw_obj_key
& olh
, const string
& olh_tag
);
361 * List the bucket with the starting object and filter prefix.
362 * NOTE: this method issues listing requests for each bucket index shard identified by
363 * the keys of the *list_results* map, which means the map should be populated
364 * by the caller with each bucket index object id.
366 * io_ctx - IO context for rados.
367 * start_obj - marker for the listing.
368 * filter_prefix - filter prefix.
369 * num_entries - number of entries to request for each object (note the total
370 * number of entries returned depends on the number of shards).
371 * list_results - the list results keyed by bucket index object id.
372 * max_aio - the maximum number of AIO (for throttling).
374 * Return 0 on success, a failure code otherwise.
377 class CLSRGWIssueBucketList
: public CLSRGWConcurrentIO
{
378 cls_rgw_obj_key start_obj
;
379 string filter_prefix
;
380 uint32_t num_entries
;
382 map
<int, rgw_cls_list_ret
>& result
;
384 int issue_op(int shard_id
, const string
& oid
) override
;
386 CLSRGWIssueBucketList(librados::IoCtx
& io_ctx
, const cls_rgw_obj_key
& _start_obj
,
387 const string
& _filter_prefix
, uint32_t _num_entries
,
389 map
<int, string
>& oids
,
390 map
<int, struct rgw_cls_list_ret
>& list_results
,
392 CLSRGWConcurrentIO(io_ctx
, oids
, max_aio
),
393 start_obj(_start_obj
), filter_prefix(_filter_prefix
), num_entries(_num_entries
), list_versions(_list_versions
), result(list_results
) {}
396 class CLSRGWIssueBILogList
: public CLSRGWConcurrentIO
{
397 map
<int, struct cls_rgw_bi_log_list_ret
>& result
;
398 BucketIndexShardsManager
& marker_mgr
;
401 int issue_op(int shard_id
, const string
& oid
) override
;
403 CLSRGWIssueBILogList(librados::IoCtx
& io_ctx
, BucketIndexShardsManager
& _marker_mgr
, uint32_t _max
,
404 map
<int, string
>& oids
,
405 map
<int, struct cls_rgw_bi_log_list_ret
>& bi_log_lists
, uint32_t max_aio
) :
406 CLSRGWConcurrentIO(io_ctx
, oids
, max_aio
), result(bi_log_lists
),
407 marker_mgr(_marker_mgr
), max(_max
) {}
410 class CLSRGWIssueBILogTrim
: public CLSRGWConcurrentIO
{
411 BucketIndexShardsManager
& start_marker_mgr
;
412 BucketIndexShardsManager
& end_marker_mgr
;
414 int issue_op(int shard_id
, const string
& oid
) override
;
415 // Trim until -ENODATA is returned.
416 int valid_ret_code() override
{ return -ENODATA
; }
// Always true for this op: multiple rounds of OPs might be needed.
417 bool need_multiple_rounds() override
{ return true; }
// Store oid under the given shard in objs_container, replacing any entry
// already present for that shard.
418 void add_object(int shard
, const string
& oid
) override
{ objs_container
[shard
] = oid
; }
419 void reset_container(map
<int, string
>& objs
) override
{
420 objs_container
.swap(objs
);
421 iter
= objs_container
.begin();
425 CLSRGWIssueBILogTrim(librados::IoCtx
& io_ctx
, BucketIndexShardsManager
& _start_marker_mgr
,
426 BucketIndexShardsManager
& _end_marker_mgr
, map
<int, string
>& _bucket_objs
, uint32_t max_aio
) :
427 CLSRGWConcurrentIO(io_ctx
, _bucket_objs
, max_aio
),
428 start_marker_mgr(_start_marker_mgr
), end_marker_mgr(_end_marker_mgr
) {}
432 * Check the bucket index.
434 * io_ctx - IO context for rados.
435 * bucket_objs_ret - check result for all shards.
436 * max_aio - the maximum number of AIO (for throttling).
438 * Return 0 on success, a failure code otherwise.
440 class CLSRGWIssueBucketCheck
: public CLSRGWConcurrentIO
/*<map<string, struct rgw_cls_check_index_ret> >*/ {
441 map
<int, struct rgw_cls_check_index_ret
>& result
;
443 int issue_op(int shard_id
, const string
& oid
) override
;
445 CLSRGWIssueBucketCheck(librados::IoCtx
& ioc
, map
<int, string
>& oids
, map
<int, struct rgw_cls_check_index_ret
>& bucket_objs_ret
,
447 CLSRGWConcurrentIO(ioc
, oids
, _max_aio
), result(bucket_objs_ret
) {}
450 class CLSRGWIssueBucketRebuild
: public CLSRGWConcurrentIO
{
452 int issue_op(int shard_id
, const string
& oid
) override
;
454 CLSRGWIssueBucketRebuild(librados::IoCtx
& io_ctx
, map
<int, string
>& bucket_objs
,
455 uint32_t max_aio
) : CLSRGWConcurrentIO(io_ctx
, bucket_objs
, max_aio
) {}
458 class CLSRGWIssueGetDirHeader
: public CLSRGWConcurrentIO
{
459 map
<int, rgw_cls_list_ret
>& result
;
461 int issue_op(int shard_id
, const string
& oid
) override
;
463 CLSRGWIssueGetDirHeader(librados::IoCtx
& io_ctx
, map
<int, string
>& oids
, map
<int, rgw_cls_list_ret
>& dir_headers
,
465 CLSRGWConcurrentIO(io_ctx
, oids
, max_aio
), result(dir_headers
) {}
468 class CLSRGWIssueSetBucketResharding
: public CLSRGWConcurrentIO
{
469 cls_rgw_bucket_instance_entry entry
;
471 int issue_op(int shard_id
, const string
& oid
) override
;
473 CLSRGWIssueSetBucketResharding(librados::IoCtx
& ioc
, map
<int, string
>& _bucket_objs
,
474 const cls_rgw_bucket_instance_entry
& _entry
,
475 uint32_t _max_aio
) : CLSRGWConcurrentIO(ioc
, _bucket_objs
, _max_aio
), entry(_entry
) {}
478 class CLSRGWIssueResyncBucketBILog
: public CLSRGWConcurrentIO
{
// Issue this request type's operation for one entry of the objects container.
// shard_id - key of the entry (the shard).
// oid      - value of the entry (the object id).
// Marked `override` so the compiler verifies the signature against the pure
// virtual CLSRGWConcurrentIO::issue_op, matching the sibling CLSRGWIssue* classes.
480 int issue_op(int shard_id, const string& oid) override;
482 CLSRGWIssueResyncBucketBILog(librados::IoCtx
& io_ctx
, map
<int, string
>& _bucket_objs
, uint32_t max_aio
) :
483 CLSRGWConcurrentIO(io_ctx
, _bucket_objs
, max_aio
) {}
486 class CLSRGWIssueBucketBILogStop
: public CLSRGWConcurrentIO
{
// Issue this request type's operation for one entry of the objects container.
// shard_id - key of the entry (the shard).
// oid      - value of the entry (the object id).
// Marked `override` so the compiler verifies the signature against the pure
// virtual CLSRGWConcurrentIO::issue_op, matching the sibling CLSRGWIssue* classes.
488 int issue_op(int shard_id, const string& oid) override;
490 CLSRGWIssueBucketBILogStop(librados::IoCtx
& io_ctx
, map
<int, string
>& _bucket_objs
, uint32_t max_aio
) :
491 CLSRGWConcurrentIO(io_ctx
, _bucket_objs
, max_aio
) {}
494 int cls_rgw_get_dir_header_async(librados::IoCtx
& io_ctx
, string
& oid
, RGWGetDirHeader_CB
*ctx
);
496 void cls_rgw_encode_suggestion(char op
, rgw_bucket_dir_entry
& dirent
, bufferlist
& updates
);
498 void cls_rgw_suggest_changes(librados::ObjectWriteOperation
& o
, bufferlist
& updates
);
501 int cls_rgw_usage_log_read(librados::IoCtx
& io_ctx
, string
& oid
, string
& user
,
502 uint64_t start_epoch
, uint64_t end_epoch
, uint32_t max_entries
,
503 string
& read_iter
, map
<rgw_user_bucket
, rgw_usage_log_entry
>& usage
,
506 int cls_rgw_usage_log_trim(librados::IoCtx
& io_ctx
, const string
& oid
, string
& user
,
507 uint64_t start_epoch
, uint64_t end_epoch
);
509 void cls_rgw_usage_log_add(librados::ObjectWriteOperation
& op
, rgw_usage_log_info
& info
);
511 /* garbage collection */
512 void cls_rgw_gc_set_entry(librados::ObjectWriteOperation
& op
, uint32_t expiration_secs
, cls_rgw_gc_obj_info
& info
);
513 void cls_rgw_gc_defer_entry(librados::ObjectWriteOperation
& op
, uint32_t expiration_secs
, const string
& tag
);
515 int cls_rgw_gc_list(librados::IoCtx
& io_ctx
, string
& oid
, string
& marker
, uint32_t max
, bool expired_only
,
516 list
<cls_rgw_gc_obj_info
>& entries
, bool *truncated
, string
& next_marker
);
518 void cls_rgw_gc_remove(librados::ObjectWriteOperation
& op
, const list
<string
>& tags
);
521 int cls_rgw_lc_get_head(librados::IoCtx
& io_ctx
, string
& oid
, cls_rgw_lc_obj_head
& head
);
522 int cls_rgw_lc_put_head(librados::IoCtx
& io_ctx
, string
& oid
, cls_rgw_lc_obj_head
& head
);
523 int cls_rgw_lc_get_next_entry(librados::IoCtx
& io_ctx
, string
& oid
, string
& marker
, pair
<string
, int>& entry
);
524 int cls_rgw_lc_rm_entry(librados::IoCtx
& io_ctx
, string
& oid
, pair
<string
, int>& entry
);
525 int cls_rgw_lc_set_entry(librados::IoCtx
& io_ctx
, string
& oid
, pair
<string
, int>& entry
);
526 int cls_rgw_lc_list(librados::IoCtx
& io_ctx
, string
& oid
,
527 const string
& marker
,
528 uint32_t max_entries
,
529 map
<string
, int>& entries
);
532 void cls_rgw_reshard_add(librados::ObjectWriteOperation
& op
, const cls_rgw_reshard_entry
& entry
);
533 int cls_rgw_reshard_list(librados::IoCtx
& io_ctx
, const string
& oid
, string
& marker
, uint32_t max
,
534 list
<cls_rgw_reshard_entry
>& entries
, bool* is_truncated
);
535 int cls_rgw_reshard_get(librados::IoCtx
& io_ctx
, const string
& oid
, cls_rgw_reshard_entry
& entry
);
536 int cls_rgw_reshard_get_head(librados::IoCtx
& io_ctx
, const string
& oid
, cls_rgw_reshard_entry
& entry
);
537 void cls_rgw_reshard_remove(librados::ObjectWriteOperation
& op
, const cls_rgw_reshard_entry
& entry
);
539 /* resharding attribute */
540 int cls_rgw_set_bucket_resharding(librados::IoCtx
& io_ctx
, const string
& oid
,
541 const cls_rgw_bucket_instance_entry
& entry
);
542 int cls_rgw_clear_bucket_resharding(librados::IoCtx
& io_ctx
, const string
& oid
);
543 void cls_rgw_guard_bucket_resharding(librados::ObjectOperation
& op
, int ret_err
);
544 int cls_rgw_get_bucket_resharding(librados::IoCtx
& io_ctx
, const string
& oid
,
545 cls_rgw_bucket_instance_entry
*entry
);