#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_module.h"

#include "cls/lock/cls_lock_client.h"

#include "auth/Crypto.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";

class RGWSyncDebugLogger {
  CephContext *cct;
  string prefix;

  bool ended;

public:
  RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
                     const string& sync_type, const string& sync_stage,
                     const string& resource, bool log_start = true) {
    init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
  }
  RGWSyncDebugLogger() : cct(NULL), ended(false) {}
  ~RGWSyncDebugLogger();

  void init(CephContext *_cct, const string& source_zone,
            const string& sync_type, const string& sync_stage,
            const string& resource, bool log_start = true);
  void log(const string& state);
  void finish(int status);
};

void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
                              const string& sync_type, const string& sync_section,
                              const string& resource, bool log_start)
{
  cct = _cct;
  ended = false;
  string zone_str = source_zone.substr(0, 8);
  prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
  if (log_start) {
    log("start");
  }
}

RGWSyncDebugLogger::~RGWSyncDebugLogger()
{
  if (!ended) {
    log("finish");
  }
}

void RGWSyncDebugLogger::log(const string& state)
{
  ldout(cct, 5) << prefix << ":" << state << dendl;
}

void RGWSyncDebugLogger::finish(int status)
{
  ended = true;
  ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
}

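// Illustrative only: for a source zone whose id starts with "8f6a3c21" this
// logger emits level-5 lines such as "Sync:8f6a3c21:data:Bucket:mybucket:start";
// the zone id and resource name here are invented examples.
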
class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
public:
  RGWDataSyncDebugLogger() {}
  RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
                         const string& resource, bool log_start = true) {
    init(sync_env, sync_section, resource, log_start);
  }
  void init(RGWDataSyncEnv *sync_env, const string& sync_section,
            const string& resource, bool log_start = true) {
    RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
  }
};

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

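// A sketch of the JSON a datalog shard listing decodes into the structs
// above; the key names come from the decode_json() calls, the values are
// invented for illustration:
//   { "marker": "1_1518008280.690545_1273.1",
//     "truncated": false,
//     "entries": [ { "key": "mybucket:abc123:4", "timestamp": "..." } ] }
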
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store,
               rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
                          rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "info", NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

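// For reference, the coroutine above amounts to an HTTP request of roughly
// this shape against the source zone's gateway (the shard id is an example):
//   GET /admin/log/?type=data&id=7&info
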
struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};

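// Sketch of the payload decoded here when a datalog shard is listed with
// "extra-info=true" (our understanding of rgw_data_change_log_entry's JSON
// keys; all values invented):
//   { "marker": "1_1518008280.690545_1273.1", "truncated": true,
//     "entries": [ { "id": "1_...", "timestamp": "...",
//                    "entry": { "key": "mybucket:abc123:4" } } ] }
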
class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  string *pmarker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
                              int _shard_id, string *_pmarker,
                              list<rgw_data_change_log_entry> *_entries, bool *_truncated)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      pmarker(_pmarker),
      entries(_entries),
      truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "marker", pmarker->c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(&response);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pmarker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    http_op->set_user_info((void *)stack);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

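// Illustrative request shape for the listing above (values are examples):
//   GET /admin/log/?type=data&id=7&max-entries=1000&marker=1_...
// Substituting an empty key for marker_key is how this code suppresses the
// marker parameter when no marker has been set yet.
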
class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;

public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->get_zone_params().log_pool),
      num_shards(num_shards), status(status) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};

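// A capsule summary of the init sequence above: take the sync lock, write
// the status object (which recreates it, hence the second lock call),
// snapshot each remote datalog shard's position into per-shard markers,
// flip the state to StateBuildingFullSyncMaps, then drop the lock.
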
int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  if (store->is_meta_master()) {
    return 0;
  }

  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::get_shard_info(int shard_id)
{
  char buf[32];
  snprintf(buf, sizeof(buf), "%d", shard_id);

  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { "id", buf },
                                  { "info", NULL },
                                  { NULL, NULL } };

  RGWDataChangesLogInfo info;
  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote datalog, shard_id=" << shard_id << " marker=" << info.marker << dendl;

  return 0;
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  uint64_t instance_id;
  get_random_bytes((char *)&instance_id, sizeof(instance_id));
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}

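// e.g. for a source zone id "8f6a3c21-..." and shard 5 this yields the
// object name "data.full-sync.index.8f6a3c21-....5" (zone id invented).
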
struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string> result;
  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->get_zone_params().log_pool,
                                                  oid_prefix);
      yield {
        string entrypoint = string("/admin/metadata/bucket.instance");
        /* FIXME: need a better scaling solution here, requires streaming output */
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                      entrypoint, NULL, &result));
      }
      if (get_ret_status() < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
        return set_state(RGWCoroutine_Error);
      }
      for (iter = result.begin(); iter != result.end(); ++iter) {
        ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;

        key = *iter;

        yield {
          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                          { NULL, NULL } };

          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
        }

        num_shards = meta_info.data.get_bucket_info().num_shards;
        if (num_shards > 0) {
          for (i = 0; i < num_shards; i++) {
            char buf[16];
            snprintf(buf, sizeof(buf), ":%d", i);
            s = key + buf;
            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
          }
        } else {
          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
        }
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                                rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker), true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

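// Note on the window above: a value of 1 means the marker tracker flushes
// its high marker after every completed entry; a larger window would batch
// marker writes at the cost of re-processing more entries after a restart
// (our reading of RGWSyncShardMarkerTrack, not a documented guarantee).
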
class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * Create an index from key -> marker and from marker -> key. This is
   * useful so that we can ensure there is only one entry for any key that
   * is in use. This is needed when doing incremental sync of data, where
   * we don't want to run multiple concurrent sync operations for the same
   * bucket shard.
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }
};

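// Illustrative behavior (example key invented): if two datalog entries for
// the same bucket shard key "mybucket:abc123:4" show up in one batch, the
// first index_key_to_marker() call indexes it and returns true; the second
// returns false and flags the key for retry, so only one sync runs per
// bucket shard at a time.
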
// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}

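// Example rendering (values invented): shard 7 of bucket "mybucket" with
// bucket id "abc123" under tenant "acme" prints as "acme/mybucket:abc123:7";
// without a tenant or an explicit shard it degrades to "mybucket:abc123".
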
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  RGWDataSyncDebugLogger logger;
  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
    logger.init(sync_env, "Bucket", bs.get_key());
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker,
                           RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" << _raw_key << " entry=" << entry_marker;
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          call(new RGWRunBucketSyncCoroutine(sync_env, bs));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
            "for missing bucket " << raw_key << dendl;
        sync_status = 0;
      }

      if (sync_status < 0) {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                        -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
        }
        if (error_repo && !error_repo->append(raw_key)) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
              << error_repo->get_obj() << "), retcode=" << retcode << dendl;
        }
      }
      /* FIXME: what to do in case of error? */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  map<string, bufferlist> entries;
  map<string, bufferlist>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  RGWDataChangesLogInfo shard_info;
  string datalog_marker;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  map<string, bufferlist> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
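// With the values above, the error-repo retry backoff doubles from 60s
// toward the 600s cap (60, 120, 240, 480, 600) while the repo stays empty,
// and snaps back to the 60s default once retry entries reappear.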
  uint32_t retry_backoff_secs;

  RGWDataSyncDebugLogger logger;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";

    logger.init(sync_env, "DataShard", status_oid);
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      logger.log("full sync");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      total_entries = sync_marker.pos;
      do {
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
          total_entries++;
          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
            if (retcode < 0) {
              lease_cr->go_down();
              drain_all();
              return set_cr_error(retcode);
            }
          }
          sync_marker.marker = iter->first;
        }
      } while ((int)entries.size() == max_entries);

      lease_cr->go_down();
      drain_all();

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
        lease_cr->go_down();
        return set_cr_error(retcode);
      }
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      set_status("lease acquired");
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      logger.log("inc sync");
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      do {
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
          }
        }

        /* process bucket shards that previously failed */
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                             error_marker, &error_entries,
                                             max_error_entries));
        ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
        iter = error_entries.begin();
        for (; iter != error_entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
          spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
          error_marker = iter->first;
        }
        if ((int)error_entries.size() != max_error_entries) {
          if (error_marker.empty() && error_entries.empty()) {
            /* the retry repo is empty, we back off a bit before calling it again */
            retry_backoff_secs *= 2;
            if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
              retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
            }
          } else {
            retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
          }
          error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
          error_marker.clear();
        }

        yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }
        datalog_marker = shard_info.marker;
#define INCREMENTAL_MAX_ENTRIES 100
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker > sync_marker.marker) {
          spawned_keys.clear();
          yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
          if (retcode < 0) {
            ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
            stop_spawned_services();
            drain_all();
            return set_cr_error(retcode);
          }
          for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
            ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
            if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
              ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
              marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
              continue;
            }
            if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
            } else {
              /*
               * don't spawn the same key more than once. We can do that as long as we don't yield
               */
              if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
                spawned_keys.insert(log_iter->entry.key);
                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
                if (retcode < 0) {
                  stop_spawned_services();
                  drain_all();
                  return set_cr_error(retcode);
                }
              }
            }
          }
          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
                /* we have reported this error */
              }
              /* not waiting for child here */
            }
          }
        }
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker == sync_marker.marker) {
#define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
      } while (true);
    }
    return 0;
  }

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};

class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

public:
  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker)
    : RGWBackoffControlCR(_sync_env->cct, false),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                          rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(cr_lock());

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};

class RGWDataSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  rgw_data_sync_status sync_status;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  Mutex shard_crs_lock;
  map<int, RGWDataSyncShardControlCR *> shard_crs;

  bool *reset_backoff;

  RGWDataSyncDebugLogger logger;

  RGWDataSyncModule *data_sync_module{nullptr};
public:
  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      num_shards(_num_shards),
      marker_tracker(NULL),
      shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
      reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
  }

  int operate() override {
    reenter(this) {

      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode == -ENOENT) {
        sync_status.sync_info.num_shards = num_shards;
      } else if (retcode < 0 && retcode != -ENOENT) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
        uint64_t instance_id;
        get_random_bytes((char *)&instance_id, sizeof(instance_id));
        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sync_env, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        /* call sync module init here */
        yield call(data_sync_module->init_sync(sync_env));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        /* state: building full sync maps */
        ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
                                                                          iter->first, iter->second);
            cr->get();
            shard_crs_lock.Lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.Unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
                                                         rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex::Locker l(shard_crs_lock);
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};

class RGWDefaultDataSyncModule : public RGWDataSyncModule {
public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
};

int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
                                 key, versioned_epoch,
                                 true, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}

class RGWDataSyncControlCR : public RGWBackoffControlCR
{
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

public:
  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards)
    : RGWBackoffControlCR(_sync_env->cct, true),
      sync_env(_sync_env), num_shards(_num_shards) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex& m = cr_lock();

    m.Lock();
    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
    if (!cr) {
      m.Unlock();
      return;
    }

    cr->get();
    m.Unlock();

    if (cr) {
      cr->wakeup(shard_id, keys);
    }

    cr->put();
  }
};

void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
  RWLock::RLocker rl(lock);
  if (!data_sync_cr) {
    return;
  }
  data_sync_cr->wakeup(shard_id, keys);
}

int RGWRemoteDataLog::run_sync(int num_shards)
{
  lock.get_write();
  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
  data_sync_cr->get(); // run() will drop a ref, so take another
  lock.unlock();

  int r = run(data_sync_cr);

  lock.get_write();
  data_sync_cr->put();
  data_sync_cr = NULL;
  lock.unlock();

  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
    return r;
  }
  return 0;
}

int RGWDataSyncStatusManager::init()
{
  auto zone_def_iter = store->zone_by_id.find(source_zone);
  if (zone_def_iter == store->zone_by_id.end()) {
    ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
    return -EIO;
  }

  auto& zone_def = zone_def_iter->second;

  if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
    return -ENOTSUP;
  }

  RGWZoneParams& zone_params = store->get_zone_params();

  sync_module = store->get_sync_module();

  conn = store->get_zone_conn_by_id(source_zone);
  if (!conn) {
    ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  int r = source_log.init(source_zone, conn, error_logger, sync_module);
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
    finalize();
    return r;
  }

  rgw_datalog_info datalog_info;
  r = source_log.read_log_info(&datalog_info);
  if (r < 0) {
    ldout(store->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
    finalize();
    return r;
  }

  num_shards = datalog_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
  }

  return 0;
}

1716 void RGWDataSyncStatusManager::finalize()
1717 {
1718 delete error_logger;
1719 error_logger = nullptr;
1720 }
1721
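/*
 * Status object names derive from the oid prefixes defined at the top of
 * this file; with an illustrative zone id "8f31..." these produce:
 *   sync_status_oid("8f31...")   -> "datalog.sync-status.8f31..."
 *   shard_obj_name("8f31...", 7) -> "datalog.sync-status.shard.8f31....7"
 */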
1722 string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
1723 {
1724 char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
1725 snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
1726
1727 return string(buf);
1728 }
1729
1730 string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
1731 {
1732 char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
1733 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);
1734
1735 return string(buf);
1736 }
1737
1738 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
1739 const rgw_bucket& bucket, int shard_id,
1740 RGWSyncErrorLogger *_error_logger,
1741 RGWSyncModuleInstanceRef& _sync_module)
1742 {
1743 conn = _conn;
1744 source_zone = _source_zone;
1745 bs.bucket = bucket;
1746 bs.shard_id = shard_id;
1747
1748 sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
1749
1750 return 0;
1751 }
1752
1753 struct bucket_index_marker_info {
1754 string bucket_ver;
1755 string master_ver;
1756 string max_marker;
1757
1758 void decode_json(JSONObj *obj) {
1759 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
1760 JSONDecoder::decode_json("master_ver", master_ver, obj);
1761 JSONDecoder::decode_json("max_marker", max_marker, obj);
1762 }
1763 };
1764
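/*
 * Read bucket_index_marker_info from the source zone's admin API. The
 * REST call built below is effectively:
 *   GET /admin/log/?type=bucket-index&bucket-instance=<instance_key>&info
 */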
1765 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1766 RGWDataSyncEnv *sync_env;
1767 const string instance_key;
1768
1769 bucket_index_marker_info *info;
1770
1771 public:
1772 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
1773 const rgw_bucket_shard& bs,
1774 bucket_index_marker_info *_info)
1775 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1776 instance_key(bs.get_key()), info(_info) {}
1777
1778 int operate() override {
1779 reenter(this) {
1780 yield {
1781 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
1782 { "bucket-instance", instance_key.c_str() },
1783 { "info" , NULL },
1784 { NULL, NULL } };
1785
1786 string p = "/admin/log/";
1787 call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
1788 }
1789 if (retcode < 0) {
1790 return set_cr_error(retcode);
1791 }
1792 return set_cr_done();
1793 }
1794 return 0;
1795 }
1796 };
1797
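/*
 * Initialize a bucket shard's sync status object: read the remote bilog's
 * current max_marker, put the shard into StateFullSync, and record that
 * marker as the position from which incremental sync will later resume.
 */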
1798 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
1799 RGWDataSyncEnv *sync_env;
1800
1801 rgw_bucket_shard bs;
1802 const string sync_status_oid;
1803
1804 rgw_bucket_shard_sync_info& status;
1805
1806 bucket_index_marker_info info;
1807 public:
1808 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1809 const rgw_bucket_shard& bs,
1810 rgw_bucket_shard_sync_info& _status)
1811 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
1812 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1813 status(_status)
1814 {}
1815
1816 int operate() override {
1817 reenter(this) {
1818 /* fetch current position in logs */
1819 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
1820 if (retcode < 0 && retcode != -ENOENT) {
1821 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
1822 return set_cr_error(retcode);
1823 }
1824 yield {
1825 status.state = rgw_bucket_shard_sync_info::StateFullSync;
1826 status.inc_marker.position = info.max_marker;
1827 map<string, bufferlist> attrs;
1828 status.encode_all_attrs(attrs);
1829 auto store = sync_env->store;
1830 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
1831 rgw_raw_obj(store->get_zone_params().log_pool, sync_status_oid),
1832 attrs));
1833 }
1834 return set_cr_done();
1835 }
1836 return 0;
1837 }
1838 };
1839
1840 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
1841 {
1842 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
1843 }
1844
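/*
 * Decode a single named attribute into *val. A missing attribute yields a
 * default-constructed value; a decode failure is logged and leaves *val in
 * whatever state the partial decode produced.
 */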
1845 template <class T>
1846 static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
1847 {
1848 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1849 if (iter == attrs.end()) {
1850 *val = T();
1851 return;
1852 }
1853
1854 bufferlist::iterator biter = iter->second.begin();
1855 try {
1856 ::decode(*val, biter);
1857 } catch (buffer::error& err) {
1858 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
1859 }
1860 }
1861
1862 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
1863 {
1864 decode_attr(cct, attrs, "state", &state);
1865 decode_attr(cct, attrs, "full_marker", &full_marker);
1866 decode_attr(cct, attrs, "inc_marker", &inc_marker);
1867 }
1868
1869 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
1870 {
1871 encode_state_attr(attrs);
1872 full_marker.encode_attr(attrs);
1873 inc_marker.encode_attr(attrs);
1874 }
1875
1876 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
1877 {
1878 ::encode(state, attrs["state"]);
1879 }
1880
1881 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1882 {
1883 ::encode(*this, attrs["full_marker"]);
1884 }
1885
1886 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1887 {
1888 ::encode(*this, attrs["inc_marker"]);
1889 }
1890
1891 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
1892 RGWDataSyncEnv *sync_env;
1893 string oid;
1894 rgw_bucket_shard_sync_info *status;
1895
1896 map<string, bufferlist> attrs;
1897 public:
1898 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1899 const rgw_bucket_shard& bs,
1900 rgw_bucket_shard_sync_info *_status)
1901 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1902 oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1903 status(_status) {}
1904 int operate() override;
1905 };
1906
1907 int RGWReadBucketSyncStatusCoroutine::operate()
1908 {
1909 reenter(this) {
1910 yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
1911 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
1912 &attrs));
1913 if (retcode == -ENOENT) {
1914 *status = rgw_bucket_shard_sync_info();
1915 return set_cr_done();
1916 }
1917 if (retcode < 0) {
1918 ldout(sync_env->cct, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
1919 return set_cr_error(retcode);
1920 }
1921 status->decode_from_attrs(sync_env->cct, attrs);
1922 return set_cr_done();
1923 }
1924 return 0;
1925 }
1926 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
1927 {
1928 return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
1929 }
1930
1931 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
1932 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
1933 delete iter->second;
1934 }
1935 delete error_logger;
1936 }
1937
1938
1939 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
1940 {
1941 JSONDecoder::decode_json("ID", id, obj);
1942 JSONDecoder::decode_json("DisplayName", display_name, obj);
1943 }
1944
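/*
 * One entry of a remote versioned bucket listing, decoded from the
 * S3-style JSON (with rgwx- extensions) served by the source zone. An
 * illustrative, abridged entry:
 *   { "Key": "foo", "VersionId": "v1", "IsLatest": true,
 *     "RgwxMtime": "2017-01-01T00:00:00.000Z", "ETag": "...", "Size": 1024,
 *     "Owner": { "ID": "user1", "DisplayName": "User One" },
 *     "VersionedEpoch": 2 }
 */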
1945 struct bucket_list_entry {
1946 bool delete_marker;
1947 rgw_obj_key key;
1948 bool is_latest;
1949 real_time mtime;
1950 string etag;
1951 uint64_t size;
1952 string storage_class;
1953 rgw_bucket_entry_owner owner;
1954 uint64_t versioned_epoch;
1955 string rgw_tag;
1956
1957 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
1958
1959 void decode_json(JSONObj *obj) {
1960 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
1961 JSONDecoder::decode_json("Key", key.name, obj);
1962 JSONDecoder::decode_json("VersionId", key.instance, obj);
1963 JSONDecoder::decode_json("IsLatest", is_latest, obj);
1964 string mtime_str;
1965 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
1966
1967 struct tm t;
1968 uint32_t nsec;
1969 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
1970 ceph_timespec ts;
1971 ts.tv_sec = (uint64_t)internal_timegm(&t);
1972 ts.tv_nsec = nsec;
1973 mtime = real_clock::from_ceph_timespec(ts);
1974 }
1975 JSONDecoder::decode_json("ETag", etag, obj);
1976 JSONDecoder::decode_json("Size", size, obj);
1977 JSONDecoder::decode_json("StorageClass", storage_class, obj);
1978 JSONDecoder::decode_json("Owner", owner, obj);
1979 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
1980 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
1981 }
1982 };
1983
1984 struct bucket_list_result {
1985 string name;
1986 string prefix;
1987 string key_marker;
1988 string version_id_marker;
1989 int max_keys;
1990 bool is_truncated;
1991 list<bucket_list_entry> entries;
1992
1993 bucket_list_result() : max_keys(0), is_truncated(false) {}
1994
1995 void decode_json(JSONObj *obj) {
1996 JSONDecoder::decode_json("Name", name, obj);
1997 JSONDecoder::decode_json("Prefix", prefix, obj);
1998 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
1999 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
2000 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
2001 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
2002 JSONDecoder::decode_json("Entries", entries, obj);
2003 }
2004 };
2005
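/*
 * List one shard of the remote bucket for full sync, resuming from
 * marker_position. The request below amounts to a versioned listing:
 *   GET /<bucket>?versions&format=json&objs-container=true
 *       &key-marker=<name>&version-id-marker=<instance>
 * with rgwx-bucket-instance selecting the bucket instance (and shard).
 */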
2006 class RGWListBucketShardCR: public RGWCoroutine {
2007 RGWDataSyncEnv *sync_env;
2008 const rgw_bucket_shard& bs;
2009 const string instance_key;
2010 rgw_obj_key marker_position;
2011
2012 bucket_list_result *result;
2013
2014 public:
2015 RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2016 rgw_obj_key& _marker_position, bucket_list_result *_result)
2017 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2018 instance_key(bs.get_key()), marker_position(_marker_position),
2019 result(_result) {}
2020
2021 int operate() override {
2022 reenter(this) {
2023 yield {
2024 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2025 { "versions" , NULL },
2026 { "format" , "json" },
2027 { "objs-container" , "true" },
2028 { "key-marker" , marker_position.name.c_str() },
2029 { "version-id-marker" , marker_position.instance.c_str() },
2030 { NULL, NULL } };
2031 // don't include tenant in the url, it's already part of instance_key
2032 string p = string("/") + bs.bucket.name;
2033 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2034 }
2035 if (retcode < 0) {
2036 return set_cr_error(retcode);
2037 }
2038 return set_cr_done();
2039 }
2040 return 0;
2041 }
2042 };
2043
2044 class RGWListBucketIndexLogCR: public RGWCoroutine {
2045 RGWDataSyncEnv *sync_env;
2046 const string instance_key;
2047 string marker;
2048
2049 list<rgw_bi_log_entry> *result;
2050
2051 public:
2052 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2053 string& _marker, list<rgw_bi_log_entry> *_result)
2054 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2055 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2056
2057 int operate() override {
2058 reenter(this) {
2059 yield {
2060 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2061 { "format" , "json" },
2062 { "marker" , marker.c_str() },
2063 { "type", "bucket-index" },
2064 { NULL, NULL } };
2065
2066 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2067 }
2068 if (retcode < 0) {
2069 return set_cr_error(retcode);
2070 }
2071 return set_cr_done();
2072 }
2073 return 0;
2074 }
2075 };
2076
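/*
 * Window for the marker trackers below: how many entries may complete
 * before the updated position is persisted to the shard status object.
 * Presumably a larger window trades fewer RADOS status writes for more
 * re-processing of already-synced entries after a restart.
 */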
2077 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2078
2079 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2080 RGWDataSyncEnv *sync_env;
2081
2082 string marker_oid;
2083 rgw_bucket_shard_full_sync_marker sync_marker;
2084
2085 public:
2086 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2087 const string& _marker_oid,
2088 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2089 sync_env(_sync_env),
2090 marker_oid(_marker_oid),
2091 sync_marker(_marker) {}
2092
2093 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2094 sync_marker.position = new_marker;
2095 sync_marker.count = index_pos;
2096
2097 map<string, bufferlist> attrs;
2098 sync_marker.encode_attr(attrs);
2099
2100 RGWRados *store = sync_env->store;
2101
2102 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2103 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2104 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2105 attrs);
2106 }
2107 };
2108
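/*
 * Marker tracker for incremental bucket sync. Bilog markers are opaque
 * strings, so in addition to position tracking this keeps key<->marker
 * maps (see index_key_to_marker()) to serialize concurrent operations on
 * the same object key.
 */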
2109 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2110 RGWDataSyncEnv *sync_env;
2111
2112 string marker_oid;
2113 rgw_bucket_shard_inc_sync_marker sync_marker;
2114
2115 map<rgw_obj_key, string> key_to_marker;
2116 map<string, rgw_obj_key> marker_to_key;
2117
2118 void handle_finish(const string& marker) override {
2119 map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
2120 if (iter == marker_to_key.end()) {
2121 return;
2122 }
2123 key_to_marker.erase(iter->second);
2124 reset_need_retry(iter->second);
2125 marker_to_key.erase(iter);
2126 }
2127
2128 public:
2129 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2130 const string& _marker_oid,
2131 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2132 sync_env(_sync_env),
2133 marker_oid(_marker_oid),
2134 sync_marker(_marker) {}
2135
2136 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2137 sync_marker.position = new_marker;
2138
2139 map<string, bufferlist> attrs;
2140 sync_marker.encode_attr(attrs);
2141
2142 RGWRados *store = sync_env->store;
2143
2144 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2145 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2146 store,
2147 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2148 attrs);
2149 }
2150
  /*
   * Create an index from key -> <op, marker>, and from marker -> key.
   * This lets us ensure that we only have one in-flight entry for any
   * given key. This is needed when doing incremental sync of data, where
   * we don't want to run multiple concurrent sync operations for the same
   * bucket shard. It also ensures that we don't run concurrent operations
   * on the same key with different ops.
   */
2160 bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
2161 if (key_to_marker.find(key) != key_to_marker.end()) {
2162 set_need_retry(key);
2163 return false;
2164 }
2165 key_to_marker[key] = marker;
2166 marker_to_key[marker] = key;
2167 return true;
2168 }
2169
2170 bool can_do_op(const rgw_obj_key& key) {
2171 return (key_to_marker.find(key) == key_to_marker.end());
2172 }
2173 };
2174
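/*
 * Sync a single bucket index log entry. Based on the logged op, dispatch
 * to the sync module's sync_object(), remove_object() or
 * create_delete_marker(); retry while the marker tracker flags the key,
 * report failures to the error logger, and finally mark the entry
 * complete in the marker tracker. zones_trace records which zones already
 * handled the entry so changes don't ping-pong between zones.
 */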
2175 template <class T, class K>
2176 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2177 RGWDataSyncEnv *sync_env;
2178
2179 RGWBucketInfo *bucket_info;
2180 const rgw_bucket_shard& bs;
2181
2182 rgw_obj_key key;
2183 bool versioned;
2184 uint64_t versioned_epoch;
2185 rgw_bucket_entry_owner owner;
2186 real_time timestamp;
2187 RGWModifyOp op;
2188 RGWPendingState op_state;
2189
2190 T entry_marker;
2191 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2192
2193 int sync_status;
2194
2195 stringstream error_ss;
2196
2197 RGWDataSyncDebugLogger logger;
2198
2199 bool error_injection;
2200
2201 RGWDataSyncModule *data_sync_module;
2202
2203 rgw_zone_set zones_trace;
2204
2205 public:
2206 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2207 RGWBucketInfo *_bucket_info,
2208 const rgw_bucket_shard& bs,
2209 const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
2210 real_time& _timestamp,
2211 const rgw_bucket_entry_owner& _owner,
2212 RGWModifyOp _op, RGWPendingState _op_state,
2213 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
2214 sync_env(_sync_env),
2215 bucket_info(_bucket_info), bs(bs),
2216 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2217 owner(_owner),
2218 timestamp(_timestamp), op(_op),
2219 op_state(_op_state),
2220 entry_marker(_entry_marker),
2221 marker_tracker(_marker_tracker),
                                                      sync_status(0) {
2223 stringstream ss;
2224 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
2225 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2226 ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
2227 set_status("init");
2228
2229 logger.init(sync_env, "Object", ss.str());
2230
2231 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2232
2233 data_sync_module = sync_env->sync_module->get_data_handler();
2234
2235 zones_trace = _zones_trace;
2236 zones_trace.insert(sync_env->store->get_zone().id);
2237 }
2238
2239 int operate() override {
2240 reenter(this) {
2241 /* skip entries that are not complete */
2242 if (op_state != CLS_RGW_STATE_COMPLETE) {
2243 goto done;
2244 }
2245 do {
2246 yield {
2247 marker_tracker->reset_need_retry(key);
2248 if (key.name.empty()) {
2249 /* shouldn't happen */
2250 set_status("skipping empty entry");
2251 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
2252 goto done;
2253 }
2254 if (error_injection &&
2255 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2256 ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
2257 retcode = -EIO;
2258 } else if (op == CLS_RGW_OP_ADD ||
2259 op == CLS_RGW_OP_LINK_OLH) {
2260 if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
2261 set_status("skipping entry");
2262 ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
2263 goto done;
2264
2265 }
2266 set_status("syncing obj");
2267 ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2268 logger.log("fetch");
2269 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
2270 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2271 set_status("removing obj");
2272 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2273 versioned = true;
2274 }
2275 logger.log("remove");
2276 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace));
2277 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2278 logger.log("creating delete marker");
2279 set_status("creating delete marker");
2280 ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2281 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace));
2282 }
2283 }
2284 } while (marker_tracker->need_retry(key));
2285 {
2286 stringstream ss;
2287 if (retcode >= 0) {
2288 ss << "done";
2289 } else {
2290 ss << "done, retcode=" << retcode;
2291 }
2292 logger.log(ss.str());
2293 }
2294
2295 if (retcode < 0 && retcode != -ENOENT) {
2296 set_status() << "failed to sync obj; retcode=" << retcode;
2297 ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
2298 << bucket_shard_str{bs} << "/" << key.name << dendl;
2299 error_ss << bucket_shard_str{bs} << "/" << key.name;
2300 sync_status = retcode;
2301 }
2302 if (!error_ss.str().empty()) {
2303 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
2304 }
2305 done:
2306 if (sync_status == 0) {
2307 /* update marker */
2308 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2309 yield call(marker_tracker->finish(entry_marker));
2310 sync_status = retcode;
2311 }
2312 if (sync_status < 0) {
2313 return set_cr_error(sync_status);
2314 }
2315 return set_cr_done();
2316 }
2317 return 0;
2318 }
2319 };
2320
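/*
 * Upper bound on concurrently spawned per-entry sync coroutines; once the
 * window is full, the full/incremental sync loops collect finished
 * children before spawning more.
 */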
2321 #define BUCKET_SYNC_SPAWN_WINDOW 20
2322
2323 class RGWBucketShardFullSyncCR : public RGWCoroutine {
2324 RGWDataSyncEnv *sync_env;
2325 const rgw_bucket_shard& bs;
2326 RGWBucketInfo *bucket_info;
2327 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2328 bucket_list_result list_result;
2329 list<bucket_list_entry>::iterator entries_iter;
2330 rgw_bucket_shard_full_sync_marker& full_marker;
2331 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2332 rgw_obj_key list_marker;
2333 bucket_list_entry *entry{nullptr};
2334 RGWModifyOp op{CLS_RGW_OP_ADD};
2335
2336 int total_entries{0};
2337
2338 int sync_status{0};
2339
2340 const string& status_oid;
2341
2342 RGWDataSyncDebugLogger logger;
2343 rgw_zone_set zones_trace;
2344 public:
2345 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2346 RGWBucketInfo *_bucket_info,
2347 const std::string& status_oid,
2348 RGWContinuousLeaseCR *lease_cr,
2349 rgw_bucket_shard_full_sync_marker& _full_marker)
2350 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2351 bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker),
2352 marker_tracker(sync_env, status_oid, full_marker),
2353 status_oid(status_oid) {
2354 logger.init(sync_env, "BucketFull", bs.get_key());
2355 zones_trace.insert(sync_env->source_zone);
2356 }
2357
2358 int operate() override;
2359 };
2360
2361 int RGWBucketShardFullSyncCR::operate()
2362 {
2363 int ret;
2364 reenter(this) {
2365 list_marker = full_marker.position;
2366
2367 total_entries = full_marker.count;
2368 do {
2369 if (!lease_cr->is_locked()) {
2370 drain_all();
2371 return set_cr_error(-ECANCELED);
2372 }
2373 set_status("listing remote bucket");
2374 ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
2375 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2376 &list_result));
2377 if (retcode < 0 && retcode != -ENOENT) {
2378 set_status("failed bucket listing, going down");
2379 drain_all();
2380 return set_cr_error(retcode);
2381 }
2382 entries_iter = list_result.entries.begin();
2383 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2384 if (!lease_cr->is_locked()) {
2385 drain_all();
2386 return set_cr_error(-ECANCELED);
2387 }
2388 ldout(sync_env->cct, 20) << "[full sync] syncing object: "
2389 << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
2390 entry = &(*entries_iter);
2391 total_entries++;
2392 list_marker = entries_iter->key;
2393 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2394 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
2395 } else {
2396 op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
2397 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2398 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2399 false, /* versioned, only matters for object removal */
2400 entry->versioned_epoch, entry->mtime,
2401 entry->owner, op, CLS_RGW_STATE_COMPLETE,
2402 entry->key, &marker_tracker, zones_trace),
2403 false);
2404 }
2405 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2406 yield wait_for_child();
2407 bool again = true;
2408 while (again) {
2409 again = collect(&ret, nullptr);
2410 if (ret < 0) {
2411 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2412 sync_status = ret;
2413 /* we have reported this error */
2414 }
2415 }
2416 }
2417 }
2418 } while (list_result.is_truncated && sync_status == 0);
2419 set_status("done iterating over all objects");
2420 /* wait for all operations to complete */
2421 while (num_spawned()) {
2422 yield wait_for_child();
2423 bool again = true;
2424 while (again) {
2425 again = collect(&ret, nullptr);
2426 if (ret < 0) {
2427 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2428 sync_status = ret;
2429 /* we have reported this error */
2430 }
2431 }
2432 }
2433 if (!lease_cr->is_locked()) {
2434 return set_cr_error(-ECANCELED);
2435 }
2436 /* update sync state to incremental */
2437 if (sync_status == 0) {
      yield {
        /* local named to avoid shadowing the int sync_status member */
        rgw_bucket_shard_sync_info new_sync_status;
        new_sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
        map<string, bufferlist> attrs;
        new_sync_status.encode_state_attr(attrs);
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                            attrs));
      }
2448 } else {
      ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2450 }
2451 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2452 ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
2453 << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
2454 return set_cr_error(retcode);
2455 }
2456 if (sync_status < 0) {
2457 return set_cr_error(sync_status);
2458 }
2459 return set_cr_done();
2460 }
2461 return 0;
2462 }
2463
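/*
 * Incremental bucket shard sync: tail the remote bilog starting at
 * inc_marker.position. squash_map records, per (object, instance), the
 * timestamp and op of the newest completed entry that did not originate
 * locally, so older entries for the same object are skipped and only the
 * final state gets applied.
 */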
2464 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2465 RGWDataSyncEnv *sync_env;
2466 const rgw_bucket_shard& bs;
2467 RGWBucketInfo *bucket_info;
2468 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2469 list<rgw_bi_log_entry> list_result;
2470 list<rgw_bi_log_entry>::iterator entries_iter;
2471 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
2472 rgw_bucket_shard_inc_sync_marker& inc_marker;
2473 rgw_obj_key key;
2474 rgw_bi_log_entry *entry{nullptr};
2475 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2476 bool updated_status{false};
2477 const string& status_oid;
2478 const string& zone_id;
2479
2480 string cur_id;
2481
2482 RGWDataSyncDebugLogger logger;
2483
2484 int sync_status{0};
2485
2486 public:
2487 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2488 const rgw_bucket_shard& bs,
2489 RGWBucketInfo *_bucket_info,
2490 const std::string& status_oid,
2491 RGWContinuousLeaseCR *lease_cr,
2492 rgw_bucket_shard_inc_sync_marker& _inc_marker)
2493 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2494 bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
      marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid),
      zone_id(_sync_env->store->get_zone().id) {
2496 set_description() << "bucket shard incremental sync bucket="
2497 << bucket_shard_str{bs};
2498 set_status("init");
2499 logger.init(sync_env, "BucketInc", bs.get_key());
2500 }
2501
2502 int operate() override;
2503 };
2504
2505 int RGWBucketShardIncrementalSyncCR::operate()
2506 {
2507 int ret;
2508 reenter(this) {
2509 do {
2510 if (!lease_cr->is_locked()) {
2511 drain_all();
2512 return set_cr_error(-ECANCELED);
2513 }
2514 ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
2515 set_status() << "listing bilog; position=" << inc_marker.position;
2516 yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
2517 &list_result));
2518 if (retcode < 0 && retcode != -ENOENT) {
2519 /* wait for all operations to complete */
2520 drain_all();
2521 return set_cr_error(retcode);
2522 }
2523 squash_map.clear();
2524 for (auto& e : list_result) {
2525 if (e.state != CLS_RGW_STATE_COMPLETE) {
2526 continue;
2527 }
2528 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
2529 continue;
2530 }
2531 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
2532 if (squash_entry.first <= e.timestamp) {
2533 squash_entry = make_pair<>(e.timestamp, e.op);
2534 }
2535 }
2536 entries_iter = list_result.begin();
2537 for (; entries_iter != list_result.end(); ++entries_iter) {
2538 if (!lease_cr->is_locked()) {
2539 drain_all();
2540 return set_cr_error(-ECANCELED);
2541 }
2542 entry = &(*entries_iter);
      {
        /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
        size_t p = entry->id.find('#');
        if (p == string::npos) {
          cur_id = entry->id;
        } else {
          cur_id = entry->id.substr(p + 1);
        }
      }
2551 inc_marker.position = cur_id;
2552
2553 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
2554 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
2555 ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
2556 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2557 continue;
2558 }
2559
2560 ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
2561
2562 if (!key.ns.empty()) {
2563 set_status() << "skipping entry in namespace: " << entry->object;
2564 ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
2565 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2566 continue;
2567 }
2568
2569 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
2570 if (entry->op == CLS_RGW_OP_CANCEL) {
2571 set_status() << "canceled operation, skipping";
2572 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2573 << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
2574 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2575 continue;
2576 }
2577 if (entry->state != CLS_RGW_STATE_COMPLETE) {
2578 set_status() << "non-complete operation, skipping";
2579 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2580 << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
2581 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2582 continue;
2583 }
2584 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
2585 set_status() << "redundant operation, skipping";
2586 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
            << bucket_shard_str{bs} << "/" << key << ": redundant operation" << dendl;
2588 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2589 continue;
2590 }
2591 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
2592 set_status() << "squashed operation, skipping";
2593 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2594 << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
2595 /* not updating high marker though */
2596 continue;
2597 }
2598 ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
2599 << bucket_shard_str{bs} << "/" << key << dendl;
2600 updated_status = false;
2601 while (!marker_tracker.can_do_op(key)) {
2602 if (!updated_status) {
2603 set_status() << "can't do op, conflicting inflight operation";
2604 updated_status = true;
2605 }
2606 ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
2607 yield wait_for_child();
2608 bool again = true;
2609 while (again) {
2610 again = collect(&ret, nullptr);
2611 if (ret < 0) {
2612 ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
2613 sync_status = ret;
2614 /* we have reported this error */
2615 }
2616 }
2617 }
2618 if (!marker_tracker.index_key_to_marker(key, cur_id)) {
2619 set_status() << "can't do op, sync already in progress for object";
2620 ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
2621 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2622 continue;
2623 }
2625 set_status() << "start object sync";
2626 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
2627 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
2628 } else {
2629 uint64_t versioned_epoch = 0;
2630 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
2631 if (entry->ver.pool < 0) {
2632 versioned_epoch = entry->ver.epoch;
2633 }
2634 ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
2635 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
2636 spawn(new SyncCR(sync_env, bucket_info, bs, key,
2637 entry->is_versioned(), versioned_epoch,
2638 entry->timestamp, owner, entry->op, entry->state,
2639 cur_id, &marker_tracker, entry->zones_trace),
2640 false);
2641 }
2643 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2644 set_status() << "num_spawned() > spawn_window";
2645 yield wait_for_child();
2646 bool again = true;
2647 while (again) {
2648 again = collect(&ret, nullptr);
2649 if (ret < 0) {
2650 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2651 sync_status = ret;
2652 /* we have reported this error */
2653 }
2654 /* not waiting for child here */
2655 }
2656 }
2657 }
2658 } while (!list_result.empty() && sync_status == 0);
2659
2660 while (num_spawned()) {
2661 yield wait_for_child();
2662 bool again = true;
2663 while (again) {
2664 again = collect(&ret, nullptr);
2665 if (ret < 0) {
2666 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2667 sync_status = ret;
2668 /* we have reported this error */
2669 }
2670 /* not waiting for child here */
2671 }
2672 }
2673
2674 yield call(marker_tracker.flush());
2675 if (retcode < 0) {
2676 ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
2677 return set_cr_error(retcode);
2678 }
2679 if (sync_status < 0) {
      ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2681 }
2682
2683 /* wait for all operations to complete */
2684 drain_all();
2685
2686 if (sync_status < 0) {
2687 return set_cr_error(sync_status);
2688 }
2689
2690 return set_cr_done();
2691 }
2692 return 0;
2693 }
2694
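/*
 * Top-level flow for syncing one bucket shard: take the sync lease, read
 * the shard's sync status, fetch the bucket instance info (pulling its
 * metadata from the master zone if it hasn't been synced yet), then run
 * init, full and incremental sync in turn, each state falling through to
 * the next.
 */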
2695 int RGWRunBucketSyncCoroutine::operate()
2696 {
2697 reenter(this) {
2698 yield {
2699 set_status("acquiring sync lock");
2700 auto store = sync_env->store;
2701 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
2702 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2703 "sync_lock",
2704 cct->_conf->rgw_sync_lease_period,
2705 this));
2706 lease_stack.reset(spawn(lease_cr.get(), false));
2707 }
2708 while (!lease_cr->is_locked()) {
2709 if (lease_cr->is_done()) {
2710 ldout(cct, 5) << "lease cr failed, done early" << dendl;
2711 set_status("lease lock failed, early abort");
2712 return set_cr_error(lease_cr->get_ret_status());
2713 }
2714 set_sleeping(true);
2715 yield;
2716 }
2717
2718 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
2719 if (retcode < 0 && retcode != -ENOENT) {
2720 ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
2721 << bucket_shard_str{bs} << dendl;
2722 lease_cr->go_down();
2723 drain_all();
2724 return set_cr_error(retcode);
2725 }
2726
2727 ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
2728 << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
2729
2730 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2731 if (retcode == -ENOENT) {
2732 /* bucket instance info has not been synced in yet, fetch it now */
2733 yield {
2734 ldout(sync_env->cct, 10) << "no local info for bucket "
2735 << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
2736 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
2737
2738 meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
2739
2740 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
2741 string() /* no marker */,
2742 MDLOG_STATUS_COMPLETE,
2743 NULL /* no marker tracker */));
2744 }
2745 if (retcode < 0) {
2746 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
2747 lease_cr->go_down();
2748 drain_all();
2749 return set_cr_error(retcode);
2750 }
2751
2752 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2753 }
2754 if (retcode < 0) {
2755 ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
2756 lease_cr->go_down();
2757 drain_all();
2758 return set_cr_error(retcode);
2759 }
2760
2761 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
2762 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
2763 if (retcode < 0) {
2764 ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
2765 << " failed, retcode=" << retcode << dendl;
2766 lease_cr->go_down();
2767 drain_all();
2768 return set_cr_error(retcode);
2769 }
2770 }
2771
2772 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
2773 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
2774 status_oid, lease_cr.get(),
2775 sync_status.full_marker));
2776 if (retcode < 0) {
2777 ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
2778 << " failed, retcode=" << retcode << dendl;
2779 lease_cr->go_down();
2780 drain_all();
2781 return set_cr_error(retcode);
2782 }
2783 sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2784 }
2785
2786 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
2787 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
2788 status_oid, lease_cr.get(),
2789 sync_status.inc_marker));
2790 if (retcode < 0) {
2791 ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
2792 << " failed, retcode=" << retcode << dendl;
2793 lease_cr->go_down();
2794 drain_all();
2795 return set_cr_error(retcode);
2796 }
2797 }
2798
2799 lease_cr->go_down();
2800 drain_all();
2801 return set_cr_done();
2802 }
2803
2804 return 0;
2805 }
2806
2807 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
2808 {
2809 return new RGWRunBucketSyncCoroutine(&sync_env, bs);
2810 }
2811
2812 int RGWBucketSyncStatusManager::init()
2813 {
2814 conn = store->get_zone_conn_by_id(source_zone);
2815 if (!conn) {
2816 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
2817 return -EINVAL;
2818 }
2819
2820 int ret = http_manager.set_threaded();
2821 if (ret < 0) {
2822 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
2823 return ret;
2824 }
2825
2826
2827 const string key = bucket.get_key();
2828
2829 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
2830 { NULL, NULL } };
2831
2832 string path = string("/admin/metadata/bucket.instance");
2833
2834 bucket_instance_meta_info result;
2835 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
2836 if (ret < 0) {
2837 ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
2838 return ret;
2839 }
2840
2841 RGWBucketInfo& bi = result.data.get_bucket_info();
2842 num_shards = bi.num_shards;
2843
2844 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
2845
2846 sync_module.reset(new RGWDefaultSyncModuleInstance());
2847
2848 int effective_num_shards = (num_shards ? num_shards : 1);
2849
2850 auto async_rados = store->get_async_rados();
2851
2852 for (int i = 0; i < effective_num_shards; i++) {
2853 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
2854 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
2855 if (ret < 0) {
2856 ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
2857 return ret;
2858 }
2859 source_logs[i] = l;
2860 }
2861
2862 return 0;
2863 }
2864
2865 int RGWBucketSyncStatusManager::init_sync_status()
2866 {
2867 list<RGWCoroutinesStack *> stacks;
2868
2869 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2870 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2871 RGWRemoteBucketLog *l = iter->second;
2872 stack->call(l->init_sync_status_cr());
2873
2874 stacks.push_back(stack);
2875 }
2876
2877 return cr_mgr.run(stacks);
2878 }
2879
2880 int RGWBucketSyncStatusManager::read_sync_status()
2881 {
2882 list<RGWCoroutinesStack *> stacks;
2883
2884 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2885 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2886 RGWRemoteBucketLog *l = iter->second;
2887 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
2888
2889 stacks.push_back(stack);
2890 }
2891
2892 int ret = cr_mgr.run(stacks);
2893 if (ret < 0) {
2894 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
2895 << bucket_str{bucket} << dendl;
2896 return ret;
2897 }
2898
2899 return 0;
2900 }
2901
2902 int RGWBucketSyncStatusManager::run()
2903 {
2904 list<RGWCoroutinesStack *> stacks;
2905
2906 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2907 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2908 RGWRemoteBucketLog *l = iter->second;
2909 stack->call(l->run_sync_cr());
2910
2911 stacks.push_back(stack);
2912 }
2913
2914 int ret = cr_mgr.run(stacks);
2915 if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to run sync for "
        << bucket_str{bucket} << dendl;
2918 return ret;
2919 }
2920
2921 return 0;
2922 }
2923
2924 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
2925 const rgw_bucket_shard& bs)
2926 {
2927 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
2928 }
2929
2930
2931 // TODO: move into rgw_data_sync_trim.cc
2932 #undef dout_prefix
2933 #define dout_prefix (*_dout << "data trim: ")
2934
2935 namespace {
2936
2937 /// return the marker that it's safe to trim up to
2938 const std::string& get_stable_marker(const rgw_data_sync_marker& m)
2939 {
2940 return m.state == m.FullSync ? m.next_step_marker : m.marker;
2941 }
2942
2943 /// comparison operator for take_min_markers()
2944 bool operator<(const rgw_data_sync_marker& lhs,
2945 const rgw_data_sync_marker& rhs)
2946 {
2947 // sort by stable marker
2948 return get_stable_marker(lhs) < get_stable_marker(rhs);
2949 }
2950
2951 /// populate the container starting with 'dest' with the minimum stable marker
2952 /// of each shard for all of the peers in [first, last)
2953 template <typename IterIn, typename IterOut>
2954 void take_min_markers(IterIn first, IterIn last, IterOut dest)
2955 {
2956 if (first == last) {
2957 return;
2958 }
2959 // initialize markers with the first peer's
2960 auto m = dest;
2961 for (auto &shard : first->sync_markers) {
2962 *m = std::move(shard.second);
2963 ++m;
2964 }
2965 // for remaining peers, replace with smaller markers
2966 for (auto p = first + 1; p != last; ++p) {
2967 m = dest;
2968 for (auto &shard : p->sync_markers) {
2969 if (shard.second < *m) {
2970 *m = std::move(shard.second);
2971 }
2972 ++m;
2973 }
2974 }
2975 }
2976
2977 } // anonymous namespace
2978
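/*
 * Trim the data log: query every sync peer's data sync status, take the
 * minimum stable marker per shard across all peers, and trim each shard up
 * to that marker. A successful reply from all peers is required, since
 * trimming past a lagging peer's position would drop entries it still
 * needs.
 */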
2979 class DataLogTrimCR : public RGWCoroutine {
2980 RGWRados *store;
2981 RGWHTTPManager *http;
2982 const int num_shards;
  const std::string& zone_id; ///< my zone id
  std::vector<rgw_data_sync_status> peer_status; ///< sync status for each peer
  std::vector<rgw_data_sync_marker> min_shard_markers; ///< min marker per shard
  std::vector<std::string>& last_trim; ///< last trimmed marker per shard
2987 int ret{0};
2988
2989 public:
2990 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
2991 int num_shards, std::vector<std::string>& last_trim)
2992 : RGWCoroutine(store->ctx()), store(store), http(http),
2993 num_shards(num_shards),
2994 zone_id(store->get_zone().id),
2995 peer_status(store->zone_conn_map.size()),
2996 min_shard_markers(num_shards),
2997 last_trim(last_trim)
2998 {}
2999
3000 int operate() override;
3001 };
3002
3003 int DataLogTrimCR::operate()
3004 {
3005 reenter(this) {
3006 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3007 set_status("fetching sync status");
3008 yield {
3009 // query data sync status from each sync peer
3010 rgw_http_param_pair params[] = {
3011 { "type", "data" },
3012 { "status", nullptr },
3013 { "source-zone", zone_id.c_str() },
3014 { nullptr, nullptr }
3015 };
3016
3017 auto p = peer_status.begin();
3018 for (auto& c : store->zone_conn_map) {
3019 ldout(cct, 20) << "query sync status from " << c.first << dendl;
3020 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3021 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3022 false);
3023 ++p;
3024 }
3025 }
3026
3027 // must get a successful reply from all peers to consider trimming
3028 ret = 0;
3029 while (ret == 0 && num_spawned() > 0) {
3030 yield wait_for_child();
3031 collect_next(&ret);
3032 }
3033 drain_all();
3034
3035 if (ret < 0) {
3036 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3037 return set_cr_error(ret);
3038 }
3039
3040 ldout(cct, 10) << "trimming log shards" << dendl;
3041 set_status("trimming log shards");
3042 yield {
3043 // determine the minimum marker for each shard
3044 take_min_markers(peer_status.begin(), peer_status.end(),
3045 min_shard_markers.begin());
3046
3047 for (int i = 0; i < num_shards; i++) {
3048 const auto& m = min_shard_markers[i];
3049 auto& stable = get_stable_marker(m);
3050 if (stable <= last_trim[i]) {
3051 continue;
3052 }
3053 ldout(cct, 10) << "trimming log shard " << i
3054 << " at marker=" << stable
3055 << " last_trim=" << last_trim[i] << dendl;
3056 using TrimCR = RGWSyncLogTrimCR;
3057 spawn(new TrimCR(store, store->data_log->get_oid(i),
3058 stable, &last_trim[i]),
3059 true);
3060 }
3061 }
3062 return set_cr_done();
3063 }
3064 return 0;
3065 }
3066
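/*
 * Periodically attempt data log trimming. Only one gateway trims per
 * interval: each contender takes a "data_trim" lock on the first data log
 * shard for the whole interval, and the lock is intentionally left to
 * expire rather than being released.
 */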
3067 class DataLogTrimPollCR : public RGWCoroutine {
3068 RGWRados *store;
3069 RGWHTTPManager *http;
3070 const int num_shards;
  const utime_t interval; ///< polling interval
  const std::string lock_oid; ///< use first data log shard for lock
  const std::string lock_cookie;
  std::vector<std::string> last_trim; ///< last trimmed marker per shard
3075
3076 public:
3077 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3078 int num_shards, utime_t interval)
3079 : RGWCoroutine(store->ctx()), store(store), http(http),
3080 num_shards(num_shards), interval(interval),
3081 lock_oid(store->data_log->get_oid(0)),
3082 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3083 last_trim(num_shards)
3084 {}
3085
3086 int operate() override;
3087 };
3088
3089 int DataLogTrimPollCR::operate()
3090 {
3091 reenter(this) {
3092 for (;;) {
3093 set_status("sleeping");
3094 wait(interval);
3095
3096 // request a 'data_trim' lock that covers the entire wait interval to
3097 // prevent other gateways from attempting to trim for the duration
3098 set_status("acquiring trim lock");
3099 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3100 rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
3101 "data_trim", lock_cookie,
3102 interval.sec()));
3103 if (retcode < 0) {
3104 // if the lock is already held, go back to sleep and try again later
3105 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3106 << interval.sec() << "s" << dendl;
3107 continue;
3108 }
3109
3110 set_status("trimming");
3111 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3112
3113 // note that the lock is not released. this is intentional, as it avoids
3114 // duplicating this work in other gateways
3115 }
3116 }
3117 return 0;
3118 }
3119
3120 RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3121 RGWHTTPManager *http,
3122 int num_shards, utime_t interval)
3123 {
3124 return new DataLogTrimPollCR(store, http, num_shards, interval);
3125 }