// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_module.h"
#include "rgw_sync_log_trim.h"

#include "cls/lock/cls_lock_client.h"

#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"

#include "include/random.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
static string object_status_oid_prefix = "bucket.sync-status";

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

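// RGWShardCollectCR fans work out across shards: the framework keeps calling
// spawn_next() until it returns false, holding at most MAX_CONCURRENT_SHARDS
// child coroutines in flight at any time.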
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

 public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store->svc.sysobj,
               rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}

class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;

 public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
  auto& shard_keys = omapkeys[shard_id];
  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, error_oid),
                                  marker, max_entries, shard_keys), false);

  ++shard_id;
  return true;
}
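
// Each datalog shard's sync-status object has a companion "<shard oid>.retry"
// omap object where keys that failed to sync are parked for retry; a shard
// counts as "recovering" for as long as that omap still holds entries.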

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                          rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}
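
// Note on the pattern above: reenter()/yield come from boost::asio's
// stackless coroutines (boost/asio/yield.hpp, included earlier). Each yield
// suspends the coroutine until the call()ed child completes; on resume,
// retcode holds the child's result, which is why every yield is followed by
// an explicit retcode check.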

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "info", NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};
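
// The JSON payload this struct decodes has the shape (values illustrative):
//   { "marker": "<position>", "truncated": <bool>, "entries": [ ... ] }
// as returned by the source zone's /admin/log endpoint for type=data with
// extra-info enabled.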

class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op = nullptr;

  int shard_id;
  const std::string& marker;
  string *pnext_marker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;
  std::optional<PerfGuard> timer;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id,
                              const std::string& marker, string *pnext_marker,
                              list<rgw_data_change_log_entry> *_entries,
                              bool *_truncated)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
      entries(_entries), truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "marker", marker.c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        if (sync_env->counters) {
          timer.emplace(sync_env->counters, sync_counters::l_poll);
        }
        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          if (sync_env->counters) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        timer.reset();
        int ret = http_op->wait(&response);
        if (ret < 0) {
          if (sync_env->counters && ret != -ENOENT) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pnext_marker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}
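
// For reference, the listing coroutine above issues one request per shard in
// the input map, of the form (shard id, count and marker illustrative):
//   GET /admin/log/?type=data&id=0&max-entries=1&marker=<pos>
// against the source zone; the marker parameter is omitted when empty.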

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;

  RGWSyncTraceNodeRef tn;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 RGWSyncTraceNodeRef& _tn_parent,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->svc.zone->get_zone_params().log_pool),
      num_shards(num_shards), status(status),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }

      tn->log(10, "took lease");

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->svc.zone->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!"));
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store->svc.sysobj,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};
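
// Initialization sequence, in order: lock the status object, write the
// initial sync_info (which recreates the object and thus drops the lock),
// relock, snapshot each remote datalog shard's current position into that
// shard's marker object, then advance the state to StateBuildingFullSyncMaps
// and release the lock.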

int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
                           RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module,
                           PerfCounters* counters)
{
  sync_env.init(dpp, store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
                _sync_tracer, _source_zone, _sync_module, counters);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
  omapkeys.resize(num_shards);
  uint64_t max_entries{1};
  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
  http_manager.stop();

  if (ret == 0) {
    for (int i = 0; i < num_shards; i++) {
      if (omapkeys[i]->entries.size() != 0) {
        recovering_shards.insert(i);
      }
    }
  }

  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  auto instance_id = ceph::util::generate_random_number<uint64_t>();
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, tn, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}
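
// e.g. for source zone "us-east" and shard 5 this yields
// "data.full-sync.index.us-east.5" (values illustrative).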

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string> result;
  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      yield {
        string entrypoint = string("/admin/metadata/bucket.instance");
        /* FIXME: need a better scaling solution here, requires streaming output */
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                      entrypoint, NULL, &result));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
        return set_cr_error(retcode);
      }
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->svc.zone->get_zone_params().log_pool,
                                                  oid_prefix);
      yield; // yield so OmapAppendCRs can start
      for (iter = result.begin(); iter != result.end(); ++iter) {
        ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;

        key = *iter;

        yield {
          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                          { NULL, NULL } };

          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
        }

        num_shards = meta_info.data.get_bucket_info().num_shards;
        if (num_shards > 0) {
          for (i = 0; i < num_shards; i++) {
            char buf[16];
            snprintf(buf, sizeof(buf), ":%d", i);
            s = key + buf;
            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
          }
        } else {
          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
        }
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                                rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker), true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

  RGWSyncTraceNodeRef tn;

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      tn(_tn) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                           rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * create index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
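
// The RGWSyncShardMarkerTrack base class keeps the persisted marker
// conservative: start() registers an in-flight entry, finish() completes it,
// and the stored marker only advances past positions whose predecessors have
// all completed, so a restart may repeat work but never skips any.
// DATA_SYNC_UPDATE_MARKER_WINDOW controls how often store_marker() rewrites
// the marker object; with a window of 1 it is updated on every completion.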

// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  explicit bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_str_noinstance {
  const rgw_bucket& b;
  explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  RGWSyncTraceNodeRef tn;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
                                         SSTR(bucket_shard_str{bs}))) {
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
          call(new RGWRunBucketSyncCoroutine(sync_env, bs, tn));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << raw_key));
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
          }
        }
        if (error_repo && !error_repo->append(raw_key)) {
          tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
                          << error_repo->get_obj() << " retcode=" << retcode));
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};
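
// A "single entry" is one datalog key naming a bucket shard (roughly
// "[tenant/]name:instance[:shard-id]"). The coroutine above parses it, runs a
// full bucket-shard sync, loops while the marker tracker reports a pending
// retry, and on persistent failure records the key both in the error log and
// in the shard's ".retry" error repo for the incremental loop to retry later.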

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

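// RGWDataSyncShardCR below drives one datalog shard through full and
// incremental sync. BUCKET_SHARD_SYNC_SPAWN_WINDOW caps how many bucket-shard
// sync coroutines it keeps in flight before waiting for children to finish;
// DATA_SYNC_MAX_ERR_ENTRIES bounds how many error-repo keys are retried per
// pass of the incremental loop.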
class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  std::set<std::string> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::coarse_real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
  uint32_t retry_backoff_secs;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, const rgw_data_sync_marker& _marker,
                     RGWSyncTraceNodeRef& _tn,
                     bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("full sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("incremental sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      tn->log(10, "start full sync");
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          tn->log(5, "failed to take lease");
          set_status("lease lock failed, early abort");
          drain_all();
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
      total_entries = sync_marker.pos;
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             sync_marker.marker, max_entries, omapkeys));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        entries = std::move(omapkeys->entries);
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          tn->log(20, SSTR("full sync: " << *iter));
          total_entries++;
          if (!marker_tracker->start(*iter, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false, tn), false);
          }
          sync_marker.marker = *iter;

          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                tn->log(10, "a sync operation returned error");
              }
            }
          }
        }
      } while (omapkeys->more);
      omapkeys.reset();

      drain_all_but_stack(lease_stack.get());

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                             rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
        lease_cr->go_down();
        drain_all();
        return set_cr_error(retcode);
      }
      // keep lease and transition to incremental_sync()
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      tn->log(10, "start incremental sync");
      if (lease_cr) {
        tn->log(10, "lease already held from full sync");
      } else {
        yield init_lease_cr();
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            tn->log(5, "failed to take lease");
            set_status("lease lock failed, early abort");
            drain_all();
            return set_cr_error(lease_cr->get_ret_status());
          }
          set_sleeping(true);
          yield;
        }
        set_status("lease acquired");
        tn->log(10, "took lease");
      }
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        if (current_modified.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            tn->log(20, SSTR("received async update notification: " << *modified_iter));
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false, tn), false);
          }
        }

        if (error_retry_time <= ceph::coarse_real_clock::now()) {
          /* process bucket shards that previously failed */
          omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                               error_marker, max_error_entries, omapkeys));
          error_entries = std::move(omapkeys->entries);
          tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
          iter = error_entries.begin();
          for (; iter != error_entries.end(); ++iter) {
            error_marker = *iter;
            tn->log(20, SSTR("handle error entry: " << error_marker));
            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
          }
          if (!omapkeys->more) {
            if (error_marker.empty() && error_entries.empty()) {
              /* the retry repo is empty, we back off a bit before calling it again */
              retry_backoff_secs *= 2;
              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
              }
            } else {
              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
            }
            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
            error_marker.clear();
          }
        }
        omapkeys.reset();

#define INCREMENTAL_MAX_ENTRIES 100
        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
        spawned_keys.clear();
        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
                                                   &next_marker, &log_entries, &truncated));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }

        if (log_entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }

        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
          if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
            tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
            continue;
          }
          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
          } else {
            /*
             * don't spawn the same key more than once. We can do that as long as we don't yield
             */
            if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
              spawned_keys.insert(log_iter->entry.key);
              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
              if (retcode < 0) {
                stop_spawned_services();
                drain_all();
                return set_cr_error(retcode);
              }
            }
          }
        }
        while ((int)num_spawned() > spawn_window) {
          set_status() << "num_spawned() > spawn_window";
          yield wait_for_child();
          int ret;
          while (collect(&ret, lease_stack.get())) {
            if (ret < 0) {
              tn->log(10, "a sync operation returned error");
              /* we have reported this error */
            }
            /* not waiting for child here */
          }
        }

        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
                         << " next_marker=" << next_marker << " truncated=" << truncated));
        if (!next_marker.empty()) {
          sync_marker.marker = next_marker;
        } else if (!log_entries.empty()) {
          sync_marker.marker = log_entries.back().log_id;
        }
        if (!truncated) {
          // we reached the end, wait a while before checking for more
          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
          yield wait(get_idle_interval());
        }
      } while (true);
    }
    return 0;
  }
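
  // The idle wait below is normally INCREMENTAL_INTERVAL (20s), but is
  // shortened so the loop wakes up in time for the next error-repo pass;
  // the retry backoff itself doubles from 60s up to a 600s cap while the
  // repo stays empty and resets to 60s once entries reappear.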
  utime_t get_idle_interval() const {
#define INCREMENTAL_INTERVAL 20
    ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
      auto now = ceph::coarse_real_clock::now();
      if (error_retry_time > now) {
        auto d = error_retry_time - now;
        if (interval > d) {
          interval = d;
        }
      }
    }
    // convert timespan -> time_point -> utime_t
    return utime_t(ceph::coarse_real_clock::zero() + interval);
  }

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};
1486
1487class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
1488 RGWDataSyncEnv *sync_env;
1489
1490 rgw_pool pool;
1491
1492 uint32_t shard_id;
1493 rgw_data_sync_marker sync_marker;
1494
11fdf7f2 1495 RGWSyncTraceNodeRef tn;
7c673cae 1496public:
11fdf7f2
TL
1497 RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, const rgw_pool& _pool,
1498 uint32_t _shard_id, rgw_data_sync_marker& _marker,
1499 RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, false),
7c673cae
FG
1500 sync_env(_sync_env),
1501 pool(_pool),
1502 shard_id(_shard_id),
1503 sync_marker(_marker) {
11fdf7f2 1504 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
7c673cae
FG
1505 }
1506
1507 RGWCoroutine *alloc_cr() override {
11fdf7f2 1508 return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, tn, backoff_ptr());
7c673cae
FG
1509 }
1510
1511 RGWCoroutine *alloc_finisher_cr() override {
1512 RGWRados *store = sync_env->store;
11fdf7f2
TL
1513 return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
1514 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
7c673cae
FG
1515 &sync_marker);
1516 }
1517
1518 void append_modified_shards(set<string>& keys) {
1519 Mutex::Locker l(cr_lock());
1520
1521 RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
1522 if (!cr) {
1523 return;
1524 }
1525
1526 cr->append_modified_shards(keys);
1527 }
1528};
1529
1530class RGWDataSyncCR : public RGWCoroutine {
1531 RGWDataSyncEnv *sync_env;
1532 uint32_t num_shards;
1533
1534 rgw_data_sync_status sync_status;
1535
1536 RGWDataSyncShardMarkerTrack *marker_tracker;
1537
1538 Mutex shard_crs_lock;
1539 map<int, RGWDataSyncShardControlCR *> shard_crs;
1540
1541 bool *reset_backoff;
1542
11fdf7f2 1543 RGWSyncTraceNodeRef tn;
31f18b77
FG
1544
1545 RGWDataSyncModule *data_sync_module{nullptr};
7c673cae 1546public:
11fdf7f2 1547 RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
7c673cae
FG
1548 sync_env(_sync_env),
1549 num_shards(_num_shards),
1550 marker_tracker(NULL),
1551 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
11fdf7f2 1552 reset_backoff(_reset_backoff), tn(_tn) {
31f18b77 1553
7c673cae
FG
1554 }
1555
1556 ~RGWDataSyncCR() override {
1557 for (auto iter : shard_crs) {
1558 iter.second->put();
1559 }
1560 }
1561
1562 int operate() override {
1563 reenter(this) {
1564
1565 /* read sync status */
1566 yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));
1567
31f18b77
FG
1568 data_sync_module = sync_env->sync_module->get_data_handler();
1569
224ce89b 1570 if (retcode < 0 && retcode != -ENOENT) {
11fdf7f2 1571 tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
7c673cae
FG
1572 return set_cr_error(retcode);
1573 }
1574
1575 /* state: init status */
1576 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
11fdf7f2 1577 tn->log(20, SSTR("init"));
224ce89b 1578 sync_status.sync_info.num_shards = num_shards;
31f18b77 1579 uint64_t instance_id;
11fdf7f2
TL
1580 instance_id = ceph::util::generate_random_number<uint64_t>();
1581 yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, tn, &sync_status));
7c673cae 1582 if (retcode < 0) {
11fdf7f2 1583 tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
7c673cae
FG
1584 return set_cr_error(retcode);
1585 }
1586 // sets state = StateBuildingFullSyncMaps
1587
1588 *reset_backoff = true;
1589 }
1590
31f18b77
FG
1591 data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
1592
7c673cae 1593 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
11fdf7f2 1594 tn->log(10, SSTR("building full sync maps"));
31f18b77 1595 /* call sync module init here */
b32b8144 1596 sync_status.sync_info.num_shards = num_shards;
31f18b77
FG
1597 yield call(data_sync_module->init_sync(sync_env));
1598 if (retcode < 0) {
11fdf7f2 1599 tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
31f18b77
FG
1600 return set_cr_error(retcode);
1601 }
7c673cae 1602 /* state: building full sync maps */
7c673cae
FG
1603 yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
1604 if (retcode < 0) {
11fdf7f2 1605 tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
7c673cae
FG
1606 return set_cr_error(retcode);
1607 }
1608 sync_status.sync_info.state = rgw_data_sync_info::StateSync;
1609
1610 /* update new state */
1611 yield call(set_sync_info_cr());
1612 if (retcode < 0) {
11fdf7f2 1613 tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
7c673cae
FG
1614 return set_cr_error(retcode);
1615 }
1616
1617 *reset_backoff = true;
1618 }
1619
11fdf7f2
TL
1620 yield call(data_sync_module->start_sync(sync_env));
1621
7c673cae
FG
1622 yield {
1623 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
11fdf7f2 1624 tn->log(10, SSTR("spawning sync for " << num_shards << " shards"));
7c673cae
FG
1625 for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
1626 iter != sync_status.sync_markers.end(); ++iter) {
11fdf7f2
TL
1627 RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc.zone->get_zone_params().log_pool,
1628 iter->first, iter->second, tn);
7c673cae
FG
1629 cr->get();
1630 shard_crs_lock.Lock();
1631 shard_crs[iter->first] = cr;
1632 shard_crs_lock.Unlock();
1633 spawn(cr, true);
1634 }
1635 }
1636 }
1637
1638 return set_cr_done();
1639 }
1640 return 0;
1641 }
1642
1643 RGWCoroutine *set_sync_info_cr() {
1644 RGWRados *store = sync_env->store;
11fdf7f2
TL
1645 return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store->svc.sysobj,
1646 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
7c673cae
FG
1647 sync_status.sync_info);
1648 }
1649
1650 void wakeup(int shard_id, set<string>& keys) {
1651 Mutex::Locker l(shard_crs_lock);
1652 map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
1653 if (iter == shard_crs.end()) {
1654 return;
1655 }
1656 iter->second->append_modified_shards(keys);
1657 iter->second->wakeup();
1658 }
1659};
1660
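/* Default data sync handler: object writes are replayed by fetching the
 * remote object (RGWFetchRemoteObjCR); deletions and delete markers are
 * applied locally through RGWRemoveObjCR. */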
1661class RGWDefaultDataSyncModule : public RGWDataSyncModule {
1662public:
1663 RGWDefaultDataSyncModule() {}
1664
11fdf7f2 1665 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
31f18b77 1666 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
7c673cae 1667 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
31f18b77 1668 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
7c673cae
FG
1669};
1670
1671class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
1672 RGWDefaultDataSyncModule data_handler;
1673public:
1674 RGWDefaultSyncModuleInstance() {}
1675 RGWDataSyncModule *get_data_handler() override {
1676 return &data_handler;
1677 }
11fdf7f2
TL
1678 bool supports_user_writes() override {
1679 return true;
1680 }
7c673cae
FG
1681};
1682
11fdf7f2 1683int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
7c673cae
FG
1684{
1685 instance->reset(new RGWDefaultSyncModuleInstance());
1686 return 0;
1687}
1688
11fdf7f2 1689RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
7c673cae
FG
1690{
1691 return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
11fdf7f2
TL
1692 std::nullopt,
1693 key, std::nullopt, versioned_epoch,
81eedcae 1694 true, zones_trace, sync_env->counters);
7c673cae
FG
1695}
1696
1697RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
31f18b77 1698 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
7c673cae
FG
1699{
1700 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1701 bucket_info, key, versioned, versioned_epoch,
31f18b77 1702 NULL, NULL, false, &mtime, zones_trace);
7c673cae
FG
1703}
1704
1705RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
31f18b77 1706 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
7c673cae
FG
1707{
1708 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1709 bucket_info, key, versioned, versioned_epoch,
31f18b77 1710 &owner.id, &owner.display_name, true, &mtime, zones_trace);
7c673cae
FG
1711}
1712
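/* Archive data sync handler: forces versioning on the destination bucket,
 * never replays object removals (remove_object returns no coroutine), and
 * applies delete events as delete markers, so older versions survive in
 * the archive zone. */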
11fdf7f2
TL
1713class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
1714public:
1715 RGWArchiveDataSyncModule() {}
1716
1717 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
1718 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1719 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1720 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1721};
1722
1723class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
1724 RGWArchiveDataSyncModule data_handler;
1725public:
1726 RGWArchiveSyncModuleInstance() {}
1727 RGWDataSyncModule *get_data_handler() override {
1728 return &data_handler;
1729 }
1730 RGWMetadataHandler *alloc_bucket_meta_handler() override {
1731 return RGWArchiveBucketMetaHandlerAllocator::alloc();
1732 }
1733 RGWMetadataHandler *alloc_bucket_instance_meta_handler() override {
1734 return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
1735 }
1736};
1737
1738int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
1739{
1740 instance->reset(new RGWArchiveSyncModuleInstance());
1741 return 0;
1742}
1743
1744RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
1745{
1746 ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
1747 if (!bucket_info.versioned() ||
1748 (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
1749 ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
1750 bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
1751 int op_ret = sync_env->store->put_bucket_instance_info(bucket_info, false, real_time(), NULL);
1752 if (op_ret < 0) {
1753 ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: failed to enable versioning on archive bucket" << dendl;
1754 return NULL;
1755 }
1756 }
1757
1758 std::optional<rgw_obj_key> dest_key;
1759
1760 if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
1761 versioned_epoch = 0;
1762 dest_key = key;
1763 if (key.instance.empty()) {
1764 sync_env->store->gen_rand_obj_instance_name(&(*dest_key));
1765 }
1766 }
1767
1768 return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1769 bucket_info, std::nullopt,
1770 key, dest_key, versioned_epoch,
81eedcae 1771 true, zones_trace, nullptr);
11fdf7f2
TL
1772}
1773
1774RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
1775 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1776{
1777 ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
1778 return NULL;
1779}
1780
1781RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1782 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1783{
1784 ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
1785 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1786 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1787 bucket_info, key, versioned, versioned_epoch,
1788 &owner.id, &owner.display_name, true, &mtime, zones_trace);
1789}
1790
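/* Backoff wrapper around RGWDataSyncCR: alloc_cr() is re-invoked after
 * backoff whenever the child fails; exit_on_error=false means all errors
 * are retried instead of terminating sync. */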
7c673cae
FG
1791class RGWDataSyncControlCR : public RGWBackoffControlCR
1792{
1793 RGWDataSyncEnv *sync_env;
1794 uint32_t num_shards;
1795
11fdf7f2
TL
1796 RGWSyncTraceNodeRef tn;
1797
3efd9988 1798 static constexpr bool exit_on_error = false; // retry on all errors
7c673cae 1799public:
11fdf7f2
TL
1800 RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards,
1801 RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
1802 sync_env(_sync_env), num_shards(_num_shards) {
1803 tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
7c673cae
FG
1804 }
1805
1806 RGWCoroutine *alloc_cr() override {
11fdf7f2 1807 return new RGWDataSyncCR(sync_env, num_shards, tn, backoff_ptr());
7c673cae
FG
1808 }
1809
1810 void wakeup(int shard_id, set<string>& keys) {
1811 Mutex& m = cr_lock();
1812
1813 m.Lock();
1814 RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
1815 if (!cr) {
1816 m.Unlock();
1817 return;
1818 }
1819
1820 cr->get();
1821 m.Unlock();
1822
1823 if (cr) {
11fdf7f2 1824 tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
7c673cae
FG
1825 cr->wakeup(shard_id, keys);
1826 }
1827
1828 cr->put();
1829 }
1830};
1831
1832void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
1833 RWLock::RLocker rl(lock);
1834 if (!data_sync_cr) {
1835 return;
1836 }
1837 data_sync_cr->wakeup(shard_id, keys);
1838}
1839
1840int RGWRemoteDataLog::run_sync(int num_shards)
1841{
1842 lock.get_write();
11fdf7f2 1843 data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn);
7c673cae
FG
1844 data_sync_cr->get(); // run() will drop a ref, so take another
1845 lock.unlock();
1846
1847 int r = run(data_sync_cr);
1848
1849 lock.get_write();
1850 data_sync_cr->put();
1851 data_sync_cr = NULL;
1852 lock.unlock();
1853
1854 if (r < 0) {
11fdf7f2 1855 ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
7c673cae
FG
1856 return r;
1857 }
1858 return 0;
1859}
1860
1861int RGWDataSyncStatusManager::init()
1862{
11fdf7f2
TL
1863 RGWZone *zone_def;
1864
1865 if (!store->svc.zone->find_zone_by_id(source_zone, &zone_def)) {
1866 ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
7c673cae
FG
1867 return -EIO;
1868 }
1869
11fdf7f2 1870 if (!store->svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
7c673cae
FG
1871 return -ENOTSUP;
1872 }
1873
11fdf7f2 1874 const RGWZoneParams& zone_params = store->svc.zone->get_zone_params();
7c673cae 1875
94b18763
FG
1876 if (sync_module == nullptr) {
1877 sync_module = store->get_sync_module();
1878 }
7c673cae 1879
11fdf7f2 1880 conn = store->svc.zone->get_zone_conn_by_id(source_zone);
7c673cae 1881 if (!conn) {
11fdf7f2 1882 ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
7c673cae
FG
1883 return -EINVAL;
1884 }
1885
1886 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
1887
81eedcae
TL
1888 int r = source_log.init(source_zone, conn, error_logger, store->get_sync_tracer(),
1889 sync_module, counters);
7c673cae 1890 if (r < 0) {
11fdf7f2 1891 ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
7c673cae
FG
1892 finalize();
1893 return r;
1894 }
1895
1896 rgw_datalog_info datalog_info;
1897 r = source_log.read_log_info(&datalog_info);
1898 if (r < 0) {
11fdf7f2 1899 ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
7c673cae
FG
1900 finalize();
1901 return r;
1902 }
1903
1904 num_shards = datalog_info.num_shards;
1905
1906 for (int i = 0; i < num_shards; i++) {
1907 shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
1908 }
1909
1910 return 0;
1911}
1912
1913void RGWDataSyncStatusManager::finalize()
1914{
1915 delete error_logger;
1916 error_logger = nullptr;
1917}
1918
11fdf7f2
TL
1919unsigned RGWDataSyncStatusManager::get_subsys() const
1920{
1921 return dout_subsys;
1922}
1923
1924std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
1925{
1926 auto zone = std::string_view{source_zone};
1927 return out << "data sync zone:" << zone.substr(0, 8) << ' ';
1928}
1929
7c673cae
FG
1930string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
1931{
1932 char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
1933 snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
1934
1935 return string(buf);
1936}
1937
1938string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
1939{
1940 char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
1941 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);
1942
1943 return string(buf);
1944}
1945
1946int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
1947 const rgw_bucket& bucket, int shard_id,
1948 RGWSyncErrorLogger *_error_logger,
11fdf7f2 1949 RGWSyncTraceManager *_sync_tracer,
7c673cae
FG
1950 RGWSyncModuleInstanceRef& _sync_module)
1951{
1952 conn = _conn;
1953 source_zone = _source_zone;
1954 bs.bucket = bucket;
1955 bs.shard_id = shard_id;
1956
11fdf7f2 1957 sync_env.init(dpp, store->ctx(), store, conn, async_rados, http_manager,
81eedcae 1958 _error_logger, _sync_tracer, source_zone, _sync_module, nullptr);
7c673cae
FG
1959
1960 return 0;
1961}
1962
7c673cae
FG
1963class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1964 RGWDataSyncEnv *sync_env;
1965 const string instance_key;
1966
28e407b8 1967 rgw_bucket_index_marker_info *info;
7c673cae
FG
1968
1969public:
1970 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
1971 const rgw_bucket_shard& bs,
28e407b8 1972 rgw_bucket_index_marker_info *_info)
7c673cae
FG
1973 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1974 instance_key(bs.get_key()), info(_info) {}
1975
1976 int operate() override {
1977 reenter(this) {
1978 yield {
1979 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
1980 { "bucket-instance", instance_key.c_str() },
1981 { "info" , NULL },
1982 { NULL, NULL } };
1983
1984 string p = "/admin/log/";
28e407b8 1985 call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
7c673cae
FG
1986 }
1987 if (retcode < 0) {
1988 return set_cr_error(retcode);
1989 }
1990 return set_cr_done();
1991 }
1992 return 0;
1993 }
1994};
1995
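/* Initializes a bucket shard's sync status from the remote bilog info: if
 * the remote reports syncstopped, the status object is removed and -ENOENT
 * returned; otherwise the shard starts in StateFullSync with inc_marker
 * primed at the remote max_marker. */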
1996class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
1997 RGWDataSyncEnv *sync_env;
1998
1999 rgw_bucket_shard bs;
2000 const string sync_status_oid;
2001
2002 rgw_bucket_shard_sync_info& status;
2003
28e407b8 2004 rgw_bucket_index_marker_info info;
7c673cae
FG
2005public:
2006 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
2007 const rgw_bucket_shard& bs,
2008 rgw_bucket_shard_sync_info& _status)
2009 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2010 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
2011 status(_status)
2012 {}
2013
2014 int operate() override {
2015 reenter(this) {
2016 /* fetch current position in logs */
2017 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
2018 if (retcode < 0 && retcode != -ENOENT) {
2019 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
2020 return set_cr_error(retcode);
2021 }
2022 yield {
7c673cae 2023 auto store = sync_env->store;
11fdf7f2 2024 rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, sync_status_oid);
c07f9fc5
FG
2025
2026 if (info.syncstopped) {
2027 call(new RGWRadosRemoveCR(store, obj));
2028 } else {
2029 status.state = rgw_bucket_shard_sync_info::StateFullSync;
2030 status.inc_marker.position = info.max_marker;
2031 map<string, bufferlist> attrs;
2032 status.encode_all_attrs(attrs);
11fdf7f2 2033 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj, obj, attrs));
c07f9fc5 2034 }
7c673cae 2035 }
91327a77
AA
2036 if (info.syncstopped) {
2037 retcode = -ENOENT;
2038 }
2039 if (retcode < 0) {
2040 return set_cr_error(retcode);
2041 }
7c673cae
FG
2042 return set_cr_done();
2043 }
2044 return 0;
2045 }
2046};
2047
2048RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
2049{
2050 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
2051}
2052
11fdf7f2
TL
2053#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
2054
7c673cae 2055template <class T>
11fdf7f2 2056static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
7c673cae
FG
2057{
2058 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
2059 if (iter == attrs.end()) {
2060 *val = T();
11fdf7f2 2061 return false;
7c673cae
FG
2062 }
2063
11fdf7f2 2064 auto biter = iter->second.cbegin();
7c673cae 2065 try {
11fdf7f2 2066 decode(*val, biter);
7c673cae
FG
2067 } catch (buffer::error& err) {
2068 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
11fdf7f2 2069 return false;
7c673cae 2070 }
11fdf7f2 2071 return true;
7c673cae
FG
2072}
2073
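/* Try the BUCKET_SYNC_ATTR_PREFIX'd attr names first and fall back to the
 * bare legacy names, presumably for status objects written before the
 * prefix was introduced. */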
2074void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
2075{
11fdf7f2
TL
2076 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
2077 decode_attr(cct, attrs, "state", &state);
2078 }
2079 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &full_marker)) {
2080 decode_attr(cct, attrs, "full_marker", &full_marker);
2081 }
2082 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
2083 decode_attr(cct, attrs, "inc_marker", &inc_marker);
2084 }
7c673cae
FG
2085}
2086
2087void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
2088{
2089 encode_state_attr(attrs);
2090 full_marker.encode_attr(attrs);
2091 inc_marker.encode_attr(attrs);
2092}
2093
2094void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
2095{
11fdf7f2
TL
2096 using ceph::encode;
2097 encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
7c673cae
FG
2098}
2099
2100void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
2101{
11fdf7f2
TL
2102 using ceph::encode;
2103 encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
7c673cae
FG
2104}
2105
2106void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
2107{
11fdf7f2
TL
2108 using ceph::encode;
2109 encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
7c673cae
FG
2110}
2111
2112class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
2113 RGWDataSyncEnv *sync_env;
2114 string oid;
2115 rgw_bucket_shard_sync_info *status;
2116
2117 map<string, bufferlist> attrs;
2118public:
2119 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
2120 const rgw_bucket_shard& bs,
2121 rgw_bucket_shard_sync_info *_status)
2122 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2123 oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
2124 status(_status) {}
2125 int operate() override;
2126};
2127
2128int RGWReadBucketSyncStatusCoroutine::operate()
2129{
2130 reenter(this) {
11fdf7f2
TL
2131 yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store->svc.sysobj,
2132 rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, oid),
2133 &attrs, true));
7c673cae
FG
2134 if (retcode == -ENOENT) {
2135 *status = rgw_bucket_shard_sync_info();
2136 return set_cr_done();
2137 }
2138 if (retcode < 0) {
2139 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
2140 return set_cr_error(retcode);
2141 }
2142 status->decode_from_attrs(sync_env->cct, attrs);
2143 return set_cr_done();
2144 }
2145 return 0;
2146}
28e407b8
AA
2147
2148#define OMAP_READ_MAX_ENTRIES 10
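/* Lists the shard's ".retry" error log object in pages of
 * OMAP_READ_MAX_ENTRIES omap keys, accumulating up to max_entries
 * recovering bucket shards. */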
2149class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
2150 RGWDataSyncEnv *sync_env;
2151 RGWRados *store;
2152
2153 const int shard_id;
2154 int max_entries;
2155
2156 set<string>& recovering_buckets;
2157 string marker;
2158 string error_oid;
2159
11fdf7f2 2160 RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
28e407b8
AA
2161 set<string> error_entries;
2162 int max_omap_entries;
2163 int count;
2164
2165public:
2166 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2167 set<string>& _recovering_buckets, const int _max_entries)
2168 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2169 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2170 recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
2171 {
2172 error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
2173 }
2174
2175 int operate() override;
2176};
2177
2178int RGWReadRecoveringBucketShardsCoroutine::operate()
2179{
2180 reenter(this) {
2181 // read recovering bucket shards
2182 count = 0;
2183 do {
11fdf7f2
TL
2184 omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
2185 yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, error_oid),
2186 marker, max_omap_entries, omapkeys));
28e407b8
AA
2187
2188 if (retcode == -ENOENT) {
2189 break;
2190 }
2191
2192 if (retcode < 0) {
2193 ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
2194 << cpp_strerror(retcode) << dendl;
2195 return set_cr_error(retcode);
2196 }
2197
11fdf7f2 2198 error_entries = std::move(omapkeys->entries);
28e407b8
AA
2199 if (error_entries.empty()) {
2200 break;
2201 }
2202
2203 count += error_entries.size();
2204 marker = *error_entries.rbegin();
11fdf7f2
TL
2205 recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
2206 std::make_move_iterator(error_entries.end()));
2207 } while (omapkeys->more && count < max_entries);
28e407b8
AA
2208
2209 return set_cr_done();
2210 }
2211
2212 return 0;
2213}
2214
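/* Reads the shard's sync status marker, then walks the remote datalog from
 * that position to collect bucket shards with pending changes. */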
2215class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
2216 RGWDataSyncEnv *sync_env;
2217 RGWRados *store;
2218
2219 const int shard_id;
2220 int max_entries;
2221
2222 set<string>& pending_buckets;
2223 string marker;
2224 string status_oid;
2225
2226 rgw_data_sync_marker* sync_marker;
2227 int count;
2228
a8e16298 2229 std::string next_marker;
28e407b8
AA
2230 list<rgw_data_change_log_entry> log_entries;
2231 bool truncated;
2232
2233public:
2234 RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2235 set<string>& _pending_buckets,
2236 rgw_data_sync_marker* _sync_marker, const int _max_entries)
2237 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2238 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2239 pending_buckets(_pending_buckets), sync_marker(_sync_marker)
2240 {
2241 status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
2242 }
2243
2244 int operate() override;
2245};
2246
2247int RGWReadPendingBucketShardsCoroutine::operate()
2248{
2249 reenter(this) {
2250 // read sync status marker
2251 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
11fdf7f2
TL
2252 yield call(new CR(sync_env->async_rados, store->svc.sysobj,
2253 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
28e407b8
AA
2254 sync_marker));
2255 if (retcode < 0) {
2256 ldout(sync_env->cct, 0) << "failed to read sync status marker with "
2257 << cpp_strerror(retcode) << dendl;
2258 return set_cr_error(retcode);
2259 }
2260
2261 // read pending bucket shards
2262 marker = sync_marker->marker;
2263 count = 0;
2264 do {
a8e16298
TL
2265 yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker,
2266 &next_marker, &log_entries, &truncated));
28e407b8
AA
2267
2268 if (retcode == -ENOENT) {
2269 break;
2270 }
2271
2272 if (retcode < 0) {
2273 ldout(sync_env->cct, 0) << "failed to read remote data log info with "
2274 << cpp_strerror(retcode) << dendl;
2275 return set_cr_error(retcode);
2276 }
2277
2278 if (log_entries.empty()) {
2279 break;
2280 }
2281
2282 count += log_entries.size();
2283 for (const auto& entry : log_entries) {
2284 pending_buckets.insert(entry.entry.key);
2285 }
2286 } while (truncated && count < max_entries);
2287
2288 return set_cr_done();
2289 }
2290
2291 return 0;
2292}
2293
2294int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
2295{
2296 // cannot run concurrently with run_sync(), so run in a separate manager
2297 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
2298 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
11fdf7f2 2299 int ret = http_manager.start();
28e407b8 2300 if (ret < 0) {
11fdf7f2 2301 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
28e407b8
AA
2302 return ret;
2303 }
2304 RGWDataSyncEnv sync_env_local = sync_env;
2305 sync_env_local.http_manager = &http_manager;
2306 list<RGWCoroutinesStack *> stacks;
2307 RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2308 recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
2309 stacks.push_back(recovering_stack);
2310 RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2311 pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
2312 stacks.push_back(pending_stack);
2313 ret = crs.run(stacks);
2314 http_manager.stop();
2315 return ret;
2316}
2317
7c673cae
FG
2318RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
2319{
2320 return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
2321}
2322
2323RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
2324 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2325 delete iter->second;
2326 }
2327 delete error_logger;
2328}
2329
2330
2331void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
2332{
2333 JSONDecoder::decode_json("ID", id, obj);
2334 JSONDecoder::decode_json("DisplayName", display_name, obj);
2335}
2336
2337struct bucket_list_entry {
2338 bool delete_marker;
2339 rgw_obj_key key;
2340 bool is_latest;
2341 real_time mtime;
2342 string etag;
2343 uint64_t size;
2344 string storage_class;
2345 rgw_bucket_entry_owner owner;
2346 uint64_t versioned_epoch;
2347 string rgw_tag;
2348
2349 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
2350
2351 void decode_json(JSONObj *obj) {
2352 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
2353 JSONDecoder::decode_json("Key", key.name, obj);
2354 JSONDecoder::decode_json("VersionId", key.instance, obj);
2355 JSONDecoder::decode_json("IsLatest", is_latest, obj);
2356 string mtime_str;
2357 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
2358
2359 struct tm t;
2360 uint32_t nsec;
2361 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
2362 ceph_timespec ts;
2363 ts.tv_sec = (uint64_t)internal_timegm(&t);
2364 ts.tv_nsec = nsec;
2365 mtime = real_clock::from_ceph_timespec(ts);
2366 }
2367 JSONDecoder::decode_json("ETag", etag, obj);
2368 JSONDecoder::decode_json("Size", size, obj);
2369 JSONDecoder::decode_json("StorageClass", storage_class, obj);
2370 JSONDecoder::decode_json("Owner", owner, obj);
2371 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
2372 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
11fdf7f2
TL
2373 if (key.instance == "null" && !versioned_epoch) {
2374 key.instance.clear();
2375 }
7c673cae 2376 }
a8e16298
TL
2377
2378 RGWModifyOp get_modify_op() const {
2379 if (delete_marker) {
2380 return CLS_RGW_OP_LINK_OLH_DM;
2381 } else if (!key.instance.empty() && key.instance != "null") {
2382 return CLS_RGW_OP_LINK_OLH;
2383 } else {
2384 return CLS_RGW_OP_ADD;
2385 }
2386 }
7c673cae
FG
2387};
2388
2389struct bucket_list_result {
2390 string name;
2391 string prefix;
2392 string key_marker;
2393 string version_id_marker;
2394 int max_keys;
2395 bool is_truncated;
2396 list<bucket_list_entry> entries;
2397
2398 bucket_list_result() : max_keys(0), is_truncated(false) {}
2399
2400 void decode_json(JSONObj *obj) {
2401 JSONDecoder::decode_json("Name", name, obj);
2402 JSONDecoder::decode_json("Prefix", prefix, obj);
2403 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
2404 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
2405 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
2406 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
2407 JSONDecoder::decode_json("Entries", entries, obj);
2408 }
2409};
2410
2411class RGWListBucketShardCR: public RGWCoroutine {
2412 RGWDataSyncEnv *sync_env;
2413 const rgw_bucket_shard& bs;
2414 const string instance_key;
2415 rgw_obj_key marker_position;
2416
2417 bucket_list_result *result;
2418
2419public:
2420 RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2421 rgw_obj_key& _marker_position, bucket_list_result *_result)
2422 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2423 instance_key(bs.get_key()), marker_position(_marker_position),
2424 result(_result) {}
2425
2426 int operate() override {
2427 reenter(this) {
2428 yield {
2429 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2430 { "versions" , NULL },
2431 { "format" , "json" },
2432 { "objs-container" , "true" },
2433 { "key-marker" , marker_position.name.c_str() },
2434 { "version-id-marker" , marker_position.instance.c_str() },
2435 { NULL, NULL } };
2436 // don't include tenant in the url, it's already part of instance_key
2437 string p = string("/") + bs.bucket.name;
2438 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2439 }
2440 if (retcode < 0) {
2441 return set_cr_error(retcode);
2442 }
2443 return set_cr_done();
2444 }
2445 return 0;
2446 }
2447};
2448
2449class RGWListBucketIndexLogCR: public RGWCoroutine {
2450 RGWDataSyncEnv *sync_env;
2451 const string instance_key;
2452 string marker;
2453
2454 list<rgw_bi_log_entry> *result;
81eedcae 2455 std::optional<PerfGuard> timer;
7c673cae
FG
2456
2457public:
2458 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2459 string& _marker, list<rgw_bi_log_entry> *_result)
2460 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2461 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2462
2463 int operate() override {
2464 reenter(this) {
81eedcae
TL
2465 if (sync_env->counters) {
2466 timer.emplace(sync_env->counters, sync_counters::l_poll);
2467 }
7c673cae
FG
2468 yield {
2469 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2470 { "format" , "json" },
2471 { "marker" , marker.c_str() },
2472 { "type", "bucket-index" },
2473 { NULL, NULL } };
2474
2475 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2476 }
81eedcae 2477 timer.reset();
7c673cae 2478 if (retcode < 0) {
81eedcae
TL
2479 if (sync_env->counters) {
2480 sync_env->counters->inc(sync_counters::l_poll_err);
2481 }
7c673cae
FG
2482 return set_cr_error(retcode);
2483 }
2484 return set_cr_done();
2485 }
2486 return 0;
2487 }
2488};
2489
2490#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
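/* Update window handed to RGWSyncShardMarkerTrack: the sync position is
 * persisted in batches rather than after every completed entry. */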
2491
2492class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2493 RGWDataSyncEnv *sync_env;
2494
2495 string marker_oid;
2496 rgw_bucket_shard_full_sync_marker sync_marker;
2497
11fdf7f2
TL
2498 RGWSyncTraceNodeRef tn;
2499
7c673cae
FG
2500public:
2501 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2502 const string& _marker_oid,
2503 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2504 sync_env(_sync_env),
2505 marker_oid(_marker_oid),
2506 sync_marker(_marker) {}
2507
11fdf7f2
TL
2508 void set_tn(RGWSyncTraceNodeRef& _tn) {
2509 tn = _tn;
2510 }
2511
7c673cae
FG
2512 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2513 sync_marker.position = new_marker;
2514 sync_marker.count = index_pos;
2515
2516 map<string, bufferlist> attrs;
2517 sync_marker.encode_attr(attrs);
2518
2519 RGWRados *store = sync_env->store;
2520
11fdf7f2
TL
2521 tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
2522 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
2523 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
7c673cae
FG
2524 attrs);
2525 }
91327a77 2526
11fdf7f2 2527 RGWOrderCallCR *allocate_order_control_cr() override {
91327a77
AA
2528 return new RGWLastCallerWinsCR(sync_env->cct);
2529 }
7c673cae
FG
2530};
2531
2532class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2533 RGWDataSyncEnv *sync_env;
2534
2535 string marker_oid;
2536 rgw_bucket_shard_inc_sync_marker sync_marker;
2537
2538 map<rgw_obj_key, string> key_to_marker;
91327a77
AA
2539
2540 struct operation {
2541 rgw_obj_key key;
2542 bool is_olh;
2543 };
2544 map<string, operation> marker_to_op;
2545 std::set<std::string> pending_olh; // object names with pending olh operations
7c673cae 2546
11fdf7f2
TL
2547 RGWSyncTraceNodeRef tn;
2548
7c673cae 2549 void handle_finish(const string& marker) override {
91327a77
AA
2550 auto iter = marker_to_op.find(marker);
2551 if (iter == marker_to_op.end()) {
7c673cae
FG
2552 return;
2553 }
91327a77
AA
2554 auto& op = iter->second;
2555 key_to_marker.erase(op.key);
2556 reset_need_retry(op.key);
2557 if (op.is_olh) {
2558 pending_olh.erase(op.key.name);
2559 }
2560 marker_to_op.erase(iter);
7c673cae
FG
2561 }
2562
2563public:
2564 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2565 const string& _marker_oid,
2566 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2567 sync_env(_sync_env),
2568 marker_oid(_marker_oid),
2569 sync_marker(_marker) {}
2570
11fdf7f2
TL
2571 void set_tn(RGWSyncTraceNodeRef& _tn) {
2572 tn = _tn;
2573 }
2574
7c673cae
FG
2575 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2576 sync_marker.position = new_marker;
2577
2578 map<string, bufferlist> attrs;
2579 sync_marker.encode_attr(attrs);
2580
2581 RGWRados *store = sync_env->store;
2582
11fdf7f2 2583 tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
7c673cae 2584 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
11fdf7f2
TL
2585 store->svc.sysobj,
2586 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
7c673cae
FG
2587 attrs);
2588 }
2589
2590 /*
2591 * create index from key -> marker, and from marker -> op
2592 * this is useful so that we can ensure that we only have one
2593 * entry for any key that is used. This is needed when doing
2594 * incremental sync of data, and we don't want to run multiple
2595 * concurrent sync operations for the same bucket shard.
2596 * Also, we should make sure that we don't run concurrent operations
2597 * on the same key with different ops.
2598 */
91327a77
AA
2599 bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
2600 auto result = key_to_marker.emplace(key, marker);
2601 if (!result.second) { // exists
7c673cae
FG
2602 set_need_retry(key);
2603 return false;
2604 }
91327a77
AA
2605 marker_to_op[marker] = operation{key, is_olh};
2606 if (is_olh) {
2607 // prevent other olh ops from starting on this object name
2608 pending_olh.insert(key.name);
2609 }
7c673cae
FG
2610 return true;
2611 }
2612
91327a77
AA
2613 bool can_do_op(const rgw_obj_key& key, bool is_olh) {
2614 // serialize olh ops on the same object name
2615 if (is_olh && pending_olh.count(key.name)) {
11fdf7f2 2616 tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
91327a77
AA
2617 return false;
2618 }
7c673cae
FG
2619 return (key_to_marker.find(key) == key_to_marker.end());
2620 }
91327a77 2621
11fdf7f2 2622 RGWOrderCallCR *allocate_order_control_cr() override {
91327a77
AA
2623 return new RGWLastCallerWinsCR(sync_env->cct);
2624 }
7c673cae
FG
2625};
2626
2627template <class T, class K>
2628class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2629 RGWDataSyncEnv *sync_env;
2630
2631 RGWBucketInfo *bucket_info;
2632 const rgw_bucket_shard& bs;
2633
2634 rgw_obj_key key;
2635 bool versioned;
11fdf7f2 2636 std::optional<uint64_t> versioned_epoch;
7c673cae
FG
2637 rgw_bucket_entry_owner owner;
2638 real_time timestamp;
2639 RGWModifyOp op;
2640 RGWPendingState op_state;
2641
2642 T entry_marker;
2643 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2644
2645 int sync_status;
2646
2647 stringstream error_ss;
2648
7c673cae
FG
2649 bool error_injection;
2650
2651 RGWDataSyncModule *data_sync_module;
31f18b77
FG
2652
2653 rgw_zone_set zones_trace;
7c673cae 2654
11fdf7f2 2655 RGWSyncTraceNodeRef tn;
7c673cae
FG
2656public:
2657 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2658 RGWBucketInfo *_bucket_info,
2659 const rgw_bucket_shard& bs,
91327a77 2660 const rgw_obj_key& _key, bool _versioned,
11fdf7f2 2661 std::optional<uint64_t> _versioned_epoch,
7c673cae
FG
2662 real_time& _timestamp,
2663 const rgw_bucket_entry_owner& _owner,
2664 RGWModifyOp _op, RGWPendingState _op_state,
11fdf7f2
TL
2665 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
2666 RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
7c673cae
FG
2667 sync_env(_sync_env),
2668 bucket_info(_bucket_info), bs(bs),
2669 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2670 owner(_owner),
2671 timestamp(_timestamp), op(_op),
2672 op_state(_op_state),
2673 entry_marker(_entry_marker),
2674 marker_tracker(_marker_tracker),
31f18b77 2675 sync_status(0){
7c673cae 2676 stringstream ss;
91327a77 2677 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
7c673cae 2678 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
7c673cae
FG
2679 set_status("init");
2680
11fdf7f2 2681 tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));
7c673cae 2682
11fdf7f2 2683 tn->log(20, SSTR("bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
7c673cae
FG
2684 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2685
2686 data_sync_module = sync_env->sync_module->get_data_handler();
31f18b77
FG
2687
2688 zones_trace = _zones_trace;
11fdf7f2 2689 zones_trace.insert(sync_env->store->svc.zone->get_zone().id);
7c673cae
FG
2690 }
2691
2692 int operate() override {
2693 reenter(this) {
2694 /* skip entries that are not complete */
2695 if (op_state != CLS_RGW_STATE_COMPLETE) {
2696 goto done;
2697 }
11fdf7f2 2698 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
7c673cae
FG
2699 do {
2700 yield {
2701 marker_tracker->reset_need_retry(key);
2702 if (key.name.empty()) {
2703 /* shouldn't happen */
2704 set_status("skipping empty entry");
11fdf7f2 2705 tn->log(0, "entry with empty obj name, skipping");
7c673cae
FG
2706 goto done;
2707 }
2708 if (error_injection &&
2709 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
11fdf7f2 2710 tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
7c673cae
FG
2711 retcode = -EIO;
2712 } else if (op == CLS_RGW_OP_ADD ||
2713 op == CLS_RGW_OP_LINK_OLH) {
7c673cae 2714 set_status("syncing obj");
11fdf7f2 2715 tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
31f18b77 2716 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
7c673cae
FG
2717 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2718 set_status("removing obj");
2719 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2720 versioned = true;
2721 }
11fdf7f2 2722 tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
91327a77 2723 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
f64942e4
AA
2724 // our copy of the object is more recent, continue as if it succeeded
2725 if (retcode == -ERR_PRECONDITION_FAILED) {
2726 retcode = 0;
2727 }
7c673cae 2728 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
7c673cae 2729 set_status("creating delete marker");
11fdf7f2 2730 tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
91327a77 2731 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
7c673cae 2732 }
11fdf7f2 2733 tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key));
7c673cae
FG
2734 }
2735 } while (marker_tracker->need_retry(key));
2736 {
11fdf7f2 2737 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
7c673cae 2738 if (retcode >= 0) {
11fdf7f2 2739 tn->log(10, "success");
7c673cae 2740 } else {
11fdf7f2 2741 tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
7c673cae 2742 }
7c673cae
FG
2743 }
2744
2745 if (retcode < 0 && retcode != -ENOENT) {
2746 set_status() << "failed to sync obj; retcode=" << retcode;
11fdf7f2
TL
2747 tn->log(0, SSTR("ERROR: failed to sync object: "
2748 << bucket_shard_str{bs} << "/" << key.name));
7c673cae
FG
2749 error_ss << bucket_shard_str{bs} << "/" << key.name;
2750 sync_status = retcode;
2751 }
2752 if (!error_ss.str().empty()) {
28e407b8 2753 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
7c673cae
FG
2754 }
2755done:
2756 if (sync_status == 0) {
2757 /* update marker */
2758 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2759 yield call(marker_tracker->finish(entry_marker));
2760 sync_status = retcode;
2761 }
2762 if (sync_status < 0) {
2763 return set_cr_error(sync_status);
2764 }
2765 return set_cr_done();
2766 }
2767 return 0;
2768 }
2769};
2770
2771#define BUCKET_SYNC_SPAWN_WINDOW 20
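/* Cap on concurrently spawned per-entry sync coroutines; the full and
 * incremental sync loops drain children once num_spawned() exceeds it. */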
2772
2773class RGWBucketShardFullSyncCR : public RGWCoroutine {
2774 RGWDataSyncEnv *sync_env;
2775 const rgw_bucket_shard& bs;
2776 RGWBucketInfo *bucket_info;
2777 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2778 bucket_list_result list_result;
2779 list<bucket_list_entry>::iterator entries_iter;
91327a77 2780 rgw_bucket_shard_sync_info& sync_info;
7c673cae
FG
2781 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2782 rgw_obj_key list_marker;
2783 bucket_list_entry *entry{nullptr};
7c673cae
FG
2784
2785 int total_entries{0};
2786
2787 int sync_status{0};
2788
2789 const string& status_oid;
2790
31f18b77 2791 rgw_zone_set zones_trace;
11fdf7f2
TL
2792
2793 RGWSyncTraceNodeRef tn;
7c673cae
FG
2794public:
2795 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2796 RGWBucketInfo *_bucket_info,
2797 const std::string& status_oid,
2798 RGWContinuousLeaseCR *lease_cr,
11fdf7f2
TL
2799 rgw_bucket_shard_sync_info& sync_info,
2800 RGWSyncTraceNodeRef tn_parent)
7c673cae 2801 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
91327a77
AA
2802 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2803 marker_tracker(sync_env, status_oid, sync_info.full_marker),
11fdf7f2
TL
2804 status_oid(status_oid),
2805 tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
2806 SSTR(bucket_shard_str{bs}))) {
31f18b77 2807 zones_trace.insert(sync_env->source_zone);
11fdf7f2 2808 marker_tracker.set_tn(tn);
7c673cae
FG
2809 }
2810
2811 int operate() override;
2812};
2813
2814int RGWBucketShardFullSyncCR::operate()
2815{
2816 int ret;
2817 reenter(this) {
91327a77 2818 list_marker = sync_info.full_marker.position;
7c673cae 2819
91327a77 2820 total_entries = sync_info.full_marker.count;
7c673cae
FG
2821 do {
2822 if (!lease_cr->is_locked()) {
2823 drain_all();
2824 return set_cr_error(-ECANCELED);
2825 }
2826 set_status("listing remote bucket");
11fdf7f2 2827 tn->log(20, "listing bucket for full sync");
7c673cae
FG
2828 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2829 &list_result));
2830 if (retcode < 0 && retcode != -ENOENT) {
2831 set_status("failed bucket listing, going down");
2832 drain_all();
2833 return set_cr_error(retcode);
2834 }
11fdf7f2
TL
2835 if (list_result.entries.size() > 0) {
2836 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
2837 }
7c673cae
FG
2838 entries_iter = list_result.entries.begin();
2839 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2840 if (!lease_cr->is_locked()) {
2841 drain_all();
2842 return set_cr_error(-ECANCELED);
2843 }
11fdf7f2
TL
2844 tn->log(20, SSTR("[full sync] syncing object: "
2845 << bucket_shard_str{bs} << "/" << entries_iter->key));
7c673cae
FG
2846 entry = &(*entries_iter);
2847 total_entries++;
2848 list_marker = entries_iter->key;
2849 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
11fdf7f2 2850 tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
7c673cae 2851 } else {
7c673cae
FG
2852 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2853 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2854 false, /* versioned, only matters for object removal */
2855 entry->versioned_epoch, entry->mtime,
a8e16298 2856 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
11fdf7f2 2857 entry->key, &marker_tracker, zones_trace, tn),
7c673cae
FG
2858 false);
2859 }
2860 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2861 yield wait_for_child();
2862 bool again = true;
2863 while (again) {
2864 again = collect(&ret, nullptr);
2865 if (ret < 0) {
11fdf7f2 2866 tn->log(10, "a sync operation returned error");
7c673cae
FG
2867 sync_status = ret;
2868 /* we have reported this error */
2869 }
2870 }
2871 }
2872 }
2873 } while (list_result.is_truncated && sync_status == 0);
2874 set_status("done iterating over all objects");
2875 /* wait for all operations to complete */
2876 while (num_spawned()) {
2877 yield wait_for_child();
2878 bool again = true;
2879 while (again) {
2880 again = collect(&ret, nullptr);
2881 if (ret < 0) {
11fdf7f2 2882 tn->log(10, "a sync operation returned error");
7c673cae
FG
2883 sync_status = ret;
2884 /* we have reported this error */
2885 }
2886 }
2887 }
11fdf7f2 2888 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
7c673cae
FG
2889 if (!lease_cr->is_locked()) {
2890 return set_cr_error(-ECANCELED);
2891 }
2892 /* update sync state to incremental */
2893 if (sync_status == 0) {
2894 yield {
91327a77 2895 sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
7c673cae 2896 map<string, bufferlist> attrs;
91327a77 2897 sync_info.encode_state_attr(attrs);
7c673cae 2898 RGWRados *store = sync_env->store;
11fdf7f2
TL
2899 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
2900 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
7c673cae
FG
2901 attrs));
2902 }
2903 } else {
11fdf7f2 2904 tn->log(10, SSTR("backing out with sync_status=" << sync_status));
7c673cae
FG
2905 }
2906 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
11fdf7f2
TL
2907 tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
2908 << bucket_shard_str{bs} << " retcode=" << retcode));
7c673cae
FG
2909 return set_cr_error(retcode);
2910 }
2911 if (sync_status < 0) {
2912 return set_cr_error(sync_status);
2913 }
2914 return set_cr_done();
2915 }
2916 return 0;
2917}
2918
91327a77
AA
2919static bool has_olh_epoch(RGWModifyOp op) {
2920 return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
2921}
2922
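/* Ops that carry an olh epoch (versioned link/unlink) must not be squashed
 * away, since their olh_epoch has to be applied on the destination; see
 * the squash_map handling below. */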
7c673cae
FG
2923class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2924 RGWDataSyncEnv *sync_env;
2925 const rgw_bucket_shard& bs;
2926 RGWBucketInfo *bucket_info;
2927 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2928 list<rgw_bi_log_entry> list_result;
91327a77 2929 list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
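  // (object, instance) -> (newest timestamp, op); only the latest
  // completed op per key is replayed, older entries are squashed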
7c673cae 2930 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
91327a77 2931 rgw_bucket_shard_sync_info& sync_info;
7c673cae
FG
2932 rgw_obj_key key;
2933 rgw_bi_log_entry *entry{nullptr};
2934 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2935 bool updated_status{false};
2936 const string& status_oid;
31f18b77 2937 const string& zone_id;
7c673cae
FG
2938
2939 string cur_id;
2940
7c673cae 2941 int sync_status{0};
c07f9fc5 2942 bool syncstopped{false};
7c673cae 2943
11fdf7f2 2944 RGWSyncTraceNodeRef tn;
7c673cae
FG
2945public:
2946 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2947 const rgw_bucket_shard& bs,
2948 RGWBucketInfo *_bucket_info,
2949 const std::string& status_oid,
2950 RGWContinuousLeaseCR *lease_cr,
11fdf7f2
TL
2951 rgw_bucket_shard_sync_info& sync_info,
2952 RGWSyncTraceNodeRef& _tn_parent)
7c673cae 2953 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
91327a77
AA
2954 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2955 marker_tracker(sync_env, status_oid, sync_info.inc_marker),
11fdf7f2
TL
2956 status_oid(status_oid), zone_id(_sync_env->store->svc.zone->get_zone().id),
2957 tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
2958 SSTR(bucket_shard_str{bs})))
91327a77 2959 {
7c673cae
FG
2960 set_description() << "bucket shard incremental sync bucket="
2961 << bucket_shard_str{bs};
2962 set_status("init");
11fdf7f2 2963 marker_tracker.set_tn(tn);
7c673cae
FG
2964 }
2965
2966 int operate() override;
2967};
2968
2969int RGWBucketShardIncrementalSyncCR::operate()
2970{
2971 int ret;
2972 reenter(this) {
2973 do {
2974 if (!lease_cr->is_locked()) {
2975 drain_all();
11fdf7f2 2976 tn->log(0, "ERROR: lease is not taken, abort");
7c673cae
FG
2977 return set_cr_error(-ECANCELED);
2978 }
11fdf7f2 2979 tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
91327a77
AA
2980 set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
2981 yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
7c673cae 2982 &list_result));
91327a77
AA
2983 if (retcode < 0 && retcode != -ENOENT) {
2984 /* wait for all operations to complete */
7c673cae 2985 drain_all();
91327a77 2986 return set_cr_error(retcode);
7c673cae
FG
2987 }
2988 squash_map.clear();
91327a77
AA
2989 entries_iter = list_result.begin();
2990 entries_end = list_result.end();
2991 for (; entries_iter != entries_end; ++entries_iter) {
2992 auto e = *entries_iter;
2993 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
2994 ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
c07f9fc5 2995 syncstopped = true;
91327a77
AA
2996 entries_end = entries_iter; // don't sync past here
2997 break;
c07f9fc5 2998 }
91327a77 2999 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
c07f9fc5
FG
3000 continue;
3001 }
94b18763
FG
3002 if (e.op == CLS_RGW_OP_CANCEL) {
3003 continue;
3004 }
7c673cae
FG
3005 if (e.state != CLS_RGW_STATE_COMPLETE) {
3006 continue;
3007 }
31f18b77
FG
3008 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
3009 continue;
3010 }
7c673cae 3011 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
91327a77
AA
3012 // don't squash over olh entries - we need to apply their olh_epoch
3013 if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
3014 continue;
3015 }
7c673cae
FG
3016 if (squash_entry.first <= e.timestamp) {
3017 squash_entry = make_pair<>(e.timestamp, e.op);
3018 }
3019 }
c07f9fc5 3020
7c673cae 3021 entries_iter = list_result.begin();
91327a77 3022 for (; entries_iter != entries_end; ++entries_iter) {
7c673cae
FG
3023 if (!lease_cr->is_locked()) {
3024 drain_all();
3025 return set_cr_error(-ECANCELED);
3026 }
3027 entry = &(*entries_iter);
3028 {
3029 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
3030 if (p < 0) {
3031 cur_id = entry->id;
3032 } else {
3033 cur_id = entry->id.substr(p + 1);
3034 }
3035 }
91327a77 3036 sync_info.inc_marker.position = cur_id;
7c673cae 3037
c07f9fc5 3038 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
91327a77 3039 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
c07f9fc5
FG
3040 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3041 continue;
3042 }
3043
7c673cae
FG
3044 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
3045 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
11fdf7f2 3046 tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
7c673cae
FG
3047 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3048 continue;
3049 }
3050
11fdf7f2 3051 tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));
7c673cae
FG
3052
3053 if (!key.ns.empty()) {
3054 set_status() << "skipping entry in namespace: " << entry->object;
11fdf7f2 3055 tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
7c673cae
FG
3056 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3057 continue;
3058 }
3059
3060 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
3061 if (entry->op == CLS_RGW_OP_CANCEL) {
3062 set_status() << "canceled operation, skipping";
11fdf7f2
TL
3063 tn->log(20, SSTR("skipping object: "
3064 << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
7c673cae
FG
3065 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3066 continue;
3067 }
3068 if (entry->state != CLS_RGW_STATE_COMPLETE) {
3069 set_status() << "non-complete operation, skipping";
11fdf7f2
TL
3070 tn->log(20, SSTR("skipping object: "
3071 << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
7c673cae
FG
3072 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3073 continue;
3074 }
31f18b77
FG
3075 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
3076 set_status() << "redundant operation, skipping";
11fdf7f2
TL
3077 tn->log(20, SSTR("skipping object: "
3078 << bucket_shard_str{bs} << "/" << key << ": redundant operation"));
31f18b77
FG
3079 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3080 continue;
3081 }
7c673cae
FG
3082 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
3083 set_status() << "squashed operation, skipping";
11fdf7f2
TL
3084 tn->log(20, SSTR("skipping object: "
3085 << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
91327a77 3086 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
7c673cae
FG
3087 continue;
3088 }
11fdf7f2
TL
3089 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
3090 tn->log(20, SSTR("syncing object: "
3091 << bucket_shard_str{bs} << "/" << key));
7c673cae 3092 updated_status = false;
91327a77 3093 while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
7c673cae
FG
3094 if (!updated_status) {
3095 set_status() << "can't do op, conflicting inflight operation";
3096 updated_status = true;
3097 }
11fdf7f2 3098 tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
7c673cae
FG
3099 yield wait_for_child();
3100 bool again = true;
3101 while (again) {
3102 again = collect(&ret, nullptr);
3103 if (ret < 0) {
11fdf7f2 3104 tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
7c673cae
FG
3105 sync_status = ret;
3106 /* we have reported this error */
3107 }
3108 }
1adf2230
AA
3109 if (sync_status != 0)
3110 break;
3111 }
3112 if (sync_status != 0) {
 3113 /* got an error, stop */
3114 break;
7c673cae 3115 }
91327a77 3116 if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
7c673cae 3117 set_status() << "can't do op, sync already in progress for object";
11fdf7f2 3118 tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
7c673cae
FG
3119 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3120 continue;
3121 }
3122 // yield {
3123 set_status() << "start object sync";
3124 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
11fdf7f2 3125 tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
7c673cae 3126 } else {
11fdf7f2 3127 std::optional<uint64_t> versioned_epoch;
7c673cae
FG
3128 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
3129 if (entry->ver.pool < 0) {
3130 versioned_epoch = entry->ver.epoch;
3131 }
11fdf7f2 3132 tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
7c673cae
FG
3133 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
3134 spawn(new SyncCR(sync_env, bucket_info, bs, key,
3135 entry->is_versioned(), versioned_epoch,
3136 entry->timestamp, owner, entry->op, entry->state,
11fdf7f2 3137 cur_id, &marker_tracker, entry->zones_trace, tn),
7c673cae
FG
3138 false);
3139 }
3140 // }
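      /* Backpressure: BUCKET_SYNC_SPAWN_WINDOW caps the number of in-flight
       * RGWBucketSyncSingleEntryCR children. Once the window is full, the
       * loop below yields until a child wakes us, then reaps every finished
       * child via collect() before the next log entry is processed. */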
3141 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
3142 set_status() << "num_spawned() > spawn_window";
3143 yield wait_for_child();
3144 bool again = true;
3145 while (again) {
3146 again = collect(&ret, nullptr);
3147 if (ret < 0) {
11fdf7f2 3148 tn->log(10, "a sync operation returned error");
7c673cae
FG
3149 sync_status = ret;
3150 /* we have reported this error */
3151 }
3152 /* not waiting for child here */
3153 }
3154 }
3155 }
91327a77 3156 } while (!list_result.empty() && sync_status == 0 && !syncstopped);
c07f9fc5 3157
7c673cae
FG
3158 while (num_spawned()) {
3159 yield wait_for_child();
3160 bool again = true;
3161 while (again) {
3162 again = collect(&ret, nullptr);
3163 if (ret < 0) {
11fdf7f2 3164 tn->log(10, "a sync operation returned error");
7c673cae
FG
3165 sync_status = ret;
3166 /* we have reported this error */
3167 }
3168 /* not waiting for child here */
3169 }
3170 }
11fdf7f2 3171 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
7c673cae 3172
91327a77
AA
3173 if (syncstopped) {
3174 // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
3175 // still disabled, we'll delete the sync status object. otherwise we'll
3176 // restart full sync to catch any changes that happened while sync was
3177 // disabled
3178 sync_info.state = rgw_bucket_shard_sync_info::StateInit;
3179 return set_cr_done();
3180 }
3181
7c673cae
FG
3182 yield call(marker_tracker.flush());
3183 if (retcode < 0) {
11fdf7f2 3184 tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
7c673cae
FG
3185 return set_cr_error(retcode);
3186 }
3187 if (sync_status < 0) {
11fdf7f2 3188 tn->log(10, SSTR("backing out with sync_status=" << sync_status));
7c673cae
FG
3189 return set_cr_error(sync_status);
3190 }
7c673cae
FG
3191 return set_cr_done();
3192 }
3193 return 0;
3194}
3195
3196int RGWRunBucketSyncCoroutine::operate()
3197{
3198 reenter(this) {
3199 yield {
3200 set_status("acquiring sync lock");
3201 auto store = sync_env->store;
31f18b77 3202 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
11fdf7f2 3203 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
31f18b77
FG
3204 "sync_lock",
3205 cct->_conf->rgw_sync_lease_period,
3206 this));
3207 lease_stack.reset(spawn(lease_cr.get(), false));
7c673cae
FG
3208 }
3209 while (!lease_cr->is_locked()) {
3210 if (lease_cr->is_done()) {
11fdf7f2 3211 tn->log(5, "failed to take lease");
7c673cae 3212 set_status("lease lock failed, early abort");
a8e16298 3213 drain_all();
7c673cae
FG
3214 return set_cr_error(lease_cr->get_ret_status());
3215 }
3216 set_sleeping(true);
3217 yield;
3218 }
3219
11fdf7f2 3220 tn->log(10, "took lease");
7c673cae
FG
3221 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
3222 if (retcode < 0 && retcode != -ENOENT) {
11fdf7f2 3223 tn->log(0, "ERROR: failed to read sync status for bucket");
7c673cae
FG
3224 lease_cr->go_down();
3225 drain_all();
3226 return set_cr_error(retcode);
3227 }
3228
11fdf7f2 3229 tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
7c673cae
FG
3230
3231 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3232 if (retcode == -ENOENT) {
3233 /* bucket instance info has not been synced in yet, fetch it now */
3234 yield {
11fdf7f2 3235 tn->log(10, SSTR("no local info for bucket " << bucket_str{bs.bucket} << ": fetching metadata"));
7c673cae
FG
3236 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
3237
11fdf7f2
TL
3238 meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc.zone->get_master_conn(), sync_env->async_rados,
3239 sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
7c673cae
FG
3240
3241 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
3242 string() /* no marker */,
3243 MDLOG_STATUS_COMPLETE,
11fdf7f2
TL
3244 NULL /* no marker tracker */,
3245 tn));
7c673cae
FG
3246 }
3247 if (retcode < 0) {
11fdf7f2 3248 tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket}));
7c673cae
FG
3249 lease_cr->go_down();
3250 drain_all();
3251 return set_cr_error(retcode);
3252 }
3253
3254 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3255 }
3256 if (retcode < 0) {
11fdf7f2 3257 tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket}));
7c673cae
FG
3258 lease_cr->go_down();
3259 drain_all();
3260 return set_cr_error(retcode);
3261 }
3262
91327a77
AA
3263 do {
3264 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
3265 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
a8e16298 3266 if (retcode == -ENOENT) {
11fdf7f2 3267 tn->log(0, "bucket sync disabled");
a8e16298
TL
3268 lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
3269 lease_cr->wakeup();
3270 lease_cr.reset();
3271 drain_all();
3272 return set_cr_done();
3273 }
91327a77 3274 if (retcode < 0) {
11fdf7f2 3275 tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
91327a77
AA
3276 lease_cr->go_down();
3277 drain_all();
3278 return set_cr_error(retcode);
3279 }
7c673cae 3280 }
7c673cae 3281
91327a77
AA
3282 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
3283 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
3284 status_oid, lease_cr.get(),
11fdf7f2 3285 sync_status, tn));
91327a77 3286 if (retcode < 0) {
11fdf7f2 3287 tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
91327a77
AA
3288 lease_cr->go_down();
3289 drain_all();
3290 return set_cr_error(retcode);
3291 }
7c673cae 3292 }
7c673cae 3293
91327a77
AA
3294 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
3295 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
3296 status_oid, lease_cr.get(),
11fdf7f2 3297 sync_status, tn));
91327a77 3298 if (retcode < 0) {
11fdf7f2 3299 tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
91327a77
AA
3300 lease_cr->go_down();
3301 drain_all();
3302 return set_cr_error(retcode);
3303 }
7c673cae 3304 }
91327a77
AA
3305 // loop back to previous states unless incremental sync returns normally
3306 } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
7c673cae
FG
3307
3308 lease_cr->go_down();
3309 drain_all();
3310 return set_cr_done();
3311 }
3312
3313 return 0;
3314}
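/* Sketch of the per-shard state machine driven by the loop above:
 *
 *   StateInit --(RGWInitBucketShardSyncStatusCoroutine)--> StateFullSync
 *   StateFullSync --(RGWBucketShardFullSyncCR)--> StateIncrementalSync
 *   StateIncrementalSync --(SYNCSTOP/RESYNC observed)--> StateInit, loop again
 *
 * Init returning -ENOENT means bucket sync was disabled and the status
 * object deleted, so the lease is aborted (not unlocked) and we exit. */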
3315
3316RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
3317{
11fdf7f2 3318 return new RGWRunBucketSyncCoroutine(&sync_env, bs, sync_env.sync_tracer->root_node);
7c673cae
FG
3319}
3320
3321int RGWBucketSyncStatusManager::init()
3322{
11fdf7f2 3323 conn = store->svc.zone->get_zone_conn_by_id(source_zone);
7c673cae 3324 if (!conn) {
11fdf7f2 3325 ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
7c673cae
FG
3326 return -EINVAL;
3327 }
3328
11fdf7f2 3329 int ret = http_manager.start();
7c673cae 3330 if (ret < 0) {
11fdf7f2 3331 ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
7c673cae
FG
3332 return ret;
3333 }
3334
3335
3336 const string key = bucket.get_key();
3337
3338 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
3339 { NULL, NULL } };
3340
3341 string path = string("/admin/metadata/bucket.instance");
3342
3343 bucket_instance_meta_info result;
3344 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
3345 if (ret < 0) {
11fdf7f2 3346 ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
7c673cae
FG
3347 return ret;
3348 }
3349
3350 RGWBucketInfo& bi = result.data.get_bucket_info();
3351 num_shards = bi.num_shards;
3352
3353 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
3354
3355 sync_module.reset(new RGWDefaultSyncModuleInstance());
3356
3357 int effective_num_shards = (num_shards ? num_shards : 1);
3358
3359 auto async_rados = store->get_async_rados();
3360
3361 for (int i = 0; i < effective_num_shards; i++) {
11fdf7f2
TL
3362 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, this, async_rados, &http_manager);
3363 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->get_sync_tracer(), sync_module);
7c673cae 3364 if (ret < 0) {
11fdf7f2 3365 ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
7c673cae
FG
3366 return ret;
3367 }
3368 source_logs[i] = l;
3369 }
3370
3371 return 0;
3372}
3373
3374int RGWBucketSyncStatusManager::init_sync_status()
3375{
3376 list<RGWCoroutinesStack *> stacks;
3377
3378 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3379 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3380 RGWRemoteBucketLog *l = iter->second;
3381 stack->call(l->init_sync_status_cr());
3382
3383 stacks.push_back(stack);
3384 }
3385
3386 return cr_mgr.run(stacks);
3387}
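/* This (and read_sync_status()/run() below) is the usual coroutine fan-out
 * pattern: one RGWCoroutinesStack per shard log, all handed to cr_mgr.run(),
 * which drives them concurrently and reports an error if any stack fails. */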
3388
3389int RGWBucketSyncStatusManager::read_sync_status()
3390{
3391 list<RGWCoroutinesStack *> stacks;
3392
3393 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3394 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3395 RGWRemoteBucketLog *l = iter->second;
3396 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
3397
3398 stacks.push_back(stack);
3399 }
3400
3401 int ret = cr_mgr.run(stacks);
3402 if (ret < 0) {
11fdf7f2 3403 ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
7c673cae
FG
3404 << bucket_str{bucket} << dendl;
3405 return ret;
3406 }
3407
3408 return 0;
3409}
3410
3411int RGWBucketSyncStatusManager::run()
3412{
3413 list<RGWCoroutinesStack *> stacks;
3414
3415 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3416 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3417 RGWRemoteBucketLog *l = iter->second;
3418 stack->call(l->run_sync_cr());
3419
3420 stacks.push_back(stack);
3421 }
3422
3423 int ret = cr_mgr.run(stacks);
3424 if (ret < 0) {
11fdf7f2 3425 ldpp_dout(this, 0) << "ERROR: failed to run sync for "
7c673cae
FG
3426 << bucket_str{bucket} << dendl;
3427 return ret;
3428 }
3429
3430 return 0;
3431}
3432
11fdf7f2
TL
3433unsigned RGWBucketSyncStatusManager::get_subsys() const
3434{
3435 return dout_subsys;
3436}
3437
3438std::ostream& RGWBucketSyncStatusManager::gen_prefix(std::ostream& out) const
3439{
3440 auto zone = std::string_view{source_zone};
3441 return out << "bucket sync zone:" << zone.substr(0, 8)
3442 << " bucket:" << bucket.name << ' ';
3443}
3444
7c673cae
FG
3445string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
3446 const rgw_bucket_shard& bs)
3447{
3448 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
3449}
3450
11fdf7f2
TL
3451string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
3452 const rgw_obj& obj)
3453{
3454 return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" +
3455 obj.key.name + ":" + obj.key.instance;
3456}
3457
b32b8144
FG
3458class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
3459 static constexpr int max_concurrent_shards = 16;
3460 RGWRados *const store;
3461 RGWDataSyncEnv *const env;
3462 const int num_shards;
3463 rgw_bucket_shard bs;
3464
3465 using Vector = std::vector<rgw_bucket_shard_sync_info>;
3466 Vector::iterator i, end;
3467
3468 public:
3469 RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
3470 int num_shards, const rgw_bucket& bucket,
3471 Vector *status)
3472 : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
3473 store(store), env(env), num_shards(num_shards),
3474 bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
3475 i(status->begin()), end(status->end())
3476 {}
3477
3478 bool spawn_next() override {
3479 if (i == end) {
3480 return false;
3481 }
3482 spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
3483 ++i;
3484 ++bs.shard_id;
3485 return true;
3486 }
3487};
3488
11fdf7f2 3489int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const std::string& source_zone,
28e407b8 3490 const RGWBucketInfo& bucket_info,
b32b8144
FG
3491 std::vector<rgw_bucket_shard_sync_info> *status)
3492{
28e407b8 3493 const auto num_shards = bucket_info.num_shards;
b32b8144 3494 status->clear();
28e407b8 3495 status->resize(std::max<size_t>(1, num_shards));
b32b8144
FG
3496
3497 RGWDataSyncEnv env;
3498 RGWSyncModuleInstanceRef module; // null sync module
11fdf7f2 3499 env.init(dpp, store->ctx(), store, nullptr, store->get_async_rados(),
81eedcae 3500 nullptr, nullptr, nullptr, source_zone, module, nullptr);
b32b8144
FG
3501
3502 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
28e407b8
AA
3503 return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
3504 bucket_info.bucket, status));
b32b8144
FG
3505}
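/* Minimal caller sketch (assumes an initialized store and bucket_info; error
 * handling trimmed):
 *
 *   std::vector<rgw_bucket_shard_sync_info> status;
 *   int r = rgw_bucket_sync_status(dpp, store, source_zone, bucket_info, &status);
 *   if (r < 0) {
 *     return r;
 *   }
 *   for (size_t i = 0; i < status.size(); i++) {
 *     ldpp_dout(dpp, 0) << "shard " << i << " state=" << status[i].state << dendl;
 *   }
 */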
3506
7c673cae
FG
3507
3508// TODO: move into rgw_data_sync_trim.cc
3509#undef dout_prefix
3510#define dout_prefix (*_dout << "data trim: ")
3511
3512namespace {
3513
3514/// return the marker that it's safe to trim up to
3515const std::string& get_stable_marker(const rgw_data_sync_marker& m)
3516{
3517 return m.state == m.FullSync ? m.next_step_marker : m.marker;
3518}
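/* Example: a peer still in FullSync has only recorded where incremental sync
 * will pick up (next_step_marker), so that is the furthest position we can
 * trust from it; a peer already in incremental sync exposes its current
 * marker instead. */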
3519
3520/// comparison operator for take_min_markers()
3521bool operator<(const rgw_data_sync_marker& lhs,
3522 const rgw_data_sync_marker& rhs)
3523{
3524 // sort by stable marker
3525 return get_stable_marker(lhs) < get_stable_marker(rhs);
3526}
3527
3528/// populate the container starting with 'dest' with the minimum stable marker
3529/// of each shard for all of the peers in [first, last)
3530template <typename IterIn, typename IterOut>
3531void take_min_markers(IterIn first, IterIn last, IterOut dest)
3532{
3533 if (first == last) {
3534 return;
3535 }
3536 // initialize markers with the first peer's
3537 auto m = dest;
3538 for (auto &shard : first->sync_markers) {
3539 *m = std::move(shard.second);
3540 ++m;
3541 }
3542 // for remaining peers, replace with smaller markers
3543 for (auto p = first + 1; p != last; ++p) {
3544 m = dest;
3545 for (auto &shard : p->sync_markers) {
3546 if (shard.second < *m) {
3547 *m = std::move(shard.second);
3548 }
3549 ++m;
3550 }
3551 }
3552}
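/* Usage sketch (assuming each peer reports one marker per shard, in shard
 * order):
 *
 *   std::vector<rgw_data_sync_status> peers = ...; // fetched over REST below
 *   std::vector<rgw_data_sync_marker> mins(num_shards);
 *   take_min_markers(peers.begin(), peers.end(), mins.begin());
 *   // mins[i] is now the smallest stable marker any peer reported for
 *   // shard i, i.e. the only position shard i can safely be trimmed to.
 */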
3553
3554} // anonymous namespace
3555
3556class DataLogTrimCR : public RGWCoroutine {
3557 RGWRados *store;
3558 RGWHTTPManager *http;
3559 const int num_shards;
 3560 const std::string& zone_id; ///< my zone id
 3561 std::vector<rgw_data_sync_status> peer_status; ///< sync status for each peer
 3562 std::vector<rgw_data_sync_marker> min_shard_markers; ///< min marker per shard
 3563 std::vector<std::string>& last_trim; ///< last trimmed marker per shard
3564 int ret{0};
3565
3566 public:
3567 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
3568 int num_shards, std::vector<std::string>& last_trim)
3569 : RGWCoroutine(store->ctx()), store(store), http(http),
3570 num_shards(num_shards),
11fdf7f2 3571 zone_id(store->svc.zone->get_zone().id),
81eedcae 3572 peer_status(store->svc.zone->get_zone_data_notify_to_map().size()),
7c673cae
FG
3573 min_shard_markers(num_shards),
3574 last_trim(last_trim)
3575 {}
3576
3577 int operate() override;
3578};
3579
3580int DataLogTrimCR::operate()
3581{
3582 reenter(this) {
3583 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3584 set_status("fetching sync status");
3585 yield {
3586 // query data sync status from each sync peer
3587 rgw_http_param_pair params[] = {
3588 { "type", "data" },
3589 { "status", nullptr },
3590 { "source-zone", zone_id.c_str() },
3591 { nullptr, nullptr }
3592 };
3593
3594 auto p = peer_status.begin();
81eedcae 3595 for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
7c673cae
FG
3596 ldout(cct, 20) << "query sync status from " << c.first << dendl;
3597 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3598 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3599 false);
3600 ++p;
3601 }
3602 }
3603
3604 // must get a successful reply from all peers to consider trimming
3605 ret = 0;
3606 while (ret == 0 && num_spawned() > 0) {
3607 yield wait_for_child();
3608 collect_next(&ret);
3609 }
3610 drain_all();
3611
3612 if (ret < 0) {
3613 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3614 return set_cr_error(ret);
3615 }
3616
3617 ldout(cct, 10) << "trimming log shards" << dendl;
3618 set_status("trimming log shards");
3619 yield {
3620 // determine the minimum marker for each shard
3621 take_min_markers(peer_status.begin(), peer_status.end(),
3622 min_shard_markers.begin());
3623
3624 for (int i = 0; i < num_shards; i++) {
3625 const auto& m = min_shard_markers[i];
3626 auto& stable = get_stable_marker(m);
3627 if (stable <= last_trim[i]) {
3628 continue;
3629 }
3630 ldout(cct, 10) << "trimming log shard " << i
3631 << " at marker=" << stable
3632 << " last_trim=" << last_trim[i] << dendl;
3633 using TrimCR = RGWSyncLogTrimCR;
3634 spawn(new TrimCR(store, store->data_log->get_oid(i),
3635 stable, &last_trim[i]),
3636 true);
3637 }
3638 }
3639 return set_cr_done();
3640 }
3641 return 0;
3642}
3643
a8e16298
TL
3644RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
3645 RGWHTTPManager *http,
3646 int num_shards,
3647 std::vector<std::string>& markers)
3648{
3649 return new DataLogTrimCR(store, http, num_shards, markers);
3650}
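/* Sketch of a one-shot admin invocation (modeled on rgw_bucket_sync_status()
 * above; 'http' is assumed to be an initialized RGWHTTPManager):
 *
 *   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
 *   std::vector<std::string> markers(num_shards);
 *   int r = crs.run(create_admin_data_log_trim_cr(store, &http, num_shards,
 *                                                 markers));
 */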
3651
7c673cae
FG
3652class DataLogTrimPollCR : public RGWCoroutine {
3653 RGWRados *store;
3654 RGWHTTPManager *http;
3655 const int num_shards;
 3656 const utime_t interval; ///< polling interval
 3657 const std::string lock_oid; ///< use first data log shard for lock
 3658 const std::string lock_cookie;
 3659 std::vector<std::string> last_trim; ///< last trimmed marker per shard
3660
3661 public:
3662 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3663 int num_shards, utime_t interval)
3664 : RGWCoroutine(store->ctx()), store(store), http(http),
3665 num_shards(num_shards), interval(interval),
3666 lock_oid(store->data_log->get_oid(0)),
3667 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3668 last_trim(num_shards)
3669 {}
3670
3671 int operate() override;
3672};
3673
3674int DataLogTrimPollCR::operate()
3675{
3676 reenter(this) {
3677 for (;;) {
3678 set_status("sleeping");
3679 wait(interval);
3680
3681 // request a 'data_trim' lock that covers the entire wait interval to
3682 // prevent other gateways from attempting to trim for the duration
3683 set_status("acquiring trim lock");
3684 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
11fdf7f2 3685 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
7c673cae
FG
3686 "data_trim", lock_cookie,
3687 interval.sec()));
3688 if (retcode < 0) {
3689 // if the lock is already held, go back to sleep and try again later
3690 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3691 << interval.sec() << "s" << dendl;
3692 continue;
3693 }
3694
3695 set_status("trimming");
3696 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3697
3698 // note that the lock is not released. this is intentional, as it avoids
3699 // duplicating this work in other gateways
3700 }
3701 }
3702 return 0;
3703}
3704
3705RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3706 RGWHTTPManager *http,
3707 int num_shards, utime_t interval)
3708{
3709 return new DataLogTrimPollCR(store, http, num_shards, interval);
3710}
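/* Unlike the one-shot admin variant above, this coroutine never completes on
 * its own: DataLogTrimPollCR sleeps for 'interval', takes the "data_trim"
 * lock, trims, and loops forever. It is therefore meant to be spawned on a
 * long-lived coroutine manager rather than run() to completion. */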