]> git.proxmox.com Git - ceph.git/blame_incremental - ceph/src/cls/rgw/cls_rgw_client.cc
bump version to 19.2.0-pve1
[ceph.git] / ceph / src / cls / rgw / cls_rgw_client.cc
... / ...
CommitLineData
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <errno.h>
5
6#include "cls/rgw/cls_rgw_const.h"
7#include "cls/rgw/cls_rgw_client.h"
8
9#include "common/debug.h"
10
11using std::list;
12using std::map;
13using std::pair;
14using std::string;
15using std::vector;
16
17using ceph::real_time;
18
19using namespace librados;
20
21const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#";
22const string BucketIndexShardsManager::SHARDS_SEPARATOR = ",";
23
24
25int CLSRGWConcurrentIO::operator()() {
26 int ret = 0;
27 iter = objs_container.begin();
28 for (; iter != objs_container.end() && max_aio-- > 0; ++iter) {
29 ret = issue_op(iter->first, iter->second);
30 if (ret < 0)
31 break;
32 }
33
34 int num_completions = 0, r = 0;
35 std::map<int, std::string> completed_objs;
36 std::map<int, std::string> retry_objs;
37 while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r,
38 need_multiple_rounds() ? &completed_objs : nullptr,
39 !need_multiple_rounds() ? &retry_objs : nullptr)) {
40 if (r >= 0 && ret >= 0) {
41 for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
42 int issue_ret = issue_op(iter->first, iter->second);
43 if (issue_ret < 0) {
44 ret = issue_ret;
45 break;
46 }
47 }
48 } else if (ret >= 0) {
49 ret = r;
50 }
51
52 // if we're at the end with this round, see if another round is needed
53 if (iter == objs_container.end()) {
54 if (need_multiple_rounds() && !completed_objs.empty()) {
55 // For those objects which need another round, use them to reset
56 // the container
57 reset_container(completed_objs);
58 iter = objs_container.begin();
59 } else if (! need_multiple_rounds() && !retry_objs.empty()) {
60 reset_container(retry_objs);
61 iter = objs_container.begin();
62 }
63
64 // re-issue ops if container was reset above (i.e., iter !=
65 // objs_container.end()); if it was not reset above (i.e., iter
66 // == objs_container.end()) the loop will exit immediately
67 // without iterating
68 for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) {
69 int issue_ret = issue_op(iter->first, iter->second);
70 if (issue_ret < 0) {
71 ret = issue_ret;
72 break;
73 }
74 }
75 }
76 }
77
78 if (ret < 0) {
79 cleanup();
80 }
81 return ret;
82} // CLSRGWConcurrentIO::operator()()
83
84
85/**
86 * This class represents the bucket index object operation callback context.
87 */
88template <typename T>
89class ClsBucketIndexOpCtx : public ObjectOperationCompletion {
90private:
91 T *data;
92 int *ret_code;
93public:
94 ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); }
95 ~ClsBucketIndexOpCtx() override {}
96 void handle_completion(int r, bufferlist& outbl) override {
97 // if successful, or we're asked for a retry, copy result into
98 // destination (*data)
99 if (r >= 0 || r == RGWBIAdvanceAndRetryError) {
100 try {
101 auto iter = outbl.cbegin();
102 decode((*data), iter);
103 } catch (ceph::buffer::error& err) {
104 r = -EIO;
105 }
106 }
107 if (ret_code) {
108 *ret_code = r;
109 }
110 }
111};
112
113void BucketIndexAioManager::do_completion(const int request_id) {
114 std::lock_guard l{lock};
115
116 auto iter = pendings.find(request_id);
117 ceph_assert(iter != pendings.end());
118 completions[request_id] = iter->second;
119 pendings.erase(iter);
120
121 // If the caller needs a list of finished objects, store them
122 // for further processing
123 auto miter = pending_objs.find(request_id);
124 if (miter != pending_objs.end()) {
125 completion_objs.emplace(request_id, miter->second);
126 pending_objs.erase(miter);
127 }
128
129 cond.notify_all();
130}
131
132bool BucketIndexAioManager::wait_for_completions(int valid_ret_code,
133 int *num_completions,
134 int *ret_code,
135 std::map<int, std::string> *completed_objs,
136 std::map<int, std::string> *retry_objs)
137{
138 std::unique_lock locker{lock};
139 if (pendings.empty() && completions.empty()) {
140 return false;
141 }
142
143 if (completions.empty()) {
144 // Wait for AIO completion
145 cond.wait(locker);
146 }
147
148 // Clear the completed AIOs
149 auto iter = completions.begin();
150 for (; iter != completions.end(); ++iter) {
151 int r = iter->second->get_return_value();
152
153 // see if we may need to copy completions or retries
154 if (completed_objs || retry_objs) {
155 auto liter = completion_objs.find(iter->first);
156 if (liter != completion_objs.end()) {
157 if (completed_objs && r == 0) { /* update list of successfully completed objs */
158 (*completed_objs)[liter->second.shard_id] = liter->second.oid;
159 }
160
161 if (r == RGWBIAdvanceAndRetryError) {
162 r = 0;
163 if (retry_objs) {
164 (*retry_objs)[liter->second.shard_id] = liter->second.oid;
165 }
166 }
167 } else {
168 // NB: should we log an error here; currently no logging
169 // context to use
170 }
171 }
172
173 if (ret_code && (r < 0 && r != valid_ret_code)) {
174 (*ret_code) = r;
175 }
176
177 iter->second->release();
178 }
179
180 if (num_completions) {
181 (*num_completions) = completions.size();
182 }
183
184 completions.clear();
185
186 return true;
187}
188
189// note: currently only called by testing code
190void cls_rgw_bucket_init_index(ObjectWriteOperation& o)
191{
192 bufferlist in;
193 o.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
194}
195
196static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx,
197 const int shard_id,
198 const string& oid,
199 BucketIndexAioManager *manager) {
200 bufferlist in;
201 librados::ObjectWriteOperation op;
202 op.create(true);
203 op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in);
204 return manager->aio_operate(io_ctx, shard_id, oid, &op);
205}
206
207static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx,
208 const int shard_id,
209 const string& oid,
210 BucketIndexAioManager *manager) {
211 bufferlist in;
212 librados::ObjectWriteOperation op;
213 op.remove();
214 return manager->aio_operate(io_ctx, shard_id, oid, &op);
215}
216
217static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx,
218 const int shard_id,
219 const string& oid,
220 uint64_t timeout,
221 BucketIndexAioManager *manager) {
222 bufferlist in;
223 rgw_cls_tag_timeout_op call;
224 call.tag_timeout = timeout;
225 encode(call, in);
226 ObjectWriteOperation op;
227 op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in);
228 return manager->aio_operate(io_ctx, shard_id, oid, &op);
229}
230
231int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid)
232{
233 return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager);
234}
235
236void CLSRGWIssueBucketIndexInit::cleanup()
237{
238 // Do best effort removal
239 for (auto citer = objs_container.begin(); citer != iter; ++citer) {
240 io_ctx.remove(citer->second);
241 }
242}
243
244int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid)
245{
246 return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager);
247}
248
249int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid)
250{
251 return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager);
252}
253
254void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o,
255 bool absolute,
256 const map<RGWObjCategory, rgw_bucket_category_stats>& stats)
257{
258 rgw_cls_bucket_update_stats_op call;
259 call.absolute = absolute;
260 call.stats = stats;
261 bufferlist in;
262 encode(call, in);
263 o.exec(RGW_CLASS, RGW_BUCKET_UPDATE_STATS, in);
264}
265
266void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag,
267 const cls_rgw_obj_key& key, const string& locator, bool log_op,
268 uint16_t bilog_flags, const rgw_zone_set& zones_trace)
269{
270 rgw_cls_obj_prepare_op call;
271 call.op = op;
272 call.tag = tag;
273 call.key = key;
274 call.locator = locator;
275 call.log_op = log_op;
276 call.bilog_flags = bilog_flags;
277 call.zones_trace = zones_trace;
278 bufferlist in;
279 encode(call, in);
280 o.exec(RGW_CLASS, RGW_BUCKET_PREPARE_OP, in);
281}
282
283void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag,
284 const rgw_bucket_entry_ver& ver,
285 const cls_rgw_obj_key& key,
286 const rgw_bucket_dir_entry_meta& dir_meta,
287 const list<cls_rgw_obj_key> *remove_objs, bool log_op,
288 uint16_t bilog_flags,
289 const rgw_zone_set *zones_trace,
290 const std::string& obj_locator)
291{
292
293 bufferlist in;
294 rgw_cls_obj_complete_op call;
295 call.op = op;
296 call.tag = tag;
297 call.key = key;
298 call.ver = ver;
299 call.locator = obj_locator;
300 call.meta = dir_meta;
301 call.log_op = log_op;
302 call.bilog_flags = bilog_flags;
303 if (remove_objs)
304 call.remove_objs = *remove_objs;
305 if (zones_trace) {
306 call.zones_trace = *zones_trace;
307 }
308 encode(call, in);
309 o.exec(RGW_CLASS, RGW_BUCKET_COMPLETE_OP, in);
310}
311
312void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op,
313 const cls_rgw_obj_key& start_obj,
314 const std::string& filter_prefix,
315 const std::string& delimiter,
316 uint32_t num_entries,
317 bool list_versions,
318 rgw_cls_list_ret* result)
319{
320 bufferlist in;
321 rgw_cls_list_op call;
322 call.start_obj = start_obj;
323 call.filter_prefix = filter_prefix;
324 call.delimiter = delimiter;
325 call.num_entries = num_entries;
326 call.list_versions = list_versions;
327 encode(call, in);
328
329 op.exec(RGW_CLASS, RGW_BUCKET_LIST, in,
330 new ClsBucketIndexOpCtx<rgw_cls_list_ret>(result, NULL));
331}
332
333static bool issue_bucket_list_op(librados::IoCtx& io_ctx,
334 const int shard_id,
335 const std::string& oid,
336 const cls_rgw_obj_key& start_obj,
337 const std::string& filter_prefix,
338 const std::string& delimiter,
339 uint32_t num_entries,
340 bool list_versions,
341 BucketIndexAioManager *manager,
342 rgw_cls_list_ret *pdata)
343{
344 librados::ObjectReadOperation op;
345 cls_rgw_bucket_list_op(op,
346 start_obj, filter_prefix, delimiter,
347 num_entries, list_versions, pdata);
348 return manager->aio_operate(io_ctx, shard_id, oid, &op);
349}
350
351int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid)
352{
353 // set the marker depending on whether we've already queried this
354 // shard and gotten a RGWBIAdvanceAndRetryError (defined
355 // constant) return value; if we have use the marker in the return
356 // to advance the search, otherwise use the marker passed in by the
357 // caller
358 cls_rgw_obj_key marker;
359 auto iter = result.find(shard_id);
360 if (iter != result.end()) {
361 marker = iter->second.marker;
362 } else {
363 marker = start_obj;
364 }
365
366 return issue_bucket_list_op(io_ctx, shard_id, oid,
367 marker, filter_prefix, delimiter,
368 num_entries, list_versions, &manager,
369 &result[shard_id]);
370}
371
372
373void CLSRGWIssueBucketList::reset_container(std::map<int, std::string>& objs)
374{
375 objs_container.swap(objs);
376 iter = objs_container.begin();
377 objs.clear();
378}
379
380
381void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes)
382{
383 bufferlist in;
384 rgw_cls_obj_remove_op call;
385 call.keep_attr_prefixes = keep_attr_prefixes;
386 encode(call, in);
387 o.exec(RGW_CLASS, RGW_OBJ_REMOVE, in);
388}
389
390void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation& o, const string& attr)
391{
392 bufferlist in;
393 rgw_cls_obj_store_pg_ver_op call;
394 call.attr = attr;
395 encode(call, in);
396 o.exec(RGW_CLASS, RGW_OBJ_STORE_PG_VER, in);
397}
398
399void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation& o, const string& prefix, bool fail_if_exist)
400{
401 bufferlist in;
402 rgw_cls_obj_check_attrs_prefix call;
403 call.check_prefix = prefix;
404 call.fail_if_exist = fail_if_exist;
405 encode(call, in);
406 o.exec(RGW_CLASS, RGW_OBJ_CHECK_ATTRS_PREFIX, in);
407}
408
409void cls_rgw_obj_check_mtime(librados::ObjectOperation& o, const real_time& mtime, bool high_precision_time, RGWCheckMTimeType type)
410{
411 bufferlist in;
412 rgw_cls_obj_check_mtime call;
413 call.mtime = mtime;
414 call.high_precision_time = high_precision_time;
415 call.type = type;
416 encode(call, in);
417 o.exec(RGW_CLASS, RGW_OBJ_CHECK_MTIME, in);
418}
419
420int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid,
421 BIIndexType index_type, const cls_rgw_obj_key& key,
422 rgw_cls_bi_entry *entry)
423{
424 bufferlist in, out;
425 rgw_cls_bi_get_op call;
426 call.key = key;
427 call.type = index_type;
428 encode(call, in);
429 int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET, in, out);
430 if (r < 0)
431 return r;
432
433 rgw_cls_bi_get_ret op_ret;
434 auto iter = out.cbegin();
435 try {
436 decode(op_ret, iter);
437 } catch (ceph::buffer::error& err) {
438 return -EIO;
439 }
440
441 *entry = op_ret.entry;
442
443 return 0;
444}
445
446int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry)
447{
448 bufferlist in, out;
449 rgw_cls_bi_put_op call;
450 call.entry = entry;
451 encode(call, in);
452 int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_PUT, in, out);
453 if (r < 0)
454 return r;
455
456 return 0;
457}
458
459void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi_entry& entry)
460{
461 bufferlist in, out;
462 rgw_cls_bi_put_op call;
463 call.entry = entry;
464 encode(call, in);
465 op.exec(RGW_CLASS, RGW_BI_PUT, in);
466}
467
468/* nb: any entries passed in are replaced with the results of the cls
469 * call, so caller does not need to clear entries between calls
470 */
471int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid,
472 const std::string& name_filter, const std::string& marker, uint32_t max,
473 std::list<rgw_cls_bi_entry> *entries, bool *is_truncated)
474{
475 bufferlist in, out;
476 rgw_cls_bi_list_op call;
477 call.name_filter = name_filter;
478 call.marker = marker;
479 call.max = max;
480 encode(call, in);
481 int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_LIST, in, out);
482 if (r < 0)
483 return r;
484
485 rgw_cls_bi_list_ret op_ret;
486 auto iter = out.cbegin();
487 try {
488 decode(op_ret, iter);
489 } catch (ceph::buffer::error& err) {
490 return -EIO;
491 }
492
493 entries->swap(op_ret.entries);
494 *is_truncated = op_ret.is_truncated;
495
496 return 0;
497}
498
499int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid,
500 const cls_rgw_obj_key& key, const bufferlist& olh_tag,
501 bool delete_marker, const string& op_tag, const rgw_bucket_dir_entry_meta *meta,
502 uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace)
503{
504 librados::ObjectWriteOperation op;
505 cls_rgw_bucket_link_olh(op, key, olh_tag, delete_marker, op_tag, meta,
506 olh_epoch, unmod_since, high_precision_time, log_op,
507 zones_trace);
508
509 return io_ctx.operate(oid, &op);
510}
511
512
513void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& key,
514 const bufferlist& olh_tag, bool delete_marker,
515 const string& op_tag, const rgw_bucket_dir_entry_meta *meta,
516 uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace)
517{
518 bufferlist in, out;
519 rgw_cls_link_olh_op call;
520 call.key = key;
521 call.olh_tag = olh_tag.to_str();
522 call.op_tag = op_tag;
523 call.delete_marker = delete_marker;
524 if (meta) {
525 call.meta = *meta;
526 }
527 call.olh_epoch = olh_epoch;
528 call.log_op = log_op;
529 call.unmod_since = unmod_since;
530 call.high_precision_time = high_precision_time;
531 call.zones_trace = zones_trace;
532 encode(call, in);
533 op.exec(RGW_CLASS, RGW_BUCKET_LINK_OLH, in);
534}
535
536int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid,
537 const cls_rgw_obj_key& key, const string& op_tag,
538 const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace)
539{
540 librados::ObjectWriteOperation op;
541 cls_rgw_bucket_unlink_instance(op, key, op_tag, olh_tag, olh_epoch, log_op, zones_trace);
542 int r = io_ctx.operate(oid, &op);
543 if (r < 0)
544 return r;
545
546 return 0;
547}
548
549void cls_rgw_bucket_unlink_instance(librados::ObjectWriteOperation& op,
550 const cls_rgw_obj_key& key, const string& op_tag,
551 const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace)
552{
553 bufferlist in, out;
554 rgw_cls_unlink_instance_op call;
555 call.key = key;
556 call.op_tag = op_tag;
557 call.olh_epoch = olh_epoch;
558 call.olh_tag = olh_tag;
559 call.log_op = log_op;
560 call.zones_trace = zones_trace;
561 encode(call, in);
562 op.exec(RGW_CLASS, RGW_BUCKET_UNLINK_INSTANCE, in);
563}
564
565void cls_rgw_get_olh_log(librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker, const string& olh_tag, rgw_cls_read_olh_log_ret& log_ret, int& op_ret)
566{
567 bufferlist in;
568 rgw_cls_read_olh_log_op call;
569 call.olh = olh;
570 call.ver_marker = ver_marker;
571 call.olh_tag = olh_tag;
572 encode(call, in);
573 op.exec(RGW_CLASS, RGW_BUCKET_READ_OLH_LOG, in, new ClsBucketIndexOpCtx<rgw_cls_read_olh_log_ret>(&log_ret, &op_ret));
574}
575
576int cls_rgw_get_olh_log(IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, uint64_t ver_marker,
577 const string& olh_tag,
578 rgw_cls_read_olh_log_ret& log_ret)
579{
580 int op_ret = 0;
581 librados::ObjectReadOperation op;
582 cls_rgw_get_olh_log(op, olh, ver_marker, olh_tag, log_ret, op_ret);
583 int r = io_ctx.operate(oid, &op, NULL);
584 if (r < 0) {
585 return r;
586 }
587 if (op_ret < 0) {
588 return op_ret;
589 }
590
591 return r;
592}
593
594void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, uint64_t ver, const string& olh_tag)
595{
596 bufferlist in;
597 rgw_cls_trim_olh_log_op call;
598 call.olh = olh;
599 call.ver = ver;
600 call.olh_tag = olh_tag;
601 encode(call, in);
602 op.exec(RGW_CLASS, RGW_BUCKET_TRIM_OLH_LOG, in);
603}
604
605int cls_rgw_clear_olh(IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, const string& olh_tag)
606{
607 librados::ObjectWriteOperation op;
608 cls_rgw_clear_olh(op, olh, olh_tag);
609
610 return io_ctx.operate(oid, &op);
611}
612
613void cls_rgw_clear_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, const string& olh_tag)
614{
615 bufferlist in;
616 rgw_cls_bucket_clear_olh_op call;
617 call.key = olh;
618 call.olh_tag = olh_tag;
619 encode(call, in);
620 op.exec(RGW_CLASS, RGW_BUCKET_CLEAR_OLH, in);
621}
622
623void cls_rgw_bilog_list(librados::ObjectReadOperation& op,
624 const std::string& marker, uint32_t max,
625 cls_rgw_bi_log_list_ret *pdata, int *ret)
626{
627 cls_rgw_bi_log_list_op call;
628 call.marker = marker;
629 call.max = max;
630
631 bufferlist in;
632 encode(call, in);
633 op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret));
634}
635
636static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
637 BucketIndexShardsManager& marker_mgr, uint32_t max,
638 BucketIndexAioManager *manager,
639 cls_rgw_bi_log_list_ret *pdata)
640{
641 librados::ObjectReadOperation op;
642 cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr);
643 return manager->aio_operate(io_ctx, shard_id, oid, &op);
644}
645
646int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid)
647{
648 return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]);
649}
650
651void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op,
652 const std::string& start_marker,
653 const std::string& end_marker)
654{
655 cls_rgw_bi_log_trim_op call;
656 call.start_marker = start_marker;
657 call.end_marker = end_marker;
658
659 bufferlist in;
660 encode(call, in);
661 op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in);
662}
663
664static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id,
665 BucketIndexShardsManager& start_marker_mgr,
666 BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) {
667 cls_rgw_bi_log_trim_op call;
668 librados::ObjectWriteOperation op;
669 cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""),
670 end_marker_mgr.get(shard_id, ""));
671 return manager->aio_operate(io_ctx, shard_id, oid, &op);
672}
673
674int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid)
675{
676 return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager);
677}
678
679static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager,
680 rgw_cls_check_index_ret *pdata) {
681 bufferlist in;
682 librados::ObjectReadOperation op;
683 op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>(
684 pdata, NULL));
685 return manager->aio_operate(io_ctx, shard_id, oid, &op);
686}
687
688int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid)
689{
690 return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]);
691}
692
693static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid,
694 BucketIndexAioManager *manager) {
695 bufferlist in;
696 librados::ObjectWriteOperation op;
697 op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in);
698 return manager->aio_operate(io_ctx, shard_id, oid, &op);
699}
700
701int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid)
702{
703 return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager);
704}
705
706void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates)
707{
708 updates.append(op);
709 encode(dirent, updates);
710}
711
712void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates)
713{
714 o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates);
715}
716
717int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid)
718{
719 cls_rgw_obj_key empty_key;
720 string empty_prefix;
721 string empty_delimiter;
722 return issue_bucket_list_op(io_ctx, shard_id, oid,
723 empty_key, empty_prefix, empty_delimiter,
724 0, false, &manager, &result[shard_id]);
725}
726
727static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
728{
729 bufferlist in;
730 librados::ObjectWriteOperation op;
731 op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in);
732 return manager->aio_operate(io_ctx, shard_id, oid, &op);
733}
734
735int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid)
736{
737 return issue_resync_bi_log(io_ctx, shard_id, oid, &manager);
738}
739
740static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager)
741{
742 bufferlist in;
743 librados::ObjectWriteOperation op;
744 op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in);
745 return manager->aio_operate(io_ctx, shard_id, oid, &op);
746}
747
748int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid)
749{
750 return issue_bi_log_stop(io_ctx, shard_id, oid, &manager);
751}
752
753class GetDirHeaderCompletion : public ObjectOperationCompletion {
754 boost::intrusive_ptr<RGWGetDirHeader_CB> cb;
755public:
756 explicit GetDirHeaderCompletion(boost::intrusive_ptr<RGWGetDirHeader_CB> cb)
757 : cb(std::move(cb)) {}
758
759 void handle_completion(int r, bufferlist& outbl) override {
760 rgw_cls_list_ret ret;
761 try {
762 auto iter = outbl.cbegin();
763 decode(ret, iter);
764 } catch (ceph::buffer::error& err) {
765 r = -EIO;
766 }
767 cb->handle_response(r, ret.dir.header);
768 }
769};
770
771int cls_rgw_get_dir_header_async(IoCtx& io_ctx, const string& oid,
772 boost::intrusive_ptr<RGWGetDirHeader_CB> cb)
773{
774 bufferlist in, out;
775 rgw_cls_list_op call;
776 call.num_entries = 0;
777 encode(call, in);
778 ObjectReadOperation op;
779 op.exec(RGW_CLASS, RGW_BUCKET_LIST, in,
780 new GetDirHeaderCompletion(std::move(cb)));
781 AioCompletion *c = librados::Rados::aio_create_completion(nullptr, nullptr);
782 int r = io_ctx.aio_operate(oid, c, &op, NULL);
783 c->release();
784 if (r < 0)
785 return r;
786
787 return 0;
788}
789
790int cls_rgw_usage_log_read(IoCtx& io_ctx, const string& oid, const string& user, const string& bucket,
791 uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
792 string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage,
793 bool *is_truncated)
794{
795 if (is_truncated)
796 *is_truncated = false;
797
798 bufferlist in, out;
799 rgw_cls_usage_log_read_op call;
800 call.start_epoch = start_epoch;
801 call.end_epoch = end_epoch;
802 call.owner = user;
803 call.max_entries = max_entries;
804 call.bucket = bucket;
805 call.iter = read_iter;
806 encode(call, in);
807 int r = io_ctx.exec(oid, RGW_CLASS, RGW_USER_USAGE_LOG_READ, in, out);
808 if (r < 0)
809 return r;
810
811 try {
812 rgw_cls_usage_log_read_ret result;
813 auto iter = out.cbegin();
814 decode(result, iter);
815 read_iter = result.next_iter;
816 if (is_truncated)
817 *is_truncated = result.truncated;
818
819 usage = result.usage;
820 } catch (ceph::buffer::error& e) {
821 return -EINVAL;
822 }
823
824 return 0;
825}
826
827int cls_rgw_usage_log_trim(IoCtx& io_ctx, const string& oid, const string& user, const string& bucket,
828 uint64_t start_epoch, uint64_t end_epoch)
829{
830 bufferlist in;
831 rgw_cls_usage_log_trim_op call;
832 call.start_epoch = start_epoch;
833 call.end_epoch = end_epoch;
834 call.user = user;
835 call.bucket = bucket;
836 encode(call, in);
837
838 bool done = false;
839 do {
840 ObjectWriteOperation op;
841 op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_TRIM, in);
842 int r = io_ctx.operate(oid, &op);
843 if (r == -ENODATA)
844 done = true;
845 else if (r < 0)
846 return r;
847 } while (!done);
848
849 return 0;
850}
851
852void cls_rgw_usage_log_trim(librados::ObjectWriteOperation& op, const string& user, const string& bucket, uint64_t start_epoch, uint64_t end_epoch)
853{
854 bufferlist in;
855 rgw_cls_usage_log_trim_op call;
856 call.start_epoch = start_epoch;
857 call.end_epoch = end_epoch;
858 call.user = user;
859 call.bucket = bucket;
860 encode(call, in);
861
862 op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_TRIM, in);
863}
864
865void cls_rgw_usage_log_clear(ObjectWriteOperation& op)
866{
867 bufferlist in;
868 op.exec(RGW_CLASS, RGW_USAGE_LOG_CLEAR, in);
869}
870
871void cls_rgw_usage_log_add(ObjectWriteOperation& op, rgw_usage_log_info& info)
872{
873 bufferlist in;
874 rgw_cls_usage_log_add_op call;
875 call.info = info;
876 encode(call, in);
877 op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_ADD, in);
878}
879
880/* garbage collection */
881
882void cls_rgw_gc_set_entry(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info)
883{
884 bufferlist in;
885 cls_rgw_gc_set_entry_op call;
886 call.expiration_secs = expiration_secs;
887 call.info = info;
888 encode(call, in);
889 op.exec(RGW_CLASS, RGW_GC_SET_ENTRY, in);
890}
891
892void cls_rgw_gc_defer_entry(ObjectWriteOperation& op, uint32_t expiration_secs, const string& tag)
893{
894 bufferlist in;
895 cls_rgw_gc_defer_entry_op call;
896 call.expiration_secs = expiration_secs;
897 call.tag = tag;
898 encode(call, in);
899 op.exec(RGW_CLASS, RGW_GC_DEFER_ENTRY, in);
900}
901
902int cls_rgw_gc_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only,
903 list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker)
904{
905 bufferlist in, out;
906 cls_rgw_gc_list_op call;
907 call.marker = marker;
908 call.max = max;
909 call.expired_only = expired_only;
910 encode(call, in);
911 int r = io_ctx.exec(oid, RGW_CLASS, RGW_GC_LIST, in, out);
912 if (r < 0)
913 return r;
914
915 cls_rgw_gc_list_ret ret;
916 try {
917 auto iter = out.cbegin();
918 decode(ret, iter);
919 } catch (ceph::buffer::error& err) {
920 return -EIO;
921 }
922
923 entries.swap(ret.entries);
924
925 if (truncated)
926 *truncated = ret.truncated;
927 next_marker = std::move(ret.next_marker);
928 return r;
929}
930
931void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const vector<string>& tags)
932{
933 bufferlist in;
934 cls_rgw_gc_remove_op call;
935 call.tags = tags;
936 encode(call, in);
937 op.exec(RGW_CLASS, RGW_GC_REMOVE, in);
938}
939
940int cls_rgw_lc_get_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head)
941{
942 bufferlist in, out;
943 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_HEAD, in, out);
944 if (r < 0)
945 return r;
946
947 cls_rgw_lc_get_head_ret ret;
948 try {
949 auto iter = out.cbegin();
950 decode(ret, iter);
951 } catch (ceph::buffer::error& err) {
952 return -EIO;
953 }
954 head = ret.head;
955
956 return r;
957}
958
959int cls_rgw_lc_put_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head)
960{
961 bufferlist in, out;
962 cls_rgw_lc_put_head_op call;
963 call.head = head;
964 encode(call, in);
965 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_PUT_HEAD, in, out);
966 return r;
967}
968
969int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, const string& marker,
970 cls_rgw_lc_entry& entry)
971{
972 bufferlist in, out;
973 cls_rgw_lc_get_next_entry_op call;
974 call.marker = marker;
975 encode(call, in);
976 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_NEXT_ENTRY, in, out);
977 if (r < 0)
978 return r;
979
980 cls_rgw_lc_get_next_entry_ret ret;
981 try {
982 auto iter = out.cbegin();
983 decode(ret, iter);
984 } catch (ceph::buffer::error& err) {
985 return -EIO;
986 }
987 entry = ret.entry;
988
989 return r;
990}
991
992int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid,
993 const cls_rgw_lc_entry& entry)
994{
995 bufferlist in, out;
996 cls_rgw_lc_rm_entry_op call;
997 call.entry = entry;
998 encode(call, in);
999 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_RM_ENTRY, in, out);
1000 return r;
1001}
1002
1003int cls_rgw_lc_set_entry(IoCtx& io_ctx, const string& oid,
1004 const cls_rgw_lc_entry& entry)
1005{
1006 bufferlist in, out;
1007 cls_rgw_lc_set_entry_op call;
1008 call.entry = entry;
1009 encode(call, in);
1010 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_SET_ENTRY, in, out);
1011 return r;
1012}
1013
1014int cls_rgw_lc_get_entry(IoCtx& io_ctx, const string& oid,
1015 const std::string& marker, cls_rgw_lc_entry& entry)
1016{
1017 bufferlist in, out;
1018 cls_rgw_lc_get_entry_op call{marker};;
1019 encode(call, in);
1020 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_ENTRY, in, out);
1021
1022 if (r < 0) {
1023 return r;
1024 }
1025
1026 cls_rgw_lc_get_entry_ret ret;
1027 try {
1028 auto iter = out.cbegin();
1029 decode(ret, iter);
1030 } catch (ceph::buffer::error& err) {
1031 return -EIO;
1032 }
1033
1034 entry = std::move(ret.entry);
1035 return r;
1036}
1037
1038int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid,
1039 const string& marker,
1040 uint32_t max_entries,
1041 vector<cls_rgw_lc_entry>& entries)
1042{
1043 bufferlist in, out;
1044 cls_rgw_lc_list_entries_op op;
1045
1046 entries.clear();
1047
1048 op.marker = marker;
1049 op.max_entries = max_entries;
1050
1051 encode(op, in);
1052
1053 int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_LIST_ENTRIES, in, out);
1054 if (r < 0)
1055 return r;
1056
1057 cls_rgw_lc_list_entries_ret ret;
1058 try {
1059 auto iter = out.cbegin();
1060 decode(ret, iter);
1061 } catch (ceph::buffer::error& err) {
1062 return -EIO;
1063 }
1064
1065 std::sort(std::begin(ret.entries), std::end(ret.entries),
1066 [](const cls_rgw_lc_entry& a, const cls_rgw_lc_entry& b)
1067 { return a.bucket < b.bucket; });
1068 entries = std::move(ret.entries);
1069 return r;
1070}
1071
1072void cls_rgw_mp_upload_part_info_update(librados::ObjectWriteOperation& op,
1073 const std::string& part_key,
1074 const RGWUploadPartInfo& info)
1075{
1076 cls_rgw_mp_upload_part_info_update_op call;
1077 call.part_key = part_key;
1078 call.info = info;
1079
1080 buffer::list in;
1081 encode(call, in);
1082
1083 op.exec(RGW_CLASS, RGW_MP_UPLOAD_PART_INFO_UPDATE, in);
1084}
1085
1086void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
1087{
1088 bufferlist in;
1089 cls_rgw_reshard_add_op call;
1090 call.entry = entry;
1091 encode(call, in);
1092 op.exec(RGW_CLASS, RGW_RESHARD_ADD, in);
1093}
1094
1095int cls_rgw_reshard_list(librados::IoCtx& io_ctx, const string& oid, string& marker, uint32_t max,
1096 list<cls_rgw_reshard_entry>& entries, bool* is_truncated)
1097{
1098 bufferlist in, out;
1099 cls_rgw_reshard_list_op call;
1100 call.marker = marker;
1101 call.max = max;
1102 encode(call, in);
1103 int r = io_ctx.exec(oid, RGW_CLASS, RGW_RESHARD_LIST, in, out);
1104 if (r < 0)
1105 return r;
1106
1107 cls_rgw_reshard_list_ret op_ret;
1108 auto iter = out.cbegin();
1109 try {
1110 decode(op_ret, iter);
1111 } catch (ceph::buffer::error& err) {
1112 return -EIO;
1113 }
1114
1115 entries.swap(op_ret.entries);
1116 *is_truncated = op_ret.is_truncated;
1117
1118 return 0;
1119}
1120
1121int cls_rgw_reshard_get(librados::IoCtx& io_ctx, const string& oid, cls_rgw_reshard_entry& entry)
1122{
1123 bufferlist in, out;
1124 cls_rgw_reshard_get_op call;
1125 call.entry = entry;
1126 encode(call, in);
1127 int r = io_ctx.exec(oid, RGW_CLASS, RGW_RESHARD_GET, in, out);
1128 if (r < 0)
1129 return r;
1130
1131 cls_rgw_reshard_get_ret op_ret;
1132 auto iter = out.cbegin();
1133 try {
1134 decode(op_ret, iter);
1135 } catch (ceph::buffer::error& err) {
1136 return -EIO;
1137 }
1138
1139 entry = op_ret.entry;
1140
1141 return 0;
1142}
1143
1144void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
1145{
1146 bufferlist in;
1147 cls_rgw_reshard_remove_op call;
1148 call.tenant = entry.tenant;
1149 call.bucket_name = entry.bucket_name;
1150 call.bucket_id = entry.bucket_id;
1151 encode(call, in);
1152 op.exec(RGW_CLASS, RGW_RESHARD_REMOVE, in);
1153}
1154
1155int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
1156 const cls_rgw_bucket_instance_entry& entry)
1157{
1158 bufferlist in, out;
1159 cls_rgw_set_bucket_resharding_op call;
1160 call.entry = entry;
1161 encode(call, in);
1162 return io_ctx.exec(oid, RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in, out);
1163}
1164
1165int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const string& oid)
1166{
1167 bufferlist in, out;
1168 cls_rgw_clear_bucket_resharding_op call;
1169 encode(call, in);
1170 return io_ctx.exec(oid, RGW_CLASS, RGW_CLEAR_BUCKET_RESHARDING, in, out);
1171}
1172
1173int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const string& oid,
1174 cls_rgw_bucket_instance_entry *entry)
1175{
1176 bufferlist in, out;
1177 cls_rgw_get_bucket_resharding_op call;
1178 encode(call, in);
1179 int r= io_ctx.exec(oid, RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, out);
1180 if (r < 0)
1181 return r;
1182
1183 cls_rgw_get_bucket_resharding_ret op_ret;
1184 auto iter = out.cbegin();
1185 try {
1186 decode(op_ret, iter);
1187 } catch (ceph::buffer::error& err) {
1188 return -EIO;
1189 }
1190
1191 *entry = op_ret.new_instance;
1192
1193 return 0;
1194}
1195
1196void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err)
1197{
1198 bufferlist in, out;
1199 cls_rgw_guard_bucket_resharding_op call;
1200 call.ret_err = ret_err;
1201 encode(call, in);
1202 op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in);
1203}
1204
1205static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx,
1206 const int shard_id, const string& oid,
1207 const cls_rgw_bucket_instance_entry& entry,
1208 BucketIndexAioManager *manager) {
1209 bufferlist in;
1210 cls_rgw_set_bucket_resharding_op call;
1211 call.entry = entry;
1212 encode(call, in);
1213 librados::ObjectWriteOperation op;
1214 op.assert_exists(); // the shard must exist; if not fail rather than recreate
1215 op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in);
1216 return manager->aio_operate(io_ctx, shard_id, oid, &op);
1217}
1218
1219int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid)
1220{
1221 return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager);
1222}