#ifndef CEPH_CLS_RGW_CLIENT_H
#define CEPH_CLS_RGW_CLIENT_H

#include "include/types.h"
#include "include/str_list.h"
#include "include/rados/librados.hpp"
#include "cls_rgw_types.h"
#include "cls_rgw_ops.h"
#include "common/RefCountedObj.h"
#include "include/compat.h"
#include "common/ceph_time.h"
#include "common/Mutex.h"
#include "common/Cond.h"

// Forward declaration
class BucketIndexAioManager;

/*
 * Bucket index AIO request argument; used to pass the request id and the
 * owning manager to the AIO completion callback.
 */
struct BucketIndexAioArg : public RefCountedObject {
  BucketIndexAioArg(int _id, BucketIndexAioManager* _manager) :
    id(_id), manager(_manager) {}
  int id;
  BucketIndexAioManager* manager;
};

/*
 * This class manages AIO completions. It is not completely thread-safe:
 * methods such as get_next() are not thread-safe and are expected to be
 * called from a single thread.
 */
class BucketIndexAioManager {
private:
  map<int, librados::AioCompletion*> pendings;
  map<int, librados::AioCompletion*> completions;
  map<int, string> pending_objs;
  map<int, string> completion_objs;
  int next;
  Mutex lock;
  Cond cond;
  /*
   * Callback implementation for AIO request.
   */
  static void bucket_index_op_completion_cb(void* cb, void* arg) {
    BucketIndexAioArg* cb_arg = (BucketIndexAioArg*) arg;
    cb_arg->manager->do_completion(cb_arg->id);
    cb_arg->put();
  }

  /*
   * Get the next request id. This method is not thread-safe.
   *
   * Return the next request id.
   */
  int get_next() { return next++; }

  /*
   * Add a new pending AIO completion instance.
   *
   * @param id         - the request id.
   * @param completion - the AIO completion instance.
   * @param oid        - the object id associated with the request.
   */
  void add_pending(int id, librados::AioCompletion* completion, const string& oid) {
    pendings[id] = completion;
    pending_objs[id] = oid;
  }

public:
  /*
   * Create a new instance.
   */
  BucketIndexAioManager() : next(0), lock("BucketIndexAioManager::lock") {}

  /*
   * Do completion for the given AIO request.
   */
  void do_completion(int id);

  /*
   * Wait for AIO completions.
   *
   * valid_ret_code  - an error code that should still be treated as success
   *                   (e.g. -EEXIST).
   * num_completions - number of completions.
   * ret_code        - return code of a failed AIO, if any.
   * objs            - the objects whose AIO requests have completed.
   *
   * Return false if there is no pending AIO, true otherwise.
   */
  bool wait_for_completions(int valid_ret_code, int *num_completions, int *ret_code,
                            map<int, string> *objs);

  /**
   * Do an aio read operation.
   *
   * Return 0 on success, a negative error code otherwise.
   */
  int aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectReadOperation *op) {
    Mutex::Locker l(lock);
    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
    librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
    int r = io_ctx.aio_operate(oid, c, op, NULL);
    if (r >= 0) {
      add_pending(arg->id, c, oid);
    } else {
      // the completion callback will never run, so drop its reference here
      arg->put();
      c->release();
    }
    return r;
  }

  /**
   * Do an aio write operation.
   *
   * Return 0 on success, a negative error code otherwise.
   */
  int aio_operate(librados::IoCtx& io_ctx, const string& oid, librados::ObjectWriteOperation *op) {
    Mutex::Locker l(lock);
    BucketIndexAioArg *arg = new BucketIndexAioArg(get_next(), this);
    librados::AioCompletion *c = librados::Rados::aio_create_completion((void*)arg, NULL, bucket_index_op_completion_cb);
    int r = io_ctx.aio_operate(oid, c, op);
    if (r >= 0) {
      add_pending(arg->id, c, oid);
    } else {
      // the completion callback will never run, so drop its reference here
      arg->put();
      c->release();
    }
    return r;
  }
};
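
// Illustrative usage sketch for BucketIndexAioManager (not part of the API;
// io_ctx and shard_oids below are assumptions for the example): one read op
// is issued per bucket index shard, then the completions are drained.
//
//   BucketIndexAioManager mgr;
//   for (map<int, string>::iterator it = shard_oids.begin();
//        it != shard_oids.end(); ++it) {
//     librados::ObjectReadOperation op;
//     // ... fill in the per-shard read op ...
//     mgr.aio_operate(io_ctx, it->second, &op);
//   }
//   int num_completions = 0, ret = 0;
//   while (mgr.wait_for_completions(0, &num_completions, &ret, NULL)) {
//     if (ret < 0) {
//       // at least one AIO failed; ret holds its error code
//     }
//   }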

class RGWGetDirHeader_CB : public RefCountedObject {
public:
  ~RGWGetDirHeader_CB() override {}
  virtual void handle_response(int r, rgw_bucket_dir_header& header) = 0;
};

class BucketIndexShardsManager {
private:
  // Per-shard values, e.g. the per-shard marker.
  map<int, string> value_by_shards;
public:
  const static string KEY_VALUE_SEPARATOR;
  const static string SHARDS_SEPARATOR;

  void add(int shard, const string& value) {
    value_by_shards[shard] = value;
  }

  const string& get(int shard, const string& default_value) {
    map<int, string>::iterator iter = value_by_shards.find(shard);
    return (iter == value_by_shards.end() ? default_value : iter->second);
  }

  map<int, string>& get() {
    return value_by_shards;
  }

  bool empty() {
    return value_by_shards.empty();
  }

  void to_string(string *out) const {
    if (!out) {
      return;
    }
    out->clear();
    map<int, string>::const_iterator iter = value_by_shards.begin();
    for (; iter != value_by_shards.end(); ++iter) {
      if (out->length()) {
        // Not the first item, append a separator first
        out->append(SHARDS_SEPARATOR);
      }
      char buf[16];
      snprintf(buf, sizeof(buf), "%d", iter->first);
      out->append(buf);
      out->append(KEY_VALUE_SEPARATOR);
      out->append(iter->second);
    }
  }

  static bool is_shards_marker(const string& marker) {
    return marker.find(KEY_VALUE_SEPARATOR) != string::npos;
  }

  /*
   * Convert from a string. The string can take one of two forms:
   *
   * 1. A single shard with no shard id specified, e.g. 000001.23.1
   *
   *    In this case, if the passed shard_id >= 0, use that shard id;
   *    otherwise assume the bucket has no shards.
   *
   * 2. One or more shards, with a shard id specified for each shard,
   *    e.g. 0#00002.12,1#00003.23.2
   */
  int from_string(const string& composed_marker, int shard_id) {
    value_by_shards.clear();
    vector<string> shards;
    get_str_vec(composed_marker, SHARDS_SEPARATOR.c_str(), shards);
    if (shards.size() > 1 && shard_id >= 0) {
      return -EINVAL;
    }
    vector<string>::const_iterator iter = shards.begin();
    for (; iter != shards.end(); ++iter) {
      size_t pos = iter->find(KEY_VALUE_SEPARATOR);
      if (pos == string::npos) {
        if (!value_by_shards.empty()) {
          return -EINVAL;
        }
        if (shard_id < 0) {
          add(0, *iter);
        } else {
          add(shard_id, *iter);
        }
        return 0;
      }
      string shard_str = iter->substr(0, pos);
      string err;
      int shard = (int)strict_strtol(shard_str.c_str(), 10, &err);
      if (!err.empty()) {
        return -EINVAL;
      }
      add(shard, iter->substr(pos + 1));
    }
    return 0;
  }
};
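
// Illustrative sketch of the composed-marker format handled above (the
// marker values are made up for the example):
//
//   BucketIndexShardsManager mgr;
//   // sharded bucket: "<shard>#<marker>" pairs separated by ','
//   int r = mgr.from_string("0#00002.12,1#00003.23.2", -1);   // r == 0
//   // mgr.get(0, "") == "00002.12", mgr.get(1, "") == "00003.23.2"
//
//   // non-sharded bucket: a bare marker, stored under shard 0
//   r = mgr.from_string("000001.23.1", -1);
//
//   string composed;
//   mgr.to_string(&composed);   // rebuilds the "<shard>#<marker>,..." form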

/* bucket index */
void cls_rgw_bucket_init(librados::ObjectWriteOperation& o);

class CLSRGWConcurrentIO {
protected:
  librados::IoCtx& io_ctx;
  map<int, string>& objs_container;
  map<int, string>::iterator iter;
  uint32_t max_aio;
  BucketIndexAioManager manager;

  virtual int issue_op(int shard_id, const string& oid) = 0;

  virtual void cleanup() {}
  virtual int valid_ret_code() { return 0; }
  // Return true if multiple rounds of ops might be needed; this happens when
  // an op needs to be re-sent until a certain return code is received.
  virtual bool need_multiple_rounds() { return false; }
  // Add a new object to the end of the container.
  virtual void add_object(int shard, const string& oid) {}
  virtual void reset_container(map<int, string>& objs) {}

public:
  CLSRGWConcurrentIO(librados::IoCtx& ioc, map<int, string>& _objs_container,
                     uint32_t _max_aio) : io_ctx(ioc), objs_container(_objs_container), max_aio(_max_aio) {}
  virtual ~CLSRGWConcurrentIO() {}

  int operator()() {
    int ret = 0;
    iter = objs_container.begin();
    for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
      ret = issue_op(iter->first, iter->second);
      if (ret < 0)
        break;
    }

    int num_completions, r = 0;
    map<int, string> objs;
    map<int, string> *pobjs = (need_multiple_rounds() ? &objs : NULL);
    while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, pobjs)) {
      if (r >= 0 && ret >= 0) {
        for (int i = 0; i < num_completions && iter != objs_container.end(); ++i, ++iter) {
          int issue_ret = issue_op(iter->first, iter->second);
          if (issue_ret < 0) {
            ret = issue_ret;
            break;
          }
        }
      } else if (ret >= 0) {
        ret = r;
      }
      if (need_multiple_rounds() && iter == objs_container.end() && !objs.empty()) {
        // For those objects which need another round, use them to reset
        // the container
        reset_container(objs);
      }
    }

    if (ret < 0) {
      cleanup();
    }
    return ret;
  }
};
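
// Illustrative sketch of how the CLSRGWConcurrentIO subclasses below are
// driven (io_ctx, bucket_objs and max_aio are assumptions for the example):
// build the shard id -> bucket index object id map, construct the issuer and
// invoke operator()(), which keeps at most max_aio ops in flight and calls
// cleanup() if any op fails.
//
//   map<int, string> bucket_objs;   // shard id -> bucket index object id
//   // ... populate bucket_objs ...
//   int r = CLSRGWIssueBucketIndexInit(io_ctx, bucket_objs, max_aio)();
//   if (r < 0) {
//     // index initialization failed on at least one shard
//   }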

class CLSRGWIssueBucketIndexInit : public CLSRGWConcurrentIO {
protected:
  int issue_op(int shard_id, const string& oid) override;
  int valid_ret_code() override { return -EEXIST; }
  void cleanup() override;
public:
  CLSRGWIssueBucketIndexInit(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
                             uint32_t _max_aio) :
    CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio) {}
};

class CLSRGWIssueSetTagTimeout : public CLSRGWConcurrentIO {
  uint64_t tag_timeout;
protected:
  int issue_op(int shard_id, const string& oid) override;
public:
  CLSRGWIssueSetTagTimeout(librados::IoCtx& ioc, map<int, string>& _bucket_objs,
                           uint32_t _max_aio, uint64_t _tag_timeout) :
    CLSRGWConcurrentIO(ioc, _bucket_objs, _max_aio), tag_timeout(_tag_timeout) {}
};

void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, bool absolute,
                                 const map<uint8_t, rgw_bucket_category_stats>& stats);

void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
                               const cls_rgw_obj_key& key, const string& locator, bool log_op,
                               uint16_t bilog_op);

void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, RGWModifyOp op, string& tag,
                                rgw_bucket_entry_ver& ver,
                                const cls_rgw_obj_key& key,
                                rgw_bucket_dir_entry_meta& dir_meta,
                                list<cls_rgw_obj_key> *remove_objs, bool log_op,
                                uint16_t bilog_op);

void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes);
void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation& o, const string& attr);
void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation& o, const string& prefix, bool fail_if_exist);
void cls_rgw_obj_check_mtime(librados::ObjectOperation& o, const ceph::real_time& mtime, bool high_precision_time, RGWCheckMTimeType type);

int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
                   BIIndexType index_type, cls_rgw_obj_key& key,
                   rgw_cls_bi_entry *entry);
int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, rgw_cls_bi_entry& entry);
void cls_rgw_bi_put(librados::ObjectWriteOperation& op, const string oid, rgw_cls_bi_entry& entry);
int cls_rgw_bi_list(librados::IoCtx& io_ctx, const string oid,
                    const string& name, const string& marker, uint32_t max,
                    list<rgw_cls_bi_entry> *entries, bool *is_truncated);

int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, bufferlist& olh_tag,
                            bool delete_marker, const string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
                            uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op);
int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid, const cls_rgw_obj_key& key, const string& op_tag,
                                   const string& olh_tag, uint64_t olh_epoch, bool log_op);
int cls_rgw_get_olh_log(librados::IoCtx& io_ctx, string& oid, librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker,
                        const string& olh_tag,
                        map<uint64_t, vector<struct rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, uint64_t ver, const string& olh_tag);
int cls_rgw_clear_olh(librados::IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, const string& olh_tag);

/**
 * List the bucket with the starting object and filter prefix.
 * NOTE: this issues a listing request for each bucket index shard object
 * named in the *oids* map, which the caller must populate with the bucket
 * index object ids; the per-shard results are returned in *list_results*.
 *
 * io_ctx        - IO context for rados.
 * start_obj     - marker for the listing.
 * filter_prefix - filter prefix.
 * num_entries   - number of entries to request from each shard object (note that
 *                 the total number of entries returned depends on the number of shards).
 * list_results  - the listing results, keyed by bucket index shard.
 * max_aio       - the maximum number of AIOs in flight (for throttling).
 *
 * Return 0 on success, a failure code otherwise.
 */

class CLSRGWIssueBucketList : public CLSRGWConcurrentIO {
  cls_rgw_obj_key start_obj;
  string filter_prefix;
  uint32_t num_entries;
  bool list_versions;
  map<int, rgw_cls_list_ret>& result;
protected:
  int issue_op(int shard_id, const string& oid) override;
public:
  CLSRGWIssueBucketList(librados::IoCtx& io_ctx, const cls_rgw_obj_key& _start_obj,
                        const string& _filter_prefix, uint32_t _num_entries,
                        bool _list_versions,
                        map<int, string>& oids,
                        map<int, struct rgw_cls_list_ret>& list_results,
                        uint32_t max_aio) :
    CLSRGWConcurrentIO(io_ctx, oids, max_aio),
    start_obj(_start_obj), filter_prefix(_filter_prefix), num_entries(_num_entries), list_versions(_list_versions), result(list_results) {}
};
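
// Illustrative sketch for the listing doc comment above (io_ctx, the oid map
// contents and the marker value are assumptions for the example):
//
//   map<int, string> oids;                     // shard id -> index object id
//   map<int, rgw_cls_list_ret> list_results;   // filled in per shard
//   // ... populate oids ...
//   cls_rgw_obj_key start_key("start-marker");
//   int r = CLSRGWIssueBucketList(io_ctx, start_key, "prefix/", 100,
//                                 false /* list_versions */, oids,
//                                 list_results, 8 /* max_aio */)();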

class CLSRGWIssueBILogList : public CLSRGWConcurrentIO {
  map<int, struct cls_rgw_bi_log_list_ret>& result;
  BucketIndexShardsManager& marker_mgr;
  uint32_t max;
protected:
  int issue_op(int shard_id, const string& oid) override;
public:
  CLSRGWIssueBILogList(librados::IoCtx& io_ctx, BucketIndexShardsManager& _marker_mgr, uint32_t _max,
                       map<int, string>& oids,
                       map<int, struct cls_rgw_bi_log_list_ret>& bi_log_lists, uint32_t max_aio) :
    CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(bi_log_lists),
    marker_mgr(_marker_mgr), max(_max) {}
};

class CLSRGWIssueBILogTrim : public CLSRGWConcurrentIO {
  BucketIndexShardsManager& start_marker_mgr;
  BucketIndexShardsManager& end_marker_mgr;
protected:
  int issue_op(int shard_id, const string& oid) override;
  // Trim until -ENODATA is returned.
  int valid_ret_code() override { return -ENODATA; }
  bool need_multiple_rounds() override { return true; }
  void add_object(int shard, const string& oid) override { objs_container[shard] = oid; }
  void reset_container(map<int, string>& objs) override {
    objs_container.swap(objs);
    iter = objs_container.begin();
    objs.clear();
  }
public:
  CLSRGWIssueBILogTrim(librados::IoCtx& io_ctx, BucketIndexShardsManager& _start_marker_mgr,
                       BucketIndexShardsManager& _end_marker_mgr, map<int, string>& _bucket_objs, uint32_t max_aio) :
    CLSRGWConcurrentIO(io_ctx, _bucket_objs, max_aio),
    start_marker_mgr(_start_marker_mgr), end_marker_mgr(_end_marker_mgr) {}
};
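
// Illustrative sketch for the bilog trim class above (the marker strings,
// io_ctx and bucket_objs are assumptions for the example): trimming is
// re-issued per shard until -ENODATA is returned, which is what the
// valid_ret_code()/need_multiple_rounds() overrides implement.
//
//   BucketIndexShardsManager start_mgr, end_mgr;
//   start_mgr.from_string(start_marker, -1);
//   end_mgr.from_string(end_marker, -1);
//   int r = CLSRGWIssueBILogTrim(io_ctx, start_mgr, end_mgr, bucket_objs, 8)();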

/**
 * Check the bucket index.
 *
 * io_ctx          - IO context for rados.
 * bucket_objs_ret - check results for all shards.
 * max_aio         - the maximum number of AIOs in flight (for throttling).
 *
 * Return 0 on success, a failure code otherwise.
 */
class CLSRGWIssueBucketCheck : public CLSRGWConcurrentIO /*<map<string, struct rgw_cls_check_index_ret> >*/ {
  map<int, struct rgw_cls_check_index_ret>& result;
protected:
  int issue_op(int shard_id, const string& oid) override;
public:
  CLSRGWIssueBucketCheck(librados::IoCtx& ioc, map<int, string>& oids, map<int, struct rgw_cls_check_index_ret>& bucket_objs_ret,
                         uint32_t _max_aio) :
    CLSRGWConcurrentIO(ioc, oids, _max_aio), result(bucket_objs_ret) {}
};
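
// Illustrative sketch for the check described above (io_ctx and oids are
// assumptions for the example); a pass with the CLSRGWIssueBucketRebuild
// class below over the same oids map is typically how reported problems
// are repaired.
//
//   map<int, rgw_cls_check_index_ret> check_results;
//   int r = CLSRGWIssueBucketCheck(io_ctx, oids, check_results, 8)();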

class CLSRGWIssueBucketRebuild : public CLSRGWConcurrentIO {
protected:
  int issue_op(int shard_id, const string& oid) override;
public:
  CLSRGWIssueBucketRebuild(librados::IoCtx& io_ctx, map<int, string>& bucket_objs,
                           uint32_t max_aio) : CLSRGWConcurrentIO(io_ctx, bucket_objs, max_aio) {}
};

class CLSRGWIssueGetDirHeader : public CLSRGWConcurrentIO {
  map<int, rgw_cls_list_ret>& result;
protected:
  int issue_op(int shard_id, const string& oid) override;
public:
  CLSRGWIssueGetDirHeader(librados::IoCtx& io_ctx, map<int, string>& oids, map<int, rgw_cls_list_ret>& dir_headers,
                          uint32_t max_aio) :
    CLSRGWConcurrentIO(io_ctx, oids, max_aio), result(dir_headers) {}
};

int cls_rgw_get_dir_header_async(librados::IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx);

void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates);

void cls_rgw_suggest_changes(librados::ObjectWriteOperation& o, bufferlist& updates);

/* usage logging */
int cls_rgw_usage_log_read(librados::IoCtx& io_ctx, string& oid, string& user,
                           uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
                           string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage,
                           bool *is_truncated);

void cls_rgw_usage_log_trim(librados::ObjectWriteOperation& op, string& user,
                            uint64_t start_epoch, uint64_t end_epoch);

void cls_rgw_usage_log_add(librados::ObjectWriteOperation& op, rgw_usage_log_info& info);

/* garbage collection */
void cls_rgw_gc_set_entry(librados::ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info);
void cls_rgw_gc_defer_entry(librados::ObjectWriteOperation& op, uint32_t expiration_secs, const string& tag);

int cls_rgw_gc_list(librados::IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only,
                    list<cls_rgw_gc_obj_info>& entries, bool *truncated);

void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const list<string>& tags);

/* lifecycle */
int cls_rgw_lc_get_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head);
int cls_rgw_lc_put_head(librados::IoCtx& io_ctx, string& oid, cls_rgw_lc_obj_head& head);
int cls_rgw_lc_get_next_entry(librados::IoCtx& io_ctx, string& oid, string& marker, pair<string, int>& entry);
int cls_rgw_lc_rm_entry(librados::IoCtx& io_ctx, string& oid, pair<string, int>& entry);
int cls_rgw_lc_set_entry(librados::IoCtx& io_ctx, string& oid, pair<string, int>& entry);
int cls_rgw_lc_list(librados::IoCtx& io_ctx, string& oid,
                    const string& marker,
                    uint32_t max_entries,
                    map<string, int>& entries);

#endif