// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2013 eNovance SAS <licensing@enovance.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
#include "common/ceph_json.h"
#include "common/errno.h"
#include "common/strtol.h"
#include "include/ceph_assert.h"

#include "rgw_rest_s3.h"
#include "rgw_rest_log.h"
#include "rgw_client_io.h"
#include "rgw_data_sync.h"
#include "rgw_common.h"
#include "rgw_mdlog.h"
#include "rgw_datalog_notify.h"
#include "rgw_trim_bilog.h"

#include "services/svc_zone.h"
#include "services/svc_mdlog.h"
#include "services/svc_bilog_rados.h"
#define dout_context g_ceph_context

// Upper bound on the number of log entries returned by a single list request.
#define LOG_CLASS_LIST_MAX_ENTRIES (1000)

#define dout_subsys ceph_subsys_rgw
44 void RGWOp_MDLog_List::execute(optional_yield y
) {
45 string period
= s
->info
.args
.get("period");
46 string shard
= s
->info
.args
.get("id");
47 string max_entries_str
= s
->info
.args
.get("max-entries");
48 string marker
= s
->info
.args
.get("marker"),
51 unsigned shard_id
, max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
53 if (s
->info
.args
.exists("start-time") ||
54 s
->info
.args
.exists("end-time")) {
55 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl
;
60 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
62 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
67 if (!max_entries_str
.empty()) {
68 max_entries
= (unsigned)strict_strtol(max_entries_str
.c_str(), 10, &err
);
70 ldpp_dout(this, 5) << "Error parsing max-entries " << max_entries_str
<< dendl
;
74 if (max_entries
> LOG_CLASS_LIST_MAX_ENTRIES
) {
75 max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
80 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
81 period
= driver
->get_zone()->get_current_period_id();
83 ldpp_dout(this, 5) << "Missing period id" << dendl
;
89 RGWMetadataLog meta_log
{s
->cct
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->zone
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->cls
, period
};
91 meta_log
.init_list_entries(shard_id
, {}, {}, marker
, &handle
);
93 op_ret
= meta_log
.list_entries(this, handle
, max_entries
, entries
,
94 &last_marker
, &truncated
);
96 meta_log
.complete_list_entries(handle
);
99 void RGWOp_MDLog_List::send_response() {
100 set_req_state_err(s
, op_ret
);
107 s
->formatter
->open_object_section("log_entries");
108 s
->formatter
->dump_string("marker", last_marker
);
109 s
->formatter
->dump_bool("truncated", truncated
);
111 s
->formatter
->open_array_section("entries");
112 for (list
<cls_log_entry
>::iterator iter
= entries
.begin();
113 iter
!= entries
.end(); ++iter
) {
114 cls_log_entry
& entry
= *iter
;
115 static_cast<rgw::sal::RadosStore
*>(driver
)->ctl()->meta
.mgr
->dump_log_entry(entry
, s
->formatter
);
118 s
->formatter
->close_section();
120 s
->formatter
->close_section();
124 void RGWOp_MDLog_Info::execute(optional_yield y
) {
125 num_objects
= s
->cct
->_conf
->rgw_md_log_max_shards
;
126 period
= static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->mdlog
->read_oldest_log_period(y
, s
);
127 op_ret
= period
.get_error();
130 void RGWOp_MDLog_Info::send_response() {
131 set_req_state_err(s
, op_ret
);
135 s
->formatter
->open_object_section("mdlog");
136 s
->formatter
->dump_unsigned("num_objects", num_objects
);
138 s
->formatter
->dump_string("period", period
.get_period().get_id());
139 s
->formatter
->dump_unsigned("realm_epoch", period
.get_epoch());
141 s
->formatter
->close_section();
145 void RGWOp_MDLog_ShardInfo::execute(optional_yield y
) {
146 string period
= s
->info
.args
.get("period");
147 string shard
= s
->info
.args
.get("id");
150 unsigned shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
152 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
157 if (period
.empty()) {
158 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
159 period
= driver
->get_zone()->get_current_period_id();
161 if (period
.empty()) {
162 ldpp_dout(this, 5) << "Missing period id" << dendl
;
167 RGWMetadataLog meta_log
{s
->cct
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->zone
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->cls
, period
};
169 op_ret
= meta_log
.get_info(this, shard_id
, &info
);
172 void RGWOp_MDLog_ShardInfo::send_response() {
173 set_req_state_err(s
, op_ret
);
177 encode_json("info", info
, s
->formatter
);
181 void RGWOp_MDLog_Delete::execute(optional_yield y
) {
182 string marker
= s
->info
.args
.get("marker"),
183 period
= s
->info
.args
.get("period"),
184 shard
= s
->info
.args
.get("id"),
189 if (s
->info
.args
.exists("start-time") ||
190 s
->info
.args
.exists("end-time")) {
191 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl
;
195 if (s
->info
.args
.exists("start-marker")) {
196 ldpp_dout(this, 5) << "start-marker is no longer accepted" << dendl
;
200 if (s
->info
.args
.exists("end-marker")) {
201 if (!s
->info
.args
.exists("marker")) {
202 marker
= s
->info
.args
.get("end-marker");
204 ldpp_dout(this, 5) << "end-marker and marker cannot both be provided" << dendl
;
211 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
213 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
218 if (marker
.empty()) { /* bounding end */
223 if (period
.empty()) {
224 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
225 period
= driver
->get_zone()->get_current_period_id();
227 if (period
.empty()) {
228 ldpp_dout(this, 5) << "Missing period id" << dendl
;
233 RGWMetadataLog meta_log
{s
->cct
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->zone
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->cls
, period
};
235 op_ret
= meta_log
.trim(this, shard_id
, {}, {}, {}, marker
);
238 void RGWOp_MDLog_Lock::execute(optional_yield y
) {
239 string period
, shard_id_str
, duration_str
, locker_id
, zone_id
;
244 period
= s
->info
.args
.get("period");
245 shard_id_str
= s
->info
.args
.get("id");
246 duration_str
= s
->info
.args
.get("length");
247 locker_id
= s
->info
.args
.get("locker-id");
248 zone_id
= s
->info
.args
.get("zone-id");
250 if (period
.empty()) {
251 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
252 period
= driver
->get_zone()->get_current_period_id();
255 if (period
.empty() ||
256 shard_id_str
.empty() ||
257 (duration_str
.empty()) ||
260 ldpp_dout(this, 5) << "Error invalid parameter list" << dendl
;
266 shard_id
= (unsigned)strict_strtol(shard_id_str
.c_str(), 10, &err
);
268 ldpp_dout(this, 5) << "Error parsing shard_id param " << shard_id_str
<< dendl
;
273 RGWMetadataLog meta_log
{s
->cct
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->zone
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->cls
, period
};
275 dur
= (unsigned)strict_strtol(duration_str
.c_str(), 10, &err
);
276 if (!err
.empty() || dur
<= 0) {
277 ldpp_dout(this, 5) << "invalid length param " << duration_str
<< dendl
;
281 op_ret
= meta_log
.lock_exclusive(s
, shard_id
, make_timespan(dur
), zone_id
,
283 if (op_ret
== -EBUSY
)
284 op_ret
= -ERR_LOCKED
;
287 void RGWOp_MDLog_Unlock::execute(optional_yield y
) {
288 string period
, shard_id_str
, locker_id
, zone_id
;
293 period
= s
->info
.args
.get("period");
294 shard_id_str
= s
->info
.args
.get("id");
295 locker_id
= s
->info
.args
.get("locker-id");
296 zone_id
= s
->info
.args
.get("zone-id");
298 if (period
.empty()) {
299 ldpp_dout(this, 5) << "Missing period id trying to use current" << dendl
;
300 period
= driver
->get_zone()->get_current_period_id();
303 if (period
.empty() ||
304 shard_id_str
.empty() ||
307 ldpp_dout(this, 5) << "Error invalid parameter list" << dendl
;
313 shard_id
= (unsigned)strict_strtol(shard_id_str
.c_str(), 10, &err
);
315 ldpp_dout(this, 5) << "Error parsing shard_id param " << shard_id_str
<< dendl
;
320 RGWMetadataLog meta_log
{s
->cct
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->zone
, static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->cls
, period
};
321 op_ret
= meta_log
.unlock(s
, shard_id
, zone_id
, locker_id
);
324 void RGWOp_MDLog_Notify::execute(optional_yield y
) {
325 #define LARGE_ENOUGH_BUF (128 * 1024)
329 std::tie(r
, data
) = read_all_input(s
, LARGE_ENOUGH_BUF
);
335 char* buf
= data
.c_str();
336 ldpp_dout(this, 20) << __func__
<< "(): read data: " << buf
<< dendl
;
339 r
= p
.parse(buf
, data
.length());
341 ldpp_dout(this, 0) << "ERROR: failed to parse JSON" << dendl
;
346 set
<int> updated_shards
;
348 decode_json_obj(updated_shards
, &p
);
349 } catch (JSONDecoder::err
& err
) {
350 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl
;
355 if (driver
->ctx()->_conf
->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
356 for (set
<int>::iterator iter
= updated_shards
.begin(); iter
!= updated_shards
.end(); ++iter
) {
357 ldpp_dout(this, 20) << __func__
<< "(): updated shard=" << *iter
<< dendl
;
361 driver
->wakeup_meta_sync_shards(updated_shards
);
366 void RGWOp_BILog_List::execute(optional_yield y
) {
367 bool gen_specified
= false;
368 string tenant_name
= s
->info
.args
.get("tenant"),
369 bucket_name
= s
->info
.args
.get("bucket"),
370 marker
= s
->info
.args
.get("marker"),
371 max_entries_str
= s
->info
.args
.get("max-entries"),
372 bucket_instance
= s
->info
.args
.get("bucket-instance"),
373 gen_str
= s
->info
.args
.get("generation", &gen_specified
),
374 format_version_str
= s
->info
.args
.get("format-ver");
375 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
376 rgw_bucket
b(rgw_bucket_key(tenant_name
, bucket_name
));
378 unsigned max_entries
;
380 if (bucket_name
.empty() && bucket_instance
.empty()) {
381 ldpp_dout(this, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl
;
387 std::optional
<uint64_t> gen
;
389 gen
= strict_strtoll(gen_str
.c_str(), 10, &err
);
391 ldpp_dout(s
, 5) << "Error parsing generation param " << gen_str
<< dendl
;
397 if (!format_version_str
.empty()) {
398 format_ver
= strict_strtoll(format_version_str
.c_str(), 10, &err
);
400 ldpp_dout(s
, 5) << "Failed to parse format-ver param: " << format_ver
<< dendl
;
408 op_ret
= rgw_bucket_parse_bucket_instance(bucket_instance
, &bn
, &bucket_instance
, &shard_id
);
413 if (!bucket_instance
.empty()) {
415 b
.bucket_id
= bucket_instance
;
417 op_ret
= driver
->get_bucket(s
, nullptr, b
, &bucket
, y
);
419 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name
<< dendl
;
423 const auto& logs
= bucket
->get_info().layout
.logs
;
425 ldpp_dout(s
, 5) << "ERROR: bucket=" << bucket_name
<< " has no log layouts" << dendl
;
430 auto log
= std::prev(logs
.end());
432 log
= std::find_if(logs
.begin(), logs
.end(), rgw::matches_gen(*gen
));
433 if (log
== logs
.end()) {
434 ldpp_dout(s
, 5) << "ERROR: no log layout with gen=" << *gen
<< dendl
;
439 if (auto next
= std::next(log
); next
!= logs
.end()) {
440 next_log_layout
= *next
; // get the next log after the current latest
442 auto& log_layout
= *log
; // current log layout for log listing
447 max_entries
= (unsigned)strict_strtol(max_entries_str
.c_str(), 10, &err
);
449 max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
453 list
<rgw_bi_log_entry
> entries
;
454 int ret
= static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->bilog_rados
->log_list(s
, bucket
->get_info(), log_layout
, shard_id
,
455 marker
, max_entries
- count
,
456 entries
, &truncated
);
458 ldpp_dout(this, 5) << "ERROR: list_bi_log_entries()" << dendl
;
462 count
+= entries
.size();
464 send_response(entries
, marker
);
465 } while (truncated
&& count
< max_entries
);
470 void RGWOp_BILog_List::send_response() {
474 set_req_state_err(s
, op_ret
);
483 if (format_ver
>= 2) {
484 s
->formatter
->open_object_section("result");
487 s
->formatter
->open_array_section("entries");
490 void RGWOp_BILog_List::send_response(list
<rgw_bi_log_entry
>& entries
, string
& marker
)
492 for (list
<rgw_bi_log_entry
>::iterator iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
493 rgw_bi_log_entry
& entry
= *iter
;
494 encode_json("entry", entry
, s
->formatter
);
501 void RGWOp_BILog_List::send_response_end() {
502 s
->formatter
->close_section();
504 if (format_ver
>= 2) {
505 encode_json("truncated", truncated
, s
->formatter
);
507 if (next_log_layout
) {
508 s
->formatter
->open_object_section("next_log");
509 encode_json("generation", next_log_layout
->gen
, s
->formatter
);
510 encode_json("num_shards", rgw::num_shards(next_log_layout
->layout
.in_index
.layout
), s
->formatter
);
511 s
->formatter
->close_section(); // next_log
514 s
->formatter
->close_section(); // result
520 void RGWOp_BILog_Info::execute(optional_yield y
) {
521 string tenant_name
= s
->info
.args
.get("tenant"),
522 bucket_name
= s
->info
.args
.get("bucket"),
523 bucket_instance
= s
->info
.args
.get("bucket-instance");
524 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
525 rgw_bucket
b(rgw_bucket_key(tenant_name
, bucket_name
));
527 if (bucket_name
.empty() && bucket_instance
.empty()) {
528 ldpp_dout(this, 5) << "ERROR: neither bucket nor bucket instance specified" << dendl
;
535 op_ret
= rgw_bucket_parse_bucket_instance(bucket_instance
, &bn
, &bucket_instance
, &shard_id
);
540 if (!bucket_instance
.empty()) {
542 b
.bucket_id
= bucket_instance
;
544 op_ret
= driver
->get_bucket(s
, nullptr, b
, &bucket
, y
);
546 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name
<< dendl
;
550 const auto& logs
= bucket
->get_info().layout
.logs
;
552 ldpp_dout(s
, 5) << "ERROR: bucket=" << bucket_name
<< " has no log layouts" << dendl
;
557 map
<RGWObjCategory
, RGWStorageStats
> stats
;
558 const auto& index
= log_to_index_layout(logs
.back());
560 int ret
= bucket
->read_stats(s
, index
, shard_id
, &bucket_ver
, &master_ver
, stats
, &max_marker
, &syncstopped
);
561 if (ret
< 0 && ret
!= -ENOENT
) {
566 oldest_gen
= logs
.front().gen
;
567 latest_gen
= logs
.back().gen
;
569 for (auto& log
: logs
) {
570 uint32_t num_shards
= rgw::num_shards(log
.layout
.in_index
.layout
);
571 generations
.push_back({log
.gen
, num_shards
});
575 void RGWOp_BILog_Info::send_response() {
576 set_req_state_err(s
, op_ret
);
583 s
->formatter
->open_object_section("info");
584 encode_json("bucket_ver", bucket_ver
, s
->formatter
);
585 encode_json("master_ver", master_ver
, s
->formatter
);
586 encode_json("max_marker", max_marker
, s
->formatter
);
587 encode_json("syncstopped", syncstopped
, s
->formatter
);
588 encode_json("oldest_gen", oldest_gen
, s
->formatter
);
589 encode_json("latest_gen", latest_gen
, s
->formatter
);
590 encode_json("generations", generations
, s
->formatter
);
591 s
->formatter
->close_section();
596 void RGWOp_BILog_Delete::execute(optional_yield y
) {
597 bool gen_specified
= false;
598 string tenant_name
= s
->info
.args
.get("tenant"),
599 bucket_name
= s
->info
.args
.get("bucket"),
600 start_marker
= s
->info
.args
.get("start-marker"),
601 end_marker
= s
->info
.args
.get("end-marker"),
602 bucket_instance
= s
->info
.args
.get("bucket-instance"),
603 gen_str
= s
->info
.args
.get("generation", &gen_specified
);
605 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
606 rgw_bucket
b(rgw_bucket_key(tenant_name
, bucket_name
));
609 if ((bucket_name
.empty() && bucket_instance
.empty()) ||
610 end_marker
.empty()) {
611 ldpp_dout(this, 5) << "ERROR: one of bucket or bucket instance, and also end-marker is mandatory" << dendl
;
619 gen
= strict_strtoll(gen_str
.c_str(), 10, &err
);
621 ldpp_dout(s
, 5) << "Error parsing generation param " << gen_str
<< dendl
;
629 op_ret
= rgw_bucket_parse_bucket_instance(bucket_instance
, &bn
, &bucket_instance
, &shard_id
);
634 if (!bucket_instance
.empty()) {
636 b
.bucket_id
= bucket_instance
;
638 op_ret
= driver
->get_bucket(s
, nullptr, b
, &bucket
, y
);
640 ldpp_dout(this, 5) << "could not get bucket info for bucket=" << bucket_name
<< dendl
;
644 op_ret
= bilog_trim(this, static_cast<rgw::sal::RadosStore
*>(driver
),
645 bucket
->get_info(), gen
, shard_id
,
646 start_marker
, end_marker
);
648 ldpp_dout(s
, 5) << "bilog_trim failed with op_ret=" << op_ret
<< dendl
;
654 void RGWOp_DATALog_List::execute(optional_yield y
) {
655 string shard
= s
->info
.args
.get("id");
657 string max_entries_str
= s
->info
.args
.get("max-entries"),
658 marker
= s
->info
.args
.get("marker"),
660 unsigned shard_id
, max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
662 if (s
->info
.args
.exists("start-time") ||
663 s
->info
.args
.exists("end-time")) {
664 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl
;
668 s
->info
.args
.get_bool("extra-info", &extra_info
, false);
670 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
672 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
677 if (!max_entries_str
.empty()) {
678 max_entries
= (unsigned)strict_strtol(max_entries_str
.c_str(), 10, &err
);
680 ldpp_dout(this, 5) << "Error parsing max-entries " << max_entries_str
<< dendl
;
684 if (max_entries
> LOG_CLASS_LIST_MAX_ENTRIES
) {
685 max_entries
= LOG_CLASS_LIST_MAX_ENTRIES
;
689 // Note that last_marker is updated to be the marker of the last
691 op_ret
= static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->
692 datalog_rados
->list_entries(this, shard_id
, max_entries
, entries
,
693 marker
, &last_marker
, &truncated
, y
);
696 void RGWOp_DATALog_List::send_response() {
697 set_req_state_err(s
, op_ret
);
704 s
->formatter
->open_object_section("log_entries");
705 s
->formatter
->dump_string("marker", last_marker
);
706 s
->formatter
->dump_bool("truncated", truncated
);
708 s
->formatter
->open_array_section("entries");
709 for (const auto& entry
: entries
) {
711 encode_json("entry", entry
.entry
, s
->formatter
);
713 encode_json("entry", entry
, s
->formatter
);
717 s
->formatter
->close_section();
719 s
->formatter
->close_section();
724 void RGWOp_DATALog_Info::execute(optional_yield y
) {
725 num_objects
= s
->cct
->_conf
->rgw_data_log_num_shards
;
729 void RGWOp_DATALog_Info::send_response() {
730 set_req_state_err(s
, op_ret
);
734 s
->formatter
->open_object_section("num_objects");
735 s
->formatter
->dump_unsigned("num_objects", num_objects
);
736 s
->formatter
->close_section();
740 void RGWOp_DATALog_ShardInfo::execute(optional_yield y
) {
741 string shard
= s
->info
.args
.get("id");
744 unsigned shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
746 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
751 op_ret
= static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->
752 datalog_rados
->get_info(this, shard_id
, &info
, y
);
755 void RGWOp_DATALog_ShardInfo::send_response() {
756 set_req_state_err(s
, op_ret
);
760 encode_json("info", info
, s
->formatter
);
764 void RGWOp_DATALog_Notify::execute(optional_yield y
) {
765 string source_zone
= s
->info
.args
.get("source-zone");
766 #define LARGE_ENOUGH_BUF (128 * 1024)
770 std::tie(r
, data
) = read_all_input(s
, LARGE_ENOUGH_BUF
);
776 char* buf
= data
.c_str();
777 ldpp_dout(this, 20) << __func__
<< "(): read data: " << buf
<< dendl
;
780 r
= p
.parse(buf
, data
.length());
782 ldpp_dout(this, 0) << "ERROR: failed to parse JSON" << dendl
;
787 bc::flat_map
<int, bc::flat_set
<rgw_data_notify_entry
>> updated_shards
;
789 auto decoder
= rgw_data_notify_v1_decoder
{updated_shards
};
790 decode_json_obj(decoder
, &p
);
791 } catch (JSONDecoder::err
& err
) {
792 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl
;
797 if (driver
->ctx()->_conf
->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
798 for (bc::flat_map
<int, bc::flat_set
<rgw_data_notify_entry
> >::iterator iter
= updated_shards
.begin(); iter
!= updated_shards
.end(); ++iter
) {
799 ldpp_dout(this, 20) << __func__
<< "(): updated shard=" << iter
->first
<< dendl
;
800 bc::flat_set
<rgw_data_notify_entry
>& entries
= iter
->second
;
801 for (const auto& [key
, gen
] : entries
) {
802 ldpp_dout(this, 20) << __func__
<< "(): modified key=" << key
803 << " of gen=" << gen
<< dendl
;
808 driver
->wakeup_data_sync_shards(this, source_zone
, updated_shards
);
813 void RGWOp_DATALog_Notify2::execute(optional_yield y
) {
814 string source_zone
= s
->info
.args
.get("source-zone");
815 #define LARGE_ENOUGH_BUF (128 * 1024)
819 std::tie(r
, data
) = rgw_rest_read_all_input(s
, LARGE_ENOUGH_BUF
);
825 char* buf
= data
.c_str();
826 ldout(s
->cct
, 20) << __func__
<< "(): read data: " << buf
<< dendl
;
829 r
= p
.parse(buf
, data
.length());
831 ldout(s
->cct
, 0) << "ERROR: failed to parse JSON" << dendl
;
836 bc::flat_map
<int, bc::flat_set
<rgw_data_notify_entry
> > updated_shards
;
838 decode_json_obj(updated_shards
, &p
);
839 } catch (JSONDecoder::err
& err
) {
840 ldpp_dout(this, 0) << "ERROR: failed to decode JSON" << dendl
;
845 if (driver
->ctx()->_conf
->subsys
.should_gather
<ceph_subsys_rgw
, 20>()) {
846 for (bc::flat_map
<int, bc::flat_set
<rgw_data_notify_entry
> >::iterator iter
=
847 updated_shards
.begin(); iter
!= updated_shards
.end(); ++iter
) {
848 ldpp_dout(this, 20) << __func__
<< "(): updated shard=" << iter
->first
<< dendl
;
849 bc::flat_set
<rgw_data_notify_entry
>& entries
= iter
->second
;
850 for (const auto& [key
, gen
] : entries
) {
851 ldpp_dout(this, 20) << __func__
<< "(): modified key=" << key
<<
852 " of generation=" << gen
<< dendl
;
857 driver
->wakeup_data_sync_shards(this, source_zone
, updated_shards
);
862 void RGWOp_DATALog_Delete::execute(optional_yield y
) {
863 string marker
= s
->info
.args
.get("marker"),
864 shard
= s
->info
.args
.get("id"),
870 if (s
->info
.args
.exists("start-time") ||
871 s
->info
.args
.exists("end-time")) {
872 ldpp_dout(this, 5) << "start-time and end-time are no longer accepted" << dendl
;
876 if (s
->info
.args
.exists("start-marker")) {
877 ldpp_dout(this, 5) << "start-marker is no longer accepted" << dendl
;
881 if (s
->info
.args
.exists("end-marker")) {
882 if (!s
->info
.args
.exists("marker")) {
883 marker
= s
->info
.args
.get("end-marker");
885 ldpp_dout(this, 5) << "end-marker and marker cannot both be provided" << dendl
;
890 shard_id
= (unsigned)strict_strtol(shard
.c_str(), 10, &err
);
892 ldpp_dout(this, 5) << "Error parsing shard_id " << shard
<< dendl
;
896 if (marker
.empty()) { /* bounding end */
901 op_ret
= static_cast<rgw::sal::RadosStore
*>(driver
)->svc()->
902 datalog_rados
->trim_entries(this, shard_id
, marker
, y
);
905 // not in header to avoid pulling in rgw_sync.h
906 class RGWOp_MDLog_Status
: public RGWRESTOp
{
907 rgw_meta_sync_status status
;
909 int check_caps(const RGWUserCaps
& caps
) override
{
910 return caps
.check_cap("mdlog", RGW_CAP_READ
);
912 int verify_permission(optional_yield
) override
{
913 return check_caps(s
->user
->get_caps());
915 void execute(optional_yield y
) override
;
916 void send_response() override
;
917 const char* name() const override
{ return "get_metadata_log_status"; }
920 void RGWOp_MDLog_Status::execute(optional_yield y
)
922 auto sync
= static_cast<rgw::sal::RadosStore
*>(driver
)->getRados()->get_meta_sync_manager();
923 if (sync
== nullptr) {
924 ldpp_dout(this, 1) << "no sync manager" << dendl
;
928 op_ret
= sync
->read_sync_status(this, &status
);
931 void RGWOp_MDLog_Status::send_response()
933 set_req_state_err(s
, op_ret
);
938 encode_json("status", status
, s
->formatter
);
943 // not in header to avoid pulling in rgw_data_sync.h
944 class RGWOp_BILog_Status
: public RGWRESTOp
{
945 bilog_status_v2 status
;
948 int check_caps(const RGWUserCaps
& caps
) override
{
949 return caps
.check_cap("bilog", RGW_CAP_READ
);
951 int verify_permission(optional_yield y
) override
{
952 return check_caps(s
->user
->get_caps());
954 void execute(optional_yield y
) override
;
955 void send_response() override
;
956 const char* name() const override
{ return "get_bucket_index_log_status"; }
959 void RGWOp_BILog_Status::execute(optional_yield y
)
961 const auto options
= s
->info
.args
.get("options");
962 bool merge
= (options
== "merge");
963 const auto source_zone
= s
->info
.args
.get("source-zone");
964 const auto source_key
= s
->info
.args
.get("source-bucket");
965 auto key
= s
->info
.args
.get("bucket");
966 op_ret
= s
->info
.args
.get_int("version", &version
, 1);
972 ldpp_dout(this, 4) << "no 'bucket' provided" << dendl
;
978 int shard_id
{-1}; // unused
979 op_ret
= rgw_bucket_parse_bucket_key(s
->cct
, key
, &b
, &shard_id
);
981 ldpp_dout(this, 4) << "invalid 'bucket' provided" << dendl
;
986 // read the bucket instance info for num_shards
987 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
988 op_ret
= driver
->get_bucket(s
, nullptr, b
, &bucket
, y
);
990 ldpp_dout(this, 4) << "failed to read bucket info: " << cpp_strerror(op_ret
) << dendl
;
994 rgw_bucket source_bucket
;
996 if (source_key
.empty() ||
998 source_bucket
= bucket
->get_key();
1000 op_ret
= rgw_bucket_parse_bucket_key(s
->cct
, source_key
, &source_bucket
, nullptr);
1002 ldpp_dout(this, 4) << "invalid 'source-bucket' provided (key=" << source_key
<< ")" << dendl
;
1007 const auto& local_zone_id
= driver
->get_zone()->get_id();
1010 rgw_sync_bucket_pipe pipe
;
1011 pipe
.source
.zone
= source_zone
;
1012 pipe
.source
.bucket
= source_bucket
;
1013 pipe
.dest
.zone
= local_zone_id
;
1014 pipe
.dest
.bucket
= bucket
->get_key();
1016 ldpp_dout(this, 20) << "RGWOp_BILog_Status::execute(optional_yield y): getting sync status for pipe=" << pipe
<< dendl
;
1018 op_ret
= rgw_read_bucket_full_sync_status(
1020 static_cast<rgw::sal::RadosStore
*>(driver
),
1022 &status
.sync_status
,
1025 ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_full_sync_status() on pipe=" << pipe
<< " returned ret=" << op_ret
<< dendl
;
1028 status
.inc_status
.resize(status
.sync_status
.shards_done_with_gen
.size());
1030 op_ret
= rgw_read_bucket_inc_sync_status(
1032 static_cast<rgw::sal::RadosStore
*>(driver
),
1034 status
.sync_status
.incremental_gen
,
1035 &status
.inc_status
);
1037 ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_inc_sync_status() on pipe=" << pipe
<< " returned ret=" << op_ret
<< dendl
;
1042 rgw_zone_id
source_zone_id(source_zone
);
1044 RGWBucketSyncPolicyHandlerRef source_handler
;
1045 op_ret
= driver
->get_sync_policy_handler(s
, source_zone_id
, source_bucket
, &source_handler
, y
);
1047 ldpp_dout(this, -1) << "could not get bucket sync policy handler (r=" << op_ret
<< ")" << dendl
;
1051 auto local_dests
= source_handler
->get_all_dests_in_zone(local_zone_id
);
1053 std::vector
<rgw_bucket_shard_sync_info
> current_status
;
1054 for (auto& entry
: local_dests
) {
1055 auto pipe
= entry
.second
;
1057 ldpp_dout(this, 20) << "RGWOp_BILog_Status::execute(optional_yield y): getting sync status for pipe=" << pipe
<< dendl
;
1059 RGWBucketInfo
*pinfo
= &bucket
->get_info();
1060 std::optional
<RGWBucketInfo
> opt_dest_info
;
1062 if (!pipe
.dest
.bucket
) {
1063 /* Uh oh, something went wrong */
1064 ldpp_dout(this, 20) << "ERROR: RGWOp_BILog_Status::execute(optional_yield y): BUG: pipe.dest.bucket was not initialized" << pipe
<< dendl
;
1069 if (*pipe
.dest
.bucket
!= pinfo
->bucket
) {
1070 opt_dest_info
.emplace();
1071 std::unique_ptr
<rgw::sal::Bucket
> dest_bucket
;
1072 op_ret
= driver
->get_bucket(s
, nullptr, *pipe
.dest
.bucket
, &dest_bucket
, y
);
1074 ldpp_dout(this, 4) << "failed to read target bucket info (bucket=: " << cpp_strerror(op_ret
) << dendl
;
1078 *opt_dest_info
= dest_bucket
->get_info();
1079 pinfo
= &(*opt_dest_info
);
1080 pipe
.dest
.bucket
= pinfo
->bucket
;
1083 op_ret
= rgw_read_bucket_full_sync_status(
1085 static_cast<rgw::sal::RadosStore
*>(driver
),
1087 &status
.sync_status
,
1090 ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_full_sync_status() on pipe=" << pipe
<< " returned ret=" << op_ret
<< dendl
;
1094 current_status
.resize(status
.sync_status
.shards_done_with_gen
.size());
1095 int r
= rgw_read_bucket_inc_sync_status(this, static_cast<rgw::sal::RadosStore
*>(driver
),
1096 pipe
, status
.sync_status
.incremental_gen
, ¤t_status
);
1098 ldpp_dout(this, -1) << "ERROR: rgw_read_bucket_inc_sync_status() on pipe=" << pipe
<< " returned ret=" << r
<< dendl
;
1103 if (status
.inc_status
.empty()) {
1104 status
.inc_status
= std::move(current_status
);
1106 if (current_status
.size() != status
.inc_status
.size()) {
1108 ldpp_dout(this, -1) << "ERROR: different number of shards for sync status of buckets "
1109 "syncing from the same source: status.size()= "
1110 << status
.inc_status
.size()
1111 << " current_status.size()="
1112 << current_status
.size() << dendl
;
1115 auto m
= status
.inc_status
.begin();
1116 for (auto& cur_shard_status
: current_status
) {
1117 auto& result_shard_status
= *m
++;
1118 // always take the first marker, or any later marker that's smaller
1119 if (cur_shard_status
.inc_marker
.position
< result_shard_status
.inc_marker
.position
) {
1120 result_shard_status
= std::move(cur_shard_status
);
1127 void RGWOp_BILog_Status::send_response()
1129 set_req_state_err(s
, op_ret
);
1135 encode_json("status", status
.inc_status
, s
->formatter
);
1137 encode_json("status", status
, s
->formatter
);
1143 // not in header to avoid pulling in rgw_data_sync.h
1144 class RGWOp_DATALog_Status
: public RGWRESTOp
{
1145 rgw_data_sync_status status
;
1147 int check_caps(const RGWUserCaps
& caps
) override
{
1148 return caps
.check_cap("datalog", RGW_CAP_READ
);
1150 int verify_permission(optional_yield y
) override
{
1151 return check_caps(s
->user
->get_caps());
1153 void execute(optional_yield y
) override
;
1154 void send_response() override
;
1155 const char* name() const override
{ return "get_data_changes_log_status"; }
1158 void RGWOp_DATALog_Status::execute(optional_yield y
)
1160 const auto source_zone
= s
->info
.args
.get("source-zone");
1161 auto sync
= driver
->get_data_sync_manager(source_zone
);
1162 if (sync
== nullptr) {
1163 ldpp_dout(this, 1) << "no sync manager for source-zone " << source_zone
<< dendl
;
1167 op_ret
= sync
->read_sync_status(this, &status
);
1170 void RGWOp_DATALog_Status::send_response()
1172 set_req_state_err(s
, op_ret
);
1177 encode_json("status", status
, s
->formatter
);
1183 RGWOp
*RGWHandler_Log::op_get() {
1185 string type
= s
->info
.args
.get("type", &exists
);
1191 if (type
.compare("metadata") == 0) {
1192 if (s
->info
.args
.exists("id")) {
1193 if (s
->info
.args
.exists("info")) {
1194 return new RGWOp_MDLog_ShardInfo
;
1196 return new RGWOp_MDLog_List
;
1198 } else if (s
->info
.args
.exists("status")) {
1199 return new RGWOp_MDLog_Status
;
1201 return new RGWOp_MDLog_Info
;
1203 } else if (type
.compare("bucket-index") == 0) {
1204 if (s
->info
.args
.exists("info")) {
1205 return new RGWOp_BILog_Info
;
1206 } else if (s
->info
.args
.exists("status")) {
1207 return new RGWOp_BILog_Status
;
1209 return new RGWOp_BILog_List
;
1211 } else if (type
.compare("data") == 0) {
1212 if (s
->info
.args
.exists("id")) {
1213 if (s
->info
.args
.exists("info")) {
1214 return new RGWOp_DATALog_ShardInfo
;
1216 return new RGWOp_DATALog_List
;
1218 } else if (s
->info
.args
.exists("status")) {
1219 return new RGWOp_DATALog_Status
;
1221 return new RGWOp_DATALog_Info
;
1227 RGWOp
*RGWHandler_Log::op_delete() {
1229 string type
= s
->info
.args
.get("type", &exists
);
1235 if (type
.compare("metadata") == 0)
1236 return new RGWOp_MDLog_Delete
;
1237 else if (type
.compare("bucket-index") == 0)
1238 return new RGWOp_BILog_Delete
;
1239 else if (type
.compare("data") == 0)
1240 return new RGWOp_DATALog_Delete
;
1244 RGWOp
*RGWHandler_Log::op_post() {
1246 string type
= s
->info
.args
.get("type", &exists
);
1252 if (type
.compare("metadata") == 0) {
1253 if (s
->info
.args
.exists("lock"))
1254 return new RGWOp_MDLog_Lock
;
1255 else if (s
->info
.args
.exists("unlock"))
1256 return new RGWOp_MDLog_Unlock
;
1257 else if (s
->info
.args
.exists("notify"))
1258 return new RGWOp_MDLog_Notify
;
1259 } else if (type
.compare("data") == 0) {
1260 if (s
->info
.args
.exists("notify")) {
1261 return new RGWOp_DATALog_Notify
;
1262 } else if (s
->info
.args
.exists("notify2")) {
1263 return new RGWOp_DATALog_Notify2
;