// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_module.h"
#include "rgw_sync_log_trim.h"

#include "cls/lock/cls_lock_client.h"

#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"

#include "include/random.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
static string object_status_oid_prefix = "bucket.sync-status";


void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}
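
// Illustrative shape of the per-shard datalog listing decoded above (field
// names follow the decode_json() calls; the values are made up):
//
//   {
//     "marker": "1_1554137543.968632_7.1",
//     "truncated": false,
//     "entries": [ { "key": "testbucket:<instance-id>:4", "timestamp": "..." } ]
//   }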

class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

 public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store->svc.sysobj,
               rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}
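
// Note: RGWShardCollectCR keeps calling spawn_next() until it returns false,
// holding at most MAX_CONCURRENT_SHARDS (16) reads in flight, so the per-shard
// status objects are fetched with bounded parallelism rather than all at once.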

class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;

 public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
  auto& shard_keys = omapkeys[shard_id];
  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, error_oid),
                                  marker, max_entries, shard_keys), false);

  ++shard_id;
  return true;
}

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                          rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "info", NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};
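
// Same wire format as rgw_datalog_shard_data above, but each entry decodes
// into rgw_data_change_log_entry rather than rgw_datalog_entry: the request
// below passes "extra-info=true", so entries arrive with their log metadata
// (log id, log timestamp) alongside the change itself.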

class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op = nullptr;

  int shard_id;
  const std::string& marker;
  string *pnext_marker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;
  std::optional<PerfGuard> timer;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id,
                              const std::string& marker, string *pnext_marker,
                              list<rgw_data_change_log_entry> *_entries,
                              bool *_truncated)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
      entries(_entries), truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "marker", marker.c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        if (sync_env->counters) {
          timer.emplace(sync_env->counters, sync_counters::l_poll);
        }
        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          if (sync_env->counters) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        timer.reset();
        int ret = http_op->wait(&response);
        if (ret < 0) {
          if (sync_env->counters && ret != -ENOENT) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pnext_marker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};
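
// For reference, the listing request assembled above amounts to (shard id and
// marker values illustrative):
//
//   GET /admin/log/?type=data&id=3&marker=1_1554137543.968632_7.1&extra-info=true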

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;

  RGWSyncTraceNodeRef tn;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 RGWSyncTraceNodeRef& _tn_parent,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->svc.zone->get_zone_params().log_pool),
      num_shards(num_shards), status(status),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }

      tn->log(10, "took lease");

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->svc.zone->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!"));
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store->svc.sysobj,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};

int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}
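
// Note: the remote reports its datalog shard count under the JSON key
// "num_objects" (see rgw_datalog_info::decode_json() at the top of this
// file), which is why it lands in log_info->num_shards despite the name.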

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
                           RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module,
                           PerfCounters* counters)
{
  sync_env.init(dpp, store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
                _sync_tracer, _source_zone, _sync_module, counters);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
  omapkeys.resize(num_shards);
  uint64_t max_entries{1};
  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
  http_manager.stop();

  if (ret == 0) {
    for (int i = 0; i < num_shards; i++) {
      if (omapkeys[i]->entries.size() != 0) {
        recovering_shards.insert(i);
      }
    }
  }

  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  auto instance_id = ceph::util::generate_random_number<uint64_t>();
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, tn, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}
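
// With the prefix defined at the top of this file, the generated oid has the
// form "data.full-sync.index.<source-zone-id>.<shard-id>", e.g.
// "data.full-sync.index.<zone-id>.17" for shard 17.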

struct read_metadata_list {
  string marker;
  bool truncated;
  list<string> keys;
  int count;

  read_metadata_list() : truncated(false), count(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("keys", keys, obj);
    JSONDecoder::decode_json("count", count, obj);
  }
};
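
// Decodes one page of the remote metadata listing fetched below; an
// illustrative payload (values made up):
//
//   { "marker": "testbucket:<instance-id>", "truncated": true,
//     "keys": [ "testbucket:<instance-id>" ], "count": 1 }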

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;
  bool truncated;
  read_metadata_list result;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->svc.zone->get_zone_params().log_pool,
                                                  oid_prefix);
      yield; // yield so OmapAppendCRs can start

      do {
        yield {
          string entrypoint = string("/admin/metadata/bucket.instance");

          rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
                                         {"marker", result.marker.c_str()},
                                         {NULL, NULL}};

          call(new RGWReadRESTResourceCR<read_metadata_list>(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                             entrypoint, pairs, &result));
        }
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
          return set_cr_error(retcode);
        }

        for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
          ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
          key = *iter;

          yield {
            rgw_http_param_pair pairs[] = {{"key", key.c_str()},
                                           {NULL, NULL}};

            call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
          }

          num_shards = meta_info.data.get_bucket_info().num_shards;
          if (num_shards > 0) {
            for (i = 0; i < num_shards; i++) {
              char buf[16];
              snprintf(buf, sizeof(buf), ":%d", i);
              s = key + buf;
              yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
            }
          } else {
            yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
          }
        }
        truncated = result.truncated;
      } while (truncated);

      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                                rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker),
                true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }

      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

  RGWSyncTraceNodeRef tn;

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      tn(_tn) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;
    sync_marker.timestamp = timestamp;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                           rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * create index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  explicit bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_str_noinstance {
  const rgw_bucket& b;
  explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}
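
// Usage example (values illustrative): for a tenanted bucket shard,
//   ldout(cct, 20) << bucket_shard_str{bs} << dendl;
// prints "tenant1/mybucket:<bucket-instance-id>:4" without building any
// temporary strings.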

class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  RGWSyncTraceNodeRef tn;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
                                         SSTR(bucket_shard_str{bs}))) {
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" << _raw_key << " entry=" << entry_marker;
    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
          call(new RGWRunBucketSyncCoroutine(sync_env, bs, tn));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << raw_key));
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
          }
        }
        if (error_repo && !error_repo->append(raw_key)) {
          tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
                          << error_repo->get_obj() << " retcode=" << retcode));
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;


  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;


  string error_oid;
  RGWOmapAppend *error_repo;
  std::set<std::string> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::coarse_real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
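// Retry pacing for the error repo: starting from the 60s default,
// incremental_sync() doubles retry_backoff_secs on every pass that finds the
// repo empty (120s, 240s, 480s, capped at 600s) and drops back to the default
// as soon as retry entries reappear.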
  uint32_t retry_backoff_secs;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, const rgw_data_sync_marker& _marker,
                     RGWSyncTraceNodeRef& _tn,
                     bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("full sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("incremental sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      tn->log(10, "start full sync");
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          tn->log(5, "failed to take lease");
          set_status("lease lock failed, early abort");
          drain_all();
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
      total_entries = sync_marker.pos;
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             sync_marker.marker, max_entries, omapkeys));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        entries = std::move(omapkeys->entries);
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          tn->log(20, SSTR("full sync: " << *iter));
          total_entries++;
          if (!marker_tracker->start(*iter, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false, tn), false);
          }
          sync_marker.marker = *iter;

          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                tn->log(10, "a sync operation returned error");
              }
            }
          }
        }
      } while (omapkeys->more);
      omapkeys.reset();

      drain_all_but_stack(lease_stack.get());

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                             rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
        lease_cr->go_down();
        drain_all();
        return set_cr_error(retcode);
      }
      // keep lease and transition to incremental_sync()
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      tn->log(10, "start incremental sync");
      if (lease_cr) {
        tn->log(10, "lease already held from full sync");
      } else {
        yield init_lease_cr();
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            tn->log(5, "failed to take lease");
            set_status("lease lock failed, early abort");
            drain_all();
            return set_cr_error(lease_cr->get_ret_status());
          }
          set_sleeping(true);
          yield;
        }
        set_status("lease acquired");
        tn->log(10, "took lease");
      }
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        if (current_modified.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            tn->log(20, SSTR("received async update notification: " << *modified_iter));
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false, tn), false);
          }
        }

        if (error_retry_time <= ceph::coarse_real_clock::now()) {
          /* process bucket shards that previously failed */
          omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                               error_marker, max_error_entries, omapkeys));
          error_entries = std::move(omapkeys->entries);
          tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
          iter = error_entries.begin();
          for (; iter != error_entries.end(); ++iter) {
            error_marker = *iter;
            tn->log(20, SSTR("handle error entry: " << error_marker));
            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
          }
          if (!omapkeys->more) {
            if (error_marker.empty() && error_entries.empty()) {
              /* the retry repo is empty, we back off a bit before calling it again */
              retry_backoff_secs *= 2;
              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
              }
            } else {
              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
            }
            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
            error_marker.clear();
          }
        }
        omapkeys.reset();

#define INCREMENTAL_MAX_ENTRIES 100
        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
        spawned_keys.clear();
        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
                                                   &next_marker, &log_entries, &truncated));
        if (retcode < 0 && retcode != -ENOENT) {
          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }

        if (log_entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }

        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
          if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
            tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
            continue;
          }
          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
          } else {
            /*
             * don't spawn the same key more than once. We can do that as long as we don't yield
             */
            if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
              spawned_keys.insert(log_iter->entry.key);
              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
              if (retcode < 0) {
                stop_spawned_services();
                drain_all();
                return set_cr_error(retcode);
              }
            }
          }
        }
        while ((int)num_spawned() > spawn_window) {
          set_status() << "num_spawned() > spawn_window";
          yield wait_for_child();
          int ret;
          while (collect(&ret, lease_stack.get())) {
            if (ret < 0) {
              tn->log(10, "a sync operation returned error");
              /* we have reported this error */
            }
            /* not waiting for child here */
          }
        }

        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
                         << " next_marker=" << next_marker << " truncated=" << truncated));
        if (!next_marker.empty()) {
          sync_marker.marker = next_marker;
        } else if (!log_entries.empty()) {
          sync_marker.marker = log_entries.back().log_id;
        }
        if (!truncated) {
          // we reached the end, wait a while before checking for more
          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
          yield wait(get_idle_interval());
        }
      } while (true);
    }
    return 0;
  }

  utime_t get_idle_interval() const {
#define INCREMENTAL_INTERVAL 20
    ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
      auto now = ceph::coarse_real_clock::now();
      if (error_retry_time > now) {
        auto d = error_retry_time - now;
        if (interval > d) {
          interval = d;
        }
      }
    }
    // convert timespan -> time_point -> utime_t
    return utime_t(ceph::coarse_real_clock::zero() + interval);
  }
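
  // Worked example: with INCREMENTAL_INTERVAL at 20, an idle shard polls the
  // remote log every 20 seconds; if an error-repo retry is due sooner (say in
  // 5s), the interval above shrinks to 5s so the retry is not delayed.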

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};

class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, const rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker,
                            RGWSyncTraceNodeRef& _tn_parent)
    : RGWBackoffControlCR(_sync_env->cct, false),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, tn, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(cr_lock());

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};
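
// Hooks for RGWBackoffControlCR (defined elsewhere): alloc_cr() builds a fresh
// RGWDataSyncShardCR from the cached sync_marker, and alloc_finisher_cr()
// re-reads that marker from its RADOS status object, so a re-spawned shard
// coroutine can resume from the last persisted position instead of scratch.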
1558
1559class RGWDataSyncCR : public RGWCoroutine {
1560 RGWDataSyncEnv *sync_env;
1561 uint32_t num_shards;
1562
1563 rgw_data_sync_status sync_status;
1564
1565 RGWDataSyncShardMarkerTrack *marker_tracker;
1566
1567 Mutex shard_crs_lock;
1568 map<int, RGWDataSyncShardControlCR *> shard_crs;
1569
1570 bool *reset_backoff;
1571
11fdf7f2 1572 RGWSyncTraceNodeRef tn;
31f18b77
FG
1573
1574 RGWDataSyncModule *data_sync_module{nullptr};
7c673cae 1575public:
11fdf7f2 1576 RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
7c673cae
FG
1577 sync_env(_sync_env),
1578 num_shards(_num_shards),
1579 marker_tracker(NULL),
1580 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
11fdf7f2 1581 reset_backoff(_reset_backoff), tn(_tn) {
31f18b77 1582
7c673cae
FG
1583 }
1584
1585 ~RGWDataSyncCR() override {
1586 for (auto iter : shard_crs) {
1587 iter.second->put();
1588 }
1589 }
1590
1591 int operate() override {
1592 reenter(this) {
1593
      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode < 0 && retcode != -ENOENT) {
        tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        tn->log(20, SSTR("init"));
        sync_status.sync_info.num_shards = num_shards;
        uint64_t instance_id;
        instance_id = ceph::util::generate_random_number<uint64_t>();
        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, tn, &sync_status));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sync_env, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        tn->log(10, SSTR("building full sync maps"));
        /* call sync module init here */
        sync_status.sync_info.num_shards = num_shards;
        yield call(data_sync_module->init_sync(sync_env));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        /* state: building full sync maps */
        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield call(data_sync_module->start_sync(sync_env));

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          tn->log(10, SSTR("spawning sync for " << num_shards << " shards"));
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc.zone->get_zone_params().log_pool,
                                                                          iter->first, iter->second, tn);
            cr->get();
            shard_crs_lock.Lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.Unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store->svc.sysobj,
                                                         rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex::Locker l(shard_crs_lock);
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};

class RGWDefaultDataSyncModule : public RGWDataSyncModule {
public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  bool supports_user_writes() override {
    return true;
  }
};

int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
                                 std::nullopt,
                                 key, std::nullopt, versioned_epoch,
                                 true, zones_trace, sync_env->counters);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}

class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
public:
  RGWArchiveDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
  RGWArchiveDataSyncModule data_handler;
public:
  RGWArchiveSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  RGWMetadataHandler *alloc_bucket_meta_handler() override {
    return RGWArchiveBucketMetaHandlerAllocator::alloc();
  }
  RGWMetadataHandler *alloc_bucket_instance_meta_handler() override {
    return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
  }
};

int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWArchiveSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
  if (!bucket_info.versioned() ||
      (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
    ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
    bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
    int op_ret = sync_env->store->put_bucket_instance_info(bucket_info, false, real_time(), NULL);
    if (op_ret < 0) {
      ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
      return NULL;
    }
  }

  std::optional<rgw_obj_key> dest_key;

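  /* the archive zone keeps every upload as a distinct version: if the source
   * entry carries no versioned epoch, force one, and for a plain key generate
   * a random instance name so the copy lands as its own version */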
  if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
    versioned_epoch = 0;
    dest_key = key;
    if (key.instance.empty()) {
      sync_env->store->gen_rand_obj_instance_name(&(*dest_key));
    }
  }

  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                                 bucket_info, std::nullopt,
                                 key, dest_key, versioned_epoch,
                                 true, zones_trace, nullptr);
}

RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
  return NULL;
}

RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
                          << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}

class RGWDataSyncControlCR : public RGWBackoffControlCR
{
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  RGWSyncTraceNodeRef tn;

  static constexpr bool exit_on_error = false; // retry on all errors
public:
  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards,
                       RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
                                                          sync_env(_sync_env), num_shards(_num_shards) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncCR(sync_env, num_shards, tn, backoff_ptr());
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex& m = cr_lock();

    m.Lock();
    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
    if (!cr) {
      m.Unlock();
      return;
    }

    cr->get();
    m.Unlock();

    tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
    cr->wakeup(shard_id, keys);

    cr->put();
  }
};

void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
  RWLock::RLocker rl(lock);
  if (!data_sync_cr) {
    return;
  }
  data_sync_cr->wakeup(shard_id, keys);
}

int RGWRemoteDataLog::run_sync(int num_shards)
{
  lock.get_write();
  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn);
  data_sync_cr->get(); // run() will drop a ref, so take another
  lock.unlock();

  int r = run(data_sync_cr);

  lock.get_write();
  data_sync_cr->put();
  data_sync_cr = NULL;
  lock.unlock();

  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
    return r;
  }
  return 0;
}

int RGWDataSyncStatusManager::init()
{
  RGWZone *zone_def;

  if (!store->svc.zone->find_zone_by_id(source_zone, &zone_def)) {
    ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
    return -EIO;
  }

  if (!store->svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
    return -ENOTSUP;
  }

  const RGWZoneParams& zone_params = store->svc.zone->get_zone_params();

  if (sync_module == nullptr) {
    sync_module = store->get_sync_module();
  }

  conn = store->svc.zone->get_zone_conn_by_id(source_zone);
  if (!conn) {
    ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  int r = source_log.init(source_zone, conn, error_logger, store->get_sync_tracer(),
                          sync_module, counters);
  if (r < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
    finalize();
    return r;
  }

  rgw_datalog_info datalog_info;
  r = source_log.read_log_info(&datalog_info);
  if (r < 0) {
    ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
    finalize();
    return r;
  }

  num_shards = datalog_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
  }

  return 0;
}

void RGWDataSyncStatusManager::finalize()
{
  delete error_logger;
  error_logger = nullptr;
}

unsigned RGWDataSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
{
  auto zone = std::string_view{source_zone};
  return out << "data sync zone:" << zone.substr(0, 8) << ' ';
}

string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
{
  char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());

  return string(buf);
}

string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);

  return string(buf);
}

int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                             const rgw_bucket& bucket, int shard_id,
                             RGWSyncErrorLogger *_error_logger,
                             RGWSyncTraceManager *_sync_tracer,
                             RGWSyncModuleInstanceRef& _sync_module)
{
  conn = _conn;
  source_zone = _source_zone;
  bs.bucket = bucket;
  bs.shard_id = shard_id;

  sync_env.init(dpp, store->ctx(), store, conn, async_rados, http_manager,
                _error_logger, _sync_tracer, source_zone, _sync_module, nullptr);

  return 0;
}

class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const string instance_key;

  rgw_bucket_index_marker_info *info;

public:
  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
                                    const rgw_bucket_shard& bs,
                                    rgw_bucket_index_marker_info *_info)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      instance_key(bs.get_key()), info(_info) {}

  int operate() override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
                                        { "bucket-instance", instance_key.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";
        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_bucket_shard bs;
  const string sync_status_oid;

  rgw_bucket_shard_sync_info& status;

  rgw_bucket_index_marker_info info;
public:
  RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                        const rgw_bucket_shard& bs,
                                        rgw_bucket_shard_sync_info& _status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      status(_status)
  {}

  int operate() override {
    reenter(this) {
      /* fetch current position in logs */
      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
      if (retcode < 0 && retcode != -ENOENT) {
        ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
        return set_cr_error(retcode);
      }
      yield {
        auto store = sync_env->store;
        rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, sync_status_oid);

        if (info.syncstopped) {
          call(new RGWRadosRemoveCR(store, obj));
        } else {
          status.state = rgw_bucket_shard_sync_info::StateFullSync;
          status.inc_marker.position = info.max_marker;
          map<string, bufferlist> attrs;
          status.encode_all_attrs(attrs);
          call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj, obj, attrs));
        }
      }
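      // when sync is stopped the status object was just removed; surface
      // -ENOENT so the caller treats this shard as not syncing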
      if (info.syncstopped) {
        retcode = -ENOENT;
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
  return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
}

#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."

template <class T>
static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
{
  map<string, bufferlist>::iterator iter = attrs.find(attr_name);
  if (iter == attrs.end()) {
    *val = T();
    return false;
  }

  auto biter = iter->second.cbegin();
  try {
    decode(*val, biter);
  } catch (buffer::error& err) {
    ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
    return false;
  }
  return true;
}

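/* status attributes were once stored under bare names ("state", etc.);
 * try the prefixed name first and fall back to the legacy one so status
 * objects written by older versions remain readable */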
void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
{
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
    decode_attr(cct, attrs, "state", &state);
  }
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &full_marker)) {
    decode_attr(cct, attrs, "full_marker", &full_marker);
  }
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
    decode_attr(cct, attrs, "inc_marker", &inc_marker);
  }
}

void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
{
  encode_state_attr(attrs);
  full_marker.encode_attr(attrs);
  inc_marker.encode_attr(attrs);
}

void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
}

void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
}

void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
}

class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  string oid;
  rgw_bucket_shard_sync_info *status;

  map<string, bufferlist> attrs;
public:
  RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                   const rgw_bucket_shard& bs,
                                   rgw_bucket_shard_sync_info *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      status(_status) {}
  int operate() override;
};

int RGWReadBucketSyncStatusCoroutine::operate()
{
  reenter(this) {
    yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                                             rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, oid),
                                             &attrs, true));
    if (retcode == -ENOENT) {
      *status = rgw_bucket_shard_sync_info();
      return set_cr_done();
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    status->decode_from_attrs(sync_env->cct, attrs);
    return set_cr_done();
  }
  return 0;
}

#define OMAP_READ_MAX_ENTRIES 10
class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRados *store;

  const int shard_id;
  int max_entries;

  set<string>& recovering_buckets;
  string marker;
  string error_oid;

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  set<string> error_entries;
  int max_omap_entries;
  int count;

public:
  RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
                                         set<string>& _recovering_buckets, const int _max_entries)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
      recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
  {
    error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
  }

  int operate() override;
};

int RGWReadRecoveringBucketShardsCoroutine::operate()
{
  reenter(this) {
    //read recovering bucket shards
    count = 0;
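    // page through the shard's ".retry" omap object, max_omap_entries keys
    // at a time, until max_entries are collected or the listing is exhausted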
    do {
      omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, error_oid),
                                           marker, max_omap_entries, omapkeys));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
                                << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      error_entries = std::move(omapkeys->entries);
      if (error_entries.empty()) {
        break;
      }

      count += error_entries.size();
      marker = *error_entries.rbegin();
      recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
                                std::make_move_iterator(error_entries.end()));
    } while (omapkeys->more && count < max_entries);

    return set_cr_done();
  }

  return 0;
}

class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRados *store;

  const int shard_id;
  int max_entries;

  set<string>& pending_buckets;
  string marker;
  string status_oid;

  rgw_data_sync_marker* sync_marker;
  int count;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  bool truncated;

public:
  RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
                                      set<string>& _pending_buckets,
                                      rgw_data_sync_marker* _sync_marker, const int _max_entries)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
      pending_buckets(_pending_buckets), sync_marker(_sync_marker)
  {
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
  }

  int operate() override;
};

int RGWReadPendingBucketShardsCoroutine::operate()
{
  reenter(this) {
    //read sync status marker
    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
    yield call(new CR(sync_env->async_rados, store->svc.sysobj,
                      rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                      sync_marker));
    if (retcode < 0) {
      ldout(sync_env->cct, 0) << "failed to read sync status marker with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    //read pending bucket shards
    marker = sync_marker->marker;
    count = 0;
    do {
      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker,
                                                 &next_marker, &log_entries, &truncated));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "failed to read remote data log info with "
                                << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      if (log_entries.empty()) {
        break;
      }

      count += log_entries.size();
      for (const auto& entry : log_entries) {
        pending_buckets.insert(entry.entry.key);
      }
      marker = next_marker; // advance past the page we just read
    } while (truncated && count < max_entries);

    return set_cr_done();
  }

  return 0;
}

int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  list<RGWCoroutinesStack *> stacks;
  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
  stacks.push_back(recovering_stack);
  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
  stacks.push_back(pending_stack);
  ret = crs.run(stacks);
  http_manager.stop();
  return ret;
}

RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
  return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
}

RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
    delete iter->second;
  }
  delete error_logger;
}


void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("ID", id, obj);
  JSONDecoder::decode_json("DisplayName", display_name, obj);
}

struct bucket_list_entry {
  bool delete_marker;
  rgw_obj_key key;
  bool is_latest;
  real_time mtime;
  string etag;
  uint64_t size;
  string storage_class;
  rgw_bucket_entry_owner owner;
  uint64_t versioned_epoch;
  string rgw_tag;

  bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
    JSONDecoder::decode_json("Key", key.name, obj);
    JSONDecoder::decode_json("VersionId", key.instance, obj);
    JSONDecoder::decode_json("IsLatest", is_latest, obj);
    string mtime_str;
    JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);

    struct tm t;
    uint32_t nsec;
    if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
      ceph_timespec ts;
      ts.tv_sec = (uint64_t)internal_timegm(&t);
      ts.tv_nsec = nsec;
      mtime = real_clock::from_ceph_timespec(ts);
    }
    JSONDecoder::decode_json("ETag", etag, obj);
    JSONDecoder::decode_json("Size", size, obj);
    JSONDecoder::decode_json("StorageClass", storage_class, obj);
    JSONDecoder::decode_json("Owner", owner, obj);
    JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
    JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
    if (key.instance == "null" && !versioned_epoch) {
      key.instance.clear();
    }
  }

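  // map a listed entry back to the bilog op that would have produced it:
  // delete markers and versioned instances become OLH link ops, plain
  // objects a simple ADD; full sync uses this to pick the sync action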
  RGWModifyOp get_modify_op() const {
    if (delete_marker) {
      return CLS_RGW_OP_LINK_OLH_DM;
    } else if (!key.instance.empty() && key.instance != "null") {
      return CLS_RGW_OP_LINK_OLH;
    } else {
      return CLS_RGW_OP_ADD;
    }
  }
};

struct bucket_list_result {
  string name;
  string prefix;
  string key_marker;
  string version_id_marker;
  int max_keys;
  bool is_truncated;
  list<bucket_list_entry> entries;

  bucket_list_result() : max_keys(0), is_truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("Name", name, obj);
    JSONDecoder::decode_json("Prefix", prefix, obj);
    JSONDecoder::decode_json("KeyMarker", key_marker, obj);
    JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
    JSONDecoder::decode_json("MaxKeys", max_keys, obj);
    JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
    JSONDecoder::decode_json("Entries", entries, obj);
  }
};

class RGWListBucketShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  const string instance_key;
  rgw_obj_key marker_position;

  bucket_list_result *result;

public:
  RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                       rgw_obj_key& _marker_position, bucket_list_result *_result)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      instance_key(bs.get_key()), marker_position(_marker_position),
      result(_result) {}

  int operate() override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
                                        { "versions" , NULL },
                                        { "format" , "json" },
                                        { "objs-container" , "true" },
                                        { "key-marker" , marker_position.name.c_str() },
                                        { "version-id-marker" , marker_position.instance.c_str() },
                                        { NULL, NULL } };
        // don't include tenant in the url, it's already part of instance_key
        string p = string("/") + bs.bucket.name;
        call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

class RGWListBucketIndexLogCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const string instance_key;
  string marker;

  list<rgw_bi_log_entry> *result;
  std::optional<PerfGuard> timer;

public:
  RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                          string& _marker, list<rgw_bi_log_entry> *_result)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      instance_key(bs.get_key()), marker(_marker), result(_result) {}

  int operate() override {
    reenter(this) {
      if (sync_env->counters) {
        timer.emplace(sync_env->counters, sync_counters::l_poll);
      }
      yield {
        rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
                                        { "format" , "json" },
                                        { "marker" , marker.c_str() },
                                        { "type", "bucket-index" },
                                        { NULL, NULL } };

        call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
      }
      timer.reset();
      if (retcode < 0) {
        if (sync_env->counters) {
          sync_env->counters->inc(sync_counters::l_poll_err);
        }
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
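// window size for batching marker updates in the trackers below: the
// high-water marker is written to the shard's status object once per
// window of completed entries rather than per entry (see
// RGWSyncShardMarkerTrack for the exact semantics)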

class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_bucket_shard_full_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;

public:
  RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                                    const string& _marker_oid,
                                    const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
                                                                                        sync_env(_sync_env),
                                                                                        marker_oid(_marker_oid),
                                                                                        sync_marker(_marker) {}

  void set_tn(RGWSyncTraceNodeRef& _tn) {
    tn = _tn;
  }

  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.position = new_marker;
    sync_marker.count = index_pos;

    map<string, bufferlist> attrs;
    sync_marker.encode_attr(attrs);

    RGWRados *store = sync_env->store;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                          attrs);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_bucket_shard_inc_sync_marker sync_marker;

  map<rgw_obj_key, string> key_to_marker;

  struct operation {
    rgw_obj_key key;
    bool is_olh;
  };
  map<string, operation> marker_to_op;
  std::set<std::string> pending_olh; // object names with pending olh operations

  RGWSyncTraceNodeRef tn;

  void handle_finish(const string& marker) override {
    auto iter = marker_to_op.find(marker);
    if (iter == marker_to_op.end()) {
      return;
    }
    auto& op = iter->second;
    key_to_marker.erase(op.key);
    reset_need_retry(op.key);
    if (op.is_olh) {
      pending_olh.erase(op.key.name);
    }
    marker_to_op.erase(iter);
  }

public:
  RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                                   const string& _marker_oid,
                                   const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
                                                                                      sync_env(_sync_env),
                                                                                      marker_oid(_marker_oid),
                                                                                      sync_marker(_marker) {}

  void set_tn(RGWSyncTraceNodeRef& _tn) {
    tn = _tn;
  }

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.position = new_marker;

    map<string, bufferlist> attrs;
    sync_marker.encode_attr(attrs);

    RGWRados *store = sync_env->store;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
                                          store->svc.sysobj,
                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                          attrs);
  }

  /*
   * Create an index from key -> <op, marker>, and from marker -> key.
   * This lets us ensure that we only have one entry for any key that is
   * in use. This is needed when doing incremental sync of data, as we
   * don't want to run multiple concurrent sync operations for the same
   * bucket shard. Also, we should make sure that we don't run concurrent
   * operations on the same key with different ops.
   */
  bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
    auto result = key_to_marker.emplace(key, marker);
    if (!result.second) { // exists
      set_need_retry(key);
      return false;
    }
    marker_to_op[marker] = operation{key, is_olh};
    if (is_olh) {
      // prevent other olh ops from starting on this object name
      pending_olh.insert(key.name);
    }
    return true;
  }

  bool can_do_op(const rgw_obj_key& key, bool is_olh) {
    // serialize olh ops on the same object name
    if (is_olh && pending_olh.count(key.name)) {
      tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
      return false;
    }
    return (key_to_marker.find(key) == key_to_marker.end());
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

template <class T, class K>
class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWBucketInfo *bucket_info;
  const rgw_bucket_shard& bs;

  rgw_obj_key key;
  bool versioned;
  std::optional<uint64_t> versioned_epoch;
  rgw_bucket_entry_owner owner;
  real_time timestamp;
  RGWModifyOp op;
  RGWPendingState op_state;

  T entry_marker;
  RGWSyncShardMarkerTrack<T, K> *marker_tracker;

  int sync_status;

  stringstream error_ss;

  bool error_injection;

  RGWDataSyncModule *data_sync_module;

  rgw_zone_set zones_trace;

  RGWSyncTraceNodeRef tn;
public:
  RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                             RGWBucketInfo *_bucket_info,
                             const rgw_bucket_shard& bs,
                             const rgw_obj_key& _key, bool _versioned,
                             std::optional<uint64_t> _versioned_epoch,
                             real_time& _timestamp,
                             const rgw_bucket_entry_owner& _owner,
                             RGWModifyOp _op, RGWPendingState _op_state,
                             const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
                             RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
                                                                sync_env(_sync_env),
                                                                bucket_info(_bucket_info), bs(bs),
                                                                key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
                                                                owner(_owner),
                                                                timestamp(_timestamp), op(_op),
                                                                op_state(_op_state),
                                                                entry_marker(_entry_marker),
                                                                marker_tracker(_marker_tracker),
                                                                sync_status(0) {
    stringstream ss;
    ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
    set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
    set_status("init");

    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));

    tn->log(20, SSTR("bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
    error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);

    data_sync_module = sync_env->sync_module->get_data_handler();

    zones_trace = _zones_trace;
    zones_trace.insert(sync_env->store->svc.zone->get_zone().id);
  }

  int operate() override {
    reenter(this) {
      /* skip entries that are not complete */
      if (op_state != CLS_RGW_STATE_COMPLETE) {
        goto done;
      }
      tn->set_flag(RGW_SNS_FLAG_ACTIVE);
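      /* retry as long as the marker tracker flags this key: a conflicting
       * operation on the same key may have set need_retry while we were
       * running, in which case we re-apply on top of its result */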
      do {
        yield {
          marker_tracker->reset_need_retry(key);
          if (key.name.empty()) {
            /* shouldn't happen */
            set_status("skipping empty entry");
            tn->log(0, "entry with empty obj name, skipping");
            goto done;
          }
          if (error_injection &&
              rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
            tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
            retcode = -EIO;
          } else if (op == CLS_RGW_OP_ADD ||
                     op == CLS_RGW_OP_LINK_OLH) {
            set_status("syncing obj");
            tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
          } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
            set_status("removing obj");
            if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
              versioned = true;
            }
            tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
            // our copy of the object is more recent, continue as if it succeeded
            if (retcode == -ERR_PRECONDITION_FAILED) {
              retcode = 0;
            }
          } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
            set_status("creating delete marker");
            tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
          }
          tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key));
        }
      } while (marker_tracker->need_retry(key));
      {
        tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
        if (retcode >= 0) {
          tn->log(10, "success");
        } else {
          tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
        }
      }

      if (retcode < 0 && retcode != -ENOENT) {
        set_status() << "failed to sync obj; retcode=" << retcode;
        tn->log(0, SSTR("ERROR: failed to sync object: "
                        << bucket_shard_str{bs} << "/" << key.name));
        error_ss << bucket_shard_str{bs} << "/" << key.name;
        sync_status = retcode;
      }
      if (!error_ss.str().empty()) {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
      }
done:
      if (sync_status == 0) {
        /* update marker */
        set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
        yield call(marker_tracker->finish(entry_marker));
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SYNC_SPAWN_WINDOW 20

class RGWBucketShardFullSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  RGWBucketInfo *bucket_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  bucket_list_result list_result;
  list<bucket_list_entry>::iterator entries_iter;
  rgw_bucket_shard_sync_info& sync_info;
  RGWBucketFullSyncShardMarkerTrack marker_tracker;
  rgw_obj_key list_marker;
  bucket_list_entry *entry{nullptr};

  int total_entries{0};

  int sync_status{0};

  const string& status_oid;

  rgw_zone_set zones_trace;

  RGWSyncTraceNodeRef tn;
public:
  RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                           RGWBucketInfo *_bucket_info,
                           const std::string& status_oid,
                           RGWContinuousLeaseCR *lease_cr,
                           rgw_bucket_shard_sync_info& sync_info,
                           RGWSyncTraceNodeRef tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
      marker_tracker(sync_env, status_oid, sync_info.full_marker),
      status_oid(status_oid),
      tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
                                         SSTR(bucket_shard_str{bs}))) {
    zones_trace.insert(sync_env->source_zone);
    marker_tracker.set_tn(tn);
  }

  int operate() override;
};

int RGWBucketShardFullSyncCR::operate()
{
  int ret;
  reenter(this) {
    list_marker = sync_info.full_marker.position;

    total_entries = sync_info.full_marker.count;
    do {
      if (!lease_cr->is_locked()) {
        drain_all();
        return set_cr_error(-ECANCELED);
      }
      set_status("listing remote bucket");
      tn->log(20, "listing bucket for full sync");
      yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
                                          &list_result));
      if (retcode < 0 && retcode != -ENOENT) {
        set_status("failed bucket listing, going down");
        drain_all();
        return set_cr_error(retcode);
      }
      if (list_result.entries.size() > 0) {
        tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
      }
      entries_iter = list_result.entries.begin();
      for (; entries_iter != list_result.entries.end(); ++entries_iter) {
        if (!lease_cr->is_locked()) {
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        tn->log(20, SSTR("[full sync] syncing object: "
                         << bucket_shard_str{bs} << "/" << entries_iter->key));
        entry = &(*entries_iter);
        total_entries++;
        list_marker = entries_iter->key;
        if (!marker_tracker.start(entry->key, total_entries, real_time())) {
          tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
        } else {
          using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
          yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
                                 false, /* versioned, only matters for object removal */
                                 entry->versioned_epoch, entry->mtime,
                                 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
                                 entry->key, &marker_tracker, zones_trace, tn),
                      false);
        }
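        // throttle: once more than BUCKET_SYNC_SPAWN_WINDOW children are in
        // flight, reap completed ones (recording any error) before spawning more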
        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
          yield wait_for_child();
          bool again = true;
          while (again) {
            again = collect(&ret, nullptr);
            if (ret < 0) {
              tn->log(10, "a sync operation returned error");
              sync_status = ret;
              /* we have reported this error */
            }
          }
        }
      }
    } while (list_result.is_truncated && sync_status == 0);
    set_status("done iterating over all objects");
    /* wait for all operations to complete */
    while (num_spawned()) {
      yield wait_for_child();
      bool again = true;
      while (again) {
        again = collect(&ret, nullptr);
        if (ret < 0) {
          tn->log(10, "a sync operation returned error");
          sync_status = ret;
          /* we have reported this error */
        }
      }
    }
    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
    if (!lease_cr->is_locked()) {
      return set_cr_error(-ECANCELED);
    }
    /* update sync state to incremental */
    if (sync_status == 0) {
      yield {
        sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
        map<string, bufferlist> attrs;
        sync_info.encode_state_attr(attrs);
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                            attrs));
      }
    } else {
      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
    }
    if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
      tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
                      << bucket_shard_str{bs} << " retcode=" << retcode));
      return set_cr_error(retcode);
    }
    if (sync_status < 0) {
      return set_cr_error(sync_status);
    }
    return set_cr_done();
  }
  return 0;
}

static bool has_olh_epoch(RGWModifyOp op) {
  return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
}

class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  RGWBucketInfo *bucket_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  list<rgw_bi_log_entry> list_result;
  list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
  map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
  rgw_bucket_shard_sync_info& sync_info;
  rgw_obj_key key;
  rgw_bi_log_entry *entry{nullptr};
  RGWBucketIncSyncShardMarkerTrack marker_tracker;
  bool updated_status{false};
  const string& status_oid;
  const string& zone_id;

  string cur_id;

  int sync_status{0};
  bool syncstopped{false};

  RGWSyncTraceNodeRef tn;
public:
  RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
                                  const rgw_bucket_shard& bs,
                                  RGWBucketInfo *_bucket_info,
                                  const std::string& status_oid,
                                  RGWContinuousLeaseCR *lease_cr,
                                  rgw_bucket_shard_sync_info& sync_info,
                                  RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
      marker_tracker(sync_env, status_oid, sync_info.inc_marker),
      status_oid(status_oid), zone_id(_sync_env->store->svc.zone->get_zone().id),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
                                         SSTR(bucket_shard_str{bs})))
  {
    set_description() << "bucket shard incremental sync bucket="
                      << bucket_shard_str{bs};
    set_status("init");
    marker_tracker.set_tn(tn);
  }

  int operate() override;
};

int RGWBucketShardIncrementalSyncCR::operate()
{
  int ret;
  reenter(this) {
    do {
      if (!lease_cr->is_locked()) {
        drain_all();
        tn->log(0, "ERROR: lease is not taken, abort");
        return set_cr_error(-ECANCELED);
      }
      tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
      set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
      yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
                                             &list_result));
      if (retcode < 0 && retcode != -ENOENT) {
        /* wait for all operations to complete */
        drain_all();
        return set_cr_error(retcode);
      }
      squash_map.clear();
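      // build squash_map: for each (object, instance) keep only the newest
      // (timestamp, op); older entries for the same key get skipped in the
      // apply loop below, and a SYNCSTOP entry truncates the batch here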
      entries_iter = list_result.begin();
      entries_end = list_result.end();
      for (; entries_iter != entries_end; ++entries_iter) {
        auto e = *entries_iter;
        if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
          ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
          syncstopped = true;
          entries_end = entries_iter; // don't sync past here
          break;
        }
        if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
          continue;
        }
        if (e.op == CLS_RGW_OP_CANCEL) {
          continue;
        }
        if (e.state != CLS_RGW_STATE_COMPLETE) {
          continue;
        }
        if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
          continue;
        }
        auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
        // don't squash over olh entries - we need to apply their olh_epoch
        if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
          continue;
        }
        if (squash_entry.first <= e.timestamp) {
          squash_entry = make_pair<>(e.timestamp, e.op);
        }
      }

c07f9fc5 3049
7c673cae 3050 entries_iter = list_result.begin();
91327a77 3051 for (; entries_iter != entries_end; ++entries_iter) {
7c673cae
FG
3052 if (!lease_cr->is_locked()) {
3053 drain_all();
3054 return set_cr_error(-ECANCELED);
3055 }
3056 entry = &(*entries_iter);
3057 {
3058 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
3059 if (p < 0) {
3060 cur_id = entry->id;
3061 } else {
3062 cur_id = entry->id.substr(p + 1);
3063 }
3064 }
91327a77 3065 sync_info.inc_marker.position = cur_id;
7c673cae 3066
c07f9fc5 3067 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
91327a77 3068 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
c07f9fc5
FG
3069 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3070 continue;
3071 }
3072
7c673cae
FG
3073 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
3074 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
11fdf7f2 3075 tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
7c673cae
FG
3076 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3077 continue;
3078 }
3079
11fdf7f2 3080 tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));
7c673cae
FG
3081
3082 if (!key.ns.empty()) {
3083 set_status() << "skipping entry in namespace: " << entry->object;
11fdf7f2 3084 tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
7c673cae
FG
3085 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3086 continue;
3087 }
3088
3089 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
3090 if (entry->op == CLS_RGW_OP_CANCEL) {
3091 set_status() << "canceled operation, skipping";
11fdf7f2
TL
3092 tn->log(20, SSTR("skipping object: "
3093 << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
7c673cae
FG
3094 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3095 continue;
3096 }
3097 if (entry->state != CLS_RGW_STATE_COMPLETE) {
3098 set_status() << "non-complete operation, skipping";
11fdf7f2
TL
3099 tn->log(20, SSTR("skipping object: "
3100 << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
7c673cae
FG
3101 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3102 continue;
3103 }
31f18b77
FG
3104 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
3105 set_status() << "redundant operation, skipping";
11fdf7f2
TL
3106 tn->log(20, SSTR("skipping object: "
3107 << bucket_shard_str{bs} << "/" << key << ": redundant operation"));
31f18b77
FG
3108 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3109 continue;
3110 }
7c673cae
FG
3111 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
3112 set_status() << "squashed operation, skipping";
11fdf7f2
TL
3113 tn->log(20, SSTR("skipping object: "
3114 << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
91327a77 3115 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
7c673cae
FG
3116 continue;
3117 }
11fdf7f2
TL
3118 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
3119 tn->log(20, SSTR("syncing object: "
3120 << bucket_shard_str{bs} << "/" << key));
7c673cae 3121 updated_status = false;
91327a77 3122 while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
7c673cae
FG
3123 if (!updated_status) {
3124 set_status() << "can't do op, conflicting inflight operation";
3125 updated_status = true;
3126 }
11fdf7f2 3127 tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
7c673cae
FG
3128 yield wait_for_child();
3129 bool again = true;
3130 while (again) {
3131 again = collect(&ret, nullptr);
3132 if (ret < 0) {
11fdf7f2 3133 tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
7c673cae
FG
3134 sync_status = ret;
3135 /* we have reported this error */
3136 }
3137 }
1adf2230
AA
3138 if (sync_status != 0)
3139 break;
3140 }
3141 if (sync_status != 0) {
3142 /* get error, stop */
3143 break;
7c673cae 3144 }
91327a77 3145 if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
7c673cae 3146 set_status() << "can't do op, sync already in progress for object";
11fdf7f2 3147 tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
7c673cae
FG
3148 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3149 continue;
3150 }
3151 // yield {
3152 set_status() << "start object sync";
3153 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
11fdf7f2 3154 tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
7c673cae 3155 } else {
11fdf7f2 3156 std::optional<uint64_t> versioned_epoch;
7c673cae
FG
3157 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
3158 if (entry->ver.pool < 0) {
3159 versioned_epoch = entry->ver.epoch;
3160 }
11fdf7f2 3161 tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
7c673cae
FG
3162 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
3163 spawn(new SyncCR(sync_env, bucket_info, bs, key,
3164 entry->is_versioned(), versioned_epoch,
3165 entry->timestamp, owner, entry->op, entry->state,
11fdf7f2 3166 cur_id, &marker_tracker, entry->zones_trace, tn),
7c673cae
FG
3167 false);
3168 }
3169 // }
3170 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
3171 set_status() << "num_spawned() > spawn_window";
3172 yield wait_for_child();
3173 bool again = true;
3174 while (again) {
3175 again = collect(&ret, nullptr);
3176 if (ret < 0) {
11fdf7f2 3177 tn->log(10, "a sync operation returned error");
7c673cae
FG
3178 sync_status = ret;
3179 /* we have reported this error */
3180 }
3181 /* not waiting for child here */
3182 }
3183 }
3184 }
91327a77 3185 } while (!list_result.empty() && sync_status == 0 && !syncstopped);
c07f9fc5 3186
7c673cae
FG
3187 while (num_spawned()) {
3188 yield wait_for_child();
3189 bool again = true;
3190 while (again) {
3191 again = collect(&ret, nullptr);
3192 if (ret < 0) {
11fdf7f2 3193 tn->log(10, "a sync operation returned error");
7c673cae
FG
3194 sync_status = ret;
3195 /* we have reported this error */
3196 }
3197 /* not waiting for child here */
3198 }
3199 }
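// [editor's note] Both loops above follow the framework's reap idiom:
// collect() pops completed child stacks, reporting each status through ret,
// and returns true while more completed children may remain, so callers spin
// until it returns false. The earlier loop applies it as backpressure (only
// while num_spawned() exceeds BUCKET_SYNC_SPAWN_WINDOW); this final loop
// drains every remaining child before the coroutine reports its result.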
11fdf7f2 3200 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
7c673cae 3201
91327a77
AA
3202 if (syncstopped) {
3203 // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
3204 // still disabled, we'll delete the sync status object. otherwise we'll
3205 // restart full sync to catch any changes that happened while sync was
3206 // disabled
3207 sync_info.state = rgw_bucket_shard_sync_info::StateInit;
3208 return set_cr_done();
3209 }
3210
7c673cae
FG
3211 yield call(marker_tracker.flush());
3212 if (retcode < 0) {
11fdf7f2 3213 tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
7c673cae
FG
3214 return set_cr_error(retcode);
3215 }
3216 if (sync_status < 0) {
11fdf7f2 3217 tn->log(10, SSTR("backing out with sync_status=" << sync_status));
7c673cae
FG
3218 return set_cr_error(sync_status);
3219 }
7c673cae
FG
3220 return set_cr_done();
3221 }
3222 return 0;
3223}
3224
3225int RGWRunBucketSyncCoroutine::operate()
3226{
3227 reenter(this) {
3228 yield {
3229 set_status("acquiring sync lock");
3230 auto store = sync_env->store;
31f18b77 3231 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
11fdf7f2 3232 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
31f18b77
FG
3233 "sync_lock",
3234 cct->_conf->rgw_sync_lease_period,
3235 this));
3236 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
3237 }
3238 while (!lease_cr->is_locked()) {
3239 if (lease_cr->is_done()) {
11fdf7f2 3240 tn->log(5, "failed to take lease");
7c673cae 3241 set_status("lease lock failed, early abort");
a8e16298 3242 drain_all();
7c673cae
FG
3243 return set_cr_error(lease_cr->get_ret_status());
3244 }
3245 set_sleeping(true);
3246 yield;
3247 }
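// [editor's note] Summary of the lease coroutine's lifecycle as used in this
// function (the names are the actual calls appearing in this file; the flow
// annotation is editorial):
//
//   spawn(lease_cr.get(), false)   start taking and renewing the shard lock
//                                  on its own stack
//   is_locked() / is_done()        poll: acquired, or exited without the lock
//                                  (acquisition failed, abort early)
//   go_down(); drain_all()         normal release on every exit path below
//   abort(); wakeup()              used instead of an unlock when the lock
//                                  object itself was deleted (sync disabled)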
3248
11fdf7f2 3249 tn->log(10, "took lease");
7c673cae
FG
3250 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
3251 if (retcode < 0 && retcode != -ENOENT) {
11fdf7f2 3252 tn->log(0, "ERROR: failed to read sync status for bucket");
7c673cae
FG
3253 lease_cr->go_down();
3254 drain_all();
3255 return set_cr_error(retcode);
3256 }
3257
11fdf7f2 3258 tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
7c673cae
FG
3259
3260 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3261 if (retcode == -ENOENT) {
3262 /* bucket instance info has not been synced in yet, fetch it now */
3263 yield {
11fdf7f2 3264 tn->log(10, SSTR("no local info for bucket " << bucket_str{bs.bucket} << ": fetching metadata"));
7c673cae
FG
3265 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
3266
11fdf7f2
TL
3267 meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc.zone->get_master_conn(), sync_env->async_rados,
3268 sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
7c673cae
FG
3269
3270 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
3271 string() /* no marker */,
3272 MDLOG_STATUS_COMPLETE,
11fdf7f2
TL
3273 NULL /* no marker tracker */,
3274 tn));
7c673cae
FG
3275 }
3276 if (retcode < 0) {
11fdf7f2 3277 tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket}));
7c673cae
FG
3278 lease_cr->go_down();
3279 drain_all();
3280 return set_cr_error(retcode);
3281 }
3282
3283 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3284 }
3285 if (retcode < 0) {
11fdf7f2 3286 tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket}));
7c673cae
FG
3287 lease_cr->go_down();
3288 drain_all();
3289 return set_cr_error(retcode);
3290 }
3291
91327a77
AA
3292 do {
3293 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
3294 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
a8e16298 3295 if (retcode == -ENOENT) {
11fdf7f2 3296 tn->log(0, "bucket sync disabled");
a8e16298
TL
3297 lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
3298 lease_cr->wakeup();
3299 lease_cr.reset();
3300 drain_all();
3301 return set_cr_done();
3302 }
91327a77 3303 if (retcode < 0) {
11fdf7f2 3304 tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
91327a77
AA
3305 lease_cr->go_down();
3306 drain_all();
3307 return set_cr_error(retcode);
3308 }
7c673cae 3309 }
7c673cae 3310
91327a77
AA
3311 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
3312 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
3313 status_oid, lease_cr.get(),
11fdf7f2 3314 sync_status, tn));
91327a77 3315 if (retcode < 0) {
11fdf7f2 3316 tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
91327a77
AA
3317 lease_cr->go_down();
3318 drain_all();
3319 return set_cr_error(retcode);
3320 }
7c673cae 3321 }
7c673cae 3322
91327a77
AA
3323 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
3324 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
3325 status_oid, lease_cr.get(),
11fdf7f2 3326 sync_status, tn));
91327a77 3327 if (retcode < 0) {
11fdf7f2 3328 tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
91327a77
AA
3329 lease_cr->go_down();
3330 drain_all();
3331 return set_cr_error(retcode);
3332 }
7c673cae 3333 }
91327a77
AA
3334 // loop back to previous states unless incremental sync returns normally
3335 } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
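// [editor's note] The loop above drives a small per-shard state machine. Each
// stage coroutine advances (or rewinds) sync_status.state, and the caller
// just re-dispatches until incremental sync returns while still incremental.
// Roughly:
//
//   StateInit            -> write a fresh status object, move to full sync
//   StateFullSync        -> list the remote bucket shard, move to incremental
//   StateIncrementalSync -> tail the bilog; a SYNCSTOP/RESYNC entry can push
//                           the state back to StateInit (see the syncstopped
//                           handling earlier), restarting the loop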
7c673cae
FG
3336
3337 lease_cr->go_down();
3338 drain_all();
3339 return set_cr_done();
3340 }
3341
3342 return 0;
3343}
3344
3345RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
3346{
11fdf7f2 3347 return new RGWRunBucketSyncCoroutine(&sync_env, bs, sync_env.sync_tracer->root_node);
7c673cae
FG
3348}
3349
3350int RGWBucketSyncStatusManager::init()
3351{
11fdf7f2 3352 conn = store->svc.zone->get_zone_conn_by_id(source_zone);
7c673cae 3353 if (!conn) {
11fdf7f2 3354 ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
7c673cae
FG
3355 return -EINVAL;
3356 }
3357
11fdf7f2 3358 int ret = http_manager.start();
7c673cae 3359 if (ret < 0) {
11fdf7f2 3360 ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
7c673cae
FG
3361 return ret;
3362 }
3363
3364
3365 const string key = bucket.get_key();
3366
3367 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
3368 { NULL, NULL } };
3369
3370 string path = string("/admin/metadata/bucket.instance");
3371
3372 bucket_instance_meta_info result;
3373 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
3374 if (ret < 0) {
11fdf7f2 3375 ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
7c673cae
FG
3376 return ret;
3377 }
3378
3379 RGWBucketInfo& bi = result.data.get_bucket_info();
3380 num_shards = bi.num_shards;
3381
3382 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
3383
3384 sync_module.reset(new RGWDefaultSyncModuleInstance());
3385
3386 int effective_num_shards = (num_shards ? num_shards : 1);
3387
3388 auto async_rados = store->get_async_rados();
3389
3390 for (int i = 0; i < effective_num_shards; i++) {
11fdf7f2
TL
3391 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, this, async_rados, &http_manager);
3392 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->get_sync_tracer(), sync_module);
7c673cae 3393 if (ret < 0) {
11fdf7f2 3394 ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
7c673cae
FG
3395 return ret;
3396 }
3397 source_logs[i] = l;
3398 }
3399
3400 return 0;
3401}
3402
3403int RGWBucketSyncStatusManager::init_sync_status()
3404{
3405 list<RGWCoroutinesStack *> stacks;
3406
3407 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3408 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3409 RGWRemoteBucketLog *l = iter->second;
3410 stack->call(l->init_sync_status_cr());
3411
3412 stacks.push_back(stack);
3413 }
3414
3415 return cr_mgr.run(stacks);
3416}
3417
3418int RGWBucketSyncStatusManager::read_sync_status()
3419{
3420 list<RGWCoroutinesStack *> stacks;
3421
3422 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3423 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3424 RGWRemoteBucketLog *l = iter->second;
3425 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
3426
3427 stacks.push_back(stack);
3428 }
3429
3430 int ret = cr_mgr.run(stacks);
3431 if (ret < 0) {
11fdf7f2 3432 ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
7c673cae
FG
3433 << bucket_str{bucket} << dendl;
3434 return ret;
3435 }
3436
3437 return 0;
3438}
3439
3440int RGWBucketSyncStatusManager::run()
3441{
3442 list<RGWCoroutinesStack *> stacks;
3443
3444 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3445 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3446 RGWRemoteBucketLog *l = iter->second;
3447 stack->call(l->run_sync_cr());
3448
3449 stacks.push_back(stack);
3450 }
3451
3452 int ret = cr_mgr.run(stacks);
3453 if (ret < 0) {
11fdf7f2 3454 ldpp_dout(this, 0) << "ERROR: failed to run sync for "
7c673cae
FG
3455 << bucket_str{bucket} << dendl;
3456 return ret;
3457 }
3458
3459 return 0;
3460}
3461
11fdf7f2
TL
3462unsigned RGWBucketSyncStatusManager::get_subsys() const
3463{
3464 return dout_subsys;
3465}
3466
3467std::ostream& RGWBucketSyncStatusManager::gen_prefix(std::ostream& out) const
3468{
3469 auto zone = std::string_view{source_zone};
3470 return out << "bucket sync zone:" << zone.substr(0, 8)
3471 << " bucket:" << bucket.name << ' ';
3472}
3473
7c673cae
FG
3474string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
3475 const rgw_bucket_shard& bs)
3476{
3477 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
3478}
3479
11fdf7f2
TL
3480string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
3481 const rgw_obj& obj)
3482{
3483 return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" +
3484 obj.key.name + ":" + obj.key.instance;
3485}
3486
b32b8144
FG
3487class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
3488 static constexpr int max_concurrent_shards = 16;
3489 RGWRados *const store;
3490 RGWDataSyncEnv *const env;
3491 const int num_shards;
3492 rgw_bucket_shard bs;
3493
3494 using Vector = std::vector<rgw_bucket_shard_sync_info>;
3495 Vector::iterator i, end;
3496
3497 public:
3498 RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
3499 int num_shards, const rgw_bucket& bucket,
3500 Vector *status)
3501 : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
3502 store(store), env(env), num_shards(num_shards),
3503 bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
3504 i(status->begin()), end(status->end())
3505 {}
3506
3507 bool spawn_next() override {
3508 if (i == end) {
3509 return false;
3510 }
3511 spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
3512 ++i;
3513 ++bs.shard_id;
3514 return true;
3515 }
3516};
3517
11fdf7f2 3518int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const std::string& source_zone,
28e407b8 3519 const RGWBucketInfo& bucket_info,
b32b8144
FG
3520 std::vector<rgw_bucket_shard_sync_info> *status)
3521{
28e407b8 3522 const auto num_shards = bucket_info.num_shards;
b32b8144 3523 status->clear();
28e407b8 3524 status->resize(std::max<size_t>(1, num_shards));
b32b8144
FG
3525
3526 RGWDataSyncEnv env;
3527 RGWSyncModuleInstanceRef module; // null sync module
11fdf7f2 3528 env.init(dpp, store->ctx(), store, nullptr, store->get_async_rados(),
81eedcae 3529 nullptr, nullptr, nullptr, source_zone, module, nullptr);
b32b8144
FG
3530
3531 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
28e407b8
AA
3532 return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
3533 bucket_info.bucket, status));
b32b8144
FG
3534}
3535
7c673cae
FG
3536
3537// TODO: move into rgw_data_sync_trim.cc
3538#undef dout_prefix
3539#define dout_prefix (*_dout << "data trim: ")
3540
3541namespace {
3542
3543/// return the marker that it's safe to trim up to
3544const std::string& get_stable_marker(const rgw_data_sync_marker& m)
3545{
3546 return m.state == m.FullSync ? m.next_step_marker : m.marker;
3547}
3548
7c673cae
FG
3549/// populate the container starting with 'dest' with the minimum stable marker
3550/// of each shard for all of the peers in [first, last)
3551template <typename IterIn, typename IterOut>
3552void take_min_markers(IterIn first, IterIn last, IterOut dest)
3553{
3554 if (first == last) {
3555 return;
3556 }
eafe8130
TL
3557 for (auto p = first; p != last; ++p) {
3558 auto m = dest;
7c673cae 3559 for (auto &shard : p->sync_markers) {
eafe8130
TL
3560 const auto& stable = get_stable_marker(shard.second);
3561 if (*m > stable) {
3562 *m = stable;
7c673cae
FG
3563 }
3564 ++m;
3565 }
3566 }
3567}
3568
3569} // anonymous namespace
3570
3571class DataLogTrimCR : public RGWCoroutine {
eafe8130 3572 using TrimCR = RGWSyncLogTrimCR;
7c673cae
FG
3573 RGWRados *store;
3574 RGWHTTPManager *http;
3575 const int num_shards;
3576 const std::string& zone_id; //< my zone id
3577 std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
eafe8130 3578 std::vector<std::string> min_shard_markers; //< min marker per shard
7c673cae
FG
3579 std::vector<std::string>& last_trim; //< last trimmed marker per shard
3580 int ret{0};
3581
3582 public:
3583 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
3584 int num_shards, std::vector<std::string>& last_trim)
3585 : RGWCoroutine(store->ctx()), store(store), http(http),
3586 num_shards(num_shards),
11fdf7f2 3587 zone_id(store->svc.zone->get_zone().id),
81eedcae 3588 peer_status(store->svc.zone->get_zone_data_notify_to_map().size()),
eafe8130 3589 min_shard_markers(num_shards, TrimCR::max_marker),
7c673cae
FG
3590 last_trim(last_trim)
3591 {}
3592
3593 int operate() override;
3594};
3595
3596int DataLogTrimCR::operate()
3597{
3598 reenter(this) {
3599 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3600 set_status("fetching sync status");
3601 yield {
3602 // query data sync status from each sync peer
3603 rgw_http_param_pair params[] = {
3604 { "type", "data" },
3605 { "status", nullptr },
3606 { "source-zone", zone_id.c_str() },
3607 { nullptr, nullptr }
3608 };
3609
3610 auto p = peer_status.begin();
81eedcae 3611 for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
7c673cae
FG
3612 ldout(cct, 20) << "query sync status from " << c.first << dendl;
3613 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3614 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3615 false);
3616 ++p;
3617 }
3618 }
3619
3620 // must get a successful reply from all peers to consider trimming
3621 ret = 0;
3622 while (ret == 0 && num_spawned() > 0) {
3623 yield wait_for_child();
3624 collect_next(&ret);
3625 }
3626 drain_all();
3627
3628 if (ret < 0) {
3629 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3630 return set_cr_error(ret);
3631 }
3632
3633 ldout(cct, 10) << "trimming log shards" << dendl;
3634 set_status("trimming log shards");
3635 yield {
3636 // determine the minimum marker for each shard
3637 take_min_markers(peer_status.begin(), peer_status.end(),
3638 min_shard_markers.begin());
3639
3640 for (int i = 0; i < num_shards; i++) {
3641 const auto& m = min_shard_markers[i];
eafe8130 3642 if (m <= last_trim[i]) {
7c673cae
FG
3643 continue;
3644 }
3645 ldout(cct, 10) << "trimming log shard " << i
eafe8130 3646 << " at marker=" << m
7c673cae 3647 << " last_trim=" << last_trim[i] << dendl;
7c673cae 3648 spawn(new TrimCR(store, store->data_log->get_oid(i),
eafe8130 3649 m, &last_trim[i]),
7c673cae
FG
3650 true);
3651 }
3652 }
3653 return set_cr_done();
3654 }
3655 return 0;
3656}
3657
a8e16298
TL
3658RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
3659 RGWHTTPManager *http,
3660 int num_shards,
3661 std::vector<std::string>& markers)
3662{
3663 return new DataLogTrimCR(store, http, num_shards, markers);
3664}
3665
7c673cae
FG
3666class DataLogTrimPollCR : public RGWCoroutine {
3667 RGWRados *store;
3668 RGWHTTPManager *http;
3669 const int num_shards;
3670 const utime_t interval; //< polling interval
3671 const std::string lock_oid; //< use first data log shard for lock
3672 const std::string lock_cookie;
3673 std::vector<std::string> last_trim; //< last trimmed marker per shard
3674
3675 public:
3676 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3677 int num_shards, utime_t interval)
3678 : RGWCoroutine(store->ctx()), store(store), http(http),
3679 num_shards(num_shards), interval(interval),
3680 lock_oid(store->data_log->get_oid(0)),
3681 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3682 last_trim(num_shards)
3683 {}
3684
3685 int operate() override;
3686};
3687
3688int DataLogTrimPollCR::operate()
3689{
3690 reenter(this) {
3691 for (;;) {
3692 set_status("sleeping");
3693 wait(interval);
3694
3695 // request a 'data_trim' lock that covers the entire wait interval to
3696 // prevent other gateways from attempting to trim for the duration
3697 set_status("acquiring trim lock");
3698 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
11fdf7f2 3699 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
7c673cae
FG
3700 "data_trim", lock_cookie,
3701 interval.sec()));
3702 if (retcode < 0) {
3703 // if the lock is already held, go back to sleep and try again later
3704 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3705 << interval.sec() << "s" << dendl;
3706 continue;
3707 }
3708
3709 set_status("trimming");
3710 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3711
3712 // note that the lock is not released. this is intentional, as it avoids
3713 // duplicating this work in other gateways
3714 }
3715 }
3716 return 0;
3717}
3718
3719RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3720 RGWHTTPManager *http,
3721 int num_shards, utime_t interval)
3722{
3723 return new DataLogTrimPollCR(store, http, num_shards, interval);
3724}