1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
16 #include "common/ceph_json.h"
17 #include "common/strtol.h"
20 #include "rgw_rest_s3.h"
21 #include "rgw_rest_log.h"
22 #include "rgw_client_io.h"
24 #include "rgw_data_sync.h"
25 #include "rgw_common.h"
27 #include "rgw_mdlog.h"
29 #include "services/svc_zone.h"
30 #include "services/svc_mdlog.h"
31 #include "services/svc_bilog_rados.h"
33 #include "common/errno.h"
34 #include "include/ceph_assert.h"
36 #define dout_context g_ceph_context
37 #define LOG_CLASS_LIST_MAX_ENTRIES (1000)
38 #define dout_subsys ceph_subsys_rgw
// Lists entries of one metadata-log shard.
// Reads the "period", "id", "max-entries" and "marker" request arguments,
// builds an RGWMetadataLog for the period and stores the listing result in
// entries / last_marker / truncated, with the status in op_ret.
// NOTE(review): this listing has gaps (guarding conditions, returns and
// closing braces are not visible) - confirm control flow against the full file.
40 void RGWOp_MDLog_List::execute(optional_yield y
) {
41 string period
= s
->info
.args
.get("period");
42 string shard
= s
->info
.args
.get("id");
43 string max_entries_str
= s
->info
.args
.get("max-entries");
44 string marker
= s
->info
.args
.get("marker"),
47 unsigned shard_id
, max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
// The presence of "start-time" or "end-time" triggers the message below.
49 if (s
->info
.args
.exists("start-time") ||
50 s
->info
.args
.exists("end-time")) {
51 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl
;
// Parse the "id" argument as a base-10 number into shard_id.
56 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
58 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
// "max-entries" is optional; when given it is parsed as base 10.
63 if (!max_entries_str
.empty()) {
64 max_entries
= (unsigned)strict_strtol(max_entries_str
.c_str(), 10, &err
);
66 ldpp_dout(this, 5) << "Error parsing max-entries " << max_entries_str
<< dendl
;
// Clamp the requested count to LOG_CLASS_LIST_MAX_ENTRIES.
70 if (max_entries
> LOG_CLASS_LIST_MAX_ENTRIES
) {
71 max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
// Fall back to the zone service's current period id (the condition that
// triggers the fallback is not visible in this listing).
76 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
77 period
= store
->svc()->zone
->get_current_period_id();
79 ldpp_dout(this, 5) << "Missing period id" << dendl
;
85 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
// Open a listing handle at marker, list up to max_entries entries, then
// call complete_list_entries() on the handle.
87 meta_log
.init_list_entries(shard_id
, {}, {}, marker
, &handle
);
89 op_ret
= meta_log
.list_entries(this, handle
, max_entries
, entries
,
90 &last_marker
, &truncated
);
92 meta_log
.complete_list_entries(handle
);
// Writes the metadata-log listing to the request formatter.
// Output shape: a "log_entries" object holding "marker", "truncated" and an
// "entries" array whose elements are produced by the metadata manager's
// dump_log_entry().
// NOTE(review): lines between the status setup and the formatter output are
// not visible in this listing.
95 void RGWOp_MDLog_List::send_response() {
96 set_req_state_err(s
, op_ret
);
103 s
->formatter
->open_object_section("log_entries");
104 s
->formatter
->dump_string("marker", last_marker
);
105 s
->formatter
->dump_bool("truncated", truncated
);
107 s
->formatter
->open_array_section("entries");
108 for (list
<cls_log_entry
>::iterator iter
= entries
.begin();
109 iter
!= entries
.end(); ++iter
) {
110 cls_log_entry
& entry
= *iter
;
111 store
->ctl()->meta
.mgr
->dump_log_entry(entry
, s
->formatter
);
// Two close_section() calls match the two sections opened above
// ("entries" array, then "log_entries" object).
114 s
->formatter
->close_section();
116 s
->formatter
->close_section();
// Collects metadata-log summary information:
//  - num_objects is taken from the rgw_md_log_max_shards config value;
//  - period is read through the mdlog service's read_oldest_log_period();
//  - op_ret receives that result's get_error() value.
120 void RGWOp_MDLog_Info::execute(optional_yield y
) {
121 num_objects
= s
->cct
->_conf
->rgw_md_log_max_shards
;
122 period
= store
->svc()->mdlog
->read_oldest_log_period(y
, s
);
123 op_ret
= period
.get_error();
// Writes the metadata-log summary as an "mdlog" object with the fields
// "num_objects", "period" (the period's id) and "realm_epoch" (the
// period result's epoch).
// NOTE(review): lines between set_req_state_err() and the formatter output
// are not visible in this listing.
126 void RGWOp_MDLog_Info::send_response() {
127 set_req_state_err(s
, op_ret
);
131 s
->formatter
->open_object_section("mdlog");
132 s
->formatter
->dump_unsigned("num_objects", num_objects
);
134 s
->formatter
->dump_string("period", period
.get_period().get_id());
135 s
->formatter
->dump_unsigned("realm_epoch", period
.get_epoch());
137 s
->formatter
->close_section();
// Fetches information about a single metadata-log shard.
// Reads the "period" and "id" request arguments, parses the shard id as a
// base-10 number and stores the result of RGWMetadataLog::get_info() in
// info / op_ret.
// NOTE(review): error-return paths and closing braces are not visible in
// this listing - confirm against the full file.
141 void RGWOp_MDLog_ShardInfo::execute(optional_yield y
) {
142 string period
= s
->info
.args
.get("period");
143 string shard
= s
->info
.args
.get("id");
146 unsigned shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
148 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
// An empty "period" falls back to the zone service's current period id;
// if it is still empty afterwards, "Missing period id" is logged.
153 if (period
.empty()) {
154 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
155 period
= store
->svc()->zone
->get_current_period_id();
157 if (period
.empty()) {
158 ldpp_dout(this, 5) << "Missing period id" << dendl
;
163 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
165 op_ret
= meta_log
.get_info(this, shard_id
, &info
);
// Sets the request error state from op_ret and encodes the shard info as
// JSON under the key "info".
// NOTE(review): the lines between these two statements are not visible here.
168 void RGWOp_MDLog_ShardInfo::send_response() {
169 set_req_state_err(s
, op_ret
);
173 encode_json("info", info
, s
->formatter
);
// Trims a metadata-log shard up to a marker.
// Reads the "marker", "period" and "id" request arguments and finally calls
// RGWMetadataLog::trim() with the shard id and marker.
// NOTE(review): error-return paths, else-branches and closing braces are not
// visible in this listing - confirm control flow against the full file.
177 void RGWOp_MDLog_Delete::execute(optional_yield y
) {
178 string marker
= s
->info
.args
.get("marker"),
179 period
= s
->info
.args
.get("period"),
180 shard
= s
->info
.args
.get("id"),
// "start-time", "end-time" and "start-marker" each produce a
// "no longer accepted" message.
185 if (s
->info
.args
.exists("start-time") ||
186 s
->info
.args
.exists("end-time")) {
187 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl
;
191 if (s
->info
.args
.exists("start-marker")) {
192 ldpp_dout(this, 5) << "start-marker is no longer accepted" << dendl
;
// "end-marker" is used as the marker when "marker" itself is absent.
// The message about both being provided presumably belongs to the
// else-branch, which is not visible here.
196 if (s
->info
.args
.exists("end-marker")) {
197 if (!s
->info
.args
.exists("marker")) {
198 marker
= s
->info
.args
.get("end-marker");
200 ldpp_dout(this, 5) << "end-marker and marker cannot both be provided" << dendl
;
207 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
209 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
214 if (marker
.empty()) { /* bounding end */
// An empty "period" falls back to the zone service's current period id.
219 if (period
.empty()) {
220 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
221 period
= store
->svc()->zone
->get_current_period_id();
223 if (period
.empty()) {
224 ldpp_dout(this, 5) << "Missing period id" << dendl
;
229 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
231 op_ret
= meta_log
.trim(this, shard_id
, {}, {}, {}, marker
);
// Takes an exclusive lock on a metadata-log shard.
// Reads the "period", "id", "length", "locker-id" and "zone-id" request
// arguments, parses the shard id and lock duration as base-10 numbers and
// calls RGWMetadataLog::lock_exclusive(). A result of -EBUSY is reported to
// the client as -ERR_LOCKED.
// NOTE(review): parts of the parameter check, the tail of the
// lock_exclusive() call and the error-return paths are not visible in this
// listing - confirm against the full file.
234 void RGWOp_MDLog_Lock::execute(optional_yield y
) {
235 string period
, shard_id_str
, duration_str
, locker_id
, zone_id
;
240 period
= s
->info
.args
.get("period");
241 shard_id_str
= s
->info
.args
.get("id");
242 duration_str
= s
->info
.args
.get("length");
243 locker_id
= s
->info
.args
.get("locker-id");
244 zone_id
= s
->info
.args
.get("zone-id");
// An empty "period" falls back to the zone service's current period id.
246 if (period
.empty()) {
247 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
248 period
= store
->svc()->zone
->get_current_period_id();
// Required-parameter check (only part of the condition is visible here).
251 if (period
.empty() ||
252 shard_id_str
.empty() ||
253 (duration_str
.empty()) ||
256 ldpp_dout(this, 5) << "Error invalid parameter list" << dendl
;
262 shard_id
= (unsigned)strict_strtol(shard_id_str
.c_str(), 10, &err
);
264 ldpp_dout(this, 5) << "Error parsing shard_id param " << shard_id_str
<< dendl
;
269 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
// The lock length must parse cleanly and be greater than zero.
271 dur
= (unsigned)strict_strtol(duration_str
.c_str(), 10, &err
);
272 if (!err
.empty() || dur
<= 0) {
273 ldpp_dout(this, 5) << "invalid length param " << duration_str
<< dendl
;
277 op_ret
= meta_log
.lock_exclusive(s
, shard_id
, make_timespan(dur
), zone_id
,
// Translate -EBUSY into -ERR_LOCKED.
279 if (op_ret
== -EBUSY
)
280 op_ret
= -ERR_LOCKED
;
// Releases a lock on a metadata-log shard.
// Reads the "period", "id", "locker-id" and "zone-id" request arguments,
// parses the shard id as a base-10 number and calls
// RGWMetadataLog::unlock(), storing the result in op_ret.
// NOTE(review): parts of the parameter check and the error-return paths are
// not visible in this listing - confirm against the full file.
283 void RGWOp_MDLog_Unlock::execute(optional_yield y
) {
284 string period
, shard_id_str
, locker_id
, zone_id
;
289 period
= s
->info
.args
.get("period");
290 shard_id_str
= s
->info
.args
.get("id");
291 locker_id
= s
->info
.args
.get("locker-id");
292 zone_id
= s
->info
.args
.get("zone-id");
// An empty "period" falls back to the zone service's current period id.
294 if (period
.empty()) {
295 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
296 period
= store
->svc()->zone
->get_current_period_id();
// Required-parameter check (only part of the condition is visible here).
299 if (period
.empty() ||
300 shard_id_str
.empty() ||
303 ldpp_dout(this, 5) << "Error invalid parameter list" << dendl
;
309 shard_id
= (unsigned)strict_strtol(shard_id_str
.c_str(), 10, &err
);
311 ldpp_dout(this, 5) << "Error parsing shard_id param " << shard_id_str
<< dendl
;
316 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
317 op_ret
= meta_log
.unlock(s
, shard_id
, zone_id
, locker_id
);
// Handles a metadata-log change notification.
// Reads the request body (bounded by LARGE_ENOUGH_BUF, 128 * 1024), parses
// it, decodes it into a set<int> of updated shard ids and passes that set to
// wakeup_meta_sync_shards().
// NOTE(review): declarations of r, data and p, the try-opener and the
// error-return paths are not visible in this listing.
320 void RGWOp_MDLog_Notify::execute(optional_yield y
) {
321 #define LARGE_ENOUGH_BUF (128 * 1024)
325 std::tie(r
, data
) = rgw_rest_read_all_input(s
, LARGE_ENOUGH_BUF
);
331 char* buf
= data
.c_str();
332 ldpp_dout(this, 20) << __func__
<< "(): read data: " << buf
<< dendl
;
335 r
= p
.parse(buf
, data
.length());
337 ldpp_dout(this, 0) << "ERROR: failed to parse JSON" << dendl
;
342 set
<int> updated_shards
;
344 decode_json_obj(updated_shards
, &p
);
345 } catch (JSONDecoder::err
& err
) {
346 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl
;
// Only walk the set for logging when the rgw subsystem gathers level 20.
351 if (store
->ctx()->_conf
->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
352 for (set
<int>::iterator iter
= updated_shards
.begin(); iter
!= updated_shards
.end(); ++iter
) {
353 ldpp_dout(this, 20) << __func__
<< "(): updated shard=" << *iter
<< dendl
;
357 store
->getRados()->wakeup_meta_sync_shards(updated_shards
);
// Lists bucket-index log entries for a bucket or bucket instance.
// Reads the "tenant", "bucket", "marker", "max-entries" and
// "bucket-instance" request arguments, loads the bucket info (by instance
// when "bucket-instance" is non-empty, otherwise by name) and then fetches
// entries in batches, sending each batch with send_response(entries, marker)
// while the log reports truncation and fewer than max_entries were sent.
// NOTE(review): error-return paths, the loop opener and several declarations
// (bn, shard_id, err, count, truncated) are not visible in this listing.
362 void RGWOp_BILog_List::execute(optional_yield y
) {
363 string tenant_name
= s
->info
.args
.get("tenant"),
364 bucket_name
= s
->info
.args
.get("bucket"),
365 marker
= s
->info
.args
.get("marker"),
366 max_entries_str
= s
->info
.args
.get("max-entries"),
367 bucket_instance
= s
->info
.args
.get("bucket-instance");
368 RGWBucketInfo bucket_info
;
369 unsigned max_entries
;
// At least one of "bucket" / "bucket-instance" is expected.
371 if (bucket_name
.empty() && bucket_instance
.empty()) {
372 ldpp_dout(this, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl
;
// Split the instance string; note bucket_instance is both input and output.
379 op_ret
= rgw_bucket_parse_bucket_instance(bucket_instance
, &bn
, &bucket_instance
, &shard_id
);
384 if (!bucket_instance
.empty()) {
385 rgw_bucket
b(rgw_bucket_key(tenant_name
, bn
, bucket_instance
));
386 op_ret
= store
->getRados()->get_bucket_instance_info(*s
->sysobj_ctx
, b
, bucket_info
, NULL
, NULL
, s
->yield
, this);
388 ldpp_dout(this, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance
<< dendl
;
391 } else { /* !bucket_name.empty() */
392 op_ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant_name
, bucket_name
, bucket_info
, NULL
, s
->yield
, NULL
);
394 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name
<< dendl
;
// max_entries comes from "max-entries" or LOG_CLASS_LIST_MAX_ENTRIES; the
// condition choosing between the two is not visible here.
403 max_entries
= (unsigned)strict_strtol(max_entries_str
.c_str(), 10, &err
);
405 max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
// Each batch asks for at most the remaining (max_entries - count) entries.
409 list
<rgw_bi_log_entry
> entries
;
410 int ret
= store
->svc()->bilog_rados
->log_list(s
, bucket_info
, shard_id
,
411 marker
, max_entries
- count
,
412 entries
, &truncated
);
414 ldpp_dout(this, 5) << "ERROR: list_bi_log_entries()" << dendl
;
418 count
+= entries
.size();
420 send_response(entries
, marker
);
421 } while (truncated
&& count
< max_entries
);
// Starts the bucket-index log response: sets the request error state from
// op_ret and opens the "entries" array on the formatter.
// NOTE(review): the lines before, between and after these two statements
// are not visible in this listing.
426 void RGWOp_BILog_List::send_response() {
430 set_req_state_err(s
, op_ret
);
439 s
->formatter
->open_array_section("entries");
// Emits one batch of bucket-index log entries: every element of entries is
// JSON-encoded under the key "entry".
// NOTE(review): how the marker parameter is used is not visible in this
// listing (lines after the encode_json() call are missing).
442 void RGWOp_BILog_List::send_response(list
<rgw_bi_log_entry
>& entries
, string
& marker
)
444 for (list
<rgw_bi_log_entry
>::iterator iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
445 rgw_bi_log_entry
& entry
= *iter
;
446 encode_json("entry", entry
, s
->formatter
);
// Finishes the bucket-index log response by closing the currently open
// formatter section.
// NOTE(review): any statements after close_section() are not visible here.
453 void RGWOp_BILog_List::send_response_end() {
454 s
->formatter
->close_section();
// Collects bucket-index log information for a bucket or bucket instance.
// Reads the "tenant", "bucket" and "bucket-instance" request arguments,
// loads the bucket info (by instance when "bucket-instance" is non-empty,
// otherwise by name) and calls get_bucket_stats(), which fills bucket_ver,
// master_ver, max_marker and syncstopped.
// NOTE(review): error-return paths, the body of the final check and several
// declarations (bn, shard_id) are not visible in this listing.
458 void RGWOp_BILog_Info::execute(optional_yield y
) {
459 string tenant_name
= s
->info
.args
.get("tenant"),
460 bucket_name
= s
->info
.args
.get("bucket"),
461 bucket_instance
= s
->info
.args
.get("bucket-instance");
462 RGWBucketInfo bucket_info
;
// At least one of "bucket" / "bucket-instance" is expected.
464 if (bucket_name
.empty() && bucket_instance
.empty()) {
465 ldpp_dout(this, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl
;
// Split the instance string; note bucket_instance is both input and output.
472 op_ret
= rgw_bucket_parse_bucket_instance(bucket_instance
, &bn
, &bucket_instance
, &shard_id
);
477 if (!bucket_instance
.empty()) {
478 rgw_bucket
b(rgw_bucket_key(tenant_name
, bn
, bucket_instance
));
479 op_ret
= store
->getRados()->get_bucket_instance_info(*s
->sysobj_ctx
, b
, bucket_info
, NULL
, NULL
, s
->yield
, this);
481 ldpp_dout(this, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance
<< dendl
;
484 } else { /* !bucket_name.empty() */
485 op_ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant_name
, bucket_name
, bucket_info
, NULL
, s
->yield
, NULL
);
487 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name
<< dendl
;
491 map
<RGWObjCategory
, RGWStorageStats
> stats
;
492 int ret
= store
->getRados()->get_bucket_stats(s
, bucket_info
, shard_id
, &bucket_ver
, &master_ver
, stats
, &max_marker
, &syncstopped
);
// -ENOENT is excluded from the failure condition below.
493 if (ret
< 0 && ret
!= -ENOENT
) {
// Writes the bucket-index log information as an "info" object with the
// fields "bucket_ver", "master_ver", "max_marker" and "syncstopped".
// NOTE(review): lines between set_req_state_err() and the formatter output
// are not visible in this listing.
499 void RGWOp_BILog_Info::send_response() {
500 set_req_state_err(s
, op_ret
);
507 s
->formatter
->open_object_section("info");
508 encode_json("bucket_ver", bucket_ver
, s
->formatter
);
509 encode_json("master_ver", master_ver
, s
->formatter
);
510 encode_json("max_marker", max_marker
, s
->formatter
);
511 encode_json("syncstopped", syncstopped
, s
->formatter
);
512 s
->formatter
->close_section();
// Trims the bucket-index log of a bucket or bucket instance.
// Reads the "tenant", "bucket", "start-marker", "end-marker" and
// "bucket-instance" request arguments, loads the bucket info (by instance
// when "bucket-instance" is non-empty, otherwise by name) and calls the
// bilog service's log_trim() with the start and end markers.
// NOTE(review): error-return paths and several declarations (bn, shard_id)
// are not visible in this listing.
517 void RGWOp_BILog_Delete::execute(optional_yield y
) {
518 string tenant_name
= s
->info
.args
.get("tenant"),
519 bucket_name
= s
->info
.args
.get("bucket"),
520 start_marker
= s
->info
.args
.get("start-marker"),
521 end_marker
= s
->info
.args
.get("end-marker"),
522 bucket_instance
= s
->info
.args
.get("bucket-instance");
524 RGWBucketInfo bucket_info
;
// Needs a bucket or bucket instance, and a non-empty "end-marker".
527 if ((bucket_name
.empty() && bucket_instance
.empty()) ||
528 end_marker
.empty()) {
529 ldpp_dout(this, 5) << "ERROR: one of bucket and bucket instance, and also end-marker is mandatory" << dendl
;
// Split the instance string; note bucket_instance is both input and output.
536 op_ret
= rgw_bucket_parse_bucket_instance(bucket_instance
, &bn
, &bucket_instance
, &shard_id
);
541 if (!bucket_instance
.empty()) {
542 rgw_bucket
b(rgw_bucket_key(tenant_name
, bn
, bucket_instance
));
543 op_ret
= store
->getRados()->get_bucket_instance_info(*s
->sysobj_ctx
, b
, bucket_info
, NULL
, NULL
, s
->yield
, this);
545 ldpp_dout(this, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance
<< dendl
;
548 } else { /* !bucket_name.empty() */
549 op_ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant_name
, bucket_name
, bucket_info
, NULL
, s
->yield
, NULL
);
551 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name
<< dendl
;
555 op_ret
= store
->svc()->bilog_rados
->log_trim(s
, bucket_info
, shard_id
, start_marker
, end_marker
);
557 ldpp_dout(this, 5) << "ERROR: trim_bi_log_entries() " << dendl
;
// Lists entries of one data-log shard.
// Reads the "id", "max-entries", "marker" and "extra-info" request
// arguments, clamps max_entries to LOG_CLASS_LIST_MAX_ENTRIES and calls the
// datalog service's list_entries(), which fills entries and last_marker.
// NOTE(review): error-return paths and the tail of the list_entries() call
// are not visible in this listing.
562 void RGWOp_DATALog_List::execute(optional_yield y
) {
563 string shard
= s
->info
.args
.get("id");
565 string max_entries_str
= s
->info
.args
.get("max-entries"),
566 marker
= s
->info
.args
.get("marker"),
568 unsigned shard_id
, max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
// The presence of "start-time" or "end-time" triggers the message below.
570 if (s
->info
.args
.exists("start-time") ||
571 s
->info
.args
.exists("end-time")) {
572 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl
;
// "extra-info" is read as a boolean with a default of false.
576 s
->info
.args
.get_bool("extra-info", &extra_info
, false);
578 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
580 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
585 if (!max_entries_str
.empty()) {
586 max_entries
= (unsigned)strict_strtol(max_entries_str
.c_str(), 10, &err
);
588 ldpp_dout(this, 5) << "Error parsing max-entries " << max_entries_str
<< dendl
;
// Clamp the requested count to LOG_CLASS_LIST_MAX_ENTRIES.
592 if (max_entries
> LOG_CLASS_LIST_MAX_ENTRIES
) {
593 max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
597 // Note that last_marker is updated to be the marker of the last
599 op_ret
= store
->svc()->datalog_rados
->list_entries(this, shard_id
,
600 max_entries
, entries
,
601 marker
, &last_marker
,
// Writes the data-log listing to the request formatter.
// Output shape: a "log_entries" object holding "marker", "truncated" and an
// "entries" array with one "entry" element per log entry.
// NOTE(review): the condition selecting between encoding entry.entry and the
// whole entry is not visible in this listing (presumably extra_info - confirm).
605 void RGWOp_DATALog_List::send_response() {
606 set_req_state_err(s
, op_ret
);
613 s
->formatter
->open_object_section("log_entries");
614 s
->formatter
->dump_string("marker", last_marker
);
615 s
->formatter
->dump_bool("truncated", truncated
);
617 s
->formatter
->open_array_section("entries");
618 for (const auto& entry
: entries
) {
620 encode_json("entry", entry
.entry
, s
->formatter
);
622 encode_json("entry", entry
, s
->formatter
);
// Two close_section() calls match the two sections opened above
// ("entries" array, then "log_entries" object).
626 s
->formatter
->close_section();
628 s
->formatter
->close_section();
// Sets num_objects from the rgw_data_log_num_shards config value.
// The optional_yield argument is not used by the visible statement.
633 void RGWOp_DATALog_Info::execute(optional_yield y
) {
634 num_objects
= s
->cct
->_conf
->rgw_data_log_num_shards
;
// Writes the data-log summary: an object section named "num_objects" that
// contains a single unsigned field, also named "num_objects".
// NOTE(review): lines between set_req_state_err() and the formatter output
// are not visible in this listing.
638 void RGWOp_DATALog_Info::send_response() {
639 set_req_state_err(s
, op_ret
);
643 s
->formatter
->open_object_section("num_objects");
644 s
->formatter
->dump_unsigned("num_objects", num_objects
);
645 s
->formatter
->close_section();
// Fetches information about a single data-log shard.
// Reads the "id" request argument, parses it as a base-10 shard id and
// stores the result of the datalog service's get_info() in info / op_ret.
// NOTE(review): the parse-error check and return path are not visible in
// this listing.
649 void RGWOp_DATALog_ShardInfo::execute(optional_yield y
) {
650 string shard
= s
->info
.args
.get("id");
653 unsigned shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
655 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
660 op_ret
= store
->svc()->datalog_rados
->get_info(this, shard_id
, &info
);
// Sets the request error state from op_ret and encodes the shard info as
// JSON under the key "info".
// NOTE(review): the lines between these two statements are not visible here.
663 void RGWOp_DATALog_ShardInfo::send_response() {
664 set_req_state_err(s
, op_ret
);
668 encode_json("info", info
, s
->formatter
);
// Handles a data-log change notification from another zone.
// Reads the "source-zone" request argument and the request body (bounded by
// LARGE_ENOUGH_BUF, 128 * 1024), parses the body, decodes it into a
// map<int, set<string>> (shard id -> keys) and passes it together with the
// source zone to wakeup_data_sync_shards().
// NOTE(review): declarations of r, data and p, the try-opener and the
// error-return paths are not visible in this listing.
672 void RGWOp_DATALog_Notify::execute(optional_yield y
) {
673 string source_zone
= s
->info
.args
.get("source-zone");
674 #define LARGE_ENOUGH_BUF (128 * 1024)
678 std::tie(r
, data
) = rgw_rest_read_all_input(s
, LARGE_ENOUGH_BUF
);
684 char* buf
= data
.c_str();
685 ldpp_dout(this, 20) << __func__
<< "(): read data: " << buf
<< dendl
;
688 r
= p
.parse(buf
, data
.length());
690 ldpp_dout(this, 0) << "ERROR: failed to parse JSON" << dendl
;
695 map
<int, set
<string
> > updated_shards
;
697 decode_json_obj(updated_shards
, &p
);
698 } catch (JSONDecoder::err
& err
) {
699 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl
;
// Only walk the map for logging when the rgw subsystem gathers level 20.
704 if (store
->ctx()->_conf
->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
705 for (map
<int, set
<string
> >::iterator iter
= updated_shards
.begin(); iter
!= updated_shards
.end(); ++iter
) {
706 ldpp_dout(this, 20) << __func__
<< "(): updated shard=" << iter
->first
<< dendl
;
707 set
<string
>& keys
= iter
->second
;
708 for (set
<string
>::iterator kiter
= keys
.begin(); kiter
!= keys
.end(); ++kiter
) {
709 ldpp_dout(this, 20) << __func__
<< "(): modified key=" << *kiter
<< dendl
;
714 store
->getRados()->wakeup_data_sync_shards(source_zone
, updated_shards
);
// Trims a data-log shard up to a marker.
// Reads the "marker" and "id" request arguments and finally calls the
// datalog service's trim_entries() with the shard id and marker.
// NOTE(review): error-return paths, else-branches and closing braces are not
// visible in this listing - confirm control flow against the full file.
719 void RGWOp_DATALog_Delete::execute(optional_yield y
) {
720 string marker
= s
->info
.args
.get("marker"),
721 shard
= s
->info
.args
.get("id"),
// "start-time", "end-time" and "start-marker" each produce a
// "no longer accepted" message.
727 if (s
->info
.args
.exists("start-time") ||
728 s
->info
.args
.exists("end-time")) {
729 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl
;
733 if (s
->info
.args
.exists("start-marker")) {
734 ldpp_dout(this, 5) << "start-marker is no longer accepted" << dendl
;
// "end-marker" is used as the marker when "marker" itself is absent.
// The message about both being provided presumably belongs to the
// else-branch, which is not visible here.
738 if (s
->info
.args
.exists("end-marker")) {
739 if (!s
->info
.args
.exists("marker")) {
740 marker
= s
->info
.args
.get("end-marker");
742 ldpp_dout(this, 5) << "end-marker and marker cannot both be provided" << dendl
;
747 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
749 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
753 if (marker
.empty()) { /* bounding end */
758 op_ret
= store
->svc()->datalog_rados
->trim_entries(this, shard_id
, marker
);
// REST op that reports the metadata sync status (held in `status`).
// Access requires the "mdlog" capability with RGW_CAP_READ;
// verify_permission() delegates to check_caps() with the requesting user's
// caps. The op name is "get_metadata_log_status".
// NOTE(review): access specifiers and closing braces are not visible in
// this listing.
761 // not in header to avoid pulling in rgw_sync.h
762 class RGWOp_MDLog_Status
: public RGWRESTOp
{
763 rgw_meta_sync_status status
;
765 int check_caps(const RGWUserCaps
& caps
) override
{
766 return caps
.check_cap("mdlog", RGW_CAP_READ
);
768 int verify_permission(optional_yield
) override
{
769 return check_caps(s
->user
->get_caps());
771 void execute(optional_yield y
) override
;
772 void send_response() override
;
773 const char* name() const override
{ return "get_metadata_log_status"; }
// Reads the metadata sync status into `status` through the meta sync
// manager; op_ret receives read_sync_status()'s result.
// A null manager is logged as "no sync manager".
// NOTE(review): what op_ret is set to in the null case, and the return
// path, are not visible in this listing.
776 void RGWOp_MDLog_Status::execute(optional_yield y
)
778 auto sync
= store
->getRados()->get_meta_sync_manager();
779 if (sync
== nullptr) {
780 ldpp_dout(this, 1) << "no sync manager" << dendl
;
784 op_ret
= sync
->read_sync_status(this, &status
);
// Sets the request error state from op_ret and encodes the metadata sync
// status as JSON under the key "status".
// NOTE(review): the lines between these two statements are not visible here.
787 void RGWOp_MDLog_Status::send_response()
789 set_req_state_err(s
, op_ret
);
794 encode_json("status", status
, s
->formatter
);
// REST op that reports per-shard bucket sync status (held in `status`, one
// rgw_bucket_shard_sync_info per element).
// Access requires the "bilog" capability with RGW_CAP_READ;
// verify_permission() delegates to check_caps() with the requesting user's
// caps. The op name is "get_bucket_index_log_status".
// NOTE(review): access specifiers and closing braces are not visible in
// this listing.
799 // not in header to avoid pulling in rgw_data_sync.h
800 class RGWOp_BILog_Status
: public RGWRESTOp
{
801 std::vector
<rgw_bucket_shard_sync_info
> status
;
803 int check_caps(const RGWUserCaps
& caps
) override
{
804 return caps
.check_cap("bilog", RGW_CAP_READ
);
806 int verify_permission(optional_yield y
) override
{
807 return check_caps(s
->user
->get_caps());
809 void execute(optional_yield y
) override
;
810 void send_response() override
;
811 const char* name() const override
{ return "get_bucket_index_log_status"; }
// Computes bucket sync status for the bucket named by the "bucket" argument.
// Reads the "options", "source-zone", "source-bucket" and "bucket" request
// arguments (merge is true when options == "merge"), reads the bucket
// instance info, and then either:
//  - queries rgw_bucket_sync_status() for a single pipe
//    (source zone/bucket -> local zone/info.bucket), or
//  - walks every local destination reported by the source bucket's sync
//    policy handler and merges the per-shard results into `status`.
// NOTE(review): the condition choosing between the two paths, the
// error-return paths and several declarations (bucket, info) are not
// visible in this listing - confirm control flow against the full file.
814 void RGWOp_BILog_Status::execute(optional_yield y
)
816 const auto options
= s
->info
.args
.get("options");
817 bool merge
= (options
== "merge");
818 const auto source_zone
= s
->info
.args
.get("source-zone");
819 const auto source_key
= s
->info
.args
.get("source-bucket");
820 auto key
= s
->info
.args
.get("bucket");
825 ldpp_dout(this, 4) << "no 'bucket' provided" << dendl
;
831 int shard_id
{-1}; // unused
832 op_ret
= rgw_bucket_parse_bucket_key(s
->cct
, key
, &bucket
, &shard_id
);
834 ldpp_dout(this, 4) << "invalid 'bucket' provided" << dendl
;
839 // read the bucket instance info for num_shards
840 auto ctx
= store
->svc()->sysobj
->init_obj_ctx();
842 op_ret
= store
->getRados()->get_bucket_instance_info(ctx
, bucket
, info
, nullptr, nullptr, s
->yield
, this);
844 ldpp_dout(this, 4) << "failed to read bucket info: " << cpp_strerror(op_ret
) << dendl
;
// The source bucket is either info.bucket or parsed from "source-bucket";
// only part of the selecting condition is visible here.
848 rgw_bucket source_bucket
;
850 if (source_key
.empty() ||
852 source_bucket
= info
.bucket
;
854 op_ret
= rgw_bucket_parse_bucket_key(s
->cct
, source_key
, &source_bucket
, nullptr);
856 ldpp_dout(this, 4) << "invalid 'source-bucket' provided (key=" << source_key
<< ")" << dendl
;
861 const auto& local_zone_id
= store
->svc()->zone
->zone_id();
// Single-pipe path: source zone/bucket -> local zone/info.bucket.
864 rgw_sync_bucket_pipe pipe
;
865 pipe
.source
.zone
= source_zone
;
866 pipe
.source
.bucket
= source_bucket
;
867 pipe
.dest
.zone
= local_zone_id
;
868 pipe
.dest
.bucket
= info
.bucket
;
870 ldpp_dout(this, 20) << "RGWOp_BILog_Status::execute(optional_yield y): getting sync status for pipe=" << pipe
<< dendl
;
872 op_ret
= rgw_bucket_sync_status(this, store
, pipe
, info
, nullptr, &status
);
875 ldpp_dout(this, -1) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe
<< " returned ret=" << op_ret
<< dendl
;
// Multi-destination path: ask the source bucket's sync policy handler for
// all destinations in the local zone.
880 rgw_zone_id
source_zone_id(source_zone
);
882 RGWBucketSyncPolicyHandlerRef source_handler
;
883 op_ret
= store
->ctl()->bucket
->get_sync_policy_handler(source_zone_id
, source_bucket
, &source_handler
, y
, s
);
885 ldpp_dout(this, -1) << "could not get bucket sync policy handler (r=" << op_ret
<< ")" << dendl
;
889 auto local_dests
= source_handler
->get_all_dests_in_zone(local_zone_id
);
891 std::vector
<rgw_bucket_shard_sync_info
> current_status
;
892 for (auto& entry
: local_dests
) {
893 auto pipe
= entry
.second
;
895 ldpp_dout(this, 20) << "RGWOp_BILog_Status::execute(optional_yield y): getting sync status for pipe=" << pipe
<< dendl
;
// pinfo points at `info` unless the destination bucket differs, in which
// case it is redirected to opt_dest_info below.
897 RGWBucketInfo
*pinfo
= &info
;
898 std::optional
<RGWBucketInfo
> opt_dest_info
;
900 if (!pipe
.dest
.bucket
) {
901 /* Uh oh, something went wrong */
902 ldpp_dout(this, 20) << "ERROR: RGWOp_BILog_Status::execute(optional_yield y): BUG: pipe.dest.bucket was not initialized" << pipe
<< dendl
;
907 if (*pipe
.dest
.bucket
!= info
.bucket
) {
908 opt_dest_info
.emplace();
909 pinfo
= &(*opt_dest_info
);
911 /* dest bucket might not have a bucket id */
912 op_ret
= store
->ctl()->bucket
->read_bucket_info(*pipe
.dest
.bucket
,
916 RGWBucketCtl::BucketInstance::GetParams(),
919 ldpp_dout(this, 4) << "failed to read target bucket info (bucket=: " << cpp_strerror(op_ret
) << dendl
;
923 pipe
.dest
.bucket
= pinfo
->bucket
;
// NOTE(review): the last argument below reads `¤t_status`, which looks like
// a mis-decoded `&current_status` (HTML entity residue) - confirm against
// the full source before relying on this listing.
926 int r
= rgw_bucket_sync_status(this, store
, pipe
, *pinfo
, &info
, ¤t_status
);
928 ldpp_dout(this, -1) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe
<< " returned ret=" << r
<< dendl
;
// Merge step: the first result is moved into `status`; later results must
// have a matching shard count and replace a shard's entry when their
// inc_marker.position compares smaller.
933 if (status
.empty()) {
934 status
= std::move(current_status
);
936 if (current_status
.size() !=
939 ldpp_dout(this, -1) << "ERROR: different number of shards for sync status of buckets syncing from the same source: status.size()= " << status
.size() << " current_status.size()=" << current_status
.size() << dendl
;
942 auto m
= status
.begin();
943 for (auto& cur_shard_status
: current_status
) {
944 auto& result_shard_status
= *m
++;
945 // always take the first marker, or any later marker that's smaller
946 if (cur_shard_status
.inc_marker
.position
< result_shard_status
.inc_marker
.position
) {
947 result_shard_status
= std::move(cur_shard_status
);
// Sets the request error state from op_ret and encodes the per-shard bucket
// sync status as JSON under the key "status".
// NOTE(review): the lines between these two statements are not visible here.
954 void RGWOp_BILog_Status::send_response()
956 set_req_state_err(s
, op_ret
);
961 encode_json("status", status
, s
->formatter
);
// REST op that reports the data sync status (held in `status`).
// Access requires the "datalog" capability with RGW_CAP_READ;
// verify_permission() delegates to check_caps() with the requesting user's
// caps. The op name is "get_data_changes_log_status".
// NOTE(review): access specifiers and closing braces are not visible in
// this listing.
966 // not in header to avoid pulling in rgw_data_sync.h
967 class RGWOp_DATALog_Status
: public RGWRESTOp
{
968 rgw_data_sync_status status
;
970 int check_caps(const RGWUserCaps
& caps
) override
{
971 return caps
.check_cap("datalog", RGW_CAP_READ
);
973 int verify_permission(optional_yield y
) override
{
974 return check_caps(s
->user
->get_caps());
976 void execute(optional_yield y
) override
;
977 void send_response() override
;
978 const char* name() const override
{ return "get_data_changes_log_status"; }
// Reads the data sync status for the zone named by the "source-zone"
// argument into `status`; op_ret receives read_sync_status()'s result.
// A null manager is logged together with the requested source zone.
// NOTE(review): what op_ret is set to in the null case, and the return
// path, are not visible in this listing.
981 void RGWOp_DATALog_Status::execute(optional_yield y
)
983 const auto source_zone
= s
->info
.args
.get("source-zone");
984 auto sync
= store
->getRados()->get_data_sync_manager(source_zone
);
985 if (sync
== nullptr) {
986 ldpp_dout(this, 1) << "no sync manager for source-zone " << source_zone
<< dendl
;
990 op_ret
= sync
->read_sync_status(this, &status
);
// Sets the request error state from op_ret and encodes the data sync status
// as JSON under the key "status".
// NOTE(review): the lines between these two statements are not visible here.
993 void RGWOp_DATALog_Status::send_response()
995 set_req_state_err(s
, op_ret
);
1000 encode_json("status", status
, s
->formatter
);
// Chooses the op object for a GET request from the "type" argument:
//  - "metadata":     ShardInfo or List when "id" is present (ShardInfo is
//                    under the "info" check), Status when "status" is
//                    present, Info as the remaining return;
//  - "bucket-index": Info when "info" is present, Status when "status" is
//                    present, List as the remaining return;
//  - "data":         same layout as "metadata" with the DATALog ops.
// Each op is allocated with new and returned as a raw pointer.
// NOTE(review): the else lines, the handling of a missing/unknown "type"
// and the declaration of `exists` are not visible in this listing.
1006 RGWOp
*RGWHandler_Log::op_get() {
1008 string type
= s
->info
.args
.get("type", &exists
);
1014 if (type
.compare("metadata") == 0) {
1015 if (s
->info
.args
.exists("id")) {
1016 if (s
->info
.args
.exists("info")) {
1017 return new RGWOp_MDLog_ShardInfo
;
1019 return new RGWOp_MDLog_List
;
1021 } else if (s
->info
.args
.exists("status")) {
1022 return new RGWOp_MDLog_Status
;
1024 return new RGWOp_MDLog_Info
;
1026 } else if (type
.compare("bucket-index") == 0) {
1027 if (s
->info
.args
.exists("info")) {
1028 return new RGWOp_BILog_Info
;
1029 } else if (s
->info
.args
.exists("status")) {
1030 return new RGWOp_BILog_Status
;
1032 return new RGWOp_BILog_List
;
1034 } else if (type
.compare("data") == 0) {
1035 if (s
->info
.args
.exists("id")) {
1036 if (s
->info
.args
.exists("info")) {
1037 return new RGWOp_DATALog_ShardInfo
;
1039 return new RGWOp_DATALog_List
;
1041 } else if (s
->info
.args
.exists("status")) {
1042 return new RGWOp_DATALog_Status
;
1044 return new RGWOp_DATALog_Info
;
// Chooses the op object for a DELETE request from the "type" argument:
// "metadata" -> RGWOp_MDLog_Delete, "bucket-index" -> RGWOp_BILog_Delete,
// "data" -> RGWOp_DATALog_Delete. Each op is allocated with new and
// returned as a raw pointer.
// NOTE(review): the handling of a missing/unknown "type" and the
// declaration of `exists` are not visible in this listing.
1050 RGWOp
*RGWHandler_Log::op_delete() {
1052 string type
= s
->info
.args
.get("type", &exists
);
1058 if (type
.compare("metadata") == 0)
1059 return new RGWOp_MDLog_Delete
;
1060 else if (type
.compare("bucket-index") == 0)
1061 return new RGWOp_BILog_Delete
;
1062 else if (type
.compare("data") == 0)
1063 return new RGWOp_DATALog_Delete
;
1067 RGWOp
*RGWHandler_Log::op_post() {
1069 string type
= s
->info
.args
.get("type", &exists
);
1075 if (type
.compare("metadata") == 0) {
1076 if (s
->info
.args
.exists("lock"))
1077 return new RGWOp_MDLog_Lock
;
1078 else if (s
->info
.args
.exists("unlock"))
1079 return new RGWOp_MDLog_Unlock
;
1080 else if (s
->info
.args
.exists("notify"))
1081 return new RGWOp_MDLog_Notify
;
1082 } else if (type
.compare("data") == 0) {
1083 if (s
->info
.args
.exists("notify"))
1084 return new RGWOp_DATALog_Notify
;