1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <boost/utility/string_ref.hpp>
5
6 #include "common/ceph_json.h"
7 #include "common/RWLock.h"
8 #include "common/RefCountedObj.h"
9 #include "common/WorkQueue.h"
10 #include "common/Throttle.h"
11 #include "common/errno.h"
12
13 #include "rgw_common.h"
14 #include "rgw_rados.h"
15 #include "rgw_zone.h"
16 #include "rgw_sync.h"
17 #include "rgw_data_sync.h"
18 #include "rgw_rest_conn.h"
19 #include "rgw_cr_rados.h"
20 #include "rgw_cr_rest.h"
21 #include "rgw_http_client.h"
22 #include "rgw_bucket.h"
23 #include "rgw_metadata.h"
24 #include "rgw_sync_counters.h"
25 #include "rgw_sync_module.h"
26 #include "rgw_sync_log_trim.h"
27
28 #include "cls/lock/cls_lock_client.h"
29
30 #include "services/svc_zone.h"
31 #include "services/svc_sync_modules.h"
32
33 #include "include/random.h"
34
35 #include <boost/asio/yield.hpp>
36
37 #define dout_subsys ceph_subsys_rgw
38
39 #undef dout_prefix
40 #define dout_prefix (*_dout << "data sync: ")
41
42 static string datalog_sync_status_oid_prefix = "datalog.sync-status";
43 static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
44 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
45 static string bucket_status_oid_prefix = "bucket.sync-status";
46 static string object_status_oid_prefix = "bucket.sync-status";
47
48
49 void rgw_datalog_info::decode_json(JSONObj *obj) {
50 JSONDecoder::decode_json("num_objects", num_shards, obj);
51 }
52
53 void rgw_datalog_entry::decode_json(JSONObj *obj) {
54 JSONDecoder::decode_json("key", key, obj);
55 utime_t ut;
56 JSONDecoder::decode_json("timestamp", ut, obj);
57 timestamp = ut.to_real_time();
58 }
59
60 void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
61 JSONDecoder::decode_json("marker", marker, obj);
62 JSONDecoder::decode_json("truncated", truncated, obj);
63 JSONDecoder::decode_json("entries", entries, obj);
64 };
65
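/* Collects the per-shard rgw_data_sync_marker objects for the source zone,
 * reading up to MAX_CONCURRENT_SHARDS shard status objects in parallel. */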
66 class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
67 static constexpr int MAX_CONCURRENT_SHARDS = 16;
68
69 RGWDataSyncEnv *env;
70 const int num_shards;
  71   int shard_id{0};
72
73 map<uint32_t, rgw_data_sync_marker>& markers;
74
75 public:
76 RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
77 map<uint32_t, rgw_data_sync_marker>& markers)
78 : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
79 env(env), num_shards(num_shards), markers(markers)
80 {}
81 bool spawn_next() override;
82 };
83
84 bool RGWReadDataSyncStatusMarkersCR::spawn_next()
85 {
86 if (shard_id >= num_shards) {
87 return false;
88 }
89 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
90 spawn(new CR(env->async_rados, env->store->svc.sysobj,
91 rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
92 &markers[shard_id]),
93 false);
94 shard_id++;
95 return true;
96 }
97
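/* Lists omap keys from each shard's ".retry" error object; callers treat a
 * non-empty result as "this shard is still recovering" (see
 * read_recovering_shards() below). */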
98 class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
99 static constexpr int MAX_CONCURRENT_SHARDS = 16;
100
101 RGWDataSyncEnv *env;
102
103 uint64_t max_entries;
104 int num_shards;
105 int shard_id{0};
106
107 string marker;
108 std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys;
109
110 public:
111 RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
112 std::vector<RGWRadosGetOmapKeysCR::ResultPtr>& omapkeys)
113 : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
114 max_entries(_max_entries), num_shards(_num_shards), omapkeys(omapkeys)
115 {}
116 bool spawn_next() override;
117 };
118
119 bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
120 {
121 if (shard_id >= num_shards)
122 return false;
123
124 string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
125 auto& shard_keys = omapkeys[shard_id];
126 shard_keys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
127 spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->svc.zone->get_zone_params().log_pool, error_oid),
128 marker, max_entries, shard_keys), false);
129
130 ++shard_id;
131 return true;
132 }
133
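/* Reads the overall rgw_data_sync_info and then the per-shard markers into
 * *sync_status. Note that a missing status object fails with -ENOENT rather
 * than returning an empty result. */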
134 class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
135 RGWDataSyncEnv *sync_env;
136 rgw_data_sync_status *sync_status;
137
138 public:
139 RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
140 rgw_data_sync_status *_status)
141 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
142 {}
143 int operate() override;
144 };
145
146 int RGWReadDataSyncStatusCoroutine::operate()
147 {
148 reenter(this) {
149 // read sync info
150 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
151 yield {
152 bool empty_on_enoent = false; // fail on ENOENT
153 call(new ReadInfoCR(sync_env->async_rados, sync_env->store->svc.sysobj,
154 rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
155 &sync_status->sync_info, empty_on_enoent));
156 }
157 if (retcode < 0) {
158 ldout(sync_env->cct, 4) << "failed to read sync status info with "
159 << cpp_strerror(retcode) << dendl;
160 return set_cr_error(retcode);
161 }
162 // read shard markers
163 using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
164 yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
165 sync_status->sync_markers));
166 if (retcode < 0) {
167 ldout(sync_env->cct, 4) << "failed to read sync status markers with "
168 << cpp_strerror(retcode) << dendl;
169 return set_cr_error(retcode);
170 }
171 return set_cr_done();
172 }
173 return 0;
174 }
175
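/* Fetches the current info (e.g. latest marker) of one remote datalog shard
 * via "GET /admin/log/?type=data&id=<shard>&info" on the source zone's REST
 * connection. */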
176 class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
177 RGWDataSyncEnv *sync_env;
178
179 RGWRESTReadResource *http_op;
180
181 int shard_id;
182 RGWDataChangesLogInfo *shard_info;
183
184 public:
185 RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
186 int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct),
187 sync_env(_sync_env),
188 http_op(NULL),
189 shard_id(_shard_id),
190 shard_info(_shard_info) {
191 }
192
193 ~RGWReadRemoteDataLogShardInfoCR() override {
194 if (http_op) {
195 http_op->put();
196 }
197 }
198
199 int operate() override {
200 reenter(this) {
201 yield {
202 char buf[16];
203 snprintf(buf, sizeof(buf), "%d", shard_id);
204 rgw_http_param_pair pairs[] = { { "type" , "data" },
205 { "id", buf },
206 { "info" , NULL },
207 { NULL, NULL } };
208
209 string p = "/admin/log/";
210
211 http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
212
213 init_new_io(http_op);
214
215 int ret = http_op->aio_read();
216 if (ret < 0) {
217 ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
218 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
219 return set_cr_error(ret);
220 }
221
222 return io_block(0);
223 }
224 yield {
225 int ret = http_op->wait(shard_info);
226 if (ret < 0) {
227 return set_cr_error(ret);
228 }
229 return set_cr_done();
230 }
231 }
232 return 0;
233 }
234 };
235
236 struct read_remote_data_log_response {
237 string marker;
238 bool truncated;
239 list<rgw_data_change_log_entry> entries;
240
241 read_remote_data_log_response() : truncated(false) {}
242
243 void decode_json(JSONObj *obj) {
244 JSONDecoder::decode_json("marker", marker, obj);
245 JSONDecoder::decode_json("truncated", truncated, obj);
246 JSONDecoder::decode_json("entries", entries, obj);
247 };
248 };
249
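/* Reads one batch of datalog entries from a remote shard starting at
 * 'marker' ("GET /admin/log/?type=data&id=<shard>&marker=...&extra-info=true"),
 * returning the entries, the next marker and a truncated flag. */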
250 class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
251 RGWDataSyncEnv *sync_env;
252
253 RGWRESTReadResource *http_op = nullptr;
254
255 int shard_id;
256 const std::string& marker;
257 string *pnext_marker;
258 list<rgw_data_change_log_entry> *entries;
259 bool *truncated;
260
261 read_remote_data_log_response response;
262 std::optional<PerfGuard> timer;
263
264 public:
265 RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id,
266 const std::string& marker, string *pnext_marker,
267 list<rgw_data_change_log_entry> *_entries,
268 bool *_truncated)
269 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
270 shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
271 entries(_entries), truncated(_truncated) {
272 }
273 ~RGWReadRemoteDataLogShardCR() override {
274 if (http_op) {
275 http_op->put();
276 }
277 }
278
279 int operate() override {
280 reenter(this) {
281 yield {
282 char buf[16];
283 snprintf(buf, sizeof(buf), "%d", shard_id);
284 rgw_http_param_pair pairs[] = { { "type" , "data" },
285 { "id", buf },
286 { "marker", marker.c_str() },
287 { "extra-info", "true" },
288 { NULL, NULL } };
289
290 string p = "/admin/log/";
291
292 http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
293
294 init_new_io(http_op);
295
296 if (sync_env->counters) {
297 timer.emplace(sync_env->counters, sync_counters::l_poll);
298 }
299 int ret = http_op->aio_read();
300 if (ret < 0) {
301 ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
302 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
303 if (sync_env->counters) {
304 sync_env->counters->inc(sync_counters::l_poll_err);
305 }
306 return set_cr_error(ret);
307 }
308
309 return io_block(0);
310 }
311 yield {
312 timer.reset();
313 int ret = http_op->wait(&response);
314 if (ret < 0) {
315 if (sync_env->counters && ret != -ENOENT) {
316 sync_env->counters->inc(sync_counters::l_poll_err);
317 }
318 return set_cr_error(ret);
319 }
320 entries->clear();
321 entries->swap(response.entries);
322 *pnext_marker = response.marker;
323 *truncated = response.truncated;
324 return set_cr_done();
325 }
326 }
327 return 0;
328 }
329 };
330
331 class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
332 RGWDataSyncEnv *sync_env;
333
334 int num_shards;
335 map<int, RGWDataChangesLogInfo> *datalog_info;
336
337 int shard_id;
338 #define READ_DATALOG_MAX_CONCURRENT 10
339
340 public:
341 RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
342 int _num_shards,
343 map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
344 sync_env(_sync_env), num_shards(_num_shards),
345 datalog_info(_datalog_info), shard_id(0) {}
346 bool spawn_next() override;
347 };
348
349 bool RGWReadRemoteDataLogInfoCR::spawn_next() {
350 if (shard_id >= num_shards) {
351 return false;
352 }
353 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
354 shard_id++;
355 return true;
356 }
357
358 class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
359 RGWDataSyncEnv *sync_env;
360 RGWRESTReadResource *http_op;
361
362 int shard_id;
363 string marker;
364 uint32_t max_entries;
365 rgw_datalog_shard_data *result;
366
367 public:
368 RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
369 const string& _marker, uint32_t _max_entries,
370 rgw_datalog_shard_data *_result)
371 : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
372 shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
373
374 int send_request() override {
375 RGWRESTConn *conn = sync_env->conn;
376 RGWRados *store = sync_env->store;
377
378 char buf[32];
379 snprintf(buf, sizeof(buf), "%d", shard_id);
380
381 char max_entries_buf[32];
382 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
383
384 const char *marker_key = (marker.empty() ? "" : "marker");
385
386 rgw_http_param_pair pairs[] = { { "type", "data" },
387 { "id", buf },
388 { "max-entries", max_entries_buf },
389 { marker_key, marker.c_str() },
390 { NULL, NULL } };
391
392 string p = "/admin/log/";
393
394 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
395 init_new_io(http_op);
396
397 int ret = http_op->aio_read();
398 if (ret < 0) {
399 ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
400 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
401 http_op->put();
402 return ret;
403 }
404
405 return 0;
406 }
407
408 int request_complete() override {
409 int ret = http_op->wait(result);
410 http_op->put();
411 if (ret < 0 && ret != -ENOENT) {
412 ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
413 return ret;
414 }
415 return 0;
416 }
417 };
418
419 class RGWListRemoteDataLogCR : public RGWShardCollectCR {
420 RGWDataSyncEnv *sync_env;
421
422 map<int, string> shards;
423 int max_entries_per_shard;
424 map<int, rgw_datalog_shard_data> *result;
425
426 map<int, string>::iterator iter;
427 #define READ_DATALOG_MAX_CONCURRENT 10
428
429 public:
430 RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
431 map<int, string>& _shards,
432 int _max_entries_per_shard,
433 map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
434 sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
435 result(_result) {
436 shards.swap(_shards);
437 iter = shards.begin();
438 }
439 bool spawn_next() override;
440 };
441
442 bool RGWListRemoteDataLogCR::spawn_next() {
443 if (iter == shards.end()) {
444 return false;
445 }
446
447 spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
448 ++iter;
449 return true;
450 }
451
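/* Initializes the data sync status objects: takes a lease on the status
 * object, writes the initial rgw_data_sync_info, reads the current position
 * of every remote datalog shard, writes one marker object per shard, and
 * finally advances the state to StateBuildingFullSyncMaps before unlocking. */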
452 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
453 static constexpr uint32_t lock_duration = 30;
454 RGWDataSyncEnv *sync_env;
455 RGWRados *store;
456 const rgw_pool& pool;
457 const uint32_t num_shards;
458
459 string sync_status_oid;
460
461 string lock_name;
462 string cookie;
463 rgw_data_sync_status *status;
464 map<int, RGWDataChangesLogInfo> shards_info;
465
466 RGWSyncTraceNodeRef tn;
467 public:
468 RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
469 uint64_t instance_id,
470 RGWSyncTraceNodeRef& _tn_parent,
471 rgw_data_sync_status *status)
472 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
473 pool(store->svc.zone->get_zone_params().log_pool),
474 num_shards(num_shards), status(status),
475 tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
476 lock_name = "sync_lock";
477
478 status->sync_info.instance_id = instance_id;
479
480 #define COOKIE_LEN 16
481 char buf[COOKIE_LEN + 1];
482
483 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
484 cookie = buf;
485
486 sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
487
488 }
489
490 int operate() override {
491 int ret;
492 reenter(this) {
493 using LockCR = RGWSimpleRadosLockCR;
494 yield call(new LockCR(sync_env->async_rados, store,
495 rgw_raw_obj{pool, sync_status_oid},
496 lock_name, cookie, lock_duration));
497 if (retcode < 0) {
498 tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
499 return set_cr_error(retcode);
500 }
501 using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
502 yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
503 rgw_raw_obj{pool, sync_status_oid},
504 status->sync_info));
505 if (retcode < 0) {
506 tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
507 return set_cr_error(retcode);
508 }
509
510 /* take lock again, we just recreated the object */
511 yield call(new LockCR(sync_env->async_rados, store,
512 rgw_raw_obj{pool, sync_status_oid},
513 lock_name, cookie, lock_duration));
514 if (retcode < 0) {
515 tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
516 return set_cr_error(retcode);
517 }
518
519 tn->log(10, "took lease");
520
521 /* fetch current position in logs */
522 yield {
523 RGWRESTConn *conn = store->svc.zone->get_zone_conn_by_id(sync_env->source_zone);
524 if (!conn) {
525 tn->log(0, SSTR("ERROR: connection to zone " << sync_env->source_zone << " does not exist!"));
526 return set_cr_error(-EIO);
527 }
528 for (uint32_t i = 0; i < num_shards; i++) {
529 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
530 }
531 }
532 while (collect(&ret, NULL)) {
533 if (ret < 0) {
534 tn->log(0, SSTR("ERROR: failed to read remote data log shards"));
535 return set_state(RGWCoroutine_Error);
536 }
537 yield;
538 }
539 yield {
540 for (uint32_t i = 0; i < num_shards; i++) {
541 RGWDataChangesLogInfo& info = shards_info[i];
542 auto& marker = status->sync_markers[i];
543 marker.next_step_marker = info.marker;
544 marker.timestamp = info.last_update;
545 const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
546 using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
547 spawn(new WriteMarkerCR(sync_env->async_rados, store->svc.sysobj,
548 rgw_raw_obj{pool, oid}, marker), true);
549 }
550 }
551 while (collect(&ret, NULL)) {
552 if (ret < 0) {
553 tn->log(0, SSTR("ERROR: failed to write data sync status markers"));
554 return set_state(RGWCoroutine_Error);
555 }
556 yield;
557 }
558
559 status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
560 yield call(new WriteInfoCR(sync_env->async_rados, store->svc.sysobj,
561 rgw_raw_obj{pool, sync_status_oid},
562 status->sync_info));
563 if (retcode < 0) {
564 tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
565 return set_cr_error(retcode);
566 }
567 yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
568 rgw_raw_obj{pool, sync_status_oid},
569 lock_name, cookie));
570 return set_cr_done();
571 }
572 return 0;
573 }
574 };
575
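/* Queries the source zone's datalog info via "GET /admin/log?type=data";
 * only the shard count is consumed by the callers below. */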
576 int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
577 {
578 rgw_http_param_pair pairs[] = { { "type", "data" },
579 { NULL, NULL } };
580
581 int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
582 if (ret < 0) {
583 ldpp_dout(dpp, 0) << "ERROR: failed to fetch datalog info" << dendl;
584 return ret;
585 }
586
587 ldpp_dout(dpp, 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
588
589 return 0;
590 }
591
592 int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
593 {
594 rgw_datalog_info log_info;
595 int ret = read_log_info(&log_info);
596 if (ret < 0) {
597 return ret;
598 }
599
600 return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
601 }
602
603 int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
604 {
605 return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
606 }
607
608 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
609 RGWSyncTraceManager *_sync_tracer, RGWSyncModuleInstanceRef& _sync_module,
610 PerfCounters* counters)
611 {
612 sync_env.init(dpp, store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
613 _sync_tracer, _source_zone, _sync_module, counters);
614
615 if (initialized) {
616 return 0;
617 }
618
619 int ret = http_manager.start();
620 if (ret < 0) {
621 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
622 return ret;
623 }
624
625 tn = sync_env.sync_tracer->add_node(sync_env.sync_tracer->root_node, "data");
626
627 initialized = true;
628
629 return 0;
630 }
631
632 void RGWRemoteDataLog::finish()
633 {
634 stop();
635 }
636
637 int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
638 {
639 // cannot run concurrently with run_sync(), so run in a separate manager
640 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
641 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
642 int ret = http_manager.start();
643 if (ret < 0) {
644 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
645 return ret;
646 }
647 RGWDataSyncEnv sync_env_local = sync_env;
648 sync_env_local.http_manager = &http_manager;
649 ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
650 http_manager.stop();
651 return ret;
652 }
653
654 int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
655 {
656 // cannot run concurrently with run_sync(), so run in a separate manager
657 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
658 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
659 int ret = http_manager.start();
660 if (ret < 0) {
661 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
662 return ret;
663 }
664 RGWDataSyncEnv sync_env_local = sync_env;
665 sync_env_local.http_manager = &http_manager;
666 std::vector<RGWRadosGetOmapKeysCR::ResultPtr> omapkeys;
667 omapkeys.resize(num_shards);
668 uint64_t max_entries{1};
669 ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, omapkeys));
670 http_manager.stop();
671
672 if (ret == 0) {
673 for (int i = 0; i < num_shards; i++) {
674 if (omapkeys[i]->entries.size() != 0) {
675 recovering_shards.insert(i);
676 }
677 }
678 }
679
680 return ret;
681 }
682
683 int RGWRemoteDataLog::init_sync_status(int num_shards)
684 {
685 rgw_data_sync_status sync_status;
686 sync_status.sync_info.num_shards = num_shards;
687
688 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
689 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
690 int ret = http_manager.start();
691 if (ret < 0) {
692 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
693 return ret;
694 }
695 RGWDataSyncEnv sync_env_local = sync_env;
696 sync_env_local.http_manager = &http_manager;
697 auto instance_id = ceph::util::generate_random_number<uint64_t>();
698 ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, tn, &sync_status));
699 http_manager.stop();
700 return ret;
701 }
702
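/* Oid of the full-sync index omap for one shard, e.g.
 * "data.full-sync.index.<source-zone>.<shard-id>". */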
703 static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
704 {
705 char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
706 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
707 return string(buf);
708 }
709
710 struct read_metadata_list {
711 string marker;
712 bool truncated;
713 list<string> keys;
714 int count;
715
716 read_metadata_list() : truncated(false), count(0) {}
717
718 void decode_json(JSONObj *obj) {
719 JSONDecoder::decode_json("marker", marker, obj);
720 JSONDecoder::decode_json("truncated", truncated, obj);
721 JSONDecoder::decode_json("keys", keys, obj);
722 JSONDecoder::decode_json("count", count, obj);
723 }
724 };
725
726 struct bucket_instance_meta_info {
727 string key;
728 obj_version ver;
729 utime_t mtime;
730 RGWBucketInstanceMetadataObject data;
731
732 bucket_instance_meta_info() {}
733
734 void decode_json(JSONObj *obj) {
735 JSONDecoder::decode_json("key", key, obj);
736 JSONDecoder::decode_json("ver", ver, obj);
737 JSONDecoder::decode_json("mtime", mtime, obj);
738 JSONDecoder::decode_json("data", data, obj);
739 }
740 };
741
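/* Builds the full-sync index: pages through the source zone's
 * "bucket.instance" metadata section (1000 keys at a time), appends one
 * entry per bucket shard to the sharded index omaps, and then records the
 * per-shard totals in the data sync markers. */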
742 class RGWListBucketIndexesCR : public RGWCoroutine {
743 RGWDataSyncEnv *sync_env;
744
745 RGWRados *store;
746
747 rgw_data_sync_status *sync_status;
748 int num_shards;
749
750 int req_ret;
751 int ret;
752
753 list<string>::iterator iter;
754
755 RGWShardedOmapCRManager *entries_index;
756
757 string oid_prefix;
758
759 string path;
760 bucket_instance_meta_info meta_info;
761 string key;
762 string s;
763 int i;
764
765 bool failed;
766 bool truncated;
767 read_metadata_list result;
768
769 public:
770 RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
771 rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
772 store(sync_env->store), sync_status(_sync_status),
773 req_ret(0), ret(0), entries_index(NULL), i(0), failed(false), truncated(false) {
774 oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
775 path = "/admin/metadata/bucket.instance";
776 num_shards = sync_status->sync_info.num_shards;
777 }
778 ~RGWListBucketIndexesCR() override {
779 delete entries_index;
780 }
781
782 int operate() override {
783 reenter(this) {
784 entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
785 store->svc.zone->get_zone_params().log_pool,
786 oid_prefix);
787 yield; // yield so OmapAppendCRs can start
788
789 do {
790 yield {
791 string entrypoint = string("/admin/metadata/bucket.instance");
792
793 rgw_http_param_pair pairs[] = {{"max-entries", "1000"},
794 {"marker", result.marker.c_str()},
795 {NULL, NULL}};
796
797 call(new RGWReadRESTResourceCR<read_metadata_list>(store->ctx(), sync_env->conn, sync_env->http_manager,
798 entrypoint, pairs, &result));
799 }
800 if (retcode < 0) {
801 ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
802 return set_cr_error(retcode);
803 }
804
805 for (iter = result.keys.begin(); iter != result.keys.end(); ++iter) {
806 ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
807 key = *iter;
808
809 yield {
810 rgw_http_param_pair pairs[] = {{"key", key.c_str()},
811 {NULL, NULL}};
812
813 call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
814 }
815
816 num_shards = meta_info.data.get_bucket_info().num_shards;
817 if (num_shards > 0) {
818 for (i = 0; i < num_shards; i++) {
819 char buf[16];
820 snprintf(buf, sizeof(buf), ":%d", i);
821 s = key + buf;
822 yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
823 }
824 } else {
825 yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
826 }
827 }
828 truncated = result.truncated;
829 } while (truncated);
830
831 yield {
832 if (!entries_index->finish()) {
833 failed = true;
834 }
835 }
836 if (!failed) {
837 for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
838 int shard_id = (int)iter->first;
839 rgw_data_sync_marker& marker = iter->second;
840 marker.total_entries = entries_index->get_total_entries(shard_id);
841 spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
842 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
843 marker),
844 true);
845 }
846 } else {
847 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
848 EIO, string("failed to build bucket instances map")));
849 }
850 while (collect(&ret, NULL)) {
851 if (ret < 0) {
852 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
853 -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
854 req_ret = ret;
855 }
856 yield;
857 }
858
859 drain_all();
860 if (req_ret < 0) {
861 yield return set_cr_error(req_ret);
862 }
863 yield return set_cr_done();
864 }
865 return 0;
866 }
867 };
868
869 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
870
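/* Marker tracker for one data sync shard: store_marker() persists the
 * current position to the shard status object, and index_key_to_marker()
 * prevents spawning concurrent sync operations for the same bucket shard. */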
871 class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
872 RGWDataSyncEnv *sync_env;
873
874 string marker_oid;
875 rgw_data_sync_marker sync_marker;
876
877 map<string, string> key_to_marker;
878 map<string, string> marker_to_key;
879
880 void handle_finish(const string& marker) override {
881 map<string, string>::iterator iter = marker_to_key.find(marker);
882 if (iter == marker_to_key.end()) {
883 return;
884 }
885 key_to_marker.erase(iter->second);
886 reset_need_retry(iter->second);
887 marker_to_key.erase(iter);
888 }
889
890 RGWSyncTraceNodeRef tn;
891
892 public:
893 RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
894 const string& _marker_oid,
895 const rgw_data_sync_marker& _marker,
896 RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
897 sync_env(_sync_env),
898 marker_oid(_marker_oid),
899 sync_marker(_marker),
900 tn(_tn) {}
901
902 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
903 sync_marker.marker = new_marker;
904 sync_marker.pos = index_pos;
905 sync_marker.timestamp = timestamp;
906
907 tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
908 RGWRados *store = sync_env->store;
909
910 return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
911 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
912 sync_marker);
913 }
914
915 /*
 916   * create an index from key -> marker, and from marker -> key.
 917   * this is useful so that we can ensure that we only have one
 918   * entry for any key that is used. This is needed when doing
 919   * incremental sync of data, and we don't want to run multiple
 920   * concurrent sync operations for the same bucket shard
921 */
922 bool index_key_to_marker(const string& key, const string& marker) {
923 if (key_to_marker.find(key) != key_to_marker.end()) {
924 set_need_retry(key);
925 return false;
926 }
927 key_to_marker[key] = marker;
928 marker_to_key[marker] = key;
929 return true;
930 }
931
932 RGWOrderCallCR *allocate_order_control_cr() override {
933 return new RGWLastCallerWinsCR(sync_env->cct);
934 }
935 };
936
937 // ostream wrappers to print buckets without copying strings
938 struct bucket_str {
939 const rgw_bucket& b;
940 explicit bucket_str(const rgw_bucket& b) : b(b) {}
941 };
942 std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
943 auto& b = rhs.b;
944 if (!b.tenant.empty()) {
945 out << b.tenant << '/';
946 }
947 out << b.name;
948 if (!b.bucket_id.empty()) {
949 out << ':' << b.bucket_id;
950 }
951 return out;
952 }
953
954 struct bucket_str_noinstance {
955 const rgw_bucket& b;
956 explicit bucket_str_noinstance(const rgw_bucket& b) : b(b) {}
957 };
958 std::ostream& operator<<(std::ostream& out, const bucket_str_noinstance& rhs) {
959 auto& b = rhs.b;
960 if (!b.tenant.empty()) {
961 out << b.tenant << '/';
962 }
963 out << b.name;
964 return out;
965 }
966
967 struct bucket_shard_str {
968 const rgw_bucket_shard& bs;
969 explicit bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
970 };
971 std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
972 auto& bs = rhs.bs;
973 out << bucket_str{bs.bucket};
974 if (bs.shard_id >= 0) {
975 out << ':' << bs.shard_id;
976 }
977 return out;
978 }
979
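/* Runs sync for a single bucket shard while holding a continuous lease
 * (aborted in the destructor if still held); its operate() is presumably
 * defined further down in this file. */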
980 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
981 RGWDataSyncEnv *sync_env;
982 rgw_bucket_shard bs;
983 RGWBucketInfo bucket_info;
984 rgw_bucket_shard_sync_info sync_status;
985 RGWMetaSyncEnv meta_sync_env;
986
987 const std::string status_oid;
988
989 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
990 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
991
992 RGWSyncTraceNodeRef tn;
993
994 public:
995 RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, const RGWSyncTraceNodeRef& _tn_parent)
996 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
997 status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
998 tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
999 SSTR(bucket_shard_str{bs}))) {
1000 }
1001 ~RGWRunBucketSyncCoroutine() override {
1002 if (lease_cr) {
1003 lease_cr->abort();
1004 }
1005 }
1006
1007 int operate() override;
1008 };
1009
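/* Processes one datalog entry: parses the raw key into a bucket shard, runs
 * RGWRunBucketSyncCoroutine on it (retrying while the marker tracker asks
 * for a retry), records persistent failures in the error repo and the sync
 * error log, and finally updates the shard marker. */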
1010 class RGWDataSyncSingleEntryCR : public RGWCoroutine {
1011 RGWDataSyncEnv *sync_env;
1012
1013 string raw_key;
1014 string entry_marker;
1015
1016 rgw_bucket_shard bs;
1017
1018 int sync_status;
1019
1020 bufferlist md_bl;
1021
1022 RGWDataSyncShardMarkerTrack *marker_tracker;
1023
1024 boost::intrusive_ptr<RGWOmapAppend> error_repo;
1025 bool remove_from_repo;
1026
1027 set<string> keys;
1028
1029 RGWSyncTraceNodeRef tn;
1030 public:
1031 RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
1032 const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
1033 RGWOmapAppend *_error_repo, bool _remove_from_repo, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
1034 sync_env(_sync_env),
1035 raw_key(_raw_key), entry_marker(_entry_marker),
1036 sync_status(0),
1037 marker_tracker(_marker_tracker),
1038 error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
1039 set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
1040 tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", raw_key);
1041 }
1042
1043 int operate() override {
1044 reenter(this) {
1045 do {
1046 yield {
1047 int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
1048 &bs.bucket, &bs.shard_id);
1049 if (ret < 0) {
1050 return set_cr_error(-EIO);
1051 }
1052 if (marker_tracker) {
1053 marker_tracker->reset_need_retry(raw_key);
1054 }
1055 tn->log(0, SSTR("triggering sync of bucket/shard " << bucket_shard_str{bs}));
1056 call(new RGWRunBucketSyncCoroutine(sync_env, bs, tn));
1057 }
1058 } while (marker_tracker && marker_tracker->need_retry(raw_key));
1059
1060 sync_status = retcode;
1061
1062 if (sync_status == -ENOENT) {
1063 // this was added when 'tenant/' was added to datalog entries, because
1064 // preexisting tenant buckets could never sync and would stay in the
1065 // error_repo forever
1066 tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << raw_key));
1067 sync_status = 0;
1068 }
1069
1070 if (sync_status < 0) {
1071 // write actual sync failures for 'radosgw-admin sync error list'
1072 if (sync_status != -EBUSY && sync_status != -EAGAIN) {
1073 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
1074 -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
1075 if (retcode < 0) {
1076 tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
1077 }
1078 }
1079 if (error_repo && !error_repo->append(raw_key)) {
1080 tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
1081 }
1082 } else if (error_repo && remove_from_repo) {
1083 keys = {raw_key};
1084 yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
1085 if (retcode < 0) {
1086 tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
1087 << error_repo->get_obj() << " retcode=" << retcode));
1088 }
1089 }
1090       /* FIXME: what to do in case of error */
1091 if (marker_tracker && !entry_marker.empty()) {
1092 /* update marker */
1093 yield call(marker_tracker->finish(entry_marker));
1094 }
1095 if (sync_status == 0) {
1096 sync_status = retcode;
1097 }
1098 if (sync_status < 0) {
1099 return set_cr_error(sync_status);
1100 }
1101 return set_cr_done();
1102 }
1103 return 0;
1104 }
1105 };
1106
1107 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
1108 #define DATA_SYNC_MAX_ERR_ENTRIES 10
1109
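/* Per-shard sync state machine. full_sync() replays the keys stored in the
 * full-sync index omap; incremental_sync() then follows the remote datalog,
 * re-processes entries from the ".retry" error repo with exponential
 * backoff, and handles out-of-band modified-shard notifications. */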
1110 class RGWDataSyncShardCR : public RGWCoroutine {
1111 RGWDataSyncEnv *sync_env;
1112
1113 rgw_pool pool;
1114
1115 uint32_t shard_id;
1116 rgw_data_sync_marker sync_marker;
1117
1118 RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
1119 std::set<std::string> entries;
1120 std::set<std::string>::iterator iter;
1121
1122 string oid;
1123
1124 RGWDataSyncShardMarkerTrack *marker_tracker;
1125
1126 std::string next_marker;
1127 list<rgw_data_change_log_entry> log_entries;
1128 list<rgw_data_change_log_entry>::iterator log_iter;
1129 bool truncated;
1130
1131 Mutex inc_lock;
1132 Cond inc_cond;
1133
1134 boost::asio::coroutine incremental_cr;
1135 boost::asio::coroutine full_cr;
1136
1137
1138 set<string> modified_shards;
1139 set<string> current_modified;
1140
1141 set<string>::iterator modified_iter;
1142
1143 int total_entries;
1144
1145 int spawn_window;
1146
1147 bool *reset_backoff;
1148
1149 set<string> spawned_keys;
1150
1151 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
1152 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
1153 string status_oid;
1154
1155
1156 string error_oid;
1157 RGWOmapAppend *error_repo;
1158 std::set<std::string> error_entries;
1159 string error_marker;
1160 int max_error_entries;
1161
1162 ceph::coarse_real_time error_retry_time;
1163
1164 #define RETRY_BACKOFF_SECS_MIN 60
1165 #define RETRY_BACKOFF_SECS_DEFAULT 60
1166 #define RETRY_BACKOFF_SECS_MAX 600
1167 uint32_t retry_backoff_secs;
1168
1169 RGWSyncTraceNodeRef tn;
1170 public:
1171 RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
1172 rgw_pool& _pool,
1173 uint32_t _shard_id, const rgw_data_sync_marker& _marker,
1174 RGWSyncTraceNodeRef& _tn,
1175 bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
1176 sync_env(_sync_env),
1177 pool(_pool),
1178 shard_id(_shard_id),
1179 sync_marker(_marker),
1180 marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
1181 total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
1182 lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
1183 retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT), tn(_tn) {
1184 set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
1185 status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
1186 error_oid = status_oid + ".retry";
1187 }
1188
1189 ~RGWDataSyncShardCR() override {
1190 delete marker_tracker;
1191 if (lease_cr) {
1192 lease_cr->abort();
1193 }
1194 if (error_repo) {
1195 error_repo->put();
1196 }
1197 }
1198
1199 void append_modified_shards(set<string>& keys) {
1200 Mutex::Locker l(inc_lock);
1201 modified_shards.insert(keys.begin(), keys.end());
1202 }
1203
1204 void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
1205 delete marker_tracker;
1206 marker_tracker = mt;
1207 }
1208
1209 int operate() override {
1210 int r;
1211 while (true) {
1212 switch (sync_marker.state) {
1213 case rgw_data_sync_marker::FullSync:
1214 r = full_sync();
1215 if (r < 0) {
1216 if (r != -EBUSY) {
1217 tn->log(10, SSTR("full sync failed (r=" << r << ")"));
1218 }
1219 return set_cr_error(r);
1220 }
1221 return 0;
1222 case rgw_data_sync_marker::IncrementalSync:
1223 r = incremental_sync();
1224 if (r < 0) {
1225 if (r != -EBUSY) {
1226 tn->log(10, SSTR("incremental sync failed (r=" << r << ")"));
1227 }
1228 return set_cr_error(r);
1229 }
1230 return 0;
1231 default:
1232 return set_cr_error(-EIO);
1233 }
1234 }
1235 return 0;
1236 }
1237
1238 void init_lease_cr() {
1239 set_status("acquiring sync lock");
1240 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1241 string lock_name = "sync_lock";
1242 if (lease_cr) {
1243 lease_cr->abort();
1244 }
1245 RGWRados *store = sync_env->store;
1246 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
1247 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
1248 lock_name, lock_duration, this));
1249 lease_stack.reset(spawn(lease_cr.get(), false));
1250 }
1251
1252 int full_sync() {
1253 #define OMAP_GET_MAX_ENTRIES 100
1254 int max_entries = OMAP_GET_MAX_ENTRIES;
1255 reenter(&full_cr) {
1256 tn->log(10, "start full sync");
1257 yield init_lease_cr();
1258 while (!lease_cr->is_locked()) {
1259 if (lease_cr->is_done()) {
1260 tn->log(5, "failed to take lease");
1261 set_status("lease lock failed, early abort");
1262 drain_all();
1263 return set_cr_error(lease_cr->get_ret_status());
1264 }
1265 set_sleeping(true);
1266 yield;
1267 }
1268 tn->log(10, "took lease");
1269 oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
1270 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
1271 total_entries = sync_marker.pos;
1272 do {
1273 if (!lease_cr->is_locked()) {
1274 stop_spawned_services();
1275 drain_all();
1276 return set_cr_error(-ECANCELED);
1277 }
1278 omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
1279 yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
1280 sync_marker.marker, max_entries, omapkeys));
1281 if (retcode < 0) {
1282 tn->log(0, SSTR("ERROR: RGWRadosGetOmapKeysCR() returned ret=" << retcode));
1283 lease_cr->go_down();
1284 drain_all();
1285 return set_cr_error(retcode);
1286 }
1287 entries = std::move(omapkeys->entries);
1288 if (entries.size() > 0) {
1289 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1290 }
1291 tn->log(20, SSTR("retrieved " << entries.size() << " entries to sync"));
1292 iter = entries.begin();
1293 for (; iter != entries.end(); ++iter) {
1294 tn->log(20, SSTR("full sync: " << *iter));
1295 total_entries++;
1296 if (!marker_tracker->start(*iter, total_entries, real_time())) {
1297 tn->log(0, SSTR("ERROR: cannot start syncing " << *iter << ". Duplicate entry?"));
1298 } else {
1299 // fetch remote and write locally
1300 yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false, tn), false);
1301 }
1302 sync_marker.marker = *iter;
1303
1304 while ((int)num_spawned() > spawn_window) {
1305 set_status() << "num_spawned() > spawn_window";
1306 yield wait_for_child();
1307 int ret;
1308 while (collect(&ret, lease_stack.get())) {
1309 if (ret < 0) {
1310 tn->log(10, "a sync operation returned error");
1311 }
1312 }
1313 }
1314 }
1315 } while (omapkeys->more);
1316 omapkeys.reset();
1317
1318 drain_all_but_stack(lease_stack.get());
1319
1320 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1321
1322 yield {
1323 /* update marker to reflect we're done with full sync */
1324 sync_marker.state = rgw_data_sync_marker::IncrementalSync;
1325 sync_marker.marker = sync_marker.next_step_marker;
1326 sync_marker.next_step_marker.clear();
1327 RGWRados *store = sync_env->store;
1328 call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
1329 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
1330 sync_marker));
1331 }
1332 if (retcode < 0) {
1333 tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
1334 lease_cr->go_down();
1335 drain_all();
1336 return set_cr_error(retcode);
1337 }
1338 // keep lease and transition to incremental_sync()
1339 }
1340 return 0;
1341 }
1342
1343 int incremental_sync() {
1344 reenter(&incremental_cr) {
1345 tn->log(10, "start incremental sync");
1346 if (lease_cr) {
1347 tn->log(10, "lease already held from full sync");
1348 } else {
1349 yield init_lease_cr();
1350 while (!lease_cr->is_locked()) {
1351 if (lease_cr->is_done()) {
1352 tn->log(5, "failed to take lease");
1353 set_status("lease lock failed, early abort");
1354 drain_all();
1355 return set_cr_error(lease_cr->get_ret_status());
1356 }
1357 set_sleeping(true);
1358 yield;
1359 }
1360 set_status("lease acquired");
1361 tn->log(10, "took lease");
1362 }
1363 error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
1364 rgw_raw_obj(pool, error_oid),
1365 1 /* no buffer */);
1366 error_repo->get();
1367 spawn(error_repo, false);
1368 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker, tn));
1369 do {
1370 if (!lease_cr->is_locked()) {
1371 stop_spawned_services();
1372 drain_all();
1373 return set_cr_error(-ECANCELED);
1374 }
1375 current_modified.clear();
1376 inc_lock.Lock();
1377 current_modified.swap(modified_shards);
1378 inc_lock.Unlock();
1379
1380 if (current_modified.size() > 0) {
1381 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1382 }
1383 /* process out of band updates */
1384 for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
1385 yield {
1386 tn->log(20, SSTR("received async update notification: " << *modified_iter));
1387 spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false, tn), false);
1388 }
1389 }
1390
1391 if (error_retry_time <= ceph::coarse_real_clock::now()) {
1392 /* process bucket shards that previously failed */
1393 omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
1394 yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
1395 error_marker, max_error_entries, omapkeys));
1396 error_entries = std::move(omapkeys->entries);
1397 tn->log(20, SSTR("read error repo, got " << error_entries.size() << " entries"));
1398 iter = error_entries.begin();
1399 for (; iter != error_entries.end(); ++iter) {
1400 error_marker = *iter;
1401 tn->log(20, SSTR("handle error entry: " << error_marker));
1402 spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true, tn), false);
1403 }
1404 if (!omapkeys->more) {
1405 if (error_marker.empty() && error_entries.empty()) {
1406 /* the retry repo is empty, we back off a bit before calling it again */
1407 retry_backoff_secs *= 2;
1408 if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
1409 retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
1410 }
1411 } else {
1412 retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
1413 }
1414 error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
1415 error_marker.clear();
1416 }
1417 }
1418 omapkeys.reset();
1419
1420 #define INCREMENTAL_MAX_ENTRIES 100
1421 tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker));
1422 spawned_keys.clear();
1423 yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
1424 &next_marker, &log_entries, &truncated));
1425 if (retcode < 0 && retcode != -ENOENT) {
1426 tn->log(0, SSTR("ERROR: failed to read remote data log info: ret=" << retcode));
1427 stop_spawned_services();
1428 drain_all();
1429 return set_cr_error(retcode);
1430 }
1431
1432 if (log_entries.size() > 0) {
1433 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
1434 }
1435
1436 for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
1437 tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
1438 if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
1439 tn->log(20, SSTR("skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard"));
1440 marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
1441 continue;
1442 }
1443 if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
1444 tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
1445 } else {
1446 /*
1447 * don't spawn the same key more than once. We can do that as long as we don't yield
1448 */
1449 if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
1450 spawned_keys.insert(log_iter->entry.key);
1451 spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false, tn), false);
1452 if (retcode < 0) {
1453 stop_spawned_services();
1454 drain_all();
1455 return set_cr_error(retcode);
1456 }
1457 }
1458 }
1459 }
1460 while ((int)num_spawned() > spawn_window) {
1461 set_status() << "num_spawned() > spawn_window";
1462 yield wait_for_child();
1463 int ret;
1464 while (collect(&ret, lease_stack.get())) {
1465 if (ret < 0) {
1466 tn->log(10, "a sync operation returned error");
1467 /* we have reported this error */
1468 }
1469 /* not waiting for child here */
1470 }
1471 }
1472
1473 tn->log(20, SSTR("shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
1474 << " next_marker=" << next_marker << " truncated=" << truncated));
1475 if (!next_marker.empty()) {
1476 sync_marker.marker = next_marker;
1477 } else if (!log_entries.empty()) {
1478 sync_marker.marker = log_entries.back().log_id;
1479 }
1480 if (!truncated) {
1481 // we reached the end, wait a while before checking for more
1482 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
1483 yield wait(get_idle_interval());
1484 }
1485 } while (true);
1486 }
1487 return 0;
1488 }
1489
1490 utime_t get_idle_interval() const {
1491 #define INCREMENTAL_INTERVAL 20
1492 ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
1493 if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
1494 auto now = ceph::coarse_real_clock::now();
1495 if (error_retry_time > now) {
1496 auto d = error_retry_time - now;
1497 if (interval > d) {
1498 interval = d;
1499 }
1500 }
1501 }
1502 // convert timespan -> time_point -> utime_t
1503 return utime_t(ceph::coarse_real_clock::zero() + interval);
1504 }
1505
1506 void stop_spawned_services() {
1507 lease_cr->go_down();
1508 if (error_repo) {
1509 error_repo->finish();
1510 error_repo->put();
1511 error_repo = NULL;
1512 }
1513 }
1514 };
1515
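/* Backoff/retry wrapper (via RGWBackoffControlCR) around RGWDataSyncShardCR;
 * alloc_finisher_cr() re-reads the persisted shard marker from RADOS. */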
1516 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
1517 RGWDataSyncEnv *sync_env;
1518
1519 rgw_pool pool;
1520
1521 uint32_t shard_id;
1522 rgw_data_sync_marker sync_marker;
1523
1524 RGWSyncTraceNodeRef tn;
1525 public:
1526 RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, const rgw_pool& _pool,
1527 uint32_t _shard_id, rgw_data_sync_marker& _marker,
1528 RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, false),
1529 sync_env(_sync_env),
1530 pool(_pool),
1531 shard_id(_shard_id),
1532 sync_marker(_marker) {
1533 tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
1534 }
1535
1536 RGWCoroutine *alloc_cr() override {
1537 return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, tn, backoff_ptr());
1538 }
1539
1540 RGWCoroutine *alloc_finisher_cr() override {
1541 RGWRados *store = sync_env->store;
1542 return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store->svc.sysobj,
1543 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
1544 &sync_marker);
1545 }
1546
1547 void append_modified_shards(set<string>& keys) {
1548 Mutex::Locker l(cr_lock());
1549
1550 RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
1551 if (!cr) {
1552 return;
1553 }
1554
1555 cr->append_modified_shards(keys);
1556 }
1557 };
1558
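/* Top-level data sync coroutine: reads (or initializes) the sync status,
 * builds the full-sync maps when needed, then spawns one
 * RGWDataSyncShardControlCR per shard and forwards wakeup notifications to
 * them. */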
1559 class RGWDataSyncCR : public RGWCoroutine {
1560 RGWDataSyncEnv *sync_env;
1561 uint32_t num_shards;
1562
1563 rgw_data_sync_status sync_status;
1564
1565 RGWDataSyncShardMarkerTrack *marker_tracker;
1566
1567 Mutex shard_crs_lock;
1568 map<int, RGWDataSyncShardControlCR *> shard_crs;
1569
1570 bool *reset_backoff;
1571
1572 RGWSyncTraceNodeRef tn;
1573
1574 RGWDataSyncModule *data_sync_module{nullptr};
1575 public:
1576 RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
1577 sync_env(_sync_env),
1578 num_shards(_num_shards),
1579 marker_tracker(NULL),
1580 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
1581 reset_backoff(_reset_backoff), tn(_tn) {
1582
1583 }
1584
1585 ~RGWDataSyncCR() override {
1586 for (auto iter : shard_crs) {
1587 iter.second->put();
1588 }
1589 }
1590
1591 int operate() override {
1592 reenter(this) {
1593
1594 /* read sync status */
1595 yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));
1596
1597 data_sync_module = sync_env->sync_module->get_data_handler();
1598
1599 if (retcode < 0 && retcode != -ENOENT) {
1600 tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
1601 return set_cr_error(retcode);
1602 }
1603
1604 /* state: init status */
1605 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
1606 tn->log(20, SSTR("init"));
1607 sync_status.sync_info.num_shards = num_shards;
1608 uint64_t instance_id;
1609 instance_id = ceph::util::generate_random_number<uint64_t>();
1610 yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, tn, &sync_status));
1611 if (retcode < 0) {
1612 tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
1613 return set_cr_error(retcode);
1614 }
1615 // sets state = StateBuildingFullSyncMaps
1616
1617 *reset_backoff = true;
1618 }
1619
1620 data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
1621
1622 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
1623 tn->log(10, SSTR("building full sync maps"));
1624 /* call sync module init here */
1625 sync_status.sync_info.num_shards = num_shards;
1626 yield call(data_sync_module->init_sync(sync_env));
1627 if (retcode < 0) {
1628 tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
1629 return set_cr_error(retcode);
1630 }
1631 /* state: building full sync maps */
1632 yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
1633 if (retcode < 0) {
1634 tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
1635 return set_cr_error(retcode);
1636 }
1637 sync_status.sync_info.state = rgw_data_sync_info::StateSync;
1638
1639 /* update new state */
1640 yield call(set_sync_info_cr());
1641 if (retcode < 0) {
1642 tn->log(0, SSTR("ERROR: failed to write sync status, retcode=" << retcode));
1643 return set_cr_error(retcode);
1644 }
1645
1646 *reset_backoff = true;
1647 }
1648
1649 yield call(data_sync_module->start_sync(sync_env));
1650
1651 yield {
1652 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
1653 tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
1654 for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
1655 iter != sync_status.sync_markers.end(); ++iter) {
1656 RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->svc.zone->get_zone_params().log_pool,
1657 iter->first, iter->second, tn);
1658 cr->get();
1659 shard_crs_lock.Lock();
1660 shard_crs[iter->first] = cr;
1661 shard_crs_lock.Unlock();
1662 spawn(cr, true);
1663 }
1664 }
1665 }
1666
1667 return set_cr_done();
1668 }
1669 return 0;
1670 }
1671
1672 RGWCoroutine *set_sync_info_cr() {
1673 RGWRados *store = sync_env->store;
1674 return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store->svc.sysobj,
1675 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
1676 sync_status.sync_info);
1677 }
1678
1679 void wakeup(int shard_id, set<string>& keys) {
1680 Mutex::Locker l(shard_crs_lock);
1681 map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
1682 if (iter == shard_crs.end()) {
1683 return;
1684 }
1685 iter->second->append_modified_shards(keys);
1686 iter->second->wakeup();
1687 }
1688 };
1689
1690 class RGWDefaultDataSyncModule : public RGWDataSyncModule {
1691 public:
1692 RGWDefaultDataSyncModule() {}
1693
1694 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
1695 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1696 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1697 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1698 };
1699
1700 class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
1701 RGWDefaultDataSyncModule data_handler;
1702 public:
1703 RGWDefaultSyncModuleInstance() {}
1704 RGWDataSyncModule *get_data_handler() override {
1705 return &data_handler;
1706 }
1707 bool supports_user_writes() override {
1708 return true;
1709 }
1710 };
1711
1712 int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
1713 {
1714 instance->reset(new RGWDefaultSyncModuleInstance());
1715 return 0;
1716 }
1717
1718 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
1719 {
1720 return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
1721 std::nullopt,
1722 key, std::nullopt, versioned_epoch,
1723 true, zones_trace, sync_env->counters);
1724 }
1725
1726 RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
1727 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1728 {
1729 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1730 bucket_info, key, versioned, versioned_epoch,
1731 NULL, NULL, false, &mtime, zones_trace);
1732 }
1733
1734 RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1735 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1736 {
1737 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1738 bucket_info, key, versioned, versioned_epoch,
1739 &owner.id, &owner.display_name, true, &mtime, zones_trace);
1740 }
1741
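// Archive data sync handler: keeps every object version on the archive zone.
// It forces bucket versioning on first use, generates a random instance id
// for plain (non-versioned) source objects, and ignores object removals
// (remove_object returns NULL); only delete markers are replayed.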
1742 class RGWArchiveDataSyncModule : public RGWDefaultDataSyncModule {
1743 public:
1744 RGWArchiveDataSyncModule() {}
1745
1746 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
1747 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1748 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1749 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1750 };
1751
1752 class RGWArchiveSyncModuleInstance : public RGWDefaultSyncModuleInstance {
1753 RGWArchiveDataSyncModule data_handler;
1754 public:
1755 RGWArchiveSyncModuleInstance() {}
1756 RGWDataSyncModule *get_data_handler() override {
1757 return &data_handler;
1758 }
1759 RGWMetadataHandler *alloc_bucket_meta_handler() override {
1760 return RGWArchiveBucketMetaHandlerAllocator::alloc();
1761 }
1762 RGWMetadataHandler *alloc_bucket_instance_meta_handler() override {
1763 return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
1764 }
1765 };
1766
1767 int RGWArchiveSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance)
1768 {
1769 instance->reset(new RGWArchiveSyncModuleInstance());
1770 return 0;
1771 }
1772
1773 RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
1774 {
1775 ldout(sync_env->cct, 5) << "SYNC_ARCHIVE: sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
1776 if (!bucket_info.versioned() ||
1777 (bucket_info.flags & BUCKET_VERSIONS_SUSPENDED)) {
1778 ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: enabling object versioning for archive bucket" << dendl;
1779 bucket_info.flags = (bucket_info.flags & ~BUCKET_VERSIONS_SUSPENDED) | BUCKET_VERSIONED;
1780 int op_ret = sync_env->store->put_bucket_instance_info(bucket_info, false, real_time(), NULL);
1781 if (op_ret < 0) {
1782 ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: sync_object: failed to enable versioning on archive bucket" << dendl;
1783 return NULL;
1784 }
1785 }
1786
1787 std::optional<rgw_obj_key> dest_key;
1788
1789 if (versioned_epoch.value_or(0) == 0) { /* force version if not set */
1790 versioned_epoch = 0;
1791 dest_key = key;
1792 if (key.instance.empty()) {
1793 sync_env->store->gen_rand_obj_instance_name(&(*dest_key));
1794 }
1795 }
1796
1797 return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1798 bucket_info, std::nullopt,
1799 key, dest_key, versioned_epoch,
1800 true, zones_trace, nullptr);
1801 }
1802
1803 RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
1804 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1805 {
1806 ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: remove_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
1807 return NULL;
1808 }
1809
1810 RGWCoroutine *RGWArchiveDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1811 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1812 {
1813 ldout(sync_env->cct, 0) << "SYNC_ARCHIVE: create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
1814 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
1815 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1816 bucket_info, key, versioned, versioned_epoch,
1817 &owner.id, &owner.display_name, true, &mtime, zones_trace);
1818 }
1819
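// Wraps RGWDataSyncCR in a backoff/retry loop. exit_on_error is false, so the
// whole data sync coroutine is re-launched after any failure; wakeup() forwards
// datalog notifications to the currently running instance under cr_lock().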
1820 class RGWDataSyncControlCR : public RGWBackoffControlCR
1821 {
1822 RGWDataSyncEnv *sync_env;
1823 uint32_t num_shards;
1824
1825 RGWSyncTraceNodeRef tn;
1826
1827 static constexpr bool exit_on_error = false; // retry on all errors
1828 public:
1829 RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards,
1830 RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
1831 sync_env(_sync_env), num_shards(_num_shards) {
1832 tn = sync_env->sync_tracer->add_node(_tn_parent, "sync");
1833 }
1834
1835 RGWCoroutine *alloc_cr() override {
1836 return new RGWDataSyncCR(sync_env, num_shards, tn, backoff_ptr());
1837 }
1838
1839 void wakeup(int shard_id, set<string>& keys) {
1840 Mutex& m = cr_lock();
1841
1842 m.Lock();
1843 RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
1844 if (!cr) {
1845 m.Unlock();
1846 return;
1847 }
1848
1849 cr->get();
1850 m.Unlock();
1851
1852 if (cr) {
1853 tn->log(20, SSTR("notify shard=" << shard_id << " keys=" << keys));
1854 cr->wakeup(shard_id, keys);
1855 }
1856
1857 cr->put();
1858 }
1859 };
1860
1861 void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
1862 RWLock::RLocker rl(lock);
1863 if (!data_sync_cr) {
1864 return;
1865 }
1866 data_sync_cr->wakeup(shard_id, keys);
1867 }
1868
1869 int RGWRemoteDataLog::run_sync(int num_shards)
1870 {
1871 lock.get_write();
1872 data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards, tn);
1873 data_sync_cr->get(); // run() will drop a ref, so take another
1874 lock.unlock();
1875
1876 int r = run(data_sync_cr);
1877
1878 lock.get_write();
1879 data_sync_cr->put();
1880 data_sync_cr = NULL;
1881 lock.unlock();
1882
1883 if (r < 0) {
1884 ldpp_dout(dpp, 0) << "ERROR: failed to run sync" << dendl;
1885 return r;
1886 }
1887 return 0;
1888 }
1889
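// Sets up data sync from a single source zone: resolves the zone's config and
// REST connection, checks that its tier type supports data export, queries the
// remote datalog for its shard count, and precomputes the per-shard sync-status
// rados objects in the log pool.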
1890 int RGWDataSyncStatusManager::init()
1891 {
1892 RGWZone *zone_def;
1893
1894 if (!store->svc.zone->find_zone_by_id(source_zone, &zone_def)) {
1895 ldpp_dout(this, 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
1896 return -EIO;
1897 }
1898
1899 if (!store->svc.sync_modules->get_manager()->supports_data_export(zone_def->tier_type)) {
1900 return -ENOTSUP;
1901 }
1902
1903 const RGWZoneParams& zone_params = store->svc.zone->get_zone_params();
1904
1905 if (sync_module == nullptr) {
1906 sync_module = store->get_sync_module();
1907 }
1908
1909 conn = store->svc.zone->get_zone_conn_by_id(source_zone);
1910 if (!conn) {
1911 ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
1912 return -EINVAL;
1913 }
1914
1915 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
1916
1917 int r = source_log.init(source_zone, conn, error_logger, store->get_sync_tracer(),
1918 sync_module, counters);
1919 if (r < 0) {
1920 ldpp_dout(this, 0) << "ERROR: failed to init remote log, r=" << r << dendl;
1921 finalize();
1922 return r;
1923 }
1924
1925 rgw_datalog_info datalog_info;
1926 r = source_log.read_log_info(&datalog_info);
1927 if (r < 0) {
1928 ldpp_dout(this, 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
1929 finalize();
1930 return r;
1931 }
1932
1933 num_shards = datalog_info.num_shards;
1934
1935 for (int i = 0; i < num_shards; i++) {
1936 shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
1937 }
1938
1939 return 0;
1940 }
1941
1942 void RGWDataSyncStatusManager::finalize()
1943 {
1944 delete error_logger;
1945 error_logger = nullptr;
1946 }
1947
1948 unsigned RGWDataSyncStatusManager::get_subsys() const
1949 {
1950 return dout_subsys;
1951 }
1952
1953 std::ostream& RGWDataSyncStatusManager::gen_prefix(std::ostream& out) const
1954 {
1955 auto zone = std::string_view{source_zone};
1956 return out << "data sync zone:" << zone.substr(0, 8) << ' ';
1957 }
1958
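// The two helpers below build the rados object names for the overall data sync
// status and for each shard's marker: "<oid_prefix>.<source_zone>" and
// "<shard_prefix>.<source_zone>.<shard_id>" respectively.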
1959 string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
1960 {
1961 char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
1962 snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
1963
1964 return string(buf);
1965 }
1966
1967 string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
1968 {
1969 char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
1970 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);
1971
1972 return string(buf);
1973 }
1974
1975 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
1976 const rgw_bucket& bucket, int shard_id,
1977 RGWSyncErrorLogger *_error_logger,
1978 RGWSyncTraceManager *_sync_tracer,
1979 RGWSyncModuleInstanceRef& _sync_module)
1980 {
1981 conn = _conn;
1982 source_zone = _source_zone;
1983 bs.bucket = bucket;
1984 bs.shard_id = shard_id;
1985
1986 sync_env.init(dpp, store->ctx(), store, conn, async_rados, http_manager,
1987 _error_logger, _sync_tracer, source_zone, _sync_module, nullptr);
1988
1989 return 0;
1990 }
1991
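// Fetches bucket index log info for one bucket shard from the source zone via
// GET /admin/log/?type=bucket-index&bucket-instance=<key>&info. The result
// (notably max_marker and the syncstopped flag) seeds the shard's initial
// sync status in RGWInitBucketShardSyncStatusCoroutine below.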
1992 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1993 RGWDataSyncEnv *sync_env;
1994 const string instance_key;
1995
1996 rgw_bucket_index_marker_info *info;
1997
1998 public:
1999 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
2000 const rgw_bucket_shard& bs,
2001 rgw_bucket_index_marker_info *_info)
2002 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2003 instance_key(bs.get_key()), info(_info) {}
2004
2005 int operate() override {
2006 reenter(this) {
2007 yield {
2008 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
2009 { "bucket-instance", instance_key.c_str() },
2010 { "info" , NULL },
2011 { NULL, NULL } };
2012
2013 string p = "/admin/log/";
2014 call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
2015 }
2016 if (retcode < 0) {
2017 return set_cr_error(retcode);
2018 }
2019 return set_cr_done();
2020 }
2021 return 0;
2022 }
2023 };
2024
2025 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
2026 RGWDataSyncEnv *sync_env;
2027
2028 rgw_bucket_shard bs;
2029 const string sync_status_oid;
2030
2031 rgw_bucket_shard_sync_info& status;
2032
2033 rgw_bucket_index_marker_info info;
2034 public:
2035 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
2036 const rgw_bucket_shard& bs,
2037 rgw_bucket_shard_sync_info& _status)
2038 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2039 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
2040 status(_status)
2041 {}
2042
2043 int operate() override {
2044 reenter(this) {
2045 /* fetch current position in logs */
2046 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
2047 if (retcode < 0 && retcode != -ENOENT) {
2048 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
2049 return set_cr_error(retcode);
2050 }
2051 yield {
2052 auto store = sync_env->store;
2053 rgw_raw_obj obj(store->svc.zone->get_zone_params().log_pool, sync_status_oid);
2054
2055 if (info.syncstopped) {
2056 call(new RGWRadosRemoveCR(store, obj));
2057 } else {
2058 status.state = rgw_bucket_shard_sync_info::StateFullSync;
2059 status.inc_marker.position = info.max_marker;
2060 map<string, bufferlist> attrs;
2061 status.encode_all_attrs(attrs);
2062 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj, obj, attrs));
2063 }
2064 }
2065 if (info.syncstopped) {
2066 retcode = -ENOENT;
2067 }
2068 if (retcode < 0) {
2069 return set_cr_error(retcode);
2070 }
2071 return set_cr_done();
2072 }
2073 return 0;
2074 }
2075 };
2076
2077 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
2078 {
2079 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
2080 }
2081
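// Per bucket-shard sync state (state, full_marker, inc_marker) is persisted as
// rados xattrs on the shard's status object, using the prefix below. When
// decoding, decode_from_attrs() also falls back to the un-prefixed attribute
// names, presumably for status objects written before the prefix was added.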
2082 #define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
2083
2084 template <class T>
2085 static bool decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
2086 {
2087 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
2088 if (iter == attrs.end()) {
2089 *val = T();
2090 return false;
2091 }
2092
2093 auto biter = iter->second.cbegin();
2094 try {
2095 decode(*val, biter);
2096 } catch (buffer::error& err) {
2097 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
2098 return false;
2099 }
2100 return true;
2101 }
2102
2103 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
2104 {
2105 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "state", &state)) {
2106 decode_attr(cct, attrs, "state", &state);
2107 }
2108 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "full_marker", &full_marker)) {
2109 decode_attr(cct, attrs, "full_marker", &full_marker);
2110 }
2111 if (!decode_attr(cct, attrs, BUCKET_SYNC_ATTR_PREFIX "inc_marker", &inc_marker)) {
2112 decode_attr(cct, attrs, "inc_marker", &inc_marker);
2113 }
2114 }
2115
2116 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
2117 {
2118 encode_state_attr(attrs);
2119 full_marker.encode_attr(attrs);
2120 inc_marker.encode_attr(attrs);
2121 }
2122
2123 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
2124 {
2125 using ceph::encode;
2126 encode(state, attrs[BUCKET_SYNC_ATTR_PREFIX "state"]);
2127 }
2128
2129 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
2130 {
2131 using ceph::encode;
2132 encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "full_marker"]);
2133 }
2134
2135 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
2136 {
2137 using ceph::encode;
2138 encode(*this, attrs[BUCKET_SYNC_ATTR_PREFIX "inc_marker"]);
2139 }
2140
2141 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
2142 RGWDataSyncEnv *sync_env;
2143 string oid;
2144 rgw_bucket_shard_sync_info *status;
2145
2146 map<string, bufferlist> attrs;
2147 public:
2148 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
2149 const rgw_bucket_shard& bs,
2150 rgw_bucket_shard_sync_info *_status)
2151 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2152 oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
2153 status(_status) {}
2154 int operate() override;
2155 };
2156
2157 int RGWReadBucketSyncStatusCoroutine::operate()
2158 {
2159 reenter(this) {
2160 yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store->svc.sysobj,
2161 rgw_raw_obj(sync_env->store->svc.zone->get_zone_params().log_pool, oid),
2162 &attrs, true));
2163 if (retcode == -ENOENT) {
2164 *status = rgw_bucket_shard_sync_info();
2165 return set_cr_done();
2166 }
2167 if (retcode < 0) {
2168 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket shard sync status oid=" << oid << " ret=" << retcode << dendl;
2169 return set_cr_error(retcode);
2170 }
2171 status->decode_from_attrs(sync_env->cct, attrs);
2172 return set_cr_done();
2173 }
2174 return 0;
2175 }
2176
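// The next two coroutines support per-shard status reporting (read_shard_status
// below): "recovering" bucket shards are read from the shard's ".retry" omap
// error object, while "pending" bucket shards are collected from the remote
// datalog entries past the shard's current sync marker.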
2177 #define OMAP_READ_MAX_ENTRIES 10
2178 class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
2179 RGWDataSyncEnv *sync_env;
2180 RGWRados *store;
2181
2182 const int shard_id;
2183 int max_entries;
2184
2185 set<string>& recovering_buckets;
2186 string marker;
2187 string error_oid;
2188
2189 RGWRadosGetOmapKeysCR::ResultPtr omapkeys;
2190 set<string> error_entries;
2191 int max_omap_entries;
2192 int count;
2193
2194 public:
2195 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2196 set<string>& _recovering_buckets, const int _max_entries)
2197 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2198 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2199 recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
2200 {
2201 error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
2202 }
2203
2204 int operate() override;
2205 };
2206
2207 int RGWReadRecoveringBucketShardsCoroutine::operate()
2208 {
2209 reenter(this) {
2210 //read recovering bucket shards
2211 count = 0;
2212 do {
2213 omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
2214 yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, error_oid),
2215 marker, max_omap_entries, omapkeys));
2216
2217 if (retcode == -ENOENT) {
2218 break;
2219 }
2220
2221 if (retcode < 0) {
2222 ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
2223 << cpp_strerror(retcode) << dendl;
2224 return set_cr_error(retcode);
2225 }
2226
2227 error_entries = std::move(omapkeys->entries);
2228 if (error_entries.empty()) {
2229 break;
2230 }
2231
2232 count += error_entries.size();
2233 marker = *error_entries.rbegin();
2234 recovering_buckets.insert(std::make_move_iterator(error_entries.begin()),
2235 std::make_move_iterator(error_entries.end()));
2236 } while (omapkeys->more && count < max_entries);
2237
2238 return set_cr_done();
2239 }
2240
2241 return 0;
2242 }
2243
2244 class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
2245 RGWDataSyncEnv *sync_env;
2246 RGWRados *store;
2247
2248 const int shard_id;
2249 int max_entries;
2250
2251 set<string>& pending_buckets;
2252 string marker;
2253 string status_oid;
2254
2255 rgw_data_sync_marker* sync_marker;
2256 int count;
2257
2258 std::string next_marker;
2259 list<rgw_data_change_log_entry> log_entries;
2260 bool truncated;
2261
2262 public:
2263 RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2264 set<string>& _pending_buckets,
2265 rgw_data_sync_marker* _sync_marker, const int _max_entries)
2266 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2267 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2268 pending_buckets(_pending_buckets), sync_marker(_sync_marker)
2269 {
2270 status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
2271 }
2272
2273 int operate() override;
2274 };
2275
2276 int RGWReadPendingBucketShardsCoroutine::operate()
2277 {
2278 reenter(this) {
2279 //read sync status marker
2280 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
2281 yield call(new CR(sync_env->async_rados, store->svc.sysobj,
2282 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
2283 sync_marker));
2284 if (retcode < 0) {
2285 ldout(sync_env->cct, 0) << "failed to read sync status marker with "
2286 << cpp_strerror(retcode) << dendl;
2287 return set_cr_error(retcode);
2288 }
2289
2290 //read pending bucket shards
2291 marker = sync_marker->marker;
2292 count = 0;
2293 do {
2294 yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker,
2295 &next_marker, &log_entries, &truncated));
2296
2297 if (retcode == -ENOENT) {
2298 break;
2299 }
2300
2301 if (retcode < 0) {
2302 ldout(sync_env->cct, 0) << "failed to read remote data log info with "
2303 << cpp_strerror(retcode) << dendl;
2304 return set_cr_error(retcode);
2305 }
2306
2307 if (log_entries.empty()) {
2308 break;
2309 }
2310
2311 count += log_entries.size();
2312 for (const auto& entry : log_entries) {
2313 pending_buckets.insert(entry.entry.key);
2314 }
2315 } while (truncated && count < max_entries);
2316
2317 return set_cr_done();
2318 }
2319
2320 return 0;
2321 }
2322
2323 int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
2324 {
2325 // cannot run concurrently with run_sync(), so run in a separate manager
2326 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
2327 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
2328 int ret = http_manager.start();
2329 if (ret < 0) {
2330 ldpp_dout(dpp, 0) << "failed in http_manager.start() ret=" << ret << dendl;
2331 return ret;
2332 }
2333 RGWDataSyncEnv sync_env_local = sync_env;
2334 sync_env_local.http_manager = &http_manager;
2335 list<RGWCoroutinesStack *> stacks;
2336 RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2337 recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
2338 stacks.push_back(recovering_stack);
2339 RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2340 pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
2341 stacks.push_back(pending_stack);
2342 ret = crs.run(stacks);
2343 http_manager.stop();
2344 return ret;
2345 }
2346
2347 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
2348 {
2349 return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
2350 }
2351
2352 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
2353 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2354 delete iter->second;
2355 }
2356 delete error_logger;
2357 }
2358
2359
2360 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
2361 {
2362 JSONDecoder::decode_json("ID", id, obj);
2363 JSONDecoder::decode_json("DisplayName", display_name, obj);
2364 }
2365
2366 struct bucket_list_entry {
2367 bool delete_marker;
2368 rgw_obj_key key;
2369 bool is_latest;
2370 real_time mtime;
2371 string etag;
2372 uint64_t size;
2373 string storage_class;
2374 rgw_bucket_entry_owner owner;
2375 uint64_t versioned_epoch;
2376 string rgw_tag;
2377
2378 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
2379
2380 void decode_json(JSONObj *obj) {
2381 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
2382 JSONDecoder::decode_json("Key", key.name, obj);
2383 JSONDecoder::decode_json("VersionId", key.instance, obj);
2384 JSONDecoder::decode_json("IsLatest", is_latest, obj);
2385 string mtime_str;
2386 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
2387
2388 struct tm t;
2389 uint32_t nsec;
2390 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
2391 ceph_timespec ts;
2392 ts.tv_sec = (uint64_t)internal_timegm(&t);
2393 ts.tv_nsec = nsec;
2394 mtime = real_clock::from_ceph_timespec(ts);
2395 }
2396 JSONDecoder::decode_json("ETag", etag, obj);
2397 JSONDecoder::decode_json("Size", size, obj);
2398 JSONDecoder::decode_json("StorageClass", storage_class, obj);
2399 JSONDecoder::decode_json("Owner", owner, obj);
2400 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
2401 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
2402 if (key.instance == "null" && !versioned_epoch) {
2403 key.instance.clear();
2404 }
2405 }
2406
2407 RGWModifyOp get_modify_op() const {
2408 if (delete_marker) {
2409 return CLS_RGW_OP_LINK_OLH_DM;
2410 } else if (!key.instance.empty() && key.instance != "null") {
2411 return CLS_RGW_OP_LINK_OLH;
2412 } else {
2413 return CLS_RGW_OP_ADD;
2414 }
2415 }
2416 };
2417
2418 struct bucket_list_result {
2419 string name;
2420 string prefix;
2421 string key_marker;
2422 string version_id_marker;
2423 int max_keys;
2424 bool is_truncated;
2425 list<bucket_list_entry> entries;
2426
2427 bucket_list_result() : max_keys(0), is_truncated(false) {}
2428
2429 void decode_json(JSONObj *obj) {
2430 JSONDecoder::decode_json("Name", name, obj);
2431 JSONDecoder::decode_json("Prefix", prefix, obj);
2432 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
2433 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
2434 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
2435 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
2436 JSONDecoder::decode_json("Entries", entries, obj);
2437 }
2438 };
2439
2440 class RGWListBucketShardCR: public RGWCoroutine {
2441 RGWDataSyncEnv *sync_env;
2442 const rgw_bucket_shard& bs;
2443 const string instance_key;
2444 rgw_obj_key marker_position;
2445
2446 bucket_list_result *result;
2447
2448 public:
2449 RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2450 rgw_obj_key& _marker_position, bucket_list_result *_result)
2451 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2452 instance_key(bs.get_key()), marker_position(_marker_position),
2453 result(_result) {}
2454
2455 int operate() override {
2456 reenter(this) {
2457 yield {
2458 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2459 { "versions" , NULL },
2460 { "format" , "json" },
2461 { "objs-container" , "true" },
2462 { "key-marker" , marker_position.name.c_str() },
2463 { "version-id-marker" , marker_position.instance.c_str() },
2464 { NULL, NULL } };
2465 // don't include tenant in the url, it's already part of instance_key
2466 string p = string("/") + bs.bucket.name;
2467 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2468 }
2469 if (retcode < 0) {
2470 return set_cr_error(retcode);
2471 }
2472 return set_cr_done();
2473 }
2474 return 0;
2475 }
2476 };
2477
2478 class RGWListBucketIndexLogCR: public RGWCoroutine {
2479 RGWDataSyncEnv *sync_env;
2480 const string instance_key;
2481 string marker;
2482
2483 list<rgw_bi_log_entry> *result;
2484 std::optional<PerfGuard> timer;
2485
2486 public:
2487 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2488 string& _marker, list<rgw_bi_log_entry> *_result)
2489 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2490 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2491
2492 int operate() override {
2493 reenter(this) {
2494 if (sync_env->counters) {
2495 timer.emplace(sync_env->counters, sync_counters::l_poll);
2496 }
2497 yield {
2498 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2499 { "format" , "json" },
2500 { "marker" , marker.c_str() },
2501 { "type", "bucket-index" },
2502 { NULL, NULL } };
2503
2504 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2505 }
2506 timer.reset();
2507 if (retcode < 0) {
2508 if (sync_env->counters) {
2509 sync_env->counters->inc(sync_counters::l_poll_err);
2510 }
2511 return set_cr_error(retcode);
2512 }
2513 return set_cr_done();
2514 }
2515 return 0;
2516 }
2517 };
2518
2519 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2520
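// Marker trackers for bucket full/incremental sync. Marker updates are batched
// (a window of BUCKET_SYNC_UPDATE_MARKER_WINDOW completed entries) and written
// as xattrs on the bucket shard status object; the persisted position should
// only advance over entries whose sync has completed, so after a restart at
// most the in-flight window is replayed.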
2521 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2522 RGWDataSyncEnv *sync_env;
2523
2524 string marker_oid;
2525 rgw_bucket_shard_full_sync_marker sync_marker;
2526
2527 RGWSyncTraceNodeRef tn;
2528
2529 public:
2530 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2531 const string& _marker_oid,
2532 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2533 sync_env(_sync_env),
2534 marker_oid(_marker_oid),
2535 sync_marker(_marker) {}
2536
2537 void set_tn(RGWSyncTraceNodeRef& _tn) {
2538 tn = _tn;
2539 }
2540
2541 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2542 sync_marker.position = new_marker;
2543 sync_marker.count = index_pos;
2544
2545 map<string, bufferlist> attrs;
2546 sync_marker.encode_attr(attrs);
2547
2548 RGWRados *store = sync_env->store;
2549
2550 tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
2551 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
2552 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
2553 attrs);
2554 }
2555
2556 RGWOrderCallCR *allocate_order_control_cr() override {
2557 return new RGWLastCallerWinsCR(sync_env->cct);
2558 }
2559 };
2560
2561 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2562 RGWDataSyncEnv *sync_env;
2563
2564 string marker_oid;
2565 rgw_bucket_shard_inc_sync_marker sync_marker;
2566
2567 map<rgw_obj_key, string> key_to_marker;
2568
2569 struct operation {
2570 rgw_obj_key key;
2571 bool is_olh;
2572 };
2573 map<string, operation> marker_to_op;
2574 std::set<std::string> pending_olh; // object names with pending olh operations
2575
2576 RGWSyncTraceNodeRef tn;
2577
2578 void handle_finish(const string& marker) override {
2579 auto iter = marker_to_op.find(marker);
2580 if (iter == marker_to_op.end()) {
2581 return;
2582 }
2583 auto& op = iter->second;
2584 key_to_marker.erase(op.key);
2585 reset_need_retry(op.key);
2586 if (op.is_olh) {
2587 pending_olh.erase(op.key.name);
2588 }
2589 marker_to_op.erase(iter);
2590 }
2591
2592 public:
2593 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2594 const string& _marker_oid,
2595 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2596 sync_env(_sync_env),
2597 marker_oid(_marker_oid),
2598 sync_marker(_marker) {}
2599
2600 void set_tn(RGWSyncTraceNodeRef& _tn) {
2601 tn = _tn;
2602 }
2603
2604 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2605 sync_marker.position = new_marker;
2606
2607 map<string, bufferlist> attrs;
2608 sync_marker.encode_attr(attrs);
2609
2610 RGWRados *store = sync_env->store;
2611
2612 tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
2613 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2614 store->svc.sysobj,
2615 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, marker_oid),
2616 attrs);
2617 }
2618
2619 /*
2620 * create an index from key -> <op, marker>, and from marker -> key
2621 * this is useful so that we can ensure that we only have one
2622 * entry for any key that is used. This is needed when doing
2623 * incremental sync of data, and we don't want to run multiple
2624 * concurrent sync operations for the same bucket shard.
2625 * Also, we should make sure that we don't run concurrent operations on the same key with
2626 * different ops.
2627 */
2628 bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
2629 auto result = key_to_marker.emplace(key, marker);
2630 if (!result.second) { // exists
2631 set_need_retry(key);
2632 return false;
2633 }
2634 marker_to_op[marker] = operation{key, is_olh};
2635 if (is_olh) {
2636 // prevent other olh ops from starting on this object name
2637 pending_olh.insert(key.name);
2638 }
2639 return true;
2640 }
2641
2642 bool can_do_op(const rgw_obj_key& key, bool is_olh) {
2643 // serialize olh ops on the same object name
2644 if (is_olh && pending_olh.count(key.name)) {
2645 tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
2646 return false;
2647 }
2648 return (key_to_marker.find(key) == key_to_marker.end());
2649 }
2650
2651 RGWOrderCallCR *allocate_order_control_cr() override {
2652 return new RGWLastCallerWinsCR(sync_env->cct);
2653 }
2654 };
2655
2656 template <class T, class K>
2657 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2658 RGWDataSyncEnv *sync_env;
2659
2660 RGWBucketInfo *bucket_info;
2661 const rgw_bucket_shard& bs;
2662
2663 rgw_obj_key key;
2664 bool versioned;
2665 std::optional<uint64_t> versioned_epoch;
2666 rgw_bucket_entry_owner owner;
2667 real_time timestamp;
2668 RGWModifyOp op;
2669 RGWPendingState op_state;
2670
2671 T entry_marker;
2672 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2673
2674 int sync_status;
2675
2676 stringstream error_ss;
2677
2678 bool error_injection;
2679
2680 RGWDataSyncModule *data_sync_module;
2681
2682 rgw_zone_set zones_trace;
2683
2684 RGWSyncTraceNodeRef tn;
2685 public:
2686 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2687 RGWBucketInfo *_bucket_info,
2688 const rgw_bucket_shard& bs,
2689 const rgw_obj_key& _key, bool _versioned,
2690 std::optional<uint64_t> _versioned_epoch,
2691 real_time& _timestamp,
2692 const rgw_bucket_entry_owner& _owner,
2693 RGWModifyOp _op, RGWPendingState _op_state,
2694 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace,
2695 RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct),
2696 sync_env(_sync_env),
2697 bucket_info(_bucket_info), bs(bs),
2698 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2699 owner(_owner),
2700 timestamp(_timestamp), op(_op),
2701 op_state(_op_state),
2702 entry_marker(_entry_marker),
2703 marker_tracker(_marker_tracker),
2704 sync_status(0){
2705 stringstream ss;
2706 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
2707 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2708 set_status("init");
2709
2710 tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", SSTR(key));
2711
2712 tn->log(20, SSTR("bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state));
2713 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2714
2715 data_sync_module = sync_env->sync_module->get_data_handler();
2716
2717 zones_trace = _zones_trace;
2718 zones_trace.insert(sync_env->store->svc.zone->get_zone().id);
2719 }
2720
2721 int operate() override {
2722 reenter(this) {
2723 /* skip entries that are not complete */
2724 if (op_state != CLS_RGW_STATE_COMPLETE) {
2725 goto done;
2726 }
2727 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
2728 do {
2729 yield {
2730 marker_tracker->reset_need_retry(key);
2731 if (key.name.empty()) {
2732 /* shouldn't happen */
2733 set_status("skipping empty entry");
2734 tn->log(0, "entry with empty obj name, skipping");
2735 goto done;
2736 }
2737 if (error_injection &&
2738 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2739 tn->log(0, SSTR(": injecting data sync error on key=" << key.name));
2740 retcode = -EIO;
2741 } else if (op == CLS_RGW_OP_ADD ||
2742 op == CLS_RGW_OP_LINK_OLH) {
2743 set_status("syncing obj");
2744 tn->log(5, SSTR("bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
2745 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
2746 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2747 set_status("removing obj");
2748 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2749 versioned = true;
2750 }
2751 tn->log(10, SSTR("removing obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
2752 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
2753 // our copy of the object is more recent, continue as if it succeeded
2754 if (retcode == -ERR_PRECONDITION_FAILED) {
2755 retcode = 0;
2756 }
2757 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2758 set_status("creating delete marker");
2759 tn->log(10, SSTR("creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]"));
2760 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
2761 }
2762 tn->set_resource_name(SSTR(bucket_str_noinstance(bucket_info->bucket) << "/" << key));
2763 }
2764 } while (marker_tracker->need_retry(key));
2765 {
2766 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
2767 if (retcode >= 0) {
2768 tn->log(10, "success");
2769 } else {
2770 tn->log(10, SSTR("failed, retcode=" << retcode << " (" << cpp_strerror(-retcode) << ")"));
2771 }
2772 }
2773
2774 if (retcode < 0 && retcode != -ENOENT) {
2775 set_status() << "failed to sync obj; retcode=" << retcode;
2776 tn->log(0, SSTR("ERROR: failed to sync object: "
2777 << bucket_shard_str{bs} << "/" << key.name));
2778 error_ss << bucket_shard_str{bs} << "/" << key.name;
2779 sync_status = retcode;
2780 }
2781 if (!error_ss.str().empty()) {
2782 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
2783 }
2784 done:
2785 if (sync_status == 0) {
2786 /* update marker */
2787 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2788 yield call(marker_tracker->finish(entry_marker));
2789 sync_status = retcode;
2790 }
2791 if (sync_status < 0) {
2792 return set_cr_error(sync_status);
2793 }
2794 return set_cr_done();
2795 }
2796 return 0;
2797 }
2798 };
2799
2800 #define BUCKET_SYNC_SPAWN_WINDOW 20
2801
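// Full and incremental bucket shard sync spawn one RGWBucketSyncSingleEntryCR
// per listed entry, capped at BUCKET_SYNC_SPAWN_WINDOW concurrent children.
// Past the cap the parent blocks in wait_for_child()/collect(); a child error
// is recorded in sync_status and stops further listing, but already-spawned
// children are still drained before the coroutine exits.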
2802 class RGWBucketShardFullSyncCR : public RGWCoroutine {
2803 RGWDataSyncEnv *sync_env;
2804 const rgw_bucket_shard& bs;
2805 RGWBucketInfo *bucket_info;
2806 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2807 bucket_list_result list_result;
2808 list<bucket_list_entry>::iterator entries_iter;
2809 rgw_bucket_shard_sync_info& sync_info;
2810 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2811 rgw_obj_key list_marker;
2812 bucket_list_entry *entry{nullptr};
2813
2814 int total_entries{0};
2815
2816 int sync_status{0};
2817
2818 const string& status_oid;
2819
2820 rgw_zone_set zones_trace;
2821
2822 RGWSyncTraceNodeRef tn;
2823 public:
2824 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2825 RGWBucketInfo *_bucket_info,
2826 const std::string& status_oid,
2827 RGWContinuousLeaseCR *lease_cr,
2828 rgw_bucket_shard_sync_info& sync_info,
2829 RGWSyncTraceNodeRef tn_parent)
2830 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2831 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2832 marker_tracker(sync_env, status_oid, sync_info.full_marker),
2833 status_oid(status_oid),
2834 tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
2835 SSTR(bucket_shard_str{bs}))) {
2836 zones_trace.insert(sync_env->source_zone);
2837 marker_tracker.set_tn(tn);
2838 }
2839
2840 int operate() override;
2841 };
2842
2843 int RGWBucketShardFullSyncCR::operate()
2844 {
2845 int ret;
2846 reenter(this) {
2847 list_marker = sync_info.full_marker.position;
2848
2849 total_entries = sync_info.full_marker.count;
2850 do {
2851 if (!lease_cr->is_locked()) {
2852 drain_all();
2853 return set_cr_error(-ECANCELED);
2854 }
2855 set_status("listing remote bucket");
2856 tn->log(20, "listing bucket for full sync");
2857 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2858 &list_result));
2859 if (retcode < 0 && retcode != -ENOENT) {
2860 set_status("failed bucket listing, going down");
2861 drain_all();
2862 return set_cr_error(retcode);
2863 }
2864 if (list_result.entries.size() > 0) {
2865 tn->set_flag(RGW_SNS_FLAG_ACTIVE); /* actually have entries to sync */
2866 }
2867 entries_iter = list_result.entries.begin();
2868 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2869 if (!lease_cr->is_locked()) {
2870 drain_all();
2871 return set_cr_error(-ECANCELED);
2872 }
2873 tn->log(20, SSTR("[full sync] syncing object: "
2874 << bucket_shard_str{bs} << "/" << entries_iter->key));
2875 entry = &(*entries_iter);
2876 total_entries++;
2877 list_marker = entries_iter->key;
2878 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2879 tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
2880 } else {
2881 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2882 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2883 false, /* versioned, only matters for object removal */
2884 entry->versioned_epoch, entry->mtime,
2885 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
2886 entry->key, &marker_tracker, zones_trace, tn),
2887 false);
2888 }
2889 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2890 yield wait_for_child();
2891 bool again = true;
2892 while (again) {
2893 again = collect(&ret, nullptr);
2894 if (ret < 0) {
2895 tn->log(10, "a sync operation returned error");
2896 sync_status = ret;
2897 /* we have reported this error */
2898 }
2899 }
2900 }
2901 }
2902 } while (list_result.is_truncated && sync_status == 0);
2903 set_status("done iterating over all objects");
2904 /* wait for all operations to complete */
2905 while (num_spawned()) {
2906 yield wait_for_child();
2907 bool again = true;
2908 while (again) {
2909 again = collect(&ret, nullptr);
2910 if (ret < 0) {
2911 tn->log(10, "a sync operation returned error");
2912 sync_status = ret;
2913 /* we have reported this error */
2914 }
2915 }
2916 }
2917 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
2918 if (!lease_cr->is_locked()) {
2919 return set_cr_error(-ECANCELED);
2920 }
2921 /* update sync state to incremental */
2922 if (sync_status == 0) {
2923 yield {
2924 sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2925 map<string, bufferlist> attrs;
2926 sync_info.encode_state_attr(attrs);
2927 RGWRados *store = sync_env->store;
2928 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store->svc.sysobj,
2929 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
2930 attrs));
2931 }
2932 } else {
2933 tn->log(10, SSTR("backing out with sync_status=" << sync_status));
2934 }
2935 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2936 tn->log(0, SSTR("ERROR: failed to set sync state on bucket "
2937 << bucket_shard_str{bs} << " retcode=" << retcode));
2938 return set_cr_error(retcode);
2939 }
2940 if (sync_status < 0) {
2941 return set_cr_error(sync_status);
2942 }
2943 return set_cr_done();
2944 }
2945 return 0;
2946 }
2947
2948 static bool has_olh_epoch(RGWModifyOp op) {
2949 return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
2950 }
2951
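// Incremental sync tails the bucket index log. Each batch of bilog entries is
// first reduced into squash_map, which keeps only the newest completed op per
// (object, instance); earlier entries in the batch are then skipped as
// "squashed". Ops that carry an olh epoch (see has_olh_epoch above) are not
// squashed by a later plain write, since their epoch still has to be applied.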
2952 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2953 RGWDataSyncEnv *sync_env;
2954 const rgw_bucket_shard& bs;
2955 RGWBucketInfo *bucket_info;
2956 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2957 list<rgw_bi_log_entry> list_result;
2958 list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
2959 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
2960 rgw_bucket_shard_sync_info& sync_info;
2961 rgw_obj_key key;
2962 rgw_bi_log_entry *entry{nullptr};
2963 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2964 bool updated_status{false};
2965 const string& status_oid;
2966 const string& zone_id;
2967
2968 string cur_id;
2969
2970 int sync_status{0};
2971 bool syncstopped{false};
2972
2973 RGWSyncTraceNodeRef tn;
2974 public:
2975 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2976 const rgw_bucket_shard& bs,
2977 RGWBucketInfo *_bucket_info,
2978 const std::string& status_oid,
2979 RGWContinuousLeaseCR *lease_cr,
2980 rgw_bucket_shard_sync_info& sync_info,
2981 RGWSyncTraceNodeRef& _tn_parent)
2982 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2983 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2984 marker_tracker(sync_env, status_oid, sync_info.inc_marker),
2985 status_oid(status_oid), zone_id(_sync_env->store->svc.zone->get_zone().id),
2986 tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
2987 SSTR(bucket_shard_str{bs})))
2988 {
2989 set_description() << "bucket shard incremental sync bucket="
2990 << bucket_shard_str{bs};
2991 set_status("init");
2992 marker_tracker.set_tn(tn);
2993 }
2994
2995 int operate() override;
2996 };
2997
2998 int RGWBucketShardIncrementalSyncCR::operate()
2999 {
3000 int ret;
3001 reenter(this) {
3002 do {
3003 if (!lease_cr->is_locked()) {
3004 drain_all();
3005 tn->log(0, "ERROR: lease is not taken, abort");
3006 return set_cr_error(-ECANCELED);
3007 }
3008 tn->log(20, SSTR("listing bilog for incremental sync; position=" << sync_info.inc_marker.position));
3009 set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
3010 yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
3011 &list_result));
3012 if (retcode < 0 && retcode != -ENOENT) {
3013 /* wait for all operations to complete */
3014 drain_all();
3015 return set_cr_error(retcode);
3016 }
3017 squash_map.clear();
3018 entries_iter = list_result.begin();
3019 entries_end = list_result.end();
3020 for (; entries_iter != entries_end; ++entries_iter) {
3021 auto e = *entries_iter;
3022 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
3023 ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
3024 syncstopped = true;
3025 entries_end = entries_iter; // don't sync past here
3026 break;
3027 }
3028 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
3029 continue;
3030 }
3031 if (e.op == CLS_RGW_OP_CANCEL) {
3032 continue;
3033 }
3034 if (e.state != CLS_RGW_STATE_COMPLETE) {
3035 continue;
3036 }
3037 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
3038 continue;
3039 }
3040 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
3041 // don't squash over olh entries - we need to apply their olh_epoch
3042 if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
3043 continue;
3044 }
3045 if (squash_entry.first <= e.timestamp) {
3046 squash_entry = make_pair<>(e.timestamp, e.op);
3047 }
3048 }
3049
3050 entries_iter = list_result.begin();
3051 for (; entries_iter != entries_end; ++entries_iter) {
3052 if (!lease_cr->is_locked()) {
3053 drain_all();
3054 return set_cr_error(-ECANCELED);
3055 }
3056 entry = &(*entries_iter);
3057 {
3058 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
3059 if (p < 0) {
3060 cur_id = entry->id;
3061 } else {
3062 cur_id = entry->id.substr(p + 1);
3063 }
3064 }
3065 sync_info.inc_marker.position = cur_id;
3066
3067 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
3068 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
3069 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3070 continue;
3071 }
3072
3073 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
3074 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
3075 tn->log(20, SSTR("parse_raw_oid() on " << entry->object << " returned false, skipping entry"));
3076 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3077 continue;
3078 }
3079
3080 tn->log(20, SSTR("parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns));
3081
3082 if (!key.ns.empty()) {
3083 set_status() << "skipping entry in namespace: " << entry->object;
3084 tn->log(20, SSTR("skipping entry in namespace: " << entry->object));
3085 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3086 continue;
3087 }
3088
3089 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
3090 if (entry->op == CLS_RGW_OP_CANCEL) {
3091 set_status() << "canceled operation, skipping";
3092 tn->log(20, SSTR("skipping object: "
3093 << bucket_shard_str{bs} << "/" << key << ": canceled operation"));
3094 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3095 continue;
3096 }
3097 if (entry->state != CLS_RGW_STATE_COMPLETE) {
3098 set_status() << "non-complete operation, skipping";
3099 tn->log(20, SSTR("skipping object: "
3100 << bucket_shard_str{bs} << "/" << key << ": non-complete operation"));
3101 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3102 continue;
3103 }
3104 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
3105 set_status() << "redundant operation, skipping";
3106 tn->log(20, SSTR("skipping object: "
3107 << bucket_shard_str{bs} << "/" << key << ": redundant operation"));
3108 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3109 continue;
3110 }
3111 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
3112 set_status() << "squashed operation, skipping";
3113 tn->log(20, SSTR("skipping object: "
3114 << bucket_shard_str{bs} << "/" << key << ": squashed operation"));
3115 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3116 continue;
3117 }
3118 tn->set_flag(RGW_SNS_FLAG_ACTIVE);
3119 tn->log(20, SSTR("syncing object: "
3120 << bucket_shard_str{bs} << "/" << key));
3121 updated_status = false;
3122 while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
3123 if (!updated_status) {
3124 set_status() << "can't do op, conflicting inflight operation";
3125 updated_status = true;
3126 }
3127 tn->log(5, SSTR("can't do op on key=" << key << " need to wait for conflicting operation to complete"));
3128 yield wait_for_child();
3129 bool again = true;
3130 while (again) {
3131 again = collect(&ret, nullptr);
3132 if (ret < 0) {
3133 tn->log(0, SSTR("ERROR: a child operation returned error (ret=" << ret << ")"));
3134 sync_status = ret;
3135 /* we have reported this error */
3136 }
3137 }
3138 if (sync_status != 0)
3139 break;
3140 }
3141 if (sync_status != 0) {
3142 /* get error, stop */
3143 break;
3144 }
3145 if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
3146 set_status() << "can't do op, sync already in progress for object";
3147 tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
3148 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
3149 continue;
3150 }
3151 // yield {
3152 set_status() << "start object sync";
3153 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
3154 tn->log(0, SSTR("ERROR: cannot start syncing " << cur_id << ". Duplicate entry?"));
3155 } else {
3156 std::optional<uint64_t> versioned_epoch;
3157 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
3158 if (entry->ver.pool < 0) {
3159 versioned_epoch = entry->ver.epoch;
3160 }
3161 tn->log(20, SSTR("entry->timestamp=" << entry->timestamp));
3162 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
3163 spawn(new SyncCR(sync_env, bucket_info, bs, key,
3164 entry->is_versioned(), versioned_epoch,
3165 entry->timestamp, owner, entry->op, entry->state,
3166 cur_id, &marker_tracker, entry->zones_trace, tn),
3167 false);
3168 }
3169 // }
3170 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
3171 set_status() << "num_spawned() > spawn_window";
3172 yield wait_for_child();
3173 bool again = true;
3174 while (again) {
3175 again = collect(&ret, nullptr);
3176 if (ret < 0) {
3177 tn->log(10, "a sync operation returned error");
3178 sync_status = ret;
3179 /* we have reported this error */
3180 }
3181 /* not waiting for child here */
3182 }
3183 }
3184 }
3185 } while (!list_result.empty() && sync_status == 0 && !syncstopped);
3186
3187 while (num_spawned()) {
3188 yield wait_for_child();
3189 bool again = true;
3190 while (again) {
3191 again = collect(&ret, nullptr);
3192 if (ret < 0) {
3193 tn->log(10, "a sync operation returned error");
3194 sync_status = ret;
3195 /* we have reported this error */
3196 }
3197 /* not waiting for child here */
3198 }
3199 }
3200 tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
3201
3202 if (syncstopped) {
3203 // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
3204 // still disabled, we'll delete the sync status object. otherwise we'll
3205 // restart full sync to catch any changes that happened while sync was
3206 // disabled
3207 sync_info.state = rgw_bucket_shard_sync_info::StateInit;
3208 return set_cr_done();
3209 }
3210
3211 yield call(marker_tracker.flush());
3212 if (retcode < 0) {
3213 tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
3214 return set_cr_error(retcode);
3215 }
3216 if (sync_status < 0) {
3217 tn->log(10, SSTR("backing out with sync_status=" << sync_status));
3218 return set_cr_error(sync_status);
3219 }
3220 return set_cr_done();
3221 }
3222 return 0;
3223 }
3224
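// Drives one bucket shard through its sync states while holding the
// "sync_lock" lease on the shard's status object: StateInit (re)creates or
// deletes the status, StateFullSync lists the remote bucket, and
// StateIncrementalSync replays the bilog. A syncstop in the bilog drops the
// shard back to StateInit, after which the loop below either removes the
// status object (sync still disabled) or restarts a full sync.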
3225 int RGWRunBucketSyncCoroutine::operate()
3226 {
3227 reenter(this) {
3228 yield {
3229 set_status("acquiring sync lock");
3230 auto store = sync_env->store;
3231 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
3232 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
3233 "sync_lock",
3234 cct->_conf->rgw_sync_lease_period,
3235 this));
3236 lease_stack.reset(spawn(lease_cr.get(), false));
3237 }
3238 while (!lease_cr->is_locked()) {
3239 if (lease_cr->is_done()) {
3240 tn->log(5, "failed to take lease");
3241 set_status("lease lock failed, early abort");
3242 drain_all();
3243 return set_cr_error(lease_cr->get_ret_status());
3244 }
3245 set_sleeping(true);
3246 yield;
3247 }
3248
3249 tn->log(10, "took lease");
3250 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
3251 if (retcode < 0 && retcode != -ENOENT) {
3252 tn->log(0, "ERROR: failed to read sync status for bucket");
3253 lease_cr->go_down();
3254 drain_all();
3255 return set_cr_error(retcode);
3256 }
3257
3258 tn->log(20, SSTR("sync status for bucket: " << sync_status.state));
3259
3260 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3261 if (retcode == -ENOENT) {
3262 /* bucket instance info has not been synced in yet, fetch it now */
3263 yield {
3264 tn->log(10, SSTR("no local info for bucket " << bucket_str{bs.bucket} << ": fetching metadata"));
3265 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
3266
3267 meta_sync_env.init(sync_env->dpp, cct, sync_env->store, sync_env->store->svc.zone->get_master_conn(), sync_env->async_rados,
3268 sync_env->http_manager, sync_env->error_logger, sync_env->sync_tracer);
3269
3270 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
3271 string() /* no marker */,
3272 MDLOG_STATUS_COMPLETE,
3273 NULL /* no marker tracker */,
3274 tn));
3275 }
3276 if (retcode < 0) {
3277 tn->log(0, SSTR("ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket}));
3278 lease_cr->go_down();
3279 drain_all();
3280 return set_cr_error(retcode);
3281 }
3282
3283 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3284 }
3285 if (retcode < 0) {
3286 tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket}));
3287 lease_cr->go_down();
3288 drain_all();
3289 return set_cr_error(retcode);
3290 }
3291
3292 do {
3293 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
3294 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
3295 if (retcode == -ENOENT) {
3296 tn->log(0, "bucket sync disabled");
3297 lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
3298 lease_cr->wakeup();
3299 lease_cr.reset();
3300 drain_all();
3301 return set_cr_done();
3302 }
3303 if (retcode < 0) {
3304 tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
3305 lease_cr->go_down();
3306 drain_all();
3307 return set_cr_error(retcode);
3308 }
3309 }
3310
3311 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
3312 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
3313 status_oid, lease_cr.get(),
3314 sync_status, tn));
3315 if (retcode < 0) {
3316 tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
3317 lease_cr->go_down();
3318 drain_all();
3319 return set_cr_error(retcode);
3320 }
3321 }
3322
3323 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
3324 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
3325 status_oid, lease_cr.get(),
3326 sync_status, tn));
3327 if (retcode < 0) {
3328 tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
3329 lease_cr->go_down();
3330 drain_all();
3331 return set_cr_error(retcode);
3332 }
3333 }
3334 // loop back to previous states unless incremental sync returns normally
3335 } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
3336
3337 lease_cr->go_down();
3338 drain_all();
3339 return set_cr_done();
3340 }
3341
3342 return 0;
3343 }
3344
3345 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
3346 {
3347 return new RGWRunBucketSyncCoroutine(&sync_env, bs, sync_env.sync_tracer->root_node);
3348 }
3349
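// Prepare per-shard sync state for one bucket: resolve the connection to the
// source zone, start the HTTP manager, fetch the remote bucket.instance
// metadata to learn the shard count, and create one RGWRemoteBucketLog per
// shard (a single log with shard id -1 for unsharded buckets).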
3350 int RGWBucketSyncStatusManager::init()
3351 {
3352 conn = store->svc.zone->get_zone_conn_by_id(source_zone);
3353 if (!conn) {
3354 ldpp_dout(this, 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
3355 return -EINVAL;
3356 }
3357
3358 int ret = http_manager.start();
3359 if (ret < 0) {
3360 ldpp_dout(this, 0) << "failed in http_manager.start() ret=" << ret << dendl;
3361 return ret;
3362 }
3363
3364
3365 const string key = bucket.get_key();
3366
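// fetch the bucket.instance metadata entry from the source zone; roughly
// equivalent to GET /admin/metadata/bucket.instance?key=<bucket-key> on the
// peer's admin API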
3367 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
3368 { NULL, NULL } };
3369
3370 string path = string("/admin/metadata/bucket.instance");
3371
3372 bucket_instance_meta_info result;
3373 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
3374 if (ret < 0) {
3375 ldpp_dout(this, 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
3376 return ret;
3377 }
3378
3379 RGWBucketInfo& bi = result.data.get_bucket_info();
3380 num_shards = bi.num_shards;
3381
3382 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
3383
3384 sync_module.reset(new RGWDefaultSyncModuleInstance());
3385
3386 int effective_num_shards = (num_shards ? num_shards : 1);
3387
3388 auto async_rados = store->get_async_rados();
3389
3390 for (int i = 0; i < effective_num_shards; i++) {
3391 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(this, store, this, async_rados, &http_manager);
3392 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, store->get_sync_tracer(), sync_module);
3393 if (ret < 0) {
3394 ldpp_dout(this, 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
3395 return ret;
3396 }
3397 source_logs[i] = l;
3398 }
3399
3400 return 0;
3401 }
3402
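// Initialize the sync status objects of all shards, one coroutine stack per
// shard, and wait for completion.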
3403 int RGWBucketSyncStatusManager::init_sync_status()
3404 {
3405 list<RGWCoroutinesStack *> stacks;
3406
3407 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3408 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3409 RGWRemoteBucketLog *l = iter->second;
3410 stack->call(l->init_sync_status_cr());
3411
3412 stacks.push_back(stack);
3413 }
3414
3415 return cr_mgr.run(stacks);
3416 }
3417
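// Read the sync status of every shard into the sync_status map, one coroutine
// stack per shard.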
3418 int RGWBucketSyncStatusManager::read_sync_status()
3419 {
3420 list<RGWCoroutinesStack *> stacks;
3421
3422 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3423 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3424 RGWRemoteBucketLog *l = iter->second;
3425 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
3426
3427 stacks.push_back(stack);
3428 }
3429
3430 int ret = cr_mgr.run(stacks);
3431 if (ret < 0) {
3432 ldpp_dout(this, 0) << "ERROR: failed to read sync status for "
3433 << bucket_str{bucket} << dendl;
3434 return ret;
3435 }
3436
3437 return 0;
3438 }
3439
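// Run the full bucket sync state machine (RGWRunBucketSyncCoroutine) on every
// shard and wait for all of them to finish.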
3440 int RGWBucketSyncStatusManager::run()
3441 {
3442 list<RGWCoroutinesStack *> stacks;
3443
3444 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3445 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3446 RGWRemoteBucketLog *l = iter->second;
3447 stack->call(l->run_sync_cr());
3448
3449 stacks.push_back(stack);
3450 }
3451
3452 int ret = cr_mgr.run(stacks);
3453 if (ret < 0) {
3454 ldpp_dout(this, 0) << "ERROR: failed to run sync for "
3455 << bucket_str{bucket} << dendl;
3456 return ret;
3457 }
3458
3459 return 0;
3460 }
3461
3462 unsigned RGWBucketSyncStatusManager::get_subsys() const
3463 {
3464 return dout_subsys;
3465 }
3466
3467 std::ostream& RGWBucketSyncStatusManager::gen_prefix(std::ostream& out) const
3468 {
3469 auto zone = std::string_view{source_zone};
3470 return out << "bucket sync zone:" << zone.substr(0, 8)
3471 << " bucket:" << bucket.name << ' ';
3472 }
3473
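// Name of the RADOS object holding the sync status of one bucket shard as
// synced from the given source zone.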
3474 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
3475 const rgw_bucket_shard& bs)
3476 {
3477 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
3478 }
3479
3480 string RGWBucketSyncStatusManager::obj_status_oid(const string& source_zone,
3481 const rgw_obj& obj)
3482 {
3483 return object_status_oid_prefix + "." + source_zone + ":" + obj.bucket.get_key() + ":" +
3484 obj.key.name + ":" + obj.key.instance;
3485 }
3486
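// Reads the sync status of every shard of a bucket, keeping at most
// max_concurrent_shards readers in flight at a time.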
3487 class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
3488 static constexpr int max_concurrent_shards = 16;
3489 RGWRados *const store;
3490 RGWDataSyncEnv *const env;
3491 const int num_shards;
3492 rgw_bucket_shard bs;
3493
3494 using Vector = std::vector<rgw_bucket_shard_sync_info>;
3495 Vector::iterator i, end;
3496
3497 public:
3498 RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
3499 int num_shards, const rgw_bucket& bucket,
3500 Vector *status)
3501 : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
3502 store(store), env(env), num_shards(num_shards),
3503 bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
3504 i(status->begin()), end(status->end())
3505 {}
3506
3507 bool spawn_next() override {
3508 if (i == end) {
3509 return false;
3510 }
3511 spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
3512 ++i;
3513 ++bs.shard_id;
3514 return true;
3515 }
3516 };
3517
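// Collect per-shard bucket sync status from the given source zone. A minimal
// caller sketch (assuming an initialized RGWRados *store, a loaded
// RGWBucketInfo and a DoutPrefixProvider; the variable names are illustrative):
//
//   std::vector<rgw_bucket_shard_sync_info> status;
//   int r = rgw_bucket_sync_status(dpp, store, source_zone, bucket_info, &status);
//   if (r >= 0) {
//     for (const auto& s : status) {
//       // s.state is StateInit / StateFullSync / StateIncrementalSync per shard
//     }
//   }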
3518 int rgw_bucket_sync_status(const DoutPrefixProvider *dpp, RGWRados *store, const std::string& source_zone,
3519 const RGWBucketInfo& bucket_info,
3520 std::vector<rgw_bucket_shard_sync_info> *status)
3521 {
3522 const auto num_shards = bucket_info.num_shards;
3523 status->clear();
3524 status->resize(std::max<size_t>(1, num_shards));
3525
3526 RGWDataSyncEnv env;
3527 RGWSyncModuleInstanceRef module; // null sync module
3528 env.init(dpp, store->ctx(), store, nullptr, store->get_async_rados(),
3529 nullptr, nullptr, nullptr, source_zone, module, nullptr);
3530
3531 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
3532 return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
3533 bucket_info.bucket, status));
3534 }
3535
3536
3537 // TODO: move into rgw_data_sync_trim.cc
3538 #undef dout_prefix
3539 #define dout_prefix (*_dout << "data trim: ")
3540
3541 namespace {
3542
3543 /// return the marker that it's safe to trim up to
3544 const std::string& get_stable_marker(const rgw_data_sync_marker& m)
3545 {
3546 return m.state == m.FullSync ? m.next_step_marker : m.marker;
3547 }
3548
3549 /// populate the container starting with 'dest' with the minimum stable marker
3550 /// of each shard for all of the peers in [first, last)
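/// 'dest' must already hold one entry per shard, pre-initialized to an
/// upper-bound marker (DataLogTrimCR seeds it with RGWSyncLogTrimCR::max_marker)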
3551 template <typename IterIn, typename IterOut>
3552 void take_min_markers(IterIn first, IterIn last, IterOut dest)
3553 {
3554 if (first == last) {
3555 return;
3556 }
3557 for (auto p = first; p != last; ++p) {
3558 auto m = dest;
3559 for (auto &shard : p->sync_markers) {
3560 const auto& stable = get_stable_marker(shard.second);
3561 if (*m > stable) {
3562 *m = stable;
3563 }
3564 ++m;
3565 }
3566 }
3567 }
3568
3569 } // anonymous namespace
3570
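// Single trim pass over the datalog: query the data sync status of every peer
// zone we notify, compute the minimum stable marker per shard across all peers,
// and trim each shard up to that marker (skipping shards that are already
// trimmed at least that far).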
3571 class DataLogTrimCR : public RGWCoroutine {
3572 using TrimCR = RGWSyncLogTrimCR;
3573 RGWRados *store;
3574 RGWHTTPManager *http;
3575 const int num_shards;
3576 const std::string& zone_id; ///< my zone id
3577 std::vector<rgw_data_sync_status> peer_status; ///< sync status for each peer
3578 std::vector<std::string> min_shard_markers; ///< min marker per shard
3579 std::vector<std::string>& last_trim; ///< last trimmed marker per shard
3580 int ret{0};
3581
3582 public:
3583 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
3584 int num_shards, std::vector<std::string>& last_trim)
3585 : RGWCoroutine(store->ctx()), store(store), http(http),
3586 num_shards(num_shards),
3587 zone_id(store->svc.zone->get_zone().id),
3588 peer_status(store->svc.zone->get_zone_data_notify_to_map().size()),
3589 min_shard_markers(num_shards, TrimCR::max_marker),
3590 last_trim(last_trim)
3591 {}
3592
3593 int operate() override;
3594 };
3595
3596 int DataLogTrimCR::operate()
3597 {
3598 reenter(this) {
3599 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3600 set_status("fetching sync status");
3601 yield {
3602 // query data sync status from each sync peer
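// roughly: GET /admin/log/?type=data&status&source-zone=<zone_id> on each peer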
3603 rgw_http_param_pair params[] = {
3604 { "type", "data" },
3605 { "status", nullptr },
3606 { "source-zone", zone_id.c_str() },
3607 { nullptr, nullptr }
3608 };
3609
3610 auto p = peer_status.begin();
3611 for (auto& c : store->svc.zone->get_zone_data_notify_to_map()) {
3612 ldout(cct, 20) << "query sync status from " << c.first << dendl;
3613 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3614 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3615 false);
3616 ++p;
3617 }
3618 }
3619
3620 // must get a successful reply from all peers to consider trimming
3621 ret = 0;
3622 while (ret == 0 && num_spawned() > 0) {
3623 yield wait_for_child();
3624 collect_next(&ret);
3625 }
3626 drain_all();
3627
3628 if (ret < 0) {
3629 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3630 return set_cr_error(ret);
3631 }
3632
3633 ldout(cct, 10) << "trimming log shards" << dendl;
3634 set_status("trimming log shards");
3635 yield {
3636 // determine the minimum marker for each shard
3637 take_min_markers(peer_status.begin(), peer_status.end(),
3638 min_shard_markers.begin());
3639
3640 for (int i = 0; i < num_shards; i++) {
3641 const auto& m = min_shard_markers[i];
3642 if (m <= last_trim[i]) {
3643 continue;
3644 }
3645 ldout(cct, 10) << "trimming log shard " << i
3646 << " at marker=" << m
3647 << " last_trim=" << last_trim[i] << dendl;
3648 spawn(new TrimCR(store, store->data_log->get_oid(i),
3649 m, &last_trim[i]),
3650 true);
3651 }
3652 }
3653 return set_cr_done();
3654 }
3655 return 0;
3656 }
3657
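// Factory for a one-shot trim pass driven by caller-supplied last-trim markers
// (as opposed to the self-scheduling DataLogTrimPollCR below).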
3658 RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
3659 RGWHTTPManager *http,
3660 int num_shards,
3661 std::vector<std::string>& markers)
3662 {
3663 return new DataLogTrimCR(store, http, num_shards, markers);
3664 }
3665
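// Background trim loop: sleep for 'interval', take a "data_trim" lock on the
// first datalog shard so that only one gateway trims at a time, then run a
// DataLogTrimCR pass. The lock is intentionally left to expire instead of
// being released.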
3666 class DataLogTrimPollCR : public RGWCoroutine {
3667 RGWRados *store;
3668 RGWHTTPManager *http;
3669 const int num_shards;
3670 const utime_t interval; ///< polling interval
3671 const std::string lock_oid; ///< use first data log shard for lock
3672 const std::string lock_cookie;
3673 std::vector<std::string> last_trim; ///< last trimmed marker per shard
3674
3675 public:
3676 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3677 int num_shards, utime_t interval)
3678 : RGWCoroutine(store->ctx()), store(store), http(http),
3679 num_shards(num_shards), interval(interval),
3680 lock_oid(store->data_log->get_oid(0)),
3681 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3682 last_trim(num_shards)
3683 {}
3684
3685 int operate() override;
3686 };
3687
3688 int DataLogTrimPollCR::operate()
3689 {
3690 reenter(this) {
3691 for (;;) {
3692 set_status("sleeping");
3693 wait(interval);
3694
3695 // request a 'data_trim' lock that covers the entire wait interval to
3696 // prevent other gateways from attempting to trim for the duration
3697 set_status("acquiring trim lock");
3698 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3699 rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, lock_oid),
3700 "data_trim", lock_cookie,
3701 interval.sec()));
3702 if (retcode < 0) {
3703 // if the lock is already held, go back to sleep and try again later
3704 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3705 << interval.sec() << "s" << dendl;
3706 continue;
3707 }
3708
3709 set_status("trimming");
3710 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3711
3712 // note that the lock is not released. this is intentional, as it avoids
3713 // duplicating this work in other gateways
3714 }
3715 }
3716 return 0;
3717 }
3718
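// Factory for the background polling trimmer. A minimal driver sketch (assuming
// an initialized RGWRados *store and RGWHTTPManager *http; the interval value
// is illustrative):
//
//   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
//   crs.run(create_data_log_trim_cr(store, http, num_shards,
//                                   utime_t(600, 0) /* e.g. every 10 minutes */));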
3719 RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3720 RGWHTTPManager *http,
3721 int num_shards, utime_t interval)
3722 {
3723 return new DataLogTrimPollCR(store, http, num_shards, interval);
3724 }