1 #ifndef CEPH_CLS_RGW_CLIENT_H
2 #define CEPH_CLS_RGW_CLIENT_H
4 #include "include/str_list.h"
5 #include "include/rados/librados.hpp"
6 #include "cls_rgw_ops.h"
7 #include "common/RefCountedObj.h"
8 #include "include/compat.h"
9 #include "common/ceph_time.h"
// BucketIndexAioArg below stores a BucketIndexAioManager pointer, so the
// manager class is declared here ahead of its full definition.
11 // Forward declaration
12 class BucketIndexAioManager
;
14 * Bucket index AIO request argument; this is used to pass an argument
// Argument attached to each bucket-index AIO completion. It records the
// request id and the manager that issued the request, so that
// BucketIndexAioManager::bucket_index_op_completion_cb can call
// manager->do_completion(id). It derives from RefCountedObject; this struct
// contains no release logic of its own.
17 struct BucketIndexAioArg
: public RefCountedObject
{
18 BucketIndexAioArg(int _id
, BucketIndexAioManager
* _manager
) :
19 id(_id
), manager(_manager
) {}
// Manager that issued the request (aio_operate() passes `this`).
21 BucketIndexAioManager
* manager
;
25 * This class manages AIO completions. This class is not completely thread-safe,
26 * methods like *get_next* are not thread-safe and are expected to be called from
// Tracks bucket-index AIO requests issued through aio_operate(). Each request
// gets an integer id from get_next(). add_pending() records its completion in
// `pendings` and its target object in `pending_objs`.
// `completions` / `completion_objs` are not touched in this header.
// Presumably do_completion() and wait_for_completions() (defined out of line)
// move finished requests into them and drain them; confirm in the .cc.
29 class BucketIndexAioManager
{
// In-flight completions registered by add_pending(), keyed by request id.
31 map
<int, librados::AioCompletion
*> pendings
;
32 map
<int, librados::AioCompletion
*> completions
;
// Object id recorded for each request id (see add_pending()).
33 map
<int, string
> pending_objs
;
34 map
<int, string
> completion_objs
;
39 * Callback implementation for AIO request.
// Completion callback registered by aio_operate(). It casts `arg` back to the
// BucketIndexAioArg allocated there and reports that request's id to the
// owning manager via do_completion().
41 static void bucket_index_op_completion_cb(void* cb
, void* arg
) {
42 BucketIndexAioArg
* cb_arg
= (BucketIndexAioArg
*) arg
;
43 cb_arg
->manager
->do_completion(cb_arg
->id
);
48 * Get next request ID. This method is not thread-safe.
50 * Return next request ID.
// aio_operate() calls this with `lock` held.
52 int get_next() { return next
++; }
55 * Add a new pending AIO completion instance.
57 * @param id - the request ID.
58 * @param completion - the AIO completion instance.
59 * @param oid - the object id associated with the object, if it is NULL, we don't
60 * track the object id per callback.
// Stores `completion` and `oid` under `id`, replacing any previous entry.
// NOTE(review): `oid` is a reference and is always recorded below, so the
// "if it is NULL" wording in the comment above looks stale; confirm intent.
62 void add_pending(int id
, librados::AioCompletion
* completion
, const string
& oid
) {
63 pendings
[id
] = completion
;
64 pending_objs
[id
] = oid
;
68 * Create a new instance.
// Request ids start at 0; `lock` is constructed with a fixed name string.
70 BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {}
74 * Do completion for the given AIO request.
// Defined out of line; invoked from bucket_index_op_completion_cb.
76 void do_completion(int id
);
79 * Wait for AIO completions.
81 * valid_ret_code - valid AIO return code.
82 * num_completions - number of completions.
83 * ret_code - return code of failed AIO.
84 * objs - the objects whose AIO has finished.
86 * Return false if there is no pending AIO, true otherwise.
// Defined out of line. `objs` may be NULL: CLSRGWConcurrentIO passes NULL
// unless its need_multiple_rounds() hook returns true.
88 bool wait_for_completions(int valid_ret_code
, int *num_completions
, int *ret_code
,
89 map
<int, string
> *objs
);
92 * Do aio read operation.
// Submits read op `op` against `oid` asynchronously while holding `lock`. It:
//   - allocates a BucketIndexAioArg carrying the next request id;
//   - creates a completion that fires bucket_index_op_completion_cb;
//   - submits `op`, passing NULL for librados' trailing output-bufferlist
//     argument;
//   - records the request with add_pending().
// The C-style cast of `op` is a no-op (it already has that type).
// NOTE(review): the declared return type is bool, but the librados result is
// held in `int r`. Confirm the full implementation does not turn a negative
// `r` into `true`. Also confirm it does not leak `arg`/`c` when submission
// fails.
94 bool aio_operate(librados::IoCtx
& io_ctx
, const string
& oid
, librados::ObjectReadOperation
*op
) {
95 Mutex::Locker
l(lock
);
96 BucketIndexAioArg
*arg
= new BucketIndexAioArg(get_next(), this);
97 librados::AioCompletion
*c
= librados::Rados::aio_create_completion((void*)arg
, NULL
, bucket_index_op_completion_cb
);
98 int r
= io_ctx
.aio_operate(oid
, c
, (librados::ObjectReadOperation
*)op
, NULL
);
100 add_pending(arg
->id
, c
, oid
);
108 * Do aio write operation.
// Write-op counterpart of the read overload above. It uses the same steps:
// take `lock`, allocate a BucketIndexAioArg with the next id, create a
// completion that fires bucket_index_op_completion_cb, submit `op` against
// `oid`, and record the request with add_pending().
// NOTE(review): as with the read overload, the bool return type vs. `int r`
// and the release of `arg`/`c` on a failed submit should be confirmed.
110 bool aio_operate(librados::IoCtx
& io_ctx
, const string
& oid
, librados::ObjectWriteOperation
*op
) {
111 Mutex::Locker
l(lock
);
112 BucketIndexAioArg
*arg
= new BucketIndexAioArg(get_next(), this);
113 librados::AioCompletion
*c
= librados::Rados::aio_create_completion((void*)arg
, NULL
, bucket_index_op_completion_cb
);
114 int r
= io_ctx
.aio_operate(oid
, c
, (librados::ObjectWriteOperation
*)op
);
116 add_pending(arg
->id
, c
, oid
);
// Abstract, reference-counted callback for an asynchronous directory-header
// read; cls_rgw_get_dir_header_async() takes a pointer to one. Implementations
// receive the operation's return code `r` and the header in handle_response().
124 class RGWGetDirHeader_CB
: public RefCountedObject
{
126 ~RGWGetDirHeader_CB() override
{}
127 virtual void handle_response(int r
, rgw_bucket_dir_header
& header
) = 0;
// Holds one string value per bucket-index shard (for example a marker). It
// converts between that map and a single composed string. Judging by the
// example in the from_string() comment, KEY_VALUE_SEPARATOR is presumably "#"
// and SHARDS_SEPARATOR ",". Their definitions are not in this header; confirm.
130 class BucketIndexShardsManager
{
132 // Per shard setting manager, for example, marker.
133 map
<int, string
> value_by_shards
;
135 const static string KEY_VALUE_SEPARATOR
;
136 const static string SHARDS_SEPARATOR
;
// Stores `value` for `shard`, replacing any existing value.
138 void add(int shard
, const string
& value
) {
139 value_by_shards
[shard
] = value
;
// Returns the value stored for `shard`, or `default_value` if there is none.
// NOTE(review): this returns a reference. On a miss it aliases
// `default_value`, so keeping the result after passing a temporary default
// leaves a dangling reference.
142 const string
& get(int shard
, const string
& default_value
) {
143 map
<int, string
>::iterator iter
= value_by_shards
.find(shard
);
144 return (iter
== value_by_shards
.end() ? default_value
: iter
->second
);
// Direct, mutable access to the per-shard map.
147 map
<int, string
>& get() {
148 return value_by_shards
;
// True when no per-shard values are stored.
152 return value_by_shards
.empty();
// Writes the composed form into *out (via append()). Entries come out in
// ascending shard-id order (std::map iteration order). Each entry is the
// shard id formatted with "%d", then KEY_VALUE_SEPARATOR, then the value.
// Entries are separated by SHARDS_SEPARATOR.
155 void to_string(string
*out
) const {
160 map
<int, string
>::const_iterator iter
= value_by_shards
.begin();
161 for (; iter
!= value_by_shards
.end(); ++iter
) {
163 // Not the first item, append a separator first
164 out
->append(SHARDS_SEPARATOR
);
167 snprintf(buf
, sizeof(buf
), "%d", iter
->first
);
169 out
->append(KEY_VALUE_SEPARATOR
);
170 out
->append(iter
->second
);
// True if `marker` contains KEY_VALUE_SEPARATOR, i.e. it uses the per-shard
// "<shard><sep><value>" form rather than a bare single-shard value.
174 static bool is_shards_marker(const string
& marker
) {
175 return marker
.find(KEY_VALUE_SEPARATOR
) != string::npos
;
179 * Convert from string. The string can take one of two forms:
181 * 1. Single shard, no shard id specified, e.g. 000001.23.1
183 * for this case, if passed shard_id >= 0, use this shard id, otherwise assume that it's a
184 * bucket with no shards.
186 * 2. One or more shards, shard id specified for each shard, e.g., 0#00002.12,1#00003.23.2
// Replaces the current contents by parsing `composed_marker`, which is one of
// the two forms described above.
//   - The input is split on SHARDS_SEPARATOR.
//   - An entry without KEY_VALUE_SEPARATOR is stored under `shard_id`.
//   - Otherwise the prefix before the separator is parsed as a base-10 shard
//     id with strict_strtol, and the remainder becomes the value.
// NOTE(review): confirm that a parse failure reported through `err` is
// rejected rather than stored under whatever id strict_strtol returns.
189 int from_string(const string
& composed_marker
, int shard_id
) {
190 value_by_shards
.clear();
191 vector
<string
> shards
;
192 get_str_vec(composed_marker
, SHARDS_SEPARATOR
.c_str(), shards
);
193 if (shards
.size() > 1 && shard_id
>= 0) {
196 vector
<string
>::const_iterator iter
= shards
.begin();
197 for (; iter
!= shards
.end(); ++iter
) {
198 size_t pos
= iter
->find(KEY_VALUE_SEPARATOR
);
199 if (pos
== string::npos
) {
200 if (!value_by_shards
.empty()) {
206 add(shard_id
, *iter
);
210 string shard_str
= iter
->substr(0, pos
);
212 int shard
= (int)strict_strtol(shard_str
.c_str(), 10, &err
);
216 add(shard
, iter
->substr(pos
+ 1));
// Prepares write op `o` to initialize a bucket index object. It presumably
// appends the cls call for the caller to execute; it is defined out of line.
223 void cls_rgw_bucket_init(librados::ObjectWriteOperation
& o
);
// Base driver that runs one asynchronous operation per bucket-index shard
// object, with at most `max_aio` requests in flight. `objs_container` maps
// shard id -> object id. Subclasses implement issue_op() for one entry and may
// override the hooks below. `io_ctx` and `objs_container` are references, so
// the caller's objects must outlive this instance.
225 class CLSRGWConcurrentIO
{
227 librados::IoCtx
& io_ctx
;
228 map
<int, string
>& objs_container
;
// Next entry of objs_container to issue.
229 map
<int, string
>::iterator iter
;
231 BucketIndexAioManager manager
;
// Issue the operation for one (shard id, object id) entry. Its result is
// stored in `ret`, and the driver only keeps issuing while `ret >= 0`.
233 virtual int issue_op(int shard_id
, const string
& oid
) = 0;
// Optional hook, empty by default (CLSRGWIssueBucketIndexInit overrides it).
235 virtual void cleanup() {}
// Return code passed to wait_for_completions() as acceptable; 0 by default.
236 virtual int valid_ret_code() { return 0; }
237 // Return true if multiple rounds of OPs might be needed, this happens when
238 // OP needs to be re-send until a certain code is returned.
239 virtual bool need_multiple_rounds() { return false; }
240 // Add a new object to the end of the container.
241 virtual void add_object(int shard
, const string
& oid
) {}
// Called with the objects that need another round once every entry has been
// issued (see the driver below). No-op by default.
242 virtual void reset_container(map
<int, string
>& objs
) {}
245 CLSRGWConcurrentIO(librados::IoCtx
& ioc
, map
<int, string
>& _objs_container
,
246 uint32_t _max_aio
) : io_ctx(ioc
), objs_container(_objs_container
), max_aio(_max_aio
) {}
247 virtual ~CLSRGWConcurrentIO() {}
// Driver: first issue up to max_aio operations.
// NOTE(review): `max_aio-- > 0` post-decrements the member itself (assuming
// no local shadows it), so after one run it no longer holds the configured
// window. Presumably each instance is used only once; confirm.
251 iter
= objs_container
.begin();
252 for (; iter
!= objs_container
.end() && max_aio
-- > 0; ++iter
) {
253 ret
= issue_op(iter
->first
, iter
->second
);
// Collect completions. While no error has been seen (r >= 0 && ret >= 0),
// refill the window with one new op per finished one. `pobjs` is non-NULL
// only when need_multiple_rounds() is true. In that case `objs` receives the
// objects that need another round.
258 int num_completions
, r
= 0;
259 map
<int, string
> objs
;
260 map
<int, string
> *pobjs
= (need_multiple_rounds() ? &objs
: NULL
);
261 while (manager
.wait_for_completions(valid_ret_code(), &num_completions
, &r
, pobjs
)) {
262 if (r
>= 0 && ret
>= 0) {
263 for(int i
= 0; i
< num_completions
&& iter
!= objs_container
.end(); ++i
, ++iter
) {
264 int issue_ret
= issue_op(iter
->first
, iter
->second
);
270 } else if (ret
>= 0) {
273 if (need_multiple_rounds() && iter
== objs_container
.end() && !objs
.empty()) {
274 // For those objects which need another round, use them to reset
276 reset_container(objs
);
// Runs a bucket-index initialization op on every shard object in
// `_bucket_objs`. A shard returning -EEXIST is accepted (valid_ret_code()).
// issue_op() and cleanup() are defined out of line.
287 class CLSRGWIssueBucketIndexInit
: public CLSRGWConcurrentIO
{
289 int issue_op(int shard_id
, const string
& oid
) override
;
290 int valid_ret_code() override
{ return -EEXIST
; }
291 void cleanup() override
;
293 CLSRGWIssueBucketIndexInit(librados::IoCtx
& ioc
, map
<int, string
>& _bucket_objs
,
295 CLSRGWConcurrentIO(ioc
, _bucket_objs
, _max_aio
) {}
// Issues an op carrying `tag_timeout` to every shard object in
// `_bucket_objs`. issue_op() is defined out of line.
298 class CLSRGWIssueSetTagTimeout
: public CLSRGWConcurrentIO
{
299 uint64_t tag_timeout
;
301 int issue_op(int shard_id
, const string
& oid
) override
;
303 CLSRGWIssueSetTagTimeout(librados::IoCtx
& ioc
, map
<int, string
>& _bucket_objs
,
304 uint32_t _max_aio
, uint64_t _tag_timeout
) :
305 CLSRGWConcurrentIO(ioc
, _bucket_objs
, _max_aio
), tag_timeout(_tag_timeout
) {}
// Builders that prepare a caller-owned librados operation (all return void and
// are defined out of line).
// Stats update for the bucket index. NOTE(review): `absolute` presumably
// selects overwrite vs. increment of `stats`; confirm.
308 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation
& o
, bool absolute
,
309 const map
<uint8_t, rgw_bucket_category_stats
>& stats
);
// prepare_op / complete_op are presumably the two halves of one index update,
// linked by `tag`.
311 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation
& o
, RGWModifyOp op
, string
& tag
,
312 const cls_rgw_obj_key
& key
, const string
& locator
, bool log_op
,
313 uint16_t bilog_op
, rgw_zone_set
& zones_trace
);
// NOTE(review): unlike prepare_op, `zones_trace` is a pointer here (presumably
// nullable), and `remove_objs` is also a pointer.
315 void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation
& o
, RGWModifyOp op
, string
& tag
,
316 rgw_bucket_entry_ver
& ver
,
317 const cls_rgw_obj_key
& key
,
318 rgw_bucket_dir_entry_meta
& dir_meta
,
319 list
<cls_rgw_obj_key
> *remove_objs
, bool log_op
,
320 uint16_t bilog_op
, rgw_zone_set
*zones_trace
);
322 void cls_rgw_remove_obj(librados::ObjectWriteOperation
& o
, list
<string
>& keep_attr_prefixes
);
323 void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation
& o
, const string
& attr
);
// The two guards below take a generic ObjectOperation, so they can be added to
// either a read or a write op.
324 void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation
& o
, const string
& prefix
, bool fail_if_exist
);
325 void cls_rgw_obj_check_mtime(librados::ObjectOperation
& o
, const ceph::real_time
& mtime
, bool high_precision_time
, RGWCheckMTimeType type
);
// Bucket-index ("bi") entry access. Overloads taking an IoCtx return int
// (presumably a negative errno on failure). The void overload only fills in a
// caller-owned write op.
// NOTE(review): `oid` is taken as `const string` by value here, which copies
// it; neighbouring APIs use `const string&`.
327 int cls_rgw_bi_get(librados::IoCtx
& io_ctx
, const string oid
,
328 BIIndexType index_type
, cls_rgw_obj_key
& key
,
329 rgw_cls_bi_entry
*entry
);
330 int cls_rgw_bi_put(librados::IoCtx
& io_ctx
, const string oid
, rgw_cls_bi_entry
& entry
);
331 void cls_rgw_bi_put(librados::ObjectWriteOperation
& op
, const string oid
, rgw_cls_bi_entry
& entry
);
// Lists up to `max` entries after `marker`; *is_truncated reports whether
// more remain (presumably).
332 int cls_rgw_bi_list(librados::IoCtx
& io_ctx
, const string oid
,
333 const string
& name
, const string
& marker
, uint32_t max
,
334 list
<rgw_cls_bi_entry
> *entries
, bool *is_truncated
);
// OLH operations on a bucket index object. Most take both an IoCtx and a
// caller-provided op. Those are presumably used to execute the op against
// `oid`; confirm in the .cc.
// NOTE(review): link_olh takes `olh_tag` as a bufferlist while the other OLH
// calls take `const string&`.
337 int cls_rgw_bucket_link_olh(librados::IoCtx
& io_ctx
, librados::ObjectWriteOperation
& op
,
338 const string
& oid
, const cls_rgw_obj_key
& key
, bufferlist
& olh_tag
,
339 bool delete_marker
, const string
& op_tag
, struct rgw_bucket_dir_entry_meta
*meta
,
340 uint64_t olh_epoch
, ceph::real_time unmod_since
, bool high_precision_time
, bool log_op
, rgw_zone_set
& zones_trace
);
341 int cls_rgw_bucket_unlink_instance(librados::IoCtx
& io_ctx
, librados::ObjectWriteOperation
& op
,
342 const string
& oid
, const cls_rgw_obj_key
& key
, const string
& op_tag
,
343 const string
& olh_tag
, uint64_t olh_epoch
, bool log_op
, rgw_zone_set
& zones_trace
);
// Reads OLH log entries starting after `ver_marker`, grouped by version into
// *log; *is_truncated reports whether more remain (presumably).
344 int cls_rgw_get_olh_log(librados::IoCtx
& io_ctx
, string
& oid
, librados::ObjectReadOperation
& op
, const cls_rgw_obj_key
& olh
, uint64_t ver_marker
,
345 const string
& olh_tag
,
346 map
<uint64_t, vector
<struct rgw_bucket_olh_log_entry
> > *log
, bool *is_truncated
);
347 void cls_rgw_trim_olh_log(librados::ObjectWriteOperation
& op
, const cls_rgw_obj_key
& olh
, uint64_t ver
, const string
& olh_tag
);
348 int cls_rgw_clear_olh(librados::IoCtx
& io_ctx
, librados::ObjectWriteOperation
& op
, string
& oid
, const cls_rgw_obj_key
& olh
, const string
& olh_tag
);
351 * List the bucket with the starting object and filter prefix.
352 * NOTE: this class issues listing requests for each bucket index shard identified by
353 * the keys of the *oids* map, which means the map should be populated
354 * by the caller to fill with each bucket index object id.
356 * io_ctx - IO context for rados.
357 * start_obj - marker for the listing.
358 * filter_prefix - filter prefix.
359 * num_entries - number of entries to request for each object (note the total
360 * amount of entries returned depends on the number of shardings).
361 * list_results - the list results keyed by bucket index object id.
362 * max_aio - the maximum number of AIO (for throttling).
364 * Return 0 on success, a failure code otherwise.
// Lists every shard object in `oids` (shard id -> object id) concurrently.
// start_obj, filter_prefix, num_entries and list_versions are the per-shard
// request parameters. Results are stored per shard id in `result`, which
// references the caller's `list_results`.
367 class CLSRGWIssueBucketList
: public CLSRGWConcurrentIO
{
368 cls_rgw_obj_key start_obj
;
369 string filter_prefix
;
370 uint32_t num_entries
;
372 map
<int, rgw_cls_list_ret
>& result
;
374 int issue_op(int shard_id
, const string
& oid
) override
;
376 CLSRGWIssueBucketList(librados::IoCtx
& io_ctx
, const cls_rgw_obj_key
& _start_obj
,
377 const string
& _filter_prefix
, uint32_t _num_entries
,
379 map
<int, string
>& oids
,
380 map
<int, struct rgw_cls_list_ret
>& list_results
,
382 CLSRGWConcurrentIO(io_ctx
, oids
, max_aio
),
383 start_obj(_start_obj
), filter_prefix(_filter_prefix
), num_entries(_num_entries
), list_versions(_list_versions
), result(list_results
) {}
// Lists the bucket index log of every shard object in `oids`. It presumably
// resumes each shard from its marker in `marker_mgr` and requests up to `_max`
// entries. Per-shard results go into the caller's `bi_log_lists`. Both maps
// and `marker_mgr` are held by reference.
386 class CLSRGWIssueBILogList
: public CLSRGWConcurrentIO
{
387 map
<int, struct cls_rgw_bi_log_list_ret
>& result
;
388 BucketIndexShardsManager
& marker_mgr
;
391 int issue_op(int shard_id
, const string
& oid
) override
;
393 CLSRGWIssueBILogList(librados::IoCtx
& io_ctx
, BucketIndexShardsManager
& _marker_mgr
, uint32_t _max
,
394 map
<int, string
>& oids
,
395 map
<int, struct cls_rgw_bi_log_list_ret
>& bi_log_lists
, uint32_t max_aio
) :
396 CLSRGWConcurrentIO(io_ctx
, oids
, max_aio
), result(bi_log_lists
),
397 marker_mgr(_marker_mgr
), max(_max
) {}
// Trims the bucket index log of every shard object. The range is presumably
// bounded by the per-shard markers in start_marker_mgr / end_marker_mgr. It
// runs in multiple rounds (need_multiple_rounds() is true); -ENODATA is the
// accepted return code.
// NOTE: objs_container is a reference to the caller's `_bucket_objs`. As a
// result, add_object() and reset_container() modify the caller's map directly.
400 class CLSRGWIssueBILogTrim
: public CLSRGWConcurrentIO
{
401 BucketIndexShardsManager
& start_marker_mgr
;
402 BucketIndexShardsManager
& end_marker_mgr
;
404 int issue_op(int shard_id
, const string
& oid
) override
;
405 // Trim until -ENODATA is returned.
406 int valid_ret_code() override
{ return -ENODATA
; }
407 bool need_multiple_rounds() override
{ return true; }
// Inserts or overwrites the entry for `shard` (std::map, so existing
// iterators stay valid).
408 void add_object(int shard
, const string
& oid
) override
{ objs_container
[shard
] = oid
; }
// Swaps the leftover objects into the container and restarts iteration.
409 void reset_container(map
<int, string
>& objs
) override
{
410 objs_container
.swap(objs
);
411 iter
= objs_container
.begin();
415 CLSRGWIssueBILogTrim(librados::IoCtx
& io_ctx
, BucketIndexShardsManager
& _start_marker_mgr
,
416 BucketIndexShardsManager
& _end_marker_mgr
, map
<int, string
>& _bucket_objs
, uint32_t max_aio
) :
417 CLSRGWConcurrentIO(io_ctx
, _bucket_objs
, max_aio
),
418 start_marker_mgr(_start_marker_mgr
), end_marker_mgr(_end_marker_mgr
) {}
422 * Check the bucket index.
424 * io_ctx - IO context for rados.
425 * bucket_objs_ret - check result for all shards.
426 * max_aio - the maximum number of AIO (for throttling).
428 * Return 0 on success, a failure code otherwise.
// Checks the bucket index of every shard object in `oids` concurrently and
// stores per-shard results in the caller's `bucket_objs_ret`.
// NOTE(review): the commented-out template argument after the base class looks
// like a leftover from an earlier templated base; confirm and drop.
430 class CLSRGWIssueBucketCheck
: public CLSRGWConcurrentIO
/*<map<string, struct rgw_cls_check_index_ret> >*/ {
431 map
<int, struct rgw_cls_check_index_ret
>& result
;
433 int issue_op(int shard_id
, const string
& oid
) override
;
435 CLSRGWIssueBucketCheck(librados::IoCtx
& ioc
, map
<int, string
>& oids
, map
<int, struct rgw_cls_check_index_ret
>& bucket_objs_ret
,
437 CLSRGWConcurrentIO(ioc
, oids
, _max_aio
), result(bucket_objs_ret
) {}
// Issues a bucket-index rebuild op to every shard object in `bucket_objs`.
// issue_op() is defined out of line.
440 class CLSRGWIssueBucketRebuild
: public CLSRGWConcurrentIO
{
442 int issue_op(int shard_id
, const string
& oid
) override
;
444 CLSRGWIssueBucketRebuild(librados::IoCtx
& io_ctx
, map
<int, string
>& bucket_objs
,
445 uint32_t max_aio
) : CLSRGWConcurrentIO(io_ctx
, bucket_objs
, max_aio
) {}
// Fetches the directory header of every shard object in `oids`. Results are
// stored per shard id in the caller's `dir_headers`, as rgw_cls_list_ret
// values.
448 class CLSRGWIssueGetDirHeader
: public CLSRGWConcurrentIO
{
449 map
<int, rgw_cls_list_ret
>& result
;
451 int issue_op(int shard_id
, const string
& oid
) override
;
453 CLSRGWIssueGetDirHeader(librados::IoCtx
& io_ctx
, map
<int, string
>& oids
, map
<int, rgw_cls_list_ret
>& dir_headers
,
455 CLSRGWConcurrentIO(io_ctx
, oids
, max_aio
), result(dir_headers
) {}
// Applies `entry` (a copy of `_entry`) to every shard object in
// `_bucket_objs`, presumably via cls_rgw_set_bucket_resharding-style ops.
458 class CLSRGWIssueSetBucketResharding
: public CLSRGWConcurrentIO
{
459 cls_rgw_bucket_instance_entry entry
;
461 int issue_op(int shard_id
, const string
& oid
) override
;
463 CLSRGWIssueSetBucketResharding(librados::IoCtx
& ioc
, map
<int, string
>& _bucket_objs
,
464 const cls_rgw_bucket_instance_entry
& _entry
,
465 uint32_t _max_aio
) : CLSRGWConcurrentIO(ioc
, _bucket_objs
, _max_aio
), entry(_entry
) {}
// Starts an asynchronous directory-header read on `oid`. The result is
// delivered through ctx->handle_response().
468 int cls_rgw_get_dir_header_async(librados::IoCtx
& io_ctx
, string
& oid
, RGWGetDirHeader_CB
*ctx
);
// Encodes one suggested change (`op` + `dirent`) into `updates`. The
// accumulated buffer is then attached to a write op by
// cls_rgw_suggest_changes() (presumably).
470 void cls_rgw_encode_suggestion(char op
, rgw_bucket_dir_entry
& dirent
, bufferlist
& updates
);
472 void cls_rgw_suggest_changes(librados::ObjectWriteOperation
& o
, bufferlist
& updates
);
// Usage-log access. The read call pages through entries for `user` in
// [start_epoch, end_epoch] using `read_iter` as its cursor (presumably).
// Trim and add only fill in a caller-owned write op.
475 int cls_rgw_usage_log_read(librados::IoCtx
& io_ctx
, string
& oid
, string
& user
,
476 uint64_t start_epoch
, uint64_t end_epoch
, uint32_t max_entries
,
477 string
& read_iter
, map
<rgw_user_bucket
, rgw_usage_log_entry
>& usage
,
480 void cls_rgw_usage_log_trim(librados::ObjectWriteOperation
& op
, string
& user
,
481 uint64_t start_epoch
, uint64_t end_epoch
);
483 void cls_rgw_usage_log_add(librados::ObjectWriteOperation
& op
, rgw_usage_log_info
& info
);
485 /* garbage collection */
// Set / defer take an expiration in seconds and only fill in a caller-owned
// write op.
486 void cls_rgw_gc_set_entry(librados::ObjectWriteOperation
& op
, uint32_t expiration_secs
, cls_rgw_gc_obj_info
& info
);
487 void cls_rgw_gc_defer_entry(librados::ObjectWriteOperation
& op
, uint32_t expiration_secs
, const string
& tag
);
// Lists up to `max` entries after `marker` (only expired ones when
// `expired_only`). *truncated and `next_marker` drive pagination
// (presumably).
489 int cls_rgw_gc_list(librados::IoCtx
& io_ctx
, string
& oid
, string
& marker
, uint32_t max
, bool expired_only
,
490 list
<cls_rgw_gc_obj_info
>& entries
, bool *truncated
, string
& next_marker
);
492 void cls_rgw_gc_remove(librados::ObjectWriteOperation
& op
, const list
<string
>& tags
);
// Lifecycle (lc) bookkeeping on object `oid`. These calls take an IoCtx and
// return int, presumably a negative errno on failure.
// Entries are (name, int) pairs.
495 int cls_rgw_lc_get_head(librados::IoCtx
& io_ctx
, string
& oid
, cls_rgw_lc_obj_head
& head
);
496 int cls_rgw_lc_put_head(librados::IoCtx
& io_ctx
, string
& oid
, cls_rgw_lc_obj_head
& head
);
497 int cls_rgw_lc_get_next_entry(librados::IoCtx
& io_ctx
, string
& oid
, string
& marker
, pair
<string
, int>& entry
);
498 int cls_rgw_lc_rm_entry(librados::IoCtx
& io_ctx
, string
& oid
, pair
<string
, int>& entry
);
499 int cls_rgw_lc_set_entry(librados::IoCtx
& io_ctx
, string
& oid
, pair
<string
, int>& entry
);
500 int cls_rgw_lc_list(librados::IoCtx
& io_ctx
, string
& oid
,
501 const string
& marker
,
502 uint32_t max_entries
,
503 map
<string
, int>& entries
);
// Reshard queue. add/remove only fill in a caller-owned write op. list/get
// take an IoCtx and return int. list pages after `marker` up to `max`
// entries, with *is_truncated presumably reporting whether more remain.
506 void cls_rgw_reshard_add(librados::ObjectWriteOperation
& op
, const cls_rgw_reshard_entry
& entry
);
507 int cls_rgw_reshard_list(librados::IoCtx
& io_ctx
, const string
& oid
, string
& marker
, uint32_t max
,
508 list
<cls_rgw_reshard_entry
>& entries
, bool* is_truncated
);
509 int cls_rgw_reshard_get(librados::IoCtx
& io_ctx
, const string
& oid
, cls_rgw_reshard_entry
& entry
);
510 int cls_rgw_reshard_get_head(librados::IoCtx
& io_ctx
, const string
& oid
, cls_rgw_reshard_entry
& entry
);
511 void cls_rgw_reshard_remove(librados::ObjectWriteOperation
& op
, const cls_rgw_reshard_entry
& entry
);
513 /* resharding attribute */
// set / clear / get act on bucket index object `oid` through an IoCtx.
514 int cls_rgw_set_bucket_resharding(librados::IoCtx
& io_ctx
, const string
& oid
,
515 const cls_rgw_bucket_instance_entry
& entry
);
516 int cls_rgw_clear_bucket_resharding(librados::IoCtx
& io_ctx
, const string
& oid
);
// Adds a guard to a caller-owned op. Presumably the op then fails with
// `ret_err` while the bucket is resharding; confirm in the .cc.
517 void cls_rgw_guard_bucket_resharding(librados::ObjectOperation
& op
, int ret_err
);
518 int cls_rgw_get_bucket_resharding(librados::IoCtx
& io_ctx
, const string
& oid
,
519 cls_rgw_bucket_instance_entry
*entry
);