#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_module.h"
#include "rgw_sync_log_trim.h"

#include "cls/lock/cls_lock_client.h"

#include "auth/Crypto.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";

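// Lightweight tracing helper: emits "Sync:<zone>:<type>:<stage>:<resource>:<state>"
// lines at debug level 5 so a single sync operation can be followed from start
// to finish in the rgw log. The destructor logs "finish" if finish() was never
// called explicitly.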
class RGWSyncDebugLogger {
  CephContext *cct;
  string prefix;

  bool ended;

public:
  RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
                     const string& sync_type, const string& sync_stage,
                     const string& resource, bool log_start = true) {
    init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
  }
  RGWSyncDebugLogger() : cct(NULL), ended(false) {}
  ~RGWSyncDebugLogger();

  void init(CephContext *_cct, const string& source_zone,
            const string& sync_type, const string& sync_stage,
            const string& resource, bool log_start = true);
  void log(const string& state);
  void finish(int status);
};

void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
                              const string& sync_type, const string& sync_section,
                              const string& resource, bool log_start)
{
  cct = _cct;
  ended = false;
  string zone_str = source_zone.substr(0, 8);
  prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
  if (log_start) {
    log("start");
  }
}

RGWSyncDebugLogger::~RGWSyncDebugLogger()
{
  if (!ended) {
    log("finish");
  }
}

void RGWSyncDebugLogger::log(const string& state)
{
  ldout(cct, 5) << prefix << ":" << state << dendl;
}

void RGWSyncDebugLogger::finish(int status)
{
  ended = true;
  ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
}

class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
public:
  RGWDataSyncDebugLogger() {}
  RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
                         const string& resource, bool log_start = true) {
    init(sync_env, sync_section, resource, log_start);
  }
  void init(RGWDataSyncEnv *sync_env, const string& sync_section,
            const string& resource, bool log_start = true) {
    RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
  }
};

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

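// Collects the per-shard data sync markers (and, below, the ".retry" error
// repo keys) for all shards, fanning out up to MAX_CONCURRENT_SHARDS reads at
// a time via RGWShardCollectCR.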
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store,
               rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}

class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  map<int, std::set<std::string>> &entries_map;

public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
                                    map<int, std::set<std::string>>& _entries_map)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
      max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
                                  marker, &entries_map[shard_id], max_entries), false);

  ++shard_id;
  return true;
}

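// Reads the overall data sync status object plus all per-shard markers into
// *sync_status. A missing status object is an error here (ENOENT is not
// treated as empty).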
class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
                          rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
          << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

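// Fetches the current state (marker, last update time) of a single remote
// datalog shard via GET /admin/log/?type=data&id=<shard>&info.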
class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "info", NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};

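// Lists entries from a remote datalog shard starting at *pmarker, then updates
// the marker and truncated flag from the response so the caller can page
// through the shard.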
class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  string *pmarker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
                              int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      pmarker(_pmarker),
      entries(_entries),
      truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "marker", pmarker->c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(&response);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pmarker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

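// Lists a single remote datalog shard with an explicit marker and max-entries,
// plus a collector that runs such listings across many shards with bounded
// concurrency.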
class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    http_op->set_user_info((void *)stack);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

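// Creates (or recreates) the data sync status object and its per-shard marker
// objects under a cls_lock, seeding each marker from the remote shard's
// current position, then advances the state to StateBuildingFullSyncMaps.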
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;

public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->get_zone_params().log_pool),
      num_shards(num_shards), status(status) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};

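// RGWRemoteDataLog: synchronous entry points used by the status manager.
// read_log_info() asks the source zone how many datalog shards it exposes.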
int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  if (store->is_meta_master()) {
    return 0;
  }

  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
                _source_zone, _sync_module, observer);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  map<int, std::set<std::string>> entries_map;
  uint64_t max_entries{1};
  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
  http_manager.stop();

  if (ret == 0) {
    for (const auto& entry : entries_map) {
      if (entry.second.size() != 0) {
        recovering_shards.insert(entry.first);
      }
    }
  }

  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  uint64_t instance_id;
  get_random_bytes((char *)&instance_id, sizeof(instance_id));
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

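// Builds the full-sync index: lists every bucket instance on the source zone
// and appends one omap entry per bucket shard into the per-datalog-shard
// full-sync index objects, then records the total entry count in each shard's
// sync marker.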
class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string> result;
  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      yield {
        string entrypoint = string("/admin/metadata/bucket.instance");
        /* FIXME: need a better scaling solution here, requires streaming output */
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                      entrypoint, NULL, &result));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
        return set_cr_error(retcode);
      }
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->get_zone_params().log_pool,
                                                  oid_prefix);
      yield; // yield so OmapAppendCRs can start
      for (iter = result.begin(); iter != result.end(); ++iter) {
        ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;

        key = *iter;

        yield {
          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                          { NULL, NULL } };

          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
        }

        num_shards = meta_info.data.get_bucket_info().num_shards;
        if (num_shards > 0) {
          for (i = 0; i < num_shards; i++) {
            char buf[16];
            snprintf(buf, sizeof(buf), ":%d", i);
            s = key + buf;
            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
          }
        } else {
          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
        }
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                                rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker), true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

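// Tracks per-shard sync progress during data sync. The marker window controls
// how often the high-water marker is flushed to the shard status object; a
// window of 1 flushes after every completed entry.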
#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * create index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }
};

// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}

class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  RGWDataSyncDebugLogger logger;
  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
    logger.init(sync_env, "Bucket", bs.get_key());
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

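// Syncs one datalog entry: parses the "<bucket>:<shard>" key, runs a full
// bucket-shard sync, records persistent failures in the per-shard error repo
// and, on success, removes the key from that repo and updates the marker.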
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" << _raw_key << " entry=" << entry_marker;
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          call(new RGWRunBucketSyncCoroutine(sync_env, bs));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
            "for missing bucket " << raw_key << dendl;
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
          }
        }
        if (error_repo && !error_repo->append(raw_key)) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
              << error_repo->get_obj() << " retcode=" << retcode << dendl;
        }
      }
      if (sync_env->observer) {
        sync_env->observer->on_bucket_changed(bs.bucket.get_key());
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

enum RemoteDatalogStatus {
  RemoteNotTrimmed = 0,
  RemoteTrimmed = 1,
  RemoteMightTrimmed = 2
};

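// Drives sync for a single datalog shard. Full sync replays the prebuilt omap
// index of bucket shards; incremental sync tails the remote datalog, retries
// entries from the ".retry" error repo with exponential backoff, and caps the
// number of in-flight bucket syncs at the spawn window.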
class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  RGWDataChangesLogInfo shard_info;
  string datalog_marker;

  RemoteDatalogStatus remote_trimmed;
  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  std::set<std::string> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
  uint32_t retry_backoff_secs;

  RGWDataSyncDebugLogger logger;

public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), remote_trimmed(RemoteNotTrimmed), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";

    logger.init(sync_env, "DataShard", status_oid);
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      logger.log("full sync");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      total_entries = sync_marker.pos;
      do {
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << *iter << dendl;
          total_entries++;
          if (!marker_tracker->start(*iter, total_entries, real_time())) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << *iter << ". Duplicate entry?" << dendl;
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
            if (retcode < 0) {
              lease_cr->go_down();
              drain_all();
              return set_cr_error(retcode);
            }
          }
          sync_marker.marker = *iter;
        }
      } while ((int)entries.size() == max_entries);

      lease_cr->go_down();
      drain_all();

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
        lease_cr->go_down();
        return set_cr_error(retcode);
      }
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      set_status("lease acquired");
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      logger.log("inc sync");
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      do {
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
          }
        }

        /* process bucket shards that previously failed */
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                             error_marker, &error_entries,
                                             max_error_entries));
        ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
        iter = error_entries.begin();
        for (; iter != error_entries.end(); ++iter) {
          error_marker = *iter;
          ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
          spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
        }
        if ((int)error_entries.size() != max_error_entries) {
          if (error_marker.empty() && error_entries.empty()) {
            /* the retry repo is empty, we back off a bit before calling it again */
            retry_backoff_secs *= 2;
            if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
              retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
            }
          } else {
            retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
          }
          error_retry_time = ceph::real_clock::now() + make_timespan(retry_backoff_secs);
          error_marker.clear();
        }

        yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }
        datalog_marker = shard_info.marker;
        remote_trimmed = RemoteNotTrimmed;
#define INCREMENTAL_MAX_ENTRIES 100
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker > sync_marker.marker) {
          spawned_keys.clear();
          if (sync_marker.marker.empty())
            remote_trimmed = RemoteMightTrimmed; // remote data log shard might be trimmed
          yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
          if (retcode < 0) {
            ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
            stop_spawned_services();
            drain_all();
            return set_cr_error(retcode);
          }
          if ((remote_trimmed == RemoteMightTrimmed) && sync_marker.marker.empty() && log_entries.empty())
            remote_trimmed = RemoteTrimmed;
          else
            remote_trimmed = RemoteNotTrimmed;
          for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
            ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
            if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
              ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
              marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
              continue;
            }
            if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
              ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
            } else {
              /*
               * don't spawn the same key more than once. We can do that as long as we don't yield
               */
              if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
                spawned_keys.insert(log_iter->entry.key);
                spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
                if (retcode < 0) {
                  stop_spawned_services();
                  drain_all();
                  return set_cr_error(retcode);
                }
              }
            }
          }
          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
                /* we have reported this error */
              }
              /* not waiting for child here */
            }
          }
        }
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
        if (datalog_marker == sync_marker.marker || remote_trimmed == RemoteTrimmed) {
#define INCREMENTAL_INTERVAL 20
          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
        }
      } while (true);
    }
    return 0;
  }

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};

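// Backoff wrapper that restarts RGWDataSyncShardCR on failure, re-reading the
// persisted shard marker before each retry.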
class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

public:
  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker)
    : RGWBackoffControlCR(_sync_env->cct, false),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                          rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(cr_lock());

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};

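// Top-level data sync coroutine: initializes the sync status if needed, builds
// the full sync maps, then spawns one control coroutine per datalog shard.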
class RGWDataSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  rgw_data_sync_status sync_status;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  Mutex shard_crs_lock;
  map<int, RGWDataSyncShardControlCR *> shard_crs;

  bool *reset_backoff;

  RGWDataSyncDebugLogger logger;

  RGWDataSyncModule *data_sync_module{nullptr};

public:
  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      num_shards(_num_shards),
      marker_tracker(NULL),
      shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
      reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
  }

  int operate() override {
    reenter(this) {

      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode < 0 && retcode != -ENOENT) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
        sync_status.sync_info.num_shards = num_shards;
        uint64_t instance_id;
        get_random_bytes((char *)&instance_id, sizeof(instance_id));
        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sync_env, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        /* call sync module init here */
        sync_status.sync_info.num_shards = num_shards;
        yield call(data_sync_module->init_sync(sync_env));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        /* state: building full sync maps */
        ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
                                                                          iter->first, iter->second);
            cr->get();
            shard_crs_lock.Lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.Unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
                                                         rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex::Locker l(shard_crs_lock);
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};

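// Default sync module: objects are fetched, removed, or marked deleted on the
// local zone with no transformation.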
class RGWDefaultDataSyncModule : public RGWDataSyncModule {
public:
  RGWDefaultDataSyncModule() {}

  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                     rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
};

class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
  RGWDefaultDataSyncModule data_handler;
public:
  RGWDefaultSyncModuleInstance() {}
  RGWDataSyncModule *get_data_handler() override {
    return &data_handler;
  }
};

int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
{
  instance->reset(new RGWDefaultSyncModuleInstance());
  return 0;
}

RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
                                 key, versioned_epoch,
                                 true, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
                                                      real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            NULL, NULL, false, &mtime, zones_trace);
}

RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
                                                             rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
{
  return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
                            bucket_info, key, versioned, versioned_epoch,
                            &owner.id, &owner.display_name, true, &mtime, zones_trace);
}

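// Backoff wrapper around RGWDataSyncCR; exit_on_error is false so data sync
// keeps retrying on any error.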
1667 class RGWDataSyncControlCR : public RGWBackoffControlCR
1668 {
1669 RGWDataSyncEnv *sync_env;
1670 uint32_t num_shards;
1671
1672 static constexpr bool exit_on_error = false; // retry on all errors
1673 public:
1674 RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
1675 sync_env(_sync_env), num_shards(_num_shards) {
1676 }
1677
1678 RGWCoroutine *alloc_cr() override {
1679 return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
1680 }
1681
1682 void wakeup(int shard_id, set<string>& keys) {
1683 Mutex& m = cr_lock();
1684
1685 m.Lock();
1686 RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
1687 if (!cr) {
1688 m.Unlock();
1689 return;
1690 }
1691
1692 cr->get(); // keep a ref so the coroutine stays valid after unlocking
1693 m.Unlock();
1694
1695 // cr is guaranteed non-null here; wake it up outside the lock
1696 cr->wakeup(shard_id, keys);
1697
1699 cr->put();
1700 }
1701 };
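
/*
 * A minimal sketch (not part of the original source) of the
 * take-ref-then-unlock pattern used by wakeup() above: the lock only
 * protects the pointer lookup, and the extra reference keeps the object
 * alive for the call made after the lock is dropped. RefCounted and grab()
 * are illustrative stand-ins, not real rgw types.
 *
 *   RefCounted *grab(Mutex& m, RefCounted *&ptr) {
 *     m.Lock();
 *     RefCounted *p = ptr;
 *     if (p) {
 *       p->get();     // take the ref while still under the lock
 *     }
 *     m.Unlock();
 *     return p;       // safe to use (and later put()) after unlocking
 *   }
 */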
1702
1703 void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
1704 RWLock::RLocker rl(lock);
1705 if (!data_sync_cr) {
1706 return;
1707 }
1708 data_sync_cr->wakeup(shard_id, keys);
1709 }
1710
1711 int RGWRemoteDataLog::run_sync(int num_shards)
1712 {
1713 lock.get_write();
1714 data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
1715 data_sync_cr->get(); // run() will drop a ref, so take another
1716 lock.unlock();
1717
1718 int r = run(data_sync_cr);
1719
1720 lock.get_write();
1721 data_sync_cr->put();
1722 data_sync_cr = NULL;
1723 lock.unlock();
1724
1725 if (r < 0) {
1726 ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
1727 return r;
1728 }
1729 return 0;
1730 }
1731
1732 int RGWDataSyncStatusManager::init()
1733 {
1734 auto zone_def_iter = store->zone_by_id.find(source_zone);
1735 if (zone_def_iter == store->zone_by_id.end()) {
1736 ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
1737 return -EIO;
1738 }
1739
1740 auto& zone_def = zone_def_iter->second;
1741
1742 if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
1743 return -ENOTSUP;
1744 }
1745
1746 RGWZoneParams& zone_params = store->get_zone_params();
1747
1748 if (sync_module == nullptr) {
1749 sync_module = store->get_sync_module();
1750 }
1751
1752 conn = store->get_zone_conn_by_id(source_zone);
1753 if (!conn) {
1754 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
1755 return -EINVAL;
1756 }
1757
1758 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
1759
1760 int r = source_log.init(source_zone, conn, error_logger, sync_module);
1761 if (r < 0) {
1762 lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
1763 finalize();
1764 return r;
1765 }
1766
1767 rgw_datalog_info datalog_info;
1768 r = source_log.read_log_info(&datalog_info);
1769 if (r < 0) {
1770 ldout(store->ctx(), 5) << "ERROR: master.read_log_info() returned r=" << r << dendl;
1771 finalize();
1772 return r;
1773 }
1774
1775 num_shards = datalog_info.num_shards;
1776
1777 for (int i = 0; i < num_shards; i++) {
1778 shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
1779 }
1780
1781 return 0;
1782 }
1783
1784 void RGWDataSyncStatusManager::finalize()
1785 {
1786 delete error_logger;
1787 error_logger = nullptr;
1788 }
1789
1790 string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
1791 {
1792 char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
1793 snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
1794
1795 return string(buf);
1796 }
1797
1798 string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
1799 {
1800 char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
1801 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);
1802
1803 return string(buf);
1804 }
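
/*
 * Usage sketch (hypothetical zone id "zone-a") showing the object names the
 * two helpers above produce from the datalog.sync-status prefixes defined at
 * the top of this file:
 *
 *   string s = RGWDataSyncStatusManager::sync_status_oid("zone-a");
 *   // s == "datalog.sync-status.zone-a"
 *   string sh = RGWDataSyncStatusManager::shard_obj_name("zone-a", 5);
 *   // sh == "datalog.sync-status.shard.zone-a.5"
 */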
1805
1806 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
1807 const rgw_bucket& bucket, int shard_id,
1808 RGWSyncErrorLogger *_error_logger,
1809 RGWSyncModuleInstanceRef& _sync_module)
1810 {
1811 conn = _conn;
1812 source_zone = _source_zone;
1813 bs.bucket = bucket;
1814 bs.shard_id = shard_id;
1815
1816 sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
1817 _error_logger, source_zone, _sync_module, nullptr);
1818
1819 return 0;
1820 }
1821
1822 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1823 RGWDataSyncEnv *sync_env;
1824 const string instance_key;
1825
1826 rgw_bucket_index_marker_info *info;
1827
1828 public:
1829 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
1830 const rgw_bucket_shard& bs,
1831 rgw_bucket_index_marker_info *_info)
1832 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1833 instance_key(bs.get_key()), info(_info) {}
1834
1835 int operate() override {
1836 reenter(this) {
1837 yield {
1838 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
1839 { "bucket-instance", instance_key.c_str() },
1840 { "info" , NULL },
1841 { NULL, NULL } };
1842
1843 string p = "/admin/log/";
1844 call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
1845 }
1846 if (retcode < 0) {
1847 return set_cr_error(retcode);
1848 }
1849 return set_cr_done();
1850 }
1851 return 0;
1852 }
1853 };
1854
1855 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
1856 RGWDataSyncEnv *sync_env;
1857
1858 rgw_bucket_shard bs;
1859 const string sync_status_oid;
1860
1861 rgw_bucket_shard_sync_info& status;
1862
1863 rgw_bucket_index_marker_info info;
1864 public:
1865 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1866 const rgw_bucket_shard& bs,
1867 rgw_bucket_shard_sync_info& _status)
1868 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
1869 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1870 status(_status)
1871 {}
1872
1873 int operate() override {
1874 reenter(this) {
1875 /* fetch current position in logs */
1876 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
1877 if (retcode < 0 && retcode != -ENOENT) {
1878 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
1879 return set_cr_error(retcode);
1880 }
1881 yield {
1882 auto store = sync_env->store;
1883 rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid);
1884
1885 if (info.syncstopped) {
1886 call(new RGWRadosRemoveCR(store, obj));
1887 } else {
1888 status.state = rgw_bucket_shard_sync_info::StateFullSync;
1889 status.inc_marker.position = info.max_marker;
1890 map<string, bufferlist> attrs;
1891 status.encode_all_attrs(attrs);
1892 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
1893 }
1894 }
1895 return set_cr_done();
1896 }
1897 return 0;
1898 }
1899 };
1900
1901 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
1902 {
1903 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
1904 }
1905
1906 template <class T>
1907 static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
1908 {
1909 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1910 if (iter == attrs.end()) {
1911 *val = T();
1912 return;
1913 }
1914
1915 bufferlist::iterator biter = iter->second.begin();
1916 try {
1917 ::decode(*val, biter);
1918 } catch (buffer::error& err) {
1919 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
1920 }
1921 }
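
/*
 * Usage sketch for decode_attr() above: a missing attribute resets the value
 * to a default-constructed T instead of failing, which is what lets
 * decode_from_attrs() below tolerate partially written status objects.
 *
 *   map<string, bufferlist> attrs; // e.g. filled by RGWSimpleRadosReadAttrsCR
 *   rgw_bucket_shard_full_sync_marker full_marker;
 *   decode_attr(cct, attrs, "full_marker", &full_marker);
 *   // if the "full_marker" attr is absent, full_marker is value-initialized
 */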
1922
1923 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
1924 {
1925 decode_attr(cct, attrs, "state", &state);
1926 decode_attr(cct, attrs, "full_marker", &full_marker);
1927 decode_attr(cct, attrs, "inc_marker", &inc_marker);
1928 }
1929
1930 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
1931 {
1932 encode_state_attr(attrs);
1933 full_marker.encode_attr(attrs);
1934 inc_marker.encode_attr(attrs);
1935 }
1936
1937 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
1938 {
1939 ::encode(state, attrs["state"]);
1940 }
1941
1942 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1943 {
1944 ::encode(*this, attrs["full_marker"]);
1945 }
1946
1947 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1948 {
1949 ::encode(*this, attrs["inc_marker"]);
1950 }
1951
1952 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
1953 RGWDataSyncEnv *sync_env;
1954 string oid;
1955 rgw_bucket_shard_sync_info *status;
1956
1957 map<string, bufferlist> attrs;
1958 public:
1959 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1960 const rgw_bucket_shard& bs,
1961 rgw_bucket_shard_sync_info *_status)
1962 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1963 oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1964 status(_status) {}
1965 int operate() override;
1966 };
1967
1968 int RGWReadBucketSyncStatusCoroutine::operate()
1969 {
1970 reenter(this) {
1971 yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
1972 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
1973 &attrs));
1974 if (retcode == -ENOENT) {
1975 *status = rgw_bucket_shard_sync_info();
1976 return set_cr_done();
1977 }
1978 if (retcode < 0) {
1979 ldout(sync_env->cct, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
1980 return set_cr_error(retcode);
1981 }
1982 status->decode_from_attrs(sync_env->cct, attrs);
1983 return set_cr_done();
1984 }
1985 return 0;
1986 }
1987
1988 #define OMAP_READ_MAX_ENTRIES 10
1989 class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
1990 RGWDataSyncEnv *sync_env;
1991 RGWRados *store;
1992
1993 const int shard_id;
1994 int max_entries;
1995
1996 set<string>& recovering_buckets;
1997 string marker;
1998 string error_oid;
1999
2000 set<string> error_entries;
2001 int max_omap_entries;
2002 int count;
2003
2004 public:
2005 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2006 set<string>& _recovering_buckets, const int _max_entries)
2007 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2008 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2009 recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
2010 {
2011 error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
2012 }
2013
2014 int operate() override;
2015 };
2016
2017 int RGWReadRecoveringBucketShardsCoroutine::operate()
2018 {
2019 reenter(this) {
2020 //read recovering bucket shards
2021 count = 0;
2022 do {
2023 yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
2024 marker, &error_entries, max_omap_entries));
2025
2026 if (retcode == -ENOENT) {
2027 break;
2028 }
2029
2030 if (retcode < 0) {
2031 ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
2032 << cpp_strerror(retcode) << dendl;
2033 return set_cr_error(retcode);
2034 }
2035
2036 if (error_entries.empty()) {
2037 break;
2038 }
2039
2040 count += error_entries.size();
2041 marker = *error_entries.rbegin();
2042 recovering_buckets.insert(error_entries.begin(), error_entries.end());
2043 } while ((int)error_entries.size() == max_omap_entries && count < max_entries);
2044
2045 return set_cr_done();
2046 }
2047
2048 return 0;
2049 }
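
/*
 * The loop above is the usual omap pagination idiom: read up to
 * max_omap_entries keys after 'marker', advance the marker to the last key
 * returned, and stop on a short page or once the caller's max_entries budget
 * is spent. A plain (non-coroutine) sketch of the same idiom, with
 * read_page() and max_omap_entries as hypothetical stand-ins:
 *
 *   string marker;
 *   set<string> out, page;
 *   do {
 *     page = read_page(marker, max_omap_entries);
 *     if (page.empty()) break;
 *     out.insert(page.begin(), page.end());
 *     marker = *page.rbegin();  // resume after the last key seen
 *   } while ((int)page.size() == max_omap_entries);
 */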
2050
2051 class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
2052 RGWDataSyncEnv *sync_env;
2053 RGWRados *store;
2054
2055 const int shard_id;
2056 int max_entries;
2057
2058 set<string>& pending_buckets;
2059 string marker;
2060 string status_oid;
2061
2062 rgw_data_sync_marker* sync_marker;
2063 int count;
2064
2065 list<rgw_data_change_log_entry> log_entries;
2066 bool truncated;
2067
2068 public:
2069 RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2070 set<string>& _pending_buckets,
2071 rgw_data_sync_marker* _sync_marker, const int _max_entries)
2072 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2073 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2074 pending_buckets(_pending_buckets), sync_marker(_sync_marker)
2075 {
2076 status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
2077 }
2078
2079 int operate() override;
2080 };
2081
2082 int RGWReadPendingBucketShardsCoroutine::operate()
2083 {
2084 reenter(this) {
2085 //read sync status marker
2086 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
2087 yield call(new CR(sync_env->async_rados, store,
2088 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2089 sync_marker));
2090 if (retcode < 0) {
2091 ldout(sync_env->cct, 0) << "failed to read sync status marker with "
2092 << cpp_strerror(retcode) << dendl;
2093 return set_cr_error(retcode);
2094 }
2095
2096 //read pending bucket shards
2097 marker = sync_marker->marker;
2098 count = 0;
2099 do {
2100 yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
2101
2102 if (retcode == -ENOENT) {
2103 break;
2104 }
2105
2106 if (retcode < 0) {
2107 ldout(sync_env->cct, 0) << "failed to read remote data log info with "
2108 << cpp_strerror(retcode) << dendl;
2109 return set_cr_error(retcode);
2110 }
2111
2112 if (log_entries.empty()) {
2113 break;
2114 }
2115
2116 count += log_entries.size();
2117 for (const auto& entry : log_entries) {
2118 pending_buckets.insert(entry.entry.key);
2119 }
2120 } while (truncated && count < max_entries);
2121
2122 return set_cr_done();
2123 }
2124
2125 return 0;
2126 }
2127
2128 int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
2129 {
2130 // cannot run concurrently with run_sync(), so run in a separate manager
2131 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
2132 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
2133 int ret = http_manager.set_threaded();
2134 if (ret < 0) {
2135 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
2136 return ret;
2137 }
2138 RGWDataSyncEnv sync_env_local = sync_env;
2139 sync_env_local.http_manager = &http_manager;
2140 list<RGWCoroutinesStack *> stacks;
2141 RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2142 recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
2143 stacks.push_back(recovering_stack);
2144 RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2145 pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
2146 stacks.push_back(pending_stack);
2147 ret = crs.run(stacks);
2148 http_manager.stop();
2149 return ret;
2150 }
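
/*
 * Usage sketch (hypothetical caller) for read_shard_status() above; the two
 * coroutines run as independent stacks in the temporary manager, so the
 * recovering and pending reads proceed concurrently:
 *
 *   set<string> pending, recovering;
 *   rgw_data_sync_marker marker;
 *   int r = remote_log.read_shard_status(shard_id, pending, recovering,
 *                                        &marker, 100);
 *   // on success, 'recovering' holds keys from the shard's .retry object
 *   // and 'pending' holds datalog entries past the current sync marker
 */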
2151
2152 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
2153 {
2154 return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
2155 }
2156
2157 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
2158 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2159 delete iter->second;
2160 }
2161 delete error_logger;
2162 }
2163
2164
2165 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
2166 {
2167 JSONDecoder::decode_json("ID", id, obj);
2168 JSONDecoder::decode_json("DisplayName", display_name, obj);
2169 }
2170
2171 struct bucket_list_entry {
2172 bool delete_marker;
2173 rgw_obj_key key;
2174 bool is_latest;
2175 real_time mtime;
2176 string etag;
2177 uint64_t size;
2178 string storage_class;
2179 rgw_bucket_entry_owner owner;
2180 uint64_t versioned_epoch;
2181 string rgw_tag;
2182
2183 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
2184
2185 void decode_json(JSONObj *obj) {
2186 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
2187 JSONDecoder::decode_json("Key", key.name, obj);
2188 JSONDecoder::decode_json("VersionId", key.instance, obj);
2189 JSONDecoder::decode_json("IsLatest", is_latest, obj);
2190 string mtime_str;
2191 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
2192
2193 struct tm t;
2194 uint32_t nsec;
2195 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
2196 ceph_timespec ts;
2197 ts.tv_sec = (uint64_t)internal_timegm(&t);
2198 ts.tv_nsec = nsec;
2199 mtime = real_clock::from_ceph_timespec(ts);
2200 }
2201 JSONDecoder::decode_json("ETag", etag, obj);
2202 JSONDecoder::decode_json("Size", size, obj);
2203 JSONDecoder::decode_json("StorageClass", storage_class, obj);
2204 JSONDecoder::decode_json("Owner", owner, obj);
2205 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
2206 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
2207 }
2208 };
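
/*
 * For reference, a hypothetical entry from the remote bucket listing that
 * decode_json() above would parse; the field names match the decoders:
 *
 *   {
 *     "IsDeleteMarker": false,
 *     "Key": "photos/cat.jpg",
 *     "VersionId": "null",
 *     "IsLatest": true,
 *     "RgwxMtime": "2018-01-01T12:00:00.000Z",
 *     "ETag": "\"0123456789abcdef0123456789abcdef\"",
 *     "Size": 4096,
 *     "StorageClass": "STANDARD",
 *     "Owner": { "ID": "user1", "DisplayName": "User One" },
 *     "VersionedEpoch": 1,
 *     "RgwxTag": "0123abcd.456.789"
 *   }
 */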
2209
2210 struct bucket_list_result {
2211 string name;
2212 string prefix;
2213 string key_marker;
2214 string version_id_marker;
2215 int max_keys;
2216 bool is_truncated;
2217 list<bucket_list_entry> entries;
2218
2219 bucket_list_result() : max_keys(0), is_truncated(false) {}
2220
2221 void decode_json(JSONObj *obj) {
2222 JSONDecoder::decode_json("Name", name, obj);
2223 JSONDecoder::decode_json("Prefix", prefix, obj);
2224 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
2225 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
2226 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
2227 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
2228 JSONDecoder::decode_json("Entries", entries, obj);
2229 }
2230 };
2231
2232 class RGWListBucketShardCR: public RGWCoroutine {
2233 RGWDataSyncEnv *sync_env;
2234 const rgw_bucket_shard& bs;
2235 const string instance_key;
2236 rgw_obj_key marker_position;
2237
2238 bucket_list_result *result;
2239
2240 public:
2241 RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2242 rgw_obj_key& _marker_position, bucket_list_result *_result)
2243 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2244 instance_key(bs.get_key()), marker_position(_marker_position),
2245 result(_result) {}
2246
2247 int operate() override {
2248 reenter(this) {
2249 yield {
2250 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2251 { "versions" , NULL },
2252 { "format" , "json" },
2253 { "objs-container" , "true" },
2254 { "key-marker" , marker_position.name.c_str() },
2255 { "version-id-marker" , marker_position.instance.c_str() },
2256 { NULL, NULL } };
2257 // don't include tenant in the url, it's already part of instance_key
2258 string p = string("/") + bs.bucket.name;
2259 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2260 }
2261 if (retcode < 0) {
2262 return set_cr_error(retcode);
2263 }
2264 return set_cr_done();
2265 }
2266 return 0;
2267 }
2268 };
2269
2270 class RGWListBucketIndexLogCR: public RGWCoroutine {
2271 RGWDataSyncEnv *sync_env;
2272 const string instance_key;
2273 string marker;
2274
2275 list<rgw_bi_log_entry> *result;
2276
2277 public:
2278 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2279 string& _marker, list<rgw_bi_log_entry> *_result)
2280 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2281 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2282
2283 int operate() override {
2284 reenter(this) {
2285 yield {
2286 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2287 { "format" , "json" },
2288 { "marker" , marker.c_str() },
2289 { "type", "bucket-index" },
2290 { NULL, NULL } };
2291
2292 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2293 }
2294 if (retcode < 0) {
2295 return set_cr_error(retcode);
2296 }
2297 return set_cr_done();
2298 }
2299 return 0;
2300 }
2301 };
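
/*
 * The coroutine above amounts to a GET against the source zone of the form
 * (illustrative marker value):
 *
 *   /admin/log?bucket-instance=<instance_key>&format=json&marker=00000000001.1.2&type=bucket-index
 *
 * whose JSON reply is decoded into *result as a list<rgw_bi_log_entry>.
 */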
2302
2303 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2304
2305 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2306 RGWDataSyncEnv *sync_env;
2307
2308 string marker_oid;
2309 rgw_bucket_shard_full_sync_marker sync_marker;
2310
2311 public:
2312 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2313 const string& _marker_oid,
2314 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2315 sync_env(_sync_env),
2316 marker_oid(_marker_oid),
2317 sync_marker(_marker) {}
2318
2319 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2320 sync_marker.position = new_marker;
2321 sync_marker.count = index_pos;
2322
2323 map<string, bufferlist> attrs;
2324 sync_marker.encode_attr(attrs);
2325
2326 RGWRados *store = sync_env->store;
2327
2328 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2329 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2330 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2331 attrs);
2332 }
2333 };
2334
2335 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2336 RGWDataSyncEnv *sync_env;
2337
2338 string marker_oid;
2339 rgw_bucket_shard_inc_sync_marker sync_marker;
2340
2341 map<rgw_obj_key, string> key_to_marker;
2342 map<string, rgw_obj_key> marker_to_key;
2343
2344 void handle_finish(const string& marker) override {
2345 map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
2346 if (iter == marker_to_key.end()) {
2347 return;
2348 }
2349 key_to_marker.erase(iter->second);
2350 reset_need_retry(iter->second);
2351 marker_to_key.erase(iter);
2352 }
2353
2354 public:
2355 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2356 const string& _marker_oid,
2357 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2358 sync_env(_sync_env),
2359 marker_oid(_marker_oid),
2360 sync_marker(_marker) {}
2361
2362 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2363 sync_marker.position = new_marker;
2364
2365 map<string, bufferlist> attrs;
2366 sync_marker.encode_attr(attrs);
2367
2368 RGWRados *store = sync_env->store;
2369
2370 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2371 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2372 store,
2373 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2374 attrs);
2375 }
2376
2377 /*
2378 * Create an index from key -> marker, and from marker -> key.
2379 * This ensures that we only have one entry for any key that is
2380 * in use, which is needed when doing incremental sync of data:
2381 * we don't want to run multiple concurrent sync operations for
2382 * the same bucket shard.
2383 * Also, we make sure that we don't run concurrent operations on
2384 * the same key with different ops.
2385 */
2386 bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
2387 if (key_to_marker.find(key) != key_to_marker.end()) {
2388 set_need_retry(key);
2389 return false;
2390 }
2391 key_to_marker[key] = marker;
2392 marker_to_key[marker] = key;
2393 return true;
2394 }
2395
2396 bool can_do_op(const rgw_obj_key& key) {
2397 return (key_to_marker.find(key) == key_to_marker.end());
2398 }
2399 };
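
/*
 * Simplified sketch of how the incremental-sync loop below drives this
 * tracker for one bilog entry (names as used in
 * RGWBucketShardIncrementalSyncCR; error handling omitted):
 *
 *   while (!marker_tracker.can_do_op(key)) {
 *     yield wait_for_child();          // a conflicting op is in flight
 *   }
 *   if (marker_tracker.index_key_to_marker(key, cur_id) &&
 *       marker_tracker.start(cur_id, 0, entry->timestamp)) {
 *     spawn(sync_cr, false);           // its finish(cur_id) unwinds both maps
 *   }
 */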
2400
2401 template <class T, class K>
2402 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2403 RGWDataSyncEnv *sync_env;
2404
2405 RGWBucketInfo *bucket_info;
2406 const rgw_bucket_shard& bs;
2407
2408 rgw_obj_key key;
2409 bool versioned;
2410 uint64_t versioned_epoch;
2411 rgw_bucket_entry_owner owner;
2412 real_time timestamp;
2413 RGWModifyOp op;
2414 RGWPendingState op_state;
2415
2416 T entry_marker;
2417 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2418
2419 int sync_status;
2420
2421 stringstream error_ss;
2422
2423 RGWDataSyncDebugLogger logger;
2424
2425 bool error_injection;
2426
2427 RGWDataSyncModule *data_sync_module;
2428
2429 rgw_zone_set zones_trace;
2430
2431 public:
2432 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2433 RGWBucketInfo *_bucket_info,
2434 const rgw_bucket_shard& bs,
2435 const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
2436 real_time& _timestamp,
2437 const rgw_bucket_entry_owner& _owner,
2438 RGWModifyOp _op, RGWPendingState _op_state,
2439 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
2440 sync_env(_sync_env),
2441 bucket_info(_bucket_info), bs(bs),
2442 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2443 owner(_owner),
2444 timestamp(_timestamp), op(_op),
2445 op_state(_op_state),
2446 entry_marker(_entry_marker),
2447 marker_tracker(_marker_tracker),
2448 sync_status(0){
2449 stringstream ss;
2450 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
2451 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2452 ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
2453 set_status("init");
2454
2455 logger.init(sync_env, "Object", ss.str());
2456
2457 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2458
2459 data_sync_module = sync_env->sync_module->get_data_handler();
2460
2461 zones_trace = _zones_trace;
2462 zones_trace.insert(sync_env->store->get_zone().id);
2463 }
2464
2465 int operate() override {
2466 reenter(this) {
2467 /* skip entries that are not complete */
2468 if (op_state != CLS_RGW_STATE_COMPLETE) {
2469 goto done;
2470 }
2471 do {
2472 yield {
2473 marker_tracker->reset_need_retry(key);
2474 if (key.name.empty()) {
2475 /* shouldn't happen */
2476 set_status("skipping empty entry");
2477 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
2478 goto done;
2479 }
2480 if (error_injection &&
2481 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2482 ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
2483 retcode = -EIO;
2484 } else if (op == CLS_RGW_OP_ADD ||
2485 op == CLS_RGW_OP_LINK_OLH) {
2486 if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
2487 set_status("skipping entry");
2488 ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
2489 goto done;
2490
2491 }
2492 set_status("syncing obj");
2493 ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2494 logger.log("fetch");
2495 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
2496 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2497 set_status("removing obj");
2498 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2499 versioned = true;
2500 }
2501 logger.log("remove");
2502 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch, &zones_trace));
2503 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2504 logger.log("creating delete marker");
2505 set_status("creating delete marker");
2506 ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
2507 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch, &zones_trace));
2508 }
2509 }
2510 } while (marker_tracker->need_retry(key));
2511 {
2512 stringstream ss;
2513 if (retcode >= 0) {
2514 ss << "done";
2515 } else {
2516 ss << "done, retcode=" << retcode;
2517 }
2518 logger.log(ss.str());
2519 }
2520
2521 if (retcode < 0 && retcode != -ENOENT) {
2522 set_status() << "failed to sync obj; retcode=" << retcode;
2523 ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
2524 << bucket_shard_str{bs} << "/" << key.name << dendl;
2525 error_ss << bucket_shard_str{bs} << "/" << key.name;
2526 sync_status = retcode;
2527 }
2528 if (!error_ss.str().empty()) {
2529 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
2530 }
2531 done:
2532 if (sync_status == 0) {
2533 /* update marker */
2534 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2535 yield call(marker_tracker->finish(entry_marker));
2536 sync_status = retcode;
2537 }
2538 if (sync_status < 0) {
2539 return set_cr_error(sync_status);
2540 }
2541 return set_cr_done();
2542 }
2543 return 0;
2544 }
2545 };
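
/*
 * Summary of the op dispatch implemented by operate() above:
 *
 *   CLS_RGW_OP_ADD, CLS_RGW_OP_LINK_OLH        -> data_sync_module->sync_object()
 *   CLS_RGW_OP_DEL, CLS_RGW_OP_UNLINK_INSTANCE -> remove_object()
 *                         (versioned is forced true for UNLINK_INSTANCE)
 *   CLS_RGW_OP_LINK_OLH_DM                     -> create_delete_marker()
 *
 * Entries whose op_state is not CLS_RGW_STATE_COMPLETE are skipped, as are
 * plain ADDs of versioned instances (those sync on the later LINK_OLH).
 */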
2546
2547 #define BUCKET_SYNC_SPAWN_WINDOW 20
2548
2549 class RGWBucketShardFullSyncCR : public RGWCoroutine {
2550 RGWDataSyncEnv *sync_env;
2551 const rgw_bucket_shard& bs;
2552 RGWBucketInfo *bucket_info;
2553 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2554 bucket_list_result list_result;
2555 list<bucket_list_entry>::iterator entries_iter;
2556 rgw_bucket_shard_full_sync_marker& full_marker;
2557 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2558 rgw_obj_key list_marker;
2559 bucket_list_entry *entry{nullptr};
2560 RGWModifyOp op{CLS_RGW_OP_ADD};
2561
2562 int total_entries{0};
2563
2564 int sync_status{0};
2565
2566 const string& status_oid;
2567
2568 RGWDataSyncDebugLogger logger;
2569 rgw_zone_set zones_trace;
2570 public:
2571 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2572 RGWBucketInfo *_bucket_info,
2573 const std::string& status_oid,
2574 RGWContinuousLeaseCR *lease_cr,
2575 rgw_bucket_shard_full_sync_marker& _full_marker)
2576 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2577 bucket_info(_bucket_info), lease_cr(lease_cr), full_marker(_full_marker),
2578 marker_tracker(sync_env, status_oid, full_marker),
2579 status_oid(status_oid) {
2580 logger.init(sync_env, "BucketFull", bs.get_key());
2581 zones_trace.insert(sync_env->source_zone);
2582 }
2583
2584 int operate() override;
2585 };
2586
2587 int RGWBucketShardFullSyncCR::operate()
2588 {
2589 int ret;
2590 reenter(this) {
2591 list_marker = full_marker.position;
2592
2593 total_entries = full_marker.count;
2594 do {
2595 if (!lease_cr->is_locked()) {
2596 drain_all();
2597 return set_cr_error(-ECANCELED);
2598 }
2599 set_status("listing remote bucket");
2600 ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
2601 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2602 &list_result));
2603 if (retcode < 0 && retcode != -ENOENT) {
2604 set_status("failed bucket listing, going down");
2605 drain_all();
2606 return set_cr_error(retcode);
2607 }
2608 entries_iter = list_result.entries.begin();
2609 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2610 if (!lease_cr->is_locked()) {
2611 drain_all();
2612 return set_cr_error(-ECANCELED);
2613 }
2614 ldout(sync_env->cct, 20) << "[full sync] syncing object: "
2615 << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
2616 entry = &(*entries_iter);
2617 total_entries++;
2618 list_marker = entries_iter->key;
2619 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2620 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
2621 } else {
2622 op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
2623 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2624 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2625 false, /* versioned, only matters for object removal */
2626 entry->versioned_epoch, entry->mtime,
2627 entry->owner, op, CLS_RGW_STATE_COMPLETE,
2628 entry->key, &marker_tracker, zones_trace),
2629 false);
2630 }
2631 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2632 yield wait_for_child();
2633 bool again = true;
2634 while (again) {
2635 again = collect(&ret, nullptr);
2636 if (ret < 0) {
2637 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2638 sync_status = ret;
2639 /* we have reported this error */
2640 }
2641 }
2642 }
2643 }
2644 } while (list_result.is_truncated && sync_status == 0);
2645 set_status("done iterating over all objects");
2646 /* wait for all operations to complete */
2647 while (num_spawned()) {
2648 yield wait_for_child();
2649 bool again = true;
2650 while (again) {
2651 again = collect(&ret, nullptr);
2652 if (ret < 0) {
2653 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2654 sync_status = ret;
2655 /* we have reported this error */
2656 }
2657 }
2658 }
2659 if (!lease_cr->is_locked()) {
2660 return set_cr_error(-ECANCELED);
2661 }
2662 /* update sync state to incremental */
2663 if (sync_status == 0) {
2664 yield {
2665 rgw_bucket_shard_sync_info sync_status;
2666 sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2667 map<string, bufferlist> attrs;
2668 sync_status.encode_state_attr(attrs);
2669 RGWRados *store = sync_env->store;
2670 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2671 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2672 attrs));
2673 }
2674 } else {
2675 ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2676 }
2677 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2678 ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
2679 << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
2680 return set_cr_error(retcode);
2681 }
2682 if (sync_status < 0) {
2683 return set_cr_error(sync_status);
2684 }
2685 return set_cr_done();
2686 }
2687 return 0;
2688 }
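
/*
 * The full-sync loop above bounds concurrency with a spawn window: at most
 * BUCKET_SYNC_SPAWN_WINDOW child coroutines are kept in flight, and finished
 * children are reaped (recording any error in sync_status) before more are
 * spawned. A generic sketch of that pattern, assuming the spawn()/collect()
 * semantics used in this file:
 *
 *   spawn(child_cr, false);
 *   while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
 *     yield wait_for_child();
 *     bool again = true;
 *     while (again) {
 *       again = collect(&ret, nullptr);  // reap every finished child
 *       if (ret < 0) {
 *         sync_status = ret;             // remember the failure, keep going
 *       }
 *     }
 *   }
 */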
2689
2690 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2691 RGWDataSyncEnv *sync_env;
2692 const rgw_bucket_shard& bs;
2693 RGWBucketInfo *bucket_info;
2694 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2695 list<rgw_bi_log_entry> list_result;
2696 list<rgw_bi_log_entry>::iterator entries_iter;
2697 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
2698 rgw_bucket_shard_inc_sync_marker& inc_marker;
2699 rgw_obj_key key;
2700 rgw_bi_log_entry *entry{nullptr};
2701 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2702 bool updated_status{false};
2703 const string& status_oid;
2704 const string& zone_id;
2705 ceph::real_time sync_modify_time;
2706
2707 string cur_id;
2708
2709 RGWDataSyncDebugLogger logger;
2710
2711 int sync_status{0};
2712 bool syncstopped{false};
2713
2714 public:
2715 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2716 const rgw_bucket_shard& bs,
2717 RGWBucketInfo *_bucket_info,
2718 const std::string& status_oid,
2719 RGWContinuousLeaseCR *lease_cr,
2720 rgw_bucket_shard_inc_sync_marker& _inc_marker)
2721 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2722 bucket_info(_bucket_info), lease_cr(lease_cr), inc_marker(_inc_marker),
2723 marker_tracker(sync_env, status_oid, inc_marker), status_oid(status_oid), zone_id(_sync_env->store->get_zone().id) {
2724 set_description() << "bucket shard incremental sync bucket="
2725 << bucket_shard_str{bs};
2726 set_status("init");
2727 logger.init(sync_env, "BucketInc", bs.get_key());
2728 }
2729
2730 int operate() override;
2731 };
2732
2733 int RGWBucketShardIncrementalSyncCR::operate()
2734 {
2735 int ret;
2736 reenter(this) {
2737 do {
2738 if (!lease_cr->is_locked()) {
2739 drain_all();
2740 return set_cr_error(-ECANCELED);
2741 }
2742 ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync, position=" << inc_marker.position << dendl;
2743 set_status() << "listing bilog; position=" << inc_marker.position;
2744 yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
2745 &list_result));
2746 if (retcode < 0 && retcode != -ENOENT) {
2747 drain_all();
2748 if (!syncstopped) {
2749 /* wait for all operations to complete */
2750 return set_cr_error(retcode);
2751 } else {
2752 /* no need to retry */
2753 break;
2754 }
2755 }
2756 squash_map.clear();
2757 for (auto& e : list_result) {
2758 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) {
2759 ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl;
2760 sync_modify_time = e.timestamp;
2761 syncstopped = true;
2762 continue;
2763 }
2764 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) {
2765 ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl;
2766 sync_modify_time = e.timestamp;
2767 syncstopped = false;
2768 continue;
2769 }
2770 if (e.op == CLS_RGW_OP_CANCEL) {
2771 continue;
2772 }
2773 if (e.state != CLS_RGW_STATE_COMPLETE) {
2774 continue;
2775 }
2776 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
2777 continue;
2778 }
2779 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
2780 if (squash_entry.first <= e.timestamp) {
2781 squash_entry = make_pair<>(e.timestamp, e.op);
2782 }
2783 }
2784
2785 entries_iter = list_result.begin();
2786 for (; entries_iter != list_result.end(); ++entries_iter) {
2787 if (!lease_cr->is_locked()) {
2788 drain_all();
2789 return set_cr_error(-ECANCELED);
2790 }
2791 entry = &(*entries_iter);
2792 {
2793 size_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2794 if (p == string::npos) {
2795 cur_id = entry->id;
2796 } else {
2797 cur_id = entry->id.substr(p + 1);
2798 }
2799 }
2800 inc_marker.position = cur_id;
2801
2802 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
2803 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
2804 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2805 continue;
2806 }
2807
2808 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
2809 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
2810 ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
2811 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2812 continue;
2813 }
2814
2815 ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
2816
2817 if (!key.ns.empty()) {
2818 set_status() << "skipping entry in namespace: " << entry->object;
2819 ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
2820 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2821 continue;
2822 }
2823
2824 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
2825 if (entry->op == CLS_RGW_OP_CANCEL) {
2826 set_status() << "canceled operation, skipping";
2827 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2828 << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
2829 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2830 continue;
2831 }
2832 if (entry->state != CLS_RGW_STATE_COMPLETE) {
2833 set_status() << "non-complete operation, skipping";
2834 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2835 << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
2836 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2837 continue;
2838 }
2839 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
2840 set_status() << "redundant operation, skipping";
2841 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2842 << bucket_shard_str{bs} << "/" << key << ": redundant operation" << dendl;
2843 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2844 continue;
2845 }
2846 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
2847 set_status() << "squashed operation, skipping";
2848 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2849 << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
2850 /* not updating high marker though */
2851 continue;
2852 }
2853 ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
2854 << bucket_shard_str{bs} << "/" << key << dendl;
2855 updated_status = false;
2856 while (!marker_tracker.can_do_op(key)) {
2857 if (!updated_status) {
2858 set_status() << "can't do op, conflicting inflight operation";
2859 updated_status = true;
2860 }
2861 ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
2862 yield wait_for_child();
2863 bool again = true;
2864 while (again) {
2865 again = collect(&ret, nullptr);
2866 if (ret < 0) {
2867 ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
2868 sync_status = ret;
2869 /* we have reported this error */
2870 }
2871 }
2872 }
2873 if (!marker_tracker.index_key_to_marker(key, cur_id)) {
2874 set_status() << "can't do op, sync already in progress for object";
2875 ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
2876 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2877 continue;
2878 }
2880 set_status() << "start object sync";
2881 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
2882 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
2883 } else {
2884 uint64_t versioned_epoch = 0;
2885 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
2886 if (entry->ver.pool < 0) {
2887 versioned_epoch = entry->ver.epoch;
2888 }
2889 ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
2890 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
2891 spawn(new SyncCR(sync_env, bucket_info, bs, key,
2892 entry->is_versioned(), versioned_epoch,
2893 entry->timestamp, owner, entry->op, entry->state,
2894 cur_id, &marker_tracker, entry->zones_trace),
2895 false);
2896 }
2898 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2899 set_status() << "num_spawned() > spawn_window";
2900 yield wait_for_child();
2901 bool again = true;
2902 while (again) {
2903 again = collect(&ret, nullptr);
2904 if (ret < 0) {
2905 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2906 sync_status = ret;
2907 /* we have reported this error */
2908 }
2909 /* not waiting for child here */
2910 }
2911 }
2912 }
2913 } while (!list_result.empty() && sync_status == 0);
2914
2915 if (syncstopped) {
2916 drain_all();
2917
2918 yield {
2919 const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
2920 RGWRados *store = sync_env->store;
2921 call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid}));
2922 }
2923 lease_cr->abort();
2924 return set_cr_done();
2925 }
2926
2927 while (num_spawned()) {
2928 yield wait_for_child();
2929 bool again = true;
2930 while (again) {
2931 again = collect(&ret, nullptr);
2932 if (ret < 0) {
2933 ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
2934 sync_status = ret;
2935 /* we have reported this error */
2936 }
2937 /* not waiting for child here */
2938 }
2939 }
2940
2941 yield call(marker_tracker.flush());
2942 if (retcode < 0) {
2943 ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
2944 return set_cr_error(retcode);
2945 }
2946 if (sync_status < 0) {
2947 ldout(sync_env->cct, 0) << "ERROR: failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2948 }
2949
2950 /* wait for all operations to complete */
2951 drain_all();
2952
2953 if (sync_status < 0) {
2954 return set_cr_error(sync_status);
2955 }
2956
2957 return set_cr_done();
2958 }
2959 return 0;
2960 }
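
/*
 * Worked example of the squash_map logic above (hypothetical bilog entries):
 * given three complete entries for the same (object, instance) pair,
 *
 *   t=1 CLS_RGW_OP_ADD, t=2 CLS_RGW_OP_DEL, t=3 CLS_RGW_OP_ADD
 *
 * squash_map keeps only (t=3, CLS_RGW_OP_ADD), so the two earlier entries
 * are skipped as "squashed operation" and only the newest op is replayed,
 * avoiding a pointless fetch-then-delete-then-fetch sequence.
 */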
2961
2962 int RGWRunBucketSyncCoroutine::operate()
2963 {
2964 reenter(this) {
2965 yield {
2966 set_status("acquiring sync lock");
2967 auto store = sync_env->store;
2968 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
2969 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2970 "sync_lock",
2971 cct->_conf->rgw_sync_lease_period,
2972 this));
2973 lease_stack.reset(spawn(lease_cr.get(), false));
2974 }
2975 while (!lease_cr->is_locked()) {
2976 if (lease_cr->is_done()) {
2977 ldout(cct, 5) << "lease cr failed, done early" << dendl;
2978 set_status("lease lock failed, early abort");
2979 return set_cr_error(lease_cr->get_ret_status());
2980 }
2981 set_sleeping(true);
2982 yield;
2983 }
2984
2985 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
2986 if (retcode < 0 && retcode != -ENOENT) {
2987 ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
2988 << bucket_shard_str{bs} << dendl;
2989 lease_cr->go_down();
2990 drain_all();
2991 return set_cr_error(retcode);
2992 }
2993
2994 ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
2995 << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
2996
2997 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
2998 if (retcode == -ENOENT) {
2999 /* bucket instance info has not been synced in yet, fetch it now */
3000 yield {
3001 ldout(sync_env->cct, 10) << "no local info for bucket "
3002 << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
3003 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
3004
3005 meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
3006
3007 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
3008 string() /* no marker */,
3009 MDLOG_STATUS_COMPLETE,
3010 NULL /* no marker tracker */));
3011 }
3012 if (retcode < 0) {
3013 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
3014 lease_cr->go_down();
3015 drain_all();
3016 return set_cr_error(retcode);
3017 }
3018
3019 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3020 }
3021 if (retcode < 0) {
3022 ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
3023 lease_cr->go_down();
3024 drain_all();
3025 return set_cr_error(retcode);
3026 }
3027
3028 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
3029 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
3030 if (retcode < 0) {
3031 ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
3032 << " failed, retcode=" << retcode << dendl;
3033 lease_cr->go_down();
3034 drain_all();
3035 return set_cr_error(retcode);
3036 }
3037 }
3038
3039 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
3040 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
3041 status_oid, lease_cr.get(),
3042 sync_status.full_marker));
3043 if (retcode < 0) {
3044 ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
3045 << " failed, retcode=" << retcode << dendl;
3046 lease_cr->go_down();
3047 drain_all();
3048 return set_cr_error(retcode);
3049 }
3050 sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
3051 }
3052
3053 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
3054 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
3055 status_oid, lease_cr.get(),
3056 sync_status.inc_marker));
3057 if (retcode < 0) {
3058 ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
3059 << " failed, retcode=" << retcode << dendl;
3060 lease_cr->go_down();
3061 drain_all();
3062 return set_cr_error(retcode);
3063 }
3064 }
3065
3066 lease_cr->go_down();
3067 drain_all();
3068 return set_cr_done();
3069 }
3070
3071 return 0;
3072 }
3073
3074 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
3075 {
3076 return new RGWRunBucketSyncCoroutine(&sync_env, bs);
3077 }
3078
3079 int RGWBucketSyncStatusManager::init()
3080 {
3081 conn = store->get_zone_conn_by_id(source_zone);
3082 if (!conn) {
3083 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
3084 return -EINVAL;
3085 }
3086
3087 int ret = http_manager.set_threaded();
3088 if (ret < 0) {
3089 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
3090 return ret;
3091 }
3092
3093
3094 const string key = bucket.get_key();
3095
3096 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
3097 { NULL, NULL } };
3098
3099 string path = string("/admin/metadata/bucket.instance");
3100
3101 bucket_instance_meta_info result;
3102 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
3103 if (ret < 0) {
3104 ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
3105 return ret;
3106 }
3107
3108 RGWBucketInfo& bi = result.data.get_bucket_info();
3109 num_shards = bi.num_shards;
3110
3111 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
3112
3113 sync_module.reset(new RGWDefaultSyncModuleInstance());
3114
3115 int effective_num_shards = (num_shards ? num_shards : 1);
3116
3117 auto async_rados = store->get_async_rados();
3118
3119 for (int i = 0; i < effective_num_shards; i++) {
3120 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
3121 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
3122 if (ret < 0) {
3123 ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
3124 return ret;
3125 }
3126 source_logs[i] = l;
3127 }
3128
3129 return 0;
3130 }
3131
3132 int RGWBucketSyncStatusManager::init_sync_status()
3133 {
3134 list<RGWCoroutinesStack *> stacks;
3135
3136 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3137 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3138 RGWRemoteBucketLog *l = iter->second;
3139 stack->call(l->init_sync_status_cr());
3140
3141 stacks.push_back(stack);
3142 }
3143
3144 return cr_mgr.run(stacks);
3145 }
3146
3147 int RGWBucketSyncStatusManager::read_sync_status()
3148 {
3149 list<RGWCoroutinesStack *> stacks;
3150
3151 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3152 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3153 RGWRemoteBucketLog *l = iter->second;
3154 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
3155
3156 stacks.push_back(stack);
3157 }
3158
3159 int ret = cr_mgr.run(stacks);
3160 if (ret < 0) {
3161 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
3162 << bucket_str{bucket} << dendl;
3163 return ret;
3164 }
3165
3166 return 0;
3167 }
3168
3169 int RGWBucketSyncStatusManager::run()
3170 {
3171 list<RGWCoroutinesStack *> stacks;
3172
3173 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3174 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3175 RGWRemoteBucketLog *l = iter->second;
3176 stack->call(l->run_sync_cr());
3177
3178 stacks.push_back(stack);
3179 }
3180
3181 int ret = cr_mgr.run(stacks);
3182 if (ret < 0) {
3183 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
3184 << bucket_str{bucket} << dendl;
3185 return ret;
3186 }
3187
3188 return 0;
3189 }
3190
3191 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
3192 const rgw_bucket_shard& bs)
3193 {
3194 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
3195 }
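
/*
 * Example (hypothetical zone id and bucket shard key): with source_zone
 * "zone-a" and bs.get_key() == "mybucket:deadbeef.1234.1:5", status_oid()
 * returns "bucket.sync-status.zone-a:mybucket:deadbeef.1234.1:5".
 */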
3196
3197 class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
3198 static constexpr int max_concurrent_shards = 16;
3199 RGWRados *const store;
3200 RGWDataSyncEnv *const env;
3201 const int num_shards;
3202 rgw_bucket_shard bs;
3203
3204 using Vector = std::vector<rgw_bucket_shard_sync_info>;
3205 Vector::iterator i, end;
3206
3207 public:
3208 RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
3209 int num_shards, const rgw_bucket& bucket,
3210 Vector *status)
3211 : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
3212 store(store), env(env), num_shards(num_shards),
3213 bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
3214 i(status->begin()), end(status->end())
3215 {}
3216
3217 bool spawn_next() override {
3218 if (i == end) {
3219 return false;
3220 }
3221 spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
3222 ++i;
3223 ++bs.shard_id;
3224 return true;
3225 }
3226 };
3227
3228 int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
3229 const RGWBucketInfo& bucket_info,
3230 std::vector<rgw_bucket_shard_sync_info> *status)
3231 {
3232 const auto num_shards = bucket_info.num_shards;
3233 status->clear();
3234 status->resize(std::max<size_t>(1, num_shards));
3235
3236 RGWDataSyncEnv env;
3237 RGWSyncModuleInstanceRef module; // null sync module
3238 env.init(store->ctx(), store, nullptr, store->get_async_rados(),
3239 nullptr, nullptr, source_zone, module, nullptr);
3240
3241 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
3242 return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
3243 bucket_info.bucket, status));
3244 }
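
/*
 * Usage sketch (hypothetical caller) for rgw_bucket_sync_status() above:
 *
 *   std::vector<rgw_bucket_shard_sync_info> status;
 *   int r = rgw_bucket_sync_status(store, source_zone, bucket_info, &status);
 *   // on success, status[i].state is one of StateInit / StateFullSync /
 *   // StateIncrementalSync for bucket shard i (a single entry for
 *   // unsharded buckets)
 */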
3245
3246
3247 // TODO: move into rgw_data_sync_trim.cc
3248 #undef dout_prefix
3249 #define dout_prefix (*_dout << "data trim: ")
3250
3251 namespace {
3252
3253 /// return the marker that it's safe to trim up to
3254 const std::string& get_stable_marker(const rgw_data_sync_marker& m)
3255 {
3256 return m.state == m.FullSync ? m.next_step_marker : m.marker;
3257 }
3258
3259 /// comparison operator for take_min_markers()
3260 bool operator<(const rgw_data_sync_marker& lhs,
3261 const rgw_data_sync_marker& rhs)
3262 {
3263 // sort by stable marker
3264 return get_stable_marker(lhs) < get_stable_marker(rhs);
3265 }
3266
3267 /// populate the container starting with 'dest' with the minimum stable marker
3268 /// of each shard for all of the peers in [first, last)
3269 template <typename IterIn, typename IterOut>
3270 void take_min_markers(IterIn first, IterIn last, IterOut dest)
3271 {
3272 if (first == last) {
3273 return;
3274 }
3275 // initialize markers with the first peer's
3276 auto m = dest;
3277 for (auto &shard : first->sync_markers) {
3278 *m = std::move(shard.second);
3279 ++m;
3280 }
3281 // for remaining peers, replace with smaller markers
3282 for (auto p = first + 1; p != last; ++p) {
3283 m = dest;
3284 for (auto &shard : p->sync_markers) {
3285 if (shard.second < *m) {
3286 *m = std::move(shard.second);
3287 }
3288 ++m;
3289 }
3290 }
3291 }
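
/// Worked example (hypothetical markers): with two peers whose per-shard
/// stable markers are
///   peer0: shard0="00005", shard1="00009"
///   peer1: shard0="00007", shard1="00004"
/// take_min_markers() yields shard0="00005", shard1="00004" -- the minimum
/// per shard, i.e. a position every peer has already consumed, which is
/// what DataLogTrimCR below uses as its trim bound.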
3292
3293 } // anonymous namespace
3294
3295 class DataLogTrimCR : public RGWCoroutine {
3296 RGWRados *store;
3297 RGWHTTPManager *http;
3298 const int num_shards;
3299 const std::string& zone_id; //< my zone id
3300 std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
3301 std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
3302 std::vector<std::string>& last_trim; //< last trimmed marker per shard
3303 int ret{0};
3304
3305 public:
3306 DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
3307 int num_shards, std::vector<std::string>& last_trim)
3308 : RGWCoroutine(store->ctx()), store(store), http(http),
3309 num_shards(num_shards),
3310 zone_id(store->get_zone().id),
3311 peer_status(store->zone_conn_map.size()),
3312 min_shard_markers(num_shards),
3313 last_trim(last_trim)
3314 {}
3315
3316 int operate() override;
3317 };
3318
3319 int DataLogTrimCR::operate()
3320 {
3321 reenter(this) {
3322 ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
3323 set_status("fetching sync status");
3324 yield {
3325 // query data sync status from each sync peer
3326 rgw_http_param_pair params[] = {
3327 { "type", "data" },
3328 { "status", nullptr },
3329 { "source-zone", zone_id.c_str() },
3330 { nullptr, nullptr }
3331 };
3332
3333 auto p = peer_status.begin();
3334 for (auto& c : store->zone_conn_map) {
3335 ldout(cct, 20) << "query sync status from " << c.first << dendl;
3336 using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
3337 spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
3338 false);
3339 ++p;
3340 }
3341 }
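// (for reference) each StatusCR above issues roughly the equivalent of
//   GET /admin/log/?type=data&status&source-zone=<zone_id>
// against a peer gateway and decodes the JSON response into the
// corresponding rgw_data_sync_status slot in peer_status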
3342
3343 // must get a successful reply from all peers to consider trimming
3344 ret = 0;
3345 while (ret == 0 && num_spawned() > 0) {
3346 yield wait_for_child();
3347 collect_next(&ret);
3348 }
3349 drain_all();
3350
3351 if (ret < 0) {
3352 ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
3353 return set_cr_error(ret);
3354 }
3355
3356 ldout(cct, 10) << "trimming log shards" << dendl;
3357 set_status("trimming log shards");
3358 yield {
3359 // determine the minimum marker for each shard
3360 take_min_markers(peer_status.begin(), peer_status.end(),
3361 min_shard_markers.begin());
3362
3363 for (int i = 0; i < num_shards; i++) {
3364 const auto& m = min_shard_markers[i];
3365 auto& stable = get_stable_marker(m);
3366 if (stable <= last_trim[i]) {
3367 continue;
3368 }
3369 ldout(cct, 10) << "trimming log shard " << i
3370 << " at marker=" << stable
3371 << " last_trim=" << last_trim[i] << dendl;
3372 using TrimCR = RGWSyncLogTrimCR;
3373 spawn(new TrimCR(store, store->data_log->get_oid(i),
3374 stable, &last_trim[i]),
3375 true);
3376 }
3377 }
3378 return set_cr_done();
3379 }
3380 return 0;
3381 }
3382
3383 class DataLogTrimPollCR : public RGWCoroutine {
3384 RGWRados *store;
3385 RGWHTTPManager *http;
3386 const int num_shards;
3387 const utime_t interval; ///< polling interval
3388 const std::string lock_oid; ///< use first data log shard for lock
3389 const std::string lock_cookie;
3390 std::vector<std::string> last_trim; ///< last trimmed marker per shard
3391
3392 public:
3393 DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
3394 int num_shards, utime_t interval)
3395 : RGWCoroutine(store->ctx()), store(store), http(http),
3396 num_shards(num_shards), interval(interval),
3397 lock_oid(store->data_log->get_oid(0)),
3398 lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
3399 last_trim(num_shards)
3400 {}
3401
3402 int operate() override;
3403 };
3404
3405 int DataLogTrimPollCR::operate()
3406 {
3407 reenter(this) {
3408 for (;;) {
3409 set_status("sleeping");
3410 wait(interval);
3411
3412 // request a 'data_trim' lock that covers the entire wait interval to
3413 // prevent other gateways from attempting to trim for the duration
3414 set_status("acquiring trim lock");
3415 yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
3416 rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
3417 "data_trim", lock_cookie,
3418 interval.sec()));
3419 if (retcode < 0) {
3420 // if the lock is already held, go back to sleep and try again later
3421 ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
3422 << interval.sec() << "s" << dendl;
3423 continue;
3424 }
3425
3426 set_status("trimming");
3427 yield call(new DataLogTrimCR(store, http, num_shards, last_trim));
3428
3429 // note that the lock is deliberately not released: holding it until it
3430 // expires at the end of the interval keeps other gateways from
3430 // duplicating this trim work
3431 }
3432 }
3433 return 0;
3434 }
3435
3436 RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
3437 RGWHTTPManager *http,
3438 int num_shards, utime_t interval)
3439 {
3440 return new DataLogTrimPollCR(store, http, num_shards, interval);
3441 }
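// Illustrative wiring (a sketch; the real startup path lives in RGWRados):
// a caller owning a coroutine manager and a threaded HTTP manager might
// start the poller like this, with num_shards and interval coming from
// configuration:
//
//   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
//   RGWHTTPManager http(store->ctx(), crs.get_completion_mgr());
//   http.set_threaded();
//   crs.run(create_data_log_trim_cr(store, &http, num_shards, interval));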