1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
16 #include "common/ceph_json.h"
17 #include "common/strtol.h"
20 #include "rgw_rest_s3.h"
21 #include "rgw_rest_log.h"
22 #include "rgw_client_io.h"
24 #include "rgw_data_sync.h"
25 #include "rgw_common.h"
27 #include "rgw_mdlog.h"
29 #include "services/svc_zone.h"
30 #include "services/svc_mdlog.h"
31 #include "services/svc_bilog_rados.h"
32 #include "services/svc_datalog_rados.h"
34 #include "common/errno.h"
35 #include "include/ceph_assert.h"
// File-level macros.
// LOG_CLASS_LIST_MAX_ENTRIES is both the default and the upper bound that the
// list handlers below apply to the "max-entries" request argument.
// dout_context / dout_subsys are presumably consumed by the dout()/ldout()
// logging macros used throughout this file -- confirm against the logging header.
37 #define dout_context g_ceph_context
38 #define LOG_CLASS_LIST_MAX_ENTRIES (1000)
39 #define dout_subsys ceph_subsys_rgw
// Convert the textual date `in` into a real_time stored in `out`.
//
// The string is handed to utime_t::parse_date(), which fills `epoch` and
// `nsec`; those are then combined with utime_t(epoch, nsec).to_real_time().
// A parse failure is logged at debug level 5.
//
// Callers in this file treat a negative return value as failure.
// NOTE(review): the return statements and the declarations of epoch/nsec are
// not visible here -- confirm the exact error code and how an empty `in` is
// handled.
41 static int parse_date_str(string
& in
, real_time
& out
) {
46 if (utime_t::parse_date(in
, &epoch
, &nsec
) < 0) {
47 dout(5) << "Error parsing date " << in
<< dendl
;
51 out
= utime_t(epoch
, nsec
).to_real_time();
// List entries of one metadata-log shard.
//
// Request arguments read: "period", "id" (shard), "max-entries",
// "start-time", "end-time", "marker".
// Steps visible in this function:
//   1. parse the shard id (base 10) and the start/end dates;
//   2. parse "max-entries" when given and cap it at LOG_CLASS_LIST_MAX_ENTRIES;
//   3. fall back to the zone's current period id when "period" is missing;
//   4. open an RGWMetadataLog for that period and run
//      init_list_entries / list_entries / complete_list_entries.
// The result of list_entries() is stored in http_ret; entries, last_marker and
// truncated are filled for send_response().
//
// NOTE(review): the results of strict_strtol() are cast straight to unsigned;
// if strict_strtol() can return a negative value, it would wrap to a large
// shard id / entry count -- confirm whether a range check is wanted.
55 void RGWOp_MDLog_List::execute() {
56 string period
= s
->info
.args
.get("period");
57 string shard
= s
->info
.args
.get("id");
58 string max_entries_str
= s
->info
.args
.get("max-entries");
59 string st
= s
->info
.args
.get("start-time"),
60 et
= s
->info
.args
.get("end-time"),
61 marker
= s
->info
.args
.get("marker"),
66 unsigned shard_id
, max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
// Shard id is parsed as a base-10 integer; `err` receives any parse error text.
68 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
70 dout(5) << "Error parsing shard_id " << shard
<< dendl
;
75 if (parse_date_str(st
, ut_st
) < 0) {
80 if (parse_date_str(et
, ut_et
) < 0) {
// "max-entries" is optional; without it the default (the cap itself) is used.
85 if (!max_entries_str
.empty()) {
86 max_entries
= (unsigned)strict_strtol(max_entries_str
.c_str(), 10, &err
);
88 dout(5) << "Error parsing max-entries " << max_entries_str
<< dendl
;
92 if (max_entries
> LOG_CLASS_LIST_MAX_ENTRIES
) {
93 max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
// No period supplied: try the zone's current period before giving up.
98 ldout(s
->cct
, 5) << "Missing period id trying to use current" << dendl
;
99 period
= store
->svc()->zone
->get_current_period_id();
100 if (period
.empty()) {
101 ldout(s
->cct
, 5) << "Missing period id" << dendl
;
107 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
109 meta_log
.init_list_entries(shard_id
, ut_st
, ut_et
, marker
, &handle
);
111 http_ret
= meta_log
.list_entries(handle
, max_entries
, entries
,
112 &last_marker
, &truncated
);
114 meta_log
.complete_list_entries(handle
);
// Emit the metadata-log listing produced by execute().
//
// Sets the request error state from http_ret, then writes a "log_entries"
// object containing "marker" (last_marker), "truncated", and an "entries"
// array in which each cls_log_entry is dumped through the metadata manager's
// dump_log_entry().
117 void RGWOp_MDLog_List::send_response() {
118 set_req_state_err(s
, http_ret
);
125 s
->formatter
->open_object_section("log_entries");
126 s
->formatter
->dump_string("marker", last_marker
);
127 s
->formatter
->dump_bool("truncated", truncated
);
129 s
->formatter
->open_array_section("entries");
130 for (list
<cls_log_entry
>::iterator iter
= entries
.begin();
131 iter
!= entries
.end(); ++iter
) {
132 cls_log_entry
& entry
= *iter
;
133 store
->ctl()->meta
.mgr
->dump_log_entry(entry
, s
->formatter
);
// Closes the "entries" array.
136 s
->formatter
->close_section();
// Closes the "log_entries" object.
138 s
->formatter
->close_section();
// Collect general metadata-log information.
//
// num_objects is taken from the rgw_md_log_max_shards config value, and
// `period` from the mdlog service's read_oldest_log_period(); that result's
// error code becomes http_ret.
142 void RGWOp_MDLog_Info::execute() {
143 num_objects
= s
->cct
->_conf
->rgw_md_log_max_shards
;
144 period
= store
->svc()->mdlog
->read_oldest_log_period();
145 http_ret
= period
.get_error();
// Emit the result of RGWOp_MDLog_Info::execute() as an "mdlog" object with
// "num_objects", "period" (the period id) and "realm_epoch" (the period
// cursor's epoch).
// NOTE(review): the lines guarding the period fields are not visible here --
// confirm they are skipped when the period cursor is invalid.
148 void RGWOp_MDLog_Info::send_response() {
149 set_req_state_err(s
, http_ret
);
153 s
->formatter
->open_object_section("mdlog");
154 s
->formatter
->dump_unsigned("num_objects", num_objects
);
156 s
->formatter
->dump_string("period", period
.get_period().get_id());
157 s
->formatter
->dump_unsigned("realm_epoch", period
.get_epoch());
159 s
->formatter
->close_section();
// Fetch header information for one metadata-log shard.
//
// Request arguments read: "period" and "id" (shard, parsed base 10).
// When "period" is empty, the zone's current period id is tried instead.
// The outcome of RGWMetadataLog::get_info() is stored in http_ret and the
// shard header in `info`.
//
// NOTE(review): the strict_strtol() result is cast to unsigned without a
// visible range check -- confirm that negative ids are rejected.
163 void RGWOp_MDLog_ShardInfo::execute() {
164 string period
= s
->info
.args
.get("period");
165 string shard
= s
->info
.args
.get("id");
168 unsigned shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
170 dout(5) << "Error parsing shard_id " << shard
<< dendl
;
175 if (period
.empty()) {
176 ldout(s
->cct
, 5) << "Missing period id trying to use current" << dendl
;
177 period
= store
->svc()->zone
->get_current_period_id();
179 if (period
.empty()) {
180 ldout(s
->cct
, 5) << "Missing period id" << dendl
;
185 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
187 http_ret
= meta_log
.get_info(shard_id
, &info
);
// Emit the shard header gathered by execute(): sets the request error state
// from http_ret and JSON-encodes `info` under the key "info".
190 void RGWOp_MDLog_ShardInfo::send_response() {
191 set_req_state_err(s
, http_ret
);
195 encode_json("info", info
, s
->formatter
);
// Trim a range of entries from one metadata-log shard.
//
// Request arguments read: "start-time", "end-time", "start-marker",
// "end-marker", "period", "id" (shard).
// The range must be bounded at the end: the case where both "end-time" and
// "end-marker" are empty is singled out (its handling is not visible here,
// presumably a rejection -- confirm).
// When "period" is empty, the zone's current period id is tried.
// The result of RGWMetadataLog::trim() becomes http_ret.
199 void RGWOp_MDLog_Delete::execute() {
200 string st
= s
->info
.args
.get("start-time"),
201 et
= s
->info
.args
.get("end-time"),
202 start_marker
= s
->info
.args
.get("start-marker"),
203 end_marker
= s
->info
.args
.get("end-marker"),
204 period
= s
->info
.args
.get("period"),
205 shard
= s
->info
.args
.get("id"),
213 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
215 dout(5) << "Error parsing shard_id " << shard
<< dendl
;
219 if (et
.empty() && end_marker
.empty()) { /* bounding end */
224 if (parse_date_str(st
, ut_st
) < 0) {
229 if (parse_date_str(et
, ut_et
) < 0) {
234 if (period
.empty()) {
235 ldout(s
->cct
, 5) << "Missing period id trying to use current" << dendl
;
236 period
= store
->svc()->zone
->get_current_period_id();
238 if (period
.empty()) {
239 ldout(s
->cct
, 5) << "Missing period id" << dendl
;
244 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
246 http_ret
= meta_log
.trim(shard_id
, ut_st
, ut_et
, start_marker
, end_marker
);
// Take an exclusive, time-limited lock on one metadata-log shard.
//
// Request arguments read: "period", "id" (shard), "length" (lock duration),
// "locker-id", "zone-id". When "period" is empty the zone's current period id
// is tried. Missing mandatory arguments are reported as
// "Error invalid parameter list".
// The lock is requested through RGWMetadataLog::lock_exclusive() with
// make_timespan(dur); a -EBUSY result is translated to -ERR_LOCKED.
//
// NOTE(review): `dur` is assigned from an (unsigned) cast, so if it is
// declared unsigned the `dur <= 0` test can only catch zero; a negative
// "length" would wrap to a very large duration. Confirm the declared type of
// `dur` and whether the sign should be checked before the cast.
249 void RGWOp_MDLog_Lock::execute() {
250 string period
, shard_id_str
, duration_str
, locker_id
, zone_id
;
255 period
= s
->info
.args
.get("period");
256 shard_id_str
= s
->info
.args
.get("id");
257 duration_str
= s
->info
.args
.get("length");
258 locker_id
= s
->info
.args
.get("locker-id");
259 zone_id
= s
->info
.args
.get("zone-id");
261 if (period
.empty()) {
262 ldout(s
->cct
, 5) << "Missing period id trying to use current" << dendl
;
263 period
= store
->svc()->zone
->get_current_period_id();
// Mandatory-argument check (some of the tested arguments are not visible here).
266 if (period
.empty() ||
267 shard_id_str
.empty() ||
268 (duration_str
.empty()) ||
271 dout(5) << "Error invalid parameter list" << dendl
;
277 shard_id
= (unsigned)strict_strtol(shard_id_str
.c_str(), 10, &err
);
279 dout(5) << "Error parsing shard_id param " << shard_id_str
<< dendl
;
284 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
286 dur
= (unsigned)strict_strtol(duration_str
.c_str(), 10, &err
);
287 if (!err
.empty() || dur
<= 0) {
288 dout(5) << "invalid length param " << duration_str
<< dendl
;
292 http_ret
= meta_log
.lock_exclusive(shard_id
, make_timespan(dur
), zone_id
,
// A busy lock is reported to the client as ERR_LOCKED.
294 if (http_ret
== -EBUSY
)
295 http_ret
= -ERR_LOCKED
;
// Release a lock previously taken on a metadata-log shard.
//
// Request arguments read: "period", "id" (shard), "locker-id", "zone-id".
// When "period" is empty the zone's current period id is tried. Missing
// mandatory arguments are reported as "Error invalid parameter list".
// The result of RGWMetadataLog::unlock(shard_id, zone_id, locker_id) becomes
// http_ret.
298 void RGWOp_MDLog_Unlock::execute() {
299 string period
, shard_id_str
, locker_id
, zone_id
;
304 period
= s
->info
.args
.get("period");
305 shard_id_str
= s
->info
.args
.get("id");
306 locker_id
= s
->info
.args
.get("locker-id");
307 zone_id
= s
->info
.args
.get("zone-id");
309 if (period
.empty()) {
310 ldout(s
->cct
, 5) << "Missing period id trying to use current" << dendl
;
311 period
= store
->svc()->zone
->get_current_period_id();
314 if (period
.empty() ||
315 shard_id_str
.empty() ||
318 dout(5) << "Error invalid parameter list" << dendl
;
324 shard_id
= (unsigned)strict_strtol(shard_id_str
.c_str(), 10, &err
);
326 dout(5) << "Error parsing shard_id param " << shard_id_str
<< dendl
;
331 RGWMetadataLog meta_log
{s
->cct
, store
->svc()->zone
, store
->svc()->cls
, period
};
332 http_ret
= meta_log
.unlock(shard_id
, zone_id
, locker_id
);
// Handle a "metadata log changed" notification.
//
// The request body (read with a limit of LARGE_ENOUGH_BUF = 128 KiB) is
// parsed as JSON and decoded into a set<int> of updated shard ids. Parse and
// decode failures are logged at level 0. When the rgw subsystem gathers
// level-20 logs, each shard id is logged. Finally the shard set is handed to
// wakeup_meta_sync_shards().
335 void RGWOp_MDLog_Notify::execute() {
336 #define LARGE_ENOUGH_BUF (128 * 1024)
340 std::tie(r
, data
) = rgw_rest_read_all_input(s
, LARGE_ENOUGH_BUF
);
346 char* buf
= data
.c_str();
347 ldout(s
->cct
, 20) << __func__
<< "(): read data: " << buf
<< dendl
;
350 r
= p
.parse(buf
, data
.length());
352 ldout(s
->cct
, 0) << "ERROR: failed to parse JSON" << dendl
;
357 set
<int> updated_shards
;
359 decode_json_obj(updated_shards
, &p
);
360 } catch (JSONDecoder::err
& err
) {
361 ldout(s
->cct
, 0) << "ERROR: failed to decode JSON" << dendl
;
// Only walk the set for logging when level-20 output is actually gathered.
366 if (store
->ctx()->_conf
->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
367 for (set
<int>::iterator iter
= updated_shards
.begin(); iter
!= updated_shards
.end(); ++iter
) {
368 ldout(s
->cct
, 20) << __func__
<< "(): updated shard=" << *iter
<< dendl
;
372 store
->getRados()->wakeup_meta_sync_shards(updated_shards
);
// List bucket-index-log entries for a bucket (or bucket instance).
//
// Request arguments read: "tenant", "bucket", "marker", "max-entries",
// "bucket-instance". At least one of "bucket" / "bucket-instance" is required.
// The bucket-instance string is split by rgw_bucket_parse_bucket_instance()
// into bucket name, instance id and shard id. Bucket info is then loaded by
// instance when an instance id is present, otherwise by bucket name.
// "max-entries" is parsed base 10; LOG_CLASS_LIST_MAX_ENTRIES is assigned as
// the fallback (the guarding condition is not visible here).
//
// Entries are fetched and streamed in a do/while loop: each pass asks
// log_list() for at most (max_entries - count) entries, adds the batch size to
// `count`, and sends the batch with send_response(entries, marker). The loop
// continues while the listing is truncated and count < max_entries.
377 void RGWOp_BILog_List::execute() {
378 string tenant_name
= s
->info
.args
.get("tenant"),
379 bucket_name
= s
->info
.args
.get("bucket"),
380 marker
= s
->info
.args
.get("marker"),
381 max_entries_str
= s
->info
.args
.get("max-entries"),
382 bucket_instance
= s
->info
.args
.get("bucket-instance");
383 RGWBucketInfo bucket_info
;
384 unsigned max_entries
;
386 if (bucket_name
.empty() && bucket_instance
.empty()) {
387 dout(5) << "ERROR: neither bucket nor bucket instance specified" << dendl
;
394 http_ret
= rgw_bucket_parse_bucket_instance(bucket_instance
, &bn
, &bucket_instance
, &shard_id
);
399 if (!bucket_instance
.empty()) {
400 rgw_bucket
b(rgw_bucket_key(tenant_name
, bn
, bucket_instance
));
401 http_ret
= store
->getRados()->get_bucket_instance_info(*s
->sysobj_ctx
, b
, bucket_info
, NULL
, NULL
, s
->yield
);
403 ldpp_dout(s
, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance
<< dendl
;
406 } else { /* !bucket_name.empty() */
407 http_ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant_name
, bucket_name
, bucket_info
, NULL
, s
->yield
, NULL
);
409 ldpp_dout(s
, 5) << "could not get bucket info for bucket=" << bucket_name
<< dendl
;
418 max_entries
= (unsigned)strict_strtol(max_entries_str
.c_str(), 10, &err
);
420 max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
424 list
<rgw_bi_log_entry
> entries
;
425 int ret
= store
->svc()->bilog_rados
->log_list(bucket_info
, shard_id
,
426 marker
, max_entries
- count
,
427 entries
, &truncated
);
429 ldpp_dout(s
, 5) << "ERROR: list_bi_log_entries()" << dendl
;
433 count
+= entries
.size();
435 send_response(entries
, marker
);
436 } while (truncated
&& count
< max_entries
);
// Start the bucket-index-log listing response: sets the request error state
// from http_ret and opens the "entries" array. The batches are written by
// send_response(entries, marker), and send_response_end() closes a section
// (presumably this array).
441 void RGWOp_BILog_List::send_response() {
445 set_req_state_err(s
, http_ret
);
454 s
->formatter
->open_array_section("entries");
// Write one batch of bucket-index-log entries: every rgw_bi_log_entry in
// `entries` is JSON-encoded under the key "entry".
// NOTE(review): any use of `marker` (e.g. advancing it to the last entry's id)
// and any flush are not visible here -- confirm.
457 void RGWOp_BILog_List::send_response(list
<rgw_bi_log_entry
>& entries
, string
& marker
)
459 for (list
<rgw_bi_log_entry
>::iterator iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
460 rgw_bi_log_entry
& entry
= *iter
;
461 encode_json("entry", entry
, s
->formatter
);
// Finish the bucket-index-log listing response by closing the currently open
// formatter section.
468 void RGWOp_BILog_List::send_response_end() {
469 s
->formatter
->close_section();
// Gather bucket-index-log status information for a bucket.
//
// Request arguments read: "tenant", "bucket", "bucket-instance"; at least one
// of "bucket" / "bucket-instance" is required. Bucket info is loaded by
// instance id when one is present, otherwise by bucket name.
// get_bucket_stats() then fills bucket_ver, master_ver, max_marker and
// syncstopped for send_response(). A negative result other than -ENOENT is
// singled out (its handling is not visible here -- presumably reported as the
// request error; confirm).
473 void RGWOp_BILog_Info::execute() {
474 string tenant_name
= s
->info
.args
.get("tenant"),
475 bucket_name
= s
->info
.args
.get("bucket"),
476 bucket_instance
= s
->info
.args
.get("bucket-instance");
477 RGWBucketInfo bucket_info
;
479 if (bucket_name
.empty() && bucket_instance
.empty()) {
480 ldpp_dout(s
, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl
;
487 http_ret
= rgw_bucket_parse_bucket_instance(bucket_instance
, &bn
, &bucket_instance
, &shard_id
);
492 if (!bucket_instance
.empty()) {
493 rgw_bucket
b(rgw_bucket_key(tenant_name
, bn
, bucket_instance
));
494 http_ret
= store
->getRados()->get_bucket_instance_info(*s
->sysobj_ctx
, b
, bucket_info
, NULL
, NULL
, s
->yield
);
496 ldpp_dout(s
, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance
<< dendl
;
499 } else { /* !bucket_name.empty() */
500 http_ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant_name
, bucket_name
, bucket_info
, NULL
, s
->yield
, NULL
);
502 ldpp_dout(s
, 5) << "could not get bucket info for bucket=" << bucket_name
<< dendl
;
506 map
<RGWObjCategory
, RGWStorageStats
> stats
;
507 int ret
= store
->getRados()->get_bucket_stats(bucket_info
, shard_id
, &bucket_ver
, &master_ver
, stats
, &max_marker
, &syncstopped
);
508 if (ret
< 0 && ret
!= -ENOENT
) {
// Emit the values gathered by RGWOp_BILog_Info::execute() as an "info" object
// with the keys "bucket_ver", "master_ver", "max_marker" and "syncstopped".
514 void RGWOp_BILog_Info::send_response() {
515 set_req_state_err(s
, http_ret
);
522 s
->formatter
->open_object_section("info");
523 encode_json("bucket_ver", bucket_ver
, s
->formatter
);
524 encode_json("master_ver", master_ver
, s
->formatter
);
525 encode_json("max_marker", max_marker
, s
->formatter
);
526 encode_json("syncstopped", syncstopped
, s
->formatter
);
527 s
->formatter
->close_section();
// Trim bucket-index-log entries for a bucket.
//
// Request arguments read: "tenant", "bucket", "start-marker", "end-marker",
// "bucket-instance". One of "bucket" / "bucket-instance" is required, and
// "end-marker" is mandatory. Bucket info is loaded by instance id when one is
// present, otherwise by bucket name. The result of
// bilog_rados->log_trim(bucket_info, shard_id, start_marker, end_marker)
// becomes http_ret.
532 void RGWOp_BILog_Delete::execute() {
533 string tenant_name
= s
->info
.args
.get("tenant"),
534 bucket_name
= s
->info
.args
.get("bucket"),
535 start_marker
= s
->info
.args
.get("start-marker"),
536 end_marker
= s
->info
.args
.get("end-marker"),
537 bucket_instance
= s
->info
.args
.get("bucket-instance");
539 RGWBucketInfo bucket_info
;
542 if ((bucket_name
.empty() && bucket_instance
.empty()) ||
543 end_marker
.empty()) {
544 ldpp_dout(s
, 5) << "ERROR: one of bucket and bucket instance, and also end-marker is mandatory" << dendl
;
551 http_ret
= rgw_bucket_parse_bucket_instance(bucket_instance
, &bn
, &bucket_instance
, &shard_id
);
556 if (!bucket_instance
.empty()) {
557 rgw_bucket
b(rgw_bucket_key(tenant_name
, bn
, bucket_instance
));
558 http_ret
= store
->getRados()->get_bucket_instance_info(*s
->sysobj_ctx
, b
, bucket_info
, NULL
, NULL
, s
->yield
);
560 ldpp_dout(s
, 5) << "could not get bucket instance info for bucket instance id=" << bucket_instance
<< dendl
;
563 } else { /* !bucket_name.empty() */
564 http_ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant_name
, bucket_name
, bucket_info
, NULL
, s
->yield
, NULL
);
566 ldpp_dout(s
, 5) << "could not get bucket info for bucket=" << bucket_name
<< dendl
;
570 http_ret
= store
->svc()->bilog_rados
->log_trim(bucket_info
, shard_id
, start_marker
, end_marker
);
572 ldpp_dout(s
, 5) << "ERROR: trim_bi_log_entries() " << dendl
;
// List entries of one data-changes-log shard.
//
// Request arguments read: "id" (shard), "start-time", "end-time",
// "max-entries", "marker", and the boolean "extra-info" (default false).
// The shard id and "max-entries" are parsed base 10; "max-entries" is capped
// at LOG_CLASS_LIST_MAX_ENTRIES. The result of datalog_rados->list_entries()
// becomes http_ret; entries, last_marker and truncated are filled for
// send_response().
//
// NOTE(review): strict_strtol() results are cast to unsigned without a
// visible sign check -- confirm negative input is rejected.
577 void RGWOp_DATALog_List::execute() {
578 string shard
= s
->info
.args
.get("id");
580 string st
= s
->info
.args
.get("start-time"),
581 et
= s
->info
.args
.get("end-time"),
582 max_entries_str
= s
->info
.args
.get("max-entries"),
583 marker
= s
->info
.args
.get("marker"),
587 unsigned shard_id
, max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
589 s
->info
.args
.get_bool("extra-info", &extra_info
, false);
591 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
593 dout(5) << "Error parsing shard_id " << shard
<< dendl
;
598 if (parse_date_str(st
, ut_st
) < 0) {
603 if (parse_date_str(et
, ut_et
) < 0) {
608 if (!max_entries_str
.empty()) {
609 max_entries
= (unsigned)strict_strtol(max_entries_str
.c_str(), 10, &err
);
611 dout(5) << "Error parsing max-entries " << max_entries_str
<< dendl
;
615 if (max_entries
> LOG_CLASS_LIST_MAX_ENTRIES
) {
616 max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
620 // Note that last_marker is updated to be the marker of the last
622 http_ret
= store
->svc()->datalog_rados
->list_entries(shard_id
, ut_st
, ut_et
,
623 max_entries
, entries
, marker
,
624 &last_marker
, &truncated
);
// Emit the data-log listing produced by execute().
//
// Writes a "log_entries" object containing "marker" (last_marker),
// "truncated", and an "entries" array. Each element is encoded under the key
// "entry" either as the inner `entry.entry` or as the whole
// rgw_data_change_log_entry.
// NOTE(review): the condition choosing between the two encodings is not
// visible here; presumably it is the "extra-info" flag read in execute() --
// confirm.
627 void RGWOp_DATALog_List::send_response() {
628 set_req_state_err(s
, http_ret
);
635 s
->formatter
->open_object_section("log_entries");
636 s
->formatter
->dump_string("marker", last_marker
);
637 s
->formatter
->dump_bool("truncated", truncated
);
639 s
->formatter
->open_array_section("entries");
640 for (list
<rgw_data_change_log_entry
>::iterator iter
= entries
.begin();
641 iter
!= entries
.end(); ++iter
) {
642 rgw_data_change_log_entry
& entry
= *iter
;
644 encode_json("entry", entry
.entry
, s
->formatter
);
646 encode_json("entry", entry
, s
->formatter
);
// Closes the "entries" array.
650 s
->formatter
->close_section();
// Closes the "log_entries" object.
652 s
->formatter
->close_section();
// Report the number of data-log shards: num_objects is read from the
// rgw_data_log_num_shards config value.
657 void RGWOp_DATALog_Info::execute() {
658 num_objects
= s
->cct
->_conf
->rgw_data_log_num_shards
;
// Emit the shard count gathered by execute(): an object section named
// "num_objects" holding a single unsigned field that is also called
// "num_objects".
662 void RGWOp_DATALog_Info::send_response() {
663 set_req_state_err(s
, http_ret
);
667 s
->formatter
->open_object_section("num_objects");
668 s
->formatter
->dump_unsigned("num_objects", num_objects
);
669 s
->formatter
->close_section();
// Fetch header information for one data-log shard.
//
// The "id" request argument is parsed base 10 into shard_id; the result of
// datalog_rados->get_info(shard_id, &info) becomes http_ret.
// NOTE(review): the strict_strtol() result is cast to unsigned without a
// visible sign check -- confirm negative ids are rejected.
673 void RGWOp_DATALog_ShardInfo::execute() {
674 string shard
= s
->info
.args
.get("id");
677 unsigned shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
679 dout(5) << "Error parsing shard_id " << shard
<< dendl
;
684 http_ret
= store
->svc()->datalog_rados
->get_info(shard_id
, &info
);
// Emit the data-log shard header gathered by execute(): sets the request
// error state from http_ret and JSON-encodes `info` under the key "info".
687 void RGWOp_DATALog_ShardInfo::send_response() {
688 set_req_state_err(s
, http_ret
);
692 encode_json("info", info
, s
->formatter
);
// Handle a "data log changed" notification from another zone.
//
// Reads the "source-zone" request argument and the request body (limit
// LARGE_ENOUGH_BUF = 128 KiB). The body is parsed as JSON and decoded into a
// map from shard id to the set of modified keys. Parse and decode failures
// are logged at level 0. When the rgw subsystem gathers level-20 logs, every
// shard and key is logged. Finally the map is handed to
// wakeup_data_sync_shards(source_zone, updated_shards).
696 void RGWOp_DATALog_Notify::execute() {
697 string source_zone
= s
->info
.args
.get("source-zone");
698 #define LARGE_ENOUGH_BUF (128 * 1024)
702 std::tie(r
, data
) = rgw_rest_read_all_input(s
, LARGE_ENOUGH_BUF
);
708 char* buf
= data
.c_str();
709 ldout(s
->cct
, 20) << __func__
<< "(): read data: " << buf
<< dendl
;
712 r
= p
.parse(buf
, data
.length());
714 ldout(s
->cct
, 0) << "ERROR: failed to parse JSON" << dendl
;
719 map
<int, set
<string
> > updated_shards
;
721 decode_json_obj(updated_shards
, &p
);
722 } catch (JSONDecoder::err
& err
) {
723 ldout(s
->cct
, 0) << "ERROR: failed to decode JSON" << dendl
;
// Only walk the map for logging when level-20 output is actually gathered.
728 if (store
->ctx()->_conf
->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
729 for (map
<int, set
<string
> >::iterator iter
= updated_shards
.begin(); iter
!= updated_shards
.end(); ++iter
) {
730 ldout(s
->cct
, 20) << __func__
<< "(): updated shard=" << iter
->first
<< dendl
;
731 set
<string
>& keys
= iter
->second
;
732 for (set
<string
>::iterator kiter
= keys
.begin(); kiter
!= keys
.end(); ++kiter
) {
733 ldout(s
->cct
, 20) << __func__
<< "(): modified key=" << *kiter
<< dendl
;
738 store
->getRados()->wakeup_data_sync_shards(source_zone
, updated_shards
);
// Trim a range of entries from one data-log shard.
//
// Request arguments read: "start-time", "end-time", "start-marker",
// "end-marker", "id" (shard, parsed base 10).
// The range must be bounded at the end: the case where both "end-time" and
// "end-marker" are empty is singled out (its handling is not visible here,
// presumably a rejection -- confirm).
// The result of datalog_rados->trim_entries() becomes http_ret.
743 void RGWOp_DATALog_Delete::execute() {
744 string st
= s
->info
.args
.get("start-time"),
745 et
= s
->info
.args
.get("end-time"),
746 start_marker
= s
->info
.args
.get("start-marker"),
747 end_marker
= s
->info
.args
.get("end-marker"),
748 shard
= s
->info
.args
.get("id"),
756 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
758 dout(5) << "Error parsing shard_id " << shard
<< dendl
;
762 if (et
.empty() && end_marker
.empty()) { /* bounding end */
767 if (parse_date_str(st
, ut_st
) < 0) {
772 if (parse_date_str(et
, ut_et
) < 0) {
777 http_ret
= store
->svc()->datalog_rados
->trim_entries(shard_id
, ut_st
, ut_et
, start_marker
, end_marker
);
// REST op returning the metadata sync status.
// Requires the "mdlog" capability with read permission, checked against the
// requesting user's caps. The status is held in `status` and filled by
// execute(); the op reports itself as "get_metadata_log_status".
780 // not in header to avoid pulling in rgw_sync.h
781 class RGWOp_MDLog_Status
: public RGWRESTOp
{
782 rgw_meta_sync_status status
;
784 int check_caps(const RGWUserCaps
& caps
) override
{
785 return caps
.check_cap("mdlog", RGW_CAP_READ
);
787 int verify_permission() override
{
788 return check_caps(s
->user
->get_caps());
790 void execute() override
;
791 void send_response() override
;
792 const char* name() const override
{ return "get_metadata_log_status"; }
// Read the metadata sync status through the store's meta sync manager.
// A missing manager (nullptr) is logged at level 1 as "no sync manager"; the
// error code set in that case is not visible here. Otherwise the result of
// read_sync_status(&status) becomes http_ret.
795 void RGWOp_MDLog_Status::execute()
797 auto sync
= store
->getRados()->get_meta_sync_manager();
798 if (sync
== nullptr) {
799 ldout(s
->cct
, 1) << "no sync manager" << dendl
;
803 http_ret
= sync
->read_sync_status(&status
);
// Emit the metadata sync status: sets the request error state from http_ret
// and JSON-encodes `status` under the key "status".
806 void RGWOp_MDLog_Status::send_response()
808 set_req_state_err(s
, http_ret
);
813 encode_json("status", status
, s
->formatter
);
// REST op returning per-shard bucket sync status.
// Requires the "bilog" capability with read permission, checked against the
// requesting user's caps. `status` holds one rgw_bucket_shard_sync_info per
// entry and is filled by execute(); the op reports itself as
// "get_bucket_index_log_status".
818 // not in header to avoid pulling in rgw_data_sync.h
819 class RGWOp_BILog_Status
: public RGWRESTOp
{
820 std::vector
<rgw_bucket_shard_sync_info
> status
;
822 int check_caps(const RGWUserCaps
& caps
) override
{
823 return caps
.check_cap("bilog", RGW_CAP_READ
);
825 int verify_permission() override
{
826 return check_caps(s
->user
->get_caps());
828 void execute() override
;
829 void send_response() override
;
830 const char* name() const override
{ return "get_bucket_index_log_status"; }
// Compute the bucket sync status reported by this op.
//
// Request arguments read: "options" (the value "merge" sets `merge`),
// "source-zone", "source-bucket", "bucket".
// Visible flow:
//   1. Parse "bucket" into an rgw_bucket and load its instance info.
//   2. Determine the source bucket: info.bucket when "source-bucket" is empty
//      (a second condition on that branch is not visible), otherwise the
//      parsed "source-bucket" key.
//   3. Build a sync pipe (source zone/bucket -> local zone/info.bucket) and
//      read its status with rgw_bucket_sync_status() into `status`.
//      NOTE(review): the guard around this step is not visible; presumably it
//      is the non-"merge" path -- confirm.
//   4. Otherwise fetch the source bucket's sync policy handler and, for every
//      destination pipe in the local zone, read that pipe's status into
//      current_status (loading the destination bucket info when it differs
//      from info.bucket) and fold it into `status`: the first result is taken
//      as-is; for later results a shard-count comparison precedes an error
//      log, and per shard the entry with the smaller inc_marker.position is
//      kept.
833 void RGWOp_BILog_Status::execute()
835 const auto options
= s
->info
.args
.get("options");
836 bool merge
= (options
== "merge");
837 const auto source_zone
= s
->info
.args
.get("source-zone");
838 const auto source_key
= s
->info
.args
.get("source-bucket");
839 auto key
= s
->info
.args
.get("bucket");
844 ldpp_dout(s
, 4) << "no 'bucket' provided" << dendl
;
850 int shard_id
{-1}; // unused
851 http_ret
= rgw_bucket_parse_bucket_key(s
->cct
, key
, &bucket
, &shard_id
);
853 ldpp_dout(s
, 4) << "invalid 'bucket' provided" << dendl
;
858 // read the bucket instance info for num_shards
859 auto ctx
= store
->svc()->sysobj
->init_obj_ctx();
861 http_ret
= store
->getRados()->get_bucket_instance_info(ctx
, bucket
, info
, nullptr, nullptr, s
->yield
);
863 ldpp_dout(s
, 4) << "failed to read bucket info: " << cpp_strerror(http_ret
) << dendl
;
867 rgw_bucket source_bucket
;
869 if (source_key
.empty() ||
871 source_bucket
= info
.bucket
;
873 http_ret
= rgw_bucket_parse_bucket_key(s
->cct
, source_key
, &source_bucket
, nullptr);
875 ldpp_dout(s
, 4) << "invalid 'source-bucket' provided (key=" << source_key
<< ")" << dendl
;
880 const auto& local_zone_id
= store
->svc()->zone
->zone_id();
// Single-pipe status: source zone/bucket -> local zone/this bucket.
883 rgw_sync_bucket_pipe pipe
;
884 pipe
.source
.zone
= source_zone
;
885 pipe
.source
.bucket
= source_bucket
;
886 pipe
.dest
.zone
= local_zone_id
;
887 pipe
.dest
.bucket
= info
.bucket
;
889 ldout(s
->cct
, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe
<< dendl
;
891 http_ret
= rgw_bucket_sync_status(this, store
, pipe
, info
, nullptr, &status
);
894 lderr(s
->cct
) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe
<< " returned ret=" << http_ret
<< dendl
;
// Merged status across every local destination of the source bucket.
899 rgw_zone_id
source_zone_id(source_zone
);
901 RGWBucketSyncPolicyHandlerRef source_handler
;
902 http_ret
= store
->ctl()->bucket
->get_sync_policy_handler(source_zone_id
, source_bucket
, &source_handler
, null_yield
);
904 lderr(s
->cct
) << "could not get bucket sync policy handler (r=" << http_ret
<< ")" << dendl
;
908 auto local_dests
= source_handler
->get_all_dests_in_zone(local_zone_id
);
910 std::vector
<rgw_bucket_shard_sync_info
> current_status
;
911 for (auto& entry
: local_dests
) {
912 auto pipe
= entry
.second
;
914 ldout(s
->cct
, 20) << "RGWOp_BILog_Status::execute(): getting sync status for pipe=" << pipe
<< dendl
;
916 RGWBucketInfo
*pinfo
= &info
;
917 std::optional
<RGWBucketInfo
> opt_dest_info
;
// NOTE(review): this message is labelled ERROR/BUG but is logged at level 20;
// consider a lower (more visible) level.
919 if (!pipe
.dest
.bucket
) {
920 /* Uh oh, something went wrong */
921 ldout(s
->cct
, 20) << "ERROR: RGWOp_BILog_Status::execute(): BUG: pipe.dest.bucket was not initialized" << pipe
<< dendl
;
// Destination differs from the requested bucket: load its own bucket info.
926 if (*pipe
.dest
.bucket
!= info
.bucket
) {
927 opt_dest_info
.emplace();
928 pinfo
= &(*opt_dest_info
);
930 /* dest bucket might not have a bucket id */
931 http_ret
= store
->ctl()->bucket
->read_bucket_info(*pipe
.dest
.bucket
,
934 RGWBucketCtl::BucketInstance::GetParams(),
// NOTE(review): the log text below reads "(bucket=: " -- the bucket value
// appears to be missing from the message and the parenthesis is never closed.
937 ldpp_dout(s
, 4) << "failed to read target bucket info (bucket=: " << cpp_strerror(http_ret
) << dendl
;
941 pipe
.dest
.bucket
= pinfo
->bucket
;
// NOTE(review): the last argument below reads "¤t_status", which looks like a
// mis-decoded "&current_status" (HTML entity "&curren" rendered as the
// currency sign). Restore the original token before building.
944 int r
= rgw_bucket_sync_status(this, store
, pipe
, *pinfo
, &info
, ¤t_status
);
946 lderr(s
->cct
) << "ERROR: rgw_bucket_sync_status() on pipe=" << pipe
<< " returned ret=" << r
<< dendl
;
// First destination seen: adopt its status wholesale.
951 if (status
.empty()) {
952 status
= std::move(current_status
);
954 if (current_status
.size() !=
957 lderr(s
->cct
) << "ERROR: different number of shards for sync status of buckets syncing from the same source: status.size()= " << status
.size() << " current_status.size()=" << current_status
.size() << dendl
;
// Walk both vectors in lock step, keeping the entry with the smaller marker.
960 auto m
= status
.begin();
961 for (auto& cur_shard_status
: current_status
) {
962 auto& result_shard_status
= *m
++;
963 // always take the first marker, or any later marker that's smaller
964 if (cur_shard_status
.inc_marker
.position
< result_shard_status
.inc_marker
.position
) {
965 result_shard_status
= std::move(cur_shard_status
);
// Emit the bucket sync status: sets the request error state from http_ret and
// JSON-encodes the `status` vector under the key "status".
972 void RGWOp_BILog_Status::send_response()
974 set_req_state_err(s
, http_ret
);
979 encode_json("status", status
, s
->formatter
);
// REST op returning the data sync status for a source zone.
// Requires the "datalog" capability with read permission, checked against the
// requesting user's caps. The status is held in `status` and filled by
// execute(); the op reports itself as "get_data_changes_log_status".
984 // not in header to avoid pulling in rgw_data_sync.h
985 class RGWOp_DATALog_Status
: public RGWRESTOp
{
986 rgw_data_sync_status status
;
988 int check_caps(const RGWUserCaps
& caps
) override
{
989 return caps
.check_cap("datalog", RGW_CAP_READ
);
991 int verify_permission() override
{
992 return check_caps(s
->user
->get_caps());
994 void execute() override
;
995 void send_response() override
;
996 const char* name() const override
{ return "get_data_changes_log_status"; }
// Read the data sync status for the zone named by the "source-zone" request
// argument, through the store's data sync manager for that zone.
// A missing manager (nullptr) is logged at level 1; the error code set in
// that case is not visible here. Otherwise the result of
// read_sync_status(&status) becomes http_ret.
999 void RGWOp_DATALog_Status::execute()
1001 const auto source_zone
= s
->info
.args
.get("source-zone");
1002 auto sync
= store
->getRados()->get_data_sync_manager(source_zone
);
1003 if (sync
== nullptr) {
1004 ldout(s
->cct
, 1) << "no sync manager for source-zone " << source_zone
<< dendl
;
1008 http_ret
= sync
->read_sync_status(&status
);
// Emit the data sync status: sets the request error state from http_ret and,
// only when http_ret is non-negative, JSON-encodes `status` under the key
// "status".
1011 void RGWOp_DATALog_Status::send_response()
1013 set_req_state_err(s
, http_ret
);
1017 if (http_ret
>= 0) {
1018 encode_json("status", status
, s
->formatter
);
// Select the op object for a GET request on the log endpoint.
//
// Dispatch is driven by the "type" request argument and the presence of
// further arguments:
//   type=metadata:     "id" present -> ShardInfo if "info" is present, else List;
//                      else "status" -> Status; otherwise Info.
//   type=bucket-index: "info" -> Info; "status" -> Status; otherwise List.
//   type=data:         same layout as metadata, using the DATALog ops.
// The returned op is allocated with `new`; ownership passes to the caller.
// NOTE(review): behavior for a missing or unrecognized "type" is not visible
// here -- confirm.
1024 RGWOp
*RGWHandler_Log::op_get() {
1026 string type
= s
->info
.args
.get("type", &exists
);
1032 if (type
.compare("metadata") == 0) {
1033 if (s
->info
.args
.exists("id")) {
1034 if (s
->info
.args
.exists("info")) {
1035 return new RGWOp_MDLog_ShardInfo
;
1037 return new RGWOp_MDLog_List
;
1039 } else if (s
->info
.args
.exists("status")) {
1040 return new RGWOp_MDLog_Status
;
1042 return new RGWOp_MDLog_Info
;
1044 } else if (type
.compare("bucket-index") == 0) {
1045 if (s
->info
.args
.exists("info")) {
1046 return new RGWOp_BILog_Info
;
1047 } else if (s
->info
.args
.exists("status")) {
1048 return new RGWOp_BILog_Status
;
1050 return new RGWOp_BILog_List
;
1052 } else if (type
.compare("data") == 0) {
1053 if (s
->info
.args
.exists("id")) {
1054 if (s
->info
.args
.exists("info")) {
1055 return new RGWOp_DATALog_ShardInfo
;
1057 return new RGWOp_DATALog_List
;
1059 } else if (s
->info
.args
.exists("status")) {
1060 return new RGWOp_DATALog_Status
;
1062 return new RGWOp_DATALog_Info
;
// Select the op object for a DELETE request on the log endpoint, based on the
// "type" request argument: "metadata" -> RGWOp_MDLog_Delete,
// "bucket-index" -> RGWOp_BILog_Delete, "data" -> RGWOp_DATALog_Delete.
// The returned op is allocated with `new`; ownership passes to the caller.
// NOTE(review): behavior for a missing or unrecognized "type" is not visible
// here -- confirm.
1068 RGWOp
*RGWHandler_Log::op_delete() {
1070 string type
= s
->info
.args
.get("type", &exists
);
1076 if (type
.compare("metadata") == 0)
1077 return new RGWOp_MDLog_Delete
;
1078 else if (type
.compare("bucket-index") == 0)
1079 return new RGWOp_BILog_Delete
;
1080 else if (type
.compare("data") == 0)
1081 return new RGWOp_DATALog_Delete
;
// Select the op object for a POST request on the log endpoint.
//   type=metadata: "lock" -> RGWOp_MDLog_Lock, "unlock" -> RGWOp_MDLog_Unlock,
//                  "notify" -> RGWOp_MDLog_Notify.
//   type=data:     "notify" -> RGWOp_DATALog_Notify.
// The returned op is allocated with `new`; ownership passes to the caller.
// NOTE(review): the fall-through result for unmatched requests is not visible
// here -- confirm.
1085 RGWOp
*RGWHandler_Log::op_post() {
1087 string type
= s
->info
.args
.get("type", &exists
);
1093 if (type
.compare("metadata") == 0) {
1094 if (s
->info
.args
.exists("lock"))
1095 return new RGWOp_MDLog_Lock
;
1096 else if (s
->info
.args
.exists("unlock"))
1097 return new RGWOp_MDLog_Unlock
;
1098 else if (s
->info
.args
.exists("notify"))
1099 return new RGWOp_MDLog_Notify
;
1100 } else if (type
.compare("data") == 0) {
1101 if (s
->info
.args
.exists("notify"))
1102 return new RGWOp_DATALog_Notify
;