]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_data_sync.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / rgw / rgw_data_sync.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "common/ceph_json.h"
5 #include "common/RefCountedObj.h"
6 #include "common/WorkQueue.h"
7 #include "common/Throttle.h"
8 #include "common/errno.h"
9
10 #include "rgw_common.h"
11 #include "rgw_zone.h"
12 #include "rgw_sync.h"
13 #include "rgw_data_sync.h"
14 #include "rgw_rest_conn.h"
15 #include "rgw_cr_rados.h"
16 #include "rgw_cr_rest.h"
17 #include "rgw_cr_tools.h"
18 #include "rgw_http_client.h"
19 #include "rgw_bucket.h"
20 #include "rgw_bucket_sync.h"
21 #include "rgw_bucket_sync_cache.h"
22 #include "rgw_datalog.h"
23 #include "rgw_metadata.h"
24 #include "rgw_sync_counters.h"
25 #include "rgw_sync_error_repo.h"
26 #include "rgw_sync_module.h"
27 #include "rgw_sal.h"
28
29 #include "cls/lock/cls_lock_client.h"
30 #include "cls/rgw/cls_rgw_client.h"
31
32 #include "services/svc_zone.h"
33 #include "services/svc_sync_modules.h"
34 #include "rgw_bucket.h"
35
36 #include "include/common_fwd.h"
37 #include "include/random.h"
38
39 #include <boost/asio/yield.hpp>
40 #include <string_view>
41
42 #define dout_subsys ceph_subsys_rgw
43
44 #undef dout_prefix
45 #define dout_prefix (*_dout << "data sync: ")
46
47 using namespace std;
48
49 static string datalog_sync_status_oid_prefix = "datalog.sync-status";
50 static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
51 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
52 static string bucket_status_oid_prefix = "bucket.sync-status";
53 static string object_status_oid_prefix = "bucket.sync-status";
54
55
56 void rgw_datalog_info::decode_json(JSONObj *obj) {
57 JSONDecoder::decode_json("num_objects", num_shards, obj);
58 }
59
60 void rgw_datalog_entry::decode_json(JSONObj *obj) {
61 JSONDecoder::decode_json("key", key, obj);
62 utime_t ut;
63 JSONDecoder::decode_json("timestamp", ut, obj);
64 timestamp = ut.to_real_time();
65 }
66
67 void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
68 JSONDecoder::decode_json("marker", marker, obj);
69 JSONDecoder::decode_json("truncated", truncated, obj);
70 JSONDecoder::decode_json("entries", entries, obj);
71 };
72
73
74 class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
75 static constexpr int MAX_CONCURRENT_SHARDS = 16;
76
77 RGWDataSyncCtx *sc;
78 RGWDataSyncEnv *env;
79 const int num_shards;
80 int shard_id{0};;
81
82 map<uint32_t, rgw_data_sync_marker>& markers;
83
84 public:
85 RGWReadDataSyncStatusMarkersCR(RGWDataSyncCtx *sc, int num_shards,
86 map<uint32_t, rgw_data_sync_marker>& markers)
87 : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS),
88 sc(sc), env(sc->env), num_shards(num_shards), markers(markers)
89 {}
90 bool spawn_next() override;
91 };
92
93 bool RGWReadDataSyncStatusMarkersCR::spawn_next()
94 {
95 if (shard_id >= num_shards) {
96 return false;
97 }
98 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
99 spawn(new CR(env->dpp, env->async_rados, env->svc->sysobj,
100 rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
101 &markers[shard_id]),
102 false);
103 shard_id++;
104 return true;
105 }
106
107 class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
108 static constexpr int MAX_CONCURRENT_SHARDS = 16;
109
110 RGWDataSyncCtx *sc;
111 RGWDataSyncEnv *env;
112
113 uint64_t max_entries;
114 int num_shards;
115 int shard_id{0};
116
117 string marker;
118 std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;
119
120 public:
121 RGWReadDataSyncRecoveringShardsCR(RGWDataSyncCtx *sc, uint64_t _max_entries, int _num_shards,
122 std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
123 : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS), sc(sc), env(sc->env),
124 max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
125 {}
126 bool spawn_next() override;
127 };
128
129 bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
130 {
131 if (shard_id >= num_shards)
132 return false;
133
134 string error_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id) + ".retry";
135 auto& shard_keys = omapkeys[shard_id];
136 shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
137 spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, error_oid),
138 marker, max_entries, shard_keys), false);
139
140 ++shard_id;
141 return true;
142 }
143
144 class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
145 RGWDataSyncCtx *sc;
146 RGWDataSyncEnv *sync_env;
147 rgw_data_sync_status *sync_status;
148
149 public:
150 RGWReadDataSyncStatusCoroutine(RGWDataSyncCtx *_sc,
151 rgw_data_sync_status *_status)
152 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env), sync_status(_status)
153 {}
154 int operate(const DoutPrefixProvider *dpp) override;
155 };
156
157 int RGWReadDataSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
158 {
159 reenter(this) {
160 // read sync info
161 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
162 yield {
163 bool empty_on_enoent = false; // fail on ENOENT
164 call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
165 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
166 &sync_status->sync_info, empty_on_enoent));
167 }
168 if (retcode < 0) {
169 ldpp_dout(dpp, 4) << "failed to read sync status info with "
170 << cpp_strerror(retcode) << dendl;
171 return set_cr_error(retcode);
172 }
173 // read shard markers
174 using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
175 yield call(new ReadMarkersCR(sc, sync_status->sync_info.num_shards,
176 sync_status->sync_markers));
177 if (retcode < 0) {
178 ldpp_dout(dpp, 4) << "failed to read sync status markers with "
179 << cpp_strerror(retcode) << dendl;
180 return set_cr_error(retcode);
181 }
182 return set_cr_done();
183 }
184 return 0;
185 }
186
187 class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
188 RGWDataSyncCtx *sc;
189 RGWDataSyncEnv *sync_env;
190
191 RGWRESTReadResource *http_op;
192
193 int shard_id;
194 RGWDataChangesLogInfo *shard_info;
195
196 public:
197 RGWReadRemoteDataLogShardInfoCR(RGWDataSyncCtx *_sc,
198 int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sc->cct),
199 sc(_sc),
200 sync_env(_sc->env),
201 http_op(NULL),
202 shard_id(_shard_id),
203 shard_info(_shard_info) {
204 }
205
206 ~RGWReadRemoteDataLogShardInfoCR() override {
207 if (http_op) {
208 http_op->put();
209 }
210 }
211
212 int operate(const DoutPrefixProvider *dpp) override {
213 reenter(this) {
214 yield {
215 char buf[16];
216 snprintf(buf, sizeof(buf), "%d", shard_id);
217 rgw_http_param_pair pairs[] = { { "type" , "data" },
218 { "id", buf },
219 { "info" , NULL },
220 { NULL, NULL } };
221
222 string p = "/admin/log/";
223
224 http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
225
226 init_new_io(http_op);
227
228 int ret = http_op->aio_read(dpp);
229 if (ret < 0) {
230 ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
231 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
232 return set_cr_error(ret);
233 }
234
235 return io_block(0);
236 }
237 yield {
238 int ret = http_op->wait(shard_info, null_yield);
239 if (ret < 0) {
240 return set_cr_error(ret);
241 }
242 return set_cr_done();
243 }
244 }
245 return 0;
246 }
247 };
248
249 struct read_remote_data_log_response {
250 string marker;
251 bool truncated;
252 list<rgw_data_change_log_entry> entries;
253
254 read_remote_data_log_response() : truncated(false) {}
255
256 void decode_json(JSONObj *obj) {
257 JSONDecoder::decode_json("marker", marker, obj);
258 JSONDecoder::decode_json("truncated", truncated, obj);
259 JSONDecoder::decode_json("entries", entries, obj);
260 };
261 };
262
263 class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
264 RGWDataSyncCtx *sc;
265 RGWDataSyncEnv *sync_env;
266
267 RGWRESTReadResource *http_op = nullptr;
268
269 int shard_id;
270 const std::string& marker;
271 string *pnext_marker;
272 list<rgw_data_change_log_entry> *entries;
273 bool *truncated;
274
275 read_remote_data_log_response response;
276 std::optional<TOPNSPC::common::PerfGuard> timer;
277
278 public:
279 RGWReadRemoteDataLogShardCR(RGWDataSyncCtx *_sc, int _shard_id,
280 const std::string& marker, string *pnext_marker,
281 list<rgw_data_change_log_entry> *_entries,
282 bool *_truncated)
283 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
284 shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
285 entries(_entries), truncated(_truncated) {
286 }
287 ~RGWReadRemoteDataLogShardCR() override {
288 if (http_op) {
289 http_op->put();
290 }
291 }
292
293 int operate(const DoutPrefixProvider *dpp) override {
294 reenter(this) {
295 yield {
296 char buf[16];
297 snprintf(buf, sizeof(buf), "%d", shard_id);
298 rgw_http_param_pair pairs[] = { { "type" , "data" },
299 { "id", buf },
300 { "marker", marker.c_str() },
301 { "extra-info", "true" },
302 { NULL, NULL } };
303
304 string p = "/admin/log/";
305
306 http_op = new RGWRESTReadResource(sc->conn, p, pairs, NULL, sync_env->http_manager);
307
308 init_new_io(http_op);
309
310 if (sync_env->counters) {
311 timer.emplace(sync_env->counters, sync_counters::l_poll);
312 }
313 int ret = http_op->aio_read(dpp);
314 if (ret < 0) {
315 ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
316 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
317 if (sync_env->counters) {
318 sync_env->counters->inc(sync_counters::l_poll_err);
319 }
320 return set_cr_error(ret);
321 }
322
323 return io_block(0);
324 }
325 yield {
326 timer.reset();
327 int ret = http_op->wait(&response, null_yield);
328 if (ret < 0) {
329 if (sync_env->counters && ret != -ENOENT) {
330 sync_env->counters->inc(sync_counters::l_poll_err);
331 }
332 return set_cr_error(ret);
333 }
334 entries->clear();
335 entries->swap(response.entries);
336 *pnext_marker = response.marker;
337 *truncated = response.truncated;
338 return set_cr_done();
339 }
340 }
341 return 0;
342 }
343 };
344
345 class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
346 RGWDataSyncCtx *sc;
347 RGWDataSyncEnv *sync_env;
348
349 int num_shards;
350 map<int, RGWDataChangesLogInfo> *datalog_info;
351
352 int shard_id;
353 #define READ_DATALOG_MAX_CONCURRENT 10
354
355 public:
356 RGWReadRemoteDataLogInfoCR(RGWDataSyncCtx *_sc,
357 int _num_shards,
358 map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT),
359 sc(_sc), sync_env(_sc->env), num_shards(_num_shards),
360 datalog_info(_datalog_info), shard_id(0) {}
361 bool spawn_next() override;
362 };
363
364 bool RGWReadRemoteDataLogInfoCR::spawn_next() {
365 if (shard_id >= num_shards) {
366 return false;
367 }
368 spawn(new RGWReadRemoteDataLogShardInfoCR(sc, shard_id, &(*datalog_info)[shard_id]), false);
369 shard_id++;
370 return true;
371 }
372
373 class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
374 RGWDataSyncCtx *sc;
375 RGWDataSyncEnv *sync_env;
376 RGWRESTReadResource *http_op;
377
378 int shard_id;
379 string marker;
380 uint32_t max_entries;
381 rgw_datalog_shard_data *result;
382
383 public:
384 RGWListRemoteDataLogShardCR(RGWDataSyncCtx *sc, int _shard_id,
385 const string& _marker, uint32_t _max_entries,
386 rgw_datalog_shard_data *_result)
387 : RGWSimpleCoroutine(sc->cct), sc(sc), sync_env(sc->env), http_op(NULL),
388 shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
389
390 int send_request(const DoutPrefixProvider *dpp) override {
391 RGWRESTConn *conn = sc->conn;
392
393 char buf[32];
394 snprintf(buf, sizeof(buf), "%d", shard_id);
395
396 char max_entries_buf[32];
397 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
398
399 const char *marker_key = (marker.empty() ? "" : "marker");
400
401 rgw_http_param_pair pairs[] = { { "type", "data" },
402 { "id", buf },
403 { "max-entries", max_entries_buf },
404 { marker_key, marker.c_str() },
405 { NULL, NULL } };
406
407 string p = "/admin/log/";
408
409 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
410 init_new_io(http_op);
411
412 int ret = http_op->aio_read(dpp);
413 if (ret < 0) {
414 ldpp_dout(dpp, 0) << "ERROR: failed to read from " << p << dendl;
415 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
416 http_op->put();
417 return ret;
418 }
419
420 return 0;
421 }
422
423 int request_complete() override {
424 int ret = http_op->wait(result, null_yield);
425 http_op->put();
426 if (ret < 0 && ret != -ENOENT) {
427 ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
428 return ret;
429 }
430 return 0;
431 }
432 };
433
434 class RGWListRemoteDataLogCR : public RGWShardCollectCR {
435 RGWDataSyncCtx *sc;
436 RGWDataSyncEnv *sync_env;
437
438 map<int, string> shards;
439 int max_entries_per_shard;
440 map<int, rgw_datalog_shard_data> *result;
441
442 map<int, string>::iterator iter;
443 #define READ_DATALOG_MAX_CONCURRENT 10
444
445 public:
446 RGWListRemoteDataLogCR(RGWDataSyncCtx *_sc,
447 map<int, string>& _shards,
448 int _max_entries_per_shard,
449 map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sc->cct, READ_DATALOG_MAX_CONCURRENT),
450 sc(_sc), sync_env(_sc->env), max_entries_per_shard(_max_entries_per_shard),
451 result(_result) {
452 shards.swap(_shards);
453 iter = shards.begin();
454 }
455 bool spawn_next() override;
456 };
457
458 bool RGWListRemoteDataLogCR::spawn_next() {
459 if (iter == shards.end()) {
460 return false;
461 }
462
463 spawn(new RGWListRemoteDataLogShardCR(sc, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
464 ++iter;
465 return true;
466 }
467
468 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
469 static constexpr uint32_t lock_duration = 30;
470 RGWDataSyncCtx *sc;
471 RGWDataSyncEnv *sync_env;
472 rgw::sal::RadosStore* store;
473 const rgw_pool& pool;
474 const uint32_t num_shards;
475
476 string sync_status_oid;
477
478 string lock_name;
479 string cookie;
480 rgw_data_sync_status *status;
481 map<int, RGWDataChangesLogInfo> shards_info;
482
483 RGWSyncTraceNodeRef tn;
484 public:
485 RGWInitDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, uint32_t num_shards,
486 uint64_t instance_id,
487 RGWSyncTraceNodeRef& _tn_parent,
488 rgw_data_sync_status *status)
489 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), store(sync_env->store),
490 pool(sync_env->svc->zone->get_zone_params().log_pool),
491 num_shards(num_shards), status(status),
492 tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
493 lock_name = "sync_lock";
494
495 status->sync_info.instance_id = instance_id;
496
497 #define COOKIE_LEN 16
498 char buf[COOKIE_LEN + 1];
499
500 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
501 cookie = buf;
502
503 sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sc->source_zone);
504
505 }
506
507 int operate(const DoutPrefixProvider *dpp) override {
508 int ret;
509 reenter(this) {
510 using LockCR = RGWSimpleRadosLockCR;
511 yield call(new LockCR(sync_env->async_rados, store,
512 rgw_raw_obj{pool, sync_status_oid},
513 lock_name, cookie, lock_duration));
514 if (retcode < 0) {
515 tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
516 return set_cr_error(retcode);
517 }
518 using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
519 yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
520 rgw_raw_obj{pool, sync_status_oid},
521 status->sync_info));
522 if (retcode < 0) {
523 tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
524 return set_cr_error(retcode);
525 }
526
527 /* take lock again, we just recreated the object */
528 yield call(new LockCR(sync_env->async_rados, store,
529 rgw_raw_obj{pool, sync_status_oid},
530 lock_name, cookie, lock_duration));
531 if (retcode < 0) {
532 tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
533 return set_cr_error(retcode);
534 }
535
536 tn->log(10, "took lease");
537
538 /* fetch current position in logs */
539 yield {
540 RGWRESTConn *conn = sync_env->svc->zone->get_zone_conn(sc->source_zone);
541 if (!conn) {
542 tn->log(0, SSTR("ERROR: connection to zone " << sc->source_zone << " does not exist!"));
543 return set_cr_error(-EIO);
544 }
545 for (uint32_t i = 0; i < num_shards; i++) {
546 spawn(new RGWReadRemoteDataLogShardInfoCR(sc, i, &shards_info[i]), true);
547 }
548 }
549 while (collect(&ret, NULL)) {
550 if (ret < 0) {
551 tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
552 return set_state(RGWCoroutine_Error);
553 }
554 yield;
555 }
556 yield {
557 for (uint32_t i = 0; i < num_shards; i++) {
558 RGWDataChangesLogInfo& info = shards_info[i];
559 auto& marker = status->sync_markers[i];
560 marker.next_step_marker = info.marker;
561 marker.timestamp = info.last_update;
562 const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, i);
563 using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
564 spawn(new WriteMarkerCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
565 rgw_raw_obj{pool, oid}, marker), true);
566 }
567 }
568 while (collect(&ret, NULL)) {
569 if (ret < 0) {
570 tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
571 return set_state(RGWCoroutine_Error);
572 }
573 yield;
574 }
575
576 status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
577 yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
578 rgw_raw_obj{pool, sync_status_oid},
579 status->sync_info));
580 if (retcode < 0) {
581 tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
582 return set_cr_error(retcode);
583 }
584 yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
585 rgw_raw_obj{pool, sync_status_oid},
586 lock_name, cookie));
587 return set_cr_done();
588 }
589 return 0;
590 }
591 };
592
593 RGWRemoteDataLog::RGWRemoteDataLog(const DoutPrefixProvider *dpp,
594 rgw::sal::RadosStore* store,
595 RGWAsyncRadosProcessor *async_rados)
596 : RGWCoroutinesManager(store->ctx(), store->getRados()->get_cr_registry()),
597 dpp(dpp), store(store),
598 cct(store->ctx()), cr_registry(store->getRados()->get_cr_registry()),
599 async_rados(async_rados),
600 http_manager(store->ctx(), completion_mgr),
601 data_sync_cr(NULL),
602 initialized(false)
603 {
604 }
605
606 int RGWRemoteDataLog::read_log_info(const DoutPrefixProvider *dpp, rgw_datalog_info *log_info)
607 {
608 rgw_http_param_pair pairs[] = { { "type", "data" },
609 { NULL, NULL } };
610
611 int ret = sc.conn->get_json_resource(dpp, "/admin/log", pairs, null_yield, *log_info);
612 if (ret < 0) {
613 ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
614 return ret;
615 }
616
617 ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
618
619 return 0;
620 }
621
622 int RGWRemoteDataLog::read_source_log_shards_info(const DoutPrefixProvider *dpp, map<int, RGWDataChangesLogInfo> *shards_info)
623 {
624 rgw_datalog_info log_info;
625 int ret = read_log_info(dpp, &log_info);
626 if (ret < 0) {
627 return ret;
628 }
629
630 return run(dpp, new RGWReadRemoteDataLogInfoCR(&sc, log_info.num_shards, shards_info));
631 }
632
633 int RGWRemoteDataLog::read_source_log_shards_next(const DoutPrefixProvider *dpp, map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
634 {
635 return run(dpp, new RGWListRemoteDataLogCR(&sc, shard_markers, 1, result));
636 }
637
638 int RGWRemoteDataLog::init(const rgw_zone_id& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
639 RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module,
640 PerfCounters* counters)
641 {
642 sync_env.init(dpp, cct, store, store->svc(), async_rados, &http_manager, _error_logger,
643 _sync_tracer, _sync_module, counters);
644 sc.init(&sync_env, _conn, _source_zone);
645
646 if (initialized) {
647 return 0;
648 }
649
650 int ret = http_manager.start();
651 if (ret < 0) {
652 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
653 return ret;
654 }
655
656 tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");
657
658 initialized = true;
659
660 return 0;
661 }
662
663 void RGWRemoteDataLog::finish()
664 {
665 stop();
666 }
667
668 int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status)
669 {
670 // cannot run concurrently with run_sync(), so run in a separate manager
671 RGWCoroutinesManager crs(cct, cr_registry);
672 RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
673 int ret = http_manager.start();
674 if (ret < 0) {
675 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
676 return ret;
677 }
678 RGWDataSyncEnv sync_env_local = sync_env;
679 sync_env_local.http_manager = &http_manager;
680
681 RGWDataSyncCtx sc_local = sc;
682 sc_local.env = &sync_env_local;
683
684 ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status));
685 http_manager.stop();
686 return ret;
687 }
688
689 int RGWRemoteDataLog::read_recovering_shards(const DoutPrefixProvider *dpp, const int num_shards, set<int>& recovering_shards)
690 {
691 // cannot run concurrently with run_sync(), so run in a separate manager
692 RGWCoroutinesManager crs(cct, cr_registry);
693 RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
694 int ret = http_manager.start();
695 if (ret < 0) {
696 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
697 return ret;
698 }
699 RGWDataSyncEnv sync_env_local = sync_env;
700 sync_env_local.http_manager = &http_manager;
701
702 RGWDataSyncCtx sc_local = sc;
703 sc_local.env = &sync_env_local;
704
705 std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
706 omapkeys.resize(num_shards);
707 uint64_t max_entries{1};
708
709 ret = crs.run(dpp, new RGWReadDataSyncRecoveringShardsCR(&sc_local, max_entries, num_shards, omapkeys));
710 http_manager.stop();
711
712 if (ret == 0) {
713 for (int i = 0; i < num_shards; i++) {
714 if (omapkeys[i]->entries.size() != 0) {
715 recovering_shards.insert(i);
716 }
717 }
718 }
719
720 return ret;
721 }
722
723 int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards)
724 {
725 rgw_data_sync_status sync_status;
726 sync_status.sync_info.num_shards = num_shards;
727
728 RGWCoroutinesManager crs(cct, cr_registry);
729 RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
730 int ret = http_manager.start();
731 if (ret < 0) {
732 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
733 return ret;
734 }
735 RGWDataSyncEnv sync_env_local = sync_env;
736 sync_env_local.http_manager = &http_manager;
737 auto instance_id = ceph::util::generate_random_number<uint64_t>();
738 RGWDataSyncCtx sc_local = sc;
739 sc_local.env = &sync_env_local;
740 ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status));
741 http_manager.stop();
742 return ret;
743 }
744
745 static string full_data_sync_index_shard_oid(const rgw_zone_id& source_zone, int shard_id)
746 {
747 char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.id.size() + 1 + 16];
748 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.id.c_str(), shard_id);
749 return string(buf);
750 }
751
752 struct read_metadata_list {
753 string marker;
754 bool truncated;
755 list<string> keys;
756 int count;
757
758 read_metadata_list() : truncated(false), count(0) {}
759
760 void decode_json(JSONObj *obj) {
761 JSONDecoder::decode_json("marker", marker, obj);
762 JSONDecoder::decode_json("truncated", truncated, obj);
763 JSONDecoder::decode_json("keys", keys, obj);
764 JSONDecoder::decode_json("count", count, obj);
765 }
766 };
767
768 struct bucket_instance_meta_info {
769 string key;
770 obj_version ver;
771 utime_t mtime;
772 RGWBucketInstanceMetadataObject data;
773
774 bucket_instance_meta_info() {}
775
776 void decode_json(JSONObj *obj) {
777 JSONDecoder::decode_json("key", key, obj);
778 JSONDecoder::decode_json("ver", ver, obj);
779 JSONDecoder::decode_json("mtime", mtime, obj);
780 JSONDecoder::decode_json("data", data, obj);
781 }
782 };
783
784 class RGWListBucketIndexesCR : public RGWCoroutine {
785 RGWDataSyncCtx *sc;
786 RGWDataSyncEnv *sync_env;
787
788 rgw::sal::RadosStore* store;
789
790 rgw_data_sync_status *sync_status;
791 int num_shards;
792
793 int req_ret;
794 int ret;
795
796 list<string>::iterator iter;
797
798 RGWShardedOmapCRManager *entries_index;
799
800 string oid_prefix;
801
802 string path;
803 bucket_instance_meta_info meta_info;
804 string key;
805 string s;
806 int i;
807
808 bool failed;
809 bool truncated;
810 read_metadata_list result;
811
812 public:
813 RGWListBucketIndexesCR(RGWDataSyncCtx *_sc,
814 rgw_data_sync_status *_sync_status) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
815 store(sync_env->store), sync_status(_sync_status),
816 req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) {
817 oid_prefix = datalog_sync_full_sync_index_prefix + "." + sc->source_zone.id;
818 path = "/admin/metadata/bucket.instance";
819 num_shards = sync_status->sync_info.num_shards;
820 }
821 ~RGWListBucketIndexesCR() override {
822 delete entries_index;
823 }
824
825 int operate(const DoutPrefixProvider *dpp) override {
826 reenter(this) {
827 entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
828 sync_env->svc->zone->get_zone_params().log_pool,
829 oid_prefix);
830 yield; // yield so OmapAppendCRs can start
831
832 do {
833 yield {
834 string entrypoint = string("/admin/metadata/bucket.instance");
835
836 rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
837 {"marker", result.marker.c_str()},
838 {NULL, NULL}};
839
840 call(new RGWReadRESTResourceCR<read_metadata_list>(sync_env->cct, sc->conn, sync_env->http_manager,
841 entrypoint, pairs, &result));
842 }
843 if (retcode < 0) {
844 ldpp_dout(dpp, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
845 return set_cr_error(retcode);
846 }
847
848 for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
849 ldpp_dout(dpp, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
850 key = *iter;
851
852 yield {
853 rgw_http_param_pair pairs[] = {{"key", key.c_str()},
854 {NULL, NULL}};
855
856 call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(sync_env->cct, sc->conn, sync_env->http_manager, path, pairs, &meta_info));
857 }
858
859 num_shards = meta_info.data.get_bucket_info().layout.current_index.layout.normal.num_shards;
860 if (num_shards > 0) {
861 for (i = 0; i < num_shards; i++) {
862 char buf[16];
863 snprintf(buf, sizeof(buf), ":%d", i);
864 s = key + buf;
865 yield entries_index->append(s, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
866 }
867 } else {
868 yield entries_index->append(key, sync_env->svc->datalog_rados->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
869 }
870 }
871 truncated = result.truncated;
872 } while (truncated);
873
874 yield {
875 if (!entries_index->finish()) {
876 failed = true;
877 }
878 }
879 if (!failed) {
880 for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
881 int shard_id = (int)iter->first;
882 rgw_data_sync_marker& marker = iter->second;
883 marker.total_entries = entries_index->get_total_entries(shard_id);
884 spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(dpp, sync_env->async_rados, sync_env->svc->sysobj,
885 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
886 marker),
887 true);
888 }
889 } else {
890 yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "",
891 EIO, string("failed to build bucket instances map")));
892 }
893 while (collect(&ret, NULL)) {
894 if (ret < 0) {
895 yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data.init", "",
896 -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
897 req_ret = ret;
898 }
899 yield;
900 }
901
902 drain_all();
903 if (req_ret < 0) {
904 yield return set_cr_error(req_ret);
905 }
906 yield return set_cr_done();
907 }
908 return 0;
909 }
910 };
911
912 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
913
914 class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
915 RGWDataSyncCtx *sc;
916 RGWDataSyncEnv *sync_env;
917 string marker_oid;
918 rgw_data_sync_marker sync_marker;
919 RGWSyncTraceNodeRef tn;
920
921 public:
922 RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
923 const string& _marker_oid,
924 const rgw_data_sync_marker& _marker,
925 RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
926 sc(_sc), sync_env(_sc->env),
927 marker_oid(_marker_oid),
928 sync_marker(_marker),
929 tn(_tn) {}
930
931 RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
932 sync_marker.marker = new_marker;
933 sync_marker.pos = index_pos;
934 sync_marker.timestamp = timestamp;
935
936 tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
937
938 return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
939 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
940 sync_marker);
941 }
942
943 RGWOrderCallCR *allocate_order_control_cr() override {
944 return new RGWLastCallerWinsCR(sync_env->cct);
945 }
946 };
947
948 // ostream wrappers to print buckets without copying strings
949 struct bucket_str {
950 const rgw_bucket& b;
951 explicit bucket_str(const rgw_bucket& b) : b(b) {}
952 };
953 std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
954 auto& b = rhs.b;
955 if (!b.tenant.empty()) {
956 out << b.tenant << '/';
957 }
958 out << b.name;
959 if (!b.bucket_id.empty()) {
960 out << ':' << b.bucket_id;
961 }
962 return out;
963 }
964
965 struct bucket_str_noinstance {
966 const rgw_bucket& b;
967 explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
968 };
969 std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
970 auto& b = rhs.b;
971 if (!b.tenant.empty()) {
972 out << b.tenant << '/';
973 }
974 out << b.name;
975 return out;
976 }
977
978 struct bucket_shard_str {
979 const rgw_bucket_shard& bs;
980 explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
981 };
982 std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
983 auto& bs = rhs.bs;
984 out << bucket_str{bs.bucket};
985 if (bs.shard_id >= 0) {
986 out << ':' << bs.shard_id;
987 }
988 return out;
989 }
990
991 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
992 RGWDataSyncCtx *sc;
993 RGWDataSyncEnv *sync_env;
994 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
995 rgw_bucket_sync_pair_info sync_pair;
996 rgw_bucket_sync_pipe sync_pipe;
997 rgw_bucket_shard_sync_info sync_status;
998 RGWMetaSyncEnv meta_sync_env;
999 RGWObjVersionTracker objv_tracker;
1000 ceph::real_time* progress;
1001
1002 const std::string status_oid;
1003
1004 RGWSyncTraceNodeRef tn;
1005
1006 public:
1007 RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc,
1008 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
1009 const rgw_bucket_sync_pair_info& _sync_pair,
1010 const RGWSyncTraceNodeRef& _tn_parent,
1011 ceph::real_time* progress)
1012 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
1013 lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
1014 status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
1015 tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
1016 SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
1017 }
1018
1019 int operate(const DoutPrefixProvider *dpp) override;
1020 };
1021
1022 struct all_bucket_info {
1023 RGWBucketInfo bucket_info;
1024 map<string, bufferlist> attrs;
1025 };
1026
1027 struct rgw_sync_pipe_info_entity
1028 {
1029 private:
1030 RGWBucketInfo bucket_info;
1031 map<string, bufferlist> bucket_attrs;
1032 bool _has_bucket_info{false};
1033
1034 public:
1035 rgw_zone_id zone;
1036
1037 rgw_sync_pipe_info_entity() {}
1038 rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e,
1039 std::optional<all_bucket_info>& binfo) {
1040 if (e.zone) {
1041 zone = *e.zone;
1042 }
1043 if (!e.bucket) {
1044 return;
1045 }
1046 if (!binfo ||
1047 binfo->bucket_info.bucket != *e.bucket) {
1048 bucket_info.bucket = *e.bucket;
1049 } else {
1050 set_bucket_info(*binfo);
1051 }
1052 }
1053
1054 void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
1055 if (_has_bucket_info) {
1056 return;
1057 }
1058 if (bucket_info.bucket.name.empty()) {
1059 return;
1060 }
1061
1062 auto iter = buckets_info.find(bucket_info.bucket);
1063 if (iter == buckets_info.end()) {
1064 return;
1065 }
1066
1067 set_bucket_info(iter->second);
1068 }
1069
1070 bool has_bucket_info() const {
1071 return _has_bucket_info;
1072 }
1073
1074 void set_bucket_info(const all_bucket_info& all_info) {
1075 bucket_info = all_info.bucket_info;
1076 bucket_attrs = all_info.attrs;
1077 _has_bucket_info = true;
1078 }
1079
1080 const RGWBucketInfo& get_bucket_info() const {
1081 return bucket_info;
1082 }
1083
1084 const rgw_bucket& get_bucket() const {
1085 return bucket_info.bucket;
1086 }
1087
1088 bool operator<(const rgw_sync_pipe_info_entity& e) const {
1089 if (zone < e.zone) {
1090 return false;
1091 }
1092 if (zone > e.zone) {
1093 return true;
1094 }
1095 return (bucket_info.bucket < e.bucket_info.bucket);
1096 }
1097 };
1098
1099 std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info_entity& e) {
1100 auto& bucket = e.get_bucket_info().bucket;
1101
1102 out << e.zone << ":" << bucket.get_key();
1103 return out;
1104 }
1105
1106 struct rgw_sync_pipe_handler_info {
1107 RGWBucketSyncFlowManager::pipe_handler handler;
1108 rgw_sync_pipe_info_entity source;
1109 rgw_sync_pipe_info_entity target;
1110
1111 rgw_sync_pipe_handler_info() {}
1112 rgw_sync_pipe_handler_info(const RGWBucketSyncFlowManager::pipe_handler& _handler,
1113 std::optional<all_bucket_info> source_bucket_info,
1114 std::optional<all_bucket_info> target_bucket_info) : handler(_handler),
1115 source(handler.source, source_bucket_info),
1116 target(handler.dest, target_bucket_info) {
1117 }
1118
1119 bool operator<(const rgw_sync_pipe_handler_info& p) const {
1120 if (source < p.source) {
1121 return true;
1122 }
1123 if (p.source < source) {
1124 return false;
1125 }
1126 return (target < p.target);
1127 }
1128
1129 void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
1130 source.update_empty_bucket_info(buckets_info);
1131 target.update_empty_bucket_info(buckets_info);
1132 }
1133 };
1134
1135 std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_handler_info& p) {
1136 out << p.source << ">" << p.target;
1137 return out;
1138 }
1139
1140 struct rgw_sync_pipe_info_set {
1141 std::set<rgw_sync_pipe_handler_info> handlers;
1142
1143 using iterator = std::set<rgw_sync_pipe_handler_info>::iterator;
1144
1145 void clear() {
1146 handlers.clear();
1147 }
1148
1149 void insert(const RGWBucketSyncFlowManager::pipe_handler& handler,
1150 std::optional<all_bucket_info>& source_bucket_info,
1151 std::optional<all_bucket_info>& target_bucket_info) {
1152 rgw_sync_pipe_handler_info p(handler, source_bucket_info, target_bucket_info);
1153 handlers.insert(p);
1154 }
1155
1156 iterator begin() {
1157 return handlers.begin();
1158 }
1159
1160 iterator end() {
1161 return handlers.end();
1162 }
1163
1164 bool empty() const {
1165 return handlers.empty();
1166 }
1167
1168 void update_empty_bucket_info(const std::map<rgw_bucket, all_bucket_info>& buckets_info) {
1169 if (buckets_info.empty()) {
1170 return;
1171 }
1172
1173 std::set<rgw_sync_pipe_handler_info> p;
1174
1175 for (auto pipe : handlers) {
1176 pipe.update_empty_bucket_info(buckets_info);
1177 p.insert(pipe);
1178 }
1179
1180 handlers = std::move(p);
1181 }
1182 };
1183
1184 class RGWRunBucketsSyncBySourceCR : public RGWCoroutine {
1185 RGWDataSyncCtx *sc;
1186 RGWDataSyncEnv *sync_env;
1187
1188 rgw_bucket_shard source;
1189
1190 RGWSyncTraceNodeRef tn;
1191
1192 public:
1193 RGWRunBucketsSyncBySourceCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& _source, const RGWSyncTraceNodeRef& _tn_parent)
1194 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), source(_source),
1195 tn(sync_env->sync_tracer->add_node(_tn_parent, "source",
1196 SSTR(bucket_shard_str{_source} ))) {
1197 }
1198 ~RGWRunBucketsSyncBySourceCR() override {
1199 }
1200
1201 int operate(const DoutPrefixProvider *dpp) override;
1202 };
1203
1204 class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
1205 RGWDataSyncCtx *sc;
1206 RGWDataSyncEnv *sync_env;
1207 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
1208
1209 std::optional<rgw_bucket_shard> target_bs;
1210 std::optional<rgw_bucket_shard> source_bs;
1211
1212 std::optional<rgw_bucket> target_bucket;
1213 std::optional<rgw_bucket> source_bucket;
1214
1215 rgw_sync_pipe_info_set pipes;
1216 rgw_sync_pipe_info_set::iterator siter;
1217
1218 rgw_bucket_sync_pair_info sync_pair;
1219
1220 RGWSyncTraceNodeRef tn;
1221 ceph::real_time* progress;
1222 std::map<uint64_t, ceph::real_time> shard_progress;
1223
1224 ceph::real_time *cur_progress{nullptr};
1225 std::optional<ceph::real_time> min_progress;
1226
1227 RGWRESTConn *conn{nullptr};
1228 rgw_zone_id last_zone;
1229
1230 int source_num_shards{0};
1231 int target_num_shards{0};
1232
1233 int num_shards{0};
1234 int cur_shard{0};
1235 bool again = false;
1236
1237 public:
1238 RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
1239 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
1240 std::optional<rgw_bucket_shard> _target_bs,
1241 std::optional<rgw_bucket_shard> _source_bs,
1242 const RGWSyncTraceNodeRef& _tn_parent,
1243 ceph::real_time* progress);
1244
1245 int operate(const DoutPrefixProvider *dpp) override;
1246
1247 void handle_complete_stack(uint64_t stack_id) {
1248 auto iter = shard_progress.find(stack_id);
1249 if (iter == shard_progress.end()) {
1250 lderr(cct) << "ERROR: RGWRunBucketSourcesSyncCR::handle_complete_stack(): stack_id=" << stack_id << " not found! Likely a bug" << dendl;
1251 return;
1252 }
1253 if (progress) {
1254 if (!min_progress) {
1255 min_progress = iter->second;
1256 } else {
1257 if (iter->second < *min_progress) {
1258 min_progress = iter->second;
1259 }
1260 }
1261 }
1262
1263 shard_progress.erase(stack_id);
1264 }
1265 };
1266
1267 class RGWDataSyncSingleEntryCR : public RGWCoroutine {
1268 RGWDataSyncCtx *sc;
1269 RGWDataSyncEnv *sync_env;
1270 rgw::bucket_sync::Handle state; // cached bucket-shard state
1271 rgw_data_sync_obligation obligation; // input obligation
1272 std::optional<rgw_data_sync_obligation> complete; // obligation to complete
1273 uint32_t obligation_counter = 0;
1274 RGWDataSyncShardMarkerTrack *marker_tracker;
1275 const rgw_raw_obj& error_repo;
1276 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
1277 RGWSyncTraceNodeRef tn;
1278
1279 ceph::real_time progress;
1280 int sync_status = 0;
1281 public:
1282 RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
1283 rgw_data_sync_obligation obligation,
1284 RGWDataSyncShardMarkerTrack *_marker_tracker,
1285 const rgw_raw_obj& error_repo,
1286 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
1287 const RGWSyncTraceNodeRef& _tn_parent)
1288 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
1289 state(std::move(state)), obligation(std::move(obligation)),
1290 marker_tracker(_marker_tracker), error_repo(error_repo),
1291 lease_cr(std::move(lease_cr)) {
1292 set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
1293 tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
1294 }
1295
1296 int operate(const DoutPrefixProvider *dpp) override {
1297 reenter(this) {
1298 if (state->obligation) {
1299 // this is already syncing in another DataSyncSingleEntryCR
1300 if (state->obligation->timestamp < obligation.timestamp) {
1301 // cancel existing obligation and overwrite it
1302 tn->log(10, SSTR("canceling existing obligation " << *state->obligation));
1303 complete = std::move(*state->obligation);
1304 *state->obligation = std::move(obligation);
1305 state->counter++;
1306 } else {
1307 // cancel new obligation
1308 tn->log(10, SSTR("canceling new obligation " << obligation));
1309 complete = std::move(obligation);
1310 }
1311 } else {
1312 // start syncing a new obligation
1313 state->obligation = obligation;
1314 obligation_counter = state->counter;
1315 state->counter++;
1316
1317 // loop until the latest obligation is satisfied, because other callers
1318 // may update the obligation while we're syncing
1319 while ((state->obligation->timestamp == ceph::real_time() ||
1320 state->progress_timestamp < state->obligation->timestamp) &&
1321 obligation_counter != state->counter) {
1322 obligation_counter = state->counter;
1323 progress = ceph::real_time{};
1324
1325 ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
1326 << ' ' << *state->obligation << dendl;
1327 yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
1328 std::nullopt, /* target_bs */
1329 state->key, tn, &progress));
1330 if (retcode < 0) {
1331 break;
1332 }
1333 state->progress_timestamp = std::max(progress, state->progress_timestamp);
1334 }
1335 // any new obligations will process themselves
1336 complete = std::move(*state->obligation);
1337 state->obligation.reset();
1338
1339 tn->log(10, SSTR("sync finished on " << bucket_shard_str{state->key}
1340 << " progress=" << progress << ' ' << complete << " r=" << retcode));
1341 }
1342 sync_status = retcode;
1343
1344 if (sync_status == -ENOENT) {
1345 // this was added when 'tenant/' was added to datalog entries, because
1346 // preexisting tenant buckets could never sync and would stay in the
1347 // error_repo forever
1348 tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->key));
1349 sync_status = 0;
1350 }
1351
1352 if (sync_status < 0) {
1353 // write actual sync failures for 'radosgw-admin sync error list'
1354 if (sync_status != -EBUSY && sync_status != -EAGAIN) {
1355 yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data", complete->key,
1356 -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
1357 if (retcode < 0) {
1358 tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
1359 }
1360 }
1361 if (complete->timestamp != ceph::real_time{}) {
1362 tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
1363 yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo,
1364 complete->key, complete->timestamp));
1365 if (retcode < 0) {
1366 tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
1367 }
1368 }
1369 } else if (complete->retry) {
1370 yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
1371 complete->key, complete->timestamp));
1372 if (retcode < 0) {
1373 tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
1374 << error_repo << " retcode=" << retcode));
1375 }
1376 }
1377 /* FIXME: what do do in case of error */
1378 if (marker_tracker && !complete->marker.empty()) {
1379 /* update marker */
1380 yield call(marker_tracker->finish(complete->marker));
1381 }
1382 if (sync_status == 0) {
1383 sync_status = retcode;
1384 }
1385 if (sync_status < 0) {
1386 return set_cr_error(sync_status);
1387 }
1388 return set_cr_done();
1389 }
1390 return 0;
1391 }
1392 };
1393
1394 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
1395 #define DATA_SYNC_MAX_ERR_ENTRIES 10
1396
1397 class RGWDataSyncShardCR : public RGWCoroutine {
1398 RGWDataSyncCtx *sc;
1399 RGWDataSyncEnv *sync_env;
1400
1401 rgw_pool pool;
1402
1403 uint32_t shard_id;
1404 rgw_data_sync_marker& sync_marker;
1405
1406 RGWRadosGetOmapValsCR::ResultPtr omapvals;
1407 std::map<std::string, bufferlist> entries;
1408 std::map<std::string, bufferlist>::iterator iter;
1409
1410 string oid;
1411
1412 std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
1413
1414 std::string next_marker;
1415 list<rgw_data_change_log_entry> log_entries;
1416 list<rgw_data_change_log_entry>::iterator log_iter;
1417 bool truncated = false;
1418
1419 ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
1420 ceph::condition_variable inc_cond;
1421
1422 boost::asio::coroutine incremental_cr;
1423 boost::asio::coroutine full_cr;
1424
1425
1426 set<string> modified_shards;
1427 set<string> current_modified;
1428
1429 set<string>::iterator modified_iter;
1430
1431 uint64_t total_entries = 0;
1432 static constexpr int spawn_window = BUCKET_SHARD_SYNC_SPAWN_WINDOW;
1433 bool *reset_backoff = nullptr;
1434
1435 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
1436 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
1437 string status_oid;
1438
1439 rgw_raw_obj error_repo;
1440 std::map<std::string, bufferlist> error_entries;
1441 string error_marker;
1442 ceph::real_time entry_timestamp;
1443 static constexpr int max_error_entries = DATA_SYNC_MAX_ERR_ENTRIES;
1444
1445 ceph::coarse_real_time error_retry_time;
1446
1447 #define RETRY_BACKOFF_SECS_MIN 60
1448 #define RETRY_BACKOFF_SECS_DEFAULT 60
1449 #define RETRY_BACKOFF_SECS_MAX 600
1450 uint32_t retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
1451
1452 RGWSyncTraceNodeRef tn;
1453
1454 rgw_bucket_shard source_bs;
1455
1456 // target number of entries to cache before recycling idle ones
1457 static constexpr size_t target_cache_size = 256;
1458 boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
1459
1460 int parse_bucket_key(const std::string& key, rgw_bucket_shard& bs) const {
1461 return rgw_bucket_parse_bucket_key(sync_env->cct, key,
1462 &bs.bucket, &bs.shard_id);
1463 }
1464 RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src,
1465 const std::string& key,
1466 const std::string& marker,
1467 ceph::real_time timestamp, bool retry) {
1468 auto state = bucket_shard_cache->get(src);
1469 auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
1470 return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
1471 &*marker_tracker, error_repo,
1472 lease_cr.get(), tn);
1473 }
1474 public:
1475 RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool,
1476 uint32_t _shard_id, rgw_data_sync_marker& _marker,
1477 RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
1478 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
1479 pool(_pool), shard_id(_shard_id), sync_marker(_marker),
1480 status_oid(RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
1481 error_repo(pool, status_oid + ".retry"), tn(_tn),
1482 bucket_shard_cache(rgw::bucket_sync::Cache::create(target_cache_size))
1483 {
1484 set_description() << "data sync shard source_zone=" << sc->source_zone << " shard_id=" << shard_id;
1485 }
1486
1487 ~RGWDataSyncShardCR() override {
1488 if (lease_cr) {
1489 lease_cr->abort();
1490 }
1491 }
1492
1493 void append_modified_shards(set<string>& keys) {
1494 std::lock_guard l{inc_lock};
1495 modified_shards.insert(keys.begin(), keys.end());
1496 }
1497
1498 int operate(const DoutPrefixProvider *dpp) override {
1499 int r;
1500 while (true) {
1501 switch (sync_marker.state) {
1502 case rgw_data_sync_marker::FullSync:
1503 r = full_sync();
1504 if (r < 0) {
1505 if (r != -EBUSY) {
1506 tn->log(10, SSTR("full sync failed (r=" << r << ")"));
1507 }
1508 return set_cr_error(r);
1509 }
1510 return 0;
1511 case rgw_data_sync_marker::IncrementalSync:
1512 r = incremental_sync();
1513 if (r < 0) {
1514 if (r != -EBUSY) {
1515 tn->log(10, SSTR("incremental sync failed (r=" << r << ")"));
1516 }
1517 return set_cr_error(r);
1518 }
1519 return 0;
1520 default:
1521 return set_cr_error(-EIO);
1522 }
1523 }
1524 return 0;
1525 }
1526
1527 void init_lease_cr() {
1528 set_status("acquiring sync lock");
1529 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1530 string lock_name = "sync_lock";
1531 if (lease_cr) {
1532 lease_cr->abort();
1533 }
1534 auto store = sync_env->store;
1535 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
1536 rgw_raw_obj(pool, status_oid),
1537 lock_name, lock_duration, this));
1538 lease_stack.reset(spawn(lease_cr.get(), false));
1539 }
1540
1541 int full_sync() {
1542 #define OMAP_GET_MAX_ENTRIES 100
1543 int max_entries = OMAP_GET_MAX_ENTRIES;
1544 reenter(&full_cr) {
1545 tn->log(10, "start full sync");
1546 yield init_lease_cr();
1547 while (!lease_cr->is_locked()) {
1548 if (lease_cr->is_done()) {
1549 tn->log(5, "failed to take lease");
1550 set_status("lease lock failed, early abort");
1551 drain_all();
1552 return set_cr_error(lease_cr->get_ret_status());
1553 }
1554 set_sleeping(true);
1555 yield;
1556 }
1557 tn->log(10, "took lease");
1558 oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
1559 marker_tracker.emplace(sc, status_oid, sync_marker, tn);
1560 total_entries = sync_marker.pos;
1561 entry_timestamp = sync_marker.timestamp; // time when full sync started
1562 do {
1563 if (!lease_cr->is_locked()) {
1564 lease_cr->go_down();
1565 drain_all();
1566 return set_cr_error(-ECANCELED);
1567 }
1568 omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
1569 yield call(new RGWRadosGetOmapValsCR(sync_env->store, rgw_raw_obj(pool, oid),
1570 sync_marker.marker, max_entries, omapvals));
1571 if (retcode < 0) {
1572 lease_cr->go_down();
1573 drain_all();
1574 return set_cr_error(retcode);
1575 }
1576 entries = std::move(omapvals->entries);
1577 if (entries.size() > 0) {
1578 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1579 }
1580 tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
1581 iter = entries.begin();
1582 for (; iter != entries.end(); ++iter) {
1583 retcode = parse_bucket_key(iter->first, source_bs);
1584 if (retcode < 0) {
1585 tn->log(1, SSTR("failed to parse bucket shard: " << iter->first));
1586 marker_tracker->try_update_high_marker(iter->first, 0, entry_timestamp);
1587 continue;
1588 }
1589 tn->log(20, SSTR("full sync: " << iter->first));
1590 total_entries++;
1591 if (!marker_tracker->start(iter->first, total_entries, entry_timestamp)) {
1592 tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
1593 } else {
1594 // fetch remote and write locally
1595 yield_spawn_window(sync_single_entry(source_bs, iter->first, iter->first,
1596 entry_timestamp, false),
1597 spawn_window, std::nullopt);
1598 }
1599 sync_marker.marker = iter->first;
1600 }
1601 } while (omapvals->more);
1602 omapvals.reset();
1603
1604 drain_all_but_stack(lease_stack.get());
1605
1606 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1607
1608 yield {
1609 /* update marker to reflect we're done with full sync */
1610 sync_marker.state = rgw_data_sync_marker::IncrementalSync;
1611 sync_marker.marker = sync_marker.next_step_marker;
1612 sync_marker.next_step_marker.clear();
1613 call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
1614 rgw_raw_obj(pool, status_oid),
1615 sync_marker));
1616 }
1617 if (retcode < 0) {
1618 tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
1619 lease_cr->go_down();
1620 drain_all();
1621 return set_cr_error(retcode);
1622 }
1623 // clean up full sync index
1624 yield {
1625 const auto& pool = sync_env->svc->zone->get_zone_params().log_pool;
1626 auto oid = full_data_sync_index_shard_oid(sc->source_zone.id, shard_id);
1627 call(new RGWRadosRemoveCR(sync_env->store, {pool, oid}));
1628 }
1629 // keep lease and transition to incremental_sync()
1630 }
1631 return 0;
1632 }
1633
1634 int incremental_sync() {
1635 reenter(&incremental_cr) {
1636 tn->log(10, "start incremental sync");
1637 if (lease_cr) {
1638 tn->log(10, "lease already held from full sync");
1639 } else {
1640 yield init_lease_cr();
1641 while (!lease_cr->is_locked()) {
1642 if (lease_cr->is_done()) {
1643 tn->log(5, "failed to take lease");
1644 set_status("lease lock failed, early abort");
1645 drain_all();
1646 return set_cr_error(lease_cr->get_ret_status());
1647 }
1648 set_sleeping(true);
1649 yield;
1650 }
1651 set_status("lease acquired");
1652 tn->log(10, "took lease");
1653 }
1654 marker_tracker.emplace(sc, status_oid, sync_marker, tn);
1655 do {
1656 if (!lease_cr->is_locked()) {
1657 lease_cr->go_down();
1658 drain_all();
1659 return set_cr_error(-ECANCELED);
1660 }
1661 current_modified.clear();
1662 inc_lock.lock();
1663 current_modified.swap(modified_shards);
1664 inc_lock.unlock();
1665
1666 if (current_modified.size() > 0) {
1667 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1668 }
1669 /* process out of band updates */
1670 for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
1671 retcode = parse_bucket_key(*modified_iter, source_bs);
1672 if (retcode < 0) {
1673 tn->log(1, SSTR("failed to parse bucket shard: " << *modified_iter));
1674 continue;
1675 }
1676 tn->log(20, SSTR("received async update notification: " << *modified_iter));
1677 spawn(sync_single_entry(source_bs, *modified_iter, string(),
1678 ceph::real_time{}, false), false);
1679 }
1680
1681 if (error_retry_time <= ceph::coarse_real_clock::now()) {
1682 /* process bucket shards that previously failed */
1683 omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
1684 yield call(new RGWRadosGetOmapValsCR(sync_env->store, error_repo,
1685 error_marker, max_error_entries, omapvals));
1686 error_entries = std::move(omapvals->entries);
1687 tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
1688 iter = error_entries.begin();
1689 for (; iter != error_entries.end(); ++iter) {
1690 error_marker = iter->first;
1691 entry_timestamp = rgw_error_repo_decode_value(iter->second);
1692 retcode = parse_bucket_key(error_marker, source_bs);
1693 if (retcode < 0) {
1694 tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
1695 spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
1696 error_marker, entry_timestamp), false);
1697 continue;
1698 }
1699 tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
1700 spawn(sync_single_entry(source_bs, error_marker, "",
1701 entry_timestamp, true), false);
1702 }
1703 if (!omapvals->more) {
1704 if (error_marker.empty() && error_entries.empty()) {
1705 /* the retry repo is empty, we back off a bit before calling it again */
1706 retry_backoff_secs *= 2;
1707 if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
1708 retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
1709 }
1710 } else {
1711 retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
1712 }
1713 error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
1714 error_marker.clear();
1715 }
1716 }
1717 omapvals.reset();
1718
1719 tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
1720 yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, sync_marker.marker,
1721 &next_marker, &log_entries, &truncated));
1722 if (retcode < 0 && retcode != -ENOENT) {
1723 tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
1724 lease_cr->go_down();
1725 drain_all();
1726 return set_cr_error(retcode);
1727 }
1728
1729 if (log_entries.size() > 0) {
1730 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1731 }
1732
1733 for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
1734 tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
1735 retcode = parse_bucket_key(log_iter->entry.key, source_bs);
1736 if (retcode < 0) {
1737 tn->log(1, SSTR("failed to parse bucket shard: " << log_iter->entry.key));
1738 marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
1739 continue;
1740 }
1741 if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
1742 tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
1743 } else {
1744 yield_spawn_window(sync_single_entry(source_bs, log_iter->entry.key, log_iter->log_id,
1745 log_iter->log_timestamp, false),
1746 spawn_window, std::nullopt);
1747 }
1748 }
1749
1750 tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
1751 << " next_marker=" << next_marker << " truncated=" << truncated));
1752 if (!next_marker.empty()) {
1753 sync_marker.marker = next_marker;
1754 } else if (!log_entries.empty()) {
1755 sync_marker.marker = log_entries.back().log_id;
1756 }
1757 if (!truncated) {
1758 // we reached the end, wait a while before checking for more
1759 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1760 yield wait(get_idle_interval());
1761 }
1762 } while (true);
1763 }
1764 return 0;
1765 }
1766
1767 utime_t get_idle_interval() const {
1768 #define INCREMENTAL_INTERVAL 20
1769 ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
1770 if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
1771 auto now = ceph::coarse_real_clock::now();
1772 if (error_retry_time > now) {
1773 auto d = error_retry_time - now;
1774 if (interval > d) {
1775 interval = d;
1776 }
1777 }
1778 }
1779 // convert timespan -> time_point -> utime_t
1780 return utime_t(ceph::coarse_real_clock::zero() + interval);
1781 }
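/* Worked example for get_idle_interval(): with no error-repo retry pending,
 * the shard waits the full INCREMENTAL_INTERVAL (20s) between datalog polls;
 * if error_retry_time is due sooner (say 5s away), the wait shrinks to that
 * remainder so previously failed entries are retried on schedule. */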
1782 };
1783
1784 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
1785 RGWDataSyncCtx *sc;
1786 RGWDataSyncEnv *sync_env;
1787
1788 rgw_pool pool;
1789
1790 uint32_t shard_id;
1791 rgw_data_sync_marker sync_marker;
1792
1793 RGWSyncTraceNodeRef tn;
1794 public:
1795 RGWDataSyncShardControlCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool,
1796 uint32_t _shard_id, rgw_data_sync_marker& _marker,
1797 RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, false),
1798 sc(_sc), sync_env(_sc->env),
1799 pool(_pool),
1800 shard_id(_shard_id),
1801 sync_marker(_marker) {
1802 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
1803 }
1804
1805 RGWCoroutine *alloc_cr() override {
1806 return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, tn, backoff_ptr());
1807 }
1808
1809 RGWCoroutine *alloc_finisher_cr() override {
1810 return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
1811 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
1812 &sync_marker);
1813 }
1814
1815 void append_modified_shards(set<string>& keys) {
1816 std::lock_guard l{cr_lock()};
1817
1818 RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
1819 if (!cr) {
1820 return;
1821 }
1822
1823 cr->append_modified_shards(keys);
1824 }
1825 };
1826
1827 class RGWDataSyncCR : public RGWCoroutine {
1828 RGWDataSyncCtx *sc;
1829 RGWDataSyncEnv *sync_env;
1830 uint32_t num_shards;
1831
1832 rgw_data_sync_status sync_status;
1833
1834 ceph::mutex shard_crs_lock =
1835 ceph::make_mutex("RGWDataSyncCR::shard_crs_lock");
1836 map<int, RGWDataSyncShardControlCR *> shard_crs;
1837
1838 bool *reset_backoff;
1839
1840 RGWSyncTraceNodeRef tn;
1841
1842 RGWDataSyncModule *data_sync_module{nullptr};
1843 public:
1844 RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
1845 sc(_sc), sync_env(_sc->env),
1846 num_shards(_num_shards),
1847 reset_backoff(_reset_backoff), tn(_tn) {
1848
1849 }
1850
1851 ~RGWDataSyncCR() override {
1852 for (auto iter : shard_crs) {
1853 iter.second->put();
1854 }
1855 }
1856
1857 int operate(const DoutPrefixProvider *dpp) override {
1858 reenter(this) {
1859
1860 /* read sync status */
1861 yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status));
1862
1863 data_sync_module = sync_env->sync_module->get_data_handler();
1864
1865 if (retcode < 0 && retcode != -ENOENT) {
1866 tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
1867 return set_cr_error(retcode);
1868 }
1869
1870 /* state: init status */
1871 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
1872 tn->log(20, SSTR("init"));
1873 sync_status.sync_info.num_shards = num_shards;
1874 uint64_t instance_id;
1875 instance_id = ceph::util::generate_random_number<uint64_t>();
1876 yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status));
1877 if (retcode < 0) {
1878 tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
1879 return set_cr_error(retcode);
1880 }
1881 // sets state = StateBuildingFullSyncMaps
1882
1883 *reset_backoff = true;
1884 }
1885
1886 data_sync_module->init(sc, sync_status.sync_info.instance_id);
1887
1888 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
1889 tn->log(10, SSTR("building full sync maps"));
1890 /* call sync module init here */
1891 sync_status.sync_info.num_shards = num_shards;
1892 yield call(data_sync_module->init_sync(dpp, sc));
1893 if (retcode < 0) {
1894 tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
1895 return set_cr_error(retcode);
1896 }
1897 /* state: building full sync maps */
1898 yield call(new RGWListBucketIndexesCR(sc, &sync_status));
1899 if (retcode < 0) {
1900 tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
1901 return set_cr_error(retcode);
1902 }
1903 sync_status.sync_info.state = rgw_data_sync_info::StateSync;
1904
1905 /* update new state */
1906 yield call(set_sync_info_cr());
1907 if (retcode < 0) {
1908 tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
1909 return set_cr_error(retcode);
1910 }
1911
1912 *reset_backoff = true;
1913 }
1914
1915 yield call(data_sync_module->start_sync(dpp, sc));
1916 if (retcode < 0) {
1917 tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode));
1918 return set_cr_error(retcode);
1919 }
1920
1921 yield {
1922 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
1923 tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
1924 for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
1925 iter != sync_status.sync_markers.end(); ++iter) {
1926 RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sc, sync_env->svc->zone->get_zone_params().log_pool,
1927 iter->first, iter->second, tn);
1928 cr->get();
1929 shard_crs_lock.lock();
1930 shard_crs[iter->first] = cr;
1931 shard_crs_lock.unlock();
1932 spawn(cr, true);
1933 }
1934 }
1935 }
1936
1937 return set_cr_done();
1938 }
1939 return 0;
1940 }
1941
1942 RGWCoroutine *set_sync_info_cr() {
1943 return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
1944 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
1945 sync_status.sync_info);
1946 }
1947
1948 void wakeup(int shard_id, set<string>& keys) {
1949 std::lock_guard l{shard_crs_lock};
1950 map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
1951 if (iter == shard_crs.end()) {
1952 return;
1953 }
1954 iter->second->append_modified_shards(keys);
1955 iter->second->wakeup();
1956 }
1957 };
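/* State progression driven by RGWDataSyncCR::operate() above (sketch):
 *
 *   StateInit -> StateBuildingFullSyncMaps -> StateSync
 *
 * Init writes the initial status object, BuildingFullSyncMaps lists the
 * remote bucket indexes, and once StateSync has been persisted one
 * RGWDataSyncShardControlCR is spawned per shard to run full/incremental
 * sync under its own backoff control. */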
1958
1959 class RGWDefaultDataSyncModule : public RGWDataSyncModule {
1960 public:
1961 RGWDefaultDataSyncModule() {}
1962
1963 RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
1964 RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1965 RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
1966 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1967 };
1968
1969 class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
1970 RGWDefaultDataSyncModule data_handler;
1971 public:
1972 RGWDefaultSyncModuleInstance() {}
1973 RGWDataSyncModule *get_data_handler() override {
1974 return &data_handler;
1975 }
1976 bool supports_user_writes() override {
1977 return true;
1978 }
1979 };
1980
1981 int RGWDefaultSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
1982 {
1983 instance->reset(new RGWDefaultSyncModuleInstance());
1984 return 0;
1985 }
1986
1987 class RGWUserPermHandler {
1988 friend struct Init;
1989 friend class Bucket;
1990
1991 RGWDataSyncEnv *sync_env;
1992 rgw_user uid;
1993
1994 struct _info {
1995 RGWUserInfo user_info;
1996 rgw::IAM::Environment env;
1997 std::unique_ptr<rgw::auth::Identity> identity;
1998 RGWAccessControlPolicy user_acl;
1999 };
2000
2001 std::shared_ptr<_info> info;
2002
2003 struct Init;
2004
2005 std::shared_ptr<Init> init_action;
2006
2007 struct Init : public RGWGenericAsyncCR::Action {
2008 RGWDataSyncEnv *sync_env;
2009
2010 rgw_user uid;
2011 std::shared_ptr<RGWUserPermHandler::_info> info;
2012
2013 int ret{0};
2014
2015 Init(RGWUserPermHandler *handler) : sync_env(handler->sync_env),
2016 uid(handler->uid),
2017 info(handler->info) {}
2018 int operate() override {
2019 auto user_ctl = sync_env->store->getRados()->ctl.user;
2020
2021 ret = user_ctl->get_info_by_uid(sync_env->dpp, uid, &info->user_info, null_yield);
2022 if (ret < 0) {
2023 return ret;
2024 }
2025
2026 info->identity = rgw::auth::transform_old_authinfo(sync_env->cct,
2027 uid,
2028 RGW_PERM_FULL_CONTROL,
2029 false, /* system_request? */
2030 TYPE_RGW);
2031
2032 map<string, bufferlist> uattrs;
2033
2034 ret = user_ctl->get_attrs_by_uid(sync_env->dpp, uid, &uattrs, null_yield);
2035 if (ret == 0) {
2036 ret = RGWUserPermHandler::policy_from_attrs(sync_env->cct, uattrs, &info->user_acl);
2037 }
2038 if (ret == -ENOENT) {
2039 info->user_acl.create_default(uid, info->user_info.display_name);
2040 }
2041
2042 return 0;
2043 }
2044 };
2045
2046 public:
2047 RGWUserPermHandler(RGWDataSyncEnv *_sync_env,
2048 const rgw_user& _uid) : sync_env(_sync_env),
2049 uid(_uid) {}
2050
2051 RGWCoroutine *init_cr() {
2052 info = make_shared<_info>();
2053 init_action = make_shared<Init>(this);
2054
2055 return new RGWGenericAsyncCR(sync_env->cct,
2056 sync_env->async_rados,
2057 init_action);
2058 }
2059
2060 class Bucket {
2061 RGWDataSyncEnv *sync_env;
2062 std::shared_ptr<_info> info;
2063 RGWAccessControlPolicy bucket_acl;
2064 std::optional<perm_state> ps;
2065 public:
2066 Bucket() {}
2067
2068 int init(RGWUserPermHandler *handler,
2069 const RGWBucketInfo& bucket_info,
2070 const map<string, bufferlist>& bucket_attrs);
2071
2072 bool verify_bucket_permission(int perm);
2073 bool verify_object_permission(const map<string, bufferlist>& obj_attrs,
2074 int perm);
2075 };
2076
2077 static int policy_from_attrs(CephContext *cct,
2078 const map<string, bufferlist>& attrs,
2079 RGWAccessControlPolicy *acl) {
2080 acl->set_ctx(cct);
2081
2082 auto aiter = attrs.find(RGW_ATTR_ACL);
2083 if (aiter == attrs.end()) {
2084 return -ENOENT;
2085 }
2086 auto iter = aiter->second.begin();
2087 try {
2088 acl->decode(iter);
2089 } catch (buffer::error& err) {
2090 ldout(cct, 0) << "ERROR: " << __func__ << "(): could not decode policy, caught buffer::error" << dendl;
2091 return -EIO;
2092 }
2093
2094 return 0;
2095 }
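/* Usage sketch (illustrative, not part of the build): pulling a bucket or
 * user ACL out of an xattr map that was fetched elsewhere:
 *
 *   map<string, bufferlist> attrs;   // e.g. filled by an attrs-read CR
 *   RGWAccessControlPolicy acl;
 *   int r = RGWUserPermHandler::policy_from_attrs(cct, attrs, &acl);
 *   if (r == -ENOENT) {
 *     // no RGW_ATTR_ACL key stored; caller decides on a default
 *   } else if (r == -EIO) {
 *     // attr present but failed to decode
 *   }
 */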
2096
2097 int init_bucket(const RGWBucketInfo& bucket_info,
2098 const map<string, bufferlist>& bucket_attrs,
2099 Bucket *bs) {
2100 return bs->init(this, bucket_info, bucket_attrs);
2101 }
2102 };
2103
2104 int RGWUserPermHandler::Bucket::init(RGWUserPermHandler *handler,
2105 const RGWBucketInfo& bucket_info,
2106 const map<string, bufferlist>& bucket_attrs)
2107 {
2108 sync_env = handler->sync_env;
2109 info = handler->info;
2110
2111 int r = RGWUserPermHandler::policy_from_attrs(sync_env->cct, bucket_attrs, &bucket_acl);
2112 if (r < 0) {
2113 return r;
2114 }
2115
2116 ps.emplace(sync_env->cct,
2117 info->env,
2118 info->identity.get(),
2119 bucket_info,
2120 info->identity->get_perm_mask(),
2121 false, /* defer to bucket acls */
2122 nullptr, /* referer */
2123 false); /* request_payer */
2124
2125 return 0;
2126 }
2127
2128 bool RGWUserPermHandler::Bucket::verify_bucket_permission(int perm)
2129 {
2130 return verify_bucket_permission_no_policy(sync_env->dpp,
2131 &(*ps),
2132 &info->user_acl,
2133 &bucket_acl,
2134 perm);
2135 }
2136
2137 bool RGWUserPermHandler::Bucket::verify_object_permission(const map<string, bufferlist>& obj_attrs,
2138 int perm)
2139 {
2140 RGWAccessControlPolicy obj_acl;
2141
2142 int r = policy_from_attrs(sync_env->cct, obj_attrs, &obj_acl);
2143 if (r < 0) {
2144 return false;
2145 }
2146
2147 return verify_bucket_permission_no_policy(sync_env->dpp,
2148 &(*ps),
2149 &bucket_acl,
2150 &obj_acl,
2151 perm);
2152 }
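/* Note: both helpers above evaluate plain ACLs via
 * verify_bucket_permission_no_policy(), without bucket policy documents:
 * bucket-level permissions are checked against the user ACL plus the bucket
 * ACL, while object-level permissions are checked against the bucket ACL
 * plus the object's own ACL decoded from its attrs. */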
2153
2154 class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
2155 rgw_bucket_sync_pipe sync_pipe;
2156
2157 std::shared_ptr<RGWUserPermHandler::Bucket> bucket_perms;
2158 std::optional<rgw_sync_pipe_dest_params> verify_dest_params;
2159
2160 std::optional<ceph::real_time> mtime;
2161 std::optional<string> etag;
2162 std::optional<uint64_t> obj_size;
2163
2164 std::unique_ptr<rgw::auth::Identity> identity;
2165
2166 std::shared_ptr<bool> need_retry;
2167
2168 public:
2169 RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe,
2170 std::shared_ptr<RGWUserPermHandler::Bucket>& _bucket_perms,
2171 std::optional<rgw_sync_pipe_dest_params>&& _verify_dest_params,
2172 std::shared_ptr<bool>& _need_retry) : sync_pipe(_sync_pipe),
2173 bucket_perms(_bucket_perms),
2174 verify_dest_params(std::move(_verify_dest_params)),
2175 need_retry(_need_retry) {
2176 *need_retry = false;
2177 }
2178
2179 int filter(CephContext *cct,
2180 const rgw_obj_key& source_key,
2181 const RGWBucketInfo& dest_bucket_info,
2182 std::optional<rgw_placement_rule> dest_placement_rule,
2183 const map<string, bufferlist>& obj_attrs,
2184 std::optional<rgw_user> *poverride_owner,
2185 const rgw_placement_rule **prule) override;
2186 };
2187
2188 int RGWFetchObjFilter_Sync::filter(CephContext *cct,
2189 const rgw_obj_key& source_key,
2190 const RGWBucketInfo& dest_bucket_info,
2191 std::optional<rgw_placement_rule> dest_placement_rule,
2192 const map<string, bufferlist>& obj_attrs,
2193 std::optional<rgw_user> *poverride_owner,
2194 const rgw_placement_rule **prule)
2195 {
2196 int abort_err = -ERR_PRECONDITION_FAILED;
2197
2198 rgw_sync_pipe_params params;
2199
2200 RGWObjTags obj_tags;
2201
2202 auto iter = obj_attrs.find(RGW_ATTR_TAGS);
2203 if (iter != obj_attrs.end()) {
2204 try {
2205 auto it = iter->second.cbegin();
2206 obj_tags.decode(it);
2207 } catch (buffer::error &err) {
2208 ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error, could not decode TagSet" << dendl;
2209 }
2210 }
2211
2212 if (!sync_pipe.info.handler.find_obj_params(source_key,
2213 obj_tags.get_tags(),
2214 &params)) {
2215 return abort_err;
2216 }
2217
2218 if (verify_dest_params &&
2219 !(*verify_dest_params == params.dest)) {
2220 /* raced! original dest params were different, will need to retry */
2221 ldout(cct, 0) << "WARNING: " << __func__ << ": pipe dest params differ from the original params; must have raced with an object rewrite, retrying" << dendl;
2222 *need_retry = true;
2223 return -ECANCELED;
2224 }
2225
2226 std::optional<std::map<string, bufferlist> > new_attrs;
2227
2228 if (params.dest.acl_translation) {
2229 rgw_user& acl_translation_owner = params.dest.acl_translation->owner;
2230 if (!acl_translation_owner.empty()) {
2231 if (params.mode == rgw_sync_pipe_params::MODE_USER &&
2232 acl_translation_owner != dest_bucket_info.owner) {
2233 ldout(cct, 0) << "ERROR: " << __func__ << ": acl translation was requested, but user (" << acl_translation_owner
2234 << ") is not dest bucket owner (" << dest_bucket_info.owner << ")" << dendl;
2235 return -EPERM;
2236 }
2237 *poverride_owner = acl_translation_owner;
2238 }
2239 }
2240 if (params.mode == rgw_sync_pipe_params::MODE_USER) {
2241 if (!bucket_perms->verify_object_permission(obj_attrs, RGW_PERM_READ)) {
2242 ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to fetch object" << dendl;
2243 return -EPERM;
2244 }
2245 }
2246
2247 if (!dest_placement_rule &&
2248 params.dest.storage_class) {
2249 dest_rule.storage_class = *params.dest.storage_class;
2250 dest_rule.inherit_from(dest_bucket_info.placement_rule);
2251 dest_placement_rule = dest_rule;
2252 *prule = &dest_rule;
2253 }
2254
2255 return RGWFetchObjFilter_Default::filter(cct,
2256 source_key,
2257 dest_bucket_info,
2258 dest_placement_rule,
2259 obj_attrs,
2260 poverride_owner,
2261 prule);
2262 }
2263
2264 class RGWObjFetchCR : public RGWCoroutine {
2265 RGWDataSyncCtx *sc;
2266 RGWDataSyncEnv *sync_env;
2267 rgw_bucket_sync_pipe& sync_pipe;
2268 rgw_obj_key& key;
2269 std::optional<rgw_obj_key> dest_key;
2270 std::optional<uint64_t> versioned_epoch;
2271 rgw_zone_set *zones_trace;
2272
2273 bool need_more_info{false};
2274 bool check_change{false};
2275
2276 ceph::real_time src_mtime;
2277 uint64_t src_size;
2278 string src_etag;
2279 map<string, bufferlist> src_attrs;
2280 map<string, string> src_headers;
2281
2282 std::optional<rgw_user> param_user;
2283 rgw_sync_pipe_params::Mode param_mode;
2284
2285 std::optional<RGWUserPermHandler> user_perms;
2286 std::shared_ptr<RGWUserPermHandler::Bucket> source_bucket_perms;
2287 RGWUserPermHandler::Bucket dest_bucket_perms;
2288
2289 std::optional<rgw_sync_pipe_dest_params> dest_params;
2290
2291 int try_num{0};
2292 std::shared_ptr<bool> need_retry;
2293 public:
2294 RGWObjFetchCR(RGWDataSyncCtx *_sc,
2295 rgw_bucket_sync_pipe& _sync_pipe,
2296 rgw_obj_key& _key,
2297 std::optional<rgw_obj_key> _dest_key,
2298 std::optional<uint64_t> _versioned_epoch,
2299 rgw_zone_set *_zones_trace) : RGWCoroutine(_sc->cct),
2300 sc(_sc), sync_env(_sc->env),
2301 sync_pipe(_sync_pipe),
2302 key(_key),
2303 dest_key(_dest_key),
2304 versioned_epoch(_versioned_epoch),
2305 zones_trace(_zones_trace) {
2306 }
2307
2308
2309 int operate(const DoutPrefixProvider *dpp) override {
2310 reenter(this) {
2311
2312 #define MAX_RACE_RETRIES_OBJ_FETCH 10
2313 for (try_num = 0; try_num < MAX_RACE_RETRIES_OBJ_FETCH; ++try_num) {
2314
2315 {
2316 std::optional<rgw_user> param_acl_translation;
2317 std::optional<string> param_storage_class;
2318
2319 if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
2320 &param_user,
2321 &param_acl_translation,
2322 &param_storage_class,
2323 &param_mode,
2324 &need_more_info)) {
2325 if (!need_more_info) {
2326 return set_cr_error(-ERR_PRECONDITION_FAILED);
2327 }
2328 }
2329 }
2330
2331 if (need_more_info) {
2332 ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
2333 /*
2334 * we need to fetch info about source object, so that we can determine
2335 * the correct policy configuration. This can happen if there are multiple
2336 * policy rules, and some depend on the object tagging */
2337 yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
2338 sync_env->store,
2339 sc->source_zone,
2340 sync_pipe.info.source_bs.bucket,
2341 key,
2342 &src_mtime,
2343 &src_size,
2344 &src_etag,
2345 &src_attrs,
2346 &src_headers));
2347 if (retcode < 0) {
2348 return set_cr_error(retcode);
2349 }
2350
2351 RGWObjTags obj_tags;
2352
2353 auto iter = src_attrs.find(RGW_ATTR_TAGS);
2354 if (iter != src_attrs.end()) {
2355 try {
2356 auto it = iter->second.cbegin();
2357 obj_tags.decode(it);
2358 } catch (buffer::error &err) {
2359 ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error, could not decode TagSet" << dendl;
2360 }
2361 }
2362
2363 rgw_sync_pipe_params params;
2364 if (!sync_pipe.info.handler.find_obj_params(key,
2365 obj_tags.get_tags(),
2366 &params)) {
2367 return set_cr_error(-ERR_PRECONDITION_FAILED);
2368 }
2369
2370 param_user = params.user;
2371 param_mode = params.mode;
2372
2373 dest_params = params.dest;
2374 }
2375
2376 if (param_mode == rgw_sync_pipe_params::MODE_USER) {
2377 if (!param_user) {
2378 ldout(cct, 20) << "ERROR: " << __func__ << ": user level sync but user param not set" << dendl;
2379 return set_cr_error(-EPERM);
2380 }
2381 user_perms.emplace(sync_env, *param_user);
2382
2383 yield call(user_perms->init_cr());
2384 if (retcode < 0) {
2385 ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init user perms manager for uid=" << *param_user << dendl;
2386 return set_cr_error(retcode);
2387 }
2388
2389 /* verify that user is allowed to write at the target bucket */
2390 int r = user_perms->init_bucket(sync_pipe.dest_bucket_info,
2391 sync_pipe.dest_bucket_attrs,
2392 &dest_bucket_perms);
2393 if (r < 0) {
2394 ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.dest_bucket_info.bucket.get_key() << dendl;
2395 return set_cr_error(r);
2396 }
2397
2398 if (!dest_bucket_perms.verify_bucket_permission(RGW_PERM_WRITE)) {
2399 ldout(cct, 0) << "ERROR: " << __func__ << ": permission check failed: user not allowed to write into bucket (bucket=" << sync_pipe.info.dest_bs.bucket.get_key() << ")" << dendl;
2400 return set_cr_error(-EPERM);
2401 }
2402
2403 /* init source bucket permission structure */
2404 source_bucket_perms = make_shared<RGWUserPermHandler::Bucket>();
2405 r = user_perms->init_bucket(sync_pipe.source_bucket_info,
2406 sync_pipe.source_bucket_attrs,
2407 source_bucket_perms.get());
2408 if (r < 0) {
2409 ldout(cct, 20) << "ERROR: " << __func__ << ": failed to init bucket perms manager for uid=" << *param_user << " bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << dendl;
2410 return set_cr_error(r);
2411 }
2412 }
2413
2414 yield {
2415 if (!need_retry) {
2416 need_retry = make_shared<bool>();
2417 }
2418 auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe,
2419 source_bucket_perms,
2420 std::move(dest_params),
2421 need_retry);
2422
2423 call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
2424 nullopt,
2425 sync_pipe.info.source_bs.bucket,
2426 std::nullopt, sync_pipe.dest_bucket_info,
2427 key, dest_key, versioned_epoch,
2428 true,
2429 std::static_pointer_cast<RGWFetchObjFilter>(filter),
2430 zones_trace, sync_env->counters, dpp));
2431 }
2432 if (retcode < 0) {
2433 if (*need_retry) {
2434 continue;
2435 }
2436 return set_cr_error(retcode);
2437 }
2438
2439 return set_cr_done();
2440 }
2441
2442 ldout(cct, 0) << "ERROR: " << __func__ << ": Too many retries trying to fetch object, possibly a bug: bucket=" << sync_pipe.source_bucket_info.bucket.get_key() << " key=" << key << dendl;
2443
2444 return set_cr_error(-EIO);
2445 }
2446 return 0;
2447 }
2448 };
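/* Retry handshake, summarized: RGWFetchObjFilter_Sync::filter() sets
 * *need_retry and fails with -ECANCELED when the resolved pipe dest params
 * no longer match what the fetch started with (a config race); the for-loop
 * in RGWObjFetchCR::operate() then re-resolves the params and re-issues the
 * fetch, giving up with -EIO after MAX_RACE_RETRIES_OBJ_FETCH attempts. */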
2449
2450 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
2451 {
2452 return new RGWObjFetchCR(sc, sync_pipe, key, std::nullopt, versioned_epoch, zones_trace);
2453 }
2454
2455 RGWCoroutine *RGWDefaultDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
2456 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
2457 {
2458 auto sync_env = sc->env;
2459 return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->store, sc->source_zone,
2460 sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
2461 NULL, NULL, false, &mtime, zones_trace);
2462 }
2463
2464 RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
2465 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
2466 {
2467 auto sync_env = sc->env;
2468 return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->store, sc->source_zone,
2469 sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
2470 &owner.id, &owner.display_name, true, &mtime, zones_trace);
2471 }
2472
2473 class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
2474 public:
2475 RGWArchiveDataSyncModule() {}
2476
2477 RGWCoroutine *sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
2478 RGWCoroutine *remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
2479 RGWCoroutine *create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
2480 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
2481 };
2482
2483 class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
2484 RGWArchiveDataSyncModule data_handler;
2485 public:
2486 RGWArchiveSyncModuleInstance() {}
2487 RGWDataSyncModule *get_data_handler() override {
2488 return &data_handler;
2489 }
2490 RGWMetadataHandler *alloc_bucket_meta_handler() override {
2491 return RGWArchiveBucketMetaHandlerAllocator::alloc();
2492 }
2493 RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler() override {
2494 return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
2495 }
2496 };
2497
2498 int RGWArchiveSyncModule::create_instance(const DoutPrefixProvider *dpp, CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
2499 {
2500 instance->reset(new RGWArchiveSyncModuleInstance());
2501 return 0;
2502 }
2503
2504 RGWCoroutine *RGWArchiveDataSyncModule::sync_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
2505 {
2506 auto sync_env = sc->env;
2507 ldout(sc->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
2508 if (!sync_pipe.dest_bucket_info.versioned() ||
2509 (sync_pipe.dest_bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
2510 ldout(sc->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
2511 sync_pipe.dest_bucket_info.flags = (sync_pipe.dest_bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
2512 int op_ret = sync_env->store->getRados()->put_bucket_instance_info(sync_pipe.dest_bucket_info, false, real_time(), NULL, sync_env->dpp);
2513 if (op_ret < 0) {
2514 ldpp_dout(sync_env->dpp, 0) << "SYNC_ARCHIVE: sync_object: failed to enable versioning on archive bucket" << dendl;
2515 return NULL;
2516 }
2517 }
2518
2519 std::optional<rgw_obj_key> dest_key;
2520
2521 if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
2522 versioned_epoch = 0;
2523 dest_key = key;
2524 if (key.instance.empty()) {
2525 sync_env->store->getRados()->gen_rand_obj_instance_name(&(*dest_key));
2526 }
2527 }
2528
2529 return new RGWObjFetchCR(sc, sync_pipe, key, dest_key, versioned_epoch, zones_trace);
2530 }
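/* Illustrative effect of the versioned_epoch handling above (hypothetical
 * values): a plain key arriving with no version information
 *
 *   source: key="photo.jpg", instance=""              versioned_epoch unset
 *
 * is synced into the archive zone as
 *
 *   dest:   key="photo.jpg", instance=<generated>,    versioned_epoch=0
 *
 * so each overwrite lands as a distinct version instead of replacing the
 * previous copy. */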
2531
2532 RGWCoroutine *RGWArchiveDataSyncModule::remove_object(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
2533 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
2534 {
2535 ldout(sc->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
2536 return NULL;
2537 }
2538
2539 RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(const DoutPrefixProvider *dpp, RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
2540 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
2541 {
2542 ldout(sc->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
2543 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
2544 auto sync_env = sc->env;
2545 return new RGWRemoveObjCR(sync_env->dpp, sync_env->async_rados, sync_env->store, sc->source_zone,
2546 sync_pipe.dest_bucket_info, key, versioned, versioned_epoch,
2547 &owner.id, &owner.display_name, true, &mtime, zones_trace);
2548 }
2549
2550 class RGWDataSyncControlCR : public RGWBackoffControlCR
2551 {
2552 RGWDataSyncCtx *sc;
2553 RGWDataSyncEnv *sync_env;
2554 uint32_t num_shards;
2555
2556 RGWSyncTraceNodeRef tn;
2557
2558 static constexpr bool exit_on_error = false; // retry on all errors
2559 public:
2560 RGWDataSyncControlCR(RGWDataSyncCtx *_sc, uint32_t _num_shards,
2561 RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, exit_on_error),
2562 sc(_sc), sync_env(_sc->env), num_shards(_num_shards) {
2563 tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
2564 }
2565
2566 RGWCoroutine *alloc_cr() override {
2567 return new RGWDataSyncCR(sc, num_shards, tn, backoff_ptr());
2568 }
2569
2570 void wakeup(int shard_id, set<string>& keys) {
2571 ceph::mutex& m = cr_lock();
2572
2573 m.lock();
2574 RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
2575 if (!cr) {
2576 m.unlock();
2577 return;
2578 }
2579
2580 cr->get();
2581 m.unlock();
2582
2583 if (cr) {
2584 tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
2585 cr->wakeup(shard_id, keys);
2586 }
2587
2588 cr->put();
2589 }
2590 };
2591
2592 void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
2593 std::shared_lock rl{lock};
2594 if (!data_sync_cr) {
2595 return;
2596 }
2597 data_sync_cr->wakeup(shard_id, keys);
2598 }
2599
2600 int RGWRemoteDataLog::run_sync(const DoutPrefixProvider *dpp, int num_shards)
2601 {
2602 lock.lock();
2603 data_sync_cr = new RGWDataSyncControlCR(&sc, num_shards, tn);
2604 data_sync_cr->get(); // run() will drop a ref, so take another
2605 lock.unlock();
2606
2607 int r = run(dpp, data_sync_cr);
2608
2609 lock.lock();
2610 data_sync_cr->put();
2611 data_sync_cr = NULL;
2612 lock.unlock();
2613
2614 if (r < 0) {
2615 ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
2616 return r;
2617 }
2618 return 0;
2619 }
2620
2621 CephContext *RGWDataSyncStatusManager::get_cct() const
2622 {
2623 return store->ctx();
2624 }
2625
2626 int RGWDataSyncStatusManager::init(const DoutPrefixProvider *dpp)
2627 {
2628 RGWZone *zone_def;
2629
2630 if (!store->svc()->zone->find_zone(source_zone, &zone_def)) {
2631 ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
2632 return -EIO;
2633 }
2634
2635 if (!store->svc()->sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
2636 return -ENOTSUP;
2637 }
2638
2639 const RGWZoneParams& zone_params = store->svc()->zone->get_zone_params();
2640
2641 if (sync_module == nullptr) {
2642 sync_module = store->get_sync_module();
2643 }
2644
2645 conn = store->svc()->zone->get_zone_conn(source_zone);
2646 if (!conn) {
2647 ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
2648 return -EINVAL;
2649 }
2650
2651 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
2652
2653 int r = source_log.init(source_zone, conn, error_logger, store->getRados()->get_sync_tracer(),
2654 sync_module, counters);
2655 if (r < 0) {
2656 ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
2657 finalize();
2658 return r;
2659 }
2660
2661 rgw_datalog_info datalog_info;
2662 r = source_log.read_log_info(dpp, &datalog_info);
2663 if (r < 0) {
2664 ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
2665 finalize();
2666 return r;
2667 }
2668
2669 num_shards = datalog_info.num_shards;
2670
2671 for (int i = 0; i < num_shards; i++) {
2672 shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
2673 }
2674
2675 return 0;
2676 }
2677
2678 void RGWDataSyncStatusManager::finalize()
2679 {
2680 delete error_logger;
2681 error_logger = nullptr;
2682 }
2683
2684 unsigned RGWDataSyncStatusManager::get_subsys() const
2685 {
2686 return dout_subsys;
2687 }
2688
2689 std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
2690 {
2691 auto zone = std::string_view{source_zone.id};
2692 return out << "data sync zone:" << zone.substr(0, 8) << ' ';
2693 }
2694
2695 string RGWDataSyncStatusManager::sync_status_oid(const rgw_zone_id& source_zone)
2696 {
2697 char buf[datalog_sync_status_oid_prefix.size() + source_zone.id.size() + 16];
2698 snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.id.c_str());
2699
2700 return string(buf);
2701 }
2702
2703 string RGWDataSyncStatusManager::shard_obj_name(const rgw_zone_id& source_zone, int shard_id)
2704 {
2705 char buf[datalog_sync_status_shard_prefix.size() + source_zone.id.size() + 16];
2706 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.id.c_str(), shard_id);
2707
2708 return string(buf);
2709 }
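/* Resulting status object names, assuming the "datalog.sync-status" prefixes
 * defined near the top of this file (zone id and shard shown are examples):
 *
 *   sync_status_oid("8f3a...")   -> "datalog.sync-status.8f3a..."
 *   shard_obj_name("8f3a...", 7) -> "datalog.sync-status.shard.8f3a....7"
 */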
2710
2711 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
2712 RGWDataSyncCtx *sc;
2713 RGWDataSyncEnv *sync_env;
2714 const string instance_key;
2715
2716 rgw_bucket_index_marker_info *info;
2717
2718 public:
2719 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncCtx *_sc,
2720 const rgw_bucket_shard& bs,
2721 rgw_bucket_index_marker_info *_info)
2722 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
2723 instance_key(bs.get_key()), info(_info) {}
2724
2725 int operate(const DoutPrefixProvider *dpp) override {
2726 reenter(this) {
2727 yield {
2728 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
2729 { "bucket-instance", instance_key.c_str() },
2730 { "info" , NULL },
2731 { NULL, NULL } };
2732
2733 string p = "/admin/log/";
2734 call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, info));
2735 }
2736 if (retcode < 0) {
2737 return set_cr_error(retcode);
2738 }
2739 return set_cr_done();
2740 }
2741 return 0;
2742 }
2743 };
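/* The coroutine above boils down to one REST round trip against the source
 * zone's admin API, roughly (illustrative):
 *
 *   GET /admin/log/?type=bucket-index&bucket-instance=<bucket:shard>&info
 *
 * with the JSON response decoded into rgw_bucket_index_marker_info. */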
2744
2745 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
2746 RGWDataSyncCtx *sc;
2747 RGWDataSyncEnv *sync_env;
2748
2749 const rgw_bucket_sync_pair_info& sync_pair;
2750 const string sync_status_oid;
2751
2752 rgw_bucket_shard_sync_info& status;
2753 RGWObjVersionTracker& objv_tracker;
2754 rgw_bucket_index_marker_info info;
2755 public:
2756 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
2757 const rgw_bucket_sync_pair_info& _sync_pair,
2758 rgw_bucket_shard_sync_info& _status,
2759 RGWObjVersionTracker& objv_tracker)
2760 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
2761 sync_pair(_sync_pair),
2762 sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pair)),
2763 status(_status), objv_tracker(objv_tracker)
2764 {}
2765
2766 int operate(const DoutPrefixProvider *dpp) override {
2767 reenter(this) {
2768 /* fetch current position in logs */
2769 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sc, sync_pair.source_bs, &info));
2770 if (retcode < 0 && retcode != -ENOENT) {
2771 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
2772 return set_cr_error(retcode);
2773 }
2774 yield {
2775 auto store = sync_env->store;
2776 rgw_raw_obj obj(sync_env->svc->zone->get_zone_params().log_pool, sync_status_oid);
2777 const bool stopped = status.state == rgw_bucket_shard_sync_info::StateStopped;
2778 bool write_status = false;
2779
2780 if (info.syncstopped) {
2781 if (stopped && !sync_env->sync_module->should_full_sync()) {
2782 // preserve our current incremental marker position
2783 write_status = true;
2784 }
2785 } else {
2786 // whether or not to do full sync, incremental sync will follow anyway
2787 if (sync_env->sync_module->should_full_sync()) {
2788 status.state = rgw_bucket_shard_sync_info::StateFullSync;
2789 status.inc_marker.position = info.max_marker;
2790 } else {
2791 // clear the marker position unless we're resuming from SYNCSTOP
2792 if (!stopped) {
2793 status.inc_marker.position = "";
2794 }
2795 status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2796 }
2797 write_status = true;
2798 status.inc_marker.timestamp = ceph::real_clock::now();
2799 }
2800
2801 if (write_status) {
2802 map<string, bufferlist> attrs;
2803 status.encode_all_attrs(attrs);
2804 call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, obj, attrs, &objv_tracker));
2805 } else {
2806 call(new RGWRadosRemoveCR(store, obj, &objv_tracker));
2807 }
2808 }
2809 if (info.syncstopped) {
2810 retcode = -ENOENT;
2811 }
2812 if (retcode < 0) {
2813 return set_cr_error(retcode);
2814 }
2815 return set_cr_done();
2816 }
2817 return 0;
2818 }
2819 };
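/* Decision table for the status write above (sketch):
 *
 *   remote syncstopped + local StateStopped
 *     (and no forced full sync)   -> re-write status, preserving inc marker
 *   remote syncstopped otherwise  -> remove the status object
 *   should_full_sync()            -> StateFullSync, position = remote max_marker
 *   else                          -> StateIncrementalSync (position cleared
 *                                    unless resuming from SYNCSTOP)
 */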
2820
2821 RGWRemoteBucketManager::RGWRemoteBucketManager(const DoutPrefixProvider *_dpp,
2822 RGWDataSyncEnv *_sync_env,
2823 const rgw_zone_id& _source_zone,
2824 RGWRESTConn *_conn,
2825 const RGWBucketInfo& source_bucket_info,
2826 const rgw_bucket& dest_bucket) : dpp(_dpp), sync_env(_sync_env)
2827 {
2828 conn = _conn;
2829 source_zone = _source_zone;
2830
2831 int num_shards = (source_bucket_info.layout.current_index.layout.normal.num_shards <= 0 ?
2832 1 : source_bucket_info.layout.current_index.layout.normal.num_shards);
2833
2834 sync_pairs.resize(num_shards);
2835
2836 int cur_shard = 0; /* shard ids count up from 0; non-sharded buckets get shard_id -1 below */
2837
2838 for (int i = 0; i < num_shards; ++i, ++cur_shard) {
2839 auto& sync_pair = sync_pairs[i];
2840
2841 sync_pair.source_bs.bucket = source_bucket_info.bucket;
2842 sync_pair.dest_bs.bucket = dest_bucket;
2843
2844 sync_pair.source_bs.shard_id = (source_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? cur_shard : -1);
2845
2846 if (dest_bucket == source_bucket_info.bucket) {
2847 sync_pair.dest_bs.shard_id = sync_pair.source_bs.shard_id;
2848 } else {
2849 sync_pair.dest_bs.shard_id = -1;
2850 }
2851 }
2852
2853 sc.init(sync_env, conn, source_zone);
2854 }
2855
2856 RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker)
2857 {
2858 if ((size_t)num >= sync_pairs.size()) {
2859 return nullptr;
2860 }
2861 return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status, objv_tracker);
2862 }
2863
2864 #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
2865
2866 template <class T>
2867 static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
2868 {
2869 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
2870 if (iter == attrs.end()) {
2871 *val = T();
2872 return false;
2873 }
2874
2875 auto biter = iter->second.cbegin();
2876 try {
2877 decode(*val, biter);
2878 } catch (buffer::error& err) {
2879 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
2880 return false;
2881 }
2882 return true;
2883 }
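/* Usage sketch for decode_attr() (illustrative): read one encoded struct out
 * of an xattr map; a missing key leaves *val value-initialized, and both
 * missing and undecodable attrs report false:
 *
 *   rgw_bucket_shard_full_sync_marker m;
 *   if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &m)) {
 *     // no usable full-sync marker stored on this status object
 *   }
 */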
2884
2885 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
2886 {
2887 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
2888 decode_attr(cct, attrs, "state", &state);
2889 }
2890 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &full_marker)) {
2891 decode_attr(cct, attrs, "full_marker", &full_marker);
2892 }
2893 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
2894 decode_attr(cct, attrs, "inc_marker", &inc_marker);
2895 }
2896 }
2897
2898 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
2899 {
2900 encode_state_attr(attrs);
2901 full_marker.encode_attr(attrs);
2902 inc_marker.encode_attr(attrs);
2903 }
2904
2905 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
2906 {
2907 using ceph::encode;
2908 encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
2909 }
2910
2911 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
2912 {
2913 using ceph::encode;
2914 encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
2915 }
2916
2917 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
2918 {
2919 using ceph::encode;
2920 encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
2921 }
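/* The xattr keys written above therefore come out as (assuming the usual
 * RGW_ATTR_PREFIX of "user.rgw."):
 *
 *   user.rgw.bucket-sync.state
 *   user.rgw.bucket-sync.full_marker
 *   user.rgw.bucket-sync.inc_marker
 *
 * while decode_from_attrs() above also accepts the bare legacy names
 * ("state", "full_marker", "inc_marker") for older status objects. */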
2922
2923 class RGWReadBucketPipeSyncStatusCoroutine : public RGWCoroutine {
2924 RGWDataSyncCtx *sc;
2925 RGWDataSyncEnv *sync_env;
2926 string oid;
2927 rgw_bucket_shard_sync_info *status;
2928 RGWObjVersionTracker* objv_tracker;
2929 map<string, bufferlist> attrs;
2930 public:
2931 RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
2932 const rgw_bucket_sync_pair_info& sync_pair,
2933 rgw_bucket_shard_sync_info *_status,
2934 RGWObjVersionTracker* objv_tracker)
2935 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
2936 oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
2937 status(_status), objv_tracker(objv_tracker)
2938 {}
2939 int operate(const DoutPrefixProvider *dpp) override;
2940 };
2941
2942 int RGWReadBucketPipeSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
2943 {
2944 reenter(this) {
2945 yield call(new RGWSimpleRadosReadAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
2946 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, oid),
2947 &attrs, true, objv_tracker));
2948 if (retcode == -ENOENT) {
2949 *status = rgw_bucket_shard_sync_info();
2950 return set_cr_done();
2951 }
2952 if (retcode < 0) {
2953 ldpp_dout(dpp, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
2954 return set_cr_error(retcode);
2955 }
2956 status->decode_from_attrs(sync_env->cct, attrs);
2957 return set_cr_done();
2958 }
2959 return 0;
2960 }
2961
2962 #define OMAP_READ_MAX_ENTRIES 10
2963 class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
2964 RGWDataSyncCtx *sc;
2965 RGWDataSyncEnv *sync_env;
2966 rgw::sal::RadosStore* store;
2967
2968 const int shard_id;
2969 int max_entries;
2970
2971 set<string>& recovering_buckets;
2972 string marker;
2973 string error_oid;
2974
2975 RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
2976 set<string> error_entries;
2977 int max_omap_entries;
2978 int count;
2979
2980 public:
2981 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id,
2982 set<string>& _recovering_buckets, const int _max_entries)
2983 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
2984 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2985 recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
2986 {
2987 error_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id) + ".retry";
2988 }
2989
2990 int operate(const DoutPrefixProvider *dpp) override;
2991 };
2992
2993 int RGWReadRecoveringBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
2994 {
2995 reenter(this) {
2996 //read recovering bucket shards
2997 count = 0;
2998 do {
2999 omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
3000 yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, error_oid),
3001 marker, max_omap_entries, omapkeys));
3002
3003 if (retcode == -ENOENT) {
3004 break;
3005 }
3006
3007 if (retcode < 0) {
3008 ldpp_dout(dpp, 0) << "failed to read recovering bucket shards with "
3009 << cpp_strerror(retcode) << dendl;
3010 return set_cr_error(retcode);
3011 }
3012
3013 error_entries = std::move(omapkeys->entries);
3014 if (error_entries.empty()) {
3015 break;
3016 }
3017
3018 count += error_entries.size();
3019 marker = *error_entries.rbegin();
3020 recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
3021 std::make_move_iterator(error_entries.end()));
3022 } while (omapkeys->more && count < max_entries);
3023
3024 return set_cr_done();
3025 }
3026
3027 return 0;
3028 }
3029
3030 class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
3031 RGWDataSyncCtx *sc;
3032 RGWDataSyncEnv *sync_env;
3033 rgw::sal::RadosStore* store;
3034
3035 const int shard_id;
3036 int max_entries;
3037
3038 set<string>& pending_buckets;
3039 string marker;
3040 string status_oid;
3041
3042 rgw_data_sync_marker* sync_marker;
3043 int count;
3044
3045 std::string next_marker;
3046 list<rgw_data_change_log_entry> log_entries;
3047 bool truncated;
3048
3049 public:
3050 RGWReadPendingBucketShardsCoroutine(RGWDataSyncCtx *_sc, const int _shard_id,
3051 set<string>& _pending_buckets,
3052 rgw_data_sync_marker* _sync_marker, const int _max_entries)
3053 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
3054 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
3055 pending_buckets(_pending_buckets), sync_marker(_sync_marker)
3056 {
3057 status_oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id);
3058 }
3059
3060 int operate(const DoutPrefixProvider *dpp) override;
3061 };
3062
3063 int RGWReadPendingBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
3064 {
3065 reenter(this) {
3066 //read sync status marker
3067 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
3068 yield call(new CR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
3069 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
3070 sync_marker));
3071 if (retcode < 0) {
3072 ldpp_dout(dpp, 0) << "failed to read sync status marker with "
3073 << cpp_strerror(retcode) << dendl;
3074 return set_cr_error(retcode);
3075 }
3076
3077 //read pending bucket shards
3078 marker = sync_marker->marker;
3079 count = 0;
3080 do {
3081 yield call(new RGWReadRemoteDataLogShardCR(sc, shard_id, marker,
3082 &next_marker, &log_entries, &truncated));
3083
3084 if (retcode == -ENOENT) {
3085 break;
3086 }
3087
3088 if (retcode < 0) {
3089 ldpp_dout(dpp, 0) << "failed to read remote data log info with "
3090 << cpp_strerror(retcode) << dendl;
3091 return set_cr_error(retcode);
3092 }
3093
3094 if (log_entries.empty()) {
3095 break;
3096 }
3097
3098 count += log_entries.size();
3099 for (const auto& entry : log_entries) {
3100 pending_buckets.insert(entry.entry.key);
3101 }
3102 } while (truncated && count < max_entries);
3103
3104 return set_cr_done();
3105 }
3106
3107 return 0;
3108 }
3109
3110 int RGWRemoteDataLog::read_shard_status(const DoutPrefixProvider *dpp, int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
3111 {
3112 // cannot run concurrently with run_sync(), so run in a separate manager
3113 RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
3114 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
3115 int ret = http_manager.start();
3116 if (ret < 0) {
3117 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
3118 return ret;
3119 }
3120 RGWDataSyncEnv sync_env_local = sync_env;
3121 sync_env_local.http_manager = &http_manager;
3122 RGWDataSyncCtx sc_local = sc;
3123 sc_local.env = &sync_env_local;
3124 list<RGWCoroutinesStack *> stacks;
3125 RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
3126 recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sc_local, shard_id, recovering_buckets, max_entries));
3127 stacks.push_back(recovering_stack);
3128 RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
3129 pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sc_local, shard_id, pending_buckets, sync_marker, max_entries));
3130 stacks.push_back(pending_stack);
3131 ret = crs.run(dpp, stacks);
3132 http_manager.stop();
3133 return ret;
3134 }
3135
3136 RGWCoroutine *RGWRemoteBucketManager::read_sync_status_cr(int num, rgw_bucket_shard_sync_info *sync_status)
3137 {
3138 if ((size_t)num >= sync_pairs.size()) {
3139 return nullptr;
3140 }
3141
3142 return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr);
3143 }
3144
3145 RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RadosStore* _store,
3146 std::optional<rgw_zone_id> _source_zone,
3147 std::optional<rgw_bucket> _source_bucket,
3148 const rgw_bucket& _dest_bucket) : store(_store),
3149 cr_mgr(_store->ctx(), _store->getRados()->get_cr_registry()),
3150 http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
3151 source_zone(_source_zone), source_bucket(_source_bucket),
3152 conn(NULL), error_logger(NULL),
3153 dest_bucket(_dest_bucket),
3154 num_shards(0)
3155 {
3156 }
3157
3158 RGWBucketPipeSyncStatusManager::~RGWBucketPipeSyncStatusManager()
3159 {
3160 for (vector<RGWRemoteBucketManager *>::iterator iter = source_mgrs.begin(); iter != source_mgrs.end(); ++iter) {
3161 delete *iter;
3162 }
3163 delete error_logger;
3164 }
3165
3166 CephContext *RGWBucketPipeSyncStatusManager::get_cct() const
3167 {
3168 return store->ctx();
3169 }
3170
3171 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
3172 {
3173 JSONDecoder::decode_json("ID", id, obj);
3174 JSONDecoder::decode_json("DisplayName", display_name, obj);
3175 }
3176
3177 struct bucket_list_entry {
3178 bool delete_marker;
3179 rgw_obj_key key;
3180 bool is_latest;
3181 real_time mtime;
3182 string etag;
3183 uint64_t size;
3184 string storage_class;
3185 rgw_bucket_entry_owner owner;
3186 uint64_t versioned_epoch;
3187 string rgw_tag;
3188
3189 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
3190
3191 void decode_json(JSONObj *obj) {
3192 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
3193 JSONDecoder::decode_json("Key", key.name, obj);
3194 JSONDecoder::decode_json("VersionId", key.instance, obj);
3195 JSONDecoder::decode_json("IsLatest", is_latest, obj);
3196 string mtime_str;
3197 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
3198
3199 struct tm t;
3200 uint32_t nsec;
3201 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
3202 ceph_timespec ts;
3203 ts.tv_sec = (uint64_t)internal_timegm(&t);
3204 ts.tv_nsec = nsec;
3205 mtime = real_clock::from_ceph_timespec(ts);
3206 }
3207 JSONDecoder::decode_json("ETag", etag, obj);
3208 JSONDecoder::decode_json("Size", size, obj);
3209 JSONDecoder::decode_json("StorageClass", storage_class, obj);
3210 JSONDecoder::decode_json("Owner", owner, obj);
3211 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
3212 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
3213 if (key.instance == "null" && !versioned_epoch) {
3214 key.instance.clear();
3215 }
3216 }
3217
3218 RGWModifyOp get_modify_op() const {
3219 if (delete_marker) {
3220 return CLS_RGW_OP_LINK_OLH_DM;
3221 } else if (!key.instance.empty() && key.instance != "null") {
3222 return CLS_RGW_OP_LINK_OLH;
3223 } else {
3224 return CLS_RGW_OP_ADD;
3225 }
3226 }
3227 };
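/* Shape of one JSON entry this struct decodes (illustrative; values are made
 * up):
 *
 *   { "IsDeleteMarker": false, "Key": "photo.jpg", "VersionId": "null",
 *     "IsLatest": true, "RgwxMtime": "2022-01-01T00:00:00.000Z",
 *     "ETag": "\"d41d8cd9...\"", "Size": 1024, "StorageClass": "STANDARD",
 *     "Owner": { "ID": "user1", "DisplayName": "User One" },
 *     "VersionedEpoch": 0, "RgwxTag": "..." }
 *
 * note how a "null" VersionId with VersionedEpoch == 0 is normalized back to
 * an empty instance in decode_json(). */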
3228
3229 struct bucket_list_result {
3230 string name;
3231 string prefix;
3232 string key_marker;
3233 string version_id_marker;
3234 int max_keys;
3235 bool is_truncated;
3236 list<bucket_list_entry> entries;
3237
3238 bucket_list_result() : max_keys(0), is_truncated(false) {}
3239
3240 void decode_json(JSONObj *obj) {
3241 JSONDecoder::decode_json("Name", name, obj);
3242 JSONDecoder::decode_json("Prefix", prefix, obj);
3243 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
3244 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
3245 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
3246 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
3247 JSONDecoder::decode_json("Entries", entries, obj);
3248 }
3249 };
3250
3251 class RGWListBucketShardCR: public RGWCoroutine {
3252 RGWDataSyncCtx *sc;
3253 RGWDataSyncEnv *sync_env;
3254 const rgw_bucket_shard& bs;
3255 const string instance_key;
3256 rgw_obj_key marker_position;
3257
3258 bucket_list_result *result;
3259
3260 public:
3261 RGWListBucketShardCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
3262 rgw_obj_key& _marker_position, bucket_list_result *_result)
3263 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), bs(bs),
3264 instance_key(bs.get_key()), marker_position(_marker_position),
3265 result(_result) {}
3266
3267 int operate(const DoutPrefixProvider *dpp) override {
3268 reenter(this) {
3269 yield {
3270 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
3271 { "versions" , NULL },
3272 { "format" , "json" },
3273 { "objs-container" , "true" },
3274 { "key-marker" , marker_position.name.c_str() },
3275 { "version-id-marker" , marker_position.instance.c_str() },
3276 { NULL, NULL } };
3277 // don't include tenant in the url, it's already part of instance_key
3278 string p = string("/") + bs.bucket.name;
3279 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sc->conn, sync_env->http_manager, p, pairs, result));
3280 }
3281 if (retcode < 0) {
3282 return set_cr_error(retcode);
3283 }
3284 return set_cr_done();
3285 }
3286 return 0;
3287 }
3288 };
3289
3290 class RGWListBucketIndexLogCR: public RGWCoroutine {
3291 RGWDataSyncCtx *sc;
3292 RGWDataSyncEnv *sync_env;
3293 const string instance_key;
3294 string marker;
3295
3296 list<rgw_bi_log_entry> *result;
3297 std::optional<PerfGuard> timer;
3298
3299 public:
3300 RGWListBucketIndexLogCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& bs,
3301 string& _marker, list<rgw_bi_log_entry> *_result)
3302 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
3303 instance_key(bs.get_key()), marker(_marker), result(_result) {}
3304
3305 int operate(const DoutPrefixProvider *dpp) override {
3306 reenter(this) {
3307 if (sync_env->counters) {
3308 timer.emplace(sync_env->counters, sync_counters::l_poll);
3309 }
3310 yield {
3311 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
3312 { "format" , "json" },
3313 { "marker" , marker.c_str() },
3314 { "type", "bucket-index" },
3315 { NULL, NULL } };
3316
3317 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sc->conn, sync_env->http_manager, "/admin/log", pairs, result));
3318 }
3319 timer.reset();
3320 if (retcode < 0) {
3321 if (sync_env->counters) {
3322 sync_env->counters->inc(sync_counters::l_poll_err);
3323 }
3324 return set_cr_error(retcode);
3325 }
3326 return set_cr_done();
3327 }
3328 return 0;
3329 }
3330 };
3331
3332 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
3333
3334 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
3335 RGWDataSyncCtx *sc;
3336 RGWDataSyncEnv *sync_env;
3337
3338 string marker_oid;
3339 rgw_bucket_shard_full_sync_marker sync_marker;
3340 RGWSyncTraceNodeRef tn;
3341 RGWObjVersionTracker& objv_tracker;
3342
3343 public:
3344 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
3345 const string& _marker_oid,
3346 const rgw_bucket_shard_full_sync_marker& _marker,
3347 RGWSyncTraceNodeRef tn,
3348 RGWObjVersionTracker& objv_tracker)
3349 : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
3350 sc(_sc), sync_env(_sc->env), marker_oid(_marker_oid),
3351 sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker)
3352 {}
3353
3354 RGWCoroutine* store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
3355 sync_marker.position = new_marker;
3356 sync_marker.count = index_pos;
3357
3358 map<string, bufferlist> attrs;
3359 sync_marker.encode_attr(attrs);
3360
3361 tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
3362 return new RGWSimpleRadosWriteAttrsCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
3363 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
3364 attrs, &objv_tracker);
3365 }
3366
3367 RGWOrderCallCR *allocate_order_control_cr() override {
3368 return new RGWLastCallerWinsCR(sync_env->cct);
3369 }
3370 };
3371
3372 // write the incremental sync status and update 'stable_timestamp' on success
3373 class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine {
3374 RGWDataSyncEnv *sync_env;
3375 rgw_raw_obj obj;
3376 rgw_bucket_shard_inc_sync_marker sync_marker;
3377 ceph::real_time* stable_timestamp;
3378 RGWObjVersionTracker& objv_tracker;
3379 std::map<std::string, bufferlist> attrs;
3380 public:
3381 RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env,
3382 const rgw_raw_obj& obj,
3383 const rgw_bucket_shard_inc_sync_marker& sync_marker,
3384 ceph::real_time* stable_timestamp,
3385 RGWObjVersionTracker& objv_tracker)
3386 : RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj),
3387 sync_marker(sync_marker), stable_timestamp(stable_timestamp),
3388 objv_tracker(objv_tracker)
3389 {}
3390   int operate(const DoutPrefixProvider *dpp) override {
3391 reenter(this) {
3392 sync_marker.encode_attr(attrs);
3393
3394 yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
3395 obj, attrs, &objv_tracker));
3396 if (retcode < 0) {
3397 return set_cr_error(retcode);
3398 }
3399 if (stable_timestamp) {
3400 *stable_timestamp = sync_marker.timestamp;
3401 }
3402 return set_cr_done();
3403 }
3404 return 0;
3405 }
3406 };
3407
3408 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
3409 RGWDataSyncCtx *sc;
3410 RGWDataSyncEnv *sync_env;
3411
3412 rgw_raw_obj obj;
3413 rgw_bucket_shard_inc_sync_marker sync_marker;
3414
3415 map<rgw_obj_key, string> key_to_marker;
3416
3417 struct operation {
3418 rgw_obj_key key;
3419 bool is_olh;
3420 };
3421 map<string, operation> marker_to_op;
3422 std::set<std::string> pending_olh; // object names with pending olh operations
3423
3424 RGWSyncTraceNodeRef tn;
3425 RGWObjVersionTracker& objv_tracker;
3426 ceph::real_time* stable_timestamp;
3427
3428 void handle_finish(const string& marker) override {
3429 auto iter = marker_to_op.find(marker);
3430 if (iter == marker_to_op.end()) {
3431 return;
3432 }
3433 auto& op = iter->second;
3434 key_to_marker.erase(op.key);
3435 reset_need_retry(op.key);
3436 if (op.is_olh) {
3437 pending_olh.erase(op.key.name);
3438 }
3439 marker_to_op.erase(iter);
3440 }
3441
3442 public:
3443 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
3444 const string& _marker_oid,
3445 const rgw_bucket_shard_inc_sync_marker& _marker,
3446 RGWSyncTraceNodeRef tn,
3447 RGWObjVersionTracker& objv_tracker,
3448 ceph::real_time* stable_timestamp)
3449 : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
3450 sc(_sc), sync_env(_sc->env),
3451 obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid),
3452 sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker),
3453 stable_timestamp(stable_timestamp)
3454 {}
3455
3456 RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
3457 sync_marker.position = new_marker;
3458 sync_marker.timestamp = timestamp;
3459
3460 tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp));
3461 return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker,
3462 stable_timestamp, objv_tracker);
3463 }
3464
3465   /*
3466    * create an index from key -> <op, marker>, and from marker -> key.
3467    * this lets us ensure that we only have one in-flight
3468    * entry for any given key. This is needed when doing
3469    * incremental sync of data, where we don't want to run multiple
3470    * concurrent sync operations for the same bucket shard.
3471    * It also makes sure that we don't run concurrent operations on the
3472    * same key with different ops.
3473    */
3474 bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
3475 auto result = key_to_marker.emplace(key, marker);
3476 if (!result.second) { // exists
3477 set_need_retry(key);
3478 return false;
3479 }
3480 marker_to_op[marker] = operation{key, is_olh};
3481 if (is_olh) {
3482 // prevent other olh ops from starting on this object name
3483 pending_olh.insert(key.name);
3484 }
3485 return true;
3486 }
3487
3488 bool can_do_op(const rgw_obj_key& key, bool is_olh) {
3489 // serialize olh ops on the same object name
3490 if (is_olh && pending_olh.count(key.name)) {
3491 tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
3492 return false;
3493 }
3494 return (key_to_marker.find(key) == key_to_marker.end());
3495 }
3496
3497 RGWOrderCallCR *allocate_order_control_cr() override {
3498 return new RGWLastCallerWinsCR(sync_env->cct);
3499 }
3500 };
3501
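// sync errors we don't record in the error repo: the object may simply no
// longer exist on the source (-ENOENT), or policy may forbid access (-EPERM)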
3502 static bool ignore_sync_error(int err) {
3503 switch (err) {
3504 case -ENOENT:
3505 case -EPERM:
3506 return true;
3507 default:
3508 break;
3509 }
3510 return false;
3511 }
3512
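// applies a single entry (from a full-sync listing or a bilog record) to the
// local zone: dispatches to the sync module's sync_object / remove_object /
// create_delete_marker according to the op, retries while the marker tracker
// flags the key for retry, logs non-ignorable failures to the error log, and
// finally calls marker_tracker->finish() on success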
3513 template <class T, class K>
3514 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
3515 RGWDataSyncCtx *sc;
3516 RGWDataSyncEnv *sync_env;
3517
3518 rgw_bucket_sync_pipe& sync_pipe;
3519 rgw_bucket_shard& bs;
3520
3521 rgw_obj_key key;
3522 bool versioned;
3523 std::optional<uint64_t> versioned_epoch;
3524 rgw_bucket_entry_owner owner;
3525 real_time timestamp;
3526 RGWModifyOp op;
3527 RGWPendingState op_state;
3528
3529 T entry_marker;
3530 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
3531
3532 int sync_status;
3533
3534 stringstream error_ss;
3535
3536 bool error_injection;
3537
3538 RGWDataSyncModule *data_sync_module;
3539
3540 rgw_zone_set zones_trace;
3541
3542 RGWSyncTraceNodeRef tn;
3543 public:
3544 RGWBucketSyncSingleEntryCR(RGWDataSyncCtx *_sc,
3545 rgw_bucket_sync_pipe& _sync_pipe,
3546 const rgw_obj_key& _key, bool _versioned,
3547 std::optional<uint64_t> _versioned_epoch,
3548 real_time& _timestamp,
3549 const rgw_bucket_entry_owner& _owner,
3550 RGWModifyOp _op, RGWPendingState _op_state,
3551 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
3552 RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct),
3553 sc(_sc), sync_env(_sc->env),
3554 sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
3555 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
3556 owner(_owner),
3557 timestamp(_timestamp), op(_op),
3558 op_state(_op_state),
3559 entry_marker(_entry_marker),
3560 marker_tracker(_marker_tracker),
3561 sync_status(0){
3562 stringstream ss;
3563 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
3564 set_description() << "bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
3565 set_status("init");
3566
3567 tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));
3568
3569 tn->log(20, SSTR("bucket sync single entry (source_zone=" << sc->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
3570 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
3571
3572 data_sync_module = sync_env->sync_module->get_data_handler();
3573
3574 zones_trace = _zones_trace;
3575 zones_trace.insert(sync_env->svc->zone->get_zone().id, _sync_pipe.info.dest_bs.get_key());
3576 }
3577
3578 int operate(const DoutPrefixProvider *dpp) override {
3579 reenter(this) {
3580 /* skip entries that are not complete */
3581 if (op_state != CLS_RGW_STATE_COMPLETE) {
3582 goto done;
3583 }
3584 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
3585 do {
3586 yield {
3587 marker_tracker->reset_need_retry(key);
3588 if (key.name.empty()) {
3589 /* shouldn't happen */
3590 set_status("skipping empty entry");
3591 tn->log(0, "entry with empty obj name, skipping");
3592 goto done;
3593 }
3594 if (error_injection &&
3595 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
3596 tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
3597 retcode = -EIO;
3598 } else if (op == CLS_RGW_OP_ADD ||
3599 op == CLS_RGW_OP_LINK_OLH) {
3600 set_status("syncing obj");
3601 tn->log(5, SSTR("bucket sync: sync obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
3602 call(data_sync_module->sync_object(dpp, sc, sync_pipe, key, versioned_epoch, &zones_trace));
3603 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
3604 set_status("removing obj");
3605 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
3606 versioned = true;
3607 }
3608 tn->log(10, SSTR("removing obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
3609 call(data_sync_module->remove_object(dpp, sc, sync_pipe, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
3610 // our copy of the object is more recent, continue as if it succeeded
3611 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
3612 set_status("creating delete marker");
3613 tn->log(10, SSTR("creating delete marker: obj: " << sc->source_zone << "/" << bs.bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
3614 call(data_sync_module->create_delete_marker(dpp, sc, sync_pipe, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
3615 }
3616 tn->set_resource_name(SSTR(bucket_str_noinstance(bs.bucket) << "/" << key));
3617 }
3618 if (retcode == -ERR_PRECONDITION_FAILED) {
3619 set_status("Skipping object sync: precondition failed (object contains newer change or policy doesn't allow sync)");
3620 tn->log(0, "Skipping object sync: precondition failed (object contains newer change or policy doesn't allow sync)");
3621 retcode = 0;
3622 }
3623 } while (marker_tracker->need_retry(key));
3624 {
3625 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
3626 if (retcode >= 0) {
3627 tn->log(10, "success");
3628 } else {
3629 tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
3630 }
3631 }
3632
3633 if (retcode < 0 && retcode != -ENOENT) {
3634 set_status() << "failed to sync obj; retcode=" << retcode;
3635 tn->log(0, SSTR("ERROR: failed to sync object: "
3636 << bucket_shard_str{bs} << "/" << key.name));
3637 if (!ignore_sync_error(retcode)) {
3638 error_ss << bucket_shard_str{bs} << "/" << key.name;
3639 sync_status = retcode;
3640 }
3641 }
3642 if (!error_ss.str().empty()) {
3643 yield call(sync_env->error_logger->log_error_cr(dpp, sc->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
3644 }
3645 done:
3646 if (sync_status == 0) {
3647 /* update marker */
3648 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
3649 yield call(marker_tracker->finish(entry_marker));
3650 sync_status = retcode;
3651 }
3652 if (sync_status < 0) {
3653 return set_cr_error(sync_status);
3654 }
3655 return set_cr_done();
3656 }
3657 return 0;
3658 }
3659 };
3660
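// upper bound on concurrently spawned per-object sync coroutines; the
// drain_with_cb() calls below collect completions once the window is full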
3661 #define BUCKET_SYNC_SPAWN_WINDOW 20
3662
3663 class RGWBucketShardFullSyncCR : public RGWCoroutine {
3664 RGWDataSyncCtx *sc;
3665 RGWDataSyncEnv *sync_env;
3666 rgw_bucket_sync_pipe& sync_pipe;
3667 rgw_bucket_shard& bs;
3668 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
3669 bucket_list_result list_result;
3670 list<bucket_list_entry>::iterator entries_iter;
3671 rgw_bucket_shard_sync_info& sync_info;
3672 rgw_obj_key list_marker;
3673 bucket_list_entry *entry{nullptr};
3674
3675 int total_entries{0};
3676
3677 int sync_status{0};
3678
3679 const string& status_oid;
3680
3681 rgw_zone_set zones_trace;
3682
3683 RGWSyncTraceNodeRef tn;
3684 RGWBucketFullSyncShardMarkerTrack marker_tracker;
3685
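// keeps track of which policy prefix rule the full-sync listing is currently
// under, so the listing can skip straight to the next prefix that the pipe's
// rules can actually match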
3686 struct _prefix_handler {
3687 RGWBucketSyncFlowManager::pipe_rules_ref rules;
3688 RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator iter;
3689 std::optional<string> cur_prefix;
3690
3691 void set_rules(RGWBucketSyncFlowManager::pipe_rules_ref& _rules) {
3692 rules = _rules;
3693 }
3694
3695 bool revalidate_marker(rgw_obj_key *marker) {
3696 if (cur_prefix &&
3697 boost::starts_with(marker->name, *cur_prefix)) {
3698 return true;
3699 }
3700 if (!rules) {
3701 return false;
3702 }
3703 iter = rules->prefix_search(marker->name);
3704 if (iter == rules->prefix_end()) {
3705 return false;
3706 }
3707 cur_prefix = iter->first;
3708 marker->name = *cur_prefix;
3709 marker->instance.clear();
3710 return true;
3711 }
3712
3713 bool check_key_handled(const rgw_obj_key& key) {
3714 if (!rules) {
3715 return false;
3716 }
3717 if (cur_prefix &&
3718 boost::starts_with(key.name, *cur_prefix)) {
3719 return true;
3720 }
3721 iter = rules->prefix_search(key.name);
3722 if (iter == rules->prefix_end()) {
3723 return false;
3724 }
3725 cur_prefix = iter->first;
3726 return boost::starts_with(key.name, iter->first);
3727 }
3728 } prefix_handler;
3729
3730 public:
3731 RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc,
3732 rgw_bucket_sync_pipe& _sync_pipe,
3733 const std::string& status_oid,
3734 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
3735 rgw_bucket_shard_sync_info& sync_info,
3736 RGWSyncTraceNodeRef tn_parent,
3737 RGWObjVersionTracker& objv_tracker)
3738 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
3739 sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
3740 lease_cr(std::move(lease_cr)), sync_info(sync_info),
3741 status_oid(status_oid),
3742 tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
3743 SSTR(bucket_shard_str{bs}))),
3744 marker_tracker(sc, status_oid, sync_info.full_marker, tn, objv_tracker)
3745 {
3746 zones_trace.insert(sc->source_zone.id, sync_pipe.info.dest_bs.bucket.get_key());
3747 prefix_handler.set_rules(sync_pipe.get_rules());
3748 }
3749
3750 int operate(const DoutPrefixProvider *dpp) override;
3751 };
3752
3753 int RGWBucketShardFullSyncCR::operate(const DoutPrefixProvider *dpp)
3754 {
3755 reenter(this) {
3756 list_marker = sync_info.full_marker.position;
3757
3758 total_entries = sync_info.full_marker.count;
3759 do {
3760 if (lease_cr && !lease_cr->is_locked()) {
3761 drain_all();
3762 return set_cr_error(-ECANCELED);
3763 }
3764 set_status("listing remote bucket");
3765 tn->log(20, "listing bucket for full sync");
3766
3767 if (!prefix_handler.revalidate_marker(&list_marker)) {
3768 set_status() << "finished iterating over all available prefixes: last marker=" << list_marker;
3769 tn->log(20, SSTR("finished iterating over all available prefixes: last marker=" << list_marker));
3770 break;
3771 }
3772
3773 yield call(new RGWListBucketShardCR(sc, bs, list_marker,
3774 &list_result));
3775 if (retcode < 0 && retcode != -ENOENT) {
3776 set_status("failed bucket listing, going down");
3777 drain_all();
3778 return set_cr_error(retcode);
3779 }
3780 if (list_result.entries.size() > 0) {
3781 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
3782 }
3783 entries_iter = list_result.entries.begin();
3784 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
3785 if (lease_cr && !lease_cr->is_locked()) {
3786 drain_all();
3787 return set_cr_error(-ECANCELED);
3788 }
3789 tn->log(20, SSTR("[full sync] syncing object: "
3790 << bucket_shard_str{bs} << "/" << entries_iter->key));
3791 entry = &(*entries_iter);
3792 list_marker = entries_iter->key;
3793 if (!prefix_handler.check_key_handled(entries_iter->key)) {
3794 set_status() << "skipping entry due to policy rules: " << entries_iter->key;
3795 tn->log(20, SSTR("skipping entry due to policy rules: " << entries_iter->key));
3796 continue;
3797 }
3798 total_entries++;
3799 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
3800 tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
3801 } else {
3802 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
3803 yield spawn(new SyncCR(sc, sync_pipe, entry->key,
3804 false, /* versioned, only matters for object removal */
3805 entry->versioned_epoch, entry->mtime,
3806 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
3807 entry->key, &marker_tracker, zones_trace, tn),
3808 false);
3809 }
3810 drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
3811 [&](uint64_t stack_id, int ret) {
3812 if (ret < 0) {
3813 tn->log(10, "a sync operation returned error");
3814 sync_status = ret;
3815 }
3816 return 0;
3817 });
3818 }
3819 } while (list_result.is_truncated && sync_status == 0);
3820 set_status("done iterating over all objects");
3821 /* wait for all operations to complete */
3822
3823 drain_all_cb([&](uint64_t stack_id, int ret) {
3824 if (ret < 0) {
3825 tn->log(10, "a sync operation returned error");
3826 sync_status = ret;
3827 }
3828 return 0;
3829 });
3830 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
3831 if (lease_cr && !lease_cr->is_locked()) {
3832 return set_cr_error(-ECANCELED);
3833 }
3834 /* update sync state to incremental */
3835 if (sync_status == 0) {
3836 yield {
3837 sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
3838 map<string, bufferlist> attrs;
3839 sync_info.encode_state_attr(attrs);
3840 call(new RGWSimpleRadosWriteAttrsCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
3841 rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
3842 attrs));
3843 }
3844 } else {
3845 tn->log(10, SSTR("backing out with sync_status=" << sync_status));
3846 }
3847 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
3848 tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
3849 << bucket_shard_str{bs} << " retcode=" << retcode));
3850 return set_cr_error(retcode);
3851 }
3852 if (sync_status < 0) {
3853 return set_cr_error(sync_status);
3854 }
3855 return set_cr_done();
3856 }
3857 return 0;
3858 }
3859
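// olh (versioned-object) ops carry an olh_epoch that has to be applied, so
// they get special treatment when squashing and serializing bilog entries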
3860 static bool has_olh_epoch(RGWModifyOp op) {
3861 return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
3862 }
3863
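// incremental sync: consume the bucket index log starting at
// inc_marker.position, squash redundant entries per (object, instance),
// serialize conflicting ops on the same key, and spawn
// RGWBucketSyncSingleEntryCR for the entries that survive filtering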
3864 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
3865 RGWDataSyncCtx *sc;
3866 RGWDataSyncEnv *sync_env;
3867 rgw_bucket_sync_pipe& sync_pipe;
3868 RGWBucketSyncFlowManager::pipe_rules_ref rules;
3869 rgw_bucket_shard& bs;
3870 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
3871 list<rgw_bi_log_entry> list_result;
3872 list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
3873 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
3874 rgw_bucket_shard_sync_info& sync_info;
3875 rgw_obj_key key;
3876 rgw_bi_log_entry *entry{nullptr};
3877 bool updated_status{false};
3878 rgw_zone_id zone_id;
3879 string target_location_key;
3880
3881 string cur_id;
3882
3883 int sync_status{0};
3884 bool syncstopped{false};
3885
3886 RGWSyncTraceNodeRef tn;
3887 RGWBucketIncSyncShardMarkerTrack marker_tracker;
3888
3889 public:
3890 RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
3891 rgw_bucket_sync_pipe& _sync_pipe,
3892 const std::string& status_oid,
3893 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
3894 rgw_bucket_shard_sync_info& sync_info,
3895 RGWSyncTraceNodeRef& _tn_parent,
3896 RGWObjVersionTracker& objv_tracker,
3897 ceph::real_time* stable_timestamp)
3898 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
3899 sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
3900 lease_cr(std::move(lease_cr)), sync_info(sync_info),
3901 zone_id(sync_env->svc->zone->get_zone().id),
3902 tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
3903 SSTR(bucket_shard_str{bs}))),
3904 marker_tracker(sc, status_oid, sync_info.inc_marker, tn,
3905 objv_tracker, stable_timestamp)
3906 {
3907 set_description() << "bucket shard incremental sync bucket="
3908 << bucket_shard_str{bs};
3909 set_status("init");
3910 rules = sync_pipe.get_rules();
3911 target_location_key = sync_pipe.info.dest_bs.bucket.get_key();
3912 }
3913
3914 bool check_key_handled(const rgw_obj_key& key) {
3915 if (!rules) {
3916 return false;
3917 }
3918 auto iter = rules->prefix_search(key.name);
3919 if (iter == rules->prefix_end()) {
3920 return false;
3921 }
3922 return boost::starts_with(key.name, iter->first);
3923 }
3924
3925 int operate(const DoutPrefixProvider *dpp) override;
3926 };
3927
3928 int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
3929 {
3930 int ret;
3931 reenter(this) {
3932 do {
3933 if (lease_cr && !lease_cr->is_locked()) {
3934 drain_all();
3935 tn->log(0, "ERROR: lease is not taken, abort");
3936 return set_cr_error(-ECANCELED);
3937 }
3938       tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
3939 set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
3940 yield call(new RGWListBucketIndexLogCR(sc, bs, sync_info.inc_marker.position,
3941 &list_result));
3942 if (retcode < 0 && retcode != -ENOENT) {
3943 /* wait for all operations to complete */
3944 drain_all();
3945 return set_cr_error(retcode);
3946 }
3947 squash_map.clear();
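      // first pass: build squash_map so that, for each (object, instance),
      // only the newest complete entry gets replayed; a SYNCSTOP entry
      // truncates the batch at that point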
3948 entries_iter = list_result.begin();
3949 entries_end = list_result.end();
3950 for (; entries_iter != entries_end; ++entries_iter) {
3951 auto e = *entries_iter;
3952 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
3953 ldpp_dout(dpp, 20) << "syncstop on " << e.timestamp << dendl;
3954 syncstopped = true;
3955 entries_end = std::next(entries_iter); // stop after this entry
3956 break;
3957 }
3958 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
3959 continue;
3960 }
3961 if (e.op == CLS_RGW_OP_CANCEL) {
3962 continue;
3963 }
3964 if (e.state != CLS_RGW_STATE_COMPLETE) {
3965 continue;
3966 }
3967 if (e.zones_trace.exists(zone_id.id, target_location_key)) {
3968 continue;
3969 }
3970 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
3971 // don't squash over olh entries - we need to apply their olh_epoch
3972 if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
3973 continue;
3974 }
3975 if (squash_entry.first <= e.timestamp) {
3976 squash_entry = make_pair<>(e.timestamp, e.op);
3977 }
3978 }
3979
3980 entries_iter = list_result.begin();
3981 for (; entries_iter != entries_end; ++entries_iter) {
3982 if (lease_cr && !lease_cr->is_locked()) {
3983 drain_all();
3984 return set_cr_error(-ECANCELED);
3985 }
3986 entry = &(*entries_iter);
3987 {
3988 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
3989 if (p < 0) {
3990 cur_id = entry->id;
3991 } else {
3992 cur_id = entry->id.substr(p + 1);
3993 }
3994 }
3995 sync_info.inc_marker.position = cur_id;
3996
3997 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
3998 ldpp_dout(dpp, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
3999 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4000 continue;
4001 }
4002
4003 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
4004 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
4005 tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
4006 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4007 continue;
4008 }
4009
4010 tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));
4011
4012 if (!key.ns.empty()) {
4013 set_status() << "skipping entry in namespace: " << entry->object;
4014 tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
4015 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4016 continue;
4017 }
4018
4019 if (!check_key_handled(key)) {
4020 set_status() << "skipping entry due to policy rules: " << entry->object;
4021 tn->log(20, SSTR("skipping entry due to policy rules: " << entry->object));
4022 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4023 continue;
4024 }
4025
4026 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
4027 if (entry->op == CLS_RGW_OP_CANCEL) {
4028 set_status() << "canceled operation, skipping";
4029 tn->log(20, SSTR("skipping object: "
4030 << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
4031 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4032 continue;
4033 }
4034 if (entry->state != CLS_RGW_STATE_COMPLETE) {
4035 set_status() << "non-complete operation, skipping";
4036 tn->log(20, SSTR("skipping object: "
4037 << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
4038 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4039 continue;
4040 }
4041 if (entry->zones_trace.exists(zone_id.id, target_location_key)) {
4042 set_status() << "redundant operation, skipping";
4043 tn->log(20, SSTR("skipping object: "
4044                            << bucket_shard_str{bs} << "/" << key << ": redundant operation"));
4045 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4046 continue;
4047 }
4048 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
4049 set_status() << "squashed operation, skipping";
4050 tn->log(20, SSTR("skipping object: "
4051 << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
4052 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4053 continue;
4054 }
4055 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
4056 tn->log(20, SSTR("syncing object: "
4057 << bucket_shard_str{bs} << "/" << key));
4058 updated_status = false;
4059 while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
4060 if (!updated_status) {
4061 set_status() << "can't do op, conflicting inflight operation";
4062 updated_status = true;
4063 }
4064 tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
4065 yield wait_for_child();
4066 bool again = true;
4067 while (again) {
4068 again = collect(&ret, nullptr);
4069 if (ret < 0) {
4070 tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
4071 sync_status = ret;
4072 /* we have reported this error */
4073 }
4074 }
4075 if (sync_status != 0)
4076 break;
4077 }
4078 if (sync_status != 0) {
4079 /* get error, stop */
4080 break;
4081 }
4082 if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
4083 set_status() << "can't do op, sync already in progress for object";
4084 tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
4085 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
4086 continue;
4087 }
4088 // yield {
4089 set_status() << "start object sync";
4090 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
4091 tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
4092 } else {
4093 std::optional<uint64_t> versioned_epoch;
4094 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
4095 if (entry->ver.pool < 0) {
4096 versioned_epoch = entry->ver.epoch;
4097 }
4098 tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
4099 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
4100 spawn(new SyncCR(sc, sync_pipe, key,
4101 entry->is_versioned(), versioned_epoch,
4102 entry->timestamp, owner, entry->op, entry->state,
4103 cur_id, &marker_tracker, entry->zones_trace, tn),
4104 false);
4105 }
4106 // }
4107 drain_with_cb(BUCKET_SYNC_SPAWN_WINDOW,
4108 [&](uint64_t stack_id, int ret) {
4109 if (ret < 0) {
4110 tn->log(10, "a sync operation returned error");
4111 sync_status = ret;
4112 }
4113 return 0;
4114 });
4115 }
4116 } while (!list_result.empty() && sync_status == 0 && !syncstopped);
4117
4118 drain_all_cb([&](uint64_t stack_id, int ret) {
4119 if (ret < 0) {
4120 tn->log(10, "a sync operation returned error");
4121 sync_status = ret;
4122 }
4123 return 0;
4124 });
4125 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
4126
4127 if (syncstopped) {
4128 // transition to StateStopped in RGWRunBucketSyncCoroutine. if sync is
4129 // still disabled, we'll delete the sync status object. otherwise we'll
4130 // restart full sync to catch any changes that happened while sync was
4131 // disabled
4132 sync_info.state = rgw_bucket_shard_sync_info::StateStopped;
4133 return set_cr_done();
4134 }
4135
4136 yield call(marker_tracker.flush());
4137 if (retcode < 0) {
4138 tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
4139 return set_cr_error(retcode);
4140 }
4141 if (sync_status < 0) {
4142 tn->log(10, SSTR("backing out with sync_status=" << sync_status));
4143 return set_cr_error(sync_status);
4144 }
4145 return set_cr_done();
4146 }
4147 return 0;
4148 }
4149
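// resolves the set of sync pipes (source/target bucket pairs) matching the
// given bucket/zone filters, consulting the bucket sync policies on both
// ends as well as the sync hints stored for the source bucket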
4150 class RGWGetBucketPeersCR : public RGWCoroutine {
4151 RGWDataSyncEnv *sync_env;
4152
4153 std::optional<rgw_bucket> target_bucket;
4154 std::optional<rgw_zone_id> source_zone;
4155 std::optional<rgw_bucket> source_bucket;
4156
4157 rgw_sync_pipe_info_set *pipes;
4158 map<rgw_bucket, all_bucket_info> buckets_info;
4159 map<rgw_bucket, all_bucket_info>::iterator siiter;
4160 std::optional<all_bucket_info> target_bucket_info;
4161 std::optional<all_bucket_info> source_bucket_info;
4162
4163 rgw_sync_pipe_info_set::iterator siter;
4164
4165 std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
4166 std::shared_ptr<rgw_bucket_get_sync_policy_result> target_policy;
4167
4168 RGWSyncTraceNodeRef tn;
4169
4170 using pipe_const_iter = map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>::const_iterator;
4171
4172 static pair<pipe_const_iter, pipe_const_iter> get_pipe_iters(const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& m, std::optional<rgw_zone_id> zone) {
4173 if (!zone) {
4174 return { m.begin(), m.end() };
4175 }
4176
4177 auto b = m.find(*zone);
4178 if (b == m.end()) {
4179 return { b, b };
4180 }
4181 return { b, std::next(b) };
4182 }
4183
4184 void filter_sources(std::optional<rgw_zone_id> source_zone,
4185 std::optional<rgw_bucket> source_bucket,
4186 const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& all_sources,
4187 rgw_sync_pipe_info_set *result) {
4188 ldpp_dout(sync_env->dpp, 20) << __func__ << ": source_zone=" << source_zone.value_or(rgw_zone_id("*")).id
4189 << " source_bucket=" << source_bucket.value_or(rgw_bucket())
4190 << " all_sources.size()=" << all_sources.size() << dendl;
4191 auto iters = get_pipe_iters(all_sources, source_zone);
4192 for (auto i = iters.first; i != iters.second; ++i) {
4193 for (auto& handler : i->second) {
4194 if (!handler.specific()) {
4195 ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
4196 continue;
4197 }
4198 if (source_bucket &&
4199 !source_bucket->match(*handler.source.bucket)) {
4200 continue;
4201 }
4202 ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
4203 result->insert(handler, source_bucket_info, target_bucket_info);
4204 }
4205 }
4206 }
4207
4208 void filter_targets(std::optional<rgw_zone_id> target_zone,
4209 std::optional<rgw_bucket> target_bucket,
4210 const map<rgw_zone_id, RGWBucketSyncFlowManager::pipe_set>& all_targets,
4211 rgw_sync_pipe_info_set *result) {
4212     ldpp_dout(sync_env->dpp, 20) << __func__ << ": target_zone=" << target_zone.value_or(rgw_zone_id("*")).id
4213                                  << " target_bucket=" << target_bucket.value_or(rgw_bucket())
4214 << " all_targets.size()=" << all_targets.size() << dendl;
4215 auto iters = get_pipe_iters(all_targets, target_zone);
4216 for (auto i = iters.first; i != iters.second; ++i) {
4217 for (auto& handler : i->second) {
4218 if (target_bucket &&
4219 handler.dest.bucket &&
4220 !target_bucket->match(*handler.dest.bucket)) {
4221 ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
4222 continue;
4223 }
4224 ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
4225 result->insert(handler, source_bucket_info, target_bucket_info);
4226 }
4227 }
4228 }
4229
4230 void update_from_target_bucket_policy();
4231 void update_from_source_bucket_policy();
4232
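// async action that looks up the sync-hint targets of a source bucket,
// i.e. buckets whose sync policy references it as a source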
4233 struct GetHintTargets : public RGWGenericAsyncCR::Action {
4234 RGWDataSyncEnv *sync_env;
4235 rgw_bucket source_bucket;
4236 std::set<rgw_bucket> targets;
4237
4238 GetHintTargets(RGWDataSyncEnv *_sync_env,
4239 const rgw_bucket& _source_bucket) : sync_env(_sync_env),
4240 source_bucket(_source_bucket) {}
4241 int operate() override {
4242 int r = sync_env->svc->bucket_sync->get_bucket_sync_hints(sync_env->dpp,
4243 source_bucket,
4244 nullptr,
4245 &targets,
4246 null_yield);
4247 if (r < 0) {
4248 ldpp_dout(sync_env->dpp, 0) << "ERROR: " << __func__ << "(): failed to fetch bucket sync hints for bucket=" << source_bucket << dendl;
4249 return r;
4250 }
4251
4252 return 0;
4253 }
4254 };
4255
4256 std::shared_ptr<GetHintTargets> get_hint_targets_action;
4257 std::set<rgw_bucket>::iterator hiter;
4258
4259 public:
4260 RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
4261 std::optional<rgw_bucket> _target_bucket,
4262 std::optional<rgw_zone_id> _source_zone,
4263 std::optional<rgw_bucket> _source_bucket,
4264 rgw_sync_pipe_info_set *_pipes,
4265 const RGWSyncTraceNodeRef& _tn_parent)
4266 : RGWCoroutine(_sync_env->cct),
4267 sync_env(_sync_env),
4268 target_bucket(_target_bucket),
4269 source_zone(_source_zone),
4270 source_bucket(_source_bucket),
4271 pipes(_pipes),
4272 tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers",
4273 SSTR( "target=" << target_bucket.value_or(rgw_bucket())
4274 << ":source=" << target_bucket.value_or(rgw_bucket())
4275 << ":source_zone=" << source_zone.value_or(rgw_zone_id("*")).id))) {
4276 }
4277
4278 int operate(const DoutPrefixProvider *dpp) override;
4279 };
4280
4281 std::ostream& operator<<(std::ostream& out, std::optional<rgw_bucket_shard>& bs) {
4282 if (!bs) {
4283 out << "*";
4284 } else {
4285 out << *bs;
4286 }
4287 return out;
4288 }
4289
4290 RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
4291 boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
4292 std::optional<rgw_bucket_shard> _target_bs,
4293 std::optional<rgw_bucket_shard> _source_bs,
4294 const RGWSyncTraceNodeRef& _tn_parent,
4295 ceph::real_time* progress)
4296 : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
4297 lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs),
4298 tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
4299 SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))),
4300 progress(progress)
4301 {
4302 if (target_bs) {
4303 target_bucket = target_bs->bucket;
4304 }
4305 if (source_bs) {
4306 source_bucket = source_bs->bucket;
4307 }
4308 }
4309
4310 int RGWRunBucketSourcesSyncCR::operate(const DoutPrefixProvider *dpp)
4311 {
4312 reenter(this) {
4313 yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
4314 if (retcode < 0 && retcode != -ENOENT) {
4315       tn->log(0, "ERROR: failed to get bucket peers info");
4316 return set_cr_error(retcode);
4317 }
4318
4319 ldpp_dout(dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;
4320
4321 if (pipes.empty()) {
4322 ldpp_dout(dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
4323 return set_cr_done();
4324 }
4325
4326 for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
4327 {
4328 ldpp_dout(dpp, 20) << __func__ << "(): sync pipe=" << *siter << dendl;
4329
4330 source_num_shards = siter->source.get_bucket_info().layout.current_index.layout.normal.num_shards;
4331 target_num_shards = siter->target.get_bucket_info().layout.current_index.layout.normal.num_shards;
4332 if (source_bs) {
4333 sync_pair.source_bs = *source_bs;
4334 } else {
4335 sync_pair.source_bs.bucket = siter->source.get_bucket();
4336 }
4337 sync_pair.dest_bs.bucket = siter->target.get_bucket();
4338
4339 sync_pair.handler = siter->handler;
4340
4341 if (sync_pair.source_bs.shard_id >= 0) {
4342 num_shards = 1;
4343 cur_shard = sync_pair.source_bs.shard_id;
4344 } else {
4345 num_shards = std::max<int>(1, source_num_shards);
4346 cur_shard = std::min<int>(0, source_num_shards);
4347 }
4348 }
4349
4350 ldpp_dout(dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
4351
4352 for (; num_shards > 0; --num_shards, ++cur_shard) {
4353 /*
4354          * use a negative shard_id for backward compatibility,
4355 * this affects the crafted status oid
4356 */
4357 sync_pair.source_bs.shard_id = (source_num_shards > 0 ? cur_shard : -1);
4358 if (source_num_shards == target_num_shards) {
4359 sync_pair.dest_bs.shard_id = sync_pair.source_bs.shard_id;
4360 } else {
4361 sync_pair.dest_bs.shard_id = -1;
4362 }
4363
4364 ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
4365
4366 cur_progress = (progress ? &shard_progress[prealloc_stack_id()] : nullptr);
4367
4368 yield_spawn_window(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
4369 cur_progress),
4370 BUCKET_SYNC_SPAWN_WINDOW,
4371 [&](uint64_t stack_id, int ret) {
4372 handle_complete_stack(stack_id);
4373 if (ret < 0) {
4374 tn->log(10, "a sync operation returned error");
4375 }
4376 return ret;
4377 });
4378 }
4379 }
4380 drain_all_cb([&](uint64_t stack_id, int ret) {
4381 handle_complete_stack(stack_id);
4382 if (ret < 0) {
4383 tn->log(10, "a sync operation returned error");
4384 }
4385 return ret;
4386 });
4387 if (progress && min_progress) {
4388 *progress = *min_progress;
4389 }
4390 return set_cr_done();
4391 }
4392
4393 return 0;
4394 }
4395
4396 class RGWSyncGetBucketInfoCR : public RGWCoroutine {
4397 RGWDataSyncEnv *sync_env;
4398 rgw_bucket bucket;
4399 RGWBucketInfo *pbucket_info;
4400 map<string, bufferlist> *pattrs;
4401 RGWMetaSyncEnv meta_sync_env;
4402
4403 RGWSyncTraceNodeRef tn;
4404
4405 public:
4406 RGWSyncGetBucketInfoCR(RGWDataSyncEnv *_sync_env,
4407 const rgw_bucket& _bucket,
4408 RGWBucketInfo *_pbucket_info,
4409 map<string, bufferlist> *_pattrs,
4410 const RGWSyncTraceNodeRef& _tn_parent)
4411 : RGWCoroutine(_sync_env->cct),
4412 sync_env(_sync_env),
4413 bucket(_bucket),
4414 pbucket_info(_pbucket_info),
4415 pattrs(_pattrs),
4416 tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_info",
4417 SSTR(bucket))) {
4418 }
4419
4420 int operate(const DoutPrefixProvider *dpp) override;
4421 };
4422
4423 int RGWSyncGetBucketInfoCR::operate(const DoutPrefixProvider *dpp)
4424 {
4425 reenter(this) {
4426 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info, pattrs, dpp));
4427 if (retcode == -ENOENT) {
4428 /* bucket instance info has not been synced in yet, fetch it now */
4429 yield {
4430         tn->log(10, SSTR("no local info for bucket " << bucket_str{bucket} << ": fetching metadata"));
4431 string raw_key = string("bucket.instance:") + bucket.get_key();
4432
4433 meta_sync_env.init(dpp, cct, sync_env->store, sync_env->svc->zone->get_master_conn(), sync_env->async_rados,
4434 sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
4435
4436 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
4437 string() /* no marker */,
4438 MDLOG_STATUS_COMPLETE,
4439 NULL /* no marker tracker */,
4440 tn));
4441 }
4442 if (retcode < 0) {
4443 tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bucket}));
4444 return set_cr_error(retcode);
4445 }
4446
4447 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket, pbucket_info, pattrs, dpp));
4448 }
4449 if (retcode < 0) {
4450 tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bucket}));
4451 return set_cr_error(retcode);
4452 }
4453
4454 return set_cr_done();
4455 }
4456
4457 return 0;
4458 }
4459
4460 void RGWGetBucketPeersCR::update_from_target_bucket_policy()
4461 {
4462 if (!target_policy ||
4463 !target_policy->policy_handler ||
4464 !pipes) {
4465 return;
4466 }
4467
4468 auto handler = target_policy->policy_handler.get();
4469
4470 filter_sources(source_zone,
4471 source_bucket,
4472 handler->get_sources(),
4473 pipes);
4474
4475 for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
4476 if (!siter->source.has_bucket_info()) {
4477 buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
4478 }
4479 if (!siter->target.has_bucket_info()) {
4480 buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
4481 }
4482 }
4483 }
4484
4485 void RGWGetBucketPeersCR::update_from_source_bucket_policy()
4486 {
4487 if (!source_policy ||
4488 !source_policy->policy_handler ||
4489 !pipes) {
4490 return;
4491 }
4492
4493 auto handler = source_policy->policy_handler.get();
4494
4495 filter_targets(sync_env->svc->zone->get_zone().id,
4496 target_bucket,
4497 handler->get_targets(),
4498 pipes);
4499
4500 for (siter = pipes->begin(); siter != pipes->end(); ++siter) {
4501 if (!siter->source.has_bucket_info()) {
4502 buckets_info.emplace(siter->source.get_bucket(), all_bucket_info());
4503 }
4504 if (!siter->target.has_bucket_info()) {
4505 buckets_info.emplace(siter->target.get_bucket(), all_bucket_info());
4506 }
4507 }
4508 }
4509
4510
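// fetches the bucket's sync policy handler; on -ENOENT it fetches the bucket
// instance info (which can trigger metadata sync of the instance) and then
// retries the policy lookup once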
4511 class RGWSyncGetBucketSyncPolicyHandlerCR : public RGWCoroutine {
4512 RGWDataSyncEnv *sync_env;
4513 rgw_bucket bucket;
4514 rgw_bucket_get_sync_policy_params get_policy_params;
4515
4516 std::shared_ptr<rgw_bucket_get_sync_policy_result> policy;
4517
4518 RGWSyncTraceNodeRef tn;
4519
4520 int i;
4521
4522 public:
4523 RGWSyncGetBucketSyncPolicyHandlerCR(RGWDataSyncEnv *_sync_env,
4524 std::optional<rgw_zone_id> zone,
4525 const rgw_bucket& _bucket,
4526 std::shared_ptr<rgw_bucket_get_sync_policy_result>& _policy,
4527 const RGWSyncTraceNodeRef& _tn_parent)
4528 : RGWCoroutine(_sync_env->cct),
4529 sync_env(_sync_env),
4530 bucket(_bucket),
4531 policy(_policy),
4532 tn(sync_env->sync_tracer->add_node(_tn_parent, "get_sync_policy_handler",
4533 SSTR(bucket))) {
4534 get_policy_params.zone = zone;
4535 get_policy_params.bucket = bucket;
4536 }
4537
4538 int operate(const DoutPrefixProvider *dpp) override {
4539 reenter(this) {
4540 for (i = 0; i < 2; ++i) {
4541 yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
4542 sync_env->store,
4543 get_policy_params,
4544 policy,
4545 dpp));
4546 if (retcode < 0 &&
4547 retcode != -ENOENT) {
4548 return set_cr_error(retcode);
4549 }
4550
4551 if (retcode == 0) {
4552 return set_cr_done();
4553 }
4554
4555 /* bucket instance was not found,
4556 * try to get bucket instance info, can trigger
4557 * metadata sync of bucket instance
4558 */
4559 yield call(new RGWSyncGetBucketInfoCR(sync_env,
4560 bucket,
4561 nullptr,
4562 nullptr,
4563 tn));
4564 if (retcode < 0) {
4565 return set_cr_error(retcode);
4566 }
4567 }
4568 }
4569
4570 return 0;
4571 }
4572 };
4573
4574
4575 int RGWGetBucketPeersCR::operate(const DoutPrefixProvider *dpp)
4576 {
4577 reenter(this) {
4578 if (pipes) {
4579 pipes->clear();
4580 }
4581 if (target_bucket) {
4582 target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
4583 yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
4584 nullopt,
4585 *target_bucket,
4586 target_policy,
4587 tn));
4588 if (retcode < 0 &&
4589 retcode != -ENOENT) {
4590 return set_cr_error(retcode);
4591 }
4592
4593 update_from_target_bucket_policy();
4594 }
4595
4596 if (source_bucket && source_zone) {
4597 source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
4598 yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
4599 source_zone,
4600 *source_bucket,
4601 source_policy,
4602 tn));
4603 if (retcode < 0 &&
4604 retcode != -ENOENT) {
4605 return set_cr_error(retcode);
4606 }
4607
4608 if (source_policy->policy_handler) {
4609 auto& opt_bucket_info = source_policy->policy_handler->get_bucket_info();
4610 auto& opt_attrs = source_policy->policy_handler->get_bucket_attrs();
4611 if (opt_bucket_info && opt_attrs) {
4612 source_bucket_info.emplace();
4613 source_bucket_info->bucket_info = *opt_bucket_info;
4614 source_bucket_info->attrs = *opt_attrs;
4615 }
4616 }
4617
4618 if (!target_bucket) {
4619 get_hint_targets_action = make_shared<GetHintTargets>(sync_env, *source_bucket);
4620
4621 yield call(new RGWGenericAsyncCR(cct, sync_env->async_rados,
4622 get_hint_targets_action));
4623 if (retcode < 0) {
4624 return set_cr_error(retcode);
4625 }
4626
4627 /* hints might have incomplete bucket ids,
4628 * in which case we need to figure out the current
4629 * bucket_id
4630 */
4631 for (hiter = get_hint_targets_action->targets.begin();
4632 hiter != get_hint_targets_action->targets.end();
4633 ++hiter) {
4634 ldpp_dout(dpp, 20) << "Got sync hint for bucket=" << *source_bucket << ": " << hiter->get_key() << dendl;
4635
4636 target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
4637 yield call(new RGWSyncGetBucketSyncPolicyHandlerCR(sync_env,
4638 nullopt,
4639 *hiter,
4640 target_policy,
4641 tn));
4642 if (retcode < 0 &&
4643 retcode != -ENOENT) {
4644 return set_cr_error(retcode);
4645 }
4646 update_from_target_bucket_policy();
4647 }
4648 }
4649 }
4650
4651 update_from_source_bucket_policy();
4652
4653 for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) {
4654 if (siiter->second.bucket_info.bucket.name.empty()) {
4655 yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first,
4656 &siiter->second.bucket_info,
4657 &siiter->second.attrs,
4658 tn));
4659 }
4660 }
4661
4662 if (pipes) {
4663 pipes->update_empty_bucket_info(buckets_info);
4664 }
4665
4666 return set_cr_done();
4667 }
4668
4669 return 0;
4670 }
4671
4672 int RGWRunBucketsSyncBySourceCR::operate(const DoutPrefixProvider *dpp)
4673 {
4674 reenter(this) {
4675 return set_cr_done();
4676 }
4677
4678 return 0;
4679 }
4680
4681 int RGWRunBucketSyncCoroutine::operate(const DoutPrefixProvider *dpp)
4682 {
4683 reenter(this) {
4684 yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker));
4685 if (retcode < 0 && retcode != -ENOENT) {
4686 tn->log(0, "ERROR: failed to read sync status for bucket");
4687 drain_all();
4688 return set_cr_error(retcode);
4689 }
4690
4691 tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));
4692
4693 yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info,
4694 &sync_pipe.source_bucket_attrs, tn));
4695 if (retcode < 0) {
4696 tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
4697 drain_all();
4698 return set_cr_error(retcode);
4699 }
4700
4701 yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info,
4702 &sync_pipe.dest_bucket_attrs, tn));
4703 if (retcode < 0) {
4704       tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.dest_bs.bucket}));
4705 drain_all();
4706 return set_cr_error(retcode);
4707 }
4708
4709 sync_pipe.info = sync_pair;
4710
4711 do {
4712 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
4713 sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
4714 yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker));
4715 if (retcode == -ENOENT) {
4716 tn->log(0, "bucket sync disabled");
4717 drain_all();
4718 return set_cr_done();
4719 }
4720 if (retcode < 0) {
4721 tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
4722 drain_all();
4723 return set_cr_error(retcode);
4724 }
4725 }
4726 if (progress) {
4727 *progress = sync_status.inc_marker.timestamp;
4728 }
4729
4730 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
4731 yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
4732 status_oid, lease_cr,
4733 sync_status, tn, objv_tracker));
4734 if (retcode < 0) {
4735 tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
4736 drain_all();
4737 return set_cr_error(retcode);
4738 }
4739 }
4740
4741 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
4742 yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
4743 status_oid, lease_cr,
4744 sync_status, tn,
4745 objv_tracker, progress));
4746 if (retcode < 0) {
4747 tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
4748 drain_all();
4749 return set_cr_error(retcode);
4750 }
4751 }
4752 // loop back to previous states unless incremental sync returns normally
4753 } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
4754
4755 drain_all();
4756 return set_cr_done();
4757 }
4758
4759 return 0;
4760 }
4761
4762 RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num)
4763 {
4764 if ((size_t)num >= sync_pairs.size()) {
4765 return nullptr;
4766 }
4767
4768 return new RGWRunBucketSyncCoroutine(&sc, nullptr, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
4769 }
4770
4771 int RGWBucketPipeSyncStatusManager::init(const DoutPrefixProvider *dpp)
4772 {
4773 int ret = http_manager.start();
4774 if (ret < 0) {
4775 ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
4776 return ret;
4777 }
4778
4779 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
4780
4781 sync_module.reset(new RGWDefaultSyncModuleInstance());
4782 auto async_rados = store->svc()->rados->get_async_processor();
4783
4784 sync_env.init(this, store->ctx(), store,
4785 store->svc(), async_rados, &http_manager,
4786 error_logger, store->getRados()->get_sync_tracer(),
4787 sync_module, nullptr);
4788
4789 rgw_sync_pipe_info_set pipes;
4790
4791 ret = cr_mgr.run(dpp, new RGWGetBucketPeersCR(&sync_env,
4792 dest_bucket,
4793 source_zone,
4794 source_bucket,
4795 &pipes,
4796 sync_env.sync_tracer->root_node));
4797 if (ret < 0) {
4798 ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl;
4799 return ret;
4800 }
4801
4802 rgw_zone_id last_zone;
4803
4804 for (auto& pipe : pipes) {
4805 auto& szone = pipe.source.zone;
4806
4807 if (last_zone != szone) {
4808 conn = store->svc()->zone->get_zone_conn(szone);
4809 if (!conn) {
4810 ldpp_dout(this, 0) << "connection object to zone " << szone << " does not exist" << dendl;
4811 return -EINVAL;
4812 }
4813 last_zone = szone;
4814 }
4815
4816 source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env,
4817 szone, conn,
4818 pipe.source.get_bucket_info(),
4819 pipe.target.get_bucket()));
4820 }
4821
4822 return 0;
4823 }
4824
4825 int RGWBucketPipeSyncStatusManager::init_sync_status(const DoutPrefixProvider *dpp)
4826 {
4827 list<RGWCoroutinesStack *> stacks;
4828 // pass an empty objv tracker to each so that the version gets incremented
4829 std::list<RGWObjVersionTracker> objvs;
4830
4831 for (auto& mgr : source_mgrs) {
4832 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
4833
4834 for (int i = 0; i < mgr->num_pipes(); ++i) {
4835 objvs.emplace_back();
4836 stack->call(mgr->init_sync_status_cr(i, objvs.back()));
4837 }
4838
4839 stacks.push_back(stack);
4840 }
4841
4842 return cr_mgr.run(dpp, stacks);
4843 }
4844
4845 int RGWBucketPipeSyncStatusManager::read_sync_status(const DoutPrefixProvider *dpp)
4846 {
4847 list<RGWCoroutinesStack *> stacks;
4848
4849 for (auto& mgr : source_mgrs) {
4850 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
4851 for (int i = 0; i < mgr->num_pipes(); ++i) {
4852 stack->call(mgr->read_sync_status_cr(i, &sync_status[i]));
4853 }
4854
4855 stacks.push_back(stack);
4856 }
4857
4858 int ret = cr_mgr.run(dpp, stacks);
4859 if (ret < 0) {
4860 ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
4861 << bucket_str{dest_bucket} << dendl;
4862 return ret;
4863 }
4864
4865 return 0;
4866 }
4867
4868 int RGWBucketPipeSyncStatusManager::run(const DoutPrefixProvider *dpp)
4869 {
4870 list<RGWCoroutinesStack *> stacks;
4871
4872 for (auto& mgr : source_mgrs) {
4873 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
4874 for (int i = 0; i < mgr->num_pipes(); ++i) {
4875 stack->call(mgr->run_sync_cr(i));
4876 }
4877
4878 stacks.push_back(stack);
4879 }
4880
4881 int ret = cr_mgr.run(dpp, stacks);
4882 if (ret < 0) {
4883     ldpp_dout(this, 0) << "ERROR: failed to run sync for "
4884 << bucket_str{dest_bucket} << dendl;
4885 return ret;
4886 }
4887
4888 return 0;
4889 }
4890
4891 unsigned RGWBucketPipeSyncStatusManager::get_subsys() const
4892 {
4893 return dout_subsys;
4894 }
4895
4896 std::ostream& RGWBucketPipeSyncStatusManager::gen_prefix(std::ostream& out) const
4897 {
4898   const std::string zone = source_zone.value_or(rgw_zone_id("*")).id; // copy; a string_view into the value_or() temporary would dangle
4899 return out << "bucket sync zone:" << zone.substr(0, 8)
4900 << " bucket:" << dest_bucket << ' ';
4901 }
4902
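// resulting oid looks like "<bucket_status_oid_prefix>.<source-zone-id>:<dest-bs-key>",
// with the source shard key appended when source and destination shards differ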
4903 string RGWBucketPipeSyncStatusManager::status_oid(const rgw_zone_id& source_zone,
4904 const rgw_bucket_sync_pair_info& sync_pair)
4905 {
4906 if (sync_pair.source_bs == sync_pair.dest_bs) {
4907 return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bs.get_key();
4908 } else {
4909 return bucket_status_oid_prefix + "." + source_zone.id + ":" + sync_pair.dest_bs.get_key() + ":" + sync_pair.source_bs.get_key();
4910 }
4911 }
4912
4913 string RGWBucketPipeSyncStatusManager::obj_status_oid(const rgw_bucket_sync_pipe& sync_pipe,
4914 const rgw_zone_id& source_zone,
4915 const rgw::sal::Object* obj)
4916 {
4917 string prefix = object_status_oid_prefix + "." + source_zone.id + ":" + obj->get_bucket()->get_key().get_key();
4918 if (sync_pipe.source_bucket_info.bucket !=
4919 sync_pipe.dest_bucket_info.bucket) {
4920 prefix += string("/") + sync_pipe.dest_bucket_info.bucket.get_key();
4921 }
4922 return prefix + ":" + obj->get_name() + ":" + obj->get_instance();
4923 }
4924
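// fetch the remote bilog markers for all shards of 'bucket' via
// /admin/log?type=bucket-index&info and parse result.max_marker into 'markers'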
4925 int rgw_read_remote_bilog_info(const DoutPrefixProvider *dpp,
4926 RGWRESTConn* conn,
4927 const rgw_bucket& bucket,
4928 BucketIndexShardsManager& markers,
4929 optional_yield y)
4930 {
4931 const auto instance_key = bucket.get_key();
4932 const rgw_http_param_pair params[] = {
4933 { "type" , "bucket-index" },
4934 { "bucket-instance", instance_key.c_str() },
4935 { "info" , nullptr },
4936 { nullptr, nullptr }
4937 };
4938 rgw_bucket_index_marker_info result;
4939 int r = conn->get_json_resource(dpp, "/admin/log/", params, y, result);
4940 if (r < 0) {
4941 ldpp_dout(dpp, -1) << "failed to fetch remote log markers: " << cpp_strerror(r) << dendl;
4942 return r;
4943 }
4944 r = markers.from_string(result.max_marker, -1);
4945 if (r < 0) {
4946 lderr(conn->get_ctx()) << "failed to decode remote log markers" << dendl;
4947 return r;
4948 }
4949 return 0;
4950 }
4951
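// reads the per-shard sync status of a bucket pipe, with up to
// max_concurrent_shards reads in flight; the destination shard id follows the
// source shard id only when both buckets have the same shard count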
class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
  static constexpr int max_concurrent_shards = 16;
  rgw::sal::RadosStore* const store;
  RGWDataSyncCtx *const sc;
  RGWDataSyncEnv *const env;
  RGWBucketInfo source_bucket_info;
  RGWBucketInfo dest_bucket_info;
  rgw_bucket_shard source_bs;
  rgw_bucket_shard dest_bs;

  rgw_bucket_sync_pair_info sync_pair;

  bool shard_to_shard_sync;

  using Vector = std::vector<rgw_bucket_shard_sync_info>;
  Vector::iterator i, end;

public:
  RGWCollectBucketSyncStatusCR(rgw::sal::RadosStore* store, RGWDataSyncCtx *sc,
                               const RGWBucketInfo& source_bucket_info,
                               const RGWBucketInfo& dest_bucket_info,
                               Vector *status)
    : RGWShardCollectCR(sc->cct, max_concurrent_shards),
      store(store), sc(sc), env(sc->env),
      source_bucket_info(source_bucket_info),
      dest_bucket_info(dest_bucket_info)
  {
    // map source shard i to dest shard i only when the shard counts match;
    // otherwise every source shard syncs against the same dest shard
    shard_to_shard_sync = (source_bucket_info.layout.current_index.layout.normal.num_shards ==
                           dest_bucket_info.layout.current_index.layout.normal.num_shards);

    source_bs = rgw_bucket_shard(source_bucket_info.bucket, source_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? 0 : -1);
    dest_bs = rgw_bucket_shard(dest_bucket_info.bucket, dest_bucket_info.layout.current_index.layout.normal.num_shards > 0 ? 0 : -1);

    // one status entry per source shard (at least one for unsharded
    // buckets); take the iterators only after resize(), which would
    // invalidate any taken earlier
    status->clear();
    status->resize(std::max<size_t>(1, source_bucket_info.layout.current_index.layout.normal.num_shards));

    i = status->begin();
    end = status->end();
  }

  bool spawn_next() override {
    if (i == end) {
      return false;
    }
    sync_pair.source_bs = source_bs;
    sync_pair.dest_bs = dest_bs;
    spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr), false);
    ++i;
    ++source_bs.shard_id;
    if (shard_to_shard_sync) {
      dest_bs.shard_id = source_bs.shard_id;
    }
    return true;
  }
};

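// Read the full per-shard sync status of a bucket sync pipe. Validates the
// pipe endpoints, loads the source bucket info if the caller didn't supply
// it, then runs RGWCollectBucketSyncStatusCR in a standalone coroutine
// manager with a null sync module.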
int rgw_bucket_sync_status(const DoutPrefixProvider *dpp,
                           rgw::sal::RadosStore* store,
                           const rgw_sync_bucket_pipe& pipe,
                           const RGWBucketInfo& dest_bucket_info,
                           const RGWBucketInfo *psource_bucket_info,
                           std::vector<rgw_bucket_shard_sync_info> *status)
{
  if (!pipe.source.zone ||
      !pipe.source.bucket ||
      !pipe.dest.zone ||
      !pipe.dest.bucket) {
    return -EINVAL;
  }

  if (*pipe.dest.bucket !=
      dest_bucket_info.bucket) {
    return -EINVAL;
  }

  const rgw_bucket& source_bucket = *pipe.source.bucket;

  RGWBucketInfo source_bucket_info;

  if (!psource_bucket_info) {
    auto& bucket_ctl = store->getRados()->ctl.bucket;

    int ret = bucket_ctl->read_bucket_info(source_bucket, &source_bucket_info, null_yield, dpp);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "ERROR: failed to get bucket instance info: bucket=" << source_bucket << ": " << cpp_strerror(-ret) << dendl;
      return ret;
    }

    psource_bucket_info = &source_bucket_info;
  }

  RGWDataSyncEnv env;
  RGWSyncModuleInstanceRef module; // null sync module
  env.init(dpp, store->ctx(), store, store->svc(), store->svc()->rados->get_async_processor(),
           nullptr, nullptr, nullptr, module, nullptr);

  RGWDataSyncCtx sc;
  sc.init(&env, nullptr, *pipe.source.zone);

  RGWCoroutinesManager crs(store->ctx(), store->getRados()->get_cr_registry());
  return crs.run(dpp, new RGWCollectBucketSyncStatusCR(store, &sc,
                                                       *psource_bucket_info,
                                                       dest_bucket_info,
                                                       status));
}

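// A minimal calling sketch for rgw_bucket_sync_status(), assuming the
// caller already holds a RadosStore and a fully populated
// rgw_sync_bucket_pipe; the variable names are illustrative only:
//
//   std::vector<rgw_bucket_shard_sync_info> status;
//   int r = rgw_bucket_sync_status(dpp, store, pipe, dest_bucket_info,
//                                  nullptr /* look up source info */,
//                                  &status);
//   if (r < 0) {
//     return r;
//   }
//   // status[i] now describes the sync state of source shard i

// Test instance generators below feed the encoding round-trip tests
// (ceph-dencoder).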
void rgw_data_sync_info::generate_test_instances(list<rgw_data_sync_info*>& o)
{
  auto info = new rgw_data_sync_info;
  info->state = rgw_data_sync_info::StateBuildingFullSyncMaps;
  info->num_shards = 8;
  o.push_back(info);
  o.push_back(new rgw_data_sync_info);
}

void rgw_data_sync_marker::generate_test_instances(list<rgw_data_sync_marker*>& o)
{
  auto marker = new rgw_data_sync_marker;
  marker->state = rgw_data_sync_marker::IncrementalSync;
  marker->marker = "01234";
  marker->pos = 5;
  o.push_back(marker);
  o.push_back(new rgw_data_sync_marker);
}

void rgw_data_sync_status::generate_test_instances(list<rgw_data_sync_status*>& o)
{
  o.push_back(new rgw_data_sync_status);
}

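// JSON encode/decode adapters for the bucket shard sync status types;
// these provide the JSON form of the status objects for admin output
// and round-tripping.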
void rgw_bucket_shard_full_sync_marker::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("count", count, obj);
}

void rgw_bucket_shard_full_sync_marker::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("count", count, f);
}

void rgw_bucket_shard_inc_sync_marker::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("position", position, obj);
  JSONDecoder::decode_json("timestamp", timestamp, obj);
}

void rgw_bucket_shard_inc_sync_marker::dump(Formatter *f) const
{
  encode_json("position", position, f);
  encode_json("timestamp", timestamp, f);
}

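// Note: any unrecognized status string (including "init") decodes to
// StateInit, so decoding covers every state that dump() emits and falls
// back to a safe default for anything else.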
void rgw_bucket_shard_sync_info::decode_json(JSONObj *obj)
{
  std::string s;
  JSONDecoder::decode_json("status", s, obj);
  if (s == "full-sync") {
    state = StateFullSync;
  } else if (s == "incremental-sync") {
    state = StateIncrementalSync;
  } else if (s == "stopped") {
    state = StateStopped;
  } else {
    state = StateInit;
  }
  JSONDecoder::decode_json("full_marker", full_marker, obj);
  JSONDecoder::decode_json("inc_marker", inc_marker, obj);
}

void rgw_bucket_shard_sync_info::dump(Formatter *f) const
{
  const char *s{nullptr};
  switch ((SyncState)state) {
  case StateInit:
    s = "init";
    break;
  case StateFullSync:
    s = "full-sync";
    break;
  case StateIncrementalSync:
    s = "incremental-sync";
    break;
  case StateStopped:
    s = "stopped";
    break;
  default:
    s = "unknown";
    break;
  }
  encode_json("status", s, f);
  encode_json("full_marker", full_marker, f);
  encode_json("inc_marker", inc_marker, f);
}

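// Example of the JSON shape produced by rgw_bucket_shard_sync_info::dump()
// (field values illustrative; the nested shapes follow the marker dump()
// functions above):
//
//   {
//     "status": "incremental-sync",
//     "full_marker": { "position": "", "count": 0 },
//     "inc_marker": { "position": "...", "timestamp": "..." }
//   }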