// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2020 Red Hat, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <fmt/format.h>
#include "common/errno.h"
#include "rgw_sync_checkpoint.h"
#include "rgw_sal_rados.h"
#include "rgw_bucket_sync.h"
#include "rgw_data_sync.h"
#include "rgw_http_errors.h"
#include "cls/rgw/cls_rgw_client.h"
#include "services/svc_sys_obj.h"
#include "services/svc_zone.h"
#include "rgw_zone.h"

#define dout_subsys ceph_subsys_rgw

namespace {

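// return the shard's incremental sync position, or an empty string if the
// shard has not reached incremental sync yet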
std::string incremental_marker(const rgw_bucket_shard_sync_info& info)
{
  if (info.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
    return "";
  }
  return BucketIndexShardsManager::get_shard_marker(info.inc_marker.position);
}

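// return true while any local shard's incremental marker still compares less
// than the corresponding remote marker, i.e. the checkpoint has not yet been
// reached on every shard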
bool operator<(const std::vector<rgw_bucket_shard_sync_info>& lhs,
               const BucketIndexShardsManager& rhs)
{
  for (size_t i = 0; i < lhs.size(); ++i) {
    const auto& l = incremental_marker(lhs[i]);
    const auto& r = rhs.get(i, "");
    if (l < r) {
      return true;
    }
  }
  return false;
}

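// return true if the remote markers for all 'size' shards are empty, meaning
// the source bucket's index log has nothing to sync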
bool empty(const BucketIndexShardsManager& markers, int size)
{
  for (int i = 0; i < size; ++i) {
    const auto& m = markers.get(i, "");
    if (!m.empty()) {
      return false;
    }
  }
  return true;
}

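// pretty-print the local shard status and remote markers for the log
// messages in bucket_source_sync_checkpoint() below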
std::ostream& operator<<(std::ostream& out, const std::vector<rgw_bucket_shard_sync_info>& rhs)
{
  const char* separator = ""; // first entry has no comma
  out << '[';
  for (auto& i : rhs) {
    out << std::exchange(separator, ", ") << incremental_marker(i);
  }
  return out << ']';
}

std::ostream& operator<<(std::ostream& out, const BucketIndexShardsManager& rhs)
{
  out << '[';
  const char* separator = ""; // first entry has no comma
  for (auto& [i, marker] : rhs.get()) {
    out << std::exchange(separator, ", ") << marker;
  }
  return out << ']';
}

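// poll the local sync status for this source pipe until every shard's
// incremental marker has caught up to the remote bilog markers captured for
// the checkpoint, sleeping 'retry_delay' between polls and returning
// -ETIMEDOUT once 'timeout_at' is reached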
int bucket_source_sync_checkpoint(const DoutPrefixProvider* dpp,
                                  rgw::sal::RGWRadosStore *store,
                                  const RGWBucketInfo& bucket_info,
                                  const RGWBucketInfo& source_bucket_info,
                                  const rgw_sync_bucket_pipe& pipe,
                                  const BucketIndexShardsManager& remote_markers,
                                  ceph::timespan retry_delay,
                                  ceph::coarse_mono_time timeout_at)
{
  const auto num_shards = source_bucket_info.layout.current_index.layout.normal.num_shards;

  if (empty(remote_markers, num_shards)) {
    ldpp_dout(dpp, 1) << "bucket sync caught up with empty source" << dendl;
    return 0;
  }

  std::vector<rgw_bucket_shard_sync_info> status;
  status.resize(std::max<size_t>(1, num_shards));
  int r = rgw_bucket_sync_status(dpp, store, pipe, bucket_info,
                                 &source_bucket_info, &status);
  if (r < 0) {
    return r;
  }

  while (status < remote_markers) {
    auto delay_until = ceph::coarse_mono_clock::now() + retry_delay;
    if (delay_until > timeout_at) {
      ldpp_dout(dpp, 0) << "bucket checkpoint timed out waiting for incremental sync to catch up" << dendl;
      return -ETIMEDOUT;
    }
    ldpp_dout(dpp, 1) << "waiting for incremental sync to catch up:\n"
        << " local status: " << status << '\n'
        << " remote markers: " << remote_markers << dendl;
    std::this_thread::sleep_until(delay_until);
    r = rgw_bucket_sync_status(dpp, store, pipe, bucket_info, &source_bucket_info, &status);
    if (r < 0) {
      return r;
    }
  }
  ldpp_dout(dpp, 1) << "bucket sync caught up with source:\n"
      << " local status: " << status << '\n'
      << " remote markers: " << remote_markers << dendl;
  return 0;
}

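// look up the REST connection for the pipe's source zone and read that
// zone's bucket index log markers for each shard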
int source_bilog_markers(const DoutPrefixProvider *dpp,
                         RGWSI_Zone* zone_svc,
                         const rgw_sync_bucket_pipe& pipe,
                         BucketIndexShardsManager& remote_markers,
                         optional_yield y)
{
  ceph_assert(pipe.source.zone);

  auto& zone_conn_map = zone_svc->get_zone_conn_map();
  auto conn = zone_conn_map.find(pipe.source.zone->id);
  if (conn == zone_conn_map.end()) {
    return -EINVAL;
  }

  return rgw_read_remote_bilog_info(dpp, conn->second, *pipe.source.bucket,
                                    remote_markers, y);
}

} // anonymous namespace

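// Checkpoint bucket sync against every matching source: collect the sync
// sources from the policy handler (optionally filtered by source zone and
// bucket), fetch each source's remote bilog markers and bucket instance info
// concurrently on an io_context, then wait on each source in turn until its
// incremental sync catches up or the deadline expires.
//
// Call sketch (hypothetical caller, for illustration only; the variables
// `dpp`, `store`, `policy_handler`, and `bucket_info` are assumed to exist
// at the call site and are not defined in this file):
//
//   auto timeout_at = ceph::coarse_mono_clock::now() + std::chrono::minutes(1);
//   int r = rgw_bucket_sync_checkpoint(dpp, store, *policy_handler, bucket_info,
//                                      std::nullopt, std::nullopt,
//                                      std::chrono::seconds(1), timeout_at);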
int rgw_bucket_sync_checkpoint(const DoutPrefixProvider* dpp,
                               rgw::sal::RGWRadosStore *store,
                               const RGWBucketSyncPolicyHandler& policy,
                               const RGWBucketInfo& info,
                               std::optional<rgw_zone_id> opt_source_zone,
                               std::optional<rgw_bucket> opt_source_bucket,
                               ceph::timespan retry_delay,
                               ceph::coarse_mono_time timeout_at)
{
  struct sync_source_entry {
    rgw_sync_bucket_pipe pipe;
    BucketIndexShardsManager remote_markers;
    RGWBucketInfo source_bucket_info;
  };
  std::list<sync_source_entry> sources;

  // fetch remote markers and bucket info in parallel
  boost::asio::io_context ioctx;

  for (const auto& [source_zone_id, pipe] : policy.get_all_sources()) {
    // filter by source zone/bucket
    if (opt_source_zone && *opt_source_zone != *pipe.source.zone) {
      continue;
    }
    if (opt_source_bucket && !opt_source_bucket->match(*pipe.source.bucket)) {
      continue;
    }
    auto& entry = sources.emplace_back();
    entry.pipe = pipe;

    // fetch remote markers
    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
      auto y = optional_yield{ioctx, yield};
      int r = source_bilog_markers(dpp, store->svc()->zone, entry.pipe,
                                   entry.remote_markers, y);
      if (r < 0) {
        ldpp_dout(dpp, 0) << "failed to fetch remote bilog markers: "
            << cpp_strerror(r) << dendl;
        throw std::system_error(-r, std::system_category());
      }
    });
    // fetch source bucket info
    spawn::spawn(ioctx, [&] (spawn::yield_context yield) {
      auto y = optional_yield{ioctx, yield};
      auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
      int r = store->getRados()->get_bucket_instance_info(
          obj_ctx, *entry.pipe.source.bucket, entry.source_bucket_info,
          nullptr, nullptr, y, dpp);
      if (r < 0) {
        ldpp_dout(dpp, 0) << "failed to read source bucket info: "
            << cpp_strerror(r) << dendl;
        throw std::system_error(-r, std::system_category());
      }
    });
  }

  try {
    ioctx.run();
  } catch (const std::system_error& e) {
    return -e.code().value();
  }

  // checkpoint each source sequentially
  for (const auto& [pipe, remote_markers, source_bucket_info] : sources) {
    int r = bucket_source_sync_checkpoint(dpp, store, info, source_bucket_info,
                                          pipe, remote_markers,
                                          retry_delay, timeout_at);
    if (r < 0) {
      ldpp_dout(dpp, 0) << "bucket sync checkpoint failed: " << cpp_strerror(r) << dendl;
      return r;
    }
  }
  ldpp_dout(dpp, 0) << "bucket checkpoint complete" << dendl;
  return 0;
}