// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

#include "common/ceph_json.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_cr_tools.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_bucket_sync.h"
#include "rgw_bucket_sync_cache.h"
#include "rgw_datalog.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_error_repo.h"
#include "rgw_sync_module.h"
#include "rgw_sal.h"

#include "cls/lock/cls_lock_client.h"
#include "cls/rgw/cls_rgw_client.h"

#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"

#include "include/common_fwd.h"
#include "include/random.h"

#include <boost/asio/yield.hpp>
#include <string_view>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

using namespace std;

static const string datalog_sync_status_oid_prefix = "datalog.sync-status";
static const string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static const string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static const string bucket_full_status_oid_prefix = "bucket.full-sync-status";
static const string bucket_status_oid_prefix = "bucket.sync-status";
static const string object_status_oid_prefix = "bucket.sync-status";

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

// print a bucket shard with [gen]
std::string to_string(const rgw_bucket_shard& bs, std::optional<uint64_t> gen)
{
  constexpr auto digits10 = std::numeric_limits<uint64_t>::digits10;
  constexpr auto reserve = 2 + digits10; // [value]
  auto str = bs.get_key('/', ':', ':', reserve);
  str.append(1, '[');
  str.append(std::to_string(gen.value_or(0)));
  str.append(1, ']');
  return str;
}

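// Illustrative only: given rgw_bucket_shard::get_key()'s usual
// "tenant/name:instance:shard" layout, shard 3 of a bucket "foo" with
// instance id "abc" at generation 7 prints as "foo:abc:3[7]"; a
// std::nullopt generation falls back to "[0]".
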
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;
  std::vector<RGWObjVersionTracker>& objvs;

  int handle_result(int r) override {
    if (r == -ENOENT) { // ENOENT is not a fatal error
      return 0;
    }
    if (r < 0) {
      ldout(cct, 4) << "failed to read data sync status: "
                    << cpp_strerror(r) << dendl;
    }
    return r;
  }
public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncCtx *sc, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers,
                                 std::vector<RGWObjVersionTracker>& objvs)
    : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS),
      sc(sc), env(sc->env), num_shards(num_shards), markers(markers), objvs(objvs)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->dpp, env->driver,
               rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
               &markers[shard_id], true, &objvs[shard_id]),
        false);
  shard_id++;
  return true;
}

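// A sketch of how this collector is driven (inferred from its use here;
// treat as illustrative, not RGWShardCollectCR's exact control loop):
//
//   // keep up to MAX_CONCURRENT_SHARDS child reads in flight
//   while (spawn_next()) { /* spawn() queued another shard read */ }
//   // each child's return code is filtered through handle_result(),
//   // where ENOENT is tolerated so a missing status object isn't fatal
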
class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;

  int handle_result(int r) override {
    if (r == -ENOENT) { // ENOENT is not a fatal error
      return 0;
    }
    if (r < 0) {
      ldout(cct, 4) << "failed to list recovering data sync: "
                    << cpp_strerror(r) << dendl;
    }
    return r;
  }
public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncCtx *sc, uint64_t _max_entries, int _num_shards,
                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
    : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS), sc(sc), env(sc->env),
      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id) + ".retry";
  auto& shard_keys = omapkeys[shard_id];
  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
  spawn(new RGWRadosGetOmapKeysCR(env->driver, rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, error_oid),
                                  marker, max_entries, shard_keys), false);

  ++shard_id;
  return true;
}

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;
  RGWObjVersionTracker* objv_tracker;
  std::vector<RGWObjVersionTracker>& objvs;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncCtx *_sc,
                                 rgw_data_sync_status *_status,
                                 RGWObjVersionTracker* objv_tracker,
                                 std::vector<RGWObjVersionTracker>& objvs)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env), sync_status(_status),
      objv_tracker(objv_tracker), objvs(objvs)
  {}
  int operate(const DoutPrefixProvider *dpp) override;
};

int RGWReadDataSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(dpp, sync_env->driver,
                          rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
                          &sync_status->sync_info, empty_on_enoent, objv_tracker));
    }
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status info with "
                        << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    objvs.resize(sync_status->sync_info.num_shards);
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sc, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers, objvs));
    if (retcode < 0) {
      ldpp_dout(dpp, 4) << "failed to read sync status markers with "
                        << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncCtx *_sc,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sc->cct),
      sc(_sc),
      sync_env(_sc->env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info, null_yield);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

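// The two-yield shape above is the standard pattern for REST reads in this
// file: the first yield issues the async request and parks the coroutine in
// io_block() until the HTTP manager signals completion; the second yield
// collects and JSON-decodes the response via wait().
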
struct read_remote_data_log_response {
  string marker;
  bool truncated;
  vector<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};

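// For reference, the decoder above expects a payload shaped roughly like
// the following (field values invented for illustration):
//
//   { "marker": "1_1570000000.000000_123.1",
//     "truncated": true,
//     "entries": [ { "key": "...", "timestamp": "...", ... }, ... ] }
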
class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op = nullptr;

  int shard_id;
  const std::string& marker;
  string *pnext_marker;
  vector<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;
  std::optional<TOPNSPC::common::PerfGuard> timer;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id,
                              const std::string& marker, string *pnext_marker,
                              vector<rgw_data_change_log_entry> *_entries,
                              bool *_truncated)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
      entries(_entries), truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "marker", marker.c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        if (sync_env->counters) {
          timer.emplace(sync_env->counters, sync_counters::l_poll);
        }
        int ret = http_op->aio_read(dpp);
        if (ret < 0) {
          ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          if (sync_env->counters) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        timer.reset();
        int ret = http_op->wait(&response, null_yield);
        if (ret < 0) {
          if (sync_env->counters && ret != -ENOENT) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pnext_marker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

  int handle_result(int r) override {
    if (r == -ENOENT) { // ENOENT is not a fatal error
      return 0;
    }
    if (r < 0) {
      ldout(cct, 4) << "failed to fetch remote datalog info: "
                    << cpp_strerror(r) << dendl;
    }
    return r;
  }
public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncCtx *_sc,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT),
      sc(_sc), sync_env(_sc->env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sc, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncCtx *sc, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(sc->cct), sc(sc), sync_env(sc->env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request(const DoutPrefixProvider *dpp) override {
    RGWRESTConn *conn = sc->conn;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read(dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result, null_yield);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

  int handle_result(int r) override {
    if (r == -ENOENT) { // ENOENT is not a fatal error
      return 0;
    }
    if (r < 0) {
      ldout(cct, 4) << "failed to list remote datalog: "
                    << cpp_strerror(r) << dendl;
    }
    return r;
  }
public:
  RGWListRemoteDataLogCR(RGWDataSyncCtx *_sc,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT),
      sc(_sc), sync_env(_sc->env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sc, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr auto lock_name{ "sync_lock"sv };
  RGWDataSyncCtx* const sc;
  RGWDataSyncEnv* const sync_env{ sc->env };
  const uint32_t num_shards;
  rgw_data_sync_status* const status;
  RGWSyncTraceNodeRef tn;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  RGWObjVersionTracker& objv_tracker;
  std::vector<RGWObjVersionTracker>& objvs;

  const rgw_pool& pool{ sync_env->svc->zone->get_zone_params().log_pool };
  const string sync_status_oid{
    RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) };

  map<int, RGWDataChangesLogInfo> shards_info;


public:
  RGWInitDataSyncStatusCoroutine(
    RGWDataSyncCtx* _sc, uint32_t num_shards, uint64_t instance_id,
    const RGWSyncTraceNodeRef& tn_parent, rgw_data_sync_status* status,
    boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr,
    RGWObjVersionTracker& objv_tracker,
    std::vector<RGWObjVersionTracker>& objvs)
    : RGWCoroutine(_sc->cct), sc(_sc), num_shards(num_shards), status(status),
      tn(sync_env->sync_tracer->add_node(tn_parent, "init_data_sync_status")),
      lease_cr(std::move(lease_cr)), objv_tracker(objv_tracker), objvs(objvs) {
    status->sync_info.instance_id = instance_id;
  }

  static auto continuous_lease_cr(RGWDataSyncCtx* const sc,
                                  RGWCoroutine* const caller) {
    auto lock_duration = sc->cct->_conf->rgw_sync_lease_period;
    return new RGWContinuousLeaseCR(
      sc->env->async_rados, sc->env->driver,
      { sc->env->svc->zone->get_zone_params().log_pool,
        RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) },
      string(lock_name), lock_duration, caller, &sc->lcc);
  }

  int operate(const DoutPrefixProvider *dpp) override {
    int ret;
    reenter(this) {
      if (!lease_cr->is_locked()) {
        drain_all();
        return set_cr_error(-ECANCELED);
      }

      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(dpp, sync_env->driver,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info, &objv_tracker));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }

      // In the original code we reacquired the lock. Since
      // RGWSimpleRadosWriteCR doesn't appear to touch the attributes
      // and cls_version works across it, this should be unnecessary.
      // Putting a note here just in case. If we see ECANCELED where
      // we expect EBUSY, we can revisit this.

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
        if (!conn) {
          tn->log(0, SSTR("ERROR: connection to zone " << sc->source_zone << " does not exist!"));
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sc, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        objvs.resize(num_shards);
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, i);
          auto& objv = objvs[i];
          objv.generate_new_write_ver(cct);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(dpp, sync_env->driver,
                                  rgw_raw_obj{pool, oid}, marker, &objv), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(dpp, sync_env->driver,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info, &objv_tracker));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

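// Initialization in short: with the "sync_lock" lease held, write the status
// header, snapshot each remote datalog shard's current position into that
// shard's starting marker, then flip the header to StateBuildingFullSyncMaps
// so a restart resumes from a well-defined step.
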
RGWRemoteDataLog::RGWRemoteDataLog(const DoutPrefixProvider *dpp,
                                   rgw::sal::RadosStore* driver,
                                   RGWAsyncRadosProcessor *async_rados)
  : RGWCoroutinesManager(driver->ctx(), driver->getRados()->get_cr_registry()),
    dpp(dpp), driver(driver),
    cct(driver->ctx()), cr_registry(driver->getRados()->get_cr_registry()),
    async_rados(async_rados),
    http_manager(driver->ctx(), completion_mgr),
    data_sync_cr(NULL),
    initialized(false)
{
}

int RGWRemoteDataLog::read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sc.conn->get_json_resource(dpp, "/admin/log", pairs, null_yield, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(const DoutPrefixProvider *dpp, map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(dpp, &log_info);
  if (ret < 0) {
    return ret;
  }

  return run(dpp, new RGWReadRemoteDataLogInfoCR(&sc, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(const DoutPrefixProvider *dpp, map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  return run(dpp, new RGWListRemoteDataLogCR(&sc, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
                           RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module,
                           PerfCounters* counters)
{
  sync_env.init(dpp, cct, driver, driver->svc(), async_rados, &http_manager, _error_logger,
                _sync_tracer, _sync_module, counters);
  sc.init(&sync_env, _conn, _source_zone);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWObjVersionTracker objv;
  std::vector<RGWObjVersionTracker> shard_objvs;
  RGWCoroutinesManager crs(cct, cr_registry);
  RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;

  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;

  ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status,
                                                        &objv, shard_objvs));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(cct, cr_registry);
  RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;

  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;

  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
  omapkeys.resize(num_shards);
  uint64_t max_entries{1};

  ret = crs.run(dpp, new RGWReadDataSyncRecoveringShardsCR(&sc_local, max_entries, num_shards, omapkeys));
  http_manager.stop();

  if (ret == 0) {
    for (int i = 0; i < num_shards; i++) {
      if (omapkeys[i]->entries.size() != 0) {
        recovering_shards.insert(i);
      }
    }
  }

  return ret;
}

namespace RGWRDL {
class DataSyncInitCR : public RGWCoroutine {
  RGWDataSyncCtx* const sc;
  const uint32_t num_shards;
  uint64_t instance_id;
  const RGWSyncTraceNodeRef& tn;
  rgw_data_sync_status* const sync_status;
  std::vector<RGWObjVersionTracker>& objvs;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;

  RGWObjVersionTracker objv_tracker;

public:

  DataSyncInitCR(RGWDataSyncCtx* sc, uint32_t num_shards, uint64_t instance_id,
                 const RGWSyncTraceNodeRef& tn,
                 rgw_data_sync_status* sync_status,
                 std::vector<RGWObjVersionTracker>& objvs)
    : RGWCoroutine(sc->cct), sc(sc), num_shards(num_shards),
      instance_id(instance_id), tn(tn),
      sync_status(sync_status), objvs(objvs) {}

  ~DataSyncInitCR() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      lease_cr.reset(
        RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));

      yield spawn(lease_cr.get(), false);
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          tn->log(5, "ERROR: failed to take data sync status lease");
          set_status("lease lock failed, early abort");
          drain_all();
          return set_cr_error(lease_cr->get_ret_status());
        }
        tn->log(5, "waiting on data sync status lease");
        yield set_sleeping(true);
      }
      tn->log(5, "acquired data sync status lease");
      objv_tracker.generate_new_write_ver(sc->cct);
      yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id,
                                                    tn, sync_status, lease_cr,
                                                    objv_tracker, objvs));
      lease_cr->go_down();
      lease_cr.reset();
      drain_all();
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
}

int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards)
{
  rgw_data_sync_status sync_status;
  std::vector<RGWObjVersionTracker> objvs;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(cct, cr_registry);
  RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  auto instance_id = ceph::util::generate_random_number<uint64_t>();
  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;
  ret = crs.run(dpp, new RGWRDL::DataSyncInitCR(&sc_local, num_shards,
                                                instance_id, tn, &sync_status, objvs));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const rgw_zone_id& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.id.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.id.c_str(), shard_id);
  return string(buf);
}

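// Example (zone id hypothetical): shard 5 for source zone "zone1" maps to
// the oid "data.full-sync.index.zone1.5" in the log pool.
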
struct read_metadata_list {
  string marker;
  bool truncated;
  list<string> keys;
  int count;

  read_metadata_list() : truncated(false), count(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("keys", keys, obj);
    JSONDecoder::decode_json("count", count, obj);
  }
};

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  const string instance_key;

  rgw_bucket_index_marker_info *info;

public:
  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
                                    const rgw_bucket& bucket,
                                    rgw_bucket_index_marker_info *_info)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      instance_key(bucket.get_key()), info(_info) {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
                                        { "bucket-instance", instance_key.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";
        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      return set_cr_done();
    }
    return 0;
  }
};


class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env = sc->env;

  rgw::sal::RadosStore* driver = sync_env->driver;

  rgw_data_sync_status *sync_status;
  std::vector<RGWObjVersionTracker>& objvs;

  int req_ret = 0;
  int ret = 0;

  list<string>::iterator iter;

  unique_ptr<RGWShardedOmapCRManager> entries_index;
  string oid_prefix =
    datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id;

  string path = "/admin/metadata/bucket.instance";
  bucket_instance_meta_info meta_info;
  string key;

  bool failed = false;
  bool truncated = false;
  read_metadata_list result;

public:
  RGWListBucketIndexesCR(RGWDataSyncCtx* sc,
                         rgw_data_sync_status* sync_status, std::vector<RGWObjVersionTracker>& objvs)
    : RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status), objvs(objvs) {}
  ~RGWListBucketIndexesCR() override { }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      entries_index = std::make_unique<RGWShardedOmapCRManager>(
        sync_env->async_rados, driver, this,
        cct->_conf->rgw_data_log_num_shards,
        sync_env->svc->zone->get_zone_params().log_pool,
        oid_prefix);
      yield; // yield so OmapAppendCRs can start

      do {
        yield {
          string entrypoint = "/admin/metadata/bucket.instance"s;

          rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
                                         {"marker", result.marker.c_str()},
                                         {NULL, NULL}};

          call(new RGWReadRESTResourceCR<read_metadata_list>(
                 sync_env->cct, sc->conn, sync_env->http_manager,
                 entrypoint, pairs, &result));
        }
        if (retcode < 0) {
          ldpp_dout(dpp, 0)
            << "ERROR: failed to fetch metadata for section bucket.instance"
            << dendl;
          return set_cr_error(retcode);
        }

        for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
          ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key="
                             << *iter << dendl;
          key = *iter;

          yield {
            rgw_http_param_pair pairs[] = {{"key", key.c_str()},
                                           {NULL, NULL}};

            call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(
                   sync_env->cct, sc->conn, sync_env->http_manager, path, pairs,
                   &meta_info));
          }
          if (retcode < 0) {
            ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for key: "
                              << key << dendl;
            return set_cr_error(retcode);
          }
          // Now that bucket full sync is bucket-wide instead of
          // per-shard, we only need to register a single shard of
          // each bucket to guarantee that sync will see everything
          // that happened before data full sync starts. This also
          // means we don't have to care about the bucket's current
          // shard count.
          yield entries_index->append(
            fmt::format("{}:{}", key, 0),
            sync_env->svc->datalog_rados->get_log_shard_id(
              meta_info.data.get_bucket_info().bucket, 0));
        }
        truncated = result.truncated;
      } while (truncated);

      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (auto iter = sync_status->sync_markers.begin();
             iter != sync_status->sync_markers.end();
             ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
                  dpp, sync_env->driver,
                  rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
                              RGWDataSyncStatusManager::shard_obj_name(
                                sc->source_zone, shard_id)),
                  marker, &objvs[shard_id]),
                true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(
                     dpp, sc->conn->get_remote_id(), "data.init", "",
                     EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(
                       dpp, sc->conn->get_remote_id(), "data.init", "",
                       -ret, string("failed to store sync status: ") +
                       cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

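// A window of 1 means the tracker offers a marker flush after every
// completed entry; RGWSyncShardMarkerTrack (as used here) still only
// persists the lowest position below which all entries have completed, so
// out-of-order completions can't skip past an entry that is still in flight.
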
class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  string marker_oid;
  rgw_data_sync_marker sync_marker;
  RGWSyncTraceNodeRef tn;
  RGWObjVersionTracker& objv;

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn, RGWObjVersionTracker& objv)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sc(_sc), sync_env(_sc->env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      tn(_tn), objv(objv) {}

  RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;
    sync_marker.timestamp = timestamp;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
                                                           rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker, &objv);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  explicit bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_str_noinstance {
  const rgw_bucket& b;
  explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<bucket_shard_str> : fmt::ostream_formatter {};
#endif

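// Illustrative output (names invented): for tenant "t", bucket "b", instance
// id "i" and shard 2, bucket_str prints "t/b:i", bucket_str_noinstance
// prints "t/b", and bucket_shard_str prints "t/b:i:2"; a negative shard_id
// (an unsharded bucket) drops the trailing ":2".
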
struct all_bucket_info {
  RGWBucketInfo bucket_info;
  map<string, bufferlist> attrs;
};

struct rgw_sync_pipe_info_entity
{
private:
  RGWBucketInfo bucket_info;
  map<string, bufferlist> bucket_attrs;
  bool _has_bucket_info{false};

public:
  rgw_zone_id zone;

  rgw_sync_pipe_info_entity() {}
  rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
                            std::optional<all_bucket_info>& binfo) {
    if (e.zone) {
      zone = *e.zone;
    }
    if (!e.bucket) {
      return;
    }
    if (!binfo ||
        binfo->bucket_info.bucket != *e.bucket) {
      bucket_info.bucket = *e.bucket;
    } else {
      set_bucket_info(*binfo);
    }
  }

  void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
    if (_has_bucket_info) {
      return;
    }
    if (bucket_info.bucket.name.empty()) {
      return;
    }

    auto iter = buckets_info.find(bucket_info.bucket);
    if (iter == buckets_info.end()) {
      return;
    }

    set_bucket_info(iter->second);
  }

  bool has_bucket_info() const {
    return _has_bucket_info;
  }

  void set_bucket_info(const all_bucket_info& all_info) {
    bucket_info = all_info.bucket_info;
    bucket_attrs = all_info.attrs;
    _has_bucket_info = true;
  }

  const RGWBucketInfo& get_bucket_info() const {
    return bucket_info;
  }

  const rgw_bucket& get_bucket() const {
    return bucket_info.bucket;
  }

  bool operator<(const rgw_sync_pipe_info_entity& e) const {
    if (zone < e.zone) {
      return false;
    }
    if (zone > e.zone) {
      return true;
    }
    return (bucket_info.bucket < e.bucket_info.bucket);
  }
};

std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info_entity& e) {
  auto& bucket = e.get_bucket_info().bucket;

  out << e.zone << ":" << bucket.get_key();
  return out;
}

struct rgw_sync_pipe_handler_info {
  RGWBucketSyncFlowManager::pipe_handler handler;
  rgw_sync_pipe_info_entity source;
  rgw_sync_pipe_info_entity target;

  rgw_sync_pipe_handler_info() {}
  rgw_sync_pipe_handler_info(const RGWBucketSyncFlowManager::pipe_handler& _handler,
                             std::optional<all_bucket_info> source_bucket_info,
                             std::optional<all_bucket_info> target_bucket_info)
    : handler(_handler),
      source(handler.source, source_bucket_info),
      target(handler.dest, target_bucket_info) {
  }

  bool operator<(const rgw_sync_pipe_handler_info& p) const {
    if (source < p.source) {
      return true;
    }
    if (p.source < source) {
      return false;
    }
    return (target < p.target);
  }

  void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
    source.update_empty_bucket_info(buckets_info);
    target.update_empty_bucket_info(buckets_info);
  }
};

std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_handler_info& p) {
  out << p.source << ">" << p.target;
  return out;
}

struct rgw_sync_pipe_info_set {
  std::set<rgw_sync_pipe_handler_info> handlers;

  using iterator = std::set<rgw_sync_pipe_handler_info>::iterator;

  void clear() {
    handlers.clear();
  }

  void insert(const RGWBucketSyncFlowManager::pipe_handler& handler,
              std::optional<all_bucket_info>& source_bucket_info,
              std::optional<all_bucket_info>& target_bucket_info) {
    rgw_sync_pipe_handler_info p(handler, source_bucket_info, target_bucket_info);
    handlers.insert(p);
  }

  iterator begin() {
    return handlers.begin();
  }

  iterator end() {
    return handlers.end();
  }

  size_t size() const {
    return handlers.size();
  }

  bool empty() const {
    return handlers.empty();
  }

  void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
    if (buckets_info.empty()) {
      return;
    }

    std::set<rgw_sync_pipe_handler_info> p;

    for (auto pipe : handlers) {
      pipe.update_empty_bucket_info(buckets_info);
      p.insert(pipe);
    }

    handlers = std::move(p);
  }
};

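// Note on update_empty_bucket_info() above: elements of a std::set can't be
// mutated in place without breaking its ordering invariant, so each handler
// is copied, enriched with the fetched bucket info, and inserted into a
// fresh set that then replaces the old one.
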
class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;

  rgw_sync_pipe_info_set pipes;
  rgw_sync_pipe_info_set::iterator siter;

  rgw_bucket_sync_pair_info sync_pair;

  RGWSyncTraceNodeRef tn;
  ceph::real_time* progress;
  std::vector<ceph::real_time> shard_progress;
  std::vector<ceph::real_time>::iterator cur_shard_progress;

  RGWRESTConn *conn{nullptr};
  rgw_zone_id last_zone;

  std::optional<uint64_t> gen;
  rgw_bucket_index_marker_info marker_info;
  BucketIndexShardsManager marker_mgr;

public:
  RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
                            boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                            const rgw_bucket_shard& source_bs,
                            const RGWSyncTraceNodeRef& _tn_parent,
                            std::optional<uint64_t> gen,
                            ceph::real_time* progress);

  int operate(const DoutPrefixProvider *dpp) override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw::bucket_sync::Handle state; // cached bucket-shard state
  rgw_data_sync_obligation obligation; // input obligation
  std::optional<rgw_data_sync_obligation> complete; // obligation to complete
  uint32_t obligation_counter = 0;
  RGWDataSyncShardMarkerTrack *marker_tracker;
  rgw_raw_obj error_repo;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  RGWSyncTraceNodeRef tn;

  ceph::real_time progress;
  int sync_status = 0;
public:
  RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
                           rgw_data_sync_obligation _obligation,
                           RGWDataSyncShardMarkerTrack *_marker_tracker,
                           const rgw_raw_obj& error_repo,
                           boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                           const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
      state(std::move(state)), obligation(std::move(_obligation)),
      marker_tracker(_marker_tracker), error_repo(error_repo),
      lease_cr(std::move(lease_cr)) {
    set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", to_string(obligation.bs, obligation.gen));
  }

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      if (state->obligation) {
        // this is already syncing in another DataSyncSingleEntryCR
        if (state->obligation->timestamp < obligation.timestamp) {
          // cancel existing obligation and overwrite it
          tn->log(10, SSTR("canceling existing obligation " << *state->obligation));
          complete = std::move(*state->obligation);
          *state->obligation = std::move(obligation);
          state->counter++;
        } else {
          // cancel new obligation
          tn->log(10, SSTR("canceling new obligation " << obligation));
          complete = std::move(obligation);
        }
      } else {
        // start syncing a new obligation
        state->obligation = obligation;
        obligation_counter = state->counter;
        state->counter++;

        // loop until the latest obligation is satisfied, because other callers
        // may update the obligation while we're syncing
        while ((state->obligation->timestamp == ceph::real_time() ||
                state->progress_timestamp < state->obligation->timestamp) &&
               obligation_counter != state->counter) {
          obligation_counter = state->counter;
          progress = ceph::real_time{};

          ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key.first}
                        << ' ' << *state->obligation << " progress timestamp " << state->progress_timestamp
                        << " progress " << progress << dendl;
          yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
                                                   state->key.first, tn,
                                                   state->obligation->gen,
                                                   &progress));
          if (retcode < 0) {
            break;
          }
          state->progress_timestamp = std::max(progress, state->progress_timestamp);
        }
        // any new obligations will process themselves
        complete = std::move(*state->obligation);
        state->obligation.reset();

        tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key.first}
                         << " progress=" << progress << ' ' << complete << " r=" << retcode));
      }
      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->bs));
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data",
                                                          to_string(complete->bs, complete->gen),
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
          }
        }
        if (complete->timestamp != ceph::real_time{}) {
          tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
          yield call(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                                               rgw::error_repo::encode_key(complete->bs, complete->gen),
                                               complete->timestamp));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
          }
        }
      } else if (complete->retry) {
        yield call(rgw::error_repo::remove_cr(sync_env->driver->svc()->rados, error_repo,
                                              rgw::error_repo::encode_key(complete->bs, complete->gen),
                                              complete->timestamp));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
                          << error_repo << "), retcode=" << retcode));
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !complete->marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(complete->marker));
        if (retcode < 0) {
          return set_cr_error(retcode);
        }
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

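// Obligation lifecycle, in short: each bucket shard has one cached state
// entry. If a sync is already running there, a newer obligation either
// supersedes the in-flight one (the bumped counter makes the running loop go
// around again) or is dropped as already covered. Exactly one obligation is
// "completed" per CR run: logged on failure, written to or removed from the
// error repo, and reflected in the shard marker.
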
rgw_raw_obj datalog_oid_for_error_repo(RGWDataSyncCtx *sc, rgw::sal::RadosStore* driver,
                                       rgw_pool& pool, rgw_bucket_shard& bs) {
  int datalog_shard = driver->svc()->datalog_rados->choose_oid(bs);
  string oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, datalog_shard);
  return rgw_raw_obj(pool, oid + ".retry");
}

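// The retry repo sits next to the datalog shard's status object; e.g.
// (shard number hypothetical) a bucket shard hashing to datalog shard 12
// gets its failures recorded in "<shard_obj_name(source_zone, 12)>.retry",
// which is exactly the object that read_recovering_shards() lists above.
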
class RGWDataIncrementalSyncFullObligationCR: public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard source_bs;
  rgw_raw_obj error_repo;
  std::string error_marker;
  ceph::real_time timestamp;
  RGWSyncTraceNodeRef tn;
  rgw_bucket_index_marker_info remote_info;
  rgw_pool pool;
  uint32_t sid;
  rgw_bucket_shard bs;
  std::vector<store_gen_shards>::const_iterator each;

public:
  RGWDataIncrementalSyncFullObligationCR(RGWDataSyncCtx *_sc, rgw_bucket_shard& _source_bs,
                                         const rgw_raw_obj& error_repo, const std::string& _error_marker,
                                         ceph::real_time& _timestamp, RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source_bs(_source_bs),
      error_repo(error_repo), error_marker(_error_marker), timestamp(_timestamp),
      tn(sync_env->sync_tracer->add_node(_tn, "error_repo", SSTR(bucket_shard_str(source_bs))))
  {}

  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      each = remote_info.generations.cbegin();
      for (; each != remote_info.generations.cend(); each++) {
        for (sid = 0; sid < each->num_shards; sid++) {
          bs.bucket = source_bs.bucket;
          bs.shard_id = sid;
          pool = sync_env->svc->zone->get_zone_params().log_pool;
          error_repo = datalog_oid_for_error_repo(sc, sync_env->driver, pool, source_bs);
          tn->log(10, SSTR("writing shard_id " << sid << " of gen " << each->gen << " to error repo for retry"));
          yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                                                       rgw::error_repo::encode_key(bs, each->gen),
                                                       timestamp),
                             sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
                             [&](uint64_t stack_id, int ret) {
                               if (ret < 0) {
                                 retcode = ret;
                               }
                               return 0;
                             });
        }
      }
      drain_all_cb([&](uint64_t stack_id, int ret) {
        if (ret < 0) {
          tn->log(10, SSTR("writing to error repo returned error: " << ret));
        }
        return ret;
      });

      // once everything succeeds, remove the full sync obligation from the error repo
      yield call(rgw::error_repo::remove_cr(sync_env->driver->svc()->rados, error_repo,
                                            error_marker, timestamp));
      return set_cr_done();
    }
    return 0;
  }
};

RGWCoroutine* data_sync_single_entry(RGWDataSyncCtx *sc, const rgw_bucket_shard& src,
                                     std::optional<uint64_t> gen,
                                     const std::string marker,
                                     ceph::real_time timestamp,
                                     boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                     boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache,
                                     RGWDataSyncShardMarkerTrack* marker_tracker,
                                     rgw_raw_obj error_repo,
                                     RGWSyncTraceNodeRef& tn,
                                     bool retry) {
  auto state = bucket_shard_cache->get(src, gen);
  auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry};
  return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
                                      &*marker_tracker, error_repo,
                                      lease_cr.get(), tn);
}

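// Small factory wrapper: looks up (or creates) the cached bucket-shard state
// and packages one datalog entry into an rgw_data_sync_obligation before
// handing both to RGWDataSyncSingleEntryCR. marker_tracker may legitimately
// be null here (full sync passes nullptr below), in which case the CR skips
// its marker-update step.
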
static ceph::real_time timestamp_for_bucket_shard(rgw::sal::RadosStore* driver,
                                                  const rgw_data_sync_status& sync_status,
                                                  const rgw_bucket_shard& bs) {
  int datalog_shard = driver->svc()->datalog_rados->choose_oid(bs);
  auto status = sync_status.sync_markers.find(datalog_shard);
  if (status == sync_status.sync_markers.end()) {
    return ceph::real_clock::zero();
  }
  return status->second.timestamp;
}

class RGWDataFullSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncCtx *sc;
  RGWDataSyncEnv *sync_env;
  rgw_pool pool;
  rgw_bucket_shard source_bs;
  const std::string key;
  rgw_data_sync_status sync_status;
  rgw_raw_obj error_repo;
  ceph::real_time timestamp;
  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
  RGWDataSyncShardMarkerTrack* marker_tracker;
  RGWSyncTraceNodeRef tn;
  rgw_bucket_index_marker_info remote_info;
  uint32_t sid;
  std::vector<store_gen_shards>::iterator each;
  uint64_t i{0};
  RGWCoroutine* shard_cr = nullptr;
  bool first_shard = true;
  bool error_inject;

public:
  RGWDataFullSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool, const rgw_bucket_shard& _source_bs,
                               const std::string& _key, const rgw_data_sync_status& sync_status, const rgw_raw_obj& _error_repo,
                               ceph::real_time _timestamp, boost::intrusive_ptr<const RGWContinuousLeaseCR> _lease_cr,
                               boost::intrusive_ptr<rgw::bucket_sync::Cache> _bucket_shard_cache,
                               RGWDataSyncShardMarkerTrack* _marker_tracker,
                               RGWSyncTraceNodeRef& _tn)
    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), pool(_pool), source_bs(_source_bs), key(_key),
      error_repo(_error_repo), timestamp(_timestamp), lease_cr(std::move(_lease_cr)),
      bucket_shard_cache(_bucket_shard_cache), marker_tracker(_marker_tracker), tn(_tn) {
    error_inject = (sync_env->cct->_conf->rgw_sync_data_full_inject_err_probability > 0);
  }


  int operate(const DoutPrefixProvider *dpp) override {
    reenter(this) {
      if (error_inject &&
          rand() % 10000 < cct->_conf->rgw_sync_data_full_inject_err_probability * 10000.0) {
        tn->log(0, SSTR("injecting read bilog info error on key=" << key));
        retcode = -ENOENT;
      } else {
        tn->log(0, SSTR("read bilog info key=" << key));
        yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, source_bs.bucket, &remote_info));
      }

      if (retcode < 0) {
        tn->log(10, SSTR("full sync: failed to read remote bucket info. Writing "
                         << source_bs.shard_id << " to error repo for retry"));
        yield call(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                                             rgw::error_repo::encode_key(source_bs, std::nullopt),
                                             timestamp));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to log " << source_bs.shard_id << " in error repo: retcode=" << retcode));
        }
        yield call(marker_tracker->finish(key));
        return set_cr_error(retcode);
      }

      // wait to sync the first shard of the oldest generation and then sync all other shards.
      // if any of the operations fail at any time, write them into the error repo for later retry.

      each = remote_info.generations.begin();
      for (; each != remote_info.generations.end(); each++) {
        for (sid = 0; sid < each->num_shards; sid++) {
          source_bs.shard_id = sid;
          // use the error repo and sync status timestamp from the datalog shard corresponding to source_bs
          error_repo = datalog_oid_for_error_repo(sc, sync_env->driver, pool, source_bs);
          timestamp = timestamp_for_bucket_shard(sync_env->driver, sync_status, source_bs);
          if (retcode < 0) {
            tn->log(10, SSTR("Write " << source_bs.shard_id << " to error repo for retry"));
            yield_spawn_window(rgw::error_repo::write_cr(sync_env->driver->svc()->rados, error_repo,
                                                         rgw::error_repo::encode_key(source_bs, each->gen),
                                                         timestamp),
                               sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window), std::nullopt);
          } else {
            shard_cr = data_sync_single_entry(sc, source_bs, each->gen, key, timestamp,
                                              lease_cr, bucket_shard_cache, nullptr, error_repo, tn, false);
            tn->log(10, SSTR("full sync: syncing shard_id " << sid << " of gen " << each->gen));
            if (first_shard) {
              yield call(shard_cr);
              first_shard = false;
            } else {
              yield_spawn_window(shard_cr, sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
                                 [&](uint64_t stack_id, int ret) {
                                   if (ret < 0) {
                                     retcode = ret;
                                   }
                                   return retcode;
                                 });
            }
          }
        }
        drain_all_cb([&](uint64_t stack_id, int ret) {
          if (ret < 0) {
            retcode = ret;
          }
          return retcode;
        });
      }

      yield call(marker_tracker->finish(key));
      if (retcode < 0) {
        return set_cr_error(retcode);
      }

      return set_cr_done();
    }
    return 0;
  }
};

1705 class RGWDataBaseSyncShardCR : public RGWCoroutine {
1706 protected:
1707 RGWDataSyncCtx *const sc;
1708 const rgw_pool& pool;
1709 const uint32_t shard_id;
1710 rgw_data_sync_marker& sync_marker;
1711 RGWSyncTraceNodeRef tn;
1712 const string& status_oid;
1713 const rgw_raw_obj& error_repo;
1714 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
1715 const rgw_data_sync_status& sync_status;
1716 RGWObjVersionTracker& objv;
1717 boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
1718
1719 std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
1720 RGWRadosGetOmapValsCR::ResultPtr omapvals;
1721 rgw_bucket_shard source_bs;
1722
1723 int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
1724 int ret = rgw_bucket_parse_bucket_key(sc->env->cct, key,
1725 &bs.bucket, &bs.shard_id);
1726 // When num_shards is 0, shard_id is parsed as -1 because the bucket
1727 // instance key carries no shard_id delimiter;
1728 // interpret it as a non-negative value.
1729 if (ret == 0) {
1730 if (bs.shard_id < 0) {
1731 bs.shard_id = 0;
1732 }
1733 }
1734 return ret;
1735 }
1736
1737 RGWDataBaseSyncShardCR(
1738 RGWDataSyncCtx *const _sc, const rgw_pool& pool, const uint32_t shard_id,
1739 rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
1740 const string& status_oid, const rgw_raw_obj& error_repo,
1741 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
1742 const rgw_data_sync_status& sync_status,
1743 RGWObjVersionTracker& objv,
1744 const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
1745 : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
1746 sync_marker(sync_marker), tn(tn), status_oid(status_oid),
1747 error_repo(error_repo), lease_cr(std::move(lease_cr)),
1748 sync_status(sync_status), objv(objv),
1749 bucket_shard_cache(bucket_shard_cache) {}
1750 };
1751
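// Full sync for a single data shard: page through the omap entries of the
// full-sync index object, spawning a RGWDataFullSyncSingleEntryCR per
// bucket shard entry, then flip the persisted marker to IncrementalSync
// and remove the index object.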
1752 class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
1753 static constexpr auto OMAP_GET_MAX_ENTRIES = 100;
1754
1755 string oid;
1756 uint64_t total_entries = 0;
1757 ceph::real_time entry_timestamp;
1758 std::map<std::string, bufferlist> entries;
1759 std::map<std::string, bufferlist>::iterator iter;
1760 string error_marker;
1761
1762 public:
1763
1764 RGWDataFullSyncShardCR(
1765 RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
1766 rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
1767 const string& status_oid, const rgw_raw_obj& error_repo,
1768 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
1769 const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
1770 const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
1771 : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
1772 status_oid, error_repo, std::move(lease_cr),
1773 sync_status, objv, bucket_shard_cache) {}
1774
1775 int operate(const DoutPrefixProvider *dpp) override {
1776 reenter(this) {
1777 tn->log(10, "start full sync");
1778 oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
1779 marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
1780 total_entries = sync_marker.pos;
1781 entry_timestamp = sync_marker.timestamp; // time when full sync started
1782 do {
1783 if (!lease_cr->is_locked()) {
1784 drain_all();
1785 tn->log(1, "lease is lost, abort");
1786 return set_cr_error(-ECANCELED);
1787 }
1788 omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
1789 yield call(new RGWRadosGetOmapValsCR(sc->env->driver,
1790 rgw_raw_obj(pool, oid),
1791 sync_marker.marker,
1792 OMAP_GET_MAX_ENTRIES, omapvals));
1793 if (retcode < 0) {
1794 drain_all();
1795 return set_cr_error(retcode);
1796 }
1797 entries = std::move(omapvals->entries);
1798 if (entries.size() > 0) {
1799 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1800 }
1801 tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
1802 iter = entries.begin();
1803 for (; iter != entries.end(); ++iter) {
1804 retcode = parse_bucket_key(iter->first, source_bs);
1805 if (retcode < 0) {
1806 tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
1807 marker_tracker->try_update_high_marker(iter->first, 0,
1808 entry_timestamp);
1809 continue;
1810 }
1811 tn->log(20, SSTR("full sync: " << iter->first));
1812 total_entries++;
1813 if (!marker_tracker->start(iter->first, total_entries,
1814 entry_timestamp)) {
1815 tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first
1816 << ". Duplicate entry?"));
1817 } else {
1818 tn->log(10, SSTR("timestamp for " << iter->first << " is: " << entry_timestamp));
1819 yield_spawn_window(new RGWDataFullSyncSingleEntryCR(
1820 sc, pool, source_bs, iter->first, sync_status,
1821 error_repo, entry_timestamp, lease_cr,
1822 bucket_shard_cache, &*marker_tracker, tn),
1823 sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
1824 std::nullopt);
1825 }
1826 sync_marker.marker = iter->first;
1827 }
1828 } while (omapvals->more);
1829 omapvals.reset();
1830
1831 drain_all();
1832
1833 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1834
1835 /* update marker to reflect we're done with full sync */
1836 sync_marker.state = rgw_data_sync_marker::IncrementalSync;
1837 sync_marker.marker = sync_marker.next_step_marker;
1838 sync_marker.next_step_marker.clear();
1839 yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
1840 sc->env->dpp, sc->env->driver,
1841 rgw_raw_obj(pool, status_oid), sync_marker, &objv));
1842 if (retcode < 0) {
1843 tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
1844 return set_cr_error(retcode);
1845 }
1846
1847 // clean up full sync index, ignoring errors
1848 yield call(new RGWRadosRemoveCR(sc->env->driver, {pool, oid}));
1849
1850 // transition to incremental sync
1851 return set_cr_done();
1852 }
1853 return 0;
1854 }
1855 };
1856
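// Incremental sync for a single data shard. Each pass drains the
// out-of-band update notifications, retries entries from the error repo
// (subject to a backoff window), then tails the remote datalog shard
// from the persisted marker.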
1857 class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
1858 static constexpr int max_error_entries = 10;
1859 static constexpr uint32_t retry_backoff_secs = 60;
1860
1861 ceph::mutex& inc_lock;
1862 bc::flat_set<rgw_data_notify_entry>& modified_shards;
1863
1864 bc::flat_set<rgw_data_notify_entry> current_modified;
1865 decltype(current_modified)::iterator modified_iter;
1866
1867 ceph::coarse_real_time error_retry_time;
1868 string error_marker;
1869 std::map<std::string, bufferlist> error_entries;
1870 decltype(error_entries)::iterator iter;
1871 ceph::real_time entry_timestamp;
1872 std::optional<uint64_t> gen;
1873
1874 string next_marker;
1875 vector<rgw_data_change_log_entry> log_entries;
1876 decltype(log_entries)::iterator log_iter;
1877 bool truncated = false;
1878 int cbret = 0;
1879
1880 utime_t get_idle_interval() const {
1881 ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
1882 if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
1883 auto now = ceph::coarse_real_clock::now();
1884 if (error_retry_time > now) {
1885 auto d = error_retry_time - now;
1886 if (interval > d) {
1887 interval = d;
1888 }
1889 }
1890 }
1891 // convert timespan -> time_point -> utime_t
1892 return utime_t(ceph::coarse_real_clock::zero() + interval);
1893 }
1894
1895
1896 public:
1897
1898 RGWDataIncSyncShardCR(
1899 RGWDataSyncCtx *const sc, const rgw_pool& pool, const uint32_t shard_id,
1900 rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
1901 const string& status_oid, const rgw_raw_obj& error_repo,
1902 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
1903 const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
1904 const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache,
1905 ceph::mutex& inc_lock,
1906 bc::flat_set<rgw_data_notify_entry>& modified_shards)
1907 : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
1908 status_oid, error_repo, std::move(lease_cr),
1909 sync_status, objv, bucket_shard_cache),
1910 inc_lock(inc_lock), modified_shards(modified_shards) {}
1911
1912 int operate(const DoutPrefixProvider *dpp) override {
1913 reenter(this) {
1914 tn->log(10, "start incremental sync");
1915 marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
1916 do {
1917 if (!lease_cr->is_locked()) {
1918 drain_all();
1919 tn->log(1, "lease is lost, abort");
1920 return set_cr_error(-ECANCELED);
1921 }
1922 {
1923 current_modified.clear();
1924 std::unique_lock il(inc_lock);
1925 current_modified.swap(modified_shards);
1926 il.unlock();
1927 }
1928
1929 if (current_modified.size() > 0) {
1930 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1931 }
1932 /* process out of band updates */
1933 for (modified_iter = current_modified.begin();
1934 modified_iter != current_modified.end();
1935 ++modified_iter) {
1936 if (!lease_cr->is_locked()) {
1937 drain_all();
1938 yield call(marker_tracker->flush());
1939 if (retcode < 0) {
1940 tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
1941 return set_cr_error(retcode);
1942 }
1943 return set_cr_error(-ECANCELED);
1944 }
1945 retcode = parse_bucket_key(modified_iter->key, source_bs);
1946 if (retcode < 0) {
1947 tn->log(1, SSTR("failed to parse bucket shard: "
1948 << modified_iter->key));
1949 continue;
1950 }
1951 tn->log(20, SSTR("received async update notification: "
1952 << modified_iter->key));
1953 spawn(data_sync_single_entry(sc, source_bs, modified_iter->gen, {},
1954 ceph::real_time{}, lease_cr,
1955 bucket_shard_cache, &*marker_tracker,
1956 error_repo, tn, false), false);
1957 }
1958
1959 if (error_retry_time <= ceph::coarse_real_clock::now()) {
1960 /* process bucket shards that previously failed */
1961 omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
1962 yield call(new RGWRadosGetOmapValsCR(sc->env->driver, error_repo,
1963 error_marker, max_error_entries,
1964 omapvals));
1965 error_entries = std::move(omapvals->entries);
1966 tn->log(20, SSTR("read error repo, got " << error_entries.size()
1967 << " entries"));
1968 iter = error_entries.begin();
1969 for (; iter != error_entries.end(); ++iter) {
1970 if (!lease_cr->is_locked()) {
1971 drain_all();
1972 yield call(marker_tracker->flush());
1973 if (retcode < 0) {
1974 tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
1975 return set_cr_error(retcode);
1976 }
1977 return set_cr_error(-ECANCELED);
1978 }
1979 error_marker = iter->first;
1980 entry_timestamp = rgw::error_repo::decode_value(iter->second);
1981 retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
1982 if (retcode == -EINVAL) {
1983 // backward compatibility for string keys that don't encode a gen
1984 retcode = parse_bucket_key(error_marker, source_bs);
1985 }
1986 if (retcode < 0) {
1987 tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
1988 spawn(rgw::error_repo::remove_cr(sc->env->driver->svc()->rados,
1989 error_repo, error_marker,
1990 entry_timestamp),
1991 false);
1992 continue;
1993 }
1994 tn->log(10, SSTR("gen is " << gen));
1995 if (!gen) {
1996 // write all full sync obligations for the bucket to error repo
1997 spawn(new RGWDataIncrementalSyncFullObligationCR(sc, source_bs,
1998 error_repo, error_marker, entry_timestamp, tn), false);
1999 } else {
2000 tn->log(20, SSTR("handle error entry key="
2001 << to_string(source_bs, gen)
2002 << " timestamp=" << entry_timestamp));
2003 spawn(data_sync_single_entry(sc, source_bs, gen, "",
2004 entry_timestamp, lease_cr,
2005 bucket_shard_cache, &*marker_tracker,
2006 error_repo, tn, true), false);
2007 }
2008 }
2009 if (!omapvals->more) {
2010 error_retry_time = ceph::coarse_real_clock::now() +
2011 make_timespan(retry_backoff_secs);
2012 error_marker.clear();
2013 }
2014 }
2015 omapvals.reset();
2016
2017 tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker="
2018 << sync_marker.marker));
2019 yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id,
2020 sync_marker.marker,
2021 &next_marker, &log_entries,
2022 &truncated));
2023 if (retcode < 0 && retcode != -ENOENT) {
2024 tn->log(0, SSTR("ERROR: failed to read remote data log info: ret="
2025 << retcode));
2026 drain_all();
2027 return set_cr_error(retcode);
2028 }
2029
2030 if (log_entries.size() > 0) {
2031 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
2032 }
2033
2034 for (log_iter = log_entries.begin();
2035 log_iter != log_entries.end();
2036 ++log_iter) {
2037 if (!lease_cr->is_locked()) {
2038 drain_all();
2039 yield call(marker_tracker->flush());
2040 if (retcode < 0) {
2041 tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
2042 return set_cr_error(retcode);
2043 }
2044 return set_cr_error(-ECANCELED);
2045 }
2046
2047 tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
2048 retcode = parse_bucket_key(log_iter->entry.key, source_bs);
2049 if (retcode < 0) {
2050 tn->log(1, SSTR("failed to parse bucket shard: "
2051 << log_iter->entry.key));
2052 marker_tracker->try_update_high_marker(log_iter->log_id, 0,
2053 log_iter->log_timestamp);
2054 continue;
2055 }
2056 if (!marker_tracker->start(log_iter->log_id, 0,
2057 log_iter->log_timestamp)) {
2058 tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id
2059 << ". Duplicate entry?"));
2060 } else {
2061 tn->log(1, SSTR("incremental sync on " << log_iter->entry.key << " shard: " << shard_id << " on gen " << log_iter->entry.gen));
2062 yield_spawn_window(data_sync_single_entry(sc, source_bs, log_iter->entry.gen, log_iter->log_id,
2063 log_iter->log_timestamp, lease_cr,bucket_shard_cache,
2064 &*marker_tracker, error_repo, tn, false),
2065 sc->lcc.adj_concurrency(cct->_conf->rgw_data_sync_spawn_window),
2066 [&](uint64_t stack_id, int ret) {
2067 if (ret < 0) {
2068 tn->log(10, SSTR("data_sync_single_entry returned error: " << ret));
2069 cbret = ret;
2070 }
2071 return 0;
2072 });
2073 }
2074 }
2075 if (cbret < 0) {
2076 retcode = cbret;
2077 drain_all();
2078 return set_cr_error(retcode);
2079 }
2080
2081 tn->log(20, SSTR("shard_id=" << shard_id <<
2082 " sync_marker="<< sync_marker.marker
2083 << " next_marker=" << next_marker
2084 << " truncated=" << truncated));
2085 if (!next_marker.empty()) {
2086 sync_marker.marker = next_marker;
2087 } else if (!log_entries.empty()) {
2088 sync_marker.marker = log_entries.back().log_id;
2089 }
2090 if (!truncated) {
2091 // we reached the end, wait a while before checking for more
2092 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
2093 yield wait(get_idle_interval());
2094 }
2095 } while (true);
2096 }
2097 return 0;
2098 }
2099 };
2100
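// Per-shard driver: takes the shard lease, rereads the shard status,
// then loops running full or incremental sync according to the
// persisted marker state.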
2101 class RGWDataSyncShardCR : public RGWCoroutine {
2102 RGWDataSyncCtx *const sc;
2103 const rgw_pool pool;
2104 const uint32_t shard_id;
2105 rgw_data_sync_marker& sync_marker;
2106 rgw_data_sync_status sync_status;
2107 const RGWSyncTraceNodeRef tn;
2108 RGWObjVersionTracker& objv;
2109 bool *reset_backoff;
2110
2111 ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
2112 ceph::condition_variable inc_cond;
2113
2114 RGWDataSyncEnv *const sync_env{ sc->env };
2115
2116 const string status_oid{ RGWDataSyncStatusManager::shard_obj_name(
2117 sc->source_zone, shard_id) };
2118 const rgw_raw_obj error_repo{ pool, status_oid + ".retry" };
2119
2120 // target number of entries to cache before recycling idle ones
2121 static constexpr size_t target_cache_size = 256;
2122 boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache {
2123 rgw::bucket_sync::Cache::create(target_cache_size) };
2124
2125 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2126 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
2127
2128 bc::flat_set<rgw_data_notify_entry> modified_shards;
2129
2130 public:
2131 RGWDataSyncShardCR(RGWDataSyncCtx* const _sc, const rgw_pool& pool,
2132 const uint32_t shard_id, rgw_data_sync_marker& marker,
2133 const rgw_data_sync_status& sync_status,
2134 RGWSyncTraceNodeRef& tn, RGWObjVersionTracker& objv, bool *reset_backoff)
2135 : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
2136 sync_marker(marker), sync_status(sync_status), tn(tn),
2137 objv(objv), reset_backoff(reset_backoff) {
2138 set_description() << "data sync shard source_zone=" << sc->source_zone
2139 << " shard_id=" << shard_id;
2140 }
2141
2142 ~RGWDataSyncShardCR() override {
2143 if (lease_cr) {
2144 lease_cr->abort();
2145 }
2146 }
2147
2148 void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& entries) {
2149 std::lock_guard l{inc_lock};
2150 modified_shards.insert(entries.begin(), entries.end());
2151 }
2152
2153 int operate(const DoutPrefixProvider *dpp) override {
2154 reenter(this) {
2155 yield init_lease_cr();
2156 while (!lease_cr->is_locked()) {
2157 if (lease_cr->is_done()) {
2158 tn->log(5, "failed to take lease");
2159 set_status("lease lock failed, early abort");
2160 drain_all();
2161 return set_cr_error(lease_cr->get_ret_status());
2162 }
2163 set_sleeping(true);
2164 yield;
2165 }
2166 *reset_backoff = true;
2167 tn->log(10, "took lease");
2168 /* Reread data sync status to fetch latest marker and objv */
2169 objv.clear();
2170 yield call(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
2171 rgw_raw_obj(pool, status_oid),
2172 &sync_marker, true, &objv));
2173 if (retcode < 0) {
2174 lease_cr->go_down();
2175 drain_all();
2176 return set_cr_error(retcode);
2177 }
2178
2179 while (true) {
2180 if (sync_marker.state == rgw_data_sync_marker::FullSync) {
2181 yield call(new RGWDataFullSyncShardCR(sc, pool, shard_id,
2182 sync_marker, tn,
2183 status_oid, error_repo,
2184 lease_cr, sync_status,
2185 objv, bucket_shard_cache));
2186 if (retcode < 0) {
2187 if (retcode != -EBUSY) {
2188 tn->log(10, SSTR("full sync failed (retcode=" << retcode << ")"));
2189 }
2190 lease_cr->go_down();
2191 drain_all();
2192 return set_cr_error(retcode);
2193 }
2194 } else if (sync_marker.state == rgw_data_sync_marker::IncrementalSync) {
2195 yield call(new RGWDataIncSyncShardCR(sc, pool, shard_id,
2196 sync_marker, tn,
2197 status_oid, error_repo,
2198 lease_cr, sync_status,
2199 objv, bucket_shard_cache,
2200 inc_lock, modified_shards));
2201 if (retcode < 0) {
2202 if (retcode != -EBUSY) {
2203 tn->log(10, SSTR("incremental sync failed (retcode=" << retcode
2204 << ")"));
2205 }
2206 lease_cr->go_down();
2207 drain_all();
2208 return set_cr_error(retcode);
2209 }
2210 } else {
2211 lease_cr->go_down();
2212 drain_all();
2213 return set_cr_error(-EIO);
2214 }
2215 }
2216 }
2217 return 0;
2218 }
2219
2220 void init_lease_cr() {
2221 set_status("acquiring sync lock");
2222 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
2223 string lock_name = "sync_lock";
2224 if (lease_cr) {
2225 lease_cr->abort();
2226 }
2227 auto driver = sync_env->driver;
2228 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, driver,
2229 rgw_raw_obj(pool, status_oid),
2230 lock_name, lock_duration, this,
2231 &sc->lcc));
2232 lease_stack.reset(spawn(lease_cr.get(), false));
2233 }
2234 };
2235
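// Backoff wrapper around RGWDataSyncShardCR: restarts the shard
// coroutine on failure and forwards modified-shard notifications to it.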
2236 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
2237 RGWDataSyncCtx *sc;
2238 RGWDataSyncEnv *sync_env;
2239
2240 rgw_pool pool;
2241
2242 uint32_t shard_id;
2243 rgw_data_sync_marker sync_marker;
2244 rgw_data_sync_status sync_status;
2245
2246 RGWSyncTraceNodeRef tn;
2247 RGWObjVersionTracker& objv;
2248 public:
2249 RGWDataSyncShardControlCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool,
2250 uint32_t _shard_id, rgw_data_sync_marker& _marker,
2251 const rgw_data_sync_status& sync_status,
2252 RGWObjVersionTracker& objv,
2253 RGWSyncTraceNodeRef& _tn_parent)
2254 : RGWBackoffControlCR(_sc->cct, false),
2255 sc(_sc), sync_env(_sc->env),
2256 pool(_pool),
2257 shard_id(_shard_id),
2258 sync_marker(_marker), sync_status(sync_status), objv(objv) {
2259 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
2260 }
2261
2262 RGWCoroutine *alloc_cr() override {
2263 return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, sync_status, tn, objv, backoff_ptr());
2264 }
2265
2266 RGWCoroutine *alloc_finisher_cr() override {
2267 return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
2268 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
2269 &sync_marker, true, &objv);
2270 }
2271
2272 void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& keys) {
2273 std::lock_guard l{cr_lock()};
2274
2275 RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
2276 if (!cr) {
2277 return;
2278 }
2279
2280 cr->append_modified_shards(keys);
2281 }
2282 };
2283
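// Top-level data sync state machine: reads the sync status, initializes
// it under a lease (StateInit), builds the full sync maps
// (StateBuildingFullSyncMaps), then spawns one shard control coroutine
// per datalog shard (StateSync).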
2284 class RGWDataSyncCR : public RGWCoroutine {
2285 RGWDataSyncCtx *sc;
2286 RGWDataSyncEnv *sync_env;
2287 uint32_t num_shards;
2288
2289 rgw_data_sync_status sync_status;
2290 std::vector<RGWObjVersionTracker> objvs;
2291
2292 ceph::mutex shard_crs_lock =
2293 ceph::make_mutex("RGWDataSyncCR::shard_crs_lock");
2294 map<int, RGWDataSyncShardControlCR *> shard_crs;
2295
2296 bool *reset_backoff;
2297
2298 RGWSyncTraceNodeRef tn;
2299
2300 RGWDataSyncModule *data_sync_module{nullptr};
2301
2302 boost::intrusive_ptr<RGWContinuousLeaseCR> init_lease;
2303 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
2304
2305 RGWObjVersionTracker obj_version;
2306 public:
2307 RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
2308 sc(_sc), sync_env(_sc->env),
2309 num_shards(_num_shards),
2310 reset_backoff(_reset_backoff), tn(_tn) {
2311
2312 }
2313
2314 ~RGWDataSyncCR() override {
2315 for (auto iter : shard_crs) {
2316 iter.second->put();
2317 }
2318 if (init_lease) {
2319 init_lease->abort();
2320 }
2321 }
2322
2323 int operate(const DoutPrefixProvider *dpp) override {
2324 reenter(this) {
2325
2326 /* read sync status */
2327 yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status,
2328 &obj_version, objvs));
2329
2330 data_sync_module = sync_env->sync_module->get_data_handler();
2331
2332 if (retcode < 0 && retcode != -ENOENT) {
2333 tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
2334 return set_cr_error(retcode);
2335 }
2336
2337 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state !=
2338 rgw_data_sync_info::StateSync) {
2339 init_lease.reset(
2340 RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));
2341 yield lease_stack.reset(spawn(init_lease.get(), false));
2342
2343 while (!init_lease->is_locked()) {
2344 if (init_lease->is_done()) {
2345 tn->log(5, "ERROR: failed to take data sync status lease");
2346 set_status("lease lock failed, early abort");
2347 drain_all();
2348 return set_cr_error(init_lease->get_ret_status());
2349 }
2350 tn->log(5, "waiting on data sync status lease");
2351 yield set_sleeping(true);
2352 }
2353 tn->log(5, "acquired data sync status lease");
2354
2355 // Reread sync status now that we've acquired the lock!
2356 obj_version.clear();
2357 yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status, &obj_version, objvs));
2358 if (retcode < 0) {
2359 tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
2360 return set_cr_error(retcode);
2361 }
2362 }
2363
2364 /* state: init status */
2365 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
2366 tn->log(20, SSTR("init"));
2367 sync_status.sync_info.num_shards = num_shards;
2368 uint64_t instance_id;
2369 instance_id = ceph::util::generate_random_number<uint64_t>();
2370 yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn,
2371 &sync_status, init_lease, obj_version, objvs));
2372 if (retcode < 0) {
2373 tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
2374 init_lease->go_down();
2375 drain_all();
2376 return set_cr_error(retcode);
2377 }
2378 // sets state = StateBuildingFullSyncMaps
2379
2380 *reset_backoff = true;
2381 }
2382
2383 data_sync_module->init(sc, sync_status.sync_info.instance_id);
2384
2385 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
2386 tn->log(10, SSTR("building full sync maps"));
2387 /* call sync module init here */
2388 sync_status.sync_info.num_shards = num_shards;
2389 yield call(data_sync_module->init_sync(dpp, sc));
2390 if (retcode < 0) {
2391 tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
2392 return set_cr_error(retcode);
2393 }
2394
2395 if (!init_lease->is_locked()) {
2396 init_lease->go_down();
2397 drain_all();
2398 return set_cr_error(-ECANCELED);
2399 }
2400 /* state: building full sync maps */
2401 yield call(new RGWListBucketIndexesCR(sc, &sync_status, objvs));
2402 if (retcode < 0) {
2403 tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
2404 return set_cr_error(retcode);
2405 }
2406 sync_status.sync_info.state = rgw_data_sync_info::StateSync;
2407
2408 if (!init_lease->is_locked()) {
2409 init_lease->go_down();
2410 drain_all();
2411 return set_cr_error(-ECANCELED);
2412 }
2413 /* update new state */
2414 yield call(set_sync_info_cr());
2415 if (retcode < 0) {
2416 tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
2417 return set_cr_error(retcode);
2418 }
2419
2420 *reset_backoff = true;
2421 }
2422
2423 yield call(data_sync_module->start_sync(dpp, sc));
2424 if (retcode < 0) {
2425 tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode));
2426 return set_cr_error(retcode);
2427 }
2428
2429 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
2430 if (init_lease) {
2431 init_lease->go_down();
2432 drain_all();
2433 init_lease.reset();
2434 lease_stack.reset();
2435 }
2436 yield {
2437 tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
2438 for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
2439 iter != sync_status.sync_markers.end(); ++iter) {
2440 RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sc, sync_env->svc->zone->get_zone_params().log_pool,
2441 iter->first, iter->second, sync_status, objvs[iter->first], tn);
2442 cr->get();
2443 shard_crs_lock.lock();
2444 shard_crs[iter->first] = cr;
2445 shard_crs_lock.unlock();
2446 spawn(cr, true);
2447 }
2448 }
2449 }
2450
2451 return set_cr_done();
2452 }
2453 return 0;
2454 }
2455
2456 RGWCoroutine *set_sync_info_cr() {
2457 return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->dpp, sync_env->driver,
2458 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
2459 sync_status.sync_info, &obj_version);
2460 }
2461
2462 void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
2463 std::lock_guard l{shard_crs_lock};
2464 map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
2465 if (iter == shard_crs.end()) {
2466 return;
2467 }
2468 iter->second->append_modified_shards(entries);
2469 iter->second->wakeup();
2470 }
2471 };
2472
2473 class RGWDefaultDataSyncModule : public RGWDataSyncModule {
2474 public:
2475 RGWDefaultDataSyncModule() {}
2476
2477 RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc,
2478 rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
2479 std::optional<uint64_t> versioned_epoch,
2480 const rgw_zone_set_entry& source_trace_entry,
2481 rgw_zone_set *zones_trace) override;
2482 RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
2483 RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
2484 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
2485 };
2486
2487 class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
2488 RGWDefaultDataSyncModule data_handler;
2489 public:
2490 RGWDefaultSyncModuleInstance() {}
2491 RGWDataSyncModule *get_data_handler() override {
2492 return &data_handler;
2493 }
2494 bool supports_user_writes() override {
2495 return true;
2496 }
2497 };
2498
2499 int RGWDefaultSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
2500 {
2501 instance->reset(new RGWDefaultSyncModuleInstance());
2502 return 0;
2503 }
2504
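// Loads a user's info, identity and ACL asynchronously so that
// user-mode sync pipes can evaluate bucket and object permissions.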
2505 class RGWUserPermHandler {
2506 friend struct Init;
2507 friend class Bucket;
2508
2509 RGWDataSyncEnv *sync_env;
2510 rgw_user uid;
2511
2512 struct _info {
2513 RGWUserInfo user_info;
2514 rgw::IAM::Environment env;
2515 std::unique_ptr<rgw::auth::Identity> identity;
2516 RGWAccessControlPolicy user_acl;
2517 };
2518
2519 std::shared_ptr<_info> info;
2520
2521 struct Init;
2522
2523 std::shared_ptr<Init> init_action;
2524
2525 struct Init : public RGWGenericAsyncCR::Action {
2526 RGWDataSyncEnv *sync_env;
2527
2528 rgw_user uid;
2529 std::shared_ptr<RGWUserPermHandler::_info> info;
2530
2531 int ret{0};
2532
2533 Init(RGWUserPermHandler *handler) : sync_env(handler->sync_env),
2534 uid(handler->uid),
2535 info(handler->info) {}
2536 int operate() override {
2537 auto user_ctl = sync_env->driver->getRados()->ctl.user;
2538
2539 ret = user_ctl->get_info_by_uid(sync_env->dpp, uid, &info->user_info, null_yield);
2540 if (ret < 0) {
2541 return ret;
2542 }
2543
2544 info->identity = rgw::auth::transform_old_authinfo(sync_env->cct,
2545 uid,
2546 RGW_PERM_FULL_CONTROL,
2547 false, /* system_request? */
2548 TYPE_RGW);
2549
2550 map<string, bufferlist> uattrs;
2551
2552 ret = user_ctl->get_attrs_by_uid(sync_env->dpp, uid, &uattrs, null_yield);
2553 if (ret == 0) {
2554 ret = RGWUserPermHandler::policy_from_attrs(sync_env->cct, uattrs, &info->user_acl);
2555 }
2556 if (ret == -ENOENT) {
2557 info->user_acl.create_default(uid, info->user_info.display_name);
2558 }
2559
2560 return 0;
2561 }
2562 };
2563
2564 public:
2565 RGWUserPermHandler(RGWDataSyncEnv *_sync_env,
2566 const rgw_user& _uid) : sync_env(_sync_env),
2567 uid(_uid) {}
2568
2569 RGWCoroutine *init_cr() {
2570 info = make_shared<_info>();
2571 init_action = make_shared<Init>(this);
2572
2573 return new RGWGenericAsyncCR(sync_env->cct,
2574 sync_env->async_rados,
2575 init_action);
2576 }
2577
2578 class Bucket {
2579 RGWDataSyncEnv *sync_env;
2580 std::shared_ptr<_info> info;
2581 RGWAccessControlPolicy bucket_acl;
2582 std::optional<perm_state> ps;
2583 public:
2584 Bucket() {}
2585
2586 int init(RGWUserPermHandler *handler,
2587 const RGWBucketInfo& bucket_info,
2588 const map<string, bufferlist>& bucket_attrs);
2589
2590 bool verify_bucket_permission(int perm);
2591 bool verify_object_permission(const map<string, bufferlist>& obj_attrs,
2592 int perm);
2593 };
2594
2595 static int policy_from_attrs(CephContext *cct,
2596 const map<string, bufferlist>& attrs,
2597 RGWAccessControlPolicy *acl) {
2598 acl->set_ctx(cct);
2599
2600 auto aiter = attrs.find(RGW_ATTR_ACL);
2601 if (aiter == attrs.end()) {
2602 return -ENOENT;
2603 }
2604 auto iter = aiter->second.begin();
2605 try {
2606 acl->decode(iter);
2607 } catch (buffer::error& err) {
2608 ldout(cct, 0) << "ERROR: " << __func__ << "(): could not decode policy, caught buffer::error" << dendl;
2609 return -EIO;
2610 }
2611
2612 return 0;
2613 }
2614
2615 int init_bucket(const RGWBucketInfo& bucket_info,
2616 const map<string, bufferlist>& bucket_attrs,
2617 Bucket *bs) {
2618 return bs->init(this, bucket_info, bucket_attrs);
2619 }
2620 };
2621
2622 int RGWUserPermHandler::Bucket::init(RGWUserPermHandler *handler,
2623 const RGWBucketInfo& bucket_info,
2624 const map<string, bufferlist>& bucket_attrs)
2625 {
2626 sync_env = handler->sync_env;
2627 info = handler->info;
2628
2629 int r = RGWUserPermHandler::policy_from_attrs(sync_env->cct, bucket_attrs, &bucket_acl);
2630 if (r < 0) {
2631 return r;
2632 }
2633
2634 ps.emplace(sync_env->cct,
2635 info->env,
2636 info->identity.get(),
2637 bucket_info,
2638 info->identity->get_perm_mask(),
2639 false, /* defer to bucket acls */
2640 nullptr, /* referer */
2641 false); /* request_payer */
2642
2643 return 0;
2644 }
2645
2646 bool RGWUserPermHandler::Bucket::verify_bucket_permission(int perm)
2647 {
2648 return verify_bucket_permission_no_policy(sync_env->dpp,
2649 &(*ps),
2650 &info->user_acl,
2651 &bucket_acl,
2652 perm);
2653 }
2654
2655 bool RGWUserPermHandler::Bucket::verify_object_permission(const map<string, bufferlist>& obj_attrs,
2656 int perm)
2657 {
2658 RGWAccessControlPolicy obj_acl;
2659
2660 int r = policy_from_attrs(sync_env->cct, obj_attrs, &obj_acl);
2661 if (r < 0) {
2662 return false; // an undecodable object ACL must deny access, not return a truthy error code
2663 }
2664
2665 return verify_bucket_permission_no_policy(sync_env->dpp,
2666 &(*ps),
2667 &bucket_acl,
2668 &obj_acl,
2669 perm);
2670 }
2671
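// Fetch filter for sync: re-resolves the pipe params against the
// object's tags, enforces ACL translation and user-mode permission
// checks, and applies any destination storage class override.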
2672 class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
2673 rgw_bucket_sync_pipe sync_pipe;
2674
2675 std::shared_ptr<RGWUserPermHandler::Bucket> bucket_perms;
2676 std::optional<rgw_sync_pipe_dest_params> verify_dest_params;
2677
2678 std::optional<ceph::real_time> mtime;
2679 std::optional<string> etag;
2680 std::optional<uint64_t> obj_size;
rgw_placement_rule dest_rule; // must outlive filter(), which hands it back via *prule
2681
2682 std::unique_ptr<rgw::auth::Identity> identity;
2683
2684 std::shared_ptr<bool> need_retry;
2685
2686 public:
2687 RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe,
2688 std::shared_ptr<RGWUserPermHandler::Bucket>& _bucket_perms,
2689 std::optional<rgw_sync_pipe_dest_params>&& _verify_dest_params,
2690 std::shared_ptr<bool>& _need_retry) : sync_pipe(_sync_pipe),
2691 bucket_perms(_bucket_perms),
2692 verify_dest_params(std::move(_verify_dest_params)),
2693 need_retry(_need_retry) {
2694 *need_retry = false;
2695 }
2696
2697 int filter(CephContext *cct,
2698 const rgw_obj_key& source_key,
2699 const RGWBucketInfo& dest_bucket_info,
2700 std::optional<rgw_placement_rule> dest_placement_rule,
2701 const map<string, bufferlist>& obj_attrs,
2702 std::optional<rgw_user> *poverride_owner,
2703 const rgw_placement_rule **prule) override;
2704 };
2705
2706 int RGWFetchObjFilter_Sync::filter(CephContext *cct,
2707 const rgw_obj_key& source_key,
2708 const RGWBucketInfo& dest_bucket_info,
2709 std::optional<rgw_placement_rule> dest_placement_rule,
2710 const map<string, bufferlist>& obj_attrs,
2711 std::optional<rgw_user> *poverride_owner,
2712 const rgw_placement_rule **prule)
2713 {
2714 int abort_err = -ERR_PRECONDITION_FAILED;
2715
2716 rgw_sync_pipe_params params;
2717
2718 RGWObjTags obj_tags;
2719
2720 auto iter = obj_attrs.find(RGW_ATTR_TAGS);
2721 if (iter != obj_attrs.end()) {
2722 try {
2723 auto it = iter->second.cbegin();
2724 obj_tags.decode(it);
2725 } catch (buffer::error &err) {
2726 ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error, couldn't decode TagSet" << dendl;
2727 }
2728 }
2729
2730 if (!sync_pipe.info.handler.find_obj_params(source_key,
2731 obj_tags.get_tags(),
2732 &params)) {
2733 return abort_err;
2734 }
2735
2736 if (verify_dest_params &&
2737 !(*verify_dest_params == params.dest)) {
2738 /* raced! original dest params were different, will need to retry */
2739 ldout(cct, 0) << "WARNING: " << __func__ << ": pipe dest params are different than original params, must have raced with object rewrite, retrying" << dendl;
2740 *need_retry = true;
2741 return -ECANCELED;
2742 }
2743
2744 std::optional<std::map<string, bufferlist> > new_attrs;
2745
2746 if (params.dest.acl_translation) {
2747 rgw_user& acl_translation_owner = params.dest.acl_translation->owner;
2748 if (!acl_translation_owner.empty()) {
2749 if (params.mode == rgw_sync_pipe_params::MODE_USER &&
2750 acl_translation_owner != dest_bucket_info.owner) {
2751 ldout(cct, 0) << "ERROR: " << __func__ << ": acl translation was requested, but user (" << acl_translation_owner
2752 << ") is not dest bucket owner (" << dest_bucket_info.owner << ")" << dendl;
2753 return -EPERM;
2754 }
2755 *poverride_owner = acl_translation_owner;
2756 }
2757 }
2758 if (params.mode == rgw_sync_pipe_params::MODE_USER) {
2759 if (!bucket_perms->verify_object_permission(obj_attrs, RGW_PERM_READ)) {
2760 ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to fetch object" << dendl;
2761 return -EPERM;
2762 }
2763 }
2764
2765 if (!dest_placement_rule &&
2766 params.dest.storage_class) {
2767 dest_rule.storage_class = *params.dest.storage_class;
2768 dest_rule.inherit_from(dest_bucket_info.placement_rule);
2769 dest_placement_rule = dest_rule;
2770 *prule = &dest_rule;
2771 }
2772
2773 return RGWFetchObjFilter_Default::filter(cct,
2774 source_key,
2775 dest_bucket_info,
2776 dest_placement_rule,
2777 obj_attrs,
2778 poverride_owner,
2779 prule);
2780 }
2781
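// Fetches one object through a sync pipe: resolves the pipe params for
// the key (statting the source object when the policy depends on its
// tags), verifies permissions in user mode, then calls
// RGWFetchRemoteObjCR, retrying if the params raced with a rewrite.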
2782 class RGWObjFetchCR : public RGWCoroutine {
2783 RGWDataSyncCtx *sc;
2784 RGWDataSyncEnv *sync_env;
2785 rgw_bucket_sync_pipe& sync_pipe;
2786 rgw_obj_key& key;
2787 std::optional<rgw_obj_key> dest_key;
2788 std::optional<uint64_t> versioned_epoch;
2789 const rgw_zone_set_entry& source_trace_entry;
2790 rgw_zone_set *zones_trace;
2791
2792 bool need_more_info{false};
2793 bool check_change{false};
2794
2795 ceph::real_time src_mtime;
2796 uint64_t src_size;
2797 string src_etag;
2798 map<string, bufferlist> src_attrs;
2799 map<string, string> src_headers;
2800
2801 std::optional<rgw_user> param_user;
2802 rgw_sync_pipe_params::Mode param_mode;
2803
2804 std::optional<RGWUserPermHandler> user_perms;
2805 std::shared_ptr<RGWUserPermHandler::Bucket> source_bucket_perms;
2806 RGWUserPermHandler::Bucket dest_bucket_perms;
2807
2808 std::optional<rgw_sync_pipe_dest_params> dest_params;
2809
2810 int try_num{0};
2811 std::shared_ptr<bool> need_retry;
2812 public:
2813 RGWObjFetchCR(RGWDataSyncCtx *_sc,
2814 rgw_bucket_sync_pipe& _sync_pipe,
2815 rgw_obj_key& _key,
2816 std::optional<rgw_obj_key> _dest_key,
2817 std::optional<uint64_t> _versioned_epoch,
2818 const rgw_zone_set_entry& source_trace_entry,
2819 rgw_zone_set *_zones_trace) : RGWCoroutine(_sc->cct),
2820 sc(_sc), sync_env(_sc->env),
2821 sync_pipe(_sync_pipe),
2822 key(_key),
2823 dest_key(_dest_key),
2824 versioned_epoch(_versioned_epoch),
2825 source_trace_entry(source_trace_entry),
2826 zones_trace(_zones_trace) {
2827 }
2828
2829
2830 int operate(const DoutPrefixProvider *dpp) override {
2831 reenter(this) {
2832
2833 #define MAX_RACE_RETRIES_OBJ_FETCH 10
2834 for (try_num = 0; try_num < MAX_RACE_RETRIES_OBJ_FETCH; ++try_num) {
2835
2836 {
2837 std::optional<rgw_user> param_acl_translation;
2838 std::optional<string> param_storage_class;
2839
2840 if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
2841 &param_user,
2842 &param_acl_translation,
2843 &param_storage_class,
2844 &param_mode,
2845 &need_more_info)) {
2846 if (!need_more_info) {
2847 return set_cr_error(-ERR_PRECONDITION_FAILED);
2848 }
2849 }
2850 }
2851
2852 if (need_more_info) {
2853 ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
2854 /*
2855 * we need to fetch info about source object, so that we can determine
2856 * the correct policy configuration. This can happen if there are multiple
2857 * policy rules, and some depend on the object tagging */
2858 yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
2859 sync_env->driver,
2860 sc->source_zone,
2861 sync_pipe.info.source_bs.bucket,
2862 key,
2863 &src_mtime,
2864 &src_size,
2865 &src_etag,
2866 &src_attrs,
2867 &src_headers));
2868 if (retcode < 0) {
2869 return set_cr_error(retcode);
2870 }
2871
2872 RGWObjTags obj_tags;
2873
2874 auto iter = src_attrs.find(RGW_ATTR_TAGS);
2875 if (iter != src_attrs.end()) {
2876 try {
2877 auto it = iter->second.cbegin();
2878 obj_tags.decode(it);
2879 } catch (buffer::error &err) {
2880 ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error, couldn't decode TagSet" << dendl;
2881 }
2882 }
2883
2884 rgw_sync_pipe_params params;
2885 if (!sync_pipe.info.handler.find_obj_params(key,
2886 obj_tags.get_tags(),
2887 &params)) {
2888 return set_cr_error(-ERR_PRECONDITION_FAILED);
2889 }
2890
2891 param_user = params.user;
2892 param_mode = params.mode;
2893
2894 dest_params = params.dest;
2895 }
2896
2897 if (param_mode == rgw_sync_pipe_params::MODE_USER) {
2898 if (!param_user) {
2899 ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl;
2900 return set_cr_error(-EPERM);
2901 }
2902 user_perms.emplace(sync_env, *param_user);
2903
2904 yield call(user_perms->init_cr());
2905 if (retcode < 0) {
2906 ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl;
2907 return set_cr_error(retcode);
2908 }
2909
2910 /* verify that user is allowed to write at the target bucket */
2911 int r = user_perms->init_bucket(sync_pipe.dest_bucket_info,
2912 sync_pipe.dest_bucket_attrs,
2913 &dest_bucket_perms);
2914 if (r < 0) {
2915 ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.dest_bucket_info.bucket.get_key() << dendl;
2916 return set_cr_error(r);
2917 }
2918
2919 if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) {
2920 ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bucket.get_key() << ")" << dendl;
2921 return set_cr_error(-EPERM);
2922 }
2923
2924 /* init source bucket permission structure */
2925 source_bucket_perms = make_shared<RGWUserPermHandler::Bucket>();
2926 r = user_perms->init_bucket(sync_pipe.source_bucket_info,
2927 sync_pipe.source_bucket_attrs,
2928 source_bucket_perms.get());
2929 if (r < 0) {
2930 ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
2931 return set_cr_error(r);
2932 }
2933 }
2934
2935 yield {
2936 if (!need_retry) {
2937 need_retry = make_shared<bool>();
2938 }
2939 auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe,
2940 source_bucket_perms,
2941 std::move(dest_params),
2942 need_retry);
2943
2944 call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->driver, sc->source_zone,
2945 nullopt,
2946 sync_pipe.info.source_bs.bucket,
2947 std::nullopt, sync_pipe.dest_bucket_info,
2948 key, dest_key, versioned_epoch,
2949 true,
2950 std::static_pointer_cast<RGWFetchObjFilter>(filter),
2951 source_trace_entry, zones_trace,
2952 sync_env->counters, dpp));
2953 }
2954 if (retcode < 0) {
2955 if (*need_retry) {
2956 continue;
2957 }
2958 return set_cr_error(retcode);
2959 }
2960
2961 return set_cr_done();
2962 }
2963
2964 ldout(cct, 0) << "ERROR: " << __func__ << ": Too many retries trying to fetch object, possibly a bug: bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << " key=" << key << dendl;
2965
2966 return set_cr_error(-EIO);
2967 }
2968 return 0;
2969 }
2970 };
2971
2972 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc,
2973 rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
2974 std::optional<uint64_t> versioned_epoch,
2975 const rgw_zone_set_entry& source_trace_entry,
2976 rgw_zone_set *zones_trace)
2977 {
2978 return new RGWObjFetchCR(sc, sync_pipe, key, std::nullopt, versioned_epoch,
2979 source_trace_entry, zones_trace);
2980 }
2981
2982 RGWCoroutine *RGWDefaultDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
2983 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
2984 {
2985 auto sync_env = sc->env;
2986 return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->driver, sc->source_zone,
2987 sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
2988 NULL, NULL, false, &mtime, zones_trace);
2989 }
2990
2991 RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
2992 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
2993 {
2994 auto sync_env = sc->env;
2995 return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->driver, sc->source_zone,
2996 sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
2997 &owner.id, &owner.display_name, true, &mtime, zones_trace);
2998 }
2999
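// Archive zone data handler: keeps every object version on the
// destination. sync_object() forces versioning on the archive bucket;
// remove_object() is a no-op, so deletes only produce delete markers.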
3000 class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
3001 public:
3002 RGWArchiveDataSyncModule() {}
3003
3004 RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc,
3005 rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
3006 std::optional<uint64_t> versioned_epoch,
3007 const rgw_zone_set_entry& source_trace_entry,
3008 rgw_zone_set *zones_trace) override;
3009 RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
3010 RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
3011 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
3012 };
3013
3014 class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
3015 RGWArchiveDataSyncModule data_handler;
3016 public:
3017 RGWArchiveSyncModuleInstance() {}
3018 RGWDataSyncModule *get_data_handler() override {
3019 return &data_handler;
3020 }
3021 RGWMetadataHandler *alloc_bucket_meta_handler() override {
3022 return RGWArchiveBucketMetaHandlerAllocator::alloc();
3023 }
3024 RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler(rgw::sal::Driver* driver) override {
3025 return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc(driver);
3026 }
3027 };
3028
3029 int RGWArchiveSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
3030 {
3031 instance->reset(new RGWArchiveSyncModuleInstance());
3032 return 0;
3033 }
3034
3035 RGWCoroutine *RGWArchiveDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc,
3036 rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
3037 std::optional<uint64_t> versioned_epoch,
3038 const rgw_zone_set_entry& source_trace_entry,
3039 rgw_zone_set *zones_trace)
3040 {
3041 auto sync_env = sc->env;
3042 ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
3043 if (!sync_pipe.dest_bucket_info.versioned() ||
3044 (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
3045 ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
3046 sync_pipe.dest_bucket_info.flags = (sync_pipe.dest_bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
3047 int op_ret = sync_env->driver->getRados()->put_bucket_instance_info(sync_pipe.dest_bucket_info, false, real_time(), NULL, sync_env->dpp, null_yield);
3048 if (op_ret < 0) {
3049 ldpp_dout(sync_env->dpp, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
3050 return NULL;
3051 }
3052 }
3053
3054 std::optional<rgw_obj_key> dest_key;
3055
3056 if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
3057 versioned_epoch = 0;
3058 dest_key = key;
3059 }
3060
3061 if (key.instance.empty()) {
3062 dest_key = key;
3063 sync_env->driver->getRados()->gen_rand_obj_instance_name(&(*dest_key));
3064 }
3065
3066 return new RGWObjFetchCR(sc, sync_pipe, key, dest_key, versioned_epoch,
3067 source_trace_entry, zones_trace);
3068 }
3069
3070 RGWCoroutine *RGWArchiveDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
3071 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
3072 {
3073 ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
3074 return NULL;
3075 }
3076
3077 RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
3078 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
3079 {
3080 ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
3081 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
3082 auto sync_env = sc->env;
3083 return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->driver, sc->source_zone,
3084 sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
3085 &owner.id, &owner.display_name, true, &mtime, zones_trace);
3086 }
3087
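// Keeps the top-level RGWDataSyncCR running under backoff; exit_on_error
// is false, so it retries on all errors.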
3088 class RGWDataSyncControlCR : public RGWBackoffControlCR
3089 {
3090 RGWDataSyncCtx *sc;
3091 RGWDataSyncEnv *sync_env;
3092 uint32_t num_shards;
3093
3094 RGWSyncTraceNodeRef tn;
3095
3096 static constexpr bool exit_on_error = false; // retry on all errors
3097 public:
3098 RGWDataSyncControlCR(RGWDataSyncCtx *_sc, uint32_t _num_shards,
3099 RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, exit_on_error),
3100 sc(_sc), sync_env(_sc->env), num_shards(_num_shards) {
3101 tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
3102 }
3103
3104 RGWCoroutine *alloc_cr() override {
3105 return new RGWDataSyncCR(sc, num_shards, tn, backoff_ptr());
3106 }
3107
3108 void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
3109 ceph::mutex& m = cr_lock();
3110
3111 m.lock();
3112 RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
3113 if (!cr) {
3114 m.unlock();
3115 return;
3116 }
3117
3118 cr->get();
3119 m.unlock();
3120
3121 cr->wakeup(shard_id, entries);
3122
3123 cr->put();
3126 }
3127 };
3128
3129 void RGWRemoteDataLog::wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {
3130 std::shared_lock rl{lock};
3131 if (!data_sync_cr) {
3132 return;
3133 }
3134 data_sync_cr->wakeup(shard_id, entries);
3135 }
3136
3137 int RGWRemoteDataLog::run_sync(const DoutPrefixProvider *dpp, int num_shards)
3138 {
3139 lock.lock();
3140 data_sync_cr = new RGWDataSyncControlCR(&sc, num_shards, tn);
3141 data_sync_cr->get(); // run() will drop a ref, so take another
3142 lock.unlock();
3143
3144 int r = run(dpp, data_sync_cr);
3145
3146 lock.lock();
3147 data_sync_cr->put();
3148 data_sync_cr = NULL;
3149 lock.unlock();
3150
3151 if (r < 0) {
3152 ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
3153 return r;
3154 }
3155 return 0;
3156 }
3157
3158 CephContext *RGWDataSyncStatusManager::get_cct() const
3159 {
3160 return driver->ctx();
3161 }
3162
3163 int RGWDataSyncStatusManager::init(const DoutPrefixProvider *dpp)
3164 {
3165 RGWZone *zone_def;
3166
3167 if (!(zone_def = driver->svc()->zone->find_zone(source_zone))) {
3168 ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
3169 return -EIO;
3170 }
3171
3172 if (!driver->svc()->sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
3173 return -ENOTSUP;
3174 }
3175
3176 const RGWZoneParams& zone_params = driver->svc()->zone->get_zone_params();
3177
3178 if (sync_module == nullptr) {
3179 sync_module = driver->get_sync_module();
3180 }
3181
3182 conn = driver->svc()->zone->get_zone_conn(source_zone);
3183 if (!conn) {
3184 ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
3185 return -EINVAL;
3186 }
3187
3188 error_logger = new RGWSyncErrorLogger(driver, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
3189
3190 int r = source_log.init(source_zone, conn, error_logger, driver->getRados()->get_sync_tracer(),
3191 sync_module, counters);
3192 if (r < 0) {
3193 ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
3194 finalize();
3195 return r;
3196 }
3197
3198 rgw_datalog_info datalog_info;
3199 r = source_log.read_log_info(dpp, &datalog_info);
3200 if (r < 0) {
3201 ldpp_dout(this, 5) << "ERROR: source_log.read_log_info() returned r=" << r << dendl;
3202 finalize();
3203 return r;
3204 }
3205
3206 num_shards = datalog_info.num_shards;
3207
3208 for (int i = 0; i < num_shards; i++) {
3209 shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
3210 }
3211
3212 return 0;
3213 }
3214
3215 void RGWDataSyncStatusManager::finalize()
3216 {
3217 delete error_logger;
3218 error_logger = nullptr;
3219 }
3220
3221 unsigned RGWDataSyncStatusManager::get_subsys() const
3222 {
3223 return dout_subsys;
3224 }
3225
3226 std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
3227 {
3228 auto zone = std::string_view{source_zone.id};
3229 return out << "data sync zone:" << zone.substr(0, 8) << ' ';
3230 }
3231
3232 string RGWDataSyncStatusManager::sync_status_oid(const rgw_zone_id& source_zone)
3233 {
3234 char buf[datalog_sync_status_oid_prefix.size() + source_zone.id.size() + 16];
3235 snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.id.c_str());
3236
3237 return string(buf);
3238 }
3239
3240 string RGWDataSyncStatusManager::shard_obj_name(const rgw_zone_id& source_zone, int shard_id)
3241 {
3242 char buf[datalog_sync_status_shard_prefix.size() + source_zone.id.size() + 16];
3243 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.id.c_str(), shard_id);
3244
3245 return string(buf);
3246 }
3247
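// Writes the initial sync status object for one bucket shard, already in
// incremental state. If the sync module performs full sync first, the
// incremental marker starts at the remote bilog's current max marker,
// since full sync covers everything before that point.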
3248 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
3249 RGWDataSyncCtx *sc;
3250 RGWDataSyncEnv *sync_env;
3251
3252 const rgw_bucket_sync_pair_info& sync_pair;
3253 const string sync_status_oid;
3254
3255 rgw_bucket_shard_sync_info& status;
3256 RGWObjVersionTracker& objv_tracker;
3257 const BucketIndexShardsManager& marker_mgr;
3258 bool exclusive;
3259 public:
3260 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
3261 const rgw_bucket_sync_pair_info& _sync_pair,
3262 rgw_bucket_shard_sync_info& _status,
3263 uint64_t gen,
3264 const BucketIndexShardsManager& _marker_mgr,
3265 RGWObjVersionTracker& objv_tracker,
3266 bool exclusive)
3267 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
3268 sync_pair(_sync_pair),
3269 sync_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, _sync_pair, gen)),
3270 status(_status), objv_tracker(objv_tracker), marker_mgr(_marker_mgr), exclusive(exclusive)
3271 {}
3272
3273 int operate(const DoutPrefixProvider *dpp) override {
3274 reenter(this) {
3275 yield {
3276 rgw_raw_obj obj(sync_env->svc->zone->get_zone_params().log_pool, sync_status_oid);
3277
3278 // whether or not to do full sync, incremental sync will follow anyway
3279 if (sync_env->sync_module->should_full_sync()) {
3280 const auto max_marker = marker_mgr.get(sync_pair.source_bs.shard_id, "");
3281 status.inc_marker.position = max_marker;
3282 }
3283 status.inc_marker.timestamp = ceph::real_clock::now();
3284 status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
3285
3286 map<string, bufferlist> attrs;
3287 status.encode_all_attrs(attrs);
3288 call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->driver,
3289 obj, attrs, &objv_tracker, exclusive));
3290 }
3291
3292 if (retcode < 0) {
3293 ldout(cct, 20) << "ERROR: init marker position failed. error: " << retcode << dendl;
3294 return set_cr_error(retcode);
3295 }
3296 ldout(cct, 20) << "init marker position: " << status.inc_marker.position <<
3297 ". written to shard status object: " << sync_status_oid << dendl;
3298 return set_cr_done();
3299 }
3300 return 0;
3301 }
3302 };
3303
3304 #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
3305
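// Decode a single sync-status attribute into *val. Returns false if the
// attribute is missing (leaving *val default-initialized) or if
// decoding throws.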
3306 template <class T>
3307 static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
3308 {
3309 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
3310 if (iter == attrs.end()) {
3311 *val = T();
3312 return false;
3313 }
3314
3315 auto biter = iter->second.cbegin();
3316 try {
3317 decode(*val, biter);
3318 } catch (buffer::error& err) {
3319 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
3320 return false;
3321 }
3322 return true;
3323 }
3324
3325 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
3326 {
3327 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
3328 decode_attr(cct, attrs, "state", &state);
3329 }
3330 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
3331 decode_attr(cct, attrs, "inc_marker", &inc_marker);
3332 }
3333 }
3334
3335 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
3336 {
3337 encode_state_attr(attrs);
3338 inc_marker.encode_attr(attrs);
3339 }
3340
3341 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
3342 {
3343 using ceph::encode;
3344 encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
3345 }
3346
3347 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
3348 {
3349 using ceph::encode;
3350 encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
3351 }
3352
3353 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
3354 {
3355 using ceph::encode;
3356 encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
3357 }
3358
3359 class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine {
3360 RGWDataSyncCtx *sc;
3361 RGWDataSyncEnv *sync_env;
3362 string oid;
3363 rgw_bucket_shard_sync_info *status;
3364 RGWObjVersionTracker* objv_tracker;
3365 map<string, bufferlist> attrs;
3366 public:
3367 RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
3368 const rgw_bucket_sync_pair_info& sync_pair,
3369 rgw_bucket_shard_sync_info *_status,
3370 RGWObjVersionTracker* objv_tracker,
3371 uint64_t gen)
3372 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
3373 oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, gen)),
3374 status(_status), objv_tracker(objv_tracker)
3375 {}
3376 int operate(const DoutPrefixProvider *dpp) override;
3377 };
3378
3379 int RGWReadBucketPipeSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
3380 {
3381 reenter(this) {
3382 yield call(new RGWSimpleRadosReadAttrsCR(dpp, sync_env->driver,
3383 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, oid),
3384 &attrs, true, objv_tracker));
3385 if (retcode == -ENOENT) {
3386 *status = rgw_bucket_shard_sync_info();
3387 return set_cr_done();
3388 }
3389 if (retcode < 0) {
3390 ldpp_dout(dpp, 0) << "ERROR: failed to fetch bucket shard sync status oid=" << oid << " ret=" << retcode << dendl;
3391 return set_cr_error(retcode);
3392 }
3393 status->decode_from_attrs(sync_env->cct, attrs);
3394 return set_cr_done();
3395 }
3396 return 0;
3397 }
3398
3399 // wrap ReadSyncStatus and set a flag if it's not in incremental
3400 class CheckBucketShardStatusIsIncremental : public RGWReadBucketPipeSyncStatusCoroutine {
3401 bool* result;
3402 rgw_bucket_shard_sync_info status;
3403 public:
3404 CheckBucketShardStatusIsIncremental(RGWDataSyncCtx* sc,
3405 const rgw_bucket_sync_pair_info& sync_pair,
3406 bool* result)
3407 : RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &status, nullptr, 0 /*no gen in compat mode*/),
3408 result(result)
3409 {}
3410
3411 int operate(const DoutPrefixProvider *dpp) override {
3412 int r = RGWReadBucketPipeSyncStatusCoroutine::operate(dpp);
3413 if (state == RGWCoroutine_Done &&
3414 status.state != rgw_bucket_shard_sync_info::StateIncrementalSync) {
3415 *result = false;
3416 }
3417 return r;
3418 }
3419 };
3420
3421 class CheckAllBucketShardStatusIsIncremental : public RGWShardCollectCR {
3422 // start with 1 shard, and only spawn more if we detect an existing shard.
3423 // this makes the backward compatibility check far less expensive in the
3424 // general case, where no shards exist
3425 static constexpr int initial_concurrent_shards = 1;
3426 static constexpr int max_concurrent_shards = 16;
3427
3428 RGWDataSyncCtx* sc;
3429 rgw_bucket_sync_pair_info sync_pair;
3430 const int num_shards;
3431 bool* result;
3432 int shard = 0;
3433 public:
3434 CheckAllBucketShardStatusIsIncremental(RGWDataSyncCtx* sc,
3435 const rgw_bucket_sync_pair_info& sync_pair,
3436 int num_shards, bool* result)
3437 : RGWShardCollectCR(sc->cct, initial_concurrent_shards),
3438 sc(sc), sync_pair(sync_pair), num_shards(num_shards), result(result)
3439 {}
3440
3441 bool spawn_next() override {
3442 // stop spawning if we saw any errors or non-incremental shards
3443 if (shard >= num_shards || status < 0 || !*result) {
3444 return false;
3445 }
3446 sync_pair.source_bs.shard_id = shard++;
3447 spawn(new CheckBucketShardStatusIsIncremental(sc, sync_pair, result), false);
3448 return true;
3449 }
3450
3451 private:
3452 int handle_result(int r) override {
3453 if (r < 0) {
3454 ldout(cct, 4) << "failed to read bucket shard status: "
3455 << cpp_strerror(r) << dendl;
3456 } else if (shard == 0) {
3457 // enable concurrency once the first shard succeeds
3458 max_concurrent = max_concurrent_shards;
3459 }
3460 return r;
3461 }
3462 };
3463
3464 // wrap InitBucketShardSyncStatus with local storage for 'status' and 'objv';
3465 // the status object is written non-exclusively with a fresh write version,
3466 // so a racing writer is simply overwritten
3466 class InitBucketShardStatusCR : public RGWCoroutine {
3467 RGWDataSyncCtx* sc;
3468 rgw_bucket_sync_pair_info pair;
3469 rgw_bucket_shard_sync_info status;
3470 RGWObjVersionTracker objv;
3471 const uint64_t gen;
3472 const BucketIndexShardsManager& marker_mgr;
3473
3474 public:
3475 InitBucketShardStatusCR(RGWDataSyncCtx* sc,
3476 const rgw_bucket_sync_pair_info& pair,
3477 uint64_t gen,
3478 const BucketIndexShardsManager& marker_mgr)
3479 : RGWCoroutine(sc->cct), sc(sc), pair(pair), gen(gen), marker_mgr(marker_mgr)
3480 {}
3481 int operate(const DoutPrefixProvider *dpp) {
3482 reenter(this) {
3483 // non-exclusive create with an empty status
3484 objv.generate_new_write_ver(cct);
3485 yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, pair, status, gen, marker_mgr, objv, false));
3486 if (retcode < 0) {
3487 return set_cr_error(retcode);
3488 }
3489 return set_cr_done();
3490 }
3491 return 0;
3492 }
3493 };
3494
3495 class InitBucketShardStatusCollectCR : public RGWShardCollectCR {
3496 static constexpr int max_concurrent_shards = 16;
3497 RGWDataSyncCtx* sc;
3498 rgw_bucket_sync_pair_info sync_pair;
3499 const uint64_t gen;
3500 const BucketIndexShardsManager& marker_mgr;
3501
3502 const int num_shards;
3503 int shard = 0;
3504
3505 int handle_result(int r) override {
3506 if (r < 0) {
3507 ldout(cct, 4) << "failed to init bucket shard status: "
3508 << cpp_strerror(r) << dendl;
3509 }
3510 return r;
3511 }
3512 public:
3513 InitBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
3514 const rgw_bucket_sync_pair_info& sync_pair,
3515 uint64_t gen,
3516 const BucketIndexShardsManager& marker_mgr,
3517 int num_shards)
3518 : RGWShardCollectCR(sc->cct, max_concurrent_shards),
3519 sc(sc), sync_pair(sync_pair), gen(gen), marker_mgr(marker_mgr), num_shards(num_shards)
3520 {}
3521
3522 bool spawn_next() override {
3523 if (shard >= num_shards || status < 0) { // stop spawning on any errors
3524 return false;
3525 }
3526 sync_pair.source_bs.shard_id = shard++;
3527 spawn(new InitBucketShardStatusCR(sc, sync_pair, gen, marker_mgr), false);
3528 return true;
3529 }
3530 };
3531
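// Remove one per-shard incremental sync status object. ENOENT is treated as
// success, so the removal can safely be retried.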
3532 class RemoveBucketShardStatusCR : public RGWCoroutine {
3533 RGWDataSyncCtx* const sc;
3534 RGWDataSyncEnv* const sync_env;
3535
3536 rgw_bucket_sync_pair_info sync_pair;
3537 rgw_raw_obj obj;
3538 RGWObjVersionTracker objv;
3539
3540 public:
3541 RemoveBucketShardStatusCR(RGWDataSyncCtx* sc,
3542 const rgw_bucket_sync_pair_info& sync_pair, uint64_t gen)
3543 : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
3544 sync_pair(sync_pair),
3545 obj(sync_env->svc->zone->get_zone_params().log_pool,
3546 RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, gen))
3547 {}
3548
3549 int operate(const DoutPrefixProvider *dpp) override {
3550 reenter(this) {
3551 yield call(new RGWRadosRemoveCR(sync_env->driver, obj, &objv));
3552 if (retcode < 0 && retcode != -ENOENT) {
3553 ldout(cct, 20) << "ERROR: failed to remove bucket shard status for: " << sync_pair <<
3554 " with error: " << retcode << dendl;
3555 return set_cr_error(retcode);
3556 }
3557 ldout(cct, 20) << "removed bucket shard status object: " << obj.oid << dendl;
3558 return set_cr_done();
3559 }
3560 return 0;
3561 }
3562 };
3563
3564 class RemoveBucketShardStatusCollectCR : public RGWShardCollectCR {
3565 static constexpr int max_concurrent_shards = 16;
3566 RGWDataSyncCtx* const sc;
3567 RGWDataSyncEnv* const sync_env;
3568 rgw_bucket_sync_pair_info sync_pair;
3569 const uint64_t gen;
3570
3571 const int num_shards;
3572 int shard = 0;
3573
3574 int handle_result(int r) override {
3575 if (r < 0) {
3576 ldout(cct, 4) << "failed to remove bucket shard status object: "
3577 << cpp_strerror(r) << dendl;
3578 }
3579 return r;
3580 }
3581 public:
3582 RemoveBucketShardStatusCollectCR(RGWDataSyncCtx* sc,
3583 const rgw_bucket_sync_pair_info& sync_pair,
3584 uint64_t gen,
3585 int num_shards)
3586 : RGWShardCollectCR(sc->cct, max_concurrent_shards),
3587 sc(sc), sync_env(sc->env), sync_pair(sync_pair), gen(gen), num_shards(num_shards)
3588 {}
3589
3590 bool spawn_next() override {
3591 if (shard >= num_shards) {
3592 return false;
3593 }
3594 sync_pair.source_bs.shard_id = shard++;
3595 spawn(new RemoveBucketShardStatusCR(sc, sync_pair, gen), false);
3596 return true;
3597 }
3598 };
3599
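// Initialize the bucket-wide sync status object. In outline:
//  1. parse the remote's per-shard max bilog markers
//  2. if the source still has log generation 0 and check_compat is set, probe
//     the legacy per-shard status objects; if every shard is already doing
//     incremental sync, adopt them and resume at BucketSyncState::Incremental
//  3. otherwise initialize per-shard status at the latest generation and
//     enter Full (or Incremental, if the module skips full sync)
//  4. persist the result to 'status_obj'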
3600 class InitBucketFullSyncStatusCR : public RGWCoroutine {
3601 RGWDataSyncCtx *sc;
3602 RGWDataSyncEnv *sync_env;
3603
3604 const rgw_bucket_sync_pair_info& sync_pair;
3605 const rgw_raw_obj& status_obj;
3606 rgw_bucket_sync_status& status;
3607 RGWObjVersionTracker& objv;
3608 const RGWBucketInfo& source_info;
3609 const bool check_compat;
3610
3611 const rgw_bucket_index_marker_info& info;
3612 BucketIndexShardsManager marker_mgr;
3613
3614 bool all_incremental = true;
3615 bool no_zero = false;
3616
3617 public:
3618 InitBucketFullSyncStatusCR(RGWDataSyncCtx* sc,
3619 const rgw_bucket_sync_pair_info& sync_pair,
3620 const rgw_raw_obj& status_obj,
3621 rgw_bucket_sync_status& status,
3622 RGWObjVersionTracker& objv,
3623 const RGWBucketInfo& source_info,
3624 bool check_compat,
3625 const rgw_bucket_index_marker_info& info)
3626 : RGWCoroutine(sc->cct), sc(sc), sync_env(sc->env),
3627 sync_pair(sync_pair), status_obj(status_obj),
3628 status(status), objv(objv), source_info(source_info),
3629 check_compat(check_compat), info(info)
3630 {}
3631
3632 int operate(const DoutPrefixProvider *dpp) override {
3633 reenter(this) {
3634 retcode = marker_mgr.from_string(info.max_marker, -1);
3635 if (retcode < 0) {
3636 lderr(cct) << "failed to parse bilog shard markers: "
3637 << cpp_strerror(retcode) << dendl;
3638 return set_cr_error(retcode);
3639 }
3640
3641 status.state = BucketSyncState::Init;
3642
3643 if (info.oldest_gen == 0) {
3644 if (check_compat) {
3645 // use the shard count from the source's log at gen=0 and
3646 // try to convert its existing per-shard incremental status for backward compatibility
3647 if (source_info.layout.logs.empty() ||
3648 source_info.layout.logs.front().gen > 0) {
3649 ldpp_dout(dpp, 20) << "no generation zero when checking compatibility" << dendl;
3650 no_zero = true;
3651 } else if (auto& log = source_info.layout.logs.front();
3652 log.layout.type != rgw::BucketLogType::InIndex) {
3653 ldpp_dout(dpp, 20) << "unrecognized log layout type when checking compatibility " << log.layout.type << dendl;
3654 no_zero = true;
3655 }
3656 if (!no_zero) {
3657 yield {
3658 const int num_shards0 = rgw::num_shards(
3659 source_info.layout.logs.front().layout.in_index.layout);
3660 call(new CheckAllBucketShardStatusIsIncremental(sc, sync_pair,
3661 num_shards0,
3662 &all_incremental));
3663 }
3664 if (retcode < 0) {
3665 return set_cr_error(retcode);
3666 }
3667 if (all_incremental) {
3668 // we can use existing status and resume incremental sync
3669 status.state = BucketSyncState::Incremental;
3670 }
3671 } else {
3672 all_incremental = false;
3673 }
3674 }
3675 }
3676
3677 if (status.state != BucketSyncState::Incremental) {
3678 // initialize all shard sync status. this will populate the log marker
3679 // positions where incremental sync will resume after full sync
3680 yield {
3681 const int num_shards = marker_mgr.get().size();
3682 call(new InitBucketShardStatusCollectCR(sc, sync_pair, info.latest_gen, marker_mgr, num_shards));
3683 }
3684 if (retcode < 0) {
3685 ldout(cct, 20) << "failed to init bucket shard status: "
3686 << cpp_strerror(retcode) << dendl;
3687 return set_cr_error(retcode);
3688 }
3689
3690 if (sync_env->sync_module->should_full_sync()) {
3691 status.state = BucketSyncState::Full;
3692 } else {
3693 status.state = BucketSyncState::Incremental;
3694 }
3695 }
3696
3697 status.shards_done_with_gen.resize(marker_mgr.get().size());
3698 status.incremental_gen = info.latest_gen;
3699
3700 ldout(cct, 20) << "writing bucket sync status during init. state=" << status.state << ". marker=" << status.full.position << dendl;
3701
3702 // write bucket sync status
3703 using CR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
3704 yield call(new CR(dpp, sync_env->driver,
3705 status_obj, status, &objv, false));
3706 if (retcode < 0) {
3707 ldout(cct, 20) << "failed to write bucket shard status: "
3708 << cpp_strerror(retcode) << dendl;
3709 return set_cr_error(retcode);
3710 }
3711 return set_cr_done();
3712 }
3713 return 0;
3714 }
3715 };
3716
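// The coroutine below lists the omap keys of this datalog shard's ".retry"
// error repo, i.e. the bucket shards awaiting a sync retry. Keys that decode
// are rendered as "<bucket:shard>[<gen>]" ("[full]" when no generation was
// recorded); keys that fail to decode are reported as-is.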
3717 #define OMAP_READ_MAX_ENTRIES 10
3718 class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
3719 RGWDataSyncCtx *sc;
3720 RGWDataSyncEnv *sync_env;
3721 rgw::sal::RadosStore* driver;
3722
3723 const int shard_id;
3724 int max_entries;
3725
3726 set<string>& recovering_buckets;
3727 string marker;
3728 string error_oid;
3729
3730 RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
3731 set<string> error_entries;
3732 int max_omap_entries;
3733 int count;
3734
3735 public:
3736 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id,
3737 set<string>& _recovering_buckets, const int _max_entries)
3738 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
3739 driver(sync_env->driver), shard_id(_shard_id), max_entries(_max_entries),
3740 recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
3741 {
3742 error_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id) + ".retry";
3743 }
3744
3745 int operate(const DoutPrefixProvider *dpp) override;
3746 };
3747
3748 int RGWReadRecoveringBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
3749 {
3750 reenter(this) {
3751 // read recovering bucket shards
3752 count = 0;
3753 do {
3754 omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
3755 yield call(new RGWRadosGetOmapKeysCR(driver, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, error_oid),
3756 marker, max_omap_entries, omapkeys));
3757
3758 if (retcode == -ENOENT) {
3759 break;
3760 }
3761
3762 if (retcode < 0) {
3763 ldpp_dout(dpp, 0) << "failed to read recovering bucket shards with "
3764 << cpp_strerror(retcode) << dendl;
3765 return set_cr_error(retcode);
3766 }
3767
3768 error_entries = std::move(omapkeys->entries);
3769 if (error_entries.empty()) {
3770 break;
3771 }
3772
3773 count += error_entries.size();
3774 marker = *error_entries.rbegin();
3775 for (const std::string& key : error_entries) {
3776 rgw_bucket_shard bs;
3777 std::optional<uint64_t> gen;
3778 if (int r = rgw::error_repo::decode_key(key, bs, gen); r < 0) {
3779 // insert the key as-is
3780 recovering_buckets.insert(key); // key is a const ref into a set, so a move would copy anyway
3781 } else if (gen) {
3782 recovering_buckets.insert(fmt::format("{}[{}]", bucket_shard_str{bs}, *gen));
3783 } else {
3784 recovering_buckets.insert(fmt::format("{}[full]", bucket_shard_str{bs}));
3785 }
3786 }
3787 } while (omapkeys->more && count < max_entries);
3788
3789 return set_cr_done();
3790 }
3791
3792 return 0;
3793 }
3794
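// Report bucket shards that are still pending sync: read this shard's data
// sync marker, then list the remote datalog from that position. Every entry
// past the marker names a bucket shard whose changes have not been applied
// locally yet.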
3795 class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
3796 RGWDataSyncCtx *sc;
3797 RGWDataSyncEnv *sync_env;
3798 rgw::sal::RadosStore* driver;
3799
3800 const int shard_id;
3801 int max_entries;
3802
3803 set<string>& pending_buckets;
3804 string marker;
3805 string status_oid;
3806
3807 rgw_data_sync_marker* sync_marker;
3808 int count;
3809
3810 std::string next_marker;
3811 vector<rgw_data_change_log_entry> log_entries;
3812 bool truncated;
3813
3814 public:
3815 RGWReadPendingBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id,
3816 set<string>& _pending_buckets,
3817 rgw_data_sync_marker* _sync_marker, const int _max_entries)
3818 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
3819 driver(sync_env->driver), shard_id(_shard_id), max_entries(_max_entries),
3820 pending_buckets(_pending_buckets), sync_marker(_sync_marker)
3821 {
3822 status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id);
3823 }
3824
3825 int operate(const DoutPrefixProvider *dpp) override;
3826 };
3827
3828 int RGWReadPendingBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
3829 {
3830 reenter(this) {
3831 // read sync status marker
3832 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
3833 yield call(new CR(dpp, sync_env->driver,
3834 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
3835 sync_marker));
3836 if (retcode < 0) {
3837 ldpp_dout(dpp, 0) << "failed to read sync status marker with "
3838 << cpp_strerror(retcode) << dendl;
3839 return set_cr_error(retcode);
3840 }
3841
3842 // read pending bucket shards
3843 marker = sync_marker->marker;
3844 count = 0;
3845 do {
3846 yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, marker,
3847 &next_marker, &log_entries, &truncated));
3848
3849 if (retcode == -ENOENT) {
3850 break;
3851 }
3852
3853 if (retcode < 0) {
3854 ldpp_dout(dpp, 0) << "failed to read remote data log info with "
3855 << cpp_strerror(retcode) << dendl;
3856 return set_cr_error(retcode);
3857 }
3858
3859 if (log_entries.empty()) {
3860 break;
3861 }
3862
3863 count += log_entries.size();
3864 for (const auto& entry : log_entries) {
3865 pending_buckets.insert(entry.entry.key);
3866 }
3867 } while (truncated && count < max_entries);
3868
3869 return set_cr_done();
3870 }
3871
3872 return 0;
3873 }
3874
3875 int RGWRemoteDataLog::read_shard_status(const DoutPrefixProvider *dpp, int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
3876 {
3877 // cannot run concurrently with run_sync(), so run in a separate manager
3878 RGWCoroutinesManager crs(driver->ctx(), driver->getRados()->get_cr_registry());
3879 RGWHTTPManager http_manager(driver->ctx(), crs.get_completion_mgr());
3880 int ret = http_manager.start();
3881 if (ret < 0) {
3882 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
3883 return ret;
3884 }
3885 RGWDataSyncEnv sync_env_local = sync_env;
3886 sync_env_local.http_manager = &http_manager;
3887 RGWDataSyncCtx sc_local = sc;
3888 sc_local.env = &sync_env_local;
3889 list<RGWCoroutinesStack *> stacks;
3890 RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(driver->ctx(), &crs);
3891 recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sc_local, shard_id, recovering_buckets, max_entries));
3892 stacks.push_back(recovering_stack);
3893 RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(driver->ctx(), &crs);
3894 pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sc_local, shard_id, pending_buckets, sync_marker, max_entries));
3895 stacks.push_back(pending_stack);
3896 ret = crs.run(dpp, stacks);
3897 http_manager.stop();
3898 return ret;
3899 }
3900
3901 CephContext *RGWBucketPipeSyncStatusManager::get_cct() const
3902 {
3903 return driver->ctx();
3904 }
3905
3906 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
3907 {
3908 JSONDecoder::decode_json("ID", id, obj);
3909 JSONDecoder::decode_json("DisplayName", display_name, obj);
3910 }
3911
3912 struct bucket_list_entry {
3913 bool delete_marker;
3914 rgw_obj_key key;
3915 bool is_latest;
3916 real_time mtime;
3917 string etag;
3918 uint64_t size;
3919 string storage_class;
3920 rgw_bucket_entry_owner owner;
3921 uint64_t versioned_epoch;
3922 string rgw_tag;
3923
3924 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
3925
3926 void decode_json(JSONObj *obj) {
3927 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
3928 JSONDecoder::decode_json("Key", key.name, obj);
3929 JSONDecoder::decode_json("VersionId", key.instance, obj);
3930 JSONDecoder::decode_json("IsLatest", is_latest, obj);
3931 string mtime_str;
3932 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
3933
3934 struct tm t;
3935 uint32_t nsec;
3936 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
3937 ceph_timespec ts;
3938 ts.tv_sec = (uint64_t)internal_timegm(&t);
3939 ts.tv_nsec = nsec;
3940 mtime = real_clock::from_ceph_timespec(ts);
3941 }
3942 JSONDecoder::decode_json("ETag", etag, obj);
3943 JSONDecoder::decode_json("Size", size, obj);
3944 JSONDecoder::decode_json("StorageClass", storage_class, obj);
3945 JSONDecoder::decode_json("Owner", owner, obj);
3946 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
3947 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
3948 if (key.instance == "null" && !versioned_epoch) {
3949 key.instance.clear();
3950 }
3951 }
3952
3953 RGWModifyOp get_modify_op() const {
3954 if (delete_marker) {
3955 return CLS_RGW_OP_LINK_OLH_DM;
3956 } else if (!key.instance.empty() && key.instance != "null") {
3957 return CLS_RGW_OP_LINK_OLH;
3958 } else {
3959 return CLS_RGW_OP_ADD;
3960 }
3961 }
3962 };
3963
3964 struct bucket_list_result {
3965 string name;
3966 string prefix;
3967 string key_marker;
3968 string version_id_marker;
3969 int max_keys;
3970 bool is_truncated;
3971 list<bucket_list_entry> entries;
3972
3973 bucket_list_result() : max_keys(0), is_truncated(false) {}
3974
3975 void decode_json(JSONObj *obj) {
3976 JSONDecoder::decode_json("Name", name, obj);
3977 JSONDecoder::decode_json("Prefix", prefix, obj);
3978 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
3979 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
3980 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
3981 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
3982 JSONDecoder::decode_json("Entries", entries, obj);
3983 }
3984 };
3985
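// List one page of object versions from the source zone over its REST
// connection. The request is roughly
//   GET /<bucket>?versions&format=json&objs-container=true
//       &key-marker=<name>&version-id-marker=<instance>
// and the JSON response decodes into the bucket_list_result above.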
3986 class RGWListRemoteBucketCR: public RGWCoroutine {
3987 RGWDataSyncCtx *sc;
3988 RGWDataSyncEnv *sync_env;
3989 const rgw_bucket_shard& bs;
3990 rgw_obj_key marker_position;
3991
3992 bucket_list_result *result;
3993
3994 public:
3995 RGWListRemoteBucketCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
3996 rgw_obj_key& _marker_position, bucket_list_result *_result)
3997 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bs(bs),
3998 marker_position(_marker_position), result(_result) {}
3999
4000 int operate(const DoutPrefixProvider *dpp) override {
4001 reenter(this) {
4002 yield {
4003 rgw_http_param_pair pairs[] = { { "versions" , NULL },
4004 { "format" , "json" },
4005 { "objs-container" , "true" },
4006 { "key-marker" , marker_position.name.c_str() },
4007 { "version-id-marker" , marker_position.instance.c_str() },
4008 { NULL, NULL } };
4009 string p = string("/") + bs.bucket.get_key(':', 0);
4010 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, result));
4011 }
4012 if (retcode < 0) {
4013 return set_cr_error(retcode);
4014 }
4015 return set_cr_done();
4016 }
4017 return 0;
4018 }
4019 };
4020
4021 struct next_bilog_result {
4022 uint64_t generation = 0;
4023 int num_shards = 0;
4024
4025 void decode_json(JSONObj *obj) {
4026 JSONDecoder::decode_json("generation", generation, obj);
4027 JSONDecoder::decode_json("num_shards", num_shards, obj);
4028 }
4029 };
4030
4031 struct bilog_list_result {
4032 list<rgw_bi_log_entry> entries;
4033 bool truncated{false};
4034 std::optional<next_bilog_result> next_log;
4035
4036 void decode_json(JSONObj *obj) {
4037 JSONDecoder::decode_json("entries", entries, obj);
4038 JSONDecoder::decode_json("truncated", truncated, obj);
4039 JSONDecoder::decode_json("next_log", next_log, obj);
4040 }
4041 };
4042
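// Fetch one page of bilog entries for a bucket shard generation from the
// source zone's /admin/log endpoint (type=bucket-index, format-ver=2). With
// format-ver=2 the response may carry 'next_log', which names the generation
// and shard count to move to once the current generation is exhausted.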
4043 class RGWListBucketIndexLogCR: public RGWCoroutine {
4044 RGWDataSyncCtx *sc;
4045 RGWDataSyncEnv *sync_env;
4046 const string instance_key;
4047 string marker;
4048
4049 bilog_list_result *result;
4050 std::optional<PerfGuard> timer;
4051 uint64_t generation;
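// note: gen_str is declared after 'generation' on purpose; members are
// initialized in declaration order, so this default initializer is safe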
4052 std::string gen_str = std::to_string(generation);
4053 uint32_t format_ver{1};
4054
4055 public:
4056 RGWListBucketIndexLogCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs, string& _marker,
4057 uint64_t _generation, bilog_list_result *_result)
4058 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
4059 instance_key(bs.get_key()), marker(_marker), result(_result), generation(_generation) {}
4060
4061 int operate(const DoutPrefixProvider *dpp) override {
4062 reenter(this) {
4063 if (sync_env->counters) {
4064 timer.emplace(sync_env->counters, sync_counters::l_poll);
4065 }
4066 yield {
4067 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
4068 { "format" , "json" },
4069 { "marker" , marker.c_str() },
4070 { "type", "bucket-index" },
4071 { "generation", gen_str.c_str() },
4072 { "format-ver", "2"},
4073 { NULL, NULL } };
4074
4075 call(new RGWReadRESTResourceCR<bilog_list_result>(sync_env->cct, sc->conn, sync_env->http_manager,
4076 "/admin/log", pairs, result));
4077 }
4078 timer.reset();
4079 if (retcode < 0) {
4080 if (sync_env->counters) {
4081 sync_env->counters->inc(sync_counters::l_poll_err);
4082 }
4083 return set_cr_error(retcode);
4084 }
4085 return set_cr_done();
4086 }
4087 return 0;
4088 }
4089 };
4090
4091 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
4092
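// Marker trackers batch position updates: up to BUCKET_SYNC_UPDATE_MARKER_WINDOW
// completions may be acknowledged before a write is issued, so the persisted
// position can trail real progress. After a restart the trailing entries are
// replayed; replays are expected to be detected as redundant (zone trace and
// newer-change checks) rather than cause harm.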
4093 class RGWBucketFullSyncMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
4094 RGWDataSyncCtx *sc;
4095 RGWDataSyncEnv *sync_env;
4096
4097 const rgw_raw_obj& status_obj;
4098 rgw_bucket_sync_status& sync_status;
4099 RGWSyncTraceNodeRef tn;
4100 RGWObjVersionTracker& objv_tracker;
4101
4102 public:
4103 RGWBucketFullSyncMarkerTrack(RGWDataSyncCtx *_sc,
4104 const rgw_raw_obj& status_obj,
4105 rgw_bucket_sync_status& sync_status,
4106 RGWSyncTraceNodeRef tn,
4107 RGWObjVersionTracker& objv_tracker)
4108 : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
4109 sc(_sc), sync_env(_sc->env), status_obj(status_obj),
4110 sync_status(sync_status), tn(std::move(tn)), objv_tracker(objv_tracker)
4111 {}
4112
4113
4114 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
4115 sync_status.full.position = new_marker;
4116 sync_status.full.count = index_pos;
4117
4118 tn->log(20, SSTR("updating marker oid=" << status_obj.oid << " marker=" << new_marker));
4119 return new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
4120 sync_env->dpp, sync_env->driver,
4121 status_obj, sync_status, &objv_tracker);
4122 }
4123
4124 RGWOrderCallCR *allocate_order_control_cr() override {
4125 return new RGWLastCallerWinsCR(sync_env->cct);
4126 }
4127 };
4128
4129 // write the incremental sync status and update 'stable_timestamp' on success
4130 class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine {
4131 RGWDataSyncEnv *sync_env;
4132 rgw_raw_obj obj;
4133 rgw_bucket_shard_inc_sync_marker sync_marker;
4134 ceph::real_time* stable_timestamp;
4135 RGWObjVersionTracker& objv_tracker;
4136 std::map<std::string, bufferlist> attrs;
4137 public:
4138 RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env,
4139 const rgw_raw_obj& obj,
4140 const rgw_bucket_shard_inc_sync_marker& sync_marker,
4141 ceph::real_time* stable_timestamp,
4142 RGWObjVersionTracker& objv_tracker)
4143 : RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj),
4144 sync_marker(sync_marker), stable_timestamp(stable_timestamp),
4145 objv_tracker(objv_tracker)
4146 {}
4147 int operate(const DoutPrefixProvider *dpp) {
4148 reenter(this) {
4149 sync_marker.encode_attr(attrs);
4150
4151 yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->dpp, sync_env->driver,
4152 obj, attrs, &objv_tracker));
4153 if (retcode < 0) {
4154 return set_cr_error(retcode);
4155 }
4156 if (stable_timestamp) {
4157 *stable_timestamp = sync_marker.timestamp;
4158 }
4159 return set_cr_done();
4160 }
4161 return 0;
4162 }
4163 };
4164
4165 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
4166 RGWDataSyncCtx *sc;
4167 RGWDataSyncEnv *sync_env;
4168
4169 rgw_raw_obj obj;
4170 rgw_bucket_shard_inc_sync_marker sync_marker;
4171
4172 map<rgw_obj_key, string> key_to_marker;
4173
4174 struct operation {
4175 rgw_obj_key key;
4176 bool is_olh;
4177 };
4178 map<string, operation> marker_to_op;
4179 std::set<std::string> pending_olh; // object names with pending olh operations
4180
4181 RGWSyncTraceNodeRef tn;
4182 RGWObjVersionTracker& objv_tracker;
4183 ceph::real_time* stable_timestamp;
4184
4185 void handle_finish(const string& marker) override {
4186 auto iter = marker_to_op.find(marker);
4187 if (iter == marker_to_op.end()) {
4188 return;
4189 }
4190 auto& op = iter->second;
4191 key_to_marker.erase(op.key);
4192 reset_need_retry(op.key);
4193 if (op.is_olh) {
4194 pending_olh.erase(op.key.name);
4195 }
4196 marker_to_op.erase(iter);
4197 }
4198
4199 public:
4200 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
4201 const string& _marker_oid,
4202 const rgw_bucket_shard_inc_sync_marker& _marker,
4203 RGWSyncTraceNodeRef tn,
4204 RGWObjVersionTracker& objv_tracker,
4205 ceph::real_time* stable_timestamp)
4206 : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
4207 sc(_sc), sync_env(_sc->env),
4208 obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid),
4209 sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker),
4210 stable_timestamp(stable_timestamp)
4211 {}
4212
4213 const rgw_raw_obj& get_obj() const { return obj; }
4214
4215 RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
4216 sync_marker.position = new_marker;
4217 sync_marker.timestamp = timestamp;
4218
4219 tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp));
4220 return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker,
4221 stable_timestamp, objv_tracker);
4222 }
4223
4224 /*
4225 * create an index from key -> <op, marker>, and from marker -> key.
4226 * this is useful so that we can ensure that there is only one entry
4227 * for any key that is in use. This is needed when doing incremental
4228 * sync of data, where we don't want to run multiple concurrent sync
4229 * operations for the same bucket shard.
4230 * Also, we make sure that we don't run concurrent operations on the
4231 * same key with different ops.
4232 */
4233 bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
4234 auto result = key_to_marker.emplace(key, marker);
4235 if (!result.second) { // exists
4236 set_need_retry(key);
4237 return false;
4238 }
4239 marker_to_op[marker] = operation{key, is_olh};
4240 if (is_olh) {
4241 // prevent other olh ops from starting on this object name
4242 pending_olh.insert(key.name);
4243 }
4244 return true;
4245 }
4246
4247 bool can_do_op(const rgw_obj_key& key, bool is_olh) {
4248 // serialize olh ops on the same object name
4249 if (is_olh && pending_olh.count(key.name)) {
4250 tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
4251 return false;
4252 }
4253 return (key_to_marker.find(key) == key_to_marker.end());
4254 }
4255
4256 RGWOrderCallCR *allocate_order_control_cr() override {
4257 return new RGWLastCallerWinsCR(sync_env->cct);
4258 }
4259 };
4260
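// Per-entry errors that are treated as non-fatal: -ENOENT (e.g. the source
// object has since disappeared) and -EPERM (e.g. the source denies access)
// are logged but not reported to the error log, and do not fail the entry's
// sync.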
4261 static bool ignore_sync_error(int err) {
4262 switch (err) {
4263 case -ENOENT:
4264 case -EPERM:
4265 return true;
4266 default:
4267 break;
4268 }
4269 return false;
4270 }
4271
4272 template <class T, class K>
4273 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
4274 RGWDataSyncCtx *sc;
4275 RGWDataSyncEnv *sync_env;
4276
4277 rgw_bucket_sync_pipe& sync_pipe;
4278 rgw_bucket_shard& bs;
4279
4280 rgw_obj_key key;
4281 bool versioned;
4282 std::optional<uint64_t> versioned_epoch;
4283 rgw_bucket_entry_owner owner;
4284 real_time timestamp;
4285 RGWModifyOp op;
4286 RGWPendingState op_state;
4287
4288 T entry_marker;
4289 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
4290
4291 int sync_status;
4292
4293 stringstream error_ss;
4294
4295 bool error_injection;
4296
4297 RGWDataSyncModule *data_sync_module;
4298
4299 rgw_zone_set_entry source_trace_entry;
4300 rgw_zone_set zones_trace;
4301
4302 RGWSyncTraceNodeRef tn;
4303 std::string zone_name;
4304
4305 public:
4306 RGWBucketSyncSingleEntryCR(RGWDataSyncCtx *_sc,
4307 rgw_bucket_sync_pipe& _sync_pipe,
4308 const rgw_obj_key& _key, bool _versioned,
4309 std::optional<uint64_t> _versioned_epoch,
4310 real_time& _timestamp,
4311 const rgw_bucket_entry_owner& _owner,
4312 RGWModifyOp _op, RGWPendingState _op_state,
4313 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
4314 RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
4315 sc(_sc), sync_env(_sc->env),
4316 sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
4317 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
4318 owner(_owner),
4319 timestamp(_timestamp), op(_op),
4320 op_state(_op_state),
4321 entry_marker(_entry_marker),
4322 marker_tracker(_marker_tracker),
4323 sync_status(0){
4324 stringstream ss;
4325 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
4326 set_description() << "bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
4327 set_status("init");
4328
4329 tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));
4330
4331 tn->log(20, SSTR("bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
4332 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
4333
4334 data_sync_module = sync_env->sync_module->get_data_handler();
4335
4336 source_trace_entry.zone = sc->source_zone.id;
4337 source_trace_entry.location_key = _sync_pipe.info.source_bs.bucket.get_key();
4338
4339 zones_trace = _zones_trace;
4340 zones_trace.insert(sync_env->svc->zone->get_zone().id, _sync_pipe.info.dest_bucket.get_key());
4341
4342 if (sc->env->ostr) {
4343 RGWZone* z;
4344 if ((z = sc->env->driver->svc()->zone->find_zone(sc->source_zone))) {
4345 zone_name = z->name;
4346 }
4347 }
4348 }
4349
4350 int operate(const DoutPrefixProvider *dpp) override {
4351 reenter(this) {
4352 /* skip entries that are not complete */
4353 if (op_state != CLS_RGW_STATE_COMPLETE) {
4354 goto done;
4355 }
4356 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
4357 do {
4358 yield {
4359 marker_tracker->reset_need_retry(key);
4360 if (key.name.empty()) {
4361 /* shouldn't happen */
4362 set_status("skipping empty entry");
4363 tn->log(0, "entry with empty obj name, skipping");
4364 goto done;
4365 }
4366 if (error_injection &&
4367 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
4368 tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
4369 retcode = -EIO;
4370 } else if (op == CLS_RGW_OP_ADD ||
4371 op == CLS_RGW_OP_LINK_OLH) {
4372 set_status("syncing obj");
4373 tn->log(5, SSTR("bucket sync: sync obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
4374 if (versioned_epoch) {
4375 pretty_print(sc->env, "Syncing object s3://{}/{} version {} in sync from zone {}\n",
4376 bs.bucket.name, key, *versioned_epoch, zone_name);
4377 } else {
4378 pretty_print(sc->env, "Syncing object s3://{}/{} in sync from zone {}\n",
4379 bs.bucket.name, key, zone_name);
4380 }
4381 call(data_sync_module->sync_object(dpp, sc, sync_pipe, key, versioned_epoch,
4382 source_trace_entry, &zones_trace));
4383 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
4384 set_status("removing obj");
4385 if (versioned_epoch) {
4386 pretty_print(sc->env, "Deleting object s3://{}/{} version {} in sync from zone {}\n",
4387 bs.bucket.name, key, *versioned_epoch, zone_name);
4388 } else {
4389 pretty_print(sc->env, "Deleting object s3://{}/{} in sync from zone {}\n",
4390 bs.bucket.name, key, zone_name);
4391 }
4392 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
4393 versioned = true;
4394 }
4395 tn->log(10, SSTR("removing obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
4396 call(data_sync_module->remove_object(dpp, sc, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
4397 // our copy of the object is more recent, continue as if it succeeded
4398 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
4399 set_status("creating delete marker");
4400 tn->log(10, SSTR("creating delete marker: obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
4401 call(data_sync_module->create_delete_marker(dpp, sc, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
4402 }
4403 tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key));
4404 }
4405 if (retcode == -ERR_PRECONDITION_FAILED) {
4406 pretty_print(sc->env, "Skipping object s3://{}/{} in sync from zone {}\n",
4407 bs.bucket.name, key, zone_name);
4408 set_status("Skipping object sync: precondition failed (object contains newer change or policy doesn't allow sync)");
4409 tn->log(0, "Skipping object sync: precondition failed (object contains newer change or policy doesn't allow sync)");
4410 retcode = 0;
4411 }
4412 } while (marker_tracker->need_retry(key));
4413 {
4414 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
4415 if (retcode >= 0) {
4416 tn->log(10, "success");
4417 } else {
4418 tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
4419 }
4420 }
4421
4422 if (retcode < 0 && retcode != -ENOENT) {
4423 set_status() << "failed to sync obj; retcode=" << retcode;
4424 tn->log(0, SSTR("ERROR: failed to sync object: "
4425 << bucket_shard_str{bs} << "/" << key.name));
4426 if (!ignore_sync_error(retcode)) {
4427 error_ss << bucket_shard_str{bs} << "/" << key.name;
4428 sync_status = retcode;
4429 }
4430 }
4431 if (!error_ss.str().empty()) {
4432 yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object: ") + cpp_strerror(-sync_status)));
4433 }
4434 done:
4435 if (sync_status == 0) {
4436 /* update marker */
4437 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
4438 yield call(marker_tracker->finish(entry_marker));
4439 sync_status = retcode;
4440 }
4441 if (sync_status < 0) {
4442 return set_cr_error(sync_status);
4443 }
4444 return set_cr_done();
4445 }
4446 return 0;
4447 }
4448 };
4449
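// Full sync of one bucket: walk the remote listing page by page, spawning a
// bounded window of RGWBucketSyncSingleEntryCR operations and tracking the
// last fully-synced key through marker_tracker. The pipe's prefix rules both
// advance the list marker past unhandled prefixes and filter individual keys.
// On success, the bucket state flips to Incremental.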
4450 class RGWBucketFullSyncCR : public RGWCoroutine {
4451 RGWDataSyncCtx *sc;
4452 RGWDataSyncEnv *sync_env;
4453 rgw_bucket_sync_pipe& sync_pipe;
4454 rgw_bucket_sync_status& sync_status;
4455 rgw_bucket_shard& bs;
4456 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
4457 bucket_list_result list_result;
4458 list<bucket_list_entry>::iterator entries_iter;
4459 rgw_obj_key list_marker;
4460 bucket_list_entry *entry{nullptr};
4461
4462 int total_entries{0};
4463
4464 int sync_result{0};
4465
4466 const rgw_raw_obj& status_obj;
4467 RGWObjVersionTracker& objv;
4468
4469 rgw_zone_set zones_trace;
4470
4471 RGWSyncTraceNodeRef tn;
4472 RGWBucketFullSyncMarkerTrack marker_tracker;
4473
4474 struct _prefix_handler {
4475 RGWBucketSyncFlowManager::pipe_rules_ref rules;
4476 RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator iter;
4477 std::optional<string> cur_prefix;
4478
4479 void set_rules(RGWBucketSyncFlowManager::pipe_rules_ref& _rules) {
4480 rules = _rules;
4481 }
4482
4483 bool revalidate_marker(rgw_obj_key *marker) {
4484 if (cur_prefix &&
4485 boost::starts_with(marker->name, *cur_prefix)) {
4486 return true;
4487 }
4488 if (!rules) {
4489 return false;
4490 }
4491 iter = rules->prefix_search(marker->name);
4492 if (iter == rules->prefix_end()) {
4493 return false;
4494 }
4495 cur_prefix = iter->first;
4496 marker->name = *cur_prefix;
4497 marker->instance.clear();
4498 return true;
4499 }
4500
4501 bool check_key_handled(const rgw_obj_key& key) {
4502 if (!rules) {
4503 return false;
4504 }
4505 if (cur_prefix &&
4506 boost::starts_with(key.name, *cur_prefix)) {
4507 return true;
4508 }
4509 iter = rules->prefix_search(key.name);
4510 if (iter == rules->prefix_end()) {
4511 return false;
4512 }
4513 cur_prefix = iter->first;
4514 return boost::starts_with(key.name, iter->first);
4515 }
4516 } prefix_handler;
4517
4518 public:
4519 RGWBucketFullSyncCR(RGWDataSyncCtx *_sc,
4520 rgw_bucket_sync_pipe& _sync_pipe,
4521 const rgw_raw_obj& status_obj,
4522 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
4523 rgw_bucket_sync_status& sync_status,
4524 RGWSyncTraceNodeRef tn_parent,
4525 RGWObjVersionTracker& objv_tracker)
4526 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
4527 sync_pipe(_sync_pipe), sync_status(sync_status),
4528 bs(_sync_pipe.info.source_bs),
4529 lease_cr(std::move(lease_cr)), status_obj(status_obj), objv(objv_tracker),
4530 tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
4531 SSTR(bucket_shard_str{bs}))),
4532 marker_tracker(sc, status_obj, sync_status, tn, objv_tracker)
4533 {
4534 zones_trace.insert(sc->source_zone.id, sync_pipe.info.dest_bucket.get_key());
4535 prefix_handler.set_rules(sync_pipe.get_rules());
4536 }
4537
4538 int operate(const DoutPrefixProvider *dpp) override;
4539 };
4540
4541 int RGWBucketFullSyncCR::operate(const DoutPrefixProvider *dpp)
4542 {
4543 reenter(this) {
4544 list_marker = sync_status.full.position;
4545
4546 total_entries = sync_status.full.count;
4547 do {
4548 if (lease_cr && !lease_cr->is_locked()) {
4549 tn->log(1, "no lease or lease is lost, abort");
4550 drain_all();
4551 yield call(marker_tracker.flush());
4552 if (retcode < 0) {
4553 tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
4554 return set_cr_error(retcode);
4555 }
4556 return set_cr_error(-ECANCELED);
4557 }
4558 set_status("listing remote bucket");
4559 tn->log(20, "listing bucket for full sync");
4560
4561 if (!prefix_handler.revalidate_marker(&list_marker)) {
4562 set_status() << "finished iterating over all available prefixes: last marker=" << list_marker;
4563 tn->log(20, SSTR("finished iterating over all available prefixes: last marker=" << list_marker));
4564 break;
4565 }
4566
4567 yield call(new RGWListRemoteBucketCR(sc, bs, list_marker, &list_result));
4568 if (retcode < 0 && retcode != -ENOENT) {
4569 set_status("failed bucket listing, going down");
4570 drain_all();
4571 yield spawn(marker_tracker.flush(), true);
4572 return set_cr_error(retcode);
4573 }
4574 if (list_result.entries.size() > 0) {
4575 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
4576 }
4577 entries_iter = list_result.entries.begin();
4578 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
4579 if (lease_cr && !lease_cr->is_locked()) {
4580 drain_all();
4581 yield call(marker_tracker.flush());
4582 tn->log(1, "no lease or lease is lost, abort");
4583 if (retcode < 0) {
4584 tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
4585 return set_cr_error(retcode);
4586 }
4587 return set_cr_error(-ECANCELED);
4588 }
4589 tn->log(20, SSTR("[full sync] syncing object: "
4590 << bucket_shard_str{bs} << "/" << entries_iter->key));
4591 entry = &(*entries_iter);
4592 list_marker = entries_iter->key;
4593 if (!prefix_handler.check_key_handled(entries_iter->key)) {
4594 set_status() << "skipping entry due to policy rules: " << entries_iter->key;
4595 tn->log(20, SSTR("skipping entry due to policy rules: " << entries_iter->key));
4596 continue;
4597 }
4598 total_entries++;
4599 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
4600 tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
4601 } else {
4602 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
4603 yield spawn(new SyncCR(sc, sync_pipe, entry->key,
4604 false, /* versioned, only matters for object removal */
4605 entry->versioned_epoch, entry->mtime,
4606 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
4607 entry->key, &marker_tracker, zones_trace, tn),
4608 false);
4609 }
4610 drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
4611 [&](uint64_t stack_id, int ret) {
4612 if (ret < 0) {
4613 tn->log(10, "a sync operation returned error");
4614 sync_result = ret;
4615 }
4616 return 0;
4617 });
4618 }
4619 } while (list_result.is_truncated && sync_result == 0);
4620 set_status("done iterating over all objects");
4621
4622 /* wait for all operations to complete */
4623 drain_all_cb([&](uint64_t stack_id, int ret) {
4624 if (ret < 0) {
4625 tn->log(10, "a sync operation returned error");
4626 sync_result = ret;
4627 }
4628 return 0;
4629 });
4630 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
4631 if (lease_cr && !lease_cr->is_locked()) {
4632 tn->log(1, "no lease or lease is lost, abort");
4633 yield call(marker_tracker.flush());
4634 if (retcode < 0) {
4635 tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
4636 return set_cr_error(retcode);
4637 }
4638 return set_cr_error(-ECANCELED);
4639 }
4640 yield call(marker_tracker.flush());
4641 if (retcode < 0) {
4642 tn->log(0, SSTR("ERROR: bucket full sync marker_tracker.flush() returned retcode=" << retcode));
4643 return set_cr_error(retcode);
4644 }
4645 /* update sync state to incremental */
4646 if (sync_result == 0) {
4647 sync_status.state = BucketSyncState::Incremental;
4648 tn->log(5, SSTR("set bucket state=" << sync_status.state));
4649 yield call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
4650 dpp, sync_env->driver, status_obj, sync_status, &objv));
4651 tn->log(5, SSTR("bucket status objv=" << objv));
4652 } else {
4653 tn->log(10, SSTR("backing out with sync_status=" << sync_result));
4654 }
4655 if (retcode < 0 && sync_result == 0) { /* actually tried to set incremental state and failed */
4656 tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
4657 << bucket_shard_str{bs} << " retcode=" << retcode));
4658 return set_cr_error(retcode);
4659 }
4660 if (sync_result < 0) {
4661 return set_cr_error(sync_result);
4662 }
4663 return set_cr_done();
4664 }
4665 return 0;
4666 }
4667
4668 static bool has_olh_epoch(RGWModifyOp op) {
4669 return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
4670 }
4671
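// Record that one shard has finished its current log generation: mark it in
// shards_done_with_gen and, once every shard is done, advance incremental_gen
// to next_log.generation and resize the bitmap for the new shard count. The
// read-modify-write retries on -ECANCELED objv races, and exits quietly when
// another writer has already moved the status along.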
4672 class RGWBucketShardIsDoneCR : public RGWCoroutine {
4673 RGWDataSyncCtx *sc;
4674 RGWDataSyncEnv *sync_env;
4675 rgw_bucket_sync_status bucket_status;
4676 const rgw_raw_obj& bucket_status_obj;
4677 const int shard_id;
4678 RGWObjVersionTracker objv_tracker;
4679 const next_bilog_result& next_log;
4680 const uint64_t generation;
4681
4682 public:
4683 RGWBucketShardIsDoneCR(RGWDataSyncCtx *_sc, const rgw_raw_obj& _bucket_status_obj,
4684 int _shard_id, const next_bilog_result& _next_log, const uint64_t _gen)
4685 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
4686 bucket_status_obj(_bucket_status_obj),
4687 shard_id(_shard_id), next_log(_next_log), generation(_gen) {}
4688
4689 int operate(const DoutPrefixProvider* dpp) override
4690 {
4691 reenter(this) {
4692 do {
4693 // read bucket sync status
4694 objv_tracker.clear();
4695 using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
4696 yield call(new ReadCR(dpp, sync_env->driver,
4697 bucket_status_obj, &bucket_status, false, &objv_tracker));
4698 if (retcode < 0) {
4699 ldpp_dout(dpp, 20) << "failed to read bucket shard status: "
4700 << cpp_strerror(retcode) << dendl;
4701 return set_cr_error(retcode);
4702 }
4703
4704 if (bucket_status.state != BucketSyncState::Incremental) {
4705 // exit with success to avoid stale shard being
4706 // retried in error repo if we lost a race
4707 ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR found sync state = " << bucket_status.state << dendl;
4708 return set_cr_done();
4709 }
4710
4711 if (bucket_status.incremental_gen != generation) {
4712 // exit with success to avoid stale shard being
4713 // retried in error repo if we lost a race
4714 ldpp_dout(dpp, 20) << "RGWBucketShardIsDoneCR expected gen: " << generation
4715 << ", got: " << bucket_status.incremental_gen << dendl;
4716 return set_cr_done();
4717 }
4718
4719 yield {
4720 // update bucket_status after a shard is done with current gen
4721 auto& done = bucket_status.shards_done_with_gen;
4722 done[shard_id] = true;
4723
4724 // increment gen if all shards are already done with current gen
4725 if (std::all_of(done.begin(), done.end(),
4726 [] (const bool done){return done; } )) {
4727 bucket_status.incremental_gen = next_log.generation;
4728 done.clear();
4729 done.resize(next_log.num_shards, false);
4730 }
4731 ldpp_dout(dpp, 20) << "bucket status incremental gen is " << bucket_status.incremental_gen << dendl;
4732 using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
4733 call(new WriteCR(dpp, sync_env->driver,
4734 bucket_status_obj, bucket_status, &objv_tracker, false));
4735 }
4736 if (retcode < 0 && retcode != -ECANCELED) {
4737 ldpp_dout(dpp, 20) << "failed to write bucket sync status: " << cpp_strerror(retcode) << dendl;
4738 return set_cr_error(retcode);
4739 } else if (retcode >= 0) {
4740 return set_cr_done();
4741 }
4742 } while (retcode == -ECANCELED);
4743 }
4744 return 0;
4745 }
4746 };
4747
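// Incremental sync of one bucket shard. Each bilog page is processed in two
// passes. The first builds squash_map, keeping only the newest (timestamp,
// op) per (object, instance), so that, e.g., an object written three times
// within the page is fetched only once; a plain write is never allowed to
// squash an OLH entry, whose olh_epoch must still be applied. The second
// pass replays the surviving entries, serializing OLH ops per object name
// and skipping entries this zone has already seen via their zones_trace.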
4748 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
4749 RGWDataSyncCtx *sc;
4750 RGWDataSyncEnv *sync_env;
4751 rgw_bucket_sync_pipe& sync_pipe;
4752 RGWBucketSyncFlowManager::pipe_rules_ref rules;
4753 rgw_bucket_shard& bs;
4754 const rgw_raw_obj& bucket_status_obj;
4755 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
4756 bilog_list_result extended_result;
4757 list<rgw_bi_log_entry> list_result;
4758 int next_num_shards;
4759 uint64_t next_gen;
4760 bool truncated;
4761
4762 list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
4763 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
4764 rgw_bucket_shard_sync_info& sync_info;
4765 uint64_t generation;
4766 rgw_obj_key key;
4767 rgw_bi_log_entry *entry{nullptr};
4768 bool updated_status{false};
4769 rgw_zone_id zone_id;
4770 string target_location_key;
4771
4772 string cur_id;
4773
4774 int sync_status{0};
4775 bool syncstopped{false};
4776
4777 RGWSyncTraceNodeRef tn;
4778 RGWBucketIncSyncShardMarkerTrack marker_tracker;
4779
4780 public:
4781 RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
4782 rgw_bucket_sync_pipe& _sync_pipe,
4783 const std::string& shard_status_oid,
4784 const rgw_raw_obj& _bucket_status_obj,
4785 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
4786 rgw_bucket_shard_sync_info& sync_info,
4787 uint64_t generation,
4788 RGWSyncTraceNodeRef& _tn_parent,
4789 RGWObjVersionTracker& objv_tracker,
4790 ceph::real_time* stable_timestamp)
4791 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
4792 sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
4793 bucket_status_obj(_bucket_status_obj), lease_cr(std::move(lease_cr)),
4794 sync_info(sync_info), generation(generation), zone_id(sync_env->svc->zone->get_zone().id),
4795 tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
4796 SSTR(bucket_shard_str{bs}))),
4797 marker_tracker(sc, shard_status_oid, sync_info.inc_marker, tn,
4798 objv_tracker, stable_timestamp)
4799 {
4800 set_description() << "bucket shard incremental sync bucket="
4801 << bucket_shard_str{bs};
4802 set_status("init");
4803 rules = sync_pipe.get_rules();
4804 target_location_key = sync_pipe.info.dest_bucket.get_key();
4805 }
4806
4807 bool check_key_handled(const rgw_obj_key& key) {
4808 if (!rules) {
4809 return false;
4810 }
4811 auto iter = rules->prefix_search(key.name);
4812 if (iter == rules->prefix_end()) {
4813 return false;
4814 }
4815 return boost::starts_with(key.name, iter->first);
4816 }
4817
4818 int operate(const DoutPrefixProvider *dpp) override;
4819 };
4820
4821 int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
4822 {
4823 int ret;
4824 reenter(this) {
4825 do {
4826 if (lease_cr && !lease_cr->is_locked()) {
4827 tn->log(1, "no lease or lease is lost, abort");
4828 drain_all();
4829 yield call(marker_tracker.flush());
4830 if (retcode < 0) {
4831 tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
4832 return set_cr_error(retcode);
4833 }
4834 return set_cr_error(-ECANCELED);
4835 }
4836 tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
4837 set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
4838 yield call(new RGWListBucketIndexLogCR(sc, bs, sync_info.inc_marker.position, generation, &extended_result));
4839 if (retcode < 0 && retcode != -ENOENT) {
4840 /* wait for all operations to complete */
4841 drain_all();
4842 yield spawn(marker_tracker.flush(), true);
4843 return set_cr_error(retcode);
4844 }
4845 list_result = std::move(extended_result.entries);
4846 truncated = extended_result.truncated;
4847 if (extended_result.next_log) {
4848 next_gen = extended_result.next_log->generation;
4849 next_num_shards = extended_result.next_log->num_shards;
4850 }
4851
4852 squash_map.clear();
4853 entries_iter = list_result.begin();
4854 entries_end = list_result.end();
4855 for (; entries_iter != entries_end; ++entries_iter) {
4856 auto e = *entries_iter;
4857 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
4858 ldpp_dout(dpp, 20) << "syncstop at: " << e.timestamp << ". marker: " << e.id << dendl;
4859 syncstopped = true;
4860 entries_end = std::next(entries_iter); // stop after this entry
4861 break;
4862 }
4863 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
4864 ldpp_dout(dpp, 20) << "syncstart at: " << e.timestamp << ". marker: " << e.id << dendl;
4865 continue;
4866 }
4867 if (e.op == CLS_RGW_OP_CANCEL) {
4868 continue;
4869 }
4870 if (e.state != CLS_RGW_STATE_COMPLETE) {
4871 continue;
4872 }
4873 if (e.zones_trace.exists(zone_id.id, target_location_key)) {
4874 continue;
4875 }
4876 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
4877 // don't squash over olh entries - we need to apply their olh_epoch
4878 if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
4879 continue;
4880 }
4881 if (squash_entry.first <= e.timestamp) {
4882 squash_entry = make_pair<>(e.timestamp, e.op);
4883 }
4884 }
4885
4886 entries_iter = list_result.begin();
4887 for (; entries_iter != entries_end; ++entries_iter) {
4888 if (lease_cr && !lease_cr->is_locked()) {
4889 tn->log(1, "no lease or lease is lost, abort");
4890 drain_all();
4891 yield call(marker_tracker.flush());
4892 if (retcode < 0) {
4893 tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
4894 return set_cr_error(retcode);
4895 }
4896 return set_cr_error(-ECANCELED);
4897 }
4898 entry = &(*entries_iter);
4899 {
4900 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
4901 if (p < 0) {
4902 cur_id = entry->id;
4903 } else {
4904 cur_id = entry->id.substr(p + 1);
4905 }
4906 }
4907 sync_info.inc_marker.position = cur_id;
4908
4909 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
4910 ldpp_dout(dpp, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
4911 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4912 continue;
4913 }
4914
4915 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
4916 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
4917 tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
4918 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4919 continue;
4920 }
4921
4922 tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));
4923
4924 if (!key.ns.empty()) {
4925 set_status() << "skipping entry in namespace: " << entry->object;
4926 tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
4927 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4928 continue;
4929 }
4930
4931 if (!check_key_handled(key)) {
4932 set_status() << "skipping entry due to policy rules: " << entry->object;
4933 tn->log(20, SSTR("skipping entry due to policy rules: " << entry->object));
4934 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4935 continue;
4936 }
4937
4938 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
4939 if (entry->op == CLS_RGW_OP_CANCEL) {
4940 set_status() << "canceled operation, skipping";
4941 tn->log(20, SSTR("skipping object: "
4942 << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
4943 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4944 continue;
4945 }
4946 if (entry->state != CLS_RGW_STATE_COMPLETE) {
4947 set_status() << "non-complete operation, skipping";
4948 tn->log(20, SSTR("skipping object: "
4949 << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
4950 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4951 continue;
4952 }
4953 if (entry->zones_trace.exists(zone_id.id, target_location_key)) {
4954 set_status() << "redundant operation, skipping";
4955 tn->log(20, SSTR("skipping object: "
4956 << bucket_shard_str{bs} << "/" << key << ": redundant operation"));
4957 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4958 continue;
4959 }
4960 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
4961 set_status() << "squashed operation, skipping";
4962 tn->log(20, SSTR("skipping object: "
4963 << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
4964 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4965 continue;
4966 }
4967 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
4968 tn->log(20, SSTR("syncing object: "
4969 << bucket_shard_str{bs} << "/" << key));
4970 updated_status = false;
4971 while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
4972 if (!updated_status) {
4973 set_status() << "can't do op, conflicting inflight operation";
4974 updated_status = true;
4975 }
4976 tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
4977 yield wait_for_child();
4978 bool again = true;
4979 while (again) {
4980 again = collect(&ret, nullptr);
4981 if (ret < 0) {
4982 tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
4983 sync_status = ret;
4984 /* we have reported this error */
4985 }
4986 }
4987 if (sync_status != 0)
4988 break;
4989 }
4990 if (sync_status != 0) {
4991 /* get error, stop */
4992 break;
4993 }
4994 if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
4995 set_status() << "can't do op, sync already in progress for object";
4996 tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
4997 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4998 continue;
4999 }
5000 // yield {
5001 set_status() << "start object sync";
5002 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
5003 tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
5004 } else {
5005 std::optional<uint64_t> versioned_epoch;
5006 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
5007 if (entry->ver.pool < 0) {
5008 versioned_epoch = entry->ver.epoch;
5009 }
5010 tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
5011 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
5012 spawn(new SyncCR(sc, sync_pipe, key,
5013 entry->is_versioned(), versioned_epoch,
5014 entry->timestamp, owner, entry->op, entry->state,
5015 cur_id, &marker_tracker, entry->zones_trace, tn),
5016 false);
5017 }
5018 // }
5019 drain_with_cb(sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
5020 [&](uint64_t stack_id, int ret) {
5021 if (ret < 0) {
5022 tn->log(10, "a sync operation returned error");
5023 sync_status = ret;
5024 }
5025 return 0;
5026 });
5027 }
5028
5029 } while (!list_result.empty() && sync_status == 0 && !syncstopped);
5030
5031 drain_all_cb([&](uint64_t stack_id, int ret) {
5032 if (ret < 0) {
5033 tn->log(10, "a sync operation returned error");
5034 sync_status = ret;
5035 }
5036 return 0;
5037 });
5038 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
5039
5040 if (syncstopped) {
5041 // transition to StateStopped in RGWSyncBucketShardCR. if sync is
5042 // still disabled, we'll delete the sync status object. otherwise we'll
5043 // restart full sync to catch any changes that happened while sync was
5044 // disabled
5045 sync_info.state = rgw_bucket_shard_sync_info::StateStopped;
5046 return set_cr_done();
5047 }
5048
5049 yield call(marker_tracker.flush());
5050 if (retcode < 0) {
5051 tn->log(0, SSTR("ERROR: incremental sync marker_tracker.flush() returned retcode=" << retcode));
5052 return set_cr_error(retcode);
5053 }
5054 if (sync_status < 0) {
5055 tn->log(10, SSTR("backing out with sync_status=" << sync_status));
5056 return set_cr_error(sync_status);
5057 }
5058
5059 if (!truncated && extended_result.next_log) {
5060 yield call(new RGWBucketShardIsDoneCR(sc, bucket_status_obj, bs.shard_id, *extended_result.next_log, generation));
5061 if (retcode < 0) {
5062 ldout(cct, 20) << "failed to update bucket sync status: "
5063 << cpp_strerror(retcode) << dendl;
5064 return set_cr_error(retcode);
5065 }
5066 yield {
5067 // delete the shard status object
5068 auto status_obj = sync_env->svc->rados->obj(marker_tracker.get_obj());
5069 retcode = status_obj.open(dpp);
5070 if (retcode < 0) {
5071 return set_cr_error(retcode);
5072 }
5073 call(new RGWRadosRemoveOidCR(sync_env->driver, std::move(status_obj)));
5074 if (retcode < 0) {
5075 ldpp_dout(dpp, 20) << "failed to remove shard status object: " << cpp_strerror(retcode) << dendl;
5076 return set_cr_error(retcode);
5077 }
5078 }
5079 }
5080
5081 return set_cr_done();
5082 }
5083 return 0;
5084 }
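
// A minimal standalone sketch of the squash-map pass at the top of the
// incremental sync loop above: per (object, instance), keep only the newest
// completed op, but never let a plain op replace an olh op, whose olh_epoch
// still has to be applied. Entry, Op and has_olh_epoch() here are simplified
// stand-ins for the real bilog types; kept out of the build with #if 0.
#if 0
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

enum class Op { Add, Del, LinkOlh, UnlinkInstance };  // simplified op set
static bool has_olh_epoch(Op op) {                    // stand-in helper
  return op == Op::LinkOlh || op == Op::UnlinkInstance;
}

struct Entry {           // one already-filtered, completed bilog entry
  std::string object, instance;
  uint64_t timestamp;    // the real code uses ceph::real_time
  Op op;
};

using SquashMap = std::map<std::pair<std::string, std::string>,
                           std::pair<uint64_t, Op>>;

SquashMap build_squash_map(const std::vector<Entry>& entries) {
  SquashMap squash;
  for (const auto& e : entries) {
    auto& slot = squash[{e.object, e.instance}];
    if (has_olh_epoch(slot.second) && !has_olh_epoch(e.op)) {
      continue;  // don't squash over an olh entry
    }
    if (slot.first <= e.timestamp) {
      slot = {e.timestamp, e.op};  // newest completed op wins
    }
  }
  return squash;
}
#endif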
5085
5086 class RGWGetBucketPeersCR : public RGWCoroutine {
5087 RGWDataSyncEnv *sync_env;
5088
5089 std::optional<rgw_bucket> target_bucket;
5090 std::optional<rgw_zone_id> source_zone;
5091 std::optional<rgw_bucket> source_bucket;
5092
5093 rgw_sync_pipe_info_set *pipes;
5094 map<rgw_bucket, all_bucket_info> buckets_info;
5095 map<rgw_bucket, all_bucket_info>::iterator siiter;
5096 std::optional<all_bucket_info> target_bucket_info;
5097 std::optional<all_bucket_info> source_bucket_info;
5098
5099 rgw_sync_pipe_info_set::iterator siter;
5100
5101 std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
5102 std::shared_ptr<rgw_bucket_get_sync_policy_result> target_policy;
5103
5104 RGWSyncTraceNodeRef tn;
5105
5106 using pipe_const_iter = map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>::const_iterator;
5107
5108 static pair<pipe_const_iter, pipe_const_iter> get_pipe_iters(const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& m, std::optional<rgw_zone_id> zone) {
5109 if (!zone) {
5110 return { m.begin(), m.end() };
5111 }
5112
5113 auto b = m.find(*zone);
5114 if (b == m.end()) {
5115 return { b, b };
5116 }
5117 return { b, std::next(b) };
5118 }
5119
5120 void filter_sources(std::optional<rgw_zone_id> source_zone,
5121 std::optional<rgw_bucket> source_bucket,
5122 const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& all_sources,
5123 rgw_sync_pipe_info_set *result) {
5124 ldpp_dout(sync_env->dpp, 20) << __func__ << ": source_zone=" << source_zone.value_or(rgw_zone_id("*")).id
5125 << " source_bucket=" << source_bucket.value_or(rgw_bucket())
5126 << " all_sources.size()=" << all_sources.size() << dendl;
5127 auto iters = get_pipe_iters(all_sources, source_zone);
5128 for (auto i = iters.first; i != iters.second; ++i) {
5129 for (auto& handler : i->second) {
5130 if (!handler.specific()) {
5131 ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
5132 continue;
5133 }
5134 if (source_bucket &&
5135 !source_bucket->match(*handler.source.bucket)) {
5136 continue;
5137 }
5138 ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
5139 result->insert(handler, source_bucket_info, target_bucket_info);
5140 }
5141 }
5142 }
5143
5144 void filter_targets(std::optional<rgw_zone_id> target_zone,
5145 std::optional<rgw_bucket> target_bucket,
5146 const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& all_targets,
5147 rgw_sync_pipe_info_set *result) {
5148 ldpp_dout(sync_env->dpp, 20) << __func__ << ": target_zone=" << target_zone.value_or(rgw_zone_id("*")).id
5149 << " target_bucket=" << target_bucket.value_or(rgw_bucket())
5150 << " all_targets.size()=" << all_targets.size() << dendl;
5151 auto iters = get_pipe_iters(all_targets, target_zone);
5152 for (auto i = iters.first; i != iters.second; ++i) {
5153 for (auto& handler : i->second) {
5154 if (target_bucket &&
5155 handler.dest.bucket &&
5156 !target_bucket->match(*handler.dest.bucket)) {
5157 ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
5158 continue;
5159 }
5160 ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
5161 result->insert(handler, source_bucket_info, target_bucket_info);
5162 }
5163 }
5164 }
5165
5166 void update_from_target_bucket_policy();
5167 void update_from_source_bucket_policy();
5168
5169 struct GetHintTargets : public RGWGenericAsyncCR::Action {
5170 RGWDataSyncEnv *sync_env;
5171 rgw_bucket source_bucket;
5172 std::set<rgw_bucket> targets;
5173
5174 GetHintTargets(RGWDataSyncEnv *_sync_env,
5175 const rgw_bucket& _source_bucket) : sync_env(_sync_env),
5176 source_bucket(_source_bucket) {}
5177 int operate() override {
5178 int r = sync_env->svc->bucket_sync->get_bucket_sync_hints(sync_env->dpp,
5179 source_bucket,
5180 nullptr,
5181 &targets,
5182 null_yield);
5183 if (r < 0) {
5184 ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): failed to fetch bucket sync hints for bucket=" << source_bucket << dendl;
5185 return r;
5186 }
5187
5188 return 0;
5189 }
5190 };
5191
5192 std::shared_ptr<GetHintTargets> get_hint_targets_action;
5193 std::set<rgw_bucket>::iterator hiter;
5194
5195 public:
5196 RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
5197 std::optional<rgw_bucket> _target_bucket,
5198 std::optional<rgw_zone_id> _source_zone,
5199 std::optional<rgw_bucket> _source_bucket,
5200 rgw_sync_pipe_info_set *_pipes,
5201 const RGWSyncTraceNodeRef& _tn_parent)
5202 : RGWCoroutine(_sync_env->cct),
5203 sync_env(_sync_env),
5204 target_bucket(_target_bucket),
5205 source_zone(_source_zone),
5206 source_bucket(_source_bucket),
5207 pipes(_pipes),
5208 tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
5209 SSTR( "target=" << target_bucket.value_or(rgw_bucket())
5210 << ":source=" << target_bucket.value_or(rgw_bucket())
5211 << ":source_zone=" << source_zone.value_or(rgw_zone_id("*")).id))) {
5212 }
5213
5214 int operate(const DoutPrefixProvider *dpp) override;
5215 };
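
// RGWGetBucketPeersCR::get_pipe_iters() above reduces to a reusable shape:
// return an iterator range over either the whole map or the single entry for
// one key, where a missing key yields an empty range rather than an error.
// A generic sketch over any std::map-like container, kept out of the build
// with #if 0:
#if 0
#include <iterator>
#include <map>
#include <optional>
#include <utility>

template <typename Map>
std::pair<typename Map::const_iterator, typename Map::const_iterator>
range_for(const Map& m, const std::optional<typename Map::key_type>& key) {
  if (!key) {
    return {m.begin(), m.end()};   // no filter: iterate everything
  }
  auto it = m.find(*key);
  if (it == m.end()) {
    return {it, it};               // unknown key: empty range, not an error
  }
  return {it, std::next(it)};      // exactly one entry
}
#endif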
5216
5217 std::ostream& operator<<(std::ostream& out, std::optional<rgw_bucket_shard>& bs) {
5218 if (!bs) {
5219 out << "*";
5220 } else {
5221 out << *bs;
5222 }
5223 return out;
5224 }
5225
5226 static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
5227 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
5228 const rgw_bucket_sync_pair_info& sync_pair,
5229 std::optional<uint64_t> gen,
5230 const RGWSyncTraceNodeRef& tn,
5231 ceph::real_time* progress);
5232
5233 RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
5234 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
5235 const rgw_bucket_shard& source_bs,
5236 const RGWSyncTraceNodeRef& _tn_parent,
5237 std::optional<uint64_t> gen,
5238 ceph::real_time* progress)
5239 : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
5240 lease_cr(std::move(lease_cr)),
5241 tn(sync_env->sync_tracer->add_node(
5242 _tn_parent, "bucket_sync_sources",
5243 SSTR( "source=" << source_bs << ":source_zone=" << sc->source_zone))),
5244 progress(progress),
5245 gen(gen)
5246 {
5247 sync_pair.source_bs = source_bs;
5248 }
5249
5250 int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
5251 {
5252 reenter(this) {
5253 yield call(new RGWGetBucketPeersCR(sync_env, std::nullopt, sc->source_zone,
5254 sync_pair.source_bs.bucket, &pipes, tn));
5255 if (retcode < 0 && retcode != -ENOENT) {
5256 tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
5257 return set_cr_error(retcode);
5258 }
5259
5260 ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << sync_pair.source_bs << dendl;
5261
5262 if (pipes.empty()) {
5263 ldpp_dout(dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
5264 return set_cr_done();
5265 }
5266
5267 shard_progress.resize(pipes.size());
5268 cur_shard_progress = shard_progress.begin();
5269
5270 for (siter = pipes.begin(); siter != pipes.end(); ++siter, ++cur_shard_progress) {
5271 ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
5272
5273 sync_pair.dest_bucket = siter->target.get_bucket();
5274 sync_pair.handler = siter->handler;
5275
5276 ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
5277
5278 yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
5279 gen, tn, &*cur_shard_progress),
5280 sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
5281 [&](uint64_t stack_id, int ret) {
5282 if (ret < 0) {
5283 tn->log(10, SSTR("ERROR: a sync operation returned error: " << ret));
5284 }
5285 return ret;
5286 });
5287 }
5288 drain_all_cb([&](uint64_t stack_id, int ret) {
5289 if (ret < 0) {
5290 tn->log(10, SSTR("a sync operation returned error: " << ret));
5291 }
5292 return ret;
5293 });
5294 if (progress) {
5295 *progress = *std::min_element(shard_progress.begin(), shard_progress.end());
5296 }
5297 return set_cr_done();
5298 }
5299
5300 return 0;
5301 }
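
// The progress reported above is the minimum across all pipes: the data log
// marker may only advance past a point once every pipe has synced beyond it,
// so the slowest pipe defines overall progress. The aggregation, reduced to
// plain integer timestamps (kept out of the build with #if 0):
#if 0
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

uint64_t overall_progress(const std::vector<uint64_t>& shard_progress) {
  assert(!shard_progress.empty());  // guaranteed above: pipes is non-empty
  // trimming the log past this point could drop changes that a lagging
  // pipe still needs to replay
  return *std::min_element(shard_progress.begin(), shard_progress.end());
}
#endif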
5302
5303 class RGWSyncGetBucketInfoCR : public RGWCoroutine {
5304 RGWDataSyncEnv *sync_env;
5305 rgw_bucket bucket;
5306 RGWBucketInfo *pbucket_info;
5307 map<string, bufferlist> *pattrs;
5308 RGWMetaSyncEnv meta_sync_env;
5309
5310 RGWSyncTraceNodeRef tn;
5311
5312 public:
5313 RGWSyncGetBucketInfoCR(RGWDataSyncEnv *_sync_env,
5314 const rgw_bucket& _bucket,
5315 RGWBucketInfo *_pbucket_info,
5316 map<string, bufferlist> *_pattrs,
5317 const RGWSyncTraceNodeRef& _tn_parent)
5318 : RGWCoroutine(_sync_env->cct),
5319 sync_env(_sync_env),
5320 bucket(_bucket),
5321 pbucket_info(_pbucket_info),
5322 pattrs(_pattrs),
5323 tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_info",
5324 SSTR(bucket))) {
5325 }
5326
5327 int operate(const DoutPrefixProvider *dpp) override;
5328 };
5329
5330 int RGWSyncGetBucketInfoCR::operate(const DoutPrefixProvider *dpp)
5331 {
5332 reenter(this) {
5333 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->driver, bucket, pbucket_info, pattrs, dpp));
5334 if (retcode == -ENOENT) {
5335 /* bucket instance info has not been synced in yet, fetch it now */
5336 yield {
5337 tn->log(10, SSTR("no local info for bucket:" << ": fetching metadata"));
5338 string raw_key = string("bucket.instance:") + bucket.get_key();
5339
5340 meta_sync_env.init(dpp, cct, sync_env->driver, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
5341 sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
5342
5343 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
5344 string() /* no marker */,
5345 MDLOG_STATUS_COMPLETE,
5346 NULL /* no marker tracker */,
5347 tn));
5348 }
5349 if (retcode < 0) {
5350 tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bucket}));
5351 return set_cr_error(retcode);
5352 }
5353
5354 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->driver, bucket, pbucket_info, pattrs, dpp));
5355 }
5356 if (retcode < 0) {
5357 tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
5358 return set_cr_error(retcode);
5359 }
5360
5361 return set_cr_done();
5362 }
5363
5364 return 0;
5365 }
5366
5367 void RGWGetBucketPeersCR::update_from_target_bucket_policy()
5368 {
5369 if (!target_policy ||
5370 !target_policy->policy_handler ||
5371 !pipes) {
5372 return;
5373 }
5374
5375 auto handler = target_policy->policy_handler.get();
5376
5377 filter_sources(source_zone,
5378 source_bucket,
5379 handler->get_sources(),
5380 pipes);
5381
5382 for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
5383 if (!siter->source.has_bucket_info()) {
5384 buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
5385 }
5386 if (!siter->target.has_bucket_info()) {
5387 buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
5388 }
5389 }
5390 }
5391
5392 void RGWGetBucketPeersCR::update_from_source_bucket_policy()
5393 {
5394 if (!source_policy ||
5395 !source_policy->policy_handler ||
5396 !pipes) {
5397 return;
5398 }
5399
5400 auto handler = source_policy->policy_handler.get();
5401
5402 filter_targets(sync_env->svc->zone->get_zone().id,
5403 target_bucket,
5404 handler->get_targets(),
5405 pipes);
5406
5407 for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
5408 if (!siter->source.has_bucket_info()) {
5409 buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
5410 }
5411 if (!siter->target.has_bucket_info()) {
5412 buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
5413 }
5414 }
5415 }
5416
5417
5418 class RGWSyncGetBucketSyncPolicyHandlerCR : public RGWCoroutine {
5419 RGWDataSyncEnv *sync_env;
5420 rgw_bucket bucket;
5421 rgw_bucket_get_sync_policy_params get_policy_params;
5422
5423 std::shared_ptr<rgw_bucket_get_sync_policy_result> policy;
5424
5425 RGWSyncTraceNodeRef tn;
5426
5427 int i;
5428
5429 public:
5430 RGWSyncGetBucketSyncPolicyHandlerCR(RGWDataSyncEnv *_sync_env,
5431 std::optional<rgw_zone_id> zone,
5432 const rgw_bucket& _bucket,
5433 std::shared_ptr<rgw_bucket_get_sync_policy_result>& _policy,
5434 const RGWSyncTraceNodeRef& _tn_parent)
5435 : RGWCoroutine(_sync_env->cct),
5436 sync_env(_sync_env),
5437 bucket(_bucket),
5438 policy(_policy),
5439 tn(sync_env->sync_tracer->add_node(_tn_parent, "get_sync_policy_handler",
5440 SSTR(bucket))) {
5441 get_policy_params.zone = zone;
5442 get_policy_params.bucket = bucket;
5443 }
5444
5445 int operate(const DoutPrefixProvider *dpp) override {
5446 reenter(this) {
5447 for (i = 0; i < 2; ++i) {
5448 yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
5449 sync_env->driver,
5450 get_policy_params,
5451 policy,
5452 dpp));
5453 if (retcode < 0 &&
5454 retcode != -ENOENT) {
5455 return set_cr_error(retcode);
5456 }
5457
5458 if (retcode == 0) {
5459 return set_cr_done();
5460 }
5461
5462 /* bucket instance was not found;
5463 * try to fetch the bucket instance info, which can
5464 * trigger a metadata sync of the bucket instance
5465 */
5466 yield call(new RGWSyncGetBucketInfoCR(sync_env,
5467 bucket,
5468 nullptr,
5469 nullptr,
5470 tn));
5471 if (retcode < 0) {
5472 return set_cr_error(retcode);
5473 }
5474 }
5475 }
5476
5477 return 0;
5478 }
5479 };
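
// The coroutine above makes at most two passes: if reading the sync policy
// fails with -ENOENT because the bucket instance is unknown locally, it
// fetches the instance via metadata sync and retries once. The same fallback
// shape, sketched synchronously; read_policy() and fetch_bucket_metadata()
// are hypothetical stand-ins for RGWBucketGetSyncPolicyHandlerCR and
// RGWSyncGetBucketInfoCR (kept out of the build with #if 0):
#if 0
#include <cerrno>
#include <functional>

int get_policy_with_fallback(const std::function<int()>& read_policy,
                             const std::function<int()>& fetch_bucket_metadata) {
  for (int attempt = 0; attempt < 2; ++attempt) {
    int r = read_policy();
    if (r != -ENOENT) {
      return r;                    // success, or a non-retryable error
    }
    r = fetch_bucket_metadata();   // may pull the instance via metadata sync
    if (r < 0) {
      return r;
    }
  }
  return -ENOENT;                  // still missing after one fetch+retry
}
#endif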
5480
5481
5482 int RGWGetBucketPeersCR::operate(const DoutPrefixProvider *dpp)
5483 {
5484 reenter(this) {
5485 if (pipes) {
5486 pipes->clear();
5487 }
5488 if (target_bucket) {
5489 target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
5490 yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
5491 nullopt,
5492 *target_bucket,
5493 target_policy,
5494 tn));
5495 if (retcode < 0 &&
5496 retcode != -ENOENT) {
5497 return set_cr_error(retcode);
5498 }
5499
5500 update_from_target_bucket_policy();
5501 }
5502
5503 if (source_bucket && source_zone) {
5504 source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
5505 yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
5506 source_zone,
5507 *source_bucket,
5508 source_policy,
5509 tn));
5510 if (retcode < 0 &&
5511 retcode != -ENOENT) {
5512 return set_cr_error(retcode);
5513 }
5514
5515 if (source_policy->policy_handler) {
5516 auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
5517 auto& opt_attrs = source_policy->policy_handler->get_bucket_attrs();
5518 if (opt_bucket_info && opt_attrs) {
5519 source_bucket_info.emplace();
5520 source_bucket_info->bucket_info = *opt_bucket_info;
5521 source_bucket_info->attrs = *opt_attrs;
5522 }
5523 }
5524
5525 if (!target_bucket) {
5526 get_hint_targets_action = make_shared<GetHintTargets>(sync_env, *source_bucket);
5527
5528 yield call(new RGWGenericAsyncCR(cct, sync_env->async_rados,
5529 get_hint_targets_action));
5530 if (retcode < 0) {
5531 return set_cr_error(retcode);
5532 }
5533
5534 /* hints might have incomplete bucket ids,
5535 * in which case we need to figure out the current
5536 * bucket_id
5537 */
5538 for (hiter = get_hint_targets_action->targets.begin();
5539 hiter != get_hint_targets_action->targets.end();
5540 ++hiter) {
5541 ldpp_dout(dpp, 20) << "Got sync hint for bucket=" << *source_bucket << ": " << hiter->get_key() << dendl;
5542
5543 target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
5544 yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
5545 nullopt,
5546 *hiter,
5547 target_policy,
5548 tn));
5549 if (retcode < 0 &&
5550 retcode != -ENOENT) {
5551 return set_cr_error(retcode);
5552 }
5553 update_from_target_bucket_policy();
5554 }
5555 }
5556 }
5557
5558 update_from_source_bucket_policy();
5559
5560 for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) {
5561 if (siiter->second.bucket_info.bucket.name.empty()) {
5562 yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first,
5563 &siiter->second.bucket_info,
5564 &siiter->second.attrs,
5565 tn));
5566 }
5567 }
5568
5569 if (pipes) {
5570 pipes->update_empty_bucket_info(buckets_info);
5571 }
5572
5573 return set_cr_done();
5574 }
5575
5576 return 0;
5577 }
5578
5579 class RGWSyncBucketShardCR : public RGWCoroutine {
5580 RGWDataSyncCtx *sc;
5581 RGWDataSyncEnv *sync_env;
5582 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
5583 rgw_bucket_sync_pair_info sync_pair;
5584 rgw_bucket_sync_pipe& sync_pipe;
5585 bool& bucket_stopped;
5586 uint64_t generation;
5587 ceph::real_time* progress;
5588
5589 const std::string shard_status_oid;
5590 const rgw_raw_obj bucket_status_obj;
5591 rgw_bucket_shard_sync_info sync_status;
5592 RGWObjVersionTracker objv_tracker;
5593
5594 RGWSyncTraceNodeRef tn;
5595
5596 public:
5597 RGWSyncBucketShardCR(RGWDataSyncCtx *_sc,
5598 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
5599 const rgw_bucket_sync_pair_info& _sync_pair,
5600 rgw_bucket_sync_pipe& sync_pipe,
5601 bool& bucket_stopped,
5602 uint64_t generation,
5603 const RGWSyncTraceNodeRef& tn,
5604 ceph::real_time* progress)
5605 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
5606 lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
5607 sync_pipe(sync_pipe), bucket_stopped(bucket_stopped), generation(generation), progress(progress),
5608 shard_status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair, generation)),
5609 bucket_status_obj(sc->env->svc->zone->get_zone_params().log_pool,
5610 RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
5611 sync_pair.source_bs.bucket,
5612 sync_pair.dest_bucket)),
5613 tn(tn) {
5614 }
5615
5616 int operate(const DoutPrefixProvider *dpp) override;
5617 };
5618
5619 int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
5620 {
5621 reenter(this) {
5622 objv_tracker.clear();
5623 yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker, generation));
5624 if (retcode < 0 && retcode != -ENOENT) {
5625 tn->log(0, SSTR("ERROR: failed to read sync status for bucket. error: " << retcode));
5626 return set_cr_error(retcode);
5627 }
5628
5629 tn->log(20, SSTR("sync status for source bucket shard: " << sync_status.state));
5630 sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
5631 if (progress) {
5632 *progress = sync_status.inc_marker.timestamp;
5633 }
5634
5635 yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
5636 shard_status_oid, bucket_status_obj, lease_cr,
5637 sync_status, generation, tn,
5638 objv_tracker, progress));
5639 if (retcode < 0) {
5640 tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
5641 return set_cr_error(retcode);
5642 }
5643
5644 if (sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
5645 tn->log(20, SSTR("syncstopped indication for source bucket shard"));
5646 bucket_stopped = true;
5647 }
5648
5649 return set_cr_done();
5650 }
5651
5652 return 0;
5653 }
5654
5655 class RGWSyncBucketCR : public RGWCoroutine {
5656 RGWDataSyncCtx *sc;
5657 RGWDataSyncEnv *env;
5658 boost::intrusive_ptr<const RGWContinuousLeaseCR> data_lease_cr;
5659 boost::intrusive_ptr<RGWContinuousLeaseCR> bucket_lease_cr;
5660 rgw_bucket_sync_pair_info sync_pair;
5661 rgw_bucket_sync_pipe sync_pipe;
5662 std::optional<uint64_t> gen;
5663 ceph::real_time* progress;
5664
5665 const std::string lock_name = "bucket sync";
5666 const uint32_t lock_duration;
5667 const rgw_raw_obj status_obj;
5668 rgw_bucket_sync_status bucket_status;
5669 bool bucket_stopped = false;
5670 RGWObjVersionTracker objv;
5671 bool init_check_compat = false;
5672 rgw_bucket_index_marker_info info;
5673 rgw_raw_obj error_repo;
5674 rgw_bucket_shard source_bs;
5675 rgw_pool pool;
5676 uint64_t current_gen = 0;
5677
5678 RGWSyncTraceNodeRef tn;
5679
5680 public:
5681 RGWSyncBucketCR(RGWDataSyncCtx *_sc,
5682 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
5683 const rgw_bucket_sync_pair_info& _sync_pair,
5684 std::optional<uint64_t> gen,
5685 const RGWSyncTraceNodeRef& _tn_parent,
5686 ceph::real_time* progress)
5687 : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
5688 data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
5689 gen(gen), progress(progress),
5690 lock_duration(cct->_conf->rgw_sync_lease_period),
5691 status_obj(env->svc->zone->get_zone_params().log_pool,
5692 RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
5693 sync_pair.source_bs.bucket,
5694 sync_pair.dest_bucket)),
5695 tn(env->sync_tracer->add_node(_tn_parent, "bucket",
5696 SSTR(bucket_str{_sync_pair.dest_bucket} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
5697 }
5698
5699 int operate(const DoutPrefixProvider *dpp) override;
5700 };
5701
5702 static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
5703 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
5704 const rgw_bucket_sync_pair_info& sync_pair,
5705 std::optional<uint64_t> gen,
5706 const RGWSyncTraceNodeRef& tn,
5707 ceph::real_time* progress)
5708 {
5709 return new RGWSyncBucketCR(sc, std::move(lease), sync_pair,
5710 gen, tn, progress);
5711 }
5712
5713 #define RELEASE_LOCK(cr) \
5714 if (cr) {cr->go_down(); drain_all(); cr.reset();}
5715
5716 int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
5717 {
5718 reenter(this) {
5719 // read source/destination bucket info
5720 yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info,
5721 &sync_pipe.source_bucket_attrs, tn));
5722 if (retcode < 0) {
5723 tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
5724 return set_cr_error(retcode);
5725 }
5726
5727 yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.dest_bucket, &sync_pipe.dest_bucket_info,
5728 &sync_pipe.dest_bucket_attrs, tn));
5729 if (retcode < 0) {
5730 tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
5731 return set_cr_error(retcode);
5732 }
5733
5734 sync_pipe.info = sync_pair;
5735
5736 // read bucket sync status
5737 using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
5738 using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
5739
5740 objv.clear();
5741 yield call(new ReadCR(dpp, env->driver,
5742 status_obj, &bucket_status, false, &objv));
5743 if (retcode == -ENOENT) {
5744 // if the full sync status object didn't exist yet, run the backward
5745 // compatibility logic in InitBucketFullSyncStatusCR below. if it did
5746 // exist, a `bucket sync init` probably requested its re-initialization,
5747 // and we shouldn't try to resume incremental sync
5748 init_check_compat = true;
5749
5750 // use exclusive create to set state=Init
5751 objv.generate_new_write_ver(cct);
5752 yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status, &objv, true));
5753 tn->log(20, "bucket status object does not exist, create a new one");
5754 if (retcode == -EEXIST) {
5755 // raced with another create, read its status
5756 tn->log(20, "raced with another create, read its status");
5757 objv.clear();
5758 yield call(new ReadCR(dpp, env->driver,
5759 status_obj, &bucket_status, false, &objv));
5760 }
5761 }
5762 if (retcode < 0) {
5763 tn->log(20, SSTR("ERROR: failed to read bucket status object. error: " << retcode));
5764 return set_cr_error(retcode);
5765 }
5766
5767 do {
5768 tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state <<
5769 ". lease is: " << (bucket_lease_cr ? "taken" : "not taken") << ". stop indications is: " << bucket_stopped));
5770
5771 if (bucket_status.state != BucketSyncState::Incremental ||
5772 bucket_stopped) {
5773
5774 if (!bucket_lease_cr) {
5775 bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
5776 lock_name, lock_duration, this, &sc->lcc));
5777 yield spawn(bucket_lease_cr.get(), false);
5778 while (!bucket_lease_cr->is_locked()) {
5779 if (bucket_lease_cr->is_done()) {
5780 tn->log(5, "failed to take lease");
5781 set_status("lease lock failed, early abort");
5782 drain_all();
5783 return set_cr_error(bucket_lease_cr->get_ret_status());
5784 }
5785 tn->log(5, "waiting on bucket lease");
5786 yield set_sleeping(true);
5787 }
5788 }
5789
5790 // if state is Init or Stopped, we query the remote RGW for its state
5791 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.source_bs.bucket, &info));
5792 if (retcode < 0) {
5793 RELEASE_LOCK(bucket_lease_cr);
5794 return set_cr_error(retcode);
5795 }
5796 if (info.syncstopped) {
5797 // remote indicates stopped state
5798 tn->log(20, "remote bilog indicates that sync was stopped");
5799
5800 // if state was incremental, remove all per-shard status objects
5801 if (bucket_status.state == BucketSyncState::Incremental) {
5802 yield {
5803 const auto num_shards = bucket_status.shards_done_with_gen.size();
5804 const auto gen = bucket_status.incremental_gen;
5805 call(new RemoveBucketShardStatusCollectCR(sc, sync_pair, gen, num_shards));
5806 }
5807 }
5808
5809 // check if local state is "stopped"
5810 objv.clear();
5811 yield call(new ReadCR(dpp, env->driver,
5812 status_obj, &bucket_status, false, &objv));
5813 if (retcode < 0) {
5814 tn->log(20, SSTR("ERROR: failed to read status before writing 'stopped'. error: " << retcode));
5815 RELEASE_LOCK(bucket_lease_cr);
5816 return set_cr_error(retcode);
5817 }
5818 if (bucket_status.state != BucketSyncState::Stopped) {
5819 // make sure that the state is changed to stopped locally
5820 bucket_status.state = BucketSyncState::Stopped;
5821 yield call(new WriteCR(dpp, env->driver, status_obj, bucket_status,
5822 &objv, false));
5823 if (retcode < 0) {
5824 tn->log(20, SSTR("ERROR: failed to write 'stopped' status. error: " << retcode));
5825 RELEASE_LOCK(bucket_lease_cr);
5826 return set_cr_error(retcode);
5827 }
5828 }
5829 RELEASE_LOCK(bucket_lease_cr);
5830 return set_cr_done();
5831 }
5832 if (bucket_stopped) {
5833 tn->log(20, SSTR("ERROR: switched from 'stop' to 'start' sync. while state is: " << bucket_status.state));
5834 bucket_stopped = false;
5835 bucket_status.state = BucketSyncState::Init;
5836 }
5837 }
5838
5839 if (bucket_status.state != BucketSyncState::Incremental) {
5840 // if the state wasn't Incremental, take a bucket-wide lease to prevent
5841 // different shards from duplicating the init and full sync
5842 if (!bucket_lease_cr) {
5843 bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
5844 lock_name, lock_duration, this, &sc->lcc));
5845 yield spawn(bucket_lease_cr.get(), false);
5846 while (!bucket_lease_cr->is_locked()) {
5847 if (bucket_lease_cr->is_done()) {
5848 tn->log(5, "failed to take lease");
5849 set_status("lease lock failed, early abort");
5850 drain_all();
5851 return set_cr_error(bucket_lease_cr->get_ret_status());
5852 }
5853 tn->log(5, "waiting on bucket lease");
5854 yield set_sleeping(true);
5855 }
5856 }
5857
5858 // reread the status after acquiring the lock
5859 objv.clear();
5860 yield call(new ReadCR(dpp, env->driver, status_obj,
5861 &bucket_status, false, &objv));
5862 if (retcode < 0) {
5863 RELEASE_LOCK(bucket_lease_cr);
5864 tn->log(20, SSTR("ERROR: reading the status after acquiring the lock failed. error: " << retcode));
5865 return set_cr_error(retcode);
5866 }
5867 tn->log(20, SSTR("status after acquiring the lock is: " << bucket_status.state));
5868
5869 yield call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
5870 bucket_status, objv,
5871 sync_pipe.source_bucket_info,
5872 init_check_compat, info));
5873
5874 if (retcode < 0) {
5875 tn->log(20, SSTR("ERROR: init full sync failed. error: " << retcode));
5876 RELEASE_LOCK(bucket_lease_cr);
5877 return set_cr_error(retcode);
5878 }
5879 }
5880
5881 assert(bucket_status.state == BucketSyncState::Incremental ||
5882 bucket_status.state == BucketSyncState::Full);
5883
5884 if (bucket_status.state == BucketSyncState::Full) {
5885 assert(bucket_lease_cr);
5886 yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
5887 bucket_lease_cr, bucket_status,
5888 tn, objv));
5889 if (retcode < 0) {
5890 tn->log(20, SSTR("ERROR: full sync failed. error: " << retcode));
5891 RELEASE_LOCK(bucket_lease_cr);
5892 return set_cr_error(retcode);
5893 }
5894 }
5895
5896 if (bucket_status.state == BucketSyncState::Incremental) {
5897 // lease not required for incremental sync
5898 RELEASE_LOCK(bucket_lease_cr);
5899
5900 assert(sync_pair.source_bs.shard_id >= 0);
5901 // if a specific gen was requested, compare that to the sync status
5902 if (gen) {
5903 current_gen = bucket_status.incremental_gen;
5904 source_bs = sync_pair.source_bs;
5905 if (*gen > current_gen) {
5906 /* In case the data log entry is missing for the previous gen, it may
5907 * not be marked complete and the sync can get stuck. To avoid that,
5908 * we add this (shard_id, gen) to the error repo to force a retry,
5909 * which marks that shard as completed.
5910 */
5911 pool = sc->env->svc->zone->get_zone_params().log_pool;
5912 if ((static_cast<std::size_t>(source_bs.shard_id) < bucket_status.shards_done_with_gen.size()) &&
5913 !bucket_status.shards_done_with_gen[source_bs.shard_id]) {
5914 // use the error repo and sync status timestamp from the datalog shard corresponding to source_bs
5915 error_repo = datalog_oid_for_error_repo(sc, sc->env->driver,
5916 pool, source_bs);
5917 yield call(rgw::error_repo::write_cr(sc->env->driver->svc()->rados, error_repo,
5918 rgw::error_repo::encode_key(source_bs, current_gen),
5919 ceph::real_clock::zero()));
5920 if (retcode < 0) {
5921 tn->log(0, SSTR("ERROR: failed to log prev gen entry (bucket=" << source_bs.bucket << ", shard_id=" << source_bs.shard_id << ", gen=" << current_gen << " in error repo: retcode=" << retcode));
5922 } else {
5923 tn->log(20, SSTR("logged prev gen entry (bucket=" << source_bs.bucket << ", shard_id=" << source_bs.shard_id << ", gen=" << current_gen << " in error repo: retcode=" << retcode));
5924 }
5925 }
5926 retcode = -EAGAIN;
5927 tn->log(10, SSTR("ERROR: requested sync of future generation "
5928 << *gen << " > " << current_gen
5929 << ", returning " << retcode << " for later retry"));
5930 return set_cr_error(retcode);
5931 } else if (*gen < current_gen) {
5932 tn->log(10, SSTR("WARNING: requested sync of past generation "
5933 << *gen << " < " << current_gen
5934 << ", returning success"));
5935 return set_cr_done();
5936 }
5937 }
5938
5939 if (static_cast<std::size_t>(sync_pair.source_bs.shard_id) >= bucket_status.shards_done_with_gen.size()) {
5940 tn->log(1, SSTR("bucket shard " << sync_pair.source_bs << " index out of bounds"));
5941 return set_cr_done(); // return success so we don't retry
5942 }
5943 if (bucket_status.shards_done_with_gen[sync_pair.source_bs.shard_id]) {
5944 tn->log(10, SSTR("bucket shard " << sync_pair.source_bs << " of gen " <<
5945 gen << " already synced."));
5946 return set_cr_done();
5947 }
5948
5949 yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
5950 sync_pipe, bucket_stopped,
5951 bucket_status.incremental_gen, tn, progress));
5952 if (retcode < 0) {
5953 tn->log(20, SSTR("ERROR: incremental sync failed. error: " << retcode));
5954 return set_cr_error(retcode);
5955 }
5956 }
5957 // loop back to previous states unless incremental sync returns normally
5958 } while (bucket_status.state != BucketSyncState::Incremental || bucket_stopped);
5959
5960 return set_cr_done();
5961 }
5962
5963 return 0;
5964 }
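
// RGWSyncBucketCR::operate() above is, at heart, a state machine loop: any
// state other than Incremental takes the bucket-wide lease, consults the
// remote bilog (honoring syncstopped) and runs init/full sync; only a clean
// incremental pass exits the loop. The skeleton with the coroutine machinery
// stripped out; read_status() and the run_* stubs are stand-ins for ReadCR,
// InitBucketFullSyncStatusCR, RGWBucketFullSyncCR and RGWSyncBucketShardCR
// (kept out of the build with #if 0):
#if 0
enum class State { Init, Full, Incremental, Stopped };

static State read_status() { return State::Init; }                     // stub
static int run_init(State& st) { st = State::Full; return 0; }         // stub
static int run_full(State& st) { st = State::Incremental; return 0; }  // stub
static int run_incremental(bool& stopped) { stopped = false; return 0; }

int sync_bucket() {
  bool stopped = false;
  State st = read_status();
  do {
    if (st != State::Incremental || stopped) {
      // take the bucket-wide lease, query the remote bilog, maybe
      // transition to Stopped or back to Init
      if (int r = run_init(st); r < 0) return r;
      if (st == State::Stopped) return 0;  // remote says sync is disabled
      stopped = false;
    }
    if (st == State::Full) {
      if (int r = run_full(st); r < 0) return r;
    }
    if (st == State::Incremental) {
      if (int r = run_incremental(stopped); r < 0) return r;
    }
  } while (st != State::Incremental || stopped);
  return 0;
}
#endif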
5965
5966 int RGWBucketPipeSyncStatusManager::do_init(const DoutPrefixProvider *dpp,
5967 std::ostream* ostr)
5968 {
5969 int ret = http_manager.start();
5970 if (ret < 0) {
5971 ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
5972 return ret;
5973 }
5974
5975 sync_module.reset(new RGWDefaultSyncModuleInstance());
5976 auto async_rados = driver->svc()->rados->get_async_processor();
5977
5978 sync_env.init(this, driver->ctx(), driver,
5979 driver->svc(), async_rados, &http_manager,
5980 error_logger.get(), driver->getRados()->get_sync_tracer(),
5981 sync_module, nullptr);
5982
5983 sync_env.ostr = ostr;
5984
5985 rgw_sync_pipe_info_set pipes;
5986
5987 ret = cr_mgr.run(dpp, new RGWGetBucketPeersCR(&sync_env,
5988 dest_bucket,
5989 source_zone,
5990 source_bucket,
5991 &pipes,
5992 sync_env.sync_tracer->root_node));
5993 if (ret < 0) {
5994 ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
5995 return ret;
5996 }
5997
5998 if (pipes.empty()) {
5999 ldpp_dout(this, 0) << "No peers. This is not a valid multisite configuration." << dendl;
6000 return -EINVAL;
6001 }
6002
6003 for (auto& pipe : pipes) {
6004 auto& szone = pipe.source.zone;
6005
6006 auto conn = driver->svc()->zone->get_zone_conn(szone);
6007 if (!conn) {
6008 ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
6009 return -EINVAL;
6010 }
6011
6012 RGWZone* z;
6013 if (!(z = driver->svc()->zone->find_zone(szone))) {
6014 ldpp_dout(this, 0) << "zone " << szone << " does not exist" << dendl;
6015 return -EINVAL;
6016 }
6017 sources.emplace_back(&sync_env, szone, conn,
6018 pipe.source.get_bucket_info(),
6019 pipe.target.get_bucket(),
6020 pipe.handler, z->name);
6021 }
6022
6023 return 0;
6024 }
6025
6026 int RGWBucketPipeSyncStatusManager::remote_info(const DoutPrefixProvider *dpp,
6027 source& s,
6028 uint64_t* oldest_gen,
6029 uint64_t* latest_gen,
6030 uint64_t* num_shards)
6031 {
6032 rgw_bucket_index_marker_info remote_info;
6033 BucketIndexShardsManager remote_markers;
6034 auto r = rgw_read_remote_bilog_info(dpp, s.sc.conn, s.info.bucket,
6035 remote_info, remote_markers,
6036 null_yield);
6037
6038 if (r < 0) {
6039 ldpp_dout(dpp, 0) << __PRETTY_FUNCTION__ << ":" << __LINE__
6040 << " rgw_read_remote_bilog_info: r="
6041 << r << dendl;
6042 return r;
6043 }
6044 if (oldest_gen)
6045 *oldest_gen = remote_info.oldest_gen;
6046
6047 if (latest_gen)
6048 *latest_gen = remote_info.latest_gen;
6049
6050 if (num_shards)
6051 *num_shards = remote_markers.get().size();
6052
6053 return 0;
6054 }
6055
6056 tl::expected<std::unique_ptr<RGWBucketPipeSyncStatusManager>, int>
6057 RGWBucketPipeSyncStatusManager::construct(
6058 const DoutPrefixProvider* dpp,
6059 rgw::sal::RadosStore* driver,
6060 std::optional<rgw_zone_id> source_zone,
6061 std::optional<rgw_bucket> source_bucket,
6062 const rgw_bucket& dest_bucket,
6063 std::ostream* ostr)
6064 {
6065 std::unique_ptr<RGWBucketPipeSyncStatusManager> self{
6066 new RGWBucketPipeSyncStatusManager(driver, source_zone, source_bucket,
6067 dest_bucket)};
6068 auto r = self->do_init(dpp, ostr);
6069 if (r < 0) {
6070 return tl::unexpected(r);
6071 }
6072 return self;
6073 }
6074
6075 int RGWBucketPipeSyncStatusManager::init_sync_status(
6076 const DoutPrefixProvider *dpp)
6077 {
6078 // Just running one at a time saves us from buildup/teardown and in
6079 // practice we only do one zone at a time.
6080 for (auto& source : sources) {
6081 list<RGWCoroutinesStack*> stacks;
6082 RGWCoroutinesStack *stack = new RGWCoroutinesStack(driver->ctx(), &cr_mgr);
6083 pretty_print(source.sc.env, "Initializing sync state of bucket {} with zone {}.\n",
6084 source.info.bucket.name, source.zone_name);
6085 stack->call(new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
6086 dpp, source.sc.env->driver,
6087 {sync_env.svc->zone->get_zone_params().log_pool,
6088 full_status_oid(source.sc.source_zone,
6089 source.info.bucket,
6090 source.dest)},
6091 rgw_bucket_sync_status{}));
6092 stacks.push_back(stack);
6093 auto r = cr_mgr.run(dpp, stacks);
6094 if (r < 0) {
6095 pretty_print(source.sc.env,
6096 "Initialization of sync state for bucket {} with zone {} "
6097 "failed with error {}\n",
6098 source.info.bucket.name, source.zone_name, cpp_strerror(r));
6099 }
6100 }
6101 return 0;
6102 }
6103
6104 tl::expected<std::map<int, rgw_bucket_shard_sync_info>, int>
6105 RGWBucketPipeSyncStatusManager::read_sync_status(
6106 const DoutPrefixProvider *dpp)
6107 {
6108 std::map<int, rgw_bucket_shard_sync_info> sync_status;
6109 list<RGWCoroutinesStack *> stacks;
6110
6111 auto sz = sources.begin();
6112
6113 if (source_zone) {
6114 sz = std::find_if(sources.begin(), sources.end(),
6115 [this](const source& s) {
6116 return s.sc.source_zone == *source_zone;
6117 }
6118 );
6119 if (sz == sources.end()) {
6120 ldpp_dout(this, 0) << "ERROR: failed to find source zone: "
6121 << *source_zone << dendl;
6122 return tl::unexpected(-ENOENT);
6123 }
6124 } else {
6125 ldpp_dout(this, 5) << "No source zone specified, using source zone: "
6126 << sz->sc.source_zone << dendl;
6127 return tl::unexpected(-ENOENT);
6128 }
6129 uint64_t num_shards, latest_gen;
6130 auto ret = remote_info(dpp, *sz, nullptr, &latest_gen, &num_shards);
6131 if (ret < 0) {
6132 ldpp_dout(this, 5) << "Unable to get remote info: "
6133 << ret << dendl;
6134 return tl::unexpected(ret);
6135 }
6136 auto stack = new RGWCoroutinesStack(driver->ctx(), &cr_mgr);
6137 std::vector<rgw_bucket_sync_pair_info> pairs(num_shards);
6138 for (auto shard = 0u; shard < num_shards; ++shard) {
6139 auto& pair = pairs[shard];
6140 pair.source_bs.bucket = sz->info.bucket;
6141 pair.dest_bucket = sz->dest;
6142 pair.source_bs.shard_id = shard;
6143 stack->call(new RGWReadBucketPipeSyncStatusCoroutine(
6144 &sz->sc, pair, &sync_status[shard],
6145 nullptr, latest_gen));
6146 }
6147
6148 stacks.push_back(stack);
6149
6150 ret = cr_mgr.run(dpp, stacks);
6151 if (ret < 0) {
6152 ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
6153 << bucket_str{dest_bucket} << dendl;
6154 return tl::unexpected(ret);
6155 }
6156
6157 return sync_status;
6158 }
6159
6160 namespace rgw::bucket_sync_run {
6161 // Retry-loop over calls to sync_bucket_shard_cr
6162 class ShardCR : public RGWCoroutine {
6163 static constexpr auto allowed_retries = 10u;
6164
6165 RGWDataSyncCtx& sc;
6166 const rgw_bucket_sync_pair_info& pair;
6167 const uint64_t gen;
6168 unsigned retries = 0;
6169
6170 ceph::real_time prev_progress;
6171 ceph::real_time progress;
6172
6173 public:
6174
6175 ShardCR(RGWDataSyncCtx& sc, const rgw_bucket_sync_pair_info& pair,
6176 const uint64_t gen)
6177 : RGWCoroutine(sc.cct), sc(sc), pair(pair), gen(gen) {}
6178
6179 int operate(const DoutPrefixProvider *dpp) override {
6180 reenter(this) {
6181 // Since all errors (except ECANCELED) are considered retryable,
6182 // retry other errors so long as we're making progress.
6183 for (retries = 0u, retcode = -EDOM;
6184 (retries < allowed_retries) && (retcode != 0);
6185 ++retries) {
6186 ldpp_dout(dpp, 5) << "ShardCR: syncing bucket shard on: "
6187 << "zone=" << sc.source_zone
6188 << ", bucket=" << pair.source_bs.bucket.name
6189 << ", shard=" << pair.source_bs.shard_id
6190 << ", gen=" << gen
6191 << dendl;
6192 yield call(sync_bucket_shard_cr(&sc, nullptr, pair, gen,
6193 sc.env->sync_tracer->root_node,
6194 &progress));
6195
6196 if (retcode == -ECANCELED) {
6197 ldpp_dout(dpp, -1) << "ERROR: Got -ECANCELED for "
6198 << pair.source_bs << dendl;
6199 drain_all();
6200 return set_cr_error(retcode);
6201 } else if (retcode < 0) {
6202 ldpp_dout(dpp, 5) << "WARNING: Got error, retcode=" << retcode << " for "
6203 << pair.source_bs << "on retry "
6204 << retries + 1 << " of " << allowed_retries
6205 << " allowed" << dendl;
6206 // Reset the retry counter if we made any progress
6207 if (progress != prev_progress) {
6208 retries = 0;
6209 }
6210 prev_progress = progress;
6211 }
6212 }
6213 if (retcode < 0) {
6214 ldpp_dout(dpp, -1) << "ERROR: Exhausted retries for "
6215 << pair.source_bs << " retcode="
6216 << retcode << dendl;
6217 drain_all();
6218 return set_cr_error(retcode);
6219 }
6220
6221 drain_all();
6222 return set_cr_done();
6223 }
6224 return 0;
6225 }
6226 };
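
// ShardCR above treats every error except -ECANCELED as retryable, capped at
// ten attempts -- but the retry budget resets whenever the shard's progress
// timestamp moved, so a slowly advancing shard is never given up on. The
// policy in isolation; attempt() is a hypothetical stand-in for one
// sync_bucket_shard_cr run that reports progress (#if 0, not built):
#if 0
#include <cerrno>
#include <cstdint>
#include <functional>

int retry_while_progressing(const std::function<int(uint64_t*)>& attempt,
                            unsigned allowed_retries = 10) {
  uint64_t prev_progress = 0, progress = 0;
  int r = -EDOM;  // sentinel: not yet attempted, mirrors the code above
  for (unsigned retries = 0; retries < allowed_retries && r != 0; ++retries) {
    r = attempt(&progress);
    if (r == -ECANCELED) {
      return r;                 // cancellation is fatal; don't retry
    }
    if (r < 0 && progress != prev_progress) {
      retries = 0;              // made progress: reset the retry budget
    }
    prev_progress = progress;
  }
  return r;
}
#endif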
6227
6228 // Loop over calls to ShardCR with limited concurrency
6229 class GenCR : public RGWShardCollectCR {
6230 static constexpr auto MAX_CONCURRENT_SHARDS = 64;
6231
6232 RGWDataSyncCtx& sc;
6233 const uint64_t gen;
6234
6235 std::vector<rgw_bucket_sync_pair_info> pairs;
6236 decltype(pairs)::const_iterator iter;
6237
6238 public:
6239 GenCR(RGWDataSyncCtx& sc, const rgw_bucket& source, const rgw_bucket& dest,
6240 const uint64_t gen, const uint64_t shards,
6241 const RGWBucketSyncFlowManager::pipe_handler& handler)
6242 : RGWShardCollectCR(sc.cct, MAX_CONCURRENT_SHARDS),
6243 sc(sc), gen(gen) {
6244 pairs.resize(shards);
6245 for (auto shard = 0u; shard < shards; ++shard) {
6246 auto& pair = pairs[shard];
6247 pair.handler = handler;
6248 pair.source_bs.bucket = source;
6249 pair.dest_bucket = dest;
6250 pair.source_bs.shard_id = shard;
6251 }
6252 iter = pairs.cbegin();
6253 assert(pairs.size() == shards);
6254 }
6255
6256 virtual bool spawn_next() override {
6257 if (iter == pairs.cend()) {
6258 return false;
6259 }
6260 spawn(new ShardCR(sc, *iter, gen), false);
6261 ++iter;
6262 return true;
6263 }
6264
6265 int handle_result(int r) override {
6266 if (r < 0) {
6267 ldpp_dout(sc.env->dpp, 4) << "ERROR: Error syncing shard: "
6268 << cpp_strerror(r) << dendl;
6269 }
6270 return r;
6271 }
6272 };
6273
6274 // Read sync status, loop over calls to GenCR
6275 class SourceCR : public RGWCoroutine {
6276 RGWDataSyncCtx& sc;
6277 const RGWBucketInfo& info;
6278 const rgw_bucket& dest;
6279 const RGWBucketSyncFlowManager::pipe_handler& handler;
6280 const rgw_raw_obj status_obj{
6281 sc.env->svc->zone->get_zone_params().log_pool,
6282 RGWBucketPipeSyncStatusManager::full_status_oid(sc.source_zone, info.bucket,
6283 dest)};
6284
6285 BucketSyncState state = BucketSyncState::Incremental;
6286 uint64_t gen = 0;
6287 uint64_t num_shards = 0;
6288 rgw_bucket_sync_status status;
6289 std::string zone_name;
6290
6291 public:
6292
6293 SourceCR(RGWDataSyncCtx& sc, const RGWBucketInfo& info,
6294 const rgw_bucket& dest,
6295 const RGWBucketSyncFlowManager::pipe_handler& handler,
6296 const std::string& zone_name)
6297 : RGWCoroutine(sc.cct), sc(sc), info(info), dest(dest), handler(handler),
6298 zone_name(zone_name) {}
6299
6300 int operate(const DoutPrefixProvider *dpp) override {
6301 reenter(this) {
6302 // Get the source's status. In incremental sync, this gives us
6303 // the generation and shard count that need to be run next.
6304 yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
6305 dpp, sc.env->driver, status_obj, &status));
6306 if (retcode < 0) {
6307 ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
6308 << sc.source_zone << " retcode="
6309 << retcode << dendl;
6310 drain_all();
6311 return set_cr_error(retcode);
6312 }
6313
6314 if (status.state == BucketSyncState::Stopped) {
6315 // Nothing to do.
6316 pretty_print(sc.env, "Sync of bucket {} from source zone {} is in state Stopped. "
6317 "Nothing to do.\n", dest.name, zone_name);
6318 ldpp_dout(dpp, 5) << "SourceCR: Bucket is in state Stopped, returning."
6319 << dendl;
6320 drain_all();
6321 return set_cr_done();
6322 }
6323
6324 do {
6325 state = status.state;
6326 gen = status.incremental_gen;
6327 num_shards = status.shards_done_with_gen.size();
6328
6329 ldpp_dout(dpp, 5) << "SourceCR: "
6330 << "state=" << state
6331 << ", gen=" << gen
6332 << ", num_shards=" << num_shards
6333 << dendl;
6334
6335 // Special case to handle full sync. Since full sync no longer
6336 // uses shards and has no generations, we sync shard zero,
6337 // but use the current generation so a following
6338 // incremental sync can carry on.
6339 if (state != BucketSyncState::Incremental) {
6340 pretty_print(sc.env, "Beginning full sync of bucket {} from source zone {}.\n",
6341 dest.name, zone_name);
6342 ldpp_dout(dpp, 5) << "SourceCR: Calling GenCR with "
6343 << "gen=" << gen
6344 << ", num_shards=" << 1
6345 << dendl;
6346 yield call(new GenCR(sc, info.bucket, dest, gen, 1, handler));
6347 } else {
6348 pretty_print(sc.env, "Beginning incremental sync of bucket {}, generation {} from source zone {}.\n",
6349 dest.name, gen, zone_name);
6350 ldpp_dout(dpp, 5) << "SourceCR: Calling GenCR with "
6351 << "gen=" << gen
6352 << ", num_shards=" << num_shards
6353 << dendl;
6354 yield call(new GenCR(sc, info.bucket, dest, gen, num_shards,
6355 handler));
6356 }
6357 if (retcode < 0) {
6358 ldpp_dout(dpp, -1) << "ERROR: Giving up syncing from "
6359 << sc.source_zone << " retcode="
6360 << retcode << dendl;
6361 drain_all();
6362 return set_cr_error(retcode);
6363 }
6364
6365 pretty_print(sc.env, "Completed.\n");
6366
6367 yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_status>(
6368 dpp, sc.env->driver, status_obj, &status));
6369 if (retcode < 0) {
6370 ldpp_dout(dpp, -1) << "ERROR: Unable to fetch status for zone="
6371 << sc.source_zone << " retcode="
6372 << retcode << dendl;
6373 drain_all();
6374 return set_cr_error(retcode);
6375 }
6376 // Repeat until we have done an incremental run and the
6377 // generation remains unchanged.
6378 ldpp_dout(dpp, 5) << "SourceCR: "
6379 << "state=" << state
6380 << ", gen=" << gen
6381 << ", num_shards=" << num_shards
6382 << ", status.state=" << status.state
6383 << ", status.incremental_gen=" << status.incremental_gen
6384 << ", status.shards_done_with_gen.size()=" << status.shards_done_with_gen.size()
6385 << dendl;
6386 } while (state != BucketSyncState::Incremental ||
6387 gen != status.incremental_gen);
6388 drain_all();
6389 return set_cr_done();
6390 }
6391 return 0;
6392 }
6393 };
6394 } // namespace rgw::bucket_sync_run
6395
6396 int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
6397 {
6398 list<RGWCoroutinesStack *> stacks;
6399 for (auto& source : sources) {
6400 auto stack = new RGWCoroutinesStack(driver->ctx(), &cr_mgr);
6401 stack->call(new rgw::bucket_sync_run::SourceCR(
6402 source.sc, source.info, source.dest, source.handler,
6403 source.zone_name));
6404 stacks.push_back(stack);
6405 }
6406 auto ret = cr_mgr.run(dpp, stacks);
6407 if (ret < 0) {
6408 ldpp_dout(this, 0) << "ERROR: Sync unsuccessful on bucket "
6409 << bucket_str{dest_bucket} << dendl;
6410 }
6411 return ret;
6412 }
6413
6414 unsigned RGWBucketPipeSyncStatusManager::get_subsys() const
6415 {
6416 return dout_subsys;
6417 }
6418
6419 std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) const
6420 {
6421 auto zone = std::string_view{source_zone.value_or(rgw_zone_id("*")).id};
6422 return out << "bucket sync zone:" << zone.substr(0, 8)
6423 << " bucket:" << dest_bucket << ' ';
6424 }
6425
6426 string RGWBucketPipeSyncStatusManager::full_status_oid(const rgw_zone_id& source_zone,
6427 const rgw_bucket& source_bucket,
6428 const rgw_bucket& dest_bucket)
6429 {
6430 if (source_bucket == dest_bucket) {
6431 return bucket_full_status_oid_prefix + "." + source_zone.id + ":"
6432 + dest_bucket.get_key();
6433 } else {
6434 return bucket_full_status_oid_prefix + "." + source_zone.id + ":"
6435 + dest_bucket.get_key() + ":" + source_bucket.get_key();
6436 }
6437 }
6438
6439 inline std::string generation_token(uint64_t gen) {
6440 return (gen == 0) ? "" : (":" + std::to_string(gen));
6441 }
6442
6443 string RGWBucketPipeSyncStatusManager::inc_status_oid(const rgw_zone_id& source_zone,
6444 const rgw_bucket_sync_pair_info& sync_pair,
6445 uint64_t gen)
6446 {
6447 if (sync_pair.source_bs.bucket == sync_pair.dest_bucket) {
6448 return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.source_bs.get_key() +
6449 generation_token(gen);
6450 } else {
6451 return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bucket.get_key() + ":" + sync_pair.source_bs.get_key() +
6452 generation_token(gen);
6453 }
6454 }
6455
6456 string RGWBucketPipeSyncStatusManager::obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
6457 const rgw_zone_id& source_zone,
6458 const rgw_obj& obj)
6459 {
6460 string prefix = object_status_oid_prefix + "." + source_zone.id + ":" + obj.bucket.get_key();
6461 if (sync_pipe.source_bucket_info.bucket !=
6462 sync_pipe.dest_bucket_info.bucket) {
6463 prefix += string("/") + sync_pipe.dest_bucket_info.bucket.get_key();
6464 }
6465 return prefix + ":" + obj.key.name + ":" + obj.key.instance;
6466 }
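
// The three helpers above compose RADOS object names from a fixed prefix plus
// the source zone id and bucket keys; the source bucket is appended only when
// it differs from the destination, and a ":<gen>" suffix exists only for
// generations > 0, so generation 0 keeps the legacy name. A simplified
// re-derivation with plain strings (note the real same-bucket case keys on
// the source bucket shard, not the dest bucket); #if 0, not built:
#if 0
#include <cstdint>
#include <string>

std::string inc_oid(const std::string& zone, const std::string& source_key,
                    const std::string& dest_key, uint64_t gen) {
  std::string oid = "bucket.sync-status." + zone + ":" + dest_key;
  if (source_key != dest_key) {
    oid += ":" + source_key;             // name the source only if distinct
  }
  if (gen != 0) {
    oid += ":" + std::to_string(gen);    // legacy name for generation 0
  }
  return oid;
}
// e.g. inc_oid("zid", "srcb:0", "destb", 3)
//        == "bucket.sync-status.zid:destb:srcb:0:3"
#endif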
6467
6468 int rgw_read_remote_bilog_info(const DoutPrefixProvider *dpp,
6469 RGWRESTConn* conn,
6470 const rgw_bucket& bucket,
6471 rgw_bucket_index_marker_info& info,
6472 BucketIndexShardsManager& markers,
6473 optional_yield y)
6474 {
6475 const auto instance_key = bucket.get_key();
6476 const rgw_http_param_pair params[] = {
6477 { "type" , "bucket-index" },
6478 { "bucket-instance", instance_key.c_str() },
6479 { "info" , nullptr },
6480 { nullptr, nullptr }
6481 };
6482 int r = conn->get_json_resource(dpp, "/admin/log/", params, y, info);
6483 if (r < 0) {
6484 ldpp_dout(dpp, -1) << "failed to fetch remote log markers: " << cpp_strerror(r) << dendl;
6485 return r;
6486 }
6487 // parse shard markers
6488 r = markers.from_string(info.max_marker, -1);
6489 if (r < 0) {
6490 ldpp_dout(dpp, -1) << "failed to decode remote log markers" << dendl;
6491 return r;
6492 }
6493 return 0;
6494 }
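
// rgw_read_remote_bilog_info() above issues a GET against the remote
// gateway's admin log API; the param pairs translate to a request of roughly
// the following shape (the exact encoding is handled by the REST layer),
// after which max_marker is split into per-shard markers. URL construction
// only, with an illustrative endpoint (#if 0, not built):
#if 0
#include <string>

std::string bilog_info_url(const std::string& endpoint,
                           const std::string& bucket_instance_key) {
  // note: the real client URL-encodes the bucket instance key
  return endpoint + "/admin/log/?type=bucket-index&bucket-instance=" +
         bucket_instance_key + "&info";
}
// e.g. bilog_info_url("http://zone2:8000", "testbucket:zid.4993.1")
#endif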
6495
6496 class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
6497 static constexpr int max_concurrent_shards = 16;
6498 rgw::sal::RadosStore* const driver;
6499 RGWDataSyncCtx *const sc;
6500 RGWDataSyncEnv *const env;
6501 const uint64_t gen;
6502
6503 rgw_bucket_sync_pair_info sync_pair;
6504 using Vector = std::vector<rgw_bucket_shard_sync_info>;
6505 Vector::iterator i, end;
6506
6507 int handle_result(int r) override {
6508 if (r == -ENOENT) { // ENOENT is not a fatal error
6509 return 0;
6510 }
6511 if (r < 0) {
6512 ldout(cct, 4) << "failed to read bucket shard sync status: "
6513 << cpp_strerror(r) << dendl;
6514 }
6515 return r;
6516 }
6517 public:
6518 RGWCollectBucketSyncStatusCR(rgw::sal::RadosStore* driver, RGWDataSyncCtx *sc,
6519 const rgw_bucket_sync_pair_info& sync_pair,
6520 uint64_t gen,
6521 Vector *status)
6522 : RGWShardCollectCR(sc->cct, max_concurrent_shards),
6523 driver(driver), sc(sc), env(sc->env), gen(gen), sync_pair(sync_pair),
6524 i(status->begin()), end(status->end())
6525 {}
6526
6527 bool spawn_next() override {
6528 if (i == end) {
6529 return false;
6530 }
6531 spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr, gen), false);
6532 ++i;
6533 ++sync_pair.source_bs.shard_id;
6534 return true;
6535 }
6536 };
6537
6538 int rgw_read_bucket_full_sync_status(const DoutPrefixProvider *dpp,
6539 rgw::sal::RadosStore *driver,
6540 const rgw_sync_bucket_pipe& pipe,
6541 rgw_bucket_sync_status *status,
6542 optional_yield y)
6543 {
6544 auto get_oid = RGWBucketPipeSyncStatusManager::full_status_oid;
6545 const rgw_raw_obj obj{driver->svc()->zone->get_zone_params().log_pool,
6546 get_oid(*pipe.source.zone, *pipe.source.bucket, *pipe.dest.bucket)};
6547
6548 auto svc = driver->svc()->sysobj;
6549 auto sysobj = svc->get_obj(obj);
6550 bufferlist bl;
6551 int ret = sysobj.rop().read(dpp, &bl, y);
6552 if (ret < 0)
6553 return ret;
6554
6555 try {
6556 auto iter = bl.cbegin();
6557 using ceph::decode;
6558 rgw_bucket_sync_status result;
6559 decode(result, iter);
6560 *status = result;
6561 return 0;
6562 } catch (const buffer::error& err) {
6563 lderr(svc->ctx()) << "error decoding " << obj << ": " << err.what() << dendl;
6564 return -EIO;
6565 }
6566 }

// Read the per-shard incremental sync status for the given pipe and
// generation. The output vector must be sized to the shard count; shard
// ids 0..status->size()-1 are read.
int rgw_read_bucket_inc_sync_status(const DoutPrefixProvider *dpp,
                                    rgw::sal::RadosStore *driver,
                                    const rgw_sync_bucket_pipe& pipe,
                                    uint64_t gen,
                                    std::vector<rgw_bucket_shard_sync_info> *status)
{
  if (!pipe.source.zone ||
      !pipe.source.bucket ||
      !pipe.dest.zone ||
      !pipe.dest.bucket) {
    return -EINVAL;
  }

  rgw_bucket_sync_pair_info sync_pair;
  sync_pair.source_bs.bucket = *pipe.source.bucket;
  sync_pair.source_bs.shard_id = 0;
  sync_pair.dest_bucket = *pipe.dest.bucket;

  RGWDataSyncEnv env;
  RGWSyncModuleInstanceRef module; // null sync module
  env.init(dpp, driver->ctx(), driver, driver->svc(), driver->svc()->rados->get_async_processor(),
           nullptr, nullptr, nullptr, module, nullptr);

  RGWDataSyncCtx sc;
  sc.init(&env, nullptr, *pipe.source.zone);

  RGWCoroutinesManager crs(driver->ctx(), driver->getRados()->get_cr_registry());
  return crs.run(dpp, new RGWCollectBucketSyncStatusCR(driver, &sc,
                                                       sync_pair,
                                                       gen,
                                                       status));
}
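
// Hypothetical caller sketch ('num_shards' is assumed to come from the
// bucket's index layout for generation 'gen'):
//
//   std::vector<rgw_bucket_shard_sync_info> status(num_shards);
//   int r = rgw_read_bucket_inc_sync_status(dpp, driver, pipe, gen, &status);
//   if (r >= 0) {
//     // status[i] now holds the incremental sync state of shard i
//   }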

void rgw_data_sync_info::generate_test_instances(list<rgw_data_sync_info*>& o)
{
  auto info = new rgw_data_sync_info;
  info->state = rgw_data_sync_info::StateBuildingFullSyncMaps;
  info->num_shards = 8;
  o.push_back(info);
  o.push_back(new rgw_data_sync_info);
}

void rgw_data_sync_marker::generate_test_instances(list<rgw_data_sync_marker*>& o)
{
  auto marker = new rgw_data_sync_marker;
  marker->state = rgw_data_sync_marker::IncrementalSync;
  marker->marker = "01234";
  marker->pos = 5;
  o.push_back(marker);
  o.push_back(new rgw_data_sync_marker);
}

void rgw_data_sync_status::generate_test_instances(list<rgw_data_sync_status*>& o)
{
  o.push_back(new rgw_data_sync_status);
}
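
// The generate_test_instances() hooks above feed ceph-dencoder's
// encode/decode regression tests; each instance pushed onto the list is
// round-tripped through the type's encode() and decode().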

void rgw_bucket_shard_full_sync_marker::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("count", count, f);
}

void rgw_bucket_shard_inc_sync_marker::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("timestamp", timestamp, obj);
}

void rgw_bucket_shard_inc_sync_marker::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("timestamp", timestamp, f);
}

void rgw_bucket_shard_sync_info::decode_json(JSONObj *obj)
{
  std::string s;
  JSONDecoder::decode_json("status", s, obj);
  if (s == "full-sync") {
    state = StateFullSync;
  } else if (s == "incremental-sync") {
    state = StateIncrementalSync;
  } else if (s == "stopped") {
    state = StateStopped;
  } else {
    state = StateInit;
  }
  JSONDecoder::decode_json("inc_marker", inc_marker, obj);
}

void rgw_bucket_shard_full_sync_marker::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("count", count, obj);
}

void rgw_bucket_shard_sync_info::dump(Formatter *f) const
{
  const char *s{nullptr};
  switch ((SyncState)state) {
  case StateInit:
    s = "init";
    break;
  case StateFullSync:
    s = "full-sync";
    break;
  case StateIncrementalSync:
    s = "incremental-sync";
    break;
  case StateStopped:
    s = "stopped";
    break;
  default:
    s = "unknown";
    break;
  }
  encode_json("status", s, f);
  encode_json("inc_marker", inc_marker, f);
}
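
// Illustrative JSON shape produced by rgw_bucket_shard_sync_info::dump()
// (field values are placeholders):
//
//   {
//     "status": "incremental-sync",
//     "inc_marker": { "position": "...", "timestamp": "..." }
//   }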

void rgw_bucket_full_sync_status::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("count", count, obj);
}

void rgw_bucket_full_sync_status::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("count", count, f);
}

void encode_json(const char *name, BucketSyncState state, Formatter *f)
{
  switch (state) {
  case BucketSyncState::Init:
    encode_json(name, "init", f);
    break;
  case BucketSyncState::Full:
    encode_json(name, "full-sync", f);
    break;
  case BucketSyncState::Incremental:
    encode_json(name, "incremental-sync", f);
    break;
  case BucketSyncState::Stopped:
    encode_json(name, "stopped", f);
    break;
  default:
    encode_json(name, "unknown", f);
    break;
  }
}

void decode_json_obj(BucketSyncState& state, JSONObj *obj)
{
  std::string s;
  decode_json_obj(s, obj);
  if (s == "full-sync") {
    state = BucketSyncState::Full;
  } else if (s == "incremental-sync") {
    state = BucketSyncState::Incremental;
  } else if (s == "stopped") {
    state = BucketSyncState::Stopped;
  } else {
    state = BucketSyncState::Init;
  }
}
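
// Note that the encode/decode pair above is not a perfect round trip:
// "init", "unknown" and any unrecognized string all decode back to
// BucketSyncState::Init.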

void rgw_bucket_sync_status::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("state", state, obj);
  JSONDecoder::decode_json("full", full, obj);
  JSONDecoder::decode_json("incremental_gen", incremental_gen, obj);
}

void rgw_bucket_sync_status::dump(Formatter *f) const
{
  encode_json("state", state, f);
  encode_json("full", full, f);
  encode_json("incremental_gen", incremental_gen, f);
}
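
// Illustrative JSON shape produced by rgw_bucket_sync_status::dump()
// (field values are placeholders):
//
//   {
//     "state": "incremental-sync",
//     "full": { "position": "...", "count": 0 },
//     "incremental_gen": 1
//   }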

void bilog_status_v2::dump(Formatter *f) const
{
  encode_json("sync_status", sync_status, f);
  encode_json("inc_status", inc_status, f);
}

void bilog_status_v2::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("sync_status", sync_status, obj);
  JSONDecoder::decode_json("inc_status", inc_status, obj);
}