]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/rgw/cls_rgw_client.cc
4667de8994d5a70d98348ddc497065d9e2d7d22f
[ceph.git] / ceph / src / cls / rgw / cls_rgw_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5
6 #include "cls/rgw/cls_rgw_const.h"
7 #include "cls/rgw/cls_rgw_client.h"
8
9 #include "common/debug.h"
10
11 using std::list;
12 using std::map;
13 using std::pair;
14 using std::string;
15 using std::vector;
16
17 using ceph::real_time;
18
19 using namespace librados;
20
21 const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
22 const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
23
24
25 int CLSRGWConcurrentIO::operator()() {
26 int ret = 0;
27 iter = objs_container.begin();
28 for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
29 ret = issue_op(iter->first, iter->second);
30 if (ret < 0)
31 break;
32 }
33
34 int num_completions = 0, r = 0;
35 std::map<int, std::string> completed_objs;
36 std::map<int, std::string> retry_objs;
37 while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
38 need_multiple_rounds() ? &completed_objs : nullptr,
39 !need_multiple_rounds() ? &retry_objs : nullptr)) {
40 if (r >= 0 && ret >= 0) {
41 for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
42 int issue_ret = issue_op(iter->first, iter->second);
43 if (issue_ret < 0) {
44 ret = issue_ret;
45 break;
46 }
47 }
48 } else if (ret >= 0) {
49 ret = r;
50 }
51
52 // if we're at the end with this round, see if another round is needed
53 if (iter == objs_container.end()) {
54 if (need_multiple_rounds() && !completed_objs.empty()) {
55 // For those objects which need another round, use them to reset
56 // the container
57 reset_container(completed_objs);
58 iter = objs_container.begin();
59 } else if (! need_multiple_rounds() && !retry_objs.empty()) {
60 reset_container(retry_objs);
61 iter = objs_container.begin();
62 }
63
64 // re-issue ops if container was reset above (i.e., iter !=
65 // objs_container.end()); if it was not reset above (i.e., iter
66 // == objs_container.end()) the loop will exit immediately
67 // without iterating
68 for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
69 int issue_ret = issue_op(iter->first, iter->second);
70 if (issue_ret < 0) {
71 ret = issue_ret;
72 break;
73 }
74 }
75 }
76 }
77
78 if (ret < 0) {
79 cleanup();
80 }
81 return ret;
82 } // CLSRGWConcurrintIO::operator()()
83
84
85 /**
86 * This class represents the bucket index object operation callback context.
87 */
88 template <typename T>
89 class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
90 private:
91 T *data;
92 int *ret_code;
93 public:
94 ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); }
95 ~ClsBucketIndexOpCtx() override {}
96 void handle_completion(int r, bufferlist& outbl) override {
97 // if successful, or we're asked for a retry, copy result into
98 // destination (*data)
99 if (r >= 0 || r == RGWBIAdvanceAndRetryError) {
100 try {
101 auto iter = outbl.cbegin();
102 decode((*data), iter);
103 } catch (ceph::buffer::error& err) {
104 r = -EIO;
105 }
106 }
107 if (ret_code) {
108 *ret_code = r;
109 }
110 }
111 };
112
113 void BucketIndexAioManager::do_completion(const int request_id) {
114 std::lock_guard l{lock};
115
116 auto iter = pendings.find(request_id);
117 ceph_assert(iter != pendings.end());
118 completions[request_id] = iter->second;
119 pendings.erase(iter);
120
121 // If the caller needs a list of finished objects, store them
122 // for further processing
123 auto miter = pending_objs.find(request_id);
124 if (miter != pending_objs.end()) {
125 completion_objs.emplace(request_id, miter->second);
126 pending_objs.erase(miter);
127 }
128
129 cond.notify_all();
130 }
131
132 bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
133 int *num_completions,
134 int *ret_code,
135 std::map<int, std::string> *completed_objs,
136 std::map<int, std::string> *retry_objs)
137 {
138 std::unique_lock locker{lock};
139 if (pendings.empty() && completions.empty()) {
140 return false;
141 }
142
143 if (completions.empty()) {
144 // Wait for AIO completion
145 cond.wait(locker);
146 }
147
148 // Clear the completed AIOs
149 auto iter = completions.begin();
150 for (; iter != completions.end(); ++iter) {
151 int r = iter->second->get_return_value();
152
153 // see if we may need to copy completions or retries
154 if (completed_objs || retry_objs) {
155 auto liter = completion_objs.find(iter->first);
156 if (liter != completion_objs.end()) {
157 if (completed_objs && r == 0) { /* update list of successfully completed objs */
158 (*completed_objs)[liter->second.shard_id] = liter->second.oid;
159 }
160
161 if (r == RGWBIAdvanceAndRetryError) {
162 r = 0;
163 if (retry_objs) {
164 (*retry_objs)[liter->second.shard_id] = liter->second.oid;
165 }
166 }
167 } else {
168 // NB: should we log an error here; currently no logging
169 // context to use
170 }
171 }
172
173 if (ret_code && (r < 0 && r != valid_ret_code)) {
174 (*ret_code) = r;
175 }
176
177 iter->second->release();
178 }
179
180 if (num_completions) {
181 (*num_completions) = completions.size();
182 }
183
184 completions.clear();
185
186 return true;
187 }
188
189 // note: currently only called by tesing code
190 void cls_rgw_bucket_init_index(ObjectWriteOperation& o)
191 {
192 bufferlist in;
193 o.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
194 }
195
196 static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
197 const int shard_id,
198 const string& oid,
199 BucketIndexAioManager *manager) {
200 bufferlist in;
201 librados::ObjectWriteOperation op;
202 op.create(true);
203 op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
204 return manager->aio_operate(io_ctx, shard_id, oid, &op);
205 }
206
207 static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
208 const int shard_id,
209 const string& oid,
210 BucketIndexAioManager *manager) {
211 bufferlist in;
212 librados::ObjectWriteOperation op;
213 op.remove();
214 return manager->aio_operate(io_ctx, shard_id, oid, &op);
215 }
216
217 static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
218 const int shard_id,
219 const string& oid,
220 uint64_t timeout,
221 BucketIndexAioManager *manager) {
222 bufferlist in;
223 rgw_cls_tag_timeout_op call;
224 call.tag_timeout = timeout;
225 encode(call, in);
226 ObjectWriteOperation op;
227 op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
228 return manager->aio_operate(io_ctx, shard_id, oid, &op);
229 }
230
231 int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid)
232 {
233 return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager);
234 }
235
236 void CLSRGWIssueBucketIndexInit::cleanup()
237 {
238 // Do best effort removal
239 for (auto citer = objs_container.begin(); citer != iter; ++citer) {
240 io_ctx.remove(citer->second);
241 }
242 }
243
244 int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid)
245 {
246 return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager);
247 }
248
249 int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid)
250 {
251 return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager);
252 }
253
254 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
255 bool absolute,
256 const map<RGWObjCategory, rgw_bucket_category_stats>& stats)
257 {
258 rgw_cls_bucket_update_stats_op call;
259 call.absolute = absolute;
260 call.stats = stats;
261 bufferlist in;
262 encode(call, in);
263 o.exec(RGW_CLASS, RGW_BUCKET_UPDATE_STATS, in);
264 }
265
266 void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag,
267 const cls_rgw_obj_key& key, const string& locator, bool log_op,
268 uint16_t bilog_flags, const rgw_zone_set& zones_trace)
269 {
270 rgw_cls_obj_prepare_op call;
271 call.op = op;
272 call.tag = tag;
273 call.key = key;
274 call.locator = locator;
275 call.log_op = log_op;
276 call.bilog_flags = bilog_flags;
277 call.zones_trace = zones_trace;
278 bufferlist in;
279 encode(call, in);
280 o.exec(RGW_CLASS, RGW_BUCKET_PREPARE_OP, in);
281 }
282
283 void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag,
284 const rgw_bucket_entry_ver& ver,
285 const cls_rgw_obj_key& key,
286 const rgw_bucket_dir_entry_meta& dir_meta,
287 const list<cls_rgw_obj_key> *remove_objs, bool log_op,
288 uint16_t bilog_flags,
289 const rgw_zone_set *zones_trace)
290 {
291
292 bufferlist in;
293 rgw_cls_obj_complete_op call;
294 call.op = op;
295 call.tag = tag;
296 call.key = key;
297 call.ver = ver;
298 call.meta = dir_meta;
299 call.log_op = log_op;
300 call.bilog_flags = bilog_flags;
301 if (remove_objs)
302 call.remove_objs = *remove_objs;
303 if (zones_trace) {
304 call.zones_trace = *zones_trace;
305 }
306 encode(call, in);
307 o.exec(RGW_CLASS, RGW_BUCKET_COMPLETE_OP, in);
308 }
309
310 void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op,
311 const cls_rgw_obj_key& start_obj,
312 const std::string& filter_prefix,
313 const std::string& delimiter,
314 uint32_t num_entries,
315 bool list_versions,
316 rgw_cls_list_ret* result)
317 {
318 bufferlist in;
319 rgw_cls_list_op call;
320 call.start_obj = start_obj;
321 call.filter_prefix = filter_prefix;
322 call.delimiter = delimiter;
323 call.num_entries = num_entries;
324 call.list_versions = list_versions;
325 encode(call, in);
326
327 op.exec(RGW_CLASS, RGW_BUCKET_LIST, in,
328 new ClsBucketIndexOpCtx<rgw_cls_list_ret>(result, NULL));
329 }
330
331 static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
332 const int shard_id,
333 const std::string& oid,
334 const cls_rgw_obj_key& start_obj,
335 const std::string& filter_prefix,
336 const std::string& delimiter,
337 uint32_t num_entries,
338 bool list_versions,
339 BucketIndexAioManager *manager,
340 rgw_cls_list_ret *pdata)
341 {
342 librados::ObjectReadOperation op;
343 cls_rgw_bucket_list_op(op,
344 start_obj, filter_prefix, delimiter,
345 num_entries, list_versions, pdata);
346 return manager->aio_operate(io_ctx, shard_id, oid, &op);
347 }
348
349 int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid)
350 {
351 // set the marker depending on whether we've already queried this
352 // shard and gotten a RGWBIAdvanceAndRetryError (defined
353 // constant) return value; if we have use the marker in the return
354 // to advance the search, otherwise use the marker passed in by the
355 // caller
356 cls_rgw_obj_key marker;
357 auto iter = result.find(shard_id);
358 if (iter != result.end()) {
359 marker = iter->second.marker;
360 } else {
361 marker = start_obj;
362 }
363
364 return issue_bucket_list_op(io_ctx, shard_id, oid,
365 marker, filter_prefix, delimiter,
366 num_entries, list_versions, &manager,
367 &result[shard_id]);
368 }
369
370
371 void CLSRGWIssueBucketList::reset_container(std::map<int, std::string>& objs)
372 {
373 objs_container.swap(objs);
374 iter = objs_container.begin();
375 objs.clear();
376 }
377
378
379 void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes)
380 {
381 bufferlist in;
382 rgw_cls_obj_remove_op call;
383 call.keep_attr_prefixes = keep_attr_prefixes;
384 encode(call, in);
385 o.exec(RGW_CLASS, RGW_OBJ_REMOVE, in);
386 }
387
388 void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation& o, const string& attr)
389 {
390 bufferlist in;
391 rgw_cls_obj_store_pg_ver_op call;
392 call.attr = attr;
393 encode(call, in);
394 o.exec(RGW_CLASS, RGW_OBJ_STORE_PG_VER, in);
395 }
396
397 void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation& o, const string& prefix, bool fail_if_exist)
398 {
399 bufferlist in;
400 rgw_cls_obj_check_attrs_prefix call;
401 call.check_prefix = prefix;
402 call.fail_if_exist = fail_if_exist;
403 encode(call, in);
404 o.exec(RGW_CLASS, RGW_OBJ_CHECK_ATTRS_PREFIX, in);
405 }
406
407 void cls_rgw_obj_check_mtime(librados::ObjectOperation& o, const real_time& mtime, bool high_precision_time, RGWCheckMTimeType type)
408 {
409 bufferlist in;
410 rgw_cls_obj_check_mtime call;
411 call.mtime = mtime;
412 call.high_precision_time = high_precision_time;
413 call.type = type;
414 encode(call, in);
415 o.exec(RGW_CLASS, RGW_OBJ_CHECK_MTIME, in);
416 }
417
418 int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
419 BIIndexType index_type, const cls_rgw_obj_key& key,
420 rgw_cls_bi_entry *entry)
421 {
422 bufferlist in, out;
423 rgw_cls_bi_get_op call;
424 call.key = key;
425 call.type = index_type;
426 encode(call, in);
427 int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET, in, out);
428 if (r < 0)
429 return r;
430
431 rgw_cls_bi_get_ret op_ret;
432 auto iter = out.cbegin();
433 try {
434 decode(op_ret, iter);
435 } catch (ceph::buffer::error& err) {
436 return -EIO;
437 }
438
439 *entry = op_ret.entry;
440
441 return 0;
442 }
443
444 int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry)
445 {
446 bufferlist in, out;
447 rgw_cls_bi_put_op call;
448 call.entry = entry;
449 encode(call, in);
450 int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_PUT, in, out);
451 if (r < 0)
452 return r;
453
454 return 0;
455 }
456
457 void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi_entry& entry)
458 {
459 bufferlist in, out;
460 rgw_cls_bi_put_op call;
461 call.entry = entry;
462 encode(call, in);
463 op.exec(RGW_CLASS, RGW_BI_PUT, in);
464 }
465
466 /* nb: any entries passed in are replaced with the results of the cls
467 * call, so caller does not need to clear entries between calls
468 */
469 int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
470 const std::string& name_filter, const std::string& marker, uint32_t max,
471 std::list<rgw_cls_bi_entry> *entries, bool *is_truncated)
472 {
473 bufferlist in, out;
474 rgw_cls_bi_list_op call;
475 call.name_filter = name_filter;
476 call.marker = marker;
477 call.max = max;
478 encode(call, in);
479 int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_LIST, in, out);
480 if (r < 0)
481 return r;
482
483 rgw_cls_bi_list_ret op_ret;
484 auto iter = out.cbegin();
485 try {
486 decode(op_ret, iter);
487 } catch (ceph::buffer::error& err) {
488 return -EIO;
489 }
490
491 entries->swap(op_ret.entries);
492 *is_truncated = op_ret.is_truncated;
493
494 return 0;
495 }
496
497 int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
498 const cls_rgw_obj_key& key, const bufferlist& olh_tag,
499 bool delete_marker, const string& op_tag, const rgw_bucket_dir_entry_meta *meta,
500 uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace)
501 {
502 librados::ObjectWriteOperation op;
503 cls_rgw_bucket_link_olh(op, key, olh_tag, delete_marker, op_tag, meta,
504 olh_epoch, unmod_since, high_precision_time, log_op,
505 zones_trace);
506
507 return io_ctx.operate(oid, &op);
508 }
509
510
511 void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& key,
512 const bufferlist& olh_tag, bool delete_marker,
513 const string& op_tag, const rgw_bucket_dir_entry_meta *meta,
514 uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace)
515 {
516 bufferlist in, out;
517 rgw_cls_link_olh_op call;
518 call.key = key;
519 call.olh_tag = olh_tag.to_str();
520 call.op_tag = op_tag;
521 call.delete_marker = delete_marker;
522 if (meta) {
523 call.meta = *meta;
524 }
525 call.olh_epoch = olh_epoch;
526 call.log_op = log_op;
527 call.unmod_since = unmod_since;
528 call.high_precision_time = high_precision_time;
529 call.zones_trace = zones_trace;
530 encode(call, in);
531 op.exec(RGW_CLASS, RGW_BUCKET_LINK_OLH, in);
532 }
533
534 int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid,
535 const cls_rgw_obj_key& key, const string& op_tag,
536 const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace)
537 {
538 librados::ObjectWriteOperation op;
539 cls_rgw_bucket_unlink_instance(op, key, op_tag, olh_tag, olh_epoch, log_op, zones_trace);
540 int r = io_ctx.operate(oid, &op);
541 if (r < 0)
542 return r;
543
544 return 0;
545 }
546
547 void cls_rgw_bucket_unlink_instance(librados::ObjectWriteOperation& op,
548 const cls_rgw_obj_key& key, const string& op_tag,
549 const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace)
550 {
551 bufferlist in, out;
552 rgw_cls_unlink_instance_op call;
553 call.key = key;
554 call.op_tag = op_tag;
555 call.olh_epoch = olh_epoch;
556 call.olh_tag = olh_tag;
557 call.log_op = log_op;
558 call.zones_trace = zones_trace;
559 encode(call, in);
560 op.exec(RGW_CLASS, RGW_BUCKET_UNLINK_INSTANCE, in);
561 }
562
563 void cls_rgw_get_olh_log(librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker, const string& olh_tag, rgw_cls_read_olh_log_ret& log_ret, int& op_ret)
564 {
565 bufferlist in;
566 rgw_cls_read_olh_log_op call;
567 call.olh = olh;
568 call.ver_marker = ver_marker;
569 call.olh_tag = olh_tag;
570 encode(call, in);
571 op.exec(RGW_CLASS, RGW_BUCKET_READ_OLH_LOG, in, new ClsBucketIndexOpCtx<rgw_cls_read_olh_log_ret>(&log_ret, &op_ret));
572 }
573
574 int cls_rgw_get_olh_log(IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, uint64_t ver_marker,
575 const string& olh_tag,
576 rgw_cls_read_olh_log_ret& log_ret)
577 {
578 int op_ret = 0;
579 librados::ObjectReadOperation op;
580 cls_rgw_get_olh_log(op, olh, ver_marker, olh_tag, log_ret, op_ret);
581 int r = io_ctx.operate(oid, &op, NULL);
582 if (r < 0) {
583 return r;
584 }
585 if (op_ret < 0) {
586 return op_ret;
587 }
588
589 return r;
590 }
591
592 void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, uint64_t ver, const string& olh_tag)
593 {
594 bufferlist in;
595 rgw_cls_trim_olh_log_op call;
596 call.olh = olh;
597 call.ver = ver;
598 call.olh_tag = olh_tag;
599 encode(call, in);
600 op.exec(RGW_CLASS, RGW_BUCKET_TRIM_OLH_LOG, in);
601 }
602
603 int cls_rgw_clear_olh(IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, const string& olh_tag)
604 {
605 librados::ObjectWriteOperation op;
606 cls_rgw_clear_olh(op, olh, olh_tag);
607
608 return io_ctx.operate(oid, &op);
609 }
610
611 void cls_rgw_clear_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, const string& olh_tag)
612 {
613 bufferlist in;
614 rgw_cls_bucket_clear_olh_op call;
615 call.key = olh;
616 call.olh_tag = olh_tag;
617 encode(call, in);
618 op.exec(RGW_CLASS, RGW_BUCKET_CLEAR_OLH, in);
619 }
620
621 void cls_rgw_bilog_list(librados::ObjectReadOperation& op,
622 const std::string& marker, uint32_t max,
623 cls_rgw_bi_log_list_ret *pdata, int *ret)
624 {
625 cls_rgw_bi_log_list_op call;
626 call.marker = marker;
627 call.max = max;
628
629 bufferlist in;
630 encode(call, in);
631 op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret));
632 }
633
634 static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
635 BucketIndexShardsManager& marker_mgr, uint32_t max,
636 BucketIndexAioManager *manager,
637 cls_rgw_bi_log_list_ret *pdata)
638 {
639 librados::ObjectReadOperation op;
640 cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr);
641 return manager->aio_operate(io_ctx, shard_id, oid, &op);
642 }
643
644 int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid)
645 {
646 return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
647 }
648
649 void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op,
650 const std::string& start_marker,
651 const std::string& end_marker)
652 {
653 cls_rgw_bi_log_trim_op call;
654 call.start_marker = start_marker;
655 call.end_marker = end_marker;
656
657 bufferlist in;
658 encode(call, in);
659 op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
660 }
661
662 static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
663 BucketIndexShardsManager& start_marker_mgr,
664 BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
665 cls_rgw_bi_log_trim_op call;
666 librados::ObjectWriteOperation op;
667 cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""),
668 end_marker_mgr.get(shard_id, ""));
669 return manager->aio_operate(io_ctx, shard_id, oid, &op);
670 }
671
672 int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
673 {
674 return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
675 }
676
677 static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
678 rgw_cls_check_index_ret *pdata) {
679 bufferlist in;
680 librados::ObjectReadOperation op;
681 op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>(
682 pdata, NULL));
683 return manager->aio_operate(io_ctx, shard_id, oid, &op);
684 }
685
686 int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
687 {
688 return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
689 }
690
691 static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
692 BucketIndexAioManager *manager) {
693 bufferlist in;
694 librados::ObjectWriteOperation op;
695 op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
696 return manager->aio_operate(io_ctx, shard_id, oid, &op);
697 }
698
699 int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid)
700 {
701 return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager);
702 }
703
704 void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
705 {
706 updates.append(op);
707 encode(dirent, updates);
708 }
709
710 void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
711 {
712 o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates);
713 }
714
715 int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
716 {
717 cls_rgw_obj_key empty_key;
718 string empty_prefix;
719 string empty_delimiter;
720 return issue_bucket_list_op(io_ctx, shard_id, oid,
721 empty_key, empty_prefix, empty_delimiter,
722 0, false, &manager, &result[shard_id]);
723 }
724
725 static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
726 {
727 bufferlist in;
728 librados::ObjectWriteOperation op;
729 op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
730 return manager->aio_operate(io_ctx, shard_id, oid, &op);
731 }
732
733 int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid)
734 {
735 return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
736 }
737
738 static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
739 {
740 bufferlist in;
741 librados::ObjectWriteOperation op;
742 op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
743 return manager->aio_operate(io_ctx, shard_id, oid, &op);
744 }
745
746 int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid)
747 {
748 return issue_bi_log_stop(io_ctx, shard_id, oid, &manager);
749 }
750
751 class GetDirHeaderCompletion : public ObjectOperationCompletion {
752 RGWGetDirHeader_CB *ret_ctx;
753 public:
754 explicit GetDirHeaderCompletion(RGWGetDirHeader_CB *_ctx) : ret_ctx(_ctx) {}
755 ~GetDirHeaderCompletion() override {
756 ret_ctx->put();
757 }
758 void handle_completion(int r, bufferlist& outbl) override {
759 rgw_cls_list_ret ret;
760 try {
761 auto iter = outbl.cbegin();
762 decode(ret, iter);
763 } catch (ceph::buffer::error& err) {
764 r = -EIO;
765 }
766
767 ret_ctx->handle_response(r, ret.dir.header);
768 }
769 };
770
771 int cls_rgw_get_dir_header_async(IoCtx& io_ctx, string& oid, RGWGetDirHeader_CB *ctx)
772 {
773 bufferlist in, out;
774 rgw_cls_list_op call;
775 call.num_entries = 0;
776 encode(call, in);
777 ObjectReadOperation op;
778 GetDirHeaderCompletion *cb = new GetDirHeaderCompletion(ctx);
779 op.exec(RGW_CLASS, RGW_BUCKET_LIST, in, cb);
780 AioCompletion *c = librados::Rados::aio_create_completion(nullptr, nullptr);
781 int r = io_ctx.aio_operate(oid, c, &op, NULL);
782 c->release();
783 if (r < 0)
784 return r;
785
786 return 0;
787 }
788
789 int cls_rgw_usage_log_read(IoCtx& io_ctx, const string& oid, const string& user, const string& bucket,
790 uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
791 string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage,
792 bool *is_truncated)
793 {
794 if (is_truncated)
795 *is_truncated = false;
796
797 bufferlist in, out;
798 rgw_cls_usage_log_read_op call;
799 call.start_epoch = start_epoch;
800 call.end_epoch = end_epoch;
801 call.owner = user;
802 call.max_entries = max_entries;
803 call.bucket = bucket;
804 call.iter = read_iter;
805 encode(call, in);
806 int r = io_ctx.exec(oid, RGW_CLASS, RGW_USER_USAGE_LOG_READ, in, out);
807 if (r < 0)
808 return r;
809
810 try {
811 rgw_cls_usage_log_read_ret result;
812 auto iter = out.cbegin();
813 decode(result, iter);
814 read_iter = result.next_iter;
815 if (is_truncated)
816 *is_truncated = result.truncated;
817
818 usage = result.usage;
819 } catch (ceph::buffer::error& e) {
820 return -EINVAL;
821 }
822
823 return 0;
824 }
825
826 int cls_rgw_usage_log_trim(IoCtx& io_ctx, const string& oid, const string& user, const string& bucket,
827 uint64_t start_epoch, uint64_t end_epoch)
828 {
829 bufferlist in;
830 rgw_cls_usage_log_trim_op call;
831 call.start_epoch = start_epoch;
832 call.end_epoch = end_epoch;
833 call.user = user;
834 call.bucket = bucket;
835 encode(call, in);
836
837 bool done = false;
838 do {
839 ObjectWriteOperation op;
840 op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_TRIM, in);
841 int r = io_ctx.operate(oid, &op);
842 if (r == -ENODATA)
843 done = true;
844 else if (r < 0)
845 return r;
846 } while (!done);
847
848 return 0;
849 }
850
851 void cls_rgw_usage_log_trim(librados::ObjectWriteOperation& op, const string& user, const string& bucket, uint64_t start_epoch, uint64_t end_epoch)
852 {
853 bufferlist in;
854 rgw_cls_usage_log_trim_op call;
855 call.start_epoch = start_epoch;
856 call.end_epoch = end_epoch;
857 call.user = user;
858 call.bucket = bucket;
859 encode(call, in);
860
861 op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_TRIM, in);
862 }
863
864 void cls_rgw_usage_log_clear(ObjectWriteOperation& op)
865 {
866 bufferlist in;
867 op.exec(RGW_CLASS, RGW_USAGE_LOG_CLEAR, in);
868 }
869
870 void cls_rgw_usage_log_add(ObjectWriteOperation& op, rgw_usage_log_info& info)
871 {
872 bufferlist in;
873 rgw_cls_usage_log_add_op call;
874 call.info = info;
875 encode(call, in);
876 op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_ADD, in);
877 }
878
879 /* garbage collection */
880
881 void cls_rgw_gc_set_entry(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info)
882 {
883 bufferlist in;
884 cls_rgw_gc_set_entry_op call;
885 call.expiration_secs = expiration_secs;
886 call.info = info;
887 encode(call, in);
888 op.exec(RGW_CLASS, RGW_GC_SET_ENTRY, in);
889 }
890
891 void cls_rgw_gc_defer_entry(ObjectWriteOperation& op, uint32_t expiration_secs, const string& tag)
892 {
893 bufferlist in;
894 cls_rgw_gc_defer_entry_op call;
895 call.expiration_secs = expiration_secs;
896 call.tag = tag;
897 encode(call, in);
898 op.exec(RGW_CLASS, RGW_GC_DEFER_ENTRY, in);
899 }
900
901 int cls_rgw_gc_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only,
902 list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker)
903 {
904 bufferlist in, out;
905 cls_rgw_gc_list_op call;
906 call.marker = marker;
907 call.max = max;
908 call.expired_only = expired_only;
909 encode(call, in);
910 int r = io_ctx.exec(oid, RGW_CLASS, RGW_GC_LIST, in, out);
911 if (r < 0)
912 return r;
913
914 cls_rgw_gc_list_ret ret;
915 try {
916 auto iter = out.cbegin();
917 decode(ret, iter);
918 } catch (ceph::buffer::error& err) {
919 return -EIO;
920 }
921
922 entries.swap(ret.entries);
923
924 if (truncated)
925 *truncated = ret.truncated;
926 next_marker = std::move(ret.next_marker);
927 return r;
928 }
929
930 void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const vector<string>& tags)
931 {
932 bufferlist in;
933 cls_rgw_gc_remove_op call;
934 call.tags = tags;
935 encode(call, in);
936 op.exec(RGW_CLASS, RGW_GC_REMOVE, in);
937 }
938
939 int cls_rgw_lc_get_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head)
940 {
941 bufferlist in, out;
942 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_HEAD, in, out);
943 if (r < 0)
944 return r;
945
946 cls_rgw_lc_get_head_ret ret;
947 try {
948 auto iter = out.cbegin();
949 decode(ret, iter);
950 } catch (ceph::buffer::error& err) {
951 return -EIO;
952 }
953 head = ret.head;
954
955 return r;
956 }
957
958 int cls_rgw_lc_put_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head)
959 {
960 bufferlist in, out;
961 cls_rgw_lc_put_head_op call;
962 call.head = head;
963 encode(call, in);
964 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_PUT_HEAD, in, out);
965 return r;
966 }
967
968 int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, string& marker,
969 cls_rgw_lc_entry& entry)
970 {
971 bufferlist in, out;
972 cls_rgw_lc_get_next_entry_op call;
973 call.marker = marker;
974 encode(call, in);
975 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_NEXT_ENTRY, in, out);
976 if (r < 0)
977 return r;
978
979 cls_rgw_lc_get_next_entry_ret ret;
980 try {
981 auto iter = out.cbegin();
982 decode(ret, iter);
983 } catch (ceph::buffer::error& err) {
984 return -EIO;
985 }
986 entry = ret.entry;
987
988 return r;
989 }
990
991 int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid,
992 const cls_rgw_lc_entry& entry)
993 {
994 bufferlist in, out;
995 cls_rgw_lc_rm_entry_op call;
996 call.entry = entry;
997 encode(call, in);
998 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_RM_ENTRY, in, out);
999 return r;
1000 }
1001
1002 int cls_rgw_lc_set_entry(IoCtx& io_ctx, const string& oid,
1003 const cls_rgw_lc_entry& entry)
1004 {
1005 bufferlist in, out;
1006 cls_rgw_lc_set_entry_op call;
1007 call.entry = entry;
1008 encode(call, in);
1009 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_SET_ENTRY, in, out);
1010 return r;
1011 }
1012
1013 int cls_rgw_lc_get_entry(IoCtx& io_ctx, const string& oid,
1014 const std::string& marker, cls_rgw_lc_entry& entry)
1015 {
1016 bufferlist in, out;
1017 cls_rgw_lc_get_entry_op call{marker};;
1018 encode(call, in);
1019 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_ENTRY, in, out);
1020
1021 if (r < 0) {
1022 return r;
1023 }
1024
1025 cls_rgw_lc_get_entry_ret ret;
1026 try {
1027 auto iter = out.cbegin();
1028 decode(ret, iter);
1029 } catch (ceph::buffer::error& err) {
1030 return -EIO;
1031 }
1032
1033 entry = std::move(ret.entry);
1034 return r;
1035 }
1036
1037 int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid,
1038 const string& marker,
1039 uint32_t max_entries,
1040 vector<cls_rgw_lc_entry>& entries)
1041 {
1042 bufferlist in, out;
1043 cls_rgw_lc_list_entries_op op;
1044
1045 entries.clear();
1046
1047 op.marker = marker;
1048 op.max_entries = max_entries;
1049
1050 encode(op, in);
1051
1052 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_LIST_ENTRIES, in, out);
1053 if (r < 0)
1054 return r;
1055
1056 cls_rgw_lc_list_entries_ret ret;
1057 try {
1058 auto iter = out.cbegin();
1059 decode(ret, iter);
1060 } catch (ceph::buffer::error& err) {
1061 return -EIO;
1062 }
1063
1064 std::sort(std::begin(ret.entries), std::end(ret.entries),
1065 [](const cls_rgw_lc_entry& a, const cls_rgw_lc_entry& b)
1066 { return a.bucket < b.bucket; });
1067 entries = std::move(ret.entries);
1068 return r;
1069 }
1070
1071 void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
1072 {
1073 bufferlist in;
1074 cls_rgw_reshard_add_op call;
1075 call.entry = entry;
1076 encode(call, in);
1077 op.exec(RGW_CLASS, RGW_RESHARD_ADD, in);
1078 }
1079
1080 int cls_rgw_reshard_list(librados::IoCtx& io_ctx, const string& oid, string& marker, uint32_t max,
1081 list<cls_rgw_reshard_entry>& entries, bool* is_truncated)
1082 {
1083 bufferlist in, out;
1084 cls_rgw_reshard_list_op call;
1085 call.marker = marker;
1086 call.max = max;
1087 encode(call, in);
1088 int r = io_ctx.exec(oid, RGW_CLASS, RGW_RESHARD_LIST, in, out);
1089 if (r < 0)
1090 return r;
1091
1092 cls_rgw_reshard_list_ret op_ret;
1093 auto iter = out.cbegin();
1094 try {
1095 decode(op_ret, iter);
1096 } catch (ceph::buffer::error& err) {
1097 return -EIO;
1098 }
1099
1100 entries.swap(op_ret.entries);
1101 *is_truncated = op_ret.is_truncated;
1102
1103 return 0;
1104 }
1105
1106 int cls_rgw_reshard_get(librados::IoCtx& io_ctx, const string& oid, cls_rgw_reshard_entry& entry)
1107 {
1108 bufferlist in, out;
1109 cls_rgw_reshard_get_op call;
1110 call.entry = entry;
1111 encode(call, in);
1112 int r = io_ctx.exec(oid, RGW_CLASS, RGW_RESHARD_GET, in, out);
1113 if (r < 0)
1114 return r;
1115
1116 cls_rgw_reshard_get_ret op_ret;
1117 auto iter = out.cbegin();
1118 try {
1119 decode(op_ret, iter);
1120 } catch (ceph::buffer::error& err) {
1121 return -EIO;
1122 }
1123
1124 entry = op_ret.entry;
1125
1126 return 0;
1127 }
1128
1129 void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
1130 {
1131 bufferlist in;
1132 cls_rgw_reshard_remove_op call;
1133 call.tenant = entry.tenant;
1134 call.bucket_name = entry.bucket_name;
1135 call.bucket_id = entry.bucket_id;
1136 encode(call, in);
1137 op.exec(RGW_CLASS, RGW_RESHARD_REMOVE, in);
1138 }
1139
1140 int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
1141 const cls_rgw_bucket_instance_entry& entry)
1142 {
1143 bufferlist in, out;
1144 cls_rgw_set_bucket_resharding_op call;
1145 call.entry = entry;
1146 encode(call, in);
1147 return io_ctx.exec(oid, RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in, out);
1148 }
1149
1150 int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const string& oid)
1151 {
1152 bufferlist in, out;
1153 cls_rgw_clear_bucket_resharding_op call;
1154 encode(call, in);
1155 return io_ctx.exec(oid, RGW_CLASS, RGW_CLEAR_BUCKET_RESHARDING, in, out);
1156 }
1157
1158 int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
1159 cls_rgw_bucket_instance_entry *entry)
1160 {
1161 bufferlist in, out;
1162 cls_rgw_get_bucket_resharding_op call;
1163 encode(call, in);
1164 int r= io_ctx.exec(oid, RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, out);
1165 if (r < 0)
1166 return r;
1167
1168 cls_rgw_get_bucket_resharding_ret op_ret;
1169 auto iter = out.cbegin();
1170 try {
1171 decode(op_ret, iter);
1172 } catch (ceph::buffer::error& err) {
1173 return -EIO;
1174 }
1175
1176 *entry = op_ret.new_instance;
1177
1178 return 0;
1179 }
1180
1181 void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
1182 {
1183 bufferlist in, out;
1184 cls_rgw_guard_bucket_resharding_op call;
1185 call.ret_err = ret_err;
1186 encode(call, in);
1187 op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
1188 }
1189
1190 static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
1191 const int shard_id, const string& oid,
1192 const cls_rgw_bucket_instance_entry& entry,
1193 BucketIndexAioManager *manager) {
1194 bufferlist in;
1195 cls_rgw_set_bucket_resharding_op call;
1196 call.entry = entry;
1197 encode(call, in);
1198 librados::ObjectWriteOperation op;
1199 op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
1200 return manager->aio_operate(io_ctx, shard_id, oid, &op);
1201 }
1202
1203 int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid)
1204 {
1205 return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager);
1206 }