// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_zone.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_module.h"
#include "rgw_sync_log_trim.h"

#include "cls/lock/cls_lock_client.h"

#include "services/svc_zone.h"
#include "services/svc_sync_modules.h"

#include "include/random.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
static string object_status_oid_prefix = "bucket.sync-status";
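
// These prefixes are combined with the source zone (and, for per-shard
// objects, a shard id) to name the status and index objects in the zone's
// log pool; full_data_sync_index_shard_oid() below shows the
// "<prefix>.<zone>.<shard>" composition, and the RGWDataSyncStatusManager
// oid helpers follow the same scheme.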

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

 public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store->svc.sysobj,
               rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool,
                           RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}
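
// A note on the pattern: RGWShardCollectCR drives the fan-out by calling
// spawn_next() and reaping completions until it returns false, keeping at
// most MAX_CONCURRENT_SHARDS (16) child reads in flight, so per-shard
// status objects are fetched in parallel with bounded concurrency.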

class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;

 public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
                                    std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
      max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
  auto& shard_keys = omapkeys[shard_id];
  shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, error_oid),
                                  marker, max_entries, shard_keys), false);

  ++shard_id;
  return true;
}

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                          rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool,
                                      RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "info", NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};
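
// Illustrative sketch of the JSON this struct decodes (returned by the
// source zone's /admin/log?type=data&id=<shard>&marker=... endpoint; the
// values here are made up):
//
//   { "marker": "1_1557950400.000000_100.1",
//     "truncated": true,
//     "entries": [ /* rgw_data_change_log_entry records: a log id,
//                     a timestamp and the bucket-shard key */ ] }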

class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op = nullptr;

  int shard_id;
  const std::string& marker;
  string *pnext_marker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id,
                              const std::string& marker, string *pnext_marker,
                              list<rgw_data_change_log_entry> *_entries,
                              bool *_truncated)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
      entries(_entries), truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "marker", marker.c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        init_new_io(http_op);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(&response);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pnext_marker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    init_new_io(http_op);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;

  RGWSyncTraceNodeRef tn;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 RGWSyncTraceNodeRef& _tn_parent,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->svc.zone->get_zone_params().log_pool),
      num_shards(num_shards), status(status),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
        return set_cr_error(retcode);
      }

      tn->log(10, "took lease");

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->svc.zone->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!"));
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store->svc.sysobj,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};
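
// Initialization sequence, as implemented above: take the sync lock, write
// the status object (which recreates it, invalidating the lock), re-take
// the lock on the new object, snapshot each remote datalog shard's current
// position into per-shard markers, flip the state to
// StateBuildingFullSyncMaps, and finally drop the lock.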

int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
                           RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module)
{
  sync_env.init(dpp, store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
                _sync_tracer, _source_zone, _sync_module);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }

  tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
  omapkeys.resize(num_shards);
  uint64_t max_entries{1};
  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
  http_manager.stop();

  if (ret == 0) {
    for (int i = 0; i < num_shards; i++) {
      if (omapkeys[i]->entries.size() != 0) {
        recovering_shards.insert(i);
      }
    }
  }

  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  auto instance_id = ceph::util::generate_random_number<uint64_t>();
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, tn, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}
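
// For example, full_data_sync_index_shard_oid("us-east", 3) yields
// "data.full-sync.index.us-east.3" (the zone name here is illustrative).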

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string> result;
  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      yield {
        string entrypoint = string("/admin/metadata/bucket.instance");
        /* FIXME: need a better scaling solution here, requires streaming output */
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                      entrypoint, NULL, &result));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
        return set_cr_error(retcode);
      }
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->svc.zone->get_zone_params().log_pool,
                                                  oid_prefix);
      yield; // yield so OmapAppendCRs can start
      for (iter = result.begin(); iter != result.end(); ++iter) {
        ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;

        key = *iter;

        yield {
          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                          { NULL, NULL } };

          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
        }

        num_shards = meta_info.data.get_bucket_info().num_shards;
        if (num_shards > 0) {
          for (i = 0; i < num_shards; i++) {
            char buf[16];
            snprintf(buf, sizeof(buf), ":%d", i);
            s = key + buf;
            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
          }
        } else {
          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
        }
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                                rgw_raw_obj(store->svc.zone->get_zone_params().log_pool,
                                                                            RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker), true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1
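
// With a window of 1, the marker tracker persists the shard marker after
// (roughly) every completed entry; a larger window would batch more
// completions between store_marker() calls (see RGWSyncShardMarkerTrack).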

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

  RGWSyncTraceNodeRef tn;

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker,
                              RGWSyncTraceNodeRef& _tn)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker),
      tn(_tn) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                           rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * create an index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }
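
  // Example: if two datalog entries in one batch refer to the same bucket
  // shard, the second index_key_to_marker() call returns false and flags
  // the key for retry, so only one sync coroutine runs per bucket shard at
  // a time.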

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  explicit bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_str_noinstance {
  const rgw_bucket& b;
  explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}
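
// e.g. a shard of a tenanted bucket prints as "tenant/name:bucket_id:7";
// the shard id is omitted when negative, tenant and bucket_id when empty.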

class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

  RGWSyncTraceNodeRef tn;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
                                         SSTR(bucket_shard_str{bs}))) {
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" << _raw_key << " entry=" << entry_marker;
    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
          call(new RGWRunBucketSyncCoroutine(sync_env, bs, tn));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << raw_key));
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
          }
        }
        if (error_repo && !error_repo->append(raw_key)) {
          tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
              << error_repo->get_obj() << " retcode=" << retcode));
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10
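
// BUCKET_SHARD_SYNC_SPAWN_WINDOW caps how many bucket-shard sync coroutines
// a single data shard keeps in flight: the loops below wait for children
// whenever num_spawned() exceeds it. DATA_SYNC_MAX_ERR_ENTRIES bounds how
// many entries are pulled from the error repo per retry pass.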

class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  std::set<std::string> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::coarse_real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
  uint32_t retry_backoff_secs;

  RGWSyncTraceNodeRef tn;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, const rgw_data_sync_marker& _marker,
                     RGWSyncTraceNodeRef& _tn,
                     bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("full sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          if (r != -EBUSY) {
            tn->log(10, SSTR("incremental sync failed (r=" << r << ")"));
          }
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      tn->log(10, "start full sync");
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          tn->log(5, "failed to take lease");
          set_status("lease lock failed, early abort");
          drain_all();
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      tn->log(10, "took lease");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
      total_entries = sync_marker.pos;
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
                                             sync_marker.marker, max_entries, omapkeys));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        entries = std::move(omapkeys->entries);
        if (entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          tn->log(20, SSTR("full sync: " << *iter));
          total_entries++;
          if (!marker_tracker->start(*iter, total_entries, real_time())) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?"));
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false, tn), false);
          }
          sync_marker.marker = *iter;

          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                tn->log(10, "a sync operation returned error");
              }
            }
          }
        }
      } while (omapkeys->more);
      omapkeys.reset();

      drain_all_but_stack(lease_stack.get());

      tn->unset_flag(RGW_SNS_FLAG_ACTIVE);

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
                                                             rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
        lease_cr->go_down();
        drain_all();
        return set_cr_error(retcode);
      }
      // keep lease and transition to incremental_sync()
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      tn->log(10, "start incremental sync");
      if (lease_cr) {
        tn->log(10, "lease already held from full sync");
      } else {
        yield init_lease_cr();
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            tn->log(5, "failed to take lease");
            set_status("lease lock failed, early abort");
            drain_all();
            return set_cr_error(lease_cr->get_ret_status());
          }
          set_sleeping(true);
          yield;
        }
        set_status("lease acquired");
        tn->log(10, "took lease");
      }
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        if (current_modified.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }
        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            tn->log(20, SSTR("received async update notification: " << *modified_iter));
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false, tn), false);
          }
        }

        if (error_retry_time <= ceph::coarse_real_clock::now()) {
          /* process bucket shards that previously failed */
          omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                               error_marker, max_error_entries, omapkeys));
          error_entries = std::move(omapkeys->entries);
          tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
          iter = error_entries.begin();
          for (; iter != error_entries.end(); ++iter) {
            error_marker = *iter;
            tn->log(20, SSTR("handle error entry: " << error_marker));
            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
          }
          if (!omapkeys->more) {
            if (error_marker.empty() && error_entries.empty()) {
              /* the retry repo is empty, we back off a bit before calling it again */
              retry_backoff_secs *= 2;
              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
              }
            } else {
              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
            }
            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
            error_marker.clear();
          }
        }
        omapkeys.reset();

#define INCREMENTAL_MAX_ENTRIES 100
        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
        spawned_keys.clear();
        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
                                                   &next_marker, &log_entries, &truncated));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }

        if (log_entries.size() > 0) {
          tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
        }

        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
          tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
          if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
            tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
            continue;
          }
          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
            tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
          } else {
            /*
             * don't spawn the same key more than once. We can do that as long as we don't yield
             */
            if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
              spawned_keys.insert(log_iter->entry.key);
              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
              if (retcode < 0) {
                stop_spawned_services();
                drain_all();
                return set_cr_error(retcode);
              }
            }
          }
        }
        while ((int)num_spawned() > spawn_window) {
          set_status() << "num_spawned() > spawn_window";
          yield wait_for_child();
          int ret;
          while (collect(&ret, lease_stack.get())) {
            if (ret < 0) {
              tn->log(10, "a sync operation returned error");
              /* we have reported this error */
            }
            /* not waiting for child here */
          }
        }

        tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
            << " next_marker=" << next_marker << " truncated=" << truncated));
        if (!next_marker.empty()) {
          sync_marker.marker = next_marker;
        } else if (!log_entries.empty()) {
          sync_marker.marker = log_entries.back().log_id;
        }
        if (!truncated) {
          // we reached the end, wait a while before checking for more
          tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
          yield wait(get_idle_interval());
        }
      } while (true);
    }
    return 0;
  }

  utime_t get_idle_interval() const {
#define INCREMENTAL_INTERVAL 20
    ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
      auto now = ceph::coarse_real_clock::now();
      if (error_retry_time > now) {
        auto d = error_retry_time - now;
        if (interval > d) {
          interval = d;
        }
      }
    }
    // convert timespan -> time_point -> utime_t
    return utime_t(ceph::coarse_real_clock::zero() + interval);
  }
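
  // The idle wait is nominally INCREMENTAL_INTERVAL (20s) but is clamped so
  // the shard wakes no later than error_retry_time, keeping the error-repo
  // retry pass on schedule while the datalog is otherwise quiet.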
1463
7c673cae
FG
1464 void stop_spawned_services() {
1465 lease_cr->go_down();
1466 if (error_repo) {
1467 error_repo->finish();
1468 error_repo->put();
1469 error_repo = NULL;
1470 }
1471 }
1472};
1473
1474class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
1475 RGWDataSyncEnv *sync_env;
1476
1477 rgw_pool pool;
1478
1479 uint32_t shard_id;
1480 rgw_data_sync_marker sync_marker;
1481
11fdf7f2 1482 RGWSyncTraceNodeRef tn;
7c673cae 1483public:
11fdf7f2
TL
1484 RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, const rgw_pool& _pool,
1485 uint32_t _shard_id, rgw_data_sync_marker& _marker,
1486 RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, false),
7c673cae
FG
1487 sync_env(_sync_env),
1488 pool(_pool),
1489 shard_id(_shard_id),
1490 sync_marker(_marker) {
11fdf7f2 1491 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
7c673cae
FG
1492 }
1493
1494 RGWCoroutine *alloc_cr() override {
11fdf7f2 1495 return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, tn, backoff_ptr());
7c673cae
FG
1496 }
1497
1498 RGWCoroutine *alloc_finisher_cr() override {
1499 RGWRados *store = sync_env->store;
11fdf7f2
TL
1500 return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
1501 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
7c673cae
FG
1502 &sync_marker);
1503 }
1504
1505 void append_modified_shards(set<string>& keys) {
1506 Mutex::Locker l(cr_lock());
1507
1508 RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
1509 if (!cr) {
1510 return;
1511 }
1512
1513 cr->append_modified_shards(keys);
1514 }
1515};
1516
1517class RGWDataSyncCR : public RGWCoroutine {
1518 RGWDataSyncEnv *sync_env;
1519 uint32_t num_shards;
1520
1521 rgw_data_sync_status sync_status;
1522
1523 RGWDataSyncShardMarkerTrack *marker_tracker;
1524
1525 Mutex shard_crs_lock;
1526 map<int, RGWDataSyncShardControlCR *> shard_crs;
1527
1528 bool *reset_backoff;
1529
11fdf7f2 1530 RGWSyncTraceNodeRef tn;
31f18b77
FG
1531
1532 RGWDataSyncModule *data_sync_module{nullptr};
7c673cae 1533public:
  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
                                                      sync_env(_sync_env),
                                                      num_shards(_num_shards),
                                                      marker_tracker(NULL),
                                                      shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
                                                      reset_backoff(_reset_backoff), tn(_tn) {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
  }

  int operate() override {
    reenter(this) {

      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode < 0 && retcode != -ENOENT) {
        tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        tn->log(20, SSTR("init"));
        sync_status.sync_info.num_shards = num_shards;
        uint64_t instance_id;
        instance_id = ceph::util::generate_random_number<uint64_t>();
        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, tn, &sync_status));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sync_env, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        tn->log(10, SSTR("building full sync maps"));
        /* call sync module init here */
        sync_status.sync_info.num_shards = num_shards;
        yield call(data_sync_module->init_sync(sync_env));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        /* state: building full sync maps */
        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield call(data_sync_module->start_sync(sync_env));

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc.zone->get_zone_params().log_pool,
                                                                          iter->first, iter->second, tn);
            cr->get();
            shard_crs_lock.Lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.Unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store->svc.sysobj,
                                                         rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex::Locker l(shard_crs_lock);
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};
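
/*
 * Note: RGWDataSyncCR above drives the top-level data sync state machine:
 *
 *   StateInit -> StateBuildingFullSyncMaps -> StateSync
 *
 * Init writes a fresh status object (with a random instance_id),
 * BuildingFullSyncMaps lists the source zone's bucket indexes via
 * RGWListBucketIndexesCR, and StateSync spawns one RGWDataSyncShardControlCR
 * per datalog shard. *reset_backoff is set after each successful transition
 * so the owning backoff CR restarts its retry timer from scratch.
 */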

class RGWDefaultDataSyncModule : public RGWDataSyncModule {
public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  bool supports_user_writes() override {
    return true;
  }
};

int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
                                 std::nullopt,
                                 key, std::nullopt, versioned_epoch,
                                 true, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}
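
/*
 * The default data handler simply mirrors source operations onto RADOS:
 * sync_object() wraps RGWFetchRemoteObjCR, while remove_object() and
 * create_delete_marker() both wrap RGWRemoveObjCR (the latter passing the
 * owner and the delete-marker flag). zones_trace is threaded through every
 * call so a change is never bounced back to a zone that already saw it.
 */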

class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
public:
  RGWArchiveDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
  RGWArchiveDataSyncModule data_handler;
public:
  RGWArchiveSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
  RGWMetadataHandler *alloc_bucket_meta_handler() override {
    return RGWArchiveBucketMetaHandlerAllocator::alloc();
  }
  RGWMetadataHandler *alloc_bucket_instance_meta_handler() override {
    return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
  }
};

int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWArchiveSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
  if (!bucket_info.versioned() ||
      (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
    ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
    bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
    int op_ret = sync_env->store->put_bucket_instance_info(bucket_info, false, real_time(), NULL);
    if (op_ret < 0) {
      ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: error versioning archive bucket" << dendl;
      return NULL;
    }
  }

  std::optional<rgw_obj_key> dest_key;

  if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
    versioned_epoch = 0;
    dest_key = key;
    if (key.instance.empty()) {
      sync_env->store->gen_rand_obj_instance_name(&(*dest_key));
    }
  }

  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                                 bucket_info, std::nullopt,
                                 key, dest_key, versioned_epoch,
                                 true, zones_trace);
}

RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
  return NULL;
}

RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
                          << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}
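
/*
 * Archive-zone semantics, as implemented above: sync_object() force-enables
 * versioning on the destination bucket and, for unversioned source objects,
 * generates a random version instance so older copies are preserved;
 * remove_object() returns NULL, i.e. object deletions never propagate into
 * the archive; create_delete_marker() still records the delete marker, so
 * the archive keeps the full version history.
 */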

class RGWDataSyncControlCR : public RGWBackoffControlCR
{
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  RGWSyncTraceNodeRef tn;

  static constexpr bool exit_on_error = false; // retry on all errors
public:
  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards,
                       RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
                                                          sync_env(_sync_env), num_shards(_num_shards) {
    tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncCR(sync_env, num_shards, tn, backoff_ptr());
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex& m = cr_lock();

    m.Lock();
    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
    if (!cr) {
      m.Unlock();
      return;
    }

    cr->get();
    m.Unlock();

    if (cr) {
      tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
      cr->wakeup(shard_id, keys);
    }

    cr->put();
  }
};

void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
  RWLock::RLocker rl(lock);
  if (!data_sync_cr) {
    return;
  }
  data_sync_cr->wakeup(shard_id, keys);
}

int RGWRemoteDataLog::run_sync(int num_shards)
{
  lock.get_write();
  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn);
  data_sync_cr->get(); // run() will drop a ref, so take another
  lock.unlock();

  int r = run(data_sync_cr);

  lock.get_write();
  data_sync_cr->put();
  data_sync_cr = NULL;
  lock.unlock();

  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
    return r;
  }
  return 0;
}
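
/*
 * run_sync() holds an extra reference on the control CR because run()
 * drops one when the coroutine completes; the write lock is only held
 * around the pointer swaps so that wakeup(), which takes the read lock,
 * can always safely dereference data_sync_cr.
 */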

int RGWDataSyncStatusManager::init()
{
  RGWZone *zone_def;

  if (!store->svc.zone->find_zone_by_id(source_zone, &zone_def)) {
    ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
    return -EIO;
  }

  if (!store->svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
    return -ENOTSUP;
  }

  const RGWZoneParams& zone_params = store->svc.zone->get_zone_params();

  if (sync_module == nullptr) {
    sync_module = store->get_sync_module();
  }

  conn = store->svc.zone->get_zone_conn_by_id(source_zone);
  if (!conn) {
    ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  int r = source_log.init(source_zone, conn, error_logger, store->get_sync_tracer(), sync_module);
  if (r < 0) {
    ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
    finalize();
    return r;
  }

  rgw_datalog_info datalog_info;
  r = source_log.read_log_info(&datalog_info);
  if (r < 0) {
    ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
    finalize();
    return r;
  }

  num_shards = datalog_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
  }

  return 0;
}

void RGWDataSyncStatusManager::finalize()
{
  delete error_logger;
  error_logger = nullptr;
}

unsigned RGWDataSyncStatusManager::get_subsys() const
{
  return dout_subsys;
}

std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
{
  auto zone = std::string_view{source_zone};
  return out << "data sync zone:" << zone.substr(0, 8) << ' ';
}

string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
{
  char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());

  return string(buf);
}

string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);

  return string(buf);
}
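
/*
 * Example status object names the two helpers above produce (the zone id
 * shown is made up; the prefixes are the constants at the top of this file):
 *
 *   datalog.sync-status.8f31a1a5            - overall sync status
 *   datalog.sync-status.shard.8f31a1a5.7    - marker for datalog shard 7
 */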

int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                             const rgw_bucket& bucket, int shard_id,
                             RGWSyncErrorLogger *_error_logger,
                             RGWSyncTraceManager *_sync_tracer,
                             RGWSyncModuleInstanceRef& _sync_module)
{
  conn = _conn;
  source_zone = _source_zone;
  bs.bucket = bucket;
  bs.shard_id = shard_id;

  sync_env.init(dpp, store->ctx(), store, conn, async_rados, http_manager,
                _error_logger, _sync_tracer, source_zone, _sync_module);

  return 0;
}

class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const string instance_key;

  rgw_bucket_index_marker_info *info;

public:
  RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
                                    const rgw_bucket_shard& bs,
                                    rgw_bucket_index_marker_info *_info)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      instance_key(bs.get_key()), info(_info) {}

  int operate() override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
                                        { "bucket-instance", instance_key.c_str() },
                                        { "info" , NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";
        call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
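
/*
 * The coroutine above amounts to a single REST round trip to the source
 * zone, roughly (the parameter encoding here is a sketch):
 *
 *   GET /admin/log/?type=bucket-index&bucket-instance=<bucket:instance:shard>&info
 *
 * with the JSON response decoded into rgw_bucket_index_marker_info.
 */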

class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_bucket_shard bs;
  const string sync_status_oid;

  rgw_bucket_shard_sync_info& status;

  rgw_bucket_index_marker_info info;
public:
  RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                        const rgw_bucket_shard& bs,
                                        rgw_bucket_shard_sync_info& _status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      status(_status)
  {}

  int operate() override {
    reenter(this) {
      /* fetch current position in logs */
      yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
      if (retcode < 0 && retcode != -ENOENT) {
        ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
        return set_cr_error(retcode);
      }
      yield {
        auto store = sync_env->store;
        rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, sync_status_oid);

        if (info.syncstopped) {
          call(new RGWRadosRemoveCR(store, obj));
        } else {
          status.state = rgw_bucket_shard_sync_info::StateFullSync;
          status.inc_marker.position = info.max_marker;
          map<string, bufferlist> attrs;
          status.encode_all_attrs(attrs);
          call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj, obj, attrs));
        }
      }
      if (info.syncstopped) {
        retcode = -ENOENT;
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
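
/*
 * Note the syncstopped handling above: when the remote bilog reports that
 * sync was stopped, the per-shard status object is removed instead of
 * written, and retcode is forced to -ENOENT so callers treat the shard as
 * having no sync state at all.
 */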

RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
  return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
}

#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."

template <class T>
static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
{
  map<string, bufferlist>::iterator iter = attrs.find(attr_name);
  if (iter == attrs.end()) {
    *val = T();
    return false;
  }

  auto biter = iter->second.cbegin();
  try {
    decode(*val, biter);
  } catch (buffer::error& err) {
    ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
    return false;
  }
  return true;
}

void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
{
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
    decode_attr(cct, attrs, "state", &state);
  }
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &full_marker)) {
    decode_attr(cct, attrs, "full_marker", &full_marker);
  }
  if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
    decode_attr(cct, attrs, "inc_marker", &inc_marker);
  }
}

void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
{
  encode_state_attr(attrs);
  full_marker.encode_attr(attrs);
  inc_marker.encode_attr(attrs);
}

void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
}

void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
}

void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
{
  using ceph::encode;
  encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
}
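
/*
 * Bucket shard sync state is stored as rados xattrs on the status object,
 * e.g. "<RGW_ATTR_PREFIX>bucket-sync.state" (RGW_ATTR_PREFIX is normally
 * "user.rgw."). decode_from_attrs() falls back to the legacy unprefixed
 * attr names ("state", "full_marker", "inc_marker") so status objects
 * written before the prefix was introduced remain readable.
 */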

class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  string oid;
  rgw_bucket_shard_sync_info *status;

  map<string, bufferlist> attrs;
public:
  RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                   const rgw_bucket_shard& bs,
                                   rgw_bucket_shard_sync_info *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
      status(_status) {}
  int operate() override;
};

int RGWReadBucketSyncStatusCoroutine::operate()
{
  reenter(this) {
    yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store->svc.sysobj,
                                             rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, oid),
                                             &attrs, true));
    if (retcode == -ENOENT) {
      *status = rgw_bucket_shard_sync_info();
      return set_cr_done();
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket shard sync info oid=" << oid << " ret=" << retcode << dendl;
      return set_cr_error(retcode);
    }
    status->decode_from_attrs(sync_env->cct, attrs);
    return set_cr_done();
  }
  return 0;
}

#define OMAP_READ_MAX_ENTRIES 10
class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRados *store;

  const int shard_id;
  int max_entries;

  set<string>& recovering_buckets;
  string marker;
  string error_oid;

  RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
  set<string> error_entries;
  int max_omap_entries;
  int count;

public:
  RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
                                         set<string>& _recovering_buckets, const int _max_entries)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
      recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
  {
    error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
  }

  int operate() override;
};

int RGWReadRecoveringBucketShardsCoroutine::operate()
{
  reenter(this) {
    // read recovering bucket shards
    count = 0;
    do {
      omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, error_oid),
                                           marker, max_omap_entries, omapkeys));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
                                << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      error_entries = std::move(omapkeys->entries);
      if (error_entries.empty()) {
        break;
      }

      count += error_entries.size();
      marker = *error_entries.rbegin();
      recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
                                std::make_move_iterator(error_entries.end()));
    } while (omapkeys->more && count < max_entries);

    return set_cr_done();
  }

  return 0;
}
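
/*
 * The loop above pages through the shard's ".retry" omap object
 * OMAP_READ_MAX_ENTRIES keys at a time, advancing the marker to the last
 * key returned, until the omap is exhausted or max_entries keys have been
 * collected into recovering_buckets.
 */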

class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRados *store;

  const int shard_id;
  int max_entries;

  set<string>& pending_buckets;
  string marker;
  string status_oid;

  rgw_data_sync_marker* sync_marker;
  int count;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  bool truncated;

public:
  RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
                                      set<string>& _pending_buckets,
                                      rgw_data_sync_marker* _sync_marker, const int _max_entries)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
      pending_buckets(_pending_buckets), sync_marker(_sync_marker)
  {
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
  }

  int operate() override;
};

int RGWReadPendingBucketShardsCoroutine::operate()
{
  reenter(this) {
    // read sync status marker
    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
    yield call(new CR(sync_env->async_rados, store->svc.sysobj,
                      rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                      sync_marker));
    if (retcode < 0) {
      ldout(sync_env->cct, 0) << "failed to read sync status marker with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }

    // read pending bucket shards
    marker = sync_marker->marker;
    count = 0;
    do {
      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker,
                                                 &next_marker, &log_entries, &truncated));

      if (retcode == -ENOENT) {
        break;
      }

      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "failed to read remote data log info with "
                                << cpp_strerror(retcode) << dendl;
        return set_cr_error(retcode);
      }

      if (log_entries.empty()) {
        break;
      }

      count += log_entries.size();
      for (const auto& entry : log_entries) {
        pending_buckets.insert(entry.entry.key);
      }
      // advance past the page just consumed; without this the loop would
      // re-read the same datalog entries on every iteration
      marker = next_marker;
    } while (truncated && count < max_entries);

    return set_cr_done();
  }

  return 0;
}

int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.start();
  if (ret < 0) {
    ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  list<RGWCoroutinesStack *> stacks;
  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
  stacks.push_back(recovering_stack);
  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
  stacks.push_back(pending_stack);
  ret = crs.run(stacks);
  http_manager.stop();
  return ret;
}

RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
  return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
}

RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
    delete iter->second;
  }
  delete error_logger;
}

void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
{
  JSONDecoder::decode_json("ID", id, obj);
  JSONDecoder::decode_json("DisplayName", display_name, obj);
}

struct bucket_list_entry {
  bool delete_marker;
  rgw_obj_key key;
  bool is_latest;
  real_time mtime;
  string etag;
  uint64_t size;
  string storage_class;
  rgw_bucket_entry_owner owner;
  uint64_t versioned_epoch;
  string rgw_tag;

  bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
    JSONDecoder::decode_json("Key", key.name, obj);
    JSONDecoder::decode_json("VersionId", key.instance, obj);
    JSONDecoder::decode_json("IsLatest", is_latest, obj);
    string mtime_str;
    JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);

    struct tm t;
    uint32_t nsec;
    if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
      ceph_timespec ts;
      ts.tv_sec = (uint64_t)internal_timegm(&t);
      ts.tv_nsec = nsec;
      mtime = real_clock::from_ceph_timespec(ts);
    }
    JSONDecoder::decode_json("ETag", etag, obj);
    JSONDecoder::decode_json("Size", size, obj);
    JSONDecoder::decode_json("StorageClass", storage_class, obj);
    JSONDecoder::decode_json("Owner", owner, obj);
    JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
    JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
    if (key.instance == "null" && !versioned_epoch) {
      key.instance.clear();
    }
  }

  RGWModifyOp get_modify_op() const {
    if (delete_marker) {
      return CLS_RGW_OP_LINK_OLH_DM;
    } else if (!key.instance.empty() && key.instance != "null") {
      return CLS_RGW_OP_LINK_OLH;
    } else {
      return CLS_RGW_OP_ADD;
    }
  }
};

struct bucket_list_result {
  string name;
  string prefix;
  string key_marker;
  string version_id_marker;
  int max_keys;
  bool is_truncated;
  list<bucket_list_entry> entries;

  bucket_list_result() : max_keys(0), is_truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("Name", name, obj);
    JSONDecoder::decode_json("Prefix", prefix, obj);
    JSONDecoder::decode_json("KeyMarker", key_marker, obj);
    JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
    JSONDecoder::decode_json("MaxKeys", max_keys, obj);
    JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
    JSONDecoder::decode_json("Entries", entries, obj);
  }
};
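
/*
 * A minimal sketch of the JSON these decoders expect from the source zone's
 * bucket listing (all values here are made up):
 *
 * {
 *   "Name": "mybucket", "MaxKeys": 1000, "IsTruncated": false,
 *   "Entries": [
 *     { "Key": "photo.jpg", "VersionId": "null", "IsLatest": true,
 *       "RgwxMtime": "2019-05-01T12:00:00.000Z", "ETag": "\"abc123\"",
 *       "Size": 4096, "Owner": { "ID": "user1", "DisplayName": "User One" },
 *       "VersionedEpoch": 0, "IsDeleteMarker": false }
 *   ]
 * }
 */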

class RGWListBucketShardCR: public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  const string instance_key;
  rgw_obj_key marker_position;

  bucket_list_result *result;

public:
  RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                       rgw_obj_key& _marker_position, bucket_list_result *_result)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      instance_key(bs.get_key()), marker_position(_marker_position),
      result(_result) {}

  int operate() override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
                                        { "versions" , NULL },
                                        { "format" , "json" },
                                        { "objs-container" , "true" },
                                        { "key-marker" , marker_position.name.c_str() },
                                        { "version-id-marker" , marker_position.instance.c_str() },
                                        { NULL, NULL } };
        // don't include tenant in the url, it's already part of instance_key
        string p = string("/") + bs.bucket.name;
        call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};
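
/*
 * The listing above is an ordinary versioned-bucket listing against the
 * source zone, roughly:
 *
 *   GET /<bucket>?versions&format=json&objs-container=true
 *       &key-marker=<name>&version-id-marker=<instance>
 *
 * with rgwx-bucket-instance pinning the request to one bucket index shard.
 */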

class RGWListBucketIndexLogCR: public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const string instance_key;
  string marker;

  list<rgw_bi_log_entry> *result;

public:
  RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                          string& _marker, list<rgw_bi_log_entry> *_result)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      instance_key(bs.get_key()), marker(_marker), result(_result) {}

  int operate() override {
    reenter(this) {
      yield {
        rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
                                        { "format" , "json" },
                                        { "marker" , marker.c_str() },
                                        { "type", "bucket-index" },
                                        { NULL, NULL } };

        call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
      }
      if (retcode < 0) {
        return set_cr_error(retcode);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10

class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_bucket_shard_full_sync_marker sync_marker;

  RGWSyncTraceNodeRef tn;

public:
  RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                                    const string& _marker_oid,
                                    const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
                                                                                        sync_env(_sync_env),
                                                                                        marker_oid(_marker_oid),
                                                                                        sync_marker(_marker) {}

  void set_tn(RGWSyncTraceNodeRef& _tn) {
    tn = _tn;
  }

  RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.position = new_marker;
    sync_marker.count = index_pos;

    map<string, bufferlist> attrs;
    sync_marker.encode_attr(attrs);

    RGWRados *store = sync_env->store;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                          attrs);
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
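
/*
 * A note on the marker trackers: entries can complete out of order, so the
 * RGWSyncShardMarkerTrack base class only advances the persisted position
 * past entries whose predecessors have all finished, and
 * BUCKET_SYNC_UPDATE_MARKER_WINDOW bounds how many completions are batched
 * between store_marker() calls. This is a sketch of the base-class
 * behavior, which lives outside this file.
 */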

class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_bucket_shard_inc_sync_marker sync_marker;

  map<rgw_obj_key, string> key_to_marker;

  struct operation {
    rgw_obj_key key;
    bool is_olh;
  };
  map<string, operation> marker_to_op;
  std::set<std::string> pending_olh; // object names with pending olh operations

  RGWSyncTraceNodeRef tn;

  void handle_finish(const string& marker) override {
    auto iter = marker_to_op.find(marker);
    if (iter == marker_to_op.end()) {
      return;
    }
    auto& op = iter->second;
    key_to_marker.erase(op.key);
    reset_need_retry(op.key);
    if (op.is_olh) {
      pending_olh.erase(op.key.name);
    }
    marker_to_op.erase(iter);
  }

public:
  RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                                   const string& _marker_oid,
                                   const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
                                                                                      sync_env(_sync_env),
                                                                                      marker_oid(_marker_oid),
                                                                                      sync_marker(_marker) {}

  void set_tn(RGWSyncTraceNodeRef& _tn) {
    tn = _tn;
  }

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.position = new_marker;

    map<string, bufferlist> attrs;
    sync_marker.encode_attr(attrs);

    RGWRados *store = sync_env->store;

    tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
    return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
                                          store->svc.sysobj,
                                          rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
                                          attrs);
  }

  /*
   * create index from key -> <op, marker>, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   * Also, we should make sure that we don't run concurrent operations on the same key with
   * different ops.
   */
  bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
    auto result = key_to_marker.emplace(key, marker);
    if (!result.second) { // exists
      set_need_retry(key);
      return false;
    }
    marker_to_op[marker] = operation{key, is_olh};
    if (is_olh) {
      // prevent other olh ops from starting on this object name
      pending_olh.insert(key.name);
    }
    return true;
  }

  bool can_do_op(const rgw_obj_key& key, bool is_olh) {
    // serialize olh ops on the same object name
    if (is_olh && pending_olh.count(key.name)) {
      tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
      return false;
    }
    return (key_to_marker.find(key) == key_to_marker.end());
  }

  RGWOrderCallCR *allocate_order_control_cr() override {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

template <class T, class K>
class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWBucketInfo *bucket_info;
  const rgw_bucket_shard& bs;

  rgw_obj_key key;
  bool versioned;
  std::optional<uint64_t> versioned_epoch;
  rgw_bucket_entry_owner owner;
  real_time timestamp;
  RGWModifyOp op;
  RGWPendingState op_state;

  T entry_marker;
  RGWSyncShardMarkerTrack<T, K> *marker_tracker;

  int sync_status;

  stringstream error_ss;

  bool error_injection;

  RGWDataSyncModule *data_sync_module;

  rgw_zone_set zones_trace;

  RGWSyncTraceNodeRef tn;
public:
  RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                             RGWBucketInfo *_bucket_info,
                             const rgw_bucket_shard& bs,
                             const rgw_obj_key& _key, bool _versioned,
                             std::optional<uint64_t> _versioned_epoch,
                             real_time& _timestamp,
                             const rgw_bucket_entry_owner& _owner,
                             RGWModifyOp _op, RGWPendingState _op_state,
                             const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
                             RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
                                                                sync_env(_sync_env),
                                                                bucket_info(_bucket_info), bs(bs),
                                                                key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
                                                                owner(_owner),
                                                                timestamp(_timestamp), op(_op),
                                                                op_state(_op_state),
                                                                entry_marker(_entry_marker),
                                                                marker_tracker(_marker_tracker),
                                                                sync_status(0) {
    stringstream ss;
    ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
    set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
    set_status("init");

    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));

    tn->log(20, SSTR("bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
    error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);

    data_sync_module = sync_env->sync_module->get_data_handler();

    zones_trace = _zones_trace;
    zones_trace.insert(sync_env->store->svc.zone->get_zone().id);
  }

  int operate() override {
    reenter(this) {
      /* skip entries that are not complete */
      if (op_state != CLS_RGW_STATE_COMPLETE) {
        goto done;
      }
      tn->set_flag(RGW_SNS_FLAG_ACTIVE);
      do {
        yield {
          marker_tracker->reset_need_retry(key);
          if (key.name.empty()) {
            /* shouldn't happen */
            set_status("skipping empty entry");
            tn->log(0, "entry with empty obj name, skipping");
            goto done;
          }
          if (error_injection &&
              rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
            tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
            retcode = -EIO;
          } else if (op == CLS_RGW_OP_ADD ||
                     op == CLS_RGW_OP_LINK_OLH) {
            set_status("syncing obj");
            tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
          } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
            set_status("removing obj");
            if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
              versioned = true;
            }
            tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
            // our copy of the object is more recent, continue as if it succeeded
            if (retcode == -ERR_PRECONDITION_FAILED) {
              retcode = 0;
            }
          } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
            set_status("creating delete marker");
            tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
            call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
          }
          tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key));
        }
      } while (marker_tracker->need_retry(key));
      {
        tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
        if (retcode >= 0) {
          tn->log(10, "success");
        } else {
          tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
        }
      }

      if (retcode < 0 && retcode != -ENOENT) {
        set_status() << "failed to sync obj; retcode=" << retcode;
        tn->log(0, SSTR("ERROR: failed to sync object: "
                        << bucket_shard_str{bs} << "/" << key.name));
        error_ss << bucket_shard_str{bs} << "/" << key.name;
        sync_status = retcode;
      }
      if (!error_ss.str().empty()) {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
      }
done:
      if (sync_status == 0) {
        /* update marker */
        set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
        yield call(marker_tracker->finish(entry_marker));
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SYNC_SPAWN_WINDOW 20

class RGWBucketShardFullSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  const rgw_bucket_shard& bs;
  RGWBucketInfo *bucket_info;
  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  bucket_list_result list_result;
  list<bucket_list_entry>::iterator entries_iter;
  rgw_bucket_shard_sync_info& sync_info;
  RGWBucketFullSyncShardMarkerTrack marker_tracker;
  rgw_obj_key list_marker;
  bucket_list_entry *entry{nullptr};

  int total_entries{0};

  int sync_status{0};

  const string& status_oid;

  rgw_zone_set zones_trace;

  RGWSyncTraceNodeRef tn;
public:
  RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
                           RGWBucketInfo *_bucket_info,
                           const std::string& status_oid,
                           RGWContinuousLeaseCR *lease_cr,
                           rgw_bucket_shard_sync_info& sync_info,
                           RGWSyncTraceNodeRef tn_parent)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
      marker_tracker(sync_env, status_oid, sync_info.full_marker),
      status_oid(status_oid),
      tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
                                         SSTR(bucket_shard_str{bs}))) {
    zones_trace.insert(sync_env->source_zone);
    marker_tracker.set_tn(tn);
  }

  int operate() override;
};

int RGWBucketShardFullSyncCR::operate()
{
  int ret;
  reenter(this) {
    list_marker = sync_info.full_marker.position;

    total_entries = sync_info.full_marker.count;
    do {
      if (!lease_cr->is_locked()) {
        drain_all();
        return set_cr_error(-ECANCELED);
      }
      set_status("listing remote bucket");
      tn->log(20, "listing bucket for full sync");
      yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
                                          &list_result));
      if (retcode < 0 && retcode != -ENOENT) {
        set_status("failed bucket listing, going down");
        drain_all();
        return set_cr_error(retcode);
      }
      if (list_result.entries.size() > 0) {
        tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
      }
      entries_iter = list_result.entries.begin();
      for (; entries_iter != list_result.entries.end(); ++entries_iter) {
        if (!lease_cr->is_locked()) {
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        tn->log(20, SSTR("[full sync] syncing object: "
                         << bucket_shard_str{bs} << "/" << entries_iter->key));
        entry = &(*entries_iter);
        total_entries++;
        list_marker = entries_iter->key;
        if (!marker_tracker.start(entry->key, total_entries, real_time())) {
          tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
        } else {
          using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
          yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
                                 false, /* versioned, only matters for object removal */
                                 entry->versioned_epoch, entry->mtime,
                                 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
                                 entry->key, &marker_tracker, zones_trace, tn),
                      false);
        }
        while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
          yield wait_for_child();
          bool again = true;
          while (again) {
            again = collect(&ret, nullptr);
            if (ret < 0) {
              tn->log(10, "a sync operation returned error");
              sync_status = ret;
              /* we have reported this error */
            }
          }
        }
      }
    } while (list_result.is_truncated && sync_status == 0);
    set_status("done iterating over all objects");
    /* wait for all operations to complete */
    while (num_spawned()) {
      yield wait_for_child();
      bool again = true;
      while (again) {
        again = collect(&ret, nullptr);
        if (ret < 0) {
          tn->log(10, "a sync operation returned error");
          sync_status = ret;
          /* we have reported this error */
        }
      }
    }
    tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
    if (!lease_cr->is_locked()) {
      return set_cr_error(-ECANCELED);
    }
    /* update sync state to incremental */
    if (sync_status == 0) {
      yield {
        sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
        map<string, bufferlist> attrs;
        sync_info.encode_state_attr(attrs);
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
                                            rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
                                            attrs));
      }
    } else {
      tn->log(10, SSTR("backing out with sync_status=" << sync_status));
    }
    if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
      tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
                      << bucket_shard_str{bs} << " retcode=" << retcode));
      return set_cr_error(retcode);
    }
    if (sync_status < 0) {
      return set_cr_error(sync_status);
    }
    return set_cr_done();
  }
  return 0;
}
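
/*
 * Throttling in the full sync loop above: at most BUCKET_SYNC_SPAWN_WINDOW
 * RGWBucketSyncSingleEntryCR children are in flight at once; past that the
 * coroutine blocks in wait_for_child()/collect() and records the first
 * child error in sync_status, which stops the listing loop.
 */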
2896
91327a77
AA
2897static bool has_olh_epoch(RGWModifyOp op) {
2898 return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
2899}
2900
7c673cae
FG
2901class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2902 RGWDataSyncEnv *sync_env;
2903 const rgw_bucket_shard& bs;
2904 RGWBucketInfo *bucket_info;
2905 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2906 list<rgw_bi_log_entry> list_result;
91327a77 2907 list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
7c673cae 2908 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
91327a77 2909 rgw_bucket_shard_sync_info& sync_info;
7c673cae
FG
2910 rgw_obj_key key;
2911 rgw_bi_log_entry *entry{nullptr};
2912 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2913 bool updated_status{false};
2914 const string& status_oid;
31f18b77 2915 const string& zone_id;
7c673cae
FG
2916
2917 string cur_id;
2918
7c673cae 2919 int sync_status{0};
c07f9fc5 2920 bool syncstopped{false};
7c673cae 2921
11fdf7f2 2922 RGWSyncTraceNodeRef tn;
7c673cae
FG
2923public:
2924 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2925 const rgw_bucket_shard& bs,
2926 RGWBucketInfo *_bucket_info,
2927 const std::string& status_oid,
2928 RGWContinuousLeaseCR *lease_cr,
11fdf7f2
TL
2929 rgw_bucket_shard_sync_info& sync_info,
2930 RGWSyncTraceNodeRef& _tn_parent)
7c673cae 2931 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
91327a77
AA
2932 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2933 marker_tracker(sync_env, status_oid, sync_info.inc_marker),
11fdf7f2
TL
2934 status_oid(status_oid), zone_id(_sync_env->store->svc.zone->get_zone().id),
2935 tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
2936 SSTR(bucket_shard_str{bs})))
91327a77 2937 {
7c673cae
FG
2938 set_description() << "bucket shard incremental sync bucket="
2939 << bucket_shard_str{bs};
2940 set_status("init");
11fdf7f2 2941 marker_tracker.set_tn(tn);
7c673cae
FG
2942 }
2943
2944 int operate() override;
2945};
2946
2947int RGWBucketShardIncrementalSyncCR::operate()
2948{
2949 int ret;
2950 reenter(this) {
2951 do {
2952 if (!lease_cr->is_locked()) {
2953 drain_all();
11fdf7f2 2954 tn->log(0, "ERROR: lease is not taken, abort");
7c673cae
FG
2955 return set_cr_error(-ECANCELED);
2956 }
11fdf7f2 2957 tn->log(20, SSTR("listing bilog for incremental sync" << sync_info.inc_marker.position));
91327a77
AA
2958 set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
2959 yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
7c673cae 2960 &list_result));
91327a77
AA
2961 if (retcode < 0 && retcode != -ENOENT) {
2962 /* wait for all operations to complete */
7c673cae 2963 drain_all();
91327a77 2964 return set_cr_error(retcode);
7c673cae
FG
2965 }
2966 squash_map.clear();
91327a77
AA
2967 entries_iter = list_result.begin();
2968 entries_end = list_result.end();
2969 for (; entries_iter != entries_end; ++entries_iter) {
2970 auto e = *entries_iter;
2971 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
2972 ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
c07f9fc5 2973 syncstopped = true;
91327a77
AA
2974 entries_end = entries_iter; // dont sync past here
2975 break;
c07f9fc5 2976 }
91327a77 2977 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
c07f9fc5
FG
2978 continue;
2979 }
94b18763
FG
2980 if (e.op == CLS_RGW_OP_CANCEL) {
2981 continue;
2982 }
7c673cae
FG
2983 if (e.state != CLS_RGW_STATE_COMPLETE) {
2984 continue;
2985 }
31f18b77
FG
2986 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
2987 continue;
2988 }
7c673cae 2989 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
91327a77
AA
2990 // don't squash over olh entries - we need to apply their olh_epoch
2991 if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
2992 continue;
2993 }
7c673cae
FG
2994 if (squash_entry.first <= e.timestamp) {
2995 squash_entry = make_pair<>(e.timestamp, e.op);
2996 }
2997 }
c07f9fc5 2998
7c673cae 2999 entries_iter = list_result.begin();
91327a77 3000 for (; entries_iter != entries_end; ++entries_iter) {
7c673cae
FG
3001 if (!lease_cr->is_locked()) {
3002 drain_all();
3003 return set_cr_error(-ECANCELED);
3004 }
3005 entry = &(*entries_iter);
3006 {
3007 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
3008 if (p < 0) {
3009 cur_id = entry->id;
3010 } else {
3011 cur_id = entry->id.substr(p + 1);
3012 }
3013 }
91327a77 3014 sync_info.inc_marker.position = cur_id;
7c673cae 3015
c07f9fc5 3016 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
91327a77 3017 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
c07f9fc5
FG
3018 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3019 continue;
3020 }
3021
7c673cae
FG
3022 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
3023 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
11fdf7f2 3024 tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
7c673cae
FG
3025 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3026 continue;
3027 }
3028
11fdf7f2 3029 tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));
7c673cae
FG
3030
3031 if (!key.ns.empty()) {
3032 set_status() << "skipping entry in namespace: " << entry->object;
11fdf7f2 3033 tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
7c673cae
FG
3034 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3035 continue;
3036 }
3037
3038 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
3039 if (entry->op == CLS_RGW_OP_CANCEL) {
3040 set_status() << "canceled operation, skipping";
11fdf7f2
TL
3041 tn->log(20, SSTR("skipping object: "
3042 << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
7c673cae
FG
3043 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3044 continue;
3045 }
3046 if (entry->state != CLS_RGW_STATE_COMPLETE) {
3047 set_status() << "non-complete operation, skipping";
11fdf7f2
TL
3048 tn->log(20, SSTR("skipping object: "
3049 << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
7c673cae
FG
3050 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3051 continue;
3052 }
31f18b77
FG
3053 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
3054 set_status() << "redundant operation, skipping";
11fdf7f2
TL
3055 tn->log(20, SSTR("skipping object: "
3056 <<bucket_shard_str{bs} <<"/"<<key<<": redundant operation"));
31f18b77
FG
3057 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3058 continue;
3059 }
7c673cae
FG
3060 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
3061 set_status() << "squashed operation, skipping";
11fdf7f2
TL
3062 tn->log(20, SSTR("skipping object: "
3063 << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
91327a77 3064 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
7c673cae
FG
3065 continue;
3066 }
11fdf7f2
TL
3067 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
3068 tn->log(20, SSTR("syncing object: "
3069 << bucket_shard_str{bs} << "/" << key));
7c673cae 3070 updated_status = false;
91327a77 3071 while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
7c673cae
FG
3072 if (!updated_status) {
3073 set_status() << "can't do op, conflicting inflight operation";
3074 updated_status = true;
3075 }
11fdf7f2 3076 tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
7c673cae
FG
3077 yield wait_for_child();
3078 bool again = true;
3079 while (again) {
3080 again = collect(&ret, nullptr);
3081 if (ret < 0) {
11fdf7f2 3082 tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
7c673cae
FG
3083 sync_status = ret;
3084 /* we have reported this error */
3085 }
3086 }
1adf2230
AA
3087 if (sync_status != 0)
3088 break;
3089 }
3090 if (sync_status != 0) {
3091 /* get error, stop */
3092 break;
7c673cae 3093 }
91327a77 3094 if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
7c673cae 3095 set_status() << "can't do op, sync already in progress for object";
11fdf7f2 3096 tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
7c673cae
FG
3097 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3098 continue;
3099 }
3100 // yield {
3101 set_status() << "start object sync";
3102 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
3103 tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
3104 } else {
3105 std::optional<uint64_t> versioned_epoch;
3106 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
3107 if (entry->ver.pool < 0) {
3108 versioned_epoch = entry->ver.epoch;
3109 }
3110 tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
3111 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
3112 spawn(new SyncCR(sync_env, bucket_info, bs, key,
3113 entry->is_versioned(), versioned_epoch,
3114 entry->timestamp, owner, entry->op, entry->state,
3115 cur_id, &marker_tracker, entry->zones_trace, tn),
3116 false);
3117 }
3118 // }
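// throttle: once more than BUCKET_SYNC_SPAWN_WINDOW object syncs are in
// flight, reap at least one child before processing further entries.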
3119 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
3120 set_status() << "num_spawned() > spawn_window";
3121 yield wait_for_child();
3122 bool again = true;
3123 while (again) {
3124 again = collect(&ret, nullptr);
3125 if (ret < 0) {
3126 tn->log(10, "a sync operation returned error");
3127 sync_status = ret;
3128 /* we have reported this error */
3129 }
3130 /* not waiting for child here */
3131 }
3132 }
3133 }
3134 } while (!list_result.empty() && sync_status == 0 && !syncstopped);
3135
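// listing finished (or sync stopped/errored): drain every remaining child
// before flushing the marker tracker so the persisted marker is final.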
3136 while (num_spawned()) {
3137 yield wait_for_child();
3138 bool again = true;
3139 while (again) {
3140 again = collect(&ret, nullptr);
3141 if (ret < 0) {
3142 tn->log(10, "a sync operation returned error");
3143 sync_status = ret;
3144 /* we have reported this error */
3145 }
3146 /* not waiting for child here */
3147 }
3148 }
3149 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
3150
3151 if (syncstopped) {
3152 // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
3153 // still disabled, we'll delete the sync status object. otherwise we'll
3154 // restart full sync to catch any changes that happened while sync was
3155 // disabled
3156 sync_info.state = rgw_bucket_shard_sync_info::StateInit;
3157 return set_cr_done();
3158 }
3159
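// persist the highest completed marker to the shard's sync status object
// before reporting success.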
3160 yield call(marker_tracker.flush());
3161 if (retcode < 0) {
3162 tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
3163 return set_cr_error(retcode);
3164 }
3165 if (sync_status < 0) {
3166 tn->log(10, SSTR("backing out with sync_status=" << sync_status));
3167 return set_cr_error(sync_status);
3168 }
3169 return set_cr_done();
3170 }
3171 return 0;
3172}
3173
3174int RGWRunBucketSyncCoroutine::operate()
3175{
3176 reenter(this) {
3177 yield {
3178 set_status("acquiring sync lock");
3179 auto store = sync_env->store;
3180 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
3181 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
3182 "sync_lock",
3183 cct->_conf->rgw_sync_lease_period,
3184 this));
3185 lease_stack.reset(spawn(lease_cr.get(), false));
3186 }
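// yield until the continuous lease is acquired; if the lease coroutine
// completes without the lock, another gateway holds it and we abort early.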
3187 while (!lease_cr->is_locked()) {
3188 if (lease_cr->is_done()) {
3189 tn->log(5, "failed to take lease");
3190 set_status("lease lock failed, early abort");
3191 drain_all();
3192 return set_cr_error(lease_cr->get_ret_status());
3193 }
3194 set_sleeping(true);
3195 yield;
3196 }
3197
3198 tn->log(10, "took lease");
3199 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
3200 if (retcode < 0 && retcode != -ENOENT) {
3201 tn->log(0, "ERROR: failed to read sync status for bucket");
3202 lease_cr->go_down();
3203 drain_all();
3204 return set_cr_error(retcode);
3205 }
3206
3207 tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
3208
3209 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3210 if (retcode == -ENOENT) {
3211 /* bucket instance info has not been synced in yet, fetch it now */
3212 yield {
3213 tn->log(10, SSTR("no local info for bucket " << bucket_str{bs.bucket} << ": fetching metadata"));
3214 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
3215
3216 meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc.zone->get_master_conn(), sync_env->async_rados,
3217 sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
3218
3219 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
3220 string() /* no marker */,
3221 MDLOG_STATUS_COMPLETE,
3222 NULL /* no marker tracker */,
3223 tn));
3224 }
3225 if (retcode < 0) {
3226 tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket}));
3227 lease_cr->go_down();
3228 drain_all();
3229 return set_cr_error(retcode);
3230 }
3231
3232 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3233 }
3234 if (retcode < 0) {
3235 tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket}));
3236 lease_cr->go_down();
3237 drain_all();
3238 return set_cr_error(retcode);
3239 }
3240
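// state machine: Init -> FullSync -> IncrementalSync; a run can also move
// backwards (e.g. incremental sync demoting to full sync), so loop until
// incremental sync returns normally.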
3241 do {
3242 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
3243 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
3244 if (retcode == -ENOENT) {
3245 tn->log(0, "bucket sync disabled");
3246 lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
3247 lease_cr->wakeup();
3248 lease_cr.reset();
3249 drain_all();
3250 return set_cr_done();
3251 }
3252 if (retcode < 0) {
3253 tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
3254 lease_cr->go_down();
3255 drain_all();
3256 return set_cr_error(retcode);
3257 }
3258 }
3259
3260 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
3261 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
3262 status_oid, lease_cr.get(),
3263 sync_status, tn));
3264 if (retcode < 0) {
3265 tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
3266 lease_cr->go_down();
3267 drain_all();
3268 return set_cr_error(retcode);
3269 }
3270 }
3271
3272 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
3273 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
3274 status_oid, lease_cr.get(),
3275 sync_status, tn));
3276 if (retcode < 0) {
3277 tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
3278 lease_cr->go_down();
3279 drain_all();
3280 return set_cr_error(retcode);
3281 }
3282 }
3283 // loop back to previous states unless incremental sync returns normally
3284 } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
3285
3286 lease_cr->go_down();
3287 drain_all();
3288 return set_cr_done();
3289 }
3290
3291 return 0;
3292}
3293
3294RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
3295{
3296 return new RGWRunBucketSyncCoroutine(&sync_env, bs, sync_env.sync_tracer->root_node);
3297}
3298
3299int RGWBucketSyncStatusManager::init()
3300{
3301 conn = store->svc.zone->get_zone_conn_by_id(source_zone);
3302 if (!conn) {
3303 ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
3304 return -EINVAL;
3305 }
3306
3307 int ret = http_manager.start();
3308 if (ret < 0) {
3309 ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
3310 return ret;
3311 }
3312
3313
3314 const string key = bucket.get_key();
3315
3316 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
3317 { NULL, NULL } };
3318
3319 string path = string("/admin/metadata/bucket.instance");
3320
3321 bucket_instance_meta_info result;
3322 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
3323 if (ret < 0) {
3324 ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
3325 return ret;
3326 }
3327
3328 RGWBucketInfo& bi = result.data.get_bucket_info();
3329 num_shards = bi.num_shards;
3330
3331 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
3332
3333 sync_module.reset(new RGWDefaultSyncModuleInstance());
3334
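// a non-sharded bucket index reports num_shards == 0 but still has one log
// to sync, so treat it as a single shard (passed below as shard_id -1).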
3335 int effective_num_shards = (num_shards ? num_shards : 1);
3336
3337 auto async_rados = store->get_async_rados();
3338
3339 for (int i = 0; i < effective_num_shards; i++) {
3340 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, this, async_rados, &http_manager);
3341 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->get_sync_tracer(), sync_module);
3342 if (ret < 0) {
3343 ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
3344 return ret;
3345 }
3346 source_logs[i] = l;
3347 }
3348
3349 return 0;
3350}
3351
3352int RGWBucketSyncStatusManager::init_sync_status()
3353{
3354 list<RGWCoroutinesStack *> stacks;
3355
3356 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3357 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3358 RGWRemoteBucketLog *l = iter->second;
3359 stack->call(l->init_sync_status_cr());
3360
3361 stacks.push_back(stack);
3362 }
3363
3364 return cr_mgr.run(stacks);
3365}
3366
3367int RGWBucketSyncStatusManager::read_sync_status()
3368{
3369 list<RGWCoroutinesStack *> stacks;
3370
3371 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3372 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3373 RGWRemoteBucketLog *l = iter->second;
3374 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
3375
3376 stacks.push_back(stack);
3377 }
3378
3379 int ret = cr_mgr.run(stacks);
3380 if (ret < 0) {
3381 ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
3382 << bucket_str{bucket} << dendl;
3383 return ret;
3384 }
3385
3386 return 0;
3387}
3388
3389int RGWBucketSyncStatusManager::run()
3390{
3391 list<RGWCoroutinesStack *> stacks;
3392
3393 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3394 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3395 RGWRemoteBucketLog *l = iter->second;
3396 stack->call(l->run_sync_cr());
3397
3398 stacks.push_back(stack);
3399 }
3400
3401 int ret = cr_mgr.run(stacks);
3402 if (ret < 0) {
3403 ldpp_dout(this, 0) << "ERROR: failed to run sync for "
3404 << bucket_str{bucket} << dendl;
3405 return ret;
3406 }
3407
3408 return 0;
3409}
3410
3411unsigned RGWBucketSyncStatusManager::get_subsys() const
3412{
3413 return dout_subsys;
3414}
3415
3416std::ostream& RGWBucketSyncStatusManager::gen_prefix(std::ostream& out) const
3417{
3418 auto zone = std::string_view{source_zone};
3419 return out << "bucket sync zone:" << zone.substr(0, 8)
3420 << " bucket:" << bucket.name << ' ';
3421}
3422
3423string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
3424 const rgw_bucket_shard& bs)
3425{
3426 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
3427}
3428
3429string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
3430 const rgw_obj& obj)
3431{
3432 return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" +
3433 obj.key.name + ":" + obj.key.instance;
3434}
3435
3436class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
3437 static constexpr int max_concurrent_shards = 16;
3438 RGWRados *const store;
3439 RGWDataSyncEnv *const env;
3440 const int num_shards;
3441 rgw_bucket_shard bs;
3442
3443 using Vector = std::vector<rgw_bucket_shard_sync_info>;
3444 Vector::iterator i, end;
3445
3446 public:
3447 RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
3448 int num_shards, const rgw_bucket& bucket,
3449 Vector *status)
3450 : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
3451 store(store), env(env), num_shards(num_shards),
3452 bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
3453 i(status->begin()), end(status->end())
3454 {}
3455
3456 bool spawn_next() override {
3457 if (i == end) {
3458 return false;
3459 }
3460 spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
3461 ++i;
3462 ++bs.shard_id;
3463 return true;
3464 }
3465};
3466
3467 int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const std::string& source_zone,
3468 const RGWBucketInfo& bucket_info,
3469 std::vector<rgw_bucket_shard_sync_info> *status)
3470{
3471 const auto num_shards = bucket_info.num_shards;
3472 status->clear();
3473 status->resize(std::max<size_t>(1, num_shards));
3474
3475 RGWDataSyncEnv env;
3476 RGWSyncModuleInstanceRef module; // null sync module
3477 env.init(dpp, store->ctx(), store, nullptr, store->get_async_rados(),
3478 nullptr, nullptr, nullptr, source_zone, module);
3479
3480 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
3481 return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
3482 bucket_info.bucket, status));
3483}
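// A minimal usage sketch for the helper above (hypothetical caller; setup
// and error handling elided):
//
//   std::vector<rgw_bucket_shard_sync_info> status;
//   int r = rgw_bucket_sync_status(dpp, store, source_zone, bucket_info, &status);
//   if (r == 0) {
//     for (size_t i = 0; i < status.size(); ++i) {
//       ldpp_dout(dpp, 20) << "shard " << i << " state="
//                          << status[i].state << dendl;
//     }
//   }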
3484
3485
3486// TODO: move into rgw_data_sync_trim.cc
3487#undef dout_prefix
3488#define dout_prefix (*_dout << "data trim: ")
3489
3490namespace {
3491
3492 /// return the marker up to which it is safe to trim
3493const std::string& get_stable_marker(const rgw_data_sync_marker& m)
3494{
3495 return m.state == m.FullSync ? m.next_step_marker : m.marker;
3496}
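// during full sync, 'marker' tracks progress through the full-sync index
// while 'next_step_marker' records where incremental sync will resume reading
// the datalog, so only the latter bounds what may be trimmed.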
3497
3498/// comparison operator for take_min_markers()
3499bool operator<(const rgw_data_sync_marker& lhs,
3500 const rgw_data_sync_marker& rhs)
3501{
3502 // sort by stable marker
3503 return get_stable_marker(lhs) < get_stable_marker(rhs);
3504}
3505
3506 /// populate the container starting at 'dest' with the minimum stable marker
3507 /// of each shard across all of the peers in [first, last)
3508template <typename IterIn, typename IterOut>
3509void take_min_markers(IterIn first, IterIn last, IterOut dest)
3510{
3511 if (first == last) {
3512 return;
3513 }
3514 // initialize markers with the first peer's
3515 auto m = dest;
3516 for (auto &shard : first->sync_markers) {
3517 *m = std::move(shard.second);
3518 ++m;
3519 }
3520 // for remaining peers, replace with smaller markers
3521 for (auto p = first + 1; p != last; ++p) {
3522 m = dest;
3523 for (auto &shard : p->sync_markers) {
3524 if (shard.second < *m) {
3525 *m = std::move(shard.second);
3526 }
3527 ++m;
3528 }
3529 }
3530}
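// Example (hypothetical marker values): with two peers whose per-shard stable
// markers are {"1_10", "1_40"} and {"1_30", "1_20"}, the result is
// {"1_10", "1_20"}: each shard may only be trimmed up to its slowest peer.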
3531
3532} // anonymous namespace
3533
3534class DataLogTrimCR : public RGWCoroutine {
3535 RGWRados *store;
3536 RGWHTTPManager *http;
3537 const int num_shards;
3538 const std::string& zone_id; //< my zone id
3539 std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
3540 std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
3541 std::vector<std::string>& last_trim; //< last trimmed marker per shard
3542 int ret{0};
3543
3544 public:
3545 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
3546 int num_shards, std::vector<std::string>& last_trim)
3547 : RGWCoroutine(store->ctx()), store(store), http(http),
3548 num_shards(num_shards),
3549 zone_id(store->svc.zone->get_zone().id),
3550 peer_status(store->svc.zone->get_zone_conn_map().size()),
3551 min_shard_markers(num_shards),
3552 last_trim(last_trim)
3553 {}
3554
3555 int operate() override;
3556};
3557
3558int DataLogTrimCR::operate()
3559{
3560 reenter(this) {
3561 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3562 set_status("fetching sync status");
3563 yield {
3564 // query data sync status from each sync peer
3565 rgw_http_param_pair params[] = {
3566 { "type", "data" },
3567 { "status", nullptr },
3568 { "source-zone", zone_id.c_str() },
3569 { nullptr, nullptr }
3570 };
3571
3572 auto p = peer_status.begin();
3573 for (auto& c : store->svc.zone->get_zone_conn_map()) {
3574 ldout(cct, 20) << "query sync status from " << c.first << dendl;
3575 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3576 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3577 false);
3578 ++p;
3579 }
3580 }
3581
3582 // must get a successful reply from all peers to consider trimming
3583 ret = 0;
3584 while (ret == 0 && num_spawned() > 0) {
3585 yield wait_for_child();
3586 collect_next(&ret);
3587 }
3588 drain_all();
3589
3590 if (ret < 0) {
3591 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3592 return set_cr_error(ret);
3593 }
3594
3595 ldout(cct, 10) << "trimming log shards" << dendl;
3596 set_status("trimming log shards");
3597 yield {
3598 // determine the minimum marker for each shard
3599 take_min_markers(peer_status.begin(), peer_status.end(),
3600 min_shard_markers.begin());
3601
3602 for (int i = 0; i < num_shards; i++) {
3603 const auto& m = min_shard_markers[i];
3604 auto& stable = get_stable_marker(m);
3605 if (stable <= last_trim[i]) {
3606 continue;
3607 }
3608 ldout(cct, 10) << "trimming log shard " << i
3609 << " at marker=" << stable
3610 << " last_trim=" << last_trim[i] << dendl;
3611 using TrimCR = RGWSyncLogTrimCR;
3612 spawn(new TrimCR(store, store->data_log->get_oid(i),
3613 stable, &last_trim[i]),
3614 true);
3615 }
3616 }
3617 return set_cr_done();
3618 }
3619 return 0;
3620}
3621
3622RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
3623 RGWHTTPManager *http,
3624 int num_shards,
3625 std::vector<std::string>& markers)
3626{
3627 return new DataLogTrimCR(store, http, num_shards, markers);
3628}
3629
3630class DataLogTrimPollCR : public RGWCoroutine {
3631 RGWRados *store;
3632 RGWHTTPManager *http;
3633 const int num_shards;
3634 const utime_t interval; //< polling interval
3635 const std::string lock_oid; //< use first data log shard for lock
3636 const std::string lock_cookie;
3637 std::vector<std::string> last_trim; //< last trimmed marker per shard
3638
3639 public:
3640 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3641 int num_shards, utime_t interval)
3642 : RGWCoroutine(store->ctx()), store(store), http(http),
3643 num_shards(num_shards), interval(interval),
3644 lock_oid(store->data_log->get_oid(0)),
3645 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3646 last_trim(num_shards)
3647 {}
3648
3649 int operate() override;
3650};
3651
3652int DataLogTrimPollCR::operate()
3653{
3654 reenter(this) {
3655 for (;;) {
3656 set_status("sleeping");
3657 wait(interval);
3658
3659 // request a 'data_trim' lock that covers the entire wait interval to
3660 // prevent other gateways from attempting to trim for the duration
3661 set_status("acquiring trim lock");
3662 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3663 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
3664 "data_trim", lock_cookie,
3665 interval.sec()));
3666 if (retcode < 0) {
3667 // if the lock is already held, go back to sleep and try again later
3668 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3669 << interval.sec() << "s" << dendl;
3670 continue;
3671 }
3672
3673 set_status("trimming");
3674 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3675
3676 // note that the lock is not released. this is intentional, as it avoids
3677 // duplicating this work in other gateways
3678 }
3679 }
3680 return 0;
3681}
3682
3683RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3684 RGWHTTPManager *http,
3685 int num_shards, utime_t interval)
3686{
3687 return new DataLogTrimPollCR(store, http, num_shards, interval);
3688}