1 #ifndef CEPH_CLS_RGW_CLIENT_H
2 #define CEPH_CLS_RGW_CLIENT_H
4 #include "include/types.h"
5 #include "include/str_list.h"
6 #include "include/rados/librados.hpp"
7 #include "cls_rgw_types.h"
8 #include "cls_rgw_ops.h"
9 #include "common/RefCountedObj.h"
10 #include "include/compat.h"
11 #include "common/ceph_time.h"
// Forward declaration: BucketIndexAioArg (below) stores a pointer to the
// manager before the manager class itself is defined.
class BucketIndexAioManager;
17 * Bucket index AIO request argument; this is used to pass an argument
// Argument object handed to the AIO completion callback; derives from
// RefCountedObject.
// NOTE(review): this listing is lossy -- the declaration of the `id` member
// initialized below and the struct's closing brace are not visible here.
20 struct BucketIndexAioArg
: public RefCountedObject
{
// Stores the request id and the manager pointer supplied by the caller.
21 BucketIndexAioArg(int _id
, BucketIndexAioManager
* _manager
) :
22 id(_id
), manager(_manager
) {}
// Manager pointer set by the constructor; the completion callback calls
// do_completion() through it.
24 BucketIndexAioManager
* manager
;
28 * This class manages AIO completions. This class is not completely thread-safe;
29 * methods like *get_next* are not thread-safe and are expected to be called from
// Bookkeeping for asynchronous bucket index operations: AioCompletion
// pointers and object names are stored in maps keyed by an integer request id.
// NOTE(review): this listing is lossy -- access specifiers, the declarations
// of `next` and `lock`, closing braces, and the tails of both aio_operate()
// overloads (handling of `r`, the return statement) are not visible here.
32 class BucketIndexAioManager
{
// Request id -> completion for requests registered via add_pending().
34 map
<int, librados::AioCompletion
*> pendings
;
// Request id -> completion; presumably filled by do_completion(), which is
// only declared in this header -- confirm against its definition.
35 map
<int, librados::AioCompletion
*> completions
;
// Request id -> object name for requests registered via add_pending().
36 map
<int, string
> pending_objs
;
// Request id -> object name; presumably the completed counterpart of
// pending_objs -- confirm against do_completion().
37 map
<int, string
> completion_objs
;
42 * Callback implementation for AIO request.
// `arg` is cast back to BucketIndexAioArg and its id is forwarded to the
// manager stored in it. `cb` is not used in the visible lines.
44 static void bucket_index_op_completion_cb(void* cb
, void* arg
) {
45 BucketIndexAioArg
* cb_arg
= (BucketIndexAioArg
*) arg
;
46 cb_arg
->manager
->do_completion(cb_arg
->id
);
51 * Get next request ID. This method is not thread-safe.
53 * Return next request ID.
// Post-increment: returns the current counter value, then advances it.
55 int get_next() { return next
++; }
58 * Add a new pending AIO completion instance.
60 * @param id - the request ID.
61 * @param completion - the AIO completion instance.
62 * @param oid - the object id associated with the object, if it is NULL, we don't
63 * track the object id per callback.
// Records both the completion and the object name under `id`
// (operator[] overwrites any existing entry with the same id).
65 void add_pending(int id
, librados::AioCompletion
* completion
, const string
& oid
) {
66 pendings
[id
] = completion
;
67 pending_objs
[id
] = oid
;
71 * Create a new instance.
// Request ids start at 0; the lock is constructed with a descriptive name.
73 BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {}
77 * Do completion for the given AIO request.
79 void do_completion(int id
);
82 * Wait for AIO completions.
84 * valid_ret_code - valid AIO return code.
85 * num_completions - number of completions.
86 * ret_code - return code of failed AIO.
87 * objs - a list of objects that has been finished the AIO.
89 * Return false if there is no pending AIO, true otherwise.
91 bool wait_for_completions(int valid_ret_code
, int *num_completions
, int *ret_code
,
92 map
<int, string
> *objs
);
95 * Do aio read operation.
// Visible steps: take `lock`, allocate a callback argument with a fresh id,
// create a completion bound to bucket_index_op_completion_cb, submit the read
// op, and register it as pending.
// NOTE(review): `arg` is allocated with raw `new`; where it is released is not
// visible in this listing -- confirm.
97 bool aio_operate(librados::IoCtx
& io_ctx
, const string
& oid
, librados::ObjectReadOperation
*op
) {
98 Mutex::Locker
l(lock
);
99 BucketIndexAioArg
*arg
= new BucketIndexAioArg(get_next(), this);
100 librados::AioCompletion
*c
= librados::Rados::aio_create_completion((void*)arg
, NULL
, bucket_index_op_completion_cb
);
101 int r
= io_ctx
.aio_operate(oid
, c
, (librados::ObjectReadOperation
*)op
, NULL
);
103 add_pending(arg
->id
, c
, oid
);
111 * Do aio write operation.
// Same visible sequence as the read overload, but submits a write operation
// (no trailing NULL argument to io_ctx.aio_operate).
113 bool aio_operate(librados::IoCtx
& io_ctx
, const string
& oid
, librados::ObjectWriteOperation
*op
) {
114 Mutex::Locker
l(lock
);
115 BucketIndexAioArg
*arg
= new BucketIndexAioArg(get_next(), this);
116 librados::AioCompletion
*c
= librados::Rados::aio_create_completion((void*)arg
, NULL
, bucket_index_op_completion_cb
);
117 int r
= io_ctx
.aio_operate(oid
, c
, (librados::ObjectWriteOperation
*)op
);
119 add_pending(arg
->id
, c
, oid
);
// Abstract, ref-counted callback interface: implementers receive a result code
// and a bucket directory header through handle_response().
// NOTE(review): access specifiers and the closing brace are not visible in
// this lossy listing.
127 class RGWGetDirHeader_CB
: public RefCountedObject
{
129 ~RGWGetDirHeader_CB() override
{}
// Pure virtual: `r` is an int result code, `header` a mutable header reference.
130 virtual void handle_response(int r
, rgw_bucket_dir_header
& header
) = 0;
// Holds one string value per shard id and converts between that map and a
// composed string built with SHARDS_SEPARATOR and KEY_VALUE_SEPARATOR.
// NOTE(review): this listing is lossy -- access specifiers, closing braces,
// several statements inside to_string()/from_string() (buffer and `err`
// declarations, error returns) and the header of the function that returns
// value_by_shards.empty() are not visible here.
133 class BucketIndexShardsManager
{
135 // Per shard setting manager, for example, marker.
136 map
<int, string
> value_by_shards
;
// Separators used by to_string(), is_shards_marker() and from_string(); their
// values are defined elsewhere.
138 const static string KEY_VALUE_SEPARATOR
;
139 const static string SHARDS_SEPARATOR
;
// Sets (or overwrites) the value stored for `shard`.
141 void add(int shard
, const string
& value
) {
142 value_by_shards
[shard
] = value
;
// Returns the value stored for `shard`, or `default_value` when the shard has
// no entry. The result is a reference, so when the default is returned it
// refers to the caller's argument.
145 const string
& get(int shard
, const string
& default_value
) {
146 map
<int, string
>::iterator iter
= value_by_shards
.find(shard
);
147 return (iter
== value_by_shards
.end() ? default_value
: iter
->second
);
// Mutable access to the whole shard -> value map.
150 map
<int, string
>& get() {
151 return value_by_shards
;
155 return value_by_shards
.empty();
// Serializes the map into *out. Visible steps per entry: append
// SHARDS_SEPARATOR (for entries after the first), format the shard id with
// "%d", then append KEY_VALUE_SEPARATOR and the value.
158 void to_string(string
*out
) const {
163 map
<int, string
>::const_iterator iter
= value_by_shards
.begin();
164 for (; iter
!= value_by_shards
.end(); ++iter
) {
166 // Not the first item, append a separator first
167 out
->append(SHARDS_SEPARATOR
);
170 snprintf(buf
, sizeof(buf
), "%d", iter
->first
);
172 out
->append(KEY_VALUE_SEPARATOR
);
173 out
->append(iter
->second
);
// True when `marker` contains KEY_VALUE_SEPARATOR.
177 static bool is_shards_marker(const string
& marker
) {
178 return marker
.find(KEY_VALUE_SEPARATOR
) != string::npos
;
182 * convert from string. There are two options of how the string looks like:
184 * 1. Single shard, no shard id specified, e.g. 000001.23.1
186 * for this case, if passed shard_id >= 0, use this shard id, otherwise assume that it's a
187 * bucket with no shards.
189 * 2. One or more shards, shard id specified for each shard, e.g., 0#00002.12,1#00003.23.2
// Visible steps: clear the map, split `composed_marker` on SHARDS_SEPARATOR,
// then for each piece either store it under `shard_id` (no
// KEY_VALUE_SEPARATOR found) or parse the text before the separator as a
// base-10 shard id and store the remainder under it.
// NOTE(review): the bodies of the guard conditions below (presumably error
// returns) are not visible -- confirm.
192 int from_string(const string
& composed_marker
, int shard_id
) {
193 value_by_shards
.clear();
194 vector
<string
> shards
;
195 get_str_vec(composed_marker
, SHARDS_SEPARATOR
.c_str(), shards
);
196 if (shards
.size() > 1 && shard_id
>= 0) {
199 vector
<string
>::const_iterator iter
= shards
.begin();
200 for (; iter
!= shards
.end(); ++iter
) {
201 size_t pos
= iter
->find(KEY_VALUE_SEPARATOR
);
202 if (pos
== string::npos
) {
203 if (!value_by_shards
.empty()) {
209 add(shard_id
, *iter
);
213 string shard_str
= iter
->substr(0, pos
);
215 int shard
= (int)strict_strtol(shard_str
.c_str(), 10, &err
);
219 add(shard
, iter
->substr(pos
+ 1));
226 void cls_rgw_bucket_init(librados::ObjectWriteOperation
& o
);
// Base class for issuing one operation per (shard id, object name) entry of a
// container, using a BucketIndexAioManager to wait for completions.
// Subclasses supply issue_op() and may override the hooks below.
// NOTE(review): this listing is lossy -- access specifiers, the declaration of
// `max_aio`, the header of the driver function that contains the loop at the
// bottom, its error handling and return statement, and closing braces are not
// visible here.
228 class CLSRGWConcurrentIO
{
230 librados::IoCtx
& io_ctx
;
// Reference to the caller's container of shard id -> object name.
231 map
<int, string
>& objs_container
;
// Cursor into objs_container; marks the next entry to issue.
232 map
<int, string
>::iterator iter
;
234 BucketIndexAioManager manager
;
// Issue the operation for a single shard/object; must be provided by subclasses.
236 virtual int issue_op(int shard_id
, const string
& oid
) = 0;
// Hook with an empty default body; where it is called is not visible here.
238 virtual void cleanup() {}
// Value passed to manager.wait_for_completions() as its valid_ret_code argument.
239 virtual int valid_ret_code() { return 0; }
240 // Return true if multiple rounds of OPs might be needed, this happens when
241 // OP needs to be re-sent until a certain code is returned.
242 virtual bool need_multiple_rounds() { return false; }
243 // Add a new object to the end of the container.
244 virtual void add_object(int shard
, const string
& oid
) {}
// Hook used by the driver loop to restart with the objects that need another
// round; the default does nothing.
245 virtual void reset_container(map
<int, string
>& objs
) {}
// Binds the I/O context and the caller's container by reference and stores
// the AIO limit.
248 CLSRGWConcurrentIO(librados::IoCtx
& ioc
, map
<int, string
>& _objs_container
,
249 uint32_t _max_aio
) : io_ctx(ioc
), objs_container(_objs_container
), max_aio(_max_aio
) {}
250 virtual ~CLSRGWConcurrentIO() {}
// Driver loop (enclosing function header not visible). First phase: issue
// operations from the start of the container until it is exhausted or the
// max_aio budget runs out.
// NOTE(review): `max_aio--` is evaluated even when the comparison fails; if
// max_aio is unsigned (the constructor takes uint32_t) it wraps at zero --
// confirm the member is not reused afterwards.
254 iter
= objs_container
.begin();
255 for (; iter
!= objs_container
.end() && max_aio
-- > 0; ++iter
) {
256 ret
= issue_op(iter
->first
, iter
->second
);
261 int num_completions
, r
= 0;
// `objs` is only handed to wait_for_completions() when multiple rounds may be
// needed; otherwise a null pointer is passed.
262 map
<int, string
> objs
;
263 map
<int, string
> *pobjs
= (need_multiple_rounds() ? &objs
: NULL
);
// Second phase: each time completions are reported, issue up to that many
// further operations from the remaining container entries.
264 while (manager
.wait_for_completions(valid_ret_code(), &num_completions
, &r
, pobjs
)) {
265 if (r
>= 0 && ret
>= 0) {
266 for(int i
= 0; i
< num_completions
&& iter
!= objs_container
.end(); ++i
, ++iter
) {
267 int issue_ret
= issue_op(iter
->first
, iter
->second
);
273 } else if (ret
>= 0) {
// When the container is exhausted but some objects were reported back in
// `objs`, restart with those objects.
276 if (need_multiple_rounds() && iter
== objs_container
.end() && !objs
.empty()) {
277 // For those objects which need another round, use them to reset
279 reset_container(objs
);
// CLSRGWConcurrentIO specialization whose issue_op()/cleanup() are defined
// outside this header.
// NOTE(review): this listing is lossy -- access specifiers, the tail of the
// constructor's parameter list (where `_max_aio` is declared) and the closing
// brace are not visible here.
290 class CLSRGWIssueBucketIndexInit
: public CLSRGWConcurrentIO
{
292 int issue_op(int shard_id
, const string
& oid
) override
;
// -EEXIST is reported as the valid return code for this operation.
293 int valid_ret_code() override
{ return -EEXIST
; }
294 void cleanup() override
;
// Forwards the I/O context, object container and AIO limit to the base class.
296 CLSRGWIssueBucketIndexInit(librados::IoCtx
& ioc
, map
<int, string
>& _bucket_objs
,
298 CLSRGWConcurrentIO(ioc
, _bucket_objs
, _max_aio
) {}
// CLSRGWConcurrentIO specialization that carries a tag timeout value; its
// issue_op() is defined outside this header.
// NOTE(review): access specifiers and the closing brace are not visible in
// this lossy listing.
301 class CLSRGWIssueSetTagTimeout
: public CLSRGWConcurrentIO
{
// Timeout value captured at construction (units are not stated here).
302 uint64_t tag_timeout
;
304 int issue_op(int shard_id
, const string
& oid
) override
;
// Forwards the I/O context, object container and AIO limit to the base class
// and stores the timeout.
306 CLSRGWIssueSetTagTimeout(librados::IoCtx
& ioc
, map
<int, string
>& _bucket_objs
,
307 uint32_t _max_aio
, uint64_t _tag_timeout
) :
308 CLSRGWConcurrentIO(ioc
, _bucket_objs
, _max_aio
), tag_timeout(_tag_timeout
) {}
311 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation
& o
, bool absolute
,
312 const map
<uint8_t, rgw_bucket_category_stats
>& stats
);
// Declaration of the bucket-index "prepare" helper operating on write
// operation `o`.
// NOTE(review): the parameter list is truncated in this lossy listing -- the
// parameters after `log_op` and the closing `);` are not visible here.
314 void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation
& o
, RGWModifyOp op
, string
& tag
,
315 const cls_rgw_obj_key
& key
, const string
& locator
, bool log_op
,
// Declaration of the bucket-index "complete" helper operating on write
// operation `o`.
// NOTE(review): the parameter list is truncated in this lossy listing -- the
// parameters after `log_op` and the closing `);` are not visible here.
318 void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation
& o
, RGWModifyOp op
, string
& tag
,
319 rgw_bucket_entry_ver
& ver
,
320 const cls_rgw_obj_key
& key
,
321 rgw_bucket_dir_entry_meta
& dir_meta
,
// `remove_objs` is passed by pointer; whether it may be null is not
// established here -- confirm against the definition.
322 list
<cls_rgw_obj_key
> *remove_objs
, bool log_op
,
325 void cls_rgw_remove_obj(librados::ObjectWriteOperation
& o
, list
<string
>& keep_attr_prefixes
);
326 void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation
& o
, const string
& attr
);
327 void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation
& o
, const string
& prefix
, bool fail_if_exist
);
328 void cls_rgw_obj_check_mtime(librados::ObjectOperation
& o
, const ceph::real_time
& mtime
, bool high_precision_time
, RGWCheckMTimeType type
);
330 int cls_rgw_bi_get(librados::IoCtx
& io_ctx
, const string oid
,
331 BIIndexType index_type
, cls_rgw_obj_key
& key
,
332 rgw_cls_bi_entry
*entry
);
333 int cls_rgw_bi_put(librados::IoCtx
& io_ctx
, const string oid
, rgw_cls_bi_entry
& entry
);
334 void cls_rgw_bi_put(librados::ObjectWriteOperation
& op
, const string oid
, rgw_cls_bi_entry
& entry
);
335 int cls_rgw_bi_list(librados::IoCtx
& io_ctx
, const string oid
,
336 const string
& name
, const string
& marker
, uint32_t max
,
337 list
<rgw_cls_bi_entry
> *entries
, bool *is_truncated
);
340 int cls_rgw_bucket_link_olh(librados::IoCtx
& io_ctx
, const string
& oid
, const cls_rgw_obj_key
& key
, bufferlist
& olh_tag
,
341 bool delete_marker
, const string
& op_tag
, struct rgw_bucket_dir_entry_meta
*meta
,
342 uint64_t olh_epoch
, ceph::real_time unmod_since
, bool high_precision_time
, bool log_op
);
343 int cls_rgw_bucket_unlink_instance(librados::IoCtx
& io_ctx
, const string
& oid
, const cls_rgw_obj_key
& key
, const string
& op_tag
,
344 const string
& olh_tag
, uint64_t olh_epoch
, bool log_op
);
345 int cls_rgw_get_olh_log(librados::IoCtx
& io_ctx
, string
& oid
, librados::ObjectReadOperation
& op
, const cls_rgw_obj_key
& olh
, uint64_t ver_marker
,
346 const string
& olh_tag
,
347 map
<uint64_t, vector
<struct rgw_bucket_olh_log_entry
> > *log
, bool *is_truncated
);
348 void cls_rgw_trim_olh_log(librados::ObjectWriteOperation
& op
, const cls_rgw_obj_key
& olh
, uint64_t ver
, const string
& olh_tag
);
349 int cls_rgw_clear_olh(librados::IoCtx
& io_ctx
, string
& oid
, const cls_rgw_obj_key
& olh
, const string
& olh_tag
);
352 * List the bucket with the starting object and filter prefix.
353 * NOTE: this method issues a listing request for each bucket index shard identified by
354 * the keys of the *list_results* map, which means the map should be populated
355 * by the caller with each bucket index object id.
357 * io_ctx - IO context for rados.
358 * start_obj - marker for the listing.
359 * filter_prefix - filter prefix.
360 * num_entries - number of entries to request for each object (note the total
361 * number of entries returned depends on the number of shards).
362 * list_results - the list results keyed by bucket index object id.
363 * max_aio - the maximum number of AIO (for throttling).
365 * Return 0 on success, a failure code otherwise.
// CLSRGWConcurrentIO specialization for bucket listing: keeps the listing
// parameters and a reference to the caller's per-shard result map; issue_op()
// is defined outside this header.
// NOTE(review): this listing is lossy -- access specifiers, the declaration of
// `list_versions`, parts of the constructor's parameter list (`_list_versions`,
// `max_aio`) and the closing brace are not visible here.
368 class CLSRGWIssueBucketList
: public CLSRGWConcurrentIO
{
369 cls_rgw_obj_key start_obj
;
370 string filter_prefix
;
371 uint32_t num_entries
;
// Caller-owned map of results keyed by int (bound by reference).
373 map
<int, rgw_cls_list_ret
>& result
;
375 int issue_op(int shard_id
, const string
& oid
) override
;
// Copies the listing parameters, binds the result map, and forwards the I/O
// context, `oids` and the AIO limit to the base class.
377 CLSRGWIssueBucketList(librados::IoCtx
& io_ctx
, const cls_rgw_obj_key
& _start_obj
,
378 const string
& _filter_prefix
, uint32_t _num_entries
,
380 map
<int, string
>& oids
,
381 map
<int, struct rgw_cls_list_ret
>& list_results
,
383 CLSRGWConcurrentIO(io_ctx
, oids
, max_aio
),
384 start_obj(_start_obj
), filter_prefix(_filter_prefix
), num_entries(_num_entries
), list_versions(_list_versions
), result(list_results
) {}
// CLSRGWConcurrentIO specialization for listing bucket index logs: holds
// references to the caller's result map and marker manager; issue_op() is
// defined outside this header.
// NOTE(review): access specifiers, the declaration of `max` and the closing
// brace are not visible in this lossy listing.
387 class CLSRGWIssueBILogList
: public CLSRGWConcurrentIO
{
// Caller-owned map of results keyed by int (bound by reference).
388 map
<int, struct cls_rgw_bi_log_list_ret
>& result
;
// Caller-owned per-shard marker manager (bound by reference).
389 BucketIndexShardsManager
& marker_mgr
;
392 int issue_op(int shard_id
, const string
& oid
) override
;
// Binds the references, stores `_max`, and forwards the I/O context, `oids`
// and the AIO limit to the base class.
394 CLSRGWIssueBILogList(librados::IoCtx
& io_ctx
, BucketIndexShardsManager
& _marker_mgr
, uint32_t _max
,
395 map
<int, string
>& oids
,
396 map
<int, struct cls_rgw_bi_log_list_ret
>& bi_log_lists
, uint32_t max_aio
) :
397 CLSRGWConcurrentIO(io_ctx
, oids
, max_aio
), result(bi_log_lists
),
398 marker_mgr(_marker_mgr
), max(_max
) {}
// CLSRGWConcurrentIO specialization for trimming bucket index logs between
// per-shard start and end markers; opts into the base class's multi-round
// mode. issue_op() is defined outside this header.
// NOTE(review): access specifiers and closing braces are not visible in this
// lossy listing.
401 class CLSRGWIssueBILogTrim
: public CLSRGWConcurrentIO
{
402 BucketIndexShardsManager
& start_marker_mgr
;
403 BucketIndexShardsManager
& end_marker_mgr
;
405 int issue_op(int shard_id
, const string
& oid
) override
;
406 // Trim until -ENODATA is returned.
407 int valid_ret_code() override
{ return -ENODATA
; }
// Always requests multi-round processing from the base class driver loop.
408 bool need_multiple_rounds() override
{ return true; }
// Inserts or overwrites the container entry for `shard`.
409 void add_object(int shard
, const string
& oid
) override
{ objs_container
[shard
] = oid
; }
// Swaps the container contents with `objs` (so `objs` receives the previous
// contents) and rewinds the cursor to the start of the new contents.
410 void reset_container(map
<int, string
>& objs
) override
{
411 objs_container
.swap(objs
);
412 iter
= objs_container
.begin();
// Binds both marker managers by reference and forwards the I/O context,
// object container and AIO limit to the base class.
416 CLSRGWIssueBILogTrim(librados::IoCtx
& io_ctx
, BucketIndexShardsManager
& _start_marker_mgr
,
417 BucketIndexShardsManager
& _end_marker_mgr
, map
<int, string
>& _bucket_objs
, uint32_t max_aio
) :
418 CLSRGWConcurrentIO(io_ctx
, _bucket_objs
, max_aio
),
419 start_marker_mgr(_start_marker_mgr
), end_marker_mgr(_end_marker_mgr
) {}
423 * Check the bucket index.
425 * io_ctx - IO context for rados.
426 * bucket_objs_ret - check result for all shards.
427 * max_aio - the maximum number of AIO (for throttling).
429 * Return 0 on success, a failure code otherwise.
// CLSRGWConcurrentIO specialization for the bucket index check: holds a
// reference to the caller's per-shard result map; issue_op() is defined
// outside this header.
// NOTE(review): access specifiers, the tail of the constructor's parameter
// list (where `_max_aio` is declared) and the closing brace are not visible in
// this lossy listing.
431 class CLSRGWIssueBucketCheck
: public CLSRGWConcurrentIO
/*<map<string, struct rgw_cls_check_index_ret> >*/ {
// Caller-owned map of results keyed by int (bound by reference).
432 map
<int, struct rgw_cls_check_index_ret
>& result
;
434 int issue_op(int shard_id
, const string
& oid
) override
;
// Binds the result map and forwards the I/O context, `oids` and the AIO limit
// to the base class.
436 CLSRGWIssueBucketCheck(librados::IoCtx
& ioc
, map
<int, string
>& oids
, map
<int, struct rgw_cls_check_index_ret
>& bucket_objs_ret
,
438 CLSRGWConcurrentIO(ioc
, oids
, _max_aio
), result(bucket_objs_ret
) {}
// CLSRGWConcurrentIO specialization with no state of its own in the visible
// lines; issue_op() is defined outside this header.
// NOTE(review): access specifiers and the closing brace are not visible in
// this lossy listing.
441 class CLSRGWIssueBucketRebuild
: public CLSRGWConcurrentIO
{
443 int issue_op(int shard_id
, const string
& oid
) override
;
// Forwards all arguments to the base class.
445 CLSRGWIssueBucketRebuild(librados::IoCtx
& io_ctx
, map
<int, string
>& bucket_objs
,
446 uint32_t max_aio
) : CLSRGWConcurrentIO(io_ctx
, bucket_objs
, max_aio
) {}
// CLSRGWConcurrentIO specialization for reading directory headers: holds a
// reference to the caller's per-shard result map; issue_op() is defined
// outside this header.
// NOTE(review): access specifiers, the tail of the constructor's parameter
// list (where `max_aio` is declared) and the closing brace are not visible in
// this lossy listing.
449 class CLSRGWIssueGetDirHeader
: public CLSRGWConcurrentIO
{
// Caller-owned map of results keyed by int (bound by reference).
450 map
<int, rgw_cls_list_ret
>& result
;
452 int issue_op(int shard_id
, const string
& oid
) override
;
// Binds the result map and forwards the I/O context, `oids` and the AIO limit
// to the base class.
454 CLSRGWIssueGetDirHeader(librados::IoCtx
& io_ctx
, map
<int, string
>& oids
, map
<int, rgw_cls_list_ret
>& dir_headers
,
456 CLSRGWConcurrentIO(io_ctx
, oids
, max_aio
), result(dir_headers
) {}
459 int cls_rgw_get_dir_header_async(librados::IoCtx
& io_ctx
, string
& oid
, RGWGetDirHeader_CB
*ctx
);
461 void cls_rgw_encode_suggestion(char op
, rgw_bucket_dir_entry
& dirent
, bufferlist
& updates
);
463 void cls_rgw_suggest_changes(librados::ObjectWriteOperation
& o
, bufferlist
& updates
);
// Declaration of the usage-log read helper; returns an int status code.
// `usage` is a caller-owned map keyed by rgw_user_bucket, passed by mutable
// reference.
// NOTE(review): the parameter list is truncated in this lossy listing -- the
// parameters after `usage` and the closing `);` are not visible here.
466 int cls_rgw_usage_log_read(librados::IoCtx
& io_ctx
, string
& oid
, string
& user
,
467 uint64_t start_epoch
, uint64_t end_epoch
, uint32_t max_entries
,
468 string
& read_iter
, map
<rgw_user_bucket
, rgw_usage_log_entry
>& usage
,
471 void cls_rgw_usage_log_trim(librados::ObjectWriteOperation
& op
, string
& user
,
472 uint64_t start_epoch
, uint64_t end_epoch
);
474 void cls_rgw_usage_log_add(librados::ObjectWriteOperation
& op
, rgw_usage_log_info
& info
);
476 /* garbage collection */
477 void cls_rgw_gc_set_entry(librados::ObjectWriteOperation
& op
, uint32_t expiration_secs
, cls_rgw_gc_obj_info
& info
);
478 void cls_rgw_gc_defer_entry(librados::ObjectWriteOperation
& op
, uint32_t expiration_secs
, const string
& tag
);
480 int cls_rgw_gc_list(librados::IoCtx
& io_ctx
, string
& oid
, string
& marker
, uint32_t max
, bool expired_only
,
481 list
<cls_rgw_gc_obj_info
>& entries
, bool *truncated
);
483 void cls_rgw_gc_remove(librados::ObjectWriteOperation
& op
, const list
<string
>& tags
);
486 int cls_rgw_lc_get_head(librados::IoCtx
& io_ctx
, string
& oid
, cls_rgw_lc_obj_head
& head
);
487 int cls_rgw_lc_put_head(librados::IoCtx
& io_ctx
, string
& oid
, cls_rgw_lc_obj_head
& head
);
488 int cls_rgw_lc_get_next_entry(librados::IoCtx
& io_ctx
, string
& oid
, string
& marker
, pair
<string
, int>& entry
);
489 int cls_rgw_lc_rm_entry(librados::IoCtx
& io_ctx
, string
& oid
, pair
<string
, int>& entry
);
490 int cls_rgw_lc_set_entry(librados::IoCtx
& io_ctx
, string
& oid
, pair
<string
, int>& entry
);
491 int cls_rgw_lc_list(librados::IoCtx
& io_ctx
, string
& oid
,
492 const string
& marker
,
493 uint32_t max_entries
,
494 map
<string
, int>& entries
);