// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_counters.h"
#include "rgw_sync_module.h"
#include "rgw_sync_log_trim.h"

#include "cls/lock/cls_lock_client.h"

#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"

#include "include/random.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
static string object_status_oid_prefix = "bucket.sync-status";

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}
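
/* A hedged sketch of the datalog shard listing these decoders consume,
 * inferred from the decode_json() keys above (field values are
 * illustrative, not captured from a real zone):
 *
 *   {
 *     "marker": "1_1554137145.601070_4.1",
 *     "truncated": false,
 *     "entries": [
 *       { "key": "mybucket:abcd1234:5", "timestamp": "2019-04-01 16:45:45.601070Z" }
 *     ]
 *   }
 */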

class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

 public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store->svc.sysobj,
               rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}

class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;

 public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
  auto& shard_keys = omapkeys[shard_id];
  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, error_oid),
                                  marker, max_entries, shard_keys), false);

  ++shard_id;
  return true;
}

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                          rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};
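
/* For orientation, the request built above amounts to the following
 * (shard id illustrative):
 *
 *   GET /admin/log/?type=data&id=7&info
 *
 * The reply is decoded into RGWDataChangesLogInfo by http_op->wait().
 */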

struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};

class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op = nullptr;

  int shard_id;
  const std::string& marker;
  string *pnext_marker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;
  std::optional<PerfGuard> timer;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id,
                              const std::string& marker, string *pnext_marker,
                              list<rgw_data_change_log_entry> *_entries,
                              bool *_truncated)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
      entries(_entries), truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "marker", marker.c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        if (sync_env->counters) {
          timer.emplace(sync_env->counters, sync_counters::l_poll);
        }
        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          if (sync_env->counters) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        timer.reset();
        int ret = http_op->wait(&response);
        if (ret < 0) {
          if (sync_env->counters && ret != -ENOENT) {
            sync_env->counters->inc(sync_counters::l_poll_err);
          }
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pnext_marker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};
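
/* A sketch of the wire request assembled from the param pairs above
 * (marker value illustrative):
 *
 *   GET /admin/log/?type=data&id=7&marker=1_1554137145.601070_4.1&extra-info=true
 *
 * The body is parsed as read_remote_data_log_response, handing the caller
 * the next marker, the entry list and the truncated flag.
 */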

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;

  RGWSyncTraceNodeRef tn;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 RGWSyncTraceNodeRef& _tn_parent,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->svc.zone->get_zone_params().log_pool),
      num_shards(num_shards), status(status),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }

      tn->log(10, "took lease");

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->svc.zone->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!"));
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store->svc.sysobj,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};
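
/* Summary of RGWInitDataSyncStatusCoroutine::operate() above, for
 * orientation: take the sync_lock on the status object, write sync_info
 * (which recreates the object), re-take the lock, read each remote datalog
 * shard's current position via RGWReadRemoteDataLogShardInfoCR, persist one
 * rgw_data_sync_marker per shard, flip the state to
 * StateBuildingFullSyncMaps, and drop the lock. */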

int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
                           RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module,
                           PerfCounters* counters)
{
  sync_env.init(dpp, store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
                _sync_tracer, _source_zone, _sync_module, counters);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
  omapkeys.resize(num_shards);
  uint64_t max_entries{1};
  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
  http_manager.stop();

  if (ret == 0) {
    for (int i = 0; i < num_shards; i++) {
      if (omapkeys[i]->entries.size() != 0) {
        recovering_shards.insert(i);
      }
    }
  }

  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  auto instance_id = ceph::util::generate_random_number<uint64_t>();
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, tn, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}
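
/* full_data_sync_index_shard_oid() produces oids of the form
 * "<prefix>.<source_zone>.<shard_id>", e.g. (zone id illustrative):
 *
 *   data.full-sync.index.1b48a0d4-e680-4a1a-a77f-a7b4dbdcd046.12
 */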

struct read_metadata_list {
  string marker;
  bool truncated;
  list<string> keys;
  int count;

  read_metadata_list() : truncated(false), count(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("keys", keys, obj);
    JSONDecoder::decode_json("count", count, obj);
  }
};
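
/* A hedged example of the metadata listing this struct decodes, matching
 * the decode_json() keys above (values illustrative):
 *
 *   { "marker": "mybucket:abcd1234", "truncated": true,
 *     "keys": [ "mybucket:abcd1234" ], "count": 1 }
 */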

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;
  bool truncated;
  read_metadata_list result;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->svc.zone->get_zone_params().log_pool,
                                                  oid_prefix);
      yield; // yield so OmapAppendCRs can start

      do {
        yield {
          string entrypoint = string("/admin/metadata/bucket.instance");

          rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
                                         {"marker", result.marker.c_str()},
                                         {NULL, NULL}};

          call(new RGWReadRESTResourceCR<read_metadata_list>(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                             entrypoint, pairs, &result));
        }
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
          return set_cr_error(retcode);
        }

        for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
          ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
          key = *iter;

          yield {
            rgw_http_param_pair pairs[] = {{"key", key.c_str()},
                                           {NULL, NULL}};

            call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
          }

          num_shards = meta_info.data.get_bucket_info().num_shards;
          if (num_shards > 0) {
            for (i = 0; i < num_shards; i++) {
              char buf[16];
              snprintf(buf, sizeof(buf), ":%d", i);
              s = key + buf;
              yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
            }
          } else {
            yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
          }
        }
        truncated = result.truncated;
      } while (truncated);

      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                                rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker),
                true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }

      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

  RGWSyncTraceNodeRef tn;

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      tn(_tn) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                           rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * create index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
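
/* A sketch of the dedup contract implemented above: for a given key
 * (bucket shard), the first index_key_to_marker(key, m1) returns true and
 * records key<->m1; while that entry is outstanding, a second call
 * index_key_to_marker(key, m2) returns false and marks the key for retry,
 * so the shard never runs two concurrent syncs of the same bucket shard.
 * handle_finish(m1) then erases both mappings and clears the retry flag.
 */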

// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  explicit bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_str_noinstance {
  const rgw_bucket& b;
  explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}
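
/* Typical use of these wrappers, as seen later in this file:
 *
 *   tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
 *
 * which formats as "tenant/name:bucket_id:shard_id", omitting the tenant,
 * bucket id or shard id pieces when they are empty or negative.
 */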

class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  RGWSyncTraceNodeRef tn;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
                                         SSTR(bucket_shard_str{bs}))) {
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" << _raw_key << " entry=" << entry_marker;
    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
          call(new RGWRunBucketSyncCoroutine(sync_env, bs, tn));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << raw_key));
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
          }
        }
        if (error_repo && !error_repo->append(raw_key)) {
          tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
                          << error_repo->get_obj() << " retcode=" << retcode));
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10
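/* BUCKET_SHARD_SYNC_SPAWN_WINDOW bounds how many RGWDataSyncSingleEntryCR
 * children a shard keeps in flight (see the num_spawned() > spawn_window
 * loops below); DATA_SYNC_MAX_ERR_ENTRIES caps how many keys each
 * incremental pass reads back from the ".retry" error repo. */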

class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  std::set<std::string> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::coarse_real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
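  /* The error-repo scan backoff doubles from the default up to the max
   * (60s, 120s, ..., capped at 600s) while the repo stays empty, and snaps
   * back to the default once a scan returns entries; see
   * incremental_sync() below. */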
  uint32_t retry_backoff_secs;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, const rgw_data_sync_marker& _marker,
                     RGWSyncTraceNodeRef& _tn,
                     bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("full sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("incremental sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      tn->log(10, "start full sync");
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          tn->log(5, "failed to take lease");
          set_status("lease lock failed, early abort");
          drain_all();
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
      total_entries = sync_marker.pos;
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             sync_marker.marker, max_entries, omapkeys));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        entries = std::move(omapkeys->entries);
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          tn->log(20, SSTR("full sync: " << *iter));
          total_entries++;
          if (!marker_tracker->start(*iter, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false, tn), false);
          }
          sync_marker.marker = *iter;

          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                tn->log(10, "a sync operation returned error");
              }
            }
          }
        }
      } while (omapkeys->more);
      omapkeys.reset();

      drain_all_but_stack(lease_stack.get());

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                             rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
        lease_cr->go_down();
        drain_all();
        return set_cr_error(retcode);
      }
      // keep lease and transition to incremental_sync()
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      tn->log(10, "start incremental sync");
      if (lease_cr) {
        tn->log(10, "lease already held from full sync");
      } else {
        yield init_lease_cr();
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            tn->log(5, "failed to take lease");
            set_status("lease lock failed, early abort");
            drain_all();
            return set_cr_error(lease_cr->get_ret_status());
          }
          set_sleeping(true);
          yield;
        }
        set_status("lease acquired");
        tn->log(10, "took lease");
      }
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        if (current_modified.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            tn->log(20, SSTR("received async update notification: " << *modified_iter));
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false, tn), false);
          }
        }

        if (error_retry_time <= ceph::coarse_real_clock::now()) {
          /* process bucket shards that previously failed */
          omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                               error_marker, max_error_entries, omapkeys));
          error_entries = std::move(omapkeys->entries);
          tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
          iter = error_entries.begin();
          for (; iter != error_entries.end(); ++iter) {
            error_marker = *iter;
            tn->log(20, SSTR("handle error entry: " << error_marker));
            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
          }
          if (!omapkeys->more) {
            if (error_marker.empty() && error_entries.empty()) {
              /* the retry repo is empty, we back off a bit before calling it again */
              retry_backoff_secs *= 2;
              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
              }
            } else {
              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
            }
            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
            error_marker.clear();
          }
        }
        omapkeys.reset();

#define INCREMENTAL_MAX_ENTRIES 100
        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
        spawned_keys.clear();
        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
                                                   &next_marker, &log_entries, &truncated));
        if (retcode < 0 && retcode != -ENOENT) {
          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }

        if (log_entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }

        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
          if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
            tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
            continue;
          }
          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
          } else {
            /*
             * don't spawn the same key more than once. We can do that as long as we don't yield
             */
            if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
              spawned_keys.insert(log_iter->entry.key);
              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
              if (retcode < 0) {
                stop_spawned_services();
                drain_all();
                return set_cr_error(retcode);
              }
            }
          }
        }
        while ((int)num_spawned() > spawn_window) {
          set_status() << "num_spawned() > spawn_window";
          yield wait_for_child();
          int ret;
          while (collect(&ret, lease_stack.get())) {
            if (ret < 0) {
              tn->log(10, "a sync operation returned error");
              /* we have reported this error */
            }
            /* not waiting for child here */
          }
        }

        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
                         << " next_marker=" << next_marker << " truncated=" << truncated));
        if (!next_marker.empty()) {
          sync_marker.marker = next_marker;
        } else if (!log_entries.empty()) {
          sync_marker.marker = log_entries.back().log_id;
        }
        if (!truncated) {
          // we reached the end, wait a while before checking for more
          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
          yield wait(get_idle_interval());
        }
      } while (true);
    }
    return 0;
  }

  utime_t get_idle_interval() const {
#define INCREMENTAL_INTERVAL 20
    ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
      auto now = ceph::coarse_real_clock::now();
      if (error_retry_time > now) {
        auto d = error_retry_time - now;
        if (interval > d) {
          interval = d;
        }
      }
    }
    // convert timespan -> time_point -> utime_t
    return utime_t(ceph::coarse_real_clock::zero() + interval);
  }

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};

class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, const rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker,
                            RGWSyncTraceNodeRef& _tn_parent)
    : RGWBackoffControlCR(_sync_env->cct, false),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, tn, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(cr_lock());

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};

class RGWDataSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  rgw_data_sync_status sync_status;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  Mutex shard_crs_lock;
  map<int, RGWDataSyncShardControlCR *> shard_crs;

  bool *reset_backoff;

  RGWSyncTraceNodeRef tn;

  RGWDataSyncModule *data_sync_module{nullptr};
public:
  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      num_shards(_num_shards),
      marker_tracker(NULL),
      shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
      reset_backoff(_reset_backoff), tn(_tn) {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
  }

  int operate() override {
    reenter(this) {

      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode < 0 && retcode != -ENOENT) {
        tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        tn->log(20, SSTR("init"));
        sync_status.sync_info.num_shards = num_shards;
        uint64_t instance_id;
        instance_id = ceph::util::generate_random_number<uint64_t>();
        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, tn, &sync_status));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sync_env, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        tn->log(10, SSTR("building full sync maps"));
        /* call sync module init here */
        sync_status.sync_info.num_shards = num_shards;
        yield call(data_sync_module->init_sync(sync_env));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        /* state: building full sync maps */
        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield call(data_sync_module->start_sync(sync_env));

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc.zone->get_zone_params().log_pool,
                                                                          iter->first, iter->second, tn);
            cr->get();
            shard_crs_lock.Lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.Unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store->svc.sysobj,
                                                         rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex::Locker l(shard_crs_lock);
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};
1688
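/* Default data sync module: mirrors the source zone by fetching objects
 * verbatim and replaying removals and delete markers locally. */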
class RGWDefaultDataSyncModule : public RGWDataSyncModule {
public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  bool supports_user_writes() override {
    return true;
  }
};

int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
                                 std::nullopt,
                                 key, std::nullopt, versioned_epoch,
                                 true, zones_trace, sync_env->counters);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}

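/* Archive data sync module: forces versioning on destination buckets so that
 * every synced object version is retained, and ignores remote deletes. */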
class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
public:
  RGWArchiveDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
  RGWArchiveDataSyncModule data_handler;
public:
  RGWArchiveSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  RGWMetadataHandler *alloc_bucket_meta_handler() override {
    return RGWArchiveBucketMetaHandlerAllocator::alloc();
  }
  RGWMetadataHandler *alloc_bucket_instance_meta_handler() override {
    return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
  }
};

int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWArchiveSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
  if (!bucket_info.versioned() ||
      (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
    ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
    bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
    int op_ret = sync_env->store->put_bucket_instance_info(bucket_info, false, real_time(), NULL);
    if (op_ret < 0) {
      ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
      return NULL;
    }
  }

  std::optional<rgw_obj_key> dest_key;

  if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
    versioned_epoch = 0;
    dest_key = key;
    if (key.instance.empty()) {
      sync_env->store->gen_rand_obj_instance_name(&(*dest_key));
    }
  }

  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                                 bucket_info, std::nullopt,
                                 key, dest_key, versioned_epoch,
                                 true, zones_trace, nullptr);
}

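/* The archive zone never removes objects: remove_object() below just logs
 * the request and returns no coroutine, so remote deletes are ignored. */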
RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
  return NULL;
}

RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
                          << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}

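/* Control wrapper around RGWDataSyncCR: RGWBackoffControlCR re-runs the sync
 * coroutine with backoff whenever it fails (exit_on_error is false, so every
 * error is retried). */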
class RGWDataSyncControlCR : public RGWBackoffControlCR
{
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  RGWSyncTraceNodeRef tn;

  static constexpr bool exit_on_error = false; // retry on all errors
public:
  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards,
                       RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
                                                          sync_env(_sync_env), num_shards(_num_shards) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncCR(sync_env, num_shards, tn, backoff_ptr());
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex& m = cr_lock();

    m.Lock();
    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
    if (!cr) {
      m.Unlock();
      return;
    }

    cr->get();
    m.Unlock();

    if (cr) {
      tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
      cr->wakeup(shard_id, keys);
    }

    cr->put();
  }
};

void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
  RWLock::RLocker rl(lock);
  if (!data_sync_cr) {
    return;
  }
  data_sync_cr->wakeup(shard_id, keys);
}

int RGWRemoteDataLog::run_sync(int num_shards)
{
  lock.get_write();
  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn);
  data_sync_cr->get(); // run() will drop a ref, so take another
  lock.unlock();

  int r = run(data_sync_cr);

  lock.get_write();
  data_sync_cr->put();
  data_sync_cr = NULL;
  lock.unlock();

  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
    return r;
  }
  return 0;
}

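/* Prepare this zone to sync data from source_zone: check that the source's
 * tier type supports data export, open a REST connection to it, and size
 * the per-shard status objects from the remote datalog's shard count. */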
int RGWDataSyncStatusManager::init()
{
  RGWZone *zone_def;

  if (!store->svc.zone->find_zone_by_id(source_zone, &zone_def)) {
    ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
    return -EIO;
  }

  if (!store->svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
    return -ENOTSUP;
  }

  const RGWZoneParams& zone_params = store->svc.zone->get_zone_params();

  if (sync_module == nullptr) {
    sync_module = store->get_sync_module();
  }

  conn = store->svc.zone->get_zone_conn_by_id(source_zone);
  if (!conn) {
    ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  int r = source_log.init(source_zone, conn, error_logger, store->get_sync_tracer(),
                          sync_module, counters);
  if (r < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
    finalize();
    return r;
  }

  rgw_datalog_info datalog_info;
  r = source_log.read_log_info(&datalog_info);
  if (r < 0) {
    ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
    finalize();
    return r;
  }

  num_shards = datalog_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
  }

  return 0;
}

void RGWDataSyncStatusManager::finalize()
{
  delete error_logger;
  error_logger = nullptr;
}

unsigned RGWDataSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
{
  auto zone = std::string_view{source_zone};
  return out << "data sync zone:" << zone.substr(0, 8) << ' ';
}

string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
{
  char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());

  return string(buf);
}

string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);

  return string(buf);
}

int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                             const rgw_bucket& bucket, int shard_id,
                             RGWSyncErrorLogger *_error_logger,
                             RGWSyncTraceManager *_sync_tracer,
                             RGWSyncModuleInstanceRef& _sync_module)
{
  conn = _conn;
  source_zone = _source_zone;
  bs.bucket = bucket;
  bs.shard_id = shard_id;

  sync_env.init(dpp, store->ctx(), store, conn, async_rados, http_manager,
                _error_logger, _sync_tracer, source_zone, _sync_module, nullptr);

  return 0;
}

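/* Fetch remote bucket index log info (max marker and the syncstopped flag)
 * via /admin/log?type=bucket-index&info on the source zone. */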
class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const string instance_key;

  rgw_bucket_index_marker_info *info;

public:
  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
                                    const rgw_bucket_shard& bs,
                                    rgw_bucket_index_marker_info *_info)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      instance_key(bs.get_key()), info(_info) {}

  int operate() override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
                                        { "bucket-instance", instance_key.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";
        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

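/* Initialize a bucket shard's sync status: record the remote bilog position
 * as the incremental-sync starting point and enter full sync, or remove the
 * status object (and return -ENOENT) if the remote reports syncstopped. */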
class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_bucket_shard bs;
  const string sync_status_oid;

  rgw_bucket_shard_sync_info& status;

  rgw_bucket_index_marker_info info;
public:
  RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                        const rgw_bucket_shard& bs,
                                        rgw_bucket_shard_sync_info& _status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      status(_status)
  {}

  int operate() override {
    reenter(this) {
      /* fetch current position in logs */
      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
      if (retcode < 0 && retcode != -ENOENT) {
        ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
        return set_cr_error(retcode);
      }
      yield {
        auto store = sync_env->store;
        rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, sync_status_oid);

        if (info.syncstopped) {
          call(new RGWRadosRemoveCR(store, obj));
        } else {
          status.state = rgw_bucket_shard_sync_info::StateFullSync;
          status.inc_marker.position = info.max_marker;
          map<string, bufferlist> attrs;
          status.encode_all_attrs(attrs);
          call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj, obj, attrs));
        }
      }
      if (info.syncstopped) {
        retcode = -ENOENT;
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
  return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
}

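/* Bucket shard sync state is persisted as rados xattrs on the shard's status
 * object: one attribute each for state, full-sync marker and incremental
 * marker. decode_attr() returns false when an attribute is missing or fails
 * to decode, which lets decode_from_attrs() fall back to the legacy,
 * unprefixed attribute names. */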
#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."

template <class T>
static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
{
  map<string, bufferlist>::iterator iter = attrs.find(attr_name);
  if (iter == attrs.end()) {
    *val = T();
    return false;
  }

  auto biter = iter->second.cbegin();
  try {
    decode(*val, biter);
  } catch (buffer::error& err) {
    ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
    return false;
  }
  return true;
}

void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
{
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
    decode_attr(cct, attrs, "state", &state);
  }
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &full_marker)) {
    decode_attr(cct, attrs, "full_marker", &full_marker);
  }
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
    decode_attr(cct, attrs, "inc_marker", &inc_marker);
  }
}

void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
{
  encode_state_attr(attrs);
  full_marker.encode_attr(attrs);
  inc_marker.encode_attr(attrs);
}

void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
}

void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
}

void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
}

class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  string oid;
  rgw_bucket_shard_sync_info *status;

  map<string, bufferlist> attrs;
public:
  RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                   const rgw_bucket_shard& bs,
                                   rgw_bucket_shard_sync_info *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      status(_status) {}
  int operate() override;
};

int RGWReadBucketSyncStatusCoroutine::operate()
{
  reenter(this) {
    yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                                             rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, oid),
                                             &attrs, true));
    if (retcode == -ENOENT) {
      *status = rgw_bucket_shard_sync_info();
      return set_cr_done();
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    status->decode_from_attrs(sync_env->cct, attrs);
    return set_cr_done();
  }
  return 0;
}

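/* List the omap keys of a datalog shard's ".retry" error object; each key
 * names a bucket shard that hit a sync error and is awaiting retry. */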
#define OMAP_READ_MAX_ENTRIES 10
class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRados *store;

  const int shard_id;
  int max_entries;

  set<string>& recovering_buckets;
  string marker;
  string error_oid;

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  set<string> error_entries;
  int max_omap_entries;
  int count;

public:
  RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
                                         set<string>& _recovering_buckets, const int _max_entries)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
      recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
  {
    error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
  }

  int operate() override;
};

int RGWReadRecoveringBucketShardsCoroutine::operate()
{
  reenter(this) {
    // read recovering bucket shards
    count = 0;
    do {
      omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, error_oid),
                                           marker, max_omap_entries, omapkeys));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
                                << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      error_entries = std::move(omapkeys->entries);
      if (error_entries.empty()) {
        break;
      }

      count += error_entries.size();
      marker = *error_entries.rbegin();
      recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
                                std::make_move_iterator(error_entries.end()));
    } while (omapkeys->more && count < max_entries);

    return set_cr_done();
  }

  return 0;
}

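/* Walk the remote datalog shard from the stored sync marker and collect the
 * bucket shards that still have unsynced ("pending") log entries. */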
class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRados *store;

  const int shard_id;
  int max_entries;

  set<string>& pending_buckets;
  string marker;
  string status_oid;

  rgw_data_sync_marker* sync_marker;
  int count;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  bool truncated;

public:
  RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
                                      set<string>& _pending_buckets,
                                      rgw_data_sync_marker* _sync_marker, const int _max_entries)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
      pending_buckets(_pending_buckets), sync_marker(_sync_marker)
  {
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
  }

  int operate() override;
};

int RGWReadPendingBucketShardsCoroutine::operate()
{
  reenter(this) {
    // read sync status marker
    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
    yield call(new CR(sync_env->async_rados, store->svc.sysobj,
                      rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                      sync_marker));
    if (retcode < 0) {
      ldout(sync_env->cct, 0) << "failed to read sync status marker with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    // read pending bucket shards
    marker = sync_marker->marker;
    count = 0;
    do {
      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker,
                                                 &next_marker, &log_entries, &truncated));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "failed to read remote data log info with "
                                << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      if (log_entries.empty()) {
        break;
      }

      count += log_entries.size();
      for (const auto& entry : log_entries) {
        pending_buckets.insert(entry.entry.key);
      }
      // advance to the next page of the log; next_marker is returned by the
      // shard listing, and without this the loop would re-read the same entries
      marker = next_marker;
    } while (truncated && count < max_entries);

    return set_cr_done();
  }

  return 0;
}

int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  list<RGWCoroutinesStack *> stacks;
  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
  stacks.push_back(recovering_stack);
  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
  stacks.push_back(pending_stack);
  ret = crs.run(stacks);
  http_manager.stop();
  return ret;
}

RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
  return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
}

RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
    delete iter->second;
  }
  delete error_logger;
}


void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("ID", id, obj);
  JSONDecoder::decode_json("DisplayName", display_name, obj);
}

struct bucket_list_entry {
  bool delete_marker;
  rgw_obj_key key;
  bool is_latest;
  real_time mtime;
  string etag;
  uint64_t size;
  string storage_class;
  rgw_bucket_entry_owner owner;
  uint64_t versioned_epoch;
  string rgw_tag;

  bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
    JSONDecoder::decode_json("Key", key.name, obj);
    JSONDecoder::decode_json("VersionId", key.instance, obj);
    JSONDecoder::decode_json("IsLatest", is_latest, obj);
    string mtime_str;
    JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);

    struct tm t;
    uint32_t nsec;
    if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
      ceph_timespec ts;
      ts.tv_sec = (uint64_t)internal_timegm(&t);
      ts.tv_nsec = nsec;
      mtime = real_clock::from_ceph_timespec(ts);
    }
    JSONDecoder::decode_json("ETag", etag, obj);
    JSONDecoder::decode_json("Size", size, obj);
    JSONDecoder::decode_json("StorageClass", storage_class, obj);
    JSONDecoder::decode_json("Owner", owner, obj);
    JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
    JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
    if (key.instance == "null" && !versioned_epoch) {
      key.instance.clear();
    }
  }

  RGWModifyOp get_modify_op() const {
    if (delete_marker) {
      return CLS_RGW_OP_LINK_OLH_DM;
    } else if (!key.instance.empty() && key.instance != "null") {
      return CLS_RGW_OP_LINK_OLH;
    } else {
      return CLS_RGW_OP_ADD;
    }
  }
};

struct bucket_list_result {
  string name;
  string prefix;
  string key_marker;
  string version_id_marker;
  int max_keys;
  bool is_truncated;
  list<bucket_list_entry> entries;

  bucket_list_result() : max_keys(0), is_truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("Name", name, obj);
    JSONDecoder::decode_json("Prefix", prefix, obj);
    JSONDecoder::decode_json("KeyMarker", key_marker, obj);
    JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
    JSONDecoder::decode_json("MaxKeys", max_keys, obj);
    JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
    JSONDecoder::decode_json("Entries", entries, obj);
  }
};

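/* Full-sync helper: list one remote bucket shard (all object versions) over
 * REST, resuming from the given key/version-id marker. */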
class RGWListBucketShardCR: public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  const string instance_key;
  rgw_obj_key marker_position;

  bucket_list_result *result;

public:
  RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                       rgw_obj_key& _marker_position, bucket_list_result *_result)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      instance_key(bs.get_key()), marker_position(_marker_position),
      result(_result) {}

  int operate() override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
                                        { "versions" , NULL },
                                        { "format" , "json" },
                                        { "objs-container" , "true" },
                                        { "key-marker" , marker_position.name.c_str() },
                                        { "version-id-marker" , marker_position.instance.c_str() },
                                        { NULL, NULL } };
        // don't include tenant in the url, it's already part of instance_key
        string p = string("/") + bs.bucket.name;
        call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

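/* Incremental-sync helper: fetch bucket index log (bilog) entries from the
 * source zone starting at the given marker, timing the poll through the
 * sync perf counters when they are enabled. */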
class RGWListBucketIndexLogCR: public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const string instance_key;
  string marker;

  list<rgw_bi_log_entry> *result;
  std::optional<PerfGuard> timer;

public:
  RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                          string& _marker, list<rgw_bi_log_entry> *_result)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      instance_key(bs.get_key()), marker(_marker), result(_result) {}

  int operate() override {
    reenter(this) {
      if (sync_env->counters) {
        timer.emplace(sync_env->counters, sync_counters::l_poll);
      }
      yield {
        rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
                                        { "format" , "json" },
                                        { "marker" , marker.c_str() },
                                        { "type", "bucket-index" },
                                        { NULL, NULL } };

        call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
      }
      timer.reset();
      if (retcode < 0) {
        if (sync_env->counters) {
          sync_env->counters->inc(sync_counters::l_poll_err);
        }
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

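/* The marker trackers below persist the sync position lazily: the window
 * passed to RGWSyncShardMarkerTrack batches completions, so the stored
 * marker may trail the newest completed entry by up to
 * BUCKET_SYNC_UPDATE_MARKER_WINDOW entries, bounding both status-object
 * write traffic and the amount of work replayed after a restart. */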
#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10

class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_bucket_shard_full_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;

public:
  RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                                    const string& _marker_oid,
                                    const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
                                                                                        sync_env(_sync_env),
                                                                                        marker_oid(_marker_oid),
                                                                                        sync_marker(_marker) {}

  void set_tn(RGWSyncTraceNodeRef& _tn) {
    tn = _tn;
  }

  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.position = new_marker;
    sync_marker.count = index_pos;

    map<string, bufferlist> attrs;
    sync_marker.encode_attr(attrs);

    RGWRados *store = sync_env->store;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                          attrs);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_bucket_shard_inc_sync_marker sync_marker;

  map<rgw_obj_key, string> key_to_marker;

  struct operation {
    rgw_obj_key key;
    bool is_olh;
  };
  map<string, operation> marker_to_op;
  std::set<std::string> pending_olh; // object names with pending olh operations

  RGWSyncTraceNodeRef tn;

  void handle_finish(const string& marker) override {
    auto iter = marker_to_op.find(marker);
    if (iter == marker_to_op.end()) {
      return;
    }
    auto& op = iter->second;
    key_to_marker.erase(op.key);
    reset_need_retry(op.key);
    if (op.is_olh) {
      pending_olh.erase(op.key.name);
    }
    marker_to_op.erase(iter);
  }

public:
  RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                                   const string& _marker_oid,
                                   const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
                                                                                      sync_env(_sync_env),
                                                                                      marker_oid(_marker_oid),
                                                                                      sync_marker(_marker) {}

  void set_tn(RGWSyncTraceNodeRef& _tn) {
    tn = _tn;
  }

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.position = new_marker;

    map<string, bufferlist> attrs;
    sync_marker.encode_attr(attrs);

    RGWRados *store = sync_env->store;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
                                          store->svc.sysobj,
                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                          attrs);
  }

  /*
   * Create an index from key -> <op, marker>, and from marker -> key.
   * This ensures that we only keep one entry for any key that is in use,
   * which is needed when doing incremental sync of data: we don't want to
   * run multiple concurrent sync operations for the same bucket shard.
   * Also, we should make sure that we don't run concurrent operations on
   * the same key with different ops.
   */
  bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
    auto result = key_to_marker.emplace(key, marker);
    if (!result.second) { // exists
      set_need_retry(key);
      return false;
    }
    marker_to_op[marker] = operation{key, is_olh};
    if (is_olh) {
      // prevent other olh ops from starting on this object name
      pending_olh.insert(key.name);
    }
    return true;
  }

  bool can_do_op(const rgw_obj_key& key, bool is_olh) {
    // serialize olh ops on the same object name
    if (is_olh && pending_olh.count(key.name)) {
      tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
      return false;
    }
    return (key_to_marker.find(key) == key_to_marker.end());
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

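/* Replay a single bucket index log entry locally: dispatch to the sync
 * module (object fetch, removal or delete-marker creation), retry while the
 * marker tracker flags the key for retry, log failures to the sync error
 * logger, and finally advance the marker tracker. */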
template <class T, class K>
class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWBucketInfo *bucket_info;
  const rgw_bucket_shard& bs;

  rgw_obj_key key;
  bool versioned;
  std::optional<uint64_t> versioned_epoch;
  rgw_bucket_entry_owner owner;
  real_time timestamp;
  RGWModifyOp op;
  RGWPendingState op_state;

  T entry_marker;
  RGWSyncShardMarkerTrack<T, K> *marker_tracker;

  int sync_status;

  stringstream error_ss;

  bool error_injection;

  RGWDataSyncModule *data_sync_module;

  rgw_zone_set zones_trace;

  RGWSyncTraceNodeRef tn;
public:
  RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                             RGWBucketInfo *_bucket_info,
                             const rgw_bucket_shard& bs,
                             const rgw_obj_key& _key, bool _versioned,
                             std::optional<uint64_t> _versioned_epoch,
                             real_time& _timestamp,
                             const rgw_bucket_entry_owner& _owner,
                             RGWModifyOp _op, RGWPendingState _op_state,
                             const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
                             RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
                                                                sync_env(_sync_env),
                                                                bucket_info(_bucket_info), bs(bs),
                                                                key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
                                                                owner(_owner),
                                                                timestamp(_timestamp), op(_op),
                                                                op_state(_op_state),
                                                                entry_marker(_entry_marker),
                                                                marker_tracker(_marker_tracker),
                                                                sync_status(0) {
    stringstream ss;
    ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
    set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
    set_status("init");

    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));

    tn->log(20, SSTR("bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
    error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);

    data_sync_module = sync_env->sync_module->get_data_handler();

    zones_trace = _zones_trace;
    zones_trace.insert(sync_env->store->svc.zone->get_zone().id);
  }

  int operate() override {
    reenter(this) {
      /* skip entries that are not complete */
      if (op_state != CLS_RGW_STATE_COMPLETE) {
        goto done;
      }
      tn->set_flag(RGW_SNS_FLAG_ACTIVE);
      do {
        yield {
          marker_tracker->reset_need_retry(key);
          if (key.name.empty()) {
            /* shouldn't happen */
            set_status("skipping empty entry");
            tn->log(0, "entry with empty obj name, skipping");
            goto done;
          }
          if (error_injection &&
              rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
            tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
            retcode = -EIO;
          } else if (op == CLS_RGW_OP_ADD ||
                     op == CLS_RGW_OP_LINK_OLH) {
            set_status("syncing obj");
            tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
          } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
            set_status("removing obj");
            if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
              versioned = true;
            }
            tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
            // our copy of the object is more recent, continue as if it succeeded
            if (retcode == -ERR_PRECONDITION_FAILED) {
              retcode = 0;
            }
          } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
            set_status("creating delete marker");
            tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
          }
          tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key));
        }
      } while (marker_tracker->need_retry(key));
      {
        tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
        if (retcode >= 0) {
          tn->log(10, "success");
        } else {
          tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
        }
      }

      if (retcode < 0 && retcode != -ENOENT) {
        set_status() << "failed to sync obj; retcode=" << retcode;
        tn->log(0, SSTR("ERROR: failed to sync object: "
                        << bucket_shard_str{bs} << "/" << key.name));
        error_ss << bucket_shard_str{bs} << "/" << key.name;
        sync_status = retcode;
      }
      if (!error_ss.str().empty()) {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
      }
done:
      if (sync_status == 0) {
        /* update marker */
        set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
        yield call(marker_tracker->finish(entry_marker));
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SYNC_SPAWN_WINDOW 20

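/* Full sync of one bucket shard: page through the remote bucket listing and
 * spawn a per-object RGWBucketSyncSingleEntryCR, keeping at most
 * BUCKET_SYNC_SPAWN_WINDOW children in flight; on success, flip the shard's
 * state to incremental sync. */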
class RGWBucketShardFullSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  RGWBucketInfo *bucket_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  bucket_list_result list_result;
  list<bucket_list_entry>::iterator entries_iter;
  rgw_bucket_shard_sync_info& sync_info;
  RGWBucketFullSyncShardMarkerTrack marker_tracker;
  rgw_obj_key list_marker;
  bucket_list_entry *entry{nullptr};

  int total_entries{0};

  int sync_status{0};

  const string& status_oid;

  rgw_zone_set zones_trace;

  RGWSyncTraceNodeRef tn;
public:
  RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                           RGWBucketInfo *_bucket_info,
                           const std::string& status_oid,
                           RGWContinuousLeaseCR *lease_cr,
                           rgw_bucket_shard_sync_info& sync_info,
                           RGWSyncTraceNodeRef tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
      marker_tracker(sync_env, status_oid, sync_info.full_marker),
      status_oid(status_oid),
      tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
                                         SSTR(bucket_shard_str{bs}))) {
    zones_trace.insert(sync_env->source_zone);
    marker_tracker.set_tn(tn);
  }

  int operate() override;
};

int RGWBucketShardFullSyncCR::operate()
{
  int ret;
  reenter(this) {
    list_marker = sync_info.full_marker.position;

    total_entries = sync_info.full_marker.count;
    do {
      if (!lease_cr->is_locked()) {
        drain_all();
        return set_cr_error(-ECANCELED);
      }
      set_status("listing remote bucket");
      tn->log(20, "listing bucket for full sync");
      yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
                                          &list_result));
      if (retcode < 0 && retcode != -ENOENT) {
        set_status("failed bucket listing, going down");
        drain_all();
        return set_cr_error(retcode);
      }
      if (list_result.entries.size() > 0) {
        tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
      }
      entries_iter = list_result.entries.begin();
      for (; entries_iter != list_result.entries.end(); ++entries_iter) {
        if (!lease_cr->is_locked()) {
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        tn->log(20, SSTR("[full sync] syncing object: "
                         << bucket_shard_str{bs} << "/" << entries_iter->key));
        entry = &(*entries_iter);
        total_entries++;
        list_marker = entries_iter->key;
        if (!marker_tracker.start(entry->key, total_entries, real_time())) {
          tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
        } else {
          using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
          yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
                                 false, /* versioned, only matters for object removal */
                                 entry->versioned_epoch, entry->mtime,
                                 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
                                 entry->key, &marker_tracker, zones_trace, tn),
                      false);
        }
        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
          yield wait_for_child();
          bool again = true;
          while (again) {
            again = collect(&ret, nullptr);
            if (ret < 0) {
              tn->log(10, "a sync operation returned error");
              sync_status = ret;
              /* we have reported this error */
            }
          }
        }
      }
    } while (list_result.is_truncated && sync_status == 0);
    set_status("done iterating over all objects");
    /* wait for all operations to complete */
    while (num_spawned()) {
      yield wait_for_child();
      bool again = true;
      while (again) {
        again = collect(&ret, nullptr);
        if (ret < 0) {
          tn->log(10, "a sync operation returned error");
          sync_status = ret;
          /* we have reported this error */
        }
      }
    }
    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
    if (!lease_cr->is_locked()) {
      return set_cr_error(-ECANCELED);
    }
    /* update sync state to incremental */
    if (sync_status == 0) {
      yield {
        sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
        map<string, bufferlist> attrs;
        sync_info.encode_state_attr(attrs);
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                            attrs));
      }
    } else {
      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
    }
    if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
      tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
                      << bucket_shard_str{bs} << " retcode=" << retcode));
      return set_cr_error(retcode);
    }
    if (sync_status < 0) {
      return set_cr_error(sync_status);
    }
    return set_cr_done();
  }
  return 0;
}

static bool has_olh_epoch(RGWModifyOp op) {
  return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
}

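/* Incremental sync of one bucket shard: replay the remote bilog from the
 * stored position. Entries are first squashed so only the newest operation
 * per (object, instance) is applied; olh (versioned) operations on the same
 * object name are serialized; and entries whose zones_trace already contains
 * this zone are skipped as redundant. */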
7c673cae
FG
2951class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2952 RGWDataSyncEnv *sync_env;
2953 const rgw_bucket_shard& bs;
2954 RGWBucketInfo *bucket_info;
2955 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2956 list<rgw_bi_log_entry> list_result;
91327a77 2957 list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
7c673cae 2958 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
91327a77 2959 rgw_bucket_shard_sync_info& sync_info;
7c673cae
FG
2960 rgw_obj_key key;
2961 rgw_bi_log_entry *entry{nullptr};
2962 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2963 bool updated_status{false};
2964 const string& status_oid;
31f18b77 2965 const string& zone_id;
7c673cae
FG
2966
2967 string cur_id;
2968
7c673cae 2969 int sync_status{0};
c07f9fc5 2970 bool syncstopped{false};
7c673cae 2971
11fdf7f2 2972 RGWSyncTraceNodeRef tn;
7c673cae
FG
2973public:
2974 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2975 const rgw_bucket_shard& bs,
2976 RGWBucketInfo *_bucket_info,
2977 const std::string& status_oid,
2978 RGWContinuousLeaseCR *lease_cr,
11fdf7f2
TL
2979 rgw_bucket_shard_sync_info& sync_info,
2980 RGWSyncTraceNodeRef& _tn_parent)
7c673cae 2981 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
91327a77
AA
2982 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2983 marker_tracker(sync_env, status_oid, sync_info.inc_marker),
11fdf7f2
TL
2984 status_oid(status_oid), zone_id(_sync_env->store->svc.zone->get_zone().id),
2985 tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
2986 SSTR(bucket_shard_str{bs})))
91327a77 2987 {
7c673cae
FG
2988 set_description() << "bucket shard incremental sync bucket="
2989 << bucket_shard_str{bs};
2990 set_status("init");
11fdf7f2 2991 marker_tracker.set_tn(tn);
7c673cae
FG
2992 }
2993
2994 int operate() override;
2995};
2996
2997int RGWBucketShardIncrementalSyncCR::operate()
2998{
2999 int ret;
3000 reenter(this) {
3001 do {
3002 if (!lease_cr->is_locked()) {
3003 drain_all();
11fdf7f2 3004 tn->log(0, "ERROR: lease is not taken, abort");
7c673cae
FG
3005 return set_cr_error(-ECANCELED);
3006 }
11fdf7f2 3007 tn->log(20, SSTR("listing bilog for incremental sync" << sync_info.inc_marker.position));
91327a77
AA
3008 set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
3009 yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
7c673cae 3010 &list_result));
91327a77
AA
3011 if (retcode < 0 && retcode != -ENOENT) {
3012 /* wait for all operations to complete */
7c673cae 3013 drain_all();
91327a77 3014 return set_cr_error(retcode);
7c673cae
FG
3015 }
3016 squash_map.clear();
91327a77
AA
3017 entries_iter = list_result.begin();
3018 entries_end = list_result.end();
3019 for (; entries_iter != entries_end; ++entries_iter) {
3020 auto e = *entries_iter;
3021 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
3022 ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
c07f9fc5 3023 syncstopped = true;
91327a77
AA
3024 entries_end = entries_iter; // dont sync past here
3025 break;
c07f9fc5 3026 }
91327a77 3027 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
c07f9fc5
FG
3028 continue;
3029 }
94b18763
FG
3030 if (e.op == CLS_RGW_OP_CANCEL) {
3031 continue;
3032 }
7c673cae
FG
3033 if (e.state != CLS_RGW_STATE_COMPLETE) {
3034 continue;
3035 }
31f18b77
FG
3036 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
3037 continue;
3038 }
7c673cae 3039 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
91327a77
AA
3040 // don't squash over olh entries - we need to apply their olh_epoch
3041 if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
3042 continue;
3043 }
7c673cae
FG
3044 if (squash_entry.first <= e.timestamp) {
3045 squash_entry = make_pair<>(e.timestamp, e.op);
3046 }
3047 }
c07f9fc5 3048
7c673cae 3049 entries_iter = list_result.begin();
91327a77 3050 for (; entries_iter != entries_end; ++entries_iter) {
7c673cae
FG
3051 if (!lease_cr->is_locked()) {
3052 drain_all();
3053 return set_cr_error(-ECANCELED);
3054 }
3055 entry = &(*entries_iter);
3056 {
3057 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
3058 if (p < 0) {
3059 cur_id = entry->id;
3060 } else {
3061 cur_id = entry->id.substr(p + 1);
3062 }
3063 }
91327a77 3064 sync_info.inc_marker.position = cur_id;
7c673cae 3065
c07f9fc5 3066 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
91327a77 3067 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
c07f9fc5
FG
3068 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3069 continue;
3070 }

        if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
          set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
          tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));

        if (!key.ns.empty()) {
          set_status() << "skipping entry in namespace: " << entry->object;
          tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }

        set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
        if (entry->op == CLS_RGW_OP_CANCEL) {
          set_status() << "canceled operation, skipping";
          tn->log(20, SSTR("skipping object: "
              << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        if (entry->state != CLS_RGW_STATE_COMPLETE) {
          set_status() << "non-complete operation, skipping";
          tn->log(20, SSTR("skipping object: "
              << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
          set_status() << "redundant operation, skipping";
          tn->log(20, SSTR("skipping object: "
              << bucket_shard_str{bs} << "/" << key << ": redundant operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
          set_status() << "squashed operation, skipping";
          tn->log(20, SSTR("skipping object: "
              << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        tn->set_flag(RGW_SNS_FLAG_ACTIVE);
        tn->log(20, SSTR("syncing object: "
            << bucket_shard_str{bs} << "/" << key));
        updated_status = false;
        while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
          if (!updated_status) {
            set_status() << "can't do op, conflicting inflight operation";
            updated_status = true;
          }
          tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
          yield wait_for_child();
          bool again = true;
          while (again) {
            again = collect(&ret, nullptr);
            if (ret < 0) {
              tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
              sync_status = ret;
              /* we have reported this error */
            }
          }
          if (sync_status != 0)
            break;
        }
        if (sync_status != 0) {
          /* got an error, stop */
          break;
        }
        if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
          set_status() << "can't do op, sync already in progress for object";
          tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
          continue;
        }
        // yield {
        set_status() << "start object sync";
        if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
          tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
        } else {
          std::optional<uint64_t> versioned_epoch;
          rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
          if (entry->ver.pool < 0) {
            versioned_epoch = entry->ver.epoch;
          }
          tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
          using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
          spawn(new SyncCR(sync_env, bucket_info, bs, key,
                           entry->is_versioned(), versioned_epoch,
                           entry->timestamp, owner, entry->op, entry->state,
                           cur_id, &marker_tracker, entry->zones_trace, tn),
                false);
        }
        // }
        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
          set_status() << "num_spawned() > spawn_window";
          yield wait_for_child();
          bool again = true;
          while (again) {
            again = collect(&ret, nullptr);
            if (ret < 0) {
              tn->log(10, "a sync operation returned error");
              sync_status = ret;
              /* we have reported this error */
            }
            /* not waiting for child here */
          }
        }
      }
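      // Illustrative note: the window loop above is backpressure, not a
      // barrier. Once more than BUCKET_SYNC_SPAWN_WINDOW children are in
      // flight, it reaps completed ones with collect() before admitting the
      // next entry, so roughly BUCKET_SYNC_SPAWN_WINDOW object syncs run
      // concurrently per bucket shard at most.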
    } while (!list_result.empty() && sync_status == 0 && !syncstopped);

    while (num_spawned()) {
      yield wait_for_child();
      bool again = true;
      while (again) {
        again = collect(&ret, nullptr);
        if (ret < 0) {
          tn->log(10, "a sync operation returned error");
          sync_status = ret;
          /* we have reported this error */
        }
        /* not waiting for child here */
      }
    }
    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

    if (syncstopped) {
      // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
      // still disabled, we'll delete the sync status object. otherwise we'll
      // restart full sync to catch any changes that happened while sync was
      // disabled
      sync_info.state = rgw_bucket_shard_sync_info::StateInit;
      return set_cr_done();
    }

    yield call(marker_tracker.flush());
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
      return set_cr_error(retcode);
    }
    if (sync_status < 0) {
      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
      return set_cr_error(sync_status);
    }
    return set_cr_done();
  }
  return 0;
}
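// Illustrative sketch (not part of the original source): the marker tracker
// protocol used throughout the incremental loop above. Skipped entries only
// advance the persisted high marker; entries that spawn work are bracketed by
// start()/finish() so the marker never moves past an incomplete sync. Names
// below are hypothetical stand-ins:
//
//   if (should_skip(*entry)) {
//     tracker.try_update_high_marker(entry->id, 0, entry->timestamp);
//   } else if (tracker.start(entry->id, 0, entry->timestamp)) {
//     spawn_object_sync(*entry);  // calls tracker.finish(entry->id) when done
//   }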

int RGWRunBucketSyncCoroutine::operate()
{
  reenter(this) {
    yield {
      set_status("acquiring sync lock");
      auto store = sync_env->store;
      lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                              rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                              "sync_lock",
                                              cct->_conf->rgw_sync_lease_period,
                                              this));
      lease_stack.reset(spawn(lease_cr.get(), false));
    }
    while (!lease_cr->is_locked()) {
      if (lease_cr->is_done()) {
        tn->log(5, "failed to take lease");
        set_status("lease lock failed, early abort");
        drain_all();
        return set_cr_error(lease_cr->get_ret_status());
      }
      set_sleeping(true);
      yield;
    }

    tn->log(10, "took lease");
    yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
    if (retcode < 0 && retcode != -ENOENT) {
      tn->log(0, "ERROR: failed to read sync status for bucket");
      lease_cr->go_down();
      drain_all();
      return set_cr_error(retcode);
    }

    tn->log(20, SSTR("sync status for bucket: " << sync_status.state));

    yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
    if (retcode == -ENOENT) {
      /* bucket instance info has not been synced in yet, fetch it now */
      yield {
        tn->log(10, SSTR("no local info for bucket " << bucket_str{bs.bucket} << ": fetching metadata"));
        string raw_key = string("bucket.instance:") + bs.bucket.get_key();

        meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc.zone->get_master_conn(), sync_env->async_rados,
                           sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);

        call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
                                          string() /* no marker */,
                                          MDLOG_STATUS_COMPLETE,
                                          NULL /* no marker tracker */,
                                          tn));
      }
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket}));
        lease_cr->go_down();
        drain_all();
        return set_cr_error(retcode);
      }

      yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
    }
    if (retcode < 0) {
      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket}));
      lease_cr->go_down();
      drain_all();
      return set_cr_error(retcode);
    }

    do {
      if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
        yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
        if (retcode == -ENOENT) {
          tn->log(0, "bucket sync disabled");
          lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
          lease_cr->wakeup();
          lease_cr.reset();
          drain_all();
          return set_cr_done();
        }
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
      }

      if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
        yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
                                                status_oid, lease_cr.get(),
                                                sync_status, tn));
        if (retcode < 0) {
          tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
      }

      if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
        yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
                                                       status_oid, lease_cr.get(),
                                                       sync_status, tn));
        if (retcode < 0) {
          tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
      }
      // loop back to previous states unless incremental sync returns normally
    } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);

    lease_cr->go_down();
    drain_all();
    return set_cr_done();
  }

  return 0;
}
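// Illustrative sketch (hypothetical types, not the real coroutine): the
// do/while loop above behaves like a small state machine. Init establishes
// the status object, full sync walks the bucket listing, and incremental
// sync tails the bilog; the loop exits only once an iteration ends with the
// state still at IncrementalSync.
enum class ExampleBucketShardState { Init, FullSync, IncrementalSync };

static ExampleBucketShardState example_next_state(ExampleBucketShardState s)
{
  switch (s) {
  case ExampleBucketShardState::Init:
    return ExampleBucketShardState::FullSync;        // status initialized
  case ExampleBucketShardState::FullSync:
    return ExampleBucketShardState::IncrementalSync; // listing drained
  default:
    return ExampleBucketShardState::IncrementalSync; // steady state
  }
}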

RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
{
  return new RGWRunBucketSyncCoroutine(&sync_env, bs, sync_env.sync_tracer->root_node);
}

int RGWBucketSyncStatusManager::init()
{
  conn = store->svc.zone->get_zone_conn_by_id(source_zone);
  if (!conn) {
    ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  const string key = bucket.get_key();

  rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                  { NULL, NULL } };

  string path = string("/admin/metadata/bucket.instance");

  bucket_instance_meta_info result;
  ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
  if (ret < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
    return ret;
  }

  RGWBucketInfo& bi = result.data.get_bucket_info();
  num_shards = bi.num_shards;

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  sync_module.reset(new RGWDefaultSyncModuleInstance());

  int effective_num_shards = (num_shards ? num_shards : 1);

  auto async_rados = store->get_async_rados();

  for (int i = 0; i < effective_num_shards; i++) {
    RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, this, async_rados, &http_manager);
    ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->get_sync_tracer(), sync_module);
    if (ret < 0) {
      ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
      return ret;
    }
    source_logs[i] = l;
  }

  return 0;
}
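// Illustrative sketch (hypothetical helper, ignores URL escaping): the
// metadata fetch in init() amounts to a GET against the source zone's admin
// API of the form /admin/metadata/bucket.instance?key=<bucket-key>.
static std::string example_bucket_instance_meta_url(const std::string& endpoint,
                                                    const std::string& bucket_key)
{
  return endpoint + "/admin/metadata/bucket.instance?key=" + bucket_key;
}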

int RGWBucketSyncStatusManager::init_sync_status()
{
  list<RGWCoroutinesStack *> stacks;

  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
    RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
    RGWRemoteBucketLog *l = iter->second;
    stack->call(l->init_sync_status_cr());

    stacks.push_back(stack);
  }

  return cr_mgr.run(stacks);
}

int RGWBucketSyncStatusManager::read_sync_status()
{
  list<RGWCoroutinesStack *> stacks;

  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
    RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
    RGWRemoteBucketLog *l = iter->second;
    stack->call(l->read_sync_status_cr(&sync_status[iter->first]));

    stacks.push_back(stack);
  }

  int ret = cr_mgr.run(stacks);
  if (ret < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
        << bucket_str{bucket} << dendl;
    return ret;
  }

  return 0;
}

int RGWBucketSyncStatusManager::run()
{
  list<RGWCoroutinesStack *> stacks;

  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
    RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
    RGWRemoteBucketLog *l = iter->second;
    stack->call(l->run_sync_cr());

    stacks.push_back(stack);
  }

  int ret = cr_mgr.run(stacks);
  if (ret < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to run sync for "
        << bucket_str{bucket} << dendl;
    return ret;
  }

  return 0;
}
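// Illustrative sketch (hypothetical helper): init_sync_status(),
// read_sync_status() and run() all share the same fan-out shape: one
// coroutine stack per shard log, all driven to completion by the coroutine
// manager. A generic version might look like this:
template <typename MakeCR>
static int example_run_per_shard(RGWRados *store, RGWCoroutinesManager& mgr,
                                 map<int, RGWRemoteBucketLog *>& logs,
                                 MakeCR make_cr)
{
  list<RGWCoroutinesStack *> stacks;
  for (auto& l : logs) {
    auto *stack = new RGWCoroutinesStack(store->ctx(), &mgr);
    stack->call(make_cr(l.second));  // e.g. l.second->run_sync_cr()
    stacks.push_back(stack);
  }
  return mgr.run(stacks);  // blocks until every stack completes
}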

unsigned RGWBucketSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWBucketSyncStatusManager::gen_prefix(std::ostream& out) const
{
  auto zone = std::string_view{source_zone};
  return out << "bucket sync zone:" << zone.substr(0, 8)
      << " bucket:" << bucket.name << ' ';
}

string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
                                              const rgw_bucket_shard& bs)
{
  return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
}

string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
                                                  const rgw_obj& obj)
{
  return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" +
         obj.key.name + ":" + obj.key.instance;
}
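// Worked example (hypothetical values, assuming the shard key renders as
// "<bucket-key>:<shard>"): for source zone "zone1" and bucket shard
// "tenant/photos:inst1:3", status_oid() would return
//   "bucket.sync-status.zone1:tenant/photos:inst1:3"
// while obj_status_oid() for object "img.jpg" with an empty instance gives
//   "bucket.sync-status.zone1:tenant/photos:inst1:img.jpg:"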

class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
  static constexpr int max_concurrent_shards = 16;
  RGWRados *const store;
  RGWDataSyncEnv *const env;
  const int num_shards;
  rgw_bucket_shard bs;

  using Vector = std::vector<rgw_bucket_shard_sync_info>;
  Vector::iterator i, end;

 public:
  RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
                               int num_shards, const rgw_bucket& bucket,
                               Vector *status)
    : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
      store(store), env(env), num_shards(num_shards),
      bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
      i(status->begin()), end(status->end())
  {}

  bool spawn_next() override {
    if (i == end) {
      return false;
    }
    spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
    ++i;
    ++bs.shard_id;
    return true;
  }
};

int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const std::string& source_zone,
                           const RGWBucketInfo& bucket_info,
                           std::vector<rgw_bucket_shard_sync_info> *status)
{
  const auto num_shards = bucket_info.num_shards;
  status->clear();
  status->resize(std::max<size_t>(1, num_shards));

  RGWDataSyncEnv env;
  RGWSyncModuleInstanceRef module; // null sync module
  env.init(dpp, store->ctx(), store, nullptr, store->get_async_rados(),
           nullptr, nullptr, nullptr, source_zone, module, nullptr);

  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
                                                  bucket_info.bucket, status));
}
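// Illustrative caller sketch (hypothetical context): counting how many of a
// bucket's shards have reached incremental sync. Assumes the caller already
// holds a valid store, source zone id and bucket info.
static size_t example_count_incremental_shards(const DoutPrefixProvider *dpp,
                                               RGWRados *store,
                                               const std::string& zone,
                                               const RGWBucketInfo& info)
{
  std::vector<rgw_bucket_shard_sync_info> status;
  if (rgw_bucket_sync_status(dpp, store, zone, info, &status) < 0) {
    return 0; // treat failure as "nothing caught up" for this sketch
  }
  size_t n = 0;
  for (const auto& s : status) {
    if (s.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
      ++n;
    }
  }
  return n;
}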

// TODO: move into rgw_data_sync_trim.cc
#undef dout_prefix
#define dout_prefix (*_dout << "data trim: ")

namespace {

/// return the marker that it's safe to trim up to
const std::string& get_stable_marker(const rgw_data_sync_marker& m)
{
  return m.state == m.FullSync ? m.next_step_marker : m.marker;
}

/// populate the container starting with 'dest' with the minimum stable marker
/// of each shard for all of the peers in [first, last)
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  if (first == last) {
    return;
  }
  for (auto p = first; p != last; ++p) {
    auto m = dest;
    for (auto& shard : p->sync_markers) {
      const auto& stable = get_stable_marker(shard.second);
      if (*m > stable) {
        *m = stable;
      }
      ++m;
    }
  }
}

} // anonymous namespace

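// Illustrative usage sketch (hypothetical data): with two peers reporting
// stable markers {"00003", "00007"} and {"00005", "00002"} for two shards,
// take_min_markers() yields {"00003", "00002"}. Comparison is lexicographic,
// which is safe because log markers are zero-padded.
static std::vector<std::string> example_min_markers()
{
  std::vector<rgw_data_sync_status> peers(2);
  peers[0].sync_markers[0].marker = "00003";
  peers[0].sync_markers[1].marker = "00007";
  peers[1].sync_markers[0].marker = "00005";
  peers[1].sync_markers[1].marker = "00002";
  for (auto& p : peers) {
    for (auto& s : p.sync_markers) {
      s.second.state = rgw_data_sync_marker::IncrementalSync; // use .marker
    }
  }
  std::vector<std::string> min(2, RGWSyncLogTrimCR::max_marker);
  take_min_markers(peers.begin(), peers.end(), min.begin());
  return min; // {"00003", "00002"}
}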
class DataLogTrimCR : public RGWCoroutine {
  using TrimCR = RGWSyncLogTrimCR;
  RGWRados *store;
  RGWHTTPManager *http;
  const int num_shards;
  const std::string& zone_id; //< my zone id
  std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
  std::vector<std::string> min_shard_markers; //< min marker per shard
  std::vector<std::string>& last_trim; //< last trimmed marker per shard
  int ret{0};

 public:
  DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
                int num_shards, std::vector<std::string>& last_trim)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      num_shards(num_shards),
      zone_id(store->svc.zone->get_zone().id),
      peer_status(store->svc.zone->get_zone_data_notify_to_map().size()),
      min_shard_markers(num_shards, TrimCR::max_marker),
      last_trim(last_trim)
  {}

  int operate() override;
};

int DataLogTrimCR::operate()
{
  reenter(this) {
    ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
    set_status("fetching sync status");
    yield {
      // query data sync status from each sync peer
      rgw_http_param_pair params[] = {
        { "type", "data" },
        { "status", nullptr },
        { "source-zone", zone_id.c_str() },
        { nullptr, nullptr }
      };

      auto p = peer_status.begin();
      for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
        ldout(cct, 20) << "query sync status from " << c.first << dendl;
        using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
    }

    // must get a successful reply from all peers to consider trimming
    ret = 0;
    while (ret == 0 && num_spawned() > 0) {
      yield wait_for_child();
      collect_next(&ret);
    }
    drain_all();

    if (ret < 0) {
      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
      return set_cr_error(ret);
    }

    ldout(cct, 10) << "trimming log shards" << dendl;
    set_status("trimming log shards");
    yield {
      // determine the minimum marker for each shard
      take_min_markers(peer_status.begin(), peer_status.end(),
                       min_shard_markers.begin());

      for (int i = 0; i < num_shards; i++) {
        const auto& m = min_shard_markers[i];
        if (m <= last_trim[i]) {
          continue;
        }
        ldout(cct, 10) << "trimming log shard " << i
            << " at marker=" << m
            << " last_trim=" << last_trim[i] << dendl;
        spawn(new TrimCR(store, store->data_log->get_oid(i),
                         m, &last_trim[i]),
              true);
      }
    }
    return set_cr_done();
  }
  return 0;
}

RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
                                            RGWHTTPManager *http,
                                            int num_shards,
                                            std::vector<std::string>& markers)
{
  return new DataLogTrimCR(store, http, num_shards, markers);
}

class DataLogTrimPollCR : public RGWCoroutine {
  RGWRados *store;
  RGWHTTPManager *http;
  const int num_shards;
  const utime_t interval; //< polling interval
  const std::string lock_oid; //< use first data log shard for lock
  const std::string lock_cookie;
  std::vector<std::string> last_trim; //< last trimmed marker per shard

 public:
  DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
                    int num_shards, utime_t interval)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      num_shards(num_shards), interval(interval),
      lock_oid(store->data_log->get_oid(0)),
      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
      last_trim(num_shards)
  {}

  int operate() override;
};

int DataLogTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(interval);

      // request a 'data_trim' lock that covers the entire wait interval to
      // prevent other gateways from attempting to trim for the duration
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
                                          "data_trim", lock_cookie,
                                          interval.sec()));
      if (retcode < 0) {
        // if the lock is already held, go back to sleep and try again later
        ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
            << interval.sec() << "s" << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new DataLogTrimCR(store, http, num_shards, last_trim));

      // note that the lock is not released. this is intentional, as it avoids
      // duplicating this work in other gateways
    }
  }
  return 0;
}

RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
                                      RGWHTTPManager *http,
                                      int num_shards, utime_t interval)
{
  return new DataLogTrimPollCR(store, http, num_shards, interval);
}
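// Illustrative usage sketch (hypothetical wiring): a gateway would typically
// drive the polling trim coroutine from its own coroutine manager, with the
// interval taken from the rgw_sync_log_trim_interval option, e.g.:
//
//   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
//   int r = crs.run(create_data_log_trim_cr(store, &http_manager,
//                                           num_shards, utime_t(1200, 0)));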