#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_module.h"

#include "cls/lock/cls_lock_client.h"

#include "auth/Crypto.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";

class RGWSyncDebugLogger {
  CephContext *cct;
  string prefix;

  bool ended;

public:
  RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
                     const string& sync_type, const string& sync_stage,
                     const string& resource, bool log_start = true) {
    init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
  }
  RGWSyncDebugLogger() : cct(NULL), ended(false) {}
  ~RGWSyncDebugLogger();

  void init(CephContext *_cct, const string& source_zone,
            const string& sync_type, const string& sync_stage,
            const string& resource, bool log_start = true);
  void log(const string& state);
  void finish(int status);
};

void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
                              const string& sync_type, const string& sync_section,
                              const string& resource, bool log_start)
{
  cct = _cct;
  ended = false;
  string zone_str = source_zone.substr(0, 8);
  prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
  if (log_start) {
    log("start");
  }
}

RGWSyncDebugLogger::~RGWSyncDebugLogger()
{
  if (!ended) {
    log("finish");
  }
}

void RGWSyncDebugLogger::log(const string& state)
{
  ldout(cct, 5) << prefix << ":" << state << dendl;
}

void RGWSyncDebugLogger::finish(int status)
{
  ended = true;
  ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
}

class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
public:
  RGWDataSyncDebugLogger() {}
  RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
                         const string& resource, bool log_start = true) {
    init(sync_env, sync_section, resource, log_start);
  }
  void init(RGWDataSyncEnv *sync_env, const string& sync_section,
            const string& resource, bool log_start = true) {
    RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
  }
};
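// The resulting debug lines take the form (assuming the usual 8-char zone
// prefix):
//   "data sync: Sync:<zone8>:data:<section>:<resource>:start"
//   "data sync: Sync:<zone8>:data:<section>:<resource>:finish r=<status>"
// which makes per-resource sync activity greppable at debug level 5.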

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store,
               rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}
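// RGWShardCollectCR drives spawn_next() until it returns false, keeping at
// most MAX_CONCURRENT_SHARDS (16) marker reads in flight, so all shard
// markers are fetched with bounded concurrency rather than all at once.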

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
                          rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "info", NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};
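// The request above amounts to a GET against the source zone of the form
//   /admin/log/?type=data&id=<shard>&info
// whose JSON reply is decoded into RGWDataChangesLogInfo (the shard's
// current marker plus its last-update time).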

struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};

class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  string *pmarker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
                              int _shard_id, string *_pmarker,
                              list<rgw_data_change_log_entry> *_entries,
                              bool *_truncated)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      pmarker(_pmarker),
      entries(_entries),
      truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "marker", pmarker->c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(&response);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pmarker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    http_op->set_user_info((void *)stack);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->get_zone_params().log_pool),
      num_shards(num_shards), status(status) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};
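// Initialization sequence, in short: take a cls_lock on the status object,
// write rgw_data_sync_info (which recreates the object, hence the second
// lock), snapshot each remote datalog shard's current marker into a
// per-shard rgw_data_sync_marker as the incremental-sync starting point,
// flip the state to StateBuildingFullSyncMaps, and drop the lock.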

int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  if (store->is_meta_master()) {
    return 0;
  }

  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  uint64_t instance_id;
  get_random_bytes((char *)&instance_id, sizeof(instance_id));
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}
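// e.g. for a (hypothetical) source zone "zone-a" and shard 3 this yields
// "data.full-sync.index.zone-a.3".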

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string> result;
  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->get_zone_params().log_pool,
                                                  oid_prefix);
      yield {
        string entrypoint = string("/admin/metadata/bucket.instance");
        /* FIXME: need a better scaling solution here, requires streaming output */
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                      entrypoint, NULL, &result));
      }
      if (get_ret_status() < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
        return set_state(RGWCoroutine_Error);
      }
      for (iter = result.begin(); iter != result.end(); ++iter) {
        ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;

        key = *iter;

        yield {
          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                          { NULL, NULL } };

          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
        }

        num_shards = meta_info.data.get_bucket_info().num_shards;
        if (num_shards > 0) {
          for (i = 0; i < num_shards; i++) {
            char buf[16];
            snprintf(buf, sizeof(buf), ":%d", i);
            s = key + buf;
            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
          }
        } else {
          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
        }
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                                rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker), true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * Create an index from key -> marker and from marker -> key. This is
   * useful so that we can ensure that we only have one entry for any key
   * that is in use. This is needed when doing incremental sync of data,
   * as we don't want to run multiple concurrent sync operations for the
   * same bucket shard.
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }
};
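// With DATA_SYNC_UPDATE_MARKER_WINDOW at 1, the tracked high marker is
// flushed to RADOS after roughly every completed entry; a larger window
// would batch marker writes at the cost of replaying more entries after an
// interruption. (This reading of the window semantics follows
// RGWSyncShardMarkerTrack in rgw_sync.h and is an assumption here.)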

// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}

class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  RGWDataSyncDebugLogger logger;
  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
    logger.init(sync_env, "Bucket", bs.get_key());
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker,
                           RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" << _raw_key << " entry=" << entry_marker;
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          call(new RGWRunBucketSyncCoroutine(sync_env, bs));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
            "for missing bucket " << raw_key << dendl;
        sync_status = 0;
      }

      if (sync_status < 0) {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                        -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
        }
        if (error_repo && !error_repo->append(raw_key)) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
                                           << error_repo->get_obj() << "), retcode=" << retcode << dendl;
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};
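// Failure handling in a nutshell: a key that fails to sync is appended to the
// shard's error repo (an omap object) so incremental_sync() below can retry
// it later; a key that succeeds on such a retry (remove_from_repo == true)
// is removed from the repo again.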

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  map<string, bufferlist> entries;
  map<string, bufferlist>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  RGWDataChangesLogInfo shard_info;
  string datalog_marker;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  map<string, bufferlist> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
  uint32_t retry_backoff_secs;

  RGWDataSyncDebugLogger logger;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";

    logger.init(sync_env, "DataShard", status_oid);
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      logger.log("full sync");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      total_entries = sync_marker.pos;
      do {
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
          total_entries++;
          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
            if (retcode < 0) {
              lease_cr->go_down();
              drain_all();
              return set_cr_error(retcode);
            }
          }
          sync_marker.marker = iter->first;
        }
      } while ((int)entries.size() == max_entries);

      lease_cr->go_down();
      drain_all();

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
        lease_cr->go_down();
        return set_cr_error(retcode);
      }
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      set_status("lease acquired");
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      logger.log("inc sync");
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      do {
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
          }
        }

        /* process bucket shards that previously failed */
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                             error_marker, &error_entries,
                                             max_error_entries));
        ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
        iter = error_entries.begin();
        for (; iter != error_entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
          spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
          error_marker = iter->first;
        }
        if ((int)error_entries.size() != max_error_entries) {
          if (error_marker.empty() && error_entries.empty()) {
            /* the retry repo is empty, we back off a bit before calling it again */
            retry_backoff_secs *= 2;
            if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
              retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
            }
          } else {
            retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
          }
          error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
          error_marker.clear();
        }

        yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }
        datalog_marker = shard_info.marker;
#define INCREMENTAL_MAX_ENTRIES 100
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker > sync_marker.marker) {
          spawned_keys.clear();
          yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
          if (retcode < 0) {
            ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
            stop_spawned_services();
            drain_all();
            return set_cr_error(retcode);
          }
          for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
            ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
            if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
              ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
              marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
              continue;
            }
            if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
            } else {
              /*
               * don't spawn the same key more than once. We can do that as long as we don't yield
               */
              if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
                spawned_keys.insert(log_iter->entry.key);
                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
                if (retcode < 0) {
                  stop_spawned_services();
                  drain_all();
                  return set_cr_error(retcode);
                }
              }
            }
          }
          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
                /* we have reported this error */
              }
              /* not waiting for child here */
            }
          }
        }
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker == sync_marker.marker) {
#define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
      } while (true);
    }
    return 0;
  }

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};
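// A shard coroutine therefore runs in one of two modes: full_sync() walks the
// omap index built by RGWListBucketIndexesCR and syncs every listed bucket
// shard, then flips the persisted marker state to IncrementalSync;
// incremental_sync() tails the remote datalog shard, retries the error repo
// with exponential backoff (60s up to 600s), and sleeps INCREMENTAL_INTERVAL
// (20s) whenever it has caught up to the remote marker.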

class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

public:
  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker)
    : RGWBackoffControlCR(_sync_env->cct, false),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                          rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(cr_lock());

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};

class RGWDataSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  rgw_data_sync_status sync_status;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  Mutex shard_crs_lock;
  map<int, RGWDataSyncShardControlCR *> shard_crs;

  bool *reset_backoff;

  RGWDataSyncDebugLogger logger;

  RGWDataSyncModule *data_sync_module{nullptr};
public:
  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      num_shards(_num_shards),
      marker_tracker(NULL),
      shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
      reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
  }

  int operate() override {
    reenter(this) {

      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode < 0 && retcode != -ENOENT) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
        sync_status.sync_info.num_shards = num_shards;
        uint64_t instance_id;
        get_random_bytes((char *)&instance_id, sizeof(instance_id));
        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sync_env, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        /* call sync module init here */
        yield call(data_sync_module->init_sync(sync_env));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        /* state: building full sync maps */
        ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
                                                                          iter->first, iter->second);
            cr->get();
            shard_crs_lock.Lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.Unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
                                                         rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex::Locker l(shard_crs_lock);
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};
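// Overall state machine driven above: StateInit -> StateBuildingFullSyncMaps
// (full-sync omap indexes are built) -> StateSync, at which point one
// RGWDataSyncShardControlCR is spawned per datalog shard and each shard
// proceeds independently through its own FullSync/IncrementalSync marker state.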

class RGWDefaultDataSyncModule : public RGWDataSyncModule {
public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
};

int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
                                 key, versioned_epoch,
                                 true, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}

class RGWDataSyncControlCR : public RGWBackoffControlCR
{
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

public:
  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards)
    : RGWBackoffControlCR(_sync_env->cct, true),
      sync_env(_sync_env), num_shards(_num_shards) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex& m = cr_lock();

    m.Lock();
    RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
    if (!cr) {
      m.Unlock();
      return;
    }

    cr->get();
    m.Unlock();

    if (cr) {
      cr->wakeup(shard_id, keys);
    }

    cr->put();
  }
};

void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
  RWLock::RLocker rl(lock);
  if (!data_sync_cr) {
    return;
  }
  data_sync_cr->wakeup(shard_id, keys);
}

int RGWRemoteDataLog::run_sync(int num_shards)
{
  lock.get_write();
  data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
  data_sync_cr->get(); // run() will drop a ref, so take another
  lock.unlock();

  int r = run(data_sync_cr);

  lock.get_write();
  data_sync_cr->put();
  data_sync_cr = NULL;
  lock.unlock();

  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
    return r;
  }
  return 0;
}

int RGWDataSyncStatusManager::init()
{
  auto zone_def_iter = store->zone_by_id.find(source_zone);
  if (zone_def_iter == store->zone_by_id.end()) {
    ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
    return -EIO;
  }

  auto& zone_def = zone_def_iter->second;

  if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
    return -ENOTSUP;
  }

  RGWZoneParams& zone_params = store->get_zone_params();

  sync_module = store->get_sync_module();

  conn = store->get_zone_conn_by_id(source_zone);
  if (!conn) {
    ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
    return -EINVAL;
  }

  error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);

  int r = source_log.init(source_zone, conn, error_logger, sync_module);
  if (r < 0) {
    lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
    finalize();
    return r;
  }

  rgw_datalog_info datalog_info;
  r = source_log.read_log_info(&datalog_info);
  if (r < 0) {
    ldout(store->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
    finalize();
    return r;
  }

  num_shards = datalog_info.num_shards;

  for (int i = 0; i < num_shards; i++) {
    shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
  }

  return 0;
}

void RGWDataSyncStatusManager::finalize()
{
  delete error_logger;
  error_logger = nullptr;
}

string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
{
  char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());

  return string(buf);
}

string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);

  return string(buf);
}
1714
1715 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
1716 const rgw_bucket& bucket, int shard_id,
1717 RGWSyncErrorLogger *_error_logger,
1718 RGWSyncModuleInstanceRef& _sync_module)
1719 {
1720 conn = _conn;
1721 source_zone = _source_zone;
1722 bs.bucket = bucket;
1723 bs.shard_id = shard_id;
1724
1725 sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
1726
1727 return 0;
1728 }
1729
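/*
 * Decoded from the remote bilog info request (see
 * RGWReadRemoteBucketIndexLogInfoCR below). A rough sketch of the expected
 * JSON payload, with illustrative values only:
 *
 *   { "bucket_ver": "1", "master_ver": "3",
 *     "max_marker": "00000000004.94.3", "syncstopped": false }
 */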
1730 struct bucket_index_marker_info {
1731 string bucket_ver;
1732 string master_ver;
1733 string max_marker;
1734 bool syncstopped{false};
1735
1736 void decode_json(JSONObj *obj) {
1737 JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
1738 JSONDecoder::decode_json("master_ver", master_ver, obj);
1739 JSONDecoder::decode_json("max_marker", max_marker, obj);
1740 JSONDecoder::decode_json("syncstopped", syncstopped, obj);
1741 }
1742 };
1743
1744 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1745 RGWDataSyncEnv *sync_env;
1746 const string instance_key;
1747
1748 bucket_index_marker_info *info;
1749
1750 public:
1751 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
1752 const rgw_bucket_shard& bs,
1753 bucket_index_marker_info *_info)
1754 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1755 instance_key(bs.get_key()), info(_info) {}
1756
1757 int operate() override {
1758 reenter(this) {
1759 yield {
1760 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
1761 { "bucket-instance", instance_key.c_str() },
1762 { "info" , NULL },
1763 { NULL, NULL } };
1764
1765 string p = "/admin/log/";
1766 call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
1767 }
1768 if (retcode < 0) {
1769 return set_cr_error(retcode);
1770 }
1771 return set_cr_done();
1772 }
1773 return 0;
1774 }
1775 };
1776
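/*
 * Initialize a bucket shard's sync status: read the remote bilog position
 * first, then either remove the status object (the remote side stopped
 * sync) or persist StateFullSync with the incremental marker primed to the
 * remote max_marker, so incremental sync can take over where full sync ends.
 */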
1777 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
1778 RGWDataSyncEnv *sync_env;
1779
1780 rgw_bucket_shard bs;
1781 const string sync_status_oid;
1782
1783 rgw_bucket_shard_sync_info& status;
1784
1785 bucket_index_marker_info info;
1786 public:
1787 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1788 const rgw_bucket_shard& bs,
1789 rgw_bucket_shard_sync_info& _status)
1790 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
1791 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1792 status(_status)
1793 {}
1794
1795 int operate() override {
1796 reenter(this) {
1797 /* fetch current position in logs */
1798 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
1799 if (retcode < 0 && retcode != -ENOENT) {
1800 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
1801 return set_cr_error(retcode);
1802 }
1803 yield {
1804 auto store = sync_env->store;
1805 rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid);
1806
1807 if (info.syncstopped) {
1808 call(new RGWRadosRemoveCR(store, obj));
1809 } else {
1810 status.state = rgw_bucket_shard_sync_info::StateFullSync;
1811 status.inc_marker.position = info.max_marker;
1812 map<string, bufferlist> attrs;
1813 status.encode_all_attrs(attrs);
1814 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
1815 }
1816 }
1817 return set_cr_done();
1818 }
1819 return 0;
1820 }
1821 };
1822
1823 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
1824 {
1825 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
1826 }
1827
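/*
 * Decode a single xattr into *val. A missing attribute yields a
 * default-constructed value; a corrupt one is logged and may leave *val
 * only partially decoded.
 */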
1828 template <class T>
1829 static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
1830 {
1831 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1832 if (iter == attrs.end()) {
1833 *val = T();
1834 return;
1835 }
1836
1837 bufferlist::iterator biter = iter->second.begin();
1838 try {
1839 ::decode(*val, biter);
1840 } catch (buffer::error& err) {
1841 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
1842 }
1843 }
1844
1845 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
1846 {
1847 decode_attr(cct, attrs, "state", &state);
1848 decode_attr(cct, attrs, "full_marker", &full_marker);
1849 decode_attr(cct, attrs, "inc_marker", &inc_marker);
1850 }
1851
1852 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
1853 {
1854 encode_state_attr(attrs);
1855 full_marker.encode_attr(attrs);
1856 inc_marker.encode_attr(attrs);
1857 }
1858
1859 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
1860 {
1861 ::encode(state, attrs["state"]);
1862 }
1863
1864 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1865 {
1866 ::encode(*this, attrs["full_marker"]);
1867 }
1868
1869 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1870 {
1871 ::encode(*this, attrs["inc_marker"]);
1872 }
1873
1874 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
1875 RGWDataSyncEnv *sync_env;
1876 string oid;
1877 rgw_bucket_shard_sync_info *status;
1878
1879 map<string, bufferlist> attrs;
1880 public:
1881 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1882 const rgw_bucket_shard& bs,
1883 rgw_bucket_shard_sync_info *_status)
1884 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1885 oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1886 status(_status) {}
1887 int operate() override;
1888 };
1889
1890 int RGWReadBucketSyncStatusCoroutine::operate()
1891 {
1892 reenter(this) {
1893 yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
1894 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
1895 &attrs));
1896 if (retcode == -ENOENT) {
1897 *status = rgw_bucket_shard_sync_info();
1898 return set_cr_done();
1899 }
1900 if (retcode < 0) {
1901 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket shard sync status oid=" << oid << " ret=" << retcode << dendl;
1902 return set_cr_error(retcode);
1903 }
1904 status->decode_from_attrs(sync_env->cct, attrs);
1905 return set_cr_done();
1906 }
1907 return 0;
1908 }
1909 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
1910 {
1911 return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
1912 }
1913
1914 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
1915 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
1916 delete iter->second;
1917 }
1918 delete error_logger;
1919 }
1920
1921
1922 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
1923 {
1924 JSONDecoder::decode_json("ID", id, obj);
1925 JSONDecoder::decode_json("DisplayName", display_name, obj);
1926 }
1927
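/*
 * One entry of the remote bucket listing (an S3-style "versions" listing
 * with rgwx extensions). Illustrative sketch of a decoded entry; field
 * values are examples only:
 *
 *   { "Key": "foo", "VersionId": "v1", "IsLatest": true,
 *     "RgwxMtime": "2017-07-27T10:00:00.000Z", "ETag": "...", "Size": 1024,
 *     "Owner": { "ID": "user1", "DisplayName": "User One" },
 *     "VersionedEpoch": 1 }
 */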
1928 struct bucket_list_entry {
1929 bool delete_marker;
1930 rgw_obj_key key;
1931 bool is_latest;
1932 real_time mtime;
1933 string etag;
1934 uint64_t size;
1935 string storage_class;
1936 rgw_bucket_entry_owner owner;
1937 uint64_t versioned_epoch;
1938 string rgw_tag;
1939
1940 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
1941
1942 void decode_json(JSONObj *obj) {
1943 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
1944 JSONDecoder::decode_json("Key", key.name, obj);
1945 JSONDecoder::decode_json("VersionId", key.instance, obj);
1946 JSONDecoder::decode_json("IsLatest", is_latest, obj);
1947 string mtime_str;
1948 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
1949
1950 struct tm t;
1951 uint32_t nsec;
1952 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
1953 ceph_timespec ts;
1954 ts.tv_sec = (uint64_t)internal_timegm(&t);
1955 ts.tv_nsec = nsec;
1956 mtime = real_clock::from_ceph_timespec(ts);
1957 }
1958 JSONDecoder::decode_json("ETag", etag, obj);
1959 JSONDecoder::decode_json("Size", size, obj);
1960 JSONDecoder::decode_json("StorageClass", storage_class, obj);
1961 JSONDecoder::decode_json("Owner", owner, obj);
1962 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
1963 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
1964 }
1965 };
1966
1967 struct bucket_list_result {
1968 string name;
1969 string prefix;
1970 string key_marker;
1971 string version_id_marker;
1972 int max_keys;
1973 bool is_truncated;
1974 list<bucket_list_entry> entries;
1975
1976 bucket_list_result() : max_keys(0), is_truncated(false) {}
1977
1978 void decode_json(JSONObj *obj) {
1979 JSONDecoder::decode_json("Name", name, obj);
1980 JSONDecoder::decode_json("Prefix", prefix, obj);
1981 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
1982 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
1983 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
1984 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
1985 JSONDecoder::decode_json("Entries", entries, obj);
1986 }
1987 };
1988
1989 class RGWListBucketShardCR: public RGWCoroutine {
1990 RGWDataSyncEnv *sync_env;
1991 const rgw_bucket_shard& bs;
1992 const string instance_key;
1993 rgw_obj_key marker_position;
1994
1995 bucket_list_result *result;
1996
1997 public:
1998 RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
1999 rgw_obj_key& _marker_position, bucket_list_result *_result)
2000 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2001 instance_key(bs.get_key()), marker_position(_marker_position),
2002 result(_result) {}
2003
2004 int operate() override {
2005 reenter(this) {
2006 yield {
2007 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2008 { "versions" , NULL },
2009 { "format" , "json" },
2010 { "objs-container" , "true" },
2011 { "key-marker" , marker_position.name.c_str() },
2012 { "version-id-marker" , marker_position.instance.c_str() },
2013 { NULL, NULL } };
2014 // don't include the tenant in the url; it's already part of instance_key
2015 string p = string("/") + bs.bucket.name;
2016 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2017 }
2018 if (retcode < 0) {
2019 return set_cr_error(retcode);
2020 }
2021 return set_cr_done();
2022 }
2023 return 0;
2024 }
2025 };
2026
2027 class RGWListBucketIndexLogCR: public RGWCoroutine {
2028 RGWDataSyncEnv *sync_env;
2029 const string instance_key;
2030 string marker;
2031
2032 list<rgw_bi_log_entry> *result;
2033
2034 public:
2035 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2036 string& _marker, list<rgw_bi_log_entry> *_result)
2037 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2038 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2039
2040 int operate() override {
2041 reenter(this) {
2042 yield {
2043 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2044 { "format" , "json" },
2045 { "marker" , marker.c_str() },
2046 { "type", "bucket-index" },
2047 { NULL, NULL } };
2048
2049 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2050 }
2051 if (retcode < 0) {
2052 return set_cr_error(retcode);
2053 }
2054 return set_cr_done();
2055 }
2056 return 0;
2057 }
2058 };
2059
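/*
 * The marker trackers below batch status updates: a marker is only
 * persisted once the entries ahead of it have completed, and this window
 * bounds how many completions may be coalesced into a single RADOS attrs
 * write.
 */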
2060 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2061
2062 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2063 RGWDataSyncEnv *sync_env;
2064
2065 string marker_oid;
2066 rgw_bucket_shard_full_sync_marker sync_marker;
2067
2068 public:
2069 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2070 const string& _marker_oid,
2071 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2072 sync_env(_sync_env),
2073 marker_oid(_marker_oid),
2074 sync_marker(_marker) {}
2075
2076 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2077 sync_marker.position = new_marker;
2078 sync_marker.count = index_pos;
2079
2080 map<string, bufferlist> attrs;
2081 sync_marker.encode_attr(attrs);
2082
2083 RGWRados *store = sync_env->store;
2084
2085 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2086 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2087 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2088 attrs);
2089 }
2090 };
2091
2092 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2093 RGWDataSyncEnv *sync_env;
2094
2095 string marker_oid;
2096 rgw_bucket_shard_inc_sync_marker sync_marker;
2097
2098 map<rgw_obj_key, string> key_to_marker;
2099 map<string, rgw_obj_key> marker_to_key;
2100
2101 void handle_finish(const string& marker) override {
2102 map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
2103 if (iter == marker_to_key.end()) {
2104 return;
2105 }
2106 key_to_marker.erase(iter->second);
2107 reset_need_retry(iter->second);
2108 marker_to_key.erase(iter);
2109 }
2110
2111 public:
2112 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2113 const string& _marker_oid,
2114 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2115 sync_env(_sync_env),
2116 marker_oid(_marker_oid),
2117 sync_marker(_marker) {}
2118
2119 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2120 sync_marker.position = new_marker;
2121
2122 map<string, bufferlist> attrs;
2123 sync_marker.encode_attr(attrs);
2124
2125 RGWRados *store = sync_env->store;
2126
2127 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2128 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2129 store,
2130 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2131 attrs);
2132 }
2133
2134 /*
2135 * Create an index from key -> marker and from marker -> key. This is
2136 * useful to ensure that we have at most one in-flight entry for any key.
2137 * It is needed for incremental data sync, where we don't want to run
2138 * multiple concurrent sync operations for the same bucket shard, and it
2139 * also makes sure that we don't run concurrent operations on the same
2140 * key with different ops.
2141 */
2143 bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
2144 if (key_to_marker.find(key) != key_to_marker.end()) {
2145 set_need_retry(key);
2146 return false;
2147 }
2148 key_to_marker[key] = marker;
2149 marker_to_key[marker] = key;
2150 return true;
2151 }
2152
2153 bool can_do_op(const rgw_obj_key& key) {
2154 return (key_to_marker.find(key) == key_to_marker.end());
2155 }
2156 };
2157
2158 template <class T, class K>
2159 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2160 RGWDataSyncEnv *sync_env;
2161
2162 RGWBucketInfo *bucket_info;
2163 const rgw_bucket_shard& bs;
2164
2165 rgw_obj_key key;
2166 bool versioned;
2167 uint64_t versioned_epoch;
2168 rgw_bucket_entry_owner owner;
2169 real_time timestamp;
2170 RGWModifyOp op;
2171 RGWPendingState op_state;
2172
2173 T entry_marker;
2174 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2175
2176 int sync_status;
2177
2178 stringstream error_ss;
2179
2180 RGWDataSyncDebugLogger logger;
2181
2182 bool error_injection;
2183
2184 RGWDataSyncModule *data_sync_module;
2185
2186 rgw_zone_set zones_trace;
2187
2188 public:
2189 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2190 RGWBucketInfo *_bucket_info,
2191 const rgw_bucket_shard& bs,
2192 const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
2193 real_time& _timestamp,
2194 const rgw_bucket_entry_owner& _owner,
2195 RGWModifyOp _op, RGWPendingState _op_state,
2196 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
2197 sync_env(_sync_env),
2198 bucket_info(_bucket_info), bs(bs),
2199 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2200 owner(_owner),
2201 timestamp(_timestamp), op(_op),
2202 op_state(_op_state),
2203 entry_marker(_entry_marker),
2204 marker_tracker(_marker_tracker),
2205 sync_status(0) {
2206 stringstream ss;
2207 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
2208 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2209 ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
2210 set_status("init");
2211
2212 logger.init(sync_env, "Object", ss.str());
2213
2214 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2215
2216 data_sync_module = sync_env->sync_module->get_data_handler();
2217
2218 zones_trace = _zones_trace;
2219 zones_trace.insert(sync_env->store->get_zone().id);
2220 }
2221
2222 int operate() override {
2223 reenter(this) {
2224 /* skip entries that are not complete */
2225 if (op_state != CLS_RGW_STATE_COMPLETE) {
2226 goto done;
2227 }
2228 do {
2229 yield {
2230 marker_tracker->reset_need_retry(key);
2231 if (key.name.empty()) {
2232 /* shouldn't happen */
2233 set_status("skipping empty entry");
2234 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
2235 goto done;
2236 }
2237 if (error_injection &&
2238 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2239 ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
2240 retcode = -EIO;
2241 } else if (op == CLS_RGW_OP_ADD ||
2242 op == CLS_RGW_OP_LINK_OLH) {
2243 if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
2244 set_status("skipping entry");
2245 ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
2246 goto done;
2247
2248 }
2249 set_status("syncing obj");
2250 ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2251 logger.log("fetch");
2252 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
2253 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2254 set_status("removing obj");
2255 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2256 versioned = true;
2257 }
2258 logger.log("remove");
2259 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace));
2260 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2261 logger.log("creating delete marker");
2262 set_status("creating delete marker");
2263 ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2264 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace));
2265 }
2266 }
2267 } while (marker_tracker->need_retry(key));
2268 {
2269 stringstream ss;
2270 if (retcode >= 0) {
2271 ss << "done";
2272 } else {
2273 ss << "done, retcode=" << retcode;
2274 }
2275 logger.log(ss.str());
2276 }
2277
2278 if (retcode < 0 && retcode != -ENOENT) {
2279 set_status() << "failed to sync obj; retcode=" << retcode;
2280 ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
2281 << bucket_shard_str{bs} << "/" << key.name << dendl;
2282 error_ss << bucket_shard_str{bs} << "/" << key.name;
2283 sync_status = retcode;
2284 }
2285 if (!error_ss.str().empty()) {
2286 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
2287 }
2288 done:
2289 if (sync_status == 0) {
2290 /* update marker */
2291 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2292 yield call(marker_tracker->finish(entry_marker));
2293 sync_status = retcode;
2294 }
2295 if (sync_status < 0) {
2296 return set_cr_error(sync_status);
2297 }
2298 return set_cr_done();
2299 }
2300 return 0;
2301 }
2302 };
2303
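/*
 * Upper bound on per-shard fan-out: once more than this many per-object
 * sync coroutines are in flight, the shard coroutine stops spawning and
 * collects completed children before continuing.
 */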
2304 #define BUCKET_SYNC_SPAWN_WINDOW 20
2305
2306 class RGWBucketShardFullSyncCR : public RGWCoroutine {
2307 RGWDataSyncEnv *sync_env;
2308 const rgw_bucket_shard& bs;
2309 RGWBucketInfo *bucket_info;
2310 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2311 bucket_list_result list_result;
2312 list<bucket_list_entry>::iterator entries_iter;
2313 rgw_bucket_shard_full_sync_marker& full_marker;
2314 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2315 rgw_obj_key list_marker;
2316 bucket_list_entry *entry{nullptr};
2317 RGWModifyOp op{CLS_RGW_OP_ADD};
2318
2319 int total_entries{0};
2320
2321 int sync_status{0};
2322
2323 const string& status_oid;
2324
2325 RGWDataSyncDebugLogger logger;
2326 rgw_zone_set zones_trace;
2327 public:
2328 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2329 RGWBucketInfo *_bucket_info,
2330 const std::string& status_oid,
2331 RGWContinuousLeaseCR *lease_cr,
2332 rgw_bucket_shard_full_sync_marker& _full_marker)
2333 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2334 bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker),
2335 marker_tracker(sync_env, status_oid, full_marker),
2336 status_oid(status_oid) {
2337 logger.init(sync_env, "BucketFull", bs.get_key());
2338 zones_trace.insert(sync_env->source_zone);
2339 }
2340
2341 int operate() override;
2342 };
2343
2344 int RGWBucketShardFullSyncCR::operate()
2345 {
2346 int ret;
2347 reenter(this) {
2348 list_marker = full_marker.position;
2349
2350 total_entries = full_marker.count;
2351 do {
2352 if (!lease_cr->is_locked()) {
2353 drain_all();
2354 return set_cr_error(-ECANCELED);
2355 }
2356 set_status("listing remote bucket");
2357 ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
2358 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2359 &list_result));
2360 if (retcode < 0 && retcode != -ENOENT) {
2361 set_status("failed bucket listing, going down");
2362 drain_all();
2363 return set_cr_error(retcode);
2364 }
2365 entries_iter = list_result.entries.begin();
2366 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2367 if (!lease_cr->is_locked()) {
2368 drain_all();
2369 return set_cr_error(-ECANCELED);
2370 }
2371 ldout(sync_env->cct, 20) << "[full sync] syncing object: "
2372 << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
2373 entry = &(*entries_iter);
2374 total_entries++;
2375 list_marker = entries_iter->key;
2376 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2377 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
2378 } else {
2379 op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
2380 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2381 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2382 false, /* versioned, only matters for object removal */
2383 entry->versioned_epoch, entry->mtime,
2384 entry->owner, op, CLS_RGW_STATE_COMPLETE,
2385 entry->key, &marker_tracker, zones_trace),
2386 false);
2387 }
2388 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2389 yield wait_for_child();
2390 bool again = true;
2391 while (again) {
2392 again = collect(&ret, nullptr);
2393 if (ret < 0) {
2394 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2395 sync_status = ret;
2396 /* we have reported this error */
2397 }
2398 }
2399 }
2400 }
2401 } while (list_result.is_truncated && sync_status == 0);
2402 set_status("done iterating over all objects");
2403 /* wait for all operations to complete */
2404 while (num_spawned()) {
2405 yield wait_for_child();
2406 bool again = true;
2407 while (again) {
2408 again = collect(&ret, nullptr);
2409 if (ret < 0) {
2410 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2411 sync_status = ret;
2412 /* we have reported this error */
2413 }
2414 }
2415 }
2416 if (!lease_cr->is_locked()) {
2417 return set_cr_error(-ECANCELED);
2418 }
2419 /* update sync state to incremental */
2420 if (sync_status == 0) {
2421 yield {
2422 rgw_bucket_shard_sync_info incremental_state; // renamed: don't shadow the int sync_status member
2423 incremental_state.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2424 map<string, bufferlist> attrs;
2425 incremental_state.encode_state_attr(attrs);
2426 RGWRados *store = sync_env->store;
2427 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2428 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2429 attrs));
2430 }
2431 } else {
2432 ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2433 }
2434 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2435 ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
2436 << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
2437 return set_cr_error(retcode);
2438 }
2439 if (sync_status < 0) {
2440 return set_cr_error(sync_status);
2441 }
2442 return set_cr_done();
2443 }
2444 return 0;
2445 }
2446
2447 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2448 RGWDataSyncEnv *sync_env;
2449 const rgw_bucket_shard& bs;
2450 RGWBucketInfo *bucket_info;
2451 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2452 list<rgw_bi_log_entry> list_result;
2453 list<rgw_bi_log_entry>::iterator entries_iter;
2454 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
2455 rgw_bucket_shard_inc_sync_marker& inc_marker;
2456 rgw_obj_key key;
2457 rgw_bi_log_entry *entry{nullptr};
2458 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2459 bool updated_status{false};
2460 const string& status_oid;
2461 const string& zone_id;
2462 ceph::real_time sync_modify_time;
2463
2464 string cur_id;
2465
2466 RGWDataSyncDebugLogger logger;
2467
2468 int sync_status{0};
2469 bool syncstopped{false};
2470
2471 public:
2472 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2473 const rgw_bucket_shard& bs,
2474 RGWBucketInfo *_bucket_info,
2475 const std::string& status_oid,
2476 RGWContinuousLeaseCR *lease_cr,
2477 rgw_bucket_shard_inc_sync_marker& _inc_marker)
2478 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2479 bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
2480 marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid), zone_id(_sync_env->store->get_zone().id) {
2481 set_description() << "bucket shard incremental sync bucket="
2482 << bucket_shard_str{bs};
2483 set_status("init");
2484 logger.init(sync_env, "BucketInc", bs.get_key());
2485 }
2486
2487 int operate() override;
2488 };
2489
2490 int RGWBucketShardIncrementalSyncCR::operate()
2491 {
2492 int ret;
2493 reenter(this) {
2494 do {
2495 if (!lease_cr->is_locked()) {
2496 drain_all();
2497 return set_cr_error(-ECANCELED);
2498 }
2499 ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync; position=" << inc_marker.position << dendl;
2500 set_status() << "listing bilog; position=" << inc_marker.position;
2501 yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
2502 &list_result));
2503 if (retcode < 0 && retcode != -ENOENT) {
2504 drain_all(); /* wait for all operations to complete */
2505 if (!syncstopped) {
2506 return set_cr_error(retcode);
2508 } else {
2509 /* no need to retry */
2510 break;
2511 }
2512 }
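/*
 * Build the squash map: for each (object, instance) keep only the newest
 * completed (timestamp, op) pair, ignoring entries this zone already
 * participated in. Any log entry that doesn't match its squash_map entry
 * below has been superseded and is skipped without syncing the object.
 */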
2513 squash_map.clear();
2514 for (auto& e : list_result) {
2515 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) {
2516 ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl;
2517 sync_modify_time = e.timestamp;
2518 syncstopped = true;
2519 continue;
2520 }
2521 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) {
2522 ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl;
2523 sync_modify_time = e.timestamp;
2524 syncstopped = false;
2525 continue;
2526 }
2527 if (e.state != CLS_RGW_STATE_COMPLETE) {
2528 continue;
2529 }
2530 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
2531 continue;
2532 }
2533 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
2534 if (squash_entry.first <= e.timestamp) {
2535 squash_entry = make_pair<>(e.timestamp, e.op);
2536 }
2537 }
2538
2539 entries_iter = list_result.begin();
2540 for (; entries_iter != list_result.end(); ++entries_iter) {
2541 if (!lease_cr->is_locked()) {
2542 drain_all();
2543 return set_cr_error(-ECANCELED);
2544 }
2545 entry = &(*entries_iter);
2546 {
2547 size_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2548 if (p == string::npos) {
2549 cur_id = entry->id;
2550 } else {
2551 cur_id = entry->id.substr(p + 1);
2552 }
2553 }
2554 inc_marker.position = cur_id;
2555
2556 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
2557 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
2558 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2559 continue;
2560 }
2561
2562 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
2563 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
2564 ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
2565 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2566 continue;
2567 }
2568
2569 ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
2570
2571 if (!key.ns.empty()) {
2572 set_status() << "skipping entry in namespace: " << entry->object;
2573 ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
2574 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2575 continue;
2576 }
2577
2578 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
2579 if (entry->op == CLS_RGW_OP_CANCEL) {
2580 set_status() << "canceled operation, skipping";
2581 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2582 << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
2583 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2584 continue;
2585 }
2586 if (entry->state != CLS_RGW_STATE_COMPLETE) {
2587 set_status() << "non-complete operation, skipping";
2588 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2589 << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
2590 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2591 continue;
2592 }
2593 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
2594 set_status() << "redundant operation, skipping";
2595 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2596 << bucket_shard_str{bs} << "/" << key << ": redundant operation" << dendl;
2597 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2598 continue;
2599 }
2600 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
2601 set_status() << "squashed operation, skipping";
2602 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2603 << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
2604 /* not updating high marker though */
2605 continue;
2606 }
2607 ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
2608 << bucket_shard_str{bs} << "/" << key << dendl;
2609 updated_status = false;
2610 while (!marker_tracker.can_do_op(key)) {
2611 if (!updated_status) {
2612 set_status() << "can't do op, conflicting inflight operation";
2613 updated_status = true;
2614 }
2615 ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
2616 yield wait_for_child();
2617 bool again = true;
2618 while (again) {
2619 again = collect(&ret, nullptr);
2620 if (ret < 0) {
2621 ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
2622 sync_status = ret;
2623 /* we have reported this error */
2624 }
2625 }
2626 }
2627 if (!marker_tracker.index_key_to_marker(key, cur_id)) {
2628 set_status() << "can't do op, sync already in progress for object";
2629 ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
2630 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2631 continue;
2632 }
2634 set_status() << "start object sync";
2635 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
2636 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
2637 } else {
2638 uint64_t versioned_epoch = 0;
2639 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
2640 if (entry->ver.pool < 0) {
2641 versioned_epoch = entry->ver.epoch;
2642 }
2643 ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
2644 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
2645 spawn(new SyncCR(sync_env, bucket_info, bs, key,
2646 entry->is_versioned(), versioned_epoch,
2647 entry->timestamp, owner, entry->op, entry->state,
2648 cur_id, &marker_tracker, entry->zones_trace),
2649 false);
2650 }
2652 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2653 set_status() << "num_spawned() > spawn_window";
2654 yield wait_for_child();
2655 bool again = true;
2656 while (again) {
2657 again = collect(&ret, nullptr);
2658 if (ret < 0) {
2659 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2660 sync_status = ret;
2661 /* we have reported this error */
2662 }
2663 /* not waiting for child here */
2664 }
2665 }
2666 }
2667 } while (!list_result.empty() && sync_status == 0);
2668
2669 if (syncstopped) {
2670 drain_all();
2671
2672 yield {
2673 const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
2674 RGWRados *store = sync_env->store;
2675 call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid}));
2676 }
2677 lease_cr->abort();
2678 return set_cr_done();
2679 }
2680
2681 while (num_spawned()) {
2682 yield wait_for_child();
2683 bool again = true;
2684 while (again) {
2685 again = collect(&ret, nullptr);
2686 if (ret < 0) {
2687 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2688 sync_status = ret;
2689 /* we have reported this error */
2690 }
2691 /* not waiting for child here */
2692 }
2693 }
2694
2695 yield call(marker_tracker.flush());
2696 if (retcode < 0) {
2697 ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
2698 return set_cr_error(retcode);
2699 }
2700 if (sync_status < 0) {
2701 ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2702 }
2703
2704 /* wait for all operations to complete */
2705 drain_all();
2706
2707 if (sync_status < 0) {
2708 return set_cr_error(sync_status);
2709 }
2710
2711 return set_cr_done();
2712 }
2713 return 0;
2714 }
2715
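/*
 * Drive a single bucket shard through its sync state machine: take the
 * shard lease, read (and if necessary initialize) the sync status, fetch
 * the bucket instance metadata from the master zone when it is missing
 * locally, then run full sync followed by incremental sync.
 */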
2716 int RGWRunBucketSyncCoroutine::operate()
2717 {
2718 reenter(this) {
2719 yield {
2720 set_status("acquiring sync lock");
2721 auto store = sync_env->store;
2722 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
2723 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2724 "sync_lock",
2725 cct->_conf->rgw_sync_lease_period,
2726 this));
2727 lease_stack.reset(spawn(lease_cr.get(), false));
2728 }
2729 while (!lease_cr->is_locked()) {
2730 if (lease_cr->is_done()) {
2731 ldout(cct, 5) << "lease cr failed, done early" << dendl;
2732 set_status("lease lock failed, early abort");
2733 return set_cr_error(lease_cr->get_ret_status());
2734 }
2735 set_sleeping(true);
2736 yield;
2737 }
2738
2739 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
2740 if (retcode < 0 && retcode != -ENOENT) {
2741 ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
2742 << bucket_shard_str{bs} << dendl;
2743 lease_cr->go_down();
2744 drain_all();
2745 return set_cr_error(retcode);
2746 }
2747
2748 ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
2749 << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
2750
2751 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2752 if (retcode == -ENOENT) {
2753 /* bucket instance info has not been synced in yet, fetch it now */
2754 yield {
2755 ldout(sync_env->cct, 10) << "no local info for bucket "
2756 << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
2757 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
2758
2759 meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
2760
2761 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
2762 string() /* no marker */,
2763 MDLOG_STATUS_COMPLETE,
2764 NULL /* no marker tracker */));
2765 }
2766 if (retcode < 0) {
2767 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
2768 lease_cr->go_down();
2769 drain_all();
2770 return set_cr_error(retcode);
2771 }
2772
2773 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2774 }
2775 if (retcode < 0) {
2776 ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
2777 lease_cr->go_down();
2778 drain_all();
2779 return set_cr_error(retcode);
2780 }
2781
2782 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
2783 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
2784 if (retcode < 0) {
2785 ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
2786 << " failed, retcode=" << retcode << dendl;
2787 lease_cr->go_down();
2788 drain_all();
2789 return set_cr_error(retcode);
2790 }
2791 }
2792
2793 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
2794 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
2795 status_oid, lease_cr.get(),
2796 sync_status.full_marker));
2797 if (retcode < 0) {
2798 ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
2799 << " failed, retcode=" << retcode << dendl;
2800 lease_cr->go_down();
2801 drain_all();
2802 return set_cr_error(retcode);
2803 }
2804 sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2805 }
2806
2807 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
2808 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
2809 status_oid, lease_cr.get(),
2810 sync_status.inc_marker));
2811 if (retcode < 0) {
2812 ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
2813 << " failed, retcode=" << retcode << dendl;
2814 lease_cr->go_down();
2815 drain_all();
2816 return set_cr_error(retcode);
2817 }
2818 }
2819
2820 lease_cr->go_down();
2821 drain_all();
2822 return set_cr_done();
2823 }
2824
2825 return 0;
2826 }
2827
2828 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
2829 {
2830 return new RGWRunBucketSyncCoroutine(&sync_env, bs);
2831 }
2832
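/*
 * Per-bucket sync setup: fetch the remote bucket instance metadata to
 * learn the source bucket's shard count, then create one RGWRemoteBucketLog
 * per shard (or a single one with shard_id -1 for unsharded buckets).
 */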
2833 int RGWBucketSyncStatusManager::init()
2834 {
2835 conn = store->get_zone_conn_by_id(source_zone);
2836 if (!conn) {
2837 ldout(store->ctx(), 0) << "ERROR: connection object to zone " << source_zone << " does not exist" << dendl;
2838 return -EINVAL;
2839 }
2840
2841 int ret = http_manager.set_threaded();
2842 if (ret < 0) {
2843 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
2844 return ret;
2845 }
2846
2848 const string key = bucket.get_key();
2849
2850 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
2851 { NULL, NULL } };
2852
2853 string path = string("/admin/metadata/bucket.instance");
2854
2855 bucket_instance_meta_info result;
2856 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
2857 if (ret < 0) {
2858 ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
2859 return ret;
2860 }
2861
2862 RGWBucketInfo& bi = result.data.get_bucket_info();
2863 num_shards = bi.num_shards;
2864
2865 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
2866
2867 sync_module.reset(new RGWDefaultSyncModuleInstance());
2868
2869 int effective_num_shards = (num_shards ? num_shards : 1);
2870
2871 auto async_rados = store->get_async_rados();
2872
2873 for (int i = 0; i < effective_num_shards; i++) {
2874 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
2875 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
2876 if (ret < 0) {
2877 ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
2878 return ret;
2879 }
2880 source_logs[i] = l;
2881 }
2882
2883 return 0;
2884 }
2885
2886 int RGWBucketSyncStatusManager::init_sync_status()
2887 {
2888 list<RGWCoroutinesStack *> stacks;
2889
2890 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2891 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2892 RGWRemoteBucketLog *l = iter->second;
2893 stack->call(l->init_sync_status_cr());
2894
2895 stacks.push_back(stack);
2896 }
2897
2898 return cr_mgr.run(stacks);
2899 }
2900
2901 int RGWBucketSyncStatusManager::read_sync_status()
2902 {
2903 list<RGWCoroutinesStack *> stacks;
2904
2905 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2906 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2907 RGWRemoteBucketLog *l = iter->second;
2908 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
2909
2910 stacks.push_back(stack);
2911 }
2912
2913 int ret = cr_mgr.run(stacks);
2914 if (ret < 0) {
2915 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
2916 << bucket_str{bucket} << dendl;
2917 return ret;
2918 }
2919
2920 return 0;
2921 }
2922
2923 int RGWBucketSyncStatusManager::run()
2924 {
2925 list<RGWCoroutinesStack *> stacks;
2926
2927 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2928 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
2929 RGWRemoteBucketLog *l = iter->second;
2930 stack->call(l->run_sync_cr());
2931
2932 stacks.push_back(stack);
2933 }
2934
2935 int ret = cr_mgr.run(stacks);
2936 if (ret < 0) {
2937 ldout(store->ctx(), 0) << "ERROR: failed to run sync for "
2938 << bucket_str{bucket} << dendl;
2939 return ret;
2940 }
2941
2942 return 0;
2943 }
2944
2945 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
2946 const rgw_bucket_shard& bs)
2947 {
2948 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
2949 }
2950
2951
2952 // TODO: move into rgw_data_sync_trim.cc
2953 #undef dout_prefix
2954 #define dout_prefix (*_dout << "data trim: ")
2955
2956 namespace {
2957
2958 /// return the marker that it's safe to trim up to
2959 const std::string& get_stable_marker(const rgw_data_sync_marker& m)
2960 {
2961 return m.state == m.FullSync ? m.next_step_marker : m.marker;
2962 }
2963
2964 /// comparison operator for take_min_markers()
2965 bool operator<(const rgw_data_sync_marker& lhs,
2966 const rgw_data_sync_marker& rhs)
2967 {
2968 // sort by stable marker
2969 return get_stable_marker(lhs) < get_stable_marker(rhs);
2970 }
2971
2972 /// populate the container starting with 'dest' with the minimum stable marker
2973 /// of each shard for all of the peers in [first, last)
2974 template <typename IterIn, typename IterOut>
2975 void take_min_markers(IterIn first, IterIn last, IterOut dest)
2976 {
2977 if (first == last) {
2978 return;
2979 }
2980 // initialize markers with the first peer's
2981 auto m = dest;
2982 for (auto &shard : first->sync_markers) {
2983 *m = std::move(shard.second);
2984 ++m;
2985 }
2986 // for remaining peers, replace with smaller markers
2987 for (auto p = first + 1; p != last; ++p) {
2988 m = dest;
2989 for (auto &shard : p->sync_markers) {
2990 if (shard.second < *m) {
2991 *m = std::move(shard.second);
2992 }
2993 ++m;
2994 }
2995 }
2996 }
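/*
 * For example (illustrative markers), with two peers reporting stable
 * positions per shard:
 *   peer A: shard0="00005", shard1="00009"
 *   peer B: shard0="00007", shard1="00003"
 * take_min_markers() yields shard0="00005", shard1="00003"; nothing past
 * the slowest peer may be trimmed.
 */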
2997
2998 } // anonymous namespace
2999
3000 class DataLogTrimCR : public RGWCoroutine {
3001 RGWRados *store;
3002 RGWHTTPManager *http;
3003 const int num_shards;
3004 const std::string& zone_id; ///< my zone id
3005 std::vector<rgw_data_sync_status> peer_status; ///< sync status for each peer
3006 std::vector<rgw_data_sync_marker> min_shard_markers; ///< min marker per shard
3007 std::vector<std::string>& last_trim; ///< last trimmed marker per shard
3008 int ret{0};
3009
3010 public:
3011 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
3012 int num_shards, std::vector<std::string>& last_trim)
3013 : RGWCoroutine(store->ctx()), store(store), http(http),
3014 num_shards(num_shards),
3015 zone_id(store->get_zone().id),
3016 peer_status(store->zone_conn_map.size()),
3017 min_shard_markers(num_shards),
3018 last_trim(last_trim)
3019 {}
3020
3021 int operate() override;
3022 };
3023
3024 int DataLogTrimCR::operate()
3025 {
3026 reenter(this) {
3027 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3028 set_status("fetching sync status");
3029 yield {
3030 // query data sync status from each sync peer
3031 rgw_http_param_pair params[] = {
3032 { "type", "data" },
3033 { "status", nullptr },
3034 { "source-zone", zone_id.c_str() },
3035 { nullptr, nullptr }
3036 };
3037
3038 auto p = peer_status.begin();
3039 for (auto& c : store->zone_conn_map) {
3040 ldout(cct, 20) << "query sync status from " << c.first << dendl;
3041 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3042 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3043 false);
3044 ++p;
3045 }
3046 }
3047
3048 // must get a successful reply from all peers to consider trimming
3049 ret = 0;
3050 while (ret == 0 && num_spawned() > 0) {
3051 yield wait_for_child();
3052 collect_next(&ret);
3053 }
3054 drain_all();
3055
3056 if (ret < 0) {
3057 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3058 return set_cr_error(ret);
3059 }
3060
3061 ldout(cct, 10) << "trimming log shards" << dendl;
3062 set_status("trimming log shards");
3063 yield {
3064 // determine the minimum marker for each shard
3065 take_min_markers(peer_status.begin(), peer_status.end(),
3066 min_shard_markers.begin());
3067
3068 for (int i = 0; i < num_shards; i++) {
3069 const auto& m = min_shard_markers[i];
3070 auto& stable = get_stable_marker(m);
3071 if (stable <= last_trim[i]) {
3072 continue;
3073 }
3074 ldout(cct, 10) << "trimming log shard " << i
3075 << " at marker=" << stable
3076 << " last_trim=" << last_trim[i] << dendl;
3077 using TrimCR = RGWSyncLogTrimCR;
3078 spawn(new TrimCR(store, store->data_log->get_oid(i),
3079 stable, &last_trim[i]),
3080 true);
3081 }
3082 }
3083 return set_cr_done();
3084 }
3085 return 0;
3086 }
3087
3088 class DataLogTrimPollCR : public RGWCoroutine {
3089 RGWRados *store;
3090 RGWHTTPManager *http;
3091 const int num_shards;
3092 const utime_t interval; ///< polling interval
3093 const std::string lock_oid; ///< use first data log shard for lock
3094 const std::string lock_cookie;
3095 std::vector<std::string> last_trim; ///< last trimmed marker per shard
3096
3097 public:
3098 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3099 int num_shards, utime_t interval)
3100 : RGWCoroutine(store->ctx()), store(store), http(http),
3101 num_shards(num_shards), interval(interval),
3102 lock_oid(store->data_log->get_oid(0)),
3103 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3104 last_trim(num_shards)
3105 {}
3106
3107 int operate() override;
3108 };
3109
3110 int DataLogTrimPollCR::operate()
3111 {
3112 reenter(this) {
3113 for (;;) {
3114 set_status("sleeping");
3115 wait(interval);
3116
3117 // request a 'data_trim' lock that covers the entire wait interval to
3118 // prevent other gateways from attempting to trim for the duration
3119 set_status("acquiring trim lock");
3120 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3121 rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
3122 "data_trim", lock_cookie,
3123 interval.sec()));
3124 if (retcode < 0) {
3125 // if the lock is already held, go back to sleep and try again later
3126 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3127 << interval.sec() << "s" << dendl;
3128 continue;
3129 }
3130
3131 set_status("trimming");
3132 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3133
3134 // note that the lock is not released. this is intentional, as it avoids
3135 // duplicating this work in other gateways
3136 }
3137 }
3138 return 0;
3139 }
3140
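/*
 * Factory entry point for the datalog trim poller; the caller is expected
 * to schedule the returned coroutine on its coroutine manager.
 */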
3141 RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3142 RGWHTTPManager *http,
3143 int num_shards, utime_t interval)
3144 {
3145 return new DataLogTrimPollCR(store, http, num_shards, interval);
3146 }