#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_boost_asio_yield.h"
#include "rgw_sync_module.h"

#include "cls/lock/cls_lock_client.h"

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";

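/* These prefixes are combined with the source zone id (and, for per-shard
 * objects, the shard id) by sync_status_oid(), shard_obj_name() and
 * full_data_sync_index_shard_oid() below, producing objects in the zone's
 * log pool named like:
 *
 *   datalog.sync-status.<source-zone>            (overall data sync state)
 *   datalog.sync-status.shard.<source-zone>.<N>  (marker for datalog shard N)
 *   data.full-sync.index.<source-zone>.<N>       (full-sync key index, shard N)
 */
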
class RGWSyncDebugLogger {
  CephContext *cct;
  string prefix;

  bool ended;

public:
  RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
                     const string& sync_type, const string& sync_stage,
                     const string& resource, bool log_start = true) {
    init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
  }
  RGWSyncDebugLogger() : cct(NULL), ended(false) {}
  ~RGWSyncDebugLogger();

  void init(CephContext *_cct, const string& source_zone,
            const string& sync_type, const string& sync_stage,
            const string& resource, bool log_start = true);
  void log(const string& state);
  void finish(int status);
};

void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
                              const string& sync_type, const string& sync_section,
                              const string& resource, bool log_start)
{
  cct = _cct;
  ended = false;
  string zone_str = source_zone.substr(0, 8);
  prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
  if (log_start) {
    log("start");
  }
}

RGWSyncDebugLogger::~RGWSyncDebugLogger()
{
  if (!ended) {
    log("finish");
  }
}

void RGWSyncDebugLogger::log(const string& state)
{
  ldout(cct, 5) << prefix << ":" << state << dendl;
}

void RGWSyncDebugLogger::finish(int status)
{
  ended = true;
  ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
}

class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
public:
  RGWDataSyncDebugLogger() {}
  RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
                         const string& resource, bool log_start = true) {
    init(sync_env, sync_section, resource, log_start);
  }
  void init(RGWDataSyncEnv *sync_env, const string& sync_section,
            const string& resource, bool log_start = true) {
    RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
  }
};

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store,
               rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}
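
/* RGWShardCollectCR drives a bounded fan-out: it keeps calling spawn_next()
 * until it returns false, with at most MAX_CONCURRENT_SHARDS child reads in
 * flight at once. The same collector pattern recurs below in
 * RGWReadRemoteDataLogInfoCR and RGWListRemoteDataLogCR; an implementation
 * only has to keep a cursor (here shard_id) and spawn one child per call. */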

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
                          rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

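/* Note on the coroutine idiom used throughout this file: reenter()/yield come
 * from boost::asio::coroutine (pulled in via rgw_boost_asio_yield.h).
 * operate() is re-entered after every suspension, each yield marks a resume
 * point, and retcode holds the result of the last call()ed child. The shape
 * is always roughly:
 *
 *   reenter(this) {
 *     yield call(new SomeChildCR(...));  // suspend until the child completes
 *     if (retcode < 0) {
 *       return set_cr_error(retcode);    // propagate failure to the caller
 *     }
 *     return set_cr_done();
 *   }
 */
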
class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "info", NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

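/* The request assembled above is simply
 *
 *   GET /admin/log/?type=data&id=<shard-id>&info
 *
 * against the source zone's admin API; the JSON reply is decoded into
 * RGWDataChangesLogInfo (the shard's current marker and last_update). */
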
struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};

class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  string *pmarker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
                              int _shard_id, string *_pmarker,
                              list<rgw_data_change_log_entry> *_entries, bool *_truncated)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      pmarker(_pmarker),
      entries(_entries),
      truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "marker", pmarker->c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(&response);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pmarker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

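/* This is the incremental-listing counterpart, issuing
 *
 *   GET /admin/log/?type=data&id=<shard-id>&marker=<marker>&extra-info=true
 *
 * whose body decodes into read_remote_data_log_response, roughly of the form
 * (field values are illustrative; the endpoint's output is authoritative):
 *
 *   { "marker": "...", "truncated": true, "entries": [ { "key": ... }, ... ] }
 */
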
class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    http_op->set_user_info((void *)stack);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

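/* RGWSimpleCoroutine splits a single round trip into send_request() /
 * request_complete(), as above. The resulting call is
 *
 *   GET /admin/log/?type=data&id=<shard-id>&max-entries=<n>[&marker=<m>]
 *
 * where the marker parameter is omitted on the first sweep (empty key). */
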
class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->get_zone_params().log_pool),
      num_shards(num_shards), status(status) {
    lock_name = "sync_lock";

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};

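/* Initialization sequence, in order: take the sync-status lock, write a fresh
 * rgw_data_sync_info (which recreates the object, hence the second lock call),
 * snapshot each remote datalog shard's current position into that shard's
 * next_step_marker, persist the per-shard markers, then flip the state to
 * StateBuildingFullSyncMaps and unlock. Incremental sync later resumes from
 * next_step_marker, so changes racing with the full sync are not lost. */
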
int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  if (store->is_meta_master()) {
    return 0;
  }

  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::get_shard_info(int shard_id)
{
  char buf[32];
  snprintf(buf, sizeof(buf), "%d", shard_id);

  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { "id", buf },
                                  { "info", NULL },
                                  { NULL, NULL } };

  RGWDataChangesLogInfo info;
  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote datalog, shard_id=" << shard_id << " marker=" << info.marker << dendl;

  return 0;
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}

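/* For shard 2, for example, this yields "data.full-sync.index.<source-zone>.2";
 * these objects hold the omap of bucket-shard keys built by
 * RGWListBucketIndexesCR below and drained by RGWDataSyncShardCR::full_sync(). */
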
struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string> result;
  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->get_zone_params().log_pool,
                                                  oid_prefix);
      yield {
        string entrypoint = string("/admin/metadata/bucket.instance");
        /* FIXME: need a better scaling solution here, requires streaming output */
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                      entrypoint, NULL, &result));
      }
      if (get_ret_status() < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
        return set_state(RGWCoroutine_Error);
      }
      for (iter = result.begin(); iter != result.end(); ++iter) {
        ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;

        key = *iter;

        yield {
          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                          { NULL, NULL } };

          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
        }

        num_shards = meta_info.data.get_bucket_info().num_shards;
        if (num_shards > 0) {
          for (i = 0; i < num_shards; i++) {
            char buf[16];
            snprintf(buf, sizeof(buf), ":%d", i);
            s = key + buf;
            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
          }
        } else {
          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
        }
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                                rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker), true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

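/* Full-sync map building, in outline: list every bucket.instance metadata key
 * on the source zone, fetch each instance to learn its shard count, then
 * append one omap entry per bucket shard ("<key>:<shard>") to the full-sync
 * index object of whichever datalog shard covers it (get_log_shard_id()).
 * The per-shard total_entries counts seed the sync markers written above. */
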
#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * create index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }
};

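/* Usage sketch for the tracker (this is how full_sync() and incremental_sync()
 * below drive it):
 *
 *   marker_tracker->start(key, index_pos, timestamp);  // entry now in flight
 *   ...sync the entry...
 *   yield call(marker_tracker->finish(key));           // may persist marker
 *
 * finish() only advances the stored marker once every earlier entry has also
 * completed, batching writes via DATA_SYNC_UPDATE_MARKER_WINDOW; store_marker()
 * above is the callback that actually writes rgw_data_sync_marker to RADOS. */
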
// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}

class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  RGWDataSyncDebugLogger logger;
  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
    logger.init(sync_env, "Bucket", bs.get_key());
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker,
                           RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" << _raw_key << " entry=" << entry_marker;
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          call(new RGWRunBucketSyncCoroutine(sync_env, bs));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
            "for missing bucket " << raw_key << dendl;
        sync_status = 0;
      }

      if (sync_status < 0) {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                        -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
        }
        if (error_repo && !error_repo->append(raw_key)) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
                                           << error_repo->get_obj() << ") retcode=" << retcode << dendl;
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

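/* Error-handling contract for a single datalog entry, as implemented above:
 * a failed bucket-shard sync is recorded through the error logger and
 * appended to the per-shard error repo (the ".retry" omap object) for later
 * replay; a successful run that was driven from the error repo removes its
 * key again (remove_from_repo). -ENOENT is deliberately treated as success so
 * entries for buckets that no longer resolve don't stay queued forever. */
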
#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  map<string, bufferlist> entries;
  map<string, bufferlist>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  RGWDataChangesLogInfo shard_info;
  string datalog_marker;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  RGWContinuousLeaseCR *lease_cr;
  RGWCoroutinesStack *lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  map<string, bufferlist> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
  uint32_t retry_backoff_secs;

  RGWDataSyncDebugLogger logger;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(_reset_backoff),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";

    logger.init(sync_env, "DataShard", status_oid);
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
      lease_cr->put();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
      lease_cr->put();
    }
    RGWRados *store = sync_env->store;
    lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                        rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                        lock_name, lock_duration, this);
    lease_cr->get();
    lease_stack = spawn(lease_cr, false);
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      logger.log("full sync");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      total_entries = sync_marker.pos;
      do {
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
          total_entries++;
          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
            if (retcode < 0) {
              lease_cr->go_down();
              drain_all();
              return set_cr_error(retcode);
            }
          }
          sync_marker.marker = iter->first;
        }
      } while ((int)entries.size() == max_entries);

      lease_cr->go_down();
      drain_all();

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
        lease_cr->go_down();
        return set_cr_error(retcode);
      }
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      set_status("lease acquired");
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      logger.log("inc sync");
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      do {
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
          }
        }

        /* process bucket shards that previously failed */
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                             error_marker, &error_entries,
                                             max_error_entries));
        ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
        iter = error_entries.begin();
        for (; iter != error_entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
          spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
          error_marker = iter->first;
        }
        if ((int)error_entries.size() != max_error_entries) {
          if (error_marker.empty() && error_entries.empty()) {
            /* the retry repo is empty, we back off a bit before calling it again */
            retry_backoff_secs *= 2;
            if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
              retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
            }
          } else {
            retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
          }
          error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
          error_marker.clear();
        }

        yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }
        datalog_marker = shard_info.marker;
#define INCREMENTAL_MAX_ENTRIES 100
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker > sync_marker.marker) {
          spawned_keys.clear();
          yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
          if (retcode < 0) {
            ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
            stop_spawned_services();
            drain_all();
            return set_cr_error(retcode);
          }
          for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
            ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
            if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
              ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
              marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
              continue;
            }
            if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
            } else {
              /*
               * don't spawn the same key more than once. We can do that as long as we don't yield
               */
              if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
                spawned_keys.insert(log_iter->entry.key);
                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
                if (retcode < 0) {
                  stop_spawned_services();
                  drain_all();
                  return set_cr_error(retcode);
                }
              }
            }
          }
          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack)) {
              if (ret < 0) {
                ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
                /* we have reported this error */
              }
              /* not waiting for child here */
            }
          }
        }
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker == sync_marker.marker) {
#define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
      } while (true);
    }
    return 0;
  }

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};

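/* Shard state machine in brief: full_sync() drains the prebuilt omap index
 * (resuming at sync_marker.marker), then switches the marker to
 * IncrementalSync with next_step_marker as the starting position.
 * incremental_sync() then loops indefinitely, interleaving three inputs per
 * pass: out-of-band modified-shard notifications, the ".retry" error repo
 * (backing off exponentially from RETRY_BACKOFF_SECS_DEFAULT up to
 * RETRY_BACKOFF_SECS_MAX while it stays empty), and the remote datalog
 * itself, with at most BUCKET_SHARD_SYNC_SPAWN_WINDOW bucket-shard syncs in
 * flight at once. */
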
class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

public:
  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker)
    : RGWBackoffControlCR(_sync_env->cct, false),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                          rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(cr_lock());

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};

class RGWDataSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  rgw_data_sync_status sync_status;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  Mutex shard_crs_lock;
  map<int, RGWDataSyncShardControlCR *> shard_crs;

  bool *reset_backoff;

  RGWDataSyncDebugLogger logger;
public:
  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      num_shards(_num_shards),
      marker_tracker(NULL),
      shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
      reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
  }

  int operate() override {
    reenter(this) {

      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

      if (retcode == -ENOENT) {
        sync_status.sync_info.num_shards = num_shards;
      } else if (retcode < 0 && retcode != -ENOENT) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        /* state: building full sync maps */
        ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
                                                                          iter->first, iter->second);
            cr->get();
            shard_crs_lock.Lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.Unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
                                                         rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex::Locker l(shard_crs_lock);
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};

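/* Top-level data sync state machine, driven by RGWDataSyncCR above:
 *
 *   StateInit                 -> RGWInitDataSyncStatusCoroutine
 *   StateBuildingFullSyncMaps -> RGWListBucketIndexesCR
 *   StateSync                 -> one RGWDataSyncShardControlCR per log shard
 *
 * Each shard control CR wraps its RGWDataSyncShardCR in the backoff/retry
 * logic of RGWBackoffControlCR and forwards wakeup() notifications to it. */
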
class RGWDefaultDataSyncModule : public RGWDataSyncModule {
public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
};

int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch)
{
  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
                                 key, versioned_epoch,
                                 true);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime);
}

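/* The three hooks implemented above are the whole data-path surface a sync
 * module provides: sync_object() copies an object from the source zone,
 * remove_object() applies a deletion, and create_delete_marker() replays a
 * versioned-bucket delete marker. The default module maps them onto
 * RGWFetchRemoteObjCR and RGWRemoveObjCR; an alternative sync module can
 * return its own coroutines from these hooks without touching the sync
 * machinery above. */
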
class RGWDataSyncControlCR : public RGWBackoffControlCR
{
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

public:
  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards)
    : RGWBackoffControlCR(_sync_env->cct, true),
      sync_env(_sync_env), num_shards(_num_shards) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex& m = cr_lock();

    m.Lock();
    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
    if (!cr) {
      m.Unlock();
      return;
    }

    cr->get();
    m.Unlock();

    cr->wakeup(shard_id, keys);

    cr->put();
  }
};

void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
  RWLock::RLocker rl(lock);
  if (!data_sync_cr) {
    return;
  }
  data_sync_cr->wakeup(shard_id, keys);
}

int RGWRemoteDataLog::run_sync(int num_shards)
{
  lock.get_write();
  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
  data_sync_cr->get(); // run() will drop a ref, so take another
  lock.unlock();

  int r = run(data_sync_cr);

  lock.get_write();
  data_sync_cr->put();
  data_sync_cr = NULL;
  lock.unlock();

  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
    return r;
  }
  return 0;
}

int RGWDataSyncStatusManager::init()
{
  auto zone_def_iter = store->zone_by_id.find(source_zone);
  if (zone_def_iter == store->zone_by_id.end()) {
    ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
    return -EIO;
  }

  auto& zone_def = zone_def_iter->second;

  if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
    return -ENOTSUP;
  }

  RGWZoneParams& zone_params = store->get_zone_params();

  sync_module = store->get_sync_module();

  conn = store->get_zone_conn_by_id(source_zone);
  if (!conn) {
    ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  int r = source_log.init(source_zone, conn, error_logger, sync_module);
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
    finalize();
    return r;
  }

  rgw_datalog_info datalog_info;
  r = source_log.read_log_info(&datalog_info);
  if (r < 0) {
    ldout(store->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
    finalize();
    return r;
  }

  num_shards = datalog_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
  }

  return 0;
}

void RGWDataSyncStatusManager::finalize()
{
  delete error_logger;
  error_logger = nullptr;
}

string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
{
  char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());

  return string(buf);
}

string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);

  return string(buf);
}

int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                             const rgw_bucket& bucket, int shard_id,
                             RGWSyncErrorLogger *_error_logger,
                             RGWSyncModuleInstanceRef& _sync_module)
{
  conn = _conn;
  source_zone = _source_zone;
  bs.bucket = bucket;
  bs.shard_id = shard_id;

  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);

  return 0;
}

struct bucket_index_marker_info {
  string bucket_ver;
  string master_ver;
  string max_marker;

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
    JSONDecoder::decode_json("master_ver", master_ver, obj);
    JSONDecoder::decode_json("max_marker", max_marker, obj);
  }
};

1745 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1746 RGWDataSyncEnv *sync_env;
1747 const string instance_key;
1748
1749 bucket_index_marker_info *info;
1750
1751 public:
1752 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
1753 const rgw_bucket_shard& bs,
1754 bucket_index_marker_info *_info)
1755 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1756 instance_key(bs.get_key()), info(_info) {}
1757
1758 int operate() override {
1759 reenter(this) {
1760 yield {
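// resulting request (sketch): GET /admin/log/?type=bucket-index&bucket-instance=<instance_key>&info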
1761 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
1762 { "bucket-instance", instance_key.c_str() },
1763 { "info" , NULL },
1764 { NULL, NULL } };
1765
1766 string p = "/admin/log/";
1767 call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
1768 }
1769 if (retcode < 0) {
1770 return set_cr_error(retcode);
1771 }
1772 return set_cr_done();
1773 }
1774 return 0;
1775 }
1776 };
1777
1778 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
1779 RGWDataSyncEnv *sync_env;
1780
1781 rgw_bucket_shard bs;
1782 const string sync_status_oid;
1783
1784 rgw_bucket_shard_sync_info& status;
1785
1786 bucket_index_marker_info info;
1787 public:
1788 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1789 const rgw_bucket_shard& bs,
1790 rgw_bucket_shard_sync_info& _status)
1791 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
1792 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1793 status(_status)
1794 {}
1795
1796 int operate() override {
1797 reenter(this) {
1798 /* fetch current position in logs */
1799 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
1800 if (retcode < 0 && retcode != -ENOENT) {
1801 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
1802 return set_cr_error(retcode);
1803 }
1804 yield {
1805 status.state = rgw_bucket_shard_sync_info::StateFullSync;
1806 status.inc_marker.position = info.max_marker;
1807 map<string, bufferlist> attrs;
1808 status.encode_all_attrs(attrs);
1809 auto store = sync_env->store;
1810 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
1811 rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
1812 attrs));
1813 }
1814 return set_cr_done();
1815 }
1816 return 0;
1817 }
1818 };
1819
1820 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
1821 {
1822 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
1823 }
1824
1825 template <class T>
1826 static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
1827 {
1828 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1829 if (iter == attrs.end()) {
1830 *val = T();
1831 return;
1832 }
1833
1834 bufferlist::iterator biter = iter->second.begin();
1835 try {
1836 ::decode(*val, biter);
1837 } catch (buffer::error& err) {
1838 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
1839 }
1840 }
1841
1842 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
1843 {
1844 decode_attr(cct, attrs, "state", &state);
1845 decode_attr(cct, attrs, "full_marker", &full_marker);
1846 decode_attr(cct, attrs, "inc_marker", &inc_marker);
1847 }
1848
1849 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
1850 {
1851 encode_state_attr(attrs);
1852 full_marker.encode_attr(attrs);
1853 inc_marker.encode_attr(attrs);
1854 }
1855
1856 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
1857 {
1858 ::encode(state, attrs["state"]);
1859 }
1860
1861 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1862 {
1863 ::encode(*this, attrs["full_marker"]);
1864 }
1865
1866 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1867 {
1868 ::encode(*this, attrs["inc_marker"]);
1869 }
1870
1871 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
1872 RGWDataSyncEnv *sync_env;
1873 string oid;
1874 rgw_bucket_shard_sync_info *status;
1875
1876 map<string, bufferlist> attrs;
1877 public:
1878 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1879 const rgw_bucket_shard& bs,
1880 rgw_bucket_shard_sync_info *_status)
1881 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1882 oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1883 status(_status) {}
1884 int operate() override;
1885 };
1886
1887 int RGWReadBucketSyncStatusCoroutine::operate()
1888 {
1889 reenter(this) {
1890 yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
1891 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
1892 &attrs));
1893 if (retcode == -ENOENT) {
1894 *status = rgw_bucket_shard_sync_info();
1895 return set_cr_done();
1896 }
1897 if (retcode < 0) {
1898 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket shard sync status, oid=" << oid << " ret=" << retcode << dendl;
1899 return set_cr_error(retcode);
1900 }
1901 status->decode_from_attrs(sync_env->cct, attrs);
1902 return set_cr_done();
1903 }
1904 return 0;
1905 }
1906 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
1907 {
1908 return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
1909 }
1910
1911 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
1912 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
1913 delete iter->second;
1914 }
1915 delete error_logger;
1916 }
1917
1918
1919 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
1920 {
1921 JSONDecoder::decode_json("ID", id, obj);
1922 JSONDecoder::decode_json("DisplayName", display_name, obj);
1923 }
1924
1925 struct bucket_list_entry {
1926 bool delete_marker;
1927 rgw_obj_key key;
1928 bool is_latest;
1929 real_time mtime;
1930 string etag;
1931 uint64_t size;
1932 string storage_class;
1933 rgw_bucket_entry_owner owner;
1934 uint64_t versioned_epoch;
1935 string rgw_tag;
1936
1937 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
1938
1939 void decode_json(JSONObj *obj) {
1940 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
1941 JSONDecoder::decode_json("Key", key.name, obj);
1942 JSONDecoder::decode_json("VersionId", key.instance, obj);
1943 JSONDecoder::decode_json("IsLatest", is_latest, obj);
1944 string mtime_str;
1945 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
1946
1947 struct tm t;
1948 uint32_t nsec;
1949 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
1950 ceph_timespec ts;
1951 ts.tv_sec = (uint64_t)internal_timegm(&t);
1952 ts.tv_nsec = nsec;
1953 mtime = real_clock::from_ceph_timespec(ts);
1954 }
1955 JSONDecoder::decode_json("ETag", etag, obj);
1956 JSONDecoder::decode_json("Size", size, obj);
1957 JSONDecoder::decode_json("StorageClass", storage_class, obj);
1958 JSONDecoder::decode_json("Owner", owner, obj);
1959 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
1960 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
1961 }
1962 };
1963
1964 struct bucket_list_result {
1965 string name;
1966 string prefix;
1967 string key_marker;
1968 string version_id_marker;
1969 int max_keys;
1970 bool is_truncated;
1971 list<bucket_list_entry> entries;
1972
1973 bucket_list_result() : max_keys(0), is_truncated(false) {}
1974
1975 void decode_json(JSONObj *obj) {
1976 JSONDecoder::decode_json("Name", name, obj);
1977 JSONDecoder::decode_json("Prefix", prefix, obj);
1978 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
1979 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
1980 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
1981 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
1982 JSONDecoder::decode_json("Entries", entries, obj);
1983 }
1984 };
1985
1986 class RGWListBucketShardCR: public RGWCoroutine {
1987 RGWDataSyncEnv *sync_env;
1988 const rgw_bucket_shard& bs;
1989 const string instance_key;
1990 rgw_obj_key marker_position;
1991
1992 bucket_list_result *result;
1993
1994 public:
1995 RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
1996 rgw_obj_key& _marker_position, bucket_list_result *_result)
1997 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
1998 instance_key(bs.get_key()), marker_position(_marker_position),
1999 result(_result) {}
2000
2001 int operate() override {
2002 reenter(this) {
2003 yield {
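// roughly: GET /<bucket>?rgwx-bucket-instance=<key>&versions&format=json
//          &objs-container=true&key-marker=...&version-id-marker=...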
2004 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2005 { "versions" , NULL },
2006 { "format" , "json" },
2007 { "objs-container" , "true" },
2008 { "key-marker" , marker_position.name.c_str() },
2009 { "version-id-marker" , marker_position.instance.c_str() },
2010 { NULL, NULL } };
2011 // don't include tenant in the url, it's already part of instance_key
2012 string p = string("/") + bs.bucket.name;
2013 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2014 }
2015 if (retcode < 0) {
2016 return set_cr_error(retcode);
2017 }
2018 return set_cr_done();
2019 }
2020 return 0;
2021 }
2022 };
2023
2024 class RGWListBucketIndexLogCR: public RGWCoroutine {
2025 RGWDataSyncEnv *sync_env;
2026 const string instance_key;
2027 string marker;
2028
2029 list<rgw_bi_log_entry> *result;
2030
2031 public:
2032 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2033 string& _marker, list<rgw_bi_log_entry> *_result)
2034 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2035 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2036
2037 int operate() override {
2038 reenter(this) {
2039 yield {
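// roughly: GET /admin/log?type=bucket-index&bucket-instance=<key>&format=json&marker=<marker>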
2040 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2041 { "format" , "json" },
2042 { "marker" , marker.c_str() },
2043 { "type", "bucket-index" },
2044 { NULL, NULL } };
2045
2046 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2047 }
2048 if (retcode < 0) {
2049 return set_cr_error(retcode);
2050 }
2051 return set_cr_done();
2052 }
2053 return 0;
2054 }
2055 };
2056
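// marker-tracker window (see RGWSyncShardMarkerTrack): up to this many completed
// entries may accumulate before the shard marker is flushed to the status object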
2057 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2058
2059 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2060 RGWDataSyncEnv *sync_env;
2061
2062 string marker_oid;
2063 rgw_bucket_shard_full_sync_marker sync_marker;
2064
2065 public:
2066 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2067 const string& _marker_oid,
2068 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2069 sync_env(_sync_env),
2070 marker_oid(_marker_oid),
2071 sync_marker(_marker) {}
2072
2073 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2074 sync_marker.position = new_marker;
2075 sync_marker.count = index_pos;
2076
2077 map<string, bufferlist> attrs;
2078 sync_marker.encode_attr(attrs);
2079
2080 RGWRados *store = sync_env->store;
2081
2082 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2083 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2084 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2085 attrs);
2086 }
2087 };
2088
2089 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2090 RGWDataSyncEnv *sync_env;
2091
2092 string marker_oid;
2093 rgw_bucket_shard_inc_sync_marker sync_marker;
2094
2095 map<rgw_obj_key, string> key_to_marker;
2096 map<string, rgw_obj_key> marker_to_key;
2097
2098 void handle_finish(const string& marker) override {
2099 map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
2100 if (iter == marker_to_key.end()) {
2101 return;
2102 }
2103 key_to_marker.erase(iter->second);
2104 reset_need_retry(iter->second);
2105 marker_to_key.erase(iter);
2106 }
2107
2108 public:
2109 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2110 const string& _marker_oid,
2111 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2112 sync_env(_sync_env),
2113 marker_oid(_marker_oid),
2114 sync_marker(_marker) {}
2115
2116 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2117 sync_marker.position = new_marker;
2118
2119 map<string, bufferlist> attrs;
2120 sync_marker.encode_attr(attrs);
2121
2122 RGWRados *store = sync_env->store;
2123
2124 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2125 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2126 store,
2127 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2128 attrs);
2129 }
2130
2131 /*
2132 * Create an index from key -> marker, and from marker -> key. This lets us
2133 * ensure that there is only a single entry for any key in use, which is
2134 * needed when doing incremental data sync: we don't want to run multiple
2135 * concurrent sync operations for the same bucket shard, and we must not run
2136 * concurrent operations on the same key with different ops.
2137 */
2140 bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
2141 if (key_to_marker.find(key) != key_to_marker.end()) {
2142 set_need_retry(key);
2143 return false;
2144 }
2145 key_to_marker[key] = marker;
2146 marker_to_key[marker] = key;
2147 return true;
2148 }
2149
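/* true if no sync operation is already in flight for this key */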
2150 bool can_do_op(const rgw_obj_key& key) {
2151 return (key_to_marker.find(key) == key_to_marker.end());
2152 }
2153 };
2154
2155 template <class T, class K>
2156 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2157 RGWDataSyncEnv *sync_env;
2158
2159 RGWBucketInfo *bucket_info;
2160 const rgw_bucket_shard& bs;
2161
2162 rgw_obj_key key;
2163 bool versioned;
2164 uint64_t versioned_epoch;
2165 rgw_bucket_entry_owner owner;
2166 real_time timestamp;
2167 RGWModifyOp op;
2168 RGWPendingState op_state;
2169
2170 T entry_marker;
2171 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2172
2173 int sync_status;
2174
2175 stringstream error_ss;
2176
2177 RGWDataSyncDebugLogger logger;
2178
2179 bool error_injection;
2180
2181 RGWDataSyncModule *data_sync_module;
2182
2183 public:
2184 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2185 RGWBucketInfo *_bucket_info,
2186 const rgw_bucket_shard& bs,
2187 const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
2188 real_time& _timestamp,
2189 const rgw_bucket_entry_owner& _owner,
2190 RGWModifyOp _op, RGWPendingState _op_state,
2191 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_sync_env->cct),
2192 sync_env(_sync_env),
2193 bucket_info(_bucket_info), bs(bs),
2194 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2195 owner(_owner),
2196 timestamp(_timestamp), op(_op),
2197 op_state(_op_state),
2198 entry_marker(_entry_marker),
2199 marker_tracker(_marker_tracker),
2200 sync_status(0) {
2201 stringstream ss;
2202 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
2203 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2204 ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
2205 set_status("init");
2206
2207 logger.init(sync_env, "Object", ss.str());
2208
2209 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2210
2211 data_sync_module = sync_env->sync_module->get_data_handler();
2212 }
2213
2214 int operate() override {
2215 reenter(this) {
2216 /* skip entries that are not complete */
2217 if (op_state != CLS_RGW_STATE_COMPLETE) {
2218 goto done;
2219 }
2220 do {
2221 yield {
2222 marker_tracker->reset_need_retry(key);
2223 if (key.name.empty()) {
2224 /* shouldn't happen */
2225 set_status("skipping empty entry");
2226 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
2227 goto done;
2228 }
2229 if (error_injection &&
2230 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2231 ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
2232 retcode = -EIO;
2233 } else if (op == CLS_RGW_OP_ADD ||
2234 op == CLS_RGW_OP_LINK_OLH) {
2235 if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
2236 set_status("skipping entry");
2237 ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
2238 goto done;
2239
2240 }
2241 set_status("syncing obj");
2242 ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2243 logger.log("fetch");
2244 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch));
2245 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2246 set_status("removing obj");
2247 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2248 versioned = true;
2249 }
2250 logger.log("remove");
2251 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch));
2252 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2253 logger.log("creating delete marker");
2254 set_status("creating delete marker");
2255 ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2256 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch));
2257 }
2258 }
2259 } while (marker_tracker->need_retry(key));
2260 {
2261 stringstream ss;
2262 if (retcode >= 0) {
2263 ss << "done";
2264 } else {
2265 ss << "done, retcode=" << retcode;
2266 }
2267 logger.log(ss.str());
2268 }
2269
2270 if (retcode < 0 && retcode != -ENOENT) {
2271 set_status() << "failed to sync obj; retcode=" << retcode;
2272 ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
2273 << bucket_shard_str{bs} << "/" << key.name << dendl;
2274 error_ss << bucket_shard_str{bs} << "/" << key.name;
2275 sync_status = retcode;
2276 }
2277 if (!error_ss.str().empty()) {
2278 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
2279 }
2280 done:
2281 if (sync_status == 0) {
2282 /* update marker */
2283 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2284 yield call(marker_tracker->finish(entry_marker));
2285 sync_status = retcode;
2286 }
2287 if (sync_status < 0) {
2288 return set_cr_error(sync_status);
2289 }
2290 return set_cr_done();
2291 }
2292 return 0;
2293 }
2294 };
2295
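// cap on concurrently spawned per-entry sync coroutines; once exceeded, the
// sync CRs below block in wait_for_child()/collect() until a child finishes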
2296 #define BUCKET_SYNC_SPAWN_WINDOW 20
2297
2298 class RGWBucketShardFullSyncCR : public RGWCoroutine {
2299 RGWDataSyncEnv *sync_env;
2300 const rgw_bucket_shard& bs;
2301 RGWBucketInfo *bucket_info;
2302 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2303 bucket_list_result list_result;
2304 list<bucket_list_entry>::iterator entries_iter;
2305 rgw_bucket_shard_full_sync_marker& full_marker;
2306 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2307 rgw_obj_key list_marker;
2308 bucket_list_entry *entry{nullptr};
2309 RGWModifyOp op{CLS_RGW_OP_ADD};
2310
2311 int total_entries{0};
2312
2313 int sync_status{0};
2314
2315 const string& status_oid;
2316
2317 RGWDataSyncDebugLogger logger;
2318 public:
2319 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2320 RGWBucketInfo *_bucket_info,
2321 const std::string& status_oid,
2322 RGWContinuousLeaseCR *lease_cr,
2323 rgw_bucket_shard_full_sync_marker& _full_marker)
2324 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2325 bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker),
2326 marker_tracker(sync_env, status_oid, full_marker),
2327 status_oid(status_oid) {
2328 logger.init(sync_env, "BucketFull", bs.get_key());
2329 }
2330
2331 int operate() override;
2332 };
2333
2334 int RGWBucketShardFullSyncCR::operate()
2335 {
2336 int ret;
2337 reenter(this) {
2338 list_marker = full_marker.position;
2339
2340 total_entries = full_marker.count;
2341 do {
2342 if (!lease_cr->is_locked()) {
2343 drain_all();
2344 return set_cr_error(-ECANCELED);
2345 }
2346 set_status("listing remote bucket");
2347 ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
2348 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2349 &list_result));
2350 if (retcode < 0 && retcode != -ENOENT) {
2351 set_status("failed bucket listing, going down");
2352 drain_all();
2353 return set_cr_error(retcode);
2354 }
2355 entries_iter = list_result.entries.begin();
2356 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2357 if (!lease_cr->is_locked()) {
2358 drain_all();
2359 return set_cr_error(-ECANCELED);
2360 }
2361 ldout(sync_env->cct, 20) << "[full sync] syncing object: "
2362 << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
2363 entry = &(*entries_iter);
2364 total_entries++;
2365 list_marker = entries_iter->key;
2366 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2367 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
2368 } else {
2369 op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
2370
2371 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2372 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2373 false, /* versioned, only matters for object removal */
2374 entry->versioned_epoch, entry->mtime,
2375 entry->owner, op, CLS_RGW_STATE_COMPLETE,
2376 entry->key, &marker_tracker),
2377 false);
2378 }
2379 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2380 yield wait_for_child();
2381 bool again = true;
2382 while (again) {
2383 again = collect(&ret, nullptr);
2384 if (ret < 0) {
2385 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2386 sync_status = ret;
2387 /* we have reported this error */
2388 }
2389 }
2390 }
2391 }
2392 } while (list_result.is_truncated && sync_status == 0);
2393 set_status("done iterating over all objects");
2394 /* wait for all operations to complete */
2395 while (num_spawned()) {
2396 yield wait_for_child();
2397 bool again = true;
2398 while (again) {
2399 again = collect(&ret, nullptr);
2400 if (ret < 0) {
2401 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2402 sync_status = ret;
2403 /* we have reported this error */
2404 }
2405 }
2406 }
2407 if (!lease_cr->is_locked()) {
2408 return set_cr_error(-ECANCELED);
2409 }
2410 /* update sync state to incremental */
2411 if (sync_status == 0) {
2412 yield {
2413 rgw_bucket_shard_sync_info sync_info; /* renamed so it doesn't shadow the int sync_status member */
2414 sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2415 map<string, bufferlist> attrs;
2416 sync_info.encode_state_attr(attrs);
2417 RGWRados *store = sync_env->store;
2418 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2419 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2420 attrs));
2421 }
2422 } else {
2423 ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2424 }
2425 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2426 ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
2427 << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
2428 return set_cr_error(retcode);
2429 }
2430 if (sync_status < 0) {
2431 return set_cr_error(sync_status);
2432 }
2433 return set_cr_done();
2434 }
2435 return 0;
2436 }
2437
2438 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2439 RGWDataSyncEnv *sync_env;
2440 const rgw_bucket_shard& bs;
2441 RGWBucketInfo *bucket_info;
2442 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2443 list<rgw_bi_log_entry> list_result;
2444 list<rgw_bi_log_entry>::iterator entries_iter;
2445 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
2446 rgw_bucket_shard_inc_sync_marker& inc_marker;
2447 rgw_obj_key key;
2448 rgw_bi_log_entry *entry{nullptr};
2449 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2450 bool updated_status{false};
2451 const string& status_oid;
2452
2453 string cur_id;
2454
2455 RGWDataSyncDebugLogger logger;
2456
2457 int sync_status{0};
2458
2459 public:
2460 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2461 const rgw_bucket_shard& bs,
2462 RGWBucketInfo *_bucket_info,
2463 const std::string& status_oid,
2464 RGWContinuousLeaseCR *lease_cr,
2465 rgw_bucket_shard_inc_sync_marker& _inc_marker)
2466 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2467 bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
2468 marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid) {
2469 set_description() << "bucket shard incremental sync bucket="
2470 << bucket_shard_str{bs};
2471 set_status("init");
2472 logger.init(sync_env, "BucketInc", bs.get_key());
2473 }
2474
2475 int operate() override;
2476 };
2477
2478 int RGWBucketShardIncrementalSyncCR::operate()
2479 {
2480 int ret;
2481 reenter(this) {
2482 do {
2483 if (!lease_cr->is_locked()) {
2484 drain_all();
2485 return set_cr_error(-ECANCELED);
2486 }
2487 ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
2488 set_status() << "listing bilog; position=" << inc_marker.position;
2489 yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
2490 &list_result));
2491 if (retcode < 0 && retcode != -ENOENT) {
2492 /* wait for all operations to complete */
2493 drain_all();
2494 return set_cr_error(retcode);
2495 }
2496 squash_map.clear();
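// keep only the newest completed op per (object, instance); older entries
// for the same key are treated as squashed and skipped further below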
2497 for (auto& e : list_result) {
2498 if (e.state != CLS_RGW_STATE_COMPLETE) {
2499 continue;
2500 }
2501 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
2502 if (squash_entry.first <= e.timestamp) {
2503 squash_entry = make_pair<>(e.timestamp, e.op);
2504 }
2505 }
2506 entries_iter = list_result.begin();
2507 for (; entries_iter != list_result.end(); ++entries_iter) {
2508 if (!lease_cr->is_locked()) {
2509 drain_all();
2510 return set_cr_error(-ECANCELED);
2511 }
2512 entry = &(*entries_iter);
2513 {
2514 size_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2515 if (p == string::npos) {
2516 cur_id = entry->id;
2517 } else {
2518 cur_id = entry->id.substr(p + 1);
2519 }
2520 }
2521 inc_marker.position = cur_id;
2522
2523 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
2524 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
2525 ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
2526 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2527 continue;
2528 }
2529
2530 ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
2531
2532 if (!key.ns.empty()) {
2533 set_status() << "skipping entry in namespace: " << entry->object;
2534 ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
2535 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2536 continue;
2537 }
2538
2539 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
2540 if (entry->op == CLS_RGW_OP_CANCEL) {
2541 set_status() << "canceled operation, skipping";
2542 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2543 << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
2544 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2545 continue;
2546 }
2547 if (entry->state != CLS_RGW_STATE_COMPLETE) {
2548 set_status() << "non-complete operation, skipping";
2549 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2550 << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
2551 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2552 continue;
2553 }
2554 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
2555 set_status() << "squashed operation, skipping";
2556 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2557 << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
2558 /* not updating high marker though */
2559 continue;
2560 }
2561 ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
2562 << bucket_shard_str{bs} << "/" << key << dendl;
2563 updated_status = false;
2564 while (!marker_tracker.can_do_op(key)) {
2565 if (!updated_status) {
2566 set_status() << "can't do op, conflicting inflight operation";
2567 updated_status = true;
2568 }
2569 ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
2570 yield wait_for_child();
2571 bool again = true;
2572 while (again) {
2573 again = collect(&ret, nullptr);
2574 if (ret < 0) {
2575 ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
2576 sync_status = ret;
2577 /* we have reported this error */
2578 }
2579 }
2580 }
2581 if (!marker_tracker.index_key_to_marker(key, cur_id)) {
2582 set_status() << "can't do op, sync already in progress for object";
2583 ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
2584 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2585 continue;
2586 }
2588 set_status() << "start object sync";
2589 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
2590 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
2591 } else {
2592 uint64_t versioned_epoch = 0;
2593 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
2594 if (entry->ver.pool < 0) {
2595 versioned_epoch = entry->ver.epoch;
2596 }
2597 ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
2598 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
2599 spawn(new SyncCR(sync_env, bucket_info, bs, key,
2600 entry->is_versioned(), versioned_epoch,
2601 entry->timestamp, owner, entry->op, entry->state,
2602 cur_id, &marker_tracker),
2603 false);
2604 }
2606 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2607 set_status() << "num_spawned() > spawn_window";
2608 yield wait_for_child();
2609 bool again = true;
2610 while (again) {
2611 again = collect(&ret, nullptr);
2612 if (ret < 0) {
2613 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2614 sync_status = ret;
2615 /* we have reported this error */
2616 }
2617 /* not waiting for child here */
2618 }
2619 }
2620 }
2621 } while (!list_result.empty() && sync_status == 0);
2622
2623 while (num_spawned()) {
2624 yield wait_for_child();
2625 bool again = true;
2626 while (again) {
2627 again = collect(&ret, nullptr);
2628 if (ret < 0) {
2629 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2630 sync_status = ret;
2631 /* we have reported this error */
2632 }
2633 /* not waiting for child here */
2634 }
2635 }
2636
2637 yield call(marker_tracker.flush());
2638 if (retcode < 0) {
2639 ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
2640 return set_cr_error(retcode);
2641 }
2642 if (sync_status < 0) {
2643 ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2644 }
2645
2646 /* wait for all operations to complete */
2647 drain_all();
2648
2649 if (sync_status < 0) {
2650 return set_cr_error(sync_status);
2651 }
2652
2653 return set_cr_done();
2654 }
2655 return 0;
2656 }
2657
2658 int RGWRunBucketSyncCoroutine::operate()
2659 {
2660 reenter(this) {
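/* overall flow: acquire the sync lease, read the shard sync status, fetch
 * bucket instance info (pulling its metadata from the master zone if it
 * hasn't been synced yet), then run init -> full sync -> incremental sync */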
2661 yield {
2662 set_status("acquiring sync lock");
2663 auto store = sync_env->store;
2664 lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store,
2665 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2666 "sync_lock",
2667 cct->_conf->rgw_sync_lease_period,
2668 this);
2669 lease_stack = spawn(lease_cr.get(), false);
2670 }
2671 while (!lease_cr->is_locked()) {
2672 if (lease_cr->is_done()) {
2673 ldout(cct, 5) << "lease cr failed, done early" << dendl;
2674 set_status("lease lock failed, early abort");
2675 return set_cr_error(lease_cr->get_ret_status());
2676 }
2677 set_sleeping(true);
2678 yield;
2679 }
2680
2681 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
2682 if (retcode < 0 && retcode != -ENOENT) {
2683 ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
2684 << bucket_shard_str{bs} << dendl;
2685 lease_cr->go_down();
2686 drain_all();
2687 return set_cr_error(retcode);
2688 }
2689
2690 ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
2691 << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
2692
2693 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2694 if (retcode == -ENOENT) {
2695 /* bucket instance info has not been synced in yet, fetch it now */
2696 yield {
2697 ldout(sync_env->cct, 10) << "no local info for bucket "
2698 << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
2699 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
2700
2701 meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
2702
2703 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
2704 string() /* no marker */,
2705 MDLOG_STATUS_COMPLETE,
2706 NULL /* no marker tracker */));
2707 }
2708 if (retcode < 0) {
2709 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
2710 lease_cr->go_down();
2711 drain_all();
2712 return set_cr_error(retcode);
2713 }
2714
2715 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2716 }
2717 if (retcode < 0) {
2718 ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
2719 lease_cr->go_down();
2720 drain_all();
2721 return set_cr_error(retcode);
2722 }
2723
2724 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
2725 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
2726 if (retcode < 0) {
2727 ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
2728 << " failed, retcode=" << retcode << dendl;
2729 lease_cr->go_down();
2730 drain_all();
2731 return set_cr_error(retcode);
2732 }
2733 }
2734
2735 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
2736 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
2737 status_oid, lease_cr.get(),
2738 sync_status.full_marker));
2739 if (retcode < 0) {
2740 ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
2741 << " failed, retcode=" << retcode << dendl;
2742 lease_cr->go_down();
2743 drain_all();
2744 return set_cr_error(retcode);
2745 }
2746 sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2747 }
2748
2749 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
2750 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
2751 status_oid, lease_cr.get(),
2752 sync_status.inc_marker));
2753 if (retcode < 0) {
2754 ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
2755 << " failed, retcode=" << retcode << dendl;
2756 lease_cr->go_down();
2757 drain_all();
2758 return set_cr_error(retcode);
2759 }
2760 }
2761
2762 lease_cr->go_down();
2763 drain_all();
2764 return set_cr_done();
2765 }
2766
2767 return 0;
2768 }
2769
2770 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
2771 {
2772 return new RGWRunBucketSyncCoroutine(&sync_env, bs);
2773 }
2774
2775 int RGWBucketSyncStatusManager::init()
2776 {
2777 conn = store->get_zone_conn_by_id(source_zone);
2778 if (!conn) {
2779 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
2780 return -EINVAL;
2781 }
2782
2783 int ret = http_manager.set_threaded();
2784 if (ret < 0) {
2785 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
2786 return ret;
2787 }
2788
2789
2790 const string key = bucket.get_key();
2791
2792 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
2793 { NULL, NULL } };
2794
2795 string path = string("/admin/metadata/bucket.instance");
2796
2797 bucket_instance_meta_info result;
2798 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
2799 if (ret < 0) {
2800 ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
2801 return ret;
2802 }
2803
2804 RGWBucketInfo& bi = result.data.get_bucket_info();
2805 num_shards = bi.num_shards;
2806
2807 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
2808
2809 sync_module.reset(new RGWDefaultSyncModuleInstance());
2810
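// non-sharded buckets (num_shards == 0) still get a single log, with shard_id = -1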
2811 int effective_num_shards = (num_shards ? num_shards : 1);
2812
2813 auto async_rados = store->get_async_rados();
2814
2815 for (int i = 0; i < effective_num_shards; i++) {
2816 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
2817 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
2818 if (ret < 0) {
2819 ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
2820 return ret;
2821 }
2822 source_logs[i] = l;
2823 }
2824
2825 return 0;
2826 }
2827
2828 int RGWBucketSyncStatusManager::init_sync_status()
2829 {
2830 list<RGWCoroutinesStack *> stacks;
2831
2832 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2833 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2834 RGWRemoteBucketLog *l = iter->second;
2835 stack->call(l->init_sync_status_cr());
2836
2837 stacks.push_back(stack);
2838 }
2839
2840 return cr_mgr.run(stacks);
2841 }
2842
2843 int RGWBucketSyncStatusManager::read_sync_status()
2844 {
2845 list<RGWCoroutinesStack *> stacks;
2846
2847 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2848 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2849 RGWRemoteBucketLog *l = iter->second;
2850 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
2851
2852 stacks.push_back(stack);
2853 }
2854
2855 int ret = cr_mgr.run(stacks);
2856 if (ret < 0) {
2857 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
2858 << bucket_str{bucket} << dendl;
2859 return ret;
2860 }
2861
2862 return 0;
2863 }
2864
2865 int RGWBucketSyncStatusManager::run()
2866 {
2867 list<RGWCoroutinesStack *> stacks;
2868
2869 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2870 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2871 RGWRemoteBucketLog *l = iter->second;
2872 stack->call(l->run_sync_cr());
2873
2874 stacks.push_back(stack);
2875 }
2876
2877 int ret = cr_mgr.run(stacks);
2878 if (ret < 0) {
2879 ldout(store->ctx(), 0) << "ERROR: failed to run sync for "
2880 << bucket_str{bucket} << dendl;
2881 return ret;
2882 }
2883
2884 return 0;
2885 }
2886
2887 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
2888 const rgw_bucket_shard& bs)
2889 {
2890 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
2891 }
2892
2893
2894 // TODO: move into rgw_data_sync_trim.cc
2895 #undef dout_prefix
2896 #define dout_prefix (*_dout << "data trim: ")
2897
2898 namespace {
2899
2900 /// return the marker up to which it is safe to trim
2901 const std::string& get_stable_marker(const rgw_data_sync_marker& m)
2902 {
2903 return m.state == m.FullSync ? m.next_step_marker : m.marker;
2904 }
2905
2906 /// comparison operator for take_min_markers()
2907 bool operator<(const rgw_data_sync_marker& lhs,
2908 const rgw_data_sync_marker& rhs)
2909 {
2910 // sort by stable marker
2911 return get_stable_marker(lhs) < get_stable_marker(rhs);
2912 }
2913
2914 /// populate the container starting with 'dest' with the minimum stable marker
2915 /// of each shard for all of the peers in [first, last)
2916 template <typename IterIn, typename IterOut>
2917 void take_min_markers(IterIn first, IterIn last, IterOut dest)
2918 {
2919 if (first == last) {
2920 return;
2921 }
2922 // initialize markers with the first peer's
2923 auto m = dest;
2924 for (auto &shard : first->sync_markers) {
2925 *m = std::move(shard.second);
2926 ++m;
2927 }
2928 // for remaining peers, replace with smaller markers
2929 for (auto p = first + 1; p != last; ++p) {
2930 m = dest;
2931 for (auto &shard : p->sync_markers) {
2932 if (shard.second < *m) {
2933 *m = std::move(shard.second);
2934 }
2935 ++m;
2936 }
2937 }
2938 }
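// usage sketch (mirrors DataLogTrimCR::operate() below): given one
// rgw_data_sync_status per peer, collect the per-shard minimum markers:
//   std::vector<rgw_data_sync_marker> min_markers(num_shards);
//   take_min_markers(peer_status.begin(), peer_status.end(), min_markers.begin());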
2939
2940 } // anonymous namespace
2941
2942 class DataLogTrimCR : public RGWCoroutine {
2943 RGWRados *store;
2944 RGWHTTPManager *http;
2945 const int num_shards;
2946 const std::string& zone_id; ///< my zone id
2947 std::vector<rgw_data_sync_status> peer_status; ///< sync status for each peer
2948 std::vector<rgw_data_sync_marker> min_shard_markers; ///< min marker per shard
2949 std::vector<std::string>& last_trim; ///< last trimmed marker per shard
2950 int ret{0};
2951
2952 public:
2953 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
2954 int num_shards, std::vector<std::string>& last_trim)
2955 : RGWCoroutine(store->ctx()), store(store), http(http),
2956 num_shards(num_shards),
2957 zone_id(store->get_zone().id),
2958 peer_status(store->zone_conn_map.size()),
2959 min_shard_markers(num_shards),
2960 last_trim(last_trim)
2961 {}
2962
2963 int operate() override;
2964 };
2965
2966 int DataLogTrimCR::operate()
2967 {
2968 reenter(this) {
2969 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
2970 set_status("fetching sync status");
2971 yield {
2972 // query data sync status from each sync peer
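// i.e. GET /admin/log/?type=data&status&source-zone=<zone_id> on each peer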
2973 rgw_http_param_pair params[] = {
2974 { "type", "data" },
2975 { "status", nullptr },
2976 { "source-zone", zone_id.c_str() },
2977 { nullptr, nullptr }
2978 };
2979
2980 auto p = peer_status.begin();
2981 for (auto& c : store->zone_conn_map) {
2982 ldout(cct, 20) << "query sync status from " << c.first << dendl;
2983 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
2984 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
2985 false);
2986 ++p;
2987 }
2988 }
2989
2990 // must get a successful reply from all peers to consider trimming
2991 ret = 0;
2992 while (ret == 0 && num_spawned() > 0) {
2993 yield wait_for_child();
2994 collect_next(&ret);
2995 }
2996 drain_all();
2997
2998 if (ret < 0) {
2999 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3000 return set_cr_error(ret);
3001 }
3002
3003 ldout(cct, 10) << "trimming log shards" << dendl;
3004 set_status("trimming log shards");
3005 yield {
3006 // determine the minimum marker for each shard
3007 take_min_markers(peer_status.begin(), peer_status.end(),
3008 min_shard_markers.begin());
3009
3010 for (int i = 0; i < num_shards; i++) {
3011 const auto& m = min_shard_markers[i];
3012 auto& stable = get_stable_marker(m);
3013 if (stable <= last_trim[i]) {
3014 continue;
3015 }
3016 ldout(cct, 10) << "trimming log shard " << i
3017 << " at marker=" << stable
3018 << " last_trim=" << last_trim[i] << dendl;
3019 using TrimCR = RGWSyncLogTrimCR;
3020 spawn(new TrimCR(store, store->data_log->get_oid(i),
3021 stable, &last_trim[i]),
3022 true);
3023 }
3024 }
3025 return set_cr_done();
3026 }
3027 return 0;
3028 }
3029
3030 class DataLogTrimPollCR : public RGWCoroutine {
3031 RGWRados *store;
3032 RGWHTTPManager *http;
3033 const int num_shards;
3034 const utime_t interval; ///< polling interval
3035 const std::string lock_oid; ///< use first data log shard for lock
3036 const std::string lock_cookie;
3037 std::vector<std::string> last_trim; ///< last trimmed marker per shard
3038
3039 public:
3040 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3041 int num_shards, utime_t interval)
3042 : RGWCoroutine(store->ctx()), store(store), http(http),
3043 num_shards(num_shards), interval(interval),
3044 lock_oid(store->data_log->get_oid(0)),
3045 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3046 last_trim(num_shards)
3047 {}
3048
3049 int operate() override;
3050 };
3051
3052 int DataLogTrimPollCR::operate()
3053 {
3054 reenter(this) {
3055 for (;;) {
3056 set_status("sleeping");
3057 wait(interval);
3058
3059 // request a 'data_trim' lock that covers the entire wait interval to
3060 // prevent other gateways from attempting to trim for the duration
3061 set_status("acquiring trim lock");
3062 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3063 rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
3064 "data_trim", lock_cookie,
3065 interval.sec()));
3066 if (retcode < 0) {
3067 // if the lock is already held, go back to sleep and try again later
3068 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3069 << interval.sec() << "s" << dendl;
3070 continue;
3071 }
3072
3073 set_status("trimming");
3074 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3075
3076 // note that the lock is not released. this is intentional, as it avoids
3077 // duplicating this work in other gateways
3078 }
3079 }
3080 return 0;
3081 }
3082
3083 RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3084 RGWHTTPManager *http,
3085 int num_shards, utime_t interval)
3086 {
3087 return new DataLogTrimPollCR(store, http, num_shards, interval);
3088 }