1 #include <boost/utility/string_ref.hpp>
2
3 #include "common/ceph_json.h"
4 #include "common/RWLock.h"
5 #include "common/RefCountedObj.h"
6 #include "common/WorkQueue.h"
7 #include "common/Throttle.h"
8 #include "common/errno.h"
9
10 #include "rgw_common.h"
11 #include "rgw_rados.h"
12 #include "rgw_sync.h"
13 #include "rgw_data_sync.h"
14 #include "rgw_rest_conn.h"
15 #include "rgw_cr_rados.h"
16 #include "rgw_cr_rest.h"
17 #include "rgw_http_client.h"
18 #include "rgw_bucket.h"
19 #include "rgw_metadata.h"
20 #include "rgw_sync_module.h"
21 #include "rgw_sync_log_trim.h"
22
23 #include "cls/lock/cls_lock_client.h"
24
25 #include "auth/Crypto.h"
26
27 #include <boost/asio/yield.hpp>
28
29 #define dout_subsys ceph_subsys_rgw
30
31 #undef dout_prefix
32 #define dout_prefix (*_dout << "data sync: ")
33
34 static string datalog_sync_status_oid_prefix = "datalog.sync-status";
35 static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
36 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
37 static string bucket_status_oid_prefix = "bucket.sync-status";
38
39 class RGWSyncDebugLogger {
40 CephContext *cct;
41 string prefix;
42
43 bool ended;
44
45 public:
46 RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
47 const string& sync_type, const string& sync_stage,
48 const string& resource, bool log_start = true) {
49 init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
50 }
51 RGWSyncDebugLogger() : cct(NULL), ended(false) {}
52 ~RGWSyncDebugLogger();
53
54 void init(CephContext *_cct, const string& source_zone,
55 const string& sync_type, const string& sync_stage,
56 const string& resource, bool log_start = true);
57 void log(const string& state);
58 void finish(int status);
59 };
60
61 void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
62 const string& sync_type, const string& sync_section,
63 const string& resource, bool log_start)
64 {
65 cct = _cct;
66 ended = false;
67 string zone_str = source_zone.substr(0, 8);
68 prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
69 if (log_start) {
70 log("start");
71 }
72 }
73
74 RGWSyncDebugLogger::~RGWSyncDebugLogger()
75 {
76 if (!ended) {
77 log("finish");
78 }
79 }
80
81 void RGWSyncDebugLogger::log(const string& state)
82 {
83 ldout(cct, 5) << prefix << ":" << state << dendl;
84 }
85
86 void RGWSyncDebugLogger::finish(int status)
87 {
88 ended = true;
89 ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
90 }
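// A line emitted through this logger has the form
// "Sync:<zone-prefix>:<type>:<stage>:<resource>:<state>", where the zone id
// is truncated to its first 8 characters. For example (values hypothetical):
//
//   Sync:8d9b0b1a:data:Bucket:mybucket:0:start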
91
92 class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
93 public:
94 RGWDataSyncDebugLogger() {}
95 RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
96 const string& resource, bool log_start = true) {
97 init(sync_env, sync_section, resource, log_start);
98 }
99 void init(RGWDataSyncEnv *sync_env, const string& sync_section,
100 const string& resource, bool log_start = true) {
101 RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
102 }
103
104 };
105
106 void rgw_datalog_info::decode_json(JSONObj *obj) {
107 JSONDecoder::decode_json("num_objects", num_shards, obj);
108 }
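// For reference, this is filled from the remote "/admin/log" info request
// (see RGWRemoteDataLog::read_log_info() below); a hypothetical payload of
// {"num_objects": 128} yields num_shards = 128.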
109
110 void rgw_datalog_entry::decode_json(JSONObj *obj) {
111 JSONDecoder::decode_json("key", key, obj);
112 utime_t ut;
113 JSONDecoder::decode_json("timestamp", ut, obj);
114 timestamp = ut.to_real_time();
115 }
116
117 void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
118 JSONDecoder::decode_json("marker", marker, obj);
119 JSONDecoder::decode_json("truncated", truncated, obj);
120 JSONDecoder::decode_json("entries", entries, obj);
121 }
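// Sketch of the JSON these decoders consume from the remote log listing
// (field values hypothetical):
//
//   {
//     "marker": "1_1506365188.000000_1234.1",
//     "truncated": false,
//     "entries": [ { "key": "mybucket:0", "timestamp": "..." } ]
//   }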
122
123 class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
124 static constexpr int MAX_CONCURRENT_SHARDS = 16;
125
126 RGWDataSyncEnv *env;
127 const int num_shards;
128 int shard_id{0};
129
130 map<uint32_t, rgw_data_sync_marker>& markers;
131
132 public:
133 RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
134 map<uint32_t, rgw_data_sync_marker>& markers)
135 : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
136 env(env), num_shards(num_shards), markers(markers)
137 {}
138 bool spawn_next() override;
139 };
140
141 bool RGWReadDataSyncStatusMarkersCR::spawn_next()
142 {
143 if (shard_id >= num_shards) {
144 return false;
145 }
146 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
147 spawn(new CR(env->async_rados, env->store,
148 rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
149 &markers[shard_id]),
150 false);
151 shard_id++;
152 return true;
153 }
154
155 class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
156 RGWDataSyncEnv *sync_env;
157 rgw_data_sync_status *sync_status;
158
159 public:
160 RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
161 rgw_data_sync_status *_status)
162 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
163 {}
164 int operate() override;
165 };
166
167 int RGWReadDataSyncStatusCoroutine::operate()
168 {
169 reenter(this) {
170 // read sync info
171 using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
172 yield {
173 bool empty_on_enoent = false; // fail on ENOENT
174 call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
175 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
176 &sync_status->sync_info, empty_on_enoent));
177 }
178 if (retcode < 0) {
179 ldout(sync_env->cct, 4) << "failed to read sync status info with "
180 << cpp_strerror(retcode) << dendl;
181 return set_cr_error(retcode);
182 }
183 // read shard markers
184 using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
185 yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
186 sync_status->sync_markers));
187 if (retcode < 0) {
188 ldout(sync_env->cct, 4) << "failed to read sync status markers with "
189 << cpp_strerror(retcode) << dendl;
190 return set_cr_error(retcode);
191 }
192 return set_cr_done();
193 }
194 return 0;
195 }
196
197 class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
198 RGWDataSyncEnv *sync_env;
199
200 RGWRESTReadResource *http_op;
201
202 int shard_id;
203 RGWDataChangesLogInfo *shard_info;
204
205 public:
206 RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
207 int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct),
208 sync_env(_sync_env),
209 http_op(NULL),
210 shard_id(_shard_id),
211 shard_info(_shard_info) {
212 }
213
214 ~RGWReadRemoteDataLogShardInfoCR() override {
215 if (http_op) {
216 http_op->put();
217 }
218 }
219
220 int operate() override {
221 reenter(this) {
222 yield {
223 char buf[16];
224 snprintf(buf, sizeof(buf), "%d", shard_id);
225 rgw_http_param_pair pairs[] = { { "type" , "data" },
226 { "id", buf },
227 { "info" , NULL },
228 { NULL, NULL } };
229
230 string p = "/admin/log/";
231
232 http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
233
234 http_op->set_user_info((void *)stack);
235
236 int ret = http_op->aio_read();
237 if (ret < 0) {
238 ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
239 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
240 return set_cr_error(ret);
241 }
242
243 return io_block(0);
244 }
245 yield {
246 int ret = http_op->wait(shard_info);
247 if (ret < 0) {
248 return set_cr_error(ret);
249 }
250 return set_cr_done();
251 }
252 }
253 return 0;
254 }
255 };
256
257 struct read_remote_data_log_response {
258 string marker;
259 bool truncated;
260 list<rgw_data_change_log_entry> entries;
261
262 read_remote_data_log_response() : truncated(false) {}
263
264 void decode_json(JSONObj *obj) {
265 JSONDecoder::decode_json("marker", marker, obj);
266 JSONDecoder::decode_json("truncated", truncated, obj);
267 JSONDecoder::decode_json("entries", entries, obj);
268 };
269 };
270
271 class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
272 RGWDataSyncEnv *sync_env;
273
274 RGWRESTReadResource *http_op;
275
276 int shard_id;
277 string *pmarker;
278 list<rgw_data_change_log_entry> *entries;
279 bool *truncated;
280
281 read_remote_data_log_response response;
282
283 public:
284 RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
285 int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct),
286 sync_env(_sync_env),
287 http_op(NULL),
288 shard_id(_shard_id),
289 pmarker(_pmarker),
290 entries(_entries),
291 truncated(_truncated) {
292 }
293 ~RGWReadRemoteDataLogShardCR() override {
294 if (http_op) {
295 http_op->put();
296 }
297 }
298
299 int operate() override {
300 reenter(this) {
301 yield {
302 char buf[16];
303 snprintf(buf, sizeof(buf), "%d", shard_id);
304 rgw_http_param_pair pairs[] = { { "type" , "data" },
305 { "id", buf },
306 { "marker", pmarker->c_str() },
307 { "extra-info", "true" },
308 { NULL, NULL } };
309
310 string p = "/admin/log/";
311
312 http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
313
314 http_op->set_user_info((void *)stack);
315
316 int ret = http_op->aio_read();
317 if (ret < 0) {
318 ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
319 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
320 return set_cr_error(ret);
321 }
322
323 return io_block(0);
324 }
325 yield {
326 int ret = http_op->wait(&response);
327 if (ret < 0) {
328 return set_cr_error(ret);
329 }
330 entries->clear();
331 entries->swap(response.entries);
332 *pmarker = response.marker;
333 *truncated = response.truncated;
334 return set_cr_done();
335 }
336 }
337 return 0;
338 }
339 };
340
341 class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
342 RGWDataSyncEnv *sync_env;
343
344 int num_shards;
345 map<int, RGWDataChangesLogInfo> *datalog_info;
346
347 int shard_id;
348 #define READ_DATALOG_MAX_CONCURRENT 10
349
350 public:
351 RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
352 int _num_shards,
353 map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
354 sync_env(_sync_env), num_shards(_num_shards),
355 datalog_info(_datalog_info), shard_id(0) {}
356 bool spawn_next() override;
357 };
358
359 bool RGWReadRemoteDataLogInfoCR::spawn_next() {
360 if (shard_id >= num_shards) {
361 return false;
362 }
363 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
364 shard_id++;
365 return true;
366 }
367
368 class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
369 RGWDataSyncEnv *sync_env;
370 RGWRESTReadResource *http_op;
371
372 int shard_id;
373 string marker;
374 uint32_t max_entries;
375 rgw_datalog_shard_data *result;
376
377 public:
378 RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
379 const string& _marker, uint32_t _max_entries,
380 rgw_datalog_shard_data *_result)
381 : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
382 shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
383
384 int send_request() override {
385 RGWRESTConn *conn = sync_env->conn;
386 RGWRados *store = sync_env->store;
387
388 char buf[32];
389 snprintf(buf, sizeof(buf), "%d", shard_id);
390
391 char max_entries_buf[32];
392 snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
393
394 const char *marker_key = (marker.empty() ? "" : "marker");
395
396 rgw_http_param_pair pairs[] = { { "type", "data" },
397 { "id", buf },
398 { "max-entries", max_entries_buf },
399 { marker_key, marker.c_str() },
400 { NULL, NULL } };
401
402 string p = "/admin/log/";
403
404 http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
405 http_op->set_user_info((void *)stack);
406
407 int ret = http_op->aio_read();
408 if (ret < 0) {
409 ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
410 log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
411 http_op->put();
412 return ret;
413 }
414
415 return 0;
416 }
417
418 int request_complete() override {
419 int ret = http_op->wait(result);
420 http_op->put();
421 if (ret < 0 && ret != -ENOENT) {
422 ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
423 return ret;
424 }
425 return 0;
426 }
427 };
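// The request assembled in send_request() above amounts to a GET on the
// source zone of the form (shard id, count and marker hypothetical):
//
//   /admin/log/?type=data&id=3&max-entries=1000&marker=1_1506365188...
//
// When the marker is empty its key is blanked so the parameter is
// effectively omitted from the query string.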
428
429 class RGWListRemoteDataLogCR : public RGWShardCollectCR {
430 RGWDataSyncEnv *sync_env;
431
432 map<int, string> shards;
433 int max_entries_per_shard;
434 map<int, rgw_datalog_shard_data> *result;
435
436 map<int, string>::iterator iter;
437 #define READ_DATALOG_MAX_CONCURRENT 10
438
439 public:
440 RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
441 map<int, string>& _shards,
442 int _max_entries_per_shard,
443 map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
444 sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
445 result(_result) {
446 shards.swap(_shards);
447 iter = shards.begin();
448 }
449 bool spawn_next() override;
450 };
451
452 bool RGWListRemoteDataLogCR::spawn_next() {
453 if (iter == shards.end()) {
454 return false;
455 }
456
457 spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
458 ++iter;
459 return true;
460 }
461
462 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
463 static constexpr uint32_t lock_duration = 30;
464 RGWDataSyncEnv *sync_env;
465 RGWRados *store;
466 const rgw_pool& pool;
467 const uint32_t num_shards;
468
469 string sync_status_oid;
470
471 string lock_name;
472 string cookie;
473 rgw_data_sync_status *status;
474 map<int, RGWDataChangesLogInfo> shards_info;
475 public:
476 RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
477 uint64_t instance_id,
478 rgw_data_sync_status *status)
479 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
480 pool(store->get_zone_params().log_pool),
481 num_shards(num_shards), status(status) {
482 lock_name = "sync_lock";
483
484 status->sync_info.instance_id = instance_id;
485
486 #define COOKIE_LEN 16
487 char buf[COOKIE_LEN + 1];
488
489 gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
490 cookie = buf;
491
492 sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
493 }
494
495 int operate() override {
496 int ret;
497 reenter(this) {
498 using LockCR = RGWSimpleRadosLockCR;
499 yield call(new LockCR(sync_env->async_rados, store,
500 rgw_raw_obj{pool, sync_status_oid},
501 lock_name, cookie, lock_duration));
502 if (retcode < 0) {
503 ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
504 return set_cr_error(retcode);
505 }
506 using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
507 yield call(new WriteInfoCR(sync_env->async_rados, store,
508 rgw_raw_obj{pool, sync_status_oid},
509 status->sync_info));
510 if (retcode < 0) {
511 ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
512 return set_cr_error(retcode);
513 }
514
515 /* take lock again, we just recreated the object */
516 yield call(new LockCR(sync_env->async_rados, store,
517 rgw_raw_obj{pool, sync_status_oid},
518 lock_name, cookie, lock_duration));
519 if (retcode < 0) {
520 ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
521 return set_cr_error(retcode);
522 }
523
524 /* fetch current position in logs */
525 yield {
526 RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
527 if (!conn) {
528 ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
529 return set_cr_error(-EIO);
530 }
531 for (uint32_t i = 0; i < num_shards; i++) {
532 spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
533 }
534 }
535 while (collect(&ret, NULL)) {
536 if (ret < 0) {
537 ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
538 return set_state(RGWCoroutine_Error);
539 }
540 yield;
541 }
542 yield {
543 for (uint32_t i = 0; i < num_shards; i++) {
544 RGWDataChangesLogInfo& info = shards_info[i];
545 auto& marker = status->sync_markers[i];
546 marker.next_step_marker = info.marker;
547 marker.timestamp = info.last_update;
548 const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
549 using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
550 spawn(new WriteMarkerCR(sync_env->async_rados, store,
551 rgw_raw_obj{pool, oid}, marker), true);
552 }
553 }
554 while (collect(&ret, NULL)) {
555 if (ret < 0) {
556 ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
557 return set_state(RGWCoroutine_Error);
558 }
559 yield;
560 }
561
562 status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
563 yield call(new WriteInfoCR(sync_env->async_rados, store,
564 rgw_raw_obj{pool, sync_status_oid},
565 status->sync_info));
566 if (retcode < 0) {
567 ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
568 return set_cr_error(retcode);
569 }
570 yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
571 rgw_raw_obj{pool, sync_status_oid},
572 lock_name, cookie));
573 return set_cr_done();
574 }
575 return 0;
576 }
577 };
578
579 int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
580 {
581 rgw_http_param_pair pairs[] = { { "type", "data" },
582 { NULL, NULL } };
583
584 int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
585 if (ret < 0) {
586 ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
587 return ret;
588 }
589
590 ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
591
592 return 0;
593 }
594
595 int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
596 {
597 rgw_datalog_info log_info;
598 int ret = read_log_info(&log_info);
599 if (ret < 0) {
600 return ret;
601 }
602
603 return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
604 }
605
606 int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
607 {
608 if (store->is_meta_master()) {
609 return 0;
610 }
611
612 return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
613 }
614
615 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
616 {
617 sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
618 _source_zone, _sync_module, observer);
619
620 if (initialized) {
621 return 0;
622 }
623
624 int ret = http_manager.set_threaded();
625 if (ret < 0) {
626 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
627 return ret;
628 }
629
630 initialized = true;
631
632 return 0;
633 }
634
635 void RGWRemoteDataLog::finish()
636 {
637 stop();
638 }
639
640 int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
641 {
642 // cannot run concurrently with run_sync(), so run in a separate manager
643 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
644 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
645 int ret = http_manager.set_threaded();
646 if (ret < 0) {
647 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
648 return ret;
649 }
650 RGWDataSyncEnv sync_env_local = sync_env;
651 sync_env_local.http_manager = &http_manager;
652 ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
653 http_manager.stop();
654 return ret;
655 }
656
657 int RGWRemoteDataLog::init_sync_status(int num_shards)
658 {
659 rgw_data_sync_status sync_status;
660 sync_status.sync_info.num_shards = num_shards;
661
662 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
663 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
664 int ret = http_manager.set_threaded();
665 if (ret < 0) {
666 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
667 return ret;
668 }
669 RGWDataSyncEnv sync_env_local = sync_env;
670 sync_env_local.http_manager = &http_manager;
671 uint64_t instance_id;
672 get_random_bytes((char *)&instance_id, sizeof(instance_id));
673 ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
674 http_manager.stop();
675 return ret;
676 }
677
678 static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
679 {
680 char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
681 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
682 return string(buf);
683 }
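// For example (zone id hypothetical), shard 5 of the full-sync index lands
// in the object "data.full-sync.index.8d9b0b1a-....5" in the zone's log pool.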
684
685 struct bucket_instance_meta_info {
686 string key;
687 obj_version ver;
688 utime_t mtime;
689 RGWBucketInstanceMetadataObject data;
690
691 bucket_instance_meta_info() {}
692
693 void decode_json(JSONObj *obj) {
694 JSONDecoder::decode_json("key", key, obj);
695 JSONDecoder::decode_json("ver", ver, obj);
696 JSONDecoder::decode_json("mtime", mtime, obj);
697 JSONDecoder::decode_json("data", data, obj);
698 }
699 };
700
701 class RGWListBucketIndexesCR : public RGWCoroutine {
702 RGWDataSyncEnv *sync_env;
703
704 RGWRados *store;
705
706 rgw_data_sync_status *sync_status;
707 int num_shards;
708
709 int req_ret;
710 int ret;
711
712 list<string> result;
713 list<string>::iterator iter;
714
715 RGWShardedOmapCRManager *entries_index;
716
717 string oid_prefix;
718
719 string path;
720 bucket_instance_meta_info meta_info;
721 string key;
722 string s;
723 int i;
724
725 bool failed;
726
727 public:
728 RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
729 rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
730 store(sync_env->store), sync_status(_sync_status),
731 req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
732 oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
733 path = "/admin/metadata/bucket.instance";
734 num_shards = sync_status->sync_info.num_shards;
735 }
736 ~RGWListBucketIndexesCR() override {
737 delete entries_index;
738 }
739
740 int operate() override {
741 reenter(this) {
742 yield {
743 string entrypoint = string("/admin/metadata/bucket.instance");
744 /* FIXME: need a better scaling solution here, requires streaming output */
745 call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
746 entrypoint, NULL, &result));
747 }
748 if (retcode < 0) {
749 ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
750 return set_cr_error(retcode);
751 }
752 entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
753 store->get_zone_params().log_pool,
754 oid_prefix);
755 yield; // yield so OmapAppendCRs can start
756 for (iter = result.begin(); iter != result.end(); ++iter) {
757 ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;
758
759 key = *iter;
760
761 yield {
762 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
763 { NULL, NULL } };
764
765 call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
766 }
767
768 num_shards = meta_info.data.get_bucket_info().num_shards;
769 if (num_shards > 0) {
770 for (i = 0; i < num_shards; i++) {
771 char buf[16];
772 snprintf(buf, sizeof(buf), ":%d", i);
773 s = key + buf;
774 yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
775 }
776 } else {
777 yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
778 }
779 }
780 yield {
781 if (!entries_index->finish()) {
782 failed = true;
783 }
784 }
785 if (!failed) {
786 for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
787 int shard_id = (int)iter->first;
788 rgw_data_sync_marker& marker = iter->second;
789 marker.total_entries = entries_index->get_total_entries(shard_id);
790 spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
791 rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
792 marker), true);
793 }
794 } else {
795 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
796 EIO, string("failed to build bucket instances map")));
797 }
798 while (collect(&ret, NULL)) {
799 if (ret < 0) {
800 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
801 -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
802 req_ret = ret;
803 }
804 yield;
805 }
806 drain_all();
807 if (req_ret < 0) {
808 yield return set_cr_error(req_ret);
809 }
810 yield return set_cr_done();
811 }
812 return 0;
813 }
814 };
815
816 #define DATA_SYNC_UPDATE_MARKER_WINDOW 1
817
818 class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
819 RGWDataSyncEnv *sync_env;
820
821 string marker_oid;
822 rgw_data_sync_marker sync_marker;
823
824 map<string, string> key_to_marker;
825 map<string, string> marker_to_key;
826
827 void handle_finish(const string& marker) override {
828 map<string, string>::iterator iter = marker_to_key.find(marker);
829 if (iter == marker_to_key.end()) {
830 return;
831 }
832 key_to_marker.erase(iter->second);
833 reset_need_retry(iter->second);
834 marker_to_key.erase(iter);
835 }
836
837 public:
838 RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
839 const string& _marker_oid,
840 const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
841 sync_env(_sync_env),
842 marker_oid(_marker_oid),
843 sync_marker(_marker) {}
844
845 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
846 sync_marker.marker = new_marker;
847 sync_marker.pos = index_pos;
848
849 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
850 RGWRados *store = sync_env->store;
851
852 return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
853 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
854 sync_marker);
855 }
856
857 /*
858 * create index from key -> marker, and from marker -> key
859 * this is useful so that we can ensure that we only have one
860 * entry for any key that is used. This is needed when doing
861 * incremental sync of data, and we don't want to run multiple
862 * concurrent sync operations for the same bucket shard
863 */
864 bool index_key_to_marker(const string& key, const string& marker) {
865 if (key_to_marker.find(key) != key_to_marker.end()) {
866 set_need_retry(key);
867 return false;
868 }
869 key_to_marker[key] = marker;
870 marker_to_key[marker] = key;
871 return true;
872 }
873 };
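// Dedup sketch for index_key_to_marker() (names hypothetical): the first call
// for a bucket shard records both mappings and returns true; a second call
// while that entry is in flight returns false and flags the key for retry,
// so at most one sync operation per bucket shard runs at a time:
//
//   tracker.index_key_to_marker("mybucket:0", "00001");  // true, spawn sync
//   tracker.index_key_to_marker("mybucket:0", "00002");  // false, retry later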
874
875 // ostream wrappers to print buckets without copying strings
876 struct bucket_str {
877 const rgw_bucket& b;
878 bucket_str(const rgw_bucket& b) : b(b) {}
879 };
880 std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
881 auto& b = rhs.b;
882 if (!b.tenant.empty()) {
883 out << b.tenant << '/';
884 }
885 out << b.name;
886 if (!b.bucket_id.empty()) {
887 out << ':' << b.bucket_id;
888 }
889 return out;
890 }
891
892 struct bucket_shard_str {
893 const rgw_bucket_shard& bs;
894 bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
895 };
896 std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
897 auto& bs = rhs.bs;
898 out << bucket_str{bs.bucket};
899 if (bs.shard_id >= 0) {
900 out << ':' << bs.shard_id;
901 }
902 return out;
903 }
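// Minimal usage sketch (call site hypothetical):
//
//   ldout(cct, 20) << "syncing " << bucket_shard_str{bs} << dendl;
//
// which prints "tenant/name:bucket-id:shard" without building a temporary
// string.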
904
905 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
906 RGWDataSyncEnv *sync_env;
907 rgw_bucket_shard bs;
908 RGWBucketInfo bucket_info;
909 rgw_bucket_shard_sync_info sync_status;
910 RGWMetaSyncEnv meta_sync_env;
911
912 RGWDataSyncDebugLogger logger;
913 const std::string status_oid;
914
915 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
916 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
917
918 public:
919 RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
920 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
921 status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
922 logger.init(sync_env, "Bucket", bs.get_key());
923 }
924 ~RGWRunBucketSyncCoroutine() override {
925 if (lease_cr) {
926 lease_cr->abort();
927 }
928 }
929
930 int operate() override;
931 };
932
933 class RGWDataSyncSingleEntryCR : public RGWCoroutine {
934 RGWDataSyncEnv *sync_env;
935
936 string raw_key;
937 string entry_marker;
938
939 rgw_bucket_shard bs;
940
941 int sync_status;
942
943 bufferlist md_bl;
944
945 RGWDataSyncShardMarkerTrack *marker_tracker;
946
947 boost::intrusive_ptr<RGWOmapAppend> error_repo;
948 bool remove_from_repo;
949
950 set<string> keys;
951
952 public:
953 RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
954 const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
955 RGWOmapAppend *_error_repo, bool _remove_from_repo) : RGWCoroutine(_sync_env->cct),
956 sync_env(_sync_env),
957 raw_key(_raw_key), entry_marker(_entry_marker),
958 sync_status(0),
959 marker_tracker(_marker_tracker),
960 error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
961 set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" << _raw_key << " entry=" << entry_marker;
962 }
963
964 int operate() override {
965 reenter(this) {
966 do {
967 yield {
968 int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
969 &bs.bucket, &bs.shard_id);
970 if (ret < 0) {
971 return set_cr_error(-EIO);
972 }
973 if (marker_tracker) {
974 marker_tracker->reset_need_retry(raw_key);
975 }
976 call(new RGWRunBucketSyncCoroutine(sync_env, bs));
977 }
978 } while (marker_tracker && marker_tracker->need_retry(raw_key));
979
980 sync_status = retcode;
981
982 if (sync_status == -ENOENT) {
983 // this was added when 'tenant/' was added to datalog entries, because
984 // preexisting tenant buckets could never sync and would stay in the
985 // error_repo forever
986 ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
987 "for missing bucket " << raw_key << dendl;
988 sync_status = 0;
989 }
990
991 if (sync_status < 0) {
992 // write actual sync failures for 'radosgw-admin sync error list'
993 if (sync_status != -EBUSY && sync_status != -EAGAIN) {
994 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
995 -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
996 if (retcode < 0) {
997 ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
998 }
999 }
1000 if (error_repo && !error_repo->append(raw_key)) {
1001 ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
1002 }
1003 } else if (error_repo && remove_from_repo) {
1004 keys = {raw_key};
1005 yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
1006 if (retcode < 0) {
1007 ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
1008 << error_repo->get_obj() << ") retcode=" << retcode << dendl;
1009 }
1010 }
1011 if (sync_env->observer) {
1012 sync_env->observer->on_bucket_changed(bs.bucket.get_key());
1013 }
1014 /* FIXME: what to do in case of error */
1015 if (marker_tracker && !entry_marker.empty()) {
1016 /* update marker */
1017 yield call(marker_tracker->finish(entry_marker));
1018 }
1019 if (sync_status == 0) {
1020 sync_status = retcode;
1021 }
1022 if (sync_status < 0) {
1023 return set_cr_error(sync_status);
1024 }
1025 return set_cr_done();
1026 }
1027 return 0;
1028 }
1029 };
1030
1031 #define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
1032 #define DATA_SYNC_MAX_ERR_ENTRIES 10
1033
1034 enum RemoteDatalogStatus {
1035 RemoteNotTrimmed = 0,
1036 RemoteTrimmed = 1,
1037 RemoteMightTrimmed = 2
1038 };
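// RemoteMightTrimmed is the provisional state: incremental_sync() below sets
// it when the local sync marker is empty, and only downgrades it to
// RemoteTrimmed if the remote listing also comes back empty, letting the loop
// wait instead of re-polling a trimmed shard.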
1039
1040 class RGWDataSyncShardCR : public RGWCoroutine {
1041 RGWDataSyncEnv *sync_env;
1042
1043 rgw_pool pool;
1044
1045 uint32_t shard_id;
1046 rgw_data_sync_marker sync_marker;
1047
1048 map<string, bufferlist> entries;
1049 map<string, bufferlist>::iterator iter;
1050
1051 string oid;
1052
1053 RGWDataSyncShardMarkerTrack *marker_tracker;
1054
1055 list<rgw_data_change_log_entry> log_entries;
1056 list<rgw_data_change_log_entry>::iterator log_iter;
1057 bool truncated;
1058
1059 RGWDataChangesLogInfo shard_info;
1060 string datalog_marker;
1061
1062 RemoteDatalogStatus remote_trimmed;
1063 Mutex inc_lock;
1064 Cond inc_cond;
1065
1066 boost::asio::coroutine incremental_cr;
1067 boost::asio::coroutine full_cr;
1068
1069
1070 set<string> modified_shards;
1071 set<string> current_modified;
1072
1073 set<string>::iterator modified_iter;
1074
1075 int total_entries;
1076
1077 int spawn_window;
1078
1079 bool *reset_backoff;
1080
1081 set<string> spawned_keys;
1082
1083 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
1084 boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
1085 string status_oid;
1086
1087
1088 string error_oid;
1089 RGWOmapAppend *error_repo;
1090 map<string, bufferlist> error_entries;
1091 string error_marker;
1092 int max_error_entries;
1093
1094 ceph::real_time error_retry_time;
1095
1096 #define RETRY_BACKOFF_SECS_MIN 60
1097 #define RETRY_BACKOFF_SECS_DEFAULT 60
1098 #define RETRY_BACKOFF_SECS_MAX 600
1099 uint32_t retry_backoff_secs;
1100
1101 RGWDataSyncDebugLogger logger;
1102 public:
1103 RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
1104 rgw_pool& _pool,
1105 uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
1106 sync_env(_sync_env),
1107 pool(_pool),
1108 shard_id(_shard_id),
1109 sync_marker(_marker),
1110 marker_tracker(NULL), truncated(false), remote_trimmed(RemoteNotTrimmed), inc_lock("RGWDataSyncShardCR::inc_lock"),
1111 total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(_reset_backoff),
1112 lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
1113 retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
1114 set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
1115 status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
1116 error_oid = status_oid + ".retry";
1117
1118 logger.init(sync_env, "DataShard", status_oid);
1119 }
1120
1121 ~RGWDataSyncShardCR() override {
1122 delete marker_tracker;
1123 if (lease_cr) {
1124 lease_cr->abort();
1125 }
1126 if (error_repo) {
1127 error_repo->put();
1128 }
1129 }
1130
1131 void append_modified_shards(set<string>& keys) {
1132 Mutex::Locker l(inc_lock);
1133 modified_shards.insert(keys.begin(), keys.end());
1134 }
1135
1136 void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
1137 delete marker_tracker;
1138 marker_tracker = mt;
1139 }
1140
1141 int operate() override {
1142 int r;
1143 while (true) {
1144 switch (sync_marker.state) {
1145 case rgw_data_sync_marker::FullSync:
1146 r = full_sync();
1147 if (r < 0) {
1148 ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
1149 return set_cr_error(r);
1150 }
1151 return 0;
1152 case rgw_data_sync_marker::IncrementalSync:
1153 r = incremental_sync();
1154 if (r < 0) {
1155 ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
1156 return set_cr_error(r);
1157 }
1158 return 0;
1159 default:
1160 return set_cr_error(-EIO);
1161 }
1162 }
1163 return 0;
1164 }
1165
1166 void init_lease_cr() {
1167 set_status("acquiring sync lock");
1168 uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
1169 string lock_name = "sync_lock";
1170 if (lease_cr) {
1171 lease_cr->abort();
1172 }
1173 RGWRados *store = sync_env->store;
1174 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
1175 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
1176 lock_name, lock_duration, this));
1177 lease_stack.reset(spawn(lease_cr.get(), false));
1178 }
1179
1180 int full_sync() {
1181 #define OMAP_GET_MAX_ENTRIES 100
1182 int max_entries = OMAP_GET_MAX_ENTRIES;
1183 reenter(&full_cr) {
1184 yield init_lease_cr();
1185 while (!lease_cr->is_locked()) {
1186 if (lease_cr->is_done()) {
1187 ldout(cct, 5) << "lease cr failed, done early " << dendl;
1188 set_status("lease lock failed, early abort");
1189 return set_cr_error(lease_cr->get_ret_status());
1190 }
1191 set_sleeping(true);
1192 yield;
1193 }
1194 logger.log("full sync");
1195 oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
1196 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
1197 total_entries = sync_marker.pos;
1198 do {
1199 yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
1200 if (retcode < 0) {
1201 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
1202 lease_cr->go_down();
1203 drain_all();
1204 return set_cr_error(retcode);
1205 }
1206 iter = entries.begin();
1207 for (; iter != entries.end(); ++iter) {
1208 ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
1209 total_entries++;
1210 if (!marker_tracker->start(iter->first, total_entries, real_time())) {
1211 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
1212 } else {
1213 // fetch remote and write locally
1214 yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
1215 if (retcode < 0) {
1216 lease_cr->go_down();
1217 drain_all();
1218 return set_cr_error(retcode);
1219 }
1220 }
1221 sync_marker.marker = iter->first;
1222 }
1223 } while ((int)entries.size() == max_entries);
1224
1225 lease_cr->go_down();
1226 drain_all();
1227
1228 yield {
1229 /* update marker to reflect we're done with full sync */
1230 sync_marker.state = rgw_data_sync_marker::IncrementalSync;
1231 sync_marker.marker = sync_marker.next_step_marker;
1232 sync_marker.next_step_marker.clear();
1233 RGWRados *store = sync_env->store;
1234 call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
1235 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
1236 sync_marker));
1237 }
1238 if (retcode < 0) {
1239 ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
1240 lease_cr->go_down();
1241 return set_cr_error(retcode);
1242 }
1243 }
1244 return 0;
1245 }
1246
1247 int incremental_sync() {
1248 reenter(&incremental_cr) {
1249 yield init_lease_cr();
1250 while (!lease_cr->is_locked()) {
1251 if (lease_cr->is_done()) {
1252 ldout(cct, 5) << "lease cr failed, done early " << dendl;
1253 set_status("lease lock failed, early abort");
1254 return set_cr_error(lease_cr->get_ret_status());
1255 }
1256 set_sleeping(true);
1257 yield;
1258 }
1259 set_status("lease acquired");
1260 error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
1261 rgw_raw_obj(pool, error_oid),
1262 1 /* no buffer */);
1263 error_repo->get();
1264 spawn(error_repo, false);
1265 logger.log("inc sync");
1266 set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
1267 do {
1268 current_modified.clear();
1269 inc_lock.Lock();
1270 current_modified.swap(modified_shards);
1271 inc_lock.Unlock();
1272
1273 /* process out of band updates */
1274 for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
1275 yield {
1276 ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
1277 spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
1278 }
1279 }
1280
1281 /* process bucket shards that previously failed */
1282 yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
1283 error_marker, &error_entries,
1284 max_error_entries));
1285 ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
1286 iter = error_entries.begin();
1287 for (; iter != error_entries.end(); ++iter) {
1288 ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
1289 spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
1290 error_marker = iter->first;
1291 }
1292 if ((int)error_entries.size() != max_error_entries) {
1293 if (error_marker.empty() && error_entries.empty()) {
1294 /* the retry repo is empty, we back off a bit before calling it again */
1295 retry_backoff_secs *= 2;
1296 if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
1297 retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
1298 }
1299 } else {
1300 retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
1301 }
1302 error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
1303 error_marker.clear();
1304 }
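/* backoff arithmetic for the block above: starting from the 60s default,
 * every pass that finds the retry repo empty doubles retry_backoff_secs
 * (120s, 240s, 480s) up to the 600s cap; a pass that still finds entries
 * resets it to RETRY_BACKOFF_SECS_DEFAULT */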
1305
1306
1307 yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
1308 if (retcode < 0) {
1309 ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
1310 stop_spawned_services();
1311 drain_all();
1312 return set_cr_error(retcode);
1313 }
1314 datalog_marker = shard_info.marker;
1315 remote_trimmed = RemoteNotTrimmed;
1316 #define INCREMENTAL_MAX_ENTRIES 100
1317 ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
1318 if (datalog_marker > sync_marker.marker) {
1319 spawned_keys.clear();
1320 if (sync_marker.marker.empty())
1321 remote_trimmed = RemoteMightTrimmed; // remote data log shard might have been trimmed
1322 yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
1323 if (retcode < 0) {
1324 ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
1325 stop_spawned_services();
1326 drain_all();
1327 return set_cr_error(retcode);
1328 }
1329 if ((remote_trimmed == RemoteMightTrimmed) && sync_marker.marker.empty() && log_entries.empty())
1330 remote_trimmed = RemoteTrimmed;
1331 else
1332 remote_trimmed = RemoteNotTrimmed;
1333 for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
1334 ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
1335 if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
1336 ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
1337 marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
1338 continue;
1339 }
1340 if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
1341 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
1342 } else {
1343 /*
1344 * don't spawn the same key more than once. We can do that as long as we don't yield
1345 */
1346 if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
1347 spawned_keys.insert(log_iter->entry.key);
1348 spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
1349 if (retcode < 0) {
1350 stop_spawned_services();
1351 drain_all();
1352 return set_cr_error(retcode);
1353 }
1354 }
1355 }
1356 }
1357 while ((int)num_spawned() > spawn_window) {
1358 set_status() << "num_spawned() > spawn_window";
1359 yield wait_for_child();
1360 int ret;
1361 while (collect(&ret, lease_stack.get())) {
1362 if (ret < 0) {
1363 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
1364 /* we have reported this error */
1365 }
1366 /* not waiting for child here */
1367 }
1368 }
1369 }
1370 ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
1371 if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
1372 #define INCREMENTAL_INTERVAL 20
1373 yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
1374 }
1375 } while (true);
1376 }
1377 return 0;
1378 }
1379 void stop_spawned_services() {
1380 lease_cr->go_down();
1381 if (error_repo) {
1382 error_repo->finish();
1383 error_repo->put();
1384 error_repo = NULL;
1385 }
1386 }
1387 };
1388
1389 class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
1390 RGWDataSyncEnv *sync_env;
1391
1392 rgw_pool pool;
1393
1394 uint32_t shard_id;
1395 rgw_data_sync_marker sync_marker;
1396
1397 public:
1398 RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
1399 uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct, false),
1400 sync_env(_sync_env),
1401 pool(_pool),
1402 shard_id(_shard_id),
1403 sync_marker(_marker) {
1404 }
1405
1406 RGWCoroutine *alloc_cr() override {
1407 return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
1408 }
1409
1410 RGWCoroutine *alloc_finisher_cr() override {
1411 RGWRados *store = sync_env->store;
1412 return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
1413 rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
1414 &sync_marker);
1415 }
1416
1417 void append_modified_shards(set<string>& keys) {
1418 Mutex::Locker l(cr_lock());
1419
1420 RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
1421 if (!cr) {
1422 return;
1423 }
1424
1425 cr->append_modified_shards(keys);
1426 }
1427 };
1428
1429 class RGWDataSyncCR : public RGWCoroutine {
1430 RGWDataSyncEnv *sync_env;
1431 uint32_t num_shards;
1432
1433 rgw_data_sync_status sync_status;
1434
1435 RGWDataSyncShardMarkerTrack *marker_tracker;
1436
1437 Mutex shard_crs_lock;
1438 map<int, RGWDataSyncShardControlCR *> shard_crs;
1439
1440 bool *reset_backoff;
1441
1442 RGWDataSyncDebugLogger logger;
1443
1444 RGWDataSyncModule *data_sync_module{nullptr};
1445 public:
1446 RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
1447 sync_env(_sync_env),
1448 num_shards(_num_shards),
1449 marker_tracker(NULL),
1450 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
1451 reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
1452
1453 }
1454
1455 ~RGWDataSyncCR() override {
1456 for (auto iter : shard_crs) {
1457 iter.second->put();
1458 }
1459 }
1460
1461 int operate() override {
1462 reenter(this) {
1463
1464 /* read sync status */
1465 yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));
1466
1467 data_sync_module = sync_env->sync_module->get_data_handler();
1468
1469 if (retcode < 0 && retcode != -ENOENT) {
1470 ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
1471 return set_cr_error(retcode);
1472 }
1473
1474 /* state: init status */
1475 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
1476 ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
1477 sync_status.sync_info.num_shards = num_shards;
1478 uint64_t instance_id;
1479 get_random_bytes((char *)&instance_id, sizeof(instance_id));
1480 yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
1481 if (retcode < 0) {
1482 ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
1483 return set_cr_error(retcode);
1484 }
1485 // sets state = StateBuildingFullSyncMaps
1486
1487 *reset_backoff = true;
1488 }
1489
1490 data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
1491
1492 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
1493 /* call sync module init here */
1494 sync_status.sync_info.num_shards = num_shards;
1495 yield call(data_sync_module->init_sync(sync_env));
1496 if (retcode < 0) {
1497 ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
1498 return set_cr_error(retcode);
1499 }
1500 /* state: building full sync maps */
1501 ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
1502 yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
1503 if (retcode < 0) {
1504 ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
1505 return set_cr_error(retcode);
1506 }
1507 sync_status.sync_info.state = rgw_data_sync_info::StateSync;
1508
1509 /* update new state */
1510 yield call(set_sync_info_cr());
1511 if (retcode < 0) {
1512 ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
1513 return set_cr_error(retcode);
1514 }
1515
1516 *reset_backoff = true;
1517 }
1518
1519 yield {
1520 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
1521 for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
1522 iter != sync_status.sync_markers.end(); ++iter) {
1523 RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
1524 iter->first, iter->second);
1525 cr->get();
1526 shard_crs_lock.Lock();
1527 shard_crs[iter->first] = cr;
1528 shard_crs_lock.Unlock();
1529 spawn(cr, true);
1530 }
1531 }
1532 }
1533
1534 return set_cr_done();
1535 }
1536 return 0;
1537 }
1538
1539 RGWCoroutine *set_sync_info_cr() {
1540 RGWRados *store = sync_env->store;
1541 return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
1542 rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
1543 sync_status.sync_info);
1544 }
1545
1546 void wakeup(int shard_id, set<string>& keys) {
1547 Mutex::Locker l(shard_crs_lock);
1548 map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
1549 if (iter == shard_crs.end()) {
1550 return;
1551 }
1552 iter->second->append_modified_shards(keys);
1553 iter->second->wakeup();
1554 }
1555 };
1556
1557 class RGWDefaultDataSyncModule : public RGWDataSyncModule {
1558 public:
1559 RGWDefaultDataSyncModule() {}
1560
1561 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1562 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1563 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1564 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1565 };
1566
1567 class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
1568 RGWDefaultDataSyncModule data_handler;
1569 public:
1570 RGWDefaultSyncModuleInstance() {}
1571 RGWDataSyncModule *get_data_handler() override {
1572 return &data_handler;
1573 }
1574 };
1575
1576 int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
1577 {
1578 instance->reset(new RGWDefaultSyncModuleInstance());
1579 return 0;
1580 }
1581
1582 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1583 {
1584 return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
1585 key, versioned_epoch,
1586 true, zones_trace);
1587 }
1588
1589 RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
1590 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1591 {
1592 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1593 bucket_info, key, versioned, versioned_epoch,
1594 NULL, NULL, false, &mtime, zones_trace);
1595 }
1596
1597 RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1598 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1599 {
1600 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1601 bucket_info, key, versioned, versioned_epoch,
1602 &owner.id, &owner.display_name, true, &mtime, zones_trace);
1603 }
1604
1605 class RGWDataSyncControlCR : public RGWBackoffControlCR
1606 {
1607 RGWDataSyncEnv *sync_env;
1608 uint32_t num_shards;
1609
1610 static constexpr bool exit_on_error = false; // retry on all errors
1611 public:
1612 RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
1613 sync_env(_sync_env), num_shards(_num_shards) {
1614 }
1615
1616 RGWCoroutine *alloc_cr() override {
1617 return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
1618 }
1619
1620 void wakeup(int shard_id, set<string>& keys) {
1621 Mutex& m = cr_lock();
1622
1623 m.Lock();
1624 RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
1625 if (!cr) {
1626 m.Unlock();
1627 return;
1628 }
1629
1630 cr->get();
1631 m.Unlock();
1632
1633 if (cr) {
1634 cr->wakeup(shard_id, keys);
1635 }
1636
1637 cr->put();
1638 }
1639 };
1640
1641 void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
1642 RWLock::RLocker rl(lock);
1643 if (!data_sync_cr) {
1644 return;
1645 }
1646 data_sync_cr->wakeup(shard_id, keys);
1647 }
1648
1649 int RGWRemoteDataLog::run_sync(int num_shards)
1650 {
1651 lock.get_write();
1652 data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
1653 data_sync_cr->get(); // run() will drop a ref, so take another
1654 lock.unlock();
1655
1656 int r = run(data_sync_cr);
1657
1658 lock.get_write();
1659 data_sync_cr->put();
1660 data_sync_cr = NULL;
1661 lock.unlock();
1662
1663 if (r < 0) {
1664 ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
1665 return r;
1666 }
1667 return 0;
1668 }
1669
1670 int RGWDataSyncStatusManager::init()
1671 {
1672 auto zone_def_iter = store->zone_by_id.find(source_zone);
1673 if (zone_def_iter == store->zone_by_id.end()) {
1674 ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
1675 return -EIO;
1676 }
1677
1678 auto& zone_def = zone_def_iter->second;
1679
1680 if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
1681 return -ENOTSUP;
1682 }
1683
1684 RGWZoneParams& zone_params = store->get_zone_params();
1685
1686 sync_module = store->get_sync_module();
1687
1688 conn = store->get_zone_conn_by_id(source_zone);
1689 if (!conn) {
1690 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
1691 return -EINVAL;
1692 }
1693
1694 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
1695
1696 int r = source_log.init(source_zone, conn, error_logger, sync_module);
1697 if (r < 0) {
1698 lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
1699 finalize();
1700 return r;
1701 }
1702
1703 rgw_datalog_info datalog_info;
1704 r = source_log.read_log_info(&datalog_info);
1705 if (r < 0) {
1706 ldout(store->ctx(), 5) << "ERROR: source_log.read_log_info() returned r=" << r << dendl;
1707 finalize();
1708 return r;
1709 }
1710
1711 num_shards = datalog_info.num_shards;
1712
1713 for (int i = 0; i < num_shards; i++) {
1714 shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
1715 }
1716
1717 return 0;
1718 }
1719
1720 void RGWDataSyncStatusManager::finalize()
1721 {
1722 delete error_logger;
1723 error_logger = nullptr;
1724 }
1725
1726 string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
1727 {
1728 char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
1729 snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
1730
1731 return string(buf);
1732 }
1733
1734 string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
1735 {
1736 char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
1737 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);
1738
1739 return string(buf);
1740 }
1741
1742 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
1743 const rgw_bucket& bucket, int shard_id,
1744 RGWSyncErrorLogger *_error_logger,
1745 RGWSyncModuleInstanceRef& _sync_module)
1746 {
1747 conn = _conn;
1748 source_zone = _source_zone;
1749 bs.bucket = bucket;
1750 bs.shard_id = shard_id;
1751
1752 sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
1753 _error_logger, source_zone, _sync_module, nullptr);
1754
1755 return 0;
1756 }
1757
1758 struct bucket_index_marker_info {
1759 string bucket_ver;
1760 string master_ver;
1761 string max_marker;
1762 bool syncstopped{false};
1763
1764 void decode_json(JSONObj *obj) {
1765 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
1766 JSONDecoder::decode_json("master_ver", master_ver, obj);
1767 JSONDecoder::decode_json("max_marker", max_marker, obj);
1768 JSONDecoder::decode_json("syncstopped", syncstopped, obj);
1769 }
1770 };
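/* For reference, a sketch of the JSON payload this struct decodes (the keys
 * match the decode_json() calls above; the values are illustrative, not taken
 * from a real response):
 *
 *   {
 *     "bucket_ver": "1#3",
 *     "master_ver": "1#0",
 *     "max_marker": "00000000003.3.5",
 *     "syncstopped": false
 *   }
 */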
1771
1772 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1773 RGWDataSyncEnv *sync_env;
1774 const string instance_key;
1775
1776 bucket_index_marker_info *info;
1777
1778 public:
1779 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
1780 const rgw_bucket_shard& bs,
1781 bucket_index_marker_info *_info)
1782 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1783 instance_key(bs.get_key()), info(_info) {}
1784
1785 int operate() override {
1786 reenter(this) {
1787 yield {
1788 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
1789 { "bucket-instance", instance_key.c_str() },
1790 { "info" , NULL },
1791 { NULL, NULL } };
1792
1793 string p = "/admin/log/";
1794 call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
1795 }
1796 if (retcode < 0) {
1797 return set_cr_error(retcode);
1798 }
1799 return set_cr_done();
1800 }
1801 return 0;
1802 }
1803 };
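/* With the param pairs it builds, the coroutine above issues a GET against the
 * source zone's admin API that looks roughly like (instance key value
 * hypothetical):
 *
 *   GET /admin/log/?type=bucket-index&bucket-instance=<bucket>:<instance-id>&info
 *
 * and the response body is decoded into bucket_index_marker_info by
 * RGWReadRESTResourceCR.
 */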
1804
1805 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
1806 RGWDataSyncEnv *sync_env;
1807
1808 rgw_bucket_shard bs;
1809 const string sync_status_oid;
1810
1811 rgw_bucket_shard_sync_info& status;
1812
1813 bucket_index_marker_info info;
1814 public:
1815 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1816 const rgw_bucket_shard& bs,
1817 rgw_bucket_shard_sync_info& _status)
1818 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
1819 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1820 status(_status)
1821 {}
1822
1823 int operate() override {
1824 reenter(this) {
1825 /* fetch current position in logs */
1826 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
1827 if (retcode < 0 && retcode != -ENOENT) {
1828 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
1829 return set_cr_error(retcode);
1830 }
1831 yield {
1832 auto store = sync_env->store;
1833 rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid);
1834
1835 if (info.syncstopped) {
1836 call(new RGWRadosRemoveCR(store, obj));
1837 } else {
1838 status.state = rgw_bucket_shard_sync_info::StateFullSync;
1839 status.inc_marker.position = info.max_marker;
1840 map<string, bufferlist> attrs;
1841 status.encode_all_attrs(attrs);
1842 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
1843 }
1844 }
1845 return set_cr_done();
1846 }
1847 return 0;
1848 }
1849 };
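/* Note: capturing info.max_marker into inc_marker.position here, before full
 * sync starts, means any bilog entries written while full sync is running are
 * still replayed by incremental sync afterwards, so no changes are lost in the
 * transition from StateFullSync to StateIncrementalSync.
 */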
1850
1851 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
1852 {
1853 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
1854 }
1855
1856 template <class T>
1857 static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
1858 {
1859 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1860 if (iter == attrs.end()) {
1861 *val = T();
1862 return;
1863 }
1864
1865 bufferlist::iterator biter = iter->second.begin();
1866 try {
1867 ::decode(*val, biter);
1868 } catch (buffer::error& err) {
1869 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
1870 }
1871 }
1872
1873 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
1874 {
1875 decode_attr(cct, attrs, "state", &state);
1876 decode_attr(cct, attrs, "full_marker", &full_marker);
1877 decode_attr(cct, attrs, "inc_marker", &inc_marker);
1878 }
1879
1880 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
1881 {
1882 encode_state_attr(attrs);
1883 full_marker.encode_attr(attrs);
1884 inc_marker.encode_attr(attrs);
1885 }
1886
1887 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
1888 {
1889 ::encode(state, attrs["state"]);
1890 }
1891
1892 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1893 {
1894 ::encode(*this, attrs["full_marker"]);
1895 }
1896
1897 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1898 {
1899 ::encode(*this, attrs["inc_marker"]);
1900 }
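/* Taken together, the helpers above persist the shard sync state as three
 * rados xattrs on the status object. A minimal round-trip sketch (cct assumed
 * in scope; the attribute names are the literal keys used above):
 *
 *   rgw_bucket_shard_sync_info info;
 *   map<string, bufferlist> attrs;
 *   info.encode_all_attrs(attrs);       // fills attrs["state"], attrs["full_marker"], attrs["inc_marker"]
 *
 *   rgw_bucket_shard_sync_info out;
 *   out.decode_from_attrs(cct, attrs);  // decode_attr() default-constructs any missing value
 */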
1901
1902 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
1903 RGWDataSyncEnv *sync_env;
1904 string oid;
1905 rgw_bucket_shard_sync_info *status;
1906
1907 map<string, bufferlist> attrs;
1908 public:
1909 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1910 const rgw_bucket_shard& bs,
1911 rgw_bucket_shard_sync_info *_status)
1912 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1913 oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1914 status(_status) {}
1915 int operate() override;
1916 };
1917
1918 int RGWReadBucketSyncStatusCoroutine::operate()
1919 {
1920 reenter(this) {
1921 yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
1922 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
1923 &attrs));
1924 if (retcode == -ENOENT) {
1925 *status = rgw_bucket_shard_sync_info();
1926 return set_cr_done();
1927 }
1928 if (retcode < 0) {
1929 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket shard sync status, oid=" << oid << " ret=" << retcode << dendl;
1930 return set_cr_error(retcode);
1931 }
1932 status->decode_from_attrs(sync_env->cct, attrs);
1933 return set_cr_done();
1934 }
1935 return 0;
1936 }
1937 
1937 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
1938 {
1939 return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
1940 }
1941
1942 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
1943 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
1944 delete iter->second;
1945 }
1946 delete error_logger;
1947 }
1948
1949
1950 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
1951 {
1952 JSONDecoder::decode_json("ID", id, obj);
1953 JSONDecoder::decode_json("DisplayName", display_name, obj);
1954 }
1955
1956 struct bucket_list_entry {
1957 bool delete_marker;
1958 rgw_obj_key key;
1959 bool is_latest;
1960 real_time mtime;
1961 string etag;
1962 uint64_t size;
1963 string storage_class;
1964 rgw_bucket_entry_owner owner;
1965 uint64_t versioned_epoch;
1966 string rgw_tag;
1967
1968 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
1969
1970 void decode_json(JSONObj *obj) {
1971 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
1972 JSONDecoder::decode_json("Key", key.name, obj);
1973 JSONDecoder::decode_json("VersionId", key.instance, obj);
1974 JSONDecoder::decode_json("IsLatest", is_latest, obj);
1975 string mtime_str;
1976 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
1977
1978 struct tm t;
1979 uint32_t nsec;
1980 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
1981 ceph_timespec ts;
1982 ts.tv_sec = (uint64_t)internal_timegm(&t);
1983 ts.tv_nsec = nsec;
1984 mtime = real_clock::from_ceph_timespec(ts);
1985 }
1986 JSONDecoder::decode_json("ETag", etag, obj);
1987 JSONDecoder::decode_json("Size", size, obj);
1988 JSONDecoder::decode_json("StorageClass", storage_class, obj);
1989 JSONDecoder::decode_json("Owner", owner, obj);
1990 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
1991 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
1992 }
1993 };
1994
1995 struct bucket_list_result {
1996 string name;
1997 string prefix;
1998 string key_marker;
1999 string version_id_marker;
2000 int max_keys;
2001 bool is_truncated;
2002 list<bucket_list_entry> entries;
2003
2004 bucket_list_result() : max_keys(0), is_truncated(false) {}
2005
2006 void decode_json(JSONObj *obj) {
2007 JSONDecoder::decode_json("Name", name, obj);
2008 JSONDecoder::decode_json("Prefix", prefix, obj);
2009 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
2010 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
2011 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
2012 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
2013 JSONDecoder::decode_json("Entries", entries, obj);
2014 }
2015 };
2016
2017 class RGWListBucketShardCR: public RGWCoroutine {
2018 RGWDataSyncEnv *sync_env;
2019 const rgw_bucket_shard& bs;
2020 const string instance_key;
2021 rgw_obj_key marker_position;
2022
2023 bucket_list_result *result;
2024
2025 public:
2026 RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2027 rgw_obj_key& _marker_position, bucket_list_result *_result)
2028 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2029 instance_key(bs.get_key()), marker_position(_marker_position),
2030 result(_result) {}
2031
2032 int operate() override {
2033 reenter(this) {
2034 yield {
2035 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2036 { "versions" , NULL },
2037 { "format" , "json" },
2038 { "objs-container" , "true" },
2039 { "key-marker" , marker_position.name.c_str() },
2040 { "version-id-marker" , marker_position.instance.c_str() },
2041 { NULL, NULL } };
2042 // don't include tenant in the url, it's already part of instance_key
2043 string p = string("/") + bs.bucket.name;
2044 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2045 }
2046 if (retcode < 0) {
2047 return set_cr_error(retcode);
2048 }
2049 return set_cr_done();
2050 }
2051 return 0;
2052 }
2053 };
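/* With the param pairs above, the listing request sent to the source zone is
 * approximately (marker values hypothetical):
 *
 *   GET /<bucket>?versions&format=json&objs-container=true&key-marker=<name>&version-id-marker=<instance>
 *
 * rgwx-bucket-instance is passed so the remote end can resolve the exact
 * bucket instance and shard rather than the bucket name alone.
 */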
2054
2055 class RGWListBucketIndexLogCR: public RGWCoroutine {
2056 RGWDataSyncEnv *sync_env;
2057 const string instance_key;
2058 string marker;
2059
2060 list<rgw_bi_log_entry> *result;
2061
2062 public:
2063 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2064 string& _marker, list<rgw_bi_log_entry> *_result)
2065 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2066 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2067
2068 int operate() override {
2069 reenter(this) {
2070 yield {
2071 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2072 { "format" , "json" },
2073 { "marker" , marker.c_str() },
2074 { "type", "bucket-index" },
2075 { NULL, NULL } };
2076
2077 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2078 }
2079 if (retcode < 0) {
2080 return set_cr_error(retcode);
2081 }
2082 return set_cr_done();
2083 }
2084 return 0;
2085 }
2086 };
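/* Bilog listing is paginated via the marker param: each call fetches entries
 * after 'marker', roughly
 *
 *   GET /admin/log?bucket-instance=<key>&format=json&marker=<id>&type=bucket-index
 *
 * and the incremental sync loop below advances the marker as entries are consumed.
 */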
2087
2088 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2089
2090 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2091 RGWDataSyncEnv *sync_env;
2092
2093 string marker_oid;
2094 rgw_bucket_shard_full_sync_marker sync_marker;
2095
2096 public:
2097 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2098 const string& _marker_oid,
2099 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2100 sync_env(_sync_env),
2101 marker_oid(_marker_oid),
2102 sync_marker(_marker) {}
2103
2104 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2105 sync_marker.position = new_marker;
2106 sync_marker.count = index_pos;
2107
2108 map<string, bufferlist> attrs;
2109 sync_marker.encode_attr(attrs);
2110
2111 RGWRados *store = sync_env->store;
2112
2113 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2114 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2115 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2116 attrs);
2117 }
2118 };
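/* How the tracker above is driven (a rough sketch; see RGWSyncShardMarkerTrack
 * for the real bookkeeping -- the window argument bounds how often
 * store_marker() actually writes to rados):
 *
 *   marker_tracker.start(entry->key, ++total_entries, real_time());  // register in-flight entry
 *   // ... spawn the per-object sync coroutine ...
 *   yield call(marker_tracker->finish(entry_marker));  // on completion; periodically persists
 *                                                      // the highest position whose
 *                                                      // predecessors have all finished
 */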
2119
2120 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2121 RGWDataSyncEnv *sync_env;
2122
2123 string marker_oid;
2124 rgw_bucket_shard_inc_sync_marker sync_marker;
2125
2126 map<rgw_obj_key, string> key_to_marker;
2127 map<string, rgw_obj_key> marker_to_key;
2128
2129 void handle_finish(const string& marker) override {
2130 map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
2131 if (iter == marker_to_key.end()) {
2132 return;
2133 }
2134 key_to_marker.erase(iter->second);
2135 reset_need_retry(iter->second);
2136 marker_to_key.erase(iter);
2137 }
2138
2139 public:
2140 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2141 const string& _marker_oid,
2142 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2143 sync_env(_sync_env),
2144 marker_oid(_marker_oid),
2145 sync_marker(_marker) {}
2146
2147 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2148 sync_marker.position = new_marker;
2149
2150 map<string, bufferlist> attrs;
2151 sync_marker.encode_attr(attrs);
2152
2153 RGWRados *store = sync_env->store;
2154
2155 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2156 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2157 store,
2158 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2159 attrs);
2160 }
2161
2162 /*
2163 * create an index from key -> <op, marker>, and from marker -> key
2164 * this is useful so that we can ensure that we have only one
2165 * in-flight entry for any key. This is needed when doing
2166 * incremental sync of data, since we don't want to run multiple
2167 * concurrent sync operations for the same bucket shard.
2168 * Also, we should make sure that we don't run concurrent operations on the same key with
2169 * different ops.
2170 */
2171 bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
2172 if (key_to_marker.find(key) != key_to_marker.end()) {
2173 set_need_retry(key);
2174 return false;
2175 }
2176 key_to_marker[key] = marker;
2177 marker_to_key[marker] = key;
2178 return true;
2179 }
2180
2181 bool can_do_op(const rgw_obj_key& key) {
2182 return (key_to_marker.find(key) == key_to_marker.end());
2183 }
2184 };
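/* Illustration of the dedup maps above (marker values hypothetical): if bilog
 * entries "00003" and "00007" both touch key "foo", the first call
 * index_key_to_marker("foo", "00003") succeeds, while the second sets
 * need_retry("foo") and returns false, so the caller waits for the in-flight
 * op to finish; handle_finish("00003") then erases both map entries and clears
 * the retry flag, letting "00007" proceed.
 */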
2185
2186 template <class T, class K>
2187 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2188 RGWDataSyncEnv *sync_env;
2189
2190 RGWBucketInfo *bucket_info;
2191 const rgw_bucket_shard& bs;
2192
2193 rgw_obj_key key;
2194 bool versioned;
2195 uint64_t versioned_epoch;
2196 rgw_bucket_entry_owner owner;
2197 real_time timestamp;
2198 RGWModifyOp op;
2199 RGWPendingState op_state;
2200
2201 T entry_marker;
2202 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2203
2204 int sync_status;
2205
2206 stringstream error_ss;
2207
2208 RGWDataSyncDebugLogger logger;
2209
2210 bool error_injection;
2211
2212 RGWDataSyncModule *data_sync_module;
2213
2214 rgw_zone_set zones_trace;
2215
2216 public:
2217 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2218 RGWBucketInfo *_bucket_info,
2219 const rgw_bucket_shard& bs,
2220 const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
2221 real_time& _timestamp,
2222 const rgw_bucket_entry_owner& _owner,
2223 RGWModifyOp _op, RGWPendingState _op_state,
2224 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
2225 sync_env(_sync_env),
2226 bucket_info(_bucket_info), bs(bs),
2227 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2228 owner(_owner),
2229 timestamp(_timestamp), op(_op),
2230 op_state(_op_state),
2231 entry_marker(_entry_marker),
2232 marker_tracker(_marker_tracker),
2233 sync_status(0) {
2234 stringstream ss;
2235 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
2236 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2237 ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
2238 set_status("init");
2239
2240 logger.init(sync_env, "Object", ss.str());
2241
2242 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2243
2244 data_sync_module = sync_env->sync_module->get_data_handler();
2245
2246 zones_trace = _zones_trace;
2247 zones_trace.insert(sync_env->store->get_zone().id);
2248 }
2249
2250 int operate() override {
2251 reenter(this) {
2252 /* skip entries that are not complete */
2253 if (op_state != CLS_RGW_STATE_COMPLETE) {
2254 goto done;
2255 }
2256 do {
2257 yield {
2258 marker_tracker->reset_need_retry(key);
2259 if (key.name.empty()) {
2260 /* shouldn't happen */
2261 set_status("skipping empty entry");
2262 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
2263 goto done;
2264 }
2265 if (error_injection &&
2266 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2267 ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
2268 retcode = -EIO;
2269 } else if (op == CLS_RGW_OP_ADD ||
2270 op == CLS_RGW_OP_LINK_OLH) {
2271 if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
2272 set_status("skipping entry");
2273 ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
2274 goto done;
2275
2276 }
2277 set_status("syncing obj");
2278 ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2279 logger.log("fetch");
2280 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
2281 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2282 set_status("removing obj");
2283 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2284 versioned = true;
2285 }
2286 logger.log("remove");
2287 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace));
2288 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2289 logger.log("creating delete marker");
2290 set_status("creating delete marker");
2291 ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2292 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace));
2293 }
2294 }
2295 } while (marker_tracker->need_retry(key));
2296 {
2297 stringstream ss;
2298 if (retcode >= 0) {
2299 ss << "done";
2300 } else {
2301 ss << "done, retcode=" << retcode;
2302 }
2303 logger.log(ss.str());
2304 }
2305
2306 if (retcode < 0 && retcode != -ENOENT) {
2307 set_status() << "failed to sync obj; retcode=" << retcode;
2308 ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
2309 << bucket_shard_str{bs} << "/" << key.name << dendl;
2310 error_ss << bucket_shard_str{bs} << "/" << key.name;
2311 sync_status = retcode;
2312 }
2313 if (!error_ss.str().empty()) {
2314 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
2315 }
2316 done:
2317 if (sync_status == 0) {
2318 /* update marker */
2319 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2320 yield call(marker_tracker->finish(entry_marker));
2321 sync_status = retcode;
2322 }
2323 if (sync_status < 0) {
2324 return set_cr_error(sync_status);
2325 }
2326 return set_cr_done();
2327 }
2328 return 0;
2329 }
2330 };
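/* Summary of the op dispatch in operate() above:
 *
 *   CLS_RGW_OP_ADD / CLS_RGW_OP_LINK_OLH        -> data_sync_module->sync_object()
 *     (a plain ADD of a specific version is skipped; it syncs on link_olh)
 *   CLS_RGW_OP_DEL / CLS_RGW_OP_UNLINK_INSTANCE -> data_sync_module->remove_object()
 *   CLS_RGW_OP_LINK_OLH_DM                      -> data_sync_module->create_delete_marker()
 *
 * Other ops fall through the yield block without a call; in all non-error
 * paths the entry marker is still completed via marker_tracker->finish()
 * under the done: label.
 */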
2331
2332 #define BUCKET_SYNC_SPAWN_WINDOW 20
2333
2334 class RGWBucketShardFullSyncCR : public RGWCoroutine {
2335 RGWDataSyncEnv *sync_env;
2336 const rgw_bucket_shard& bs;
2337 RGWBucketInfo *bucket_info;
2338 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2339 bucket_list_result list_result;
2340 list<bucket_list_entry>::iterator entries_iter;
2341 rgw_bucket_shard_full_sync_marker& full_marker;
2342 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2343 rgw_obj_key list_marker;
2344 bucket_list_entry *entry{nullptr};
2345 RGWModifyOp op{CLS_RGW_OP_ADD};
2346
2347 int total_entries{0};
2348
2349 int sync_status{0};
2350
2351 const string& status_oid;
2352
2353 RGWDataSyncDebugLogger logger;
2354 rgw_zone_set zones_trace;
2355 public:
2356 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2357 RGWBucketInfo *_bucket_info,
2358 const std::string& status_oid,
2359 RGWContinuousLeaseCR *lease_cr,
2360 rgw_bucket_shard_full_sync_marker& _full_marker)
2361 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2362 bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker),
2363 marker_tracker(sync_env, status_oid, full_marker),
2364 status_oid(status_oid) {
2365 logger.init(sync_env, "BucketFull", bs.get_key());
2366 zones_trace.insert(sync_env->source_zone);
2367 }
2368
2369 int operate() override;
2370 };
2371
2372 int RGWBucketShardFullSyncCR::operate()
2373 {
2374 int ret;
2375 reenter(this) {
2376 list_marker = full_marker.position;
2377
2378 total_entries = full_marker.count;
2379 do {
2380 if (!lease_cr->is_locked()) {
2381 drain_all();
2382 return set_cr_error(-ECANCELED);
2383 }
2384 set_status("listing remote bucket");
2385 ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
2386 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2387 &list_result));
2388 if (retcode < 0 && retcode != -ENOENT) {
2389 set_status("failed bucket listing, going down");
2390 drain_all();
2391 return set_cr_error(retcode);
2392 }
2393 entries_iter = list_result.entries.begin();
2394 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2395 if (!lease_cr->is_locked()) {
2396 drain_all();
2397 return set_cr_error(-ECANCELED);
2398 }
2399 ldout(sync_env->cct, 20) << "[full sync] syncing object: "
2400 << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
2401 entry = &(*entries_iter);
2402 total_entries++;
2403 list_marker = entries_iter->key;
2404 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2405 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
2406 } else {
2407 op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
2408 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2409 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2410 false, /* versioned, only matters for object removal */
2411 entry->versioned_epoch, entry->mtime,
2412 entry->owner, op, CLS_RGW_STATE_COMPLETE,
2413 entry->key, &marker_tracker, zones_trace),
2414 false);
2415 }
2416 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2417 yield wait_for_child();
2418 bool again = true;
2419 while (again) {
2420 again = collect(&ret, nullptr);
2421 if (ret < 0) {
2422 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2423 sync_status = ret;
2424 /* we have reported this error */
2425 }
2426 }
2427 }
2428 }
2429 } while (list_result.is_truncated && sync_status == 0);
2430 set_status("done iterating over all objects");
2431 /* wait for all operations to complete */
2432 while (num_spawned()) {
2433 yield wait_for_child();
2434 bool again = true;
2435 while (again) {
2436 again = collect(&ret, nullptr);
2437 if (ret < 0) {
2438 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2439 sync_status = ret;
2440 /* we have reported this error */
2441 }
2442 }
2443 }
2444 if (!lease_cr->is_locked()) {
2445 return set_cr_error(-ECANCELED);
2446 }
2447 /* update sync state to incremental */
2448 if (sync_status == 0) {
2449 yield {
2450 /* a distinct name, to avoid shadowing the int sync_status member */
2451 rgw_bucket_shard_sync_info sync_info;
2452 sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2453 map<string, bufferlist> attrs;
2454 sync_info.encode_state_attr(attrs);
2454 RGWRados *store = sync_env->store;
2455 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2456 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2457 attrs));
2458 }
2459 } else {
2460 ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2461 }
2462 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2463 ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
2464 << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
2465 return set_cr_error(retcode);
2466 }
2467 if (sync_status < 0) {
2468 return set_cr_error(sync_status);
2469 }
2470 return set_cr_done();
2471 }
2472 return 0;
2473 }
2474
2475 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2476 RGWDataSyncEnv *sync_env;
2477 const rgw_bucket_shard& bs;
2478 RGWBucketInfo *bucket_info;
2479 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2480 list<rgw_bi_log_entry> list_result;
2481 list<rgw_bi_log_entry>::iterator entries_iter;
2482 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
2483 rgw_bucket_shard_inc_sync_marker& inc_marker;
2484 rgw_obj_key key;
2485 rgw_bi_log_entry *entry{nullptr};
2486 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2487 bool updated_status{false};
2488 const string& status_oid;
2489 const string& zone_id;
2490 ceph::real_time sync_modify_time;
2491
2492 string cur_id;
2493
2494 RGWDataSyncDebugLogger logger;
2495
2496 int sync_status{0};
2497 bool syncstopped{false};
2498
2499 public:
2500 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2501 const rgw_bucket_shard& bs,
2502 RGWBucketInfo *_bucket_info,
2503 const std::string& status_oid,
2504 RGWContinuousLeaseCR *lease_cr,
2505 rgw_bucket_shard_inc_sync_marker& _inc_marker)
2506 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2507 bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
2508 marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid), zone_id(_sync_env->store->get_zone().id) {
2509 set_description() << "bucket shard incremental sync bucket="
2510 << bucket_shard_str{bs};
2511 set_status("init");
2512 logger.init(sync_env, "BucketInc", bs.get_key());
2513 }
2514
2515 int operate() override;
2516 };
2517
2518 int RGWBucketShardIncrementalSyncCR::operate()
2519 {
2520 int ret;
2521 reenter(this) {
2522 do {
2523 if (!lease_cr->is_locked()) {
2524 drain_all();
2525 return set_cr_error(-ECANCELED);
2526 }
2527 ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync, position=" << inc_marker.position << dendl;
2528 set_status() << "listing bilog; position=" << inc_marker.position;
2529 yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
2530 &list_result));
2531 if (retcode < 0 && retcode != -ENOENT) {
2532 /* wait for all spawned operations to complete before bailing out */
2533 drain_all();
2534 if (!syncstopped) {
2535 return set_cr_error(retcode);
2536 } else {
2537 /* no need to retry */
2538 break;
2539 }
2540 }
2541 squash_map.clear();
2542 for (auto& e : list_result) {
2543 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) {
2544 ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl;
2545 sync_modify_time = e.timestamp;
2546 syncstopped = true;
2547 continue;
2548 }
2549 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) {
2550 ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl;
2551 sync_modify_time = e.timestamp;
2552 syncstopped = false;
2553 continue;
2554 }
2555 if (e.state != CLS_RGW_STATE_COMPLETE) {
2556 continue;
2557 }
2558 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
2559 continue;
2560 }
2561 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
2562 if (squash_entry.first <= e.timestamp) {
2563 squash_entry = make_pair<>(e.timestamp, e.op);
2564 }
2565 }
2566
2567 entries_iter = list_result.begin();
2568 for (; entries_iter != list_result.end(); ++entries_iter) {
2569 if (!lease_cr->is_locked()) {
2570 drain_all();
2571 return set_cr_error(-ECANCELED);
2572 }
2573 entry = &(*entries_iter);
2574 {
2575 size_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2576 if (p == string::npos) {
2577 cur_id = entry->id;
2578 } else {
2579 cur_id = entry->id.substr(p + 1);
2580 }
2581 }
2582 inc_marker.position = cur_id;
2583
2584 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
2585 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
2586 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2587 continue;
2588 }
2589
2590 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
2591 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
2592 ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
2593 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2594 continue;
2595 }
2596
2597 ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
2598
2599 if (!key.ns.empty()) {
2600 set_status() << "skipping entry in namespace: " << entry->object;
2601 ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
2602 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2603 continue;
2604 }
2605
2606 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
2607 if (entry->op == CLS_RGW_OP_CANCEL) {
2608 set_status() << "canceled operation, skipping";
2609 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2610 << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
2611 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2612 continue;
2613 }
2614 if (entry->state != CLS_RGW_STATE_COMPLETE) {
2615 set_status() << "non-complete operation, skipping";
2616 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2617 << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
2618 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2619 continue;
2620 }
2621 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
2622 set_status() << "redundant operation, skipping";
2623 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2624 << bucket_shard_str{bs} << "/" << key << ": redundant operation" << dendl;
2625 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2626 continue;
2627 }
2628 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
2629 set_status() << "squashed operation, skipping";
2630 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2631 << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
2632 /* not updating high marker though */
2633 continue;
2634 }
2635 ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
2636 << bucket_shard_str{bs} << "/" << key << dendl;
2637 updated_status = false;
2638 while (!marker_tracker.can_do_op(key)) {
2639 if (!updated_status) {
2640 set_status() << "can't do op, conflicting inflight operation";
2641 updated_status = true;
2642 }
2643 ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
2644 yield wait_for_child();
2645 bool again = true;
2646 while (again) {
2647 again = collect(&ret, nullptr);
2648 if (ret < 0) {
2649 ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
2650 sync_status = ret;
2651 /* we have reported this error */
2652 }
2653 }
2654 }
2655 if (!marker_tracker.index_key_to_marker(key, cur_id)) {
2656 set_status() << "can't do op, sync already in progress for object";
2657 ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
2658 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2659 continue;
2660 }
2662 set_status() << "start object sync";
2663 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
2664 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
2665 } else {
2666 uint64_t versioned_epoch = 0;
2667 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
2668 if (entry->ver.pool < 0) {
2669 versioned_epoch = entry->ver.epoch;
2670 }
2671 ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
2672 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
2673 spawn(new SyncCR(sync_env, bucket_info, bs, key,
2674 entry->is_versioned(), versioned_epoch,
2675 entry->timestamp, owner, entry->op, entry->state,
2676 cur_id, &marker_tracker, entry->zones_trace),
2677 false);
2678 }
2680 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2681 set_status() << "num_spawned() > spawn_window";
2682 yield wait_for_child();
2683 bool again = true;
2684 while (again) {
2685 again = collect(&ret, nullptr);
2686 if (ret < 0) {
2687 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2688 sync_status = ret;
2689 /* we have reported this error */
2690 }
2691 /* not waiting for child here */
2692 }
2693 }
2694 }
2695 } while (!list_result.empty() && sync_status == 0);
2696
2697 if (syncstopped) {
2698 drain_all();
2699
2700 yield {
2701 const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
2702 RGWRados *store = sync_env->store;
2703 call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid}));
2704 }
2705 lease_cr->abort();
2706 return set_cr_done();
2707 }
2708
2709 while (num_spawned()) {
2710 yield wait_for_child();
2711 bool again = true;
2712 while (again) {
2713 again = collect(&ret, nullptr);
2714 if (ret < 0) {
2715 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2716 sync_status = ret;
2717 /* we have reported this error */
2718 }
2719 /* not waiting for child here */
2720 }
2721 }
2722
2723 yield call(marker_tracker.flush());
2724 if (retcode < 0) {
2725 ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
2726 return set_cr_error(retcode);
2727 }
2728 if (sync_status < 0) {
2729 ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2730 }
2731
2732 /* wait for all operations to complete */
2733 drain_all();
2734
2735 if (sync_status < 0) {
2736 return set_cr_error(sync_status);
2737 }
2738
2739 return set_cr_done();
2740 }
2741 return 0;
2742 }
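/* Worked example of the squash_map logic above (timestamps hypothetical): if
 * one listing window holds, for the same (object, instance),
 *   t=10 CLS_RGW_OP_ADD, t=15 CLS_RGW_OP_DEL, t=20 CLS_RGW_OP_ADD,
 * only the t=20 ADD matches squash_map and is synced; the earlier entries are
 * skipped as "squashed operation", without advancing the high marker, since
 * the surviving entry has not completed yet.
 */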
2743
2744 int RGWRunBucketSyncCoroutine::operate()
2745 {
2746 reenter(this) {
2747 yield {
2748 set_status("acquiring sync lock");
2749 auto store = sync_env->store;
2750 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
2751 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2752 "sync_lock",
2753 cct->_conf->rgw_sync_lease_period,
2754 this));
2755 lease_stack.reset(spawn(lease_cr.get(), false));
2756 }
2757 while (!lease_cr->is_locked()) {
2758 if (lease_cr->is_done()) {
2759 ldout(cct, 5) << "lease cr failed, done early" << dendl;
2760 set_status("lease lock failed, early abort");
2761 return set_cr_error(lease_cr->get_ret_status());
2762 }
2763 set_sleeping(true);
2764 yield;
2765 }
2766
2767 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
2768 if (retcode < 0 && retcode != -ENOENT) {
2769 ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
2770 << bucket_shard_str{bs} << dendl;
2771 lease_cr->go_down();
2772 drain_all();
2773 return set_cr_error(retcode);
2774 }
2775
2776 ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
2777 << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
2778
2779 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2780 if (retcode == -ENOENT) {
2781 /* bucket instance info has not been synced in yet, fetch it now */
2782 yield {
2783 ldout(sync_env->cct, 10) << "no local info for bucket "
2784 << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
2785 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
2786
2787 meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
2788
2789 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
2790 string() /* no marker */,
2791 MDLOG_STATUS_COMPLETE,
2792 NULL /* no marker tracker */));
2793 }
2794 if (retcode < 0) {
2795 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
2796 lease_cr->go_down();
2797 drain_all();
2798 return set_cr_error(retcode);
2799 }
2800
2801 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2802 }
2803 if (retcode < 0) {
2804 ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
2805 lease_cr->go_down();
2806 drain_all();
2807 return set_cr_error(retcode);
2808 }
2809
2810 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
2811 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
2812 if (retcode < 0) {
2813 ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
2814 << " failed, retcode=" << retcode << dendl;
2815 lease_cr->go_down();
2816 drain_all();
2817 return set_cr_error(retcode);
2818 }
2819 }
2820
2821 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
2822 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
2823 status_oid, lease_cr.get(),
2824 sync_status.full_marker));
2825 if (retcode < 0) {
2826 ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
2827 << " failed, retcode=" << retcode << dendl;
2828 lease_cr->go_down();
2829 drain_all();
2830 return set_cr_error(retcode);
2831 }
2832 sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2833 }
2834
2835 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
2836 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
2837 status_oid, lease_cr.get(),
2838 sync_status.inc_marker));
2839 if (retcode < 0) {
2840 ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
2841 << " failed, retcode=" << retcode << dendl;
2842 lease_cr->go_down();
2843 drain_all();
2844 return set_cr_error(retcode);
2845 }
2846 }
2847
2848 lease_cr->go_down();
2849 drain_all();
2850 return set_cr_done();
2851 }
2852
2853 return 0;
2854 }
2855
2856 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
2857 {
2858 return new RGWRunBucketSyncCoroutine(&sync_env, bs);
2859 }
2860
2861 int RGWBucketSyncStatusManager::init()
2862 {
2863 conn = store->get_zone_conn_by_id(source_zone);
2864 if (!conn) {
2865 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
2866 return -EINVAL;
2867 }
2868
2869 int ret = http_manager.set_threaded();
2870 if (ret < 0) {
2871 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
2872 return ret;
2873 }
2874 
2876 const string key = bucket.get_key();
2877
2878 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
2879 { NULL, NULL } };
2880
2881 string path = string("/admin/metadata/bucket.instance");
2882
2883 bucket_instance_meta_info result;
2884 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
2885 if (ret < 0) {
2886 ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
2887 return ret;
2888 }
2889
2890 RGWBucketInfo& bi = result.data.get_bucket_info();
2891 num_shards = bi.num_shards;
2892
2893 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
2894
2895 sync_module.reset(new RGWDefaultSyncModuleInstance());
2896
2897 int effective_num_shards = (num_shards ? num_shards : 1);
2898
2899 auto async_rados = store->get_async_rados();
2900
2901 for (int i = 0; i < effective_num_shards; i++) {
2902 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
2903 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
2904 if (ret < 0) {
2905 ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
2906 return ret;
2907 }
2908 source_logs[i] = l;
2909 }
2910
2911 return 0;
2912 }
2913
2914 int RGWBucketSyncStatusManager::init_sync_status()
2915 {
2916 list<RGWCoroutinesStack *> stacks;
2917
2918 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2919 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2920 RGWRemoteBucketLog *l = iter->second;
2921 stack->call(l->init_sync_status_cr());
2922
2923 stacks.push_back(stack);
2924 }
2925
2926 return cr_mgr.run(stacks);
2927 }
2928
2929 int RGWBucketSyncStatusManager::read_sync_status()
2930 {
2931 list<RGWCoroutinesStack *> stacks;
2932
2933 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2934 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2935 RGWRemoteBucketLog *l = iter->second;
2936 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
2937
2938 stacks.push_back(stack);
2939 }
2940
2941 int ret = cr_mgr.run(stacks);
2942 if (ret < 0) {
2943 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
2944 << bucket_str{bucket} << dendl;
2945 return ret;
2946 }
2947
2948 return 0;
2949 }
2950
2951 int RGWBucketSyncStatusManager::run()
2952 {
2953 list<RGWCoroutinesStack *> stacks;
2954
2955 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2956 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2957 RGWRemoteBucketLog *l = iter->second;
2958 stack->call(l->run_sync_cr());
2959
2960 stacks.push_back(stack);
2961 }
2962
2963 int ret = cr_mgr.run(stacks);
2964 if (ret < 0) {
2965 ldout(store->ctx(), 0) << "ERROR: failed to run bucket sync for "
2966 << bucket_str{bucket} << dendl;
2967 return ret;
2968 }
2969
2970 return 0;
2971 }
2972
2973 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
2974 const rgw_bucket_shard& bs)
2975 {
2976 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
2977 }
2978
2979 class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
2980 static constexpr int max_concurrent_shards = 16;
2981 RGWRados *const store;
2982 RGWDataSyncEnv *const env;
2983 const int num_shards;
2984 rgw_bucket_shard bs;
2985
2986 using Vector = std::vector<rgw_bucket_shard_sync_info>;
2987 Vector::iterator i, end;
2988
2989 public:
2990 RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
2991 int num_shards, const rgw_bucket& bucket,
2992 Vector *status)
2993 : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
2994 store(store), env(env), num_shards(num_shards),
2995 bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
2996 i(status->begin()), end(status->end())
2997 {}
2998
2999 bool spawn_next() override {
3000 if (i == end) {
3001 return false;
3002 }
3003 spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
3004 ++i;
3005 ++bs.shard_id;
3006 return true;
3007 }
3008 };
3009
3010 int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
3011 const rgw_bucket& bucket,
3012 std::vector<rgw_bucket_shard_sync_info> *status)
3013 {
3014 // read the bucket instance info for num_shards
3015 RGWObjectCtx ctx(store);
3016 RGWBucketInfo info;
3017 int ret = store->get_bucket_instance_info(ctx, bucket, info, nullptr, nullptr);
3018 if (ret < 0) {
3019 return ret;
3020 }
3021 status->clear();
3022 status->resize(std::max<size_t>(1, info.num_shards));
3023
3024 RGWDataSyncEnv env;
3025 RGWSyncModuleInstanceRef module; // null sync module
3026 env.init(store->ctx(), store, nullptr, store->get_async_rados(),
3027 nullptr, nullptr, source_zone, module, nullptr);
3028
3029 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
3030 return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, info.num_shards,
3031 bucket, status));
3032 }
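/* A minimal caller sketch (error handling elided; a valid store, source_zone
 * and bucket are assumed from the surrounding admin code):
 *
 *   std::vector<rgw_bucket_shard_sync_info> status;
 *   int r = rgw_bucket_sync_status(store, source_zone, bucket, &status);
 *   if (r >= 0) {
 *     for (size_t i = 0; i < status.size(); i++) {
 *       ldout(store->ctx(), 20) << "shard " << i << " state=" << status[i].state << dendl;
 *     }
 *   }
 */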
3033
3034
3035 // TODO: move into rgw_data_sync_trim.cc
3036 #undef dout_prefix
3037 #define dout_prefix (*_dout << "data trim: ")
3038
3039 namespace {
3040
3041 /// return the marker up to which it is safe to trim
3042 const std::string& get_stable_marker(const rgw_data_sync_marker& m)
3043 {
3044 return m.state == m.FullSync ? m.next_step_marker : m.marker;
3045 }
3046
3047 /// comparison operator for take_min_markers()
3048 bool operator<(const rgw_data_sync_marker& lhs,
3049 const rgw_data_sync_marker& rhs)
3050 {
3051 // sort by stable marker
3052 return get_stable_marker(lhs) < get_stable_marker(rhs);
3053 }
3054
3055 /// populate the container starting with 'dest' with the minimum stable marker
3056 /// of each shard for all of the peers in [first, last)
3057 template <typename IterIn, typename IterOut>
3058 void take_min_markers(IterIn first, IterIn last, IterOut dest)
3059 {
3060 if (first == last) {
3061 return;
3062 }
3063 // initialize markers with the first peer's
3064 auto m = dest;
3065 for (auto &shard : first->sync_markers) {
3066 *m = std::move(shard.second);
3067 ++m;
3068 }
3069 // for remaining peers, replace with smaller markers
3070 for (auto p = first + 1; p != last; ++p) {
3071 m = dest;
3072 for (auto &shard : p->sync_markers) {
3073 if (shard.second < *m) {
3074 *m = std::move(shard.second);
3075 }
3076 ++m;
3077 }
3078 }
3079 }
3080
3081 } // anonymous namespace
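/* Example of take_min_markers() (marker strings hypothetical): with two peers
 * whose per-shard stable markers are
 *   peer A: shard0="00012", shard1="00040"
 *   peer B: shard0="00015", shard1="00031"
 * the output is shard0="00012", shard1="00031" -- each shard can only be
 * trimmed up to the slowest peer, which is exactly what DataLogTrimCR needs.
 */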
3082
3083 class DataLogTrimCR : public RGWCoroutine {
3084 RGWRados *store;
3085 RGWHTTPManager *http;
3086 const int num_shards;
3087 const std::string& zone_id; ///< my zone id
3088 std::vector<rgw_data_sync_status> peer_status; ///< sync status for each peer
3089 std::vector<rgw_data_sync_marker> min_shard_markers; ///< min marker per shard
3090 std::vector<std::string>& last_trim; ///< last trimmed marker per shard
3091 int ret{0};
3092
3093 public:
3094 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
3095 int num_shards, std::vector<std::string>& last_trim)
3096 : RGWCoroutine(store->ctx()), store(store), http(http),
3097 num_shards(num_shards),
3098 zone_id(store->get_zone().id),
3099 peer_status(store->zone_conn_map.size()),
3100 min_shard_markers(num_shards),
3101 last_trim(last_trim)
3102 {}
3103
3104 int operate() override;
3105 };
3106
3107 int DataLogTrimCR::operate()
3108 {
3109 reenter(this) {
3110 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3111 set_status("fetching sync status");
3112 yield {
3113 // query data sync status from each sync peer
3114 rgw_http_param_pair params[] = {
3115 { "type", "data" },
3116 { "status", nullptr },
3117 { "source-zone", zone_id.c_str() },
3118 { nullptr, nullptr }
3119 };
3120
3121 auto p = peer_status.begin();
3122 for (auto& c : store->zone_conn_map) {
3123 ldout(cct, 20) << "query sync status from " << c.first << dendl;
3124 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3125 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3126 false);
3127 ++p;
3128 }
3129 }
3130
3131 // must get a successful reply from all peers to consider trimming
3132 ret = 0;
3133 while (ret == 0 && num_spawned() > 0) {
3134 yield wait_for_child();
3135 collect_next(&ret);
3136 }
3137 drain_all();
3138
3139 if (ret < 0) {
3140 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3141 return set_cr_error(ret);
3142 }
3143
3144 ldout(cct, 10) << "trimming log shards" << dendl;
3145 set_status("trimming log shards");
3146 yield {
3147 // determine the minimum marker for each shard
3148 take_min_markers(peer_status.begin(), peer_status.end(),
3149 min_shard_markers.begin());
3150
3151 for (int i = 0; i < num_shards; i++) {
3152 const auto& m = min_shard_markers[i];
3153 auto& stable = get_stable_marker(m);
3154 if (stable <= last_trim[i]) {
3155 continue;
3156 }
3157 ldout(cct, 10) << "trimming log shard " << i
3158 << " at marker=" << stable
3159 << " last_trim=" << last_trim[i] << dendl;
3160 using TrimCR = RGWSyncLogTrimCR;
3161 spawn(new TrimCR(store, store->data_log->get_oid(i),
3162 stable, &last_trim[i]),
3163 true);
3164 }
3165 }
3166 return set_cr_done();
3167 }
3168 return 0;
3169 }
3170
3171 class DataLogTrimPollCR : public RGWCoroutine {
3172 RGWRados *store;
3173 RGWHTTPManager *http;
3174 const int num_shards;
3175 const utime_t interval; ///< polling interval
3176 const std::string lock_oid; ///< use first data log shard for lock
3177 const std::string lock_cookie;
3178 std::vector<std::string> last_trim; ///< last trimmed marker per shard
3179
3180 public:
3181 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3182 int num_shards, utime_t interval)
3183 : RGWCoroutine(store->ctx()), store(store), http(http),
3184 num_shards(num_shards), interval(interval),
3185 lock_oid(store->data_log->get_oid(0)),
3186 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3187 last_trim(num_shards)
3188 {}
3189
3190 int operate() override;
3191 };
3192
3193 int DataLogTrimPollCR::operate()
3194 {
3195 reenter(this) {
3196 for (;;) {
3197 set_status("sleeping");
3198 wait(interval);
3199
3200 // request a 'data_trim' lock that covers the entire wait interval to
3201 // prevent other gateways from attempting to trim for the duration
3202 set_status("acquiring trim lock");
3203 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3204 rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
3205 "data_trim", lock_cookie,
3206 interval.sec()));
3207 if (retcode < 0) {
3208 // if the lock is already held, go back to sleep and try again later
3209 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3210 << interval.sec() << "s" << dendl;
3211 continue;
3212 }
3213
3214 set_status("trimming");
3215 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3216
3217 // note that the lock is not released. this is intentional, as it avoids
3218 // duplicating this work in other gateways
3219 }
3220 }
3221 return 0;
3222 }
3223
3224 RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3225 RGWHTTPManager *http,
3226 int num_shards, utime_t interval)
3227 {
3228 return new DataLogTrimPollCR(store, http, num_shards, interval);
3229 }
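/* Wiring sketch (hypothetical caller; http_manager and the interval source are
 * assumptions, not taken from this file): the poll coroutine is handed to a
 * coroutines manager owned by the running gateway, e.g.
 *
 *   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
 *   crs.run(create_data_log_trim_cr(store, &http_manager, num_shards,
 *                                   utime_t(trim_interval_secs, 0)));
 *
 * Since DataLogTrimPollCR loops forever, run() blocks for the life of the
 * trim thread.
 */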