// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_module.h"
#include "rgw_sync_log_trim.h"

#include "cls/lock/cls_lock_client.h"

#include "auth/Crypto.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";
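/* Illustrative object names built from these prefixes (composition happens in
 * helpers such as full_data_sync_index_shard_oid() below and the
 * RGWDataSyncStatusManager sync_status_oid()/shard_obj_name() helpers):
 *   datalog.sync-status.<source_zone>
 *   datalog.sync-status.shard.<source_zone>.<shard_id>
 *   data.full-sync.index.<source_zone>.<shard_id>
 *   bucket.sync-status.<source_zone>:<bucket_key>
 */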

class RGWSyncDebugLogger {
  CephContext *cct;
  string prefix;

  bool ended;

public:
  RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
                     const string& sync_type, const string& sync_stage,
                     const string& resource, bool log_start = true) {
    init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
  }
  RGWSyncDebugLogger() : cct(NULL), ended(false) {}
  ~RGWSyncDebugLogger();

  void init(CephContext *_cct, const string& source_zone,
            const string& sync_type, const string& sync_stage,
            const string& resource, bool log_start = true);
  void log(const string& state);
  void finish(int status);
};

void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
                              const string& sync_type, const string& sync_section,
                              const string& resource, bool log_start)
{
  cct = _cct;
  ended = false;
  string zone_str = source_zone.substr(0, 8);
  prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
  if (log_start) {
    log("start");
  }
}

RGWSyncDebugLogger::~RGWSyncDebugLogger()
{
  if (!ended) {
    log("finish");
  }
}

void RGWSyncDebugLogger::log(const string& state)
{
  ldout(cct, 5) << prefix << ":" << state << dendl;
}

void RGWSyncDebugLogger::finish(int status)
{
  ended = true;
  ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
}

class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
public:
  RGWDataSyncDebugLogger() {}
  RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
                         const string& resource, bool log_start = true) {
    init(sync_env, sync_section, resource, log_start);
  }
  void init(RGWDataSyncEnv *sync_env, const string& sync_section,
            const string& resource, bool log_start = true) {
    RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
  }
};

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}
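/* Illustrative payload accepted by the decoders above (values made up;
 * field names match the decode_json() calls):
 *   { "marker": "00000001.1.2",
 *     "truncated": false,
 *     "entries": [ { "key": "mybucket:<instance_id>:<shard>",
 *                    "timestamp": "2019-01-01 00:00:00.000000Z" } ] }
 */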

class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store,
               rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}

class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  map<int, std::set<std::string>> &entries_map;

public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
                                    map<int, std::set<std::string>>& _entries_map)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
      max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
                                  marker, &entries_map[shard_id], max_entries), false);

  ++shard_id;
  return true;
}

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
                          rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "info" , NULL },
                                        { NULL, NULL } };
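        // Illustrative resulting request: GET /admin/log/?type=data&id=<shard_id>&info
        // (parameter encoding is handled by RGWRESTReadResource)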

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};
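/* Illustrative response body for the "extra-info" listing decoded above
 * (values made up; top-level field names match decode_json(), nested entry
 * fields follow rgw_data_change_log_entry::decode_json(), defined elsewhere):
 *   { "marker": "1_1539000000.000000_100.1", "truncated": true, "entries": [ ... ] }
 */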

class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op = nullptr;

  int shard_id;
  const std::string& marker;
  string *pnext_marker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env, int _shard_id,
                              const std::string& marker, string *pnext_marker,
                              list<rgw_data_change_log_entry> *_entries,
                              bool *_truncated)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      shard_id(_shard_id), marker(marker), pnext_marker(pnext_marker),
      entries(_entries), truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type" , "data" },
                                        { "id", buf },
                                        { "marker", marker.c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(&response);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pnext_marker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    http_op->set_user_info((void *)stack);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

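/* Initializes the sync status objects for a source zone: takes a cls lock on
 * the status object, writes rgw_data_sync_info, re-takes the lock (the write
 * recreated the object), captures each remote datalog shard's current
 * position as the incremental-sync start point, persists the per-shard
 * markers, and finally transitions the state to StateBuildingFullSyncMaps. */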
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->get_zone_params().log_pool),
      num_shards(num_shards), status(status) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};

int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  if (store->is_meta_master()) {
    return 0;
  }

  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
                _source_zone, _sync_module);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  map<int, std::set<std::string>> entries_map;
  uint64_t max_entries{1};
  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
  http_manager.stop();

  if (ret == 0) {
    for (const auto& entry : entries_map) {
      if (entry.second.size() != 0) {
        recovering_shards.insert(entry.first);
      }
    }
  }

  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  uint64_t instance_id;
  get_random_bytes((char *)&instance_id, sizeof(instance_id));
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}

struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string> result;
  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      yield {
        string entrypoint = string("/admin/metadata/bucket.instance");
        /* FIXME: need a better scaling solution here, requires streaming output */
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                      entrypoint, NULL, &result));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
        return set_cr_error(retcode);
      }
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->get_zone_params().log_pool,
                                                  oid_prefix);
      yield; // yield so OmapAppendCRs can start
      for (iter = result.begin(); iter != result.end(); ++iter) {
        ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;

        key = *iter;

        yield {
          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                          { NULL, NULL } };

          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
        }

        num_shards = meta_info.data.get_bucket_info().num_shards;
        if (num_shards > 0) {
          for (i = 0; i < num_shards; i++) {
            char buf[16];
            snprintf(buf, sizeof(buf), ":%d", i);
            s = key + buf;
            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
          }
        } else {
          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
        }
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                                rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker), true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1
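// A window of 1 means the tracker flushes its high marker after every
// completed entry; a larger window would batch marker writes (assumption
// based on how RGWSyncShardMarkerTrack consumes this value).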

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * create index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }

  RGWOrderCallCR *allocate_order_control_cr() {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};
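/* Typical use of the tracker above (illustrative, mirroring RGWDataSyncShardCR
 * below):
 *   marker_tracker->start(key, pos, timestamp);   // before spawning the entry CR
 *   ...                                           // RGWDataSyncSingleEntryCR runs
 *   call(marker_tracker->finish(entry_marker));   // may yield a store_marker() write
 */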

// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}
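// Illustrative use: ldout(cct, 0) << "bucket shard: " << bucket_shard_str{bs} << dendl;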

class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  RGWDataSyncDebugLogger logger;
  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
    logger.init(sync_env, "Bucket", bs.get_key());
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          call(new RGWRunBucketSyncCoroutine(sync_env, bs));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
            "for missing bucket " << raw_key << dendl;
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
          }
        }
        if (error_repo && !error_repo->append(raw_key)) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
              << error_repo->get_obj() << "), retcode=" << retcode << dendl;
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

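/* Syncs one datalog shard. full_sync() drains the locally built full-sync
 * index (omap) for the shard while holding a continuous lease, then persists
 * the transition to IncrementalSync; incremental_sync() tails the remote
 * datalog, feeds per-bucket-shard entries to RGWDataSyncSingleEntryCR,
 * retries entries recorded in the ".retry" error repo with exponential
 * backoff, and throttles the number of in-flight entry coroutines via
 * BUCKET_SHARD_SYNC_SPAWN_WINDOW. */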
class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  std::string next_marker;
  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  std::set<std::string> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::coarse_real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
  uint32_t retry_backoff_secs;

  RGWDataSyncDebugLogger logger;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, const rgw_data_sync_marker& _marker, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";

    logger.init(sync_env, "DataShard", status_oid);
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          drain_all();
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      logger.log("full sync");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      total_entries = sync_marker.pos;
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << *iter << dendl;
          total_entries++;
          if (!marker_tracker->start(*iter, total_entries, real_time())) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << *iter << ". Duplicate entry?" << dendl;
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
          }
          sync_marker.marker = *iter;

          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                ldout(cct, 10) << "a sync operation returned error" << dendl;
              }
            }
          }
        }
      } while ((int)entries.size() == max_entries);

      drain_all_but_stack(lease_stack.get());

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
        lease_cr->go_down();
        drain_all();
        return set_cr_error(retcode);
      }
      // keep lease and transition to incremental_sync()
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      ldout(cct, 10) << "start incremental sync" << dendl;
      if (lease_cr) {
        ldout(cct, 10) << "lease already held from full sync" << dendl;
      } else {
        yield init_lease_cr();
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            ldout(cct, 5) << "lease cr failed, done early " << dendl;
            set_status("lease lock failed, early abort");
            drain_all();
            return set_cr_error(lease_cr->get_ret_status());
          }
          set_sleeping(true);
          yield;
        }
        set_status("lease acquired");
        ldout(cct, 10) << "took lease" << dendl;
      }
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      logger.log("inc sync");
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
          }
        }

        if (error_retry_time <= ceph::coarse_real_clock::now()) {
          /* process bucket shards that previously failed */
          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                               error_marker, &error_entries,
                                               max_error_entries));
          ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
          iter = error_entries.begin();
          for (; iter != error_entries.end(); ++iter) {
            error_marker = *iter;
            ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
          }
          if ((int)error_entries.size() != max_error_entries) {
            if (error_marker.empty() && error_entries.empty()) {
              /* the retry repo is empty, we back off a bit before calling it again */
              retry_backoff_secs *= 2;
              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
              }
            } else {
              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
            }
            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
            error_marker.clear();
          }
        }

#define INCREMENTAL_MAX_ENTRIES 100
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
        spawned_keys.clear();
        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, sync_marker.marker,
                                                   &next_marker, &log_entries, &truncated));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }
        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
          ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
          if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
            ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
            continue;
          }
          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
          } else {
            /*
             * don't spawn the same key more than once. We can do that as long as we don't yield
             */
            if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
              spawned_keys.insert(log_iter->entry.key);
              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
              if (retcode < 0) {
                stop_spawned_services();
                drain_all();
                return set_cr_error(retcode);
              }
            }
          }
          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
                /* we have reported this error */
              }
              /* not waiting for child here */
            }
            /* not waiting for child here */
          }
        }
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker
            << " next_marker=" << next_marker << " truncated=" << truncated << dendl;
        if (!truncated) {
          yield wait(get_idle_interval());
        }
        if (!next_marker.empty()) {
          sync_marker.marker = next_marker;
        } else if (!log_entries.empty()) {
          sync_marker.marker = log_entries.back().log_id;
        }
      } while (true);
    }
    return 0;
  }

  utime_t get_idle_interval() const {
#define INCREMENTAL_INTERVAL 20
    ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
      auto now = ceph::coarse_real_clock::now();
      if (error_retry_time > now) {
        auto d = error_retry_time - now;
        if (interval > d) {
          interval = d;
        }
      }
    }
    // convert timespan -> time_point -> utime_t
    return utime_t(ceph::coarse_real_clock::zero() + interval);
  }

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};

class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

public:
  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker)
    : RGWBackoffControlCR(_sync_env->cct, false),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                          rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(cr_lock());

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};

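/* Top-level data sync state machine: drives StateInit ->
 * StateBuildingFullSyncMaps -> StateSync, then spawns one
 * RGWDataSyncShardControlCR per shard and routes wakeup() notifications to
 * the matching shard coroutine. */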
1519 class RGWDataSyncCR : public RGWCoroutine {
1520 RGWDataSyncEnv *sync_env;
1521 uint32_t num_shards;
1522
1523 rgw_data_sync_status sync_status;
1524
1525 RGWDataSyncShardMarkerTrack *marker_tracker;
1526
1527 Mutex shard_crs_lock;
1528 map<int, RGWDataSyncShardControlCR *> shard_crs;
1529
1530 bool *reset_backoff;
1531
1532 RGWDataSyncDebugLogger logger;
1533
1534 RGWDataSyncModule *data_sync_module{nullptr};
1535 public:
1536 RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
1537 sync_env(_sync_env),
1538 num_shards(_num_shards),
1539 marker_tracker(NULL),
1540 shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
1541 reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
1542
1543 }
1544
1545 ~RGWDataSyncCR() override {
1546 for (auto iter : shard_crs) {
1547 iter.second->put();
1548 }
1549 }
1550
1551 int operate() override {
1552 reenter(this) {
1553
1554 /* read sync status */
1555 yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));
1556
1557 data_sync_module = sync_env->sync_module->get_data_handler();
1558
1559 if (retcode < 0 && retcode != -ENOENT) {
1560 ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
1561 return set_cr_error(retcode);
1562 }
1563
1564 /* state: init status */
1565 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
1566 ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
1567 sync_status.sync_info.num_shards = num_shards;
1568 uint64_t instance_id;
1569 get_random_bytes((char *)&instance_id, sizeof(instance_id));
1570 yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
1571 if (retcode < 0) {
1572 ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
1573 return set_cr_error(retcode);
1574 }
1575 // sets state = StateBuildingFullSyncMaps
1576
1577 *reset_backoff = true;
1578 }
1579
1580 data_sync_module->init(sync_env, sync_status.sync_info.instance_id);
1581
1582 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
1583 /* call sync module init here */
1584 sync_status.sync_info.num_shards = num_shards;
1585 yield call(data_sync_module->init_sync(sync_env));
1586 if (retcode < 0) {
1587 ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
1588 return set_cr_error(retcode);
1589 }
1590 /* state: building full sync maps */
1591 ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
1592 yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
1593 if (retcode < 0) {
1594 ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
1595 return set_cr_error(retcode);
1596 }
1597 sync_status.sync_info.state = rgw_data_sync_info::StateSync;
1598
1599 /* update new state */
1600 yield call(set_sync_info_cr());
1601 if (retcode < 0) {
1602 ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
1603 return set_cr_error(retcode);
1604 }
1605
1606 *reset_backoff = true;
1607 }
1608
1609 yield {
1610 if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
1611 for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
1612 iter != sync_status.sync_markers.end(); ++iter) {
1613 RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
1614 iter->first, iter->second);
1615 cr->get();
1616 shard_crs_lock.Lock();
1617 shard_crs[iter->first] = cr;
1618 shard_crs_lock.Unlock();
1619 spawn(cr, true);
1620 }
1621 }
1622 }
1623
1624 return set_cr_done();
1625 }
1626 return 0;
1627 }
1628
1629 RGWCoroutine *set_sync_info_cr() {
1630 RGWRados *store = sync_env->store;
1631 return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
1632 rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
1633 sync_status.sync_info);
1634 }
1635
1636 void wakeup(int shard_id, set<string>& keys) {
1637 Mutex::Locker l(shard_crs_lock);
1638 map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
1639 if (iter == shard_crs.end()) {
1640 return;
1641 }
1642 iter->second->append_modified_shards(keys);
1643 iter->second->wakeup();
1644 }
1645 };
1646
1647 class RGWDefaultDataSyncModule : public RGWDataSyncModule {
1648 public:
1649 RGWDefaultDataSyncModule() {}
1650
1651 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
1652 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1653 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1654 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
1655 };
1656
1657 class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
1658 RGWDefaultDataSyncModule data_handler;
1659 public:
1660 RGWDefaultSyncModuleInstance() {}
1661 RGWDataSyncModule *get_data_handler() override {
1662 return &data_handler;
1663 }
1664 };
1665
1666 int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
1667 {
1668 instance->reset(new RGWDefaultSyncModuleInstance());
1669 return 0;
1670 }
1671
1672 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
1673 {
1674 return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
1675 key, versioned_epoch,
1676 true, zones_trace);
1677 }
1678
1679 RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
1680 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1681 {
1682 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1683 bucket_info, key, versioned, versioned_epoch,
1684 NULL, NULL, false, &mtime, zones_trace);
1685 }
1686
1687 RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
1688 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1689 {
1690 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1691 bucket_info, key, versioned, versioned_epoch,
1692 &owner.id, &owner.display_name, true, &mtime, zones_trace);
1693 }
1694
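/*
 * Retry wrapper for the whole data sync state machine. RGWBackoffControlCR
 * re-allocates the coroutine returned by alloc_cr() with exponential backoff
 * whenever it fails; exit_on_error is false here, so data sync keeps
 * retrying on any error instead of giving up.
 */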
1695 class RGWDataSyncControlCR : public RGWBackoffControlCR
1696 {
1697 RGWDataSyncEnv *sync_env;
1698 uint32_t num_shards;
1699
1700 static constexpr bool exit_on_error = false; // retry on all errors
1701 public:
1702 RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
1703 sync_env(_sync_env), num_shards(_num_shards) {
1704 }
1705
1706 RGWCoroutine *alloc_cr() override {
1707 return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
1708 }
1709
1710 void wakeup(int shard_id, set<string>& keys) {
1711 Mutex& m = cr_lock();
1712
1713 m.Lock();
1714 RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
1715 if (!cr) {
1716 m.Unlock();
1717 return;
1718 }
1719
1720 cr->get();
1721 m.Unlock();
1722
1724 cr->wakeup(shard_id, keys);
1726
1727 cr->put();
1728 }
1729 };
1730
1731 void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
1732 RWLock::RLocker rl(lock);
1733 if (!data_sync_cr) {
1734 return;
1735 }
1736 data_sync_cr->wakeup(shard_id, keys);
1737 }
1738
1739 int RGWRemoteDataLog::run_sync(int num_shards)
1740 {
1741 lock.get_write();
1742 data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
1743 data_sync_cr->get(); // run() will drop a ref, so take another
1744 lock.unlock();
1745
1746 int r = run(data_sync_cr);
1747
1748 lock.get_write();
1749 data_sync_cr->put();
1750 data_sync_cr = NULL;
1751 lock.unlock();
1752
1753 if (r < 0) {
1754 ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
1755 return r;
1756 }
1757 return 0;
1758 }
1759
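/*
 * Wires up everything needed to sync from one source zone: resolve the
 * zone's definition, verify its tier type can export data, pick the sync
 * module and REST connection, then query the remote gateway for its datalog
 * shard count so that one status object per shard can be addressed.
 */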
1760 int RGWDataSyncStatusManager::init()
1761 {
1762 auto zone_def_iter = store->zone_by_id.find(source_zone);
1763 if (zone_def_iter == store->zone_by_id.end()) {
1764 ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
1765 return -EIO;
1766 }
1767
1768 auto& zone_def = zone_def_iter->second;
1769
1770 if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
1771 return -ENOTSUP;
1772 }
1773
1774 RGWZoneParams& zone_params = store->get_zone_params();
1775
1776 if (sync_module == nullptr) {
1777 sync_module = store->get_sync_module();
1778 }
1779
1780 conn = store->get_zone_conn_by_id(source_zone);
1781 if (!conn) {
1782 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
1783 return -EINVAL;
1784 }
1785
1786 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
1787
1788 int r = source_log.init(source_zone, conn, error_logger, sync_module);
1789 if (r < 0) {
1790 lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
1791 finalize();
1792 return r;
1793 }
1794
1795 rgw_datalog_info datalog_info;
1796 r = source_log.read_log_info(&datalog_info);
1797 if (r < 0) {
1798 ldout(store->ctx(), 5) << "ERROR: source_log.read_log_info() returned r=" << r << dendl;
1799 finalize();
1800 return r;
1801 }
1802
1803 num_shards = datalog_info.num_shards;
1804
1805 for (int i = 0; i < num_shards; i++) {
1806 shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
1807 }
1808
1809 return 0;
1810 }
1811
1812 void RGWDataSyncStatusManager::finalize()
1813 {
1814 delete error_logger;
1815 error_logger = nullptr;
1816 }
1817
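/*
 * Status objects live in the zone's log pool and are named by plain string
 * concatenation. For illustration only (zone ids vary per deployment), a
 * source zone id of "8f42..." yields roughly:
 *
 *   datalog.sync-status.8f42...            (overall sync info)
 *   datalog.sync-status.shard.8f42....7    (marker for datalog shard 7)
 */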
1818 string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
1819 {
1820 char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
1821 snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
1822
1823 return string(buf);
1824 }
1825
1826 string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
1827 {
1828 char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
1829 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);
1830
1831 return string(buf);
1832 }
1833
1834 int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
1835 const rgw_bucket& bucket, int shard_id,
1836 RGWSyncErrorLogger *_error_logger,
1837 RGWSyncModuleInstanceRef& _sync_module)
1838 {
1839 conn = _conn;
1840 source_zone = _source_zone;
1841 bs.bucket = bucket;
1842 bs.shard_id = shard_id;
1843
1844 sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
1845 _error_logger, source_zone, _sync_module);
1846
1847 return 0;
1848 }
1849
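/*
 * Reads the remote shard's bilog high-water mark via
 * GET /admin/log/?type=bucket-index&bucket-instance=<key>&info, filling
 * rgw_bucket_index_marker_info (max_marker, syncstopped).
 */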
1850 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1851 RGWDataSyncEnv *sync_env;
1852 const string instance_key;
1853
1854 rgw_bucket_index_marker_info *info;
1855
1856 public:
1857 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
1858 const rgw_bucket_shard& bs,
1859 rgw_bucket_index_marker_info *_info)
1860 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1861 instance_key(bs.get_key()), info(_info) {}
1862
1863 int operate() override {
1864 reenter(this) {
1865 yield {
1866 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
1867 { "bucket-instance", instance_key.c_str() },
1868 { "info" , NULL },
1869 { NULL, NULL } };
1870
1871 string p = "/admin/log/";
1872 call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
1873 }
1874 if (retcode < 0) {
1875 return set_cr_error(retcode);
1876 }
1877 return set_cr_done();
1878 }
1879 return 0;
1880 }
1881 };
1882
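/*
 * Creates a bucket shard's sync status object: the remote bilog position
 * (max_marker) is snapshotted first, then the status is written in
 * StateFullSync with that position as the incremental marker. Changes
 * logged before the snapshot are covered by the full-sync listing; changes
 * after it are replayed once the shard reaches incremental sync. If the
 * remote reports syncstopped, the status object is removed instead.
 */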
1883 class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
1884 RGWDataSyncEnv *sync_env;
1885
1886 rgw_bucket_shard bs;
1887 const string sync_status_oid;
1888
1889 rgw_bucket_shard_sync_info& status;
1890
1891 rgw_bucket_index_marker_info info;
1892 public:
1893 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1894 const rgw_bucket_shard& bs,
1895 rgw_bucket_shard_sync_info& _status)
1896 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
1897 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1898 status(_status)
1899 {}
1900
1901 int operate() override {
1902 reenter(this) {
1903 /* fetch current position in logs */
1904 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
1905 if (retcode < 0 && retcode != -ENOENT) {
1906 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
1907 return set_cr_error(retcode);
1908 }
1909 yield {
1910 auto store = sync_env->store;
1911 rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid);
1912
1913 if (info.syncstopped) {
1914 call(new RGWRadosRemoveCR(store, obj));
1915 } else {
1916 status.state = rgw_bucket_shard_sync_info::StateFullSync;
1917 status.inc_marker.position = info.max_marker;
1918 map<string, bufferlist> attrs;
1919 status.encode_all_attrs(attrs);
1920 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
1921 }
1922 }
1923 if (info.syncstopped) {
1924 retcode = -ENOENT;
1925 }
1926 if (retcode < 0) {
1927 return set_cr_error(retcode);
1928 }
1929 return set_cr_done();
1930 }
1931 return 0;
1932 }
1933 };
1934
1935 RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
1936 {
1937 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
1938 }
1939
1940 template <class T>
1941 static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
1942 {
1943 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1944 if (iter == attrs.end()) {
1945 *val = T();
1946 return;
1947 }
1948
1949 bufferlist::iterator biter = iter->second.begin();
1950 try {
1951 ::decode(*val, biter);
1952 } catch (buffer::error& err) {
1953 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
1954 }
1955 }
1956
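/*
 * Shard sync state is persisted as three rados xattrs on the status object:
 * "state", "full_marker" and "inc_marker". decode_attr() above falls back
 * to a default-constructed value for a missing attribute, so an empty
 * status object reads back as the initial state with empty markers.
 */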
1957 void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
1958 {
1959 decode_attr(cct, attrs, "state", &state);
1960 decode_attr(cct, attrs, "full_marker", &full_marker);
1961 decode_attr(cct, attrs, "inc_marker", &inc_marker);
1962 }
1963
1964 void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
1965 {
1966 encode_state_attr(attrs);
1967 full_marker.encode_attr(attrs);
1968 inc_marker.encode_attr(attrs);
1969 }
1970
1971 void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
1972 {
1973 ::encode(state, attrs["state"]);
1974 }
1975
1976 void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1977 {
1978 ::encode(*this, attrs["full_marker"]);
1979 }
1980
1981 void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1982 {
1983 ::encode(*this, attrs["inc_marker"]);
1984 }
1985
1986 class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
1987 RGWDataSyncEnv *sync_env;
1988 string oid;
1989 rgw_bucket_shard_sync_info *status;
1990
1991 map<string, bufferlist> attrs;
1992 public:
1993 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1994 const rgw_bucket_shard& bs,
1995 rgw_bucket_shard_sync_info *_status)
1996 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1997 oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1998 status(_status) {}
1999 int operate() override;
2000 };
2001
2002 int RGWReadBucketSyncStatusCoroutine::operate()
2003 {
2004 reenter(this) {
2005 yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
2006 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
2007 &attrs));
2008 if (retcode == -ENOENT) {
2009 *status = rgw_bucket_shard_sync_info();
2010 return set_cr_done();
2011 }
2012 if (retcode < 0) {
2013 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket shard sync info, oid=" << oid << " ret=" << retcode << dendl;
2014 return set_cr_error(retcode);
2015 }
2016 status->decode_from_attrs(sync_env->cct, attrs);
2017 return set_cr_done();
2018 }
2019 return 0;
2020 }
2021
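/*
 * Bucket shards whose sync failed are parked in a per-datalog-shard ".retry"
 * omap object (error_oid below). This coroutine pages through its keys,
 * OMAP_READ_MAX_ENTRIES at a time, to report which bucket shards are still
 * awaiting recovery.
 */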
2022 #define OMAP_READ_MAX_ENTRIES 10
2023 class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
2024 RGWDataSyncEnv *sync_env;
2025 RGWRados *store;
2026
2027 const int shard_id;
2028 int max_entries;
2029
2030 set<string>& recovering_buckets;
2031 string marker;
2032 string error_oid;
2033
2034 set<string> error_entries;
2035 int max_omap_entries;
2036 int count;
2037
2038 public:
2039 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2040 set<string>& _recovering_buckets, const int _max_entries)
2041 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2042 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2043 recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
2044 {
2045 error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
2046 }
2047
2048 int operate() override;
2049 };
2050
2051 int RGWReadRecoveringBucketShardsCoroutine::operate()
2052 {
2053 reenter(this) {
2054 //read recovering bucket shards
2055 count = 0;
2056 do {
2057 yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
2058 marker, &error_entries, max_omap_entries));
2059
2060 if (retcode == -ENOENT) {
2061 break;
2062 }
2063
2064 if (retcode < 0) {
2065 ldout(sync_env->cct, 0) << "failed to read recovering bucket shards: "
2066 << cpp_strerror(retcode) << dendl;
2067 return set_cr_error(retcode);
2068 }
2069
2070 if (error_entries.empty()) {
2071 break;
2072 }
2073
2074 count += error_entries.size();
2075 marker = *error_entries.rbegin();
2076 recovering_buckets.insert(error_entries.begin(), error_entries.end());
2077 } while ((int)error_entries.size() == max_omap_entries && count < max_entries);
2078
2079 return set_cr_done();
2080 }
2081
2082 return 0;
2083 }
2084
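/*
 * "Pending" bucket shards are those named by remote datalog entries at or
 * after this shard's sync marker -- changes the datalog has recorded but
 * data sync has not consumed yet. The marker is read from the shard status
 * object, then the remote datalog is walked forward from there.
 */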
2085 class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
2086 RGWDataSyncEnv *sync_env;
2087 RGWRados *store;
2088
2089 const int shard_id;
2090 int max_entries;
2091
2092 set<string>& pending_buckets;
2093 string marker;
2094 string status_oid;
2095
2096 rgw_data_sync_marker* sync_marker;
2097 int count;
2098
2099 std::string next_marker;
2100 list<rgw_data_change_log_entry> log_entries;
2101 bool truncated;
2102
2103 public:
2104 RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2105 set<string>& _pending_buckets,
2106 rgw_data_sync_marker* _sync_marker, const int _max_entries)
2107 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2108 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2109 pending_buckets(_pending_buckets), sync_marker(_sync_marker)
2110 {
2111 status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
2112 }
2113
2114 int operate() override;
2115 };
2116
2117 int RGWReadPendingBucketShardsCoroutine::operate()
2118 {
2119 reenter(this) {
2120 //read sync status marker
2121 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
2122 yield call(new CR(sync_env->async_rados, store,
2123 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2124 sync_marker));
2125 if (retcode < 0) {
2126 ldout(sync_env->cct, 0) << "failed to read sync status marker: "
2127 << cpp_strerror(retcode) << dendl;
2128 return set_cr_error(retcode);
2129 }
2130
2131 //read pending bucket shards
2132 marker = sync_marker->marker;
2133 count = 0;
2134 do {
2135 yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, marker,
2136 &next_marker, &log_entries, &truncated));
2137
2138 if (retcode == -ENOENT) {
2139 break;
2140 }
2141
2142 if (retcode < 0) {
2143 ldout(sync_env->cct, 0) << "failed to read remote data log info: "
2144 << cpp_strerror(retcode) << dendl;
2145 return set_cr_error(retcode);
2146 }
2147
2148 if (log_entries.empty()) {
2149 break;
2150 }
2151
2152 count += log_entries.size();
2153 for (const auto& entry : log_entries) {
2154 pending_buckets.insert(entry.entry.key);
2155 }
2156 } while (truncated && count < max_entries);
2157
2158 return set_cr_done();
2159 }
2160
2161 return 0;
2162 }
2163
2164 int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
2165 {
2166 // cannot run concurrently with run_sync(), so run in a separate manager
2167 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
2168 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
2169 int ret = http_manager.set_threaded();
2170 if (ret < 0) {
2171 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
2172 return ret;
2173 }
2174 RGWDataSyncEnv sync_env_local = sync_env;
2175 sync_env_local.http_manager = &http_manager;
2176 list<RGWCoroutinesStack *> stacks;
2177 RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2178 recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
2179 stacks.push_back(recovering_stack);
2180 RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2181 pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
2182 stacks.push_back(pending_stack);
2183 ret = crs.run(stacks);
2184 http_manager.stop();
2185 return ret;
2186 }
2187
2188 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
2189 {
2190 return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
2191 }
2192
2193 RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
2194 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2195 delete iter->second;
2196 }
2197 delete error_logger;
2198 }
2199
2200
2201 void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
2202 {
2203 JSONDecoder::decode_json("ID", id, obj);
2204 JSONDecoder::decode_json("DisplayName", display_name, obj);
2205 }
2206
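/*
 * Decoded from the rgw-specific bucket listing issued by
 * RGWListBucketShardCR below ("versions" + "objs-container"). One entry
 * looks roughly like the following; all field values are illustrative:
 *
 *   { "IsDeleteMarker": false, "Key": "photo.jpg", "VersionId": "null",
 *     "IsLatest": true, "RgwxMtime": "2017-06-01T12:00:00.000Z",
 *     "ETag": "\"0f343b0931126a20f133d67c2b018a3b\"", "Size": 1024,
 *     "StorageClass": "STANDARD",
 *     "Owner": { "ID": "user1", "DisplayName": "User One" },
 *     "VersionedEpoch": 0, "RgwxTag": "..." }
 */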
2207 struct bucket_list_entry {
2208 bool delete_marker;
2209 rgw_obj_key key;
2210 bool is_latest;
2211 real_time mtime;
2212 string etag;
2213 uint64_t size;
2214 string storage_class;
2215 rgw_bucket_entry_owner owner;
2216 uint64_t versioned_epoch;
2217 string rgw_tag;
2218
2219 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
2220
2221 void decode_json(JSONObj *obj) {
2222 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
2223 JSONDecoder::decode_json("Key", key.name, obj);
2224 JSONDecoder::decode_json("VersionId", key.instance, obj);
2225 JSONDecoder::decode_json("IsLatest", is_latest, obj);
2226 string mtime_str;
2227 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
2228
2229 struct tm t;
2230 uint32_t nsec;
2231 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
2232 ceph_timespec ts;
2233 ts.tv_sec = (uint64_t)internal_timegm(&t);
2234 ts.tv_nsec = nsec;
2235 mtime = real_clock::from_ceph_timespec(ts);
2236 }
2237 JSONDecoder::decode_json("ETag", etag, obj);
2238 JSONDecoder::decode_json("Size", size, obj);
2239 JSONDecoder::decode_json("StorageClass", storage_class, obj);
2240 JSONDecoder::decode_json("Owner", owner, obj);
2241 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
2242 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
2243 }
2244
2245 RGWModifyOp get_modify_op() const {
2246 if (delete_marker) {
2247 return CLS_RGW_OP_LINK_OLH_DM;
2248 } else if (!key.instance.empty() && key.instance != "null") {
2249 return CLS_RGW_OP_LINK_OLH;
2250 } else {
2251 return CLS_RGW_OP_ADD;
2252 }
2253 }
2254 };
2255
2256 struct bucket_list_result {
2257 string name;
2258 string prefix;
2259 string key_marker;
2260 string version_id_marker;
2261 int max_keys;
2262 bool is_truncated;
2263 list<bucket_list_entry> entries;
2264
2265 bucket_list_result() : max_keys(0), is_truncated(false) {}
2266
2267 void decode_json(JSONObj *obj) {
2268 JSONDecoder::decode_json("Name", name, obj);
2269 JSONDecoder::decode_json("Prefix", prefix, obj);
2270 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
2271 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
2272 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
2273 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
2274 JSONDecoder::decode_json("Entries", entries, obj);
2275 }
2276 };
2277
2278 class RGWListBucketShardCR: public RGWCoroutine {
2279 RGWDataSyncEnv *sync_env;
2280 const rgw_bucket_shard& bs;
2281 const string instance_key;
2282 rgw_obj_key marker_position;
2283
2284 bucket_list_result *result;
2285
2286 public:
2287 RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2288 rgw_obj_key& _marker_position, bucket_list_result *_result)
2289 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2290 instance_key(bs.get_key()), marker_position(_marker_position),
2291 result(_result) {}
2292
2293 int operate() override {
2294 reenter(this) {
2295 yield {
2296 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2297 { "versions" , NULL },
2298 { "format" , "json" },
2299 { "objs-container" , "true" },
2300 { "key-marker" , marker_position.name.c_str() },
2301 { "version-id-marker" , marker_position.instance.c_str() },
2302 { NULL, NULL } };
2303 // don't include tenant in the url, it's already part of instance_key
2304 string p = string("/") + bs.bucket.name;
2305 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2306 }
2307 if (retcode < 0) {
2308 return set_cr_error(retcode);
2309 }
2310 return set_cr_done();
2311 }
2312 return 0;
2313 }
2314 };
2315
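/*
 * Fetches one page of bucket-index log entries from the source zone via
 * GET /admin/log?type=bucket-index&bucket-instance=<key>&marker=<pos>.
 */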
2316 class RGWListBucketIndexLogCR: public RGWCoroutine {
2317 RGWDataSyncEnv *sync_env;
2318 const string instance_key;
2319 string marker;
2320
2321 list<rgw_bi_log_entry> *result;
2322
2323 public:
2324 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2325 string& _marker, list<rgw_bi_log_entry> *_result)
2326 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2327 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2328
2329 int operate() override {
2330 reenter(this) {
2331 yield {
2332 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2333 { "format" , "json" },
2334 { "marker" , marker.c_str() },
2335 { "type", "bucket-index" },
2336 { NULL, NULL } };
2337
2338 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2339 }
2340 if (retcode < 0) {
2341 return set_cr_error(retcode);
2342 }
2343 return set_cr_done();
2344 }
2345 return 0;
2346 }
2347 };
2348
2349 #define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
2350
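/*
 * Marker trackers absorb out-of-order completion: entry coroutines run
 * concurrently, so a marker is only persisted once every entry at or below
 * it has finished, and writes are batched so that roughly one update per
 * BUCKET_SYNC_UPDATE_MARKER_WINDOW completions reaches the status object.
 */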
2351 class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2352 RGWDataSyncEnv *sync_env;
2353
2354 string marker_oid;
2355 rgw_bucket_shard_full_sync_marker sync_marker;
2356
2357 public:
2358 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2359 const string& _marker_oid,
2360 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2361 sync_env(_sync_env),
2362 marker_oid(_marker_oid),
2363 sync_marker(_marker) {}
2364
2365 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2366 sync_marker.position = new_marker;
2367 sync_marker.count = index_pos;
2368
2369 map<string, bufferlist> attrs;
2370 sync_marker.encode_attr(attrs);
2371
2372 RGWRados *store = sync_env->store;
2373
2374 ldout(sync_env->cct, 20) << __func__ << "(): updating marker, marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2375 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2376 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2377 attrs);
2378 }
2379
2380 RGWOrderCallCR *allocate_order_control_cr() {
2381 return new RGWLastCallerWinsCR(sync_env->cct);
2382 }
2383 };
2384
2385 class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2386 RGWDataSyncEnv *sync_env;
2387
2388 string marker_oid;
2389 rgw_bucket_shard_inc_sync_marker sync_marker;
2390
2391 map<rgw_obj_key, string> key_to_marker;
2392
2393 struct operation {
2394 rgw_obj_key key;
2395 bool is_olh;
2396 };
2397 map<string, operation> marker_to_op;
2398 std::set<std::string> pending_olh; // object names with pending olh operations
2399
2400 void handle_finish(const string& marker) override {
2401 auto iter = marker_to_op.find(marker);
2402 if (iter == marker_to_op.end()) {
2403 return;
2404 }
2405 auto& op = iter->second;
2406 key_to_marker.erase(op.key);
2407 reset_need_retry(op.key);
2408 if (op.is_olh) {
2409 pending_olh.erase(op.key.name);
2410 }
2411 marker_to_op.erase(iter);
2412 }
2413
2414 public:
2415 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2416 const string& _marker_oid,
2417 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2418 sync_env(_sync_env),
2419 marker_oid(_marker_oid),
2420 sync_marker(_marker) {}
2421
2422 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2423 sync_marker.position = new_marker;
2424
2425 map<string, bufferlist> attrs;
2426 sync_marker.encode_attr(attrs);
2427
2428 RGWRados *store = sync_env->store;
2429
2430 ldout(sync_env->cct, 20) << __func__ << "(): updating marker, marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2431 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2432 store,
2433 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2434 attrs);
2435 }
2436
2437 /*
2438 * create an index from key -> <op, marker>, and from marker -> key
2439 * this is useful so that we can ensure that we only have one
2440 * entry for any key that is in use. This is needed when doing
2441 * incremental sync of data, where we don't want to run multiple
2442 * concurrent sync operations for the same bucket shard.
2443 * Also, we make sure that we don't run concurrent operations
2444 * on the same key with different ops.
2445 */
2446 bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
2447 auto result = key_to_marker.emplace(key, marker);
2448 if (!result.second) { // exists
2449 set_need_retry(key);
2450 return false;
2451 }
2452 marker_to_op[marker] = operation{key, is_olh};
2453 if (is_olh) {
2454 // prevent other olh ops from starting on this object name
2455 pending_olh.insert(key.name);
2456 }
2457 return true;
2458 }
2459
2460 bool can_do_op(const rgw_obj_key& key, bool is_olh) {
2461 // serialize olh ops on the same object name
2462 if (is_olh && pending_olh.count(key.name)) {
2463 ldout(sync_env->cct, 20) << __func__ << "(): sync of " << key << " waiting for pending olh op" << dendl;
2464 return false;
2465 }
2466 return (key_to_marker.find(key) == key_to_marker.end());
2467 }
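/*
 * Illustration of the serialization this enforces, with hypothetical
 * markers: entry 00001 (LINK_OLH on "a") passes can_do_op(), starts, and
 * records "a" in key_to_marker and pending_olh; entry 00002
 * (UNLINK_INSTANCE on "a") then fails can_do_op() because an olh op is
 * pending on "a", so the caller waits for a child to finish before
 * retrying. handle_finish("00001") erases both indexes and 00002 may
 * proceed.
 */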
2468
2469 RGWOrderCallCR *allocate_order_control_cr() {
2470 return new RGWLastCallerWinsCR(sync_env->cct);
2471 }
2472 };
2473
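/*
 * Syncs a single bucket-index log entry. The bilog op selects the sync
 * module hook: ADD / LINK_OLH fetch the object, DEL / UNLINK_INSTANCE
 * remove it, LINK_OLH_DM creates a delete marker. The entry is only marked
 * finished in the marker tracker once the spawned operation completes, so
 * the shard position cannot advance past unfinished work.
 */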
2474 template <class T, class K>
2475 class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2476 RGWDataSyncEnv *sync_env;
2477
2478 RGWBucketInfo *bucket_info;
2479 const rgw_bucket_shard& bs;
2480
2481 rgw_obj_key key;
2482 bool versioned;
2483 boost::optional<uint64_t> versioned_epoch;
2484 rgw_bucket_entry_owner owner;
2485 real_time timestamp;
2486 RGWModifyOp op;
2487 RGWPendingState op_state;
2488
2489 T entry_marker;
2490 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2491
2492 int sync_status;
2493
2494 stringstream error_ss;
2495
2496 RGWDataSyncDebugLogger logger;
2497
2498 bool error_injection;
2499
2500 RGWDataSyncModule *data_sync_module;
2501
2502 rgw_zone_set zones_trace;
2503
2504 public:
2505 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2506 RGWBucketInfo *_bucket_info,
2507 const rgw_bucket_shard& bs,
2508 const rgw_obj_key& _key, bool _versioned,
2509 boost::optional<uint64_t> _versioned_epoch,
2510 real_time& _timestamp,
2511 const rgw_bucket_entry_owner& _owner,
2512 RGWModifyOp _op, RGWPendingState _op_state,
2513 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
2514 sync_env(_sync_env),
2515 bucket_info(_bucket_info), bs(bs),
2516 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2517 owner(_owner),
2518 timestamp(_timestamp), op(_op),
2519 op_state(_op_state),
2520 entry_marker(_entry_marker),
2521 marker_tracker(_marker_tracker),
2522 sync_status(0){
2523 stringstream ss;
2524 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
2525 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2526 ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
2527 set_status("init");
2528
2529 logger.init(sync_env, "Object", ss.str());
2530
2531 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2532
2533 data_sync_module = sync_env->sync_module->get_data_handler();
2534
2535 zones_trace = _zones_trace;
2536 zones_trace.insert(sync_env->store->get_zone().id);
2537 }
2538
2539 int operate() override {
2540 reenter(this) {
2541 /* skip entries that are not complete */
2542 if (op_state != CLS_RGW_STATE_COMPLETE) {
2543 goto done;
2544 }
2545 do {
2546 yield {
2547 marker_tracker->reset_need_retry(key);
2548 if (key.name.empty()) {
2549 /* shouldn't happen */
2550 set_status("skipping empty entry");
2551 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
2552 goto done;
2553 }
2554 if (error_injection &&
2555 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2556 ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
2557 retcode = -EIO;
2558 } else if (op == CLS_RGW_OP_ADD ||
2559 op == CLS_RGW_OP_LINK_OLH) {
2560 set_status("syncing obj");
2561 ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
2562 logger.log("fetch");
2563 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
2564 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2565 set_status("removing obj");
2566 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2567 versioned = true;
2568 }
2569 logger.log("remove");
2570 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
2571 // our copy of the object is more recent, continue as if it succeeded
2572 if (retcode == -ERR_PRECONDITION_FAILED) {
2573 retcode = 0;
2574 }
2575 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2576 logger.log("creating delete marker");
2577 set_status("creating delete marker");
2578 ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
2579 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
2580 }
2581 }
2582 } while (marker_tracker->need_retry(key));
2583 {
2584 stringstream ss;
2585 if (retcode >= 0) {
2586 ss << "done";
2587 } else {
2588 ss << "done, retcode=" << retcode;
2589 }
2590 logger.log(ss.str());
2591 }
2592
2593 if (retcode < 0 && retcode != -ENOENT) {
2594 set_status() << "failed to sync obj; retcode=" << retcode;
2595 ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
2596 << bucket_shard_str{bs} << "/" << key.name << dendl;
2597 error_ss << bucket_shard_str{bs} << "/" << key.name;
2598 sync_status = retcode;
2599 }
2600 if (!error_ss.str().empty()) {
2601 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object: ") + cpp_strerror(-sync_status)));
2602 }
2603 done:
2604 if (sync_status == 0) {
2605 /* update marker */
2606 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2607 yield call(marker_tracker->finish(entry_marker));
2608 sync_status = retcode;
2609 }
2610 if (sync_status < 0) {
2611 return set_cr_error(sync_status);
2612 }
2613 return set_cr_done();
2614 }
2615 return 0;
2616 }
2617 };
2618
2619 #define BUCKET_SYNC_SPAWN_WINDOW 20
2620
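/*
 * Both full and incremental bucket sync throttle with this window: once
 * more than BUCKET_SYNC_SPAWN_WINDOW entry coroutines are in flight, the
 * parent yields, waits for a child, and collects results before spawning
 * more.
 */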
2621 class RGWBucketShardFullSyncCR : public RGWCoroutine {
2622 RGWDataSyncEnv *sync_env;
2623 const rgw_bucket_shard& bs;
2624 RGWBucketInfo *bucket_info;
2625 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2626 bucket_list_result list_result;
2627 list<bucket_list_entry>::iterator entries_iter;
2628 rgw_bucket_shard_sync_info& sync_info;
2629 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2630 rgw_obj_key list_marker;
2631 bucket_list_entry *entry{nullptr};
2632
2633 int total_entries{0};
2634
2635 int sync_status{0};
2636
2637 const string& status_oid;
2638
2639 RGWDataSyncDebugLogger logger;
2640 rgw_zone_set zones_trace;
2641 public:
2642 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2643 RGWBucketInfo *_bucket_info,
2644 const std::string& status_oid,
2645 RGWContinuousLeaseCR *lease_cr,
2646 rgw_bucket_shard_sync_info& sync_info)
2647 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2648 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2649 marker_tracker(sync_env, status_oid, sync_info.full_marker),
2650 status_oid(status_oid) {
2651 logger.init(sync_env, "BucketFull", bs.get_key());
2652 zones_trace.insert(sync_env->source_zone);
2653 }
2654
2655 int operate() override;
2656 };
2657
2658 int RGWBucketShardFullSyncCR::operate()
2659 {
2660 int ret;
2661 reenter(this) {
2662 list_marker = sync_info.full_marker.position;
2663
2664 total_entries = sync_info.full_marker.count;
2665 do {
2666 if (!lease_cr->is_locked()) {
2667 drain_all();
2668 return set_cr_error(-ECANCELED);
2669 }
2670 set_status("listing remote bucket");
2671 ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
2672 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2673 &list_result));
2674 if (retcode < 0 && retcode != -ENOENT) {
2675 set_status("failed bucket listing, going down");
2676 drain_all();
2677 return set_cr_error(retcode);
2678 }
2679 entries_iter = list_result.entries.begin();
2680 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2681 if (!lease_cr->is_locked()) {
2682 drain_all();
2683 return set_cr_error(-ECANCELED);
2684 }
2685 ldout(sync_env->cct, 20) << "[full sync] syncing object: "
2686 << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
2687 entry = &(*entries_iter);
2688 total_entries++;
2689 list_marker = entries_iter->key;
2690 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2691 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
2692 } else {
2693 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2694 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2695 false, /* versioned, only matters for object removal */
2696 entry->versioned_epoch, entry->mtime,
2697 entry->owner, entry->get_modify_op(), CLS_RGW_STATE_COMPLETE,
2698 entry->key, &marker_tracker, zones_trace),
2699 false);
2700 }
2701 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2702 yield wait_for_child();
2703 bool again = true;
2704 while (again) {
2705 again = collect(&ret, nullptr);
2706 if (ret < 0) {
2707 ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
2708 sync_status = ret;
2709 /* we have reported this error */
2710 }
2711 }
2712 }
2713 }
2714 } while (list_result.is_truncated && sync_status == 0);
2715 set_status("done iterating over all objects");
2716 /* wait for all operations to complete */
2717 while (num_spawned()) {
2718 yield wait_for_child();
2719 bool again = true;
2720 while (again) {
2721 again = collect(&ret, nullptr);
2722 if (ret < 0) {
2723 ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
2724 sync_status = ret;
2725 /* we have reported this error */
2726 }
2727 }
2728 }
2729 if (!lease_cr->is_locked()) {
2730 return set_cr_error(-ECANCELED);
2731 }
2732 /* update sync state to incremental */
2733 if (sync_status == 0) {
2734 yield {
2735 sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
2736 map<string, bufferlist> attrs;
2737 sync_info.encode_state_attr(attrs);
2738 RGWRados *store = sync_env->store;
2739 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2740 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2741 attrs));
2742 }
2743 } else {
2744 ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2745 }
2746 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2747 ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
2748 << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
2749 return set_cr_error(retcode);
2750 }
2751 if (sync_status < 0) {
2752 return set_cr_error(sync_status);
2753 }
2754 return set_cr_done();
2755 }
2756 return 0;
2757 }
2758
2759 static bool has_olh_epoch(RGWModifyOp op) {
2760 return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
2761 }
2762
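/*
 * Incremental sync first builds a "squash map" over the listed bilog
 * entries: for each (object, instance) pair only the newest completed op is
 * kept, except that a non-olh op never displaces an olh op, whose olh_epoch
 * must still be applied. Entries that lose to their squash-map winner are
 * skipped below as "squashed operation".
 */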
2763 class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2764 RGWDataSyncEnv *sync_env;
2765 const rgw_bucket_shard& bs;
2766 RGWBucketInfo *bucket_info;
2767 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2768 list<rgw_bi_log_entry> list_result;
2769 list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
2770 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
2771 rgw_bucket_shard_sync_info& sync_info;
2772 rgw_obj_key key;
2773 rgw_bi_log_entry *entry{nullptr};
2774 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2775 bool updated_status{false};
2776 const string& status_oid;
2777 const string& zone_id;
2778
2779 string cur_id;
2780
2781 RGWDataSyncDebugLogger logger;
2782
2783 int sync_status{0};
2784 bool syncstopped{false};
2785
2786 public:
2787 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2788 const rgw_bucket_shard& bs,
2789 RGWBucketInfo *_bucket_info,
2790 const std::string& status_oid,
2791 RGWContinuousLeaseCR *lease_cr,
2792 rgw_bucket_shard_sync_info& sync_info)
2793 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2794 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2795 marker_tracker(sync_env, status_oid, sync_info.inc_marker),
2796 status_oid(status_oid), zone_id(_sync_env->store->get_zone().id)
2797 {
2798 set_description() << "bucket shard incremental sync bucket="
2799 << bucket_shard_str{bs};
2800 set_status("init");
2801 logger.init(sync_env, "BucketInc", bs.get_key());
2802 }
2803
2804 int operate() override;
2805 };
2806
2807 int RGWBucketShardIncrementalSyncCR::operate()
2808 {
2809 int ret;
2810 reenter(this) {
2811 do {
2812 if (!lease_cr->is_locked()) {
2813 drain_all();
2814 return set_cr_error(-ECANCELED);
2815 }
2816 ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync, position=" << sync_info.inc_marker.position << dendl;
2817 set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
2818 yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
2819 &list_result));
2820 if (retcode < 0 && retcode != -ENOENT) {
2821 /* wait for all operations to complete */
2822 drain_all();
2823 return set_cr_error(retcode);
2824 }
2825 squash_map.clear();
2826 entries_iter = list_result.begin();
2827 entries_end = list_result.end();
2828 for (; entries_iter != entries_end; ++entries_iter) {
2829 auto e = *entries_iter;
2830 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
2831 ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
2832 syncstopped = true;
2833 entries_end = entries_iter; // don't sync past here
2834 break;
2835 }
2836 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
2837 continue;
2838 }
2839 if (e.op == CLS_RGW_OP_CANCEL) {
2840 continue;
2841 }
2842 if (e.state != CLS_RGW_STATE_COMPLETE) {
2843 continue;
2844 }
2845 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
2846 continue;
2847 }
2848 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
2849 // don't squash over olh entries - we need to apply their olh_epoch
2850 if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
2851 continue;
2852 }
2853 if (squash_entry.first <= e.timestamp) {
2854 squash_entry = make_pair<>(e.timestamp, e.op);
2855 }
2856 }
2857
2858 entries_iter = list_result.begin();
2859 for (; entries_iter != entries_end; ++entries_iter) {
2860 if (!lease_cr->is_locked()) {
2861 drain_all();
2862 return set_cr_error(-ECANCELED);
2863 }
2864 entry = &(*entries_iter);
2865 {
2866 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2867 if (p < 0) {
2868 cur_id = entry->id;
2869 } else {
2870 cur_id = entry->id.substr(p + 1);
2871 }
2872 }
2873 sync_info.inc_marker.position = cur_id;
2874
2875 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
2876 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
2877 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2878 continue;
2879 }
2880
2881 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
2882 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
2883 ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
2884 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2885 continue;
2886 }
2887
2888 ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
2889
2890 if (!key.ns.empty()) {
2891 set_status() << "skipping entry in namespace: " << entry->object;
2892 ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
2893 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2894 continue;
2895 }
2896
2897 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
2898 if (entry->op == CLS_RGW_OP_CANCEL) {
2899 set_status() << "canceled operation, skipping";
2900 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2901 << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
2902 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2903 continue;
2904 }
2905 if (entry->state != CLS_RGW_STATE_COMPLETE) {
2906 set_status() << "non-complete operation, skipping";
2907 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2908 << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
2909 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2910 continue;
2911 }
2912 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
2913 set_status() << "redundant operation, skipping";
2914 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2915 << bucket_shard_str{bs} << "/" << key << ": redundant operation" << dendl;
2916 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2917 continue;
2918 }
2919 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
2920 set_status() << "squashed operation, skipping";
2921 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2922 << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
2923 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2924 continue;
2925 }
2926 ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
2927 << bucket_shard_str{bs} << "/" << key << dendl;
2928 updated_status = false;
2929 while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
2930 if (!updated_status) {
2931 set_status() << "can't do op, conflicting inflight operation";
2932 updated_status = true;
2933 }
2934 ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
2935 yield wait_for_child();
2936 bool again = true;
2937 while (again) {
2938 again = collect(&ret, nullptr);
2939 if (ret < 0) {
2940 ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
2941 sync_status = ret;
2942 /* we have reported this error */
2943 }
2944 }
2945 if (sync_status != 0)
2946 break;
2947 }
2948 if (sync_status != 0) {
2949 /* get error, stop */
2950 break;
2951 }
2952 if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
2953 set_status() << "can't do op, sync already in progress for object";
2954 ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
2955 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2956 continue;
2957 }
2959 set_status() << "start object sync";
2960 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
2961 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
2962 } else {
2963 boost::optional<uint64_t> versioned_epoch;
2964 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
2965 if (entry->ver.pool < 0) {
2966 versioned_epoch = entry->ver.epoch;
2967 }
2968 ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
2969 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
2970 spawn(new SyncCR(sync_env, bucket_info, bs, key,
2971 entry->is_versioned(), versioned_epoch,
2972 entry->timestamp, owner, entry->op, entry->state,
2973 cur_id, &marker_tracker, entry->zones_trace),
2974 false);
2975 }
2977 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2978 set_status() << "num_spawned() > spawn_window";
2979 yield wait_for_child();
2980 bool again = true;
2981 while (again) {
2982 again = collect(&ret, nullptr);
2983 if (ret < 0) {
2984 ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
2985 sync_status = ret;
2986 /* we have reported this error */
2987 }
2988 /* not waiting for child here */
2989 }
2990 }
2991 }
2992 } while (!list_result.empty() && sync_status == 0 && !syncstopped);
2993
2994 while (num_spawned()) {
2995 yield wait_for_child();
2996 bool again = true;
2997 while (again) {
2998 again = collect(&ret, nullptr);
2999 if (ret < 0) {
3000 ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
3001 sync_status = ret;
3002 /* we have reported this error */
3003 }
3004 /* not waiting for child here */
3005 }
3006 }
3007
3008 if (syncstopped) {
3009 // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
3010 // still disabled, we'll delete the sync status object. otherwise we'll
3011 // restart full sync to catch any changes that happened while sync was
3012 // disabled
3013 sync_info.state = rgw_bucket_shard_sync_info::StateInit;
3014 return set_cr_done();
3015 }
3016
3017 yield call(marker_tracker.flush());
3018 if (retcode < 0) {
3019 ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
3020 return set_cr_error(retcode);
3021 }
3022 if (sync_status < 0) {
3023 ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
3024 return set_cr_error(sync_status);
3025 }
3026 return set_cr_done();
3027 }
3028 return 0;
3029 }
3030
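/*
 * Per-bucket-shard driver: acquire the sync lease, read (or create) the
 * shard status, then walk the state machine
 *
 *   StateInit -> StateFullSync -> StateIncrementalSync
 *
 * looping back whenever a stage exits with the state short of incremental
 * (e.g. incremental sync observed a syncstop and reset the state to
 * StateInit).
 */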
3031 int RGWRunBucketSyncCoroutine::operate()
3032 {
3033 reenter(this) {
3034 yield {
3035 set_status("acquiring sync lock");
3036 auto store = sync_env->store;
3037 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
3038 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
3039 "sync_lock",
3040 cct->_conf->rgw_sync_lease_period,
3041 this));
3042 lease_stack.reset(spawn(lease_cr.get(), false));
3043 }
3044 while (!lease_cr->is_locked()) {
3045 if (lease_cr->is_done()) {
3046 ldout(cct, 5) << "lease cr failed, done early" << dendl;
3047 set_status("lease lock failed, early abort");
3048 drain_all();
3049 return set_cr_error(lease_cr->get_ret_status());
3050 }
3051 set_sleeping(true);
3052 yield;
3053 }
3054
3055 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
3056 if (retcode < 0 && retcode != -ENOENT) {
3057 ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
3058 << bucket_shard_str{bs} << dendl;
3059 lease_cr->go_down();
3060 drain_all();
3061 return set_cr_error(retcode);
3062 }
3063
3064 ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
3065 << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
3066
3067 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3068 if (retcode == -ENOENT) {
3069 /* bucket instance info has not been synced in yet, fetch it now */
3070 yield {
3071 ldout(sync_env->cct, 10) << "no local info for bucket "
3072 << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
3073 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
3074
3075 meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
3076
3077 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
3078 string() /* no marker */,
3079 MDLOG_STATUS_COMPLETE,
3080 NULL /* no marker tracker */));
3081 }
3082 if (retcode < 0) {
3083 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
3084 lease_cr->go_down();
3085 drain_all();
3086 return set_cr_error(retcode);
3087 }
3088
3089 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3090 }
3091 if (retcode < 0) {
3092 ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
3093 lease_cr->go_down();
3094 drain_all();
3095 return set_cr_error(retcode);
3096 }
3097
3098 do {
3099 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
3100 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
3101 if (retcode == -ENOENT) {
3102 ldout(sync_env->cct, 0) << "bucket sync disabled" << dendl;
3103 lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
3104 lease_cr->wakeup();
3105 lease_cr.reset();
3106 drain_all();
3107 return set_cr_done();
3108 }
3109 if (retcode < 0) {
3110 ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
3111 << " failed, retcode=" << retcode << dendl;
3112 lease_cr->go_down();
3113 drain_all();
3114 return set_cr_error(retcode);
3115 }
3116 }
3117
3118 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
3119 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
3120 status_oid, lease_cr.get(),
3121 sync_status));
3122 if (retcode < 0) {
3123 ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
3124 << " failed, retcode=" << retcode << dendl;
3125 lease_cr->go_down();
3126 drain_all();
3127 return set_cr_error(retcode);
3128 }
3129 }
3130
3131 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
3132 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
3133 status_oid, lease_cr.get(),
3134 sync_status));
3135 if (retcode < 0) {
3136 ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
3137 << " failed, retcode=" << retcode << dendl;
3138 lease_cr->go_down();
3139 drain_all();
3140 return set_cr_error(retcode);
3141 }
3142 }
3143 // loop back to previous states unless incremental sync returns normally
3144 } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
3145
3146 lease_cr->go_down();
3147 drain_all();
3148 return set_cr_done();
3149 }
3150
3151 return 0;
3152 }
3153
3154 RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
3155 {
3156 return new RGWRunBucketSyncCoroutine(&sync_env, bs);
3157 }
3158
3159 int RGWBucketSyncStatusManager::init()
3160 {
3161 conn = store->get_zone_conn_by_id(source_zone);
3162 if (!conn) {
3163 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
3164 return -EINVAL;
3165 }
3166
3167 int ret = http_manager.set_threaded();
3168 if (ret < 0) {
3169 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
3170 return ret;
3171 }
3172
3173
3174 const string key = bucket.get_key();
3175
3176 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
3177 { NULL, NULL } };
3178
3179 string path = string("/admin/metadata/bucket.instance");
3180
3181 bucket_instance_meta_info result;
3182 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
3183 if (ret < 0) {
3184 ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
3185 return ret;
3186 }
3187
3188 RGWBucketInfo& bi = result.data.get_bucket_info();
3189 num_shards = bi.num_shards;
3190
3191 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
3192
3193 sync_module.reset(new RGWDefaultSyncModuleInstance());
3194
3195 int effective_num_shards = (num_shards ? num_shards : 1);
3196
3197 auto async_rados = store->get_async_rados();
3198
3199 for (int i = 0; i < effective_num_shards; i++) {
3200 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
3201 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
3202 if (ret < 0) {
3203 ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
3204 return ret;
3205 }
3206 source_logs[i] = l;
3207 }
3208
3209 return 0;
3210 }
3211
3212 int RGWBucketSyncStatusManager::init_sync_status()
3213 {
3214 list<RGWCoroutinesStack *> stacks;
3215
3216 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3217 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3218 RGWRemoteBucketLog *l = iter->second;
3219 stack->call(l->init_sync_status_cr());
3220
3221 stacks.push_back(stack);
3222 }
3223
3224 return cr_mgr.run(stacks);
3225 }
3226
3227 int RGWBucketSyncStatusManager::read_sync_status()
3228 {
3229 list<RGWCoroutinesStack *> stacks;
3230
3231 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3232 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3233 RGWRemoteBucketLog *l = iter->second;
3234 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
3235
3236 stacks.push_back(stack);
3237 }
3238
3239 int ret = cr_mgr.run(stacks);
3240 if (ret < 0) {
3241 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
3242 << bucket_str{bucket} << dendl;
3243 return ret;
3244 }
3245
3246 return 0;
3247 }
3248
3249 int RGWBucketSyncStatusManager::run()
3250 {
3251 list<RGWCoroutinesStack *> stacks;
3252
3253 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3254 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3255 RGWRemoteBucketLog *l = iter->second;
3256 stack->call(l->run_sync_cr());
3257
3258 stacks.push_back(stack);
3259 }
3260
3261 int ret = cr_mgr.run(stacks);
3262 if (ret < 0) {
3263 ldout(store->ctx(), 0) << "ERROR: failed to run sync for "
3264 << bucket_str{bucket} << dendl;
3265 return ret;
3266 }
3267
3268 return 0;
3269 }
3270
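/*
 * For illustration only (zone ids and bucket keys vary), source zone
 * "8f42..." and shard 3 of bucket "photos" would map to a status object
 * named roughly "bucket.sync-status.8f42...:photos:<instance-id>:3".
 */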
3271 string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
3272 const rgw_bucket_shard& bs)
3273 {
3274 return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
3275 }
3276
3277 class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
3278 static constexpr int max_concurrent_shards = 16;
3279 RGWRados *const store;
3280 RGWDataSyncEnv *const env;
3281 const int num_shards;
3282 rgw_bucket_shard bs;
3283
3284 using Vector = std::vector<rgw_bucket_shard_sync_info>;
3285 Vector::iterator i, end;
3286
3287 public:
3288 RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
3289 int num_shards, const rgw_bucket& bucket,
3290 Vector *status)
3291 : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
3292 store(store), env(env), num_shards(num_shards),
3293 bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
3294 i(status->begin()), end(status->end())
3295 {}
3296
3297 bool spawn_next() override {
3298 if (i == end) {
3299 return false;
3300 }
3301 spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
3302 ++i;
3303 ++bs.shard_id;
3304 return true;
3305 }
3306 };
3307
3308 int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
3309 const RGWBucketInfo& bucket_info,
3310 std::vector<rgw_bucket_shard_sync_info> *status)
3311 {
3312 const auto num_shards = bucket_info.num_shards;
3313 status->clear();
3314 status->resize(std::max<size_t>(1, num_shards));
3315
3316 RGWDataSyncEnv env;
3317 RGWSyncModuleInstanceRef module; // null sync module
3318 env.init(store->ctx(), store, nullptr, store->get_async_rados(),
3319 nullptr, nullptr, source_zone, module);
3320
3321 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
3322 return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
3323 bucket_info.bucket, status));
3324 }


// TODO: move into rgw_data_sync_trim.cc
#undef dout_prefix
#define dout_prefix (*_dout << "data trim: ")

namespace {

/// return the marker that it is safe to trim up to
const std::string& get_stable_marker(const rgw_data_sync_marker& m)
{
  return m.state == m.FullSync ? m.next_step_marker : m.marker;
}
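
// For instance: a peer still in full sync reports its next_step_marker (the
// datalog position recorded when full sync began, where incremental sync
// will later resume) as its stable position, while a peer in incremental
// sync reports the marker it has already processed up to. Entries at or
// before the stable position are no longer needed by that peer.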

/// comparison operator for take_min_markers()
bool operator<(const rgw_data_sync_marker& lhs,
               const rgw_data_sync_marker& rhs)
{
  // order by stable marker
  return get_stable_marker(lhs) < get_stable_marker(rhs);
}

/// populate the container starting at 'dest' with the minimum stable marker
/// of each shard over all of the peers in [first, last)
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  if (first == last) {
    return;
  }
  // initialize the markers with the first peer's
  auto m = dest;
  for (auto &shard : first->sync_markers) {
    *m = std::move(shard.second);
    ++m;
  }
  // for the remaining peers, replace any markers that compare smaller
  for (auto p = first + 1; p != last; ++p) {
    m = dest;
    for (auto &shard : p->sync_markers) {
      if (shard.second < *m) {
        *m = std::move(shard.second);
      }
      ++m;
    }
  }
}
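
// A minimal usage sketch, mirroring the call in DataLogTrimCR::operate()
// below. Note that the markers are moved out of the input range, so the
// peer statuses should not be reused afterwards:
//
//   std::vector<rgw_data_sync_status> peers; // filled in from each peer
//   std::vector<rgw_data_sync_marker> mins(num_shards);
//   take_min_markers(peers.begin(), peers.end(), mins.begin());
//
// Each peer is assumed to report the same number of shard markers, and
// 'mins' must be sized to match before the call.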

} // anonymous namespace

class DataLogTrimCR : public RGWCoroutine {
  RGWRados *store;
  RGWHTTPManager *http;
  const int num_shards;
  const std::string& zone_id; ///< my zone id
  std::vector<rgw_data_sync_status> peer_status; ///< sync status for each peer
  std::vector<rgw_data_sync_marker> min_shard_markers; ///< min marker per shard
  std::vector<std::string>& last_trim; ///< last trimmed marker per shard
  int ret{0};

 public:
  DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
                int num_shards, std::vector<std::string>& last_trim)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      num_shards(num_shards),
      zone_id(store->get_zone().id),
      peer_status(store->zone_conn_map.size()),
      min_shard_markers(num_shards),
      last_trim(last_trim)
  {}

  int operate() override;
};

int DataLogTrimCR::operate()
{
  reenter(this) {
    ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
    set_status("fetching sync status");
    yield {
      // query data sync status from each sync peer
      rgw_http_param_pair params[] = {
        { "type", "data" },
        { "status", nullptr },
        { "source-zone", zone_id.c_str() },
        { nullptr, nullptr }
      };
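      // these parameters amount to a request along the lines of
      //   GET /admin/log/?type=data&status&source-zone=<zone_id>
      // on each peer; the json reply decodes into rgw_data_sync_status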

      auto p = peer_status.begin();
      for (auto& c : store->zone_conn_map) {
        ldout(cct, 20) << "query sync status from " << c.first << dendl;
        using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
    }

    // must get a successful reply from all peers to consider trimming
    ret = 0;
    while (ret == 0 && num_spawned() > 0) {
      yield wait_for_child();
      collect_next(&ret);
    }
    drain_all();

    if (ret < 0) {
      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
      return set_cr_error(ret);
    }

    ldout(cct, 10) << "trimming log shards" << dendl;
    set_status("trimming log shards");
    yield {
      // determine the minimum marker for each shard
      take_min_markers(peer_status.begin(), peer_status.end(),
                       min_shard_markers.begin());

      for (int i = 0; i < num_shards; i++) {
        const auto& m = min_shard_markers[i];
        auto& stable = get_stable_marker(m);
        if (stable <= last_trim[i]) {
          continue;
        }
        ldout(cct, 10) << "trimming log shard " << i
            << " at marker=" << stable
            << " last_trim=" << last_trim[i] << dendl;
        using TrimCR = RGWSyncLogTrimCR;
        spawn(new TrimCR(store, store->data_log->get_oid(i),
                         stable, &last_trim[i]),
              true);
      }
    }
    return set_cr_done();
  }
  return 0;
}

RGWCoroutine* create_admin_data_log_trim_cr(RGWRados *store,
                                            RGWHTTPManager *http,
                                            int num_shards,
                                            std::vector<std::string>& markers)
{
  return new DataLogTrimCR(store, http, num_shards, markers);
}
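
// An illustrative invocation (a sketch; 'http' is assumed to point at an
// initialized RGWHTTPManager, and 'markers' must hold one entry per shard,
// since DataLogTrimCR indexes it by shard id):
//
//   std::vector<std::string> markers(num_shards);
//   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
//   int r = crs.run(create_admin_data_log_trim_cr(store, &http,
//                                                 num_shards, markers));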

class DataLogTrimPollCR : public RGWCoroutine {
  RGWRados *store;
  RGWHTTPManager *http;
  const int num_shards;
  const utime_t interval; ///< polling interval
  const std::string lock_oid; ///< use first data log shard for lock
  const std::string lock_cookie;
  std::vector<std::string> last_trim; ///< last trimmed marker per shard

 public:
  DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
                    int num_shards, utime_t interval)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      num_shards(num_shards), interval(interval),
      lock_oid(store->data_log->get_oid(0)),
      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
      last_trim(num_shards)
  {}

  int operate() override;
};

int DataLogTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(interval);

      // request a 'data_trim' lock that covers the entire wait interval to
      // prevent other gateways from attempting to trim for the duration
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
                                          rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
                                          "data_trim", lock_cookie,
                                          interval.sec()));
      if (retcode < 0) {
        // if the lock is already held, go back to sleep and try again later
        ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
            << interval.sec() << "s" << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new DataLogTrimCR(store, http, num_shards, last_trim));

      // note that the lock is not released. this is intentional, as it avoids
      // duplicating this work in other gateways
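      // (the lock was taken with a duration of interval.sec() and is never
      // renewed, so it lapses on its own before the next cycle's acquisition
      // attempt; if this gateway goes away, another one can take over the
      // trim work on a later interval)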
    }
  }
  return 0;
}

RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
                                      RGWHTTPManager *http,
                                      int num_shards, utime_t interval)
{
  return new DataLogTrimPollCR(store, http, num_shards, interval);
}
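
// Note that DataLogTrimPollCR never exits its loop, so running the returned
// coroutine blocks for the life of the process. It is meant to be driven
// from a dedicated background context (e.g. a sync log trim thread with its
// own RGWCoroutinesManager), not from a request path.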